Commit a6531eab authored by Etienne Pierre-doray's avatar Etienne Pierre-doray Committed by Commit Bot

[Jobs]: Remove concurrency increase checks.

Concurrency checks gives bogus worker_count values
since https://chromium-review.googlesource.com/c/chromium/src/+/2304972
The thread that just returned from worker task will still count as
active. This can cause races that will trigger dcheck. This could be
fixed probably by tweaking worker_count similar to
https://chromium-review.googlesource.com/c/chromium/src/+/2355297
Since this is dcheck-only code that adds significant complexity however,
it makes more sense to strip the code:
- dcheck can trigger in valid use cases
- The issues that the dcheck will catch would also cause Join to never
  return. This should be easy enough to detect without the dcheck.

Bug: 1114823
Change-Id: I1181ba7f530cf81624f578ed0f324849ad930e41
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2363074
Commit-Queue: François Doray <fdoray@chromium.org>
Reviewed-by: default avatarFrançois Doray <fdoray@chromium.org>
Cr-Commit-Position: refs/heads/master@{#799301}
parent ccfafc3b
......@@ -18,24 +18,11 @@ JobDelegate::JobDelegate(
: task_source_(task_source),
pooled_task_runner_delegate_(pooled_task_runner_delegate) {
DCHECK(task_source_);
#if DCHECK_IS_ON()
recorded_increase_version_ = task_source_->GetConcurrencyIncreaseVersion();
// Record max concurrency before running the worker task.
recorded_max_concurrency_ = task_source_->GetMaxConcurrency();
#endif // DCHECK_IS_ON()
}
JobDelegate::~JobDelegate() {
if (task_id_ != kInvalidTaskId)
task_source_->ReleaseTaskId(task_id_);
#if DCHECK_IS_ON()
// When ShouldYield() returns false, the worker task is expected to do
// work before returning.
size_t expected_max_concurrency = recorded_max_concurrency_;
if (!last_should_yield_ && expected_max_concurrency > 0)
--expected_max_concurrency;
AssertExpectedConcurrency(expected_max_concurrency);
#endif // DCHECK_IS_ON()
}
bool JobDelegate::ShouldYield() {
......@@ -68,50 +55,6 @@ uint8_t JobDelegate::GetTaskId() {
return task_id_;
}
void JobDelegate::AssertExpectedConcurrency(size_t expected_max_concurrency) {
// In dcheck builds, verify that max concurrency falls in one of the following
// cases:
// 1) max concurrency behaves normally and is below or equals the expected
// value.
// 2) max concurrency increased above the expected value, which implies
// there are new work items that the associated worker task didn't see and
// NotifyConcurrencyIncrease() should be called to adjust the number of
// worker.
// a) NotifyConcurrencyIncrease() was already called and the recorded
// concurrency version is out of date, i.e. less than the actual version.
// b) NotifyConcurrencyIncrease() has not yet been called, in which case the
// function waits for an imminent increase of the concurrency version,
// or for max concurrency to decrease below or equal the expected value.
// This prevent ill-formed GetMaxConcurrency() implementations that:
// - Don't decrease with the number of remaining work items.
// - Don't return an up-to-date value.
#if DCHECK_IS_ON()
// Case 1:
if (task_source_->GetMaxConcurrency() <= expected_max_concurrency)
return;
// Case 2a:
const size_t actual_version = task_source_->GetConcurrencyIncreaseVersion();
DCHECK_LE(recorded_increase_version_, actual_version);
if (recorded_increase_version_ < actual_version)
return;
// Case 2b:
const bool updated = task_source_->WaitForConcurrencyIncreaseUpdate(
recorded_increase_version_);
const size_t max_concurrency = task_source_->GetMaxConcurrency();
DCHECK(updated || max_concurrency <= expected_max_concurrency)
<< "Value returned by |max_concurrency_callback| (" << max_concurrency
<< ") is expected to decrease below or equal to "
<< expected_max_concurrency
<< ", unless NotifyConcurrencyIncrease() is called."
<< "Last ShouldYield() returned " << last_should_yield_;
recorded_increase_version_ = task_source_->GetConcurrencyIncreaseVersion();
recorded_max_concurrency_ = task_source_->GetMaxConcurrency();
#endif // DCHECK_IS_ON()
}
JobHandle::JobHandle() = default;
JobHandle::JobHandle(scoped_refptr<internal::JobTaskSource> task_source)
......
......@@ -60,21 +60,11 @@ class BASE_EXPORT JobDelegate {
private:
static constexpr uint8_t kInvalidTaskId = std::numeric_limits<uint8_t>::max();
// Verifies that either max concurrency is lower or equal to
// |expected_max_concurrency|, or there is an increase version update
// triggered by NotifyConcurrencyIncrease().
void AssertExpectedConcurrency(size_t expected_max_concurrency);
internal::JobTaskSource* const task_source_;
internal::PooledTaskRunnerDelegate* const pooled_task_runner_delegate_;
uint8_t task_id_ = kInvalidTaskId;
#if DCHECK_IS_ON()
// Used in AssertExpectedConcurrency(), see that method's impl for details.
// Value of max concurrency recorded before running the worker task.
size_t recorded_max_concurrency_;
// Value of the increase version recorded before running the worker task.
size_t recorded_increase_version_;
// Value returned by the last call to ShouldYield().
bool last_should_yield_ = false;
#endif
......
......@@ -99,12 +99,6 @@ JobTaskSource::JobTaskSource(
queue_time_(TimeTicks::Now()),
delegate_(delegate) {
DCHECK(delegate_);
#if DCHECK_IS_ON()
version_condition_for_dcheck_ = worker_lock_.CreateConditionVariable();
// Prevent wait from triggering a ScopedBlockingCall as this would add
// complexity outside this DCHECK-only code.
version_condition_for_dcheck_->declare_only_used_while_idle();
#endif // DCHECK_IS_ON()
}
JobTaskSource::~JobTaskSource() {
......@@ -161,11 +155,6 @@ void JobTaskSource::Cancel(TaskSource::Transaction* transaction) {
// WillRunTask() never succeed. std::memory_order_relaxed is sufficient
// because this task source never needs to be re-enqueued after Cancel().
state_.Cancel();
#if DCHECK_IS_ON()
++increase_version_;
version_condition_for_dcheck_->Broadcast();
#endif // DCHECK_IS_ON()
}
// EXCLUSIVE_LOCK_REQUIRED(worker_lock_)
......@@ -260,14 +249,6 @@ size_t JobTaskSource::GetWorkerCount() const {
}
void JobTaskSource::NotifyConcurrencyIncrease() {
#if DCHECK_IS_ON()
{
CheckedAutoLock auto_lock(worker_lock_);
++increase_version_;
version_condition_for_dcheck_->Broadcast();
}
#endif // DCHECK_IS_ON()
// Avoid unnecessary locks when NotifyConcurrencyIncrease() is spuriously
// called.
if (GetRemainingConcurrency() == 0)
......@@ -330,32 +311,6 @@ bool JobTaskSource::ShouldYield() {
TS_UNCHECKED_READ(state_).Load().is_canceled();
}
#if DCHECK_IS_ON()
size_t JobTaskSource::GetConcurrencyIncreaseVersion() const {
CheckedAutoLock auto_lock(worker_lock_);
return increase_version_;
}
bool JobTaskSource::WaitForConcurrencyIncreaseUpdate(size_t recorded_version) {
CheckedAutoLock auto_lock(worker_lock_);
constexpr TimeDelta timeout = TimeDelta::FromSeconds(1);
const base::TimeTicks start_time = subtle::TimeTicksNowIgnoringOverride();
do {
DCHECK_LE(recorded_version, increase_version_);
const auto state = state_.Load();
if (recorded_version != increase_version_ || state.is_canceled())
return true;
// Waiting is acceptable because it is in DCHECK-only code.
ScopedAllowBaseSyncPrimitivesOutsideBlockingScope
allow_base_sync_primitives;
version_condition_for_dcheck_->TimedWait(timeout);
} while (subtle::TimeTicksNowIgnoringOverride() - start_time < timeout);
return false;
}
#endif // DCHECK_IS_ON()
Task JobTaskSource::TakeTask(TaskSource::Transaction* transaction) {
// JobTaskSource members are not lock-protected so no need to acquire a lock
// if |transaction| is nullptr.
......
......@@ -85,13 +85,6 @@ class BASE_EXPORT JobTaskSource : public TaskSource {
PooledTaskRunnerDelegate* delegate() const { return delegate_; }
#if DCHECK_IS_ON()
size_t GetConcurrencyIncreaseVersion() const;
// Returns true if the concurrency version was updated above
// |recorded_version|, or false on timeout.
bool WaitForConcurrencyIncreaseUpdate(size_t recorded_version);
#endif // DCHECK_IS_ON()
private:
// Atomic internal state to track the number of workers running a task from
// this JobTaskSource and whether this JobTaskSource is canceled. All
......@@ -217,13 +210,6 @@ class BASE_EXPORT JobTaskSource : public TaskSource {
const TimeTicks queue_time_;
PooledTaskRunnerDelegate* delegate_;
#if DCHECK_IS_ON()
// Signaled whenever |increase_version_| is updated.
std::unique_ptr<ConditionVariable> version_condition_for_dcheck_;
// Incremented every time max concurrency is increased.
size_t increase_version_ GUARDED_BY(worker_lock_) = 0;
#endif // DCHECK_IS_ON()
DISALLOW_COPY_AND_ASSIGN(JobTaskSource);
};
......
......@@ -445,59 +445,6 @@ TEST_F(ThreadPoolJobTaskSourceTest, MaxConcurrencyStagnateIfShouldYield) {
registered_task_source.DidProcessTask();
}
// Verifies that a missing call to NotifyConcurrencyIncrease() causes a DCHECK
// death after a timeout.
TEST_F(ThreadPoolJobTaskSourceTest, InvalidConcurrency) {
testing::FLAGS_gtest_death_test_style = "threadsafe";
scoped_refptr<test::MockJobTask> job_task;
job_task = base::MakeRefCounted<test::MockJobTask>(
BindLambdaForTesting([&](JobDelegate* delegate) {
EXPECT_FALSE(delegate->ShouldYield());
job_task->SetNumTasksToRun(2);
EXPECT_FALSE(delegate->ShouldYield());
// After returning, a DCHECK should trigger because we never called
// NotifyConcurrencyIncrease().
}),
/* num_tasks_to_run */ 1);
scoped_refptr<JobTaskSource> task_source =
job_task->GetJobTaskSource(FROM_HERE, {}, &pooled_task_runner_delegate_);
auto registered_task_source =
RegisteredTaskSource::CreateForTesting(task_source);
ASSERT_EQ(registered_task_source.WillRunTask(),
TaskSource::RunStatus::kAllowedSaturated);
auto task = registered_task_source.TakeTask();
EXPECT_DCHECK_DEATH(std::move(task.task).Run());
registered_task_source.DidProcessTask();
}
// Verifies that a stale concurrency with no call to NotifyConcurrencyIncrease()
// causes a DCHECK death after a timeout.
TEST_F(ThreadPoolJobTaskSourceTest, StaleConcurrency) {
testing::FLAGS_gtest_death_test_style = "threadsafe";
auto task_source = MakeRefCounted<JobTaskSource>(
FROM_HERE, TaskTraits{}, BindRepeating([](JobDelegate*) {}),
BindRepeating([](size_t /*worker_count*/) -> size_t { return 1; }),
&pooled_task_runner_delegate_);
auto registered_task_source =
RegisteredTaskSource::CreateForTesting(task_source);
ASSERT_EQ(registered_task_source.WillRunTask(),
TaskSource::RunStatus::kAllowedSaturated);
auto task = registered_task_source.TakeTask();
// A DCHECK should trigger when |JobTaskSource::primary_task_| returns and
// ~JobDelegate() invokes AssertExpectedConcurrency(0)
EXPECT_DCHECK_DEATH(std::move(task.task).Run());
registered_task_source.DidProcessTask();
}
TEST_F(ThreadPoolJobTaskSourceTest, InvalidTakeTask) {
auto job_task =
base::MakeRefCounted<test::MockJobTask>(DoNothing(),
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment