Commit 1035e3e0 authored by Greg Thompson's avatar Greg Thompson Committed by Commit Bot

Revert "[Jobs]: Convert test launcher to using jobs."

This reverts commit f533db5c.

Reason for revert: Breaks sequential running of PRE_ tests. See, for example, ChromeTracingDelegateBrowserTestOnStartup.StartupTracingThrottle failures in https://ci.chromium.org/p/chromium/builders/ci/Linux%20ASan%20LSan%20Tests%20(1)/82236. Each PRE_* test must run to completion before the next one is started.

Original change's description:
> [Jobs]: Convert test launcher to using jobs.
>
> Test launcher relies on the fact that TaskScheduler won't have more
> concurrency than max_tasks to control |num_parallel_jobs|. The
> TaskScheduler is allowed to run more tasks concurrently than
> |max_tasks| when some of these tasks are blocked.
> This CL converts Test launcher to using Jobs and ensures that
> |runner_count_| concurrency is respected.
>
> Bug: 905788
> Change-Id: Ia090271e11b322140da1bf7e1526f4ba16bfb524
> Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2414556
> Commit-Queue: Etienne Pierre-Doray <etiennep@chromium.org>
> Reviewed-by: François Doray <fdoray@chromium.org>
> Cr-Commit-Position: refs/heads/master@{#812145}

TBR=fdoray@chromium.org,etiennep@chromium.org

# Not skipping CQ checks because original CL landed > 1 day ago.

Bug: 905788
Change-Id: I82463e729c7be1bd7dc5d73ba90b1715f7d9bba8
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2445013Reviewed-by: default avatarGreg Thompson <grt@chromium.org>
Commit-Queue: Greg Thompson <grt@chromium.org>
Cr-Commit-Position: refs/heads/master@{#813138}
parent 8c6daa14
...@@ -20,7 +20,6 @@ ...@@ -20,7 +20,6 @@
#include "base/files/file_path.h" #include "base/files/file_path.h"
#include "base/files/file_util.h" #include "base/files/file_util.h"
#include "base/files/scoped_file.h" #include "base/files/scoped_file.h"
#include "base/files/scoped_temp_dir.h"
#include "base/format_macros.h" #include "base/format_macros.h"
#include "base/hash/hash.h" #include "base/hash/hash.h"
#include "base/lazy_instance.h" #include "base/lazy_instance.h"
...@@ -42,7 +41,6 @@ ...@@ -42,7 +41,6 @@
#include "base/strings/stringprintf.h" #include "base/strings/stringprintf.h"
#include "base/strings/utf_string_conversions.h" #include "base/strings/utf_string_conversions.h"
#include "base/system/sys_info.h" #include "base/system/sys_info.h"
#include "base/task/post_job.h"
#include "base/task/post_task.h" #include "base/task/post_task.h"
#include "base/task/thread_pool/thread_pool_instance.h" #include "base/task/thread_pool/thread_pool_instance.h"
#include "base/test/gtest_util.h" #include "base/test/gtest_util.h"
...@@ -639,14 +637,15 @@ std::vector<std::string> ExtractTestsFromFilter(const std::string& filter, ...@@ -639,14 +637,15 @@ std::vector<std::string> ExtractTestsFromFilter(const std::string& filter,
class TestRunner { class TestRunner {
public: public:
explicit TestRunner(TestLauncher* launcher, explicit TestRunner(TestLauncher* launcher,
size_t max_workers = 1u, size_t runner_count = 1u,
size_t batch_size = 1u) size_t batch_size = 1u)
: launcher_(launcher), : launcher_(launcher),
max_workers_(max_workers), runner_count_(runner_count),
batch_size_(batch_size) {} batch_size_(batch_size) {}
// Sets |test_names| to be run, with |batch_size| tests per process. // Sets |test_names| to be run, with |batch_size| tests per process.
// Posts a job to run LaunchChildGTestProcess on |max_workers| workers. // Posts LaunchNextTask |runner_count| number of times, each with a separate
// task runner.
void Run(const std::vector<std::string>& test_names); void Run(const std::vector<std::string>& test_names);
private: private:
...@@ -658,131 +657,104 @@ class TestRunner { ...@@ -658,131 +657,104 @@ class TestRunner {
test_names.front().find(kPreTestPrefix) != std::string::npos; test_names.front().find(kPreTestPrefix) != std::string::npos;
} }
bool IsSingleThreaded() const { return batch_size_ == 0; } // Launches the next child process on |task_runner| and clears
// |last_task_temp_dir| from the previous task.
void LaunchNextTask(scoped_refptr<TaskRunner> task_runner,
const FilePath& last_task_temp_dir);
void WorkerTask(scoped_refptr<TaskRunner> main_task_runner, // Forwards |last_task_temp_dir| and launches the next task on main thread.
base::JobDelegate* delegate); // The method is called on |task_runner|.
size_t GetMaxConcurrency(size_t worker_count) { void ClearAndLaunchNext(scoped_refptr<TaskRunner> main_thread_runner,
AutoLock auto_lock(lock_); scoped_refptr<TaskRunner> task_runner,
if (IsSingleThreaded()) const FilePath& last_task_temp_dir) {
return tests_to_run_.empty() ? 0 : 1; main_thread_runner->PostTask(
FROM_HERE,
// Round up the division to ensure enough workers for all tests. BindOnce(&TestRunner::LaunchNextTask, weak_ptr_factory_.GetWeakPtr(),
return std::min((tests_to_run_.size() + batch_size_ - 1) / batch_size_, task_runner, last_task_temp_dir));
max_workers_);
} }
// Cleans up |task_temp_dir| from a previous task and quits |run_loop| if
// |done|.
void CleanupTask(base::ScopedTempDir task_temp_dir, bool done);
ThreadChecker thread_checker_; ThreadChecker thread_checker_;
std::vector<std::string> tests_to_run_;
TestLauncher* const launcher_; TestLauncher* const launcher_;
JobHandle job_handle_; std::vector<scoped_refptr<TaskRunner>> task_runners_;
// Max number of workers to use. // Number of sequenced task runners to use.
const size_t max_workers_; const size_t runner_count_;
// Number of TaskRunners that have finished.
size_t runners_done_ = 0;
// Number of tests per process, 0 is special case for all tests. // Number of tests per process, 0 is special case for all tests.
const size_t batch_size_; const size_t batch_size_;
RunLoop run_loop_; RunLoop run_loop_;
// Protects member used concurrently by worker tasks.
base::Lock lock_;
std::vector<std::string> tests_to_run_ GUARDED_BY(lock_);
base::WeakPtrFactory<TestRunner> weak_ptr_factory_{this}; base::WeakPtrFactory<TestRunner> weak_ptr_factory_{this};
}; };
void TestRunner::Run(const std::vector<std::string>& test_names) { void TestRunner::Run(const std::vector<std::string>& test_names) {
DCHECK(thread_checker_.CalledOnValidThread()); DCHECK(thread_checker_.CalledOnValidThread());
// No workers, fail immediately. // No sequence runners, fail immediately.
CHECK_GT(max_workers_, 0u); CHECK_GT(runner_count_, 0u);
if (test_names.empty()) tests_to_run_ = test_names;
return; // Reverse test order to avoid coping the whole vector when removing tests.
std::reverse(tests_to_run_.begin(), tests_to_run_.end());
{ runners_done_ = 0;
AutoLock auto_lock(lock_); task_runners_.clear();
tests_to_run_ = test_names; for (size_t i = 0; i < runner_count_; i++) {
// Reverse test order to avoid coping the whole vector when removing tests. task_runners_.push_back(ThreadPool::CreateSequencedTaskRunner(
std::reverse(tests_to_run_.begin(), tests_to_run_.end()); {MayBlock(), TaskShutdownBehavior::BLOCK_SHUTDOWN}));
} ThreadTaskRunnerHandle::Get()->PostTask(
job_handle_ = base::PostJob(
FROM_HERE, {TaskPriority::USER_BLOCKING, MayBlock()},
BindRepeating(&TestRunner::WorkerTask, Unretained(this),
ThreadTaskRunnerHandle::Get()),
BindRepeating(&TestRunner::GetMaxConcurrency, Unretained(this)));
run_loop_.Run();
}
void TestRunner::WorkerTask(scoped_refptr<TaskRunner> main_task_runner,
base::JobDelegate* delegate) {
bool done = false;
while (!done && !delegate->ShouldYield()) {
// Create a temporary directory for this task. This directory will hold the
// flags and results files for the child processes as well as their User
// Data dir, where appropriate. For platforms that support per-child temp
// dirs, this directory will also contain one subdirectory per child for
// that child's process-wide temp dir.
base::ScopedTempDir task_temp_dir;
CHECK(task_temp_dir.CreateUniqueTempDir());
int child_index = 0;
bool reuse_state = true;
while (reuse_state) {
std::vector<std::string> batch;
{
AutoLock auto_lock(lock_);
size_t batch_size;
// Single threaded case runs all tests in one batch.
if (IsSingleThreaded())
batch_size = tests_to_run_.size();
// Run remaining tests up to |batch_size_|.
else
batch_size = std::min(batch_size_, tests_to_run_.size());
batch.assign(tests_to_run_.rbegin(),
tests_to_run_.rbegin() + batch_size);
tests_to_run_.erase(tests_to_run_.end() - batch_size,
tests_to_run_.end());
done = tests_to_run_.empty();
}
if (batch.empty())
break;
launcher_->LaunchChildGTestProcess(
main_task_runner, batch, task_temp_dir.GetPath(),
CreateChildTempDirIfSupported(task_temp_dir.GetPath(),
child_index++));
reuse_state = !done && ShouldReuseStateFromLastBatch(batch);
}
// Cleaning up test results is scheduled to |main_task_runner| because it
// must happen after all post processing step that was scheduled in
// LaunchChildGTestProcess to |main_task_runner|.
main_task_runner->PostTask(
FROM_HERE, FROM_HERE,
BindOnce(&TestRunner::CleanupTask, weak_ptr_factory_.GetWeakPtr(), BindOnce(&TestRunner::LaunchNextTask, weak_ptr_factory_.GetWeakPtr(),
std::move(task_temp_dir), done)); task_runners_.back(), FilePath()));
} }
run_loop_.Run();
} }
void TestRunner::CleanupTask(base::ScopedTempDir task_temp_dir, bool done) { void TestRunner::LaunchNextTask(scoped_refptr<TaskRunner> task_runner,
const FilePath& last_task_temp_dir) {
DCHECK(thread_checker_.CalledOnValidThread()); DCHECK(thread_checker_.CalledOnValidThread());
// delete previous temporary directory // delete previous temporary directory
if (!task_temp_dir.Delete()) { if (!last_task_temp_dir.empty() &&
!DeletePathRecursively(last_task_temp_dir)) {
// This needs to be non-fatal at least for Windows. // This needs to be non-fatal at least for Windows.
LOG(WARNING) << "Failed to delete " LOG(WARNING) << "Failed to delete " << last_task_temp_dir.AsUTF8Unsafe();
<< task_temp_dir.GetPath().AsUTF8Unsafe();
} }
if (!done) // No more tests to run, finish sequence.
if (tests_to_run_.empty()) {
runners_done_++;
// All sequence runners are done, quit the loop.
if (runners_done_ == runner_count_)
run_loop_.QuitWhenIdle();
return; return;
}
if (job_handle_) { // Create a temporary directory for this task. This directory will hold the
job_handle_.Cancel(); // flags and results files for the child processes as well as their User Data
run_loop_.QuitWhenIdle(); // dir, where appropriate. For platforms that support per-child temp dirs,
// this directory will also contain one subdirectory per child for that
// child's process-wide temp dir.
base::FilePath task_temp_dir;
CHECK(CreateNewTempDirectory(FilePath::StringType(), &task_temp_dir));
bool post_to_current_runner = true;
size_t batch_size = (batch_size_ == 0) ? tests_to_run_.size() : batch_size_;
int child_index = 0;
while (post_to_current_runner && !tests_to_run_.empty()) {
batch_size = std::min(batch_size, tests_to_run_.size());
std::vector<std::string> batch(tests_to_run_.rbegin(),
tests_to_run_.rbegin() + batch_size);
tests_to_run_.erase(tests_to_run_.end() - batch_size, tests_to_run_.end());
task_runner->PostTask(
FROM_HERE,
BindOnce(&TestLauncher::LaunchChildGTestProcess, Unretained(launcher_),
ThreadTaskRunnerHandle::Get(), batch, task_temp_dir,
CreateChildTempDirIfSupported(task_temp_dir, child_index++)));
post_to_current_runner = ShouldReuseStateFromLastBatch(batch);
} }
task_runner->PostTask(
FROM_HERE,
BindOnce(&TestRunner::ClearAndLaunchNext, Unretained(this),
ThreadTaskRunnerHandle::Get(), task_runner, task_temp_dir));
} }
// Returns the number of files and directories in |dir|, or 0 if |dir| is empty. // Returns the number of files and directories in |dir|, or 0 if |dir| is empty.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment