Commit cee951ec authored by robliao's avatar robliao Committed by Commit bot

Introduce SchedulerSingleThreadTaskRunnerManager

This component provides one dedicated thread per
SingleThreadTaskRunner.

Reference Change:
https://codereview.chromium.org/2686593003/

BUG=684080, 694823, 697697

Review-Url: https://codereview.chromium.org/2698843006
Cr-Original-Commit-Position: refs/heads/master@{#453162}
Committed: https://chromium.googlesource.com/chromium/src/+/fc203f52e046a8d82e053525d764671f7128f3d4
Review-Url: https://codereview.chromium.org/2698843006
Cr-Commit-Position: refs/heads/master@{#456148}
parent 67435cd8
...@@ -850,6 +850,8 @@ component("base") { ...@@ -850,6 +850,8 @@ component("base") {
"task_scheduler/scheduler_lock.h", "task_scheduler/scheduler_lock.h",
"task_scheduler/scheduler_lock_impl.cc", "task_scheduler/scheduler_lock_impl.cc",
"task_scheduler/scheduler_lock_impl.h", "task_scheduler/scheduler_lock_impl.h",
"task_scheduler/scheduler_single_thread_task_runner_manager.cc",
"task_scheduler/scheduler_single_thread_task_runner_manager.h",
"task_scheduler/scheduler_worker.cc", "task_scheduler/scheduler_worker.cc",
"task_scheduler/scheduler_worker.h", "task_scheduler/scheduler_worker.h",
"task_scheduler/scheduler_worker_pool.h", "task_scheduler/scheduler_worker_pool.h",
...@@ -2095,6 +2097,7 @@ test("base_unittests") { ...@@ -2095,6 +2097,7 @@ test("base_unittests") {
"task_scheduler/delayed_task_manager_unittest.cc", "task_scheduler/delayed_task_manager_unittest.cc",
"task_scheduler/priority_queue_unittest.cc", "task_scheduler/priority_queue_unittest.cc",
"task_scheduler/scheduler_lock_unittest.cc", "task_scheduler/scheduler_lock_unittest.cc",
"task_scheduler/scheduler_single_thread_task_runner_manager_unittest.cc",
"task_scheduler/scheduler_worker_pool_impl_unittest.cc", "task_scheduler/scheduler_worker_pool_impl_unittest.cc",
"task_scheduler/scheduler_worker_stack_unittest.cc", "task_scheduler/scheduler_worker_stack_unittest.cc",
"task_scheduler/scheduler_worker_unittest.cc", "task_scheduler/scheduler_worker_unittest.cc",
......
// Copyright 2017 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef BASE_TASK_SCHEDULER_SCHEDULER_SINGLE_THREAD_TASK_RUNNER_MANAGER_H_
#define BASE_TASK_SCHEDULER_SCHEDULER_SINGLE_THREAD_TASK_RUNNER_MANAGER_H_
#include <vector>
#include "base/atomicops.h"
#include "base/base_export.h"
#include "base/logging.h"
#include "base/macros.h"
#include "base/memory/ref_counted.h"
#include "base/task_scheduler/scheduler_lock.h"
#include "base/task_scheduler/scheduler_worker_pool_params.h"
#include "base/task_scheduler/task_scheduler.h"
namespace base {
class TaskTraits;
class SingleThreadTaskRunner;
namespace internal {
class DelayedTaskManager;
class SchedulerWorker;
class TaskTracker;
class BASE_EXPORT SchedulerSingleThreadTaskRunnerManager final {
public:
SchedulerSingleThreadTaskRunnerManager(
const std::vector<SchedulerWorkerPoolParams>& worker_pool_params_vector,
const TaskScheduler::WorkerPoolIndexForTraitsCallback&
worker_pool_index_for_traits_callback,
TaskTracker* task_tracker,
DelayedTaskManager* delayed_task_manager);
~SchedulerSingleThreadTaskRunnerManager();
scoped_refptr<SingleThreadTaskRunner> CreateSingleThreadTaskRunnerWithTraits(
const TaskTraits& traits);
void JoinForTesting();
private:
class SchedulerSingleThreadTaskRunner;
SchedulerWorker* CreateAndRegisterSchedulerWorker(
const SchedulerWorkerPoolParams& params);
void UnregisterSchedulerWorker(SchedulerWorker* worker);
const std::vector<SchedulerWorkerPoolParams> worker_pool_params_vector_;
const TaskScheduler::WorkerPoolIndexForTraitsCallback
worker_pool_index_for_traits_callback_;
TaskTracker* const task_tracker_;
DelayedTaskManager* const delayed_task_manager_;
// Synchronizes access to |workers_| and |worker_id_|.
SchedulerLock workers_lock_;
std::vector<scoped_refptr<SchedulerWorker>> workers_;
int next_worker_id_ = 0;
#if DCHECK_IS_ON()
subtle::Atomic32 workers_unregistered_during_join_ = 0;
#endif
DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunnerManager);
};
} // namespace internal
} // namespace base
#endif // BASE_TASK_SCHEDULER_SCHEDULER_SINGLE_THREAD_TASK_RUNNER_MANAGER_H_
...@@ -67,7 +67,6 @@ class SchedulerWorker::Thread : public PlatformThread::Delegate { ...@@ -67,7 +67,6 @@ class SchedulerWorker::Thread : public PlatformThread::Delegate {
if (outer_->delegate_->CanDetach(outer_.get())) { if (outer_->delegate_->CanDetach(outer_.get())) {
detached_thread = outer_->DetachThreadObject(DetachNotify::DELEGATE); detached_thread = outer_->DetachThreadObject(DetachNotify::DELEGATE);
if (detached_thread) { if (detached_thread) {
outer_ = nullptr;
DCHECK_EQ(detached_thread.get(), this); DCHECK_EQ(detached_thread.get(), this);
PlatformThread::Detach(thread_handle_); PlatformThread::Detach(thread_handle_);
break; break;
...@@ -119,6 +118,8 @@ class SchedulerWorker::Thread : public PlatformThread::Delegate { ...@@ -119,6 +118,8 @@ class SchedulerWorker::Thread : public PlatformThread::Delegate {
// nullptr. JoinForTesting() cleans up if we get nullptr. // nullptr. JoinForTesting() cleans up if we get nullptr.
if (!detached_thread) if (!detached_thread)
detached_thread = outer_->DetachThreadObject(DetachNotify::SILENT); detached_thread = outer_->DetachThreadObject(DetachNotify::SILENT);
outer_->delegate_->OnMainExit();
} }
void Join() { PlatformThread::Join(thread_handle_); } void Join() { PlatformThread::Join(thread_handle_); }
......
...@@ -85,6 +85,9 @@ class BASE_EXPORT SchedulerWorker ...@@ -85,6 +85,9 @@ class BASE_EXPORT SchedulerWorker
// acquire a SchedulerLock because it is called within the scope of another // acquire a SchedulerLock because it is called within the scope of another
// SchedulerLock. // SchedulerLock.
virtual void OnDetach() = 0; virtual void OnDetach() = 0;
// Called by a thread right before the main function exits.
virtual void OnMainExit() {}
}; };
enum class InitialState { ALIVE, DETACHED }; enum class InitialState { ALIVE, DETACHED };
......
...@@ -141,6 +141,7 @@ bool ContainsWorker(const std::vector<scoped_refptr<SchedulerWorker>>& workers, ...@@ -141,6 +141,7 @@ bool ContainsWorker(const std::vector<scoped_refptr<SchedulerWorker>>& workers,
} // namespace } // namespace
// TODO(http://crbug.com/694823): Remove this and supporting framework.
// A task runner that runs tasks with the SINGLE_THREADED ExecutionMode. // A task runner that runs tasks with the SINGLE_THREADED ExecutionMode.
class SchedulerWorkerPoolImpl::SchedulerSingleThreadTaskRunner : class SchedulerWorkerPoolImpl::SchedulerSingleThreadTaskRunner :
public SingleThreadTaskRunner { public SingleThreadTaskRunner {
......
...@@ -21,9 +21,9 @@ SchedulerWorkerPoolParams::SchedulerWorkerPoolParams( ...@@ -21,9 +21,9 @@ SchedulerWorkerPoolParams::SchedulerWorkerPoolParams(
backward_compatibility_(backward_compatibility) {} backward_compatibility_(backward_compatibility) {}
SchedulerWorkerPoolParams::SchedulerWorkerPoolParams( SchedulerWorkerPoolParams::SchedulerWorkerPoolParams(
SchedulerWorkerPoolParams&& other) = default; const SchedulerWorkerPoolParams& other) = default;
SchedulerWorkerPoolParams& SchedulerWorkerPoolParams::operator=( SchedulerWorkerPoolParams& SchedulerWorkerPoolParams::operator=(
SchedulerWorkerPoolParams&& other) = default; const SchedulerWorkerPoolParams& other) = default;
} // namespace base } // namespace base
...@@ -7,7 +7,6 @@ ...@@ -7,7 +7,6 @@
#include <string> #include <string>
#include "base/macros.h"
#include "base/task_scheduler/scheduler_worker_params.h" #include "base/task_scheduler/scheduler_worker_params.h"
#include "base/threading/platform_thread.h" #include "base/threading/platform_thread.h"
#include "base/time/time.h" #include "base/time/time.h"
...@@ -42,8 +41,8 @@ class BASE_EXPORT SchedulerWorkerPoolParams final { ...@@ -42,8 +41,8 @@ class BASE_EXPORT SchedulerWorkerPoolParams final {
TimeDelta suggested_reclaim_time, TimeDelta suggested_reclaim_time,
SchedulerBackwardCompatibility backward_compatibility = SchedulerBackwardCompatibility backward_compatibility =
SchedulerBackwardCompatibility::DISABLED); SchedulerBackwardCompatibility::DISABLED);
SchedulerWorkerPoolParams(SchedulerWorkerPoolParams&& other); SchedulerWorkerPoolParams(const SchedulerWorkerPoolParams& other);
SchedulerWorkerPoolParams& operator=(SchedulerWorkerPoolParams&& other); SchedulerWorkerPoolParams& operator=(const SchedulerWorkerPoolParams& other);
const std::string& name() const { return name_; } const std::string& name() const { return name_; }
ThreadPriority priority_hint() const { return priority_hint_; } ThreadPriority priority_hint() const { return priority_hint_; }
...@@ -63,8 +62,6 @@ class BASE_EXPORT SchedulerWorkerPoolParams final { ...@@ -63,8 +62,6 @@ class BASE_EXPORT SchedulerWorkerPoolParams final {
size_t max_threads_; size_t max_threads_;
TimeDelta suggested_reclaim_time_; TimeDelta suggested_reclaim_time_;
SchedulerBackwardCompatibility backward_compatibility_; SchedulerBackwardCompatibility backward_compatibility_;
DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerPoolParams);
}; };
} // namespace base } // namespace base
......
...@@ -322,5 +322,49 @@ TEST_F(TaskSchedulerImplTest, GetMaxConcurrentTasksWithTraitsDeprecated) { ...@@ -322,5 +322,49 @@ TEST_F(TaskSchedulerImplTest, GetMaxConcurrentTasksWithTraitsDeprecated) {
TaskTraits().WithPriority(TaskPriority::USER_BLOCKING).MayBlock())); TaskTraits().WithPriority(TaskPriority::USER_BLOCKING).MayBlock()));
} }
// Verify that the RunsTasksOnCurrentThread() method of a SequencedTaskRunner
// returns false when called from a task that isn't part of the sequence.
TEST_F(TaskSchedulerImplTest, SequencedRunsTasksOnCurrentThread) {
auto single_thread_task_runner =
scheduler_->CreateSingleThreadTaskRunnerWithTraits(TaskTraits());
auto sequenced_task_runner =
scheduler_->CreateSequencedTaskRunnerWithTraits(TaskTraits());
WaitableEvent task_ran(WaitableEvent::ResetPolicy::MANUAL,
WaitableEvent::InitialState::NOT_SIGNALED);
single_thread_task_runner->PostTask(
FROM_HERE,
Bind(
[](scoped_refptr<TaskRunner> sequenced_task_runner,
WaitableEvent* task_ran) {
EXPECT_FALSE(sequenced_task_runner->RunsTasksOnCurrentThread());
task_ran->Signal();
},
sequenced_task_runner, Unretained(&task_ran)));
task_ran.Wait();
}
// Verify that the RunsTasksOnCurrentThread() method of a SingleThreadTaskRunner
// returns false when called from a task that isn't part of the sequence.
TEST_F(TaskSchedulerImplTest, SingleThreadRunsTasksOnCurrentThread) {
auto sequenced_task_runner =
scheduler_->CreateSequencedTaskRunnerWithTraits(TaskTraits());
auto single_thread_task_runner =
scheduler_->CreateSingleThreadTaskRunnerWithTraits(TaskTraits());
WaitableEvent task_ran(WaitableEvent::ResetPolicy::MANUAL,
WaitableEvent::InitialState::NOT_SIGNALED);
sequenced_task_runner->PostTask(
FROM_HERE,
Bind(
[](scoped_refptr<TaskRunner> single_thread_task_runner,
WaitableEvent* task_ran) {
EXPECT_FALSE(single_thread_task_runner->RunsTasksOnCurrentThread());
task_ran->Signal();
},
single_thread_task_runner, Unretained(&task_ran)));
task_ran.Wait();
}
} // namespace internal } // namespace internal
} // namespace base } // namespace base
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment