Commit 99f09b6d authored by Etienne Pierre-Doray's avatar Etienne Pierre-Doray Committed by Commit Bot

Revert "[TaskScheduler]: Simplify sequence preemption logic."

This reverts commit 5d4c05db.

Reason for revert: Issue 950383

Original change's description:
> [TaskScheduler]: Simplify sequence preemption logic.
> 
> This CL removes preempted sequences from TaskTracker,
> removing need for CanScheduleSequenceObserver and
> simplifying shutdown.
> A CanRunPolicy enum is used instead, specifying whether
> or not tasks at a given priority can run.
> When CanRunPolicy is updated, wakeup workers needs
> to be done as appropriate.
> 
> In addition, Shutdown was split in 2 phases start + complete
> to allow caller (TaskScheduler) to update CanRunPolicy after
> shutdown was started.
> 
> This CL was recycled from:
> https://chromium-review.googlesource.com/c/chromium/src/+/1478007
> I took ownership because simplifying Sequence lifecycle
> will simplify implementation of job API.
> 
> TBR=gab@chromium.org
> 
> Bug: 889029
> Change-Id: Ie5dbc226705880c5e598e013aa2dd3e7e874158b
> Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/1531161
> Reviewed-by: Etienne Pierre-Doray <etiennep@chromium.org>
> Reviewed-by: François Doray <fdoray@chromium.org>
> Commit-Queue: Etienne Pierre-Doray <etiennep@chromium.org>
> Cr-Commit-Position: refs/heads/master@{#648346}

TBR=gab@chromium.org,fdoray@chromium.org,etiennep@chromium.org

# Not skipping CQ checks because original CL landed > 1 day ago.

Bug: 889029
Change-Id: Id15ef90caa4ddec43db925968268d2942a11fc1b
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/1557785Reviewed-by: default avatarEtienne Pierre-Doray <etiennep@chromium.org>
Commit-Queue: Etienne Pierre-Doray <etiennep@chromium.org>
Cr-Commit-Position: refs/heads/master@{#648673}
parent 515092e4
......@@ -804,6 +804,7 @@ jumbo_component("base") {
"task/task_features.cc",
"task/task_features.h",
"task/task_observer.h",
"task/task_scheduler/can_schedule_sequence_observer.h",
"task/task_scheduler/delayed_task_manager.cc",
"task/task_scheduler/delayed_task_manager.h",
"task/task_scheduler/environment_config.cc",
......@@ -2580,7 +2581,6 @@ test("base_unittests") {
"task/sequence_manager/work_deduplicator_unittest.cc",
"task/sequence_manager/work_queue_sets_unittest.cc",
"task/sequence_manager/work_queue_unittest.cc",
"task/task_scheduler/can_run_policy_test.h",
"task/task_scheduler/delayed_task_manager_unittest.cc",
"task/task_scheduler/priority_queue_unittest.cc",
"task/task_scheduler/scheduler_lock_unittest.cc",
......
// Copyright 2019 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef BASE_TASK_TASK_SCHEDULER_CAN_RUN_POLICY_TEST_H_
#define BASE_TASK_TASK_SCHEDULER_CAN_RUN_POLICY_TEST_H_
#include "base/synchronization/atomic_flag.h"
#include "base/synchronization/waitable_event.h"
#include "base/task/task_scheduler/task_tracker.h"
#include "base/task/task_scheduler/test_utils.h"
#include "base/task_runner.h"
#include "base/test/bind_test_util.h"
#include "base/test/test_timeouts.h"
#include "base/threading/platform_thread.h"
namespace base {
namespace internal {
namespace test {
// Verify that tasks only run when allowed by the CanRunPolicy. |target| is the
// object on which DidUpdateCanRunPolicy() must be called after updating the
// CanRunPolicy in |task_tracker|. |create_task_runner| is a function that
// receives a TaskPriority and returns a TaskRunner. |task_tracker| is the
// TaskTracker.
template <typename Target, typename CreateTaskRunner>
void TestCanRunPolicyBasic(Target* target,
CreateTaskRunner create_task_runner,
TaskTracker* task_tracker) {
AtomicFlag foreground_can_run;
WaitableEvent foreground_did_run;
AtomicFlag best_effort_can_run;
WaitableEvent best_effort_did_run;
task_tracker->SetCanRunPolicy(CanRunPolicy::kNone);
target->DidUpdateCanRunPolicy();
const auto user_visible_task_runner =
create_task_runner(TaskPriority::USER_VISIBLE);
user_visible_task_runner->PostTask(FROM_HERE, BindLambdaForTesting([&]() {
EXPECT_TRUE(foreground_can_run.IsSet());
foreground_did_run.Signal();
}));
const auto best_effort_task_runner =
create_task_runner(TaskPriority::BEST_EFFORT);
best_effort_task_runner->PostTask(FROM_HERE, BindLambdaForTesting([&]() {
EXPECT_TRUE(best_effort_can_run.IsSet());
best_effort_did_run.Signal();
}));
PlatformThread::Sleep(TestTimeouts::tiny_timeout());
foreground_can_run.Set();
task_tracker->SetCanRunPolicy(CanRunPolicy::kForegroundOnly);
target->DidUpdateCanRunPolicy();
foreground_did_run.Wait();
PlatformThread::Sleep(TestTimeouts::tiny_timeout());
best_effort_can_run.Set();
task_tracker->SetCanRunPolicy(CanRunPolicy::kAll);
target->DidUpdateCanRunPolicy();
best_effort_did_run.Wait();
}
// Verify that if a task was allowed to run by the CanRunPolicy when it was
// posted, but the CanRunPolicy is updated to disallow it from running before it
// starts running, it doesn't run. |target| is the object on which
// DidUpdateCanRunPolicy() must be called after updating the CanRunPolicy in
// |task_tracker|. |create_task_runner| is a function that receives a
// TaskPriority and returns a *Sequenced*TaskRunner. |task_tracker| is the
// TaskTracker.
template <typename Target, typename CreateTaskRunner>
void TestCanRunPolicyChangedBeforeRun(Target* target,
CreateTaskRunner create_task_runner,
TaskTracker* task_tracker) {
constexpr struct {
// Descriptor for the test case.
const char* descriptor;
// Task priority being tested.
TaskPriority priority;
// Policy that disallows running tasks with |priority|.
CanRunPolicy disallow_policy;
// Policy that allows running tasks with |priority|.
CanRunPolicy allow_policy;
} kTestCases[] = {
{"BestEffort/kNone/kAll", TaskPriority::BEST_EFFORT, CanRunPolicy::kNone,
CanRunPolicy::kAll},
{"BestEffort/kForegroundOnly/kAll", TaskPriority::BEST_EFFORT,
CanRunPolicy::kForegroundOnly, CanRunPolicy::kAll},
{"UserVisible/kNone/kForegroundOnly", TaskPriority::USER_VISIBLE,
CanRunPolicy::kNone, CanRunPolicy::kForegroundOnly},
{"UserVisible/kNone/kAll", TaskPriority::USER_VISIBLE,
CanRunPolicy::kNone, CanRunPolicy::kAll}};
for (auto& test_case : kTestCases) {
SCOPED_TRACE(test_case.descriptor);
WaitableEvent first_task_started;
WaitableEvent first_task_blocked;
AtomicFlag second_task_can_run;
task_tracker->SetCanRunPolicy(test_case.allow_policy);
target->DidUpdateCanRunPolicy();
const auto task_runner = create_task_runner(test_case.priority);
task_runner->PostTask(
FROM_HERE, BindLambdaForTesting([&]() {
first_task_started.Signal();
test::WaitWithoutBlockingObserver(&first_task_blocked);
}));
task_runner->PostTask(FROM_HERE, BindLambdaForTesting([&]() {
EXPECT_TRUE(second_task_can_run.IsSet());
}));
first_task_started.Wait();
task_tracker->SetCanRunPolicy(test_case.disallow_policy);
target->DidUpdateCanRunPolicy();
first_task_blocked.Signal();
PlatformThread::Sleep(TestTimeouts::tiny_timeout());
second_task_can_run.Set();
task_tracker->SetCanRunPolicy(test_case.allow_policy);
target->DidUpdateCanRunPolicy();
task_tracker->FlushForTesting();
}
}
} // namespace test
} // namespace internal
} // namespace base
#endif // BASE_TASK_TASK_SCHEDULER_CAN_RUN_POLICY_TEST_H_
// Copyright 2017 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef BASE_TASK_TASK_SCHEDULER_CAN_SCHEDULE_SEQUENCE_OBSERVER_H_
#define BASE_TASK_TASK_SCHEDULER_CAN_SCHEDULE_SEQUENCE_OBSERVER_H_
#include "base/task/task_scheduler/sequence.h"
namespace base {
namespace internal {
class CanScheduleSequenceObserver {
public:
// Called when |sequence| can be scheduled. It is expected that
// TaskTracker::RunNextTask() will be called with |sequence| as argument after
// this is called.
virtual void OnCanScheduleSequence(scoped_refptr<Sequence> sequence) = 0;
protected:
virtual ~CanScheduleSequenceObserver() = default;
};
} // namespace internal
} // namespace base
#endif // BASE_TASK_TASK_SCHEDULER_CAN_SCHEDULE_SEQUENCE_OBSERVER_H_
......@@ -80,7 +80,7 @@ void PlatformNativeWorkerPool::RunNextSequenceImpl() {
scoped_refptr<Sequence> sequence = GetWork();
DCHECK(sequence);
sequence = task_tracker_->RunAndPopNextTask(std::move(sequence));
sequence = task_tracker_->RunAndPopNextTask(std::move(sequence), this);
if (sequence) {
ScopedWorkersExecutor workers_executor(this);
......@@ -103,11 +103,6 @@ scoped_refptr<Sequence> PlatformNativeWorkerPool::GetWork() {
// PriorityQueue after RemoveSequence().
if (priority_queue_.IsEmpty())
return nullptr;
// Enforce the CanRunPolicy.
const TaskPriority priority = priority_queue_.PeekSortKey().priority();
if (!task_tracker_->CanRunPriority(priority))
return nullptr;
return priority_queue_.PopSequence();
}
......@@ -130,10 +125,7 @@ void PlatformNativeWorkerPool::EnsureEnoughWorkersLockRequired(
return;
// Ensure that there is at least one pending threadpool work per Sequence in
// the PriorityQueue.
const size_t desired_num_pending_threadpool_work =
GetNumQueuedCanRunBestEffortSequences() +
GetNumQueuedCanRunForegroundSequences();
const size_t desired_num_pending_threadpool_work = priority_queue_.Size();
if (desired_num_pending_threadpool_work > num_pending_threadpool_work_) {
static_cast<ScopedWorkersExecutor*>(executor)
->set_num_threadpool_work_to_submit(
......@@ -157,11 +149,5 @@ void PlatformNativeWorkerPool::ReportHeartbeatMetrics() const {
// number of worker threads created.
}
void PlatformNativeWorkerPool::DidUpdateCanRunPolicy() {
ScopedWorkersExecutor executor(this);
AutoSchedulerLock auto_lock(lock_);
EnsureEnoughWorkersLockRequired(&executor);
}
} // namespace internal
} // namespace base
......@@ -26,7 +26,6 @@ class BASE_EXPORT PlatformNativeWorkerPool : public SchedulerWorkerPool {
void JoinForTesting() override;
size_t GetMaxConcurrentNonBlockedTasksDeprecated() const override;
void ReportHeartbeatMetrics() const override;
void DidUpdateCanRunPolicy() override;
protected:
PlatformNativeWorkerPool(TrackedRef<TaskTracker> task_tracker,
......
......@@ -63,10 +63,6 @@ class BASE_EXPORT SchedulerSingleThreadTaskRunnerManager final {
// JoinForTesting() has returned (must never be destroyed in production).
void Start(SchedulerWorkerObserver* scheduler_worker_observer = nullptr);
// Wakes up workers as appropriate for the new CanRunPolicy policy. Must be
// called after an update to CanRunPolicy in TaskTracker.
void DidUpdateCanRunPolicy();
// Creates a SingleThreadTaskRunner which runs tasks with |traits| on a thread
// named "TaskSchedulerSingleThread[Shared]" +
// kEnvironmentParams[GetEnvironmentIndexForTraits(traits)].name_suffix +
......
......@@ -11,14 +11,11 @@
#include "base/synchronization/lock.h"
#include "base/synchronization/waitable_event.h"
#include "base/task/post_task.h"
#include "base/task/task_scheduler/can_run_policy_test.h"
#include "base/task/task_scheduler/delayed_task_manager.h"
#include "base/task/task_scheduler/environment_config.h"
#include "base/task/task_scheduler/scheduler_worker_pool_params.h"
#include "base/task/task_scheduler/task_tracker.h"
#include "base/task/task_scheduler/test_utils.h"
#include "base/task/task_traits.h"
#include "base/test/bind_test_util.h"
#include "base/test/gtest_util.h"
#include "base/test/test_timeouts.h"
#include "base/threading/platform_thread.h"
......@@ -41,7 +38,7 @@ namespace internal {
namespace {
class TaskSchedulerSingleThreadTaskRunnerManagerTest : public testing::Test {
protected:
public:
TaskSchedulerSingleThreadTaskRunnerManagerTest()
: service_thread_("TaskSchedulerServiceThread") {}
......@@ -60,6 +57,7 @@ class TaskSchedulerSingleThreadTaskRunnerManagerTest : public testing::Test {
service_thread_.Stop();
}
protected:
virtual void StartSingleThreadTaskRunnerManagerFromSetUp() {
single_thread_task_runner_manager_->Start();
}
......@@ -118,7 +116,7 @@ TEST_F(TaskSchedulerSingleThreadTaskRunnerManagerTest, DifferentThreadsUsed) {
task_runner_2->PostTask(FROM_HERE,
BindOnce(&CaptureThreadRef, &thread_ref_2));
test::ShutdownTaskTracker(&task_tracker_);
task_tracker_.Shutdown();
ASSERT_FALSE(thread_ref_1.is_null());
ASSERT_FALSE(thread_ref_2.is_null());
......@@ -144,7 +142,7 @@ TEST_F(TaskSchedulerSingleThreadTaskRunnerManagerTest, SameThreadUsed) {
task_runner_2->PostTask(FROM_HERE,
BindOnce(&CaptureThreadRef, &thread_ref_2));
test::ShutdownTaskTracker(&task_tracker_);
task_tracker_.Shutdown();
ASSERT_FALSE(thread_ref_1.is_null());
ASSERT_FALSE(thread_ref_2.is_null());
......@@ -187,7 +185,7 @@ TEST_F(TaskSchedulerSingleThreadTaskRunnerManagerTest,
},
task_runner_1, task_runner_2));
test::ShutdownTaskTracker(&task_tracker_);
task_tracker_.Shutdown();
}
TEST_F(TaskSchedulerSingleThreadTaskRunnerManagerTest,
......@@ -232,7 +230,7 @@ TEST_F(TaskSchedulerSingleThreadTaskRunnerManagerTest,
->PostTask(FROM_HERE, DoNothing());
// Shutdown should not hang even though the first task hasn't finished.
test::ShutdownTaskTracker(&task_tracker_);
task_tracker_.Shutdown();
// Let the first task finish.
task_can_continue.Signal();
......@@ -250,12 +248,6 @@ class TaskSchedulerSingleThreadTaskRunnerManagerCommonTest
public:
TaskSchedulerSingleThreadTaskRunnerManagerCommonTest() = default;
scoped_refptr<SingleThreadTaskRunner> CreateTaskRunner(
TaskTraits traits = TaskTraits()) {
return single_thread_task_runner_manager_
->CreateSingleThreadTaskRunnerWithTraits(traits, GetParam());
}
private:
DISALLOW_COPY_AND_ASSIGN(
TaskSchedulerSingleThreadTaskRunnerManagerCommonTest);
......@@ -269,9 +261,13 @@ TEST_P(TaskSchedulerSingleThreadTaskRunnerManagerCommonTest,
// Shutting down can cause priorities to get raised. This means we have to use
// events to determine when a task is run.
scoped_refptr<SingleThreadTaskRunner> task_runner_background =
CreateTaskRunner({TaskPriority::BEST_EFFORT});
single_thread_task_runner_manager_
->CreateSingleThreadTaskRunnerWithTraits({TaskPriority::BEST_EFFORT},
GetParam());
scoped_refptr<SingleThreadTaskRunner> task_runner_normal =
CreateTaskRunner({TaskPriority::USER_VISIBLE});
single_thread_task_runner_manager_
->CreateSingleThreadTaskRunnerWithTraits({TaskPriority::USER_VISIBLE},
GetParam());
ThreadPriority thread_priority_background;
task_runner_background->PostTask(
......@@ -303,7 +299,8 @@ TEST_P(TaskSchedulerSingleThreadTaskRunnerManagerCommonTest, ThreadNamesSet) {
constexpr TaskTraits foo_traits = {TaskPriority::BEST_EFFORT,
TaskShutdownBehavior::BLOCK_SHUTDOWN};
scoped_refptr<SingleThreadTaskRunner> foo_task_runner =
CreateTaskRunner(foo_traits);
single_thread_task_runner_manager_
->CreateSingleThreadTaskRunnerWithTraits(foo_traits, GetParam());
std::string foo_captured_name;
foo_task_runner->PostTask(FROM_HERE,
BindOnce(&CaptureThreadName, &foo_captured_name));
......@@ -320,7 +317,7 @@ TEST_P(TaskSchedulerSingleThreadTaskRunnerManagerCommonTest, ThreadNamesSet) {
user_blocking_task_runner->PostTask(
FROM_HERE, BindOnce(&CaptureThreadName, &user_blocking_captured_name));
test::ShutdownTaskTracker(&task_tracker_);
task_tracker_.Shutdown();
EXPECT_NE(std::string::npos,
foo_captured_name.find(
......@@ -343,8 +340,10 @@ TEST_P(TaskSchedulerSingleThreadTaskRunnerManagerCommonTest, ThreadNamesSet) {
TEST_P(TaskSchedulerSingleThreadTaskRunnerManagerCommonTest,
PostTaskAfterShutdown) {
auto task_runner = CreateTaskRunner();
test::ShutdownTaskTracker(&task_tracker_);
auto task_runner =
single_thread_task_runner_manager_
->CreateSingleThreadTaskRunnerWithTraits(TaskTraits(), GetParam());
task_tracker_.Shutdown();
EXPECT_FALSE(task_runner->PostTask(FROM_HERE, BindOnce(&ShouldNotRun)));
}
......@@ -354,7 +353,9 @@ TEST_P(TaskSchedulerSingleThreadTaskRunnerManagerCommonTest, PostDelayedTask) {
WaitableEvent task_ran(WaitableEvent::ResetPolicy::AUTOMATIC,
WaitableEvent::InitialState::NOT_SIGNALED);
auto task_runner = CreateTaskRunner();
auto task_runner =
single_thread_task_runner_manager_
->CreateSingleThreadTaskRunnerWithTraits(TaskTraits(), GetParam());
// Wait until the task runner is up and running to make sure the test below is
// solely timing the delayed task, not bringing up a physical thread.
......@@ -383,30 +384,15 @@ TEST_P(TaskSchedulerSingleThreadTaskRunnerManagerCommonTest, PostDelayedTask) {
// but doesn't crash.
TEST_P(TaskSchedulerSingleThreadTaskRunnerManagerCommonTest,
PostTaskAfterDestroy) {
auto task_runner = CreateTaskRunner();
auto task_runner =
single_thread_task_runner_manager_
->CreateSingleThreadTaskRunnerWithTraits(TaskTraits(), GetParam());
EXPECT_TRUE(task_runner->PostTask(FROM_HERE, DoNothing()));
test::ShutdownTaskTracker(&task_tracker_);
task_tracker_.Shutdown();
TearDownSingleThreadTaskRunnerManager();
EXPECT_FALSE(task_runner->PostTask(FROM_HERE, BindOnce(&ShouldNotRun)));
}
// Verify that tasks only run when allowed by the CanRunPolicy.
TEST_P(TaskSchedulerSingleThreadTaskRunnerManagerCommonTest,
CanRunPolicyBasic) {
test::TestCanRunPolicyBasic(
single_thread_task_runner_manager_.get(),
[this](TaskPriority priority) { return CreateTaskRunner({priority}); },
&task_tracker_);
}
TEST_P(TaskSchedulerSingleThreadTaskRunnerManagerCommonTest,
CanRunPolicyUpdatedBeforeRun) {
test::TestCanRunPolicyChangedBeforeRun(
single_thread_task_runner_manager_.get(),
[this](TaskPriority priority) { return CreateTaskRunner({priority}); },
&task_tracker_);
}
INSTANTIATE_TEST_SUITE_P(
AllModes,
TaskSchedulerSingleThreadTaskRunnerManagerCommonTest,
......@@ -523,7 +509,7 @@ TEST_P(TaskSchedulerSingleThreadTaskRunnerManagerCommonTest,
com_task_runner->PostTask(FROM_HERE, BindOnce(&win::AssertComApartmentType,
win::ComApartmentType::STA));
test::ShutdownTaskTracker(&task_tracker_);
task_tracker_.Shutdown();
}
TEST_F(TaskSchedulerSingleThreadTaskRunnerManagerTest, COMSTASameThreadUsed) {
......@@ -543,7 +529,7 @@ TEST_F(TaskSchedulerSingleThreadTaskRunnerManagerTest, COMSTASameThreadUsed) {
task_runner_2->PostTask(FROM_HERE,
BindOnce(&CaptureThreadRef, &thread_ref_2));
test::ShutdownTaskTracker(&task_tracker_);
task_tracker_.Shutdown();
ASSERT_FALSE(thread_ref_1.is_null());
ASSERT_FALSE(thread_ref_2.is_null());
......@@ -621,7 +607,7 @@ TEST_F(TaskSchedulerSingleThreadTaskRunnerManagerTestWin, PumpsMessages) {
com_task_runner->PostTask(
FROM_HERE, BindOnce([](HWND hwnd) { ::DestroyWindow(hwnd); }, hwnd));
test::ShutdownTaskTracker(&task_tracker_);
task_tracker_.Shutdown();
}
#endif // defined(OS_WIN)
......
......@@ -337,7 +337,8 @@ void SchedulerWorker::RunWorker() {
continue;
}
sequence = task_tracker_->RunAndPopNextTask(std::move(sequence));
sequence =
task_tracker_->RunAndPopNextTask(std::move(sequence), delegate_.get());
delegate_->DidRunTask(std::move(sequence));
......
......@@ -12,6 +12,7 @@
#include "base/memory/ref_counted.h"
#include "base/synchronization/atomic_flag.h"
#include "base/synchronization/waitable_event.h"
#include "base/task/task_scheduler/can_schedule_sequence_observer.h"
#include "base/task/task_scheduler/scheduler_lock.h"
#include "base/task/task_scheduler/scheduler_worker_params.h"
#include "base/task/task_scheduler/sequence.h"
......@@ -60,11 +61,12 @@ class BASE_EXPORT SchedulerWorker
#endif // defined(OS_WIN)
};
// Delegate interface for SchedulerWorker. All methods are called from the
// thread managed by the SchedulerWorker instance.
class BASE_EXPORT Delegate {
// Delegate interface for SchedulerWorker. All methods except
// OnCanScheduleSequence() (inherited from CanScheduleSequenceObserver) are
// called from the thread managed by the SchedulerWorker instance.
class BASE_EXPORT Delegate : public CanScheduleSequenceObserver {
public:
virtual ~Delegate() = default;
~Delegate() override = default;
// Returns the ThreadLabel the Delegate wants its SchedulerWorkers' stacks
// to be labeled with.
......
......@@ -73,6 +73,17 @@ bool SchedulerWorkerPool::IsBoundToCurrentThread() const {
return GetCurrentWorkerPool() == this;
}
void SchedulerWorkerPool::OnCanScheduleSequence(
scoped_refptr<Sequence> sequence) {
if (replacement_pool_) {
replacement_pool_->OnCanScheduleSequence(std::move(sequence));
return;
}
PushSequenceAndWakeUpWorkers(
SequenceAndTransaction::FromSequence(std::move(sequence)));
}
void SchedulerWorkerPool::PostTaskWithSequenceNow(
Task task,
SequenceAndTransaction sequence_and_transaction) {
......@@ -85,29 +96,12 @@ void SchedulerWorkerPool::PostTaskWithSequenceNow(
const bool task_source_should_be_queued =
sequence_and_transaction.transaction.PushTask(std::move(task));
if (task_source_should_be_queued) {
PushSequenceAndWakeUpWorkers(std::move(sequence_and_transaction));
}
}
size_t SchedulerWorkerPool::GetNumQueuedCanRunBestEffortSequences() const {
const size_t num_queued =
priority_queue_.GetNumSequencesWithPriority(TaskPriority::BEST_EFFORT);
if (num_queued == 0 ||
!task_tracker_->CanRunPriority(TaskPriority::BEST_EFFORT)) {
return 0U;
}
return num_queued;
}
size_t SchedulerWorkerPool::GetNumQueuedCanRunForegroundSequences() const {
const size_t num_queued =
priority_queue_.GetNumSequencesWithPriority(TaskPriority::USER_VISIBLE) +
priority_queue_.GetNumSequencesWithPriority(TaskPriority::USER_BLOCKING);
if (num_queued == 0 ||
!task_tracker_->CanRunPriority(TaskPriority::HIGHEST)) {
return 0U;
// Try to schedule the Sequence locked by |sequence_transaction|.
if (task_tracker_->WillScheduleSequence(
sequence_and_transaction.transaction, this)) {
PushSequenceAndWakeUpWorkers(std::move(sequence_and_transaction));
}
}
return num_queued;
}
bool SchedulerWorkerPool::RemoveSequence(scoped_refptr<Sequence> sequence) {
......
......@@ -7,6 +7,7 @@
#include "base/base_export.h"
#include "base/memory/ref_counted.h"
#include "base/task/task_scheduler/can_schedule_sequence_observer.h"
#include "base/task/task_scheduler/priority_queue.h"
#include "base/task/task_scheduler/scheduler_lock.h"
#include "base/task/task_scheduler/sequence.h"
......@@ -19,8 +20,8 @@ namespace internal {
class TaskTracker;
// Interface and base implementation for a worker pool.
class BASE_EXPORT SchedulerWorkerPool {
// Interface for a worker pool.
class BASE_EXPORT SchedulerWorkerPool : public CanScheduleSequenceObserver {
public:
// Delegate interface for SchedulerWorkerPool.
class BASE_EXPORT Delegate {
......@@ -43,7 +44,10 @@ class BASE_EXPORT SchedulerWorkerPool {
#endif // defined(OS_WIN)
};
virtual ~SchedulerWorkerPool();
~SchedulerWorkerPool() override;
// CanScheduleSequenceObserver:
void OnCanScheduleSequence(scoped_refptr<Sequence> sequence) final;
// Posts |task| to be executed by this SchedulerWorkerPool as part of
// the Sequence in |sequence_and_transaction|. This must only be called after
......@@ -108,10 +112,6 @@ class BASE_EXPORT SchedulerWorkerPool {
// Reports relevant metrics per implementation.
virtual void ReportHeartbeatMetrics() const = 0;
// Wakes up workers as appropriate for the new CanRunPolicy policy. Must be
// called after an update to CanRunPolicy in TaskTracker.
virtual void DidUpdateCanRunPolicy() = 0;
protected:
// Derived classes must implement a ScopedWorkersExecutor that derives from
// this to perform operations on workers at the end of a scope, when all locks
......@@ -156,16 +156,6 @@ class BASE_EXPORT SchedulerWorkerPool {
const TrackedRef<TaskTracker> task_tracker_;
const TrackedRef<Delegate> delegate_;
// Returns the number of queued BEST_EFFORT sequences allowed to run by the
// current CanRunPolicy.
size_t GetNumQueuedCanRunBestEffortSequences() const
EXCLUSIVE_LOCKS_REQUIRED(lock_);
// Returns the number of queued USER_VISIBLE/USER_BLOCKING sequences allowed
// to run by the current CanRunPolicy.
size_t GetNumQueuedCanRunForegroundSequences() const
EXCLUSIVE_LOCKS_REQUIRED(lock_);
// Ensures that there are enough workers to run queued sequences. |executor|
// is forwarded from the one received in PushSequenceAndWakeUpWorkersImpl()
virtual void EnsureEnoughWorkersLockRequired(
......
......@@ -210,6 +210,7 @@ class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl
~SchedulerWorkerDelegateImpl() override;
// SchedulerWorker::Delegate:
void OnCanScheduleSequence(scoped_refptr<Sequence> sequence) override;
SchedulerWorker::ThreadLabel GetThreadLabel() const override;
void OnMainEntry(const SchedulerWorker* worker) override;
scoped_refptr<Sequence> GetWork(SchedulerWorker* worker) override;
......@@ -563,6 +564,11 @@ SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::
SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::
~SchedulerWorkerDelegateImpl() = default;
void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::
OnCanScheduleSequence(scoped_refptr<Sequence> sequence) {
outer_->OnCanScheduleSequence(std::move(sequence));
}
SchedulerWorker::ThreadLabel
SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::GetThreadLabel() const {
return SchedulerWorker::ThreadLabel::POOLED;
......@@ -628,14 +634,13 @@ SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::GetWork(
return nullptr;
}
// Enforce the CanRunPolicy and that no more than |max_best_effort_tasks_|
// BEST_EFFORT tasks run concurrently.
const TaskPriority priority =
outer_->priority_queue_.PeekSortKey().priority();
if (!outer_->task_tracker_->CanRunPriority(priority) ||
(priority == TaskPriority::BEST_EFFORT &&
outer_->num_running_best_effort_tasks_ >=
outer_->max_best_effort_tasks_)) {
// Enforce that no more than |max_best_effort_tasks_| BEST_EFFORT tasks run
// concurrently.
const bool next_sequence_is_best_effort =
outer_->priority_queue_.PeekSortKey().priority() ==
TaskPriority::BEST_EFFORT;
if (next_sequence_is_best_effort && outer_->num_running_best_effort_tasks_ >=
outer_->max_best_effort_tasks_) {
OnWorkerBecomesIdleLockRequired(worker);
return nullptr;
}
......@@ -646,7 +651,7 @@ SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::GetWork(
DCHECK(!outer_->idle_workers_stack_.Contains(worker));
// Running BEST_EFFORT task bookkeeping.
if (priority == TaskPriority::BEST_EFFORT) {
if (next_sequence_is_best_effort) {
write_worker().is_running_best_effort_task = true;
++outer_->num_running_best_effort_tasks_;
}
......@@ -1034,25 +1039,15 @@ size_t SchedulerWorkerPoolImpl::GetNumAwakeWorkersLockRequired() const {
}
size_t SchedulerWorkerPoolImpl::GetDesiredNumAwakeWorkersLockRequired() const {
if (!task_tracker_->CanRunPriority(TaskPriority::HIGHEST))
return 0U;
// Number of BEST_EFFORT sequences that are running or queued and allowed to
// run by the CanRunPolicy.
const size_t num_running_or_queued_can_run_best_effort_sequences =
num_running_best_effort_tasks_ + GetNumQueuedCanRunBestEffortSequences();
// Number of USER_{VISIBLE|BLOCKING} sequences that are running or queued.
const size_t num_running_or_queued_best_effort_sequences =
num_running_best_effort_tasks_ +
priority_queue_.GetNumSequencesWithPriority(TaskPriority::BEST_EFFORT);
const size_t num_running_or_queued_foreground_sequences =
(num_running_tasks_ - num_running_best_effort_tasks_) +
priority_queue_.GetNumSequencesWithPriority(TaskPriority::USER_VISIBLE) +
priority_queue_.GetNumSequencesWithPriority(TaskPriority::USER_BLOCKING);
const size_t workers_for_best_effort_sequences =
std::max(std::min(num_running_or_queued_can_run_best_effort_sequences,
max_best_effort_tasks_),
num_running_best_effort_tasks_);
num_running_tasks_ + priority_queue_.Size() -
num_running_or_queued_best_effort_sequences;
const size_t workers_for_best_effort_sequences = std::min(
num_running_or_queued_best_effort_sequences, max_best_effort_tasks_);
const size_t workers_for_foreground_sequences =
num_running_or_queued_foreground_sequences;
......@@ -1061,12 +1056,6 @@ size_t SchedulerWorkerPoolImpl::GetDesiredNumAwakeWorkersLockRequired() const {
max_tasks_, kMaxNumberOfWorkers});
}
void SchedulerWorkerPoolImpl::DidUpdateCanRunPolicy() {
ScopedWorkersExecutor executor(this);
AutoSchedulerLock auto_lock(lock_);
EnsureEnoughWorkersLockRequired(&executor);
}
void SchedulerWorkerPoolImpl::EnsureEnoughWorkersLockRequired(
BaseScopedWorkersExecutor* base_executor) {
// Don't do anything if the pool isn't started.
......
......@@ -91,7 +91,6 @@ class BASE_EXPORT SchedulerWorkerPoolImpl : public SchedulerWorkerPool {
void JoinForTesting() override;
size_t GetMaxConcurrentNonBlockedTasksDeprecated() const override;
void ReportHeartbeatMetrics() const override;
void DidUpdateCanRunPolicy() override;
const HistogramBase* num_tasks_before_detach_histogram() const {
return num_tasks_before_detach_histogram_;
......
......@@ -1044,9 +1044,9 @@ class TaskSchedulerWorkerPoolBlockingTest
DISALLOW_COPY_AND_ASSIGN(TaskSchedulerWorkerPoolBlockingTest);
};
// Verify that SaturateWithBlockingTasks() causes max tasks to increase and
// creates a worker if needed. Also verify that UnblockBlockingTasks() decreases
// max tasks after an increase.
// Verify that BlockingScopeEntered() causes max tasks to increase and creates a
// worker if needed. Also verify that BlockingScopeExited() decreases max tasks
// after an increase.
TEST_P(TaskSchedulerWorkerPoolBlockingTest, ThreadBlockedUnblocked) {
CreateAndStartWorkerPool();
......@@ -1066,69 +1066,6 @@ TEST_P(TaskSchedulerWorkerPoolBlockingTest, ThreadBlockedUnblocked) {
EXPECT_EQ(worker_pool_->GetMaxTasksForTesting(), kMaxTasks);
}
// Verify that flooding the pool with more BEST_EFFORT tasks than
// kMaxBestEffortTasks doesn't prevent USER_VISIBLE tasks from running.
TEST_P(TaskSchedulerWorkerPoolBlockingTest, TooManyBestEffortTasks) {
constexpr size_t kMaxBestEffortTasks = kMaxTasks / 2;
CreateAndStartWorkerPool(TimeDelta::Max(), kMaxTasks, kMaxBestEffortTasks);
WaitableEvent threads_continue;
{
WaitableEvent entered_blocking_scope;
RepeatingClosure entered_blocking_scope_barrier = BarrierClosure(
kMaxBestEffortTasks + 1,
BindOnce(&WaitableEvent::Signal, Unretained(&entered_blocking_scope)));
WaitableEvent exit_blocking_scope;
WaitableEvent threads_running;
RepeatingClosure threads_running_barrier = BarrierClosure(
kMaxBestEffortTasks + 1,
BindOnce(&WaitableEvent::Signal, Unretained(&threads_running)));
const auto best_effort_task_runner = test::CreateTaskRunnerWithTraits(
{TaskPriority::BEST_EFFORT, MayBlock()},
&mock_scheduler_task_runner_delegate_);
for (size_t i = 0; i < kMaxBestEffortTasks + 1; ++i) {
best_effort_task_runner->PostTask(
FROM_HERE, BindLambdaForTesting([&]() {
{
NestedScopedBlockingCall scoped_blocking_call(GetParam());
entered_blocking_scope_barrier.Run();
test::WaitWithoutBlockingObserver(&exit_blocking_scope);
}
threads_running_barrier.Run();
test::WaitWithoutBlockingObserver(&threads_continue);
}));
}
entered_blocking_scope.Wait();
exit_blocking_scope.Signal();
threads_running.Wait();
}
// At this point, kMaxBestEffortTasks + 1 threads are running (plus
// potentially the idle thread), but max_task and max_best_effort_task are
// back to normal.
EXPECT_GE(worker_pool_->NumberOfWorkersForTesting(), kMaxBestEffortTasks + 1);
EXPECT_LE(worker_pool_->NumberOfWorkersForTesting(), kMaxBestEffortTasks + 2);
EXPECT_EQ(worker_pool_->GetMaxTasksForTesting(), kMaxTasks);
WaitableEvent threads_running;
task_runner_->PostTask(FROM_HERE, BindLambdaForTesting([&]() {
threads_running.Signal();
test::WaitWithoutBlockingObserver(&threads_continue);
}));
// This should not block forever.
threads_running.Wait();
EXPECT_GE(worker_pool_->NumberOfWorkersForTesting(), kMaxBestEffortTasks + 2);
EXPECT_LE(worker_pool_->NumberOfWorkersForTesting(), kMaxBestEffortTasks + 3);
threads_continue.Signal();
task_tracker_.FlushForTesting();
}
// Verify that tasks posted in a saturated pool before a ScopedBlockingCall will
// execute after ScopedBlockingCall is instantiated.
TEST_P(TaskSchedulerWorkerPoolBlockingTest, PostBeforeBlocking) {
......
......@@ -11,7 +11,6 @@
#include "base/bind_helpers.h"
#include "base/location.h"
#include "base/memory/ref_counted.h"
#include "base/task/task_scheduler/can_run_policy_test.h"
#include "base/task/task_scheduler/delayed_task_manager.h"
#include "base/task/task_scheduler/scheduler_sequenced_task_runner.h"
#include "base/task/task_scheduler/scheduler_worker_pool_impl.h"
......@@ -167,13 +166,6 @@ class TaskSchedulerWorkerPoolTest
}
}
scoped_refptr<TaskRunner> CreateTaskRunner(
const TaskTraits& traits = TaskTraits()) {
return test::CreateTaskRunnerWithExecutionMode(
GetParam().execution_mode, &mock_scheduler_task_runner_delegate_,
traits);
}
Thread service_thread_;
TaskTracker task_tracker_ = {"Test"};
DelayedTaskManager delayed_task_manager_;
......@@ -248,8 +240,9 @@ TEST_P(TaskSchedulerWorkerPoolTest, NestedPostTasks) {
// Verify that a Task can't be posted after shutdown.
TEST_P(TaskSchedulerWorkerPoolTest, PostTaskAfterShutdown) {
StartWorkerPool();
auto task_runner = CreateTaskRunner();
test::ShutdownTaskTracker(&task_tracker_);
auto task_runner = test::CreateTaskRunnerWithExecutionMode(
GetParam().execution_mode, &mock_scheduler_task_runner_delegate_);
task_tracker_.Shutdown();
EXPECT_FALSE(task_runner->PostTask(FROM_HERE, BindOnce(&ShouldNotRun)));
}
......@@ -257,9 +250,10 @@ TEST_P(TaskSchedulerWorkerPoolTest, PostTaskAfterShutdown) {
// crash.
TEST_P(TaskSchedulerWorkerPoolTest, PostAfterDestroy) {
StartWorkerPool();
auto task_runner = CreateTaskRunner();
auto task_runner = test::CreateTaskRunnerWithExecutionMode(
GetParam().execution_mode, &mock_scheduler_task_runner_delegate_);
EXPECT_TRUE(task_runner->PostTask(FROM_HERE, DoNothing()));
test::ShutdownTaskTracker(&task_tracker_);
task_tracker_.Shutdown();
worker_pool_->JoinForTesting();
worker_pool_.reset();
EXPECT_FALSE(task_runner->PostTask(FROM_HERE, BindOnce(&ShouldNotRun)));
......@@ -271,7 +265,9 @@ TEST_P(TaskSchedulerWorkerPoolTest, PostDelayedTask) {
WaitableEvent task_ran(WaitableEvent::ResetPolicy::AUTOMATIC,
WaitableEvent::InitialState::NOT_SIGNALED);
auto task_runner = CreateTaskRunner();
auto task_runner = test::CreateTaskRunnerWithExecutionMode(
GetParam().execution_mode, &mock_scheduler_task_runner_delegate_);
// Wait until the task runner is up and running to make sure the test below is
// solely timing the delayed task, not bringing up a physical thread.
......@@ -304,7 +300,8 @@ TEST_P(TaskSchedulerWorkerPoolTest, PostDelayedTask) {
// complements it to get full coverage of that method.
TEST_P(TaskSchedulerWorkerPoolTest, SequencedRunsTasksInCurrentSequence) {
StartWorkerPool();
auto task_runner = CreateTaskRunner();
auto task_runner = test::CreateTaskRunnerWithExecutionMode(
GetParam().execution_mode, &mock_scheduler_task_runner_delegate_);
auto sequenced_task_runner = test::CreateSequencedTaskRunnerWithTraits(
TaskTraits(), &mock_scheduler_task_runner_delegate_);
......@@ -326,7 +323,9 @@ TEST_P(TaskSchedulerWorkerPoolTest, PostBeforeStart) {
WaitableEvent task_1_running;
WaitableEvent task_2_running;
auto task_runner = CreateTaskRunner();
scoped_refptr<TaskRunner> task_runner = test::CreateTaskRunnerWithTraits(
{WithBaseSyncPrimitives()}, &mock_scheduler_task_runner_delegate_);
task_runner->PostTask(
FROM_HERE, BindOnce(&WaitableEvent::Signal, Unretained(&task_1_running)));
task_runner->PostTask(
......@@ -347,27 +346,6 @@ TEST_P(TaskSchedulerWorkerPoolTest, PostBeforeStart) {
task_tracker_.FlushForTesting();
}
// Verify that tasks only run when allowed by the CanRunPolicy.
TEST_P(TaskSchedulerWorkerPoolTest, CanRunPolicyBasic) {
StartWorkerPool();
test::TestCanRunPolicyBasic(
worker_pool_.get(),
[this](TaskPriority priority) { return CreateTaskRunner({priority}); },
&task_tracker_);
}
TEST_P(TaskSchedulerWorkerPoolTest, CanRunPolicyUpdatedBeforeRun) {
StartWorkerPool();
// This test only works with SequencedTaskRunner become it assumes
// ordered execution of 2 posted tasks.
if (GetParam().execution_mode != test::ExecutionMode::SEQUENCED)
return;
test::TestCanRunPolicyChangedBeforeRun(
worker_pool_.get(),
[this](TaskPriority priority) { return CreateTaskRunner({priority}); },
&task_tracker_);
}
// Verify that the maximum number of BEST_EFFORT tasks that can run concurrently
// in a pool does not affect Sequences with a priority that was increased from
// BEST_EFFORT to USER_BLOCKING.
......
......@@ -21,6 +21,9 @@ namespace {
class MockSchedulerWorkerDelegate : public SchedulerWorker::Delegate {
public:
void OnCanScheduleSequence(scoped_refptr<Sequence> sequence) override {
ADD_FAILURE() << "Unexpected call to OnCanScheduleSequence().";
}
SchedulerWorker::ThreadLabel GetThreadLabel() const override {
return SchedulerWorker::ThreadLabel::DEDICATED;
}
......
......@@ -53,6 +53,9 @@ class SchedulerWorkerDefaultDelegate : public SchedulerWorker::Delegate {
SchedulerWorkerDefaultDelegate() = default;
// SchedulerWorker::Delegate:
void OnCanScheduleSequence(scoped_refptr<Sequence> sequence) override {
ADD_FAILURE() << "Unexpected call to OnCanScheduleSequence().";
}
SchedulerWorker::ThreadLabel GetThreadLabel() const override {
return SchedulerWorker::ThreadLabel::DEDICATED;
}
......@@ -195,6 +198,8 @@ class TaskSchedulerWorkerTest : public testing::TestWithParam<int> {
outer_->created_sequences_.push_back(sequence);
}
EXPECT_TRUE(outer_->task_tracker_.WillScheduleSequence(
sequence_transaction, nullptr));
return sequence;
}
......@@ -457,7 +462,10 @@ class ControllableCleanupDelegate : public SchedulerWorkerDefaultDelegate {
TimeDelta());
EXPECT_TRUE(
task_tracker_->WillPostTask(&task, sequence->shutdown_behavior()));
sequence->BeginTransaction().PushTask(std::move(task));
Sequence::Transaction sequence_transaction(sequence->BeginTransaction());
sequence_transaction.PushTask(std::move(task));
EXPECT_TRUE(
task_tracker_->WillScheduleSequence(sequence_transaction, nullptr));
return sequence;
}
......@@ -593,7 +601,7 @@ TEST(TaskSchedulerWorkerTest, WorkerCleanupDuringShutdown) {
worker->WakeUp();
controls->WaitForWorkToRun();
test::ShutdownTaskTracker(&task_tracker);
task_tracker.Shutdown();
worker->Cleanup();
worker = nullptr;
controls->UnblockWork();
......@@ -737,11 +745,6 @@ TEST(TaskSchedulerWorkerTest, BumpPriorityOfAliveThreadDuringShutdown) {
TaskTracker task_tracker("Test");
// Block shutdown to ensure that the worker doesn't exit when StartShutdown()
// is called.
Task task(FROM_HERE, DoNothing(), TimeDelta());
task_tracker.WillPostTask(&task, TaskShutdownBehavior::BLOCK_SHUTDOWN);
std::unique_ptr<ExpectThreadPriorityDelegate> delegate(
new ExpectThreadPriorityDelegate);
ExpectThreadPriorityDelegate* delegate_raw = delegate.get();
......@@ -758,7 +761,7 @@ TEST(TaskSchedulerWorkerTest, BumpPriorityOfAliveThreadDuringShutdown) {
// Verify that the thread priority is bumped to NORMAL during shutdown.
delegate_raw->SetExpectedThreadPriority(ThreadPriority::NORMAL);
task_tracker.StartShutdown();
task_tracker.SetHasShutdownStartedForTesting();
worker->WakeUp();
delegate_raw->WaitForPriorityVerifiedInGetWork();
......
......@@ -35,12 +35,12 @@ TaskScheduler::InitParams::~InitParams() = default;
TaskScheduler::ScopedExecutionFence::ScopedExecutionFence() {
DCHECK(g_task_scheduler);
g_task_scheduler->SetCanRun(false);
g_task_scheduler->SetExecutionFenceEnabled(true);
}
TaskScheduler::ScopedExecutionFence::~ScopedExecutionFence() {
DCHECK(g_task_scheduler);
g_task_scheduler->SetCanRun(true);
g_task_scheduler->SetExecutionFenceEnabled(false);
}
#if !defined(OS_NACL)
......
......@@ -209,9 +209,8 @@ class BASE_EXPORT TaskScheduler : public TaskExecutor {
virtual int GetMaxConcurrentNonBlockedTasksWithTraitsDeprecated(
const TaskTraits& traits) const = 0;
// Sets whether tasks of any / BEST_EFFORT priority are allowed to run.
virtual void SetCanRun(bool can_run) = 0;
virtual void SetCanRunBestEffort(bool can_run) = 0;
// Enables/disables an execution fence that prevents tasks from running.
virtual void SetExecutionFenceEnabled(bool execution_fence_enabled) = 0;
};
} // namespace base
......
......@@ -8,10 +8,8 @@
#include <string>
#include <utility>
#include "base/base_switches.h"
#include "base/bind.h"
#include "base/bind_helpers.h"
#include "base/command_line.h"
#include "base/compiler_specific.h"
#include "base/feature_list.h"
#include "base/message_loop/message_loop.h"
......@@ -40,15 +38,6 @@ constexpr EnvironmentParams kForegroundPoolEnvironmentParams{
constexpr EnvironmentParams kBackgroundPoolEnvironmentParams{
"Background", base::ThreadPriority::BACKGROUND};
// Indicates whether BEST_EFFORT tasks are disabled by a command line switch.
bool HasDisableBestEffortTasksSwitch() {
// The CommandLine might not be initialized if TaskScheduler is initialized
// in a dynamic library which doesn't have access to argc/argv.
return CommandLine::InitializedForCurrentProcess() &&
CommandLine::ForCurrentProcess()->HasSwitch(
switches::kDisableBestEffortTasks);
}
} // namespace
TaskSchedulerImpl::TaskSchedulerImpl(StringPiece histogram_label)
......@@ -65,7 +54,6 @@ TaskSchedulerImpl::TaskSchedulerImpl(
Unretained(this)))),
single_thread_task_runner_manager_(task_tracker_->GetTrackedRef(),
&delayed_task_manager_),
can_run_best_effort_(!HasDisableBestEffortTasksSwitch()),
tracked_ref_factory_(this) {
DCHECK(!histogram_label.empty());
......@@ -103,9 +91,6 @@ TaskSchedulerImpl::~TaskSchedulerImpl() {
void TaskSchedulerImpl::Start(
const TaskScheduler::InitParams& init_params,
SchedulerWorkerObserver* scheduler_worker_observer) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
DCHECK(!started_);
internal::InitializeThreadPrioritiesFeature();
// This is set in Start() and not in the constructor because variation params
......@@ -187,8 +172,6 @@ void TaskSchedulerImpl::Start(
service_thread_task_runner, scheduler_worker_observer,
worker_environment);
}
started_ = true;
}
bool TaskSchedulerImpl::PostDelayedTaskWithTraits(const Location& from_here,
......@@ -252,18 +235,7 @@ int TaskSchedulerImpl::GetMaxConcurrentNonBlockedTasksWithTraitsDeprecated(
}
void TaskSchedulerImpl::Shutdown() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
task_tracker_->StartShutdown();
// Allow all tasks to run. Done after initiating shutdown to ensure that non-
// BLOCK_SHUTDOWN tasks don't get a chance to run and that BLOCK_SHUTDOWN
// tasks run with a normal thread priority.
can_run_ = true;
can_run_best_effort_ = true;
UpdateCanRunPolicy();
task_tracker_->CompleteShutdown();
task_tracker_->Shutdown();
}
void TaskSchedulerImpl::FlushForTesting() {
......@@ -292,18 +264,8 @@ void TaskSchedulerImpl::JoinForTesting() {
#endif
}
void TaskSchedulerImpl::SetCanRun(bool can_run) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
DCHECK_NE(can_run_, can_run);
can_run_ = can_run;
UpdateCanRunPolicy();
}
void TaskSchedulerImpl::SetCanRunBestEffort(bool can_run_best_effort) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
DCHECK_NE(can_run_best_effort_, can_run_best_effort);
can_run_best_effort_ = can_run_best_effort;
UpdateCanRunPolicy();
void TaskSchedulerImpl::SetExecutionFenceEnabled(bool execution_fence_enabled) {
task_tracker_->SetExecutionFenceEnabled(execution_fence_enabled);
}
bool TaskSchedulerImpl::PostTaskWithSequence(Task task,
......@@ -406,20 +368,6 @@ SchedulerWorkerPool* TaskSchedulerImpl::GetForegroundWorkerPool() {
return &foreground_pool_.value();
}
void TaskSchedulerImpl::UpdateCanRunPolicy() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
const CanRunPolicy can_run_policy =
can_run_ ? (can_run_best_effort_ ? CanRunPolicy::kAll
: CanRunPolicy::kForegroundOnly)
: CanRunPolicy::kNone;
task_tracker_->SetCanRunPolicy(can_run_policy);
GetForegroundWorkerPool()->DidUpdateCanRunPolicy();
if (background_pool_)
background_pool_->DidUpdateCanRunPolicy();
single_thread_task_runner_manager_.DidUpdateCanRunPolicy();
}
TaskTraits TaskSchedulerImpl::SetUserBlockingPriorityIfNeeded(
TaskTraits traits) const {
if (all_tasks_user_blocking_.IsSet())
......
......@@ -14,7 +14,6 @@
#include "base/macros.h"
#include "base/memory/ptr_util.h"
#include "base/memory/ref_counted.h"
#include "base/sequence_checker.h"
#include "base/strings/string_piece.h"
#include "base/synchronization/atomic_flag.h"
#include "base/task/single_thread_task_runner_thread_mode.h"
......@@ -79,8 +78,7 @@ class BASE_EXPORT TaskSchedulerImpl : public TaskScheduler,
void FlushForTesting() override;
void FlushAsyncForTesting(OnceClosure flush_callback) override;
void JoinForTesting() override;
void SetCanRun(bool can_run) override;
void SetCanRunBestEffort(bool can_run_best_effort) override;
void SetExecutionFenceEnabled(bool execution_fence_enabled) override;
// TaskExecutor:
bool PostDelayedTaskWithTraits(const Location& from_here,
......@@ -104,10 +102,6 @@ class BASE_EXPORT TaskSchedulerImpl : public TaskScheduler,
const TaskTraits& traits);
private:
// Invoked after |can_run_| or |can_run_best_effort_| is updated. Sets the
// CanRunPolicy in TaskTracker and wakes up workers as appropriate.
void UpdateCanRunPolicy();
// Returns |traits|, with priority set to TaskPriority::USER_BLOCKING if
// |all_tasks_user_blocking_| is set.
TaskTraits SetUserBlockingPriorityIfNeeded(TaskTraits traits) const;
......@@ -148,15 +142,6 @@ class BASE_EXPORT TaskSchedulerImpl : public TaskScheduler,
Optional<SchedulerWorkerPoolImpl> foreground_pool_;
Optional<SchedulerWorkerPoolImpl> background_pool_;
// Whether this TaskScheduler was started. Access controlled by
// |sequence_checker_|.
bool started_ = false;
// Whether starting to run a Task with any/BEST_EFFORT priority is currently
// allowed. Access controlled by |sequence_checker_|.
bool can_run_ = true;
bool can_run_best_effort_;
#if defined(OS_WIN)
Optional<PlatformNativeWorkerPoolWin> native_foreground_pool_;
#elif defined(OS_MACOSX)
......@@ -173,9 +158,6 @@ class BASE_EXPORT TaskSchedulerImpl : public TaskScheduler,
base::win::ComInitCheckHook com_init_check_hook_;
#endif
// Asserts that operations occur in sequence with Start().
SEQUENCE_CHECKER(sequence_checker_);
TrackedRefFactory<SchedulerWorkerPool::Delegate> tracked_ref_factory_;
DISALLOW_COPY_AND_ASSIGN(TaskSchedulerImpl);
......
......@@ -495,75 +495,6 @@ TEST_P(TaskSchedulerImplTest, FlushAsyncForTestingSimple) {
flush_event.Wait();
}
// Verifies that tasks only run when allowed by SetCanRun().
TEST_P(TaskSchedulerImplTest, SetCanRun) {
StartTaskScheduler();
AtomicFlag can_run;
WaitableEvent did_run;
scheduler_.SetCanRun(false);
CreateTaskRunnerWithTraitsAndExecutionMode(&scheduler_, GetParam().traits,
GetParam().execution_mode)
->PostTask(FROM_HERE, BindLambdaForTesting([&]() {
EXPECT_TRUE(can_run.IsSet());
did_run.Signal();
}));
PlatformThread::Sleep(TestTimeouts::tiny_timeout());
can_run.Set();
scheduler_.SetCanRun(true);
did_run.Wait();
}
// Verifies that a call to SetCanRun(false) before Start() is honored.
TEST_P(TaskSchedulerImplTest, SetCanRunBeforeStart) {
scheduler_.SetCanRun(false);
StartTaskScheduler();
AtomicFlag can_run;
WaitableEvent did_run;
CreateTaskRunnerWithTraitsAndExecutionMode(&scheduler_, GetParam().traits,
GetParam().execution_mode)
->PostTask(FROM_HERE, BindLambdaForTesting([&]() {
EXPECT_TRUE(can_run.IsSet());
did_run.Signal();
}));
PlatformThread::Sleep(TestTimeouts::tiny_timeout());
can_run.Set();
scheduler_.SetCanRun(true);
did_run.Wait();
}
// Verifies that BEST_EFFORT tasks only run when allowed by
// SetCanRunBestEffort().
TEST_P(TaskSchedulerImplTest, SetCanRunBestEffort) {
StartTaskScheduler();
AtomicFlag can_run;
WaitableEvent did_run;
scheduler_.SetCanRunBestEffort(false);
CreateTaskRunnerWithTraitsAndExecutionMode(&scheduler_, GetParam().traits,
GetParam().execution_mode)
->PostTask(
FROM_HERE, BindLambdaForTesting([&]() {
if (GetParam().traits.priority() == TaskPriority::BEST_EFFORT)
EXPECT_TRUE(can_run.IsSet());
did_run.Signal();
}));
PlatformThread::Sleep(TestTimeouts::tiny_timeout());
can_run.Set();
scheduler_.SetCanRunBestEffort(true);
did_run.Wait();
}
INSTANTIATE_TEST_SUITE_P(OneTaskSchedulerImplTestParams,
TaskSchedulerImplTest,
::testing::ValuesIn(GetTaskSchedulerImplTestParams()));
......
This diff is collapsed.
This diff is collapsed.
......@@ -61,9 +61,11 @@ TEST_F(TaskSchedulerTaskTrackerPosixTest, RunTask) {
EXPECT_TRUE(tracker_.WillPostTask(&task, default_traits.shutdown_behavior()));
auto sequence = test::CreateSequenceWithTask(std::move(task), default_traits);
EXPECT_TRUE(
tracker_.WillScheduleSequence(sequence->BeginTransaction(), nullptr));
// Expect RunAndPopNextTask to return nullptr since |sequence| is empty after
// popping a task from it.
EXPECT_FALSE(tracker_.RunAndPopNextTask(sequence));
EXPECT_FALSE(tracker_.RunAndPopNextTask(sequence, nullptr));
EXPECT_TRUE(did_run);
}
......@@ -85,10 +87,12 @@ TEST_F(TaskSchedulerTaskTrackerPosixTest, FileDescriptorWatcher) {
auto sequence = test::CreateSequenceWithTask(
std::move(task), default_traits, MakeRefCounted<NullTaskRunner>(),
TaskSourceExecutionMode::kSequenced);
EXPECT_TRUE(
tracker_.WillScheduleSequence(sequence->BeginTransaction(), nullptr));
// Expect RunAndPopNextTask to return nullptr since |sequence| is empty after
// popping a task from it.
EXPECT_FALSE(tracker_.RunAndPopNextTask(sequence));
EXPECT_FALSE(tracker_.RunAndPopNextTask(sequence, nullptr));
// Join the service thread to make sure that the read watch is registered and
// unregistered before file descriptors are closed.
......
......@@ -58,8 +58,9 @@ scoped_refptr<Sequence> CreateSequenceWithTask(
scoped_refptr<TaskRunner> CreateTaskRunnerWithExecutionMode(
test::ExecutionMode execution_mode,
MockSchedulerTaskRunnerDelegate* mock_scheduler_task_runner_delegate,
const TaskTraits& traits) {
MockSchedulerTaskRunnerDelegate* mock_scheduler_task_runner_delegate) {
// Allow tasks posted to the returned TaskRunner to wait on a WaitableEvent.
const TaskTraits traits = {WithBaseSyncPrimitives()};
switch (execution_mode) {
case test::ExecutionMode::PARALLEL:
return CreateTaskRunnerWithTraits(traits,
......@@ -162,11 +163,6 @@ void MockSchedulerTaskRunnerDelegate::SetWorkerPool(
worker_pool_ = worker_pool;
}
void ShutdownTaskTracker(TaskTracker* task_tracker) {
task_tracker->StartShutdown();
task_tracker->CompleteShutdown();
}
} // namespace test
} // namespace internal
} // namespace base
......@@ -98,8 +98,7 @@ scoped_refptr<Sequence> CreateSequenceWithTask(
// Caveat: this does not support ExecutionMode::SINGLE_THREADED.
scoped_refptr<TaskRunner> CreateTaskRunnerWithExecutionMode(
test::ExecutionMode execution_mode,
MockSchedulerTaskRunnerDelegate* mock_scheduler_task_runner_delegate,
const TaskTraits& traits = TaskTraits());
MockSchedulerTaskRunnerDelegate* mock_scheduler_task_runner_delegate);
scoped_refptr<TaskRunner> CreateTaskRunnerWithTraits(
const TaskTraits& traits,
......@@ -111,9 +110,6 @@ scoped_refptr<SequencedTaskRunner> CreateSequencedTaskRunnerWithTraits(
void WaitWithoutBlockingObserver(WaitableEvent* event);
// Calls StartShutdown() and CompleteShutdown() on |task_tracker|.
void ShutdownTaskTracker(TaskTracker* task_tracker);
} // namespace test
} // namespace internal
} // namespace base
......
......@@ -19,7 +19,8 @@ class TaskSchedulerTestHelpers {
// static
void TaskSchedulerTestHelpers::SetTaskSchedulerExecutionFenceEnabledForTesting(
bool execution_fence_enabled) {
TaskScheduler::GetInstance()->SetCanRun(!execution_fence_enabled);
TaskScheduler::GetInstance()->SetExecutionFenceEnabled(
execution_fence_enabled);
}
} // namespace base
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment