Commit 8a402921 authored by robliao's avatar robliao Committed by Commit bot

Add Lazy Creation and Thread Detachment Support in the Scheduler Worker Pool

BUG=553459

Review-Url: https://codereview.chromium.org/2116163002
Cr-Commit-Position: refs/heads/master@{#407205}
parent c23c19cc
......@@ -59,7 +59,8 @@ class TaskSchedulerServiceThreadTest : public testing::Test {
ThreadPriority::BACKGROUND,
SchedulerWorkerPoolParams::IORestriction::
DISALLOWED,
1u),
1u,
TimeDelta::Max()),
Bind(&ReEnqueueSequenceCallback), &task_tracker_,
&delayed_task_manager_);
ASSERT_TRUE(scheduler_worker_pool_);
......
......@@ -31,6 +31,9 @@
#include "base/threading/platform_thread.h"
namespace base {
class TimeDelta;
namespace internal {
class DelayedTaskManager;
......@@ -67,6 +70,14 @@ class BASE_EXPORT SchedulerWorkerPoolImpl : public SchedulerWorkerPool {
// allowed to complete their execution. This can only be called once.
void JoinForTesting();
// Disallows worker thread detachment. If the suggested reclaim time is not
// TimeDelta::Max(), then the test should call this before the detach code can
// run. The safest place to do this is before the a set of work is dispatched
// (the worker pool is idle and steady state) or before the last
// synchronization point for all workers (all threads are busy and can't be
// reclaimed).
void DisallowWorkerDetachmentForTesting();
// SchedulerWorkerPool:
scoped_refptr<TaskRunner> CreateTaskRunnerWithTraits(
const TaskTraits& traits,
......@@ -81,11 +92,13 @@ class BASE_EXPORT SchedulerWorkerPoolImpl : public SchedulerWorkerPool {
SchedulerWorker* worker) override;
private:
class SchedulerSingleThreadTaskRunner;
class SchedulerWorkerDelegateImpl;
SchedulerWorkerPoolImpl(StringPiece name,
SchedulerWorkerPoolParams::IORestriction
io_restriction,
const TimeDelta& suggested_reclaim_time,
TaskTracker* task_tracker,
DelayedTaskManager* delayed_task_manager);
......@@ -100,9 +113,15 @@ class BASE_EXPORT SchedulerWorkerPoolImpl : public SchedulerWorkerPool {
// Adds |worker| to |idle_workers_stack_|.
void AddToIdleWorkersStack(SchedulerWorker* worker);
// Peeks from |idle_workers_stack_|.
const SchedulerWorker* PeekAtIdleWorkersStack() const;
// Removes |worker| from |idle_workers_stack_|.
void RemoveFromIdleWorkersStack(SchedulerWorker* worker);
// Returns true if worker thread detachment is permitted.
bool CanWorkerDetachForTesting();
// The name of this worker pool, used to label its worker threads.
const std::string name_;
......@@ -123,12 +142,15 @@ class BASE_EXPORT SchedulerWorkerPoolImpl : public SchedulerWorkerPool {
// Indicates whether Tasks on this worker pool are allowed to make I/O calls.
const SchedulerWorkerPoolParams::IORestriction io_restriction_;
// Suggested reclaim time for workers.
const TimeDelta suggested_reclaim_time_;
// Synchronizes access to |idle_workers_stack_| and
// |idle_workers_stack_cv_for_testing_|. Has |shared_priority_queue_|'s
// lock as its predecessor so that a worker can be pushed to
// |idle_workers_stack_| within the scope of a Transaction (more
// details in GetWork()).
SchedulerLock idle_workers_stack_lock_;
mutable SchedulerLock idle_workers_stack_lock_;
// Stack of idle workers.
SchedulerWorkerStack idle_workers_stack_;
......@@ -139,6 +161,13 @@ class BASE_EXPORT SchedulerWorkerPoolImpl : public SchedulerWorkerPool {
// Signaled once JoinForTesting() has returned.
WaitableEvent join_for_testing_returned_;
// Synchronizes access to |worker_detachment_allowed_|.
SchedulerLock worker_detachment_allowed_lock_;
// Indicates to the delegates that workers are permitted to detach their
// threads.
bool worker_detachment_allowed_ = true;
#if DCHECK_IS_ON()
// Signaled when all workers have been created.
WaitableEvent workers_created_;
......
......@@ -10,6 +10,7 @@
#include <unordered_set>
#include <vector>
#include "base/atomicops.h"
#include "base/bind.h"
#include "base/bind_helpers.h"
#include "base/callback.h"
......@@ -29,7 +30,10 @@
#include "base/task_scheduler/test_utils.h"
#include "base/threading/platform_thread.h"
#include "base/threading/simple_thread.h"
#include "base/threading/thread_checker_impl.h"
#include "base/threading/thread_local_storage.h"
#include "base/threading/thread_restrictions.h"
#include "base/time/time.h"
#include "testing/gtest/include/gtest/gtest.h"
namespace base {
......@@ -39,6 +43,8 @@ namespace {
const size_t kNumWorkersInWorkerPool = 4;
const size_t kNumThreadsPostingTasks = 4;
const size_t kNumTasksPostedPerThread = 150;
constexpr TimeDelta kReclaimTimeForDetachTests =
TimeDelta::FromMilliseconds(10);
using IORestriction = SchedulerWorkerPoolParams::IORestriction;
......@@ -63,22 +69,27 @@ class TaskSchedulerWorkerPoolImplTest
TaskSchedulerWorkerPoolImplTest() = default;
void SetUp() override {
InitializeWorkerPool(TimeDelta::Max());
}
void TearDown() override {
worker_pool_->WaitForAllWorkersIdleForTesting();
worker_pool_->JoinForTesting();
}
void InitializeWorkerPool(const TimeDelta& suggested_reclaim_time) {
worker_pool_ = SchedulerWorkerPoolImpl::Create(
SchedulerWorkerPoolParams("TestWorkerPoolWithFileIO",
ThreadPriority::NORMAL,
IORestriction::ALLOWED,
kNumWorkersInWorkerPool),
kNumWorkersInWorkerPool,
suggested_reclaim_time),
Bind(&TaskSchedulerWorkerPoolImplTest::ReEnqueueSequenceCallback,
Unretained(this)),
&task_tracker_, &delayed_task_manager_);
ASSERT_TRUE(worker_pool_);
}
void TearDown() override {
worker_pool_->WaitForAllWorkersIdleForTesting();
worker_pool_->JoinForTesting();
}
std::unique_ptr<SchedulerWorkerPoolImpl> worker_pool_;
TaskTracker task_tracker_;
......@@ -367,7 +378,8 @@ TEST_P(TaskSchedulerWorkerPoolImplIORestrictionTest, IORestriction) {
auto worker_pool = SchedulerWorkerPoolImpl::Create(
SchedulerWorkerPoolParams("TestWorkerPoolWithParam",
ThreadPriority::NORMAL, GetParam(), 1U),
ThreadPriority::NORMAL, GetParam(), 1U,
TimeDelta::Max()),
Bind(&NotReachedReEnqueueSequenceCallback), &task_tracker,
&delayed_task_manager);
ASSERT_TRUE(worker_pool);
......@@ -388,5 +400,161 @@ INSTANTIATE_TEST_CASE_P(IODisallowed,
TaskSchedulerWorkerPoolImplIORestrictionTest,
::testing::Values(IORestriction::DISALLOWED));
namespace {
class TaskSchedulerWorkerPoolSingleThreadedTest
: public TaskSchedulerWorkerPoolImplTest {
public:
void InitializeThreadChecker() {
thread_checker_.reset(new ThreadCheckerImpl());
}
void CheckValidThread() {
EXPECT_TRUE(thread_checker_->CalledOnValidThread());
}
protected:
void SetUp() override {
InitializeWorkerPool(kReclaimTimeForDetachTests);
}
TaskSchedulerWorkerPoolSingleThreadedTest() = default;
private:
std::unique_ptr<ThreadCheckerImpl> thread_checker_;
DISALLOW_COPY_AND_ASSIGN(TaskSchedulerWorkerPoolSingleThreadedTest);
};
} // namespace
// Verify that thread resources for a single thread remain.
TEST_F(TaskSchedulerWorkerPoolSingleThreadedTest, SingleThreadTask) {
auto single_thread_task_runner =
worker_pool_->CreateTaskRunnerWithTraits(
TaskTraits().
WithShutdownBehavior(TaskShutdownBehavior::BLOCK_SHUTDOWN),
ExecutionMode::SINGLE_THREADED);
single_thread_task_runner->PostTask(
FROM_HERE,
Bind(&TaskSchedulerWorkerPoolSingleThreadedTest::InitializeThreadChecker,
Unretained(this)));
WaitableEvent task_waiter(WaitableEvent::ResetPolicy::AUTOMATIC,
WaitableEvent::InitialState::NOT_SIGNALED);
single_thread_task_runner->PostTask(
FROM_HERE, Bind(&WaitableEvent::Signal, Unretained(&task_waiter)));
task_waiter.Wait();
worker_pool_->WaitForAllWorkersIdleForTesting();
// Give the worker pool a chance to reclaim its threads.
PlatformThread::Sleep(
kReclaimTimeForDetachTests + TimeDelta::FromMilliseconds(200));
worker_pool_->DisallowWorkerDetachmentForTesting();
single_thread_task_runner->PostTask(
FROM_HERE,
Bind(&TaskSchedulerWorkerPoolSingleThreadedTest::CheckValidThread,
Unretained(this)));
single_thread_task_runner->PostTask(
FROM_HERE, Bind(&WaitableEvent::Signal, Unretained(&task_waiter)));
task_waiter.Wait();
}
namespace {
constexpr size_t kMagicTlsValue = 42;
class TaskSchedulerWorkerPoolCheckTlsReuse
: public TaskSchedulerWorkerPoolImplTest {
public:
void SetTlsValueAndWait() {
slot_.Set(reinterpret_cast<void*>(kMagicTlsValue));
waiter_.Wait();
}
void CountZeroTlsValuesAndWait(WaitableEvent* count_waiter) {
if (!slot_.Get())
subtle::NoBarrier_AtomicIncrement(&zero_tls_values_, 1);
count_waiter->Signal();
waiter_.Wait();
}
protected:
TaskSchedulerWorkerPoolCheckTlsReuse() :
waiter_(WaitableEvent::ResetPolicy::MANUAL,
WaitableEvent::InitialState::NOT_SIGNALED) {}
void SetUp() override {
InitializeWorkerPool(kReclaimTimeForDetachTests);
}
subtle::Atomic32 zero_tls_values_ = 0;
WaitableEvent waiter_;
private:
ThreadLocalStorage::Slot slot_;
DISALLOW_COPY_AND_ASSIGN(TaskSchedulerWorkerPoolCheckTlsReuse);
};
} // namespace
// Checks that at least one thread has detached by checking the TLS.
TEST_F(TaskSchedulerWorkerPoolCheckTlsReuse, CheckDetachedThreads) {
// Saturate the threads and mark each thread with a magic TLS value.
std::vector<std::unique_ptr<test::TestTaskFactory>> factories;
for (size_t i = 0; i < kNumWorkersInWorkerPool; ++i) {
factories.push_back(WrapUnique(new test::TestTaskFactory(
worker_pool_->CreateTaskRunnerWithTraits(
TaskTraits(), ExecutionMode::PARALLEL),
ExecutionMode::PARALLEL)));
ASSERT_TRUE(factories.back()->PostTask(
PostNestedTask::NO,
Bind(&TaskSchedulerWorkerPoolCheckTlsReuse::SetTlsValueAndWait,
Unretained(this))));
factories.back()->WaitForAllTasksToRun();
}
// Release tasks waiting on |waiter_|.
waiter_.Signal();
worker_pool_->WaitForAllWorkersIdleForTesting();
// All threads should be done running by now, so reset for the next phase.
waiter_.Reset();
// Give the worker pool a chance to detach its threads.
PlatformThread::Sleep(
kReclaimTimeForDetachTests + TimeDelta::FromMilliseconds(200));
worker_pool_->DisallowWorkerDetachmentForTesting();
// Saturate and count the threads that do not have the magic TLS value. If the
// value is not there, that means we're at a new thread.
std::vector<std::unique_ptr<WaitableEvent>> count_waiters;
for (auto& factory : factories) {
count_waiters.push_back(WrapUnique(new WaitableEvent(
WaitableEvent::ResetPolicy::MANUAL,
WaitableEvent::InitialState::NOT_SIGNALED)));
ASSERT_TRUE(factory->PostTask(
PostNestedTask::NO,
Bind(&TaskSchedulerWorkerPoolCheckTlsReuse::CountZeroTlsValuesAndWait,
Unretained(this),
count_waiters.back().get())));
factory->WaitForAllTasksToRun();
}
// Wait for all counters to complete.
for (auto& count_waiter : count_waiters)
count_waiter->Wait();
EXPECT_GT(subtle::NoBarrier_Load(&zero_tls_values_), 0);
// Release tasks waiting on |waiter_|.
waiter_.Signal();
}
} // namespace internal
} // namespace base
......@@ -4,6 +4,8 @@
#include "base/task_scheduler/scheduler_worker_pool_params.h"
#include "base/time/time.h"
namespace base {
namespace internal {
......@@ -11,11 +13,13 @@ SchedulerWorkerPoolParams::SchedulerWorkerPoolParams(
const std::string& name,
ThreadPriority thread_priority,
IORestriction io_restriction,
int max_threads)
int max_threads,
const TimeDelta& suggested_reclaim_time)
: name_(name),
thread_priority_(thread_priority),
io_restriction_(io_restriction),
max_threads_(max_threads) {}
max_threads_(max_threads),
suggested_reclaim_time_(suggested_reclaim_time) {}
SchedulerWorkerPoolParams::SchedulerWorkerPoolParams(
SchedulerWorkerPoolParams&& other) = default;
......
......@@ -11,6 +11,9 @@
#include "base/threading/platform_thread.h"
namespace base {
class TimeDelta;
namespace internal {
class BASE_EXPORT SchedulerWorkerPoolParams final {
......@@ -24,12 +27,15 @@ class BASE_EXPORT SchedulerWorkerPoolParams final {
// scheduler worker pool to use the label |name| and create up to
// |max_threads| threads of priority |thread_priority|. |io_restriction|
// indicates whether Tasks on the scheduler worker pool are allowed to make
// I/O calls.
// I/O calls. |suggested_reclaim_time| sets a suggestion on when to reclaim
// idle threads. The worker pool is free to ignore this value for performance
// or correctness reasons.
SchedulerWorkerPoolParams(
const std::string& name,
ThreadPriority thread_priority,
IORestriction io_restriction,
int max_threads);
int max_threads,
const TimeDelta& suggested_reclaim_time);
SchedulerWorkerPoolParams(SchedulerWorkerPoolParams&& other);
SchedulerWorkerPoolParams& operator=(SchedulerWorkerPoolParams&& other);
......@@ -45,11 +51,17 @@ class BASE_EXPORT SchedulerWorkerPoolParams final {
// Maximum number of threads in the pool.
size_t max_threads() const { return max_threads_; }
// Suggested reclaim time for threads in the worker pool.
const TimeDelta& suggested_reclaim_time() const {
return suggested_reclaim_time_;
}
private:
std::string name_;
ThreadPriority thread_priority_;
IORestriction io_restriction_;
size_t max_threads_;
TimeDelta suggested_reclaim_time_;
DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerPoolParams);
};
......
......@@ -16,8 +16,7 @@ SchedulerWorkerStack::SchedulerWorkerStack() = default;
SchedulerWorkerStack::~SchedulerWorkerStack() = default;
void SchedulerWorkerStack::Push(SchedulerWorker* worker) {
DCHECK(std::find(stack_.begin(), stack_.end(), worker) == stack_.end())
<< "SchedulerWorker already on stack";
DCHECK(!Contains(worker)) << "SchedulerWorker already on stack";
stack_.push_back(worker);
}
......@@ -29,6 +28,16 @@ SchedulerWorker* SchedulerWorkerStack::Pop() {
return worker;
}
SchedulerWorker* SchedulerWorkerStack::Peek() const {
if (IsEmpty())
return nullptr;
return stack_.back();
}
bool SchedulerWorkerStack::Contains(const SchedulerWorker* worker) const {
return std::find(stack_.begin(), stack_.end(), worker) != stack_.end();
}
void SchedulerWorkerStack::Remove(const SchedulerWorker* worker) {
auto it = std::find(stack_.begin(), stack_.end(), worker);
if (it != stack_.end())
......
......@@ -17,11 +17,11 @@ namespace internal {
class SchedulerWorker;
// A stack of SchedulerWorkers. Supports removal of arbitrary
// SchedulerWorkers. DCHECKs when a SchedulerWorker is inserted
// multiple times. SchedulerWorkers are not owned by the stack. Push() is
// amortized O(1). Pop(), Size() and Empty() are O(1). Remove is O(n). This
// class is NOT thread-safe.
// A stack of SchedulerWorkers. Supports removal of arbitrary SchedulerWorkers.
// DCHECKs when a SchedulerWorker is inserted multiple times. SchedulerWorkers
// are not owned by the stack. Push() is amortized O(1). Pop(), Peek(), Size()
// and Empty() are O(1). Contains() and Remove() are O(n).
// This class is NOT thread-safe.
class BASE_EXPORT SchedulerWorkerStack {
public:
SchedulerWorkerStack();
......@@ -35,6 +35,12 @@ class BASE_EXPORT SchedulerWorkerStack {
// Returns nullptr if the stack is empty.
SchedulerWorker* Pop();
// Returns the top SchedulerWorker from the stack, nullptr if empty.
SchedulerWorker* Peek() const;
// Returns true if |worker| is already on the stack.
bool Contains(const SchedulerWorker* worker) const;
// Removes |worker| from the stack.
void Remove(const SchedulerWorker* worker);
......
......@@ -76,6 +76,8 @@ class TaskSchedulerWorkerStackTest : public testing::Test {
// Verify that Push() and Pop() add/remove values in FIFO order.
TEST_F(TaskSchedulerWorkerStackTest, PushPop) {
SchedulerWorkerStack stack;
EXPECT_EQ(nullptr, stack.Pop());
EXPECT_TRUE(stack.IsEmpty());
EXPECT_EQ(0U, stack.Size());
......@@ -110,6 +112,86 @@ TEST_F(TaskSchedulerWorkerStackTest, PushPop) {
EXPECT_EQ(worker_a_.get(), stack.Pop());
EXPECT_TRUE(stack.IsEmpty());
EXPECT_EQ(0U, stack.Size());
EXPECT_EQ(nullptr, stack.Pop());
}
// Verify that Peek() returns the correct values in FIFO order.
TEST_F(TaskSchedulerWorkerStackTest, PeekPop) {
SchedulerWorkerStack stack;
EXPECT_EQ(nullptr, stack.Peek());
EXPECT_TRUE(stack.IsEmpty());
EXPECT_EQ(0U, stack.Size());
stack.Push(worker_a_.get());
EXPECT_EQ(worker_a_.get(), stack.Peek());
EXPECT_FALSE(stack.IsEmpty());
EXPECT_EQ(1U, stack.Size());
stack.Push(worker_b_.get());
EXPECT_EQ(worker_b_.get(), stack.Peek());
EXPECT_FALSE(stack.IsEmpty());
EXPECT_EQ(2U, stack.Size());
stack.Push(worker_c_.get());
EXPECT_EQ(worker_c_.get(), stack.Peek());
EXPECT_FALSE(stack.IsEmpty());
EXPECT_EQ(3U, stack.Size());
EXPECT_EQ(worker_c_.get(), stack.Pop());
EXPECT_EQ(worker_b_.get(), stack.Peek());
EXPECT_FALSE(stack.IsEmpty());
EXPECT_EQ(2U, stack.Size());
EXPECT_EQ(worker_b_.get(), stack.Pop());
EXPECT_EQ(worker_a_.get(), stack.Peek());
EXPECT_FALSE(stack.IsEmpty());
EXPECT_EQ(1U, stack.Size());
EXPECT_EQ(worker_a_.get(), stack.Pop());
EXPECT_TRUE(stack.IsEmpty());
EXPECT_EQ(0U, stack.Size());
EXPECT_EQ(nullptr, stack.Peek());
}
// Verify that Contains() returns true for workers on the stack.
TEST_F(TaskSchedulerWorkerStackTest, Contains) {
SchedulerWorkerStack stack;
EXPECT_FALSE(stack.Contains(worker_a_.get()));
EXPECT_FALSE(stack.Contains(worker_b_.get()));
EXPECT_FALSE(stack.Contains(worker_c_.get()));
stack.Push(worker_a_.get());
EXPECT_TRUE(stack.Contains(worker_a_.get()));
EXPECT_FALSE(stack.Contains(worker_b_.get()));
EXPECT_FALSE(stack.Contains(worker_c_.get()));
stack.Push(worker_b_.get());
EXPECT_TRUE(stack.Contains(worker_a_.get()));
EXPECT_TRUE(stack.Contains(worker_b_.get()));
EXPECT_FALSE(stack.Contains(worker_c_.get()));
stack.Push(worker_c_.get());
EXPECT_TRUE(stack.Contains(worker_a_.get()));
EXPECT_TRUE(stack.Contains(worker_b_.get()));
EXPECT_TRUE(stack.Contains(worker_c_.get()));
stack.Pop();
EXPECT_TRUE(stack.Contains(worker_a_.get()));
EXPECT_TRUE(stack.Contains(worker_b_.get()));
EXPECT_FALSE(stack.Contains(worker_c_.get()));
stack.Pop();
EXPECT_TRUE(stack.Contains(worker_a_.get()));
EXPECT_FALSE(stack.Contains(worker_b_.get()));
EXPECT_FALSE(stack.Contains(worker_c_.get()));
stack.Pop();
EXPECT_FALSE(stack.Contains(worker_a_.get()));
EXPECT_FALSE(stack.Contains(worker_b_.get()));
EXPECT_FALSE(stack.Contains(worker_c_.get()));
}
// Verify that a value can be removed by Remove().
......
......@@ -23,6 +23,7 @@
#include "base/threading/simple_thread.h"
#include "base/threading/thread.h"
#include "base/threading/thread_restrictions.h"
#include "base/time/time.h"
#include "testing/gtest/include/gtest/gtest.h"
namespace base {
......@@ -166,22 +167,22 @@ class TaskSchedulerImplTest
ASSERT_EQ(BACKGROUND_WORKER_POOL, params_vector.size());
params_vector.emplace_back("TaskSchedulerBackground",
ThreadPriority::BACKGROUND,
IORestriction::DISALLOWED, 1U);
IORestriction::DISALLOWED, 1U, TimeDelta::Max());
ASSERT_EQ(BACKGROUND_FILE_IO_WORKER_POOL, params_vector.size());
params_vector.emplace_back("TaskSchedulerBackgroundFileIO",
ThreadPriority::BACKGROUND,
IORestriction::ALLOWED, 3U);
IORestriction::ALLOWED, 3U, TimeDelta::Max());
ASSERT_EQ(FOREGROUND_WORKER_POOL, params_vector.size());
params_vector.emplace_back("TaskSchedulerForeground",
ThreadPriority::NORMAL,
IORestriction::DISALLOWED, 4U);
IORestriction::DISALLOWED, 4U, TimeDelta::Max());
ASSERT_EQ(FOREGROUND_FILE_IO_WORKER_POOL, params_vector.size());
params_vector.emplace_back("TaskSchedulerForegroundFileIO",
ThreadPriority::NORMAL, IORestriction::ALLOWED,
12U);
12U, TimeDelta::Max());
scheduler_ = TaskSchedulerImpl::Create(params_vector,
Bind(&GetThreadPoolIndexForTraits));
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment