Commit 8760a095 authored by Etienne Pierre-doray's avatar Etienne Pierre-doray Committed by Commit Bot

[TaskScheduler]: Distribute waking-up workers.

This CL distributes waking-up workers to avoid having 1 thread waking-up
many workers. This implies:
- Cap num_workers_to_wakeup to 2 in EnsureEnoughWorkersLockRequired.
- Call EnsureEnoughWorkersLockRequired from GetWork(), so
  it can wakeup more workers (which will, in turn wakeup other workers).

This is ground work for supporting TaskSource with concurrency > 1
and shouldn't have a big impact currently since it's rare
for EnsureEnoughWorkersLockRequired() to wakeup many workers.

RepeatedWillBlockDoesNotCreateTooManyWorkers had to be adapted;
The test didn't take into account the idle thread, and it seems
there was no code path maintaining the idle thread in this situation.
This CL changes this and an idle thread is now created, which the test
need to take into account.

TBR=fdoray@chromium.org

Change-Id: If2479b703ff6d85a0f11e73908ff1e09ed249c1a
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/1474462
Commit-Queue: Etienne Pierre-Doray <etiennep@chromium.org>
Reviewed-by: default avatarGabriel Charette <gab@chromium.org>
Cr-Commit-Position: refs/heads/master@{#638407}
parent 7568f395
...@@ -110,47 +110,11 @@ class LOCKABLE BASE_EXPORT Lock { ...@@ -110,47 +110,11 @@ class LOCKABLE BASE_EXPORT Lock {
}; };
// A helper class that acquires the given Lock while the AutoLock is in scope. // A helper class that acquires the given Lock while the AutoLock is in scope.
class SCOPED_LOCKABLE AutoLock { using AutoLock = internal::BasicAutoLock<Lock>;
public:
struct AlreadyAcquired {};
explicit AutoLock(Lock& lock) EXCLUSIVE_LOCK_FUNCTION(lock) : lock_(lock) {
lock_.Acquire();
}
AutoLock(Lock& lock, const AlreadyAcquired&) EXCLUSIVE_LOCKS_REQUIRED(lock)
: lock_(lock) {
lock_.AssertAcquired();
}
~AutoLock() UNLOCK_FUNCTION() {
lock_.AssertAcquired();
lock_.Release();
}
private:
Lock& lock_;
DISALLOW_COPY_AND_ASSIGN(AutoLock);
};
// AutoUnlock is a helper that will Release() the |lock| argument in the // AutoUnlock is a helper that will Release() the |lock| argument in the
// constructor, and re-Acquire() it in the destructor. // constructor, and re-Acquire() it in the destructor.
class AutoUnlock { using AutoUnlock = internal::BasicAutoUnlock<Lock>;
public:
explicit AutoUnlock(Lock& lock) : lock_(lock) {
// We require our caller to have the lock.
lock_.AssertAcquired();
lock_.Release();
}
~AutoUnlock() {
lock_.Acquire();
}
private:
Lock& lock_;
DISALLOW_COPY_AND_ASSIGN(AutoUnlock);
};
} // namespace base } // namespace base
......
...@@ -8,6 +8,7 @@ ...@@ -8,6 +8,7 @@
#include "base/base_export.h" #include "base/base_export.h"
#include "base/logging.h" #include "base/logging.h"
#include "base/macros.h" #include "base/macros.h"
#include "base/thread_annotations.h"
#include "build/build_config.h" #include "build/build_config.h"
#if defined(OS_WIN) #if defined(OS_WIN)
...@@ -72,6 +73,50 @@ void LockImpl::Unlock() { ...@@ -72,6 +73,50 @@ void LockImpl::Unlock() {
} }
#endif #endif
// This is an implementation used for AutoLock templated on the lock type.
template <class LockType>
class SCOPED_LOCKABLE BasicAutoLock {
public:
struct AlreadyAcquired {};
explicit BasicAutoLock(LockType& lock) EXCLUSIVE_LOCK_FUNCTION(lock)
: lock_(lock) {
lock_.Acquire();
}
BasicAutoLock(LockType& lock, const AlreadyAcquired&)
EXCLUSIVE_LOCKS_REQUIRED(lock)
: lock_(lock) {
lock_.AssertAcquired();
}
~BasicAutoLock() UNLOCK_FUNCTION() {
lock_.AssertAcquired();
lock_.Release();
}
private:
LockType& lock_;
DISALLOW_COPY_AND_ASSIGN(BasicAutoLock);
};
// This is an implementation used for AutoUnlock templated on the lock type.
template <class LockType>
class BasicAutoUnlock {
public:
explicit BasicAutoUnlock(LockType& lock) : lock_(lock) {
// We require our caller to have the lock.
lock_.AssertAcquired();
lock_.Release();
}
~BasicAutoUnlock() { lock_.Acquire(); }
private:
LockType& lock_;
DISALLOW_COPY_AND_ASSIGN(BasicAutoUnlock);
};
} // namespace internal } // namespace internal
} // namespace base } // namespace base
......
...@@ -79,23 +79,10 @@ class LOCKABLE SchedulerLock : public Lock { ...@@ -79,23 +79,10 @@ class LOCKABLE SchedulerLock : public Lock {
#endif // DCHECK_IS_ON() #endif // DCHECK_IS_ON()
// Provides the same functionality as base::AutoLock for SchedulerLock. // Provides the same functionality as base::AutoLock for SchedulerLock.
class SCOPED_LOCKABLE AutoSchedulerLock { using AutoSchedulerLock = internal::BasicAutoLock<SchedulerLock>;
public:
explicit AutoSchedulerLock(SchedulerLock& lock) EXCLUSIVE_LOCK_FUNCTION(lock)
: lock_(lock) {
lock_.Acquire();
}
~AutoSchedulerLock() UNLOCK_FUNCTION() {
lock_.AssertAcquired();
lock_.Release();
}
private: // Provides the same functionality as base::AutoUnlock for SchedulerLock.
SchedulerLock& lock_; using AutoSchedulerUnlock = internal::BasicAutoUnlock<SchedulerLock>;
DISALLOW_COPY_AND_ASSIGN(AutoSchedulerLock);
};
// Informs the clang thread safety analysis that an aliased lock is acquired. // Informs the clang thread safety analysis that an aliased lock is acquired.
// Because the clang thread safety analysis doesn't understand aliased locks // Because the clang thread safety analysis doesn't understand aliased locks
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
#include "base/location.h" #include "base/location.h"
#include "base/memory/ptr_util.h" #include "base/memory/ptr_util.h"
#include "base/metrics/histogram.h" #include "base/metrics/histogram.h"
#include "base/numerics/clamped_math.h"
#include "base/sequence_token.h" #include "base/sequence_token.h"
#include "base/strings/string_util.h" #include "base/strings/string_util.h"
#include "base/strings/stringprintf.h" #include "base/strings/stringprintf.h"
...@@ -100,20 +101,7 @@ class SchedulerWorkerPoolImpl::SchedulerWorkerActionExecutor { ...@@ -100,20 +101,7 @@ class SchedulerWorkerPoolImpl::SchedulerWorkerActionExecutor {
SchedulerWorkerActionExecutor(SchedulerWorkerPoolImpl* outer) SchedulerWorkerActionExecutor(SchedulerWorkerPoolImpl* outer)
: outer_(outer) {} : outer_(outer) {}
~SchedulerWorkerActionExecutor() { ~SchedulerWorkerActionExecutor() { FlushImpl(); }
SchedulerLock::AssertNoLockHeldOnCurrentThread();
// Wake up workers.
workers_to_wake_up_.ForEachWorker(
[](SchedulerWorker* worker) { worker->WakeUp(); });
// Start workers. Happens after wake ups to prevent the case where a worker
// enters its main function, is descheduled because it wasn't woken up yet,
// and is woken up immediately after.
workers_to_start_.ForEachWorker([&](SchedulerWorker* worker) {
worker->Start(outer_->after_start().scheduler_worker_observer);
});
}
void ScheduleWakeUp(scoped_refptr<SchedulerWorker> worker) { void ScheduleWakeUp(scoped_refptr<SchedulerWorker> worker) {
workers_to_wake_up_.AddWorker(std::move(worker)); workers_to_wake_up_.AddWorker(std::move(worker));
...@@ -123,6 +111,15 @@ class SchedulerWorkerPoolImpl::SchedulerWorkerActionExecutor { ...@@ -123,6 +111,15 @@ class SchedulerWorkerPoolImpl::SchedulerWorkerActionExecutor {
workers_to_start_.AddWorker(std::move(worker)); workers_to_start_.AddWorker(std::move(worker));
} }
void Flush(SchedulerLock* held_lock) {
if (workers_to_wake_up_.empty() && workers_to_start_.empty())
return;
AutoSchedulerUnlock auto_unlock(*held_lock);
FlushImpl();
workers_to_wake_up_.clear();
workers_to_start_.clear();
}
private: private:
class WorkerContainer { class WorkerContainer {
public: public:
...@@ -148,6 +145,13 @@ class SchedulerWorkerPoolImpl::SchedulerWorkerActionExecutor { ...@@ -148,6 +145,13 @@ class SchedulerWorkerPoolImpl::SchedulerWorkerActionExecutor {
} }
} }
bool empty() const { return first_worker_ == nullptr; }
void clear() {
first_worker_.reset();
additional_workers_.clear();
}
private: private:
// The purpose of |first_worker| is to avoid a heap allocation by the vector // The purpose of |first_worker| is to avoid a heap allocation by the vector
// in the case where there is only one worker in the container. // in the case where there is only one worker in the container.
...@@ -157,6 +161,21 @@ class SchedulerWorkerPoolImpl::SchedulerWorkerActionExecutor { ...@@ -157,6 +161,21 @@ class SchedulerWorkerPoolImpl::SchedulerWorkerActionExecutor {
DISALLOW_COPY_AND_ASSIGN(WorkerContainer); DISALLOW_COPY_AND_ASSIGN(WorkerContainer);
}; };
void FlushImpl() {
SchedulerLock::AssertNoLockHeldOnCurrentThread();
// Wake up workers.
workers_to_wake_up_.ForEachWorker(
[](SchedulerWorker* worker) { worker->WakeUp(); });
// Start workers. Happens after wake ups to prevent the case where a worker
// enters its main function, is descheduled because it wasn't woken up yet,
// and is woken up immediately after.
workers_to_start_.ForEachWorker([&](SchedulerWorker* worker) {
worker->Start(outer_->after_start().scheduler_worker_observer);
});
}
SchedulerWorkerPoolImpl* const outer_; SchedulerWorkerPoolImpl* const outer_;
WorkerContainer workers_to_wake_up_; WorkerContainer workers_to_wake_up_;
...@@ -370,7 +389,7 @@ void SchedulerWorkerPoolImpl::Start( ...@@ -370,7 +389,7 @@ void SchedulerWorkerPoolImpl::Start(
DCHECK(workers_.empty()); DCHECK(workers_.empty());
in_start().may_block_without_delay_ = in_start().may_block_without_delay =
FeatureList::IsEnabled(kMayBlockWithoutDelay); FeatureList::IsEnabled(kMayBlockWithoutDelay);
in_start().may_block_threshold = in_start().may_block_threshold =
may_block_threshold ? may_block_threshold.value() may_block_threshold ? may_block_threshold.value()
...@@ -603,6 +622,12 @@ SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::GetWork( ...@@ -603,6 +622,12 @@ SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::GetWork(
DCHECK(ContainsWorker(outer_->workers_, worker)); DCHECK(ContainsWorker(outer_->workers_, worker));
// Use this opportunity, before assigning work to this worker, to create/wake
// additional workers if needed (doing this here allows us to reduce
// potentially expensive create/wake directly on PostTask()).
outer_->EnsureEnoughWorkersLockRequired(&executor);
executor.Flush(&outer_->lock_);
if (!CanGetWorkLockRequired(worker)) if (!CanGetWorkLockRequired(worker))
return nullptr; return nullptr;
...@@ -622,9 +647,6 @@ SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::GetWork( ...@@ -622,9 +647,6 @@ SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::GetWork(
return nullptr; return nullptr;
} }
// Replace this worker if it was the last one, capacity permitting.
outer_->MaintainAtLeastOneIdleWorkerLockRequired(&executor);
// Running task bookkeeping. // Running task bookkeeping.
worker_only().is_running_task = true; worker_only().is_running_task = true;
++outer_->num_running_tasks_; ++outer_->num_running_tasks_;
...@@ -833,7 +855,7 @@ void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::BlockingStarted( ...@@ -833,7 +855,7 @@ void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::BlockingStarted(
DCHECK(worker_only().is_running_task); DCHECK(worker_only().is_running_task);
// MayBlock with no delay reuses WillBlock implementation. // MayBlock with no delay reuses WillBlock implementation.
if (outer_->after_start().may_block_without_delay_) if (outer_->after_start().may_block_without_delay)
blocking_type = BlockingType::WILL_BLOCK; blocking_type = BlockingType::WILL_BLOCK;
switch (blocking_type) { switch (blocking_type) {
...@@ -853,7 +875,7 @@ void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: ...@@ -853,7 +875,7 @@ void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::
// The blocking type always being WILL_BLOCK in this experiment, it should // The blocking type always being WILL_BLOCK in this experiment, it should
// never be considered "upgraded". // never be considered "upgraded".
if (outer_->after_start().may_block_without_delay_) if (outer_->after_start().may_block_without_delay)
return; return;
{ {
...@@ -1063,27 +1085,31 @@ size_t SchedulerWorkerPoolImpl::GetDesiredNumAwakeWorkersLockRequired() const { ...@@ -1063,27 +1085,31 @@ size_t SchedulerWorkerPoolImpl::GetDesiredNumAwakeWorkersLockRequired() const {
void SchedulerWorkerPoolImpl::EnsureEnoughWorkersLockRequired( void SchedulerWorkerPoolImpl::EnsureEnoughWorkersLockRequired(
SchedulerWorkerActionExecutor* executor) { SchedulerWorkerActionExecutor* executor) {
// Don't do anything if the pool isn't started.
if (max_tasks_ == 0)
return;
const size_t desired_num_awake_workers = const size_t desired_num_awake_workers =
GetDesiredNumAwakeWorkersLockRequired(); GetDesiredNumAwakeWorkersLockRequired();
workers_.reserve(desired_num_awake_workers); const size_t num_awake_workers = GetNumAwakeWorkersLockRequired();
size_t num_workers_to_wake_up =
ClampSub(desired_num_awake_workers, num_awake_workers);
num_workers_to_wake_up = std::min(num_workers_to_wake_up, size_t(2U));
// Wake up the appropriate number of workers. // Wake up the appropriate number of workers.
for (size_t i = GetNumAwakeWorkersLockRequired(); for (size_t i = 0; i < num_workers_to_wake_up; ++i) {
i < desired_num_awake_workers; ++i) {
MaintainAtLeastOneIdleWorkerLockRequired(executor); MaintainAtLeastOneIdleWorkerLockRequired(executor);
SchedulerWorker* worker_to_wakeup = idle_workers_stack_.Pop(); SchedulerWorker* worker_to_wakeup = idle_workers_stack_.Pop();
DCHECK(worker_to_wakeup); DCHECK(worker_to_wakeup);
executor->ScheduleWakeUp(worker_to_wakeup); executor->ScheduleWakeUp(worker_to_wakeup);
} }
// If no worker is about to call MaintainAtLeastOneIdleWorkerLockRequired(), // In the case where the loop above didn't wake up any worker and we don't
// call it here. This is useful in the case where the loop above didn't wake // have excess workers, the idle worker should be maintained. This happens
// up any worker but a recent increase in |max_tasks| now makes it possible to // when called from the last worker awake, or a recent increase in |max_tasks|
// keep an idle worker. // now makes it possible to keep an idle worker.
DCHECK_GE(GetNumAwakeWorkersLockRequired(), num_running_tasks_); if (desired_num_awake_workers == num_awake_workers)
const size_t num_awake_workers_not_running_task =
GetNumAwakeWorkersLockRequired() - num_running_tasks_;
if (num_awake_workers_not_running_task == 0)
MaintainAtLeastOneIdleWorkerLockRequired(executor); MaintainAtLeastOneIdleWorkerLockRequired(executor);
} }
......
...@@ -259,7 +259,7 @@ class BASE_EXPORT SchedulerWorkerPoolImpl : public SchedulerWorkerPool { ...@@ -259,7 +259,7 @@ class BASE_EXPORT SchedulerWorkerPoolImpl : public SchedulerWorkerPool {
// Optional observer notified when a worker enters and exits its main. // Optional observer notified when a worker enters and exits its main.
SchedulerWorkerObserver* scheduler_worker_observer = nullptr; SchedulerWorkerObserver* scheduler_worker_observer = nullptr;
bool may_block_without_delay_; bool may_block_without_delay;
// Threshold after which the max tasks is increased to compensate for a // Threshold after which the max tasks is increased to compensate for a
// worker that is within a MAY_BLOCK ScopedBlockingCall. // worker that is within a MAY_BLOCK ScopedBlockingCall.
......
...@@ -454,22 +454,42 @@ TEST_F(TaskSchedulerWorkerPoolImplStartInBodyTest, PostTasksBeforeStart) { ...@@ -454,22 +454,42 @@ TEST_F(TaskSchedulerWorkerPoolImplStartInBodyTest, PostTasksBeforeStart) {
} }
// Verify that posting many tasks before Start will cause the number of workers // Verify that posting many tasks before Start will cause the number of workers
// to grow to |max_tasks_| during Start. // to grow to |max_tasks_| after Start.
TEST_F(TaskSchedulerWorkerPoolImplStartInBodyTest, PostManyTasks) { TEST_F(TaskSchedulerWorkerPoolImplStartInBodyTest, PostManyTasks) {
scoped_refptr<TaskRunner> task_runner = test::CreateTaskRunnerWithTraits( scoped_refptr<TaskRunner> task_runner = test::CreateTaskRunnerWithTraits(
{WithBaseSyncPrimitives()}, &mock_scheduler_task_runner_delegate_); {WithBaseSyncPrimitives()}, &mock_scheduler_task_runner_delegate_);
constexpr size_t kNumTasksPosted = 2 * kMaxTasks; constexpr size_t kNumTasksPosted = 2 * kMaxTasks;
for (size_t i = 0; i < kNumTasksPosted; ++i)
WaitableEvent threads_running;
WaitableEvent threads_continue;
RepeatingClosure threads_running_barrier = BarrierClosure(
kMaxTasks,
BindOnce(&WaitableEvent::Signal, Unretained(&threads_running)));
// Posting these tasks should cause new workers to be created.
for (size_t i = 0; i < kMaxTasks; ++i) {
task_runner->PostTask(
FROM_HERE, BindLambdaForTesting([&]() {
threads_running_barrier.Run();
test::WaitWithoutBlockingObserver(&threads_continue);
}));
}
// Post the remaining |kNumTasksPosted - kMaxTasks| tasks, don't wait for them
// as they'll be blocked behind the above kMaxtasks.
for (size_t i = kMaxTasks; i < kNumTasksPosted; ++i)
task_runner->PostTask(FROM_HERE, DoNothing()); task_runner->PostTask(FROM_HERE, DoNothing());
EXPECT_EQ(0U, worker_pool_->NumberOfWorkersForTesting()); EXPECT_EQ(0U, worker_pool_->NumberOfWorkersForTesting());
StartWorkerPool(TimeDelta::Max(), kMaxTasks); StartWorkerPool(TimeDelta::Max(), kMaxTasks);
ASSERT_GT(kNumTasksPosted, worker_pool_->GetMaxTasksForTesting()); EXPECT_GT(worker_pool_->NumberOfWorkersForTesting(), 0U);
EXPECT_EQ(kMaxTasks, worker_pool_->GetMaxTasksForTesting()); EXPECT_EQ(kMaxTasks, worker_pool_->GetMaxTasksForTesting());
threads_running.Wait();
EXPECT_EQ(worker_pool_->NumberOfWorkersForTesting(), EXPECT_EQ(worker_pool_->NumberOfWorkersForTesting(),
worker_pool_->GetMaxTasksForTesting()); worker_pool_->GetMaxTasksForTesting());
threads_continue.Signal();
task_tracker_.FlushForTesting();
} }
namespace { namespace {
...@@ -1698,7 +1718,7 @@ TEST_F(TaskSchedulerWorkerPoolImplStartInBodyTest, ...@@ -1698,7 +1718,7 @@ TEST_F(TaskSchedulerWorkerPoolImplStartInBodyTest,
// to accommodate queued and running sequences. // to accommodate queued and running sequences.
ScopedBlockingCall scoped_blocking_call(FROM_HERE, ScopedBlockingCall scoped_blocking_call(FROM_HERE,
BlockingType::WILL_BLOCK); BlockingType::WILL_BLOCK);
EXPECT_EQ(kNumWorkers, worker_pool_->NumberOfWorkersForTesting()); EXPECT_LE(kNumWorkers + 1, worker_pool_->NumberOfWorkersForTesting());
} }
worker_observer.UnblockWorkers(); worker_observer.UnblockWorkers();
...@@ -1706,7 +1726,7 @@ TEST_F(TaskSchedulerWorkerPoolImplStartInBodyTest, ...@@ -1706,7 +1726,7 @@ TEST_F(TaskSchedulerWorkerPoolImplStartInBodyTest,
runner->PostTask(FROM_HERE, BindLambdaForTesting([&]() { runner->PostTask(FROM_HERE, BindLambdaForTesting([&]() {
EXPECT_LE(worker_pool_->NumberOfWorkersForTesting(), EXPECT_LE(worker_pool_->NumberOfWorkersForTesting(),
kNumWorkers); kNumWorkers + 1);
})); }));
hold_will_block_task.Signal(); hold_will_block_task.Signal();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment