Commit 7f8a21e8 authored by Etienne Pierre-doray, committed by Commit Bot

Reland "Reland "[Jobs API]: Use worker_lock in JobTaskSource.""

This is a reland of fb56fbc5
Issue: Emitting a trace event while holding the lock causes a PostTask
and a lock inversion.
Fix: Move the trace event outside the lock (to both call sites of
WaitForParticipationOpportunity); this should give mostly the same
information, with a slightly bigger scope.
See patchset 2
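
To illustrate the fix pattern, a minimal self-contained sketch follows;
std::mutex and the stub TRACE_EVENT0 below are hypothetical stand-ins for
base's CheckedLock and tracing macro, not the actual Chromium code:

#include <cstdio>
#include <mutex>

// Stub: the real macro may post tasks to the tracing service, which takes
// its own locks.
#define TRACE_EVENT0(category, name) \
  std::printf("trace: %s/%s\n", category, name)

std::mutex worker_lock;

bool WaitForParticipationOpportunity() {  // caller must hold |worker_lock|
  return false;
}

bool RunJoinTask() {
  // Emit the trace event *before* taking the lock: whatever the tracing
  // backend does (e.g. PostTask, which acquires other locks) now runs
  // without |worker_lock| held, so the lock inversion cannot occur.
  TRACE_EVENT0("base", "Job.WaitForParticipationOpportunity");
  std::lock_guard<std::mutex> auto_lock(worker_lock);
  return WaitForParticipationOpportunity();
}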

Original change's description:
> Reland "[Jobs API]: Use worker_lock in JobTaskSource."
>
> This is a reland of 6eb566d2
>
> Reason for revert: Caused a failure in CheckedLockImpl::Acquire
> crbug.com/1099649
> Fix: Mark worker_released_condition_ as declare_only_used_while_idle
> to prevent the priority queue lock from being acquired in
> ScopedBlockingCall.
>
> Original change's description:
> > [Jobs API]: Use worker_lock in JobTaskSource.
> >
> > Possible race when Join():
> > - thread A: Join: worker_released_condition_->Wait()
> > - thread C: WillRunTask:
> >               GetMaxConcurrency() returns > 0
> > - thread B: already running, finishes all the work
> >               GetMaxConcurrency() goes 0
> > - thread B: DidProcessTask:
> >               worker_released_condition_->Signal(),
> > - thread A: Join returns (GetMaxConcurrency() is 0)
> > - thread C: TryIncrementWorkerCountFromWorkerRelease
> >               worker count goes 1
> > - thread C: runs worker_task after Join
> >
> > To fix race when Joining, all writes to |state_| are protected by
> > |worker_lock|. Memory ordering is no longer necessary.
> >
> > Alternative: cancel before Join returns with a compare and swap and
> > loop again if new workers.
> >
> > Change-Id: I4e478ffae2bdaec56386739f78de089d0e74e42c
> > Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2248159
> > Commit-Queue: Etienne Pierre-Doray <etiennep@chromium.org>
> > Reviewed-by: Gabriel Charette <gab@chromium.org>
> > Cr-Commit-Position: refs/heads/master@{#781453}
>
> Change-Id: I1c7c0054a52b9b12dd6d0edd049ab2a7912df361
> Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2272942
> Commit-Queue: Etienne Pierre-Doray <etiennep@chromium.org>
> Reviewed-by: Gabriel Charette <gab@chromium.org>
> Cr-Commit-Position: refs/heads/master@{#789526}

Change-Id: I0b08ee9fb7efd6b5cfdf9d171f2a280ccbb1fa5d
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2309572
Reviewed-by: Gabriel Charette <gab@chromium.org>
Commit-Queue: Etienne Pierre-Doray <etiennep@chromium.org>
Cr-Commit-Position: refs/heads/master@{#790489}
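
To make the quoted race and its fix concrete, here is a minimal,
self-contained sketch of the handshake once every state transition happens
under the same lock. This is a hypothetical simplification
(std::mutex/std::condition_variable instead of base's
CheckedLock/ConditionVariable, and a trivial join predicate), not the real
JobTaskSource:

#include <condition_variable>
#include <cstddef>
#include <mutex>

class JobStateSketch {
 public:
  // Worker thread (thread C in the message): the check and the increment
  // are now one critical section, so a worker can no longer increment the
  // count after the joining thread already observed zero concurrency.
  bool WillRunTask(size_t max_concurrency) {
    std::lock_guard<std::mutex> lock(worker_lock_);
    if (canceled_ || worker_count_ >= max_concurrency)
      return false;  // saturated or canceled
    ++worker_count_;
    return true;
  }

  // Worker thread (thread B): decrement and wake the joiner, same lock.
  void DidProcessTask() {
    std::lock_guard<std::mutex> lock(worker_lock_);
    --worker_count_;
    worker_released_.notify_all();
  }

  // Joining thread (thread A): the wait predicate is evaluated under
  // |worker_lock_|, so no WillRunTask() can slip in between the final
  // check and Join() returning.
  void Join() {
    std::unique_lock<std::mutex> lock(worker_lock_);
    worker_released_.wait(
        lock, [this] { return canceled_ || worker_count_ == 0; });
  }

  void Cancel() {
    std::lock_guard<std::mutex> lock(worker_lock_);
    canceled_ = true;
    worker_released_.notify_all();
  }

 private:
  std::mutex worker_lock_;
  std::condition_variable worker_released_;
  size_t worker_count_ = 0;
  bool canceled_ = false;
};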
parent 30003e96
@@ -18,6 +18,7 @@
#include "base/threading/thread_restrictions.h"
#include "base/time/time.h"
#include "base/time/time_override.h"
#include "base/trace_event/base_tracing.h"
namespace base {
namespace internal {
@@ -34,48 +35,6 @@ static_assert(
} // namespace
-// Memory ordering on |state_| operations
-//
-// The write operation on |state_| in WillRunTask() uses
-// std::memory_order_release, matched by std::memory_order_acquire on read
-// operations (in DidProcessTask()) to establish a
-// Release-Acquire ordering. When a call to WillRunTask() is caused by an
-// increase of max concurrency followed by an associated
-// NotifyConcurrencyIncrease(), the priority queue lock guarantees an
-// happens-after relation with NotifyConcurrencyIncrease(). This ensures that an
-// increase of max concurrency that happened-before NotifyConcurrencyIncrease()
-// is visible to a read operation that happens-after WillRunTask().
-//
-// In DidProcessTask(), this is necessary to
-// ensure that the task source is always re-enqueued when it needs to. When the
-// task source needs to be queued, either because the current task yielded or
-// because of NotifyConcurrencyIncrease(), one of the following is true:
-// A) DidProcessTask() happens-after WillRunTask():
-// T1: Current task returns (because it is done) or yields.
-// T2: Increases the value returned by GetMaxConcurrency()
-// NotifyConcurrencyIncrease() enqueues the task source
-// T3: WillRunTask(), in response to the concurrency increase - Release
-// Does not keep the TaskSource in PriorityQueue because it is at max
-// concurrency
-// T1: DidProcessTask() - Acquire - Because of memory barrier, sees the same
-// (or newer) max concurrency as T2
-// Re-enqueues the TaskSource because no longer at max concurrency
-// Without the memory barrier, T1 may see an outdated max concurrency that
-// is lower than the actual max concurrency and won't re-enqueue the
-// task source, because it thinks it's already saturated.
-// The task source often needs to be re-enqueued if its task
-// completed because it yielded and |max_concurrency| wasn't decreased.
-// B) DidProcessTask() happens-before WillRunTask():
-// T1: Current task returns (because it is done) or yields
-// T2: Increases the value returned by GetMaxConcurrency()
-// NotifyConcurrencyIncrease() enqueues the task source
-// T1: DidProcessTask() - Acquire (ineffective)
-// Since the task source is already in the queue, it doesn't matter
-// whether T1 re-enqueues the task source or not.
-// Note that stale values the other way around can cause incorrectly
-// re-enqueuing this task_source, which is not an issue because the queues
-// support empty task sources.
JobTaskSource::State::State() = default;
JobTaskSource::State::~State() = default;
@@ -83,46 +42,19 @@ JobTaskSource::State::Value JobTaskSource::State::Cancel() {
return {value_.fetch_or(kCanceledMask, std::memory_order_relaxed)};
}
-JobTaskSource::State::Value
-JobTaskSource::State::TryIncrementWorkerCountFromWorkerRelease(
-size_t max_concurrency) {
-uint32_t value_before_add = value_.load(std::memory_order_relaxed);
-// std::memory_order_release on success to establish Release-Acquire ordering
-// with DecrementWorkerCountAcquire() (see Memory Ordering comment at top of
-// the file).
-while (!(value_before_add & kCanceledMask) &&
-(value_before_add >> kWorkerCountBitOffset) < max_concurrency &&
-!value_.compare_exchange_weak(
-value_before_add, value_before_add + kWorkerCountIncrement,
-std::memory_order_release, std::memory_order_relaxed)) {
-}
-return {value_before_add};
-}
-JobTaskSource::State::Value
-JobTaskSource::State::DecrementWorkerCountFromWorkerAcquire() {
+JobTaskSource::State::Value JobTaskSource::State::DecrementWorkerCount() {
const size_t value_before_sub =
-value_.fetch_sub(kWorkerCountIncrement, std::memory_order_acquire);
+value_.fetch_sub(kWorkerCountIncrement, std::memory_order_relaxed);
DCHECK((value_before_sub >> kWorkerCountBitOffset) > 0);
return {value_before_sub};
}
-JobTaskSource::State::Value
-JobTaskSource::State::IncrementWorkerCountFromJoiningThread() {
+JobTaskSource::State::Value JobTaskSource::State::IncrementWorkerCount() {
size_t value_before_add =
value_.fetch_add(kWorkerCountIncrement, std::memory_order_relaxed);
return {value_before_add};
}
-JobTaskSource::State::Value
-JobTaskSource::State::DecrementWorkerCountFromJoiningThread() {
-const size_t value_before_sub =
-value_.fetch_sub(kWorkerCountIncrement, std::memory_order_relaxed);
-DCHECK((value_before_sub >> kWorkerCountBitOffset) > 0);
-return {value_before_sub};
-}
JobTaskSource::State::Value JobTaskSource::State::Load() const {
return {value_.load(std::memory_order_relaxed)};
}
@@ -167,6 +99,12 @@ JobTaskSource::JobTaskSource(
queue_time_(TimeTicks::Now()),
delegate_(delegate) {
DCHECK(delegate_);
+#if DCHECK_IS_ON()
+version_condition_for_dcheck_ = worker_lock_.CreateConditionVariable();
+// Prevent wait from triggering a ScopedBlockingCall as this would add
+// complexity outside this DCHECK-only code.
+version_condition_for_dcheck_->declare_only_used_while_idle();
+#endif // DCHECK_IS_ON()
}
JobTaskSource::~JobTaskSource() {
@@ -179,15 +117,14 @@ ExecutionEnvironment JobTaskSource::GetExecutionEnvironment() {
}
bool JobTaskSource::WillJoin() {
-{
-CheckedAutoLock auto_lock(lock_);
-DCHECK(!worker_released_condition_); // This may only be called once.
-worker_released_condition_ = lock_.CreateConditionVariable();
-}
-// std::memory_order_relaxed on |worker_count_| is sufficient because call to
-// GetMaxConcurrency() is used for a best effort early exit. Stale values will
-// only cause WaitForParticipationOpportunity() to be called.
-const auto state_before_add = state_.IncrementWorkerCountFromJoiningThread();
+TRACE_EVENT0("base", "Job.WaitForParticipationOpportunity");
+CheckedAutoLock auto_lock(worker_lock_);
+DCHECK(!worker_released_condition_); // This may only be called once.
+worker_released_condition_ = worker_lock_.CreateConditionVariable();
+// Prevent wait from triggering a ScopedBlockingCall as this would cause
+// |ThreadGroup::lock_| to be acquired, causing lock inversion.
+worker_released_condition_->declare_only_used_while_idle();
+const auto state_before_add = state_.IncrementWorkerCount();
if (!state_before_add.is_canceled() &&
state_before_add.worker_count() < GetMaxConcurrency()) {
@@ -200,33 +137,34 @@ bool JobTaskSource::RunJoinTask() {
JobDelegate job_delegate{this, nullptr};
worker_task_.Run(&job_delegate);
-// std::memory_order_relaxed on |worker_count_| is sufficient because the call
-// to GetMaxConcurrency() is used for a best effort early exit. Stale values
-// will only cause WaitForParticipationOpportunity() to be called.
-const auto state = state_.Load();
+// It is safe to read |state_| without a lock since this variable is atomic
+// and the call to GetMaxConcurrency() is used for a best effort early exit.
+// Stale values will only cause WaitForParticipationOpportunity() to be
+// called.
+const auto state = TS_UNCHECKED_READ(state_).Load();
if (!state.is_canceled() && state.worker_count() <= GetMaxConcurrency())
return true;
+TRACE_EVENT0("base", "Job.WaitForParticipationOpportunity");
+CheckedAutoLock auto_lock(worker_lock_);
return WaitForParticipationOpportunity();
}
void JobTaskSource::Cancel(TaskSource::Transaction* transaction) {
+CheckedAutoLock auto_lock(worker_lock_);
// Sets the kCanceledMask bit on |state_| so that further calls to
// WillRunTask() never succeed. std::memory_order_relaxed is sufficient
// because this task source never needs to be re-enqueued after Cancel().
state_.Cancel();
#if DCHECK_IS_ON()
-{
-AutoLock auto_lock(version_lock_);
 ++increase_version_;
-version_condition_.Broadcast();
-}
+version_condition_for_dcheck_->Broadcast();
#endif // DCHECK_IS_ON()
}
+// EXCLUSIVE_LOCKS_REQUIRED(worker_lock_)
bool JobTaskSource::WaitForParticipationOpportunity() {
-CheckedAutoLock auto_lock(lock_);
DCHECK(!join_flag_.IsWaiting());
// std::memory_order_relaxed is sufficient because no other state is
@@ -259,16 +197,19 @@ bool JobTaskSource::WaitForParticipationOpportunity() {
// Only the joining thread remains.
DCHECK_EQ(state.worker_count(), 1U);
DCHECK(state.is_canceled() || max_concurrency == 0U);
-state_.DecrementWorkerCountFromJoiningThread();
+state_.DecrementWorkerCount();
return false;
}
TaskSource::RunStatus JobTaskSource::WillRunTask() {
+CheckedAutoLock auto_lock(worker_lock_);
const size_t max_concurrency = GetMaxConcurrency();
-// std::memory_order_release on success to establish Release-Acquire ordering
-// with read operations (see Memory Ordering comment at top of the file).
-const auto state_before_add =
-state_.TryIncrementWorkerCountFromWorkerRelease(max_concurrency);
+auto state_before_add = state_.Load();
+if (!state_before_add.is_canceled() &&
+state_before_add.worker_count() < max_concurrency) {
+state_before_add = state_.IncrementWorkerCount();
+}
// Don't allow this worker to run the task if either:
// A) |state_| was canceled.
@@ -289,9 +230,9 @@ TaskSource::RunStatus JobTaskSource::WillRunTask() {
}
size_t JobTaskSource::GetRemainingConcurrency() const {
-// std::memory_order_relaxed is sufficient because no other state is
-// synchronized with GetRemainingConcurrency().
-const auto state = state_.Load();
+// It is safe to read |state_| without a lock since this variable is atomic,
+// and no other state is synchronized with GetRemainingConcurrency().
+const auto state = TS_UNCHECKED_READ(state_).Load();
const size_t max_concurrency = GetMaxConcurrency();
// Avoid underflows.
if (state.is_canceled() || state.worker_count() > max_concurrency)
@@ -302,9 +243,9 @@ size_t JobTaskSource::GetRemainingConcurrency() const {
void JobTaskSource::NotifyConcurrencyIncrease() {
#if DCHECK_IS_ON()
{
-AutoLock auto_lock(version_lock_);
+CheckedAutoLock auto_lock(worker_lock_);
++increase_version_;
-version_condition_.Broadcast();
+version_condition_for_dcheck_->Broadcast();
}
#endif // DCHECK_IS_ON()
@@ -316,7 +257,7 @@ void JobTaskSource::NotifyConcurrencyIncrease() {
{
// Lock is taken to access |join_flag_| below and signal
// |worker_released_condition_|.
-CheckedAutoLock auto_lock(lock_);
+CheckedAutoLock auto_lock(worker_lock_);
if (join_flag_.ShouldWorkerSignal())
worker_released_condition_->Signal();
}
@@ -358,22 +299,22 @@ void JobTaskSource::ReleaseTaskId(uint8_t task_id) {
}
bool JobTaskSource::ShouldYield() {
-// It is safe to read |join_flag_| without a lock since this
-// variable is atomic, keeping in mind that threads may not immediately see
+// It is safe to read |join_flag_| and |state_| without a lock since these
+// variables are atomic, keeping in mind that threads may not immediately see
// the new value when it is updated.
return TS_UNCHECKED_READ(join_flag_).ShouldWorkerYield() ||
-state_.Load().is_canceled();
+TS_UNCHECKED_READ(state_).Load().is_canceled();
}
#if DCHECK_IS_ON()
size_t JobTaskSource::GetConcurrencyIncreaseVersion() const {
-AutoLock auto_lock(version_lock_);
+CheckedAutoLock auto_lock(worker_lock_);
return increase_version_;
}
bool JobTaskSource::WaitForConcurrencyIncreaseUpdate(size_t recorded_version) {
-AutoLock auto_lock(version_lock_);
+CheckedAutoLock auto_lock(worker_lock_);
constexpr TimeDelta timeout = TimeDelta::FromSeconds(1);
const base::TimeTicks start_time = subtle::TimeTicksNowIgnoringOverride();
do {
@@ -384,7 +325,7 @@ bool JobTaskSource::WaitForConcurrencyIncreaseUpdate(size_t recorded_version) {
// Waiting is acceptable because it is in DCHECK-only code.
ScopedAllowBaseSyncPrimitivesOutsideBlockingScope
allow_base_sync_primitives;
-version_condition_.TimedWait(timeout);
+version_condition_for_dcheck_->TimedWait(timeout);
} while (subtle::TimeTicksNowIgnoringOverride() - start_time < timeout);
return false;
}
@@ -394,21 +335,16 @@ bool JobTaskSource::WaitForConcurrencyIncreaseUpdate(size_t recorded_version) {
Task JobTaskSource::TakeTask(TaskSource::Transaction* transaction) {
// JobTaskSource members are not lock-protected so no need to acquire a lock
// if |transaction| is nullptr.
-DCHECK_GT(state_.Load().worker_count(), 0U);
+DCHECK_GT(TS_UNCHECKED_READ(state_).Load().worker_count(), 0U);
DCHECK(primary_task_);
return Task(from_here_, primary_task_, TimeDelta());
}
-bool JobTaskSource::DidProcessTask(TaskSource::Transaction* transaction) {
+bool JobTaskSource::DidProcessTask(TaskSource::Transaction* /*transaction*/) {
// Lock is needed to access |join_flag_| below and signal
-// |worker_released_condition_|. If |transaction|, then |lock_| is already
-// taken.
-CheckedAutoLockMaybe auto_lock(transaction ? nullptr : &lock_);
-AnnotateAcquiredLockAlias annotate(lock_, lock_);
-// std::memory_order_acquire to establish Release-Acquire ordering with
-// WillRunTask() (see Memory Ordering comment at top of the file).
-const auto state_before_sub = state_.DecrementWorkerCountFromWorkerAcquire();
+// |worker_released_condition_|.
+CheckedAutoLock auto_lock(worker_lock_);
+const auto state_before_sub = state_.DecrementWorkerCount();
if (join_flag_.ShouldWorkerSignal())
worker_released_condition_->Signal();
......
@@ -15,7 +15,7 @@
#include "base/macros.h"
#include "base/optional.h"
#include "base/synchronization/condition_variable.h"
#include "base/synchronization/lock.h"
#include "base/task/common/checked_lock.h"
#include "base/task/post_job.h"
#include "base/task/task_traits.h"
#include "base/task/thread_pool/sequence_sort_key.h"
@@ -91,7 +91,9 @@ class BASE_EXPORT JobTaskSource : public TaskSource {
private:
// Atomic internal state to track the number of workers running a task from
-// this JobTaskSource and whether this JobTaskSource is canceled.
+// this JobTaskSource and whether this JobTaskSource is canceled. All
+// operations are performed with std::memory_order_relaxed as State is only
+// ever modified under a lock or read atomically (optimistic read).
class State {
public:
static constexpr size_t kCanceledMask = 1;
@@ -109,28 +111,17 @@ class BASE_EXPORT JobTaskSource : public TaskSource {
State();
~State();
-// Sets as canceled using std::memory_order_relaxed. Returns the state
+// Sets as canceled. Returns the state
 // before the operation.
 Value Cancel();
-// Increments the worker count by 1 if smaller than |max_concurrency| and if
-// |!is_canceled()|, using std::memory_order_release, and returns the state
-// before the operation. Equivalent to Load() otherwise.
-Value TryIncrementWorkerCountFromWorkerRelease(size_t max_concurrency);
+// Increments the worker count by 1. Returns the state before the operation.
+Value IncrementWorkerCount();
-// Decrements the worker count by 1 using std::memory_order_acquire. Returns
-// the state before the operation.
-Value DecrementWorkerCountFromWorkerAcquire();
+// Decrements the worker count by 1. Returns the state before the operation.
+Value DecrementWorkerCount();
-// Increments the worker count by 1 using std::memory_order_relaxed. Returns
-// the state before the operation.
-Value IncrementWorkerCountFromJoiningThread();
-// Decrements the worker count by 1 using std::memory_order_relaxed. Returns
-// the state before the operation.
-Value DecrementWorkerCountFromJoiningThread();
-// Loads and returns the state, using std::memory_order_relaxed.
+// Loads and returns the state.
Value Load() const;
private:
@@ -186,7 +177,7 @@ class BASE_EXPORT JobTaskSource : public TaskSource {
// DidProcessTask()). Returns true if the joining thread should run a task, or
// false if joining was completed and all other workers returned because
// either there's no work remaining or Job was cancelled.
-bool WaitForParticipationOpportunity();
+bool WaitForParticipationOpportunity() EXCLUSIVE_LOCKS_REQUIRED(worker_lock_);
// TaskSource:
RunStatus WillRunTask() override;
@@ -195,15 +186,20 @@ class BASE_EXPORT JobTaskSource : public TaskSource {
bool DidProcessTask(TaskSource::Transaction* transaction) override;
SequenceSortKey GetSortKey() const override;
-// Current atomic state.
-State state_;
-std::atomic<uint32_t> assigned_task_ids_{0};
+// Synchronizes access to workers state.
+mutable CheckedLock worker_lock_{UniversalSuccessor()};
+// Current atomic state (atomic despite the lock to allow optimistic reads
+// without the lock).
+State state_ GUARDED_BY(worker_lock_);
// Normally, |join_flag_| is protected by |lock_|, except in ShouldYield()
// hence the use of atomics.
-JoinFlag join_flag_ GUARDED_BY(lock_);
+JoinFlag join_flag_ GUARDED_BY(worker_lock_);
// Signaled when |join_flag_| is kWaiting* and a worker returns.
std::unique_ptr<ConditionVariable> worker_released_condition_
-GUARDED_BY(lock_);
+GUARDED_BY(worker_lock_);
+std::atomic<uint32_t> assigned_task_ids_{0};
const Location from_here_;
RepeatingCallback<size_t()> max_concurrency_callback_;
@@ -217,12 +213,10 @@ class BASE_EXPORT JobTaskSource : public TaskSource {
PooledTaskRunnerDelegate* delegate_;
#if DCHECK_IS_ON()
-// Synchronizes accesses to |increase_version_|.
-mutable Lock version_lock_;
-// Signaled whenever increase_version_ is updated.
-ConditionVariable version_condition_{&version_lock_};
+// Signaled whenever |increase_version_| is updated.
+std::unique_ptr<ConditionVariable> version_condition_for_dcheck_;
// Incremented every time max concurrency is increased.
-size_t increase_version_ GUARDED_BY(version_lock_) = 0;
+size_t increase_version_ GUARDED_BY(worker_lock_) = 0;
#endif // DCHECK_IS_ON()
DISALLOW_COPY_AND_ASSIGN(JobTaskSource);
......
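
As the new header comments above describe, |state_| stays atomic so hot paths
can read it without the lock (the TS_UNCHECKED_READ call sites), while every
write happens under |worker_lock_| with std::memory_order_relaxed. A minimal
sketch of that guarded-writes/optimistic-reads pattern, with illustrative
names rather than the Chromium classes:

#include <atomic>
#include <cstdint>
#include <mutex>

class GuardedState {
 public:
  // Writer: serialized by |lock_|. memory_order_relaxed is sufficient for
  // the atomic itself because the mutex already orders all modifications.
  void IncrementWorkerCount() {
    std::lock_guard<std::mutex> lock(lock_);
    value_.fetch_add(1, std::memory_order_relaxed);
  }

  void DecrementWorkerCount() {
    std::lock_guard<std::mutex> lock(lock_);
    value_.fetch_sub(1, std::memory_order_relaxed);
  }

  // Optimistic reader (the TS_UNCHECKED_READ case): no lock taken. A stale
  // value is harmless because callers only use it for best-effort early
  // exits and otherwise fall back to a lock-protected slow path.
  uint32_t Load() const { return value_.load(std::memory_order_relaxed); }

 private:
  std::mutex lock_;
  std::atomic<uint32_t> value_{0};
};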