Commit e4e020b9 authored by François Doray's avatar François Doray Committed by Commit Bot

ThreadPool: Add base::CreateUpdateableSequencedTaskRunnerWithTraits().

This function creates a SequencedTaskRunner whose priority can be
updated at any time.

To update the priority from BEST_EFFORT to USER_VISIBLE/USER_BLOCKING,
a ThreadPolicy must be specified.

Bug: 889029
Change-Id: I4e6d119eda0a4a880922d49f97f84c30e8725516
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/1532826
Commit-Queue: François Doray <fdoray@chromium.org>
Reviewed-by: default avatarGabriel Charette <gab@chromium.org>
Cr-Commit-Position: refs/heads/master@{#664774}
parent 2148146b
......@@ -52,10 +52,8 @@ TaskTraits PostTaskAndroid::CreateTaskTraits(
jbyte extension_id,
const base::android::JavaParamRef<jbyteArray>& extension_data) {
return TaskTraits(priority_set_explicitly,
static_cast<TaskPriority>(priority),
/* shutdown_behavior_set_explicitly */ false,
TaskShutdownBehavior::SKIP_ON_SHUTDOWN, may_block,
/* with_base_sync_primitives */ false, use_thread_pool,
static_cast<TaskPriority>(priority), may_block,
use_thread_pool,
TaskTraitsExtensionStorage(
extension_id, GetExtensionData(env, extension_data)));
}
......
......@@ -110,6 +110,24 @@ scoped_refptr<SequencedTaskRunner> CreateSequencedTaskRunnerWithTraits(
->CreateSequencedTaskRunnerWithTraits(adjusted_traits);
}
scoped_refptr<UpdateableSequencedTaskRunner>
CreateUpdateableSequencedTaskRunnerWithTraits(const TaskTraits& traits) {
DCHECK(ThreadPoolInstance::Get())
<< "Ref. Prerequisite section of post_task.h.\n\n"
"Hint: if this is in a unit test, you're likely merely missing a "
"base::test::ScopedTaskEnvironment member in your fixture.\n";
DCHECK(traits.use_thread_pool())
<< "The base::UseThreadPool() trait is mandatory with "
"CreateUpdateableSequencedTaskRunnerWithTraits().";
CHECK_EQ(traits.extension_id(),
TaskTraitsExtensionStorage::kInvalidExtensionId)
<< "Extension traits cannot be used with "
"CreateUpdateableSequencedTaskRunnerWithTraits().";
const TaskTraits adjusted_traits = GetTaskTraitsWithExplicitPriority(traits);
return static_cast<internal::ThreadPoolImpl*>(ThreadPoolInstance::Get())
->CreateUpdateableSequencedTaskRunnerWithTraits(adjusted_traits);
}
scoped_refptr<SingleThreadTaskRunner> CreateSingleThreadTaskRunnerWithTraits(
const TaskTraits& traits,
SingleThreadTaskRunnerThreadMode thread_mode) {
......
......@@ -21,6 +21,7 @@
#include "base/task/task_traits.h"
#include "base/task_runner.h"
#include "base/time/time.h"
#include "base/updateable_sequenced_task_runner.h"
#include "build/build_config.h"
namespace base {
......@@ -178,6 +179,20 @@ BASE_EXPORT scoped_refptr<TaskRunner> CreateTaskRunnerWithTraits(
BASE_EXPORT scoped_refptr<SequencedTaskRunner>
CreateSequencedTaskRunnerWithTraits(const TaskTraits& traits);
// Returns a task runner whose PostTask invocations result in scheduling tasks
// using |traits|. The priority in |traits| can be updated at any time via
// UpdateableSequencedTaskRunner::UpdatePriority(). An update affects all tasks
// posted to the task runner that aren't running yet. Tasks run one at a time in
// posting order.
//
// |traits| requirements:
// - base::ThreadPool() must be specified.
// - Extension traits (e.g. BrowserThread) cannot be specified.
// - base::ThreadPolicy must be specified if the priority of the task runner
// will ever be increased from BEST_EFFORT.
BASE_EXPORT scoped_refptr<UpdateableSequencedTaskRunner>
CreateUpdateableSequencedTaskRunnerWithTraits(const TaskTraits& traits);
// Returns a SingleThreadTaskRunner whose PostTask invocations result in
// scheduling tasks using |traits| on a thread determined by |thread_mode|. See
// base/task/single_thread_task_runner_thread_mode.h for |thread_mode| details.
......
This diff is collapsed.
......@@ -12,7 +12,9 @@ TEST(TaskTraitsTest, Default) {
constexpr TaskTraits traits = {};
EXPECT_FALSE(traits.priority_set_explicitly());
EXPECT_EQ(TaskPriority::USER_BLOCKING, traits.priority());
EXPECT_FALSE(traits.shutdown_behavior_set_explicitly());
EXPECT_EQ(TaskShutdownBehavior::SKIP_ON_SHUTDOWN, traits.shutdown_behavior());
EXPECT_FALSE(traits.thread_policy_set_explicitly());
EXPECT_EQ(ThreadPolicy::PREFER_BACKGROUND, traits.thread_policy());
EXPECT_FALSE(traits.may_block());
EXPECT_FALSE(traits.with_base_sync_primitives());
......@@ -22,7 +24,9 @@ TEST(TaskTraitsTest, TaskPriority) {
constexpr TaskTraits traits = {TaskPriority::BEST_EFFORT};
EXPECT_TRUE(traits.priority_set_explicitly());
EXPECT_EQ(TaskPriority::BEST_EFFORT, traits.priority());
EXPECT_FALSE(traits.shutdown_behavior_set_explicitly());
EXPECT_EQ(TaskShutdownBehavior::SKIP_ON_SHUTDOWN, traits.shutdown_behavior());
EXPECT_FALSE(traits.thread_policy_set_explicitly());
EXPECT_EQ(ThreadPolicy::PREFER_BACKGROUND, traits.thread_policy());
EXPECT_FALSE(traits.may_block());
EXPECT_FALSE(traits.with_base_sync_primitives());
......@@ -32,7 +36,9 @@ TEST(TaskTraitsTest, TaskShutdownBehavior) {
constexpr TaskTraits traits = {TaskShutdownBehavior::BLOCK_SHUTDOWN};
EXPECT_FALSE(traits.priority_set_explicitly());
EXPECT_EQ(TaskPriority::USER_BLOCKING, traits.priority());
EXPECT_TRUE(traits.shutdown_behavior_set_explicitly());
EXPECT_EQ(TaskShutdownBehavior::BLOCK_SHUTDOWN, traits.shutdown_behavior());
EXPECT_FALSE(traits.thread_policy_set_explicitly());
EXPECT_EQ(ThreadPolicy::PREFER_BACKGROUND, traits.thread_policy());
EXPECT_FALSE(traits.may_block());
EXPECT_FALSE(traits.with_base_sync_primitives());
......@@ -42,7 +48,9 @@ TEST(TaskTraitsTest, ThreadPolicy) {
constexpr TaskTraits traits = {ThreadPolicy::MUST_USE_FOREGROUND};
EXPECT_FALSE(traits.priority_set_explicitly());
EXPECT_EQ(TaskPriority::USER_BLOCKING, traits.priority());
EXPECT_FALSE(traits.shutdown_behavior_set_explicitly());
EXPECT_EQ(TaskShutdownBehavior::SKIP_ON_SHUTDOWN, traits.shutdown_behavior());
EXPECT_TRUE(traits.thread_policy_set_explicitly());
EXPECT_EQ(ThreadPolicy::MUST_USE_FOREGROUND, traits.thread_policy());
EXPECT_FALSE(traits.may_block());
EXPECT_FALSE(traits.with_base_sync_primitives());
......@@ -52,7 +60,9 @@ TEST(TaskTraitsTest, MayBlock) {
constexpr TaskTraits traits = {MayBlock()};
EXPECT_FALSE(traits.priority_set_explicitly());
EXPECT_EQ(TaskPriority::USER_BLOCKING, traits.priority());
EXPECT_FALSE(traits.shutdown_behavior_set_explicitly());
EXPECT_EQ(TaskShutdownBehavior::SKIP_ON_SHUTDOWN, traits.shutdown_behavior());
EXPECT_FALSE(traits.thread_policy_set_explicitly());
EXPECT_EQ(ThreadPolicy::PREFER_BACKGROUND, traits.thread_policy());
EXPECT_TRUE(traits.may_block());
EXPECT_FALSE(traits.with_base_sync_primitives());
......@@ -62,7 +72,9 @@ TEST(TaskTraitsTest, WithBaseSyncPrimitives) {
constexpr TaskTraits traits = {WithBaseSyncPrimitives()};
EXPECT_FALSE(traits.priority_set_explicitly());
EXPECT_EQ(TaskPriority::USER_BLOCKING, traits.priority());
EXPECT_FALSE(traits.shutdown_behavior_set_explicitly());
EXPECT_EQ(TaskShutdownBehavior::SKIP_ON_SHUTDOWN, traits.shutdown_behavior());
EXPECT_FALSE(traits.thread_policy_set_explicitly());
EXPECT_EQ(ThreadPolicy::PREFER_BACKGROUND, traits.thread_policy());
EXPECT_FALSE(traits.may_block());
EXPECT_TRUE(traits.with_base_sync_primitives());
......@@ -74,7 +86,9 @@ TEST(TaskTraitsTest, MultipleTraits) {
ThreadPolicy::MUST_USE_FOREGROUND, MayBlock(), WithBaseSyncPrimitives()};
EXPECT_TRUE(traits.priority_set_explicitly());
EXPECT_EQ(TaskPriority::BEST_EFFORT, traits.priority());
EXPECT_TRUE(traits.shutdown_behavior_set_explicitly());
EXPECT_EQ(TaskShutdownBehavior::BLOCK_SHUTDOWN, traits.shutdown_behavior());
EXPECT_TRUE(traits.thread_policy_set_explicitly());
EXPECT_EQ(ThreadPolicy::MUST_USE_FOREGROUND, traits.thread_policy());
EXPECT_TRUE(traits.may_block());
EXPECT_TRUE(traits.with_base_sync_primitives());
......@@ -85,10 +99,17 @@ TEST(TaskTraitsTest, Copy) {
TaskPriority::BEST_EFFORT, TaskShutdownBehavior::BLOCK_SHUTDOWN,
ThreadPolicy::MUST_USE_FOREGROUND, MayBlock(), WithBaseSyncPrimitives()};
constexpr TaskTraits traits_copy(traits);
EXPECT_EQ(traits, traits_copy);
EXPECT_EQ(traits.priority_set_explicitly(),
traits_copy.priority_set_explicitly());
EXPECT_EQ(traits.priority(), traits_copy.priority());
EXPECT_EQ(traits.shutdown_behavior_set_explicitly(),
traits_copy.shutdown_behavior_set_explicitly());
EXPECT_EQ(traits.shutdown_behavior(), traits_copy.shutdown_behavior());
EXPECT_EQ(traits.thread_policy_set_explicitly(),
traits_copy.thread_policy_set_explicitly());
EXPECT_EQ(traits.thread_policy(), traits_copy.thread_policy());
EXPECT_EQ(traits.may_block(), traits_copy.may_block());
EXPECT_EQ(traits.with_base_sync_primitives(),
......
......@@ -256,7 +256,7 @@ ThreadPoolImpl::CreateCOMSTATaskRunnerWithTraits(
#endif // defined(OS_WIN)
scoped_refptr<UpdateableSequencedTaskRunner>
ThreadPoolImpl::CreateUpdateableSequencedTaskRunnerWithTraitsForTesting(
ThreadPoolImpl::CreateUpdateableSequencedTaskRunnerWithTraits(
const TaskTraits& traits) {
const TaskTraits new_traits = SetUserBlockingPriorityIfNeeded(traits);
return MakeRefCounted<PooledSequencedTaskRunner>(new_traits, this);
......@@ -383,6 +383,16 @@ void ThreadPoolImpl::UpdatePriority(scoped_refptr<TaskSource> task_source,
TaskPriority priority) {
auto transaction = task_source->BeginTransaction();
if (transaction.traits().priority() == priority)
return;
if (transaction.traits().priority() == TaskPriority::BEST_EFFORT) {
DCHECK(transaction.traits().thread_policy_set_explicitly())
<< "A ThreadPolicy must be specified in the TaskTraits of an "
"UpdateableSequencedTaskRunner whose priority is increased from "
"BEST_EFFORT. See ThreadPolicy documentation.";
}
ThreadGroup* const current_thread_group =
GetThreadGroupForTraits(transaction.traits());
transaction.UpdatePriority(priority);
......
......@@ -99,8 +99,7 @@ class BASE_EXPORT ThreadPoolImpl : public ThreadPoolInstance,
SingleThreadTaskRunnerThreadMode thread_mode) override;
#endif // defined(OS_WIN)
scoped_refptr<UpdateableSequencedTaskRunner>
CreateUpdateableSequencedTaskRunnerWithTraitsForTesting(
const TaskTraits& traits);
CreateUpdateableSequencedTaskRunnerWithTraits(const TaskTraits& traits);
private:
// Invoked after |has_fence_| or |has_best_effort_fence_| is updated. Sets the
......
......@@ -246,7 +246,7 @@ std::vector<TraitsExecutionModePair> GetTraitsExecutionModePair() {
}
class ThreadPoolImplTestBase : public testing::Test {
protected:
public:
ThreadPoolImplTestBase() : thread_pool_("Test") {}
void EnableAllTasksUserBlocking() {
......@@ -1100,77 +1100,103 @@ struct TaskRunnerAndEvents {
: task_runner(std::move(task_runner)),
updated_priority(updated_priority),
expected_previous_event(expected_previous_event) {}
// The UpdateableSequencedTaskRunner.
scoped_refptr<UpdateableSequencedTaskRunner> task_runner;
// The priority to use in UpdatePriority().
const TaskPriority updated_priority;
// Signaled when a task blocking |task_runner| is scheduled.
WaitableEvent scheduled;
// Signaled to release the task blocking |task_runner|.
WaitableEvent blocked;
// Signaled in the task that runs following the priority update.
WaitableEvent task_ran;
// An event that should be signaled before the task following the priority
// update runs.
WaitableEvent* expected_previous_event;
};
// Create a series of sample task runners that will post tasks at various
// initial priorities, then update priority.
std::vector<std::unique_ptr<TaskRunnerAndEvents>> CreateTaskRunnersAndEvents(
ThreadPoolImpl* thread_pool) {
ThreadPoolImpl* thread_pool,
ThreadPolicy thread_policy) {
std::vector<std::unique_ptr<TaskRunnerAndEvents>> task_runners_and_events;
// -----
// Task runner that will start as USER_VISIBLE and update to USER_BLOCKING.
// Its task is expected to run first.
task_runners_and_events.push_back(std::make_unique<TaskRunnerAndEvents>(
thread_pool->CreateUpdateableSequencedTaskRunnerWithTraitsForTesting(
TaskTraits({TaskPriority::USER_VISIBLE})),
thread_pool->CreateUpdateableSequencedTaskRunnerWithTraits(
TaskTraits({TaskPriority::USER_VISIBLE, thread_policy})),
TaskPriority::USER_BLOCKING, nullptr));
// -----
// Task runner that will start as BEST_EFFORT and update to USER_VISIBLE.
// Its task is expected to run after the USER_BLOCKING task runner's task.
task_runners_and_events.push_back(std::make_unique<TaskRunnerAndEvents>(
thread_pool->CreateUpdateableSequencedTaskRunnerWithTraitsForTesting(
TaskTraits({TaskPriority::BEST_EFFORT})),
thread_pool->CreateUpdateableSequencedTaskRunnerWithTraits(
TaskTraits({TaskPriority::BEST_EFFORT, thread_policy})),
TaskPriority::USER_VISIBLE, &task_runners_and_events.back()->task_ran));
// Task runner that will start as USER_BLOCKING and update to BEST_EFFORT.
// Its task is expected to run asynchronously with the other two task task
// runners' tasks if background thread groups exist, or after the
// USER_VISIBLE task runner's task if not.
task_runners_and_events.push_back(std::make_unique<TaskRunnerAndEvents>(
thread_pool->CreateUpdateableSequencedTaskRunnerWithTraitsForTesting(
TaskTraits({TaskPriority::USER_BLOCKING})),
TaskPriority::BEST_EFFORT,
// -----
// Task runner that will start as USER_BLOCKING and update to BEST_EFFORT. Its
// task is expected to run asynchronously with the other two task runners'
// tasks if background thread groups exist, or after the USER_VISIBLE task
// runner's task if not.
//
// If the task following the priority update is expected to run in the
// foreground group, it should be after the task posted to the TaskRunner
// whose priority is updated to USER_VISIBLE.
WaitableEvent* expected_previous_event =
CanUseBackgroundPriorityForWorkerThread()
? nullptr
: &task_runners_and_events.back()->task_ran));
: &task_runners_and_events.back()->task_ran;
task_runners_and_events.push_back(std::make_unique<TaskRunnerAndEvents>(
thread_pool->CreateUpdateableSequencedTaskRunnerWithTraits(
TaskTraits({TaskPriority::USER_BLOCKING, thread_policy})),
TaskPriority::BEST_EFFORT, expected_previous_event));
return task_runners_and_events;
}
} // namespace
// Update the priority of a sequence when it is not scheduled.
TEST_P(ThreadPoolImplTest, UpdatePrioritySequenceNotScheduled) {
void TestUpdatePrioritySequenceNotScheduled(ThreadPoolImplTest* test,
ThreadPolicy thread_policy) {
// This test verifies that tasks run in priority order. With more than 1
// thread per pool, it is possible that tasks don't run in order even if
// threads got tasks from the PriorityQueue in order. Therefore, enforce a
// maximum of 1 thread per pool.
constexpr int kLocalMaxNumForegroundThreads = 1;
StartThreadPool(kLocalMaxNumForegroundThreads);
auto task_runners_and_events = CreateTaskRunnersAndEvents(&thread_pool_);
test->StartThreadPool(kLocalMaxNumForegroundThreads);
auto task_runners_and_events =
CreateTaskRunnersAndEvents(&test->thread_pool_, thread_policy);
// Prevent tasks from running.
thread_pool_.SetHasFence(true);
test->thread_pool_.SetHasFence(true);
// Post tasks to multiple task runners while they are at initial priority.
// They won't run immediately because of the call to SetHasFence(true) above.
for (auto& task_runner_and_events : task_runners_and_events) {
task_runner_and_events->task_runner->PostTask(
FROM_HERE,
BindOnce(&VerifyOrderAndTaskEnvironmentAndSignalEvent,
task_runner_and_events->updated_priority, GetPoolType(),
BindOnce(
&VerifyOrderAndTaskEnvironmentAndSignalEvent,
TaskTraits(task_runner_and_events->updated_priority, thread_policy),
test->GetPoolType(),
// Native pools ignore the maximum number of threads per pool
// and therefore don't guarantee that tasks run in priority
// order (see comment at beginning of test).
Unretained(
#if HAS_NATIVE_THREAD_POOL()
GetPoolType() == test::PoolType::NATIVE
test->GetPoolType() == test::PoolType::NATIVE
? nullptr
:
#endif
......@@ -1185,7 +1211,7 @@ TEST_P(ThreadPoolImplTest, UpdatePrioritySequenceNotScheduled) {
}
// Allow tasks to run.
thread_pool_.SetHasFence(false);
test->thread_pool_.SetHasFence(false);
for (auto& task_runner_and_events : task_runners_and_events)
test::WaitWithoutBlockingObserver(&task_runner_and_events->task_ran);
......@@ -1193,9 +1219,11 @@ TEST_P(ThreadPoolImplTest, UpdatePrioritySequenceNotScheduled) {
// Update the priority of a sequence when it is scheduled, i.e. not currently
// in a priority queue.
TEST_P(ThreadPoolImplTest, UpdatePrioritySequenceScheduled) {
StartThreadPool();
auto task_runners_and_events = CreateTaskRunnersAndEvents(&thread_pool_);
void TestUpdatePrioritySequenceScheduled(ThreadPoolImplTest* test,
ThreadPolicy thread_policy) {
test->StartThreadPool();
auto task_runners_and_events =
CreateTaskRunnersAndEvents(&test->thread_pool_, thread_policy);
// Post blocking tasks to all task runners to prevent tasks from being
// scheduled later in the test.
......@@ -1222,9 +1250,10 @@ TEST_P(ThreadPoolImplTest, UpdatePrioritySequenceScheduled) {
for (auto& task_runner_and_events : task_runners_and_events) {
task_runner_and_events->task_runner->PostTask(
FROM_HERE,
BindOnce(&VerifyOrderAndTaskEnvironmentAndSignalEvent,
TaskTraits(task_runner_and_events->updated_priority),
GetPoolType(),
BindOnce(
&VerifyOrderAndTaskEnvironmentAndSignalEvent,
TaskTraits(task_runner_and_events->updated_priority, thread_policy),
test->GetPoolType(),
Unretained(task_runner_and_events->expected_previous_event),
Unretained(&task_runner_and_events->task_ran)));
}
......@@ -1238,6 +1267,47 @@ TEST_P(ThreadPoolImplTest, UpdatePrioritySequenceScheduled) {
}
}
} // namespace
TEST_P(ThreadPoolImplTest,
UpdatePrioritySequenceNotScheduled_PreferBackground) {
TestUpdatePrioritySequenceNotScheduled(this, ThreadPolicy::PREFER_BACKGROUND);
}
TEST_P(ThreadPoolImplTest,
UpdatePrioritySequenceNotScheduled_MustUseForeground) {
TestUpdatePrioritySequenceNotScheduled(this,
ThreadPolicy::MUST_USE_FOREGROUND);
}
TEST_P(ThreadPoolImplTest, UpdatePrioritySequenceScheduled_PreferBackground) {
TestUpdatePrioritySequenceScheduled(this, ThreadPolicy::PREFER_BACKGROUND);
}
TEST_P(ThreadPoolImplTest, UpdatePrioritySequenceScheduled_MustUseForeground) {
TestUpdatePrioritySequenceScheduled(this, ThreadPolicy::MUST_USE_FOREGROUND);
}
// Verify that a ThreadPolicy has to be specified in TaskTraits to increase
// TaskPriority from BEST_EFFORT.
TEST_P(ThreadPoolImplTest, UpdatePriorityFromBestEffortNoThreadPolicy) {
StartThreadPool();
{
auto task_runner =
thread_pool_.CreateUpdateableSequencedTaskRunnerWithTraits(
{TaskPriority::BEST_EFFORT});
EXPECT_DCHECK_DEATH(
{ task_runner->UpdatePriority(TaskPriority::USER_VISIBLE); });
}
{
auto task_runner =
thread_pool_.CreateUpdateableSequencedTaskRunnerWithTraits(
{TaskPriority::BEST_EFFORT});
EXPECT_DCHECK_DEATH(
{ task_runner->UpdatePriority(TaskPriority::USER_BLOCKING); });
}
}
INSTANTIATE_TEST_SUITE_P(,
ThreadPoolImplTest,
::testing::Values(test::PoolType::GENERIC
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment