Commit cf418066 authored by Jesse McKenna's avatar Jesse McKenna Committed by Commit Bot

TaskScheduler: Delegate insertion of Sequences by...

TaskScheduler: Delegate insertion of Sequences by Scheduler(Parallel|Sequenced)TaskRunner to TaskScheduler

Bug: 889029
Change-Id: I06e871a56204ae64b14fe75e58e77ba4a757e5ac
Reviewed-on: https://chromium-review.googlesource.com/c/1294471
Commit-Queue: Jesse McKenna <jessemckenna@google.com>
Reviewed-by: default avatarFrançois Doray <fdoray@chromium.org>
Cr-Commit-Position: refs/heads/master@{#603147}
parent a6a3384f
......@@ -794,8 +794,14 @@ jumbo_component("base") {
"task/task_scheduler/scheduler_lock.h",
"task/task_scheduler/scheduler_lock_impl.cc",
"task/task_scheduler/scheduler_lock_impl.h",
"task/task_scheduler/scheduler_parallel_task_runner.cc",
"task/task_scheduler/scheduler_parallel_task_runner.h",
"task/task_scheduler/scheduler_sequenced_task_runner.cc",
"task/task_scheduler/scheduler_sequenced_task_runner.h",
"task/task_scheduler/scheduler_single_thread_task_runner_manager.cc",
"task/task_scheduler/scheduler_single_thread_task_runner_manager.h",
"task/task_scheduler/scheduler_task_runner_delegate.cc",
"task/task_scheduler/scheduler_task_runner_delegate.h",
"task/task_scheduler/scheduler_worker.cc",
"task/task_scheduler/scheduler_worker.h",
"task/task_scheduler/scheduler_worker_observer.h",
......
......@@ -11,10 +11,8 @@ namespace internal {
PlatformNativeWorkerPoolWin::PlatformNativeWorkerPoolWin(
TrackedRef<TaskTracker> task_tracker,
DelayedTaskManager* delayed_task_manager,
TrackedRef<Delegate> delegate)
: SchedulerWorkerPool(std::move(task_tracker),
delayed_task_manager,
std::move(delegate)) {}
PlatformNativeWorkerPoolWin::~PlatformNativeWorkerPoolWin() {
......
......@@ -30,7 +30,6 @@ namespace internal {
class BASE_EXPORT PlatformNativeWorkerPoolWin : public SchedulerWorkerPool {
public:
PlatformNativeWorkerPoolWin(TrackedRef<TaskTracker> task_tracker,
DelayedTaskManager* delayed_task_manager,
TrackedRef<Delegate> delegate);
// Destroying a PlatformNativeWorkerPoolWin is not allowed in
......
// Copyright 2018 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "base/task/task_scheduler/scheduler_parallel_task_runner.h"
#include "base/task/task_scheduler/sequence.h"
namespace base {
namespace internal {
SchedulerParallelTaskRunner::SchedulerParallelTaskRunner(
const TaskTraits& traits,
SchedulerTaskRunnerDelegate* scheduler_task_runner_delegate)
: traits_(traits),
scheduler_task_runner_delegate_(scheduler_task_runner_delegate) {}
SchedulerParallelTaskRunner::~SchedulerParallelTaskRunner() = default;
bool SchedulerParallelTaskRunner::PostDelayedTask(const Location& from_here,
OnceClosure closure,
TimeDelta delay) {
if (!SchedulerTaskRunnerDelegate::Exists())
return false;
// Post the task as part of a one-off single-task Sequence.
return scheduler_task_runner_delegate_->PostTaskWithSequence(
Task(from_here, std::move(closure), delay),
MakeRefCounted<Sequence>(traits_));
}
bool SchedulerParallelTaskRunner::RunsTasksInCurrentSequence() const {
return scheduler_task_runner_delegate_->IsRunningPoolWithTraits(traits_);
}
} // namespace internal
} // namespace base
// Copyright 2018 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef BASE_TASK_TASK_SCHEDULER_SCHEDULER_PARALLEL_TASK_RUNNER_H_
#define BASE_TASK_TASK_SCHEDULER_SCHEDULER_PARALLEL_TASK_RUNNER_H_
#include "base/base_export.h"
#include "base/callback_forward.h"
#include "base/location.h"
#include "base/task/task_scheduler/scheduler_task_runner_delegate.h"
#include "base/task/task_traits.h"
#include "base/task_runner.h"
#include "base/time/time.h"
namespace base {
namespace internal {
// A task runner that runs tasks in parallel.
class BASE_EXPORT SchedulerParallelTaskRunner : public TaskRunner {
public:
// Constructs a SchedulerParallelTaskRunner which can be used to post tasks.
SchedulerParallelTaskRunner(
const TaskTraits& traits,
SchedulerTaskRunnerDelegate* scheduler_task_runner_delegate);
// TaskRunner:
bool PostDelayedTask(const Location& from_here,
OnceClosure closure,
TimeDelta delay) override;
bool RunsTasksInCurrentSequence() const override;
private:
~SchedulerParallelTaskRunner() override;
const TaskTraits traits_;
SchedulerTaskRunnerDelegate* const scheduler_task_runner_delegate_;
DISALLOW_COPY_AND_ASSIGN(SchedulerParallelTaskRunner);
};
} // namespace internal
} // namespace base
#endif // BASE_TASK_TASK_SCHEDULER_SCHEDULER_PARALLEL_TASK_RUNNER_H_
// Copyright 2018 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "base/task/task_scheduler/scheduler_sequenced_task_runner.h"
#include "base/sequence_token.h"
namespace base {
namespace internal {
SchedulerSequencedTaskRunner::SchedulerSequencedTaskRunner(
const TaskTraits& traits,
SchedulerTaskRunnerDelegate* scheduler_task_runner_delegate)
: traits_(traits),
scheduler_task_runner_delegate_(scheduler_task_runner_delegate),
sequence_(MakeRefCounted<Sequence>(traits)) {}
SchedulerSequencedTaskRunner::~SchedulerSequencedTaskRunner() = default;
bool SchedulerSequencedTaskRunner::PostDelayedTask(const Location& from_here,
OnceClosure closure,
TimeDelta delay) {
if (!SchedulerTaskRunnerDelegate::Exists())
return false;
Task task(from_here, std::move(closure), delay);
task.sequenced_task_runner_ref = this;
// Post the task as part of |sequence_|.
return scheduler_task_runner_delegate_->PostTaskWithSequence(std::move(task),
sequence_);
}
bool SchedulerSequencedTaskRunner::PostNonNestableDelayedTask(
const Location& from_here,
OnceClosure closure,
TimeDelta delay) {
// Tasks are never nested within the task scheduler.
return PostDelayedTask(from_here, std::move(closure), delay);
}
bool SchedulerSequencedTaskRunner::RunsTasksInCurrentSequence() const {
return sequence_->token() == SequenceToken::GetForCurrentThread();
}
} // namespace internal
} // namespace base
// Copyright 2018 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef BASE_TASK_TASK_SCHEDULER_SCHEDULER_SEQUENCED_TASK_RUNNER_H_
#define BASE_TASK_TASK_SCHEDULER_SCHEDULER_SEQUENCED_TASK_RUNNER_H_
#include "base/base_export.h"
#include "base/callback_forward.h"
#include "base/location.h"
#include "base/task/task_scheduler/scheduler_task_runner_delegate.h"
#include "base/task/task_scheduler/sequence.h"
#include "base/task/task_traits.h"
#include "base/task_runner.h"
#include "base/time/time.h"
namespace base {
namespace internal {
// A task runner that runs tasks in sequence.
class BASE_EXPORT SchedulerSequencedTaskRunner : public SequencedTaskRunner {
public:
// Constructs a SchedulerSequencedTaskRunner which can be used to post tasks.
SchedulerSequencedTaskRunner(
const TaskTraits& traits,
SchedulerTaskRunnerDelegate* scheduler_task_runner_delegate);
// SequencedTaskRunner:
bool PostDelayedTask(const Location& from_here,
OnceClosure closure,
TimeDelta delay) override;
bool PostNonNestableDelayedTask(const Location& from_here,
OnceClosure closure,
TimeDelta delay) override;
bool RunsTasksInCurrentSequence() const override;
private:
~SchedulerSequencedTaskRunner() override;
const TaskTraits traits_;
SchedulerTaskRunnerDelegate* const scheduler_task_runner_delegate_;
// Sequence for all Tasks posted through this TaskRunner.
const scoped_refptr<Sequence> sequence_;
DISALLOW_COPY_AND_ASSIGN(SchedulerSequencedTaskRunner);
};
} // namespace internal
} // namespace base
#endif // BASE_TASK_TASK_SCHEDULER_SCHEDULER_SEQUENCED_TASK_RUNNER_H_
// Copyright 2018 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "base/task/task_scheduler/scheduler_task_runner_delegate.h"
namespace base {
namespace internal {
namespace {
// Indicates whether a SchedulerTaskRunnerDelegate instance exists in the
// process. Used to tell when a task is posted from the main thread after the
// task environment was brought down in unit tests so that TaskRunners can
// return false on PostTask, letting callers know they should complete
// necessary work synchronously. A SchedulerTaskRunnerDelegate is usually
// instantiated before worker threads are started and deleted after worker
// threads have been joined. This makes the variable const while worker threads
// are up and as such it doesn't need to be atomic.
bool g_exists = false;
} // namespace
SchedulerTaskRunnerDelegate::SchedulerTaskRunnerDelegate() {
DCHECK(!g_exists);
g_exists = true;
}
SchedulerTaskRunnerDelegate::~SchedulerTaskRunnerDelegate() {
DCHECK(g_exists);
g_exists = false;
}
// static
bool SchedulerTaskRunnerDelegate::Exists() {
return g_exists;
}
} // namespace internal
} // namespace base
// Copyright 2018 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef BASE_TASK_TASK_SCHEDULER_SCHEDULER_TASK_RUNNER_DELEGATE_H_
#define BASE_TASK_TASK_SCHEDULER_SCHEDULER_TASK_RUNNER_DELEGATE_H_
#include "base/base_export.h"
#include "base/task/task_scheduler/sequence.h"
#include "base/task/task_scheduler/task.h"
#include "base/task/task_traits.h"
namespace base {
namespace internal {
// Delegate interface for SchedulerParallelTaskRunner and
// SchedulerSequencedTaskRunner.
class BASE_EXPORT SchedulerTaskRunnerDelegate {
public:
SchedulerTaskRunnerDelegate();
virtual ~SchedulerTaskRunnerDelegate();
// Returns true if a SchedulerTaskRunnerDelegate instance exists in the
// process. This is needed in case of unit tests wherein a TaskRunner
// outlives the TaskScheduler that created it.
static bool Exists();
// Invoked when a |task| is posted to the SchedulerParallelTaskRunner or
// SchedulerSequencedTaskRunner. The implementation must post |task| to
// |sequence| within the appropriate priority queue, depending on |sequence|
// traits. Returns true if task was successfully posted.
virtual bool PostTaskWithSequence(Task task,
scoped_refptr<Sequence> sequence) = 0;
// Invoked when RunsTasksInCurrentSequence() is called on a
// SchedulerParallelTaskRunner. Returns true if the worker pool used by the
// SchedulerParallelTaskRunner (as determined by |traits|) is running on
// this thread.
virtual bool IsRunningPoolWithTraits(const TaskTraits& traits) const = 0;
};
} // namespace internal
} // namespace base
#endif // BASE_TASK_TASK_SCHEDULER_SCHEDULER_TASK_RUNNER_DELEGATE_H_
......@@ -7,7 +7,6 @@
#include "base/bind.h"
#include "base/bind_helpers.h"
#include "base/lazy_instance.h"
#include "base/task/task_scheduler/delayed_task_manager.h"
#include "base/task/task_scheduler/task_tracker.h"
#include "base/threading/thread_local.h"
......@@ -16,21 +15,6 @@ namespace internal {
namespace {
// The number of SchedulerWorkerPool that are alive in this process. This
// variable should only be incremented when the SchedulerWorkerPool instances
// are brought up (on the main thread; before any tasks are posted) and
// decremented when the same instances are brought down (i.e., only when unit
// tests tear down the task environment and never in production). This makes the
// variable const while worker threads are up and as such it doesn't need to be
// atomic. It is used to tell when a task is posted from the main thread after
// the task environment was brought down in unit tests so that
// SchedulerWorkerPool bound TaskRunners can return false on PostTask, letting
// such callers know they should complete necessary work synchronously. Note:
// |!g_active_pools_count| is generally equivalent to
// |!TaskScheduler::GetInstance()| but has the advantage of being valid in
// task_scheduler unit tests that don't instantiate a full TaskScheduler.
int g_active_pools_count = 0;
// SchedulerWorkerPool that owns the current thread, if any.
LazyInstance<ThreadLocalPointer<const SchedulerWorkerPool>>::Leaky
tls_current_worker_pool = LAZY_INSTANCE_INITIALIZER;
......@@ -41,148 +25,15 @@ const SchedulerWorkerPool* GetCurrentWorkerPool() {
} // namespace
// A task runner that runs tasks in parallel.
class SchedulerParallelTaskRunner : public TaskRunner {
public:
// Constructs a SchedulerParallelTaskRunner which can be used to post tasks so
// long as |worker_pool| is alive.
// TODO(robliao): Find a concrete way to manage |worker_pool|'s memory.
SchedulerParallelTaskRunner(const TaskTraits& traits,
SchedulerWorkerPool* worker_pool)
: traits_(traits), worker_pool_(worker_pool) {
DCHECK(worker_pool_);
}
// TaskRunner:
bool PostDelayedTask(const Location& from_here,
OnceClosure closure,
TimeDelta delay) override {
if (!g_active_pools_count)
return false;
// Post the task as part of a one-off single-task Sequence.
return worker_pool_->PostTaskWithSequence(
Task(from_here, std::move(closure), delay),
MakeRefCounted<Sequence>(traits_));
}
bool RunsTasksInCurrentSequence() const override {
return GetCurrentWorkerPool() == worker_pool_;
}
private:
~SchedulerParallelTaskRunner() override = default;
const TaskTraits traits_;
SchedulerWorkerPool* const worker_pool_;
DISALLOW_COPY_AND_ASSIGN(SchedulerParallelTaskRunner);
};
// A task runner that runs tasks in sequence.
class SchedulerSequencedTaskRunner : public SequencedTaskRunner {
public:
// Constructs a SchedulerSequencedTaskRunner which can be used to post tasks
// so long as |worker_pool| is alive.
// TODO(robliao): Find a concrete way to manage |worker_pool|'s memory.
SchedulerSequencedTaskRunner(const TaskTraits& traits,
SchedulerWorkerPool* worker_pool)
: sequence_(MakeRefCounted<Sequence>(traits)), worker_pool_(worker_pool) {
DCHECK(worker_pool_);
}
// SequencedTaskRunner:
bool PostDelayedTask(const Location& from_here,
OnceClosure closure,
TimeDelta delay) override {
if (!g_active_pools_count)
return false;
Task task(from_here, std::move(closure), delay);
task.sequenced_task_runner_ref = this;
// Post the task as part of |sequence_|.
return worker_pool_->PostTaskWithSequence(std::move(task), sequence_);
}
bool PostNonNestableDelayedTask(const Location& from_here,
OnceClosure closure,
base::TimeDelta delay) override {
// Tasks are never nested within the task scheduler.
return PostDelayedTask(from_here, std::move(closure), delay);
}
bool RunsTasksInCurrentSequence() const override {
return sequence_->token() == SequenceToken::GetForCurrentThread();
}
private:
~SchedulerSequencedTaskRunner() override = default;
// Sequence for all Tasks posted through this TaskRunner.
const scoped_refptr<Sequence> sequence_;
SchedulerWorkerPool* const worker_pool_;
DISALLOW_COPY_AND_ASSIGN(SchedulerSequencedTaskRunner);
};
scoped_refptr<TaskRunner> SchedulerWorkerPool::CreateTaskRunnerWithTraits(
const TaskTraits& traits) {
return MakeRefCounted<SchedulerParallelTaskRunner>(traits, this);
}
scoped_refptr<SequencedTaskRunner>
SchedulerWorkerPool::CreateSequencedTaskRunnerWithTraits(
const TaskTraits& traits) {
return MakeRefCounted<SchedulerSequencedTaskRunner>(traits, this);
}
bool SchedulerWorkerPool::PostTaskWithSequence(
Task task,
scoped_refptr<Sequence> sequence) {
DCHECK(task.task);
DCHECK(sequence);
if (!task_tracker_->WillPostTask(&task,
sequence->traits().shutdown_behavior())) {
return false;
}
if (task.delayed_run_time.is_null()) {
PostTaskWithSequenceNow(std::move(task), std::move(sequence));
} else {
// Use CHECK instead of DCHECK to crash earlier. See http://crbug.com/711167
// for details.
CHECK(task.task);
delayed_task_manager_->AddDelayedTask(
std::move(task), BindOnce(
[](scoped_refptr<Sequence> sequence,
SchedulerWorkerPool* worker_pool, Task task) {
worker_pool->PostTaskWithSequenceNow(
std::move(task), std::move(sequence));
},
std::move(sequence), Unretained(this)));
}
return true;
}
SchedulerWorkerPool::SchedulerWorkerPool(
TrackedRef<TaskTracker> task_tracker,
DelayedTaskManager* delayed_task_manager,
TrackedRef<Delegate> delegate)
: task_tracker_(std::move(task_tracker)),
delayed_task_manager_(delayed_task_manager),
delegate_(std::move(delegate)) {
DCHECK(task_tracker_);
DCHECK(delayed_task_manager_);
++g_active_pools_count;
}
SchedulerWorkerPool::~SchedulerWorkerPool() {
--g_active_pools_count;
DCHECK_GE(g_active_pools_count, 0);
}
SchedulerWorkerPool::~SchedulerWorkerPool() = default;
void SchedulerWorkerPool::BindToCurrentThread() {
DCHECK(!GetCurrentWorkerPool());
......
......@@ -7,18 +7,14 @@
#include "base/base_export.h"
#include "base/memory/ref_counted.h"
#include "base/sequenced_task_runner.h"
#include "base/task/task_scheduler/can_schedule_sequence_observer.h"
#include "base/task/task_scheduler/sequence.h"
#include "base/task/task_scheduler/task.h"
#include "base/task/task_scheduler/tracked_ref.h"
#include "base/task/task_traits.h"
#include "base/task_runner.h"
namespace base {
namespace internal {
class DelayedTaskManager;
class TaskTracker;
// Interface for a worker pool.
......@@ -37,22 +33,11 @@ class BASE_EXPORT SchedulerWorkerPool : public CanScheduleSequenceObserver {
~SchedulerWorkerPool() override;
// Returns a TaskRunner whose PostTask invocations result in scheduling tasks
// in this SchedulerWorkerPool using |traits|. Tasks may run in any order and
// in parallel.
scoped_refptr<TaskRunner> CreateTaskRunnerWithTraits(
const TaskTraits& traits);
// Returns a SequencedTaskRunner whose PostTask invocations result in
// scheduling tasks in this SchedulerWorkerPool using |traits|. Tasks run one
// at a time in posting order.
scoped_refptr<SequencedTaskRunner> CreateSequencedTaskRunnerWithTraits(
const TaskTraits& traits);
// Posts |task| to be executed by this SchedulerWorkerPool as part of
// |sequence|. |task| won't be executed before its delayed run time, if any.
// Returns true if |task| is posted.
bool PostTaskWithSequence(Task task, scoped_refptr<Sequence> sequence);
// |sequence|. This must only be called after |task| has gone through
// TaskTracker::WillPostTask() and after |task|'s delayed run
// time.
void PostTaskWithSequenceNow(Task task, scoped_refptr<Sequence> sequence);
// Registers the worker pool in TLS.
void BindToCurrentThread();
......@@ -78,16 +63,9 @@ class BASE_EXPORT SchedulerWorkerPool : public CanScheduleSequenceObserver {
protected:
SchedulerWorkerPool(TrackedRef<TaskTracker> task_tracker,
DelayedTaskManager* delayed_task_manager,
TrackedRef<Delegate> delegate);
// Posts |task| to be executed by this SchedulerWorkerPool as part of
// |sequence|. This must only be called after |task| has gone through
// PostTaskWithSequence() and after |task|'s delayed run time.
void PostTaskWithSequenceNow(Task task, scoped_refptr<Sequence> sequence);
const TrackedRef<TaskTracker> task_tracker_;
DelayedTaskManager* const delayed_task_manager_;
const TrackedRef<Delegate> delegate_;
private:
......
......@@ -159,10 +159,8 @@ SchedulerWorkerPoolImpl::SchedulerWorkerPoolImpl(
StringPiece pool_label,
ThreadPriority priority_hint,
TrackedRef<TaskTracker> task_tracker,
DelayedTaskManager* delayed_task_manager,
TrackedRef<Delegate> delegate)
: SchedulerWorkerPool(std::move(task_tracker),
delayed_task_manager,
std::move(delegate)),
pool_label_(pool_label.as_string()),
priority_hint_(priority_hint),
......
......@@ -40,7 +40,6 @@ class SchedulerWorkerPoolParams;
namespace internal {
class DelayedTaskManager;
class TaskTracker;
// A pool of workers that run Tasks.
......@@ -67,13 +66,11 @@ class BASE_EXPORT SchedulerWorkerPoolImpl : public SchedulerWorkerPool {
// be empty. |pool_label| is used to label the pool's threads, it must not be
// empty. |priority_hint| is the preferred thread priority; the actual thread
// priority depends on shutdown state and platform capabilities.
// |task_tracker| keeps track of tasks. |delayed_task_manager| handles tasks
// posted with a delay.
// |task_tracker| keeps track of tasks.
SchedulerWorkerPoolImpl(StringPiece histogram_label,
StringPiece pool_label,
ThreadPriority priority_hint,
TrackedRef<TaskTracker> task_tracker,
DelayedTaskManager* delayed_task_manager,
TrackedRef<Delegate> delegate);
// Creates workers following the |params| specification, allowing existing and
......
......@@ -60,17 +60,16 @@ class ThreadPostingTasks : public SimpleThread {
// |worker_pool| through an |execution_mode| task runner. If
// |post_nested_task| is YES, each task posted by this thread posts another
// task when it runs.
ThreadPostingTasks(SchedulerWorkerPool* worker_pool,
ThreadPostingTasks(test::MockSchedulerTaskRunnerDelegate*
mock_scheduler_task_runner_delegate_,
test::ExecutionMode execution_mode,
PostNestedTask post_nested_task)
: SimpleThread("ThreadPostingTasks"),
worker_pool_(worker_pool),
post_nested_task_(post_nested_task),
factory_(test::CreateTaskRunnerWithExecutionMode(worker_pool,
execution_mode),
execution_mode) {
DCHECK(worker_pool_);
}
factory_(test::CreateTaskRunnerWithExecutionMode(
execution_mode,
mock_scheduler_task_runner_delegate_),
execution_mode) {}
const test::TestTaskFactory* factory() const { return &factory_; }
......@@ -82,7 +81,6 @@ class ThreadPostingTasks : public SimpleThread {
EXPECT_TRUE(factory_.PostTask(post_nested_task_, Closure()));
}
SchedulerWorkerPool* const worker_pool_;
const scoped_refptr<TaskRunner> task_runner_;
const PostNestedTask post_nested_task_;
test::TestTaskFactory factory_;
......@@ -117,18 +115,20 @@ class TaskSchedulerWorkerPoolTest
case PoolType::GENERIC:
worker_pool_ = std::make_unique<SchedulerWorkerPoolImpl>(
"TestWorkerPool", "A", ThreadPriority::NORMAL,
task_tracker_.GetTrackedRef(), &delayed_task_manager_,
task_tracker_.GetTrackedRef(),
tracked_ref_factory_.GetTrackedRef());
break;
#if defined(OS_WIN)
case PoolType::WINDOWS:
worker_pool_ = std::make_unique<PlatformNativeWorkerPoolWin>(
task_tracker_.GetTrackedRef(), &delayed_task_manager_,
task_tracker_.GetTrackedRef(),
tracked_ref_factory_.GetTrackedRef());
break;
#endif
}
ASSERT_TRUE(worker_pool_);
mock_scheduler_task_runner_delegate_.SetWorkerPool(worker_pool_.get());
}
void StartWorkerPool() {
......@@ -157,6 +157,8 @@ class TaskSchedulerWorkerPoolTest
Thread service_thread_;
TaskTracker task_tracker_ = {"Test"};
DelayedTaskManager delayed_task_manager_;
test::MockSchedulerTaskRunnerDelegate mock_scheduler_task_runner_delegate_ = {
task_tracker_.GetTrackedRef(), &delayed_task_manager_};
std::unique_ptr<SchedulerWorkerPool> worker_pool_;
......@@ -183,7 +185,8 @@ TEST_P(TaskSchedulerWorkerPoolTest, PostTasks) {
std::vector<std::unique_ptr<ThreadPostingTasks>> threads_posting_tasks;
for (size_t i = 0; i < kNumThreadsPostingTasks; ++i) {
threads_posting_tasks.push_back(std::make_unique<ThreadPostingTasks>(
worker_pool_.get(), GetParam().execution_mode, PostNestedTask::NO));
&mock_scheduler_task_runner_delegate_, GetParam().execution_mode,
PostNestedTask::NO));
threads_posting_tasks.back()->Start();
}
......@@ -205,7 +208,8 @@ TEST_P(TaskSchedulerWorkerPoolTest, NestedPostTasks) {
std::vector<std::unique_ptr<ThreadPostingTasks>> threads_posting_tasks;
for (size_t i = 0; i < kNumThreadsPostingTasks; ++i) {
threads_posting_tasks.push_back(std::make_unique<ThreadPostingTasks>(
worker_pool_.get(), GetParam().execution_mode, PostNestedTask::YES));
&mock_scheduler_task_runner_delegate_, GetParam().execution_mode,
PostNestedTask::YES));
threads_posting_tasks.back()->Start();
}
......@@ -224,7 +228,7 @@ TEST_P(TaskSchedulerWorkerPoolTest, NestedPostTasks) {
TEST_P(TaskSchedulerWorkerPoolTest, PostTaskAfterShutdown) {
StartWorkerPool();
auto task_runner = test::CreateTaskRunnerWithExecutionMode(
worker_pool_.get(), GetParam().execution_mode);
GetParam().execution_mode, &mock_scheduler_task_runner_delegate_);
task_tracker_.Shutdown();
EXPECT_FALSE(task_runner->PostTask(FROM_HERE, BindOnce(&ShouldNotRun)));
}
......@@ -234,7 +238,7 @@ TEST_P(TaskSchedulerWorkerPoolTest, PostTaskAfterShutdown) {
TEST_P(TaskSchedulerWorkerPoolTest, PostAfterDestroy) {
StartWorkerPool();
auto task_runner = test::CreateTaskRunnerWithExecutionMode(
worker_pool_.get(), GetParam().execution_mode);
GetParam().execution_mode, &mock_scheduler_task_runner_delegate_);
EXPECT_TRUE(task_runner->PostTask(FROM_HERE, DoNothing()));
task_tracker_.Shutdown();
worker_pool_->JoinForTesting();
......@@ -250,7 +254,7 @@ TEST_P(TaskSchedulerWorkerPoolTest, PostDelayedTask) {
WaitableEvent::InitialState::NOT_SIGNALED);
auto task_runner = test::CreateTaskRunnerWithExecutionMode(
worker_pool_.get(), GetParam().execution_mode);
GetParam().execution_mode, &mock_scheduler_task_runner_delegate_);
// Wait until the task runner is up and running to make sure the test below is
// solely timing the delayed task, not bringing up a physical thread.
......@@ -284,9 +288,9 @@ TEST_P(TaskSchedulerWorkerPoolTest, PostDelayedTask) {
TEST_P(TaskSchedulerWorkerPoolTest, SequencedRunsTasksInCurrentSequence) {
StartWorkerPool();
auto task_runner = test::CreateTaskRunnerWithExecutionMode(
worker_pool_.get(), GetParam().execution_mode);
auto sequenced_task_runner =
worker_pool_->CreateSequencedTaskRunnerWithTraits(TaskTraits());
GetParam().execution_mode, &mock_scheduler_task_runner_delegate_);
auto sequenced_task_runner = test::CreateSequencedTaskRunnerWithTraits(
TaskTraits(), &mock_scheduler_task_runner_delegate_);
WaitableEvent task_ran;
task_runner->PostTask(
......@@ -306,8 +310,8 @@ TEST_P(TaskSchedulerWorkerPoolTest, PostBeforeStart) {
WaitableEvent task_1_running;
WaitableEvent task_2_running;
scoped_refptr<TaskRunner> task_runner =
worker_pool_->CreateTaskRunnerWithTraits({WithBaseSyncPrimitives()});
scoped_refptr<TaskRunner> task_runner = test::CreateTaskRunnerWithTraits(
{WithBaseSyncPrimitives()}, &mock_scheduler_task_runner_delegate_);
task_runner->PostTask(
FROM_HERE, BindOnce(&WaitableEvent::Signal, Unretained(&task_1_running)));
......
......@@ -16,14 +16,13 @@
#include "base/metrics/field_trial_params.h"
#include "base/stl_util.h"
#include "base/strings/string_util.h"
#include "base/task/task_scheduler/delayed_task_manager.h"
#include "base/task/task_scheduler/environment_config.h"
#include "base/task/task_scheduler/scheduler_parallel_task_runner.h"
#include "base/task/task_scheduler/scheduler_sequenced_task_runner.h"
#include "base/task/task_scheduler/scheduler_worker_pool_params.h"
#include "base/task/task_scheduler/sequence.h"
#include "base/task/task_scheduler/sequence_sort_key.h"
#include "base/task/task_scheduler/service_thread.h"
#include "base/task/task_scheduler/task.h"
#include "base/task/task_scheduler/task_tracker.h"
#include "base/time/time.h"
namespace base {
......@@ -85,8 +84,7 @@ TaskSchedulerImpl::TaskSchedulerImpl(
"."),
kEnvironmentParams[environment_type].name_suffix,
kEnvironmentParams[environment_type].priority_hint,
task_tracker_->GetTrackedRef(), &delayed_task_manager_,
tracked_ref_factory_.GetTrackedRef()));
task_tracker_->GetTrackedRef(), tracked_ref_factory_.GetTrackedRef()));
}
// Map environment indexes to pools. |kMergeBlockingNonBlockingPools| is
......@@ -223,25 +221,21 @@ bool TaskSchedulerImpl::PostDelayedTaskWithTraits(const Location& from_here,
OnceClosure task,
TimeDelta delay) {
// Post |task| as part of a one-off single-task Sequence.
const TaskTraits new_traits = SetUserBlockingPriorityIfNeeded(traits);
return GetWorkerPoolForTraits(new_traits)
->PostTaskWithSequence(Task(from_here, std::move(task), delay),
MakeRefCounted<Sequence>(new_traits));
return PostTaskWithSequence(Task(from_here, std::move(task), delay),
MakeRefCounted<Sequence>(traits));
}
scoped_refptr<TaskRunner> TaskSchedulerImpl::CreateTaskRunnerWithTraits(
const TaskTraits& traits) {
const TaskTraits new_traits = SetUserBlockingPriorityIfNeeded(traits);
return GetWorkerPoolForTraits(new_traits)
->CreateTaskRunnerWithTraits(new_traits);
return MakeRefCounted<SchedulerParallelTaskRunner>(new_traits, this);
}
scoped_refptr<SequencedTaskRunner>
TaskSchedulerImpl::CreateSequencedTaskRunnerWithTraits(
const TaskTraits& traits) {
const TaskTraits new_traits = SetUserBlockingPriorityIfNeeded(traits);
return GetWorkerPoolForTraits(new_traits)
->CreateSequencedTaskRunnerWithTraits(new_traits);
return MakeRefCounted<SchedulerSequencedTaskRunner>(new_traits, this);
}
scoped_refptr<SingleThreadTaskRunner>
......@@ -320,6 +314,46 @@ void TaskSchedulerImpl::ReEnqueueSequence(scoped_refptr<Sequence> sequence) {
GetWorkerPoolForTraits(new_traits)->ReEnqueueSequence(std::move(sequence));
}
bool TaskSchedulerImpl::PostTaskWithSequence(Task task,
scoped_refptr<Sequence> sequence) {
// Use CHECK instead of DCHECK to crash earlier. See http://crbug.com/711167
// for details.
CHECK(task.task);
DCHECK(sequence);
const TaskTraits new_traits =
SetUserBlockingPriorityIfNeeded(sequence->traits());
if (!task_tracker_->WillPostTask(&task, new_traits.shutdown_behavior()))
return false;
if (task.delayed_run_time.is_null()) {
GetWorkerPoolForTraits(new_traits)
->PostTaskWithSequenceNow(std::move(task), std::move(sequence));
} else {
delayed_task_manager_.AddDelayedTask(
std::move(task),
BindOnce(
[](scoped_refptr<Sequence> sequence,
TaskSchedulerImpl* task_scheduler_impl, Task task) {
const TaskTraits new_traits =
task_scheduler_impl->SetUserBlockingPriorityIfNeeded(
sequence->traits());
task_scheduler_impl->GetWorkerPoolForTraits(new_traits)
->PostTaskWithSequenceNow(std::move(task),
std::move(sequence));
},
std::move(sequence), Unretained(this)));
}
return true;
}
bool TaskSchedulerImpl::IsRunningPoolWithTraits(
const TaskTraits& traits) const {
return GetWorkerPoolForTraits(traits)->IsBoundToCurrentThread();
}
SchedulerWorkerPoolImpl* TaskSchedulerImpl::GetWorkerPoolForTraits(
const TaskTraits& traits) const {
return environment_to_worker_pool_[GetEnvironmentIndexForTraits(traits)];
......
......@@ -20,6 +20,7 @@
#include "base/task/task_scheduler/delayed_task_manager.h"
#include "base/task/task_scheduler/environment_config.h"
#include "base/task/task_scheduler/scheduler_single_thread_task_runner_manager.h"
#include "base/task/task_scheduler/scheduler_task_runner_delegate.h"
#include "base/task/task_scheduler/scheduler_worker_pool_impl.h"
#include "base/task/task_scheduler/task_scheduler.h"
#include "base/task/task_scheduler/task_tracker.h"
......@@ -46,7 +47,8 @@ extern const BASE_EXPORT base::Feature kMergeBlockingNonBlockingPools;
// Default TaskScheduler implementation. This class is thread-safe.
class BASE_EXPORT TaskSchedulerImpl : public TaskScheduler,
public SchedulerWorkerPool::Delegate {
public SchedulerWorkerPool::Delegate,
public SchedulerTaskRunnerDelegate {
public:
using TaskTrackerImpl =
#if defined(OS_POSIX) && !defined(OS_NACL_SFI)
......@@ -109,6 +111,11 @@ class BASE_EXPORT TaskSchedulerImpl : public TaskScheduler,
// SchedulerWorkerPool::Delegate:
void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override;
// SchedulerTaskRunnerDelegate:
bool PostTaskWithSequence(Task task,
scoped_refptr<Sequence> sequence) override;
bool IsRunningPoolWithTraits(const TaskTraits& traits) const override;
const std::unique_ptr<TaskTrackerImpl> task_tracker_;
std::unique_ptr<Thread> service_thread_;
DelayedTaskManager delayed_task_manager_;
......
......@@ -386,7 +386,7 @@ TEST_P(TaskSchedulerImplTest, PostDelayedTaskWithTraitsWithDelayBeforeStart) {
scheduler_.PostDelayedTaskWithTraits(
FROM_HERE, GetParam().traits,
BindOnce(&VerifyTimeAndTaskEnvironmentAndSignalEvent, GetParam().traits,
SchedulerState::kBeforeSchedulerStart,
SchedulerState::kAfterSchedulerStart,
TimeTicks::Now() + TestTimeouts::tiny_timeout(),
Unretained(&task_running)),
TestTimeouts::tiny_timeout());
......
......@@ -6,7 +6,9 @@
#include <utility>
#include "base/task/task_scheduler/scheduler_worker_pool.h"
#include "base/bind.h"
#include "base/task/task_scheduler/scheduler_parallel_task_runner.h"
#include "base/task/task_scheduler/scheduler_sequenced_task_runner.h"
#include "testing/gtest/include/gtest/gtest.h"
namespace base {
......@@ -24,15 +26,17 @@ scoped_refptr<Sequence> CreateSequenceWithTask(Task task,
}
scoped_refptr<TaskRunner> CreateTaskRunnerWithExecutionMode(
SchedulerWorkerPool* worker_pool,
test::ExecutionMode execution_mode) {
test::ExecutionMode execution_mode,
MockSchedulerTaskRunnerDelegate* mock_scheduler_task_runner_delegate) {
// Allow tasks posted to the returned TaskRunner to wait on a WaitableEvent.
const TaskTraits traits = {WithBaseSyncPrimitives()};
switch (execution_mode) {
case test::ExecutionMode::PARALLEL:
return worker_pool->CreateTaskRunnerWithTraits(traits);
return CreateTaskRunnerWithTraits(traits,
mock_scheduler_task_runner_delegate);
case test::ExecutionMode::SEQUENCED:
return worker_pool->CreateSequencedTaskRunnerWithTraits(traits);
return CreateSequencedTaskRunnerWithTraits(
traits, mock_scheduler_task_runner_delegate);
default:
// Fall through.
break;
......@@ -41,6 +45,69 @@ scoped_refptr<TaskRunner> CreateTaskRunnerWithExecutionMode(
return nullptr;
}
scoped_refptr<TaskRunner> CreateTaskRunnerWithTraits(
const TaskTraits& traits,
MockSchedulerTaskRunnerDelegate* mock_scheduler_task_runner_delegate) {
return MakeRefCounted<SchedulerParallelTaskRunner>(
traits, mock_scheduler_task_runner_delegate);
}
scoped_refptr<SequencedTaskRunner> CreateSequencedTaskRunnerWithTraits(
const TaskTraits& traits,
MockSchedulerTaskRunnerDelegate* mock_scheduler_task_runner_delegate) {
return MakeRefCounted<SchedulerSequencedTaskRunner>(
traits, mock_scheduler_task_runner_delegate);
}
MockSchedulerTaskRunnerDelegate::MockSchedulerTaskRunnerDelegate(
TrackedRef<TaskTracker> task_tracker,
DelayedTaskManager* delayed_task_manager)
: task_tracker_(task_tracker),
delayed_task_manager_(delayed_task_manager) {}
MockSchedulerTaskRunnerDelegate::~MockSchedulerTaskRunnerDelegate() = default;
bool MockSchedulerTaskRunnerDelegate::PostTaskWithSequence(
Task task,
scoped_refptr<Sequence> sequence) {
// |worker_pool_| must be initialized with SetWorkerPool() before proceeding.
DCHECK(worker_pool_);
DCHECK(task.task);
DCHECK(sequence);
if (!task_tracker_->WillPostTask(&task,
sequence->traits().shutdown_behavior()))
return false;
if (task.delayed_run_time.is_null()) {
worker_pool_->PostTaskWithSequenceNow(std::move(task), std::move(sequence));
} else {
delayed_task_manager_->AddDelayedTask(
std::move(task), BindOnce(
[](scoped_refptr<Sequence> sequence,
SchedulerWorkerPool* worker_pool, Task task) {
worker_pool->PostTaskWithSequenceNow(
std::move(task), std::move(sequence));
},
std::move(sequence), worker_pool_));
}
return true;
}
bool MockSchedulerTaskRunnerDelegate::IsRunningPoolWithTraits(
const TaskTraits& traits) const {
// |worker_pool_| must be initialized with SetWorkerPool() before proceeding.
DCHECK(worker_pool_);
return worker_pool_->IsBoundToCurrentThread();
}
void MockSchedulerTaskRunnerDelegate::SetWorkerPool(
SchedulerWorkerPool* worker_pool) {
worker_pool_ = worker_pool;
}
} // namespace test
} // namespace internal
} // namespace base
......@@ -5,9 +5,12 @@
#ifndef BASE_TASK_TASK_SCHEDULER_TEST_UTILS_H_
#define BASE_TASK_TASK_SCHEDULER_TEST_UTILS_H_
#include "base/memory/ref_counted.h"
#include "base/task/task_scheduler/delayed_task_manager.h"
#include "base/task/task_scheduler/scheduler_task_runner_delegate.h"
#include "base/task/task_scheduler/scheduler_worker_observer.h"
#include "base/task/task_scheduler/scheduler_worker_pool.h"
#include "base/task/task_scheduler/sequence.h"
#include "base/task/task_scheduler/task_tracker.h"
#include "base/task/task_traits.h"
#include "base/task_runner.h"
#include "testing/gmock/include/gmock/gmock.h"
......@@ -15,7 +18,6 @@
namespace base {
namespace internal {
class SchedulerWorkerPool;
struct Task;
namespace test {
......@@ -32,6 +34,25 @@ class MockSchedulerWorkerObserver : public SchedulerWorkerObserver {
DISALLOW_COPY_AND_ASSIGN(MockSchedulerWorkerObserver);
};
class MockSchedulerTaskRunnerDelegate : public SchedulerTaskRunnerDelegate {
public:
MockSchedulerTaskRunnerDelegate(TrackedRef<TaskTracker> task_tracker,
DelayedTaskManager* delayed_task_manager);
~MockSchedulerTaskRunnerDelegate() override;
// SchedulerTaskRunnerDelegate:
bool PostTaskWithSequence(Task task,
scoped_refptr<Sequence> sequence) override;
bool IsRunningPoolWithTraits(const TaskTraits& traits) const override;
void SetWorkerPool(SchedulerWorkerPool* worker_pool);
private:
const TrackedRef<TaskTracker> task_tracker_;
DelayedTaskManager* const delayed_task_manager_;
SchedulerWorkerPool* worker_pool_ = nullptr;
};
// An enumeration of possible task scheduler TaskRunner types. Used to
// parametrize relevant task_scheduler tests.
enum class ExecutionMode { PARALLEL, SEQUENCED, SINGLE_THREADED };
......@@ -41,12 +62,21 @@ enum class ExecutionMode { PARALLEL, SEQUENCED, SINGLE_THREADED };
scoped_refptr<Sequence> CreateSequenceWithTask(Task task,
const TaskTraits& traits);
// Creates a TaskRunner that posts tasks to |worker_pool| with the
// |execution_mode| execution mode and the WithBaseSyncPrimitives() trait.
// Creates a TaskRunner that posts tasks to the worker pool owned by
// |scheduler_task_runner_delegate| with the |execution_mode| execution mode
// and the WithBaseSyncPrimitives() trait.
// Caveat: this does not support ExecutionMode::SINGLE_THREADED.
scoped_refptr<TaskRunner> CreateTaskRunnerWithExecutionMode(
SchedulerWorkerPool* worker_pool,
test::ExecutionMode execution_mode);
test::ExecutionMode execution_mode,
MockSchedulerTaskRunnerDelegate* mock_scheduler_task_runner_delegate);
scoped_refptr<TaskRunner> CreateTaskRunnerWithTraits(
const TaskTraits& traits,
MockSchedulerTaskRunnerDelegate* mock_scheduler_task_runner_delegate);
scoped_refptr<SequencedTaskRunner> CreateSequencedTaskRunnerWithTraits(
const TaskTraits& traits,
MockSchedulerTaskRunnerDelegate* mock_scheduler_task_runner_delegate);
} // namespace test
} // namespace internal
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment