Commit 6d12f07b authored by Nate Chapin's avatar Nate Chapin Committed by Commit Bot

Make ThreadPoolTask oilpan-friendly and merge it in to Task

Change-Id: I94ade605404051bf0d735b936923c2084bb805bc
Reviewed-on: https://chromium-review.googlesource.com/c/1316436
Commit-Queue: Nate Chapin <japhet@chromium.org>
Reviewed-by: default avatarHiroki Nakagawa <nhiroki@chromium.org>
Cr-Commit-Position: refs/heads/master@{#605457}
parent 2a90bffa
......@@ -17,10 +17,11 @@
namespace blink {
class ThreadPoolTask::AsyncFunctionCompleted : public ScriptFunction {
// worker_thread_ only.
class Task::AsyncFunctionCompleted : public ScriptFunction {
public:
static v8::Local<v8::Function> CreateFunction(ScriptState* script_state,
ThreadPoolTask* task,
Task* task,
State state) {
return (new AsyncFunctionCompleted(script_state, task, state))
->BindToV8Function();
......@@ -32,50 +33,47 @@ class ThreadPoolTask::AsyncFunctionCompleted : public ScriptFunction {
}
private:
AsyncFunctionCompleted(ScriptState* script_state,
ThreadPoolTask* task,
State state)
AsyncFunctionCompleted(ScriptState* script_state, Task* task, State state)
: ScriptFunction(script_state), task_(task), state_(state) {}
ThreadPoolTask* task_;
CrossThreadPersistent<Task> task_;
const State state_;
};
ThreadPoolTask::ThreadPoolTask(ThreadPoolThreadProvider* thread_provider,
ScriptState* script_state,
const ScriptValue& function,
const Vector<ScriptValue>& arguments,
TaskType task_type)
: ThreadPoolTask(thread_provider,
script_state,
function,
String(),
arguments,
task_type) {}
ThreadPoolTask::ThreadPoolTask(ThreadPoolThreadProvider* thread_provider,
ScriptState* script_state,
const String& function_name,
const Vector<ScriptValue>& arguments,
TaskType task_type)
: ThreadPoolTask(thread_provider,
script_state,
ScriptValue(),
function_name,
arguments,
task_type) {}
ThreadPoolTask::ThreadPoolTask(ThreadPoolThreadProvider* thread_provider,
ScriptState* script_state,
const ScriptValue& function,
const String& function_name,
const Vector<ScriptValue>& arguments,
TaskType task_type)
Task::Task(ThreadPoolThreadProvider* thread_provider,
ScriptState* script_state,
const ScriptValue& function,
const Vector<ScriptValue>& arguments,
TaskType task_type)
: Task(thread_provider,
script_state,
function,
String(),
arguments,
task_type) {}
Task::Task(ThreadPoolThreadProvider* thread_provider,
ScriptState* script_state,
const String& function_name,
const Vector<ScriptValue>& arguments,
TaskType task_type)
: Task(thread_provider,
script_state,
ScriptValue(),
function_name,
arguments,
task_type) {}
Task::Task(ThreadPoolThreadProvider* thread_provider,
ScriptState* script_state,
const ScriptValue& function,
const String& function_name,
const Vector<ScriptValue>& arguments,
TaskType task_type)
: task_type_(task_type),
self_keep_alive_(base::AdoptRef(this)),
self_keep_alive_(this),
resolver_(ScriptPromiseResolver::Create(script_state)),
function_name_(function_name.IsolatedCopy()),
arguments_(arguments.size()),
weak_factory_(this) {
arguments_(arguments.size()) {
DCHECK(IsMainThread());
DCHECK_EQ(!function.IsEmpty(), function_name.IsNull());
DCHECK(task_type_ == TaskType::kUserInteraction ||
......@@ -88,7 +86,7 @@ ThreadPoolTask::ThreadPoolTask(ThreadPoolThreadProvider* thread_provider,
isolate, function.V8Value()->ToString(isolate));
}
Vector<ThreadPoolTask*> prerequisites;
HeapVector<Member<Task>> prerequisites;
Vector<size_t> prerequisites_indices;
for (size_t i = 0; i < arguments_.size(); i++) {
// Normal case: if the argument isn't a Task, just serialize it.
......@@ -98,10 +96,9 @@ ThreadPoolTask::ThreadPoolTask(ThreadPoolThreadProvider* thread_provider,
isolate, arguments[i].V8Value());
continue;
}
ThreadPoolTask* prerequisite =
Task* prerequisite =
ToScriptWrappable(arguments[i].V8Value().As<v8::Object>())
->ToImpl<Task>()
->GetThreadPoolTask();
->ToImpl<Task>();
prerequisites.push_back(prerequisite);
prerequisites_indices.push_back(i);
}
......@@ -114,7 +111,7 @@ ThreadPoolTask::ThreadPoolTask(ThreadPoolThreadProvider* thread_provider,
return;
}
// Prior to this point, other ThreadPoolTask instances don't have a reference
// Prior to this point, other Task instances don't have a reference
// to |this| yet, so no need to lock mutex_. RegisterDependencies() populates
// those references, so RegisterDependencies() and any logic thereafter must
// consider the potential for data races.
......@@ -122,14 +119,14 @@ ThreadPoolTask::ThreadPoolTask(ThreadPoolThreadProvider* thread_provider,
}
// static
ThreadPoolThread* ThreadPoolTask::SelectThread(
const Vector<ThreadPoolTask*>& prerequisites,
ThreadPoolThread* Task::SelectThread(
const HeapVector<Member<Task>>& prerequisites,
ThreadPoolThreadProvider* thread_provider) {
DCHECK(IsMainThread());
HashCountedSet<ThreadPoolThread*> prerequisite_location_counts;
size_t max_prerequisite_location_count = 0;
ThreadPoolThread* max_prerequisite_thread = nullptr;
for (ThreadPoolTask* prerequisite : prerequisites) {
for (Task* prerequisite : prerequisites) {
// For prerequisites that are not yet complete, track which thread the task
// will run on. Put this task on the thread where the most prerequisites
// reside. This is slightly imprecise, because a task may complete before
......@@ -147,7 +144,7 @@ ThreadPoolThread* ThreadPoolTask::SelectThread(
: thread_provider->GetLeastBusyThread();
}
ThreadPoolThread* ThreadPoolTask::GetScheduledThread() {
ThreadPoolThread* Task::GetScheduledThread() {
DCHECK(IsMainThread());
MutexLocker lock(mutex_);
return HasFinished() ? nullptr : worker_thread_;
......@@ -155,9 +152,8 @@ ThreadPoolThread* ThreadPoolTask::GetScheduledThread() {
// Should only be called from constructor. Split out in to a helper because
// clang appears to exempt constructors from thread safety analysis.
void ThreadPoolTask::RegisterDependencies(
const Vector<ThreadPoolTask*>& prerequisites,
const Vector<size_t>& prerequisites_indices) {
void Task::RegisterDependencies(const HeapVector<Member<Task>>& prerequisites,
const Vector<size_t>& prerequisites_indices) {
DCHECK(IsMainThread());
{
MutexLocker lock(mutex_);
......@@ -165,7 +161,7 @@ void ThreadPoolTask::RegisterDependencies(
}
for (size_t i = 0; i < prerequisites.size(); i++) {
ThreadPoolTask* prerequisite = prerequisites[i];
Task* prerequisite = prerequisites[i];
size_t prerequisite_index = prerequisites_indices[i];
scoped_refptr<SerializedScriptValue> result;
State prerequisite_state = State::kPending;
......@@ -176,8 +172,8 @@ void ThreadPoolTask::RegisterDependencies(
if (prerequisite->HasFinished()) {
result = prerequisite->serialized_result_;
} else {
prerequisite->dependents_.insert(
std::make_unique<Dependent>(this, prerequisite_index));
prerequisite->dependents_.emplace_back(
new Dependent(this, prerequisite_index));
}
}
......@@ -191,7 +187,7 @@ void ThreadPoolTask::RegisterDependencies(
}
}
ThreadPoolTask::~ThreadPoolTask() {
Task::~Task() {
DCHECK(IsMainThread());
DCHECK(HasFinished());
DCHECK(!function_);
......@@ -200,11 +196,10 @@ ThreadPoolTask::~ThreadPoolTask() {
DCHECK(dependents_.IsEmpty());
}
void ThreadPoolTask::PrerequisiteFinished(
size_t prerequisite_index,
v8::Local<v8::Value> v8_result,
scoped_refptr<SerializedScriptValue> result,
State prerequisite_state) {
void Task::PrerequisiteFinished(size_t prerequisite_index,
v8::Local<v8::Value> v8_result,
scoped_refptr<SerializedScriptValue> result,
State prerequisite_state) {
MutexLocker lock(mutex_);
DCHECK(state_ == State::kPending || state_ == State::kCancelPending);
DCHECK(prerequisite_state == State::kCompleted ||
......@@ -225,16 +220,16 @@ void ThreadPoolTask::PrerequisiteFinished(
MaybeStartTask();
}
void ThreadPoolTask::MaybeStartTask() {
void Task::MaybeStartTask() {
if (prerequisites_remaining_)
return;
DCHECK(state_ == State::kPending || state_ == State::kCancelPending);
PostCrossThreadTask(*worker_thread_->GetTaskRunner(task_type_), FROM_HERE,
CrossThreadBind(&ThreadPoolTask::StartTaskOnWorkerThread,
CrossThreadUnretained(this)));
CrossThreadBind(&Task::StartTaskOnWorkerThread,
WrapCrossThreadPersistent(this)));
}
void ThreadPoolTask::StartTaskOnWorkerThread() {
void Task::StartTaskOnWorkerThread() {
DCHECK(worker_thread_->IsCurrentThread());
bool was_cancelled = false;
......@@ -269,9 +264,8 @@ void ThreadPoolTask::StartTaskOnWorkerThread() {
RunTaskOnWorkerThread();
}
void ThreadPoolTask::TaskCompletedOnWorkerThread(
v8::Local<v8::Value> return_value,
State state) {
void Task::TaskCompletedOnWorkerThread(v8::Local<v8::Value> return_value,
State state) {
DCHECK(worker_thread_->IsCurrentThread());
scoped_refptr<SerializedScriptValue> local_result =
......@@ -281,7 +275,7 @@ void ThreadPoolTask::TaskCompletedOnWorkerThread(
function_ = nullptr;
arguments_.clear();
HashSet<std::unique_ptr<Dependent>> dependents_to_notify;
Vector<CrossThreadPersistent<Dependent>> dependents_to_notify;
{
MutexLocker lock(mutex_);
serialized_result_ = local_result;
......@@ -298,12 +292,10 @@ void ThreadPoolTask::TaskCompletedOnWorkerThread(
*worker_thread_->GetParentExecutionContextTaskRunners()->Get(
TaskType::kInternalWorker),
FROM_HERE,
CrossThreadBind(&ThreadPoolTask::TaskCompleted,
CrossThreadUnretained(this)));
// TaskCompleted may delete |this| at any time after this point.
CrossThreadBind(&Task::TaskCompleted, WrapCrossThreadPersistent(this)));
}
void ThreadPoolTask::RunTaskOnWorkerThread() {
void Task::RunTaskOnWorkerThread() {
DCHECK(worker_thread_->IsCurrentThread());
// No other thread should be touching function_ or arguments_ at this point,
// so no mutex needed while actually running the task.
......@@ -378,7 +370,7 @@ void ThreadPoolTask::RunTaskOnWorkerThread() {
TaskCompletedOnWorkerThread(return_value, State::kCompleted);
}
void ThreadPoolTask::TaskCompleted() {
void Task::TaskCompleted() {
DCHECK(IsMainThread());
bool rejected = false;
{
......@@ -401,23 +393,22 @@ void ThreadPoolTask::TaskCompleted() {
resolver_->Resolve(value);
}
worker_thread_->DecrementTasksInProgressCount();
self_keep_alive_.reset();
// |this| may be deleted here.
self_keep_alive_.Clear();
}
ScriptPromise ThreadPoolTask::GetResult() {
ScriptPromise Task::result() {
DCHECK(IsMainThread());
return resolver_->Promise();
}
void ThreadPoolTask::Cancel() {
void Task::cancel() {
DCHECK(IsMainThread());
MutexLocker lock(mutex_);
if (state_ == State::kPending)
AdvanceState(State::kCancelPending);
}
void ThreadPoolTask::AdvanceState(State new_state) {
void Task::AdvanceState(State new_state) {
switch (new_state) {
case State::kPending:
NOTREACHED() << "kPending should only be set via initialization";
......@@ -438,4 +429,9 @@ void ThreadPoolTask::AdvanceState(State new_state) {
state_ = new_state;
}
void Task::Trace(Visitor* visitor) {
ScriptWrappable::Trace(visitor);
visitor->Trace(resolver_);
}
} // namespace blink
......@@ -18,41 +18,40 @@ class SerializedScriptValue;
// Scans |arguments| for Task objects, and registers those as dependencies,
// passing the result of those tasks in place of the Task arguments.
// All public functions are main-thread-only.
// ThreadPoolTask keeps itself alive via a self scoped_refptr until the
// Task keeps itself alive via a SelfKeepAlive until the
// the task completes and reports itself done on the main thread via
// TaskCompleted(). Other users (e.g. Task below) can keep the task
// alive after completion.
class ThreadPoolTask final : public RefCounted<ThreadPoolTask> {
// TaskCompleted().
class Task final : public ScriptWrappable {
DEFINE_WRAPPERTYPEINFO();
public:
// Called on main thread
ThreadPoolTask(ThreadPoolThreadProvider*,
ScriptState*,
const ScriptValue& function,
const Vector<ScriptValue>& arguments,
TaskType);
ThreadPoolTask(ThreadPoolThreadProvider*,
ScriptState*,
const String& function_name,
const Vector<ScriptValue>& arguments,
TaskType);
~ThreadPoolTask();
Task(ThreadPoolThreadProvider*,
ScriptState*,
const ScriptValue& function,
const Vector<ScriptValue>& arguments,
TaskType);
Task(ThreadPoolThreadProvider*,
ScriptState*,
const String& function_name,
const Vector<ScriptValue>& arguments,
TaskType);
~Task() override;
// Returns a promise that will be resolved with the result when it completes.
ScriptPromise GetResult();
void Cancel() LOCKS_EXCLUDED(mutex_);
ScriptPromise result();
void cancel() LOCKS_EXCLUDED(mutex_);
base::WeakPtr<ThreadPoolTask> GetWeakPtr() {
return weak_factory_.GetWeakPtr();
}
void Trace(Visitor*) override;
private:
enum class State { kPending, kStarted, kCancelPending, kCompleted, kFailed };
ThreadPoolTask(ThreadPoolThreadProvider*,
ScriptState*,
const ScriptValue& function,
const String& function_name,
const Vector<ScriptValue>& arguments,
TaskType);
Task(ThreadPoolThreadProvider*,
ScriptState*,
const ScriptValue& function,
const String& function_name,
const Vector<ScriptValue>& arguments,
TaskType);
class AsyncFunctionCompleted;
......@@ -74,10 +73,10 @@ class ThreadPoolTask final : public RefCounted<ThreadPoolTask> {
// Called on main thread
static ThreadPoolThread* SelectThread(
const Vector<ThreadPoolTask*>& prerequisites,
const HeapVector<Member<Task>>& prerequisites,
ThreadPoolThreadProvider*);
ThreadPoolThread* GetScheduledThread() LOCKS_EXCLUDED(mutex_);
void RegisterDependencies(const Vector<ThreadPoolTask*>& prerequisites,
void RegisterDependencies(const HeapVector<Member<Task>>& prerequisites,
const Vector<size_t>& prerequisite_indices)
LOCKS_EXCLUDED(mutex_);
void TaskCompleted();
......@@ -87,8 +86,8 @@ class ThreadPoolTask final : public RefCounted<ThreadPoolTask> {
const TaskType task_type_;
// Main thread only
scoped_refptr<ThreadPoolTask> self_keep_alive_;
Persistent<ScriptPromiseResolver> resolver_;
SelfKeepAlive<Task> self_keep_alive_;
Member<ScriptPromiseResolver> resolver_;
// Created in constructor on the main thread, consumed and cleared on
// worker_thread_. Those steps can't overlap, so no mutex_ required.
......@@ -120,38 +119,20 @@ class ThreadPoolTask final : public RefCounted<ThreadPoolTask> {
size_t prerequisites_remaining_ GUARDED_BY(mutex_) = 0u;
// Elements added from main thread. Cleared on completion on worker_thread_.
// Each element in dependents_ is not yet in the kCompleted state and
// therefore is guaranteed to be alive.
struct Dependent {
// Each element in dependents_ is not yet in the kCompleted state.
struct Dependent final : public GarbageCollected<Dependent> {
public:
Dependent(ThreadPoolTask* task, size_t index) : task(task), index(index) {}
ThreadPoolTask* task;
Dependent(Task* task, size_t index) : task(task), index(index) {
DCHECK(IsMainThread());
}
void Trace(Visitor* visitor) { visitor->Trace(task); }
Member<Task> task;
// The index in the dependent's argument array where this result should go.
size_t index;
};
HashSet<std::unique_ptr<Dependent>> dependents_ GUARDED_BY(mutex_);
Vector<CrossThreadPersistent<Dependent>> dependents_ GUARDED_BY(mutex_);
Mutex mutex_;
base::WeakPtrFactory<ThreadPoolTask> weak_factory_;
};
// This is a thin, v8-exposed wrapper around ThreadPoolTask that allows
// ThreadPoolTask to avoid being GarbageCollected.
class Task : public ScriptWrappable {
DEFINE_WRAPPERTYPEINFO();
public:
explicit Task(ThreadPoolTask* thread_pool_task)
: thread_pool_task_(thread_pool_task) {}
~Task() override = default;
ScriptPromise result() { return thread_pool_task_->GetResult(); }
void cancel() { thread_pool_task_->Cancel(); }
ThreadPoolTask* GetThreadPoolTask() const { return thread_pool_task_.get(); }
private:
scoped_refptr<ThreadPoolTask> thread_pool_task_;
};
} // namespace blink
......
......@@ -55,17 +55,14 @@ Task* TaskWorklet::postTask(ScriptState* script_state,
// TODO(japhet): Here and below: it's unclear what task type should be used,
// and whether the API should allow it to be configured. Using kIdleTask as a
// placeholder for now.
ThreadPoolTask* thread_pool_task = new ThreadPoolTask(
this, script_state, function, arguments, TaskType::kIdleTask);
return new Task(thread_pool_task);
return new Task(this, script_state, function, arguments, TaskType::kIdleTask);
}
Task* TaskWorklet::postTask(ScriptState* script_state,
const String& function_name,
const Vector<ScriptValue>& arguments) {
ThreadPoolTask* thread_pool_task = new ThreadPoolTask(
this, script_state, function_name, arguments, TaskType::kIdleTask);
return new Task(thread_pool_task);
return new Task(this, script_state, function_name, arguments,
TaskType::kIdleTask);
}
ThreadPoolThread* TaskWorklet::GetLeastBusyThread() {
......
......@@ -39,19 +39,17 @@ WorkerTaskQueue::WorkerTaskQueue(Document* document, TaskType task_type)
ScriptPromise WorkerTaskQueue::postFunction(
ScriptState* script_state,
const ScriptValue& task,
const ScriptValue& function,
AbortSignal* signal,
const Vector<ScriptValue>& arguments) {
DCHECK(document_->IsContextThread());
DCHECK(task.IsFunction());
DCHECK(function.IsFunction());
ThreadPoolTask* thread_pool_task = new ThreadPoolTask(
ThreadPool::From(*document_), script_state, task, arguments, task_type_);
if (signal) {
signal->AddAlgorithm(
WTF::Bind(&ThreadPoolTask::Cancel, thread_pool_task->GetWeakPtr()));
}
return thread_pool_task->GetResult();
Task* task = new Task(ThreadPool::From(*document_), script_state, function,
arguments, task_type_);
if (signal)
signal->AddAlgorithm(WTF::Bind(&Task::cancel, WrapWeakPersistent(task)));
return task->result();
}
Task* WorkerTaskQueue::postTask(ScriptState* script_state,
......@@ -59,11 +57,8 @@ Task* WorkerTaskQueue::postTask(ScriptState* script_state,
const Vector<ScriptValue>& arguments) {
DCHECK(document_->IsContextThread());
DCHECK(function.IsFunction());
ThreadPoolTask* thread_pool_task =
new ThreadPoolTask(ThreadPool::From(*document_), script_state, function,
arguments, task_type_);
return new Task(thread_pool_task);
return new Task(ThreadPool::From(*document_), script_state, function,
arguments, task_type_);
}
void WorkerTaskQueue::Trace(blink::Visitor* visitor) {
......
......@@ -30,12 +30,12 @@ class CORE_EXPORT WorkerTaskQueue : public ScriptWrappable {
~WorkerTaskQueue() override = default;
ScriptPromise postFunction(ScriptState*,
const ScriptValue& task,
const ScriptValue& function,
AbortSignal*,
const Vector<ScriptValue>& arguments);
Task* postTask(ScriptState*,
const ScriptValue& task,
const ScriptValue& function,
const Vector<ScriptValue>& arguments);
void Trace(blink::Visitor*) override;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment