Commit fb56fbc5 authored by Etienne Pierre-doray, committed by Commit Bot

Reland "[Jobs API]: Use worker_lock in JobTaskSource."

This is a reland of 6eb566d2

Reason for revert: Caused failures in CheckedLockImpl::Acquire
(crbug.com/1099649).
Fix: mark worker_released_condition_ with declare_only_used_while_idle()
to prevent the priority queue lock from being acquired in ScopedBlockingCall.

Original change's description:
> [Jobs API]: Use worker_lock in JobTaskSource.
>
> Possible race when Join():
> - thread A: Join: worker_released_condition_->Wait()
> - thread C: WillRunTask:
>               GetMaxConcurrency() returns > 0
> - thread B: already running, finishes all the work
>               GetMaxConcurrency() goes 0
> - thread B: DidProcessTask:
>               worker_released_condition_->Signal(),
> - thread A: Join returns (GetMaxConcurrency() is 0)
> - thread C: TryIncrementWorkerCountFromWorkerRelease
>               worker count goes 1
> - thread C: runs worker_task after Join
>
> To fix the race when joining, all writes to |state_| are protected by
> |worker_lock_|. Memory ordering is no longer necessary.
>
> Alternative considered: cancel before Join() returns with a
> compare-and-swap, and loop again if new workers arrived.
>
> Change-Id: I4e478ffae2bdaec56386739f78de089d0e74e42c
> Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2248159
> Commit-Queue: Etienne Pierre-Doray <etiennep@chromium.org>
> Reviewed-by: Gabriel Charette <gab@chromium.org>
> Cr-Commit-Position: refs/heads/master@{#781453}

Change-Id: I1c7c0054a52b9b12dd6d0edd049ab2a7912df361
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2272942
Commit-Queue: Etienne Pierre-Doray <etiennep@chromium.org>
Reviewed-by: Gabriel Charette <gab@chromium.org>
Cr-Commit-Position: refs/heads/master@{#789526}
parent d6d31fa8
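To make the race above concrete, below is a minimal, hypothetical sketch of the
locking scheme this reland adopts, written with plain std::mutex and
std::condition_variable rather than Chromium's CheckedLock and
ConditionVariable. Because the increment in WillRun() and the wait in Join()
take the same lock, thread C in the race above can no longer bump the worker
count after the joining thread has observed it at zero.

#include <condition_variable>
#include <cstddef>
#include <mutex>

class JoinableWork {
 public:
  // Called by a worker before running a task; the analogue of WillRunTask().
  // Returns false when the job is canceled or already at max concurrency.
  bool WillRun(size_t max_concurrency) {
    std::lock_guard<std::mutex> lock(mutex_);
    if (canceled_ || worker_count_ >= max_concurrency)
      return false;
    ++worker_count_;  // Plain write: no CAS loop is needed under the lock.
    return true;
  }

  // Called by a worker after running a task; the analogue of DidProcessTask().
  void DidRun() {
    std::lock_guard<std::mutex> lock(mutex_);
    --worker_count_;
    released_.notify_one();  // Wake the joining thread, if any.
  }

  // Blocks until every worker has returned; the analogue of Join().
  void Join() {
    std::unique_lock<std::mutex> lock(mutex_);
    released_.wait(lock, [this] { return worker_count_ == 0; });
  }

 private:
  std::mutex mutex_;
  std::condition_variable released_;
  size_t worker_count_ = 0;
  bool canceled_ = false;
};

The real JobTaskSource only signals when a joining thread is actually waiting
(tracked by |join_flag_|), and the joining thread itself participates in the
work; this sketch omits both.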
@@ -18,6 +18,7 @@
#include "base/threading/thread_restrictions.h"
#include "base/time/time.h"
#include "base/time/time_override.h"
#include "base/trace_event/base_tracing.h"
namespace base {
namespace internal {
@@ -34,48 +35,6 @@ static_assert(
} // namespace
// Memory ordering on |state_| operations
//
// The write operation on |state_| in WillRunTask() uses
// std::memory_order_release, matched by std::memory_order_acquire on read
// operations (in DidProcessTask()) to establish a
// Release-Acquire ordering. When a call to WillRunTask() is caused by an
// increase of max concurrency followed by an associated
// NotifyConcurrencyIncrease(), the priority queue lock guarantees an
// happens-after relation with NotifyConcurrencyIncrease(). This ensures that an
// increase of max concurrency that happened-before NotifyConcurrencyIncrease()
// is visible to a read operation that happens-after WillRunTask().
//
// In DidProcessTask(), this is necessary to
// ensure that the task source is always re-enqueued when it needs to. When the
// task source needs to be queued, either because the current task yielded or
// because of NotifyConcurrencyIncrease(), one of the following is true:
// A) DidProcessTask() happens-after WillRunTask():
// T1: Current task returns (because it is done) or yields.
// T2: Increases the value returned by GetMaxConcurrency()
// NotifyConcurrencyIncrease() enqueues the task source
// T3: WillRunTask(), in response to the concurrency increase - Release
// Does not keep the TaskSource in PriorityQueue because it is at max
// concurrency
// T1: DidProcessTask() - Acquire - Because of memory barrier, sees the same
// (or newer) max concurrency as T2
// Re-enqueues the TaskSource because no longer at max concurrency
// Without the memory barrier, T1 may see an outdated max concurrency that
// is lower than the actual max concurrency and won't re-enqueue the
// task source, because it thinks it's already saturated.
// The task source often needs to be re-enqueued if its task
// completed because it yielded and |max_concurrency| wasn't decreased.
// B) DidProcessTask() happens-before WillRunTask():
// T1: Current task returns (because it is done) or yields
// T2: Increases the value returned by GetMaxConcurrency()
// NotifyConcurrencyIncrease() enqueues the task source
// T1: DidProcessTask() - Acquire (ineffective)
// Since the task source is already in the queue, it doesn't matter
// whether T1 re-enqueues the task source or not.
// Note that stale values the other way around can cause incorrectly
// re-enqueuing this task_source, which is not an issue because the queues
// support empty task sources.
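For reference, the Release-Acquire guarantee that the comment deleted above
relied on can be shown in isolation (a hypothetical example, not Chromium
code). With the lock-based scheme this CL relands, the lock provides the
ordering and the relaxed operations on |state_| suffice.

#include <atomic>
#include <cassert>
#include <thread>

int data = 0;
std::atomic<bool> ready{false};

void Publish() {
  data = 42;                                     // Plain write...
  ready.store(true, std::memory_order_release);  // ...published by a release.
}

void Consume() {
  // The acquire load pairs with the release store: once |ready| is seen as
  // true, the write to |data| is guaranteed to be visible.
  while (!ready.load(std::memory_order_acquire)) {
  }
  assert(data == 42);  // Never fires.
}

int main() {
  std::thread t1(Publish);
  std::thread t2(Consume);
  t1.join();
  t2.join();
}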
JobTaskSource::State::State() = default;
JobTaskSource::State::~State() = default;
@@ -83,46 +42,19 @@ JobTaskSource::State::Value JobTaskSource::State::Cancel() {
return {value_.fetch_or(kCanceledMask, std::memory_order_relaxed)};
}
JobTaskSource::State::Value
JobTaskSource::State::TryIncrementWorkerCountFromWorkerRelease(
size_t max_concurrency) {
uint32_t value_before_add = value_.load(std::memory_order_relaxed);
// std::memory_order_release on success to establish Release-Acquire ordering
// with DecrementWorkerCountAcquire() (see Memory Ordering comment at top of
// the file).
while (!(value_before_add & kCanceledMask) &&
(value_before_add >> kWorkerCountBitOffset) < max_concurrency &&
!value_.compare_exchange_weak(
value_before_add, value_before_add + kWorkerCountIncrement,
std::memory_order_release, std::memory_order_relaxed)) {
}
return {value_before_add};
}
JobTaskSource::State::Value
JobTaskSource::State::DecrementWorkerCountFromWorkerAcquire() {
JobTaskSource::State::Value JobTaskSource::State::DecrementWorkerCount() {
const size_t value_before_sub =
value_.fetch_sub(kWorkerCountIncrement, std::memory_order_acquire);
value_.fetch_sub(kWorkerCountIncrement, std::memory_order_relaxed);
DCHECK((value_before_sub >> kWorkerCountBitOffset) > 0);
return {value_before_sub};
}
JobTaskSource::State::Value
JobTaskSource::State::IncrementWorkerCountFromJoiningThread() {
JobTaskSource::State::Value JobTaskSource::State::IncrementWorkerCount() {
size_t value_before_add =
value_.fetch_add(kWorkerCountIncrement, std::memory_order_relaxed);
return {value_before_add};
}
JobTaskSource::State::Value
JobTaskSource::State::DecrementWorkerCountFromJoiningThread() {
const size_t value_before_sub =
value_.fetch_sub(kWorkerCountIncrement, std::memory_order_relaxed);
DCHECK((value_before_sub >> kWorkerCountBitOffset) > 0);
return {value_before_sub};
}
JobTaskSource::State::Value JobTaskSource::State::Load() const {
return {value_.load(std::memory_order_relaxed)};
}
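The operations above imply a simple bit-packed layout for State: the low bit is
the canceled flag (kCanceledMask is 1, per the header) and the worker count
lives in the remaining upper bits. A standalone sketch of that encoding
follows; kWorkerCountBitOffset = 1 is an assumption here, since the real
constant is elided from this diff.

#include <cstdint>
#include <cstdio>

constexpr uint32_t kCanceledMask = 1;
constexpr uint32_t kWorkerCountBitOffset = 1;  // Assumed; elided from the diff.
constexpr uint32_t kWorkerCountIncrement = 1u << kWorkerCountBitOffset;

int main() {
  uint32_t value = 0;
  value += 3 * kWorkerCountIncrement;  // Three workers join.
  value |= kCanceledMask;              // Cancel; the count is preserved.
  std::printf("workers=%u canceled=%u\n",
              static_cast<unsigned>(value >> kWorkerCountBitOffset),  // 3
              static_cast<unsigned>(value & kCanceledMask));          // 1
}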
@@ -167,6 +99,12 @@ JobTaskSource::JobTaskSource(
queue_time_(TimeTicks::Now()),
delegate_(delegate) {
DCHECK(delegate_);
#if DCHECK_IS_ON()
version_condition_for_dcheck_ = worker_lock_.CreateConditionVariable();
// Prevent wait from triggering a ScopedBlockingCall as this would add
// complexity outside this DCHECK-only code.
version_condition_for_dcheck_->declare_only_used_while_idle();
#endif // DCHECK_IS_ON()
}
JobTaskSource::~JobTaskSource() {
@@ -179,15 +117,13 @@ ExecutionEnvironment JobTaskSource::GetExecutionEnvironment() {
}
bool JobTaskSource::WillJoin() {
{
CheckedAutoLock auto_lock(lock_);
CheckedAutoLock auto_lock(worker_lock_);
DCHECK(!worker_released_condition_); // This may only be called once.
worker_released_condition_ = lock_.CreateConditionVariable();
}
// std::memory_order_relaxed on |worker_count_| is sufficient because call to
// GetMaxConcurrency() is used for a best effort early exit. Stale values will
// only cause WaitForParticipationOpportunity() to be called.
const auto state_before_add = state_.IncrementWorkerCountFromJoiningThread();
worker_released_condition_ = worker_lock_.CreateConditionVariable();
// Prevent wait from triggering a ScopedBlockingCall as this would cause
// |ThreadGroup::lock_| to be acquired, causing lock inversion.
worker_released_condition_->declare_only_used_while_idle();
const auto state_before_add = state_.IncrementWorkerCount();
if (!state_before_add.is_canceled() &&
state_before_add.worker_count() < GetMaxConcurrency()) {
@@ -200,33 +136,33 @@ bool JobTaskSource::RunJoinTask() {
JobDelegate job_delegate{this, nullptr};
worker_task_.Run(&job_delegate);
// std::memory_order_relaxed on |worker_count_| is sufficient because the call
// to GetMaxConcurrency() is used for a best effort early exit. Stale values
// will only cause WaitForParticipationOpportunity() to be called.
const auto state = state_.Load();
// It is safe to read |state_| without a lock since this variable is atomic
// and the call to GetMaxConcurrency() is used for a best effort early exit.
// Stale values will only cause WaitForParticipationOpportunity() to be
// called.
const auto state = TS_UNCHECKED_READ(state_).Load();
if (!state.is_canceled() && state.worker_count() <= GetMaxConcurrency())
return true;
CheckedAutoLock auto_lock(worker_lock_);
return WaitForParticipationOpportunity();
}
void JobTaskSource::Cancel(TaskSource::Transaction* transaction) {
CheckedAutoLock auto_lock(worker_lock_);
// Sets the kCanceledMask bit on |state_| so that further calls to
// WillRunTask() never succeed. std::memory_order_relaxed is sufficient
// because this task source never needs to be re-enqueued after Cancel().
state_.Cancel();
#if DCHECK_IS_ON()
{
AutoLock auto_lock(version_lock_);
++increase_version_;
version_condition_.Broadcast();
}
version_condition_for_dcheck_->Broadcast();
#endif // DCHECK_IS_ON()
}
// EXCLUSIVE_LOCKS_REQUIRED(worker_lock_)
bool JobTaskSource::WaitForParticipationOpportunity() {
CheckedAutoLock auto_lock(lock_);
DCHECK(!join_flag_.IsWaiting());
// std::memory_order_relaxed is sufficient because no other state is
@@ -248,6 +184,7 @@ bool JobTaskSource::WaitForParticipationOpportunity() {
// |lock_| is taken and |worker_released_condition_| signaled if necessary:
// 1- In DidProcessTask(), after worker count is decremented.
// 2- In NotifyConcurrencyIncrease(), following a max_concurrency increase.
TRACE_EVENT0("base", "Job.WaitForParticipationOpportunity");
worker_released_condition_->Wait();
state = state_.Load();
max_concurrency = GetMaxConcurrency();
@@ -259,16 +196,19 @@ bool JobTaskSource::WaitForParticipationOpportunity() {
// Only the joining thread remains.
DCHECK_EQ(state.worker_count(), 1U);
DCHECK(state.is_canceled() || max_concurrency == 0U);
state_.DecrementWorkerCountFromJoiningThread();
state_.DecrementWorkerCount();
return false;
}
TaskSource::RunStatus JobTaskSource::WillRunTask() {
CheckedAutoLock auto_lock(worker_lock_);
const size_t max_concurrency = GetMaxConcurrency();
// std::memory_order_release on success to establish Release-Acquire ordering
// with read operations (see Memory Ordering comment at top of the file).
const auto state_before_add =
state_.TryIncrementWorkerCountFromWorkerRelease(max_concurrency);
auto state_before_add = state_.Load();
if (!state_before_add.is_canceled() &&
state_before_add.worker_count() < max_concurrency) {
state_before_add = state_.IncrementWorkerCount();
}
// Don't allow this worker to run the task if either:
// A) |state_| was canceled.
@@ -289,9 +229,9 @@ TaskSource::RunStatus JobTaskSource::WillRunTask() {
}
size_t JobTaskSource::GetRemainingConcurrency() const {
// std::memory_order_relaxed is sufficient because no other state is
// synchronized with GetRemainingConcurrency().
const auto state = state_.Load();
// It is safe to read |state_| without a lock since this variable is atomic,
// and no other state is synchronized with GetRemainingConcurrency().
const auto state = TS_UNCHECKED_READ(state_).Load();
const size_t max_concurrency = GetMaxConcurrency();
// Avoid underflows.
if (state.is_canceled() || state.worker_count() > max_concurrency)
@@ -302,9 +242,9 @@ size_t JobTaskSource::GetRemainingConcurrency() const {
void JobTaskSource::NotifyConcurrencyIncrease() {
#if DCHECK_IS_ON()
{
AutoLock auto_lock(version_lock_);
CheckedAutoLock auto_lock(worker_lock_);
++increase_version_;
version_condition_.Broadcast();
version_condition_for_dcheck_->Broadcast();
}
#endif // DCHECK_IS_ON()
@@ -316,7 +256,7 @@ void JobTaskSource::NotifyConcurrencyIncrease() {
{
// Lock is taken to access |join_flag_| below and signal
// |worker_released_condition_|.
CheckedAutoLock auto_lock(lock_);
CheckedAutoLock auto_lock(worker_lock_);
if (join_flag_.ShouldWorkerSignal())
worker_released_condition_->Signal();
}
@@ -358,22 +298,22 @@ void JobTaskSource::ReleaseTaskId(uint8_t task_id) {
}
bool JobTaskSource::ShouldYield() {
// It is safe to read |join_flag_| without a lock since this
// variable is atomic, keeping in mind that threads may not immediately see
// It is safe to read |join_flag_| and |state_| without a lock since these
// variables are atomic, keeping in mind that threads may not immediately see
// the new value when it is updated.
return TS_UNCHECKED_READ(join_flag_).ShouldWorkerYield() ||
state_.Load().is_canceled();
TS_UNCHECKED_READ(state_).Load().is_canceled();
}
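GUARDED_BY and TS_UNCHECKED_READ come from base/thread_annotations.h: the field
is annotated as lock-protected so that Clang's thread-safety analysis checks
every write, while hot-path readers opt out explicitly because the field is
atomic. A simplified standalone sketch of the pattern (the macros below are
stand-ins; Chromium's TS_UNCHECKED_READ routes through a helper marked
NO_THREAD_SAFETY_ANALYSIS so the unlocked read does not warn):

#include <atomic>
#include <mutex>

#if defined(__clang__)
#define GUARDED_BY(lock) __attribute__((guarded_by(lock)))
#else
#define GUARDED_BY(lock)
#endif
// Stand-in escape hatch for intentionally unsynchronized reads of atomics.
#define TS_UNCHECKED_READ(x) (x)

class Job {
 public:
  Job() : canceled_(false) {}

  void Cancel() {
    std::lock_guard<std::mutex> lock(mutex_);  // All writes are under the lock.
    canceled_.store(true, std::memory_order_relaxed);
  }

  bool ShouldYield() const {
    // Lock-free read: safe because the field is atomic; a stale false only
    // delays the yield until the worker's next check.
    return TS_UNCHECKED_READ(canceled_).load(std::memory_order_relaxed);
  }

 private:
  std::mutex mutex_;
  std::atomic<bool> canceled_ GUARDED_BY(mutex_);
};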
#if DCHECK_IS_ON()
size_t JobTaskSource::GetConcurrencyIncreaseVersion() const {
AutoLock auto_lock(version_lock_);
CheckedAutoLock auto_lock(worker_lock_);
return increase_version_;
}
bool JobTaskSource::WaitForConcurrencyIncreaseUpdate(size_t recorded_version) {
AutoLock auto_lock(version_lock_);
CheckedAutoLock auto_lock(worker_lock_);
constexpr TimeDelta timeout = TimeDelta::FromSeconds(1);
const base::TimeTicks start_time = subtle::TimeTicksNowIgnoringOverride();
do {
@@ -384,7 +324,7 @@ bool JobTaskSource::WaitForConcurrencyIncreaseUpdate(size_t recorded_version) {
// Waiting is acceptable because it is in DCHECK-only code.
ScopedAllowBaseSyncPrimitivesOutsideBlockingScope
allow_base_sync_primitives;
version_condition_.TimedWait(timeout);
version_condition_for_dcheck_->TimedWait(timeout);
} while (subtle::TimeTicksNowIgnoringOverride() - start_time < timeout);
return false;
}
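The same bounded wait can be written with portable primitives; a hypothetical
standalone sketch follows. Re-checking both the predicate and the deadline
after every wakeup is what makes the loop robust to spurious wakeups, which is
the same reason the DCHECK-only loop above re-reads the version in a do/while.

#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <mutex>

// Returns true if |version| advanced past |recorded_version| within 1 second.
// |version| must be written under |mutex| with |cv| broadcast on change.
bool WaitForVersionUpdate(std::mutex& mutex, std::condition_variable& cv,
                          const size_t& version, size_t recorded_version) {
  const auto deadline =
      std::chrono::steady_clock::now() + std::chrono::seconds(1);
  std::unique_lock<std::mutex> lock(mutex);
  while (version == recorded_version) {  // Re-check after every wakeup.
    if (cv.wait_until(lock, deadline) == std::cv_status::timeout)
      return false;  // Deadline reached without an update.
  }
  return true;
}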
@@ -394,21 +334,16 @@ bool JobTaskSource::WaitForConcurrencyIncreaseUpdate(size_t recorded_version) {
Task JobTaskSource::TakeTask(TaskSource::Transaction* transaction) {
// JobTaskSource members are not lock-protected so no need to acquire a lock
// if |transaction| is nullptr.
DCHECK_GT(state_.Load().worker_count(), 0U);
DCHECK_GT(TS_UNCHECKED_READ(state_).Load().worker_count(), 0U);
DCHECK(primary_task_);
return Task(from_here_, primary_task_, TimeDelta());
}
bool JobTaskSource::DidProcessTask(TaskSource::Transaction* transaction) {
bool JobTaskSource::DidProcessTask(TaskSource::Transaction* /*transaction*/) {
// Lock is needed to access |join_flag_| below and signal
// |worker_released_condition_|. If |transaction|, then |lock_| is already
// taken.
CheckedAutoLockMaybe auto_lock(transaction ? nullptr : &lock_);
AnnotateAcquiredLockAlias annotate(lock_, lock_);
// std::memory_order_acquire to establish Release-Acquire ordering with
// WillRunTask() (see Memory Ordering comment at top of the file).
const auto state_before_sub = state_.DecrementWorkerCountFromWorkerAcquire();
// |worker_released_condition_|.
CheckedAutoLock auto_lock(worker_lock_);
const auto state_before_sub = state_.DecrementWorkerCount();
if (join_flag_.ShouldWorkerSignal())
worker_released_condition_->Signal();
......
@@ -15,7 +15,7 @@
#include "base/macros.h"
#include "base/optional.h"
#include "base/synchronization/condition_variable.h"
#include "base/synchronization/lock.h"
#include "base/task/common/checked_lock.h"
#include "base/task/post_job.h"
#include "base/task/task_traits.h"
#include "base/task/thread_pool/sequence_sort_key.h"
@@ -91,7 +91,9 @@ class BASE_EXPORT JobTaskSource : public TaskSource {
private:
// Atomic internal state to track the number of workers running a task from
// this JobTaskSource and whether this JobTaskSource is canceled.
// this JobTaskSource and whether this JobTaskSource is canceled. All
// operations are performed with std::memory_order_relaxed as State is only
// ever modified under a lock or read atomically (optimistic read).
class State {
public:
static constexpr size_t kCanceledMask = 1;
@@ -109,28 +111,17 @@ class BASE_EXPORT JobTaskSource : public TaskSource {
State();
~State();
// Sets as canceled using std::memory_order_relaxed. Returns the state
// Sets as canceled. Returns the state
// before the operation.
Value Cancel();
// Increments the worker count by 1 if smaller than |max_concurrency| and if
// |!is_canceled()|, using std::memory_order_release, and returns the state
// before the operation. Equivalent to Load() otherwise.
Value TryIncrementWorkerCountFromWorkerRelease(size_t max_concurrency);
// Increments the worker count by 1. Returns the state before the operation.
Value IncrementWorkerCount();
// Decrements the worker count by 1 using std::memory_order_acquire. Returns
// the state before the operation.
Value DecrementWorkerCountFromWorkerAcquire();
// Decrements the worker count by 1. Returns the state before the operation.
Value DecrementWorkerCount();
// Increments the worker count by 1 using std::memory_order_relaxed. Returns
// the state before the operation.
Value IncrementWorkerCountFromJoiningThread();
// Decrements the worker count by 1 using std::memory_order_relaxed. Returns
// the state before the operation.
Value DecrementWorkerCountFromJoiningThread();
// Loads and returns the state, using std::memory_order_relaxed.
// Loads and returns the state.
Value Load() const;
private:
@@ -186,7 +177,7 @@ class BASE_EXPORT JobTaskSource : public TaskSource {
// DidProcessTask()). Returns true if the joining thread should run a task, or
// false if joining was completed and all other workers returned because
// either there's no work remaining or Job was cancelled.
bool WaitForParticipationOpportunity();
bool WaitForParticipationOpportunity() EXCLUSIVE_LOCKS_REQUIRED(worker_lock_);
// TaskSource:
RunStatus WillRunTask() override;
@@ -195,15 +186,20 @@ class BASE_EXPORT JobTaskSource : public TaskSource {
bool DidProcessTask(TaskSource::Transaction* transaction) override;
SequenceSortKey GetSortKey() const override;
// Current atomic state.
State state_;
std::atomic<uint32_t> assigned_task_ids_{0};
// Synchronizes access to workers state.
mutable CheckedLock worker_lock_{UniversalSuccessor()};
// Current atomic state (atomic despite the lock to allow optimistic reads
// without the lock).
State state_ GUARDED_BY(worker_lock_);
// Normally, |join_flag_| is protected by |lock_|, except in ShouldYield()
// hence the use of atomics.
JoinFlag join_flag_ GUARDED_BY(lock_);
JoinFlag join_flag_ GUARDED_BY(worker_lock_);
// Signaled when |join_flag_| is kWaiting* and a worker returns.
std::unique_ptr<ConditionVariable> worker_released_condition_
GUARDED_BY(lock_);
GUARDED_BY(worker_lock_);
std::atomic<uint32_t> assigned_task_ids_{0};
const Location from_here_;
RepeatingCallback<size_t()> max_concurrency_callback_;
@@ -217,12 +213,10 @@ class BASE_EXPORT JobTaskSource : public TaskSource {
PooledTaskRunnerDelegate* delegate_;
#if DCHECK_IS_ON()
// Synchronizes accesses to |increase_version_|.
mutable Lock version_lock_;
// Signaled whenever increase_version_ is updated.
ConditionVariable version_condition_{&version_lock_};
// Signaled whenever |increase_version_| is updated.
std::unique_ptr<ConditionVariable> version_condition_for_dcheck_;
// Incremented every time max concurrency is increased.
size_t increase_version_ GUARDED_BY(version_lock_) = 0;
size_t increase_version_ GUARDED_BY(worker_lock_) = 0;
#endif // DCHECK_IS_ON()
DISALLOW_COPY_AND_ASSIGN(JobTaskSource);
......