Commit 9fae0279 authored by Jesse McKenna's avatar Jesse McKenna Committed by Commit Bot

TaskScheduler: Move TaskTraits from Task class to Sequence class

Change-Id: I9094d0e239084cd77381949ae12cc7982a96e63c
Bug: 889029
Reviewed-on: https://chromium-review.googlesource.com/c/1250085
Commit-Queue: Jesse McKenna <jessemckenna@google.com>
Reviewed-by: default avatarFrançois Doray <fdoray@chromium.org>
Cr-Commit-Position: refs/heads/master@{#597295}
parent ad391c1e
......@@ -2460,7 +2460,6 @@ test("base_unittests") {
"task/task_scheduler/service_thread_unittest.cc",
"task/task_scheduler/task_scheduler_impl_unittest.cc",
"task/task_scheduler/task_tracker_unittest.cc",
"task/task_scheduler/task_unittest.cc",
"task/task_scheduler/test_task_factory.cc",
"task/task_scheduler/test_task_factory.h",
"task/task_scheduler/test_utils.cc",
......
......@@ -37,8 +37,7 @@ void RunTask(Task task) {
Task ConstructMockedTask(testing::StrictMock<MockTask>& mock_task,
TimeTicks now,
TimeDelta delay) {
Task task = Task(FROM_HERE, BindOnce(&MockTask::Run, Unretained(&mock_task)),
TaskTraits(), delay);
Task task(FROM_HERE, BindOnce(&MockTask::Run, Unretained(&mock_task)), delay);
// The constructor of Task computes |delayed_run_time| by adding |delay| to
// the real time. Recompute it by adding |delay| to the given |now| (usually
// mock time).
......
......@@ -57,28 +57,24 @@ class ThreadBeginningTransaction : public SimpleThread {
TEST(TaskSchedulerPriorityQueueTest, PushPopPeek) {
// Create test sequences.
scoped_refptr<Sequence> sequence_a(new Sequence);
sequence_a->PushTask(Task(FROM_HERE, DoNothing(),
TaskTraits(TaskPriority::USER_VISIBLE),
TimeDelta()));
scoped_refptr<Sequence> sequence_a =
MakeRefCounted<Sequence>(TaskTraits(TaskPriority::USER_VISIBLE));
sequence_a->PushTask(Task(FROM_HERE, DoNothing(), TimeDelta()));
SequenceSortKey sort_key_a = sequence_a->GetSortKey();
scoped_refptr<Sequence> sequence_b(new Sequence);
sequence_b->PushTask(Task(FROM_HERE, DoNothing(),
TaskTraits(TaskPriority::USER_BLOCKING),
TimeDelta()));
scoped_refptr<Sequence> sequence_b =
MakeRefCounted<Sequence>(TaskTraits(TaskPriority::USER_BLOCKING));
sequence_b->PushTask(Task(FROM_HERE, DoNothing(), TimeDelta()));
SequenceSortKey sort_key_b = sequence_b->GetSortKey();
scoped_refptr<Sequence> sequence_c(new Sequence);
sequence_c->PushTask(Task(FROM_HERE, DoNothing(),
TaskTraits(TaskPriority::USER_BLOCKING),
TimeDelta()));
scoped_refptr<Sequence> sequence_c =
MakeRefCounted<Sequence>(TaskTraits(TaskPriority::USER_BLOCKING));
sequence_c->PushTask(Task(FROM_HERE, DoNothing(), TimeDelta()));
SequenceSortKey sort_key_c = sequence_c->GetSortKey();
scoped_refptr<Sequence> sequence_d(new Sequence);
sequence_d->PushTask(Task(FROM_HERE, DoNothing(),
TaskTraits(TaskPriority::BEST_EFFORT),
TimeDelta()));
scoped_refptr<Sequence> sequence_d =
MakeRefCounted<Sequence>(TaskTraits(TaskPriority::BEST_EFFORT));
sequence_d->PushTask(Task(FROM_HERE, DoNothing(), TimeDelta()));
SequenceSortKey sort_key_d = sequence_d->GetSortKey();
// Create a PriorityQueue and a Transaction.
......
......@@ -16,6 +16,7 @@
#include "base/strings/stringprintf.h"
#include "base/synchronization/atomic_flag.h"
#include "base/task/task_scheduler/delayed_task_manager.h"
#include "base/task/task_scheduler/priority_queue.h"
#include "base/task/task_scheduler/scheduler_worker.h"
#include "base/task/task_scheduler/sequence.h"
#include "base/task/task_scheduler/task.h"
......@@ -102,24 +103,19 @@ class SchedulerWorkerDelegate : public SchedulerWorker::Delegate {
}
scoped_refptr<Sequence> GetWork(SchedulerWorker* worker) override {
AutoSchedulerLock auto_lock(sequence_lock_);
bool has_work = has_work_;
has_work_ = false;
return has_work ? sequence_ : nullptr;
std::unique_ptr<PriorityQueue::Transaction> transaction(
priority_queue_.BeginTransaction());
return transaction->IsEmpty() ? nullptr : transaction->PopSequence();
}
void DidRunTask() override {}
void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override {
AutoSchedulerLock auto_lock(sequence_lock_);
// We've shut down, so no-op this work request. Any sequence cleanup will
// occur in the caller's context.
if (!sequence_)
return;
DCHECK_EQ(sequence, sequence_);
DCHECK(!has_work_);
has_work_ = true;
DCHECK(sequence);
const SequenceSortKey sequence_sort_key = sequence->GetSortKey();
std::unique_ptr<PriorityQueue::Transaction> transaction(
priority_queue_.BeginTransaction());
transaction->Push(std::move(sequence), sequence_sort_key);
}
TimeDelta GetSleepTimeout() override { return TimeDelta::Max(); }
......@@ -130,27 +126,7 @@ class SchedulerWorkerDelegate : public SchedulerWorker::Delegate {
return thread_ref_checker_.IsCurrentThreadSameAsSetThread();
}
void OnMainExit(SchedulerWorker* /* worker */) override {
// Move |sequence_| to |local_sequence| so that if we have the last
// reference to the sequence we don't destroy it (and its tasks) within
// |sequence_lock_|.
scoped_refptr<Sequence> local_sequence;
{
AutoSchedulerLock auto_lock(sequence_lock_);
// To reclaim skipped tasks on shutdown, we null out the sequence to allow
// the tasks to destroy themselves.
local_sequence = std::move(sequence_);
}
}
// SchedulerWorkerDelegate:
// Consumers should release their sequence reference as soon as possible to
// ensure timely cleanup for general shutdown.
scoped_refptr<Sequence> sequence() {
AutoSchedulerLock auto_lock(sequence_lock_);
return sequence_;
}
void OnMainExit(SchedulerWorker* /* worker */) override {}
private:
const std::string thread_name_;
......@@ -161,11 +137,7 @@ class SchedulerWorkerDelegate : public SchedulerWorker::Delegate {
// OnMainEntry() and OnCanScheduleSequence() (called when a sequence held up
// by WillScheduleSequence() in PostTaskNow() can be scheduled).
SchedulerWorker* worker_ = nullptr;
// Synchronizes access to |sequence_| and |has_work_|.
SchedulerLock sequence_lock_;
scoped_refptr<Sequence> sequence_ = new Sequence;
bool has_work_ = false;
PriorityQueue priority_queue_;
AtomicThreadRefChecker thread_ref_checker_;
......@@ -248,8 +220,9 @@ class SchedulerWorkerCOMDelegate : public SchedulerWorkerDelegate {
DispatchMessage(&msg);
},
std::move(msg)),
TaskTraits(MayBlock()), TimeDelta());
if (task_tracker_->WillPostTask(&pump_message_task)) {
TimeDelta());
if (task_tracker_->WillPostTask(&pump_message_task,
TaskShutdownBehavior::SKIP_ON_SHUTDOWN)) {
bool was_empty =
message_pump_sequence_->PushTask(std::move(pump_message_task));
DCHECK(was_empty) << "GetWorkFromWindowsMessageQueue() does not expect "
......@@ -261,7 +234,8 @@ class SchedulerWorkerCOMDelegate : public SchedulerWorkerDelegate {
}
bool get_work_first_ = true;
const scoped_refptr<Sequence> message_pump_sequence_ = new Sequence;
const scoped_refptr<Sequence> message_pump_sequence_ =
MakeRefCounted<Sequence>(TaskTraits(MayBlock()));
const TrackedRef<TaskTracker> task_tracker_;
std::unique_ptr<win::ScopedCOMInitializer> scoped_com_initializer_;
......@@ -283,9 +257,9 @@ class SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner
SchedulerWorker* worker,
SingleThreadTaskRunnerThreadMode thread_mode)
: outer_(outer),
traits_(traits),
worker_(worker),
thread_mode_(thread_mode) {
thread_mode_(thread_mode),
sequence_(MakeRefCounted<Sequence>(traits)) {
DCHECK(outer_);
DCHECK(worker_);
}
......@@ -297,11 +271,13 @@ class SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner
if (!g_manager_is_alive)
return false;
Task task(from_here, std::move(closure), traits_, delay);
Task task(from_here, std::move(closure), delay);
task.single_thread_task_runner_ref = this;
if (!outer_->task_tracker_->WillPostTask(&task))
if (!outer_->task_tracker_->WillPostTask(
&task, sequence_->traits().shutdown_behavior())) {
return false;
}
if (task.delayed_run_time.is_null()) {
PostTaskNow(std::move(task));
......@@ -354,18 +330,11 @@ class SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner
}
void PostTaskNow(Task task) {
scoped_refptr<Sequence> sequence = GetDelegate()->sequence();
// If |sequence| is null, then the thread is effectively gone (either
// shutdown or joined).
if (!sequence)
return;
const bool sequence_was_empty = sequence->PushTask(std::move(task));
const bool sequence_was_empty = sequence_->PushTask(std::move(task));
if (sequence_was_empty) {
sequence = outer_->task_tracker_->WillScheduleSequence(
std::move(sequence), GetDelegate());
if (sequence) {
GetDelegate()->ReEnqueueSequence(std::move(sequence));
if (outer_->task_tracker_->WillScheduleSequence(sequence_,
GetDelegate())) {
GetDelegate()->ReEnqueueSequence(sequence_);
worker_->WakeUp();
}
}
......@@ -376,9 +345,9 @@ class SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner
}
SchedulerSingleThreadTaskRunnerManager* const outer_;
const TaskTraits traits_;
SchedulerWorker* const worker_;
const SingleThreadTaskRunnerThreadMode thread_mode_;
const scoped_refptr<Sequence> sequence_;
DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner);
};
......
......@@ -62,8 +62,8 @@ class SchedulerParallelTaskRunner : public TaskRunner {
// Post the task as part of a one-off single-task Sequence.
return worker_pool_->PostTaskWithSequence(
Task(from_here, std::move(closure), traits_, delay),
MakeRefCounted<Sequence>());
Task(from_here, std::move(closure), delay),
MakeRefCounted<Sequence>(traits_));
}
bool RunsTasksInCurrentSequence() const override {
......@@ -87,7 +87,7 @@ class SchedulerSequencedTaskRunner : public SequencedTaskRunner {
// TODO(robliao): Find a concrete way to manage |worker_pool|'s memory.
SchedulerSequencedTaskRunner(const TaskTraits& traits,
SchedulerWorkerPool* worker_pool)
: traits_(traits), worker_pool_(worker_pool) {
: sequence_(MakeRefCounted<Sequence>(traits)), worker_pool_(worker_pool) {
DCHECK(worker_pool_);
}
......@@ -98,7 +98,7 @@ class SchedulerSequencedTaskRunner : public SequencedTaskRunner {
if (!g_active_pools_count)
return false;
Task task(from_here, std::move(closure), traits_, delay);
Task task(from_here, std::move(closure), delay);
task.sequenced_task_runner_ref = this;
// Post the task as part of |sequence_|.
......@@ -120,9 +120,7 @@ class SchedulerSequencedTaskRunner : public SequencedTaskRunner {
~SchedulerSequencedTaskRunner() override = default;
// Sequence for all Tasks posted through this TaskRunner.
const scoped_refptr<Sequence> sequence_ = MakeRefCounted<Sequence>();
const TaskTraits traits_;
const scoped_refptr<Sequence> sequence_;
SchedulerWorkerPool* const worker_pool_;
DISALLOW_COPY_AND_ASSIGN(SchedulerSequencedTaskRunner);
......@@ -145,8 +143,10 @@ bool SchedulerWorkerPool::PostTaskWithSequence(
DCHECK(task.task);
DCHECK(sequence);
if (!task_tracker_->WillPostTask(&task))
if (!task_tracker_->WillPostTask(&task,
sequence->traits().shutdown_behavior())) {
return false;
}
if (task.delayed_run_time.is_null()) {
PostTaskWithSequenceNow(std::move(task), std::move(sequence));
......
......@@ -180,13 +180,14 @@ class TaskSchedulerWorkerTest : public testing::TestWithParam<size_t> {
}
// Create a Sequence with TasksPerSequence() Tasks.
scoped_refptr<Sequence> sequence(new Sequence);
scoped_refptr<Sequence> sequence = MakeRefCounted<Sequence>(TaskTraits());
for (size_t i = 0; i < outer_->TasksPerSequence(); ++i) {
Task task(FROM_HERE,
BindOnce(&TaskSchedulerWorkerTest::RunTaskCallback,
Unretained(outer_)),
TaskTraits(), TimeDelta());
EXPECT_TRUE(outer_->task_tracker_.WillPostTask(&task));
TimeDelta());
EXPECT_TRUE(outer_->task_tracker_.WillPostTask(
&task, sequence->traits().shutdown_behavior()));
sequence->PushTask(std::move(task));
}
......@@ -440,7 +441,8 @@ class ControllableCleanupDelegate : public SchedulerWorkerDefaultDelegate {
}
controls_->work_requested_ = true;
scoped_refptr<Sequence> sequence(new Sequence);
scoped_refptr<Sequence> sequence = MakeRefCounted<Sequence>(TaskTraits(
WithBaseSyncPrimitives(), TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN));
Task task(
FROM_HERE,
BindOnce(
......@@ -450,9 +452,9 @@ class ControllableCleanupDelegate : public SchedulerWorkerDefaultDelegate {
},
Unretained(&controls_->work_processed_),
Unretained(&controls_->work_running_)),
{WithBaseSyncPrimitives(), TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN},
TimeDelta());
EXPECT_TRUE(task_tracker_->WillPostTask(&task));
EXPECT_TRUE(task_tracker_->WillPostTask(
&task, sequence->traits().shutdown_behavior()));
sequence->PushTask(std::move(task));
sequence =
task_tracker_->WillScheduleSequence(std::move(sequence), nullptr);
......
......@@ -6,13 +6,14 @@
#include <utility>
#include "base/critical_closure.h"
#include "base/logging.h"
#include "base/time/time.h"
namespace base {
namespace internal {
Sequence::Sequence() = default;
Sequence::Sequence(const TaskTraits& traits) : traits_(traits) {}
bool Sequence::PushTask(Task task) {
// Use CHECK instead of DCHECK to crash earlier. See http://crbug.com/711167
......@@ -21,8 +22,12 @@ bool Sequence::PushTask(Task task) {
DCHECK(task.sequenced_time.is_null());
task.sequenced_time = base::TimeTicks::Now();
task.task =
traits_.shutdown_behavior() == TaskShutdownBehavior::BLOCK_SHUTDOWN
? MakeCriticalClosure(std::move(task.task))
: std::move(task.task);
AutoSchedulerLock auto_lock(lock_);
++num_tasks_per_priority_[static_cast<int>(task.traits.priority())];
queue_.push(std::move(task));
// Return true if the sequence was empty before the push.
......@@ -34,10 +39,6 @@ Optional<Task> Sequence::TakeTask() {
DCHECK(!queue_.empty());
DCHECK(queue_.front().task);
const int priority_index = static_cast<int>(queue_.front().traits.priority());
DCHECK_GT(num_tasks_per_priority_[priority_index], 0U);
--num_tasks_per_priority_[priority_index];
return std::move(queue_.front());
}
......@@ -50,28 +51,17 @@ bool Sequence::Pop() {
}
SequenceSortKey Sequence::GetSortKey() const {
TaskPriority priority = TaskPriority::LOWEST;
base::TimeTicks next_task_sequenced_time;
{
AutoSchedulerLock auto_lock(lock_);
DCHECK(!queue_.empty());
// Find the highest task priority in the sequence.
const int highest_priority_index = static_cast<int>(TaskPriority::HIGHEST);
const int lowest_priority_index = static_cast<int>(TaskPriority::LOWEST);
for (int i = highest_priority_index; i > lowest_priority_index; --i) {
if (num_tasks_per_priority_[i] > 0) {
priority = static_cast<TaskPriority>(i);
break;
}
}
// Save the sequenced time of the next task in the sequence.
next_task_sequenced_time = queue_.front().sequenced_time;
}
return SequenceSortKey(priority, next_task_sequenced_time);
return SequenceSortKey(traits_.priority(), next_task_sequenced_time);
}
Sequence::~Sequence() = default;
......
......@@ -16,6 +16,7 @@
#include "base/task/task_scheduler/scheduler_lock.h"
#include "base/task/task_scheduler/sequence_sort_key.h"
#include "base/task/task_scheduler/task.h"
#include "base/task/task_traits.h"
#include "base/threading/sequence_local_storage_map.h"
namespace base {
......@@ -41,7 +42,8 @@ namespace internal {
// This class is thread-safe.
class BASE_EXPORT Sequence : public RefCountedThreadSafe<Sequence> {
public:
Sequence();
// |traits| is metadata that applies to all Tasks in the Sequence.
explicit Sequence(const TaskTraits& traits);
// Adds |task| in a new slot at the end of the Sequence. Returns true if the
// Sequence was empty before this operation.
......@@ -74,6 +76,9 @@ class BASE_EXPORT Sequence : public RefCountedThreadSafe<Sequence> {
return &sequence_local_storage_;
}
// Returns the TaskTraits for all Tasks in the Sequence.
TaskTraits traits() const { return traits_; }
private:
friend class RefCountedThreadSafe<Sequence>;
~Sequence();
......@@ -86,13 +91,12 @@ class BASE_EXPORT Sequence : public RefCountedThreadSafe<Sequence> {
// Queue of tasks to execute.
base::queue<Task> queue_;
// Number of tasks contained in the Sequence for each priority.
size_t num_tasks_per_priority_[static_cast<int>(TaskPriority::HIGHEST) + 1] =
{};
// Holds data stored through the SequenceLocalStorageSlot API.
SequenceLocalStorageMap sequence_local_storage_;
// The TaskTraits of all Tasks in the Sequence.
const TaskTraits traits_;
DISALLOW_COPY_AND_ASSIGN(Sequence);
};
......
......@@ -26,7 +26,7 @@ class MockTask {
Task CreateTask(MockTask* mock_task) {
return Task(FROM_HERE, BindOnce(&MockTask::Run, Unretained(mock_task)),
{TaskPriority::BEST_EFFORT}, TimeDelta());
TimeDelta());
}
void ExpectMockTask(MockTask* mock_task, Task* task) {
......@@ -44,7 +44,8 @@ TEST(TaskSchedulerSequenceTest, PushTakeRemove) {
testing::StrictMock<MockTask> mock_task_d;
testing::StrictMock<MockTask> mock_task_e;
scoped_refptr<Sequence> sequence = MakeRefCounted<Sequence>();
scoped_refptr<Sequence> sequence =
MakeRefCounted<Sequence>(TaskTraits(TaskPriority::BEST_EFFORT));
// Push task A in the sequence. PushTask() should return true since it's the
// first task->
......@@ -94,12 +95,12 @@ TEST(TaskSchedulerSequenceTest, PushTakeRemove) {
EXPECT_TRUE(sequence->Pop());
}
// Verifies the sort key of a sequence that contains one BEST_EFFORT task.
// Verifies the sort key of a BEST_EFFORT sequence that contains one task.
TEST(TaskSchedulerSequenceTest, GetSortKeyBestEffort) {
// Create a sequence with a BEST_EFFORT task.
Task best_effort_task(FROM_HERE, DoNothing(), {TaskPriority::BEST_EFFORT},
TimeDelta());
scoped_refptr<Sequence> best_effort_sequence = MakeRefCounted<Sequence>();
// Create a BEST_EFFORT sequence with a task.
Task best_effort_task(FROM_HERE, DoNothing(), TimeDelta());
scoped_refptr<Sequence> best_effort_sequence =
MakeRefCounted<Sequence>(TaskTraits(TaskPriority::BEST_EFFORT));
best_effort_sequence->PushTask(std::move(best_effort_task));
// Get the sort key.
......@@ -120,12 +121,12 @@ TEST(TaskSchedulerSequenceTest, GetSortKeyBestEffort) {
}
// Same as TaskSchedulerSequenceTest.GetSortKeyBestEffort, but with a
// USER_VISIBLE task.
// USER_VISIBLE sequence.
TEST(TaskSchedulerSequenceTest, GetSortKeyForeground) {
// Create a sequence with a USER_VISIBLE task.
Task foreground_task(FROM_HERE, DoNothing(), {TaskPriority::USER_VISIBLE},
TimeDelta());
scoped_refptr<Sequence> foreground_sequence = MakeRefCounted<Sequence>();
// Create a USER_VISIBLE sequence with a task.
Task foreground_task(FROM_HERE, DoNothing(), TimeDelta());
scoped_refptr<Sequence> foreground_sequence =
MakeRefCounted<Sequence>(TaskTraits(TaskPriority::USER_VISIBLE));
foreground_sequence->PushTask(std::move(foreground_task));
// Get the sort key.
......@@ -147,8 +148,8 @@ TEST(TaskSchedulerSequenceTest, GetSortKeyForeground) {
// Verify that a DCHECK fires if Pop() is called on a sequence whose front slot
// isn't empty.
TEST(TaskSchedulerSequenceTest, PopNonEmptyFrontSlot) {
scoped_refptr<Sequence> sequence = MakeRefCounted<Sequence>();
sequence->PushTask(Task(FROM_HERE, DoNothing(), TaskTraits(), TimeDelta()));
scoped_refptr<Sequence> sequence = MakeRefCounted<Sequence>(TaskTraits());
sequence->PushTask(Task(FROM_HERE, DoNothing(), TimeDelta()));
EXPECT_DCHECK_DEATH({ sequence->Pop(); });
}
......@@ -156,8 +157,8 @@ TEST(TaskSchedulerSequenceTest, PopNonEmptyFrontSlot) {
// Verify that a DCHECK fires if TakeTask() is called on a sequence whose front
// slot is empty.
TEST(TaskSchedulerSequenceTest, TakeEmptyFrontSlot) {
scoped_refptr<Sequence> sequence = MakeRefCounted<Sequence>();
sequence->PushTask(Task(FROM_HERE, DoNothing(), TaskTraits(), TimeDelta()));
scoped_refptr<Sequence> sequence = MakeRefCounted<Sequence>(TaskTraits());
sequence->PushTask(Task(FROM_HERE, DoNothing(), TimeDelta()));
EXPECT_TRUE(sequence->TakeTask());
EXPECT_DCHECK_DEATH({ sequence->TakeTask(); });
......@@ -165,7 +166,7 @@ TEST(TaskSchedulerSequenceTest, TakeEmptyFrontSlot) {
// Verify that a DCHECK fires if TakeTask() is called on an empty sequence.
TEST(TaskSchedulerSequenceTest, TakeEmptySequence) {
scoped_refptr<Sequence> sequence = MakeRefCounted<Sequence>();
scoped_refptr<Sequence> sequence = MakeRefCounted<Sequence>(TaskTraits());
EXPECT_DCHECK_DEATH({ sequence->TakeTask(); });
}
......
......@@ -7,7 +7,6 @@
#include <utility>
#include "base/atomic_sequence_num.h"
#include "base/critical_closure.h"
namespace base {
namespace internal {
......@@ -18,25 +17,11 @@ AtomicSequenceNumber g_sequence_nums_for_tracing;
} // namespace
Task::Task(const Location& posted_from,
OnceClosure task,
const TaskTraits& traits,
TimeDelta delay)
: PendingTask(
posted_from,
traits.shutdown_behavior() == TaskShutdownBehavior::BLOCK_SHUTDOWN
? MakeCriticalClosure(std::move(task))
: std::move(task),
Task::Task(const Location& posted_from, OnceClosure task, TimeDelta delay)
: PendingTask(posted_from,
std::move(task),
delay.is_zero() ? TimeTicks() : TimeTicks::Now() + delay,
Nestable::kNonNestable),
// Prevent a delayed BLOCK_SHUTDOWN task from blocking shutdown before it
// starts running by changing its shutdown behavior to SKIP_ON_SHUTDOWN.
traits(
(!delay.is_zero() &&
traits.shutdown_behavior() == TaskShutdownBehavior::BLOCK_SHUTDOWN)
? TaskTraits::Override(traits,
{TaskShutdownBehavior::SKIP_ON_SHUTDOWN})
: traits),
delay(delay) {
// TaskScheduler doesn't use |sequence_num| but tracing (toplevel.flow) relies
// on it being unique. While this subtle dependency is a bit overreaching,
......@@ -51,7 +36,6 @@ Task::Task(const Location& posted_from,
// this case.
Task::Task(Task&& other) noexcept
: PendingTask(std::move(other)),
traits(other.traits),
delay(other.delay),
sequenced_time(other.sequenced_time),
sequenced_task_runner_ref(std::move(other.sequenced_task_runner_ref)),
......
......@@ -13,7 +13,6 @@
#include "base/pending_task.h"
#include "base/sequenced_task_runner.h"
#include "base/single_thread_task_runner.h"
#include "base/task/task_traits.h"
#include "base/time/time.h"
namespace base {
......@@ -23,13 +22,9 @@ namespace internal {
// profiling inherited from PendingTask.
struct BASE_EXPORT Task : public PendingTask {
// |posted_from| is the site the task was posted from. |task| is the closure
// to run. |traits_in| is metadata about the task. |delay| is a delay that
// must expire before the Task runs. If |delay| is non-zero and the shutdown
// behavior in |traits| is BLOCK_SHUTDOWN, the shutdown behavior is
// automatically adjusted to SKIP_ON_SHUTDOWN.
// to run. |delay| is a delay that must expire before the Task runs.
Task(const Location& posted_from,
OnceClosure task,
const TaskTraits& traits,
TimeDelta delay);
// Task is move-only to avoid mistakes that cause reference counts to be
......@@ -40,9 +35,6 @@ struct BASE_EXPORT Task : public PendingTask {
Task& operator=(Task&& other);
// The TaskTraits of this task.
TaskTraits traits;
// The delay that must expire before the task runs.
TimeDelta delay;
......
......@@ -184,9 +184,8 @@ bool TaskSchedulerImpl::PostDelayedTaskWithTraits(const Location& from_here,
// Post |task| as part of a one-off single-task Sequence.
const TaskTraits new_traits = SetUserBlockingPriorityIfNeeded(traits);
return GetWorkerPoolForTraits(new_traits)
->PostTaskWithSequence(
Task(from_here, std::move(task), new_traits, delay),
MakeRefCounted<Sequence>());
->PostTaskWithSequence(Task(from_here, std::move(task), delay),
MakeRefCounted<Sequence>(new_traits));
}
scoped_refptr<TaskRunner> TaskSchedulerImpl::CreateTaskRunnerWithTraits(
......
......@@ -158,6 +158,18 @@ int GetMaxNumScheduledBestEffortSequences() {
return std::numeric_limits<int>::max();
}
// Returns shutdown behavior based on |traits|; returns SKIP_ON_SHUTDOWN if
// shutdown behavior is BLOCK_SHUTDOWN and |is_delayed|, because delayed tasks
// are not allowed to block shutdown.
TaskShutdownBehavior GetEffectiveShutdownBehavior(const TaskTraits& traits,
bool is_delayed) {
const TaskShutdownBehavior shutdown_behavior = traits.shutdown_behavior();
if (shutdown_behavior == TaskShutdownBehavior::BLOCK_SHUTDOWN && is_delayed) {
return TaskShutdownBehavior::SKIP_ON_SHUTDOWN;
}
return shutdown_behavior;
}
} // namespace
// Atomic internal state used by TaskTracker. Sequential consistency shouldn't
......@@ -429,10 +441,13 @@ void TaskTracker::FlushAsyncForTesting(OnceClosure flush_callback) {
}
}
bool TaskTracker::WillPostTask(Task* task) {
bool TaskTracker::WillPostTask(Task* task,
TaskShutdownBehavior shutdown_behavior) {
DCHECK(task);
DCHECK(task->task);
if (!BeforePostTask(task->traits.shutdown_behavior()))
if (!BeforePostTask(GetEffectiveShutdownBehavior(shutdown_behavior,
!task->delay.is_zero())))
return false;
if (task->delayed_run_time.is_null())
......@@ -453,6 +468,7 @@ bool TaskTracker::WillPostTask(Task* task) {
scoped_refptr<Sequence> TaskTracker::WillScheduleSequence(
scoped_refptr<Sequence> sequence,
CanScheduleSequenceObserver* observer) {
DCHECK(sequence);
const SequenceSortKey sort_key = sequence->GetSortKey();
const int priority_index = static_cast<int>(sort_key.priority());
......@@ -483,22 +499,23 @@ scoped_refptr<Sequence> TaskTracker::RunAndPopNextTask(
// TODO(fdoray): Support TakeTask() returning null. https://crbug.com/783309
DCHECK(task);
const TaskShutdownBehavior shutdown_behavior =
task->traits.shutdown_behavior();
const TaskPriority task_priority = task->traits.priority();
const bool can_run_task = BeforeRunTask(shutdown_behavior);
const bool is_delayed = !task->delayed_run_time.is_null();
const TaskShutdownBehavior effective_shutdown_behavior =
GetEffectiveShutdownBehavior(sequence->traits().shutdown_behavior(),
!task->delay.is_zero());
const bool can_run_task = BeforeRunTask(effective_shutdown_behavior);
RunOrSkipTask(std::move(task.value()), sequence.get(), can_run_task);
if (can_run_task) {
IncrementNumTasksRun();
AfterRunTask(shutdown_behavior);
AfterRunTask(effective_shutdown_behavior);
}
if (!is_delayed)
if (task->delayed_run_time.is_null())
DecrementNumIncompleteUndelayedTasks();
const bool sequence_is_empty_after_pop = sequence->Pop();
const TaskPriority priority = sequence->traits().priority();
// Never reschedule a Sequence emptied by Pop(). The contract is such that
// next poster to make it non-empty is responsible to schedule it.
......@@ -508,7 +525,7 @@ scoped_refptr<Sequence> TaskTracker::RunAndPopNextTask(
// Allow |sequence| to be rescheduled only if its next task is set to run
// earlier than the earliest currently preempted sequence
return ManageSequencesAfterRunningTask(std::move(sequence), observer,
task_priority);
priority);
}
bool TaskTracker::HasShutdownStarted() const {
......@@ -572,17 +589,18 @@ void TaskTracker::IncrementNumTasksRun() {
void TaskTracker::RunOrSkipTask(Task task,
Sequence* sequence,
bool can_run_task) {
RecordLatencyHistogram(LatencyHistogramType::TASK_LATENCY, task.traits,
DCHECK(sequence);
RecordLatencyHistogram(LatencyHistogramType::TASK_LATENCY, sequence->traits(),
task.sequenced_time);
const bool previous_singleton_allowed =
ThreadRestrictions::SetSingletonAllowed(
task.traits.shutdown_behavior() !=
sequence->traits().shutdown_behavior() !=
TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN);
const bool previous_io_allowed =
ThreadRestrictions::SetIOAllowed(task.traits.may_block());
ThreadRestrictions::SetIOAllowed(sequence->traits().may_block());
const bool previous_wait_allowed = ThreadRestrictions::SetWaitAllowed(
task.traits.with_base_sync_primitives());
sequence->traits().with_base_sync_primitives());
{
const SequenceToken& sequence_token = sequence->token();
......@@ -590,7 +608,8 @@ void TaskTracker::RunOrSkipTask(Task task,
ScopedSetSequenceTokenForCurrentThread
scoped_set_sequence_token_for_current_thread(sequence_token);
ScopedSetTaskPriorityForCurrentThread
scoped_set_task_priority_for_current_thread(task.traits.priority());
scoped_set_task_priority_for_current_thread(
sequence->traits().priority());
ScopedSetSequenceLocalStorageMapForCurrentThread
scoped_set_sequence_local_storage_map_for_current_thread(
sequence->sequence_local_storage());
......@@ -620,7 +639,7 @@ void TaskTracker::RunOrSkipTask(Task task,
// http://crbug.com/652692 is resolved.
TRACE_EVENT1("task_scheduler", "TaskTracker::RunTask", "task_info",
std::make_unique<TaskTracingInfo>(
task.traits, execution_mode, sequence_token));
sequence->traits(), execution_mode, sequence_token));
{
// Put this in its own scope so it preceeds rather than overlaps with
......@@ -768,8 +787,9 @@ bool TaskTracker::HasIncompleteUndelayedTasksForTesting() const {
return subtle::Acquire_Load(&num_incomplete_undelayed_tasks_) != 0;
}
bool TaskTracker::BeforePostTask(TaskShutdownBehavior shutdown_behavior) {
if (shutdown_behavior == TaskShutdownBehavior::BLOCK_SHUTDOWN) {
bool TaskTracker::BeforePostTask(
TaskShutdownBehavior effective_shutdown_behavior) {
if (effective_shutdown_behavior == TaskShutdownBehavior::BLOCK_SHUTDOWN) {
// BLOCK_SHUTDOWN tasks block shutdown between the moment they are posted
// and the moment they complete their execution.
const bool shutdown_started = state_->IncrementNumTasksBlockingShutdown();
......@@ -819,8 +839,9 @@ bool TaskTracker::BeforePostTask(TaskShutdownBehavior shutdown_behavior) {
return !state_->HasShutdownStarted();
}
bool TaskTracker::BeforeRunTask(TaskShutdownBehavior shutdown_behavior) {
switch (shutdown_behavior) {
bool TaskTracker::BeforeRunTask(
TaskShutdownBehavior effective_shutdown_behavior) {
switch (effective_shutdown_behavior) {
case TaskShutdownBehavior::BLOCK_SHUTDOWN: {
// The number of tasks blocking shutdown has been incremented when the
// task was posted.
......@@ -863,9 +884,10 @@ bool TaskTracker::BeforeRunTask(TaskShutdownBehavior shutdown_behavior) {
return false;
}
void TaskTracker::AfterRunTask(TaskShutdownBehavior shutdown_behavior) {
if (shutdown_behavior == TaskShutdownBehavior::BLOCK_SHUTDOWN ||
shutdown_behavior == TaskShutdownBehavior::SKIP_ON_SHUTDOWN) {
void TaskTracker::AfterRunTask(
TaskShutdownBehavior effective_shutdown_behavior) {
if (effective_shutdown_behavior == TaskShutdownBehavior::BLOCK_SHUTDOWN ||
effective_shutdown_behavior == TaskShutdownBehavior::SKIP_ON_SHUTDOWN) {
const bool shutdown_started_and_no_tasks_block_shutdown =
state_->DecrementNumTasksBlockingShutdown();
if (shutdown_started_and_no_tasks_block_shutdown)
......
......@@ -129,10 +129,11 @@ class BASE_EXPORT TaskTracker {
// FlushAsyncForTesting() may be pending at any given time.
void FlushAsyncForTesting(OnceClosure flush_callback);
// Informs this TaskTracker that |task| is about to be posted. Returns true if
// this operation is allowed (|task| should be posted if-and-only-if it is).
// This method may also modify metadata on |task| if desired.
bool WillPostTask(Task* task);
// Informs this TaskTracker that |task| from a |shutdown_behavior| sequence
// is about to be posted. Returns true if this operation is allowed (|task|
// should be posted if-and-only-if it is). This method may also modify
// metadata on |task| if desired.
bool WillPostTask(Task* task, TaskShutdownBehavior shutdown_behavior);
// Informs this TaskTracker that |sequence| is about to be scheduled. If this
// returns |sequence|, it is expected that RunAndPopNextTask() will soon be
......@@ -292,17 +293,17 @@ class BASE_EXPORT TaskTracker {
// Called before WillPostTask() informs the tracing system that a task has
// been posted. Updates |num_tasks_blocking_shutdown_| if necessary and
// returns true if the current shutdown state allows the task to be posted.
bool BeforePostTask(TaskShutdownBehavior shutdown_behavior);
bool BeforePostTask(TaskShutdownBehavior effective_shutdown_behavior);
// Called before a task with |shutdown_behavior| is run by RunTask(). Updates
// |num_tasks_blocking_shutdown_| if necessary and returns true if the current
// shutdown state allows the task to be run.
bool BeforeRunTask(TaskShutdownBehavior shutdown_behavior);
// Called before a task with |effective_shutdown_behavior| is run by
// RunTask(). Updates |num_tasks_blocking_shutdown_| if necessary and returns
// true if the current shutdown state allows the task to be run.
bool BeforeRunTask(TaskShutdownBehavior effective_shutdown_behavior);
// Called after a task with |shutdown_behavior| has been run by RunTask().
// Updates |num_tasks_blocking_shutdown_| and signals |shutdown_cv_| if
// necessary.
void AfterRunTask(TaskShutdownBehavior shutdown_behavior);
// Called after a task with |effective_shutdown_behavior| has been run by
// RunTask(). Updates |num_tasks_blocking_shutdown_| and signals
// |shutdown_cv_| if necessary.
void AfterRunTask(TaskShutdownBehavior effective_shutdown_behavior);
// Called when the number of tasks blocking shutdown becomes zero after
// shutdown has started.
......
......@@ -56,11 +56,12 @@ TEST_F(TaskSchedulerTaskTrackerPosixTest, RunTask) {
bool did_run = false;
Task task(FROM_HERE,
Bind([](bool* did_run) { *did_run = true; }, Unretained(&did_run)),
TaskTraits(), TimeDelta());
TimeDelta());
TaskTraits default_traits = {};
EXPECT_TRUE(tracker_.WillPostTask(&task));
EXPECT_TRUE(tracker_.WillPostTask(&task, default_traits.shutdown_behavior()));
auto sequence = test::CreateSequenceWithTask(std::move(task));
auto sequence = test::CreateSequenceWithTask(std::move(task), default_traits);
EXPECT_EQ(sequence, tracker_.WillScheduleSequence(sequence, nullptr));
// Expect RunAndPopNextTask to return nullptr since |sequence| is empty after
// popping a task from it.
......@@ -77,13 +78,14 @@ TEST_F(TaskSchedulerTaskTrackerPosixTest, FileDescriptorWatcher) {
Task task(FROM_HERE,
Bind(IgnoreResult(&FileDescriptorWatcher::WatchReadable), fds[0],
DoNothing()),
TaskTraits(), TimeDelta());
TimeDelta());
TaskTraits default_traits = {};
// FileDescriptorWatcher::WatchReadable needs a SequencedTaskRunnerHandle.
task.sequenced_task_runner_ref = MakeRefCounted<NullTaskRunner>();
EXPECT_TRUE(tracker_.WillPostTask(&task));
EXPECT_TRUE(tracker_.WillPostTask(&task, default_traits.shutdown_behavior()));
auto sequence = test::CreateSequenceWithTask(std::move(task));
auto sequence = test::CreateSequenceWithTask(std::move(task), default_traits);
EXPECT_EQ(sequence, tracker_.WillScheduleSequence(sequence, nullptr));
// Expect RunAndPopNextTask to return nullptr since |sequence| is empty after
// popping a task from it.
......
// Copyright 2016 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "base/task/task_scheduler/task.h"
#include "base/bind.h"
#include "base/bind_helpers.h"
#include "base/location.h"
#include "base/task/task_traits.h"
#include "base/time/time.h"
#include "testing/gtest/include/gtest/gtest.h"
namespace base {
namespace internal {
// Verify that the shutdown behavior of a BLOCK_SHUTDOWN delayed task is
// adjusted to SKIP_ON_SHUTDOWN. The shutown behavior of other delayed tasks
// should not change.
TEST(TaskSchedulerTaskTest, ShutdownBehaviorChangeWithDelay) {
Task continue_on_shutdown(FROM_HERE, DoNothing(),
{TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN},
TimeDelta::FromSeconds(1));
EXPECT_EQ(TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN,
continue_on_shutdown.traits.shutdown_behavior());
Task skip_on_shutdown(FROM_HERE, DoNothing(),
{TaskShutdownBehavior::SKIP_ON_SHUTDOWN},
TimeDelta::FromSeconds(1));
EXPECT_EQ(TaskShutdownBehavior::SKIP_ON_SHUTDOWN,
skip_on_shutdown.traits.shutdown_behavior());
Task block_shutdown(FROM_HERE, DoNothing(),
{TaskShutdownBehavior::BLOCK_SHUTDOWN},
TimeDelta::FromSeconds(1));
EXPECT_EQ(TaskShutdownBehavior::SKIP_ON_SHUTDOWN,
block_shutdown.traits.shutdown_behavior());
}
// Verify that the shutdown behavior of undelayed tasks is not adjusted.
TEST(TaskSchedulerTaskTest, NoShutdownBehaviorChangeNoDelay) {
Task continue_on_shutdown(FROM_HERE, DoNothing(),
{TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN},
TimeDelta());
EXPECT_EQ(TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN,
continue_on_shutdown.traits.shutdown_behavior());
Task skip_on_shutdown(FROM_HERE, DoNothing(),
{TaskShutdownBehavior::SKIP_ON_SHUTDOWN}, TimeDelta());
EXPECT_EQ(TaskShutdownBehavior::SKIP_ON_SHUTDOWN,
skip_on_shutdown.traits.shutdown_behavior());
Task block_shutdown(FROM_HERE, DoNothing(),
{TaskShutdownBehavior::BLOCK_SHUTDOWN}, TimeDelta());
EXPECT_EQ(TaskShutdownBehavior::BLOCK_SHUTDOWN,
block_shutdown.traits.shutdown_behavior());
}
} // namespace internal
} // namespace base
......@@ -16,8 +16,9 @@ namespace test {
MockSchedulerWorkerObserver::MockSchedulerWorkerObserver() = default;
MockSchedulerWorkerObserver::~MockSchedulerWorkerObserver() = default;
scoped_refptr<Sequence> CreateSequenceWithTask(Task task) {
scoped_refptr<Sequence> sequence = MakeRefCounted<Sequence>();
scoped_refptr<Sequence> CreateSequenceWithTask(Task task,
const TaskTraits& traits) {
scoped_refptr<Sequence> sequence = MakeRefCounted<Sequence>(traits);
sequence->PushTask(std::move(task));
return sequence;
}
......
......@@ -8,6 +8,7 @@
#include "base/memory/ref_counted.h"
#include "base/task/task_scheduler/scheduler_worker_observer.h"
#include "base/task/task_scheduler/sequence.h"
#include "base/task/task_traits.h"
#include "base/task_runner.h"
#include "testing/gmock/include/gmock/gmock.h"
......@@ -35,8 +36,10 @@ class MockSchedulerWorkerObserver : public SchedulerWorkerObserver {
// parametrize relevant task_scheduler tests.
enum class ExecutionMode { PARALLEL, SEQUENCED, SINGLE_THREADED };
// Creates a Sequence and pushes |task| to it. Returns that sequence.
scoped_refptr<Sequence> CreateSequenceWithTask(Task task);
// Creates a Sequence with given |traits| and pushes |task| to it. Returns that
// Sequence.
scoped_refptr<Sequence> CreateSequenceWithTask(Task task,
const TaskTraits& traits);
// Creates a TaskRunner that posts tasks to |worker_pool| with the
// |execution_mode| execution mode and the WithBaseSyncPrimitives() trait.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment