Commit 6eb566d2 authored by Etienne Pierre-doray's avatar Etienne Pierre-doray Committed by Commit Bot

[Jobs API]: Use worker_lock in JobTaskSource.

Possible race when Join():
- thread A: Join: worker_released_condition_->Wait()
- thread C: WillRunTask:
              GetMaxConcurrency() returns > 0
- thread B: already running, finishes all the work
              GetMaxConcurrency() goes 0
- thread B: DidProcessTask:
              worker_released_condition_->Signal(),
- thread A: Join returns (GetMaxConcurrency() is 0)
- thread C: TryIncrementWorkerCountFromWorkerRelease
              worker count goes 1
- thread C: runs worker_task after Join

To fix race when Joining, all writes to |state_| are protected by
|worker_lock|. Memory ordering is no longer necessary.

Alternative: cancel before Join returns with a compare and swap and
loop again if new workers.

Change-Id: I4e478ffae2bdaec56386739f78de089d0e74e42c
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2248159
Commit-Queue: Etienne Pierre-Doray <etiennep@chromium.org>
Reviewed-by: default avatarGabriel Charette <gab@chromium.org>
Cr-Commit-Position: refs/heads/master@{#781453}
parent fe6083c1
This diff is collapsed.
......@@ -15,7 +15,7 @@
#include "base/macros.h"
#include "base/optional.h"
#include "base/synchronization/condition_variable.h"
#include "base/synchronization/lock.h"
#include "base/task/common/checked_lock.h"
#include "base/task/post_job.h"
#include "base/task/task_traits.h"
#include "base/task/thread_pool/sequence_sort_key.h"
......@@ -88,7 +88,9 @@ class BASE_EXPORT JobTaskSource : public TaskSource {
private:
// Atomic internal state to track the number of workers running a task from
// this JobTaskSource and whether this JobTaskSource is canceled.
// this JobTaskSource and whether this JobTaskSource is canceled. All
// operations are performed with std::memory_order_relaxed as State is only
// ever modified under a lock or read atomically (optimistic read).
class State {
public:
static constexpr size_t kCanceledMask = 1;
......@@ -106,28 +108,17 @@ class BASE_EXPORT JobTaskSource : public TaskSource {
State();
~State();
// Sets as canceled using std::memory_order_relaxed. Returns the state
// Sets as canceled. Returns the state
// before the operation.
Value Cancel();
// Increments the worker count by 1 if smaller than |max_concurrency| and if
// |!is_canceled()|, using std::memory_order_release, and returns the state
// before the operation. Equivalent to Load() otherwise.
Value TryIncrementWorkerCountFromWorkerRelease(size_t max_concurrency);
// Increments the worker count by 1. Returns the state before the operation.
Value IncrementWorkerCount();
// Decrements the worker count by 1 using std::memory_order_acquire. Returns
// the state before the operation.
Value DecrementWorkerCountFromWorkerAcquire();
// Decrements the worker count by 1. Returns the state before the operation.
Value DecrementWorkerCount();
// Increments the worker count by 1 using std::memory_order_relaxed. Returns
// the state before the operation.
Value IncrementWorkerCountFromJoiningThread();
// Decrements the worker count by 1 using std::memory_order_relaxed. Returns
// the state before the operation.
Value DecrementWorkerCountFromJoiningThread();
// Loads and returns the state, using std::memory_order_relaxed.
// Loads and returns the state.
Value Load() const;
private:
......@@ -183,7 +174,7 @@ class BASE_EXPORT JobTaskSource : public TaskSource {
// DidProcessTask()). Returns true if the joining thread should run a task, or
// false if joining was completed and all other workers returned because
// either there's no work remaining or Job was cancelled.
bool WaitForParticipationOpportunity();
bool WaitForParticipationOpportunity() EXCLUSIVE_LOCKS_REQUIRED(worker_lock_);
// TaskSource:
RunStatus WillRunTask() override;
......@@ -192,14 +183,18 @@ class BASE_EXPORT JobTaskSource : public TaskSource {
bool DidProcessTask(TaskSource::Transaction* transaction) override;
SequenceSortKey GetSortKey() const override;
// Current atomic state.
State state_;
// Synchronizes access to workers state.
mutable CheckedLock worker_lock_{UniversalSuccessor()};
// Current atomic state (atomic despite the lock to allow optimistic reads
// without the lock).
State state_ GUARDED_BY(worker_lock_);
// Normally, |join_flag_| is protected by |lock_|, except in ShouldYield()
// hence the use of atomics.
JoinFlag join_flag_ GUARDED_BY(lock_);
JoinFlag join_flag_ GUARDED_BY(worker_lock_);
// Signaled when |join_flag_| is kWaiting* and a worker returns.
std::unique_ptr<ConditionVariable> worker_released_condition_
GUARDED_BY(lock_);
GUARDED_BY(worker_lock_);
const Location from_here_;
RepeatingCallback<size_t()> max_concurrency_callback_;
......@@ -213,12 +208,10 @@ class BASE_EXPORT JobTaskSource : public TaskSource {
PooledTaskRunnerDelegate* delegate_;
#if DCHECK_IS_ON()
// Synchronizes accesses to |increase_version_|.
mutable Lock version_lock_;
// Signaled whenever increase_version_ is updated.
ConditionVariable version_condition_{&version_lock_};
// Signaled whenever |increase_version_| is updated.
std::unique_ptr<ConditionVariable> version_condition_for_dcheck_;
// Incremented every time max concurrency is increased.
size_t increase_version_ GUARDED_BY(version_lock_) = 0;
size_t increase_version_ GUARDED_BY(worker_lock_) = 0;
#endif // DCHECK_IS_ON()
DISALLOW_COPY_AND_ASSIGN(JobTaskSource);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment