Commit 0e88c66a authored by Nate Chapin's avatar Nate Chapin Committed by Commit Bot

WorkerTaskQueue prototype with implicit dependency graph

This adds a new task queue API that returns a handle for the posted
task, and allows future tasks to take this handle as a parameter. This
creates a dependency between the two tasks, and the prerequisite task
will pass its return value as a parameter to the dependent task.

Also, fix a Persistent-rooted reference cycle that was causing
ThreadPools (and their associated threads) to leak.

Bug: 879306
Change-Id: I9af6b85ca2e56da464d4316b29bda53ba694e35a
Reviewed-on: https://chromium-review.googlesource.com/c/1242074
Commit-Queue: Nate Chapin <japhet@chromium.org>
Reviewed-by: default avatarHiroshige Hayashizaki <hiroshige@chromium.org>
Reviewed-by: default avatarHiroki Nakagawa <nhiroki@chromium.org>
Cr-Commit-Position: refs/heads/master@{#597386}
parent c812ddcd
......@@ -21,7 +21,7 @@ function maybeFinish() {
};
for (var i = 1; i < 101; i++) {
queue.postTask(j => j, undefined, i)
queue.postFunction(j => j, undefined, i)
.then(result => sum += result, maybeFinish)
.then(maybeFinish);
}
......
......@@ -33,7 +33,7 @@ const controller = new AbortController();
const signal = controller.signal;
for (var i = 0; i < stop; i++) {
queue.postTask(fib, i < stop - 1 ? signal : undefined, i)
queue.postFunction(fib, i < stop - 1 ? signal : undefined, i)
.then(print, fail);
}
......
......@@ -6,7 +6,7 @@ promise_test(t => {
const queue = new WorkerTaskQueue("user-interaction");
const exception_string = "Exceptions in a posted task should be reported during promise rejection.";
return promise_rejects(t, new Error(exception_string),
queue.postTask(function() { throw exception_string; }));
queue.postFunction(function() { throw exception_string; }));
});
</script>
</body>
<body>
<script src="../../../resources/testharness.js"></script>
<script src="../../../resources/testharnessreport.js"></script>
<script>
promise_test(async () => {
const queue = new WorkerTaskQueue("background");
const task = queue.postTask(i => i, 2);
const task2 = queue.postTask(i => i, 4);
const task3 = queue.postTask((i,j) => i+j, task, task2);
const result = await task3.result;
assert_equals(result, 6);
});
</script>
</body>
<body>
<script src="../../../resources/testharness.js"></script>
<script src="../../../resources/testharnessreport.js"></script>
<script>
promise_test(async () => {
const queue = new WorkerTaskQueue("background");
const task = queue.postTask(i => i, 2);
const task2 = queue.postTask(i => i, 4);
task2.cancel();
const task3 = queue.postTask((i,j) => i+j, task, task2);
const result = await task3.result;
assert_equals(result, 2);
});
promise_test(async () => {
const queue = new WorkerTaskQueue("background");
const task = queue.postTask(i => i, 2);
const task2 = queue.postTask(i => i, 4);
const task3 = queue.postTask((i,j) => i+j, task, task2);
task3.cancel();
const result2 = await task2.result;
const result3 = await task3.result;
assert_equals(result2, 4);
assert_equals(result3, null);
});
</script>
</body>
<body>
This tests generates a random graph of tasks as a test for race conditions. PASS if it doesn't crash.
<script src="../../../resources/testharness.js"></script>
<script src="../../../resources/testharnessreport.js"></script>
<script>
function f() {
var sum = 0;
for (var i = 0; i < arguments.length; i++)
sum += arguments[i];
return sum;
}
var iterations = 500;
promise_test(async () => {
var tasks = new Array(iterations);
const queue = new WorkerTaskQueue("background");
for (var i = 0; i < iterations; i++) {
// Pick a random number of arguments, up to sqrt(i).
var argCount = Math.random() * Math.sqrt(i);
var args = [f];
// Randomly select tasks to be arguments to the next task.
// This can select the same task multiple times.
for (var j = 0; j < argCount; j++) {
var argIndex = Math.trunc(Math.random() * i);
if (tasks[argIndex] != null)
args.push(tasks[argIndex]);
}
// If no arguments were selected, pass in a dummy 1.
if (args.length == 1)
args[1] = 1;
const task = queue.postTask.apply(queue, args);
const policy = Math.random();
// Randomly select one or more of:
// * Retain the JS wrapper for the task
// * Request the result of the task
// * GC unreferenced wrappers
if ((policy >= 0.1 && policy < 0.6) || i == (iterations - 1))
tasks[i] = task;
if (policy < 0.2) {
Promise.resolve(task.result).then(i => i);
}
if (policy > 0.9 && window.gc)
window.gc();
}
const result = await tasks[(iterations - 1)].result;
console.log(result);
});
</script>
</body>
<body>
<script src="../../../resources/testharness.js"></script>
<script src="../../../resources/testharnessreport.js"></script>
<script>
promise_test(async () => {
const queue = new WorkerTaskQueue("background");
const task = queue.postTask(i => i, 2);
const task2 = queue.postTask(i => i, 4);
await task.result;
await task2.result;
const task3 = queue.postTask((i,j) => i+j, task, task2);
const result = await task3.result;
assert_equals(result, 6);
});
</script>
</body>
......@@ -7254,6 +7254,11 @@ interface SyncManager
method constructor
method getTags
method register
interface Task
attribute @@toStringTag
getter result
method cancel
method constructor
interface TaskAttributionTiming : PerformanceEntry
attribute @@toStringTag
getter containerId
......@@ -10081,6 +10086,7 @@ interface Worker : EventTarget
interface WorkerTaskQueue
attribute @@toStringTag
method constructor
method postFunction
method postTask
interface Worklet
attribute @@toStringTag
......
......@@ -452,6 +452,7 @@ core_idl_files =
"typed_arrays/uint8_array.idl",
"typed_arrays/uint8_clamped_array.idl",
"url/url_search_params.idl",
"workers/experimental/task.idl",
"workers/experimental/worker_task_queue.idl",
"workers/shared_worker.idl",
"workers/worker.idl",
......
......@@ -20,6 +20,8 @@ blink_core_sources("workers") {
"dedicated_worker_thread.h",
"execution_context_worker_registry.cc",
"execution_context_worker_registry.h",
"experimental/task.cc",
"experimental/task.h",
"experimental/thread_pool.cc",
"experimental/thread_pool.h",
"experimental/worker_task_queue.cc",
......
This directory contains an experimental API for farming tasks out to a pool of worker threads. API is still highly in flux, and is not anywhere near ready to ship.
This directory contains experimental APIs for farming tasks out to a pool of worker threads. Everything in this directory is still highly in flux, and is not anywhere near ready to ship.
thread_pool.{h,cc} contain a class that manages a pool of worker threads and can distribute work to them.
worker_task_queue.{h,cc,idl} contain a simple API for posting a task to a worker.
task_queue.{h,cc,idl} and task.{h,cc,idl} contain an API for posting tasks that can specify other tasks as prerequisites and coordinates the transfer of return values of prerequisite tasks to dependent tasks.
// Copyright 2018 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "third_party/blink/renderer/core/workers/experimental/task.h"
#include "third_party/blink/renderer/bindings/core/v8/serialization/serialized_script_value.h"
#include "third_party/blink/renderer/bindings/core/v8/v8_binding_for_core.h"
#include "third_party/blink/renderer/bindings/core/v8/v8_function.h"
#include "third_party/blink/renderer/bindings/core/v8/v8_task.h"
#include "third_party/blink/renderer/bindings/core/v8/worker_or_worklet_script_controller.h"
#include "third_party/blink/renderer/core/workers/experimental/thread_pool.h"
#include "third_party/blink/renderer/core/workers/worker_or_worklet_global_scope.h"
#include "third_party/blink/renderer/platform/cross_thread_functional.h"
namespace blink {
ThreadPoolTask::ThreadPoolTask(ThreadPool* thread_pool,
v8::Isolate* isolate,
const ScriptValue& function,
const Vector<ScriptValue>& arguments)
: self_keep_alive_(base::AdoptRef(this)), arguments_(arguments.size()) {
DCHECK(IsMainThread());
// TODO(japhet): Handle serialization failures
function_ = SerializedScriptValue::SerializeAndSwallowExceptions(
isolate, function.V8Value()->ToString(isolate));
Vector<ThreadPoolTask*> prerequisites;
Vector<size_t> prerequisites_indices;
for (size_t i = 0; i < arguments_.size(); i++) {
// Normal case: if the argument isn't a Task, just serialize it.
if (!V8Task::hasInstance(arguments[i].V8Value(), isolate)) {
arguments_[i].serialized_value =
SerializedScriptValue::SerializeAndSwallowExceptions(
isolate, arguments[i].V8Value());
continue;
}
ThreadPoolTask* prerequisite =
ToScriptWrappable(arguments[i].V8Value().As<v8::Object>())
->ToImpl<Task>()
->GetThreadPoolTask();
prerequisites.push_back(prerequisite);
prerequisites_indices.push_back(i);
}
worker_thread_ = SelectThread(prerequisites, thread_pool);
worker_thread_->IncrementTasksInProgressCount();
if (prerequisites.IsEmpty()) {
MaybeStartTask();
return;
}
// Other ThreadPoolTask instances don't have a reference to |this| yet, so
// no need to lock mutex_. RegisterDependencies() populates those references,
// so any logic after this point must consider the potential for data races.
RegisterDependencies(prerequisites, prerequisites_indices);
}
// static
ThreadPoolThread* ThreadPoolTask::SelectThread(
const Vector<ThreadPoolTask*>& prerequisites,
ThreadPool* thread_pool) {
DCHECK(IsMainThread());
HashCountedSet<WorkerThread*> prerequisite_location_counts;
size_t max_prerequisite_location_count = 0;
ThreadPoolThread* max_prerequisite_thread = nullptr;
for (ThreadPoolTask* prerequisite : prerequisites) {
// For prerequisites that are not yet complete, track which thread the task
// will run on. Put this task on the thread where the most prerequisites
// reside. This is slightly imprecise, because a task may complete before
// registering dependent tasks below.
if (ThreadPoolThread* thread = prerequisite->GetScheduledThread()) {
prerequisite_location_counts.insert(thread);
unsigned thread_count = prerequisite_location_counts.count(thread);
if (thread_count > max_prerequisite_location_count) {
max_prerequisite_location_count = thread_count;
max_prerequisite_thread = thread;
}
}
}
return max_prerequisite_thread ? max_prerequisite_thread
: thread_pool->GetLeastBusyThread();
}
ThreadPoolThread* ThreadPoolTask::GetScheduledThread() {
DCHECK(IsMainThread());
MutexLocker lock(mutex_);
return state_ == State::kCompleted ? nullptr : worker_thread_;
}
// Should only be called from constructor. Split out in to a helper because
// clang appears to exempt constructors from thread safety analysis.
void ThreadPoolTask::RegisterDependencies(
const Vector<ThreadPoolTask*>& prerequisites,
const Vector<size_t>& prerequisites_indices) {
DCHECK(IsMainThread());
{
MutexLocker lock(mutex_);
prerequisites_remaining_ = prerequisites.size();
}
for (size_t i = 0; i < prerequisites.size(); i++) {
ThreadPoolTask* prerequisite = prerequisites[i];
size_t prerequisite_index = prerequisites_indices[i];
scoped_refptr<SerializedScriptValue> result =
prerequisite->RegisterDependencyIfNotComplete(this, prerequisite_index);
if (result)
PrerequisiteFinished(prerequisite_index, v8::Local<v8::Value>(), result);
}
}
scoped_refptr<SerializedScriptValue>
ThreadPoolTask::RegisterDependencyIfNotComplete(ThreadPoolTask* dependent,
size_t index) {
DCHECK(IsMainThread());
MutexLocker lock(mutex_);
if (state_ == State::kCompleted)
return serialized_result_;
dependents_.insert(std::make_unique<Dependent>(dependent, index));
return nullptr;
}
ThreadPoolTask::~ThreadPoolTask() {
DCHECK(IsMainThread());
DCHECK(!resolver_);
DCHECK_EQ(State::kCompleted, state_);
DCHECK(!function_);
DCHECK(arguments_.IsEmpty());
DCHECK(!prerequisites_remaining_);
DCHECK(dependents_.IsEmpty());
}
void ThreadPoolTask::PrerequisiteFinished(
size_t prerequisite_index,
v8::Local<v8::Value> v8_result,
scoped_refptr<SerializedScriptValue> result) {
MutexLocker lock(mutex_);
DCHECK(state_ == State::kPending || state_ == State::kCancelled);
DCHECK_GT(prerequisites_remaining_, 0u);
prerequisites_remaining_--;
// If the result of the prerequisite doesn't need to move between threads,
// save the deserialized v8::Value for later use.
if (worker_thread_->IsCurrentThread() && !v8_result.IsEmpty()) {
arguments_[prerequisite_index].v8_value =
std::make_unique<ScopedPersistent<v8::Value>>(
ToIsolate(worker_thread_->GlobalScope()), v8_result);
} else {
arguments_[prerequisite_index].serialized_value = result;
}
MaybeStartTask();
}
void ThreadPoolTask::MaybeStartTask() {
if (prerequisites_remaining_)
return;
DCHECK(state_ == State::kPending || state_ == State::kCancelled);
PostCrossThreadTask(*worker_thread_->GetTaskRunner(TaskType::kInternalWorker),
FROM_HERE,
CrossThreadBind(&ThreadPoolTask::StartTaskOnWorkerThread,
CrossThreadUnretained(this)));
}
void ThreadPoolTask::StartTaskOnWorkerThread() {
DCHECK(worker_thread_->IsCurrentThread());
bool was_cancelled = false;
{
MutexLocker lock(mutex_);
DCHECK(!prerequisites_remaining_);
switch (state_) {
case State::kPending:
state_ = State::kStarted;
break;
case State::kCancelled:
was_cancelled = true;
break;
case State::kStarted:
case State::kCompleted:
NOTREACHED();
break;
}
}
WorkerOrWorkletGlobalScope* global_scope = worker_thread_->GlobalScope();
v8::Isolate* isolate = ToIsolate(global_scope);
ScriptState::Scope scope(global_scope->ScriptController()->GetScriptState());
scoped_refptr<SerializedScriptValue> local_result;
v8::Local<v8::Value> return_value;
if (was_cancelled) {
local_result = SerializedScriptValue::Create();
} else {
return_value = RunTaskOnWorkerThread(isolate);
local_result = SerializedScriptValue::SerializeAndSwallowExceptions(
isolate, return_value);
}
function_ = nullptr;
arguments_.clear();
HashSet<std::unique_ptr<Dependent>> dependents_to_notify;
{
MutexLocker lock(mutex_);
serialized_result_ = local_result;
state_ = State::kCompleted;
dependents_to_notify.swap(dependents_);
}
for (auto& dependent : dependents_to_notify) {
dependent->task->PrerequisiteFinished(dependent->index, return_value,
local_result);
}
PostCrossThreadTask(
*worker_thread_->GetParentExecutionContextTaskRunners()->Get(
TaskType::kInternalWorker),
FROM_HERE,
CrossThreadBind(&ThreadPoolTask::TaskCompleted,
CrossThreadUnretained(this)));
// TaskCompleted maye delete |this| at any time after this point.
}
v8::Local<v8::Value> ThreadPoolTask::RunTaskOnWorkerThread(
v8::Isolate* isolate) {
DCHECK(worker_thread_->IsCurrentThread());
// No other thread should be touching function_ or arguments_ at this point,
// so no mutex needed while actually running the task.
v8::Local<v8::Context> context = isolate->GetCurrentContext();
String core_script =
"(" + ToCoreString(function_->Deserialize(isolate).As<v8::String>()) +
")";
v8::MaybeLocal<v8::Script> script = v8::Script::Compile(
isolate->GetCurrentContext(), V8String(isolate, core_script));
v8::Local<v8::Function> script_function =
script.ToLocalChecked()->Run(context).ToLocalChecked().As<v8::Function>();
Vector<v8::Local<v8::Value>> params(arguments_.size());
for (size_t i = 0; i < arguments_.size(); i++) {
DCHECK(!arguments_[i].serialized_value || !arguments_[i].v8_value);
if (arguments_[i].serialized_value)
params[i] = arguments_[i].serialized_value->Deserialize(isolate);
else
params[i] = arguments_[i].v8_value->NewLocal(isolate);
}
v8::TryCatch block(isolate);
v8::MaybeLocal<v8::Value> ret = script_function->Call(
context, script_function, params.size(), params.data());
DCHECK_EQ(ret.IsEmpty(), block.HasCaught());
v8::Local<v8::Value> return_value;
if (!ret.IsEmpty()) {
return_value = ret.ToLocalChecked();
if (return_value->IsPromise())
return_value = return_value.As<v8::Promise>()->Result();
} else {
return_value = block.Exception()->ToString(isolate);
}
return return_value;
}
void ThreadPoolTask::TaskCompleted() {
DCHECK(IsMainThread());
#if DCHECK_IS_ON
{
MutexLocker lock(mutex_);
DCHECK_EQ(State::kCompleted, state_);
}
#endif
if (resolver_ && resolver_->GetScriptState()->ContextIsValid()) {
resolver_->Resolve(GetResult(resolver_->GetScriptState()));
resolver_ = nullptr;
}
worker_thread_->DecrementTasksInProgressCount();
self_keep_alive_.reset();
// |this| may be deleted here.
}
ScriptValue ThreadPoolTask::GetResult(ScriptState* script_state) {
DCHECK(IsMainThread());
MutexLocker lock(mutex_);
if (state_ != State::kCompleted) {
DCHECK(!serialized_result_);
DCHECK(deserialized_result_.IsEmpty());
if (!resolver_)
resolver_ = ScriptPromiseResolver::Create(script_state);
return resolver_->Promise().GetScriptValue();
}
if (deserialized_result_.IsEmpty()) {
ScriptState::Scope scope(script_state);
deserialized_result_ = ScriptValue(
script_state,
serialized_result_->Deserialize(script_state->GetIsolate()));
}
return deserialized_result_;
}
void ThreadPoolTask::Cancel() {
DCHECK(IsMainThread());
MutexLocker lock(mutex_);
if (state_ == State::kPending)
state_ = State::kCancelled;
}
} // namespace blink
// Copyright 2018 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef THIRD_PARTY_BLINK_RENDERER_CORE_WORKERS_EXPERIMENTAL_TASK_H_
#define THIRD_PARTY_BLINK_RENDERER_CORE_WORKERS_EXPERIMENTAL_TASK_H_
#include "third_party/blink/renderer/bindings/core/v8/script_promise_resolver.h"
#include "third_party/blink/renderer/core/workers/experimental/thread_pool.h"
#include "third_party/blink/renderer/platform/bindings/script_wrappable.h"
#include "third_party/blink/renderer/platform/wtf/ref_counted.h"
#include "third_party/blink/renderer/platform/wtf/threading_primitives.h"
namespace blink {
class Task;
// Runs |function| with |arguments| on a thread from the given ThreadPool.
// Scans |arguments| for Task objects, and registers those as dependencies,
// passing the result of those tasks in place of the Task arguments.
// All public functions are main-thread-only.
// ThreadPoolTask keeps itself alive via a self scoped_refptr until the
// the task completes and reports itself done on the main thread via
// TaskCompleted(). Other users (e.g. Task below) can keep the task
// alive after completion.
class ThreadPoolTask final : public RefCounted<ThreadPoolTask> {
public:
// Called on main thread
ThreadPoolTask(ThreadPool*,
v8::Isolate*,
const ScriptValue& function,
const Vector<ScriptValue>& arguments);
~ThreadPoolTask();
ScriptValue GetResult(ScriptState*) LOCKS_EXCLUDED(mutex_);
void Cancel() LOCKS_EXCLUDED(mutex_);
private:
void StartTaskOnWorkerThread() LOCKS_EXCLUDED(mutex_);
v8::Local<v8::Value> RunTaskOnWorkerThread(v8::Isolate*);
// Called on ANY thread (main thread, worker_thread_, or a sibling worker).
void MaybeStartTask() EXCLUSIVE_LOCKS_REQUIRED(mutex_);
void PrerequisiteFinished(size_t prerequisite_index,
v8::Local<v8::Value>,
scoped_refptr<SerializedScriptValue>)
LOCKS_EXCLUDED(mutex_);
// Called on main thread
static ThreadPoolThread* SelectThread(
const Vector<ThreadPoolTask*>& prerequisites,
ThreadPool*);
ThreadPoolThread* GetScheduledThread() LOCKS_EXCLUDED(mutex_);
void RegisterDependencies(const Vector<ThreadPoolTask*>& prerequisites,
const Vector<size_t>& prerequisite_indices)
LOCKS_EXCLUDED(mutex_);
scoped_refptr<SerializedScriptValue> RegisterDependencyIfNotComplete(
ThreadPoolTask* dependent,
size_t index) LOCKS_EXCLUDED(mutex_);
void TaskCompleted();
enum class State {
kPending,
kStarted,
kCancelled,
kCompleted,
};
// worker_thread_ is selected in the constructor and not changed thereafter.
ThreadPoolThread* worker_thread_ = nullptr;
// Main thread only
scoped_refptr<ThreadPoolTask> self_keep_alive_;
ScriptValue deserialized_result_;
Persistent<ScriptPromiseResolver> resolver_;
// Created in constructor on the main thread, consumed and cleared on
// worker_thread_. Those steps can't overlap, so no mutex_ required.
scoped_refptr<SerializedScriptValue> function_;
// Created and populated with non-prerequiste parameters on the main thread.
// Each prerequisite writes its return value into arguments_ from its thread.
// If the prequisite and this have the same worker_thread_, there is no need
// to serialize and deserialize the argument, so v8_argument will be populated
// with the v8::Value returned by the prerequisite.
// Consumed and cleared on worker_thread_.
// Only requires mutex_ when writing prerequisite results, at other times
// either the main thread or the worker_thread_ has sole access.
struct Argument {
scoped_refptr<SerializedScriptValue> serialized_value;
std::unique_ptr<ScopedPersistent<v8::Value>> v8_value;
};
Vector<Argument> arguments_;
// Read on main thread, write on worker_thread_.
scoped_refptr<SerializedScriptValue> serialized_result_ GUARDED_BY(mutex_);
// Read/write on both main thread and worker_thread_.
State state_ GUARDED_BY(mutex_) = State::kPending;
// Initialized in constructor on main thread, each completed prerequisite
// decrements from the prerequisite's thread or main thread.
size_t prerequisites_remaining_ GUARDED_BY(mutex_) = 0u;
// Elements added from main thread. Cleared on completion on worker_thread_.
// Each element in dependents_ is not yet in the kCompleted state and
// therefore is guaranteed to be alive.
struct Dependent {
public:
Dependent(ThreadPoolTask* task, size_t index) : task(task), index(index) {}
ThreadPoolTask* task;
size_t index;
};
HashSet<std::unique_ptr<Dependent>> dependents_ GUARDED_BY(mutex_);
Mutex mutex_;
};
// This is a thin, v8-exposed wrapper around ThreadPoolTask that allows
// ThreadPoolTask to avoid being GarbageCollected.
class Task : public ScriptWrappable {
DEFINE_WRAPPERTYPEINFO();
public:
explicit Task(ThreadPoolTask* thread_pool_task)
: thread_pool_task_(thread_pool_task) {}
~Task() override = default;
ScriptValue result(ScriptState* script_state) {
return thread_pool_task_->GetResult(script_state);
}
void cancel() { thread_pool_task_->Cancel(); }
ThreadPoolTask* GetThreadPoolTask() const { return thread_pool_task_.get(); }
private:
scoped_refptr<ThreadPoolTask> thread_pool_task_;
};
} // namespace blink
#endif // THIRD_PARTY_BLINK_RENDERER_CORE_WORKERS_EXPERIMENTAL_TASK_H_
// Copyright 2018 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
[
Exposed=Window,
RuntimeEnabled=WorkerTaskQueue
] interface Task {
[CallWith=ScriptState] readonly attribute any result;
void cancel();
};
......@@ -17,7 +17,6 @@
#include "third_party/blink/renderer/core/workers/dedicated_worker_messaging_proxy.h"
#include "third_party/blink/renderer/core/workers/threaded_messaging_proxy_base.h"
#include "third_party/blink/renderer/core/workers/threaded_object_proxy_base.h"
#include "third_party/blink/renderer/core/workers/worker_backing_thread.h"
#include "third_party/blink/renderer/core/workers/worker_global_scope.h"
#include "third_party/blink/renderer/core/workers/worker_options.h"
#include "third_party/blink/renderer/platform/bindings/exception_state.h"
......@@ -99,38 +98,6 @@ class ThreadPoolObjectProxy final : public ThreadedObjectProxyBase {
HashSet<size_t> cancelled_tasks_;
};
class ThreadPoolThread final : public WorkerThread {
public:
ThreadPoolThread(ExecutionContext* parent_execution_context,
ThreadPoolObjectProxy& object_proxy)
: WorkerThread(object_proxy) {
FrameOrWorkerScheduler* scheduler =
parent_execution_context ? parent_execution_context->GetScheduler()
: nullptr;
worker_backing_thread_ =
WorkerBackingThread::Create(WebThreadCreationParams(GetThreadType())
.SetFrameOrWorkerScheduler(scheduler));
}
~ThreadPoolThread() override = default;
private:
WorkerBackingThread& GetWorkerBackingThread() override {
return *worker_backing_thread_;
}
void ClearWorkerBackingThread() override { worker_backing_thread_ = nullptr; }
WorkerOrWorkletGlobalScope* CreateWorkerGlobalScope(
std::unique_ptr<GlobalScopeCreationParams> creation_params) override {
return new ThreadPoolWorkerGlobalScope(std::move(creation_params), this);
}
WebThreadType GetThreadType() const override {
// TODO(japhet): Replace with WebThreadType::kThreadPoolWorkerThread.
return WebThreadType::kDedicatedWorkerThread;
}
std::unique_ptr<WorkerBackingThread> worker_backing_thread_;
};
class ThreadPoolMessagingProxy final : public ThreadedMessagingProxyBase {
public:
ThreadPoolMessagingProxy(ExecutionContext* context, ThreadPool* thread_pool)
......@@ -185,11 +152,32 @@ class ThreadPoolMessagingProxy final : public ThreadedMessagingProxyBase {
ThreadedMessagingProxyBase::Trace(visitor);
}
ThreadPoolThread* GetWorkerThread() const {
return static_cast<ThreadPoolThread*>(
ThreadedMessagingProxyBase::GetWorkerThread());
}
private:
std::unique_ptr<ThreadPoolObjectProxy> object_proxy_;
Member<ThreadPool> thread_pool_;
WeakMember<ThreadPool> thread_pool_;
};
ThreadPoolThread::ThreadPoolThread(ExecutionContext* parent_execution_context,
ThreadPoolObjectProxy& object_proxy)
: WorkerThread(object_proxy) {
FrameOrWorkerScheduler* scheduler =
parent_execution_context ? parent_execution_context->GetScheduler()
: nullptr;
worker_backing_thread_ =
WorkerBackingThread::Create(WebThreadCreationParams(GetThreadType())
.SetFrameOrWorkerScheduler(scheduler));
}
WorkerOrWorkletGlobalScope* ThreadPoolThread::CreateWorkerGlobalScope(
std::unique_ptr<GlobalScopeCreationParams> creation_params) {
return new ThreadPoolWorkerGlobalScope(std::move(creation_params), this);
}
void ThreadPoolObjectProxy::ProcessTask(
scoped_refptr<SerializedScriptValue> task,
Vector<scoped_refptr<SerializedScriptValue>> arguments,
......@@ -282,27 +270,26 @@ static const size_t kProxyCount = 2;
ThreadPool::ThreadPool(Document& document)
: Supplement<Document>(document),
document_(document),
ContextLifecycleObserver(&document),
context_proxies_(kProxyCount) {}
ThreadPoolMessagingProxy* ThreadPool::GetProxyForTaskType(TaskType task_type) {
DCHECK(document_->IsContextThread());
size_t proxy_id = kProxyCount;
if (task_type == TaskType::kUserInteraction)
proxy_id = 0u;
else if (task_type == TaskType::kIdleTask)
proxy_id = 1u;
DCHECK_LT(proxy_id, kProxyCount);
ThreadPool::~ThreadPool() {
for (size_t i = 0; i < kProxyCount; i++) {
if (context_proxies_[i])
context_proxies_[i]->ParentObjectDestroyed();
}
}
if (!context_proxies_[proxy_id]) {
void ThreadPool::CreateProxyAtId(size_t proxy_id) {
DCHECK(!context_proxies_[proxy_id]);
base::UnguessableToken devtools_worker_token =
document_->GetFrame() ? document_->GetFrame()->GetDevToolsFrameToken()
GetFrame() ? GetFrame()->GetDevToolsFrameToken()
: base::UnguessableToken::Create();
ExecutionContext* context = document_.Get();
ExecutionContext* context = GetExecutionContext();
context_proxies_[proxy_id] = new ThreadPoolMessagingProxy(context, this);
std::unique_ptr<WorkerSettings> settings =
std::make_unique<WorkerSettings>(document_->GetSettings());
std::make_unique<WorkerSettings>(GetFrame()->GetSettings());
context_proxies_[proxy_id]->StartWorker(
std::make_unique<GlobalScopeCreationParams>(
......@@ -310,14 +297,25 @@ ThreadPoolMessagingProxy* ThreadPool::GetProxyForTaskType(TaskType task_type) {
context->GetContentSecurityPolicy()->Headers(),
kReferrerPolicyDefault, context->GetSecurityOrigin(),
context->IsSecureContext(), context->GetHttpsState(),
WorkerClients::Create(),
context->GetSecurityContext().AddressSpace(),
WorkerClients::Create(), context->GetSecurityContext().AddressSpace(),
OriginTrialContext::GetTokens(context).get(), devtools_worker_token,
std::move(settings), kV8CacheOptionsDefault,
nullptr /* worklet_module_responses_map */,
ConnectToWorkerInterfaceProviderForThreadPool(
context, context->GetSecurityOrigin())));
}
}
ThreadPoolMessagingProxy* ThreadPool::GetProxyForTaskType(TaskType task_type) {
DCHECK(GetExecutionContext()->IsContextThread());
size_t proxy_id = kProxyCount;
if (task_type == TaskType::kUserInteraction)
proxy_id = 0u;
else if (task_type == TaskType::kIdleTask)
proxy_id = 1u;
DCHECK_LT(proxy_id, kProxyCount);
if (!context_proxies_[proxy_id])
CreateProxyAtId(proxy_id);
return context_proxies_[proxy_id];
}
......@@ -327,10 +325,11 @@ void ThreadPool::PostTask(
AbortSignal* signal,
const Vector<scoped_refptr<SerializedScriptValue>>& arguments,
TaskType task_type) {
DCHECK(document_->IsContextThread());
DCHECK(GetExecutionContext()->IsContextThread());
GetProxyForTaskType(task_type)->PostTaskToWorkerGlobalScope(
document_.Get(), std::move(task), arguments, next_task_id_, task_type);
GetExecutionContext(), std::move(task), arguments, next_task_id_,
task_type);
resolvers_.insert(next_task_id_, resolver);
if (signal) {
signal->AddAlgorithm(WTF::Bind(&ThreadPool::AbortTask, WrapPersistent(this),
......@@ -339,15 +338,47 @@ void ThreadPool::PostTask(
next_task_id_++;
}
ThreadPoolThread* ThreadPool::GetLeastBusyThread() {
size_t i = 0;
ThreadPoolThread* least_busy_thread = nullptr;
for (; i < kProxyCount && context_proxies_[i]; i++) {
ThreadPoolThread* current_thread = context_proxies_[i]->GetWorkerThread();
if (!least_busy_thread ||
current_thread->GetTasksInProgressCount() <
least_busy_thread->GetTasksInProgressCount()) {
least_busy_thread = current_thread;
}
}
// If there's an idle proxy or we're already at max proxies,
// use the least busy proxy.
if ((least_busy_thread &&
least_busy_thread->GetTasksInProgressCount() == 0) ||
i == kProxyCount) {
return least_busy_thread;
}
// Otherwise, create a new one.
CreateProxyAtId(i);
return context_proxies_[i]->GetWorkerThread();
}
void ThreadPool::ContextDestroyed(ExecutionContext*) {
DCHECK(GetExecutionContext()->IsContextThread());
for (size_t i = 0; i < kProxyCount; i++) {
if (context_proxies_[i])
context_proxies_[i]->TerminateGlobalScope();
}
}
void ThreadPool::AbortTask(size_t task_id, TaskType task_type) {
DCHECK(document_->IsContextThread());
DCHECK(GetExecutionContext()->IsContextThread());
GetProxyForTaskType(task_type)->PostAbortToWorkerGlobalScope(task_id);
}
void ThreadPool::TaskCompleted(size_t task_id,
bool was_rejected,
scoped_refptr<SerializedScriptValue> result) {
DCHECK(document_->IsContextThread());
DCHECK(GetExecutionContext()->IsContextThread());
DCHECK(resolvers_.Contains(task_id));
DCHECK(result);
ScriptPromiseResolver* resolver = resolvers_.Take(task_id);
......@@ -363,7 +394,7 @@ void ThreadPool::TaskCompleted(size_t task_id,
void ThreadPool::Trace(blink::Visitor* visitor) {
Supplement<Document>::Trace(visitor);
visitor->Trace(document_);
ContextLifecycleObserver::Trace(visitor);
visitor->Trace(context_proxies_);
visitor->Trace(resolvers_);
}
......
......@@ -6,6 +6,8 @@
#define THIRD_PARTY_BLINK_RENDERER_CORE_WORKERS_EXPERIMENTAL_THREAD_POOL_H_
#include "third_party/blink/renderer/bindings/core/v8/script_promise_resolver.h"
#include "third_party/blink/renderer/core/workers/worker_backing_thread.h"
#include "third_party/blink/renderer/core/workers/worker_thread.h"
#include "third_party/blink/renderer/platform/supplementable.h"
namespace blink {
......@@ -14,15 +16,56 @@ class AbortSignal;
class Document;
class SerializedScriptValue;
class ThreadPoolMessagingProxy;
class ThreadPoolObjectProxy;
class WorkerThread;
class ThreadPool final : public GarbageCollected<ThreadPool>,
public Supplement<Document> {
class ThreadPoolThread final : public WorkerThread {
public:
ThreadPoolThread(ExecutionContext*, ThreadPoolObjectProxy&);
~ThreadPoolThread() override = default;
void IncrementTasksInProgressCount() {
DCHECK(IsMainThread());
tasks_in_progress_++;
}
void DecrementTasksInProgressCount() {
DCHECK(IsMainThread());
DCHECK_GT(tasks_in_progress_, 0u);
tasks_in_progress_--;
}
size_t GetTasksInProgressCount() const {
DCHECK(IsMainThread());
return tasks_in_progress_;
}
private:
WorkerBackingThread& GetWorkerBackingThread() override {
return *worker_backing_thread_;
}
void ClearWorkerBackingThread() override { worker_backing_thread_ = nullptr; }
WorkerOrWorkletGlobalScope* CreateWorkerGlobalScope(
std::unique_ptr<GlobalScopeCreationParams> creation_params) override;
WebThreadType GetThreadType() const override {
// TODO(japhet): Replace with WebThreadType::kThreadPoolWorkerThread.
return WebThreadType::kDedicatedWorkerThread;
}
std::unique_ptr<WorkerBackingThread> worker_backing_thread_;
size_t tasks_in_progress_ = 0;
};
class ThreadPool final : public GarbageCollectedFinalized<ThreadPool>,
public Supplement<Document>,
public ContextLifecycleObserver {
USING_GARBAGE_COLLECTED_MIXIN(ThreadPool);
EAGERLY_FINALIZE();
public:
static const char kSupplementName[];
static ThreadPool* From(Document&);
~ThreadPool();
void PostTask(scoped_refptr<SerializedScriptValue> task,
ScriptPromiseResolver*,
......@@ -30,20 +73,24 @@ class ThreadPool final : public GarbageCollected<ThreadPool>,
const Vector<scoped_refptr<SerializedScriptValue>>& arguments,
TaskType);
ThreadPoolThread* GetLeastBusyThread();
void ContextDestroyed(ExecutionContext*) override;
void Trace(blink::Visitor*) final;
private:
ThreadPool(Document&);
~ThreadPool() = default;
friend ThreadPoolMessagingProxy;
ThreadPoolMessagingProxy* GetProxyForTaskType(TaskType);
void CreateProxyAtId(size_t proxy_id);
void TaskCompleted(size_t task_id,
bool was_rejected,
scoped_refptr<SerializedScriptValue> result);
void AbortTask(size_t task_id, TaskType task_type);
Member<Document> document_;
HeapVector<Member<ThreadPoolMessagingProxy>> context_proxies_;
size_t next_task_id_ = 1;
HeapHashMap<int, Member<ScriptPromiseResolver>> resolvers_;
......
......@@ -6,6 +6,7 @@
#include "third_party/blink/renderer/bindings/core/v8/serialization/serialized_script_value.h"
#include "third_party/blink/renderer/core/dom/document.h"
#include "third_party/blink/renderer/core/workers/experimental/task.h"
#include "third_party/blink/renderer/core/workers/experimental/thread_pool.h"
namespace blink {
......@@ -36,7 +37,8 @@ WorkerTaskQueue* WorkerTaskQueue::Create(ExecutionContext* context,
WorkerTaskQueue::WorkerTaskQueue(Document* document, TaskType task_type)
: document_(document), task_type_(task_type) {}
ScriptPromise WorkerTaskQueue::postTask(ScriptState* script_state,
ScriptPromise WorkerTaskQueue::postFunction(
ScriptState* script_state,
const ScriptValue& task,
AbortSignal* signal,
const Vector<ScriptValue>& arguments) {
......@@ -71,6 +73,18 @@ ScriptPromise WorkerTaskQueue::postTask(ScriptState* script_state,
return resolver->Promise();
}
Task* WorkerTaskQueue::postTask(ScriptState* script_state,
const ScriptValue& function,
const Vector<ScriptValue>& arguments) {
DCHECK(document_->IsContextThread());
DCHECK(function.IsFunction());
ThreadPoolTask* thread_pool_task =
new ThreadPoolTask(ThreadPool::From(*document_),
script_state->GetIsolate(), function, arguments);
return new Task(thread_pool_task);
}
void WorkerTaskQueue::Trace(blink::Visitor* visitor) {
ScriptWrappable::Trace(visitor);
visitor->Trace(document_);
......
......@@ -16,6 +16,9 @@ class AbortSignal;
class Document;
class ExceptionState;
class ExecutionContext;
class ScriptState;
class ScriptValue;
class Task;
class CORE_EXPORT WorkerTaskQueue : public ScriptWrappable {
DEFINE_WRAPPERTYPEINFO();
......@@ -26,11 +29,15 @@ class CORE_EXPORT WorkerTaskQueue : public ScriptWrappable {
ExceptionState&);
~WorkerTaskQueue() override = default;
ScriptPromise postTask(ScriptState*,
ScriptPromise postFunction(ScriptState*,
const ScriptValue& task,
AbortSignal*,
const Vector<ScriptValue>& arguments);
Task* postTask(ScriptState*,
const ScriptValue& task,
const Vector<ScriptValue>& arguments);
void Trace(blink::Visitor*) override;
private:
......
......@@ -7,8 +7,11 @@ enum TaskQueueType { "user-interaction", "background" };
[
Constructor(TaskQueueType queue_type),
ConstructorCallWith=ExecutionContext,
Exposed=Window,
RaisesException=Constructor,
RuntimeEnabled=WorkerTaskQueue
RuntimeEnabled=WorkerTaskQueue,
SecureContext
] interface WorkerTaskQueue {
[CallWith=ScriptState] Promise<any> postTask(CallbackFunctionTreatedAsScriptValue task, optional AbortSignal signal = null, any... arguments);
[CallWith=ScriptState] Promise<any> postFunction(CallbackFunctionTreatedAsScriptValue task, optional AbortSignal signal = null, any... arguments);
[CallWith=ScriptState] Task postTask(CallbackFunctionTreatedAsScriptValue task, any... arguments);
};
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment