Commit 37e79479 authored by fdoray's avatar fdoray Committed by Commit bot

Test SequencedWorkerPool with redirection to the TaskScheduler.

This CL makes most SequencedWorkerPoolTest a TestWithParam. The parameter
controls whether redirection to TaskScheduler is enabled.

BUG=622400

Review-Url: https://codereview.chromium.org/2285633003
Cr-Commit-Position: refs/heads/master@{#419322}
parent 10118bff
...@@ -54,12 +54,16 @@ namespace base { ...@@ -54,12 +54,16 @@ namespace base {
namespace { namespace {
// An enum representing the state of all pools. Any given process should only // An enum representing the state of all pools. Any given non-test process
// ever transition from NONE_ACTIVE to the active states, transitions between // should only ever transition from NONE_ACTIVE to one of the active states.
// actives states are unexpected. The REDIRECTED_TO_TASK_SCHEDULER transition // Transitions between actives states are unexpected. The
// occurs when RedirectSequencedWorkerPoolsToTaskSchedulerForProcess() is called // REDIRECTED_TO_TASK_SCHEDULER transition occurs when
// and the WORKER_CREATED transition occurs when a Worker needs to be created // RedirectToTaskSchedulerForProcess() is called. The WORKER_CREATED transition
// because the first task was posted and the state is still NONE_ACTIVE. // occurs when a Worker needs to be created because the first task was posted
// and the state is still NONE_ACTIVE. In a test process, a transition to
// NONE_ACTIVE occurs when ResetRedirectToTaskSchedulerForProcessForTesting() is
// called.
//
// |g_all_pools_state| uses relaxed atomic operations to ensure no data race // |g_all_pools_state| uses relaxed atomic operations to ensure no data race
// between reads/writes, strict memory ordering isn't required per no other // between reads/writes, strict memory ordering isn't required per no other
// state being inferred from its value. Explicit synchronization (e.g. locks or // state being inferred from its value. Explicit synchronization (e.g. locks or
...@@ -67,6 +71,7 @@ namespace { ...@@ -67,6 +71,7 @@ namespace {
// NONE_ACTIVE after the first Worker was created -- this is not possible for // NONE_ACTIVE after the first Worker was created -- this is not possible for
// REDIRECTED_TO_TASK_SCHEDULER per its API requesting to be invoked while no // REDIRECTED_TO_TASK_SCHEDULER per its API requesting to be invoked while no
// other threads are active). // other threads are active).
//
// TODO(gab): Remove this if http://crbug.com/622400 fails (SequencedWorkerPool // TODO(gab): Remove this if http://crbug.com/622400 fails (SequencedWorkerPool
// will be phased out completely otherwise). // will be phased out completely otherwise).
enum AllPoolsState : subtle::Atomic32 { enum AllPoolsState : subtle::Atomic32 {
...@@ -902,6 +907,8 @@ bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( ...@@ -902,6 +907,8 @@ bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread(
// See https://code.google.com/p/chromium/issues/detail?id=168415 // See https://code.google.com/p/chromium/issues/detail?id=168415
void SequencedWorkerPool::Inner::CleanupForTesting() { void SequencedWorkerPool::Inner::CleanupForTesting() {
DCHECK_NE(subtle::NoBarrier_Load(&g_all_pools_state),
AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER);
DCHECK(!RunsTasksOnCurrentThread()); DCHECK(!RunsTasksOnCurrentThread());
base::ThreadRestrictions::ScopedAllowWait allow_wait; base::ThreadRestrictions::ScopedAllowWait allow_wait;
AutoLock lock(lock_); AutoLock lock(lock_);
...@@ -1449,8 +1456,7 @@ SequencedWorkerPool::GetWorkerPoolForCurrentThread() { ...@@ -1449,8 +1456,7 @@ SequencedWorkerPool::GetWorkerPoolForCurrentThread() {
} }
// static // static
void SequencedWorkerPool:: void SequencedWorkerPool::RedirectToTaskSchedulerForProcess() {
RedirectSequencedWorkerPoolsToTaskSchedulerForProcess() {
DCHECK(TaskScheduler::GetInstance()); DCHECK(TaskScheduler::GetInstance());
// Hitting this DCHECK indicates that a task was posted to a // Hitting this DCHECK indicates that a task was posted to a
// SequencedWorkerPool before the TaskScheduler was initialized and // SequencedWorkerPool before the TaskScheduler was initialized and
...@@ -1462,6 +1468,16 @@ void SequencedWorkerPool:: ...@@ -1462,6 +1468,16 @@ void SequencedWorkerPool::
AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER); AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER);
} }
// static
void SequencedWorkerPool::ResetRedirectToTaskSchedulerForProcessForTesting() {
// This can be called when the current state is REDIRECTED_TO_TASK_SCHEDULER
// to stop redirecting tasks. It can also be called when the current state is
// WORKER_CREATED to allow RedirectToTaskSchedulerForProcess() to be called
// (RedirectToTaskSchedulerForProcess() cannot be called after a worker has
// been created if this isn't called).
subtle::NoBarrier_Store(&g_all_pools_state, AllPoolsState::NONE_ACTIVE);
}
SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, SequencedWorkerPool::SequencedWorkerPool(size_t max_threads,
const std::string& thread_name_prefix, const std::string& thread_name_prefix,
base::TaskPriority task_priority) base::TaskPriority task_priority)
......
...@@ -172,16 +172,27 @@ class BASE_EXPORT SequencedWorkerPool : public TaskRunner { ...@@ -172,16 +172,27 @@ class BASE_EXPORT SequencedWorkerPool : public TaskRunner {
// PostSequencedWorkerTask(). Valid tokens are always nonzero. // PostSequencedWorkerTask(). Valid tokens are always nonzero.
static SequenceToken GetSequenceToken(); static SequenceToken GetSequenceToken();
// Invoke this once on the main thread of a process, before any other threads // Starts redirecting tasks posted to this process' SequencedWorkerPools to
// are created and before any tasks are posted to that process' // the registered TaskScheduler. This cannot be called after a task has been
// SequencedWorkerPools but after TaskScheduler was instantiated, to force all // posted to a SequencedWorkerPool. This is not thread-safe; proper
// SequencedWorkerPools in that process to redirect their tasks to the // synchronization is required to use any SequencedWorkerPool method after
// TaskScheduler. Note: SequencedWorkerPool instances with |max_threads == 1| // calling this. There must be a registered TaskScheduler when this is called.
// will be special cased to send all of their work as // Ideally, call this on the main thread of a process, before any other
// ExecutionMode::SINGLE_THREADED. // threads are created and before any tasks are posted to that process'
// SequencedWorkerPools.
// Note: SequencedWorkerPool instances with |max_threads == 1| will be special
// cased to send all of their work as ExecutionMode::SINGLE_THREADED.
// TODO(gab): Remove this if http://crbug.com/622400 fails // TODO(gab): Remove this if http://crbug.com/622400 fails
// (SequencedWorkerPool will be phased out completely otherwise). // (SequencedWorkerPool will be phased out completely otherwise).
static void RedirectSequencedWorkerPoolsToTaskSchedulerForProcess(); static void RedirectToTaskSchedulerForProcess();
// Stops redirecting tasks posted to this process' SequencedWorkerPools to the
// registered TaskScheduler and allows RedirectToTaskSchedulerForProcess() to
// be called even if tasks have already posted to a SequencedWorkerPool in
// this process. Calling this while there are active SequencedWorkerPools is
// not supported. This is not thread-safe; proper synchronization is required
// to use any SequencedWorkerPool method after calling this.
static void ResetRedirectToTaskSchedulerForProcessForTesting();
// When constructing a SequencedWorkerPool, there must be a // When constructing a SequencedWorkerPool, there must be a
// ThreadTaskRunnerHandle on the current thread unless you plan to // ThreadTaskRunnerHandle on the current thread unless you plan to
......
...@@ -17,6 +17,9 @@ ...@@ -17,6 +17,9 @@
#include "base/stl_util.h" #include "base/stl_util.h"
#include "base/synchronization/condition_variable.h" #include "base/synchronization/condition_variable.h"
#include "base/synchronization/lock.h" #include "base/synchronization/lock.h"
#include "base/task_scheduler/scheduler_worker_pool_params.h"
#include "base/task_scheduler/task_scheduler.h"
#include "base/task_scheduler/task_scheduler_impl.h"
#include "base/test/sequenced_task_runner_test_template.h" #include "base/test/sequenced_task_runner_test_template.h"
#include "base/test/sequenced_worker_pool_owner.h" #include "base/test/sequenced_worker_pool_owner.h"
#include "base/test/task_runner_test_template.h" #include "base/test/task_runner_test_template.h"
...@@ -231,11 +234,41 @@ class TestTracker : public base::RefCountedThreadSafe<TestTracker> { ...@@ -231,11 +234,41 @@ class TestTracker : public base::RefCountedThreadSafe<TestTracker> {
size_t started_events_; size_t started_events_;
}; };
class SequencedWorkerPoolTest : public testing::Test { enum class SequencedWorkerPoolRedirection { NONE, TO_TASK_SCHEDULER };
class SequencedWorkerPoolTest
: public testing::TestWithParam<SequencedWorkerPoolRedirection> {
public: public:
SequencedWorkerPoolTest() SequencedWorkerPoolTest()
: tracker_(new TestTracker) { : pool_owner_(new SequencedWorkerPoolOwner(kNumWorkerThreads, "test")),
ResetPool(); tracker_(new TestTracker) {}
void SetUp() override {
if (GetParam() == SequencedWorkerPoolRedirection::TO_TASK_SCHEDULER) {
std::vector<SchedulerWorkerPoolParams> worker_pool_params;
worker_pool_params.emplace_back(
"SchedulerWorkerPoolName", ThreadPriority::NORMAL,
SchedulerWorkerPoolParams::IORestriction::ALLOWED, kNumWorkerThreads,
TimeDelta::Max());
TaskScheduler::CreateAndSetDefaultTaskScheduler(
std::move(worker_pool_params),
base::Bind([](const TaskTraits&) -> size_t { return 0U; }));
SequencedWorkerPool::ResetRedirectToTaskSchedulerForProcessForTesting();
SequencedWorkerPool::RedirectToTaskSchedulerForProcess();
}
}
void TearDown() override {
// Wait until all references to the SequencedWorkerPool are gone and destroy
// it. This must be done before destroying the TaskScheduler. Otherwise, the
// SequencedWorkerPool could try to redirect tasks to a destroyed
// TaskScheduler.
DeletePool();
if (GetParam() == SequencedWorkerPoolRedirection::TO_TASK_SCHEDULER) {
SequencedWorkerPool::ResetRedirectToTaskSchedulerForProcessForTesting();
DeleteTaskScheduler();
}
} }
const scoped_refptr<SequencedWorkerPool>& pool() { const scoped_refptr<SequencedWorkerPool>& pool() {
...@@ -243,10 +276,17 @@ class SequencedWorkerPoolTest : public testing::Test { ...@@ -243,10 +276,17 @@ class SequencedWorkerPoolTest : public testing::Test {
} }
TestTracker* tracker() { return tracker_.get(); } TestTracker* tracker() { return tracker_.get(); }
// Destroys the SequencedWorkerPool instance, blocking until it is fully shut // Waits until no tasks are running in the SequencedWorkerPool and no
// down, and creates a new instance. // reference to it remain. Then, destroys the SequencedWorkerPool.
void ResetPool() { void DeletePool() { pool_owner_.reset(); }
pool_owner_.reset(new SequencedWorkerPoolOwner(kNumWorkerThreads, "test"));
// Destroys and unregisters the registered TaskScheduler, if any.
void DeleteTaskScheduler() {
if (TaskScheduler::GetInstance()) {
static_cast<internal::TaskSchedulerImpl*>(TaskScheduler::GetInstance())
->JoinForTesting();
TaskScheduler::SetInstance(nullptr);
}
} }
void SetWillWaitForShutdownCallback(const Closure& callback) { void SetWillWaitForShutdownCallback(const Closure& callback) {
...@@ -326,13 +366,12 @@ class DeletionHelper : public base::RefCountedThreadSafe<DeletionHelper> { ...@@ -326,13 +366,12 @@ class DeletionHelper : public base::RefCountedThreadSafe<DeletionHelper> {
DISALLOW_COPY_AND_ASSIGN(DeletionHelper); DISALLOW_COPY_AND_ASSIGN(DeletionHelper);
}; };
void HoldPoolReference(const scoped_refptr<base::SequencedWorkerPool>& pool, void ShouldNotRun(const scoped_refptr<DeletionHelper>& helper) {
const scoped_refptr<DeletionHelper>& helper) {
ADD_FAILURE() << "Should never run"; ADD_FAILURE() << "Should never run";
} }
// Tests that delayed tasks are deleted upon shutdown of the pool. // Tests that shutdown does not wait for delayed tasks.
TEST_F(SequencedWorkerPoolTest, DelayedTaskDuringShutdown) { TEST_P(SequencedWorkerPoolTest, DelayedTaskDuringShutdown) {
// Post something to verify the pool is started up. // Post something to verify the pool is started up.
EXPECT_TRUE(pool()->PostTask( EXPECT_TRUE(pool()->PostTask(
FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 1))); FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 1)));
...@@ -344,8 +383,7 @@ TEST_F(SequencedWorkerPoolTest, DelayedTaskDuringShutdown) { ...@@ -344,8 +383,7 @@ TEST_F(SequencedWorkerPoolTest, DelayedTaskDuringShutdown) {
// Post something that shouldn't run. // Post something that shouldn't run.
EXPECT_TRUE(pool()->PostDelayedTask( EXPECT_TRUE(pool()->PostDelayedTask(
FROM_HERE, FROM_HERE,
base::Bind(&HoldPoolReference, base::Bind(&ShouldNotRun,
pool(),
make_scoped_refptr(new DeletionHelper(deleted_flag))), make_scoped_refptr(new DeletionHelper(deleted_flag))),
TestTimeouts::action_timeout())); TestTimeouts::action_timeout()));
...@@ -353,20 +391,25 @@ TEST_F(SequencedWorkerPoolTest, DelayedTaskDuringShutdown) { ...@@ -353,20 +391,25 @@ TEST_F(SequencedWorkerPoolTest, DelayedTaskDuringShutdown) {
ASSERT_EQ(1u, completion_sequence.size()); ASSERT_EQ(1u, completion_sequence.size());
ASSERT_EQ(1, completion_sequence[0]); ASSERT_EQ(1, completion_sequence[0]);
// Shutdown is asynchronous, so use ResetPool() to block until the pool is // Shutdown the pool.
// fully destroyed (and thus shut down). pool()->Shutdown();
ResetPool(); if (GetParam() == SequencedWorkerPoolRedirection::TO_TASK_SCHEDULER)
TaskScheduler::GetInstance()->Shutdown();
// Verify that we didn't block until the task was due. // Verify that we didn't block until the task was due.
ASSERT_LT(base::Time::Now() - posted_at, TestTimeouts::action_timeout()); ASSERT_LT(base::Time::Now() - posted_at, TestTimeouts::action_timeout());
// Verify that the deferred task has not only not run, but has also been // Verify that the delayed task is deleted when the SequencedWorkerPool (and
// destroyed. // the TaskScheduler when applicable) are deleted.
ASSERT_TRUE(deleted_flag->data); EXPECT_FALSE(deleted_flag->data);
DeletePool();
if (GetParam() == SequencedWorkerPoolRedirection::TO_TASK_SCHEDULER)
DeleteTaskScheduler();
EXPECT_TRUE(deleted_flag->data);
} }
// Tests that same-named tokens have the same ID. // Tests that same-named tokens have the same ID.
TEST_F(SequencedWorkerPoolTest, NamedTokens) { TEST_P(SequencedWorkerPoolTest, NamedTokens) {
const std::string name1("hello"); const std::string name1("hello");
SequencedWorkerPool::SequenceToken token1 = SequencedWorkerPool::SequenceToken token1 =
pool()->GetNamedSequenceToken(name1); pool()->GetNamedSequenceToken(name1);
...@@ -394,7 +437,7 @@ TEST_F(SequencedWorkerPoolTest, NamedTokens) { ...@@ -394,7 +437,7 @@ TEST_F(SequencedWorkerPoolTest, NamedTokens) {
// Tests that posting a bunch of tasks (many more than the number of worker // Tests that posting a bunch of tasks (many more than the number of worker
// threads) runs them all. // threads) runs them all.
TEST_F(SequencedWorkerPoolTest, LotsOfTasks) { TEST_P(SequencedWorkerPoolTest, LotsOfTasks) {
pool()->PostWorkerTask(FROM_HERE, pool()->PostWorkerTask(FROM_HERE,
base::Bind(&TestTracker::SlowTask, tracker(), 0)); base::Bind(&TestTracker::SlowTask, tracker(), 0));
...@@ -412,7 +455,7 @@ TEST_F(SequencedWorkerPoolTest, LotsOfTasks) { ...@@ -412,7 +455,7 @@ TEST_F(SequencedWorkerPoolTest, LotsOfTasks) {
// worker threads) to two pools simultaneously runs them all twice. // worker threads) to two pools simultaneously runs them all twice.
// This test is meant to shake out any concurrency issues between // This test is meant to shake out any concurrency issues between
// pools (like histograms). // pools (like histograms).
TEST_F(SequencedWorkerPoolTest, LotsOfTasksTwoPools) { TEST_P(SequencedWorkerPoolTest, LotsOfTasksTwoPools) {
SequencedWorkerPoolOwner pool1(kNumWorkerThreads, "test1"); SequencedWorkerPoolOwner pool1(kNumWorkerThreads, "test1");
SequencedWorkerPoolOwner pool2(kNumWorkerThreads, "test2"); SequencedWorkerPoolOwner pool2(kNumWorkerThreads, "test2");
...@@ -435,7 +478,7 @@ TEST_F(SequencedWorkerPoolTest, LotsOfTasksTwoPools) { ...@@ -435,7 +478,7 @@ TEST_F(SequencedWorkerPoolTest, LotsOfTasksTwoPools) {
// Test that tasks with the same sequence token are executed in order but don't // Test that tasks with the same sequence token are executed in order but don't
// affect other tasks. // affect other tasks.
TEST_F(SequencedWorkerPoolTest, Sequence) { TEST_P(SequencedWorkerPoolTest, Sequence) {
// Fill all the worker threads except one. // Fill all the worker threads except one.
const size_t kNumBackgroundTasks = kNumWorkerThreads - 1; const size_t kNumBackgroundTasks = kNumWorkerThreads - 1;
ThreadBlocker background_blocker; ThreadBlocker background_blocker;
...@@ -496,7 +539,7 @@ TEST_F(SequencedWorkerPoolTest, Sequence) { ...@@ -496,7 +539,7 @@ TEST_F(SequencedWorkerPoolTest, Sequence) {
// Tests that any tasks posted after Shutdown are ignored. // Tests that any tasks posted after Shutdown are ignored.
// Disabled for flakiness. See http://crbug.com/166451. // Disabled for flakiness. See http://crbug.com/166451.
TEST_F(SequencedWorkerPoolTest, DISABLED_IgnoresAfterShutdown) { TEST_P(SequencedWorkerPoolTest, DISABLED_IgnoresAfterShutdown) {
// Start tasks to take all the threads and block them. // Start tasks to take all the threads and block them.
EnsureAllWorkersCreated(); EnsureAllWorkersCreated();
ThreadBlocker blocker; ThreadBlocker blocker;
...@@ -543,7 +586,14 @@ TEST_F(SequencedWorkerPoolTest, DISABLED_IgnoresAfterShutdown) { ...@@ -543,7 +586,14 @@ TEST_F(SequencedWorkerPoolTest, DISABLED_IgnoresAfterShutdown) {
ASSERT_EQ(old_has_work_call_count, has_work_call_count()); ASSERT_EQ(old_has_work_call_count, has_work_call_count());
} }
TEST_F(SequencedWorkerPoolTest, AllowsAfterShutdown) { TEST_P(SequencedWorkerPoolTest, AllowsAfterShutdown) {
// As tested by TaskSchedulerTaskTrackerTest.WillPostAndRunDuringShutdown,
// TaskScheduler allows tasks to be posted during shutdown. However, since it
// doesn't provide a way to run a callback from inside its Shutdown() method,
// it would be hard to make this test work with redirection enabled.
if (GetParam() == SequencedWorkerPoolRedirection::TO_TASK_SCHEDULER)
return;
// Test that <n> new blocking tasks are allowed provided they're posted // Test that <n> new blocking tasks are allowed provided they're posted
// by a running tasks. // by a running tasks.
EnsureAllWorkersCreated(); EnsureAllWorkersCreated();
...@@ -589,8 +639,15 @@ TEST_F(SequencedWorkerPoolTest, AllowsAfterShutdown) { ...@@ -589,8 +639,15 @@ TEST_F(SequencedWorkerPoolTest, AllowsAfterShutdown) {
// Tests that blocking tasks can still be posted during shutdown, as long as // Tests that blocking tasks can still be posted during shutdown, as long as
// the task is not being posted within the context of a running task. // the task is not being posted within the context of a running task.
TEST_F(SequencedWorkerPoolTest, TEST_P(SequencedWorkerPoolTest,
AllowsBlockingTasksDuringShutdownOutsideOfRunningTask) { AllowsBlockingTasksDuringShutdownOutsideOfRunningTask) {
// As tested by TaskSchedulerTaskTrackerTest.WillPostAndRunDuringShutdown,
// TaskScheduler allows tasks to be posted during shutdown. However, since it
// doesn't provide a way to run a callback from inside its Shutdown() method,
// it would be hard to make this test work with redirection enabled.
if (GetParam() == SequencedWorkerPoolRedirection::TO_TASK_SCHEDULER)
return;
EnsureAllWorkersCreated(); EnsureAllWorkersCreated();
ThreadBlocker blocker; ThreadBlocker blocker;
...@@ -617,7 +674,16 @@ TEST_F(SequencedWorkerPoolTest, ...@@ -617,7 +674,16 @@ TEST_F(SequencedWorkerPoolTest,
// Tests that unrun tasks are discarded properly according to their shutdown // Tests that unrun tasks are discarded properly according to their shutdown
// mode. // mode.
TEST_F(SequencedWorkerPoolTest, DiscardOnShutdown) { TEST_P(SequencedWorkerPoolTest, DiscardOnShutdown) {
// As tested by
// TaskSchedulerTaskTrackerTest.WillPostBeforeShutdownRunDuringShutdown, on
// shutdown, the TaskScheduler discards SKIP_ON_SHUTDOWN and
// CONTINUE_ON_SHUTDOWN tasks and runs BLOCK_SHUTDOWN tasks. However, since it
// doesn't provide a way to run a callback from inside its Shutdown() method,
// it would be hard to make this test work with redirection enabled.
if (GetParam() == SequencedWorkerPoolRedirection::TO_TASK_SCHEDULER)
return;
// Start tasks to take all the threads and block them. // Start tasks to take all the threads and block them.
EnsureAllWorkersCreated(); EnsureAllWorkersCreated();
ThreadBlocker blocker; ThreadBlocker blocker;
...@@ -661,7 +727,7 @@ TEST_F(SequencedWorkerPoolTest, DiscardOnShutdown) { ...@@ -661,7 +727,7 @@ TEST_F(SequencedWorkerPoolTest, DiscardOnShutdown) {
} }
// Tests that CONTINUE_ON_SHUTDOWN tasks don't block shutdown. // Tests that CONTINUE_ON_SHUTDOWN tasks don't block shutdown.
TEST_F(SequencedWorkerPoolTest, ContinueOnShutdown) { TEST_P(SequencedWorkerPoolTest, ContinueOnShutdown) {
scoped_refptr<TaskRunner> runner(pool()->GetTaskRunnerWithShutdownBehavior( scoped_refptr<TaskRunner> runner(pool()->GetTaskRunnerWithShutdownBehavior(
SequencedWorkerPool::CONTINUE_ON_SHUTDOWN)); SequencedWorkerPool::CONTINUE_ON_SHUTDOWN));
scoped_refptr<SequencedTaskRunner> sequenced_runner( scoped_refptr<SequencedTaskRunner> sequenced_runner(
...@@ -688,6 +754,8 @@ TEST_F(SequencedWorkerPoolTest, ContinueOnShutdown) { ...@@ -688,6 +754,8 @@ TEST_F(SequencedWorkerPoolTest, ContinueOnShutdown) {
// This should not block. If this test hangs, it means it failed. // This should not block. If this test hangs, it means it failed.
pool()->Shutdown(); pool()->Shutdown();
if (GetParam() == SequencedWorkerPoolRedirection::TO_TASK_SCHEDULER)
TaskScheduler::GetInstance()->Shutdown();
// The task should not have completed yet. // The task should not have completed yet.
EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size()); EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());
...@@ -709,7 +777,16 @@ TEST_F(SequencedWorkerPoolTest, ContinueOnShutdown) { ...@@ -709,7 +777,16 @@ TEST_F(SequencedWorkerPoolTest, ContinueOnShutdown) {
// Tests that SKIP_ON_SHUTDOWN tasks that have been started block Shutdown // Tests that SKIP_ON_SHUTDOWN tasks that have been started block Shutdown
// until they stop, but tasks not yet started do not. // until they stop, but tasks not yet started do not.
TEST_F(SequencedWorkerPoolTest, SkipOnShutdown) { TEST_P(SequencedWorkerPoolTest, SkipOnShutdown) {
// As tested by
// TaskSchedulerTaskTrackerTest.WillPostAndRunLongTaskBeforeShutdown and
// TaskSchedulerTaskTrackerTest.WillPostBeforeShutdownRunDuringShutdown, the
// TaskScheduler correctly handles SKIP_ON_SHUTDOWN tasks. However, since it
// doesn't provide a way to run a callback from inside its Shutdown() method,
// it would be hard to make this test work with redirection enabled.
if (GetParam() == SequencedWorkerPoolRedirection::TO_TASK_SCHEDULER)
return;
// Start tasks to take all the threads and block them. // Start tasks to take all the threads and block them.
EnsureAllWorkersCreated(); EnsureAllWorkersCreated();
ThreadBlocker blocker; ThreadBlocker blocker;
...@@ -760,7 +837,11 @@ TEST_F(SequencedWorkerPoolTest, SkipOnShutdown) { ...@@ -760,7 +837,11 @@ TEST_F(SequencedWorkerPoolTest, SkipOnShutdown) {
// Ensure all worker threads are created, and then trigger a spurious // Ensure all worker threads are created, and then trigger a spurious
// work signal. This shouldn't cause any other work signals to be // work signal. This shouldn't cause any other work signals to be
// triggered. This is a regression test for http://crbug.com/117469. // triggered. This is a regression test for http://crbug.com/117469.
TEST_F(SequencedWorkerPoolTest, SpuriousWorkSignal) { TEST_P(SequencedWorkerPoolTest, SpuriousWorkSignal) {
// This test doesn't apply when tasks are redirected to the TaskScheduler.
if (GetParam() == SequencedWorkerPoolRedirection::TO_TASK_SCHEDULER)
return;
EnsureAllWorkersCreated(); EnsureAllWorkersCreated();
int old_has_work_call_count = has_work_call_count(); int old_has_work_call_count = has_work_call_count();
pool()->SignalHasWorkForTesting(); pool()->SignalHasWorkForTesting();
...@@ -770,6 +851,7 @@ TEST_F(SequencedWorkerPoolTest, SpuriousWorkSignal) { ...@@ -770,6 +851,7 @@ TEST_F(SequencedWorkerPoolTest, SpuriousWorkSignal) {
} }
void VerifyRunsTasksOnCurrentThread( void VerifyRunsTasksOnCurrentThread(
SequencedWorkerPoolRedirection redirection,
scoped_refptr<TaskRunner> test_positive_task_runner, scoped_refptr<TaskRunner> test_positive_task_runner,
scoped_refptr<TaskRunner> test_negative_task_runner, scoped_refptr<TaskRunner> test_negative_task_runner,
SequencedWorkerPool* pool, SequencedWorkerPool* pool,
...@@ -777,12 +859,18 @@ void VerifyRunsTasksOnCurrentThread( ...@@ -777,12 +859,18 @@ void VerifyRunsTasksOnCurrentThread(
EXPECT_TRUE(test_positive_task_runner->RunsTasksOnCurrentThread()); EXPECT_TRUE(test_positive_task_runner->RunsTasksOnCurrentThread());
EXPECT_FALSE(test_negative_task_runner->RunsTasksOnCurrentThread()); EXPECT_FALSE(test_negative_task_runner->RunsTasksOnCurrentThread());
EXPECT_TRUE(pool->RunsTasksOnCurrentThread()); EXPECT_TRUE(pool->RunsTasksOnCurrentThread());
EXPECT_FALSE(unused_pool->RunsTasksOnCurrentThread());
// Tasks posted to different SequencedWorkerPools may run on the same
// TaskScheduler threads.
if (redirection == SequencedWorkerPoolRedirection::TO_TASK_SCHEDULER)
EXPECT_TRUE(unused_pool->RunsTasksOnCurrentThread());
else
EXPECT_FALSE(unused_pool->RunsTasksOnCurrentThread());
} }
// Verify correctness of the RunsTasksOnCurrentThread() method on // Verify correctness of the RunsTasksOnCurrentThread() method on
// SequencedWorkerPool and on TaskRunners it returns. // SequencedWorkerPool and on TaskRunners it returns.
TEST_F(SequencedWorkerPoolTest, RunsTasksOnCurrentThread) { TEST_P(SequencedWorkerPoolTest, RunsTasksOnCurrentThread) {
const scoped_refptr<SequencedTaskRunner> sequenced_task_runner_1 = const scoped_refptr<SequencedTaskRunner> sequenced_task_runner_1 =
pool()->GetSequencedTaskRunner(SequencedWorkerPool::GetSequenceToken()); pool()->GetSequencedTaskRunner(SequencedWorkerPool::GetSequenceToken());
const scoped_refptr<SequencedTaskRunner> sequenced_task_runner_2 = const scoped_refptr<SequencedTaskRunner> sequenced_task_runner_2 =
...@@ -805,26 +893,29 @@ TEST_F(SequencedWorkerPoolTest, RunsTasksOnCurrentThread) { ...@@ -805,26 +893,29 @@ TEST_F(SequencedWorkerPoolTest, RunsTasksOnCurrentThread) {
// - pool()->RunsTasksOnCurrentThread() returns true. // - pool()->RunsTasksOnCurrentThread() returns true.
// - unused_pool_owner.pool()->RunsTasksOnCurrentThread() returns false. // - unused_pool_owner.pool()->RunsTasksOnCurrentThread() returns false.
sequenced_task_runner_1->PostTask( sequenced_task_runner_1->PostTask(
FROM_HERE, FROM_HERE, base::Bind(&VerifyRunsTasksOnCurrentThread, GetParam(),
base::Bind(&VerifyRunsTasksOnCurrentThread, sequenced_task_runner_1, sequenced_task_runner_1, sequenced_task_runner_2,
sequenced_task_runner_2, base::RetainedRef(pool()), base::RetainedRef(pool()),
base::RetainedRef(unused_pool_owner.pool()))); base::RetainedRef(unused_pool_owner.pool())));
// From a task posted to |unsequenced_task_runner|: // From a task posted to |unsequenced_task_runner|:
// - unsequenced_task_runner->RunsTasksOnCurrentThread() returns true. // - unsequenced_task_runner->RunsTasksOnCurrentThread() returns true.
// - sequenced_task_runner_1->RunsTasksOnCurrentThread() returns false. // - sequenced_task_runner_1->RunsTasksOnCurrentThread() returns false.
// - pool()->RunsTasksOnCurrentThread() returns true. // - pool()->RunsTasksOnCurrentThread() returns true.
// - unused_pool_owner.pool()->RunsTasksOnCurrentThread() returns false. // - unused_pool_owner.pool()->RunsTasksOnCurrentThread() returns false.
unsequenced_task_runner->PostTask( unsequenced_task_runner->PostTask(
FROM_HERE, FROM_HERE, base::Bind(&VerifyRunsTasksOnCurrentThread, GetParam(),
base::Bind(&VerifyRunsTasksOnCurrentThread, unsequenced_task_runner, unsequenced_task_runner, sequenced_task_runner_1,
sequenced_task_runner_1, base::RetainedRef(pool()), base::RetainedRef(pool()),
base::RetainedRef(unused_pool_owner.pool()))); base::RetainedRef(unused_pool_owner.pool())));
} }
// Checks that tasks are destroyed in the right context during shutdown. If a // Checks that tasks are destroyed in the right context during shutdown. If a
// task is destroyed while SequencedWorkerPool's global lock is held, // task is destroyed while SequencedWorkerPool's global lock is held,
// SequencedWorkerPool might deadlock. // SequencedWorkerPool might deadlock.
TEST_F(SequencedWorkerPoolTest, AvoidsDeadlockOnShutdown) { TEST_P(SequencedWorkerPoolTest, AvoidsDeadlockOnShutdown) {
// Note: TaskScheduler destroys tasks when it is deleted rather than on
// shutdown. In production, it should never be destroyed.
for (int i = 0; i < 4; ++i) { for (int i = 0; i < 4; ++i) {
scoped_refptr<DestructionDeadlockChecker> checker( scoped_refptr<DestructionDeadlockChecker> checker(
new DestructionDeadlockChecker(pool())); new DestructionDeadlockChecker(pool()));
...@@ -838,8 +929,15 @@ TEST_F(SequencedWorkerPoolTest, AvoidsDeadlockOnShutdown) { ...@@ -838,8 +929,15 @@ TEST_F(SequencedWorkerPoolTest, AvoidsDeadlockOnShutdown) {
// Similar to the test AvoidsDeadlockOnShutdown, but there are now also // Similar to the test AvoidsDeadlockOnShutdown, but there are now also
// sequenced, blocking tasks in the queue during shutdown. // sequenced, blocking tasks in the queue during shutdown.
TEST_F(SequencedWorkerPoolTest, TEST_P(SequencedWorkerPoolTest,
AvoidsDeadlockOnShutdownWithSequencedBlockingTasks) { AvoidsDeadlockOnShutdownWithSequencedBlockingTasks) {
// This test continuously posts BLOCK_SHUTDOWN tasks
// (PostRepostingBlockingTask). It can't run when tasks are redirected to
// TaskScheduler because TaskScheduler doesn't provide a way to limit the
// number of BLOCK_SHUTDOWN tasks posted during shutdown.
if (GetParam() == SequencedWorkerPoolRedirection::TO_TASK_SCHEDULER)
return;
const std::string sequence_token_name("name"); const std::string sequence_token_name("name");
for (int i = 0; i < 4; ++i) { for (int i = 0; i < 4; ++i) {
scoped_refptr<DestructionDeadlockChecker> checker( scoped_refptr<DestructionDeadlockChecker> checker(
...@@ -857,7 +955,12 @@ TEST_F(SequencedWorkerPoolTest, ...@@ -857,7 +955,12 @@ TEST_F(SequencedWorkerPoolTest,
} }
// Verify that FlushForTesting works as intended. // Verify that FlushForTesting works as intended.
TEST_F(SequencedWorkerPoolTest, FlushForTesting) { TEST_P(SequencedWorkerPoolTest, FlushForTesting) {
// FlushForTesting() can't be called when tasks are redirected to the
// TaskScheduler.
if (GetParam() == SequencedWorkerPoolRedirection::TO_TASK_SCHEDULER)
return;
// Should be fine to call on a new instance. // Should be fine to call on a new instance.
pool()->FlushForTesting(); pool()->FlushForTesting();
...@@ -912,7 +1015,15 @@ void CheckWorkerPoolAndSequenceToken( ...@@ -912,7 +1015,15 @@ void CheckWorkerPoolAndSequenceToken(
} // namespace } // namespace
TEST_F(SequencedWorkerPoolTest, GetWorkerPoolAndSequenceTokenForCurrentThread) { TEST_P(SequencedWorkerPoolTest, GetWorkerPoolAndSequenceTokenForCurrentThread) {
// GetSequenceTokenForCurrentThread() and GetWorkerPoolForCurrentThread()
// respectively return an invalid token and nullptr from a task posted to a
// SequencedWorkerPool when redirection to TaskScheduler is enabled. These
// methods are only used from SequencedTaskRunnerHandle and
// SequenceCheckerImpl which work fine in TaskScheduler.
if (GetParam() == SequencedWorkerPoolRedirection::TO_TASK_SCHEDULER)
return;
EnsureAllWorkersCreated(); EnsureAllWorkersCreated();
// The current thread should have neither a worker pool nor a sequence token. // The current thread should have neither a worker pool nor a sequence token.
...@@ -939,7 +1050,7 @@ TEST_F(SequencedWorkerPoolTest, GetWorkerPoolAndSequenceTokenForCurrentThread) { ...@@ -939,7 +1050,7 @@ TEST_F(SequencedWorkerPoolTest, GetWorkerPoolAndSequenceTokenForCurrentThread) {
pool()->FlushForTesting(); pool()->FlushForTesting();
} }
TEST_F(SequencedWorkerPoolTest, ShutsDownCleanWithContinueOnShutdown) { TEST_P(SequencedWorkerPoolTest, ShutsDownCleanWithContinueOnShutdown) {
scoped_refptr<SequencedTaskRunner> task_runner = scoped_refptr<SequencedTaskRunner> task_runner =
pool()->GetSequencedTaskRunnerWithShutdownBehavior( pool()->GetSequencedTaskRunnerWithShutdownBehavior(
pool()->GetSequenceToken(), pool()->GetSequenceToken(),
...@@ -949,6 +1060,15 @@ TEST_F(SequencedWorkerPoolTest, ShutsDownCleanWithContinueOnShutdown) { ...@@ -949,6 +1060,15 @@ TEST_F(SequencedWorkerPoolTest, ShutsDownCleanWithContinueOnShutdown) {
pool()->Shutdown(); pool()->Shutdown();
} }
INSTANTIATE_TEST_CASE_P(
NoRedirection,
SequencedWorkerPoolTest,
::testing::Values(SequencedWorkerPoolRedirection::NONE));
INSTANTIATE_TEST_CASE_P(
RedirectionToTaskScheduler,
SequencedWorkerPoolTest,
::testing::Values(SequencedWorkerPoolRedirection::TO_TASK_SCHEDULER));
class SequencedWorkerPoolTaskRunnerTestDelegate { class SequencedWorkerPoolTaskRunnerTestDelegate {
public: public:
SequencedWorkerPoolTaskRunnerTestDelegate() {} SequencedWorkerPoolTaskRunnerTestDelegate() {}
......
...@@ -450,8 +450,7 @@ void MaybeInitializeTaskScheduler() { ...@@ -450,8 +450,7 @@ void MaybeInitializeTaskScheduler() {
variation_params.find("RedirectSequencedWorkerPools"); variation_params.find("RedirectSequencedWorkerPools");
if (sequenced_worker_pool_param != variation_params.end() && if (sequenced_worker_pool_param != variation_params.end() &&
sequenced_worker_pool_param->second == "true") { sequenced_worker_pool_param->second == "true") {
base::SequencedWorkerPool:: base::SequencedWorkerPool::RedirectToTaskSchedulerForProcess();
RedirectSequencedWorkerPoolsToTaskSchedulerForProcess();
} }
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment