Commit 8e4c5d6e authored by Jeffrey He's avatar Jeffrey He Committed by Commit Bot

Reland "Protect |workers_| under the |idle_workers_stack_lock_|"

This is a reland of 65143c3c
Original change's description:
> Protect |workers_| under the |idle_workers_stack_lock_|
> 
> The SchedulerWorkerPoolImpl in TaskScheduler will be changed to have a
> dynamic number of SchedulerWorkers, so |workers_| can be read and 
> written to from multiple threads. The |idle_workers_stack_lock_|
> will additionally cover |workers_| and be renamed to |lock_|.
> 
> Bug: 738104
> Change-Id: I23a49e18253a4a9a375be790be5a6735cb844323
> Reviewed-on: https://chromium-review.googlesource.com/556341
> Commit-Queue: Jeffrey He <jeffreyhe@google.com>
> Reviewed-by: Gabriel Charette <gab@chromium.org>
> Reviewed-by: Robert Liao <robliao@chromium.org>
> Cr-Commit-Position: refs/heads/master@{#486074}

Bug: 738104
Change-Id: Ib4eb5533577abaf190715961483da7550cc01ece
Reviewed-on: https://chromium-review.googlesource.com/574634
Commit-Queue: Jeffrey He <jeffreyhe@google.com>
Commit-Queue: Gabriel Charette <gab@chromium.org>
Reviewed-by: default avatarGabriel Charette <gab@chromium.org>
Cr-Commit-Position: refs/heads/master@{#487156}
parent 3a5fc78c
......@@ -193,9 +193,8 @@ SchedulerWorkerPoolImpl::SchedulerWorkerPoolImpl(
DelayedTaskManager* delayed_task_manager)
: name_(name),
priority_hint_(priority_hint),
idle_workers_stack_lock_(shared_priority_queue_.container_lock()),
idle_workers_stack_cv_for_testing_(
idle_workers_stack_lock_.CreateConditionVariable()),
lock_(shared_priority_queue_.container_lock()),
idle_workers_stack_cv_for_testing_(lock_.CreateConditionVariable()),
join_for_testing_returned_(WaitableEvent::ResetPolicy::MANUAL,
WaitableEvent::InitialState::NOT_SIGNALED),
// Mimics the UMA_HISTOGRAM_LONG_TIMES macro.
......@@ -233,55 +232,52 @@ SchedulerWorkerPoolImpl::SchedulerWorkerPoolImpl(
void SchedulerWorkerPoolImpl::Start(const SchedulerWorkerPoolParams& params) {
suggested_reclaim_time_ = params.suggested_reclaim_time();
{
AutoSchedulerLock auto_lock(idle_workers_stack_lock_);
AutoSchedulerLock auto_lock(lock_);
#if DCHECK_IS_ON()
DCHECK(!workers_created_.IsSet());
DCHECK(!workers_created_.IsSet());
#endif
DCHECK(workers_.empty());
workers_.resize(params.max_threads());
// The number of workers created alive is |num_wake_ups_before_start_|, plus
// one if the standby thread policy is ONE (in order to start with one alive
// idle worker).
const int num_alive_workers =
num_wake_ups_before_start_ +
(params.standby_thread_policy() ==
SchedulerWorkerPoolParams::StandbyThreadPolicy::ONE
? 1
: 0);
// Create workers in reverse order of index so that the worker with the
// highest index is at the bottom of the idle stack.
for (int index = params.max_threads() - 1; index >= 0; --index) {
workers_[index] = make_scoped_refptr(new SchedulerWorker(
priority_hint_, MakeUnique<SchedulerWorkerDelegateImpl>(this, index),
task_tracker_, &idle_workers_stack_lock_,
params.backward_compatibility(),
index < num_alive_workers ? SchedulerWorker::InitialState::ALIVE
: SchedulerWorker::InitialState::DETACHED));
// Put workers that won't be woken up at the end of this method on the
// idle stack.
if (index >= num_wake_ups_before_start_)
idle_workers_stack_.Push(workers_[index].get());
}
DCHECK(workers_.empty());
workers_.resize(params.max_threads());
// The number of workers created alive is |num_wake_ups_before_start_|, plus
// one if the standby thread policy is ONE (in order to start with one alive
// idle worker).
const int num_alive_workers =
num_wake_ups_before_start_ +
(params.standby_thread_policy() ==
SchedulerWorkerPoolParams::StandbyThreadPolicy::ONE
? 1
: 0);
// Create workers in reverse order of index so that the worker with the
// highest index is at the bottom of the idle stack.
for (int index = params.max_threads() - 1; index >= 0; --index) {
workers_[index] = make_scoped_refptr(new SchedulerWorker(
priority_hint_, MakeUnique<SchedulerWorkerDelegateImpl>(this, index),
task_tracker_, &lock_, params.backward_compatibility(),
index < num_alive_workers ? SchedulerWorker::InitialState::ALIVE
: SchedulerWorker::InitialState::DETACHED));
// Put workers that won't be woken up at the end of this method on the
// idle stack.
if (index >= num_wake_ups_before_start_)
idle_workers_stack_.Push(workers_[index].get());
}
#if DCHECK_IS_ON()
workers_created_.Set();
workers_created_.Set();
#endif
// Start all workers. CHECK that the first worker can be started (assume
// that failure means that threads can't be created on this machine). Note
// that the workers must be started before the idle_workers_stack_lock_ is
// released, otherwise WakeUpOneWorker() could WakeUp() a worker before it's
// started (after the lock's released, but before it's started).
for (size_t index = 0; index < workers_.size(); ++index) {
const bool start_success = workers_[index]->Start();
CHECK(start_success || index > 0);
}
// Start all workers. CHECK that the first worker can be started (assume
// that failure means that threads can't be created on this machine). Note
// that the workers must be started before the |lock_| is
// released, otherwise WakeUpOneWorker() could WakeUp() a worker before it's
// started (after the lock's released, but before it's started).
for (size_t index = 0; index < workers_.size(); ++index) {
const bool start_success = workers_[index]->Start();
CHECK(start_success || index > 0);
}
// Wake up one worker for each wake up that occurred before Start().
......@@ -294,7 +290,10 @@ void SchedulerWorkerPoolImpl::Start(const SchedulerWorkerPoolParams& params) {
SchedulerWorkerPoolImpl::~SchedulerWorkerPoolImpl() {
// SchedulerWorkerPool should never be deleted in production unless its
// initialization failed.
#if DCHECK_IS_ON()
AutoSchedulerLock auto_lock(lock_);
DCHECK(join_for_testing_returned_.IsSignaled() || workers_.empty());
#endif
}
scoped_refptr<TaskRunner> SchedulerWorkerPoolImpl::CreateTaskRunnerWithTraits(
......@@ -370,6 +369,7 @@ void SchedulerWorkerPoolImpl::GetHistograms(
}
int SchedulerWorkerPoolImpl::GetMaxConcurrentTasksDeprecated() const {
AutoSchedulerLock auto_lock(lock_);
#if DCHECK_IS_ON()
DCHECK(workers_created_.IsSet());
#endif
......@@ -380,7 +380,7 @@ void SchedulerWorkerPoolImpl::WaitForAllWorkersIdleForTesting() {
#if DCHECK_IS_ON()
DCHECK(workers_created_.IsSet());
#endif
AutoSchedulerLock auto_lock(idle_workers_stack_lock_);
AutoSchedulerLock auto_lock(lock_);
while (idle_workers_stack_.Size() < workers_.size())
idle_workers_stack_cv_for_testing_->Wait();
}
......@@ -391,9 +391,31 @@ void SchedulerWorkerPoolImpl::JoinForTesting() {
#endif
DCHECK(!CanWorkerDetachForTesting() || suggested_reclaim_time_.is_max())
<< "Workers can detach during join.";
for (const auto& worker : workers_)
decltype(workers_) workers_copy;
// Make a copy of the SchedulerWorkers so that we can call
// SchedulerWorker::JoinForTesting() without holding |lock_| since
// SchedulerWorkers may need to access |workers_|.
{
AutoSchedulerLock auto_lock(lock_);
workers_copy = workers_;
}
for (const auto& worker : workers_copy)
worker->JoinForTesting();
// The |workers_| vector is currently expected not to change after
// JoinForTesting is called, so the copy is not expected to become stale.
//
// TODO(jeffreyhe): After detachment is removed, remove this DCHECK and
// create a flag to prevent changes to |workers_|. See crbug.com/740612.
{
AutoSchedulerLock auto_lock(lock_);
DCHECK(workers_copy == workers_);
}
DCHECK(!join_for_testing_returned_.IsSignaled());
join_for_testing_returned_.Signal();
}
......@@ -403,6 +425,7 @@ void SchedulerWorkerPoolImpl::DisallowWorkerDetachmentForTesting() {
}
size_t SchedulerWorkerPoolImpl::NumberOfAliveWorkersForTesting() {
AutoSchedulerLock auto_lock(lock_);
size_t num_alive_workers = 0;
for (const auto& worker : workers_) {
if (worker->ThreadAliveForTesting())
......@@ -423,10 +446,14 @@ SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::
void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::OnMainEntry(
SchedulerWorker* worker) {
{
#if DCHECK_IS_ON()
DCHECK(outer_->workers_created_.IsSet());
DCHECK(ContainsWorker(outer_->workers_, worker));
AutoSchedulerLock auto_lock(outer_->lock_);
DCHECK(outer_->workers_created_.IsSet());
DCHECK(ContainsWorker(outer_->workers_, worker));
#endif
}
DCHECK_EQ(num_tasks_since_last_wait_, 0U);
if (!last_detach_time_.is_null()) {
......@@ -447,7 +474,12 @@ void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::OnMainEntry(
scoped_refptr<Sequence>
SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::GetWork(
SchedulerWorker* worker) {
DCHECK(ContainsWorker(outer_->workers_, worker));
{
#if DCHECK_IS_ON()
AutoSchedulerLock auto_lock(outer_->lock_);
#endif
DCHECK(ContainsWorker(outer_->workers_, worker));
}
// Record the TaskScheduler.NumTasksBetweenWaits histogram if the
// SchedulerWorker waited on its WaitableEvent since the last GetWork().
......@@ -545,7 +577,7 @@ void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::OnDetach() {
void SchedulerWorkerPoolImpl::WakeUpOneWorker() {
SchedulerWorker* worker = nullptr;
{
AutoSchedulerLock auto_lock(idle_workers_stack_lock_);
AutoSchedulerLock auto_lock(lock_);
#if DCHECK_IS_ON()
DCHECK_EQ(workers_.empty(), !workers_created_.IsSet());
......@@ -565,7 +597,8 @@ void SchedulerWorkerPoolImpl::WakeUpOneWorker() {
void SchedulerWorkerPoolImpl::AddToIdleWorkersStack(
SchedulerWorker* worker) {
AutoSchedulerLock auto_lock(idle_workers_stack_lock_);
AutoSchedulerLock auto_lock(lock_);
// Detachment may cause multiple attempts to add because the delegate cannot
// determine who woke it up. As a result, when it wakes up, it may conclude
// there's no work to be done and attempt to add itself to the idle stack
......@@ -580,13 +613,13 @@ void SchedulerWorkerPoolImpl::AddToIdleWorkersStack(
}
const SchedulerWorker* SchedulerWorkerPoolImpl::PeekAtIdleWorkersStack() const {
AutoSchedulerLock auto_lock(idle_workers_stack_lock_);
AutoSchedulerLock auto_lock(lock_);
return idle_workers_stack_.Peek();
}
void SchedulerWorkerPoolImpl::RemoveFromIdleWorkersStack(
SchedulerWorker* worker) {
AutoSchedulerLock auto_lock(idle_workers_stack_lock_);
AutoSchedulerLock auto_lock(lock_);
idle_workers_stack_.Remove(worker);
}
......
......@@ -138,22 +138,20 @@ class BASE_EXPORT SchedulerWorkerPoolImpl : public SchedulerWorkerPool {
// PriorityQueue from which all threads of this worker pool get work.
PriorityQueue shared_priority_queue_;
// All workers owned by this worker pool. Initialized by Start() within the
// scope of |idle_workers_stack_lock_|. Never modified afterwards (i.e. can be
// read without synchronization once |workers_created_.IsSet()|).
std::vector<scoped_refptr<SchedulerWorker>> workers_;
// Suggested reclaim time for workers. Initialized by Start(). Never modified
// afterwards (i.e. can be read without synchronization once
// |workers_created_.IsSet()|).
TimeDelta suggested_reclaim_time_;
// Synchronizes access to |idle_workers_stack_|,
// |idle_workers_stack_cv_for_testing_| and |num_wake_ups_before_start_|. Has
// |shared_priority_queue_|'s lock as its predecessor so that a worker can be
// pushed to |idle_workers_stack_| within the scope of a Transaction (more
// details in GetWork()).
mutable SchedulerLock idle_workers_stack_lock_;
// Synchronizes accesses to |workers_|, |idle_workers_stack_|,
// |idle_workers_stack_cv_for_testing_| and
// |num_wake_ups_before_start_|. Has |shared_priority_queue_|'s lock as
// its predecessor so that a worker can be pushed to |idle_workers_stack_|
// within the scope of a Transaction (more details in GetWork()).
mutable SchedulerLock lock_;
// All workers owned by this worker pool.
std::vector<scoped_refptr<SchedulerWorker>> workers_;
// Stack of idle workers. Initially, all workers are on this stack. A worker
// is removed from the stack before its WakeUp() function is called and when
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment