Commit 602c0515 authored by Nate Chapin's avatar Nate Chapin Committed by Commit Bot

TaskWorklet: Tweak passing results from completed prerequisites

1. When selecting a thread for a task, consider which thread all
   prerequisites were assigned, not just the ones still pending.
2. Store the v8::Value for completed tasks.
3. If a task will run on the same thread as a completed
   prerequisite, hop to the worker thread and pass the
   prerequisite's result directly without a deserialization.
   (2) enables this, and (1) increases the frequency in which
   this optimization can be used.
4. Given that it is no longer guaranteed that the serialized
   result will be needed (i.e., if all dependents run on
   the same thread and task.result is never requested on the
   main thread, only the v8::Value will be used), move to a
   lazy serialization model, where the result is only serialized
   when it is promised via task.result or when a dependent is
   assigned to a different thread.

Bug: 879306
Change-Id: I22fe5f201f22b376861a5fb6d6e881ce0d5785cc
Reviewed-on: https://chromium-review.googlesource.com/c/1311498
Commit-Queue: Nate Chapin <japhet@chromium.org>
Reviewed-by: default avatarHiroki Nakagawa <nhiroki@chromium.org>
Cr-Commit-Position: refs/heads/master@{#610136}
parent 1378f210
...@@ -18,10 +18,10 @@ ...@@ -18,10 +18,10 @@
namespace blink { namespace blink {
// worker_thread_ only. // worker_thread_ only.
class Task::AsyncFunctionCompleted : public ScriptFunction { class TaskBase::AsyncFunctionCompleted : public ScriptFunction {
public: public:
static v8::Local<v8::Function> CreateFunction(ScriptState* script_state, static v8::Local<v8::Function> CreateFunction(ScriptState* script_state,
Task* task, TaskBase* task,
State state) { State state) {
return (new AsyncFunctionCompleted(script_state, task, state)) return (new AsyncFunctionCompleted(script_state, task, state))
->BindToV8Function(); ->BindToV8Function();
...@@ -33,60 +33,37 @@ class Task::AsyncFunctionCompleted : public ScriptFunction { ...@@ -33,60 +33,37 @@ class Task::AsyncFunctionCompleted : public ScriptFunction {
} }
private: private:
AsyncFunctionCompleted(ScriptState* script_state, Task* task, State state) AsyncFunctionCompleted(ScriptState* script_state, TaskBase* task, State state)
: ScriptFunction(script_state), task_(task), state_(state) {} : ScriptFunction(script_state), task_(task), state_(state) {}
CrossThreadPersistent<Task> task_; CrossThreadPersistent<TaskBase> task_;
const State state_; const State state_;
}; };
Task::Task(ThreadPoolThreadProvider* thread_provider, TaskBase::TaskBase(TaskType task_type,
ScriptState* script_state, ScriptState* script_state,
const ScriptValue& function, const ScriptValue& function,
const Vector<ScriptValue>& arguments, const String& function_name)
TaskType task_type)
: Task(thread_provider,
script_state,
function,
String(),
arguments,
task_type) {}
Task::Task(ThreadPoolThreadProvider* thread_provider,
ScriptState* script_state,
const String& function_name,
const Vector<ScriptValue>& arguments,
TaskType task_type)
: Task(thread_provider,
script_state,
ScriptValue(),
function_name,
arguments,
task_type) {}
Task::Task(ThreadPoolThreadProvider* thread_provider,
ScriptState* script_state,
const ScriptValue& function,
const String& function_name,
const Vector<ScriptValue>& arguments,
TaskType task_type)
: task_type_(task_type), : task_type_(task_type),
self_keep_alive_(this), self_keep_alive_(this),
resolver_(ScriptPromiseResolver::Create(script_state)), function_name_(function_name.IsolatedCopy()) {
function_name_(function_name.IsolatedCopy()),
arguments_(arguments.size()) {
DCHECK(IsMainThread()); DCHECK(IsMainThread());
DCHECK_EQ(!function.IsEmpty(), function_name.IsNull());
DCHECK(task_type_ == TaskType::kUserInteraction || DCHECK(task_type_ == TaskType::kUserInteraction ||
task_type_ == TaskType::kIdleTask); task_type_ == TaskType::kIdleTask);
v8::Isolate* isolate = script_state->GetIsolate();
// TODO(japhet): Handle serialization failures // TODO(japhet): Handle serialization failures
v8::Isolate* isolate = script_state->GetIsolate();
if (!function.IsEmpty()) { if (!function.IsEmpty()) {
function_ = SerializedScriptValue::SerializeAndSwallowExceptions( function_ = SerializedScriptValue::SerializeAndSwallowExceptions(
isolate, function.V8Value()->ToString(isolate)); isolate, function.V8Value()->ToString(isolate));
} }
}
HeapVector<Member<Task>> prerequisites; void TaskBase::InitializeArgumentsOnMainThread(
ThreadPoolThreadProvider* thread_provider,
ScriptState* script_state,
const Vector<ScriptValue>& arguments) {
v8::Isolate* isolate = script_state->GetIsolate();
arguments_.resize(arguments.size());
HeapVector<Member<TaskBase>> prerequisites;
Vector<size_t> prerequisites_indices; Vector<size_t> prerequisites_indices;
for (size_t i = 0; i < arguments_.size(); i++) { for (size_t i = 0; i < arguments_.size(); i++) {
// Normal case: if the argument isn't a Task, just serialize it. // Normal case: if the argument isn't a Task, just serialize it.
...@@ -96,7 +73,7 @@ Task::Task(ThreadPoolThreadProvider* thread_provider, ...@@ -96,7 +73,7 @@ Task::Task(ThreadPoolThreadProvider* thread_provider,
isolate, arguments[i].V8Value()); isolate, arguments[i].V8Value());
continue; continue;
} }
Task* prerequisite = TaskBase* prerequisite =
ToScriptWrappable(arguments[i].V8Value().As<v8::Object>()) ToScriptWrappable(arguments[i].V8Value().As<v8::Object>())
->ToImpl<Task>(); ->ToImpl<Task>();
prerequisites.push_back(prerequisite); prerequisites.push_back(prerequisite);
...@@ -107,6 +84,7 @@ Task::Task(ThreadPoolThreadProvider* thread_provider, ...@@ -107,6 +84,7 @@ Task::Task(ThreadPoolThreadProvider* thread_provider,
worker_thread_->IncrementTasksInProgressCount(); worker_thread_->IncrementTasksInProgressCount();
if (prerequisites.IsEmpty()) { if (prerequisites.IsEmpty()) {
MutexLocker lock(mutex_);
MaybeStartTask(); MaybeStartTask();
return; return;
} }
...@@ -119,41 +97,33 @@ Task::Task(ThreadPoolThreadProvider* thread_provider, ...@@ -119,41 +97,33 @@ Task::Task(ThreadPoolThreadProvider* thread_provider,
} }
// static // static
ThreadPoolThread* Task::SelectThread( ThreadPoolThread* TaskBase::SelectThread(
const HeapVector<Member<Task>>& prerequisites, const HeapVector<Member<TaskBase>>& prerequisites,
ThreadPoolThreadProvider* thread_provider) { ThreadPoolThreadProvider* thread_provider) {
DCHECK(IsMainThread()); DCHECK(IsMainThread());
HashCountedSet<ThreadPoolThread*> prerequisite_location_counts; HashCountedSet<ThreadPoolThread*> prerequisite_location_counts;
size_t max_prerequisite_location_count = 0; size_t max_prerequisite_location_count = 0;
ThreadPoolThread* max_prerequisite_thread = nullptr; ThreadPoolThread* max_prerequisite_thread = nullptr;
for (Task* prerequisite : prerequisites) { for (TaskBase* prerequisite : prerequisites) {
// For prerequisites that are not yet complete, track which thread the task // Track which thread the prerequisites will run on. Put this task on the
// will run on. Put this task on the thread where the most prerequisites // thread where the most prerequisites reside.
// reside. This is slightly imprecise, because a task may complete before ThreadPoolThread* thread = prerequisite->worker_thread_;
// registering dependent tasks below. prerequisite_location_counts.insert(thread);
if (ThreadPoolThread* thread = prerequisite->GetScheduledThread()) { unsigned thread_count = prerequisite_location_counts.count(thread);
prerequisite_location_counts.insert(thread); if (thread_count > max_prerequisite_location_count) {
unsigned thread_count = prerequisite_location_counts.count(thread); max_prerequisite_location_count = thread_count;
if (thread_count > max_prerequisite_location_count) { max_prerequisite_thread = thread;
max_prerequisite_location_count = thread_count;
max_prerequisite_thread = thread;
}
} }
} }
return max_prerequisite_thread ? max_prerequisite_thread return max_prerequisite_thread ? max_prerequisite_thread
: thread_provider->GetLeastBusyThread(); : thread_provider->GetLeastBusyThread();
} }
ThreadPoolThread* Task::GetScheduledThread() {
DCHECK(IsMainThread());
MutexLocker lock(mutex_);
return HasFinished() ? nullptr : worker_thread_;
}
// Should only be called from constructor. Split out in to a helper because // Should only be called from constructor. Split out in to a helper because
// clang appears to exempt constructors from thread safety analysis. // clang appears to exempt constructors from thread safety analysis.
void Task::RegisterDependencies(const HeapVector<Member<Task>>& prerequisites, void TaskBase::RegisterDependencies(
const Vector<size_t>& prerequisites_indices) { const HeapVector<Member<TaskBase>>& prerequisites,
const Vector<size_t>& prerequisites_indices) {
DCHECK(IsMainThread()); DCHECK(IsMainThread());
{ {
MutexLocker lock(mutex_); MutexLocker lock(mutex_);
...@@ -161,33 +131,27 @@ void Task::RegisterDependencies(const HeapVector<Member<Task>>& prerequisites, ...@@ -161,33 +131,27 @@ void Task::RegisterDependencies(const HeapVector<Member<Task>>& prerequisites,
} }
for (size_t i = 0; i < prerequisites.size(); i++) { for (size_t i = 0; i < prerequisites.size(); i++) {
Task* prerequisite = prerequisites[i]; TaskBase* prerequisite = prerequisites[i];
size_t prerequisite_index = prerequisites_indices[i]; size_t prerequisite_index = prerequisites_indices[i];
scoped_refptr<SerializedScriptValue> result;
State prerequisite_state = State::kPending;
{ {
MutexLocker lock(prerequisite->mutex_); MutexLocker lock(prerequisite->mutex_);
prerequisite_state = prerequisite->state_; if (!prerequisite->HasFinished()) {
if (prerequisite->HasFinished()) {
result = prerequisite->serialized_result_;
} else {
prerequisite->dependents_.emplace_back( prerequisite->dependents_.emplace_back(
new Dependent(this, prerequisite_index)); MakeGarbageCollected<Dependent>(this, prerequisite_index));
continue;
} }
} }
// TODO(japhet): if a prerequisite failed, this task will be cancelled. PostCrossThreadTask(
// Should that throw an exception? *prerequisite->worker_thread_->GetTaskRunner(task_type_), FROM_HERE,
if (prerequisite_state == State::kCompleted || CrossThreadBind(&TaskBase::PassResultToDependentOnWorkerThread,
prerequisite_state == State::kFailed) { WrapCrossThreadPersistent(prerequisite),
PrerequisiteFinished(prerequisite_index, v8::Local<v8::Value>(), result, prerequisite_index, WrapCrossThreadPersistent(this)));
prerequisite_state);
}
} }
} }
Task::~Task() { TaskBase::~TaskBase() {
DCHECK(IsMainThread()); DCHECK(IsMainThread());
DCHECK(HasFinished()); DCHECK(HasFinished());
DCHECK(!function_); DCHECK(!function_);
...@@ -196,43 +160,71 @@ Task::~Task() { ...@@ -196,43 +160,71 @@ Task::~Task() {
DCHECK(dependents_.IsEmpty()); DCHECK(dependents_.IsEmpty());
} }
void Task::PrerequisiteFinished(size_t prerequisite_index, scoped_refptr<SerializedScriptValue> TaskBase::GetSerializedResult() {
v8::Local<v8::Value> v8_result, DCHECK(IsMainThread() || worker_thread_->IsCurrentThread());
scoped_refptr<SerializedScriptValue> result, MutexLocker lock(mutex_);
State prerequisite_state) { DCHECK(HasFinished());
if (!serialized_result_ && worker_thread_->IsCurrentThread()) {
DCHECK(v8_result_);
ScriptState::Scope scope(
worker_thread_->GlobalScope()->ScriptController()->GetScriptState());
v8::Isolate* isolate = ToIsolate(worker_thread_->GlobalScope());
serialized_result_ = SerializedScriptValue::SerializeAndSwallowExceptions(
isolate, v8_result_->GetResult(isolate));
}
return serialized_result_;
}
void TaskBase::PassResultToDependentOnWorkerThread(size_t dependent_index,
TaskBase* dependent) {
DCHECK(worker_thread_->IsCurrentThread());
bool failed = false;
{
MutexLocker lock(mutex_);
DCHECK(HasFinished());
failed = state_ == State::kFailed;
}
// Only serialize if the dependent needs the result on a different thread.
// Otherwise, use the unserialized result from v8.
scoped_refptr<SerializedScriptValue> serialized_result =
dependent->IsTargetThreadForArguments() ? nullptr : GetSerializedResult();
V8ResultHolder* v8_result =
dependent->IsTargetThreadForArguments() ? v8_result_.Get() : nullptr;
dependent->PrerequisiteFinished(dependent_index, v8_result, serialized_result,
failed);
}
void TaskBase::PrerequisiteFinished(
size_t index,
V8ResultHolder* v8_result,
scoped_refptr<SerializedScriptValue> serialized_result,
bool failed) {
DCHECK(v8_result || serialized_result);
DCHECK(!v8_result || !serialized_result);
MutexLocker lock(mutex_); MutexLocker lock(mutex_);
DCHECK(state_ == State::kPending || state_ == State::kCancelPending); DCHECK(state_ == State::kPending || state_ == State::kCancelPending);
DCHECK(prerequisite_state == State::kCompleted ||
prerequisite_state == State::kFailed);
DCHECK_GT(prerequisites_remaining_, 0u); DCHECK_GT(prerequisites_remaining_, 0u);
prerequisites_remaining_--; prerequisites_remaining_--;
// If the result of the prerequisite doesn't need to move between threads, if (failed)
// save the deserialized v8::Value for later use.
if (prerequisite_state == State::kFailed) {
AdvanceState(State::kCancelPending); AdvanceState(State::kCancelPending);
} else if (worker_thread_->IsCurrentThread() && !v8_result.IsEmpty()) { arguments_[index].v8_value = v8_result;
arguments_[prerequisite_index].v8_value = arguments_[index].serialized_value = serialized_result;
std::make_unique<ScopedPersistent<v8::Value>>(
ToIsolate(worker_thread_->GlobalScope()), v8_result);
} else {
arguments_[prerequisite_index].serialized_value = result;
}
MaybeStartTask(); MaybeStartTask();
} }
void Task::MaybeStartTask() { void TaskBase::MaybeStartTask() {
if (prerequisites_remaining_) if (prerequisites_remaining_)
return; return;
DCHECK(state_ == State::kPending || state_ == State::kCancelPending); DCHECK(state_ == State::kPending || state_ == State::kCancelPending);
PostCrossThreadTask(*worker_thread_->GetTaskRunner(task_type_), FROM_HERE, PostCrossThreadTask(*worker_thread_->GetTaskRunner(task_type_), FROM_HERE,
CrossThreadBind(&Task::StartTaskOnWorkerThread, CrossThreadBind(&TaskBase::StartTaskOnWorkerThread,
WrapCrossThreadPersistent(this))); WrapCrossThreadPersistent(this)));
} }
void Task::StartTaskOnWorkerThread() { bool TaskBase::WillStartTaskOnWorkerThread() {
DCHECK(worker_thread_->IsCurrentThread()); DCHECK(worker_thread_->IsCurrentThread());
bool was_cancelled = false;
{ {
MutexLocker lock(mutex_); MutexLocker lock(mutex_);
DCHECK(!prerequisites_remaining_); DCHECK(!prerequisites_remaining_);
...@@ -241,8 +233,7 @@ void Task::StartTaskOnWorkerThread() { ...@@ -241,8 +233,7 @@ void Task::StartTaskOnWorkerThread() {
AdvanceState(State::kStarted); AdvanceState(State::kStarted);
break; break;
case State::kCancelPending: case State::kCancelPending:
was_cancelled = true; return false;
break;
case State::kStarted: case State::kStarted:
case State::kCompleted: case State::kCompleted:
case State::kFailed: case State::kFailed:
...@@ -250,52 +241,36 @@ void Task::StartTaskOnWorkerThread() { ...@@ -250,52 +241,36 @@ void Task::StartTaskOnWorkerThread() {
break; break;
} }
} }
return true;
if (was_cancelled) {
WorkerOrWorkletGlobalScope* global_scope = worker_thread_->GlobalScope();
v8::Isolate* isolate = ToIsolate(global_scope);
ScriptState::Scope scope(
global_scope->ScriptController()->GetScriptState());
TaskCompletedOnWorkerThread(V8String(isolate, "Task aborted"),
State::kFailed);
return;
}
RunTaskOnWorkerThread();
} }
void Task::TaskCompletedOnWorkerThread(v8::Local<v8::Value> return_value, void TaskBase::TaskCompletedOnWorkerThread(v8::Local<v8::Value> v8_result,
State state) { State state) {
DCHECK(worker_thread_->IsCurrentThread()); DCHECK(worker_thread_->IsCurrentThread());
v8_result_ =
scoped_refptr<SerializedScriptValue> local_result = new V8ResultHolder(ToIsolate(worker_thread_->GlobalScope()), v8_result);
SerializedScriptValue::SerializeAndSwallowExceptions(
ToIsolate(worker_thread_->GlobalScope()), return_value);
function_ = nullptr; function_ = nullptr;
arguments_.clear(); arguments_.clear();
Vector<CrossThreadPersistent<Dependent>> dependents_to_notify; Vector<CrossThreadPersistent<Dependent>> dependents_to_notify;
{ {
MutexLocker lock(mutex_); MutexLocker lock(mutex_);
serialized_result_ = local_result;
AdvanceState(state); AdvanceState(state);
dependents_to_notify.swap(dependents_); dependents_to_notify.swap(dependents_);
} }
for (auto& dependent : dependents_to_notify) { for (auto& dependent : dependents_to_notify)
dependent->task->PrerequisiteFinished(dependent->index, return_value, PassResultToDependentOnWorkerThread(dependent->index, dependent->task);
local_result, state);
}
PostCrossThreadTask( PostCrossThreadTask(
*worker_thread_->GetParentExecutionContextTaskRunners()->Get( *worker_thread_->GetParentExecutionContextTaskRunners()->Get(
TaskType::kInternalWorker), TaskType::kInternalWorker),
FROM_HERE, FROM_HERE,
CrossThreadBind(&Task::TaskCompleted, WrapCrossThreadPersistent(this))); CrossThreadBind(&TaskBase::TaskCompleted, WrapCrossThreadPersistent(this),
state == State::kCompleted));
} }
void Task::RunTaskOnWorkerThread() { void TaskBase::RunTaskOnWorkerThread() {
DCHECK(worker_thread_->IsCurrentThread()); DCHECK(worker_thread_->IsCurrentThread());
// No other thread should be touching function_ or arguments_ at this point, // No other thread should be touching function_ or arguments_ at this point,
// so no mutex needed while actually running the task. // so no mutex needed while actually running the task.
...@@ -338,7 +313,7 @@ void Task::RunTaskOnWorkerThread() { ...@@ -338,7 +313,7 @@ void Task::RunTaskOnWorkerThread() {
if (arguments_[i].serialized_value) if (arguments_[i].serialized_value)
params[i] = arguments_[i].serialized_value->Deserialize(isolate); params[i] = arguments_[i].serialized_value->Deserialize(isolate);
else else
params[i] = arguments_[i].v8_value->NewLocal(isolate); params[i] = arguments_[i].v8_value->GetResult(isolate);
} }
v8::TryCatch block(isolate); v8::TryCatch block(isolate);
...@@ -370,45 +345,13 @@ void Task::RunTaskOnWorkerThread() { ...@@ -370,45 +345,13 @@ void Task::RunTaskOnWorkerThread() {
TaskCompletedOnWorkerThread(return_value, State::kCompleted); TaskCompletedOnWorkerThread(return_value, State::kCompleted);
} }
void Task::TaskCompleted() { void TaskBase::TaskCompleted(bool was_successful) {
DCHECK(IsMainThread()); DCHECK(IsMainThread());
bool rejected = false;
{
MutexLocker lock(mutex_);
DCHECK(HasFinished());
rejected = state_ == State::kFailed;
}
ScriptState* script_state = resolver_->GetScriptState();
if (script_state->ContextIsValid()) {
ScriptState::Scope scope(script_state);
v8::Local<v8::Value> value;
{
MutexLocker lock(mutex_);
value = serialized_result_->Deserialize(script_state->GetIsolate());
}
if (rejected)
resolver_->Reject(v8::Exception::Error(value.As<v8::String>()));
else
resolver_->Resolve(value);
}
worker_thread_->DecrementTasksInProgressCount(); worker_thread_->DecrementTasksInProgressCount();
self_keep_alive_.Clear(); self_keep_alive_.Clear();
} }
ScriptPromise Task::result() { void TaskBase::AdvanceState(State new_state) {
DCHECK(IsMainThread());
return resolver_->Promise();
}
void Task::cancel() {
DCHECK(IsMainThread());
MutexLocker lock(mutex_);
if (state_ == State::kPending)
AdvanceState(State::kCancelPending);
}
void Task::AdvanceState(State new_state) {
switch (new_state) { switch (new_state) {
case State::kPending: case State::kPending:
NOTREACHED() << "kPending should only be set via initialization"; NOTREACHED() << "kPending should only be set via initialization";
...@@ -429,8 +372,94 @@ void Task::AdvanceState(State new_state) { ...@@ -429,8 +372,94 @@ void Task::AdvanceState(State new_state) {
state_ = new_state; state_ = new_state;
} }
Vector<ScriptValue> GetResolverArgument(ScriptState* script_state, Task* task) {
v8::Isolate* isolate = script_state->GetIsolate();
return Vector<ScriptValue>({ScriptValue(
script_state,
ToV8(task, isolate->GetCurrentContext()->Global(), isolate))});
}
ScriptPromise Task::result(ScriptState* script_state) {
DCHECK(IsMainThread());
if (!resolve_task_) {
resolve_task_ =
MakeGarbageCollected<ResolveTask>(script_state, task_type_, this);
}
return resolve_task_->GetPromise();
}
void Task::cancel() {
DCHECK(IsMainThread());
MutexLocker lock(mutex_);
if (state_ == State::kPending)
AdvanceState(State::kCancelPending);
}
void Task::StartTaskOnWorkerThread() {
DCHECK(worker_thread_->IsCurrentThread());
if (!WillStartTaskOnWorkerThread()) {
WorkerOrWorkletGlobalScope* global_scope = worker_thread_->GlobalScope();
v8::Isolate* isolate = ToIsolate(global_scope);
ScriptState::Scope scope(
global_scope->ScriptController()->GetScriptState());
TaskCompletedOnWorkerThread(V8String(isolate, "Task aborted"),
State::kFailed);
return;
}
RunTaskOnWorkerThread();
}
void Task::Trace(Visitor* visitor) { void Task::Trace(Visitor* visitor) {
ScriptWrappable::Trace(visitor); ScriptWrappable::Trace(visitor);
TaskBase::Trace(visitor);
visitor->Trace(resolve_task_);
}
ResolveTask::ResolveTask(ScriptState* script_state,
TaskType task_type,
Task* prerequisite)
: TaskBase(task_type, script_state, ScriptValue(), String()),
resolver_(ScriptPromiseResolver::Create(script_state)) {
DCHECK(IsMainThread());
// It's safe to pass a nullptr ThreadPoolThreadProivder here because it
// is only used to select a thread if there are no prerequisites, but a
// ResolveTask always has exactly one prerequisite.
InitializeArgumentsOnMainThread(
nullptr, script_state, GetResolverArgument(script_state, prerequisite));
}
void ResolveTask::StartTaskOnWorkerThread() {
// Just take the sole argument and use it as the return value that will be
// given to the promise resolver.
{
MutexLocker lock(mutex_);
serialized_result_ = arguments_[0].serialized_value;
}
TaskCompletedOnWorkerThread(
v8::Local<v8::Value>(),
WillStartTaskOnWorkerThread() ? State::kCompleted : State::kFailed);
}
void ResolveTask::TaskCompleted(bool was_successful) {
DCHECK(IsMainThread());
ScriptState* script_state = resolver_->GetScriptState();
if (!script_state->ContextIsValid())
return;
ScriptState::Scope scope(script_state);
v8::Local<v8::Value> value =
GetSerializedResult()->Deserialize(script_state->GetIsolate());
if (was_successful)
resolver_->Resolve(value);
else
resolver_->Reject(v8::Exception::Error(value.As<v8::String>()));
TaskBase::TaskCompleted(was_successful);
}
void ResolveTask::Trace(Visitor* visitor) {
TaskBase::Trace(visitor);
visitor->Trace(resolver_); visitor->Trace(resolver_);
} }
......
...@@ -12,82 +12,93 @@ ...@@ -12,82 +12,93 @@
#include "third_party/blink/renderer/platform/wtf/threading_primitives.h" #include "third_party/blink/renderer/platform/wtf/threading_primitives.h"
namespace blink { namespace blink {
class ResolveTask;
class SerializedScriptValue; class SerializedScriptValue;
// Runs |function| with |arguments| on a thread from the given ThreadPool. // Runs |function| with |arguments| on a thread from the given ThreadPool.
// Scans |arguments| for Task objects, and registers those as dependencies, // Scans |arguments| for Task objects, and registers those as dependencies,
// passing the result of those tasks in place of the Task arguments. // passing the result of those tasks in place of the Task arguments.
// All public functions are main-thread-only. // All public functions are main-thread-only.
// Task keeps itself alive via a SelfKeepAlive until the // TaskBase keeps itself alive via a SelfKeepAlive until the
// the task completes and reports itself done on the main thread via // the task completes and reports itself done on the main thread via
// TaskCompleted(). // TaskCompleted().
class Task final : public ScriptWrappable { class TaskBase : public GarbageCollectedMixin {
DEFINE_WRAPPERTYPEINFO();
public: public:
// Called on main thread virtual ~TaskBase();
Task(ThreadPoolThreadProvider*,
ScriptState*,
const ScriptValue& function,
const Vector<ScriptValue>& arguments,
TaskType);
Task(ThreadPoolThreadProvider*,
ScriptState*,
const String& function_name,
const Vector<ScriptValue>& arguments,
TaskType);
~Task() override;
// Returns a promise that will be resolved with the result when it completes.
ScriptPromise result();
void cancel() LOCKS_EXCLUDED(mutex_);
void Trace(Visitor*) override; protected:
virtual void StartTaskOnWorkerThread() LOCKS_EXCLUDED(mutex_) = 0;
virtual bool IsTargetThreadForArguments() = 0;
private:
enum class State { kPending, kStarted, kCancelPending, kCompleted, kFailed }; enum class State { kPending, kStarted, kCancelPending, kCompleted, kFailed };
Task(ThreadPoolThreadProvider*, TaskBase(TaskType,
ScriptState*, ScriptState*,
const ScriptValue& function, const ScriptValue& function,
const String& function_name, const String& function_name);
const Vector<ScriptValue>& arguments,
TaskType); void InitializeArgumentsOnMainThread(ThreadPoolThreadProvider*,
ScriptState*,
const Vector<ScriptValue>& arguments);
class AsyncFunctionCompleted; class AsyncFunctionCompleted;
void StartTaskOnWorkerThread() LOCKS_EXCLUDED(mutex_); // This caches the result after the task completes on the worker thread.
// We can't safely clear the ScopedPersistent from the main thread, so
// this wrappper allows us to hold a CrossThreadPersistent that arranges
// for GC on the worker thread.
class V8ResultHolder final
: public GarbageCollectedFinalized<V8ResultHolder> {
public:
V8ResultHolder(v8::Isolate* isolate, v8::Local<v8::Value> result)
: result_(isolate, result) {}
~V8ResultHolder() = default;
v8::Local<v8::Value> GetResult(v8::Isolate* isolate) {
return result_.NewLocal(isolate);
}
void Trace(Visitor*) {}
private:
ScopedPersistent<v8::Value> result_;
};
bool WillStartTaskOnWorkerThread();
void RunTaskOnWorkerThread(); void RunTaskOnWorkerThread();
void TaskCompletedOnWorkerThread(v8::Local<v8::Value> return_value, State) void TaskCompletedOnWorkerThread(v8::Local<v8::Value> v8_result, State)
LOCKS_EXCLUDED(mutex_);
void PassResultToDependentOnWorkerThread(size_t dependent_index, TaskBase*)
LOCKS_EXCLUDED(mutex_); LOCKS_EXCLUDED(mutex_);
// Called on ANY thread (main thread, worker_thread_, or a sibling worker). // Called on ANY thread (main thread, worker_thread_, or a sibling worker).
void MaybeStartTask() EXCLUSIVE_LOCKS_REQUIRED(mutex_); void MaybeStartTask() EXCLUSIVE_LOCKS_REQUIRED(mutex_);
void PrerequisiteFinished(size_t prerequisite_index, void PrerequisiteFinished(size_t index,
v8::Local<v8::Value>, V8ResultHolder*,
scoped_refptr<SerializedScriptValue>, scoped_refptr<SerializedScriptValue>,
State) LOCKS_EXCLUDED(mutex_); bool failed) LOCKS_EXCLUDED(mutex_);
bool HasFinished() const EXCLUSIVE_LOCKS_REQUIRED(mutex_) { bool HasFinished() const EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
return state_ == State::kCompleted || state_ == State::kFailed; return state_ == State::kCompleted || state_ == State::kFailed;
} }
void AdvanceState(State new_state) EXCLUSIVE_LOCKS_REQUIRED(mutex_); void AdvanceState(State new_state) EXCLUSIVE_LOCKS_REQUIRED(mutex_);
// Called on main thread or worker_thread_
scoped_refptr<SerializedScriptValue> GetSerializedResult()
LOCKS_EXCLUDED(mutex_);
// Called on main thread // Called on main thread
static ThreadPoolThread* SelectThread( static ThreadPoolThread* SelectThread(
const HeapVector<Member<Task>>& prerequisites, const HeapVector<Member<TaskBase>>& prerequisites,
ThreadPoolThreadProvider*); ThreadPoolThreadProvider*);
ThreadPoolThread* GetScheduledThread() LOCKS_EXCLUDED(mutex_); void RegisterDependencies(const HeapVector<Member<TaskBase>>& prerequisites,
void RegisterDependencies(const HeapVector<Member<Task>>& prerequisites,
const Vector<size_t>& prerequisite_indices) const Vector<size_t>& prerequisite_indices)
LOCKS_EXCLUDED(mutex_); LOCKS_EXCLUDED(mutex_);
void TaskCompleted(); virtual void TaskCompleted(bool was_successful);
// worker_thread_ is selected in the constructor and not changed thereafter. // worker_thread_ is selected in the constructor and not changed thereafter.
ThreadPoolThread* worker_thread_ = nullptr; ThreadPoolThread* worker_thread_ = nullptr;
const TaskType task_type_; const TaskType task_type_;
// Main thread only // Main thread only
SelfKeepAlive<Task> self_keep_alive_; SelfKeepAlive<TaskBase> self_keep_alive_;
Member<ScriptPromiseResolver> resolver_;
// Created in constructor on the main thread, consumed and cleared on // Created in constructor on the main thread, consumed and cleared on
// worker_thread_. Those steps can't overlap, so no mutex_ required. // worker_thread_. Those steps can't overlap, so no mutex_ required.
...@@ -97,20 +108,23 @@ class Task final : public ScriptWrappable { ...@@ -97,20 +108,23 @@ class Task final : public ScriptWrappable {
// Created and populated with non-prerequiste parameters on the main thread. // Created and populated with non-prerequiste parameters on the main thread.
// Each prerequisite writes its return value into arguments_ from its thread. // Each prerequisite writes its return value into arguments_ from its thread.
// If the prequisite and this have the same worker_thread_, there is no need // If the prequisite and this have the same worker_thread_, there is no need
// to serialize and deserialize the argument, so v8_argument will be populated // to serialize and deserialize the argument, so v8_value will be populated
// with the v8::Value returned by the prerequisite. // with the v8::Value returned by the prerequisite.
// Consumed and cleared on worker_thread_. // Consumed and cleared on worker_thread_.
// Only requires mutex_ when writing prerequisite results, at other times // Only requires mutex_ when writing prerequisite results, at other times
// either the main thread or the worker_thread_ has sole access. // either the main thread or the worker_thread_ has sole access.
struct Argument { struct Argument {
scoped_refptr<SerializedScriptValue> serialized_value; scoped_refptr<SerializedScriptValue> serialized_value;
std::unique_ptr<ScopedPersistent<v8::Value>> v8_value; CrossThreadPersistent<V8ResultHolder> v8_value;
}; };
Vector<Argument> arguments_; Vector<Argument> arguments_;
// Read on main thread, write on worker_thread_. // Read on main thread, write on worker_thread_.
scoped_refptr<SerializedScriptValue> serialized_result_ GUARDED_BY(mutex_); scoped_refptr<SerializedScriptValue> serialized_result_ GUARDED_BY(mutex_);
// Read/write on worker_thread_
CrossThreadPersistent<V8ResultHolder> v8_result_;
// Read/write on both main thread and worker_thread_. // Read/write on both main thread and worker_thread_.
State state_ GUARDED_BY(mutex_) = State::kPending; State state_ GUARDED_BY(mutex_) = State::kPending;
...@@ -122,11 +136,11 @@ class Task final : public ScriptWrappable { ...@@ -122,11 +136,11 @@ class Task final : public ScriptWrappable {
// Each element in dependents_ is not yet in the kCompleted state. // Each element in dependents_ is not yet in the kCompleted state.
struct Dependent final : public GarbageCollected<Dependent> { struct Dependent final : public GarbageCollected<Dependent> {
public: public:
Dependent(Task* task, size_t index) : task(task), index(index) { Dependent(TaskBase* task, size_t index) : task(task), index(index) {
DCHECK(IsMainThread()); DCHECK(IsMainThread());
} }
void Trace(Visitor* visitor) { visitor->Trace(task); } void Trace(Visitor* visitor) { visitor->Trace(task); }
Member<Task> task; Member<TaskBase> task;
// The index in the dependent's argument array where this result should go. // The index in the dependent's argument array where this result should go.
size_t index; size_t index;
}; };
...@@ -135,6 +149,65 @@ class Task final : public ScriptWrappable { ...@@ -135,6 +149,65 @@ class Task final : public ScriptWrappable {
Mutex mutex_; Mutex mutex_;
}; };
// The variant of TaskBase that is exposed to JS.
class Task final : public ScriptWrappable, public TaskBase {
DEFINE_WRAPPERTYPEINFO();
USING_GARBAGE_COLLECTED_MIXIN(Task);
public:
// Called on main thread
Task(ThreadPoolThreadProvider* thread_provider,
ScriptState* script_state,
const ScriptValue& function,
const Vector<ScriptValue>& arguments,
TaskType task_type)
: TaskBase(task_type, script_state, function, String()) {
InitializeArgumentsOnMainThread(thread_provider, script_state, arguments);
}
Task(ThreadPoolThreadProvider* thread_provider,
ScriptState* script_state,
const String& function_name,
const Vector<ScriptValue>& arguments,
TaskType task_type)
: TaskBase(task_type, script_state, ScriptValue(), function_name) {
InitializeArgumentsOnMainThread(thread_provider, script_state, arguments);
}
// Returns a promise that will be resolved with the result when it completes.
ScriptPromise result(ScriptState*);
void cancel() LOCKS_EXCLUDED(mutex_);
void StartTaskOnWorkerThread() override LOCKS_EXCLUDED(mutex_);
bool IsTargetThreadForArguments() override {
return worker_thread_->IsCurrentThread();
}
void Trace(Visitor*) override;
private:
Member<ResolveTask> resolve_task_;
};
// An internal TaskBase subclass that drives main thread promise resolution.
// It registers itself as a dependent on the Task whose result is being
// promised. When that Task completes, it runs a dummy script that just returns
// the dependent's result as its own. It then eagerly serializes the result, and
// overrides TaskCompleted() to actually resolve the promise.
class ResolveTask final : public GarbageCollectedFinalized<ResolveTask>,
public TaskBase {
USING_GARBAGE_COLLECTED_MIXIN(ResolveTask);
public:
ResolveTask(ScriptState*, TaskType, Task* prerequisite);
void StartTaskOnWorkerThread() override LOCKS_EXCLUDED(mutex_);
bool IsTargetThreadForArguments() override { return IsMainThread(); }
void TaskCompleted(bool was_successful) override;
ScriptPromise GetPromise() { return resolver_->Promise(); }
void Trace(Visitor*) override;
private:
Member<ScriptPromiseResolver> resolver_;
};
} // namespace blink } // namespace blink
#endif // THIRD_PARTY_BLINK_RENDERER_CORE_WORKERS_EXPERIMENTAL_TASK_H_ #endif // THIRD_PARTY_BLINK_RENDERER_CORE_WORKERS_EXPERIMENTAL_TASK_H_
...@@ -6,6 +6,6 @@ ...@@ -6,6 +6,6 @@
Exposed=Window, Exposed=Window,
RuntimeEnabled=WorkerTaskQueue RuntimeEnabled=WorkerTaskQueue
] interface Task { ] interface Task {
readonly attribute Promise<any> result; [CallWith=ScriptState] readonly attribute Promise<any> result;
void cancel(); void cancel();
}; };
...@@ -49,7 +49,7 @@ ScriptPromise WorkerTaskQueue::postFunction( ...@@ -49,7 +49,7 @@ ScriptPromise WorkerTaskQueue::postFunction(
arguments, task_type_); arguments, task_type_);
if (signal) if (signal)
signal->AddAlgorithm(WTF::Bind(&Task::cancel, WrapWeakPersistent(task))); signal->AddAlgorithm(WTF::Bind(&Task::cancel, WrapWeakPersistent(task)));
return task->result(); return task->result(script_state);
} }
Task* WorkerTaskQueue::postTask(ScriptState* script_state, Task* WorkerTaskQueue::postTask(ScriptState* script_state,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment