Commit f533db5c authored by Etienne Pierre-doray's avatar Etienne Pierre-doray Committed by Commit Bot

[Jobs]: Convert test launcher to using jobs.

Test launcher relies on the fact that TaskScheduler won't have more
concurrency than max_tasks to control |num_parallel_jobs|. The
TaskScheduler is allowed to run more tasks concurrently than
|max_tasks| when some of these tasks are blocked.
This CL converts Test launcher to using Jobs and ensures that
|runner_count_| concurrency is respected.

Bug: 905788
Change-Id: Ia090271e11b322140da1bf7e1526f4ba16bfb524
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2414556
Commit-Queue: Etienne Pierre-Doray <etiennep@chromium.org>
Reviewed-by: default avatarFrançois Doray <fdoray@chromium.org>
Cr-Commit-Position: refs/heads/master@{#812145}
parent c47ff7fb
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
#include "base/files/file_path.h" #include "base/files/file_path.h"
#include "base/files/file_util.h" #include "base/files/file_util.h"
#include "base/files/scoped_file.h" #include "base/files/scoped_file.h"
#include "base/files/scoped_temp_dir.h"
#include "base/format_macros.h" #include "base/format_macros.h"
#include "base/hash/hash.h" #include "base/hash/hash.h"
#include "base/lazy_instance.h" #include "base/lazy_instance.h"
...@@ -41,6 +42,7 @@ ...@@ -41,6 +42,7 @@
#include "base/strings/stringprintf.h" #include "base/strings/stringprintf.h"
#include "base/strings/utf_string_conversions.h" #include "base/strings/utf_string_conversions.h"
#include "base/system/sys_info.h" #include "base/system/sys_info.h"
#include "base/task/post_job.h"
#include "base/task/post_task.h" #include "base/task/post_task.h"
#include "base/task/thread_pool/thread_pool_instance.h" #include "base/task/thread_pool/thread_pool_instance.h"
#include "base/test/gtest_util.h" #include "base/test/gtest_util.h"
...@@ -637,15 +639,14 @@ std::vector<std::string> ExtractTestsFromFilter(const std::string& filter, ...@@ -637,15 +639,14 @@ std::vector<std::string> ExtractTestsFromFilter(const std::string& filter,
class TestRunner { class TestRunner {
public: public:
explicit TestRunner(TestLauncher* launcher, explicit TestRunner(TestLauncher* launcher,
size_t runner_count = 1u, size_t max_workers = 1u,
size_t batch_size = 1u) size_t batch_size = 1u)
: launcher_(launcher), : launcher_(launcher),
runner_count_(runner_count), max_workers_(max_workers),
batch_size_(batch_size) {} batch_size_(batch_size) {}
// Sets |test_names| to be run, with |batch_size| tests per process. // Sets |test_names| to be run, with |batch_size| tests per process.
// Posts LaunchNextTask |runner_count| number of times, each with a separate // Posts a job to run LaunchChildGTestProcess on |max_workers| workers.
// task runner.
void Run(const std::vector<std::string>& test_names); void Run(const std::vector<std::string>& test_names);
private: private:
...@@ -657,104 +658,131 @@ class TestRunner { ...@@ -657,104 +658,131 @@ class TestRunner {
test_names.front().find(kPreTestPrefix) != std::string::npos; test_names.front().find(kPreTestPrefix) != std::string::npos;
} }
// Launches the next child process on |task_runner| and clears bool IsSingleThreaded() const { return batch_size_ == 0; }
// |last_task_temp_dir| from the previous task.
void LaunchNextTask(scoped_refptr<TaskRunner> task_runner,
const FilePath& last_task_temp_dir);
// Forwards |last_task_temp_dir| and launches the next task on main thread. void WorkerTask(scoped_refptr<TaskRunner> main_task_runner,
// The method is called on |task_runner|. base::JobDelegate* delegate);
void ClearAndLaunchNext(scoped_refptr<TaskRunner> main_thread_runner, size_t GetMaxConcurrency(size_t worker_count) {
scoped_refptr<TaskRunner> task_runner, AutoLock auto_lock(lock_);
const FilePath& last_task_temp_dir) { if (IsSingleThreaded())
main_thread_runner->PostTask( return tests_to_run_.empty() ? 0 : 1;
FROM_HERE,
BindOnce(&TestRunner::LaunchNextTask, weak_ptr_factory_.GetWeakPtr(), // Round up the division to ensure enough workers for all tests.
task_runner, last_task_temp_dir)); return std::min((tests_to_run_.size() + batch_size_ - 1) / batch_size_,
max_workers_);
} }
// Cleans up |task_temp_dir| from a previous task and quits |run_loop| if
// |done|.
void CleanupTask(base::ScopedTempDir task_temp_dir, bool done);
ThreadChecker thread_checker_; ThreadChecker thread_checker_;
std::vector<std::string> tests_to_run_;
TestLauncher* const launcher_; TestLauncher* const launcher_;
std::vector<scoped_refptr<TaskRunner>> task_runners_; JobHandle job_handle_;
// Number of sequenced task runners to use. // Max number of workers to use.
const size_t runner_count_; const size_t max_workers_;
// Number of TaskRunners that have finished.
size_t runners_done_ = 0;
// Number of tests per process, 0 is special case for all tests. // Number of tests per process, 0 is special case for all tests.
const size_t batch_size_; const size_t batch_size_;
RunLoop run_loop_; RunLoop run_loop_;
// Protects member used concurrently by worker tasks.
base::Lock lock_;
std::vector<std::string> tests_to_run_ GUARDED_BY(lock_);
base::WeakPtrFactory<TestRunner> weak_ptr_factory_{this}; base::WeakPtrFactory<TestRunner> weak_ptr_factory_{this};
}; };
void TestRunner::Run(const std::vector<std::string>& test_names) { void TestRunner::Run(const std::vector<std::string>& test_names) {
DCHECK(thread_checker_.CalledOnValidThread()); DCHECK(thread_checker_.CalledOnValidThread());
// No sequence runners, fail immediately. // No workers, fail immediately.
CHECK_GT(runner_count_, 0u); CHECK_GT(max_workers_, 0u);
tests_to_run_ = test_names; if (test_names.empty())
// Reverse test order to avoid coping the whole vector when removing tests. return;
std::reverse(tests_to_run_.begin(), tests_to_run_.end());
runners_done_ = 0; {
task_runners_.clear(); AutoLock auto_lock(lock_);
for (size_t i = 0; i < runner_count_; i++) { tests_to_run_ = test_names;
task_runners_.push_back(ThreadPool::CreateSequencedTaskRunner( // Reverse test order to avoid coping the whole vector when removing tests.
{MayBlock(), TaskShutdownBehavior::BLOCK_SHUTDOWN})); std::reverse(tests_to_run_.begin(), tests_to_run_.end());
ThreadTaskRunnerHandle::Get()->PostTask(
FROM_HERE,
BindOnce(&TestRunner::LaunchNextTask, weak_ptr_factory_.GetWeakPtr(),
task_runners_.back(), FilePath()));
} }
job_handle_ = base::PostJob(
FROM_HERE, {TaskPriority::USER_BLOCKING, MayBlock()},
BindRepeating(&TestRunner::WorkerTask, Unretained(this),
ThreadTaskRunnerHandle::Get()),
BindRepeating(&TestRunner::GetMaxConcurrency, Unretained(this)));
run_loop_.Run(); run_loop_.Run();
} }
void TestRunner::LaunchNextTask(scoped_refptr<TaskRunner> task_runner, void TestRunner::WorkerTask(scoped_refptr<TaskRunner> main_task_runner,
const FilePath& last_task_temp_dir) { base::JobDelegate* delegate) {
bool done = false;
while (!done && !delegate->ShouldYield()) {
// Create a temporary directory for this task. This directory will hold the
// flags and results files for the child processes as well as their User
// Data dir, where appropriate. For platforms that support per-child temp
// dirs, this directory will also contain one subdirectory per child for
// that child's process-wide temp dir.
base::ScopedTempDir task_temp_dir;
CHECK(task_temp_dir.CreateUniqueTempDir());
int child_index = 0;
bool reuse_state = true;
while (reuse_state) {
std::vector<std::string> batch;
{
AutoLock auto_lock(lock_);
size_t batch_size;
// Single threaded case runs all tests in one batch.
if (IsSingleThreaded())
batch_size = tests_to_run_.size();
// Run remaining tests up to |batch_size_|.
else
batch_size = std::min(batch_size_, tests_to_run_.size());
batch.assign(tests_to_run_.rbegin(),
tests_to_run_.rbegin() + batch_size);
tests_to_run_.erase(tests_to_run_.end() - batch_size,
tests_to_run_.end());
done = tests_to_run_.empty();
}
if (batch.empty())
break;
launcher_->LaunchChildGTestProcess(
main_task_runner, batch, task_temp_dir.GetPath(),
CreateChildTempDirIfSupported(task_temp_dir.GetPath(),
child_index++));
reuse_state = !done && ShouldReuseStateFromLastBatch(batch);
}
// Cleaning up test results is scheduled to |main_task_runner| because it
// must happen after all post processing step that was scheduled in
// LaunchChildGTestProcess to |main_task_runner|.
main_task_runner->PostTask(
FROM_HERE,
BindOnce(&TestRunner::CleanupTask, weak_ptr_factory_.GetWeakPtr(),
std::move(task_temp_dir), done));
}
}
void TestRunner::CleanupTask(base::ScopedTempDir task_temp_dir, bool done) {
DCHECK(thread_checker_.CalledOnValidThread()); DCHECK(thread_checker_.CalledOnValidThread());
// delete previous temporary directory // delete previous temporary directory
if (!last_task_temp_dir.empty() && if (!task_temp_dir.Delete()) {
!DeletePathRecursively(last_task_temp_dir)) {
// This needs to be non-fatal at least for Windows. // This needs to be non-fatal at least for Windows.
LOG(WARNING) << "Failed to delete " << last_task_temp_dir.AsUTF8Unsafe(); LOG(WARNING) << "Failed to delete "
<< task_temp_dir.GetPath().AsUTF8Unsafe();
} }
// No more tests to run, finish sequence. if (!done)
if (tests_to_run_.empty()) {
runners_done_++;
// All sequence runners are done, quit the loop.
if (runners_done_ == runner_count_)
run_loop_.QuitWhenIdle();
return; return;
}
// Create a temporary directory for this task. This directory will hold the if (job_handle_) {
// flags and results files for the child processes as well as their User Data job_handle_.Cancel();
// dir, where appropriate. For platforms that support per-child temp dirs, run_loop_.QuitWhenIdle();
// this directory will also contain one subdirectory per child for that
// child's process-wide temp dir.
base::FilePath task_temp_dir;
CHECK(CreateNewTempDirectory(FilePath::StringType(), &task_temp_dir));
bool post_to_current_runner = true;
size_t batch_size = (batch_size_ == 0) ? tests_to_run_.size() : batch_size_;
int child_index = 0;
while (post_to_current_runner && !tests_to_run_.empty()) {
batch_size = std::min(batch_size, tests_to_run_.size());
std::vector<std::string> batch(tests_to_run_.rbegin(),
tests_to_run_.rbegin() + batch_size);
tests_to_run_.erase(tests_to_run_.end() - batch_size, tests_to_run_.end());
task_runner->PostTask(
FROM_HERE,
BindOnce(&TestLauncher::LaunchChildGTestProcess, Unretained(launcher_),
ThreadTaskRunnerHandle::Get(), batch, task_temp_dir,
CreateChildTempDirIfSupported(task_temp_dir, child_index++)));
post_to_current_runner = ShouldReuseStateFromLastBatch(batch);
} }
task_runner->PostTask(
FROM_HERE,
BindOnce(&TestRunner::ClearAndLaunchNext, Unretained(this),
ThreadTaskRunnerHandle::Get(), task_runner, task_temp_dir));
} }
// Returns the number of files and directories in |dir|, or 0 if |dir| is empty. // Returns the number of files and directories in |dir|, or 0 if |dir| is empty.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment