Commit cfcfba66 authored by Francois Doray's avatar Francois Doray Committed by Commit Bot

[TaskScheduler] Clear blocking observer when using WaitableEvent in tests.

When a task posted to TaskScheduler waits on a WaitableEvent, the
pool capacity is automatically incremented after a short delay
(https://cs.chromium.org/chromium/src/base/synchronization/waitable_event_win.cc?l=72&rcl=c60b0bcd6830c32c30128867e55a53f27642df5f and
https://cs.chromium.org/chromium/src/base/synchronization/waitable_event_posix.cc?l=166&rcl=c60b0bcd6830c32c30128867e55a53f27642df5f and
https://cs.chromium.org/chromium/src/base/synchronization/waitable_event_mac.cc?l=115&rcl=c60b0bcd6830c32c30128867e55a53f27642df5f).

This is desirable in production (if a task blocks for a long time,
another thread is created and the number of tasks that are using
the CPU stays constant).

However, it is undesirable when a WaitableEvent is used in
TaskScheduler tests, because it leads to hard to predict changes
of pool capacity.

This CL adds a ScopedClearBlockingObserverForTesting annotation to
all WaitableEvent::Wait that happen in code that tests
SchedulerWorkerPoolImpl, to avoid these hard to predict changes
of pool capacity.

The annotation is not needed when the WaitableEvent::Wait doesn't
happen on a thread managed by SchedulerWorkerPoolImpl, because that
already has no effect on pool capacity.

Bug: 768436
Change-Id: Ie166ca55f914d89c5dd0fe6dad9ae1533e0f6c29
Reviewed-on: https://chromium-review.googlesource.com/1033533Reviewed-by: default avatarGabriel Charette <gab@chromium.org>
Commit-Queue: François Doray <fdoray@chromium.org>
Cr-Commit-Position: refs/heads/master@{#554732}
parent 0dba0995
......@@ -63,6 +63,13 @@ constexpr size_t kNumTasksPostedPerThread = 150;
constexpr TimeDelta kReclaimTimeForCleanupTests =
TimeDelta::FromMilliseconds(500);
// Waits on |event| in a scope where the blocking observer is null, to avoid
// affecting the worker capacity.
void WaitWithoutBlockingObserver(WaitableEvent* event) {
internal::ScopedClearBlockingObserverForTesting clear_blocking_observer;
event->Wait();
}
class TaskSchedulerWorkerPoolImplTestBase {
protected:
TaskSchedulerWorkerPoolImplTestBase()
......@@ -220,7 +227,8 @@ TEST_P(TaskSchedulerWorkerPoolImplTestParam, PostTasksWithOneAvailableWorker) {
CreateTaskRunnerWithExecutionMode(worker_pool_.get(), GetParam()),
GetParam()));
EXPECT_TRUE(blocked_task_factories.back()->PostTask(
PostNestedTask::NO, Bind(&WaitableEvent::Wait, Unretained(&event))));
PostNestedTask::NO,
BindOnce(&WaitWithoutBlockingObserver, Unretained(&event))));
blocked_task_factories.back()->WaitForAllTasksToRun();
}
......@@ -254,7 +262,8 @@ TEST_P(TaskSchedulerWorkerPoolImplTestParam, Saturate) {
CreateTaskRunnerWithExecutionMode(worker_pool_.get(), GetParam()),
GetParam()));
EXPECT_TRUE(factories.back()->PostTask(
PostNestedTask::NO, Bind(&WaitableEvent::Wait, Unretained(&event))));
PostNestedTask::NO,
BindOnce(&WaitWithoutBlockingObserver, Unretained(&event))));
factories.back()->WaitForAllTasksToRun();
}
......@@ -372,7 +381,7 @@ void TaskPostedBeforeStart(PlatformThreadRef* platform_thread_ref,
WaitableEvent* barrier) {
*platform_thread_ref = PlatformThread::CurrentRef();
task_running->Signal();
barrier->Wait();
WaitWithoutBlockingObserver(barrier);
}
} // namespace
......@@ -454,7 +463,7 @@ class TaskSchedulerWorkerPoolCheckTlsReuse
public:
void SetTlsValueAndWait() {
slot_.Set(reinterpret_cast<void*>(kMagicTlsValue));
waiter_.Wait();
WaitWithoutBlockingObserver(&waiter_);
}
void CountZeroTlsValuesAndWait(WaitableEvent* count_waiter) {
......@@ -462,7 +471,7 @@ class TaskSchedulerWorkerPoolCheckTlsReuse
subtle::NoBarrier_AtomicIncrement(&zero_tls_values_, 1);
count_waiter->Signal();
waiter_.Wait();
WaitWithoutBlockingObserver(&waiter_);
}
protected:
......@@ -573,7 +582,7 @@ class TaskSchedulerWorkerPoolHistogramTest
BindOnce(
[](OnceClosure on_running, WaitableEvent* continue_event) {
std::move(on_running).Run();
continue_event->Wait();
WaitWithoutBlockingObserver(continue_event);
},
all_workers_running_barrier, continue_event));
}
......@@ -597,8 +606,8 @@ TEST_F(TaskSchedulerWorkerPoolHistogramTest, NumTasksBetweenWaits) {
{WithBaseSyncPrimitives()});
// Post a task.
task_runner->PostTask(FROM_HERE,
BindOnce(&WaitableEvent::Wait, Unretained(&event)));
task_runner->PostTask(
FROM_HERE, BindOnce(&WaitWithoutBlockingObserver, Unretained(&event)));
// Post 2 more tasks while the first task hasn't completed its execution. It
// is guaranteed that these tasks will run immediately after the first task,
......@@ -722,7 +731,7 @@ TEST_F(TaskSchedulerWorkerPoolHistogramTest, NumTasksBeforeCleanup) {
ASSERT_FALSE(thread_ref->is_null());
EXPECT_EQ(*thread_ref, PlatformThread::CurrentRef());
cleanup_thread_running->Signal();
cleanup_thread_continue->Wait();
WaitWithoutBlockingObserver(cleanup_thread_continue);
},
Unretained(&thread_ref), Unretained(&cleanup_thread_running),
Unretained(&cleanup_thread_continue)));
......@@ -755,7 +764,7 @@ TEST_F(TaskSchedulerWorkerPoolHistogramTest, NumTasksBeforeCleanup) {
<< "Worker reused. Worker will not cleanup and the "
"histogram value will be wrong.";
top_idle_thread_running->Signal();
top_idle_thread_continue->Wait();
WaitWithoutBlockingObserver(top_idle_thread_continue);
},
thread_ref, Unretained(&top_idle_thread_running),
Unretained(&top_idle_thread_continue)));
......@@ -829,7 +838,7 @@ TEST(TaskSchedulerWorkerPoolStandbyPolicyTest, VerifyStandbyThread) {
RepeatingClosure closure = BindRepeating(
[](WaitableEvent* thread_running, WaitableEvent* thread_continue) {
thread_running->Signal();
thread_continue->Wait();
WaitWithoutBlockingObserver(thread_continue);
},
Unretained(&thread_running), Unretained(&thread_continue));
......@@ -948,15 +957,7 @@ class TaskSchedulerWorkerPoolBlockingTest
NestedScopedBlockingCall nested_scoped_blocking_call(
nested_blocking_type);
blocking_thread_running_closure->Run();
{
// Use ScopedClearBlockingObserverForTesting to avoid
// affecting the worker capacity with this WaitableEvent.
internal::ScopedClearBlockingObserverForTesting
scoped_clear_blocking_observer;
blocking_thread_continue_->Wait();
}
WaitWithoutBlockingObserver(blocking_thread_continue_);
},
Unretained(&blocking_thread_running_closure),
Unretained(&blocking_thread_continue_), nested_blocking_type));
......@@ -1039,24 +1040,11 @@ TEST_P(TaskSchedulerWorkerPoolBlockingTest, PostBeforeBlocking) {
WaitableEvent* thread_running, WaitableEvent* thread_can_block,
WaitableEvent* thread_continue) {
thread_running->Signal();
{
// Use ScopedClearBlockingObserverForTesting to avoid affecting
// the worker capacity with this WaitableEvent.
internal::ScopedClearBlockingObserverForTesting
scoped_clear_blocking_observer;
thread_can_block->Wait();
}
WaitWithoutBlockingObserver(thread_can_block);
NestedScopedBlockingCall nested_scoped_blocking_call(
nested_blocking_type);
{
// Use ScopedClearBlockingObserverForTesting to avoid affecting
// the worker capacity with this WaitableEvent.
internal::ScopedClearBlockingObserverForTesting
scoped_clear_blocking_observer;
thread_continue->Wait();
}
WaitWithoutBlockingObserver(thread_continue);
},
GetParam(), Unretained(&thread_running),
Unretained(&thread_can_block), Unretained(&thread_continue)));
......@@ -1083,15 +1071,8 @@ TEST_P(TaskSchedulerWorkerPoolBlockingTest, PostBeforeBlocking) {
[](Closure* extra_threads_running_barrier,
WaitableEvent* extra_threads_continue) {
extra_threads_running_barrier->Run();
{
// Use ScopedClearBlockingObserverForTesting
// to avoid affecting the worker capacity
// with this WaitableEvent.
internal::
ScopedClearBlockingObserverForTesting
scoped_clear_blocking_observer;
extra_threads_continue->Wait();
}
WaitWithoutBlockingObserver(
extra_threads_continue);
},
Unretained(&extra_threads_running_barrier),
Unretained(&extra_threads_continue)));
......@@ -1143,13 +1124,7 @@ TEST_P(TaskSchedulerWorkerPoolBlockingTest, WorkersIdleWhenOverCapacity) {
auto callback = BindOnce(
[](Closure* thread_running_barrier, WaitableEvent* thread_continue) {
thread_running_barrier->Run();
{
// Use ScopedClearBlockingObserver ForTesting to avoid affecting the
// worker capacity with this WaitableEvent.
internal::ScopedClearBlockingObserverForTesting
scoped_clear_blocking_observer;
thread_continue->Wait();
}
WaitWithoutBlockingObserver(thread_continue);
},
Unretained(&thread_running_barrier), Unretained(&thread_continue));
task_runner_->PostTask(FROM_HERE, std::move(callback));
......@@ -1255,17 +1230,8 @@ TEST_F(TaskSchedulerWorkerPoolBlockingTest,
// Saturate the pool so that a MAY_BLOCK ScopedBlockingCall would increment
// the worker capacity.
for (size_t i = 0; i < kNumWorkersInWorkerPool - 1; ++i) {
task_runner->PostTask(FROM_HERE,
BindOnce(
[](WaitableEvent* can_return) {
// Use ScopedClearBlockingObserverForTesting to
// avoid affecting the worker capacity with this
// WaitableEvent.
internal::ScopedClearBlockingObserverForTesting
scoped_clear_blocking_observer;
can_return->Wait();
},
Unretained(&can_return)));
task_runner->PostTask(FROM_HERE, BindOnce(&WaitWithoutBlockingObserver,
Unretained(&can_return)));
}
WaitableEvent can_instantiate_will_block(
......@@ -1283,22 +1249,10 @@ TEST_F(TaskSchedulerWorkerPoolBlockingTest,
WaitableEvent* did_instantiate_will_block,
WaitableEvent* can_return) {
ScopedBlockingCall may_block(BlockingType::MAY_BLOCK);
{
// Use ScopedClearBlockingObserverForTesting to avoid affecting
// the worker capacity with this WaitableEvent.
internal::ScopedClearBlockingObserverForTesting
scoped_clear_blocking_observer;
can_instantiate_will_block->Wait();
}
WaitWithoutBlockingObserver(can_instantiate_will_block);
ScopedBlockingCall will_block(BlockingType::WILL_BLOCK);
did_instantiate_will_block->Signal();
{
// Use ScopedClearBlockingObserverForTesting to avoid affecting
// the worker capacity with this WaitableEvent.
internal::ScopedClearBlockingObserverForTesting
scoped_clear_blocking_observer;
can_return->Wait();
}
WaitWithoutBlockingObserver(can_return);
},
Unretained(&can_instantiate_will_block),
Unretained(&did_instantiate_will_block), Unretained(&can_return)));
......@@ -1360,10 +1314,9 @@ TEST(TaskSchedulerWorkerPoolOverWorkerCapacityTest, VerifyCleanup) {
threads_running_barrier->Run();
{
ScopedBlockingCall scoped_blocking_call(BlockingType::WILL_BLOCK);
blocked_call_continue->Wait();
WaitWithoutBlockingObserver(blocked_call_continue);
}
threads_continue->Wait();
WaitWithoutBlockingObserver(threads_continue);
},
Unretained(&threads_running_barrier), Unretained(&threads_continue),
Unretained(&blocked_call_continue));
......@@ -1390,7 +1343,8 @@ TEST(TaskSchedulerWorkerPoolOverWorkerCapacityTest, VerifyCleanup) {
[](Closure* extra_threads_running_barrier,
WaitableEvent* extra_threads_continue) {
extra_threads_running_barrier->Run();
extra_threads_continue->Wait();
WaitWithoutBlockingObserver(
extra_threads_continue);
},
Unretained(&extra_threads_running_barrier),
Unretained(&extra_threads_continue)));
......@@ -1455,7 +1409,8 @@ TEST_F(TaskSchedulerWorkerPoolBlockingTest, MaximumWorkersTest) {
ScopedBlockingCall scoped_blocking_call(
BlockingType::WILL_BLOCK);
early_threads_barrier_closure->Run();
early_release_thread_continue->Wait();
WaitWithoutBlockingObserver(
early_release_thread_continue);
}
early_threads_finished->Run();
},
......@@ -1490,7 +1445,7 @@ TEST_F(TaskSchedulerWorkerPoolBlockingTest, MaximumWorkersTest) {
WaitableEvent* late_release_thread_contine) {
ScopedBlockingCall scoped_blocking_call(BlockingType::WILL_BLOCK);
late_threads_barrier_closure->Run();
late_release_thread_contine->Wait();
WaitWithoutBlockingObserver(late_release_thread_contine);
},
Unretained(&late_threads_barrier_closure),
Unretained(&late_release_thread_contine)));
......@@ -1519,7 +1474,7 @@ TEST_F(TaskSchedulerWorkerPoolBlockingTest, MaximumWorkersTest) {
BindOnce(
[](Closure* closure, WaitableEvent* final_tasks_continue) {
closure->Run();
final_tasks_continue->Wait();
WaitWithoutBlockingObserver(final_tasks_continue);
},
Unretained(&final_tasks_running_barrier),
Unretained(&final_tasks_continue)));
......@@ -1575,7 +1530,7 @@ TEST(TaskSchedulerWorkerPoolTest, RacyCleanup) {
BindOnce(
[](OnceClosure on_running, WaitableEvent* unblock_threads) {
std::move(on_running).Run();
unblock_threads->Wait();
WaitWithoutBlockingObserver(unblock_threads);
},
threads_running_barrier, Unretained(&unblock_threads)));
}
......
......@@ -33,12 +33,12 @@ TestTaskFactory::~TestTaskFactory() {
}
bool TestTaskFactory::PostTask(PostNestedTask post_nested_task,
const Closure& after_task_closure) {
OnceClosure after_task_closure) {
AutoLock auto_lock(lock_);
return task_runner_->PostTask(
FROM_HERE,
BindOnce(&TestTaskFactory::RunTaskCallback, Unretained(this),
num_posted_tasks_++, post_nested_task, after_task_closure));
FROM_HERE, BindOnce(&TestTaskFactory::RunTaskCallback, Unretained(this),
num_posted_tasks_++, post_nested_task,
std::move(after_task_closure)));
}
void TestTaskFactory::WaitForAllTasksToRun() const {
......@@ -49,7 +49,7 @@ void TestTaskFactory::WaitForAllTasksToRun() const {
void TestTaskFactory::RunTaskCallback(size_t task_index,
PostNestedTask post_nested_task,
const Closure& after_task_closure) {
OnceClosure after_task_closure) {
if (post_nested_task == PostNestedTask::YES)
PostTask(PostNestedTask::NO, Closure());
......@@ -98,7 +98,7 @@ void TestTaskFactory::RunTaskCallback(size_t task_index,
}
if (!after_task_closure.is_null())
after_task_closure.Run();
std::move(after_task_closure).Run();
}
} // namespace test
......
......@@ -54,7 +54,7 @@ class TestTaskFactory {
// - Verify conditions in which the task runs (see potential failures above).
// - Run |after_task_closure| if it is not null.
bool PostTask(PostNestedTask post_nested_task,
const Closure& after_task_closure);
OnceClosure after_task_closure);
// Waits for all tasks posted by PostTask() to start running. It is not
// guaranteed that the tasks have completed their execution when this returns.
......@@ -65,7 +65,7 @@ class TestTaskFactory {
private:
void RunTaskCallback(size_t task_index,
PostNestedTask post_nested_task,
const Closure& after_task_closure);
OnceClosure after_task_closure);
// Synchronizes access to all members.
mutable Lock lock_;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment