Commit 1bfac224 authored by Nate Chapin's avatar Nate Chapin Committed by Commit Bot

WorkerTaskQueue, a highly-speculative experimental worker task scheduling API


Bug: 879306
Change-Id: Ia1458ab7275bbcfeeb24e358b5702cb5d5e57c2e
Reviewed-on: https://chromium-review.googlesource.com/1196025
Commit-Queue: Nate Chapin <japhet@chromium.org>
Reviewed-by: default avatarHiroki Nakagawa <nhiroki@chromium.org>
Cr-Commit-Position: refs/heads/master@{#587820}
parent ccc9362f
Posts one hundred tasks that return 1...100, and sums them.
5050
<body>
Posts one hundred tasks that return 1...100, and sums them.<br>
<script>
if (window.testRunner) {
testRunner.dumpAsText();
testRunner.waitUntilDone();
}
var queue = new WorkerTaskQueue("user-interaction");
var sum = 0;
var done = 0;
function maybeFinish() {
done++;
if (done == 100) {
document.body.appendChild(document.createTextNode(sum));
document.body.appendChild(document.createElement("br"));
if (window.testRunner)
testRunner.notifyDone();
}
};
for (var i = 1; i < 101; i++) {
queue.postTask(j => j, undefined, i)
.then(result => sum += result, maybeFinish)
.then(maybeFinish);
}
</script>
</body>
Schedules several dozen tasks, then cancels all but the last.
Fibonnaci #10 is 55
<body>
Schedules several dozen tasks, then cancels all but the last.<br>
<script>
if (window.testRunner) {
testRunner.dumpAsText();
testRunner.waitUntilDone();
}
var queue = new WorkerTaskQueue("background");
var stop = 11;
var next = 0;
function print(result) {
document.body.appendChild(document.createTextNode("Fibonnaci #" + next++ + " is " +result));
document.body.appendChild(document.createElement("br"));
if (next == stop) {
if (window.testRunner)
testRunner.notifyDone();
}
};
function fail() {
next++;
}
function fib(n) {
if (n == 0 || n == 1)
return n;
return fib(n - 1) + fib(n - 2);
}
const controller = new AbortController();
const signal = controller.signal;
for (var i = 0; i < stop; i++) {
queue.postTask(fib, i < stop - 1 ? signal : undefined, i)
.then(print, fail);
}
controller.abort();
</script>
</body>
...@@ -9972,6 +9972,10 @@ interface Worker : EventTarget ...@@ -9972,6 +9972,10 @@ interface Worker : EventTarget
method terminate method terminate
setter onerror setter onerror
setter onmessage setter onmessage
interface WorkerTaskQueue
attribute @@toStringTag
method constructor
method postTask
interface Worklet interface Worklet
attribute @@toStringTag attribute @@toStringTag
method addModule method addModule
......
...@@ -447,6 +447,7 @@ core_idl_files = ...@@ -447,6 +447,7 @@ core_idl_files =
"typed_arrays/uint8_array.idl", "typed_arrays/uint8_array.idl",
"typed_arrays/uint8_clamped_array.idl", "typed_arrays/uint8_clamped_array.idl",
"url/url_search_params.idl", "url/url_search_params.idl",
"workers/experimental/worker_task_queue.idl",
"workers/shared_worker.idl", "workers/shared_worker.idl",
"workers/worker.idl", "workers/worker.idl",
"workers/worker_location.idl", "workers/worker_location.idl",
......
...@@ -20,6 +20,10 @@ blink_core_sources("workers") { ...@@ -20,6 +20,10 @@ blink_core_sources("workers") {
"dedicated_worker_thread.h", "dedicated_worker_thread.h",
"execution_context_worker_registry.cc", "execution_context_worker_registry.cc",
"execution_context_worker_registry.h", "execution_context_worker_registry.h",
"experimental/thread_pool.cc",
"experimental/thread_pool.h",
"experimental/worker_task_queue.cc",
"experimental/worker_task_queue.h",
"global_scope_creation_params.cc", "global_scope_creation_params.cc",
"global_scope_creation_params.h", "global_scope_creation_params.h",
"installed_scripts_manager.cc", "installed_scripts_manager.cc",
......
japhet@chromium.org
# TEAM: worker-dev@chromium.org
# COMPONENT: Blink>Workers
This directory contains an experimental API for farming tasks out to a pool of worker threads. API is still highly in flux, and is not anywhere near ready to ship.
// Copyright 2018 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "third_party/blink/renderer/core/workers/experimental/thread_pool.h"
#include "services/service_manager/public/cpp/interface_provider.h"
#include "third_party/blink/public/platform/dedicated_worker_factory.mojom-blink.h"
#include "third_party/blink/renderer/bindings/core/v8/serialization/serialized_script_value.h"
#include "third_party/blink/renderer/bindings/core/v8/v8_binding_for_core.h"
#include "third_party/blink/renderer/bindings/core/v8/worker_or_worklet_script_controller.h"
#include "third_party/blink/renderer/core/dom/abort_signal.h"
#include "third_party/blink/renderer/core/execution_context/execution_context.h"
#include "third_party/blink/renderer/core/inspector/main_thread_debugger.h"
#include "third_party/blink/renderer/core/origin_trials/origin_trial_context.h"
#include "third_party/blink/renderer/core/probe/core_probes.h"
#include "third_party/blink/renderer/core/script/fetch_client_settings_object_snapshot.h"
#include "third_party/blink/renderer/core/workers/dedicated_worker_messaging_proxy.h"
#include "third_party/blink/renderer/core/workers/threaded_messaging_proxy_base.h"
#include "third_party/blink/renderer/core/workers/threaded_object_proxy_base.h"
#include "third_party/blink/renderer/core/workers/worker_backing_thread.h"
#include "third_party/blink/renderer/core/workers/worker_global_scope.h"
#include "third_party/blink/renderer/core/workers/worker_options.h"
#include "third_party/blink/renderer/platform/bindings/exception_state.h"
#include "third_party/blink/renderer/platform/cross_thread_functional.h"
#include "third_party/blink/renderer/platform/scheduler/worker/worker_thread_scheduler.h"
namespace blink {
template <wtf_size_t inlineCapacity, typename Allocator>
struct CrossThreadCopier<
Vector<scoped_refptr<SerializedScriptValue>, inlineCapacity, Allocator>> {
STATIC_ONLY(CrossThreadCopier);
using Type =
Vector<scoped_refptr<SerializedScriptValue>, inlineCapacity, Allocator>;
static Type Copy(Type pointer) {
return pointer; // This is in fact a move.
}
};
class ThreadPoolWorkerGlobalScope final : public WorkerGlobalScope {
public:
ThreadPoolWorkerGlobalScope(
std::unique_ptr<GlobalScopeCreationParams> creation_params,
WorkerThread* thread)
: WorkerGlobalScope(std::move(creation_params),
thread,
CurrentTimeTicks()) {}
~ThreadPoolWorkerGlobalScope() override = default;
// EventTarget
const AtomicString& InterfaceName() const override {
// TODO(japhet): Replaces this with
// EventTargetNames::ThreadPoolWorkerGlobalScope.
return EventTargetNames::DedicatedWorkerGlobalScope;
}
// WorkerGlobalScope
void ImportModuleScript(
const KURL& module_url_record,
FetchClientSettingsObjectSnapshot* outside_settings_object,
network::mojom::FetchCredentialsMode) override {
// TODO(japhet): Consider whether modules should be supported.
NOTREACHED();
}
void ExceptionThrown(ErrorEvent*) override {}
};
class ThreadPoolObjectProxy final : public ThreadedObjectProxyBase {
public:
ThreadPoolObjectProxy(ThreadPoolMessagingProxy* messaging_proxy,
ParentExecutionContextTaskRunners* task_runners)
: ThreadedObjectProxyBase(task_runners),
messaging_proxy_(messaging_proxy) {}
~ThreadPoolObjectProxy() override = default;
void DidCreateWorkerGlobalScope(
WorkerOrWorkletGlobalScope* global_scope) override {
global_scope_ = static_cast<ThreadPoolWorkerGlobalScope*>(global_scope);
}
void WillDestroyWorkerGlobalScope() override { global_scope_ = nullptr; }
CrossThreadWeakPersistent<ThreadedMessagingProxyBase> MessagingProxyWeakPtr()
override {
return messaging_proxy_;
}
void ProcessTask(scoped_refptr<SerializedScriptValue> task,
Vector<scoped_refptr<SerializedScriptValue>> arguments,
size_t task_id,
const v8_inspector::V8StackTraceId&);
void AbortTask(size_t task_id) { cancelled_tasks_.insert(task_id); }
private:
CrossThreadWeakPersistent<ThreadPoolMessagingProxy> messaging_proxy_;
CrossThreadPersistent<ThreadPoolWorkerGlobalScope> global_scope_;
HashSet<size_t> cancelled_tasks_;
};
class ThreadPoolThread final : public WorkerThread {
public:
ThreadPoolThread(ExecutionContext* parent_execution_context,
ThreadPoolObjectProxy& object_proxy)
: WorkerThread(object_proxy) {
FrameOrWorkerScheduler* scheduler =
parent_execution_context ? parent_execution_context->GetScheduler()
: nullptr;
worker_backing_thread_ =
WorkerBackingThread::Create(WebThreadCreationParams(GetThreadType())
.SetFrameOrWorkerScheduler(scheduler));
}
~ThreadPoolThread() override = default;
private:
WorkerBackingThread& GetWorkerBackingThread() override {
return *worker_backing_thread_;
}
void ClearWorkerBackingThread() override { worker_backing_thread_ = nullptr; }
WorkerOrWorkletGlobalScope* CreateWorkerGlobalScope(
std::unique_ptr<GlobalScopeCreationParams> creation_params) override {
return new ThreadPoolWorkerGlobalScope(std::move(creation_params), this);
}
WebThreadType GetThreadType() const override {
// TODO(japhet): Replace with WebThreadType::kThreadPoolWorkerThread.
return WebThreadType::kDedicatedWorkerThread;
}
std::unique_ptr<WorkerBackingThread> worker_backing_thread_;
};
class ThreadPoolMessagingProxy final : public ThreadedMessagingProxyBase {
public:
ThreadPoolMessagingProxy(ExecutionContext* context, ThreadPool* thread_pool)
: ThreadedMessagingProxyBase(context), thread_pool_(thread_pool) {
object_proxy_ = std::make_unique<ThreadPoolObjectProxy>(
this, GetParentExecutionContextTaskRunners());
}
~ThreadPoolMessagingProxy() override = default;
void StartWorker(std::unique_ptr<GlobalScopeCreationParams> creation_params) {
InitializeWorkerThread(std::move(creation_params),
WorkerBackingThreadStartupData::CreateDefault());
}
std::unique_ptr<WorkerThread> CreateWorkerThread() override {
return std::make_unique<ThreadPoolThread>(GetExecutionContext(),
*object_proxy_.get());
}
void PostTaskToWorkerGlobalScope(
ExecutionContext* context,
scoped_refptr<SerializedScriptValue> task,
const Vector<scoped_refptr<SerializedScriptValue>>& arguments,
size_t task_id,
TaskType task_type) {
v8_inspector::V8StackTraceId stack_id =
ThreadDebugger::From(ToIsolate(context))
->StoreCurrentStackTrace("ThreadPool.postTask");
PostCrossThreadTask(
*GetWorkerThread()->GetTaskRunner(task_type), FROM_HERE,
CrossThreadBind(&ThreadPoolObjectProxy::ProcessTask,
CrossThreadUnretained(object_proxy_.get()),
std::move(task), std::move(arguments), task_id,
stack_id));
}
void PostAbortToWorkerGlobalScope(size_t task_id) {
PostCrossThreadTask(
*GetWorkerThread()->GetControlTaskRunner(), FROM_HERE,
CrossThreadBind(&ThreadPoolObjectProxy::AbortTask,
CrossThreadUnretained(object_proxy_.get()), task_id));
}
void TaskCompleted(size_t task_id,
scoped_refptr<SerializedScriptValue> result) {
if (thread_pool_)
thread_pool_->TaskCompleted(task_id, std::move(result));
}
void Trace(blink::Visitor* visitor) override {
visitor->Trace(thread_pool_);
ThreadedMessagingProxyBase::Trace(visitor);
}
private:
std::unique_ptr<ThreadPoolObjectProxy> object_proxy_;
Member<ThreadPool> thread_pool_;
};
void ThreadPoolObjectProxy::ProcessTask(
scoped_refptr<SerializedScriptValue> task,
Vector<scoped_refptr<SerializedScriptValue>> arguments,
size_t task_id,
const v8_inspector::V8StackTraceId& stack_id) {
DCHECK(global_scope_->IsContextThread());
if (cancelled_tasks_.Contains(task_id)) {
cancelled_tasks_.erase(task_id);
PostCrossThreadTask(
*GetParentExecutionContextTaskRunners()->Get(TaskType::kPostedMessage),
FROM_HERE,
CrossThreadBind(&ThreadPoolMessagingProxy::TaskCompleted,
messaging_proxy_, task_id, nullptr));
return;
}
v8::Isolate* isolate = ToIsolate(global_scope_.Get());
ThreadDebugger* debugger = ThreadDebugger::From(isolate);
debugger->ExternalAsyncTaskStarted(stack_id);
ScriptState::Scope scope(global_scope_->ScriptController()->GetScriptState());
v8::Local<v8::Context> context = isolate->GetCurrentContext();
String core_script =
"(" + ToCoreString(task->Deserialize(isolate).As<v8::String>()) + ")";
v8::MaybeLocal<v8::Script> script = v8::Script::Compile(
isolate->GetCurrentContext(), V8String(isolate, core_script));
v8::Local<v8::Function> script_function =
script.ToLocalChecked()->Run(context).ToLocalChecked().As<v8::Function>();
Vector<v8::Local<v8::Value>> params(arguments.size());
for (size_t i = 0; i < arguments.size(); i++) {
params[i] = arguments[i]->Deserialize(isolate);
}
v8::MaybeLocal<v8::Value> ret = script_function->Call(
context, script_function, params.size(), params.data());
scoped_refptr<SerializedScriptValue> result =
ret.IsEmpty() ? nullptr
: SerializedScriptValue::SerializeAndSwallowExceptions(
isolate, ret.ToLocalChecked());
debugger->ExternalAsyncTaskFinished(stack_id);
// TODO(japhet): Is it ok to always send the completion notification back on
// the same task queue, or should this be the task type sent to the worker?
PostCrossThreadTask(
*GetParentExecutionContextTaskRunners()->Get(TaskType::kPostedMessage),
FROM_HERE,
CrossThreadBind(&ThreadPoolMessagingProxy::TaskCompleted,
messaging_proxy_, task_id, std::move(result)));
}
service_manager::mojom::blink::InterfaceProviderPtrInfo
ConnectToWorkerInterfaceProvider(
ExecutionContext* execution_context,
scoped_refptr<const SecurityOrigin> script_origin) {
// TODO(japhet): Implement a proper factory.
mojom::blink::DedicatedWorkerFactoryPtr worker_factory;
execution_context->GetInterfaceProvider()->GetInterface(&worker_factory);
service_manager::mojom::blink::InterfaceProviderPtrInfo
interface_provider_ptr;
worker_factory->CreateDedicatedWorker(
script_origin, mojo::MakeRequest(&interface_provider_ptr));
return interface_provider_ptr;
}
const char ThreadPool::kSupplementName[] = "ThreadPool";
ThreadPool* ThreadPool::From(Document& document) {
ThreadPool* thread_pool = Supplement<Document>::From<ThreadPool>(document);
if (!thread_pool) {
thread_pool = new ThreadPool(document);
Supplement<Document>::ProvideTo(document, thread_pool);
}
return thread_pool;
}
static const size_t kProxyCount = 2;
ThreadPool::ThreadPool(Document& document)
: Supplement<Document>(document),
document_(document),
context_proxies_(kProxyCount) {}
ThreadPoolMessagingProxy* ThreadPool::GetProxyForTaskType(TaskType task_type) {
DCHECK(document_->IsContextThread());
size_t proxy_id = kProxyCount;
if (task_type == TaskType::kUserInteraction)
proxy_id = 0u;
else if (task_type == TaskType::kIdleTask)
proxy_id = 1u;
DCHECK_LT(proxy_id, kProxyCount);
if (!context_proxies_[proxy_id]) {
base::UnguessableToken devtools_worker_token =
document_->GetFrame() ? document_->GetFrame()->GetDevToolsFrameToken()
: base::UnguessableToken::Create();
ExecutionContext* context = document_.Get();
context_proxies_[proxy_id] = new ThreadPoolMessagingProxy(context, this);
std::unique_ptr<WorkerSettings> settings =
std::make_unique<WorkerSettings>(document_->GetSettings());
context_proxies_[proxy_id]->StartWorker(
std::make_unique<GlobalScopeCreationParams>(
context->Url(), ScriptType::kClassic, context->UserAgent(),
context->GetContentSecurityPolicy()->Headers(),
kReferrerPolicyDefault, context->GetSecurityOrigin(),
context->IsSecureContext(), WorkerClients::Create(),
context->GetSecurityContext().AddressSpace(),
OriginTrialContext::GetTokens(context).get(), devtools_worker_token,
std::move(settings), kV8CacheOptionsDefault,
nullptr /* worklet_module_responses_map */,
ConnectToWorkerInterfaceProvider(context,
context->GetSecurityOrigin())));
}
return context_proxies_[proxy_id];
}
void ThreadPool::PostTask(
scoped_refptr<SerializedScriptValue> task,
ScriptPromiseResolver* resolver,
AbortSignal* signal,
const Vector<scoped_refptr<SerializedScriptValue>>& arguments,
TaskType task_type) {
DCHECK(document_->IsContextThread());
GetProxyForTaskType(task_type)->PostTaskToWorkerGlobalScope(
document_.Get(), std::move(task), arguments, next_task_id_, task_type);
resolvers_.insert(next_task_id_, resolver);
if (signal) {
signal->AddAlgorithm(WTF::Bind(&ThreadPool::AbortTask, WrapPersistent(this),
next_task_id_, task_type));
}
next_task_id_++;
}
void ThreadPool::AbortTask(size_t task_id, TaskType task_type) {
DCHECK(document_->IsContextThread());
GetProxyForTaskType(task_type)->PostAbortToWorkerGlobalScope(task_id);
}
void ThreadPool::TaskCompleted(size_t task_id,
scoped_refptr<SerializedScriptValue> result) {
DCHECK(document_->IsContextThread());
DCHECK(resolvers_.Contains(task_id));
ScriptPromiseResolver* resolver = resolvers_.Take(task_id);
if (!result) {
resolver->Reject();
return;
}
ScriptState::Scope scope(resolver->GetScriptState());
resolver->Resolve(
result->Deserialize(resolver->GetScriptState()->GetIsolate()));
}
void ThreadPool::Trace(blink::Visitor* visitor) {
Supplement<Document>::Trace(visitor);
visitor->Trace(document_);
visitor->Trace(context_proxies_);
visitor->Trace(resolvers_);
}
} // namespace blink
// Copyright 2018 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef THIRD_PARTY_BLINK_RENDERER_CORE_WORKERS_EXPERIMENTAL_THREAD_POOL_H_
#define THIRD_PARTY_BLINK_RENDERER_CORE_WORKERS_EXPERIMENTAL_THREAD_POOL_H_
#include "third_party/blink/renderer/bindings/core/v8/script_promise_resolver.h"
#include "third_party/blink/renderer/platform/supplementable.h"
namespace blink {
class AbortSignal;
class Document;
class SerializedScriptValue;
class ThreadPoolMessagingProxy;
class ThreadPool final : public GarbageCollected<ThreadPool>,
public Supplement<Document> {
USING_GARBAGE_COLLECTED_MIXIN(ThreadPool);
public:
static const char kSupplementName[];
static ThreadPool* From(Document&);
void PostTask(scoped_refptr<SerializedScriptValue> task,
ScriptPromiseResolver*,
AbortSignal*,
const Vector<scoped_refptr<SerializedScriptValue>>& arguments,
TaskType);
void Trace(blink::Visitor*) final;
private:
ThreadPool(Document&);
~ThreadPool() = default;
friend ThreadPoolMessagingProxy;
ThreadPoolMessagingProxy* GetProxyForTaskType(TaskType);
void TaskCompleted(size_t task_id, scoped_refptr<SerializedScriptValue>);
void AbortTask(size_t task_id, TaskType task_type);
Member<Document> document_;
HeapVector<Member<ThreadPoolMessagingProxy>> context_proxies_;
size_t next_task_id_ = 1;
HeapHashMap<int, Member<ScriptPromiseResolver>> resolvers_;
};
} // namespace blink
#endif // THIRD_PARTY_BLINK_RENDERER_CORE_WORKERS_EXPERIMENTAL_THREAD_POOL_H_
// Copyright 2018 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "third_party/blink/renderer/core/workers/experimental/worker_task_queue.h"
#include "third_party/blink/renderer/bindings/core/v8/serialization/serialized_script_value.h"
#include "third_party/blink/renderer/core/dom/document.h"
#include "third_party/blink/renderer/core/workers/experimental/thread_pool.h"
namespace blink {
WorkerTaskQueue* WorkerTaskQueue::Create(ExecutionContext* context,
const String& type,
ExceptionState& exception_state) {
if (context->IsContextDestroyed()) {
exception_state.ThrowDOMException(DOMExceptionCode::kInvalidAccessError,
"The context provided is invalid.");
return nullptr;
}
if (!context->IsDocument()) {
exception_state.ThrowDOMException(
DOMExceptionCode::kInvalidAccessError,
"WorkerTaskQueue can only be constructed from a document.");
return nullptr;
}
DCHECK(type == "user-interaction" || type == "background");
TaskType task_type = type == "user-interaction" ? TaskType::kUserInteraction
: TaskType::kIdleTask;
return new WorkerTaskQueue(ToDocument(context), task_type);
}
WorkerTaskQueue::WorkerTaskQueue(Document* document, TaskType task_type)
: document_(document), task_type_(task_type) {}
ScriptPromise WorkerTaskQueue::postTask(ScriptState* script_state,
const ScriptValue& task,
AbortSignal* signal,
const Vector<ScriptValue>& arguments) {
DCHECK(document_->IsContextThread());
DCHECK(task.IsFunction());
ScriptPromiseResolver* resolver = ScriptPromiseResolver::Create(script_state);
scoped_refptr<SerializedScriptValue> serialized_task =
SerializedScriptValue::SerializeAndSwallowExceptions(
script_state->GetIsolate(),
task.V8Value()->ToString(script_state->GetIsolate()));
if (!serialized_task) {
resolver->Reject();
return resolver->Promise();
}
Vector<scoped_refptr<SerializedScriptValue>> serialized_arguments;
serialized_arguments.ReserveInitialCapacity(arguments.size());
for (auto& argument : arguments) {
scoped_refptr<SerializedScriptValue> serialized_argument =
SerializedScriptValue::SerializeAndSwallowExceptions(
script_state->GetIsolate(), argument.V8Value());
if (!serialized_argument) {
resolver->Reject();
return resolver->Promise();
}
serialized_arguments.push_back(serialized_argument);
}
ThreadPool::From(*document_)
->PostTask(std::move(serialized_task), resolver, signal,
std::move(serialized_arguments), task_type_);
return resolver->Promise();
}
void WorkerTaskQueue::Trace(blink::Visitor* visitor) {
ScriptWrappable::Trace(visitor);
visitor->Trace(document_);
}
} // namespace blink
// Copyright 2018 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef THIRD_PARTY_BLINK_RENDERER_CORE_WORKERS_EXPERIMENTAL_WORKER_TASK_QUEUE_H_
#define THIRD_PARTY_BLINK_RENDERER_CORE_WORKERS_EXPERIMENTAL_WORKER_TASK_QUEUE_H_
#include "third_party/blink/public/platform/task_type.h"
#include "third_party/blink/renderer/bindings/core/v8/script_promise.h"
#include "third_party/blink/renderer/core/core_export.h"
#include "third_party/blink/renderer/platform/bindings/script_wrappable.h"
namespace blink {
class AbortSignal;
class Document;
class ExceptionState;
class ExecutionContext;
class CORE_EXPORT WorkerTaskQueue : public ScriptWrappable {
DEFINE_WRAPPERTYPEINFO();
public:
static WorkerTaskQueue* Create(ExecutionContext*,
const String&,
ExceptionState&);
~WorkerTaskQueue() override = default;
ScriptPromise postTask(ScriptState*,
const ScriptValue& task,
AbortSignal*,
const Vector<ScriptValue>& arguments);
void Trace(blink::Visitor*) override;
private:
WorkerTaskQueue(Document*, TaskType);
Member<Document> document_;
const TaskType task_type_;
};
} // namespace blink
#endif // THIRD_PARTY_BLINK_RENDERER_CORE_WORKERS_EXPERIMENTAL_WORKER_TASK_QUEUE_H_
// Copyright 2018 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
enum TaskQueueType { "user-interaction", "background" };
[
Constructor(TaskQueueType queue_type),
ConstructorCallWith=ExecutionContext,
RaisesException=Constructor,
RuntimeEnabled=WorkerTaskQueue
] interface WorkerTaskQueue {
[CallWith=ScriptState] Promise<any> postTask(CallbackFunctionTreatedAsScriptValue task, optional AbortSignal signal = null, any... arguments);
};
...@@ -339,6 +339,11 @@ scheduler::WorkerScheduler* WorkerThread::GetScheduler() { ...@@ -339,6 +339,11 @@ scheduler::WorkerScheduler* WorkerThread::GetScheduler() {
return worker_scheduler_.get(); return worker_scheduler_.get();
} }
scoped_refptr<base::SingleThreadTaskRunner>
WorkerThread::GetControlTaskRunner() {
return worker_scheduler_->GetWorkerThreadScheduler()->ControlTaskQueue();
}
void WorkerThread::ChildThreadStartedOnWorkerThread(WorkerThread* child) { void WorkerThread::ChildThreadStartedOnWorkerThread(WorkerThread* child) {
DCHECK(IsCurrentThread()); DCHECK(IsCurrentThread());
#if DCHECK_IS_ON() #if DCHECK_IS_ON()
......
...@@ -206,6 +206,9 @@ class CORE_EXPORT WorkerThread : public WebThread::TaskObserver { ...@@ -206,6 +206,9 @@ class CORE_EXPORT WorkerThread : public WebThread::TaskObserver {
return worker_scheduler_->GetTaskRunner(type); return worker_scheduler_->GetTaskRunner(type);
} }
// TODO(japhet): Hack to support an experimental worker scheduling API.
scoped_refptr<base::SingleThreadTaskRunner> GetControlTaskRunner();
void ChildThreadStartedOnWorkerThread(WorkerThread*); void ChildThreadStartedOnWorkerThread(WorkerThread*);
void ChildThreadTerminatedOnWorkerThread(WorkerThread*); void ChildThreadTerminatedOnWorkerThread(WorkerThread*);
......
...@@ -1378,6 +1378,10 @@ ...@@ -1378,6 +1378,10 @@
status: "test", status: "test",
implied_by: ["WorkerNosniffBlock"], implied_by: ["WorkerNosniffBlock"],
}, },
{
name: "WorkerTaskQueue",
status: "experimental"
},
{ {
name: "WorkStealingInScriptRunner", name: "WorkStealingInScriptRunner",
status: "experimental", status: "experimental",
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment