Commit 6d22abcc authored by Alex Clarke's avatar Alex Clarke Committed by Commit Bot

TaskQueueImpl: Make sequence_manager const

This simplification is a step towards a lock-free main thread PostTask path.

Split off from: https://chromium-review.googlesource.com/c/chromium/src/+/1304557

Bug: 897751
Change-Id: I6aa2fd98b000b634785f2fee94afa3b11e70423d
Reviewed-on: https://chromium-review.googlesource.com/c/1317568
Commit-Queue: Alex Clarke <alexclarke@chromium.org>
Reviewed-by: default avatarAlexander Timin <altimin@chromium.org>
Cr-Commit-Position: refs/heads/master@{#605311}
parent 2660cad6
......@@ -3349,12 +3349,15 @@ TEST_P(SequenceManagerTest, TaskQueueTaskRunnerDetach) {
// Create without a sequence manager.
std::unique_ptr<TimeDomain> time_domain =
std::make_unique<internal::RealTimeDomain>();
std::make_unique<MockTimeDomain>(TimeTicks());
std::unique_ptr<TaskQueueImpl> queue2 = std::make_unique<TaskQueueImpl>(
nullptr, time_domain.get(), TaskQueue::Spec("stub"));
scoped_refptr<SingleThreadTaskRunner> task_runner2 =
queue2->CreateTaskRunner(0);
EXPECT_FALSE(task_runner2->PostTask(FROM_HERE, BindOnce(&NopTask)));
// Tidy up.
queue2->UnregisterTaskQueue();
}
TEST_P(SequenceManagerTest, DestructorPostChainDuringShutdown) {
......
......@@ -47,11 +47,12 @@ TaskQueueImpl::TaskQueueImpl(SequenceManagerImpl* sequence_manager,
TimeDomain* time_domain,
const TaskQueue::Spec& spec)
: name_(spec.name),
sequence_manager_(sequence_manager),
associated_thread_(sequence_manager
? sequence_manager->associated_thread()
: AssociatedThreadId::CreateBound()),
any_thread_(sequence_manager, time_domain),
main_thread_only_(sequence_manager, this, time_domain),
any_thread_(time_domain),
main_thread_only_(this, time_domain),
proxy_(MakeRefCounted<TaskQueueProxy>(this, associated_thread_)),
should_monitor_quiescence_(spec.should_monitor_quiescence),
should_notify_observers_(spec.should_notify_observers),
......@@ -70,23 +71,19 @@ TaskQueueImpl::~TaskQueueImpl() {
// contains a strong reference to this TaskQueueImpl and the
// SequenceManagerImpl destructor calls UnregisterTaskQueue on all task
// queues.
DCHECK(!any_thread().sequence_manager)
DCHECK(any_thread().unregistered)
<< "UnregisterTaskQueue must be called first!";
#endif
}
TaskQueueImpl::AnyThread::AnyThread(SequenceManagerImpl* sequence_manager,
TimeDomain* time_domain)
: sequence_manager(sequence_manager), time_domain(time_domain) {}
TaskQueueImpl::AnyThread::AnyThread(TimeDomain* time_domain)
: time_domain(time_domain) {}
TaskQueueImpl::AnyThread::~AnyThread() = default;
TaskQueueImpl::MainThreadOnly::MainThreadOnly(
SequenceManagerImpl* sequence_manager,
TaskQueueImpl* task_queue,
TimeDomain* time_domain)
: sequence_manager(sequence_manager),
time_domain(time_domain),
TaskQueueImpl::MainThreadOnly::MainThreadOnly(TaskQueueImpl* task_queue,
TimeDomain* time_domain)
: time_domain(time_domain),
delayed_work_queue(
new WorkQueue(task_queue, "delayed", WorkQueue::QueueType::kDelayed)),
immediate_work_queue(new WorkQueue(task_queue,
......@@ -119,15 +116,12 @@ void TaskQueueImpl::UnregisterTaskQueue() {
if (main_thread_only().time_domain)
main_thread_only().time_domain->UnregisterQueue(this);
if (!any_thread().sequence_manager)
return;
any_thread().unregistered = true;
main_thread_only().on_task_completed_handler = OnTaskCompletedHandler();
any_thread().time_domain = nullptr;
main_thread_only().time_domain = nullptr;
any_thread().sequence_manager = nullptr;
main_thread_only().sequence_manager = nullptr;
any_thread().on_next_wake_up_changed_callback =
OnNextWakeUpChangedCallback();
main_thread_only().on_next_wake_up_changed_callback =
......@@ -182,10 +176,8 @@ void TaskQueueImpl::PostImmediateTaskImpl(PostedTask task) {
// for details.
CHECK(task.callback);
AutoLock lock(any_thread_lock_);
DCHECK(any_thread().sequence_manager);
EnqueueOrder sequence_number =
any_thread().sequence_manager->GetNextSequenceNumber();
EnqueueOrder sequence_number = sequence_manager_->GetNextSequenceNumber();
PushOntoImmediateIncomingQueueLocked(Task(
std::move(task),
......@@ -212,10 +204,7 @@ void TaskQueueImpl::PostDelayedTaskImpl(PostedTask task,
if (current_thread == CurrentThread::kMainThread) {
// Lock-free fast path for delayed tasks posted from the main thread.
DCHECK(main_thread_only().sequence_manager);
EnqueueOrder sequence_number =
main_thread_only().sequence_manager->GetNextSequenceNumber();
EnqueueOrder sequence_number = sequence_manager_->GetNextSequenceNumber();
TimeTicks time_domain_now = main_thread_only().time_domain->Now();
TimeTicks time_domain_delayed_run_time = time_domain_now + task.delay;
......@@ -229,10 +218,7 @@ void TaskQueueImpl::PostDelayedTaskImpl(PostedTask task,
// because it causes two main thread tasks to be run. Should this
// assumption prove to be false in future, we may need to revisit this.
AutoLock lock(any_thread_lock_);
DCHECK(any_thread().sequence_manager);
EnqueueOrder sequence_number =
any_thread().sequence_manager->GetNextSequenceNumber();
EnqueueOrder sequence_number = sequence_manager_->GetNextSequenceNumber();
TimeTicks time_domain_now = any_thread().time_domain->Now();
TimeTicks time_domain_delayed_run_time = time_domain_now + task.delay;
......@@ -247,7 +233,7 @@ void TaskQueueImpl::PushOntoDelayedIncomingQueueFromMainThread(
TimeTicks now,
bool notify_task_annotator) {
if (notify_task_annotator)
main_thread_only().sequence_manager->WillQueueTask(&pending_task);
sequence_manager_->WillQueueTask(&pending_task);
main_thread_only().delayed_incoming_queue.push(std::move(pending_task));
LazyNow lazy_now(now);
......@@ -257,10 +243,10 @@ void TaskQueueImpl::PushOntoDelayedIncomingQueueFromMainThread(
}
void TaskQueueImpl::PushOntoDelayedIncomingQueueLocked(Task pending_task) {
any_thread().sequence_manager->WillQueueTask(&pending_task);
sequence_manager_->WillQueueTask(&pending_task);
EnqueueOrder thread_hop_task_sequence_number =
any_thread().sequence_manager->GetNextSequenceNumber();
sequence_manager_->GetNextSequenceNumber();
// TODO(altimin): Add a copy method to Task to capture metadata here.
PushOntoImmediateIncomingQueueLocked(
Task(PostedTask(BindOnce(&TaskQueueImpl::ScheduleDelayedWorkTask,
......@@ -303,7 +289,7 @@ void TaskQueueImpl::PushOntoImmediateIncomingQueueLocked(Task task) {
{
AutoLock lock(immediate_incoming_queue_lock_);
was_immediate_incoming_queue_empty = immediate_incoming_queue().empty();
any_thread().sequence_manager->WillQueueTask(&task);
sequence_manager_->WillQueueTask(&task);
immediate_incoming_queue().push_back(std::move(task));
}
......@@ -313,8 +299,8 @@ void TaskQueueImpl::PushOntoImmediateIncomingQueueLocked(Task task) {
bool queue_is_blocked =
RunsTasksInCurrentSequence() &&
(!IsQueueEnabled() || main_thread_only().current_fence);
any_thread().sequence_manager->OnQueueHasIncomingImmediateWork(
this, sequence_number, queue_is_blocked);
sequence_manager_->OnQueueHasIncomingImmediateWork(this, sequence_number,
queue_is_blocked);
if (!any_thread().on_next_wake_up_changed_callback.is_null())
any_thread().on_next_wake_up_changed_callback.Run(desired_run_time);
}
......@@ -433,8 +419,7 @@ void TaskQueueImpl::WakeUpForDelayedWork(LazyNow* lazy_now) {
break;
ActivateDelayedFenceIfNeeded(task.delayed_run_time);
DCHECK(!task.enqueue_order_set());
task.set_enqueue_order(
main_thread_only().sequence_manager->GetNextSequenceNumber());
task.set_enqueue_order(sequence_manager_->GetNextSequenceNumber());
main_thread_only().delayed_work_queue->Push(std::move(task));
main_thread_only().delayed_incoming_queue.pop();
......@@ -443,8 +428,7 @@ void TaskQueueImpl::WakeUpForDelayedWork(LazyNow* lazy_now) {
// delayed tasks). Ensure that there is a DoWork posting. No-op inside
// existing DoWork due to DoWork deduplication.
if (IsQueueEnabled() || !main_thread_only().current_fence) {
main_thread_only().sequence_manager->MaybeScheduleImmediateWork(
FROM_HERE);
sequence_manager_->MaybeScheduleImmediateWork(FROM_HERE);
}
}
......@@ -472,11 +456,10 @@ void TaskQueueImpl::TraceQueueSize() const {
}
void TaskQueueImpl::SetQueuePriority(TaskQueue::QueuePriority priority) {
if (!main_thread_only().sequence_manager || priority == GetQueuePriority())
if (priority == GetQueuePriority())
return;
main_thread_only()
.sequence_manager->main_thread_only()
.selector.SetQueuePriority(this, priority);
sequence_manager_->main_thread_only().selector.SetQueuePriority(this,
priority);
}
TaskQueue::QueuePriority TaskQueueImpl::GetQueuePriority() const {
......@@ -491,7 +474,7 @@ void TaskQueueImpl::AsValueInto(TimeTicks now,
AutoLock immediate_incoming_queue_lock(immediate_incoming_queue_lock_);
state->BeginDictionary();
state->SetString("name", GetName());
if (!main_thread_only().sequence_manager) {
if (any_thread().unregistered) {
state->SetBoolean("unregistered", true);
state->EndDictionary();
return;
......@@ -590,12 +573,8 @@ void TaskQueueImpl::SetTimeDomain(TimeDomain* time_domain) {
{
AutoLock lock(any_thread_lock_);
DCHECK(time_domain);
// NOTE this is similar to checking |any_thread().sequence_manager| but
// the TaskQueueSelectorTests constructs TaskQueueImpl directly with a null
// sequence_manager. Instead we check |any_thread().time_domain| which is
// another way of asserting that UnregisterTaskQueue has not been called.
DCHECK(any_thread().time_domain);
if (!any_thread().time_domain)
DCHECK(!any_thread().unregistered);
if (any_thread().unregistered)
return;
DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
if (time_domain == main_thread_only().time_domain)
......@@ -630,17 +609,13 @@ void TaskQueueImpl::SetBlameContext(trace_event::BlameContext* blame_context) {
}
void TaskQueueImpl::InsertFence(TaskQueue::InsertFencePosition position) {
if (!main_thread_only().sequence_manager)
return;
// Only one fence may be present at a time.
main_thread_only().delayed_fence = nullopt;
EnqueueOrder previous_fence = main_thread_only().current_fence;
EnqueueOrder current_fence =
position == TaskQueue::InsertFencePosition::kNow
? main_thread_only().sequence_manager->GetNextSequenceNumber()
: EnqueueOrder::blocking_fence();
EnqueueOrder current_fence = position == TaskQueue::InsertFencePosition::kNow
? sequence_manager_->GetNextSequenceNumber()
: EnqueueOrder::blocking_fence();
// Tasks posted after this point will have a strictly higher enqueue order
// and will be blocked from running.
......@@ -659,9 +634,8 @@ void TaskQueueImpl::InsertFence(TaskQueue::InsertFencePosition position) {
}
}
if (IsQueueEnabled() && task_unblocked) {
main_thread_only().sequence_manager->MaybeScheduleImmediateWork(FROM_HERE);
}
if (IsQueueEnabled() && task_unblocked)
sequence_manager_->MaybeScheduleImmediateWork(FROM_HERE);
}
void TaskQueueImpl::InsertFenceAt(TimeTicks time) {
......@@ -675,9 +649,6 @@ void TaskQueueImpl::InsertFenceAt(TimeTicks time) {
}
void TaskQueueImpl::RemoveFence() {
if (!main_thread_only().sequence_manager)
return;
EnqueueOrder previous_fence = main_thread_only().current_fence;
main_thread_only().current_fence = EnqueueOrder::none();
main_thread_only().delayed_fence = nullopt;
......@@ -693,9 +664,8 @@ void TaskQueueImpl::RemoveFence() {
}
}
if (IsQueueEnabled() && task_unblocked) {
main_thread_only().sequence_manager->MaybeScheduleImmediateWork(FROM_HERE);
}
if (IsQueueEnabled() && task_unblocked)
sequence_manager_->MaybeScheduleImmediateWork(FROM_HERE);
}
bool TaskQueueImpl::BlockedByFence() const {
......@@ -840,7 +810,8 @@ void TaskQueueImpl::OnQueueEnabledVoteChanged(bool enabled) {
}
void TaskQueueImpl::EnableOrDisableWithSelector(bool enable) {
if (!main_thread_only().sequence_manager)
// |sequence_manager_| can be null in tests.
if (!sequence_manager_)
return;
LazyNow lazy_now = main_thread_only().time_domain->CreateLazyNow();
......@@ -855,13 +826,9 @@ void TaskQueueImpl::EnableOrDisableWithSelector(bool enable) {
// Note the selector calls SequenceManager::OnTaskQueueEnabled which posts
// a DoWork if needed.
main_thread_only()
.sequence_manager->main_thread_only()
.selector.EnableQueue(this);
sequence_manager_->main_thread_only().selector.EnableQueue(this);
} else {
main_thread_only()
.sequence_manager->main_thread_only()
.selector.DisableQueue(this);
sequence_manager_->main_thread_only().selector.DisableQueue(this);
}
}
......@@ -876,8 +843,7 @@ TaskQueueImpl::CreateQueueEnabledVoter(scoped_refptr<TaskQueue> task_queue) {
void TaskQueueImpl::SweepCanceledDelayedTasks(TimeTicks now) {
if (main_thread_only().delayed_incoming_queue.empty())
return;
const SequenceManagerImpl* sequence_manager =
main_thread_only().sequence_manager;
const SequenceManagerImpl* sequence_manager = sequence_manager_;
main_thread_only().delayed_incoming_queue.SweepCancelledTasks(
sequence_manager);
......@@ -995,14 +961,13 @@ bool TaskQueueImpl::RequiresTaskTiming() const {
bool TaskQueueImpl::IsUnregistered() const {
AutoLock lock(any_thread_lock_);
return !any_thread().sequence_manager;
return any_thread().unregistered;
}
WeakPtr<SequenceManagerImpl> TaskQueueImpl::GetSequenceManagerWeakPtr() {
return main_thread_only().sequence_manager->GetWeakPtr();
return sequence_manager_->GetWeakPtr();
}
void TaskQueueImpl::SetQueueEnabledForTest(bool enabled) {
main_thread_only().is_enabled_for_test = enabled;
EnableOrDisableWithSelector(IsQueueEnabled());
......@@ -1051,12 +1016,6 @@ bool TaskQueueImpl::HasTasks() const {
return false;
}
void TaskQueueImpl::ClearSequenceManagerForTesting() {
AutoLock lock(any_thread_lock_);
any_thread().sequence_manager = nullptr;
main_thread_only().sequence_manager = nullptr;
}
TaskQueueImpl::DelayedIncomingQueue::DelayedIncomingQueue() = default;
TaskQueueImpl::DelayedIncomingQueue::~DelayedIncomingQueue() = default;
......
......@@ -231,9 +231,8 @@ class BASE_EXPORT TaskQueueImpl {
bool RequiresTaskTiming() const;
WeakPtr<SequenceManagerImpl> GetSequenceManagerWeakPtr();
SequenceManagerImpl* sequence_manager() {
return main_thread_only().sequence_manager;
}
SequenceManagerImpl* sequence_manager() const { return sequence_manager_; }
// Returns true if this queue is unregistered or task queue manager is deleted
// and this queue can be safely deleted on any thread.
......@@ -250,9 +249,6 @@ class BASE_EXPORT TaskQueueImpl {
// constructed due to not having TaskQueue.
void SetQueueEnabledForTest(bool enabled);
// TODO(alexclarke): Remove when possible.
void ClearSequenceManagerForTesting();
protected:
void SetDelayedWakeUpForTesting(Optional<DelayedWakeUp> wake_up);
......@@ -261,17 +257,17 @@ class BASE_EXPORT TaskQueueImpl {
friend class WorkQueueTest;
struct AnyThread {
AnyThread(SequenceManagerImpl* sequence_manager, TimeDomain* time_domain);
explicit AnyThread(TimeDomain* time_domain);
~AnyThread();
// SequenceManagerImpl, TimeDomain and Observer are maintained in two
// copies: inside AnyThread and inside MainThreadOnly. They can be changed
// only from main thread, so it should be locked before accessing from other
// threads.
SequenceManagerImpl* sequence_manager;
// TimeDomain and Observer are maintained in two copies: inside AnyThread
// and inside MainThreadOnly. They can be changed only from main thread, so
// it should be locked before accessing from other threads.
TimeDomain* time_domain;
// Callback corresponding to TaskQueue::Observer::OnQueueNextChanged.
OnNextWakeUpChangedCallback on_next_wake_up_changed_callback;
bool unregistered = false;
};
// A queue for holding delayed tasks before their delay has expired.
......@@ -304,15 +300,12 @@ class BASE_EXPORT TaskQueueImpl {
};
struct MainThreadOnly {
MainThreadOnly(SequenceManagerImpl* sequence_manager,
TaskQueueImpl* task_queue,
TimeDomain* time_domain);
MainThreadOnly(TaskQueueImpl* task_queue, TimeDomain* time_domain);
~MainThreadOnly();
// Another copy of SequenceManagerImpl, TimeDomain and Observer
// for lock-free access from the main thread.
// See description inside struct AnyThread for details.
SequenceManagerImpl* sequence_manager;
TimeDomain* time_domain;
// Callback corresponding to TaskQueue::Observer::OnQueueNextChanged.
OnNextWakeUpChangedCallback on_next_wake_up_changed_callback;
......@@ -390,6 +383,7 @@ class BASE_EXPORT TaskQueueImpl {
void ActivateDelayedFenceIfNeeded(TimeTicks now);
const char* name_;
SequenceManagerImpl* const sequence_manager_;
scoped_refptr<AssociatedThreadId> associated_thread_;
......
......@@ -186,7 +186,7 @@ TEST_F(TimeDomainTest, SetNextDelayedDoWork_OnlyCalledForEarlierTasks) {
}
TEST_F(TimeDomainTest, UnregisterQueue) {
std::unique_ptr<TaskQueueImplForTest> task_queue2_ =
std::unique_ptr<TaskQueueImplForTest> task_queue2 =
std::make_unique<TaskQueueImplForTest>(nullptr, time_domain_.get(),
TaskQueue::Spec("test"));
......@@ -196,8 +196,7 @@ TEST_F(TimeDomainTest, UnregisterQueue) {
EXPECT_CALL(*time_domain_.get(), SetNextDelayedDoWork(_, wake_up1)).Times(1);
task_queue_->SetDelayedWakeUpForTesting(internal::DelayedWakeUp{wake_up1, 0});
TimeTicks wake_up2 = now + TimeDelta::FromMilliseconds(100);
task_queue2_->SetDelayedWakeUpForTesting(
internal::DelayedWakeUp{wake_up2, 0});
task_queue2->SetDelayedWakeUpForTesting(internal::DelayedWakeUp{wake_up2, 0});
EXPECT_EQ(task_queue_.get(), time_domain_->NextScheduledTaskQueue());
......@@ -206,16 +205,21 @@ TEST_F(TimeDomainTest, UnregisterQueue) {
EXPECT_CALL(*time_domain_.get(), SetNextDelayedDoWork(_, wake_up2)).Times(1);
time_domain_->UnregisterQueue(task_queue_.get());
task_queue_ = std::unique_ptr<TaskQueueImplForTest>();
EXPECT_EQ(task_queue2_.get(), time_domain_->NextScheduledTaskQueue());
EXPECT_EQ(task_queue2.get(), time_domain_->NextScheduledTaskQueue());
task_queue_->UnregisterTaskQueue();
task_queue_ = nullptr;
testing::Mock::VerifyAndClearExpectations(time_domain_.get());
EXPECT_CALL(*time_domain_.get(), SetNextDelayedDoWork(_, TimeTicks::Max()))
.Times(1);
time_domain_->UnregisterQueue(task_queue2_.get());
time_domain_->UnregisterQueue(task_queue2.get());
EXPECT_FALSE(time_domain_->NextScheduledTaskQueue());
task_queue2->UnregisterTaskQueue();
task_queue2 = nullptr;
}
TEST_F(TimeDomainTest, WakeUpReadyDelayedQueues) {
......@@ -369,6 +373,10 @@ TEST_F(TimeDomainTest, HighResolutionWakeUps) {
time_domain_->SetNextWakeUpForQueue(
&q1, nullopt, internal::WakeUpResolution::kLow, &lazy_now);
EXPECT_FALSE(time_domain_->HasPendingHighResolutionTasks());
// Tidy up.
q1.UnregisterTaskQueue();
q2.UnregisterTaskQueue();
}
} // namespace sequence_manager
......
......@@ -36,7 +36,11 @@ class WorkQueueTest : public testing::Test {
public:
void SetUp() override {
dummy_sequence_manager_ = SequenceManagerImpl::CreateUnbound(nullptr);
scoped_refptr<AssociatedThreadId> thread_checker =
dummy_sequence_manager_->associated_thread();
thread_checker->BindToCurrentThread();
time_domain_.reset(new RealTimeDomain());
dummy_sequence_manager_->RegisterTimeDomain(time_domain_.get());
task_queue_ = std::make_unique<TaskQueueImpl>(dummy_sequence_manager_.get(),
time_domain_.get(),
TaskQueue::Spec("test"));
......@@ -49,8 +53,8 @@ class WorkQueueTest : public testing::Test {
void TearDown() override {
work_queue_sets_->RemoveQueue(work_queue_.get());
task_queue_->ClearSequenceManagerForTesting();
task_queue_->UnregisterTaskQueue();
dummy_sequence_manager_->UnregisterTimeDomain(time_domain_.get());
}
protected:
......
......@@ -206,6 +206,7 @@ void WorkerThreadScheduler::Shutdown() {
"WorkerThread.Runtime", delta, base::TimeDelta::FromSeconds(1),
base::TimeDelta::FromDays(1), 50 /* bucket count */);
task_queue_throttler_.reset();
idle_helper_.Shutdown();
helper()->Shutdown();
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment