Commit 5577143d authored by Etienne Pierre-doray's avatar Etienne Pierre-doray Committed by Commit Bot

[ThreadPool]: Implement JobTaskSource::Cancel.

This CL uses the lower bit of worker_count to indicate that
a JobTaskSource is canceled. It implements the logic
to cancel the task source and force workers to yield.

Bug: 839091
Change-Id: Ia621cd174d57dabdfa1df74e0b48a7afb20e8b51
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/1772239
Commit-Queue: Etienne Pierre-Doray <etiennep@chromium.org>
Reviewed-by: default avatarGabriel Charette <gab@chromium.org>
Reviewed-by: default avatarFrançois Doray <fdoray@chromium.org>
Cr-Commit-Position: refs/heads/master@{#696907}
parent 11a39b97
......@@ -42,6 +42,7 @@ bool JobDelegate::ShouldYield() {
AssertExpectedConcurrency(recorded_max_concurrency_);
#endif // DCHECK_IS_ON()
const bool should_yield =
task_source_->ShouldYield() ||
pooled_task_runner_delegate_->ShouldYield(task_source_);
#if DCHECK_IS_ON()
......
......@@ -19,6 +19,39 @@
namespace base {
namespace internal {
JobTaskSource::State::State() = default;
JobTaskSource::State::~State() = default;
JobTaskSource::State::Value JobTaskSource::State::Cancel() {
return {value_.fetch_or(kCanceledMask, std::memory_order_relaxed)};
}
JobTaskSource::State::Value
JobTaskSource::State::TryIncrementWorkerCountRelease(size_t max_concurrency) {
uint32_t value_before_add = value_.load(std::memory_order_relaxed);
// std::memory_order_release on success to establish Release-Acquire ordering
// with DecrementWorkerCountAcquire() (see WillRunTask()).
while (!(value_before_add & kCanceledMask) &&
(value_before_add >> kWorkerCountBitOffset) < max_concurrency &&
!value_.compare_exchange_weak(
value_before_add, value_before_add + kWorkerCountIncrement,
std::memory_order_release, std::memory_order_relaxed)) {
}
return {value_before_add};
}
JobTaskSource::State::Value
JobTaskSource::State::DecrementWorkerCountAcquire() {
const size_t value_before_sub = value_.fetch_sub(kWorkerCountIncrement);
DCHECK((value_before_sub >> kWorkerCountBitOffset) > 0);
return {value_before_sub};
}
JobTaskSource::State::Value JobTaskSource::State::Load() const {
return {value_.load(std::memory_order_relaxed)};
}
JobTaskSource::JobTaskSource(
const Location& from_here,
const TaskTraits& traits,
......@@ -44,48 +77,46 @@ JobTaskSource::JobTaskSource(
}
JobTaskSource::~JobTaskSource() {
#if DCHECK_IS_ON()
auto worker_count = worker_count_.load(std::memory_order_relaxed);
// Make sure there's no outstanding active run operation left.
DCHECK(worker_count == 0U || worker_count == kInvalidWorkerCount)
<< worker_count;
#endif
DCHECK_EQ(state_.Load().worker_count(), 0U);
}
ExecutionEnvironment JobTaskSource::GetExecutionEnvironment() {
return {SequenceToken::Create(), nullptr};
}
void JobTaskSource::Cancel(TaskSource::Transaction* transaction) {
// Sets the kCanceledMask bit on |state_| so that further calls to
// WillRunTask() never succeed. std::memory_order_relaxed is sufficient
// because this task source never needs to be re-enqueued after Cancel().
state_.Cancel();
}
TaskSource::RunStatus JobTaskSource::WillRunTask() {
// When this call is caused by an increase of max concurrency followed by an
// associated NotifyConcurrencyIncrease(), the priority queue lock guarantees
// an happens-after relation with NotifyConcurrencyIncrease(). The memory
// operations on |worker_count| below and in DidProcessTask() use
// operations on |state| below and in DidProcessTask() use
// std::memory_order_release and std::memory_order_acquire respectively to
// establish a Release-Acquire ordering. This ensures that all memory
// side-effects made before this point, including an increase of max
// concurrency followed by NotifyConcurrencyIncrease() are visible to a
// DidProcessTask() call which is ordered after this one.
const size_t max_concurrency = GetMaxConcurrency();
size_t worker_count_before_add =
worker_count_.load(std::memory_order_relaxed);
// std::memory_order_release on success to make the newest |max_concurrency|
// visible to a thread that calls DidProcessTask() containing a matching
// std::memory_order_acquire.
while (worker_count_before_add < max_concurrency &&
!worker_count_.compare_exchange_weak(
worker_count_before_add, worker_count_before_add + 1,
std::memory_order_release, std::memory_order_relaxed)) {
}
const auto state_before_add =
state_.TryIncrementWorkerCountRelease(max_concurrency);
// Don't allow this worker to run the task if either:
// A) |worker_count_| is already at |max_concurrency|.
// B) |max_concurrency| was lowered below or to |worker_count_|.
// C) |worker_count_| was invalidated.
if (worker_count_before_add >= max_concurrency) {
// The caller is prevented from running a task from this TaskSource.
// A) |state_| was canceled.
// B) |worker_count| is already at |max_concurrency|.
// C) |max_concurrency| was lowered below or to |worker_count|.
// Case A:
if (state_before_add.is_canceled())
return RunStatus::kDisallowed;
const size_t worker_count_before_add = state_before_add.worker_count();
// Case B) or C):
if (worker_count_before_add >= max_concurrency)
return RunStatus::kDisallowed;
}
DCHECK_LT(worker_count_before_add, max_concurrency);
return max_concurrency == worker_count_before_add + 1
......@@ -96,12 +127,12 @@ TaskSource::RunStatus JobTaskSource::WillRunTask() {
size_t JobTaskSource::GetRemainingConcurrency() const {
// std::memory_order_relaxed is sufficient because no other state is
// synchronized with GetRemainingConcurrency().
const size_t worker_count = worker_count_.load(std::memory_order_relaxed);
const auto state = state_.Load();
const size_t max_concurrency = GetMaxConcurrency();
// Avoid underflows.
if (worker_count > max_concurrency)
if (state.is_canceled() || state.worker_count() > max_concurrency)
return 0;
return max_concurrency - worker_count;
return max_concurrency - state.worker_count();
}
void JobTaskSource::NotifyConcurrencyIncrease() {
......@@ -125,6 +156,10 @@ size_t JobTaskSource::GetMaxConcurrency() const {
return max_concurrency_callback_.Run();
}
bool JobTaskSource::ShouldYield() const {
return state_.Load().is_canceled();
}
#if DCHECK_IS_ON()
size_t JobTaskSource::GetConcurrencyIncreaseVersion() const {
......@@ -151,16 +186,15 @@ bool JobTaskSource::WaitForConcurrencyIncreaseUpdate(size_t recorded_version) {
#endif // DCHECK_IS_ON()
Optional<Task> JobTaskSource::TakeTask(TaskSource::Transaction* transaction) {
DCHECK_GT(worker_count_.load(std::memory_order_relaxed), 0U);
// JobTaskSource members are not lock-protected so no need to acquire a lock
// if |transaction| is nullptr.
DCHECK_GT(state_.Load().worker_count(), 0U);
DCHECK(worker_task_);
return base::make_optional<Task>(from_here_, worker_task_, TimeDelta());
}
bool JobTaskSource::DidProcessTask(TaskSource::Transaction* transaction) {
size_t worker_count_before_sub =
worker_count_.load(std::memory_order_relaxed);
// std::memory_order_acquire on |worker_count_| is necessary to establish
// std::memory_order_acquire on |state_| is necessary to establish
// Release-Acquire ordering (see WillRunTask()).
// When the task source needs to be queued, either because the current task
// yielded or because of NotifyConcurrencyIncrease(), one of the following is
......@@ -169,7 +203,7 @@ bool JobTaskSource::DidProcessTask(TaskSource::Transaction* transaction) {
// extra work yet): Incorrectly returning false is fine and the memory
// barrier may be ineffective.
// B) The JobTaskSource() is no longer in the queue: The Release-Acquire
// ordering with WillRunTask() established by |worker_count| ensures that
// ordering with WillRunTask() established by |state_| ensures that
// the upcoming call for GetMaxConcurrency() happens-after any
// NotifyConcurrencyIncrease() that happened-before WillRunTask(). If
// this task completed because it yielded, this barrier guarantees that
......@@ -177,19 +211,17 @@ bool JobTaskSource::DidProcessTask(TaskSource::Transaction* transaction) {
//
// Note that stale values the other way around (incorrectly re-enqueuing) are
// not an issue because the queues support empty task sources.
while (worker_count_before_sub != kInvalidWorkerCount &&
!worker_count_.compare_exchange_weak(
worker_count_before_sub, worker_count_before_sub - 1,
std::memory_order_acquire, std::memory_order_relaxed)) {
}
if (worker_count_before_sub == kInvalidWorkerCount)
const auto state_before_sub = state_.DecrementWorkerCountAcquire();
// A canceled task source should never get re-enqueued.
if (state_before_sub.is_canceled())
return false;
DCHECK_GT(worker_count_before_sub, 0U);
DCHECK_GT(state_before_sub.worker_count(), 0U);
// Re-enqueue the TaskSource if the task ran and the worker count is below the
// max concurrency.
return worker_count_before_sub <= GetMaxConcurrency();
return state_before_sub.worker_count() <= GetMaxConcurrency();
}
SequenceSortKey JobTaskSource::GetSortKey() const {
......@@ -197,12 +229,7 @@ SequenceSortKey JobTaskSource::GetSortKey() const {
}
Optional<Task> JobTaskSource::Clear(TaskSource::Transaction* transaction) {
// Invalidate |worker_count_| so that further calls to WillRunTask() never
// succeed. std::memory_order_relaxed is sufficient because this task source
// never needs to be re-enqueued after Clear().
size_t worker_count_before_store =
worker_count_.exchange(kInvalidWorkerCount, std::memory_order_relaxed);
DCHECK_GT(worker_count_before_store, 0U);
Cancel();
// Nothing is cleared since other workers might still racily run tasks. For
// simplicity, the destructor will take care of it once all references are
// released.
......
......@@ -42,6 +42,10 @@ class BASE_EXPORT JobTaskSource : public TaskSource {
// number of worker should be adjusted.
void NotifyConcurrencyIncrease();
// Cancels this JobTaskSource, causing all workers to yield and WillRunTask()
// to return RunStatus::kDisallowed.
void Cancel(TaskSource::Transaction* transaction = nullptr);
// TaskSource:
ExecutionEnvironment GetExecutionEnvironment() override;
size_t GetRemainingConcurrency() const override;
......@@ -50,6 +54,10 @@ class BASE_EXPORT JobTaskSource : public TaskSource {
// concurrently.
size_t GetMaxConcurrency() const;
// Returns true if a worker should return from the worker task on the current
// thread ASAP.
bool ShouldYield() const;
#if DCHECK_IS_ON()
size_t GetConcurrencyIncreaseVersion() const;
// Returns true if the concurrency version was updated above
......@@ -58,8 +66,44 @@ class BASE_EXPORT JobTaskSource : public TaskSource {
#endif // DCHECK_IS_ON()
private:
static constexpr size_t kInvalidWorkerCount =
std::numeric_limits<size_t>::max();
// Atomic internal state to track the number of workers running a task from
// this JobTaskSource and whether this JobTaskSource is canceled.
class State {
public:
static constexpr size_t kCanceledMask = 1;
static constexpr size_t kWorkerCountBitOffset = 1;
static constexpr size_t kWorkerCountIncrement = 1 << kWorkerCountBitOffset;
struct Value {
size_t worker_count() const { return value >> kWorkerCountBitOffset; }
// Returns true if canceled.
bool is_canceled() const { return value & kCanceledMask; }
uint32_t value;
};
State();
~State();
// Sets as canceled using std::memory_order_relaxed. Returns the state
// before the operation.
Value Cancel();
// Increments the worker count by 1 if smaller than |max_concurrency| and if
// |!is_canceled()|, using std::memory_order_release, and returns the state
// before the operation. Equivalent to Load() otherwise.
Value TryIncrementWorkerCountRelease(size_t max_concurrency);
// Decrements the worker count by 1 using std::memory_order_acquire. Returns
// the state before the operation.
Value DecrementWorkerCountAcquire();
// Loads and returns the state, using std::memory_order_relaxed.
Value Load() const;
private:
std::atomic<uint32_t> value_{0};
};
~JobTaskSource() override;
......@@ -70,9 +114,8 @@ class BASE_EXPORT JobTaskSource : public TaskSource {
bool DidProcessTask(TaskSource::Transaction* transaction) override;
SequenceSortKey GetSortKey() const override;
// The current number of workers concurrently running tasks from this
// TaskSource.
std::atomic_size_t worker_count_{0U};
// Current atomic state.
State state_;
const Location from_here_;
base::RepeatingCallback<size_t()> max_concurrency_callback_;
......
......@@ -108,6 +108,8 @@ TEST_F(ThreadPoolJobTaskSourceTest, Clear) {
EXPECT_EQ(registered_task_source_d.WillRunTask(),
TaskSource::RunStatus::kAllowedNotSaturated);
EXPECT_FALSE(task_source->ShouldYield());
{
EXPECT_EQ(1U, task_source->GetRemainingConcurrency());
auto task = registered_task_source_c.Clear();
......@@ -116,6 +118,7 @@ TEST_F(ThreadPoolJobTaskSourceTest, Clear) {
EXPECT_EQ(0U, task_source->GetRemainingConcurrency());
}
// The task source shouldn't allow any further tasks after Clear.
EXPECT_TRUE(task_source->ShouldYield());
EXPECT_EQ(RegisteredTaskSource::CreateForTesting(task_source).WillRunTask(),
TaskSource::RunStatus::kDisallowed);
......@@ -131,15 +134,53 @@ TEST_F(ThreadPoolJobTaskSourceTest, Clear) {
std::move(task_a->task).Run();
registered_task_source_a.DidProcessTask();
// A valid outstanding RunStatus can also take & run a task.
// A valid outstanding RunStatus can also take and run a task.
{
auto task = registered_task_source_b.TakeTask();
std::move(task->task).Run();
registered_task_source_b.DidProcessTask();
}
// Sanity check.
}
// Verifies that a job task source doesn't return an "allowed" RunStatus after
// Cancel() is called.
TEST_F(ThreadPoolJobTaskSourceTest, Cancel) {
auto job_task = base::MakeRefCounted<test::MockJobTask>(
DoNothing(), /* num_tasks_to_run */ 3);
scoped_refptr<JobTaskSource> task_source = job_task->GetJobTaskSource(
FROM_HERE, {ThreadPool(), TaskPriority::BEST_EFFORT},
&pooled_task_runner_delegate_);
auto registered_task_source_a =
RegisteredTaskSource::CreateForTesting(task_source);
EXPECT_EQ(registered_task_source_a.WillRunTask(),
TaskSource::RunStatus::kAllowedNotSaturated);
auto task_a = registered_task_source_a.TakeTask();
auto registered_task_source_b =
RegisteredTaskSource::CreateForTesting(task_source);
EXPECT_EQ(registered_task_source_b.WillRunTask(),
TaskSource::RunStatus::kAllowedNotSaturated);
EXPECT_FALSE(task_source->ShouldYield());
task_source->Cancel();
EXPECT_TRUE(task_source->ShouldYield());
// The task source shouldn't allow any further tasks after Cancel.
EXPECT_EQ(RegisteredTaskSource::CreateForTesting(task_source).WillRunTask(),
TaskSource::RunStatus::kDisallowed);
// A task that was already acquired can still run.
std::move(task_a->task).Run();
registered_task_source_a.DidProcessTask();
// A RegisteredTaskSource that's ready can also take and run a task.
{
auto task = registered_task_source_b.TakeTask();
std::move(task->task).Run();
registered_task_source_b.DidProcessTask();
}
}
// Verifies that multiple tasks can run in parallel up to |max_concurrency|.
......
......@@ -53,6 +53,7 @@ using ThreadGroupNativeType =
#endif
constexpr size_t kMaxTasks = 4;
constexpr size_t kTooManyTasks = 1000;
// By default, tests allow half of the thread group to be used by best-effort
// tasks.
constexpr size_t kMaxBestEffortTasks = kMaxTasks / 2;
......@@ -654,6 +655,48 @@ TEST_P(ThreadGroupTest, ScheduleJobTaskSourceMultipleTime) {
task_tracker_.FlushForTesting();
}
// Verify that Cancel() on a job stops running the worker task and causes
// current workers to yield.
TEST_P(ThreadGroupTest, CancelJobTaskSource) {
StartThreadGroup();
CheckedLock tasks_running_lock;
std::unique_ptr<ConditionVariable> tasks_running_cv =
tasks_running_lock.CreateConditionVariable();
bool tasks_running = false;
// Schedule a big number of tasks.
auto job_task = base::MakeRefCounted<test::MockJobTask>(
BindLambdaForTesting([&](experimental::JobDelegate* delegate) {
{
CheckedAutoLock auto_lock(tasks_running_lock);
tasks_running = true;
}
tasks_running_cv->Signal();
while (!delegate->ShouldYield()) {
}
}),
/* num_tasks_to_run */ kTooManyTasks);
scoped_refptr<JobTaskSource> task_source = job_task->GetJobTaskSource(
FROM_HERE, {ThreadPool()}, &mock_pooled_task_runner_delegate_);
mock_pooled_task_runner_delegate_.EnqueueJobTaskSource(task_source);
// Wait for at least 1 task to start running.
{
CheckedAutoLock auto_lock(tasks_running_lock);
while (!tasks_running)
tasks_running_cv->Wait();
}
// Cancels pending tasks and unblocks running ones.
task_source->Cancel();
// This should not block since the job got cancelled.
task_tracker_.FlushForTesting();
}
// Verify that calling JobTaskSource::NotifyConcurrencyIncrease() (re-)schedule
// tasks with the intended concurrency.
TEST_P(ThreadGroupTest, JobTaskSourceConcurrencyIncrease) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment