Commit 2e90c157 authored by Etienne Pierre-doray's avatar Etienne Pierre-doray Committed by Commit Bot

[ThreadPool]: Best effort max tasks is increased only after a delay.

Best effort max tasks always acts like MayBlock.
This is to prevent issue where many WILL_BLOCK BEST_EFFORT tasks increase the
thread pool capacity, which is undesirable.

This new behavior is similar to FixedMaxBestEffortTasks, but keeps the increase
around to prevent deadlocks.
https://uma.googleplex.com/p/chrome/variations/?sid=4a990d53992445c75cf9665227ab221c

Bug: 1026785
Change-Id: I4be2f478c730cc75f8d358665edec72ad651902f
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2300343
Commit-Queue: Etienne Pierre-Doray <etiennep@chromium.org>
Reviewed-by: default avatarFrançois Doray <fdoray@chromium.org>
Reviewed-by: default avatarRobert Kaplow <rkaplow@chromium.org>
Cr-Commit-Position: refs/heads/master@{#796545}
parent 583e944f
......@@ -17,9 +17,6 @@ const Feature kNoDetachBelowInitialCapacity = {
const Feature kMayBlockWithoutDelay = {"MayBlockWithoutDelay",
base::FEATURE_DISABLED_BY_DEFAULT};
const Feature kFixedMaxBestEffortTasks = {"FixedMaxBestEffortTasks",
base::FEATURE_DISABLED_BY_DEFAULT};
#if defined(OS_WIN) || defined(OS_APPLE)
const Feature kUseNativeThreadPool = {"UseNativeThreadPool",
base::FEATURE_DISABLED_BY_DEFAULT};
......
......@@ -23,12 +23,6 @@ extern const BASE_EXPORT Feature kNoDetachBelowInitialCapacity;
// instead of waiting for a threshold in the foreground thread group.
extern const BASE_EXPORT Feature kMayBlockWithoutDelay;
// Under this feature, best effort capacity is never increased.
// While it's unlikely we'd ship this as-is, this experiment allows us to
// determine whether blocked worker replacement logic on best-effort tasks has
// any impact on guardian metrics.
extern const BASE_EXPORT Feature kFixedMaxBestEffortTasks;
#if defined(OS_WIN) || defined(OS_APPLE)
#define HAS_NATIVE_THREAD_POOL() 1
#else
......
......@@ -239,19 +239,15 @@ class ThreadGroupImpl::WorkerThreadDelegateImpl : public WorkerThread::Delegate,
void BlockingTypeUpgraded() override;
void BlockingEnded() override;
void MayBlockEntered();
void WillBlockEntered();
// Returns true iff the worker can get work. Cleans up the worker or puts it
// on the idle stack if it can't get work.
bool CanGetWorkLockRequired(ScopedCommandsExecutor* executor,
WorkerThread* worker)
EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_);
// Returns true iff this worker has been within a MAY_BLOCK ScopedBlockingCall
// for more than |may_block_threshold|. The max tasks must be
// incremented if this returns true.
bool MustIncrementMaxTasksLockRequired()
// Increments max [best effort] tasks iff this worker has been within a
// ScopedBlockingCall for more than |may_block_threshold|.
void MaybeIncrementMaxTasksLockRequired()
EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_);
TaskPriority current_task_priority_lock_required() const
......@@ -306,7 +302,7 @@ class ThreadGroupImpl::WorkerThreadDelegateImpl : public WorkerThread::Delegate,
// Time when MayBlockScopeEntered() was last called. Reset when
// BlockingScopeExited() is called.
TimeTicks may_block_start_time;
TimeTicks blocking_start_time;
} write_worker_read_any_;
WorkerOnly& worker_only() {
......@@ -331,9 +327,11 @@ class ThreadGroupImpl::WorkerThreadDelegateImpl : public WorkerThread::Delegate,
const TrackedRef<ThreadGroupImpl> outer_;
// Whether |outer_->max_tasks_| was incremented due to a ScopedBlockingCall on
// the thread.
// Whether |outer_->max_tasks_|/|outer_->max_best_effort_tasks_| was
// incremented due to a ScopedBlockingCall on the thread.
bool incremented_max_tasks_since_blocked_ GUARDED_BY(outer_->lock_) = false;
bool incremented_max_best_effort_tasks_since_blocked_
GUARDED_BY(outer_->lock_) = false;
// Verifies that specific calls are always made from the worker thread.
THREAD_CHECKER(worker_thread_checker_);
......@@ -393,8 +391,6 @@ void ThreadGroupImpl::Start(
in_start().may_block_without_delay =
FeatureList::IsEnabled(kMayBlockWithoutDelay);
in_start().fixed_max_best_effort_tasks =
FeatureList::IsEnabled(kFixedMaxBestEffortTasks);
in_start().may_block_threshold =
may_block_threshold ? may_block_threshold.value()
: (priority_hint_ == ThreadPriority::NORMAL
......@@ -532,6 +528,11 @@ size_t ThreadGroupImpl::GetMaxTasksForTesting() const {
return max_tasks_;
}
size_t ThreadGroupImpl::GetMaxBestEffortTasksForTesting() const {
CheckedAutoLock auto_lock(lock_);
return max_best_effort_tasks_;
}
size_t ThreadGroupImpl::NumberOfIdleWorkersForTesting() const {
CheckedAutoLock auto_lock(lock_);
return idle_workers_stack_.Size();
......@@ -637,7 +638,7 @@ void ThreadGroupImpl::WorkerThreadDelegateImpl::DidProcessTask(
RegisteredTaskSource task_source) {
DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
DCHECK(worker_only().is_running_task);
DCHECK(read_worker().may_block_start_time.is_null());
DCHECK(read_worker().blocking_start_time.is_null());
++worker_only().num_tasks_since_last_detach;
......@@ -656,6 +657,7 @@ void ThreadGroupImpl::WorkerThreadDelegateImpl::DidProcessTask(
CheckedAutoLock auto_lock(outer_->lock_);
DCHECK(!incremented_max_tasks_since_blocked_);
DCHECK(!incremented_max_best_effort_tasks_since_blocked_);
// Running task bookkeeping.
outer_->DecrementTasksRunningLockRequired(
......@@ -799,14 +801,26 @@ void ThreadGroupImpl::WorkerThreadDelegateImpl::BlockingStarted(
blocking_type = BlockingType::WILL_BLOCK;
}
switch (blocking_type) {
case BlockingType::MAY_BLOCK:
MayBlockEntered();
break;
case BlockingType::WILL_BLOCK:
WillBlockEntered();
break;
ScopedCommandsExecutor executor(outer_.get());
CheckedAutoLock auto_lock(outer_->lock_);
DCHECK(!incremented_max_tasks_since_blocked_);
DCHECK(!incremented_max_best_effort_tasks_since_blocked_);
DCHECK(read_worker().blocking_start_time.is_null());
write_worker().blocking_start_time = subtle::TimeTicksNowIgnoringOverride();
if (*read_worker().current_task_priority == TaskPriority::BEST_EFFORT)
++outer_->num_unresolved_best_effort_may_block_;
if (blocking_type == BlockingType::WILL_BLOCK) {
incremented_max_tasks_since_blocked_ = true;
outer_->IncrementMaxTasksLockRequired();
outer_->EnsureEnoughWorkersLockRequired(&executor);
} else {
++outer_->num_unresolved_may_block_;
}
outer_->MaybeScheduleAdjustMaxTasksLockRequired(&executor);
}
void ThreadGroupImpl::WorkerThreadDelegateImpl::BlockingTypeUpgraded() {
......@@ -820,7 +834,7 @@ void ThreadGroupImpl::WorkerThreadDelegateImpl::BlockingTypeUpgraded() {
return;
}
{
ScopedCommandsExecutor executor(outer_.get());
CheckedAutoLock auto_lock(outer_->lock_);
// Don't do anything if a MAY_BLOCK ScopedBlockingCall instantiated in the
......@@ -830,15 +844,11 @@ void ThreadGroupImpl::WorkerThreadDelegateImpl::BlockingTypeUpgraded() {
// Cancel the effect of a MAY_BLOCK ScopedBlockingCall instantiated in the
// same scope.
if (!read_worker().may_block_start_time.is_null()) {
write_worker().may_block_start_time = TimeTicks();
--outer_->num_unresolved_may_block_;
if (*read_worker().current_task_priority == TaskPriority::BEST_EFFORT)
--outer_->num_unresolved_best_effort_may_block_;
}
}
WillBlockEntered();
incremented_max_tasks_since_blocked_ = true;
outer_->IncrementMaxTasksLockRequired();
outer_->EnsureEnoughWorkersLockRequired(&executor);
}
void ThreadGroupImpl::WorkerThreadDelegateImpl::BlockingEnded() {
......@@ -846,48 +856,22 @@ void ThreadGroupImpl::WorkerThreadDelegateImpl::BlockingEnded() {
DCHECK(worker_only().is_running_task);
CheckedAutoLock auto_lock(outer_->lock_);
if (incremented_max_tasks_since_blocked_) {
outer_->DecrementMaxTasksLockRequired(*read_worker().current_task_priority);
} else {
DCHECK(!read_worker().may_block_start_time.is_null());
DCHECK(!read_worker().blocking_start_time.is_null());
if (incremented_max_tasks_since_blocked_)
outer_->DecrementMaxTasksLockRequired();
else
--outer_->num_unresolved_may_block_;
if (*read_worker().current_task_priority == TaskPriority::BEST_EFFORT)
if (*read_worker().current_task_priority == TaskPriority::BEST_EFFORT) {
if (incremented_max_best_effort_tasks_since_blocked_)
outer_->DecrementMaxBestEffortTasksLockRequired();
else
--outer_->num_unresolved_best_effort_may_block_;
}
incremented_max_tasks_since_blocked_ = false;
write_worker().may_block_start_time = TimeTicks();
}
void ThreadGroupImpl::WorkerThreadDelegateImpl::MayBlockEntered() {
DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
DCHECK(worker_only().is_running_task);
ScopedCommandsExecutor executor(outer_.get());
CheckedAutoLock auto_lock(outer_->lock_);
DCHECK(!incremented_max_tasks_since_blocked_);
DCHECK(read_worker().may_block_start_time.is_null());
write_worker().may_block_start_time = subtle::TimeTicksNowIgnoringOverride();
++outer_->num_unresolved_may_block_;
if (*read_worker().current_task_priority == TaskPriority::BEST_EFFORT)
++outer_->num_unresolved_best_effort_may_block_;
outer_->MaybeScheduleAdjustMaxTasksLockRequired(&executor);
}
void ThreadGroupImpl::WorkerThreadDelegateImpl::WillBlockEntered() {
DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
DCHECK(worker_only().is_running_task);
ScopedCommandsExecutor executor(outer_.get());
CheckedAutoLock auto_lock(outer_->lock_);
DCHECK(!incremented_max_tasks_since_blocked_);
DCHECK(read_worker().may_block_start_time.is_null());
incremented_max_tasks_since_blocked_ = true;
outer_->IncrementMaxTasksLockRequired(*read_worker().current_task_priority);
outer_->EnsureEnoughWorkersLockRequired(&executor);
incremented_max_best_effort_tasks_since_blocked_ = false;
write_worker().blocking_start_time = TimeTicks();
}
bool ThreadGroupImpl::WorkerThreadDelegateImpl::CanGetWorkLockRequired(
......@@ -921,23 +905,25 @@ bool ThreadGroupImpl::WorkerThreadDelegateImpl::CanGetWorkLockRequired(
return true;
}
bool ThreadGroupImpl::WorkerThreadDelegateImpl::
MustIncrementMaxTasksLockRequired() {
if (!incremented_max_tasks_since_blocked_ &&
!read_any().may_block_start_time.is_null() &&
subtle::TimeTicksNowIgnoringOverride() -
read_any().may_block_start_time >=
void ThreadGroupImpl::WorkerThreadDelegateImpl::
MaybeIncrementMaxTasksLockRequired() {
if (read_any().blocking_start_time.is_null() ||
subtle::TimeTicksNowIgnoringOverride() - read_any().blocking_start_time <
outer_->after_start().may_block_threshold) {
incremented_max_tasks_since_blocked_ = true;
return;
}
if (!incremented_max_tasks_since_blocked_) {
incremented_max_tasks_since_blocked_ = true;
--outer_->num_unresolved_may_block_;
if (*read_any().current_task_priority == TaskPriority::BEST_EFFORT)
outer_->IncrementMaxTasksLockRequired();
}
if (*read_any().current_task_priority == TaskPriority::BEST_EFFORT &&
!incremented_max_best_effort_tasks_since_blocked_) {
incremented_max_best_effort_tasks_since_blocked_ = true;
--outer_->num_unresolved_best_effort_may_block_;
return true;
outer_->IncrementMaxBestEffortTasksLockRequired();
}
return false;
}
void ThreadGroupImpl::WaitForWorkersIdleLockRequiredForTesting(size_t n) {
......@@ -1095,10 +1081,7 @@ void ThreadGroupImpl::AdjustMaxTasks() {
WorkerThreadDelegateImpl* delegate =
static_cast<WorkerThreadDelegateImpl*>(worker->delegate());
AnnotateAcquiredLockAlias annotate(lock_, delegate->lock());
if (delegate->MustIncrementMaxTasksLockRequired()) {
IncrementMaxTasksLockRequired(
delegate->current_task_priority_lock_required());
}
delegate->MaybeIncrementMaxTasksLockRequired();
}
// Wake up workers according to the updated |max_tasks_|. This will also
......@@ -1186,24 +1169,29 @@ void ThreadGroupImpl::IncrementTasksRunningLockRequired(TaskPriority priority) {
UpdateMinAllowedPriorityLockRequired();
}
void ThreadGroupImpl::DecrementMaxTasksLockRequired(TaskPriority priority) {
void ThreadGroupImpl::DecrementMaxTasksLockRequired() {
DCHECK_GT(num_running_tasks_, 0U);
DCHECK_GT(max_tasks_, 0U);
--max_tasks_;
if (priority == TaskPriority::BEST_EFFORT &&
!after_start().fixed_max_best_effort_tasks) {
--max_best_effort_tasks_;
}
UpdateMinAllowedPriorityLockRequired();
}
void ThreadGroupImpl::IncrementMaxTasksLockRequired(TaskPriority priority) {
void ThreadGroupImpl::IncrementMaxTasksLockRequired() {
DCHECK_GT(num_running_tasks_, 0U);
++max_tasks_;
if (priority == TaskPriority::BEST_EFFORT &&
!after_start().fixed_max_best_effort_tasks) {
UpdateMinAllowedPriorityLockRequired();
}
void ThreadGroupImpl::DecrementMaxBestEffortTasksLockRequired() {
DCHECK_GT(num_running_tasks_, 0U);
DCHECK_GT(max_best_effort_tasks_, 0U);
--max_best_effort_tasks_;
UpdateMinAllowedPriorityLockRequired();
}
void ThreadGroupImpl::IncrementMaxBestEffortTasksLockRequired() {
DCHECK_GT(num_running_tasks_, 0U);
++max_best_effort_tasks_;
}
UpdateMinAllowedPriorityLockRequired();
}
......
......@@ -122,8 +122,9 @@ class BASE_EXPORT ThreadGroupImpl : public ThreadGroup {
// Returns the number of workers in this thread group.
size_t NumberOfWorkersForTesting() const;
// Returns |max_tasks_|.
// Returns |max_tasks_|/|max_best_effort_tasks_|.
size_t GetMaxTasksForTesting() const;
size_t GetMaxBestEffortTasksForTesting() const;
// Returns the number of workers that are idle (i.e. not running tasks).
size_t NumberOfIdleWorkersForTesting() const;
......@@ -138,6 +139,8 @@ class BASE_EXPORT ThreadGroupImpl : public ThreadGroup {
friend class ThreadGroupImplMayBlockTest;
FRIEND_TEST_ALL_PREFIXES(ThreadGroupImplBlockingTest,
ThreadBlockUnblockPremature);
FRIEND_TEST_ALL_PREFIXES(ThreadGroupImplBlockingTest,
ThreadBlockUnblockPrematureBestEffort);
// ThreadGroup:
void UpdateSortKey(TaskSource::Transaction transaction) override;
......@@ -211,12 +214,13 @@ class BASE_EXPORT ThreadGroupImpl : public ThreadGroup {
void IncrementTasksRunningLockRequired(TaskPriority priority)
EXCLUSIVE_LOCKS_REQUIRED(lock_);
// Increments/decrements the number of tasks that can run in this thread
// group. May only be called in a scope where a task is running with
// |priority|.
void DecrementMaxTasksLockRequired(TaskPriority priority)
// Increments/decrements the number of [best effort] tasks that can run in
// this thread group.
void DecrementMaxTasksLockRequired() EXCLUSIVE_LOCKS_REQUIRED(lock_);
void IncrementMaxTasksLockRequired() EXCLUSIVE_LOCKS_REQUIRED(lock_);
void DecrementMaxBestEffortTasksLockRequired()
EXCLUSIVE_LOCKS_REQUIRED(lock_);
void IncrementMaxTasksLockRequired(TaskPriority priority)
void IncrementMaxBestEffortTasksLockRequired()
EXCLUSIVE_LOCKS_REQUIRED(lock_);
// Values set at Start() and never modified afterwards.
......@@ -244,7 +248,6 @@ class BASE_EXPORT ThreadGroupImpl : public ThreadGroup {
WorkerThreadObserver* worker_thread_observer = nullptr;
bool may_block_without_delay;
bool fixed_max_best_effort_tasks;
// Threshold after which the max tasks is increased to compensate for a
// worker that is within a MAY_BLOCK ScopedBlockingCall.
......
......@@ -916,15 +916,20 @@ class ThreadGroupImplBlockingTest
// Saturates the thread group with a task that first blocks, waits to be
// unblocked, then exits.
void SaturateWithBlockingTasks(
const NestedBlockingType& nested_blocking_type) {
const NestedBlockingType& nested_blocking_type,
TaskPriority priority = TaskPriority::USER_BLOCKING) {
TestWaitableEvent threads_running;
const scoped_refptr<TaskRunner> task_runner = test::CreatePooledTaskRunner(
{MayBlock(), WithBaseSyncPrimitives(), priority},
&mock_pooled_task_runner_delegate_);
RepeatingClosure threads_running_barrier = BarrierClosure(
kMaxTasks,
BindOnce(&TestWaitableEvent::Signal, Unretained(&threads_running)));
for (size_t i = 0; i < kMaxTasks; ++i) {
task_runner_->PostTask(
task_runner->PostTask(
FROM_HERE, BindLambdaForTesting([this, &threads_running_barrier,
nested_blocking_type]() {
NestedScopedBlockingCall nested_scoped_blocking_call(
......@@ -938,15 +943,20 @@ class ThreadGroupImplBlockingTest
// Saturates the thread group with a task that waits for other tasks without
// entering a ScopedBlockingCall, then exits.
void SaturateWithBusyTasks() {
void SaturateWithBusyTasks(
TaskPriority priority = TaskPriority::USER_BLOCKING) {
TestWaitableEvent threads_running;
const scoped_refptr<TaskRunner> task_runner = test::CreatePooledTaskRunner(
{MayBlock(), WithBaseSyncPrimitives(), priority},
&mock_pooled_task_runner_delegate_);
RepeatingClosure threads_running_barrier = BarrierClosure(
kMaxTasks,
BindOnce(&TestWaitableEvent::Signal, Unretained(&threads_running)));
// Posting these tasks should cause new workers to be created.
for (size_t i = 0; i < kMaxTasks; ++i) {
task_runner_->PostTask(
task_runner->PostTask(
FROM_HERE, BindLambdaForTesting([this, &threads_running_barrier]() {
threads_running_barrier.Run();
busy_threads_continue_.Wait();
......@@ -1014,6 +1024,30 @@ TEST_P(ThreadGroupImplBlockingTest, ThreadBlockedUnblocked) {
EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
}
// Verify that SaturateWithBlockingTasks() of BEST_EFFORT tasks causes max best
// effort tasks to increase and creates a worker if needed. Also verify that
// UnblockBlockingTasks() decreases max best effort tasks after an increase.
TEST_P(ThreadGroupImplBlockingTest, ThreadBlockedUnblockedBestEffort) {
CreateAndStartThreadGroup();
ASSERT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
ASSERT_EQ(thread_group_->GetMaxBestEffortTasksForTesting(), kMaxTasks);
SaturateWithBlockingTasks(GetParam(), TaskPriority::BEST_EFFORT);
// Forces |kMaxTasks| extra workers to be instantiated by posting tasks. This
// should not block forever.
SaturateWithBusyTasks(TaskPriority::BEST_EFFORT);
EXPECT_EQ(thread_group_->NumberOfWorkersForTesting(), 2 * kMaxTasks);
UnblockBusyTasks();
UnblockBlockingTasks();
task_tracker_.FlushForTesting();
EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
EXPECT_EQ(thread_group_->GetMaxBestEffortTasksForTesting(), kMaxTasks);
}
// Verify that flooding the thread group with more BEST_EFFORT tasks than
// kMaxBestEffortTasks doesn't prevent USER_VISIBLE tasks from running.
TEST_P(ThreadGroupImplBlockingTest, TooManyBestEffortTasks) {
......@@ -1286,6 +1320,37 @@ TEST_F(ThreadGroupImplBlockingTest, ThreadBlockUnblockPremature) {
EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
}
// Verify that if a BEST_EFFORT task enters the scope of a WILL_BLOCK
// ScopedBlockingCall, but exits the scope before the MayBlock threshold is
// reached, that the max best effort tasks does not increase.
TEST_F(ThreadGroupImplBlockingTest, ThreadBlockUnblockPrematureBestEffort) {
// Create a thread group with an infinite MayBlock threshold so that a
// MAY_BLOCK ScopedBlockingCall never increases the max tasks.
CreateAndStartThreadGroup(TimeDelta::Max(), // |suggested_reclaim_time|
kMaxTasks, // |max_tasks|
kMaxTasks, // |max_best_effort_tasks|
nullptr, // |worker_observer|
TimeDelta::Max() // |may_block_threshold|
);
ASSERT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
ASSERT_EQ(thread_group_->GetMaxBestEffortTasksForTesting(), kMaxTasks);
SaturateWithBlockingTasks(NestedBlockingType(BlockingType::WILL_BLOCK,
OptionalBlockingType::NO_BLOCK,
BlockingType::WILL_BLOCK),
TaskPriority::BEST_EFFORT);
PlatformThread::Sleep(
2 * thread_group_->blocked_workers_poll_period_for_testing());
EXPECT_GE(thread_group_->NumberOfWorkersForTesting(), kMaxTasks);
EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), 2 * kMaxTasks);
EXPECT_EQ(thread_group_->GetMaxBestEffortTasksForTesting(), kMaxTasks);
UnblockBlockingTasks();
task_tracker_.FlushForTesting();
EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
EXPECT_EQ(thread_group_->GetMaxBestEffortTasksForTesting(), kMaxTasks);
}
// Verify that if max tasks is incremented because of a MAY_BLOCK
// ScopedBlockingCall, it isn't incremented again when there is a nested
// WILL_BLOCK ScopedBlockingCall.
......
......@@ -2914,25 +2914,6 @@
]
}
],
"FixedMaxBestEffortTasks": [
{
"platforms": [
"android",
"chromeos",
"linux",
"mac",
"windows"
],
"experiments": [
{
"name": "Enabled",
"enable_features": [
"FixedMaxBestEffortTasks"
]
}
]
}
],
"FlexNG": [
{
"platforms": [
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment