Commit 56203474 authored by Etienne Pierre-doray's avatar Etienne Pierre-doray Committed by Commit Bot

[Jobs API]: Exclude inactive main thread from worker_count.

GetMaxConcurrency() is given the active worker_count. The main thread shouldn't
be counted when it returns, otherwise causing infinite job.

Bug: 1114823
Change-Id: I4954559992a8e17a92dbbbc9e3bce152563d1235
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2355297Reviewed-by: default avatarFrançois Doray <fdoray@chromium.org>
Commit-Queue: François Doray <fdoray@chromium.org>
Cr-Commit-Position: refs/heads/master@{#799094}
parent cdad9ef9
......@@ -127,7 +127,8 @@ bool JobTaskSource::WillJoin() {
const auto state_before_add = state_.IncrementWorkerCount();
if (!state_before_add.is_canceled() &&
state_before_add.worker_count() < GetMaxConcurrency(state_before_add)) {
state_before_add.worker_count() <
GetMaxConcurrency(state_before_add.worker_count())) {
return true;
}
return WaitForParticipationOpportunity();
......@@ -142,8 +143,12 @@ bool JobTaskSource::RunJoinTask() {
// Stale values will only cause WaitForParticipationOpportunity() to be
// called.
const auto state = TS_UNCHECKED_READ(state_).Load();
if (!state.is_canceled() && state.worker_count() <= GetMaxConcurrency(state))
// The condition is slightly different from the one in WillJoin() since we're
// using |state| that was already incremented to include the joining thread.
if (!state.is_canceled() &&
state.worker_count() <= GetMaxConcurrency(state.worker_count() - 1)) {
return true;
}
TRACE_EVENT0("base", "Job.WaitForParticipationOpportunity");
CheckedAutoLock auto_lock(worker_lock_);
......@@ -170,7 +175,8 @@ bool JobTaskSource::WaitForParticipationOpportunity() {
// std::memory_order_relaxed is sufficient because no other state is
// synchronized with |state_| outside of |lock_|.
auto state = state_.Load();
size_t max_concurrency = GetMaxConcurrency(state);
// |worker_count - 1| to exclude the joining thread which is not active.
size_t max_concurrency = GetMaxConcurrency(state.worker_count() - 1);
// Wait until either:
// A) |worker_count| is below or equal to max concurrency and state is not
......@@ -188,7 +194,8 @@ bool JobTaskSource::WaitForParticipationOpportunity() {
// 2- In NotifyConcurrencyIncrease(), following a max_concurrency increase.
worker_released_condition_->Wait();
state = state_.Load();
max_concurrency = GetMaxConcurrency(state);
// |worker_count - 1| to exclude the joining thread which is not active.
max_concurrency = GetMaxConcurrency(state.worker_count() - 1);
}
// Case A:
if (state.worker_count() <= max_concurrency && !state.is_canceled())
......@@ -205,7 +212,8 @@ TaskSource::RunStatus JobTaskSource::WillRunTask() {
CheckedAutoLock auto_lock(worker_lock_);
auto state_before_add = state_.Load();
const size_t max_concurrency = GetMaxConcurrency(state_before_add);
const size_t max_concurrency =
GetMaxConcurrency(state_before_add.worker_count());
if (!state_before_add.is_canceled() &&
state_before_add.worker_count() < max_concurrency) {
state_before_add = state_.IncrementWorkerCount();
......@@ -233,7 +241,7 @@ size_t JobTaskSource::GetRemainingConcurrency() const {
// It is safe to read |state_| without a lock since this variable is atomic,
// and no other state is synchronized with GetRemainingConcurrency().
const auto state = TS_UNCHECKED_READ(state_).Load();
const size_t max_concurrency = GetMaxConcurrency(state);
const size_t max_concurrency = GetMaxConcurrency(state.worker_count());
// Avoid underflows.
if (state.is_canceled() || state.worker_count() > max_concurrency)
return 0;
......@@ -243,7 +251,8 @@ size_t JobTaskSource::GetRemainingConcurrency() const {
bool JobTaskSource::IsCompleted() const {
CheckedAutoLock auto_lock(worker_lock_);
auto state = state_.Load();
return GetMaxConcurrency(state) == 0 && state.worker_count() == 0;
return GetMaxConcurrency(state.worker_count()) == 0 &&
state.worker_count() == 0;
}
size_t JobTaskSource::GetWorkerCount() const {
......@@ -282,11 +291,11 @@ void JobTaskSource::NotifyConcurrencyIncrease() {
}
size_t JobTaskSource::GetMaxConcurrency() const {
return GetMaxConcurrency(TS_UNCHECKED_READ(state_).Load());
return GetMaxConcurrency(TS_UNCHECKED_READ(state_).Load().worker_count());
}
size_t JobTaskSource::GetMaxConcurrency(State::Value value) const {
return std::min(max_concurrency_callback_.Run(value.worker_count()),
size_t JobTaskSource::GetMaxConcurrency(size_t worker_count) const {
return std::min(max_concurrency_callback_.Run(worker_count),
kMaxWorkersPerJob);
}
......@@ -372,7 +381,9 @@ bool JobTaskSource::DidProcessTask(TaskSource::Transaction* /*transaction*/) {
// Re-enqueue the TaskSource if the task ran and the worker count is below the
// max concurrency.
return state_before_sub.worker_count() <= GetMaxConcurrency(state_.Load());
// |worker_count - 1| to exclude the returning thread.
return state_before_sub.worker_count() <=
GetMaxConcurrency(state_before_sub.worker_count() - 1);
}
SequenceSortKey JobTaskSource::GetSortKey() const {
......
......@@ -182,7 +182,7 @@ class BASE_EXPORT JobTaskSource : public TaskSource {
// either there's no work remaining or Job was cancelled.
bool WaitForParticipationOpportunity() EXCLUSIVE_LOCKS_REQUIRED(worker_lock_);
size_t GetMaxConcurrency(State::Value value) const;
size_t GetMaxConcurrency(size_t worker_count) const;
// TaskSource:
RunStatus WillRunTask() override;
......
......@@ -253,6 +253,54 @@ TEST_F(ThreadPoolJobTaskSourceTest, RunJoinTask) {
EXPECT_FALSE(task_source->RunJoinTask());
}
// Verify that |worker_count| excludes the (inactive) returning thread calling
// max_concurrency_callback.
TEST_F(ThreadPoolJobTaskSourceTest, RunTaskWorkerCount) {
size_t max_concurrency = 1;
scoped_refptr<JobTaskSource> task_source =
base::MakeRefCounted<JobTaskSource>(
FROM_HERE, TaskTraits(),
BindLambdaForTesting(
[&](JobDelegate* delegate) { --max_concurrency; }),
BindLambdaForTesting([&](size_t worker_count) -> size_t {
return max_concurrency + worker_count;
}),
&pooled_task_runner_delegate_);
auto registered_task_source =
RegisteredTaskSource::CreateForTesting(task_source);
EXPECT_EQ(registered_task_source.WillRunTask(),
TaskSource::RunStatus::kAllowedSaturated);
auto task = registered_task_source.TakeTask();
std::move(task.task).Run();
// Once the worker_task runs, |worker_count| should drop to 0 and the job
// should finish.
EXPECT_FALSE(registered_task_source.DidProcessTask());
EXPECT_EQ(0U, max_concurrency);
}
// Verify that |worker_count| excludes the (inactive) joining thread calling
// max_concurrency_callback.
TEST_F(ThreadPoolJobTaskSourceTest, RunJoinTaskWorkerCount) {
size_t max_concurrency = 1;
scoped_refptr<JobTaskSource> task_source =
base::MakeRefCounted<JobTaskSource>(
FROM_HERE, TaskTraits(),
BindLambdaForTesting(
[&](JobDelegate* delegate) { --max_concurrency; }),
BindLambdaForTesting([&](size_t worker_count) -> size_t {
return max_concurrency + worker_count;
}),
&pooled_task_runner_delegate_);
EXPECT_TRUE(task_source->WillJoin());
// Once the worker_task runs, |worker_count| should drop to 0 and the job
// should finish.
EXPECT_FALSE(task_source->RunJoinTask());
EXPECT_EQ(0U, max_concurrency);
}
// Verifies that WillJoin() doesn't allow a joining thread to contribute
// after Cancel() is called.
TEST_F(ThreadPoolJobTaskSourceTest, CancelJoinTask) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment