Commit f45c2c54 authored by Jesse McKenna's avatar Jesse McKenna Committed by Commit Bot

TaskScheduler: Lock Sequence via Sequence::Transaction where needed for priority update

Bug: 889029
Change-Id: I6e3fa2029dc9875fd742d84fea65ceb8c071c61f
Reviewed-on: https://chromium-review.googlesource.com/c/1333991
Commit-Queue: Jesse McKenna <jessemckenna@google.com>
Reviewed-by: default avatarFrançois Doray <fdoray@chromium.org>
Cr-Commit-Position: refs/heads/master@{#609041}
parent b68b4651
......@@ -61,8 +61,8 @@ void PlatformNativeWorkerPoolWin::JoinForTesting() {
}
void PlatformNativeWorkerPoolWin::ReEnqueueSequence(
scoped_refptr<Sequence> sequence) {
OnCanScheduleSequence(std::move(sequence));
std::unique_ptr<Sequence::Transaction> sequence_transaction) {
OnCanScheduleSequence(std::move(sequence_transaction));
}
// static
......@@ -100,11 +100,16 @@ scoped_refptr<Sequence> PlatformNativeWorkerPoolWin::GetWork() {
void PlatformNativeWorkerPoolWin::OnCanScheduleSequence(
scoped_refptr<Sequence> sequence) {
const SequenceSortKey sequence_sort_key =
sequence->BeginTransaction()->GetSortKey();
auto transaction(priority_queue_.BeginTransaction());
DCHECK(sequence);
OnCanScheduleSequence(sequence->BeginTransaction());
}
void PlatformNativeWorkerPoolWin::OnCanScheduleSequence(
std::unique_ptr<Sequence::Transaction> sequence_transaction) {
DCHECK(sequence_transaction);
transaction->Push(std::move(sequence), sequence_sort_key);
priority_queue_.BeginTransaction()->Push(sequence_transaction->sequence(),
sequence_transaction->GetSortKey());
if (started_) {
// TODO(fdoray): Handle priorities by having different work objects and
// using ::SetThreadpoolCallbackPriority() and
......
......@@ -42,7 +42,8 @@ class BASE_EXPORT PlatformNativeWorkerPoolWin : public SchedulerWorkerPool {
// SchedulerWorkerPool:
void JoinForTesting() override;
void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override;
void ReEnqueueSequence(
std::unique_ptr<Sequence::Transaction> sequence_transaction) override;
private:
// Callback that gets run by |pool_|. It runs a task off the next sequence on
......@@ -53,6 +54,8 @@ class BASE_EXPORT PlatformNativeWorkerPoolWin : public SchedulerWorkerPool {
// SchedulerWorkerPool:
void OnCanScheduleSequence(scoped_refptr<Sequence> sequence) override;
void OnCanScheduleSequence(
std::unique_ptr<Sequence::Transaction> sequence_transaction) override;
// Returns the top Sequence off the |priority_queue_|. Returns nullptr
// if the |priority_queue_| is empty.
......
......@@ -338,8 +338,8 @@ class SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner
const bool sequence_was_empty =
sequence_->BeginTransaction()->PushTask(std::move(task));
if (sequence_was_empty) {
if (outer_->task_tracker_->WillScheduleSequence(sequence_,
GetDelegate())) {
if (outer_->task_tracker_->WillScheduleSequence(
sequence_->BeginTransaction(), GetDelegate())) {
GetDelegate()->ReEnqueueSequence(sequence_);
worker_->WakeUp();
}
......
......@@ -51,25 +51,27 @@ bool SchedulerWorkerPool::IsBoundToCurrentThread() const {
void SchedulerWorkerPool::PostTaskWithSequenceNow(
Task task,
scoped_refptr<Sequence> sequence) {
std::unique_ptr<Sequence::Transaction> sequence_transaction) {
DCHECK(task.task);
DCHECK(sequence);
DCHECK(sequence_transaction);
// Confirm that |task| is ready to run (its delayed run time is either null or
// in the past).
DCHECK_LE(task.delayed_run_time, TimeTicks::Now());
const bool sequence_was_empty =
sequence->BeginTransaction()->PushTask(std::move(task));
sequence_transaction->PushTask(std::move(task));
if (sequence_was_empty) {
// Try to schedule |sequence| if it was empty before |task| was inserted
// into it. Otherwise, one of these must be true:
// - |sequence| is already scheduled, or,
// - The pool is running a Task from |sequence|. The pool is expected to
// reschedule |sequence| once it's done running the Task.
sequence = task_tracker_->WillScheduleSequence(std::move(sequence), this);
if (sequence)
OnCanScheduleSequence(std::move(sequence));
// Try to schedule the Sequence locked by |sequence_transaction| if it was
// empty before |task| was inserted into it. Otherwise, one of these must be
// true:
// - The Sequence is already scheduled, or,
// - The pool is running a Task from the Sequence. The pool is expected to
// reschedule the Sequence once it's done running the Task.
sequence_transaction = task_tracker_->WillScheduleSequence(
std::move(sequence_transaction), this);
if (sequence_transaction)
OnCanScheduleSequence(std::move(sequence_transaction));
}
}
......
......@@ -25,19 +25,26 @@ class BASE_EXPORT SchedulerWorkerPool : public CanScheduleSequenceObserver {
public:
virtual ~Delegate() = default;
// Invoked when a |sequence| is non-empty after the SchedulerWorkerPool has
// run a task from it. The implementation must enqueue |sequence| in the
// appropriate priority queue, depending on |sequence| traits.
virtual void ReEnqueueSequence(scoped_refptr<Sequence> sequence) = 0;
// Invoked when the Sequence locked by |sequence_transaction| is non-empty
// after the SchedulerWorkerPool has run a task from it. The implementation
// must enqueue the Sequence in the appropriate priority queue, depending
// on the Sequence's traits.
virtual void ReEnqueueSequence(
std::unique_ptr<Sequence::Transaction> sequence_transaction) = 0;
};
~SchedulerWorkerPool() override;
// Posts |task| to be executed by this SchedulerWorkerPool as part of
// |sequence|. This must only be called after |task| has gone through
// TaskTracker::WillPostTask() and after |task|'s delayed run
// time.
void PostTaskWithSequenceNow(Task task, scoped_refptr<Sequence> sequence);
// CanScheduleSequenceObserver:
void OnCanScheduleSequence(scoped_refptr<Sequence> sequence) override = 0;
// Posts |task| to be executed by this SchedulerWorkerPool as part of the
// Sequence locked by |sequence_transaction|. This must only be called after
// |task| has gone through TaskTracker::WillPostTask() and after |task|'s
// delayed run time.
void PostTaskWithSequenceNow(
Task task,
std::unique_ptr<Sequence::Transaction> sequence_transaction);
// Registers the worker pool in TLS.
void BindToCurrentThread();
......@@ -56,10 +63,17 @@ class BASE_EXPORT SchedulerWorkerPool : public CanScheduleSequenceObserver {
// task during JoinForTesting(). This can only be called once.
virtual void JoinForTesting() = 0;
// Enqueues |sequence| in the worker pool's priority queue, then wakes up a
// worker if the worker pool is not bound to the current thread, i.e. if
// |sequence| is changing pools.
virtual void ReEnqueueSequence(scoped_refptr<Sequence> sequence) = 0;
// Enqueues the Sequence locked by |sequence_transaction| in the worker
// pool's priority queue, then wakes up a worker if the worker pool is not
// bound to the current thread, i.e. if the Sequence is changing pools.
virtual void ReEnqueueSequence(
std::unique_ptr<Sequence::Transaction> sequence_transaction) = 0;
// Called when a Sequence locked by |sequence_transaction| can be scheduled.
// It is expected that TaskTracker::RunNextTask() will be called with the
// Sequence as argument after this is called.
virtual void OnCanScheduleSequence(
std::unique_ptr<Sequence::Transaction> sequence_transaction) = 0;
protected:
SchedulerWorkerPool(TrackedRef<TaskTracker> task_tracker,
......
......@@ -273,16 +273,21 @@ SchedulerWorkerPoolImpl::~SchedulerWorkerPoolImpl() {
void SchedulerWorkerPoolImpl::OnCanScheduleSequence(
scoped_refptr<Sequence> sequence) {
PushSequenceToPriorityQueue(std::move(sequence));
DCHECK(sequence);
OnCanScheduleSequence(sequence->BeginTransaction());
}
void SchedulerWorkerPoolImpl::OnCanScheduleSequence(
std::unique_ptr<Sequence::Transaction> sequence_transaction) {
PushSequenceToPriorityQueue(std::move(sequence_transaction));
WakeUpOneWorker();
}
void SchedulerWorkerPoolImpl::PushSequenceToPriorityQueue(
scoped_refptr<Sequence> sequence) {
DCHECK(sequence);
const auto sequence_sort_key = sequence->BeginTransaction()->GetSortKey();
shared_priority_queue_.BeginTransaction()->Push(std::move(sequence),
sequence_sort_key);
std::unique_ptr<Sequence::Transaction> sequence_transaction) {
DCHECK(sequence_transaction);
shared_priority_queue_.BeginTransaction()->Push(
sequence_transaction->sequence(), sequence_transaction->GetSortKey());
}
void SchedulerWorkerPoolImpl::GetHistograms(
......@@ -364,8 +369,8 @@ void SchedulerWorkerPoolImpl::JoinForTesting() {
}
void SchedulerWorkerPoolImpl::ReEnqueueSequence(
scoped_refptr<Sequence> sequence) {
PushSequenceToPriorityQueue(std::move(sequence));
std::unique_ptr<Sequence::Transaction> sequence_transaction) {
PushSequenceToPriorityQueue(std::move(sequence_transaction));
if (!IsBoundToCurrentThread())
WakeUpOneWorker();
}
......@@ -558,7 +563,8 @@ void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::DidRunTask() {
void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::ReEnqueueSequence(
scoped_refptr<Sequence> sequence) {
outer_->delegate_->ReEnqueueSequence(std::move(sequence));
DCHECK(sequence);
outer_->delegate_->ReEnqueueSequence(sequence->BeginTransaction());
}
TimeDelta
......
......@@ -95,7 +95,8 @@ class BASE_EXPORT SchedulerWorkerPoolImpl : public SchedulerWorkerPool {
// SchedulerWorkerPool:
void JoinForTesting() override;
void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override;
void ReEnqueueSequence(
std::unique_ptr<Sequence::Transaction> sequence_transaction) override;
const HistogramBase* num_tasks_before_detach_histogram() const {
return num_tasks_before_detach_histogram_;
......@@ -163,9 +164,13 @@ class BASE_EXPORT SchedulerWorkerPoolImpl : public SchedulerWorkerPool {
// SchedulerWorkerPool:
void OnCanScheduleSequence(scoped_refptr<Sequence> sequence) override;
void OnCanScheduleSequence(
std::unique_ptr<Sequence::Transaction> sequence_transaction) override;
// Pushes |sequence| to |shared_priority_queue_|.
void PushSequenceToPriorityQueue(scoped_refptr<Sequence> sequence);
// Pushes the Sequence locked by |sequence_transaction| to
// |shared_priority_queue_|.
void PushSequenceToPriorityQueue(
std::unique_ptr<Sequence::Transaction> sequence_transaction);
// Waits until at least |n| workers are idle. |lock_| must be held to call
// this function.
......
......@@ -132,8 +132,9 @@ class TaskSchedulerWorkerPoolImplTestBase
private:
// SchedulerWorkerPool::Delegate:
void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override {
worker_pool_->ReEnqueueSequence(std::move(sequence));
void ReEnqueueSequence(
std::unique_ptr<Sequence::Transaction> sequence_transaction) override {
worker_pool_->ReEnqueueSequence(std::move(sequence_transaction));
}
DISALLOW_COPY_AND_ASSIGN(TaskSchedulerWorkerPoolImplTestBase);
......
......@@ -164,8 +164,9 @@ class TaskSchedulerWorkerPoolTest
private:
// SchedulerWorkerPool::Delegate:
void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override {
worker_pool_->ReEnqueueSequence(std::move(sequence));
void ReEnqueueSequence(
std::unique_ptr<Sequence::Transaction> sequence_transaction) override {
worker_pool_->ReEnqueueSequence(std::move(sequence_transaction));
}
TrackedRefFactory<SchedulerWorkerPool::Delegate> tracked_ref_factory_;
......
......@@ -181,6 +181,8 @@ class TaskSchedulerWorkerTest : public testing::TestWithParam<size_t> {
// Create a Sequence with TasksPerSequence() Tasks.
scoped_refptr<Sequence> sequence = MakeRefCounted<Sequence>(TaskTraits());
std::unique_ptr<Sequence::Transaction> sequence_transaction =
sequence->BeginTransaction();
for (size_t i = 0; i < outer_->TasksPerSequence(); ++i) {
Task task(FROM_HERE,
BindOnce(&TaskSchedulerWorkerTest::RunTaskCallback,
......@@ -188,7 +190,7 @@ class TaskSchedulerWorkerTest : public testing::TestWithParam<size_t> {
TimeDelta());
EXPECT_TRUE(outer_->task_tracker_.WillPostTask(
&task, sequence->traits().shutdown_behavior()));
sequence->BeginTransaction()->PushTask(std::move(task));
sequence_transaction->PushTask(std::move(task));
}
ExpectCallToDidRunTask();
......@@ -199,9 +201,9 @@ class TaskSchedulerWorkerTest : public testing::TestWithParam<size_t> {
outer_->created_sequences_.push_back(sequence);
}
sequence = outer_->task_tracker_.WillScheduleSequence(std::move(sequence),
nullptr);
EXPECT_TRUE(sequence);
sequence_transaction = outer_->task_tracker_.WillScheduleSequence(
std::move(sequence_transaction), nullptr);
EXPECT_TRUE(sequence_transaction);
return sequence;
}
......@@ -221,10 +223,11 @@ class TaskSchedulerWorkerTest : public testing::TestWithParam<size_t> {
// Verify that |sequence| contains TasksPerSequence() - 1 Tasks.
for (size_t i = 0; i < outer_->TasksPerSequence() - 1; ++i) {
std::unique_ptr<Sequence::Transaction> transaction =
std::unique_ptr<Sequence::Transaction> sequence_transaction =
sequence->BeginTransaction();
EXPECT_TRUE(transaction->TakeTask());
EXPECT_EQ(i == outer_->TasksPerSequence() - 2, transaction->Pop());
EXPECT_TRUE(sequence_transaction->TakeTask());
EXPECT_EQ(i == outer_->TasksPerSequence() - 2,
sequence_transaction->Pop());
}
// Add |sequence| to |re_enqueued_sequences_|.
......@@ -457,10 +460,12 @@ class ControllableCleanupDelegate : public SchedulerWorkerDefaultDelegate {
TimeDelta());
EXPECT_TRUE(task_tracker_->WillPostTask(
&task, sequence->traits().shutdown_behavior()));
sequence->BeginTransaction()->PushTask(std::move(task));
sequence =
task_tracker_->WillScheduleSequence(std::move(sequence), nullptr);
EXPECT_TRUE(sequence);
std::unique_ptr<Sequence::Transaction> sequence_transaction =
sequence->BeginTransaction();
sequence_transaction->PushTask(std::move(task));
sequence_transaction = task_tracker_->WillScheduleSequence(
std::move(sequence_transaction), nullptr);
EXPECT_TRUE(sequence_transaction);
return sequence;
}
......
......@@ -123,7 +123,7 @@ class BASE_EXPORT Sequence : public RefCountedThreadSafe<Sequence> {
const SequenceToken token_ = SequenceToken::Create();
// Synchronizes access to all members.
mutable SchedulerLock lock_;
mutable SchedulerLock lock_{UniversalPredecessor()};
// Queue of tasks to execute.
base::queue<Task> queue_;
......
......@@ -44,56 +44,56 @@ TEST(TaskSchedulerSequenceTest, PushTakeRemove) {
testing::StrictMock<MockTask> mock_task_d;
testing::StrictMock<MockTask> mock_task_e;
std::unique_ptr<Sequence::Transaction> transaction =
std::unique_ptr<Sequence::Transaction> sequence_transaction =
MakeRefCounted<Sequence>(TaskTraits(TaskPriority::BEST_EFFORT))
->BeginTransaction();
// Push task A in the sequence. PushTask() should return true since it's the
// first task->
EXPECT_TRUE(transaction->PushTask(CreateTask(&mock_task_a)));
EXPECT_TRUE(sequence_transaction->PushTask(CreateTask(&mock_task_a)));
// Push task B, C and D in the sequence. PushTask() should return false since
// there is already a task in a sequence.
EXPECT_FALSE(transaction->PushTask(CreateTask(&mock_task_b)));
EXPECT_FALSE(transaction->PushTask(CreateTask(&mock_task_c)));
EXPECT_FALSE(transaction->PushTask(CreateTask(&mock_task_d)));
EXPECT_FALSE(sequence_transaction->PushTask(CreateTask(&mock_task_b)));
EXPECT_FALSE(sequence_transaction->PushTask(CreateTask(&mock_task_c)));
EXPECT_FALSE(sequence_transaction->PushTask(CreateTask(&mock_task_d)));
// Take the task in front of the sequence. It should be task A.
Optional<Task> task = transaction->TakeTask();
Optional<Task> task = sequence_transaction->TakeTask();
ExpectMockTask(&mock_task_a, &task.value());
EXPECT_FALSE(task->sequenced_time.is_null());
// Remove the empty slot. Task B should now be in front.
EXPECT_FALSE(transaction->Pop());
task = transaction->TakeTask();
EXPECT_FALSE(sequence_transaction->Pop());
task = sequence_transaction->TakeTask();
ExpectMockTask(&mock_task_b, &task.value());
EXPECT_FALSE(task->sequenced_time.is_null());
// Remove the empty slot. Task C should now be in front.
EXPECT_FALSE(transaction->Pop());
task = transaction->TakeTask();
EXPECT_FALSE(sequence_transaction->Pop());
task = sequence_transaction->TakeTask();
ExpectMockTask(&mock_task_c, &task.value());
EXPECT_FALSE(task->sequenced_time.is_null());
// Remove the empty slot.
EXPECT_FALSE(transaction->Pop());
EXPECT_FALSE(sequence_transaction->Pop());
// Push task E in the sequence.
EXPECT_FALSE(transaction->PushTask(CreateTask(&mock_task_e)));
EXPECT_FALSE(sequence_transaction->PushTask(CreateTask(&mock_task_e)));
// Task D should be in front.
task = transaction->TakeTask();
task = sequence_transaction->TakeTask();
ExpectMockTask(&mock_task_d, &task.value());
EXPECT_FALSE(task->sequenced_time.is_null());
// Remove the empty slot. Task E should now be in front.
EXPECT_FALSE(transaction->Pop());
task = transaction->TakeTask();
EXPECT_FALSE(sequence_transaction->Pop());
task = sequence_transaction->TakeTask();
ExpectMockTask(&mock_task_e, &task.value());
EXPECT_FALSE(task->sequenced_time.is_null());
// Remove the empty slot. The sequence should now be empty.
EXPECT_TRUE(transaction->Pop());
EXPECT_TRUE(sequence_transaction->Pop());
}
// Verifies the sort key of a BEST_EFFORT sequence that contains one task.
......@@ -152,29 +152,29 @@ TEST(TaskSchedulerSequenceTest, GetSortKeyForeground) {
// Verify that a DCHECK fires if Pop() is called on a sequence whose front slot
// isn't empty.
TEST(TaskSchedulerSequenceTest, PopNonEmptyFrontSlot) {
std::unique_ptr<Sequence::Transaction> transaction =
std::unique_ptr<Sequence::Transaction> sequence_transaction =
MakeRefCounted<Sequence>(TaskTraits())->BeginTransaction();
transaction->PushTask(Task(FROM_HERE, DoNothing(), TimeDelta()));
sequence_transaction->PushTask(Task(FROM_HERE, DoNothing(), TimeDelta()));
EXPECT_DCHECK_DEATH({ transaction->Pop(); });
EXPECT_DCHECK_DEATH({ sequence_transaction->Pop(); });
}
// Verify that a DCHECK fires if TakeTask() is called on a sequence whose front
// slot is empty.
TEST(TaskSchedulerSequenceTest, TakeEmptyFrontSlot) {
std::unique_ptr<Sequence::Transaction> transaction =
std::unique_ptr<Sequence::Transaction> sequence_transaction =
MakeRefCounted<Sequence>(TaskTraits())->BeginTransaction();
transaction->PushTask(Task(FROM_HERE, DoNothing(), TimeDelta()));
sequence_transaction->PushTask(Task(FROM_HERE, DoNothing(), TimeDelta()));
EXPECT_TRUE(transaction->TakeTask());
EXPECT_DCHECK_DEATH({ transaction->TakeTask(); });
EXPECT_TRUE(sequence_transaction->TakeTask());
EXPECT_DCHECK_DEATH({ sequence_transaction->TakeTask(); });
}
// Verify that a DCHECK fires if TakeTask() is called on an empty sequence.
TEST(TaskSchedulerSequenceTest, TakeEmptySequence) {
std::unique_ptr<Sequence::Transaction> transaction =
std::unique_ptr<Sequence::Transaction> sequence_transaction =
MakeRefCounted<Sequence>(TaskTraits())->BeginTransaction();
EXPECT_DCHECK_DEATH({ transaction->TakeTask(); });
EXPECT_DCHECK_DEATH({ sequence_transaction->TakeTask(); });
}
} // namespace internal
......
......@@ -219,8 +219,9 @@ bool TaskSchedulerImpl::PostDelayedTaskWithTraits(const Location& from_here,
OnceClosure task,
TimeDelta delay) {
// Post |task| as part of a one-off single-task Sequence.
const TaskTraits new_traits = SetUserBlockingPriorityIfNeeded(traits);
return PostTaskWithSequence(Task(from_here, std::move(task), delay),
MakeRefCounted<Sequence>(traits));
MakeRefCounted<Sequence>(new_traits));
}
scoped_refptr<TaskRunner> TaskSchedulerImpl::CreateTaskRunnerWithTraits(
......@@ -305,11 +306,13 @@ void TaskSchedulerImpl::SetExecutionFenceEnabled(bool execution_fence_enabled) {
task_tracker_->SetExecutionFenceEnabled(execution_fence_enabled);
}
void TaskSchedulerImpl::ReEnqueueSequence(scoped_refptr<Sequence> sequence) {
DCHECK(sequence);
const TaskTraits new_traits =
SetUserBlockingPriorityIfNeeded(sequence->traits());
GetWorkerPoolForTraits(new_traits)->ReEnqueueSequence(std::move(sequence));
void TaskSchedulerImpl::ReEnqueueSequence(
std::unique_ptr<Sequence::Transaction> sequence_transaction) {
DCHECK(sequence_transaction);
const TaskTraits new_traits = SetUserBlockingPriorityIfNeeded(
sequence_transaction->sequence()->traits());
GetWorkerPoolForTraits(new_traits)
->ReEnqueueSequence(std::move(sequence_transaction));
}
bool TaskSchedulerImpl::PostTaskWithSequence(Task task,
......@@ -319,27 +322,25 @@ bool TaskSchedulerImpl::PostTaskWithSequence(Task task,
CHECK(task.task);
DCHECK(sequence);
const TaskTraits new_traits =
SetUserBlockingPriorityIfNeeded(sequence->traits());
if (!task_tracker_->WillPostTask(&task, new_traits.shutdown_behavior()))
if (!task_tracker_->WillPostTask(&task,
sequence->traits().shutdown_behavior()))
return false;
if (task.delayed_run_time.is_null()) {
GetWorkerPoolForTraits(new_traits)
->PostTaskWithSequenceNow(std::move(task), std::move(sequence));
std::unique_ptr<Sequence::Transaction> sequence_transaction =
sequence->BeginTransaction();
GetWorkerPoolForTraits(sequence->traits())
->PostTaskWithSequenceNow(std::move(task),
std::move(sequence_transaction));
} else {
delayed_task_manager_.AddDelayedTask(
std::move(task),
BindOnce(
[](scoped_refptr<Sequence> sequence,
TaskSchedulerImpl* task_scheduler_impl, Task task) {
const TaskTraits new_traits =
task_scheduler_impl->SetUserBlockingPriorityIfNeeded(
sequence->traits());
task_scheduler_impl->GetWorkerPoolForTraits(new_traits)
task_scheduler_impl->GetWorkerPoolForTraits(sequence->traits())
->PostTaskWithSequenceNow(std::move(task),
std::move(sequence));
sequence->BeginTransaction());
},
std::move(sequence), Unretained(this)));
}
......
......@@ -106,7 +106,8 @@ class BASE_EXPORT TaskSchedulerImpl : public TaskScheduler,
void ReportHeartbeatMetrics() const;
// SchedulerWorkerPool::Delegate:
void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override;
void ReEnqueueSequence(
std::unique_ptr<Sequence::Transaction> sequence_transaction) override;
// SchedulerTaskRunnerDelegate:
bool PostTaskWithSequence(Task task,
......
......@@ -453,11 +453,11 @@ bool TaskTracker::WillPostTask(Task* task,
return true;
}
scoped_refptr<Sequence> TaskTracker::WillScheduleSequence(
scoped_refptr<Sequence> sequence,
std::unique_ptr<Sequence::Transaction> TaskTracker::WillScheduleSequence(
std::unique_ptr<Sequence::Transaction> sequence_transaction,
CanScheduleSequenceObserver* observer) {
DCHECK(sequence);
const SequenceSortKey sort_key = sequence->BeginTransaction()->GetSortKey();
DCHECK(sequence_transaction);
const SequenceSortKey sort_key = sequence_transaction->GetSortKey();
const int priority_index = static_cast<int>(sort_key.priority());
AutoSchedulerLock auto_lock(preemption_state_[priority_index].lock);
......@@ -465,7 +465,7 @@ scoped_refptr<Sequence> TaskTracker::WillScheduleSequence(
if (preemption_state_[priority_index].current_scheduled_sequences <
preemption_state_[priority_index].max_scheduled_sequences) {
++preemption_state_[priority_index].current_scheduled_sequences;
return sequence;
return sequence_transaction;
}
// It is convenient not to have to specify an observer when scheduling
......@@ -473,7 +473,8 @@ scoped_refptr<Sequence> TaskTracker::WillScheduleSequence(
DCHECK(observer);
preemption_state_[priority_index].preempted_sequences.emplace(
std::move(sequence), sort_key.next_task_sequenced_time(), observer);
sequence_transaction->sequence(), sort_key.next_task_sequenced_time(),
observer);
return nullptr;
}
......
......@@ -135,17 +135,18 @@ class BASE_EXPORT TaskTracker {
// metadata on |task| if desired.
bool WillPostTask(Task* task, TaskShutdownBehavior shutdown_behavior);
// Informs this TaskTracker that |sequence| is about to be scheduled. If this
// returns |sequence|, it is expected that RunAndPopNextTask() will soon be
// called with |sequence| as argument. Otherwise, RunAndPopNextTask() must not
// be called with |sequence| as argument until |observer| is notified that
// |sequence| can be scheduled (the caller doesn't need to keep a pointer to
// |sequence|; it will be included in the notification to |observer|).
// WillPostTask() must have allowed the task in front of |sequence| to be
// posted before this is called. |observer| is only required if the priority
// of |sequence| is TaskPriority::BEST_EFFORT
scoped_refptr<Sequence> WillScheduleSequence(
scoped_refptr<Sequence> sequence,
// Informs this TaskTracker that the Sequence locked by |sequence_transaction|
// is about to be scheduled. If this returns |sequence_transaction|, it is
// expected that RunAndPopNextTask() will soon be called with the Sequence as
// argument. Otherwise, RunAndPopNextTask() must not be called with the
// Sequence as argument until |observer| is notified that the Sequence can be
// scheduled (the caller doesn't need to keep a pointer to the Sequence; it
// will be included in the notification to |observer|). WillPostTask() must
// have allowed the task in front of the Sequence to be posted before this is
// called. |observer| is only required if the priority of the Sequence is
// TaskPriority::BEST_EFFORT.
std::unique_ptr<Sequence::Transaction> WillScheduleSequence(
std::unique_ptr<Sequence::Transaction> sequence_transaction,
CanScheduleSequenceObserver* observer);
// Runs the next task in |sequence| unless the current shutdown state prevents
......
......@@ -61,7 +61,8 @@ TEST_F(TaskSchedulerTaskTrackerPosixTest, RunTask) {
EXPECT_TRUE(tracker_.WillPostTask(&task, default_traits.shutdown_behavior()));
auto sequence = test::CreateSequenceWithTask(std::move(task), default_traits);
EXPECT_EQ(sequence, tracker_.WillScheduleSequence(sequence, nullptr));
EXPECT_TRUE(
tracker_.WillScheduleSequence(sequence->BeginTransaction(), nullptr));
// Expect RunAndPopNextTask to return nullptr since |sequence| is empty after
// popping a task from it.
EXPECT_FALSE(tracker_.RunAndPopNextTask(sequence, nullptr));
......@@ -85,7 +86,8 @@ TEST_F(TaskSchedulerTaskTrackerPosixTest, FileDescriptorWatcher) {
EXPECT_TRUE(tracker_.WillPostTask(&task, default_traits.shutdown_behavior()));
auto sequence = test::CreateSequenceWithTask(std::move(task), default_traits);
EXPECT_EQ(sequence, tracker_.WillScheduleSequence(sequence, nullptr));
EXPECT_TRUE(
tracker_.WillScheduleSequence(sequence->BeginTransaction(), nullptr));
// Expect RunAndPopNextTask to return nullptr since |sequence| is empty after
// popping a task from it.
EXPECT_FALSE(tracker_.RunAndPopNextTask(sequence, nullptr));
......
......@@ -75,19 +75,24 @@ bool MockSchedulerTaskRunnerDelegate::PostTaskWithSequence(
DCHECK(task.task);
DCHECK(sequence);
std::unique_ptr<Sequence::Transaction> sequence_transaction =
sequence->BeginTransaction();
if (!task_tracker_->WillPostTask(&task,
sequence->traits().shutdown_behavior()))
return false;
if (task.delayed_run_time.is_null()) {
worker_pool_->PostTaskWithSequenceNow(std::move(task), std::move(sequence));
worker_pool_->PostTaskWithSequenceNow(std::move(task),
std::move(sequence_transaction));
} else {
delayed_task_manager_->AddDelayedTask(
std::move(task), BindOnce(
[](scoped_refptr<Sequence> sequence,
SchedulerWorkerPool* worker_pool, Task task) {
worker_pool->PostTaskWithSequenceNow(
std::move(task), std::move(sequence));
std::move(task),
sequence->BeginTransaction());
},
std::move(sequence), worker_pool_));
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment