Commit 6ddc0ac9 authored by Etienne Pierre-doray's avatar Etienne Pierre-doray Committed by Commit Bot

[TaskScheduler]: Refactor TaskRunner ownership cycle.

This CL takes Task out of the reference loop, so that TaskSource
refers to its TaskRunner directly. Task can be replaced by PendingTask
as a result (will be done as a follow-up).
A TaskRunner reference is cleared when the TaskSource becomes empty
and set by the caller when pushing a task to a Sequence.


Change-Id: I9ffeca51d392ba286494ef85bac9697da48b893d
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/1506769
Commit-Queue: Etienne Pierre-Doray <etiennep@chromium.org>
Reviewed-by: default avatarGabriel Charette <gab@chromium.org>
Reviewed-by: default avatarFrançois Doray <fdoray@chromium.org>
Cr-Commit-Position: refs/heads/master@{#643019}
parent e6395fe3
...@@ -17,9 +17,13 @@ namespace internal { ...@@ -17,9 +17,13 @@ namespace internal {
DelayedTaskManager::DelayedTask::DelayedTask() = default; DelayedTaskManager::DelayedTask::DelayedTask() = default;
DelayedTaskManager::DelayedTask::DelayedTask(Task task, DelayedTaskManager::DelayedTask::DelayedTask(
PostTaskNowCallback callback) Task task,
: task(std::move(task)), callback(std::move(callback)) {} PostTaskNowCallback callback,
scoped_refptr<TaskRunner> task_runner)
: task(std::move(task)),
callback(std::move(callback)),
task_runner(std::move(task_runner)) {}
DelayedTaskManager::DelayedTask::DelayedTask( DelayedTaskManager::DelayedTask::DelayedTask(
DelayedTaskManager::DelayedTask&& other) = default; DelayedTaskManager::DelayedTask&& other) = default;
...@@ -69,7 +73,8 @@ void DelayedTaskManager::Start( ...@@ -69,7 +73,8 @@ void DelayedTaskManager::Start(
void DelayedTaskManager::AddDelayedTask( void DelayedTaskManager::AddDelayedTask(
Task task, Task task,
PostTaskNowCallback post_task_now_callback) { PostTaskNowCallback post_task_now_callback,
scoped_refptr<TaskRunner> task_runner) {
DCHECK(task.task); DCHECK(task.task);
DCHECK(!task.delayed_run_time.is_null()); DCHECK(!task.delayed_run_time.is_null());
...@@ -79,8 +84,9 @@ void DelayedTaskManager::AddDelayedTask( ...@@ -79,8 +84,9 @@ void DelayedTaskManager::AddDelayedTask(
TimeTicks process_ripe_tasks_time; TimeTicks process_ripe_tasks_time;
{ {
AutoSchedulerLock auto_lock(queue_lock_); AutoSchedulerLock auto_lock(queue_lock_);
delayed_task_queue_.insert( delayed_task_queue_.insert(DelayedTask(std::move(task),
DelayedTask(std::move(task), std::move(post_task_now_callback))); std::move(post_task_now_callback),
std::move(task_runner)));
// Not started yet. // Not started yet.
if (service_thread_task_runner_ == nullptr) if (service_thread_task_runner_ == nullptr)
return; return;
......
...@@ -46,13 +46,18 @@ class BASE_EXPORT DelayedTaskManager { ...@@ -46,13 +46,18 @@ class BASE_EXPORT DelayedTaskManager {
void Start(scoped_refptr<TaskRunner> service_thread_task_runner); void Start(scoped_refptr<TaskRunner> service_thread_task_runner);
// Schedules a call to |post_task_now_callback| with |task| as argument when // Schedules a call to |post_task_now_callback| with |task| as argument when
// |task| is ripe for execution. // |task| is ripe for execution. |task_runner| is passed to retain a
void AddDelayedTask(Task task, PostTaskNowCallback post_task_now_callback); // reference until |task| is ripe.
void AddDelayedTask(Task task,
PostTaskNowCallback post_task_now_callback,
scoped_refptr<TaskRunner> task_runner);
private: private:
struct DelayedTask { struct DelayedTask {
DelayedTask(); DelayedTask();
DelayedTask(Task task, PostTaskNowCallback callback); DelayedTask(Task task,
PostTaskNowCallback callback,
scoped_refptr<TaskRunner> task_runner);
DelayedTask(DelayedTask&& other); DelayedTask(DelayedTask&& other);
~DelayedTask(); ~DelayedTask();
...@@ -64,6 +69,7 @@ class BASE_EXPORT DelayedTaskManager { ...@@ -64,6 +69,7 @@ class BASE_EXPORT DelayedTaskManager {
Task task; Task task;
PostTaskNowCallback callback; PostTaskNowCallback callback;
scoped_refptr<TaskRunner> task_runner;
// True iff the delayed task has been marked as scheduled. // True iff the delayed task has been marked as scheduled.
bool IsScheduled() const; bool IsScheduled() const;
......
...@@ -71,7 +71,8 @@ class TaskSchedulerDelayedTaskManagerTest : public testing::Test { ...@@ -71,7 +71,8 @@ class TaskSchedulerDelayedTaskManagerTest : public testing::Test {
// Verify that a delayed task isn't forwarded before Start(). // Verify that a delayed task isn't forwarded before Start().
TEST_F(TaskSchedulerDelayedTaskManagerTest, DelayedTaskDoesNotRunBeforeStart) { TEST_F(TaskSchedulerDelayedTaskManagerTest, DelayedTaskDoesNotRunBeforeStart) {
// Send |task| to the DelayedTaskManager. // Send |task| to the DelayedTaskManager.
delayed_task_manager_.AddDelayedTask(std::move(task_), BindOnce(&RunTask)); delayed_task_manager_.AddDelayedTask(std::move(task_), BindOnce(&RunTask),
nullptr);
// Fast-forward time until the task is ripe for execution. Since Start() has // Fast-forward time until the task is ripe for execution. Since Start() has
// not been called, the task should not be forwarded to RunTask() (MockTask is // not been called, the task should not be forwarded to RunTask() (MockTask is
...@@ -84,7 +85,8 @@ TEST_F(TaskSchedulerDelayedTaskManagerTest, DelayedTaskDoesNotRunBeforeStart) { ...@@ -84,7 +85,8 @@ TEST_F(TaskSchedulerDelayedTaskManagerTest, DelayedTaskDoesNotRunBeforeStart) {
TEST_F(TaskSchedulerDelayedTaskManagerTest, TEST_F(TaskSchedulerDelayedTaskManagerTest,
DelayedTaskPostedBeforeStartExpiresAfterStartRunsOnExpire) { DelayedTaskPostedBeforeStartExpiresAfterStartRunsOnExpire) {
// Send |task| to the DelayedTaskManager. // Send |task| to the DelayedTaskManager.
delayed_task_manager_.AddDelayedTask(std::move(task_), BindOnce(&RunTask)); delayed_task_manager_.AddDelayedTask(std::move(task_), BindOnce(&RunTask),
nullptr);
delayed_task_manager_.Start(service_thread_task_runner_); delayed_task_manager_.Start(service_thread_task_runner_);
...@@ -103,7 +105,8 @@ TEST_F(TaskSchedulerDelayedTaskManagerTest, ...@@ -103,7 +105,8 @@ TEST_F(TaskSchedulerDelayedTaskManagerTest,
TEST_F(TaskSchedulerDelayedTaskManagerTest, TEST_F(TaskSchedulerDelayedTaskManagerTest,
DelayedTaskPostedBeforeStartExpiresBeforeStartRunsOnStart) { DelayedTaskPostedBeforeStartExpiresBeforeStartRunsOnStart) {
// Send |task| to the DelayedTaskManager. // Send |task| to the DelayedTaskManager.
delayed_task_manager_.AddDelayedTask(std::move(task_), BindOnce(&RunTask)); delayed_task_manager_.AddDelayedTask(std::move(task_), BindOnce(&RunTask),
nullptr);
// Run tasks on the service thread. Don't expect any forwarding to // Run tasks on the service thread. Don't expect any forwarding to
// |task_target_| since the task isn't ripe for execution. // |task_target_| since the task isn't ripe for execution.
...@@ -125,7 +128,8 @@ TEST_F(TaskSchedulerDelayedTaskManagerTest, DelayedTaskDoesNotRunTooEarly) { ...@@ -125,7 +128,8 @@ TEST_F(TaskSchedulerDelayedTaskManagerTest, DelayedTaskDoesNotRunTooEarly) {
delayed_task_manager_.Start(service_thread_task_runner_); delayed_task_manager_.Start(service_thread_task_runner_);
// Send |task| to the DelayedTaskManager. // Send |task| to the DelayedTaskManager.
delayed_task_manager_.AddDelayedTask(std::move(task_), BindOnce(&RunTask)); delayed_task_manager_.AddDelayedTask(std::move(task_), BindOnce(&RunTask),
nullptr);
// Run tasks that are ripe for execution. Don't expect any forwarding to // Run tasks that are ripe for execution. Don't expect any forwarding to
// RunTask(). // RunTask().
...@@ -138,7 +142,8 @@ TEST_F(TaskSchedulerDelayedTaskManagerTest, DelayedTaskRunsAfterDelay) { ...@@ -138,7 +142,8 @@ TEST_F(TaskSchedulerDelayedTaskManagerTest, DelayedTaskRunsAfterDelay) {
delayed_task_manager_.Start(service_thread_task_runner_); delayed_task_manager_.Start(service_thread_task_runner_);
// Send |task| to the DelayedTaskManager. // Send |task| to the DelayedTaskManager.
delayed_task_manager_.AddDelayedTask(std::move(task_), BindOnce(&RunTask)); delayed_task_manager_.AddDelayedTask(std::move(task_), BindOnce(&RunTask),
nullptr);
// Fast-forward time. Expect the task to be forwarded to RunTask(). // Fast-forward time. Expect the task to be forwarded to RunTask().
EXPECT_CALL(mock_task_, Run()); EXPECT_CALL(mock_task_, Run());
...@@ -166,9 +171,12 @@ TEST_F(TaskSchedulerDelayedTaskManagerTest, DelayedTasksRunAfterDelay) { ...@@ -166,9 +171,12 @@ TEST_F(TaskSchedulerDelayedTaskManagerTest, DelayedTasksRunAfterDelay) {
TimeDelta::FromHours(1)); TimeDelta::FromHours(1));
// Send tasks to the DelayedTaskManager. // Send tasks to the DelayedTaskManager.
delayed_task_manager_.AddDelayedTask(std::move(task_a), BindOnce(&RunTask)); delayed_task_manager_.AddDelayedTask(std::move(task_a), BindOnce(&RunTask),
delayed_task_manager_.AddDelayedTask(std::move(task_b), BindOnce(&RunTask)); nullptr);
delayed_task_manager_.AddDelayedTask(std::move(task_c), BindOnce(&RunTask)); delayed_task_manager_.AddDelayedTask(std::move(task_b), BindOnce(&RunTask),
nullptr);
delayed_task_manager_.AddDelayedTask(std::move(task_c), BindOnce(&RunTask),
nullptr);
// Run tasks that are ripe for execution on the service thread. Don't expect // Run tasks that are ripe for execution on the service thread. Don't expect
// any call to RunTask(). // any call to RunTask().
...@@ -194,12 +202,12 @@ TEST_F(TaskSchedulerDelayedTaskManagerTest, PostTaskDuringStart) { ...@@ -194,12 +202,12 @@ TEST_F(TaskSchedulerDelayedTaskManagerTest, PostTaskDuringStart) {
WaitableEvent task_posted; WaitableEvent task_posted;
other_thread.task_runner()->PostTask(FROM_HERE, BindLambdaForTesting([&]() { other_thread.task_runner()->PostTask(
delayed_task_manager_.AddDelayedTask( FROM_HERE, BindLambdaForTesting([&]() {
std::move(task_), delayed_task_manager_.AddDelayedTask(
BindOnce(&RunTask)); std::move(task_), BindOnce(&RunTask), other_thread.task_runner());
task_posted.Signal(); task_posted.Signal();
})); }));
delayed_task_manager_.Start(service_thread_task_runner_); delayed_task_manager_.Start(service_thread_task_runner_);
......
...@@ -25,7 +25,8 @@ namespace { ...@@ -25,7 +25,8 @@ namespace {
scoped_refptr<Sequence> MakeSequenceWithTraitsAndTask( scoped_refptr<Sequence> MakeSequenceWithTraitsAndTask(
const TaskTraits& traits) { const TaskTraits& traits) {
scoped_refptr<Sequence> sequence = MakeRefCounted<Sequence>(traits); scoped_refptr<Sequence> sequence = MakeRefCounted<Sequence>(
traits, nullptr, TaskSourceExecutionMode::kParallel);
sequence->BeginTransaction().PushTask( sequence->BeginTransaction().PushTask(
Task(FROM_HERE, DoNothing(), TimeDelta())); Task(FROM_HERE, DoNothing(), TimeDelta()));
return sequence; return sequence;
......
...@@ -25,7 +25,8 @@ bool SchedulerParallelTaskRunner::PostDelayedTask(const Location& from_here, ...@@ -25,7 +25,8 @@ bool SchedulerParallelTaskRunner::PostDelayedTask(const Location& from_here,
return false; return false;
// Post the task as part of a one-off single-task Sequence. // Post the task as part of a one-off single-task Sequence.
scoped_refptr<Sequence> sequence = MakeRefCounted<Sequence>(traits_, this); scoped_refptr<Sequence> sequence = MakeRefCounted<Sequence>(
traits_, this, TaskSourceExecutionMode::kParallel);
{ {
AutoSchedulerLock auto_lock(lock_); AutoSchedulerLock auto_lock(lock_);
......
...@@ -13,7 +13,10 @@ SchedulerSequencedTaskRunner::SchedulerSequencedTaskRunner( ...@@ -13,7 +13,10 @@ SchedulerSequencedTaskRunner::SchedulerSequencedTaskRunner(
const TaskTraits& traits, const TaskTraits& traits,
SchedulerTaskRunnerDelegate* scheduler_task_runner_delegate) SchedulerTaskRunnerDelegate* scheduler_task_runner_delegate)
: scheduler_task_runner_delegate_(scheduler_task_runner_delegate), : scheduler_task_runner_delegate_(scheduler_task_runner_delegate),
sequence_(MakeRefCounted<Sequence>(traits)) {} sequence_(MakeRefCounted<Sequence>(traits,
this,
TaskSourceExecutionMode::kSequenced)) {
}
SchedulerSequencedTaskRunner::~SchedulerSequencedTaskRunner() = default; SchedulerSequencedTaskRunner::~SchedulerSequencedTaskRunner() = default;
...@@ -24,7 +27,6 @@ bool SchedulerSequencedTaskRunner::PostDelayedTask(const Location& from_here, ...@@ -24,7 +27,6 @@ bool SchedulerSequencedTaskRunner::PostDelayedTask(const Location& from_here,
return false; return false;
Task task(from_here, std::move(closure), delay); Task task(from_here, std::move(closure), delay);
task.sequenced_task_runner_ref = this;
// Post the task as part of |sequence_|. // Post the task as part of |sequence_|.
return scheduler_task_runner_delegate_->PostTaskWithSequence(std::move(task), return scheduler_task_runner_delegate_->PostTaskWithSequence(std::move(task),
......
...@@ -248,7 +248,9 @@ class SchedulerWorkerCOMDelegate : public SchedulerWorkerDelegate { ...@@ -248,7 +248,9 @@ class SchedulerWorkerCOMDelegate : public SchedulerWorkerDelegate {
bool get_work_first_ = true; bool get_work_first_ = true;
const scoped_refptr<Sequence> message_pump_sequence_ = const scoped_refptr<Sequence> message_pump_sequence_ =
MakeRefCounted<Sequence>(TaskTraits(MayBlock())); MakeRefCounted<Sequence>(TaskTraits(MayBlock()),
nullptr,
TaskSourceExecutionMode::kParallel);
const TrackedRef<TaskTracker> task_tracker_; const TrackedRef<TaskTracker> task_tracker_;
std::unique_ptr<win::ScopedCOMInitializer> scoped_com_initializer_; std::unique_ptr<win::ScopedCOMInitializer> scoped_com_initializer_;
...@@ -272,7 +274,10 @@ class SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner ...@@ -272,7 +274,10 @@ class SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner
: outer_(outer), : outer_(outer),
worker_(worker), worker_(worker),
thread_mode_(thread_mode), thread_mode_(thread_mode),
sequence_(MakeRefCounted<Sequence>(traits)) { sequence_(
MakeRefCounted<Sequence>(traits,
this,
TaskSourceExecutionMode::kSingleThread)) {
DCHECK(outer_); DCHECK(outer_);
DCHECK(worker_); DCHECK(worker_);
} }
...@@ -285,7 +290,6 @@ class SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner ...@@ -285,7 +290,6 @@ class SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner
return false; return false;
Task task(from_here, std::move(closure), delay); Task task(from_here, std::move(closure), delay);
task.single_thread_task_runner_ref = this;
if (!outer_->task_tracker_->WillPostTask(&task, if (!outer_->task_tracker_->WillPostTask(&task,
sequence_->shutdown_behavior())) { sequence_->shutdown_behavior())) {
...@@ -297,8 +301,7 @@ class SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner ...@@ -297,8 +301,7 @@ class SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner
} else { } else {
outer_->delayed_task_manager_->AddDelayedTask( outer_->delayed_task_manager_->AddDelayedTask(
std::move(task), std::move(task),
BindOnce(&SchedulerSingleThreadTaskRunner::PostTaskNow, BindOnce(&SchedulerSingleThreadTaskRunner::PostTaskNow, this), this);
Unretained(this)));
} }
return true; return true;
} }
......
...@@ -177,7 +177,8 @@ class TaskSchedulerWorkerTest : public testing::TestWithParam<int> { ...@@ -177,7 +177,8 @@ class TaskSchedulerWorkerTest : public testing::TestWithParam<int> {
} }
// Create a Sequence with TasksPerSequence() Tasks. // Create a Sequence with TasksPerSequence() Tasks.
scoped_refptr<Sequence> sequence = MakeRefCounted<Sequence>(TaskTraits()); scoped_refptr<Sequence> sequence = MakeRefCounted<Sequence>(
TaskTraits(), nullptr, TaskSourceExecutionMode::kParallel);
Sequence::Transaction sequence_transaction(sequence->BeginTransaction()); Sequence::Transaction sequence_transaction(sequence->BeginTransaction());
for (int i = 0; i < outer_->TasksPerSequence(); ++i) { for (int i = 0; i < outer_->TasksPerSequence(); ++i) {
Task task(FROM_HERE, Task task(FROM_HERE,
...@@ -445,8 +446,10 @@ class ControllableCleanupDelegate : public SchedulerWorkerDefaultDelegate { ...@@ -445,8 +446,10 @@ class ControllableCleanupDelegate : public SchedulerWorkerDefaultDelegate {
} }
controls_->work_requested_ = true; controls_->work_requested_ = true;
scoped_refptr<Sequence> sequence = MakeRefCounted<Sequence>(TaskTraits( scoped_refptr<Sequence> sequence = MakeRefCounted<Sequence>(
WithBaseSyncPrimitives(), TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN)); TaskTraits(WithBaseSyncPrimitives(),
TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN),
nullptr, TaskSourceExecutionMode::kParallel);
Task task( Task task(
FROM_HERE, FROM_HERE,
BindOnce( BindOnce(
......
...@@ -39,6 +39,12 @@ bool Sequence::Transaction::PushTask(Task task) { ...@@ -39,6 +39,12 @@ bool Sequence::Transaction::PushTask(Task task) {
// for details. // for details.
CHECK(task.task); CHECK(task.task);
DCHECK(task.queue_time.is_null()); DCHECK(task.queue_time.is_null());
// AddRef() matched by manual Release() when the queue becomes empty again
// (in DidRunTask() or Clear()).
if (sequence()->queue_.empty() && sequence()->task_runner())
sequence()->task_runner()->AddRef();
task.queue_time = base::TimeTicks::Now(); task.queue_time = base::TimeTicks::Now();
task.task = sequence()->traits_.shutdown_behavior() == task.task = sequence()->traits_.shutdown_behavior() ==
...@@ -66,6 +72,12 @@ Optional<Task> Sequence::TakeTask() { ...@@ -66,6 +72,12 @@ Optional<Task> Sequence::TakeTask() {
return std::move(next_task); return std::move(next_task);
} }
bool Sequence::DidRunTask() {
if (queue_.empty())
ReleaseTaskRunner();
return !queue_.empty();
}
SequenceSortKey Sequence::GetSortKey() const { SequenceSortKey Sequence::GetSortKey() const {
DCHECK(!IsEmpty()); DCHECK(!IsEmpty());
return SequenceSortKey(traits_.priority(), queue_.front().queue_time); return SequenceSortKey(traits_.priority(), queue_.front().queue_time);
...@@ -76,22 +88,35 @@ bool Sequence::IsEmpty() const { ...@@ -76,22 +88,35 @@ bool Sequence::IsEmpty() const {
} }
void Sequence::Clear() { void Sequence::Clear() {
while (!IsEmpty()) bool queue_was_empty = queue_.empty();
while (!queue_.empty())
TakeTask(); TakeTask();
if (!queue_was_empty) {
// No member access after this point, ReleaseTaskRunner() might have deleted
// |this|.
ReleaseTaskRunner();
}
} }
Sequence::Sequence( void Sequence::ReleaseTaskRunner() {
const TaskTraits& traits, if (!task_runner())
scoped_refptr<SchedulerParallelTaskRunner> scheduler_parallel_task_runner) return;
: TaskSource(traits), if (execution_mode() == TaskSourceExecutionMode::kParallel) {
scheduler_parallel_task_runner_(scheduler_parallel_task_runner) {} static_cast<SchedulerParallelTaskRunner*>(task_runner())
->UnregisterSequence(this);
Sequence::~Sequence() {
if (scheduler_parallel_task_runner_) {
scheduler_parallel_task_runner_->UnregisterSequence(this);
} }
// No member access after this point, releasing |task_runner()| might delete
// |this|.
task_runner()->Release();
} }
Sequence::Sequence(const TaskTraits& traits,
TaskRunner* task_runner,
TaskSourceExecutionMode execution_mode)
: TaskSource(traits, task_runner, execution_mode) {}
Sequence::~Sequence() = default;
Sequence::Transaction Sequence::BeginTransaction() { Sequence::Transaction Sequence::BeginTransaction() {
return Transaction(this); return Transaction(this);
} }
......
...@@ -66,11 +66,13 @@ class BASE_EXPORT Sequence : public TaskSource { ...@@ -66,11 +66,13 @@ class BASE_EXPORT Sequence : public TaskSource {
}; };
// |traits| is metadata that applies to all Tasks in the Sequence. // |traits| is metadata that applies to all Tasks in the Sequence.
// |scheduler_parallel_task_runner| is a reference to the // |task_runner| is a reference to the TaskRunner feeding this TaskSource.
// SchedulerParallelTaskRunner that created this Sequence, if any. // |task_runner| can be nullptr only for tasks with no TaskRunner, in which
// case |execution_mode| must be kParallel. Otherwise, |execution_mode| is the
// execution mode of |task_runner|.
Sequence(const TaskTraits& traits, Sequence(const TaskTraits& traits,
scoped_refptr<SchedulerParallelTaskRunner> TaskRunner* task_runner,
scheduler_parallel_task_runner = nullptr); TaskSourceExecutionMode execution_mode);
// Begins a Transaction. This method cannot be called on a thread which has an // Begins a Transaction. This method cannot be called on a thread which has an
// active Sequence::Transaction. // active Sequence::Transaction.
...@@ -90,10 +92,15 @@ class BASE_EXPORT Sequence : public TaskSource { ...@@ -90,10 +92,15 @@ class BASE_EXPORT Sequence : public TaskSource {
// TaskSource: // TaskSource:
Optional<Task> TakeTask() override; Optional<Task> TakeTask() override;
bool DidRunTask() override;
SequenceSortKey GetSortKey() const override; SequenceSortKey GetSortKey() const override;
bool IsEmpty() const override; bool IsEmpty() const override;
void Clear() override; void Clear() override;
// Releases reference to TaskRunner. This might cause this object to be
// deleted; therefore, no member access should be made after this method.
void ReleaseTaskRunner();
const SequenceToken token_ = SequenceToken::Create(); const SequenceToken token_ = SequenceToken::Create();
// Queue of tasks to execute. // Queue of tasks to execute.
...@@ -102,12 +109,6 @@ class BASE_EXPORT Sequence : public TaskSource { ...@@ -102,12 +109,6 @@ class BASE_EXPORT Sequence : public TaskSource {
// Holds data stored through the SequenceLocalStorageSlot API. // Holds data stored through the SequenceLocalStorageSlot API.
SequenceLocalStorageMap sequence_local_storage_; SequenceLocalStorageMap sequence_local_storage_;
// A reference to the SchedulerParallelTaskRunner that created this Sequence,
// if any. Used to remove Sequence from the TaskRunner's list of Sequence
// references when Sequence is deleted.
const scoped_refptr<SchedulerParallelTaskRunner>
scheduler_parallel_task_runner_;
DISALLOW_COPY_AND_ASSIGN(Sequence); DISALLOW_COPY_AND_ASSIGN(Sequence);
}; };
......
...@@ -45,7 +45,8 @@ TEST(TaskSchedulerSequenceTest, PushTakeRemove) { ...@@ -45,7 +45,8 @@ TEST(TaskSchedulerSequenceTest, PushTakeRemove) {
testing::StrictMock<MockTask> mock_task_e; testing::StrictMock<MockTask> mock_task_e;
scoped_refptr<Sequence> sequence = scoped_refptr<Sequence> sequence =
MakeRefCounted<Sequence>(TaskTraits(TaskPriority::BEST_EFFORT)); MakeRefCounted<Sequence>(TaskTraits(TaskPriority::BEST_EFFORT), nullptr,
TaskSourceExecutionMode::kParallel);
Sequence::Transaction sequence_transaction(sequence->BeginTransaction()); Sequence::Transaction sequence_transaction(sequence->BeginTransaction());
// Push task A in the sequence. PushTask() should return true since it's the // Push task A in the sequence. PushTask() should return true since it's the
...@@ -101,7 +102,8 @@ TEST(TaskSchedulerSequenceTest, GetSortKeyBestEffort) { ...@@ -101,7 +102,8 @@ TEST(TaskSchedulerSequenceTest, GetSortKeyBestEffort) {
// Create a BEST_EFFORT sequence with a task. // Create a BEST_EFFORT sequence with a task.
Task best_effort_task(FROM_HERE, DoNothing(), TimeDelta()); Task best_effort_task(FROM_HERE, DoNothing(), TimeDelta());
scoped_refptr<Sequence> best_effort_sequence = scoped_refptr<Sequence> best_effort_sequence =
MakeRefCounted<Sequence>(TaskTraits(TaskPriority::BEST_EFFORT)); MakeRefCounted<Sequence>(TaskTraits(TaskPriority::BEST_EFFORT), nullptr,
TaskSourceExecutionMode::kParallel);
Sequence::Transaction best_effort_sequence_transaction( Sequence::Transaction best_effort_sequence_transaction(
best_effort_sequence->BeginTransaction()); best_effort_sequence->BeginTransaction());
best_effort_sequence_transaction.PushTask(std::move(best_effort_task)); best_effort_sequence_transaction.PushTask(std::move(best_effort_task));
...@@ -129,7 +131,8 @@ TEST(TaskSchedulerSequenceTest, GetSortKeyForeground) { ...@@ -129,7 +131,8 @@ TEST(TaskSchedulerSequenceTest, GetSortKeyForeground) {
// Create a USER_VISIBLE sequence with a task. // Create a USER_VISIBLE sequence with a task.
Task foreground_task(FROM_HERE, DoNothing(), TimeDelta()); Task foreground_task(FROM_HERE, DoNothing(), TimeDelta());
scoped_refptr<Sequence> foreground_sequence = scoped_refptr<Sequence> foreground_sequence =
MakeRefCounted<Sequence>(TaskTraits(TaskPriority::USER_VISIBLE)); MakeRefCounted<Sequence>(TaskTraits(TaskPriority::USER_VISIBLE), nullptr,
TaskSourceExecutionMode::kParallel);
Sequence::Transaction foreground_sequence_transaction( Sequence::Transaction foreground_sequence_transaction(
foreground_sequence->BeginTransaction()); foreground_sequence->BeginTransaction());
foreground_sequence_transaction.PushTask(std::move(foreground_task)); foreground_sequence_transaction.PushTask(std::move(foreground_task));
...@@ -154,7 +157,8 @@ TEST(TaskSchedulerSequenceTest, GetSortKeyForeground) { ...@@ -154,7 +157,8 @@ TEST(TaskSchedulerSequenceTest, GetSortKeyForeground) {
// Verify that a DCHECK fires if DidRunTask() is called on a sequence which // Verify that a DCHECK fires if DidRunTask() is called on a sequence which
// didn't return a Task. // didn't return a Task.
TEST(TaskSchedulerSequenceTest, DidRunTaskWithoutTakeTask) { TEST(TaskSchedulerSequenceTest, DidRunTaskWithoutTakeTask) {
scoped_refptr<Sequence> sequence = MakeRefCounted<Sequence>(TaskTraits()); scoped_refptr<Sequence> sequence = MakeRefCounted<Sequence>(
TaskTraits(), nullptr, TaskSourceExecutionMode::kParallel);
Sequence::Transaction sequence_transaction(sequence->BeginTransaction()); Sequence::Transaction sequence_transaction(sequence->BeginTransaction());
sequence_transaction.PushTask(Task(FROM_HERE, DoNothing(), TimeDelta())); sequence_transaction.PushTask(Task(FROM_HERE, DoNothing(), TimeDelta()));
...@@ -164,7 +168,8 @@ TEST(TaskSchedulerSequenceTest, DidRunTaskWithoutTakeTask) { ...@@ -164,7 +168,8 @@ TEST(TaskSchedulerSequenceTest, DidRunTaskWithoutTakeTask) {
// Verify that a DCHECK fires if TakeTask() is called on a sequence whose front // Verify that a DCHECK fires if TakeTask() is called on a sequence whose front
// slot is empty. // slot is empty.
TEST(TaskSchedulerSequenceTest, TakeEmptyFrontSlot) { TEST(TaskSchedulerSequenceTest, TakeEmptyFrontSlot) {
scoped_refptr<Sequence> sequence = MakeRefCounted<Sequence>(TaskTraits()); scoped_refptr<Sequence> sequence = MakeRefCounted<Sequence>(
TaskTraits(), nullptr, TaskSourceExecutionMode::kParallel);
Sequence::Transaction sequence_transaction(sequence->BeginTransaction()); Sequence::Transaction sequence_transaction(sequence->BeginTransaction());
sequence_transaction.PushTask(Task(FROM_HERE, DoNothing(), TimeDelta())); sequence_transaction.PushTask(Task(FROM_HERE, DoNothing(), TimeDelta()));
...@@ -174,7 +179,8 @@ TEST(TaskSchedulerSequenceTest, TakeEmptyFrontSlot) { ...@@ -174,7 +179,8 @@ TEST(TaskSchedulerSequenceTest, TakeEmptyFrontSlot) {
// Verify that a DCHECK fires if TakeTask() is called on an empty sequence. // Verify that a DCHECK fires if TakeTask() is called on an empty sequence.
TEST(TaskSchedulerSequenceTest, TakeEmptySequence) { TEST(TaskSchedulerSequenceTest, TakeEmptySequence) {
scoped_refptr<Sequence> sequence = MakeRefCounted<Sequence>(TaskTraits()); scoped_refptr<Sequence> sequence = MakeRefCounted<Sequence>(
TaskTraits(), nullptr, TaskSourceExecutionMode::kParallel);
Sequence::Transaction sequence_transaction(sequence->BeginTransaction()); Sequence::Transaction sequence_transaction(sequence->BeginTransaction());
EXPECT_DCHECK_DEATH({ sequence_transaction.TakeTask(); }); EXPECT_DCHECK_DEATH({ sequence_transaction.TakeTask(); });
} }
......
...@@ -35,11 +35,7 @@ Task::Task(const Location& posted_from, OnceClosure task, TimeDelta delay) ...@@ -35,11 +35,7 @@ Task::Task(const Location& posted_from, OnceClosure task, TimeDelta delay)
// This should be "= default but MSVC has trouble with "noexcept = default" in // This should be "= default but MSVC has trouble with "noexcept = default" in
// this case. // this case.
Task::Task(Task&& other) noexcept Task::Task(Task&& other) noexcept : PendingTask(std::move(other)) {}
: PendingTask(std::move(other)),
sequenced_task_runner_ref(std::move(other.sequenced_task_runner_ref)),
single_thread_task_runner_ref(
std::move(other.single_thread_task_runner_ref)) {}
Task::~Task() = default; Task::~Task() = default;
......
...@@ -20,6 +20,7 @@ namespace internal { ...@@ -20,6 +20,7 @@ namespace internal {
// A task is a unit of work inside the task scheduler. Support for tracing and // A task is a unit of work inside the task scheduler. Support for tracing and
// profiling inherited from PendingTask. // profiling inherited from PendingTask.
// TODO(etiennep): This class is now equivalent to PendingTask, remove it.
struct BASE_EXPORT Task : public PendingTask { struct BASE_EXPORT Task : public PendingTask {
Task(); Task();
...@@ -37,18 +38,6 @@ struct BASE_EXPORT Task : public PendingTask { ...@@ -37,18 +38,6 @@ struct BASE_EXPORT Task : public PendingTask {
Task& operator=(Task&& other); Task& operator=(Task&& other);
// A reference to the SequencedTaskRunner or SingleThreadTaskRunner that
// posted this task, if any. Used to set ThreadTaskRunnerHandle and/or
// SequencedTaskRunnerHandle while the task is running.
// Note: this creates an ownership cycle
// Sequence -> Task -> TaskRunner -> Sequence -> ...
// but that's okay as it's broken when the Task is popped from its Sequence
// after being executed which means this cycle forces the TaskRunner to stick
// around until all its tasks have been executed which is a requirement to
// support TaskRunnerHandles.
scoped_refptr<SequencedTaskRunner> sequenced_task_runner_ref;
scoped_refptr<SingleThreadTaskRunner> single_thread_task_runner_ref;
private: private:
DISALLOW_COPY_AND_ASSIGN(Task); DISALLOW_COPY_AND_ASSIGN(Task);
}; };
......
...@@ -159,8 +159,10 @@ bool TaskSchedulerImpl::PostDelayedTaskWithTraits(const Location& from_here, ...@@ -159,8 +159,10 @@ bool TaskSchedulerImpl::PostDelayedTaskWithTraits(const Location& from_here,
TimeDelta delay) { TimeDelta delay) {
// Post |task| as part of a one-off single-task Sequence. // Post |task| as part of a one-off single-task Sequence.
const TaskTraits new_traits = SetUserBlockingPriorityIfNeeded(traits); const TaskTraits new_traits = SetUserBlockingPriorityIfNeeded(traits);
return PostTaskWithSequence(Task(from_here, std::move(task), delay), return PostTaskWithSequence(
MakeRefCounted<Sequence>(new_traits)); Task(from_here, std::move(task), delay),
MakeRefCounted<Sequence>(new_traits, nullptr,
TaskSourceExecutionMode::kParallel));
} }
scoped_refptr<TaskRunner> TaskSchedulerImpl::CreateTaskRunnerWithTraits( scoped_refptr<TaskRunner> TaskSchedulerImpl::CreateTaskRunnerWithTraits(
...@@ -262,6 +264,9 @@ bool TaskSchedulerImpl::PostTaskWithSequence(Task task, ...@@ -262,6 +264,9 @@ bool TaskSchedulerImpl::PostTaskWithSequence(Task task,
GetWorkerPoolForTraits(traits)->PostTaskWithSequenceNow( GetWorkerPoolForTraits(traits)->PostTaskWithSequenceNow(
std::move(task), std::move(sequence_and_transaction)); std::move(task), std::move(sequence_and_transaction));
} else { } else {
// It's safe to take a ref on this pointer since the caller must have a ref
// to the TaskRunner in order to post.
scoped_refptr<TaskRunner> task_runner = sequence->task_runner();
delayed_task_manager_.AddDelayedTask( delayed_task_manager_.AddDelayedTask(
std::move(task), std::move(task),
BindOnce( BindOnce(
...@@ -275,7 +280,8 @@ bool TaskSchedulerImpl::PostTaskWithSequence(Task task, ...@@ -275,7 +280,8 @@ bool TaskSchedulerImpl::PostTaskWithSequence(Task task,
->PostTaskWithSequenceNow( ->PostTaskWithSequenceNow(
std::move(task), std::move(sequence_and_transaction)); std::move(task), std::move(sequence_and_transaction));
}, },
std::move(sequence), Unretained(this))); std::move(sequence), Unretained(this)),
std::move(task_runner));
} }
return true; return true;
......
...@@ -51,7 +51,7 @@ Optional<Task> TaskSource::Transaction::TakeTask() { ...@@ -51,7 +51,7 @@ Optional<Task> TaskSource::Transaction::TakeTask() {
bool TaskSource::Transaction::DidRunTask() { bool TaskSource::Transaction::DidRunTask() {
DCHECK(task_source_->has_worker_); DCHECK(task_source_->has_worker_);
task_source_->has_worker_ = false; task_source_->has_worker_ = false;
return !task_source_->IsEmpty(); return task_source_->DidRunTask();
} }
SequenceSortKey TaskSource::Transaction::GetSortKey() const { SequenceSortKey TaskSource::Transaction::GetSortKey() const {
...@@ -80,7 +80,14 @@ void TaskSource::ClearHeapHandle() { ...@@ -80,7 +80,14 @@ void TaskSource::ClearHeapHandle() {
heap_handle_ = HeapHandle(); heap_handle_ = HeapHandle();
} }
TaskSource::TaskSource(const TaskTraits& traits) : traits_(traits) {} TaskSource::TaskSource(const TaskTraits& traits,
TaskRunner* task_runner,
TaskSourceExecutionMode execution_mode)
: traits_(traits),
task_runner_(task_runner),
execution_mode_(execution_mode) {
DCHECK(task_runner_ || execution_mode_ == TaskSourceExecutionMode::kParallel);
}
TaskSource::~TaskSource() = default; TaskSource::~TaskSource() = default;
......
...@@ -22,6 +22,13 @@ ...@@ -22,6 +22,13 @@
namespace base { namespace base {
namespace internal { namespace internal {
enum class TaskSourceExecutionMode {
kParallel,
kSequenced,
kSingleThread,
kMax = kSingleThread,
};
struct BASE_EXPORT ExecutionEnvironment { struct BASE_EXPORT ExecutionEnvironment {
SequenceToken token; SequenceToken token;
SequenceLocalStorageMap* sequence_local_storage; SequenceLocalStorageMap* sequence_local_storage;
...@@ -108,7 +115,13 @@ class BASE_EXPORT TaskSource : public RefCountedThreadSafe<TaskSource> { ...@@ -108,7 +115,13 @@ class BASE_EXPORT TaskSource : public RefCountedThreadSafe<TaskSource> {
}; };
// |traits| is metadata that applies to all Tasks in the TaskSource. // |traits| is metadata that applies to all Tasks in the TaskSource.
explicit TaskSource(const TaskTraits& traits); // |task_runner| is a reference to the TaskRunner feeding this TaskSource.
// |task_runner| can be nullptr only for tasks with no TaskRunner, in which
// case |execution_mode| must be kParallel. Otherwise, |execution_mode| is the
// execution mode of |task_runner|.
TaskSource(const TaskTraits& traits,
TaskRunner* task_runner,
TaskSourceExecutionMode execution_mode);
// Begins a Transaction. This method cannot be called on a thread which has an // Begins a Transaction. This method cannot be called on a thread which has an
// active TaskSource::Transaction. // active TaskSource::Transaction.
...@@ -127,11 +140,23 @@ class BASE_EXPORT TaskSource : public RefCountedThreadSafe<TaskSource> { ...@@ -127,11 +140,23 @@ class BASE_EXPORT TaskSource : public RefCountedThreadSafe<TaskSource> {
return traits_.shutdown_behavior(); return traits_.shutdown_behavior();
} }
// A reference to TaskRunner is only retained between PushTask() and when
// DidRunTask() returns false, guaranteeing it is safe to dereference this
// pointer. Otherwise, the caller should guarantee such TaskRunner still
// exists before dereferencing.
TaskRunner* task_runner() const { return task_runner_; }
TaskSourceExecutionMode execution_mode() const { return execution_mode_; }
protected: protected:
virtual ~TaskSource(); virtual ~TaskSource();
virtual Optional<Task> TakeTask() = 0; virtual Optional<Task> TakeTask() = 0;
// Returns true if the TaskSource should be queued after this
// operation.
virtual bool DidRunTask() = 0;
virtual SequenceSortKey GetSortKey() const = 0; virtual SequenceSortKey GetSortKey() const = 0;
virtual bool IsEmpty() const = 0; virtual bool IsEmpty() const = 0;
...@@ -154,6 +179,13 @@ class BASE_EXPORT TaskSource : public RefCountedThreadSafe<TaskSource> { ...@@ -154,6 +179,13 @@ class BASE_EXPORT TaskSource : public RefCountedThreadSafe<TaskSource> {
// by the PriorityQueue's lock. // by the PriorityQueue's lock.
HeapHandle heap_handle_; HeapHandle heap_handle_;
// A pointer to the TaskRunner that posts to this TaskSource, if any. The
// derived class is responsible for calling AddRef() when IsEmpty() becomes
// false and Release() when IsEmpty() becomes true in DidRunTask().
TaskRunner* task_runner_;
TaskSourceExecutionMode execution_mode_;
// TODO(etiennep): Add support for TaskSources with more than one worker. // TODO(etiennep): Add support for TaskSources with more than one worker.
bool has_worker_ = false; bool has_worker_ = false;
......
...@@ -21,7 +21,8 @@ namespace { ...@@ -21,7 +21,8 @@ namespace {
class MockTaskSource : public TaskSource { class MockTaskSource : public TaskSource {
public: public:
MockTaskSource(TaskTraits traits) : TaskSource(traits) {} MockTaskSource(TaskTraits traits)
: TaskSource(traits, nullptr, TaskSourceExecutionMode::kParallel) {}
MOCK_METHOD0(GetExecutionEnvironment, ExecutionEnvironment()); MOCK_METHOD0(GetExecutionEnvironment, ExecutionEnvironment());
...@@ -31,6 +32,8 @@ class MockTaskSource : public TaskSource { ...@@ -31,6 +32,8 @@ class MockTaskSource : public TaskSource {
MOCK_CONST_METHOD0(IsEmpty, bool()); MOCK_CONST_METHOD0(IsEmpty, bool());
MOCK_METHOD0(Clear, void()); MOCK_METHOD0(Clear, void());
bool DidRunTask() override { return !IsEmpty(); }
private: private:
~MockTaskSource() override = default; ~MockTaskSource() override = default;
}; };
......
...@@ -33,9 +33,12 @@ namespace internal { ...@@ -33,9 +33,12 @@ namespace internal {
namespace { namespace {
constexpr char kParallelExecutionMode[] = "parallel"; constexpr const char* kExecutionModeString[] = {"parallel", "sequenced",
constexpr char kSequencedExecutionMode[] = "sequenced"; "single thread"};
constexpr char kSingleThreadExecutionMode[] = "single thread"; static_assert(
size(kExecutionModeString) ==
static_cast<size_t>(TaskSourceExecutionMode::kMax) + 1,
"Array kExecutionModeString is out of sync with TaskSourceExecutionMode.");
// An immutable copy of a scheduler task's info required by tracing. // An immutable copy of a scheduler task's info required by tracing.
class TaskTracingInfo : public trace_event::ConvertableToTraceFormat { class TaskTracingInfo : public trace_event::ConvertableToTraceFormat {
...@@ -64,7 +67,7 @@ void TaskTracingInfo::AppendAsTraceFormat(std::string* out) const { ...@@ -64,7 +67,7 @@ void TaskTracingInfo::AppendAsTraceFormat(std::string* out) const {
dict.SetString("task_priority", dict.SetString("task_priority",
base::TaskPriorityToString(task_traits_.priority())); base::TaskPriorityToString(task_traits_.priority()));
dict.SetString("execution_mode", execution_mode_); dict.SetString("execution_mode", execution_mode_);
if (execution_mode_ != kParallelExecutionMode) if (sequence_token_.IsValid())
dict.SetInteger("sequence_token", sequence_token_.ToInternalValue()); dict.SetInteger("sequence_token", sequence_token_.ToInternalValue());
std::string tmp; std::string tmp;
...@@ -609,29 +612,33 @@ void TaskTracker::RunOrSkipTask(Task task, ...@@ -609,29 +612,33 @@ void TaskTracker::RunOrSkipTask(Task task,
// Set up TaskRunnerHandle as expected for the scope of the task. // Set up TaskRunnerHandle as expected for the scope of the task.
Optional<SequencedTaskRunnerHandle> sequenced_task_runner_handle; Optional<SequencedTaskRunnerHandle> sequenced_task_runner_handle;
Optional<ThreadTaskRunnerHandle> single_thread_task_runner_handle; Optional<ThreadTaskRunnerHandle> single_thread_task_runner_handle;
DCHECK(!task.sequenced_task_runner_ref || switch (sequence->execution_mode()) {
!task.single_thread_task_runner_ref); case TaskSourceExecutionMode::kParallel:
if (task.sequenced_task_runner_ref) { break;
sequenced_task_runner_handle.emplace(task.sequenced_task_runner_ref); case TaskSourceExecutionMode::kSequenced:
} else if (task.single_thread_task_runner_ref) { DCHECK(sequence->task_runner());
single_thread_task_runner_handle.emplace( sequenced_task_runner_handle.emplace(
task.single_thread_task_runner_ref); static_cast<SequencedTaskRunner*>(sequence->task_runner()));
break;
case TaskSourceExecutionMode::kSingleThread:
DCHECK(sequence->task_runner());
single_thread_task_runner_handle.emplace(
static_cast<SingleThreadTaskRunner*>(sequence->task_runner()));
break;
} }
if (can_run_task) { if (can_run_task) {
TRACE_TASK_EXECUTION("TaskScheduler_RunTask", task); TRACE_TASK_EXECUTION("TaskScheduler_RunTask", task);
const char* const execution_mode =
task.single_thread_task_runner_ref
? kSingleThreadExecutionMode
: (task.sequenced_task_runner_ref ? kSequencedExecutionMode
: kParallelExecutionMode);
// TODO(gab): In a better world this would be tacked on as an extra arg // TODO(gab): In a better world this would be tacked on as an extra arg
// to the trace event generated above. This is not possible however until // to the trace event generated above. This is not possible however until
// http://crbug.com/652692 is resolved. // http://crbug.com/652692 is resolved.
TRACE_EVENT1("task_scheduler", "TaskScheduler_TaskInfo", "task_info", TRACE_EVENT1("task_scheduler", "TaskScheduler_TaskInfo", "task_info",
std::make_unique<TaskTracingInfo>(traits, execution_mode, std::make_unique<TaskTracingInfo>(
environment.token)); traits,
kExecutionModeString[static_cast<size_t>(
sequence->execution_mode())],
environment.token));
RunTaskWithShutdownBehavior(traits.shutdown_behavior(), &task); RunTaskWithShutdownBehavior(traits.shutdown_behavior(), &task);
} }
......
...@@ -80,14 +80,16 @@ TEST_F(TaskSchedulerTaskTrackerPosixTest, FileDescriptorWatcher) { ...@@ -80,14 +80,16 @@ TEST_F(TaskSchedulerTaskTrackerPosixTest, FileDescriptorWatcher) {
DoNothing()), DoNothing()),
TimeDelta()); TimeDelta());
constexpr TaskTraits default_traits = {}; constexpr TaskTraits default_traits = {};
// FileDescriptorWatcher::WatchReadable needs a SequencedTaskRunnerHandle.
task.sequenced_task_runner_ref = MakeRefCounted<NullTaskRunner>();
EXPECT_TRUE(tracker_.WillPostTask(&task, default_traits.shutdown_behavior())); EXPECT_TRUE(tracker_.WillPostTask(&task, default_traits.shutdown_behavior()));
auto sequence = test::CreateSequenceWithTask(std::move(task), default_traits); // FileDescriptorWatcher::WatchReadable needs a SequencedTaskRunnerHandle.
auto sequence = test::CreateSequenceWithTask(
std::move(task), default_traits, MakeRefCounted<NullTaskRunner>(),
TaskSourceExecutionMode::kSequenced);
EXPECT_TRUE( EXPECT_TRUE(
tracker_.WillScheduleSequence(sequence->BeginTransaction(), nullptr)); tracker_.WillScheduleSequence(sequence->BeginTransaction(), nullptr));
// Expect RunAndPopNextTask to return nullptr since |sequence| is empty after // Expect RunAndPopNextTask to return nullptr since |sequence| is empty after
// popping a task from it. // popping a task from it.
EXPECT_FALSE(tracker_.RunAndPopNextTask(sequence, nullptr)); EXPECT_FALSE(tracker_.RunAndPopNextTask(sequence, nullptr));
......
...@@ -512,9 +512,12 @@ TEST_P(TaskSchedulerTaskTrackerTest, IOAllowed) { ...@@ -512,9 +512,12 @@ TEST_P(TaskSchedulerTaskTrackerTest, IOAllowed) {
traits_without_may_block); traits_without_may_block);
} }
static void RunTaskRunnerHandleVerificationTask(TaskTracker* tracker, static void RunTaskRunnerHandleVerificationTask(
Task verify_task, TaskTracker* tracker,
TaskTraits traits) { Task verify_task,
TaskTraits traits,
scoped_refptr<TaskRunner> task_runner,
TaskSourceExecutionMode execution_mode) {
// Pretend |verify_task| is posted to respect TaskTracker's contract. // Pretend |verify_task| is posted to respect TaskTracker's contract.
EXPECT_TRUE(tracker->WillPostTask(&verify_task, traits.shutdown_behavior())); EXPECT_TRUE(tracker->WillPostTask(&verify_task, traits.shutdown_behavior()));
...@@ -524,7 +527,9 @@ static void RunTaskRunnerHandleVerificationTask(TaskTracker* tracker, ...@@ -524,7 +527,9 @@ static void RunTaskRunnerHandleVerificationTask(TaskTracker* tracker,
EXPECT_FALSE(SequencedTaskRunnerHandle::IsSet()); EXPECT_FALSE(SequencedTaskRunnerHandle::IsSet());
testing::StrictMock<MockCanScheduleSequenceObserver> never_notified_observer; testing::StrictMock<MockCanScheduleSequenceObserver> never_notified_observer;
auto sequence = test::CreateSequenceWithTask(std::move(verify_task), traits); auto sequence = test::CreateSequenceWithTask(
std::move(verify_task), traits, std::move(task_runner), execution_mode);
ASSERT_TRUE(tracker->WillScheduleSequence(sequence->BeginTransaction(), ASSERT_TRUE(tracker->WillScheduleSequence(sequence->BeginTransaction(),
&never_notified_observer)); &never_notified_observer));
tracker->RunAndPopNextTask(std::move(sequence), &never_notified_observer); tracker->RunAndPopNextTask(std::move(sequence), &never_notified_observer);
...@@ -545,7 +550,8 @@ TEST_P(TaskSchedulerTaskTrackerTest, TaskRunnerHandleIsNotSetOnParallel) { ...@@ -545,7 +550,8 @@ TEST_P(TaskSchedulerTaskTrackerTest, TaskRunnerHandleIsNotSetOnParallel) {
Task verify_task(FROM_HERE, BindOnce(&VerifyNoTaskRunnerHandle), TimeDelta()); Task verify_task(FROM_HERE, BindOnce(&VerifyNoTaskRunnerHandle), TimeDelta());
RunTaskRunnerHandleVerificationTask(&tracker_, std::move(verify_task), RunTaskRunnerHandleVerificationTask(&tracker_, std::move(verify_task),
TaskTraits(GetParam())); TaskTraits(GetParam()), nullptr,
TaskSourceExecutionMode::kParallel);
} }
static void VerifySequencedTaskRunnerHandle( static void VerifySequencedTaskRunnerHandle(
...@@ -566,10 +572,10 @@ TEST_P(TaskSchedulerTaskTrackerTest, ...@@ -566,10 +572,10 @@ TEST_P(TaskSchedulerTaskTrackerTest,
BindOnce(&VerifySequencedTaskRunnerHandle, BindOnce(&VerifySequencedTaskRunnerHandle,
Unretained(test_task_runner.get())), Unretained(test_task_runner.get())),
TimeDelta()); TimeDelta());
verify_task.sequenced_task_runner_ref = test_task_runner;
RunTaskRunnerHandleVerificationTask(&tracker_, std::move(verify_task), RunTaskRunnerHandleVerificationTask(
TaskTraits(GetParam())); &tracker_, std::move(verify_task), TaskTraits(GetParam()),
std::move(test_task_runner), TaskSourceExecutionMode::kSequenced);
} }
static void VerifyThreadTaskRunnerHandle( static void VerifyThreadTaskRunnerHandle(
...@@ -592,10 +598,10 @@ TEST_P(TaskSchedulerTaskTrackerTest, ...@@ -592,10 +598,10 @@ TEST_P(TaskSchedulerTaskTrackerTest,
BindOnce(&VerifyThreadTaskRunnerHandle, BindOnce(&VerifyThreadTaskRunnerHandle,
Unretained(test_task_runner.get())), Unretained(test_task_runner.get())),
TimeDelta()); TimeDelta());
verify_task.single_thread_task_runner_ref = test_task_runner;
RunTaskRunnerHandleVerificationTask(&tracker_, std::move(verify_task), RunTaskRunnerHandleVerificationTask(
TaskTraits(GetParam())); &tracker_, std::move(verify_task), TaskTraits(GetParam()),
std::move(test_task_runner), TaskSourceExecutionMode::kSingleThread);
} }
TEST_P(TaskSchedulerTaskTrackerTest, FlushPendingDelayedTask) { TEST_P(TaskSchedulerTaskTrackerTest, FlushPendingDelayedTask) {
...@@ -890,7 +896,8 @@ void ExpectSequenceToken(SequenceToken sequence_token) { ...@@ -890,7 +896,8 @@ void ExpectSequenceToken(SequenceToken sequence_token) {
// Verify that SequenceToken::GetForCurrentThread() returns the Sequence's token // Verify that SequenceToken::GetForCurrentThread() returns the Sequence's token
// when a Task runs. // when a Task runs.
TEST_F(TaskSchedulerTaskTrackerTest, CurrentSequenceToken) { TEST_F(TaskSchedulerTaskTrackerTest, CurrentSequenceToken) {
scoped_refptr<Sequence> sequence = MakeRefCounted<Sequence>(TaskTraits()); scoped_refptr<Sequence> sequence = MakeRefCounted<Sequence>(
TaskTraits(), nullptr, TaskSourceExecutionMode::kParallel);
const SequenceToken sequence_token = sequence->token(); const SequenceToken sequence_token = sequence->token();
Task task(FROM_HERE, Bind(&ExpectSequenceToken, sequence_token), TimeDelta()); Task task(FROM_HERE, Bind(&ExpectSequenceToken, sequence_token), TimeDelta());
......
...@@ -45,9 +45,13 @@ void MockSchedulerWorkerObserver::OnSchedulerWorkerMainExit() { ...@@ -45,9 +45,13 @@ void MockSchedulerWorkerObserver::OnSchedulerWorkerMainExit() {
on_main_exit_cv_->Signal(); on_main_exit_cv_->Signal();
} }
scoped_refptr<Sequence> CreateSequenceWithTask(Task task, scoped_refptr<Sequence> CreateSequenceWithTask(
const TaskTraits& traits) { Task task,
scoped_refptr<Sequence> sequence = MakeRefCounted<Sequence>(traits); const TaskTraits& traits,
scoped_refptr<TaskRunner> task_runner,
TaskSourceExecutionMode execution_mode) {
scoped_refptr<Sequence> sequence =
MakeRefCounted<Sequence>(traits, task_runner.get(), execution_mode);
sequence->BeginTransaction().PushTask(std::move(task)); sequence->BeginTransaction().PushTask(std::move(task));
return sequence; return sequence;
} }
...@@ -118,6 +122,9 @@ bool MockSchedulerTaskRunnerDelegate::PostTaskWithSequence( ...@@ -118,6 +122,9 @@ bool MockSchedulerTaskRunnerDelegate::PostTaskWithSequence(
std::move(task), std::move(task),
SequenceAndTransaction::FromSequence(std::move(sequence))); SequenceAndTransaction::FromSequence(std::move(sequence)));
} else { } else {
// It's safe to take a ref on this pointer since the caller must have a ref
// to the TaskRunner in order to post.
scoped_refptr<TaskRunner> task_runner = sequence->task_runner();
delayed_task_manager_->AddDelayedTask( delayed_task_manager_->AddDelayedTask(
std::move(task), std::move(task),
BindOnce( BindOnce(
...@@ -127,7 +134,8 @@ bool MockSchedulerTaskRunnerDelegate::PostTaskWithSequence( ...@@ -127,7 +134,8 @@ bool MockSchedulerTaskRunnerDelegate::PostTaskWithSequence(
std::move(task), std::move(task),
SequenceAndTransaction::FromSequence(std::move(sequence))); SequenceAndTransaction::FromSequence(std::move(sequence)));
}, },
std::move(sequence), worker_pool_)); std::move(sequence), worker_pool_),
std::move(task_runner));
} }
return true; return true;
......
...@@ -69,12 +69,18 @@ class MockSchedulerTaskRunnerDelegate : public SchedulerTaskRunnerDelegate { ...@@ -69,12 +69,18 @@ class MockSchedulerTaskRunnerDelegate : public SchedulerTaskRunnerDelegate {
// An enumeration of possible task scheduler TaskRunner types. Used to // An enumeration of possible task scheduler TaskRunner types. Used to
// parametrize relevant task_scheduler tests. // parametrize relevant task_scheduler tests.
// TODO(etiennep): Migrate to TaskSourceExecutionMode.
enum class ExecutionMode { PARALLEL, SEQUENCED, SINGLE_THREADED }; enum class ExecutionMode { PARALLEL, SEQUENCED, SINGLE_THREADED };
// Creates a Sequence with given |traits| and pushes |task| to it. Returns that // Creates a Sequence with given |traits| and pushes |task| to it. If a
// Sequence. // TaskRunner is associated with |task|, it should be be passed as |task_runner|
scoped_refptr<Sequence> CreateSequenceWithTask(Task task, // along with its |execution_mode|. Returns the created Sequence.
const TaskTraits& traits); scoped_refptr<Sequence> CreateSequenceWithTask(
Task task,
const TaskTraits& traits,
scoped_refptr<TaskRunner> task_runner = nullptr,
TaskSourceExecutionMode execution_mode =
TaskSourceExecutionMode::kParallel);
// Creates a TaskRunner that posts tasks to the worker pool owned by // Creates a TaskRunner that posts tasks to the worker pool owned by
// |scheduler_task_runner_delegate| with the |execution_mode| execution mode // |scheduler_task_runner_delegate| with the |execution_mode| execution mode
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment