Commit 3c4a5fb1 authored by Etienne Pierre-doray's avatar Etienne Pierre-doray Committed by Commit Bot

[Jobs]: Implement JobDelegate::GetTaskId

A unique id that's < worker count. This will be used by several v8/blink
use cases to index into worklist by worker.

Change-Id: I3fce2f3f885d507e5b4f1f4425b9065dd642a6fc
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2230760
Commit-Queue: Etienne Pierre-Doray <etiennep@chromium.org>
Reviewed-by: default avatarGabriel Charette <gab@chromium.org>
Cr-Commit-Position: refs/heads/master@{#783702}
parent 8ce4f9d7
...@@ -26,6 +26,8 @@ JobDelegate::JobDelegate( ...@@ -26,6 +26,8 @@ JobDelegate::JobDelegate(
} }
JobDelegate::~JobDelegate() { JobDelegate::~JobDelegate() {
if (task_id_ != kInvalidTaskId)
task_source_->ReleaseTaskId(task_id_);
#if DCHECK_IS_ON() #if DCHECK_IS_ON()
// When ShouldYield() returns false, the worker task is expected to do // When ShouldYield() returns false, the worker task is expected to do
// work before returning. // work before returning.
...@@ -60,6 +62,12 @@ void JobDelegate::NotifyConcurrencyIncrease() { ...@@ -60,6 +62,12 @@ void JobDelegate::NotifyConcurrencyIncrease() {
task_source_->NotifyConcurrencyIncrease(); task_source_->NotifyConcurrencyIncrease();
} }
uint8_t JobDelegate::GetTaskId() {
if (task_id_ == kInvalidTaskId)
task_id_ = task_source_->AcquireTaskId();
return task_id_;
}
void JobDelegate::AssertExpectedConcurrency(size_t expected_max_concurrency) { void JobDelegate::AssertExpectedConcurrency(size_t expected_max_concurrency) {
// In dcheck builds, verify that max concurrency falls in one of the following // In dcheck builds, verify that max concurrency falls in one of the following
// cases: // cases:
......
...@@ -5,6 +5,8 @@ ...@@ -5,6 +5,8 @@
#ifndef BASE_TASK_POST_JOB_H_ #ifndef BASE_TASK_POST_JOB_H_
#define BASE_TASK_POST_JOB_H_ #define BASE_TASK_POST_JOB_H_
#include <limits>
#include "base/base_export.h" #include "base/base_export.h"
#include "base/callback.h" #include "base/callback.h"
#include "base/check_op.h" #include "base/check_op.h"
...@@ -50,7 +52,14 @@ class BASE_EXPORT JobDelegate { ...@@ -50,7 +52,14 @@ class BASE_EXPORT JobDelegate {
// of worker should be adjusted accordingly. See PostJob() for more details. // of worker should be adjusted accordingly. See PostJob() for more details.
void NotifyConcurrencyIncrease(); void NotifyConcurrencyIncrease();
// Returns a task_id unique among threads currently running this job, such
// that GetTaskId() < worker count. To achieve this, the same task_id may be
// reused by a different thread after a worker_task returns.
uint8_t GetTaskId();
private: private:
static constexpr uint8_t kInvalidTaskId = std::numeric_limits<uint8_t>::max();
// Verifies that either max concurrency is lower or equal to // Verifies that either max concurrency is lower or equal to
// |expected_max_concurrency|, or there is an increase version update // |expected_max_concurrency|, or there is an increase version update
// triggered by NotifyConcurrencyIncrease(). // triggered by NotifyConcurrencyIncrease().
...@@ -58,6 +67,7 @@ class BASE_EXPORT JobDelegate { ...@@ -58,6 +67,7 @@ class BASE_EXPORT JobDelegate {
internal::JobTaskSource* const task_source_; internal::JobTaskSource* const task_source_;
internal::PooledTaskRunnerDelegate* const pooled_task_runner_delegate_; internal::PooledTaskRunnerDelegate* const pooled_task_runner_delegate_;
uint8_t task_id_ = kInvalidTaskId;
#if DCHECK_IS_ON() #if DCHECK_IS_ON()
// Used in AssertExpectedConcurrency(), see that method's impl for details. // Used in AssertExpectedConcurrency(), see that method's impl for details.
......
...@@ -4,10 +4,12 @@ ...@@ -4,10 +4,12 @@
#include "base/task/thread_pool/job_task_source.h" #include "base/task/thread_pool/job_task_source.h"
#include <type_traits>
#include <utility> #include <utility>
#include "base/bind.h" #include "base/bind.h"
#include "base/bind_helpers.h" #include "base/bind_helpers.h"
#include "base/bits.h"
#include "base/check_op.h" #include "base/check_op.h"
#include "base/memory/ptr_util.h" #include "base/memory/ptr_util.h"
#include "base/task/common/checked_lock.h" #include "base/task/common/checked_lock.h"
...@@ -20,6 +22,18 @@ ...@@ -20,6 +22,18 @@
namespace base { namespace base {
namespace internal { namespace internal {
namespace {
// Capped to allow assigning task_ids from a bitfield.
constexpr size_t kMaxWorkersPerJob = 32;
static_assert(
kMaxWorkersPerJob <=
std::numeric_limits<std::result_of<
decltype (&JobDelegate::GetTaskId)(JobDelegate)>::type>::max(),
"AcquireTaskId return type isn't big enough to fit kMaxWorkersPerJob");
} // namespace
// Memory ordering on |state_| operations // Memory ordering on |state_| operations
// //
// The write operation on |state_| in WillRunTask() uses // The write operation on |state_| in WillRunTask() uses
...@@ -317,7 +331,30 @@ void JobTaskSource::NotifyConcurrencyIncrease() { ...@@ -317,7 +331,30 @@ void JobTaskSource::NotifyConcurrencyIncrease() {
} }
size_t JobTaskSource::GetMaxConcurrency() const { size_t JobTaskSource::GetMaxConcurrency() const {
return max_concurrency_callback_.Run(); return std::min(max_concurrency_callback_.Run(), kMaxWorkersPerJob);
}
uint8_t JobTaskSource::AcquireTaskId() {
static_assert(kMaxWorkersPerJob <= sizeof(assigned_task_ids_) * 8,
"TaskId bitfield isn't big enough to fit kMaxWorkersPerJob.");
uint32_t assigned_task_ids =
assigned_task_ids_.load(std::memory_order_relaxed);
uint32_t new_assigned_task_ids = 0;
uint8_t task_id = 0;
do {
// Count trailing one bits. This is the id of the right-most 0-bit in
// |assigned_task_ids|.
task_id = bits::CountTrailingZeroBits(~assigned_task_ids);
new_assigned_task_ids = assigned_task_ids | (uint32_t(1) << task_id);
} while (assigned_task_ids_.compare_exchange_weak(
assigned_task_ids, new_assigned_task_ids, std::memory_order_relaxed));
return task_id;
}
void JobTaskSource::ReleaseTaskId(uint8_t task_id) {
uint32_t previous_task_ids =
assigned_task_ids_.fetch_and(~(uint32_t(1) << task_id));
DCHECK(previous_task_ids & (uint32_t(1) << task_id));
} }
bool JobTaskSource::ShouldYield() { bool JobTaskSource::ShouldYield() {
......
...@@ -73,6 +73,9 @@ class BASE_EXPORT JobTaskSource : public TaskSource { ...@@ -73,6 +73,9 @@ class BASE_EXPORT JobTaskSource : public TaskSource {
// concurrently. // concurrently.
size_t GetMaxConcurrency() const; size_t GetMaxConcurrency() const;
uint8_t AcquireTaskId();
void ReleaseTaskId(uint8_t task_id);
// Returns true if a worker should return from the worker task on the current // Returns true if a worker should return from the worker task on the current
// thread ASAP. // thread ASAP.
bool ShouldYield(); bool ShouldYield();
...@@ -194,6 +197,7 @@ class BASE_EXPORT JobTaskSource : public TaskSource { ...@@ -194,6 +197,7 @@ class BASE_EXPORT JobTaskSource : public TaskSource {
// Current atomic state. // Current atomic state.
State state_; State state_;
std::atomic<uint32_t> assigned_task_ids_{0};
// Normally, |join_flag_| is protected by |lock_|, except in ShouldYield() // Normally, |join_flag_| is protected by |lock_|, except in ShouldYield()
// hence the use of atomics. // hence the use of atomics.
JoinFlag join_flag_ GUARDED_BY(lock_); JoinFlag join_flag_ GUARDED_BY(lock_);
......
...@@ -475,5 +475,54 @@ TEST_F(ThreadPoolJobTaskSourceTest, InvalidDidProcessTask) { ...@@ -475,5 +475,54 @@ TEST_F(ThreadPoolJobTaskSourceTest, InvalidDidProcessTask) {
EXPECT_DCHECK_DEATH(registered_task_source.DidProcessTask()); EXPECT_DCHECK_DEATH(registered_task_source.DidProcessTask());
} }
TEST_F(ThreadPoolJobTaskSourceTest, AcquireTaskId) {
auto job_task =
base::MakeRefCounted<test::MockJobTask>(DoNothing(),
/* num_tasks_to_run */ 4);
scoped_refptr<JobTaskSource> task_source =
job_task->GetJobTaskSource(FROM_HERE, {}, &pooled_task_runner_delegate_);
EXPECT_EQ(0U, task_source->AcquireTaskId());
EXPECT_EQ(1U, task_source->AcquireTaskId());
EXPECT_EQ(2U, task_source->AcquireTaskId());
EXPECT_EQ(3U, task_source->AcquireTaskId());
EXPECT_EQ(4U, task_source->AcquireTaskId());
task_source->ReleaseTaskId(1);
task_source->ReleaseTaskId(3);
EXPECT_EQ(1U, task_source->AcquireTaskId());
EXPECT_EQ(3U, task_source->AcquireTaskId());
EXPECT_EQ(5U, task_source->AcquireTaskId());
}
// Verifies that task id is released after worker_task returns.
TEST_F(ThreadPoolJobTaskSourceTest, GetTaskId) {
auto task_source = MakeRefCounted<JobTaskSource>(
FROM_HERE, TaskTraits{}, BindRepeating([](JobDelegate* delegate) {
// Confirm that task id 0 is reused on the second run.
EXPECT_EQ(0U, delegate->GetTaskId());
// Allow running the task again.
delegate->NotifyConcurrencyIncrease();
}),
BindRepeating([]() -> size_t { return 1; }),
&pooled_task_runner_delegate_);
auto registered_task_source =
RegisteredTaskSource::CreateForTesting(task_source);
// Run the worker_task twice.
ASSERT_EQ(registered_task_source.WillRunTask(),
TaskSource::RunStatus::kAllowedSaturated);
auto task1 = registered_task_source.TakeTask();
std::move(task1.task).Run();
registered_task_source.DidProcessTask();
ASSERT_EQ(registered_task_source.WillRunTask(),
TaskSource::RunStatus::kAllowedSaturated);
auto task2 = registered_task_source.TakeTask();
std::move(task2.task).Run();
registered_task_source.DidProcessTask();
}
} // namespace internal } // namespace internal
} // namespace base } // namespace base
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment