Commit 65143c3c authored by Jeffrey He's avatar Jeffrey He Committed by Commit Bot

Protect |workers_| under the |idle_workers_stack_lock_|

The SchedulerWorkerPoolImpl in TaskScheduler will be changed to have a
dynamic number of SchedulerWorkers, so |workers_| can be read and 
written to from multiple threads. The |idle_workers_stack_lock_|
will additionally cover |workers_| and be renamed to |lock_|.

Bug: 738104
Change-Id: I23a49e18253a4a9a375be790be5a6735cb844323
Reviewed-on: https://chromium-review.googlesource.com/556341
Commit-Queue: Jeffrey He <jeffreyhe@google.com>
Reviewed-by: default avatarGabriel Charette <gab@chromium.org>
Reviewed-by: default avatarRobert Liao <robliao@chromium.org>
Cr-Commit-Position: refs/heads/master@{#486074}
parent 308c483c
...@@ -193,9 +193,8 @@ SchedulerWorkerPoolImpl::SchedulerWorkerPoolImpl( ...@@ -193,9 +193,8 @@ SchedulerWorkerPoolImpl::SchedulerWorkerPoolImpl(
DelayedTaskManager* delayed_task_manager) DelayedTaskManager* delayed_task_manager)
: name_(name), : name_(name),
priority_hint_(priority_hint), priority_hint_(priority_hint),
idle_workers_stack_lock_(shared_priority_queue_.container_lock()), lock_(shared_priority_queue_.container_lock()),
idle_workers_stack_cv_for_testing_( idle_workers_stack_cv_for_testing_(lock_.CreateConditionVariable()),
idle_workers_stack_lock_.CreateConditionVariable()),
join_for_testing_returned_(WaitableEvent::ResetPolicy::MANUAL, join_for_testing_returned_(WaitableEvent::ResetPolicy::MANUAL,
WaitableEvent::InitialState::NOT_SIGNALED), WaitableEvent::InitialState::NOT_SIGNALED),
// Mimics the UMA_HISTOGRAM_LONG_TIMES macro. // Mimics the UMA_HISTOGRAM_LONG_TIMES macro.
...@@ -233,8 +232,7 @@ SchedulerWorkerPoolImpl::SchedulerWorkerPoolImpl( ...@@ -233,8 +232,7 @@ SchedulerWorkerPoolImpl::SchedulerWorkerPoolImpl(
void SchedulerWorkerPoolImpl::Start(const SchedulerWorkerPoolParams& params) { void SchedulerWorkerPoolImpl::Start(const SchedulerWorkerPoolParams& params) {
suggested_reclaim_time_ = params.suggested_reclaim_time(); suggested_reclaim_time_ = params.suggested_reclaim_time();
{ AutoSchedulerLock auto_lock(lock_);
AutoSchedulerLock auto_lock(idle_workers_stack_lock_);
#if DCHECK_IS_ON() #if DCHECK_IS_ON()
DCHECK(!workers_created_.IsSet()); DCHECK(!workers_created_.IsSet());
...@@ -258,8 +256,7 @@ void SchedulerWorkerPoolImpl::Start(const SchedulerWorkerPoolParams& params) { ...@@ -258,8 +256,7 @@ void SchedulerWorkerPoolImpl::Start(const SchedulerWorkerPoolParams& params) {
for (int index = params.max_threads() - 1; index >= 0; --index) { for (int index = params.max_threads() - 1; index >= 0; --index) {
workers_[index] = make_scoped_refptr(new SchedulerWorker( workers_[index] = make_scoped_refptr(new SchedulerWorker(
priority_hint_, MakeUnique<SchedulerWorkerDelegateImpl>(this, index), priority_hint_, MakeUnique<SchedulerWorkerDelegateImpl>(this, index),
task_tracker_, &idle_workers_stack_lock_, task_tracker_, &lock_, params.backward_compatibility(),
params.backward_compatibility(),
index < num_alive_workers ? SchedulerWorker::InitialState::ALIVE index < num_alive_workers ? SchedulerWorker::InitialState::ALIVE
: SchedulerWorker::InitialState::DETACHED)); : SchedulerWorker::InitialState::DETACHED));
...@@ -275,14 +272,13 @@ void SchedulerWorkerPoolImpl::Start(const SchedulerWorkerPoolParams& params) { ...@@ -275,14 +272,13 @@ void SchedulerWorkerPoolImpl::Start(const SchedulerWorkerPoolParams& params) {
// Start all workers. CHECK that the first worker can be started (assume // Start all workers. CHECK that the first worker can be started (assume
// that failure means that threads can't be created on this machine). Note // that failure means that threads can't be created on this machine). Note
// that the workers must be started before the idle_workers_stack_lock_ is // that the workers must be started before the |lock_| is
// released, otherwise WakeUpOneWorker() could WakeUp() a worker before it's // released, otherwise WakeUpOneWorker() could WakeUp() a worker before it's
// started (after the lock's released, but before it's started). // started (after the lock's released, but before it's started).
for (size_t index = 0; index < workers_.size(); ++index) { for (size_t index = 0; index < workers_.size(); ++index) {
const bool start_success = workers_[index]->Start(); const bool start_success = workers_[index]->Start();
CHECK(start_success || index > 0); CHECK(start_success || index > 0);
} }
}
// Wake up one worker for each wake up that occurred before Start(). // Wake up one worker for each wake up that occurred before Start().
for (size_t index = 0; index < workers_.size(); ++index) { for (size_t index = 0; index < workers_.size(); ++index) {
...@@ -294,7 +290,10 @@ void SchedulerWorkerPoolImpl::Start(const SchedulerWorkerPoolParams& params) { ...@@ -294,7 +290,10 @@ void SchedulerWorkerPoolImpl::Start(const SchedulerWorkerPoolParams& params) {
SchedulerWorkerPoolImpl::~SchedulerWorkerPoolImpl() { SchedulerWorkerPoolImpl::~SchedulerWorkerPoolImpl() {
// SchedulerWorkerPool should never be deleted in production unless its // SchedulerWorkerPool should never be deleted in production unless its
// initialization failed. // initialization failed.
#if DCHECK_IS_ON()
AutoSchedulerLock auto_lock(lock_);
DCHECK(join_for_testing_returned_.IsSignaled() || workers_.empty()); DCHECK(join_for_testing_returned_.IsSignaled() || workers_.empty());
#endif
} }
scoped_refptr<TaskRunner> SchedulerWorkerPoolImpl::CreateTaskRunnerWithTraits( scoped_refptr<TaskRunner> SchedulerWorkerPoolImpl::CreateTaskRunnerWithTraits(
...@@ -370,6 +369,7 @@ void SchedulerWorkerPoolImpl::GetHistograms( ...@@ -370,6 +369,7 @@ void SchedulerWorkerPoolImpl::GetHistograms(
} }
int SchedulerWorkerPoolImpl::GetMaxConcurrentTasksDeprecated() const { int SchedulerWorkerPoolImpl::GetMaxConcurrentTasksDeprecated() const {
AutoSchedulerLock auto_lock(lock_);
#if DCHECK_IS_ON() #if DCHECK_IS_ON()
DCHECK(workers_created_.IsSet()); DCHECK(workers_created_.IsSet());
#endif #endif
...@@ -380,7 +380,7 @@ void SchedulerWorkerPoolImpl::WaitForAllWorkersIdleForTesting() { ...@@ -380,7 +380,7 @@ void SchedulerWorkerPoolImpl::WaitForAllWorkersIdleForTesting() {
#if DCHECK_IS_ON() #if DCHECK_IS_ON()
DCHECK(workers_created_.IsSet()); DCHECK(workers_created_.IsSet());
#endif #endif
AutoSchedulerLock auto_lock(idle_workers_stack_lock_); AutoSchedulerLock auto_lock(lock_);
while (idle_workers_stack_.Size() < workers_.size()) while (idle_workers_stack_.Size() < workers_.size())
idle_workers_stack_cv_for_testing_->Wait(); idle_workers_stack_cv_for_testing_->Wait();
} }
...@@ -391,9 +391,31 @@ void SchedulerWorkerPoolImpl::JoinForTesting() { ...@@ -391,9 +391,31 @@ void SchedulerWorkerPoolImpl::JoinForTesting() {
#endif #endif
DCHECK(!CanWorkerDetachForTesting() || suggested_reclaim_time_.is_max()) DCHECK(!CanWorkerDetachForTesting() || suggested_reclaim_time_.is_max())
<< "Workers can detach during join."; << "Workers can detach during join.";
for (const auto& worker : workers_)
decltype(workers_) workers_copy;
// Make a copy of the SchedulerWorkers so that we can call
// SchedulerWorker::JoinForTesting() without holding |lock_| since
// SchedulerWorkers may need to access |workers_|.
{
AutoSchedulerLock auto_lock(lock_);
workers_copy = workers_;
}
for (const auto& worker : workers_copy)
worker->JoinForTesting(); worker->JoinForTesting();
// The |workers_| vector is currently expected not to change after
// JoinForTesting is called, so the copy is not expected to become stale.
//
// TODO(jeffreyhe): After detachment is removed, remove this DCHECK and
// create a flag to prevent changes to |workers_|. See crbug.com/740612.
{
AutoSchedulerLock auto_lock(lock_);
DCHECK(workers_copy == workers_);
}
DCHECK(!join_for_testing_returned_.IsSignaled()); DCHECK(!join_for_testing_returned_.IsSignaled());
join_for_testing_returned_.Signal(); join_for_testing_returned_.Signal();
} }
...@@ -403,6 +425,7 @@ void SchedulerWorkerPoolImpl::DisallowWorkerDetachmentForTesting() { ...@@ -403,6 +425,7 @@ void SchedulerWorkerPoolImpl::DisallowWorkerDetachmentForTesting() {
} }
size_t SchedulerWorkerPoolImpl::NumberOfAliveWorkersForTesting() { size_t SchedulerWorkerPoolImpl::NumberOfAliveWorkersForTesting() {
AutoSchedulerLock auto_lock(lock_);
size_t num_alive_workers = 0; size_t num_alive_workers = 0;
for (const auto& worker : workers_) { for (const auto& worker : workers_) {
if (worker->ThreadAliveForTesting()) if (worker->ThreadAliveForTesting())
...@@ -423,10 +446,14 @@ SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: ...@@ -423,10 +446,14 @@ SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::
void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::OnMainEntry( void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::OnMainEntry(
SchedulerWorker* worker) { SchedulerWorker* worker) {
{
#if DCHECK_IS_ON() #if DCHECK_IS_ON()
AutoSchedulerLock auto_lock(outer_->lock_);
DCHECK(outer_->workers_created_.IsSet()); DCHECK(outer_->workers_created_.IsSet());
DCHECK(ContainsWorker(outer_->workers_, worker)); DCHECK(ContainsWorker(outer_->workers_, worker));
#endif #endif
}
DCHECK_EQ(num_tasks_since_last_wait_, 0U); DCHECK_EQ(num_tasks_since_last_wait_, 0U);
if (!last_detach_time_.is_null()) { if (!last_detach_time_.is_null()) {
...@@ -447,8 +474,12 @@ void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::OnMainEntry( ...@@ -447,8 +474,12 @@ void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::OnMainEntry(
scoped_refptr<Sequence> scoped_refptr<Sequence>
SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::GetWork( SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::GetWork(
SchedulerWorker* worker) { SchedulerWorker* worker) {
#if DCHECK_IS_ON()
{
AutoSchedulerLock auto_lock(outer_->lock_);
DCHECK(ContainsWorker(outer_->workers_, worker)); DCHECK(ContainsWorker(outer_->workers_, worker));
}
#endif
// Record the TaskScheduler.NumTasksBetweenWaits histogram if the // Record the TaskScheduler.NumTasksBetweenWaits histogram if the
// SchedulerWorker waited on its WaitableEvent since the last GetWork(). // SchedulerWorker waited on its WaitableEvent since the last GetWork().
// //
...@@ -545,7 +576,7 @@ void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::OnDetach() { ...@@ -545,7 +576,7 @@ void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::OnDetach() {
void SchedulerWorkerPoolImpl::WakeUpOneWorker() { void SchedulerWorkerPoolImpl::WakeUpOneWorker() {
SchedulerWorker* worker = nullptr; SchedulerWorker* worker = nullptr;
{ {
AutoSchedulerLock auto_lock(idle_workers_stack_lock_); AutoSchedulerLock auto_lock(lock_);
#if DCHECK_IS_ON() #if DCHECK_IS_ON()
DCHECK_EQ(workers_.empty(), !workers_created_.IsSet()); DCHECK_EQ(workers_.empty(), !workers_created_.IsSet());
...@@ -565,7 +596,8 @@ void SchedulerWorkerPoolImpl::WakeUpOneWorker() { ...@@ -565,7 +596,8 @@ void SchedulerWorkerPoolImpl::WakeUpOneWorker() {
void SchedulerWorkerPoolImpl::AddToIdleWorkersStack( void SchedulerWorkerPoolImpl::AddToIdleWorkersStack(
SchedulerWorker* worker) { SchedulerWorker* worker) {
AutoSchedulerLock auto_lock(idle_workers_stack_lock_); AutoSchedulerLock auto_lock(lock_);
// Detachment may cause multiple attempts to add because the delegate cannot // Detachment may cause multiple attempts to add because the delegate cannot
// determine who woke it up. As a result, when it wakes up, it may conclude // determine who woke it up. As a result, when it wakes up, it may conclude
// there's no work to be done and attempt to add itself to the idle stack // there's no work to be done and attempt to add itself to the idle stack
...@@ -580,13 +612,13 @@ void SchedulerWorkerPoolImpl::AddToIdleWorkersStack( ...@@ -580,13 +612,13 @@ void SchedulerWorkerPoolImpl::AddToIdleWorkersStack(
} }
const SchedulerWorker* SchedulerWorkerPoolImpl::PeekAtIdleWorkersStack() const { const SchedulerWorker* SchedulerWorkerPoolImpl::PeekAtIdleWorkersStack() const {
AutoSchedulerLock auto_lock(idle_workers_stack_lock_); AutoSchedulerLock auto_lock(lock_);
return idle_workers_stack_.Peek(); return idle_workers_stack_.Peek();
} }
void SchedulerWorkerPoolImpl::RemoveFromIdleWorkersStack( void SchedulerWorkerPoolImpl::RemoveFromIdleWorkersStack(
SchedulerWorker* worker) { SchedulerWorker* worker) {
AutoSchedulerLock auto_lock(idle_workers_stack_lock_); AutoSchedulerLock auto_lock(lock_);
idle_workers_stack_.Remove(worker); idle_workers_stack_.Remove(worker);
} }
......
...@@ -138,22 +138,20 @@ class BASE_EXPORT SchedulerWorkerPoolImpl : public SchedulerWorkerPool { ...@@ -138,22 +138,20 @@ class BASE_EXPORT SchedulerWorkerPoolImpl : public SchedulerWorkerPool {
// PriorityQueue from which all threads of this worker pool get work. // PriorityQueue from which all threads of this worker pool get work.
PriorityQueue shared_priority_queue_; PriorityQueue shared_priority_queue_;
// All workers owned by this worker pool. Initialized by Start() within the
// scope of |idle_workers_stack_lock_|. Never modified afterwards (i.e. can be
// read without synchronization once |workers_created_.IsSet()|).
std::vector<scoped_refptr<SchedulerWorker>> workers_;
// Suggested reclaim time for workers. Initialized by Start(). Never modified // Suggested reclaim time for workers. Initialized by Start(). Never modified
// afterwards (i.e. can be read without synchronization once // afterwards (i.e. can be read without synchronization once
// |workers_created_.IsSet()|). // |workers_created_.IsSet()|).
TimeDelta suggested_reclaim_time_; TimeDelta suggested_reclaim_time_;
// Synchronizes access to |idle_workers_stack_|, // Synchronizes accesses to |workers_|, |idle_workers_stack_|,
// |idle_workers_stack_cv_for_testing_| and |num_wake_ups_before_start_|. Has // |idle_workers_stack_cv_for_testing_| and
// |shared_priority_queue_|'s lock as its predecessor so that a worker can be // |num_wake_ups_before_start_|. Has |shared_priority_queue_|'s lock as
// pushed to |idle_workers_stack_| within the scope of a Transaction (more // its predecessor so that a worker can be pushed to |idle_workers_stack_|
// details in GetWork()). // within the scope of a Transaction (more details in GetWork()).
mutable SchedulerLock idle_workers_stack_lock_; mutable SchedulerLock lock_;
// All workers owned by this worker pool.
std::vector<scoped_refptr<SchedulerWorker>> workers_;
// Stack of idle workers. Initially, all workers are on this stack. A worker // Stack of idle workers. Initially, all workers are on this stack. A worker
// is removed from the stack before its WakeUp() function is called and when // is removed from the stack before its WakeUp() function is called and when
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment