Commit f91d7a0b authored by Etienne Pierre-Doray's avatar Etienne Pierre-Doray Committed by Commit Bot

[Jobs]: Follow-up on "Expose worker count to Job users"

Addressing gab comments on
https://chromium-review.googlesource.com/c/chromium/src/+/2304972

Change-Id: I4ac6d2378bdf509c3c606d0bafab965212f07638
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2390860
Commit-Queue: Etienne Pierre-Doray <etiennep@chromium.org>
Reviewed-by: default avatarGabriel Charette <gab@chromium.org>
Cr-Commit-Position: refs/heads/master@{#806199}
parent effd7ca0
...@@ -126,7 +126,7 @@ void JobHandle::Detach() { ...@@ -126,7 +126,7 @@ void JobHandle::Detach() {
JobHandle PostJob(const Location& from_here, JobHandle PostJob(const Location& from_here,
const TaskTraits& traits, const TaskTraits& traits,
RepeatingCallback<void(JobDelegate*)> worker_task, RepeatingCallback<void(JobDelegate*)> worker_task,
RepeatingCallback<size_t(size_t)> max_concurrency_callback) { MaxConcurrencyCallback max_concurrency_callback) {
DCHECK(ThreadPoolInstance::Get()) DCHECK(ThreadPoolInstance::Get())
<< "Ref. Prerequisite section of post_task.h.\n\n" << "Ref. Prerequisite section of post_task.h.\n\n"
"Hint: if this is in a unit test, you're likely merely missing a " "Hint: if this is in a unit test, you're likely merely missing a "
......
...@@ -125,6 +125,15 @@ class BASE_EXPORT JobHandle { ...@@ -125,6 +125,15 @@ class BASE_EXPORT JobHandle {
DISALLOW_COPY_AND_ASSIGN(JobHandle); DISALLOW_COPY_AND_ASSIGN(JobHandle);
}; };
// Callback used in PostJob() to control the maximum number of threads calling
// the worker task concurrently.
// Returns the maximum number of threads which may call a job's worker task
// concurrently. |worker_count| is the number of threads currently assigned to
// this job which some callers may need to determine their return value.
using MaxConcurrencyCallback =
RepeatingCallback<size_t(size_t /*worker_count*/)>;
// Posts a repeating |worker_task| with specific |traits| to run in parallel on // Posts a repeating |worker_task| with specific |traits| to run in parallel on
// base::ThreadPool. // base::ThreadPool.
// Returns a JobHandle associated with the Job, which can be joined, canceled or // Returns a JobHandle associated with the Job, which can be joined, canceled or
...@@ -154,9 +163,8 @@ class BASE_EXPORT JobHandle { ...@@ -154,9 +163,8 @@ class BASE_EXPORT JobHandle {
// } // }
// //
// |max_concurrency_callback| controls the maximum number of threads calling // |max_concurrency_callback| controls the maximum number of threads calling
// |worker_task| concurrently, given the number of threads currently assigned to // |worker_task| concurrently. |worker_task| is only invoked if the number of
// this job. |worker_task| is only invoked if the number of threads previously // threads previously running |worker_task| was less than the value returned by
// running |worker_task| was less than the value returned by
// |max_concurrency_callback|. In general, |max_concurrency_callback| should // |max_concurrency_callback|. In general, |max_concurrency_callback| should
// return the latest number of incomplete work items (smallest unit of work) // return the latest number of incomplete work items (smallest unit of work)
// left to processed. JobHandle/JobDelegate::NotifyConcurrencyIncrease() *must* // left to processed. JobHandle/JobDelegate::NotifyConcurrencyIncrease() *must*
...@@ -171,11 +179,10 @@ class BASE_EXPORT JobHandle { ...@@ -171,11 +179,10 @@ class BASE_EXPORT JobHandle {
// |traits| requirements: // |traits| requirements:
// - base::ThreadPolicy must be specified if the priority of the task runner // - base::ThreadPolicy must be specified if the priority of the task runner
// will ever be increased from BEST_EFFORT. // will ever be increased from BEST_EFFORT.
JobHandle BASE_EXPORT JobHandle BASE_EXPORT PostJob(const Location& from_here,
PostJob(const Location& from_here, const TaskTraits& traits,
const TaskTraits& traits, RepeatingCallback<void(JobDelegate*)> worker_task,
RepeatingCallback<void(JobDelegate*)> worker_task, MaxConcurrencyCallback max_concurrency_callback);
RepeatingCallback<size_t(size_t)> max_concurrency_callback);
} // namespace base } // namespace base
......
...@@ -78,12 +78,11 @@ bool JobTaskSource::JoinFlag::ShouldWorkerSignal() { ...@@ -78,12 +78,11 @@ bool JobTaskSource::JoinFlag::ShouldWorkerSignal() {
return value_.exchange(kNotWaiting, std::memory_order_relaxed) != kNotWaiting; return value_.exchange(kNotWaiting, std::memory_order_relaxed) != kNotWaiting;
} }
JobTaskSource::JobTaskSource( JobTaskSource::JobTaskSource(const Location& from_here,
const Location& from_here, const TaskTraits& traits,
const TaskTraits& traits, RepeatingCallback<void(JobDelegate*)> worker_task,
RepeatingCallback<void(JobDelegate*)> worker_task, MaxConcurrencyCallback max_concurrency_callback,
RepeatingCallback<size_t(size_t)> max_concurrency_callback, PooledTaskRunnerDelegate* delegate)
PooledTaskRunnerDelegate* delegate)
: TaskSource(traits, nullptr, TaskSourceExecutionMode::kJob), : TaskSource(traits, nullptr, TaskSourceExecutionMode::kJob),
from_here_(from_here), from_here_(from_here),
max_concurrency_callback_(std::move(max_concurrency_callback)), max_concurrency_callback_(std::move(max_concurrency_callback)),
......
...@@ -35,7 +35,7 @@ class BASE_EXPORT JobTaskSource : public TaskSource { ...@@ -35,7 +35,7 @@ class BASE_EXPORT JobTaskSource : public TaskSource {
JobTaskSource(const Location& from_here, JobTaskSource(const Location& from_here,
const TaskTraits& traits, const TaskTraits& traits,
RepeatingCallback<void(JobDelegate*)> worker_task, RepeatingCallback<void(JobDelegate*)> worker_task,
RepeatingCallback<size_t(size_t)> max_concurrency_callback, MaxConcurrencyCallback max_concurrency_callback,
PooledTaskRunnerDelegate* delegate); PooledTaskRunnerDelegate* delegate);
static JobHandle CreateJobHandle( static JobHandle CreateJobHandle(
......
...@@ -655,7 +655,13 @@ void WorkerTask(base::JobDelegate* job_delegate) { ...@@ -655,7 +655,13 @@ void WorkerTask(base::JobDelegate* job_delegate) {
} }
// Returns the latest thread-safe number of incomplete work items. // Returns the latest thread-safe number of incomplete work items.
void NumIncompleteWorkItems(size_t worker_count); void NumIncompleteWorkItems(size_t worker_count) {
// NumIncompleteWorkItems() may use |worker_count| if it needs to account for
// local work lists, which is easier than doing its own accounting, keeping in
// mind that the actual number of items may be racily overestimated and thus
// WorkerTask() may be called when there's no available work.
return GlobalQueueSize() + worker_count;
}
base::PostJob(FROM_HERE, {}, base::PostJob(FROM_HERE, {},
base::BindRepeating(&WorkerTask), base::BindRepeating(&WorkerTask),
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment