Commit 611b9293 authored by Francois Doray, committed by Commit Bot

TaskScheduler: Create threads outside the scope of the pool lock.

Hang reports show that there can be contention on the pool lock when
a thread is created in a scope where it is held. To avoid this
contention, this CL ensures that no lock is held when
SchedulerWorker::Start() is called.
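
A minimal standalone sketch of the pattern (illustrative C++, not the
actual Chromium classes: Pool and Worker stand in for
SchedulerWorkerPoolImpl and SchedulerWorker, and std::mutex stands in
for SchedulerLock). The worker is registered while the lock is held,
but its thread is created only after the lock is released:

  #include <memory>
  #include <mutex>
  #include <thread>
  #include <vector>

  class Worker {
   public:
    // Creating the underlying platform thread can be slow; keep it out
    // of any critical section.
    void Start() { thread_ = std::thread([] { /* run tasks */ }); }
    void Join() {
      if (thread_.joinable())
        thread_.join();
    }

   private:
    std::thread thread_;
  };

  class Pool {
   public:
    void AddWorker() {
      std::shared_ptr<Worker> worker_to_start;
      {
        std::lock_guard<std::mutex> auto_lock(lock_);
        workers_.push_back(std::make_shared<Worker>());  // Needs the lock.
        worker_to_start = workers_.back();
      }
      // Thread creation happens with no lock held, so other threads
      // never contend on |lock_| while the OS creates the thread.
      worker_to_start->Start();
    }

   private:
    std::mutex lock_;
    std::vector<std::shared_ptr<Worker>> workers_;
  };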

Bug: 905288
Change-Id: I4158fb061e6c0ec0de820bd061c1cc5b2d309e6b
Reviewed-on: https://chromium-review.googlesource.com/c/1355427
Reviewed-by: Etienne Pierre-Doray <etiennep@chromium.org>
Commit-Queue: François Doray <fdoray@chromium.org>
Cr-Commit-Position: refs/heads/master@{#612809}
parent 98ba5fa0
--- a/base/task/task_scheduler/scheduler_lock.h
+++ b/base/task/task_scheduler/scheduler_lock.h
@@ -69,6 +69,7 @@ class SchedulerLock : public Lock {
   SchedulerLock() = default;
   explicit SchedulerLock(const SchedulerLock*) {}
   explicit SchedulerLock(UniversalPredecessor) {}
+  static void AssertNoLockHeldOnCurrentThread() {}
 
   std::unique_ptr<ConditionVariable> CreateConditionVariable() {
     return std::unique_ptr<ConditionVariable>(new ConditionVariable(this));
--- a/base/task/task_scheduler/scheduler_lock_impl.cc
+++ b/base/task/task_scheduler/scheduler_lock_impl.cc
@@ -11,6 +11,7 @@
 #include "base/lazy_instance.h"
 #include "base/logging.h"
 #include "base/synchronization/condition_variable.h"
+#include "base/task/task_scheduler/scheduler_lock.h"
 #include "base/threading/platform_thread.h"
 #include "base/threading/thread_local_storage.h"
@@ -49,6 +50,10 @@ class SafeAcquisitionTracker {
     acquired_locks->erase(iter_at_lock);
   }
 
+  void AssertNoLockHeldOnCurrentThread() {
+    DCHECK(GetAcquiredLocksOnCurrentThread()->empty());
+  }
+
  private:
   using LockVector = std::vector<const SchedulerLockImpl*>;
   using PredecessorMap =
@@ -143,6 +148,10 @@ SchedulerLockImpl::~SchedulerLockImpl() {
   g_safe_acquisition_tracker.Get().UnregisterLock(this);
 }
 
+void SchedulerLockImpl::AssertNoLockHeldOnCurrentThread() {
+  g_safe_acquisition_tracker.Get().AssertNoLockHeldOnCurrentThread();
+}
+
 void SchedulerLockImpl::Acquire() {
   lock_.Acquire();
   g_safe_acquisition_tracker.Get().RecordAcquisition(this);
--- a/base/task/task_scheduler/scheduler_lock_impl.h
+++ b/base/task/task_scheduler/scheduler_lock_impl.h
@@ -30,6 +30,8 @@ class BASE_EXPORT SchedulerLockImpl {
   explicit SchedulerLockImpl(UniversalPredecessor);
   ~SchedulerLockImpl();
 
+  static void AssertNoLockHeldOnCurrentThread();
+
   void Acquire();
   void Release();
--- a/base/task/task_scheduler/scheduler_lock_unittest.cc
+++ b/base/task/task_scheduler/scheduler_lock_unittest.cc
@@ -342,6 +342,18 @@ TEST(TaskSchedulerLock, AcquireUniversalPredecessorAfterUniversalPredecessor) {
   });
 }
 
+TEST(TaskSchedulerLock, AssertNoLockHeldOnCurrentThread) {
+  // AssertNoLockHeldOnCurrentThread() shouldn't fail when no lock is acquired.
+  SchedulerLock::AssertNoLockHeldOnCurrentThread();
+
+  // AssertNoLockHeldOnCurrentThread() should fail when a lock is acquired.
+  SchedulerLock lock;
+  {
+    AutoSchedulerLock auto_lock(lock);
+    EXPECT_DCHECK_DEATH({ SchedulerLock::AssertNoLockHeldOnCurrentThread(); });
+  }
+}
+
 }  // namespace
 }  // namespace internal
 }  // namespace base
--- a/base/task/task_scheduler/scheduler_worker.cc
+++ b/base/task/task_scheduler/scheduler_worker.cc
@@ -63,10 +63,11 @@ SchedulerWorker::SchedulerWorker(
 
 bool SchedulerWorker::Start(
     SchedulerWorkerObserver* scheduler_worker_observer) {
+  SchedulerLock::AssertNoLockHeldOnCurrentThread();
   AutoSchedulerLock auto_lock(thread_lock_);
   DCHECK(thread_handle_.is_null());
 
-  if (should_exit_.IsSet())
+  if (should_exit_.IsSet() || join_called_for_testing_.IsSet())
     return true;
 
   DCHECK(!scheduler_worker_observer_);
@@ -103,7 +104,10 @@ void SchedulerWorker::JoinForTesting() {
   {
     AutoSchedulerLock auto_lock(thread_lock_);
-    DCHECK(!thread_handle_.is_null());
+
+    if (thread_handle_.is_null())
+      return;
+
     thread_handle = thread_handle_;
     // Reset |thread_handle_| so it isn't joined by the destructor.
     thread_handle_ = PlatformThreadHandle();
--- a/base/task/task_scheduler/scheduler_worker_pool_impl.cc
+++ b/base/task/task_scheduler/scheduler_worker_pool_impl.cc
@@ -64,6 +64,42 @@ bool ContainsWorker(const std::vector<scoped_refptr<SchedulerWorker>>& workers,
 
 }  // namespace
 
+// Accumulates workers and starts them on destruction. Useful to ensure that
+// workers are started after a lock is released.
+class SchedulerWorkerPoolImpl::SchedulerWorkerStarter {
+ public:
+  SchedulerWorkerStarter(TrackedRef<SchedulerWorkerPoolImpl> outer)
+      : outer_(outer) {}
+
+  ~SchedulerWorkerStarter() {
+    if (worker_to_start_) {
+      worker_to_start_->Start(outer_->scheduler_worker_observer_);
+      for (auto& worker_to_start : additional_workers_to_start_)
+        worker_to_start->Start(outer_->scheduler_worker_observer_);
+    } else {
+      DCHECK(additional_workers_to_start_.empty());
+    }
+  }
+
+  void ScheduleStart(scoped_refptr<SchedulerWorker> worker) {
+    if (!worker)
+      return;
+    if (!worker_to_start_)
+      worker_to_start_ = std::move(worker);
+    else
+      additional_workers_to_start_.push_back(std::move(worker));
+  }
+
+ private:
+  const TrackedRef<SchedulerWorkerPoolImpl> outer_;
+
+  // The purpose of |worker_to_start_| is to avoid a heap allocation for the
+  // vector in the case where there is only one worker to start.
+  scoped_refptr<SchedulerWorker> worker_to_start_;
+  std::vector<scoped_refptr<SchedulerWorker>> additional_workers_to_start_;
+
+  DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerStarter);
+};
+
 class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl
     : public SchedulerWorker::Delegate,
       public BlockingObserver {
@@ -221,6 +257,8 @@ void SchedulerWorkerPoolImpl::Start(
     scoped_refptr<TaskRunner> service_thread_task_runner,
     SchedulerWorkerObserver* scheduler_worker_observer,
     WorkerEnvironment worker_environment) {
+  SchedulerWorkerStarter starter(tracked_ref_factory_.GetTrackedRef());
+
   AutoSchedulerLock auto_lock(lock_);
 
   may_block_threshold_ =
@@ -251,20 +289,17 @@ void SchedulerWorkerPoolImpl::Start(
   workers_.reserve(num_initial_workers);
 
   for (int index = 0; index < num_initial_workers; ++index) {
-    SchedulerWorker* worker =
-        CreateRegisterAndStartSchedulerWorkerLockRequired();
+    scoped_refptr<SchedulerWorker> worker =
+        CreateAndRegisterWorkerLockRequired();
+    DCHECK(worker);
 
-    // CHECK that the first worker can be started (assume that failure means
-    // that threads can't be created on this machine).
-    CHECK(worker || index > 0);
+    if (index < num_wake_ups_before_start_)
+      worker->WakeUp();
+    else
+      idle_workers_stack_.Push(worker.get());
 
-    if (worker) {
-      if (index < num_wake_ups_before_start_) {
-        worker->WakeUp();
-      } else {
-        idle_workers_stack_.Push(worker);
-      }
-    }
+    // SchedulerWorker::Start() will happen after the lock is released.
+    starter.ScheduleStart(std::move(worker));
   }
 }
@@ -481,6 +516,8 @@ SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::GetWork(
   DCHECK(!is_running_task_);
   DCHECK(!is_running_best_effort_task_);
 
+  SchedulerWorkerStarter starter(outer_);
+
   {
     AutoSchedulerLock auto_lock(outer_->lock_);
@@ -504,7 +541,7 @@ SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::GetWork(
     }
 
     // Replace this worker if it was the last one, capacity permitting.
-    outer_->MaintainAtLeastOneIdleWorkerLockRequired();
+    starter.ScheduleStart(outer_->MaintainAtLeastOneIdleWorkerLockRequired());
 
     // Excess workers should not get work, until they are no longer excess (i.e.
     // max tasks increases or another worker cleans up). This ensures that if we
@@ -783,6 +820,8 @@ void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::WillBlockEntered() {
   DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
 
   bool wake_up_allowed = false;
+
+  SchedulerWorkerStarter starter(outer_);
   {
     std::unique_ptr<PriorityQueue::Transaction> transaction(
         outer_->shared_priority_queue_.BeginTransaction());
@@ -800,15 +839,12 @@ void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::WillBlockEntered() {
       return;
 
     if (transaction->IsEmpty()) {
-      outer_->MaintainAtLeastOneIdleWorkerLockRequired();
+      starter.ScheduleStart(outer_->MaintainAtLeastOneIdleWorkerLockRequired());
     } else {
       // TODO(crbug.com/757897): We may create extra workers in this case:
       // |workers.size()| was equal to the old |max_tasks_|, we had multiple
      // ScopedBlockingCalls in parallel and we had work on the PQ.
-      wake_up_allowed = outer_->WakeUpOneWorkerLockRequired();
-      // |wake_up_allowed| is true when the pool is started, and a WILL_BLOCK
-      // scope cannot be entered before the pool starts.
-      DCHECK(wake_up_allowed);
+      starter.ScheduleStart(outer_->WakeUpOneWorkerLockRequired());
     }
   }
   // TODO(crbug.com/813857): This can be better handled in the PostTask()
@@ -850,52 +886,58 @@ void SchedulerWorkerPoolImpl::WaitForWorkersIdleLockRequiredForTesting(
     idle_workers_stack_cv_for_testing_->Wait();
 }
 
-bool SchedulerWorkerPoolImpl::WakeUpOneWorkerLockRequired() {
+scoped_refptr<SchedulerWorker>
+SchedulerWorkerPoolImpl::WakeUpOneWorkerLockRequired() {
   lock_.AssertAcquired();
 
   if (workers_.empty()) {
     ++num_wake_ups_before_start_;
-    return false;
+    return nullptr;
   }
 
   // Ensure that there is one worker that can run tasks on top of the idle
   // stack, capacity permitting.
-  MaintainAtLeastOneIdleWorkerLockRequired();
+  scoped_refptr<SchedulerWorker> worker_to_start =
+      MaintainAtLeastOneIdleWorkerLockRequired();
 
   // If the worker on top of the idle stack can run tasks, wake it up.
   if (NumberOfExcessWorkersLockRequired() < idle_workers_stack_.Size()) {
-    SchedulerWorker* worker = idle_workers_stack_.Pop();
-    if (worker) {
-      worker->WakeUp();
-    }
+    SchedulerWorker* worker_to_wakeup = idle_workers_stack_.Pop();
+    DCHECK(!worker_to_start || worker_to_start == worker_to_wakeup);
+    worker_to_wakeup->WakeUp();
   }
 
-  return true;
+  return worker_to_start;
 }
 
 void SchedulerWorkerPoolImpl::WakeUpOneWorker() {
-  bool wake_up_allowed;
+  SchedulerWorkerStarter starter(tracked_ref_factory_.GetTrackedRef());
   {
     AutoSchedulerLock auto_lock(lock_);
-    wake_up_allowed = WakeUpOneWorkerLockRequired();
+    starter.ScheduleStart(WakeUpOneWorkerLockRequired());
   }
-  if (wake_up_allowed)
-    ScheduleAdjustMaxTasksIfNeeded();
+  ScheduleAdjustMaxTasksIfNeeded();
 }
 
-void SchedulerWorkerPoolImpl::MaintainAtLeastOneIdleWorkerLockRequired() {
+scoped_refptr<SchedulerWorker>
+SchedulerWorkerPoolImpl::MaintainAtLeastOneIdleWorkerLockRequired() {
   lock_.AssertAcquired();
 
   if (workers_.size() == kMaxNumberOfWorkers)
-    return;
+    return nullptr;
   DCHECK_LT(workers_.size(), kMaxNumberOfWorkers);
 
-  if (idle_workers_stack_.IsEmpty() && workers_.size() < max_tasks_) {
-    SchedulerWorker* new_worker =
-        CreateRegisterAndStartSchedulerWorkerLockRequired();
-    if (new_worker)
-      idle_workers_stack_.Push(new_worker);
-  }
+  if (!idle_workers_stack_.IsEmpty())
+    return nullptr;
+
+  if (workers_.size() >= max_tasks_)
+    return nullptr;
+
+  scoped_refptr<SchedulerWorker> new_worker =
+      CreateAndRegisterWorkerLockRequired();
+  DCHECK(new_worker);
+  idle_workers_stack_.Push(new_worker.get());
+  return new_worker;
 }
 
 void SchedulerWorkerPoolImpl::AddToIdleWorkersStackLockRequired(
@@ -916,8 +958,8 @@ void SchedulerWorkerPoolImpl::RemoveFromIdleWorkersStackLockRequired(
   idle_workers_stack_.Remove(worker);
 }
 
-SchedulerWorker*
-SchedulerWorkerPoolImpl::CreateRegisterAndStartSchedulerWorkerLockRequired() {
+scoped_refptr<SchedulerWorker>
+SchedulerWorkerPoolImpl::CreateAndRegisterWorkerLockRequired() {
   lock_.AssertAcquired();
   DCHECK_LT(workers_.size(), max_tasks_);
@@ -931,9 +973,6 @@ SchedulerWorkerPoolImpl::CreateRegisterAndStartSchedulerWorkerLockRequired() {
           tracked_ref_factory_.GetTrackedRef()),
       task_tracker_, &lock_, backward_compatibility_);
 
-  if (!worker->Start(scheduler_worker_observer_))
-    return nullptr;
-
   workers_.push_back(worker);
   DCHECK_LE(workers_.size(), max_tasks_);
@@ -942,7 +981,7 @@ SchedulerWorkerPoolImpl::CreateRegisterAndStartSchedulerWorkerLockRequired() {
         cleanup_timestamps_.top());
     cleanup_timestamps_.pop();
   }
-  return worker.get();
+  return worker;
 }
 
 size_t SchedulerWorkerPoolImpl::NumberOfExcessWorkersLockRequired() const {
@@ -953,6 +992,7 @@ size_t SchedulerWorkerPoolImpl::NumberOfExcessWorkersLockRequired() const {
 
 void SchedulerWorkerPoolImpl::AdjustMaxTasks() {
   DCHECK(service_thread_task_runner_->RunsTasksInCurrentSequence());
+  SchedulerWorkerStarter starter(tracked_ref_factory_.GetTrackedRef());
 
   std::unique_ptr<PriorityQueue::Transaction> transaction(
       shared_priority_queue_.BeginTransaction());
   AutoSchedulerLock auto_lock(lock_);
@@ -980,10 +1020,10 @@ void SchedulerWorkerPoolImpl::AdjustMaxTasks() {
   for (size_t i = 0; i < num_wake_ups_needed; ++i) {
     // No need to call ScheduleAdjustMaxTasksIfNeeded() as the caller will
     // take care of that for us.
-    WakeUpOneWorkerLockRequired();
+    starter.ScheduleStart(WakeUpOneWorkerLockRequired());
   }
 
-  MaintainAtLeastOneIdleWorkerLockRequired();
+  starter.ScheduleStart(MaintainAtLeastOneIdleWorkerLockRequired());
 }
 
 TimeDelta SchedulerWorkerPoolImpl::MayBlockThreshold() const {
--- a/base/task/task_scheduler/scheduler_worker_pool_impl.h
+++ b/base/task/task_scheduler/scheduler_worker_pool_impl.h
@@ -12,6 +12,7 @@
 #include <vector>
 
 #include "base/base_export.h"
+#include "base/compiler_specific.h"
 #include "base/containers/stack.h"
 #include "base/logging.h"
 #include "base/macros.h"
@@ -158,6 +159,7 @@ class BASE_EXPORT SchedulerWorkerPoolImpl : public SchedulerWorkerPool {
   bool RemoveSequence(scoped_refptr<Sequence> sequence);
 
  private:
+  class SchedulerWorkerStarter;
   class SchedulerWorkerDelegateImpl;
 
   // Friend tests so that they can access |kBlockedWorkersPollPeriod| and
@@ -180,16 +182,21 @@ class BASE_EXPORT SchedulerWorkerPoolImpl : public SchedulerWorkerPool {
   void WaitForWorkersIdleLockRequiredForTesting(size_t n);
 
   // Wakes up the last worker from this worker pool to go idle, if any.
+  // Otherwise, creates and starts a worker, if permitted, and wakes it up.
   void WakeUpOneWorker();
 
-  // Performs the same action as WakeUpOneWorker() except asserts |lock_| is
-  // acquired rather than acquires it and returns true if worker wakeups are
-  // permitted.
-  bool WakeUpOneWorkerLockRequired();
+  // Performs the same action as WakeUpOneWorker(), except:
+  // - Asserts |lock_| is acquired rather than acquires it.
+  // - Instead of starting a worker it creates, returns it and expects that the
+  //   caller will start it once |lock_| is released.
+  scoped_refptr<SchedulerWorker> WakeUpOneWorkerLockRequired()
+      WARN_UNUSED_RESULT;
 
-  // Adds a worker, if needed, to maintain one idle worker, |max_tasks_|
-  // permitting.
-  void MaintainAtLeastOneIdleWorkerLockRequired();
+  // Creates a worker, if needed, to maintain one idle worker, |max_tasks_|
+  // permitting. Expects the caller to start the returned worker once |lock_|
+  // is released.
+  scoped_refptr<SchedulerWorker> MaintainAtLeastOneIdleWorkerLockRequired()
+      WARN_UNUSED_RESULT;
 
   // Adds |worker| to |idle_workers_stack_|.
   void AddToIdleWorkersStackLockRequired(SchedulerWorker* worker);
@@ -200,10 +207,11 @@ class BASE_EXPORT SchedulerWorkerPoolImpl : public SchedulerWorkerPool {
   // Returns true if worker cleanup is permitted.
   bool CanWorkerCleanupForTestingLockRequired();
 
-  // Tries to add a new SchedulerWorker to the pool. Returns the new
-  // SchedulerWorker on success, nullptr otherwise. Cannot be called before
-  // Start(). Must be called under the protection of |lock_|.
-  SchedulerWorker* CreateRegisterAndStartSchedulerWorkerLockRequired();
+  // Creates a worker, adds it to the pool and returns it. Expects the caller
+  // to start the returned worker. Cannot be called before Start(). Must be
+  // called under the protection of |lock_|.
+  scoped_refptr<SchedulerWorker> CreateAndRegisterWorkerLockRequired()
+      WARN_UNUSED_RESULT;
 
   // Returns the number of workers in the pool that should not run tasks due to
   // the pool being over capacity.
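
A note on the WARN_UNUSED_RESULT annotations added above: they make the
compiler warn when a caller ignores the returned worker, which would
otherwise silently drop the only reference before the worker is ever
started. Below is a minimal sketch of the idiom with a simplified macro
definition (Chromium's real one lives in base/compiler_specific.h,
which is why that include was added to scheduler_worker_pool_impl.h):

  // Simplified stand-in for the macro from base/compiler_specific.h.
  #if defined(__GNUC__) || defined(__clang__)
  #define WARN_UNUSED_RESULT __attribute__((warn_unused_result))
  #else
  #define WARN_UNUSED_RESULT
  #endif

  class Worker {};

  // A caller that discards this return value gets a compiler warning,
  // e.g. clang's "ignoring return value of function declared with
  // 'warn_unused_result' attribute".
  Worker* CreateWorker() WARN_UNUSED_RESULT;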