Commit ea2b21eb authored by Etienne Pierre-doray's avatar Etienne Pierre-doray Committed by Commit Bot

[Jobs API]: Create study on Jobs API.

Ablation study to evaluate impact of Jobs API.
This CL splits job specific behavior in several features:
- worker wake up strategy (with additional serial strategy)
- yield
- fair scheduling
- priority update

Additional study specific to Jobs:
wakeup after getwork: This has the potential benefit of reducing lock
contention when workers wakeup.

Bug: 1139440
Change-Id: I5397977352650ee30557dc8972f32df34798b398
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2481522
Commit-Queue: Etienne Pierre-Doray <etiennep@chromium.org>
Reviewed-by: default avatarGabriel Charette <gab@chromium.org>
Cr-Commit-Position: refs/heads/master@{#820953}
parent 48e87325
...@@ -81,7 +81,7 @@ bool JobHandle::IsCompleted() const { ...@@ -81,7 +81,7 @@ bool JobHandle::IsCompleted() const {
} }
void JobHandle::UpdatePriority(TaskPriority new_priority) { void JobHandle::UpdatePriority(TaskPriority new_priority) {
task_source_->delegate()->UpdatePriority(task_source_, new_priority); task_source_->delegate()->UpdateJobPriority(task_source_, new_priority);
} }
void JobHandle::NotifyConcurrencyIncrease() { void JobHandle::NotifyConcurrencyIncrease() {
......
...@@ -17,6 +17,30 @@ const Feature kNoDetachBelowInitialCapacity = { ...@@ -17,6 +17,30 @@ const Feature kNoDetachBelowInitialCapacity = {
const Feature kMayBlockWithoutDelay = {"MayBlockWithoutDelay", const Feature kMayBlockWithoutDelay = {"MayBlockWithoutDelay",
base::FEATURE_DISABLED_BY_DEFAULT}; base::FEATURE_DISABLED_BY_DEFAULT};
const Feature kDisableJobYield = {"DisableJobYield",
base::FEATURE_DISABLED_BY_DEFAULT};
const Feature kDisableFairJobScheduling = {"DisableFairJobScheduling",
base::FEATURE_DISABLED_BY_DEFAULT};
const Feature kDisableJobUpdatePriority = {"DisableJobUpdatePriority",
base::FEATURE_DISABLED_BY_DEFAULT};
const Feature kWakeUpStrategyFeature = {"WakeUpStrategyFeature",
base::FEATURE_DISABLED_BY_DEFAULT};
constexpr FeatureParam<WakeUpStrategy>::Option kWakeUpStrategyOptions[] = {
{WakeUpStrategy::kCentralizedWakeUps, "centralized-wakeups"},
{WakeUpStrategy::kSerializedWakeUps, "serialized-wakeups"},
{WakeUpStrategy::kExponentialWakeUps, "exponential-wakeups"}};
const base::FeatureParam<WakeUpStrategy> kWakeUpStrategyParam{
&kWakeUpStrategyFeature, "strategy", WakeUpStrategy::kExponentialWakeUps,
&kWakeUpStrategyOptions};
const Feature kWakeUpAfterGetWork = {"WakeUpAfterGetWork",
base::FEATURE_DISABLED_BY_DEFAULT};
#if defined(OS_WIN) || defined(OS_APPLE) #if defined(OS_WIN) || defined(OS_APPLE)
const Feature kUseNativeThreadPool = {"UseNativeThreadPool", const Feature kUseNativeThreadPool = {"UseNativeThreadPool",
base::FEATURE_DISABLED_BY_DEFAULT}; base::FEATURE_DISABLED_BY_DEFAULT};
......
...@@ -23,6 +23,34 @@ extern const BASE_EXPORT Feature kNoDetachBelowInitialCapacity; ...@@ -23,6 +23,34 @@ extern const BASE_EXPORT Feature kNoDetachBelowInitialCapacity;
// instead of waiting for a threshold in the foreground thread group. // instead of waiting for a threshold in the foreground thread group.
extern const BASE_EXPORT Feature kMayBlockWithoutDelay; extern const BASE_EXPORT Feature kMayBlockWithoutDelay;
// Under this feature, ThreadPool::ShouldYield() always returns false
extern const BASE_EXPORT Feature kDisableJobYield;
// Under this feature, JobTaskSource doesn't use worker count in its sort key
// such that worker threads are not distributed among running jobs equally.
extern const BASE_EXPORT Feature kDisableFairJobScheduling;
// Under this feature, priority update on Jobs is disabled.
extern const BASE_EXPORT Feature kDisableJobUpdatePriority;
// Under this feature, another WorkerThread is signaled only after the current
// thread was assigned work.
extern const BASE_EXPORT Feature kWakeUpAfterGetWork;
// Strategy affecting how WorkerThreads are signaled to pick up pending work.
enum class WakeUpStrategy {
// A single thread scheduling new work signals all required WorkerThreads.
kCentralizedWakeUps,
// Each thread signals at most a single thread, either when scheduling new
// work or picking up pending work.
kSerializedWakeUps,
// Each thread signals at most 2 threads, either when scheduling new
// work or picking up pending work.
kExponentialWakeUps,
};
// Under this feature, a given WakeUpStrategy param is used.
extern const BASE_EXPORT Feature kWakeUpStrategyFeature;
extern const BASE_EXPORT base::FeatureParam<WakeUpStrategy>
kWakeUpStrategyParam;
#if defined(OS_WIN) || defined(OS_APPLE) #if defined(OS_WIN) || defined(OS_APPLE)
#define HAS_NATIVE_THREAD_POOL() 1 #define HAS_NATIVE_THREAD_POOL() 1
#else #else
......
...@@ -347,7 +347,11 @@ bool JobTaskSource::DidProcessTask(TaskSource::Transaction* /*transaction*/) { ...@@ -347,7 +347,11 @@ bool JobTaskSource::DidProcessTask(TaskSource::Transaction* /*transaction*/) {
GetMaxConcurrency(state_before_sub.worker_count() - 1); GetMaxConcurrency(state_before_sub.worker_count() - 1);
} }
TaskSourceSortKey JobTaskSource::GetSortKey() const { TaskSourceSortKey JobTaskSource::GetSortKey(
bool disable_fair_scheduling) const {
if (disable_fair_scheduling) {
return TaskSourceSortKey(priority_racy(), ready_time_);
}
return TaskSourceSortKey(priority_racy(), ready_time_, return TaskSourceSortKey(priority_racy(), ready_time_,
TS_UNCHECKED_READ(state_).Load().worker_count()); TS_UNCHECKED_READ(state_).Load().worker_count());
} }
......
...@@ -68,7 +68,8 @@ class BASE_EXPORT JobTaskSource : public TaskSource { ...@@ -68,7 +68,8 @@ class BASE_EXPORT JobTaskSource : public TaskSource {
// TaskSource: // TaskSource:
ExecutionEnvironment GetExecutionEnvironment() override; ExecutionEnvironment GetExecutionEnvironment() override;
size_t GetRemainingConcurrency() const override; size_t GetRemainingConcurrency() const override;
TaskSourceSortKey GetSortKey() const override; TaskSourceSortKey GetSortKey(
bool disable_fair_scheduling = false) const override;
bool IsCompleted() const; bool IsCompleted() const;
size_t GetWorkerCount() const; size_t GetWorkerCount() const;
......
...@@ -36,6 +36,9 @@ class MockPooledTaskRunnerDelegate : public PooledTaskRunnerDelegate { ...@@ -36,6 +36,9 @@ class MockPooledTaskRunnerDelegate : public PooledTaskRunnerDelegate {
MOCK_METHOD2(UpdatePriority, MOCK_METHOD2(UpdatePriority,
void(scoped_refptr<TaskSource> task_source, void(scoped_refptr<TaskSource> task_source,
TaskPriority priority)); TaskPriority priority));
MOCK_METHOD2(UpdateJobPriority,
void(scoped_refptr<TaskSource> task_source,
TaskPriority priority));
}; };
class ThreadPoolJobTaskSourceTest : public testing::Test { class ThreadPoolJobTaskSourceTest : public testing::Test {
......
...@@ -207,7 +207,10 @@ class WorkerThreadDelegate : public WorkerThread::Delegate { ...@@ -207,7 +207,10 @@ class WorkerThreadDelegate : public WorkerThread::Delegate {
bool EnqueueTaskSource( bool EnqueueTaskSource(
TransactionWithRegisteredTaskSource transaction_with_task_source) { TransactionWithRegisteredTaskSource transaction_with_task_source) {
CheckedAutoLock auto_lock(lock_); CheckedAutoLock auto_lock(lock_);
priority_queue_.Push(std::move(transaction_with_task_source)); auto sort_key = transaction_with_task_source.task_source->GetSortKey(
/* disable_fair_scheduling */ false);
priority_queue_.Push(std::move(transaction_with_task_source.task_source),
sort_key);
if (!worker_awake_ && CanRunNextTaskSource()) { if (!worker_awake_ && CanRunNextTaskSource()) {
worker_awake_ = true; worker_awake_ = true;
return true; return true;
......
...@@ -56,6 +56,8 @@ class BASE_EXPORT PooledTaskRunnerDelegate { ...@@ -56,6 +56,8 @@ class BASE_EXPORT PooledTaskRunnerDelegate {
// thread group. // thread group.
virtual void UpdatePriority(scoped_refptr<TaskSource> task_source, virtual void UpdatePriority(scoped_refptr<TaskSource> task_source,
TaskPriority priority) = 0; TaskPriority priority) = 0;
virtual void UpdateJobPriority(scoped_refptr<TaskSource> task_source,
TaskPriority priority) = 0;
}; };
} // namespace internal } // namespace internal
......
...@@ -97,13 +97,10 @@ PriorityQueue::~PriorityQueue() { ...@@ -97,13 +97,10 @@ PriorityQueue::~PriorityQueue() {
PriorityQueue& PriorityQueue::operator=(PriorityQueue&& other) = default; PriorityQueue& PriorityQueue::operator=(PriorityQueue&& other) = default;
void PriorityQueue::Push( void PriorityQueue::Push(RegisteredTaskSource task_source,
TransactionWithRegisteredTaskSource transaction_with_task_source) { TaskSourceSortKey task_source_sort_key) {
auto task_source_sort_key =
transaction_with_task_source.task_source->GetSortKey();
container_.insert( container_.insert(
TaskSourceAndSortKey(std::move(transaction_with_task_source.task_source), TaskSourceAndSortKey(std::move(task_source), task_source_sort_key));
task_source_sort_key));
IncrementNumTaskSourcesForPriority(task_source_sort_key.priority()); IncrementNumTaskSourcesForPriority(task_source_sort_key.priority());
} }
......
...@@ -28,7 +28,8 @@ class BASE_EXPORT PriorityQueue { ...@@ -28,7 +28,8 @@ class BASE_EXPORT PriorityQueue {
PriorityQueue& operator=(PriorityQueue&& other); PriorityQueue& operator=(PriorityQueue&& other);
// Inserts |task_source| in the PriorityQueue with |task_source_sort_key|. // Inserts |task_source| in the PriorityQueue with |task_source_sort_key|.
void Push(TransactionWithRegisteredTaskSource transaction_with_task_source); void Push(RegisteredTaskSource task_source,
TaskSourceSortKey task_source_sort_key);
// Returns a reference to the TaskSourceSortKey representing the priority of // Returns a reference to the TaskSourceSortKey representing the priority of
// the highest pending task in this PriorityQueue. The reference becomes // the highest pending task in this PriorityQueue. The reference becomes
......
...@@ -50,8 +50,9 @@ class PriorityQueueWithSequencesTest : public testing::Test { ...@@ -50,8 +50,9 @@ class PriorityQueueWithSequencesTest : public testing::Test {
} }
void Push(scoped_refptr<TaskSource> task_source) { void Push(scoped_refptr<TaskSource> task_source) {
pq.Push(TransactionWithRegisteredTaskSource::FromTaskSource( auto sort_key = task_source->GetSortKey(false);
RegisteredTaskSource::CreateForTesting(std::move(task_source)))); pq.Push(RegisteredTaskSource::CreateForTesting(std::move(task_source)),
sort_key);
} }
test::TaskEnvironment task_environment{ test::TaskEnvironment task_environment{
...@@ -59,19 +60,19 @@ class PriorityQueueWithSequencesTest : public testing::Test { ...@@ -59,19 +60,19 @@ class PriorityQueueWithSequencesTest : public testing::Test {
scoped_refptr<TaskSource> sequence_a = scoped_refptr<TaskSource> sequence_a =
MakeSequenceWithTraitsAndTask(TaskTraits(TaskPriority::USER_VISIBLE)); MakeSequenceWithTraitsAndTask(TaskTraits(TaskPriority::USER_VISIBLE));
TaskSourceSortKey sort_key_a = sequence_a->GetSortKey(); TaskSourceSortKey sort_key_a = sequence_a->GetSortKey(false);
scoped_refptr<TaskSource> sequence_b = scoped_refptr<TaskSource> sequence_b =
MakeSequenceWithTraitsAndTask(TaskTraits(TaskPriority::USER_BLOCKING)); MakeSequenceWithTraitsAndTask(TaskTraits(TaskPriority::USER_BLOCKING));
TaskSourceSortKey sort_key_b = sequence_b->GetSortKey(); TaskSourceSortKey sort_key_b = sequence_b->GetSortKey(false);
scoped_refptr<TaskSource> sequence_c = scoped_refptr<TaskSource> sequence_c =
MakeSequenceWithTraitsAndTask(TaskTraits(TaskPriority::USER_BLOCKING)); MakeSequenceWithTraitsAndTask(TaskTraits(TaskPriority::USER_BLOCKING));
TaskSourceSortKey sort_key_c = sequence_c->GetSortKey(); TaskSourceSortKey sort_key_c = sequence_c->GetSortKey(false);
scoped_refptr<TaskSource> sequence_d = scoped_refptr<TaskSource> sequence_d =
MakeSequenceWithTraitsAndTask(TaskTraits(TaskPriority::BEST_EFFORT)); MakeSequenceWithTraitsAndTask(TaskTraits(TaskPriority::BEST_EFFORT));
TaskSourceSortKey sort_key_d = sequence_d->GetSortKey(); TaskSourceSortKey sort_key_d = sequence_d->GetSortKey(false);
PriorityQueue pq; PriorityQueue pq;
}; };
...@@ -193,7 +194,7 @@ TEST_F(PriorityQueueWithSequencesTest, UpdateSortKey) { ...@@ -193,7 +194,7 @@ TEST_F(PriorityQueueWithSequencesTest, UpdateSortKey) {
auto sequence_b_transaction = sequence_b->BeginTransaction(); auto sequence_b_transaction = sequence_b->BeginTransaction();
sequence_b_transaction.UpdatePriority(TaskPriority::BEST_EFFORT); sequence_b_transaction.UpdatePriority(TaskPriority::BEST_EFFORT);
pq.UpdateSortKey(*sequence_b, sequence_b->GetSortKey()); pq.UpdateSortKey(*sequence_b, sequence_b->GetSortKey(false));
EXPECT_EQ(sort_key_c, pq.PeekSortKey()); EXPECT_EQ(sort_key_c, pq.PeekSortKey());
ExpectNumSequences(2U, 1U, 1U); ExpectNumSequences(2U, 1U, 1U);
} }
...@@ -205,7 +206,7 @@ TEST_F(PriorityQueueWithSequencesTest, UpdateSortKey) { ...@@ -205,7 +206,7 @@ TEST_F(PriorityQueueWithSequencesTest, UpdateSortKey) {
auto sequence_c_transaction = sequence_c->BeginTransaction(); auto sequence_c_transaction = sequence_c->BeginTransaction();
sequence_c_transaction.UpdatePriority(TaskPriority::USER_BLOCKING); sequence_c_transaction.UpdatePriority(TaskPriority::USER_BLOCKING);
pq.UpdateSortKey(*sequence_c, sequence_c->GetSortKey()); pq.UpdateSortKey(*sequence_c, sequence_c->GetSortKey(false));
ExpectNumSequences(2U, 1U, 1U); ExpectNumSequences(2U, 1U, 1U);
// Note: |sequence_c| is popped for comparison as |sort_key_c| becomes // Note: |sequence_c| is popped for comparison as |sort_key_c| becomes
...@@ -222,7 +223,7 @@ TEST_F(PriorityQueueWithSequencesTest, UpdateSortKey) { ...@@ -222,7 +223,7 @@ TEST_F(PriorityQueueWithSequencesTest, UpdateSortKey) {
auto sequence_d_and_transaction = sequence_d->BeginTransaction(); auto sequence_d_and_transaction = sequence_d->BeginTransaction();
sequence_d_and_transaction.UpdatePriority(TaskPriority::USER_BLOCKING); sequence_d_and_transaction.UpdatePriority(TaskPriority::USER_BLOCKING);
pq.UpdateSortKey(*sequence_d, sequence_d->GetSortKey()); pq.UpdateSortKey(*sequence_d, sequence_d->GetSortKey(false));
ExpectNumSequences(1U, 1U, 1U); ExpectNumSequences(1U, 1U, 1U);
// Note: |sequence_d| is popped for comparison as |sort_key_d| becomes // Note: |sequence_d| is popped for comparison as |sort_key_d| becomes
...@@ -235,7 +236,7 @@ TEST_F(PriorityQueueWithSequencesTest, UpdateSortKey) { ...@@ -235,7 +236,7 @@ TEST_F(PriorityQueueWithSequencesTest, UpdateSortKey) {
} }
{ {
pq.UpdateSortKey(*sequence_d, sequence_d->GetSortKey()); pq.UpdateSortKey(*sequence_d, sequence_d->GetSortKey(false));
ExpectNumSequences(1U, 1U, 0U); ExpectNumSequences(1U, 1U, 0U);
EXPECT_EQ(sequence_a, pq.PopTaskSource().Unregister()); EXPECT_EQ(sequence_a, pq.PopTaskSource().Unregister());
ExpectNumSequences(1U, 0U, 0U); ExpectNumSequences(1U, 0U, 0U);
...@@ -245,7 +246,7 @@ TEST_F(PriorityQueueWithSequencesTest, UpdateSortKey) { ...@@ -245,7 +246,7 @@ TEST_F(PriorityQueueWithSequencesTest, UpdateSortKey) {
{ {
// No-op if UpdateSortKey() is called on an empty PriorityQueue. // No-op if UpdateSortKey() is called on an empty PriorityQueue.
pq.UpdateSortKey(*sequence_b, sequence_b->GetSortKey()); pq.UpdateSortKey(*sequence_b, sequence_b->GetSortKey(false));
EXPECT_TRUE(pq.IsEmpty()); EXPECT_TRUE(pq.IsEmpty());
ExpectNumSequences(0U, 0U, 0U); ExpectNumSequences(0U, 0U, 0U);
} }
......
...@@ -108,7 +108,8 @@ bool Sequence::DidProcessTask(TaskSource::Transaction* transaction) { ...@@ -108,7 +108,8 @@ bool Sequence::DidProcessTask(TaskSource::Transaction* transaction) {
return true; return true;
} }
TaskSourceSortKey Sequence::GetSortKey() const { TaskSourceSortKey Sequence::GetSortKey(
bool /* disable_fair_scheduling */) const {
return TaskSourceSortKey(priority_racy(), return TaskSourceSortKey(priority_racy(),
ready_time_.load(std::memory_order_relaxed)); ready_time_.load(std::memory_order_relaxed));
} }
......
...@@ -86,7 +86,8 @@ class BASE_EXPORT Sequence : public TaskSource { ...@@ -86,7 +86,8 @@ class BASE_EXPORT Sequence : public TaskSource {
// TaskSource: // TaskSource:
ExecutionEnvironment GetExecutionEnvironment() override; ExecutionEnvironment GetExecutionEnvironment() override;
size_t GetRemainingConcurrency() const override; size_t GetRemainingConcurrency() const override;
TaskSourceSortKey GetSortKey() const override; TaskSourceSortKey GetSortKey(
bool disable_fair_scheduling = false) const override;
// Returns a token that uniquely identifies this Sequence. // Returns a token that uniquely identifies this Sequence.
const SequenceToken& token() const { return token_; } const SequenceToken& token() const { return token_; }
......
...@@ -145,7 +145,7 @@ class BASE_EXPORT TaskSource : public RefCountedThreadSafe<TaskSource> { ...@@ -145,7 +145,7 @@ class BASE_EXPORT TaskSource : public RefCountedThreadSafe<TaskSource> {
virtual size_t GetRemainingConcurrency() const = 0; virtual size_t GetRemainingConcurrency() const = 0;
// Returns a TaskSourceSortKey representing the priority of the TaskSource. // Returns a TaskSourceSortKey representing the priority of the TaskSource.
virtual TaskSourceSortKey GetSortKey() const = 0; virtual TaskSourceSortKey GetSortKey(bool disable_fair_scheduling) const = 0;
// Support for IntrusiveHeap. // Support for IntrusiveHeap.
void SetHeapHandle(const HeapHandle& handle); void SetHeapHandle(const HeapHandle& handle);
......
...@@ -204,7 +204,7 @@ void MockPooledTaskRunnerDelegate::PostTaskWithSequenceNow( ...@@ -204,7 +204,7 @@ void MockPooledTaskRunnerDelegate::PostTaskWithSequenceNow(
} }
bool MockPooledTaskRunnerDelegate::ShouldYield(const TaskSource* task_source) { bool MockPooledTaskRunnerDelegate::ShouldYield(const TaskSource* task_source) {
return thread_group_->ShouldYield(task_source->GetSortKey()); return thread_group_->ShouldYield(task_source->GetSortKey(false));
} }
bool MockPooledTaskRunnerDelegate::EnqueueJobTaskSource( bool MockPooledTaskRunnerDelegate::EnqueueJobTaskSource(
...@@ -237,6 +237,12 @@ void MockPooledTaskRunnerDelegate::UpdatePriority( ...@@ -237,6 +237,12 @@ void MockPooledTaskRunnerDelegate::UpdatePriority(
thread_group_->UpdateSortKey(std::move(transaction)); thread_group_->UpdateSortKey(std::move(transaction));
} }
void MockPooledTaskRunnerDelegate::UpdateJobPriority(
scoped_refptr<TaskSource> task_source,
TaskPriority priority) {
UpdatePriority(std::move(task_source), priority);
}
void MockPooledTaskRunnerDelegate::SetThreadGroup(ThreadGroup* thread_group) { void MockPooledTaskRunnerDelegate::SetThreadGroup(ThreadGroup* thread_group) {
thread_group_ = thread_group; thread_group_ = thread_group;
} }
......
...@@ -66,6 +66,8 @@ class MockPooledTaskRunnerDelegate : public PooledTaskRunnerDelegate { ...@@ -66,6 +66,8 @@ class MockPooledTaskRunnerDelegate : public PooledTaskRunnerDelegate {
bool ShouldYield(const TaskSource* task_source) override; bool ShouldYield(const TaskSource* task_source) override;
void UpdatePriority(scoped_refptr<TaskSource> task_source, void UpdatePriority(scoped_refptr<TaskSource> task_source,
TaskPriority priority) override; TaskPriority priority) override;
void UpdateJobPriority(scoped_refptr<TaskSource> task_source,
TaskPriority priority) override;
void SetThreadGroup(ThreadGroup* thread_group); void SetThreadGroup(ThreadGroup* thread_group);
......
...@@ -8,7 +8,9 @@ ...@@ -8,7 +8,9 @@
#include "base/bind.h" #include "base/bind.h"
#include "base/bind_helpers.h" #include "base/bind_helpers.h"
#include "base/feature_list.h"
#include "base/lazy_instance.h" #include "base/lazy_instance.h"
#include "base/task/task_features.h"
#include "base/task/thread_pool/task_tracker.h" #include "base/task/thread_pool/task_tracker.h"
#include "base/threading/thread_local.h" #include "base/threading/thread_local.h"
...@@ -94,6 +96,11 @@ bool ThreadGroup::IsBoundToCurrentThread() const { ...@@ -94,6 +96,11 @@ bool ThreadGroup::IsBoundToCurrentThread() const {
return GetCurrentThreadGroup() == this; return GetCurrentThreadGroup() == this;
} }
void ThreadGroup::Start() {
CheckedAutoLock auto_lock(lock_);
disable_fair_scheduling_ = FeatureList::IsEnabled(kDisableFairJobScheduling);
}
size_t size_t
ThreadGroup::GetNumAdditionalWorkersForBestEffortTaskSourcesLockRequired() ThreadGroup::GetNumAdditionalWorkersForBestEffortTaskSourcesLockRequired()
const { const {
...@@ -164,7 +171,10 @@ void ThreadGroup::ReEnqueueTaskSourceLockRequired( ...@@ -164,7 +171,10 @@ void ThreadGroup::ReEnqueueTaskSourceLockRequired(
} else { } else {
// If the TaskSource should be reenqueued in the current thread group, // If the TaskSource should be reenqueued in the current thread group,
// reenqueue it inside the scope of the lock. // reenqueue it inside the scope of the lock.
priority_queue_.Push(std::move(transaction_with_task_source)); auto sort_key = transaction_with_task_source.task_source->GetSortKey(
disable_fair_scheduling_);
priority_queue_.Push(std::move(transaction_with_task_source.task_source),
sort_key);
} }
// This is called unconditionally to ensure there are always workers to run // This is called unconditionally to ensure there are always workers to run
// task sources in the queue. Some ThreadGroup implementations only invoke // task sources in the queue. Some ThreadGroup implementations only invoke
...@@ -207,15 +217,19 @@ RegisteredTaskSource ThreadGroup::TakeRegisteredTaskSource( ...@@ -207,15 +217,19 @@ RegisteredTaskSource ThreadGroup::TakeRegisteredTaskSource(
return priority_queue_.PopTaskSource(); return priority_queue_.PopTaskSource();
// Replace the top task_source and then update the queue. // Replace the top task_source and then update the queue.
std::swap(priority_queue_.PeekTaskSource(), task_source); std::swap(priority_queue_.PeekTaskSource(), task_source);
priority_queue_.UpdateSortKey(*task_source.get(), task_source->GetSortKey()); if (!disable_fair_scheduling_) {
priority_queue_.UpdateSortKey(*task_source.get(),
task_source->GetSortKey(false));
}
return task_source; return task_source;
} }
void ThreadGroup::UpdateSortKeyImpl(BaseScopedCommandsExecutor* executor, void ThreadGroup::UpdateSortKeyImpl(BaseScopedCommandsExecutor* executor,
TaskSource::Transaction transaction) { TaskSource::Transaction transaction) {
CheckedAutoLock auto_lock(lock_); CheckedAutoLock auto_lock(lock_);
priority_queue_.UpdateSortKey(*transaction.task_source(), priority_queue_.UpdateSortKey(
transaction.task_source()->GetSortKey()); *transaction.task_source(),
transaction.task_source()->GetSortKey(disable_fair_scheduling_));
EnsureEnoughWorkersLockRequired(executor); EnsureEnoughWorkersLockRequired(executor);
} }
...@@ -234,7 +248,10 @@ void ThreadGroup::PushTaskSourceAndWakeUpWorkersImpl( ...@@ -234,7 +248,10 @@ void ThreadGroup::PushTaskSourceAndWakeUpWorkersImpl(
std::move(transaction_with_task_source.task_source)); std::move(transaction_with_task_source.task_source));
return; return;
} }
priority_queue_.Push(std::move(transaction_with_task_source)); auto sort_key = transaction_with_task_source.task_source->GetSortKey(
disable_fair_scheduling_);
priority_queue_.Push(std::move(transaction_with_task_source.task_source),
sort_key);
EnsureEnoughWorkersLockRequired(executor); EnsureEnoughWorkersLockRequired(executor);
} }
...@@ -249,6 +266,7 @@ void ThreadGroup::InvalidateAndHandoffAllTaskSourcesToOtherThreadGroup( ...@@ -249,6 +266,7 @@ void ThreadGroup::InvalidateAndHandoffAllTaskSourcesToOtherThreadGroup(
bool ThreadGroup::ShouldYield(TaskSourceSortKey sort_key) { bool ThreadGroup::ShouldYield(TaskSourceSortKey sort_key) {
DCHECK(TS_UNCHECKED_READ(max_allowed_sort_key_).is_lock_free()); DCHECK(TS_UNCHECKED_READ(max_allowed_sort_key_).is_lock_free());
if (!task_tracker_->CanRunPriority(sort_key.priority())) if (!task_tracker_->CanRunPriority(sort_key.priority()))
return true; return true;
// It is safe to read |max_allowed_sort_key_| without a lock since this // It is safe to read |max_allowed_sort_key_| without a lock since this
......
...@@ -176,6 +176,8 @@ class BASE_EXPORT ThreadGroup { ...@@ -176,6 +176,8 @@ class BASE_EXPORT ThreadGroup {
const TrackedRef<TaskTracker> task_tracker_; const TrackedRef<TaskTracker> task_tracker_;
const TrackedRef<Delegate> delegate_; const TrackedRef<Delegate> delegate_;
void Start();
// Returns the number of workers required of workers to run all queued // Returns the number of workers required of workers to run all queued
// BEST_EFFORT task sources allowed to run by the current CanRunPolicy. // BEST_EFFORT task sources allowed to run by the current CanRunPolicy.
size_t GetNumAdditionalWorkersForBestEffortTaskSourcesLockRequired() const size_t GetNumAdditionalWorkersForBestEffortTaskSourcesLockRequired() const
...@@ -221,6 +223,8 @@ class BASE_EXPORT ThreadGroup { ...@@ -221,6 +223,8 @@ class BASE_EXPORT ThreadGroup {
// within its scope (no thread creation or wake up). // within its scope (no thread creation or wake up).
mutable CheckedLock lock_; mutable CheckedLock lock_;
bool disable_fair_scheduling_ GUARDED_BY(lock_){false};
// PriorityQueue from which all threads of this ThreadGroup get work. // PriorityQueue from which all threads of this ThreadGroup get work.
PriorityQueue priority_queue_ GUARDED_BY(lock_); PriorityQueue priority_queue_ GUARDED_BY(lock_);
......
...@@ -26,7 +26,6 @@ ...@@ -26,7 +26,6 @@
#include "base/sequence_token.h" #include "base/sequence_token.h"
#include "base/strings/string_util.h" #include "base/strings/string_util.h"
#include "base/strings/stringprintf.h" #include "base/strings/stringprintf.h"
#include "base/task/task_features.h"
#include "base/task/task_traits.h" #include "base/task/task_traits.h"
#include "base/task/thread_pool/task_tracker.h" #include "base/task/thread_pool/task_tracker.h"
#include "base/threading/platform_thread.h" #include "base/threading/platform_thread.h"
...@@ -375,8 +374,12 @@ void ThreadGroupImpl::Start( ...@@ -375,8 +374,12 @@ void ThreadGroupImpl::Start(
WorkerEnvironment worker_environment, WorkerEnvironment worker_environment,
bool synchronous_thread_start_for_testing, bool synchronous_thread_start_for_testing,
Optional<TimeDelta> may_block_threshold) { Optional<TimeDelta> may_block_threshold) {
ThreadGroup::Start();
DCHECK(!replacement_thread_group_); DCHECK(!replacement_thread_group_);
in_start().wakeup_after_getwork = FeatureList::IsEnabled(kWakeUpAfterGetWork);
in_start().wakeup_strategy = kWakeUpStrategyParam.Get();
in_start().may_block_without_delay = in_start().may_block_without_delay =
FeatureList::IsEnabled(kMayBlockWithoutDelay); FeatureList::IsEnabled(kMayBlockWithoutDelay);
in_start().may_block_threshold = in_start().may_block_threshold =
...@@ -587,8 +590,12 @@ RegisteredTaskSource ThreadGroupImpl::WorkerThreadDelegateImpl::GetWork( ...@@ -587,8 +590,12 @@ RegisteredTaskSource ThreadGroupImpl::WorkerThreadDelegateImpl::GetWork(
// Use this opportunity, before assigning work to this worker, to create/wake // Use this opportunity, before assigning work to this worker, to create/wake
// additional workers if needed (doing this here allows us to reduce // additional workers if needed (doing this here allows us to reduce
// potentially expensive create/wake directly on PostTask()). // potentially expensive create/wake directly on PostTask()).
outer_->EnsureEnoughWorkersLockRequired(&executor); if (!outer_->after_start().wakeup_after_getwork &&
executor.FlushWorkerCreation(&outer_->lock_); outer_->after_start().wakeup_strategy !=
WakeUpStrategy::kCentralizedWakeUps) {
outer_->EnsureEnoughWorkersLockRequired(&executor);
executor.FlushWorkerCreation(&outer_->lock_);
}
if (!CanGetWorkLockRequired(&executor, worker)) if (!CanGetWorkLockRequired(&executor, worker))
return nullptr; return nullptr;
...@@ -619,6 +626,12 @@ RegisteredTaskSource ThreadGroupImpl::WorkerThreadDelegateImpl::GetWork( ...@@ -619,6 +626,12 @@ RegisteredTaskSource ThreadGroupImpl::WorkerThreadDelegateImpl::GetWork(
DCHECK(!outer_->idle_workers_stack_.Contains(worker)); DCHECK(!outer_->idle_workers_stack_.Contains(worker));
write_worker().current_task_priority = priority; write_worker().current_task_priority = priority;
if (outer_->after_start().wakeup_after_getwork &&
outer_->after_start().wakeup_strategy !=
WakeUpStrategy::kCentralizedWakeUps) {
outer_->EnsureEnoughWorkersLockRequired(&executor);
}
return task_source; return task_source;
} }
...@@ -1015,7 +1028,12 @@ void ThreadGroupImpl::EnsureEnoughWorkersLockRequired( ...@@ -1015,7 +1028,12 @@ void ThreadGroupImpl::EnsureEnoughWorkersLockRequired(
size_t num_workers_to_wake_up = size_t num_workers_to_wake_up =
ClampSub(desired_num_awake_workers, num_awake_workers); ClampSub(desired_num_awake_workers, num_awake_workers);
num_workers_to_wake_up = std::min(num_workers_to_wake_up, size_t(2U)); if (after_start().wakeup_strategy == WakeUpStrategy::kExponentialWakeUps) {
num_workers_to_wake_up = std::min(num_workers_to_wake_up, size_t(2U));
} else if (after_start().wakeup_strategy ==
WakeUpStrategy::kSerializedWakeUps) {
num_workers_to_wake_up = std::min(num_workers_to_wake_up, size_t(1U));
}
// Wake up the appropriate number of workers. // Wake up the appropriate number of workers.
for (size_t i = 0; i < num_workers_to_wake_up; ++i) { for (size_t i = 0; i < num_workers_to_wake_up; ++i) {
......
...@@ -22,6 +22,7 @@ ...@@ -22,6 +22,7 @@
#include "base/strings/string_piece.h" #include "base/strings/string_piece.h"
#include "base/synchronization/condition_variable.h" #include "base/synchronization/condition_variable.h"
#include "base/synchronization/waitable_event.h" #include "base/synchronization/waitable_event.h"
#include "base/task/task_features.h"
#include "base/task/thread_pool/task.h" #include "base/task/thread_pool/task.h"
#include "base/task/thread_pool/task_source.h" #include "base/task/thread_pool/task_source.h"
#include "base/task/thread_pool/thread_group.h" #include "base/task/thread_pool/thread_group.h"
...@@ -246,6 +247,8 @@ class BASE_EXPORT ThreadGroupImpl : public ThreadGroup { ...@@ -246,6 +247,8 @@ class BASE_EXPORT ThreadGroupImpl : public ThreadGroup {
// Optional observer notified when a worker enters and exits its main. // Optional observer notified when a worker enters and exits its main.
WorkerThreadObserver* worker_thread_observer = nullptr; WorkerThreadObserver* worker_thread_observer = nullptr;
WakeUpStrategy wakeup_strategy;
bool wakeup_after_getwork;
bool may_block_without_delay; bool may_block_without_delay;
// Threshold after which the max tasks is increased to compensate for a // Threshold after which the max tasks is increased to compensate for a
......
...@@ -54,6 +54,8 @@ ThreadGroupNative::~ThreadGroupNative() { ...@@ -54,6 +54,8 @@ ThreadGroupNative::~ThreadGroupNative() {
} }
void ThreadGroupNative::Start(WorkerEnvironment worker_environment) { void ThreadGroupNative::Start(WorkerEnvironment worker_environment) {
ThreadGroup::Start();
worker_environment_ = worker_environment; worker_environment_ = worker_environment;
StartImpl(); StartImpl();
......
...@@ -122,6 +122,11 @@ void ThreadPoolImpl::Start(const ThreadPoolInstance::InitParams& init_params, ...@@ -122,6 +122,11 @@ void ThreadPoolImpl::Start(const ThreadPoolInstance::InitParams& init_params,
internal::InitializeThreadPrioritiesFeature(); internal::InitializeThreadPrioritiesFeature();
disable_job_yield_ = FeatureList::IsEnabled(kDisableJobYield);
disable_fair_scheduling_ = FeatureList::IsEnabled(kDisableFairJobScheduling);
disable_job_update_priority_ =
FeatureList::IsEnabled(kDisableJobUpdatePriority);
// The max number of concurrent BEST_EFFORT tasks is |kMaxBestEffortTasks|, // The max number of concurrent BEST_EFFORT tasks is |kMaxBestEffortTasks|,
// unless the max number of foreground threads is lower. // unless the max number of foreground threads is lower.
const int max_best_effort_tasks = const int max_best_effort_tasks =
...@@ -430,6 +435,8 @@ bool ThreadPoolImpl::PostTaskWithSequence(Task task, ...@@ -430,6 +435,8 @@ bool ThreadPoolImpl::PostTaskWithSequence(Task task,
} }
bool ThreadPoolImpl::ShouldYield(const TaskSource* task_source) { bool ThreadPoolImpl::ShouldYield(const TaskSource* task_source) {
if (disable_job_yield_)
return false;
const TaskPriority priority = task_source->priority_racy(); const TaskPriority priority = task_source->priority_racy();
auto* const thread_group = auto* const thread_group =
GetThreadGroupForTraits({priority, task_source->thread_policy()}); GetThreadGroupForTraits({priority, task_source->thread_policy()});
...@@ -438,7 +445,7 @@ bool ThreadPoolImpl::ShouldYield(const TaskSource* task_source) { ...@@ -438,7 +445,7 @@ bool ThreadPoolImpl::ShouldYield(const TaskSource* task_source) {
if (!thread_group->IsBoundToCurrentThread()) if (!thread_group->IsBoundToCurrentThread())
return true; return true;
return GetThreadGroupForTraits({priority, task_source->thread_policy()}) return GetThreadGroupForTraits({priority, task_source->thread_policy()})
->ShouldYield(task_source->GetSortKey()); ->ShouldYield(task_source->GetSortKey(disable_fair_scheduling_));
} }
bool ThreadPoolImpl::EnqueueJobTaskSource( bool ThreadPoolImpl::EnqueueJobTaskSource(
...@@ -499,6 +506,13 @@ void ThreadPoolImpl::UpdatePriority(scoped_refptr<TaskSource> task_source, ...@@ -499,6 +506,13 @@ void ThreadPoolImpl::UpdatePriority(scoped_refptr<TaskSource> task_source,
} }
} }
void ThreadPoolImpl::UpdateJobPriority(scoped_refptr<TaskSource> task_source,
TaskPriority priority) {
if (disable_job_update_priority_)
return;
UpdatePriority(std::move(task_source), priority);
}
const ThreadGroup* ThreadPoolImpl::GetThreadGroupForTraits( const ThreadGroup* ThreadPoolImpl::GetThreadGroupForTraits(
const TaskTraits& traits) const { const TaskTraits& traits) const {
return const_cast<ThreadPoolImpl*>(this)->GetThreadGroupForTraits(traits); return const_cast<ThreadPoolImpl*>(this)->GetThreadGroupForTraits(traits);
......
...@@ -108,6 +108,8 @@ class BASE_EXPORT ThreadPoolImpl : public ThreadPoolInstance, ...@@ -108,6 +108,8 @@ class BASE_EXPORT ThreadPoolImpl : public ThreadPoolInstance,
void RemoveJobTaskSource(scoped_refptr<JobTaskSource> task_source) override; void RemoveJobTaskSource(scoped_refptr<JobTaskSource> task_source) override;
void UpdatePriority(scoped_refptr<TaskSource> task_source, void UpdatePriority(scoped_refptr<TaskSource> task_source,
TaskPriority priority) override; TaskPriority priority) override;
void UpdateJobPriority(scoped_refptr<TaskSource> task_source,
TaskPriority priority) override;
// Returns the TimeTicks of the next task scheduled on ThreadPool (Now() if // Returns the TimeTicks of the next task scheduled on ThreadPool (Now() if
// immediate, nullopt if none). This is thread-safe, i.e., it's safe if tasks // immediate, nullopt if none). This is thread-safe, i.e., it's safe if tasks
...@@ -167,6 +169,10 @@ class BASE_EXPORT ThreadPoolImpl : public ThreadPoolInstance, ...@@ -167,6 +169,10 @@ class BASE_EXPORT ThreadPoolImpl : public ThreadPoolInstance,
std::unique_ptr<ThreadGroup> foreground_thread_group_; std::unique_ptr<ThreadGroup> foreground_thread_group_;
std::unique_ptr<ThreadGroupImpl> background_thread_group_; std::unique_ptr<ThreadGroupImpl> background_thread_group_;
bool disable_job_yield_ = false;
bool disable_fair_scheduling_ = false;
std::atomic<bool> disable_job_update_priority_{false};
// Whether this TaskScheduler was started. Access controlled by // Whether this TaskScheduler was started. Access controlled by
// |sequence_checker_|. // |sequence_checker_|.
bool started_ = false; bool started_ = false;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment