Commit bcab5d9d authored by Jesse McKenna's avatar Jesse McKenna Committed by Commit Bot

TaskScheduler: Delegate reinsertion of Sequences in SchedulerWorkerPool...

TaskScheduler: Delegate reinsertion of Sequences in SchedulerWorkerPool priority queue to TaskScheduler

Bug: 889029
Change-Id: I424024245e74c0c08d76a94263c94fad7a3631cc
Reviewed-on: https://chromium-review.googlesource.com/c/1271794
Commit-Queue: Jesse McKenna <jessemckenna@google.com>
Reviewed-by: François Doray <fdoray@chromium.org>
Cr-Commit-Position: refs/heads/master@{#601615}
parent 54fcb574
......@@ -11,8 +11,11 @@ namespace internal {
PlatformNativeWorkerPoolWin::PlatformNativeWorkerPoolWin(
TrackedRef<TaskTracker> task_tracker,
DelayedTaskManager* delayed_task_manager)
: SchedulerWorkerPool(task_tracker, delayed_task_manager) {}
DelayedTaskManager* delayed_task_manager,
TrackedRef<Delegate> delegate)
: SchedulerWorkerPool(std::move(task_tracker),
delayed_task_manager,
std::move(delegate)) {}
PlatformNativeWorkerPoolWin::~PlatformNativeWorkerPoolWin() {
#if DCHECK_IS_ON()
......@@ -59,6 +62,11 @@ void PlatformNativeWorkerPoolWin::JoinForTesting() {
#endif
}
// SchedulerWorkerPool:
// Reinserts |sequence| for execution by forwarding it to
// OnCanScheduleSequence(), which performs scheduling for this pool.
// NOTE(review): unlike SchedulerWorkerPoolImpl::ReEnqueueSequence(), there is
// no explicit worker wake-up here — presumably the native Windows thread pool
// manages its own workers; confirm against OnCanScheduleSequence().
void PlatformNativeWorkerPoolWin::ReEnqueueSequence(
scoped_refptr<Sequence> sequence) {
OnCanScheduleSequence(std::move(sequence));
}
// static
void CALLBACK PlatformNativeWorkerPoolWin::RunNextSequence(
PTP_CALLBACK_INSTANCE,
......
......@@ -30,7 +30,8 @@ namespace internal {
class BASE_EXPORT PlatformNativeWorkerPoolWin : public SchedulerWorkerPool {
public:
PlatformNativeWorkerPoolWin(TrackedRef<TaskTracker> task_tracker,
DelayedTaskManager* delayed_task_manager);
DelayedTaskManager* delayed_task_manager,
TrackedRef<Delegate> delegate);
// Destroying a PlatformNativeWorkerPoolWin is not allowed in
// production; it is always leaked. In tests, it can only be destroyed after
......@@ -42,6 +43,7 @@ class BASE_EXPORT PlatformNativeWorkerPoolWin : public SchedulerWorkerPool {
// SchedulerWorkerPool:
void JoinForTesting() override;
void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override;
private:
// Callback that gets run by |pool_|. It runs a task off the next sequence on
......
......@@ -169,9 +169,11 @@ bool SchedulerWorkerPool::PostTaskWithSequence(
SchedulerWorkerPool::SchedulerWorkerPool(
TrackedRef<TaskTracker> task_tracker,
DelayedTaskManager* delayed_task_manager)
DelayedTaskManager* delayed_task_manager,
TrackedRef<Delegate> delegate)
: task_tracker_(std::move(task_tracker)),
delayed_task_manager_(delayed_task_manager) {
delayed_task_manager_(delayed_task_manager),
delegate_(std::move(delegate)) {
DCHECK(task_tracker_);
DCHECK(delayed_task_manager_);
++g_active_pools_count;
......@@ -192,6 +194,10 @@ void SchedulerWorkerPool::UnbindFromCurrentThread() {
tls_current_worker_pool.Get().Set(nullptr);
}
// Returns true if |this| is the worker pool registered in TLS for the calling
// thread, i.e. the caller is running on one of this pool's worker threads.
bool SchedulerWorkerPool::IsBoundToCurrentThread() const {
return GetCurrentWorkerPool() == this;
}
void SchedulerWorkerPool::PostTaskWithSequenceNow(
Task task,
scoped_refptr<Sequence> sequence) {
......
......@@ -24,6 +24,17 @@ class TaskTracker;
// Interface for a worker pool.
class BASE_EXPORT SchedulerWorkerPool : public CanScheduleSequenceObserver {
public:
// Delegate interface for SchedulerWorkerPool.
//
// NOTE(review): the pool stores its Delegate through a TrackedRef
// (|delegate_|); holding that TrackedRef blocks teardown of the owning
// TrackedRefFactory, so the Delegate must outlive the pool — confirm with the
// TrackedRef documentation.
class BASE_EXPORT Delegate {
public:
virtual ~Delegate() = default;
// Invoked when a |sequence| is non-empty after the SchedulerWorkerPool has
// run a task from it. The implementation must enqueue |sequence| in the
// appropriate priority queue, depending on |sequence| traits.
virtual void ReEnqueueSequence(scoped_refptr<Sequence> sequence) = 0;
};
~SchedulerWorkerPool() override;
// Returns a TaskRunner whose PostTask invocations result in scheduling tasks
......@@ -49,6 +60,9 @@ class BASE_EXPORT SchedulerWorkerPool : public CanScheduleSequenceObserver {
// Resets the worker pool in TLS.
void UnbindFromCurrentThread();
// Returns true if the worker pool is registered in TLS.
bool IsBoundToCurrentThread() const;
// Prevents new tasks from starting to run and waits for currently running
// tasks to complete their execution. It is guaranteed that no thread will do
// work on behalf of this SchedulerWorkerPool after this returns. It is
......@@ -57,9 +71,15 @@ class BASE_EXPORT SchedulerWorkerPool : public CanScheduleSequenceObserver {
// task during JoinForTesting(). This can only be called once.
virtual void JoinForTesting() = 0;
// Enqueues |sequence| in the worker pool's priority queue, then wakes up a
// worker if the worker pool is not bound to the current thread, i.e. if
// |sequence| is changing pools.
virtual void ReEnqueueSequence(scoped_refptr<Sequence> sequence) = 0;
protected:
SchedulerWorkerPool(TrackedRef<TaskTracker> task_tracker,
DelayedTaskManager* delayed_task_manager);
DelayedTaskManager* delayed_task_manager,
TrackedRef<Delegate> delegate);
// Posts |task| to be executed by this SchedulerWorkerPool as part of
// |sequence|. This must only be called after |task| has gone through
......@@ -68,6 +88,7 @@ class BASE_EXPORT SchedulerWorkerPool : public CanScheduleSequenceObserver {
const TrackedRef<TaskTracker> task_tracker_;
DelayedTaskManager* const delayed_task_manager_;
const TrackedRef<Delegate> delegate_;
private:
DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerPool);
......
......@@ -159,8 +159,11 @@ SchedulerWorkerPoolImpl::SchedulerWorkerPoolImpl(
StringPiece pool_label,
ThreadPriority priority_hint,
TrackedRef<TaskTracker> task_tracker,
DelayedTaskManager* delayed_task_manager)
: SchedulerWorkerPool(std::move(task_tracker), delayed_task_manager),
DelayedTaskManager* delayed_task_manager,
TrackedRef<Delegate> delegate)
: SchedulerWorkerPool(std::move(task_tracker),
delayed_task_manager,
std::move(delegate)),
pool_label_(pool_label.as_string()),
priority_hint_(priority_hint),
lock_(shared_priority_queue_.container_lock()),
......@@ -272,11 +275,16 @@ SchedulerWorkerPoolImpl::~SchedulerWorkerPoolImpl() {
// CanScheduleSequenceObserver:
// Schedules |sequence| for execution: pushes it into the shared priority
// queue, then wakes one worker to pick it up. Unconditional wake-up is
// correct here because this path is for newly-schedulable sequences, not for
// a worker re-enqueueing its own sequence (see ReEnqueueSequence()).
void SchedulerWorkerPoolImpl::OnCanScheduleSequence(
scoped_refptr<Sequence> sequence) {
PushSequenceToPriorityQueue(std::move(sequence));
WakeUpOneWorker();
}
void SchedulerWorkerPoolImpl::PushSequenceToPriorityQueue(
scoped_refptr<Sequence> sequence) {
DCHECK(sequence);
const auto sequence_sort_key = sequence->GetSortKey();
shared_priority_queue_.BeginTransaction()->Push(std::move(sequence),
sequence_sort_key);
WakeUpOneWorker();
}
void SchedulerWorkerPoolImpl::GetHistograms(
......@@ -355,6 +363,13 @@ void SchedulerWorkerPoolImpl::JoinForTesting() {
workers_.clear();
}
// SchedulerWorkerPool:
// Enqueues |sequence| in this pool's priority queue. A worker is woken only
// when the caller is NOT one of this pool's own workers (i.e. |sequence| is
// moving into this pool from elsewhere): a worker of this pool will soon call
// GetWork() itself, so waking another worker for its own re-enqueue would be
// redundant.
void SchedulerWorkerPoolImpl::ReEnqueueSequence(
scoped_refptr<Sequence> sequence) {
PushSequenceToPriorityQueue(std::move(sequence));
if (!IsBoundToCurrentThread())
WakeUpOneWorker();
}
size_t SchedulerWorkerPoolImpl::NumberOfWorkersForTesting() const {
AutoSchedulerLock auto_lock(lock_);
return workers_.size();
......@@ -543,14 +558,7 @@ void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::DidRunTask() {
void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::ReEnqueueSequence(
scoped_refptr<Sequence> sequence) {
DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
const SequenceSortKey sequence_sort_key = sequence->GetSortKey();
outer_->shared_priority_queue_.BeginTransaction()->Push(std::move(sequence),
sequence_sort_key);
// This worker will soon call GetWork(). Therefore, there is no need to wake
// up a worker to run the sequence that was just inserted into
// |outer_->shared_priority_queue_|.
outer_->delegate_->ReEnqueueSequence(std::move(sequence));
}
TimeDelta
......
......@@ -73,7 +73,8 @@ class BASE_EXPORT SchedulerWorkerPoolImpl : public SchedulerWorkerPool {
StringPiece pool_label,
ThreadPriority priority_hint,
TrackedRef<TaskTracker> task_tracker,
DelayedTaskManager* delayed_task_manager);
DelayedTaskManager* delayed_task_manager,
TrackedRef<Delegate> delegate);
// Creates workers following the |params| specification, allowing existing and
// future tasks to run. The pool will run at most |max_best_effort_tasks|
......@@ -97,6 +98,7 @@ class BASE_EXPORT SchedulerWorkerPoolImpl : public SchedulerWorkerPool {
// SchedulerWorkerPool:
void JoinForTesting() override;
void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override;
const HistogramBase* num_tasks_before_detach_histogram() const {
return num_tasks_before_detach_histogram_;
......@@ -165,6 +167,9 @@ class BASE_EXPORT SchedulerWorkerPoolImpl : public SchedulerWorkerPool {
// SchedulerWorkerPool:
void OnCanScheduleSequence(scoped_refptr<Sequence> sequence) override;
// Pushes |sequence| to |shared_priority_queue_|.
void PushSequenceToPriorityQueue(scoped_refptr<Sequence> sequence);
// Waits until at least |n| workers are idle. |lock_| must be held to call
// this function.
void WaitForWorkersIdleLockRequiredForTesting(size_t n);
......
......@@ -75,10 +75,12 @@ void WaitWithoutBlockingObserver(WaitableEvent* event) {
event->Wait();
}
class TaskSchedulerWorkerPoolImplTestBase {
class TaskSchedulerWorkerPoolImplTestBase
: public SchedulerWorkerPool::Delegate {
protected:
TaskSchedulerWorkerPoolImplTestBase()
: service_thread_("TaskSchedulerServiceThread"){};
: service_thread_("TaskSchedulerServiceThread"),
tracked_ref_factory_(this){};
void CommonSetUp(TimeDelta suggested_reclaim_time = TimeDelta::Max()) {
CreateAndStartWorkerPool(suggested_reclaim_time, kMaxTasks);
......@@ -89,6 +91,7 @@ class TaskSchedulerWorkerPoolImplTestBase {
task_tracker_.FlushForTesting();
if (worker_pool_)
worker_pool_->JoinForTesting();
worker_pool_.reset();
}
void CreateWorkerPool() {
......@@ -97,7 +100,8 @@ class TaskSchedulerWorkerPoolImplTestBase {
delayed_task_manager_.Start(service_thread_.task_runner());
worker_pool_ = std::make_unique<SchedulerWorkerPoolImpl>(
"TestWorkerPool", "A", ThreadPriority::NORMAL,
task_tracker_.GetTrackedRef(), &delayed_task_manager_);
task_tracker_.GetTrackedRef(), &delayed_task_manager_,
tracked_ref_factory_.GetTrackedRef());
ASSERT_TRUE(worker_pool_);
}
......@@ -118,11 +122,15 @@ class TaskSchedulerWorkerPoolImplTestBase {
Thread service_thread_;
TaskTracker task_tracker_ = {"Test"};
std::unique_ptr<SchedulerWorkerPoolImpl> worker_pool_;
DelayedTaskManager delayed_task_manager_;
TrackedRefFactory<SchedulerWorkerPool::Delegate> tracked_ref_factory_;
private:
DelayedTaskManager delayed_task_manager_;
// SchedulerWorkerPool::Delegate:
void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override {
worker_pool_->ReEnqueueSequence(std::move(sequence));
}
DISALLOW_COPY_AND_ASSIGN(TaskSchedulerWorkerPoolImplTestBase);
};
......@@ -1314,28 +1322,43 @@ TEST_F(TaskSchedulerWorkerPoolBlockingTest,
EXPECT_EQ(worker_pool_->GetMaxTasksForTesting(), kMaxTasks);
}
// Verify that workers that become idle due to the pool being over capacity will
// eventually cleanup.
TEST(TaskSchedulerWorkerPoolOverCapacityTest, VerifyCleanup) {
constexpr size_t kLocalMaxTasks = 3;
TaskTracker task_tracker("Test");
DelayedTaskManager delayed_task_manager;
scoped_refptr<TaskRunner> service_thread_task_runner =
MakeRefCounted<TestSimpleTaskRunner>();
delayed_task_manager.Start(service_thread_task_runner);
SchedulerWorkerPoolImpl worker_pool(
"OverCapacityTestWorkerPool", "A", ThreadPriority::NORMAL,
task_tracker.GetTrackedRef(), &delayed_task_manager);
worker_pool.Start(
SchedulerWorkerPoolParams(kLocalMaxTasks, kReclaimTimeForCleanupTests),
kLocalMaxTasks, service_thread_task_runner, nullptr,
SchedulerWorkerPoolImpl::WorkerEnvironment::NONE);
class TaskSchedulerWorkerPoolOverCapacityTest
: public TaskSchedulerWorkerPoolImplTestBase,
public testing::Test {
public:
TaskSchedulerWorkerPoolOverCapacityTest() = default;
scoped_refptr<TaskRunner> task_runner =
worker_pool.CreateTaskRunnerWithTraits(
{MayBlock(), WithBaseSyncPrimitives()});
void SetUp() override {
CreateAndStartWorkerPool(kReclaimTimeForCleanupTests, kLocalMaxTasks);
task_runner_ = worker_pool_->CreateTaskRunnerWithTraits(
{MayBlock(), WithBaseSyncPrimitives()});
}
void TearDown() override {
TaskSchedulerWorkerPoolImplTestBase::CommonTearDown();
}
protected:
scoped_refptr<TaskRunner> task_runner_;
static constexpr size_t kLocalMaxTasks = 3;
void CreateWorkerPool() {
ASSERT_FALSE(worker_pool_);
service_thread_.Start();
delayed_task_manager_.Start(service_thread_.task_runner());
worker_pool_ = std::make_unique<SchedulerWorkerPoolImpl>(
"OverCapacityTestWorkerPool", "A", ThreadPriority::NORMAL,
task_tracker_.GetTrackedRef(), &delayed_task_manager_,
tracked_ref_factory_.GetTrackedRef());
ASSERT_TRUE(worker_pool_);
}
DISALLOW_COPY_AND_ASSIGN(TaskSchedulerWorkerPoolOverCapacityTest);
};
// Verify that workers that become idle due to the pool being over capacity will
// eventually cleanup.
TEST_F(TaskSchedulerWorkerPoolOverCapacityTest, VerifyCleanup) {
WaitableEvent threads_running;
WaitableEvent threads_continue;
RepeatingClosure threads_running_barrier = BarrierClosure(
......@@ -1357,7 +1380,7 @@ TEST(TaskSchedulerWorkerPoolOverCapacityTest, VerifyCleanup) {
Unretained(&blocked_call_continue));
for (size_t i = 0; i < kLocalMaxTasks; ++i)
task_runner->PostTask(FROM_HERE, closure);
task_runner_->PostTask(FROM_HERE, closure);
threads_running.Wait();
......@@ -1369,7 +1392,7 @@ TEST(TaskSchedulerWorkerPoolOverCapacityTest, VerifyCleanup) {
BindOnce(&WaitableEvent::Signal, Unretained(&extra_threads_running)));
// These tasks should run on the new threads from increasing max tasks.
for (size_t i = 0; i < kLocalMaxTasks; ++i) {
task_runner->PostTask(
task_runner_->PostTask(
FROM_HERE, BindOnce(
[](Closure* extra_threads_running_barrier,
WaitableEvent* extra_threads_continue) {
......@@ -1381,26 +1404,25 @@ TEST(TaskSchedulerWorkerPoolOverCapacityTest, VerifyCleanup) {
}
extra_threads_running.Wait();
ASSERT_EQ(kLocalMaxTasks * 2, worker_pool.NumberOfWorkersForTesting());
EXPECT_EQ(kLocalMaxTasks * 2, worker_pool.GetMaxTasksForTesting());
ASSERT_EQ(kLocalMaxTasks * 2, worker_pool_->NumberOfWorkersForTesting());
EXPECT_EQ(kLocalMaxTasks * 2, worker_pool_->GetMaxTasksForTesting());
blocked_call_continue.Signal();
extra_threads_continue.Signal();
// Periodically post tasks to ensure that posting tasks does not prevent
// workers that are idle due to the pool being over capacity from cleaning up.
for (int i = 0; i < 16; ++i) {
task_runner->PostDelayedTask(FROM_HERE, DoNothing(),
kReclaimTimeForCleanupTests * i * 0.5);
task_runner_->PostDelayedTask(FROM_HERE, DoNothing(),
kReclaimTimeForCleanupTests * i * 0.5);
}
// Note: one worker above capacity will not get cleaned up since it's on the
// top of the idle stack.
worker_pool.WaitForWorkersCleanedUpForTesting(kLocalMaxTasks - 1);
EXPECT_EQ(kLocalMaxTasks + 1, worker_pool.NumberOfWorkersForTesting());
worker_pool_->WaitForWorkersCleanedUpForTesting(kLocalMaxTasks - 1);
EXPECT_EQ(kLocalMaxTasks + 1, worker_pool_->NumberOfWorkersForTesting());
threads_continue.Signal();
worker_pool.JoinForTesting();
task_tracker_.FlushForTesting();
}
// Verify that the maximum number of workers is 256 and that hitting the max
......
......@@ -91,10 +91,12 @@ class ThreadPostingTasks : public SimpleThread {
};
class TaskSchedulerWorkerPoolTest
: public testing::TestWithParam<PoolExecutionType> {
: public testing::TestWithParam<PoolExecutionType>,
public SchedulerWorkerPool::Delegate {
protected:
TaskSchedulerWorkerPoolTest()
: service_thread_("TaskSchedulerServiceThread") {}
: service_thread_("TaskSchedulerServiceThread"),
tracked_ref_factory_(this) {}
void SetUp() override {
service_thread_.Start();
......@@ -106,6 +108,7 @@ class TaskSchedulerWorkerPoolTest
service_thread_.Stop();
if (worker_pool_)
worker_pool_->JoinForTesting();
worker_pool_.reset();
}
void CreateWorkerPool() {
......@@ -114,12 +117,14 @@ class TaskSchedulerWorkerPoolTest
case PoolType::GENERIC:
worker_pool_ = std::make_unique<SchedulerWorkerPoolImpl>(
"TestWorkerPool", "A", ThreadPriority::NORMAL,
task_tracker_.GetTrackedRef(), &delayed_task_manager_);
task_tracker_.GetTrackedRef(), &delayed_task_manager_,
tracked_ref_factory_.GetTrackedRef());
break;
#if defined(OS_WIN)
case PoolType::WINDOWS:
worker_pool_ = std::make_unique<PlatformNativeWorkerPoolWin>(
task_tracker_.GetTrackedRef(), &delayed_task_manager_);
task_tracker_.GetTrackedRef(), &delayed_task_manager_,
tracked_ref_factory_.GetTrackedRef());
break;
#endif
}
......@@ -156,6 +161,13 @@ class TaskSchedulerWorkerPoolTest
std::unique_ptr<SchedulerWorkerPool> worker_pool_;
private:
// SchedulerWorkerPool::Delegate:
void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override {
worker_pool_->ReEnqueueSequence(std::move(sequence));
}
TrackedRefFactory<SchedulerWorkerPool::Delegate> tracked_ref_factory_;
DISALLOW_COPY_AND_ASSIGN(TaskSchedulerWorkerPoolTest);
};
......
......@@ -63,7 +63,8 @@ TaskSchedulerImpl::TaskSchedulerImpl(
BindRepeating(&TaskSchedulerImpl::ReportHeartbeatMetrics,
Unretained(this)))),
single_thread_task_runner_manager_(task_tracker_->GetTrackedRef(),
&delayed_task_manager_) {
&delayed_task_manager_),
tracked_ref_factory_(this) {
DCHECK(!histogram_label.empty());
static_assert(arraysize(environment_to_worker_pool_) == ENVIRONMENT_COUNT,
......@@ -84,7 +85,8 @@ TaskSchedulerImpl::TaskSchedulerImpl(
"."),
kEnvironmentParams[environment_type].name_suffix,
kEnvironmentParams[environment_type].priority_hint,
task_tracker_->GetTrackedRef(), &delayed_task_manager_));
task_tracker_->GetTrackedRef(), &delayed_task_manager_,
tracked_ref_factory_.GetTrackedRef()));
}
// Map environment indexes to pools. |kMergeBlockingNonBlockingPools| is
......@@ -107,6 +109,9 @@ TaskSchedulerImpl::~TaskSchedulerImpl() {
#if DCHECK_IS_ON()
DCHECK(join_for_testing_returned_.IsSet());
#endif
// Clear |worker_pools_| to release held TrackedRefs, which block teardown.
worker_pools_.clear();
}
void TaskSchedulerImpl::Start(
......@@ -312,6 +317,13 @@ void TaskSchedulerImpl::SetExecutionFenceEnabled(bool execution_fence_enabled) {
task_tracker_->SetExecutionFenceEnabled(execution_fence_enabled);
}
// SchedulerWorkerPool::Delegate:
// Routes a non-empty |sequence| back to the worker pool appropriate for its
// traits. The traits are first passed through
// SetUserBlockingPriorityIfNeeded(), which may adjust the priority, so the
// target pool is re-resolved from the updated traits rather than assumed to
// be the pool the sequence last ran in — this is what allows a sequence to
// change pools when its effective traits change.
void TaskSchedulerImpl::ReEnqueueSequence(scoped_refptr<Sequence> sequence) {
DCHECK(sequence);
const TaskTraits new_traits =
SetUserBlockingPriorityIfNeeded(sequence->traits());
GetWorkerPoolForTraits(new_traits)->ReEnqueueSequence(std::move(sequence));
}
SchedulerWorkerPoolImpl* TaskSchedulerImpl::GetWorkerPoolForTraits(
const TaskTraits& traits) const {
return environment_to_worker_pool_[GetEnvironmentIndexForTraits(traits)];
......
......@@ -45,7 +45,8 @@ namespace internal {
extern const BASE_EXPORT base::Feature kMergeBlockingNonBlockingPools;
// Default TaskScheduler implementation. This class is thread-safe.
class BASE_EXPORT TaskSchedulerImpl : public TaskScheduler {
class BASE_EXPORT TaskSchedulerImpl : public TaskScheduler,
public SchedulerWorkerPool::Delegate {
public:
using TaskTrackerImpl =
#if defined(OS_POSIX) && !defined(OS_NACL_SFI)
......@@ -105,6 +106,9 @@ class BASE_EXPORT TaskSchedulerImpl : public TaskScheduler {
void ReportHeartbeatMetrics() const;
// SchedulerWorkerPool::Delegate:
void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override;
const std::unique_ptr<TaskTrackerImpl> task_tracker_;
std::unique_ptr<Thread> service_thread_;
DelayedTaskManager delayed_task_manager_;
......@@ -135,6 +139,8 @@ class BASE_EXPORT TaskSchedulerImpl : public TaskScheduler {
base::win::ComInitCheckHook com_init_check_hook_;
#endif
TrackedRefFactory<SchedulerWorkerPool::Delegate> tracked_ref_factory_;
DISALLOW_COPY_AND_ASSIGN(TaskSchedulerImpl);
};
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment