Commit 1ae9a275 authored by Gabriel Charette's avatar Gabriel Charette Committed by Commit Bot

[SchedulerWorkerPoolImpl] Reinforce JoinForTesting to prevent use-after-free post Cleanup().

SchedulerWorker can touch SchedulerWorkerPoolImpl
through SchedulerWorker::delegate_->outer_.

This could cause use-after-free in tests (not a problem in prod because
we never join workers nor destroy the scheduler/pools) because
SchedulerWorkerPoolImpl no longer accounts for a SchedulerWorker
after invoking SchedulerWorker::Cleanup and hence will not join it
in JoinForTesting() before it is itself destroyed.

Before this CL: a SchedulerWorker could access SchedulerWorkerPoolImpl
after Cleanup() and hence potentially use after free if the pool is
destroyed before the detached thread completes execution.

This CL reinforces JoinForTesting() to wait for all workers that
ever had a reference to |this| (i.e. its |workers_| + any recently
reclaimed workers).

Added TaskSchedulerWorkerPoolTest.RacyCleanup as regression test although
it passes in most scenarios without the fix because this race is hard to
exercise...

And added a DCHECK on start that the requested pool size isn't greater than
kMaxNumberOfWorkers (as I tried to add 1000 worker in my new test and had to
debug why it hung until I rediscovered this intentional behavior).

Bug: 810464
Change-Id: Ibaefb5446d462c325e4ddfd34a54ad3c448c1417
Reviewed-on: https://chromium-review.googlesource.com/931547
Commit-Queue: Gabriel Charette <gab@chromium.org>
Reviewed-by: default avatarFrançois Doray <fdoray@chromium.org>
Cr-Commit-Position: refs/heads/master@{#539095}
parent 5a38d2f2
......@@ -99,6 +99,9 @@ class SchedulerWorker::Thread : public PlatformThread::Delegate {
wake_up_event_.Reset();
}
// Important: It is unsafe to access unowned state (e.g. |task_tracker_|)
// after invoking OnMainExit().
outer_->delegate_->OnMainExit(outer_.get());
// Break the ownership circle between SchedulerWorker and Thread.
......
......@@ -76,7 +76,10 @@ class BASE_EXPORT SchedulerWorker
// SchedulerWorker::WakeUp()
virtual void WaitForWork(WaitableEvent* wake_up_event);
// Called by |worker|'s thread right before the main function exits.
// Called by |worker|'s thread right before the main function exits. The
// Delegate is free to release any associated resources in this call. It is
// guaranteed that SchedulerWorker won't access the Delegate or the
// TaskTracker after calling OnMainExit() on the Delegate.
virtual void OnMainExit(SchedulerWorker* worker) {}
};
......
......@@ -219,6 +219,7 @@ void SchedulerWorkerPoolImpl::Start(
worker_capacity_ = params.max_threads();
initial_worker_capacity_ = worker_capacity_;
DCHECK_LE(initial_worker_capacity_, kMaxNumberOfWorkers);
suggested_reclaim_time_ = params.suggested_reclaim_time();
backward_compatibility_ = params.backward_compatibility();
worker_environment_ = worker_environment;
......@@ -306,6 +307,8 @@ void SchedulerWorkerPoolImpl::JoinForTesting() {
{
AutoSchedulerLock auto_lock(lock_);
DCHECK_GT(workers_.size(), size_t(0)) << "Joined an unstarted worker pool.";
DCHECK(!CanWorkerCleanupForTestingLockRequired() ||
suggested_reclaim_time_.is_max())
<< "Workers can cleanup during join.";
......@@ -319,10 +322,20 @@ void SchedulerWorkerPoolImpl::JoinForTesting() {
worker->JoinForTesting();
#if DCHECK_IS_ON()
AutoSchedulerLock auto_lock(lock_);
DCHECK(workers_ == workers_copy);
{
AutoSchedulerLock auto_lock(lock_);
DCHECK(workers_ == workers_copy);
}
#endif
// Make sure recently cleaned up workers (ref.
// SchedulerWorkerDelegateImpl::CleanupLockRequired()) had time to exit as
// they have a raw reference to |this| (and to TaskTracker) which can
// otherwise result in racy use-after-frees per no longer being part of
// |workers_| and hence not being explicitly joined above :
// https://crbug.com/810464.
no_workers_remaining_for_testing_.Wait();
DCHECK(!join_for_testing_returned_.IsSignaled());
join_for_testing_returned_.Signal();
}
......@@ -356,6 +369,8 @@ SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::
: outer_(outer) {
// Bound in OnMainEntry().
DETACH_FROM_THREAD(worker_thread_checker_);
outer_->live_workers_count_for_testing_.Increment();
}
// OnMainExit() handles the thread-affine cleanup; SchedulerWorkerDelegateImpl
......@@ -544,22 +559,29 @@ void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::OnMainExit(
DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
#if DCHECK_IS_ON()
bool shutdown_complete = outer_->task_tracker_->IsShutdownComplete();
AutoSchedulerLock auto_lock(outer_->lock_);
{
bool shutdown_complete = outer_->task_tracker_->IsShutdownComplete();
AutoSchedulerLock auto_lock(outer_->lock_);
// |worker| should already have been removed from the idle workers stack and
// |workers_| by the time the thread is about to exit. (except in the cases
// where the pool is no longer going to be used - in which case, it's fine for
// there to be invalid workers in the pool.
if (!shutdown_complete && !outer_->join_for_testing_started_.IsSet()) {
DCHECK(!outer_->idle_workers_stack_.Contains(worker));
DCHECK(!ContainsWorker(outer_->workers_, worker));
// |worker| should already have been removed from the idle workers stack and
// |workers_| by the time the thread is about to exit. (except in the cases
// where the pool is no longer going to be used - in which case, it's fine
// for there to be invalid workers in the pool.
if (!shutdown_complete && !outer_->join_for_testing_started_.IsSet()) {
DCHECK(!outer_->idle_workers_stack_.Contains(worker));
DCHECK(!ContainsWorker(outer_->workers_, worker));
}
}
#endif
#if defined(OS_WIN)
win_thread_environment_.reset();
#endif // defined(OS_WIN)
if (!outer_->live_workers_count_for_testing_.Decrement()) {
DCHECK(!outer_->no_workers_remaining_for_testing_.IsSignaled());
outer_->no_workers_remaining_for_testing_.Signal();
}
}
void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::
......
......@@ -11,6 +11,7 @@
#include <string>
#include <vector>
#include "base/atomic_ref_count.h"
#include "base/base_export.h"
#include "base/containers/stack.h"
#include "base/logging.h"
......@@ -19,6 +20,7 @@
#include "base/strings/string_piece.h"
#include "base/synchronization/atomic_flag.h"
#include "base/synchronization/condition_variable.h"
#include "base/synchronization/waitable_event.h"
#include "base/task_runner.h"
#include "base/task_scheduler/priority_queue.h"
#include "base/task_scheduler/scheduler_lock.h"
......@@ -239,6 +241,22 @@ class BASE_EXPORT SchedulerWorkerPoolImpl : public SchedulerWorkerPool {
// All workers owned by this worker pool.
std::vector<scoped_refptr<SchedulerWorker>> workers_;
// The number of live worker threads with a reference to this
// SchedulerWorkerPoolImpl. This is always greater-than-or-equal to
// |workers_.size()| as it includes those as well as reclaimed threads that
// haven't yet completed their exit. JoinForTesting() must wait for this count
// to reach 0 before returning.
AtomicRefCount live_workers_count_for_testing_{0};
// Signaled when |live_workers_count_| reaches 0 (which can only happen after
// initiating JoinForTesting() as the pool always keeps at least one idle
// worker otherwise). Note: a Semaphore would be a better suited construct
// than |live_workers_count_for_testing_| +
// |no_workers_remaining_for_testing_| but //base currently doesn't provide it
// and this use case doesn't justify it.
WaitableEvent no_workers_remaining_for_testing_{
WaitableEvent::ResetPolicy::MANUAL,
WaitableEvent::InitialState::NOT_SIGNALED};
// Workers can be added as needed up until there are |worker_capacity_|
// workers.
size_t worker_capacity_ = 0;
......
......@@ -1536,5 +1536,62 @@ TEST_F(TaskSchedulerWorkerPoolBlockingTest, MaximumWorkersTest) {
task_tracker_.FlushForTesting();
}
// Verify that worker detachement doesn't race with worker cleanup, regression
// test for https://crbug.com/810464.
TEST(TaskSchedulerWorkerPoolTest, RacyCleanup) {
constexpr size_t kWorkerCapacity = 256;
constexpr TimeDelta kReclaimTimeForRacyCleanupTest =
TimeDelta::FromMilliseconds(10);
TaskTracker task_tracker("Test");
DelayedTaskManager delayed_task_manager;
scoped_refptr<TaskRunner> service_thread_task_runner =
MakeRefCounted<TestSimpleTaskRunner>();
delayed_task_manager.Start(service_thread_task_runner);
SchedulerWorkerPoolImpl worker_pool("RacyCleanupTestWorkerPool", "A",
ThreadPriority::NORMAL, &task_tracker,
&delayed_task_manager);
worker_pool.Start(SchedulerWorkerPoolParams(kWorkerCapacity,
kReclaimTimeForRacyCleanupTest),
service_thread_task_runner,
SchedulerWorkerPoolImpl::WorkerEnvironment::NONE);
scoped_refptr<TaskRunner> task_runner =
worker_pool.CreateTaskRunnerWithTraits({WithBaseSyncPrimitives()});
WaitableEvent threads_running(WaitableEvent::ResetPolicy::AUTOMATIC,
WaitableEvent::InitialState::NOT_SIGNALED);
WaitableEvent unblock_threads(WaitableEvent::ResetPolicy::MANUAL,
WaitableEvent::InitialState::NOT_SIGNALED);
RepeatingClosure threads_running_barrier = BarrierClosure(
kWorkerCapacity,
BindOnce(&WaitableEvent::Signal, Unretained(&threads_running)));
for (size_t i = 0; i < kWorkerCapacity; ++i) {
task_runner->PostTask(
FROM_HERE,
BindOnce(
[](OnceClosure on_running, WaitableEvent* unblock_threads) {
std::move(on_running).Run();
unblock_threads->Wait();
},
threads_running_barrier, Unretained(&unblock_threads)));
}
// Wait for all workers to be ready and release them all at once.
threads_running.Wait();
unblock_threads.Signal();
// Sleep to wakeup precisely when all workers are going to try to cleanup per
// being idle.
PlatformThread::Sleep(kReclaimTimeForRacyCleanupTest);
worker_pool.DisallowWorkerCleanupForTesting();
worker_pool.JoinForTesting();
// Unwinding this test will be racy if worker cleanup can race with
// SchedulerWorkerPoolImpl destruction : https://crbug.com/810464.
}
} // namespace internal
} // namespace base
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment