Commit d002ad29 authored by Greg Kraynov's avatar Greg Kraynov Committed by Commit Bot

Introduce TaskQueueProxy.

Aimed to avoid ref-counted lifecycle of TaskQueueImpl.
SingleThreadTaskRunnner implementation will be dropped
from TaskQueue shortly. See bug for details.

TBR=gab@chromium.org

Bug: 865411
Change-Id: I92a3965299cb1c829e8a385f4bb2690383436107
Reviewed-on: https://chromium-review.googlesource.com/1234474
Commit-Queue: Greg Kraynov <kraynov@chromium.org>
Reviewed-by: default avatarAlexander Timin <altimin@chromium.org>
Cr-Commit-Position: refs/heads/master@{#593185}
parent 265dbbbd
...@@ -760,6 +760,8 @@ jumbo_component("base") { ...@@ -760,6 +760,8 @@ jumbo_component("base") {
"task/sequence_manager/task_queue.h", "task/sequence_manager/task_queue.h",
"task/sequence_manager/task_queue_impl.cc", "task/sequence_manager/task_queue_impl.cc",
"task/sequence_manager/task_queue_impl.h", "task/sequence_manager/task_queue_impl.h",
"task/sequence_manager/task_queue_proxy.cc",
"task/sequence_manager/task_queue_proxy.h",
"task/sequence_manager/task_queue_selector.cc", "task/sequence_manager/task_queue_selector.cc",
"task/sequence_manager/task_queue_selector.h", "task/sequence_manager/task_queue_selector.h",
"task/sequence_manager/task_queue_selector_logic.h", "task/sequence_manager/task_queue_selector_logic.h",
......
...@@ -10,6 +10,7 @@ ...@@ -10,6 +10,7 @@
#include "base/task/sequence_manager/associated_thread_id.h" #include "base/task/sequence_manager/associated_thread_id.h"
#include "base/task/sequence_manager/sequence_manager_impl.h" #include "base/task/sequence_manager/sequence_manager_impl.h"
#include "base/task/sequence_manager/task_queue_impl.h" #include "base/task/sequence_manager/task_queue_impl.h"
#include "base/task/sequence_manager/task_queue_proxy.h"
#include "base/task/sequence_manager/task_queue_task_runner.h" #include "base/task/sequence_manager/task_queue_task_runner.h"
#include "base/threading/thread_checker.h" #include "base/threading/thread_checker.h"
#include "base/time/time.h" #include "base/time/time.h"
...@@ -17,6 +18,20 @@ ...@@ -17,6 +18,20 @@
namespace base { namespace base {
namespace sequence_manager { namespace sequence_manager {
namespace {
constexpr int kTaskTypeNone = 0;
// TODO(kraynov): Move NullTaskRunner from //base/test to //base.
scoped_refptr<SingleThreadTaskRunner> CreateNullTaskRunner() {
return MakeRefCounted<internal::TaskQueueTaskRunner>(
MakeRefCounted<internal::TaskQueueProxy>(
nullptr, MakeRefCounted<internal::AssociatedThreadId>()),
kTaskTypeNone);
}
} // namespace
TaskQueue::TaskQueue(std::unique_ptr<internal::TaskQueueImpl> impl, TaskQueue::TaskQueue(std::unique_ptr<internal::TaskQueueImpl> impl,
const TaskQueue::Spec& spec) const TaskQueue::Spec& spec)
: impl_(std::move(impl)), : impl_(std::move(impl)),
...@@ -25,8 +40,9 @@ TaskQueue::TaskQueue(std::unique_ptr<internal::TaskQueueImpl> impl, ...@@ -25,8 +40,9 @@ TaskQueue::TaskQueue(std::unique_ptr<internal::TaskQueueImpl> impl,
impl_ ? impl_->GetGracefulQueueShutdownHelper() : nullptr), impl_ ? impl_->GetGracefulQueueShutdownHelper() : nullptr),
associated_thread_((impl_ && impl_->sequence_manager()) associated_thread_((impl_ && impl_->sequence_manager())
? impl_->sequence_manager()->associated_thread() ? impl_->sequence_manager()->associated_thread()
: MakeRefCounted<internal::AssociatedThreadId>()) { : MakeRefCounted<internal::AssociatedThreadId>()),
} default_task_runner_(impl_ ? impl_->CreateTaskRunner(kTaskTypeNone)
: CreateNullTaskRunner()) {}
TaskQueue::~TaskQueue() { TaskQueue::~TaskQueue() {
// scoped_refptr guarantees us that this object isn't used. // scoped_refptr guarantees us that this object isn't used.
...@@ -101,40 +117,27 @@ void TaskQueue::ShutdownTaskQueue() { ...@@ -101,40 +117,27 @@ void TaskQueue::ShutdownTaskQueue() {
scoped_refptr<SingleThreadTaskRunner> TaskQueue::CreateTaskRunner( scoped_refptr<SingleThreadTaskRunner> TaskQueue::CreateTaskRunner(
int task_type) { int task_type) {
return MakeRefCounted<internal::TaskQueueTaskRunner>(this, task_type); Optional<MoveableAutoLock> lock(AcquireImplReadLockIfNeeded());
if (!impl_)
return CreateNullTaskRunner();
return impl_->CreateTaskRunner(task_type);
} }
bool TaskQueue::RunsTasksInCurrentSequence() const { bool TaskQueue::RunsTasksInCurrentSequence() const {
return IsOnMainThread(); return task_runner()->RunsTasksInCurrentSequence();
} }
bool TaskQueue::PostDelayedTask(const Location& from_here, bool TaskQueue::PostDelayedTask(const Location& from_here,
OnceClosure task, OnceClosure task,
TimeDelta delay) { TimeDelta delay) {
return PostTaskWithMetadata( return task_runner()->PostDelayedTask(from_here, std::move(task), delay);
PostedTask(std::move(task), from_here, delay, Nestable::kNestable));
} }
bool TaskQueue::PostNonNestableDelayedTask(const Location& from_here, bool TaskQueue::PostNonNestableDelayedTask(const Location& from_here,
OnceClosure task, OnceClosure task,
TimeDelta delay) { TimeDelta delay) {
return PostTaskWithMetadata( return task_runner()->PostNonNestableDelayedTask(from_here, std::move(task),
PostedTask(std::move(task), from_here, delay, Nestable::kNonNestable)); delay);
}
bool TaskQueue::PostTaskWithMetadata(PostedTask task) {
Optional<MoveableAutoLock> lock = AcquireImplReadLockIfNeeded();
if (!impl_)
return false;
internal::TaskQueueImpl::PostTaskResult result(
impl_->PostDelayedTask(std::move(task)));
if (result.success)
return true;
// If posting task was unsuccessful then |result| will contain
// the original task which should be destructed outside of the lock.
lock = nullopt;
// Task gets implicitly destructed here.
return false;
} }
std::unique_ptr<TaskQueue::QueueEnabledVoter> std::unique_ptr<TaskQueue::QueueEnabledVoter>
......
...@@ -56,6 +56,7 @@ class BASE_EXPORT TaskQueue : public SingleThreadTaskRunner { ...@@ -56,6 +56,7 @@ class BASE_EXPORT TaskQueue : public SingleThreadTaskRunner {
// A wrapper around OnceClosure with additional metadata to be passed // A wrapper around OnceClosure with additional metadata to be passed
// to PostTask and plumbed until PendingTask is created. // to PostTask and plumbed until PendingTask is created.
// TODO(kraynov): Move to TaskQueueTaskRunner.
struct BASE_EXPORT PostedTask { struct BASE_EXPORT PostedTask {
PostedTask(OnceClosure callback, PostedTask(OnceClosure callback,
Location posted_from, Location posted_from,
...@@ -315,10 +316,15 @@ class BASE_EXPORT TaskQueue : public SingleThreadTaskRunner { ...@@ -315,10 +316,15 @@ class BASE_EXPORT TaskQueue : public SingleThreadTaskRunner {
// Create a task runner for this TaskQueue which will annotate all // Create a task runner for this TaskQueue which will annotate all
// posted tasks with the given task type. // posted tasks with the given task type.
// May be called on any thread.
scoped_refptr<SingleThreadTaskRunner> CreateTaskRunner(int task_type); scoped_refptr<SingleThreadTaskRunner> CreateTaskRunner(int task_type);
// TODO(kraynov): Drop this implementation and introduce // Default task runner which doesn't annotate tasks with a task type.
// GetDefaultTaskRunner() method instead. scoped_refptr<SingleThreadTaskRunner> task_runner() const {
return default_task_runner_;
}
// TODO(kraynov): Drop this implementation.
// SingleThreadTaskRunner implementation: // SingleThreadTaskRunner implementation:
bool RunsTasksInCurrentSequence() const override; bool RunsTasksInCurrentSequence() const override;
bool PostDelayedTask(const Location& from_here, bool PostDelayedTask(const Location& from_here,
...@@ -328,8 +334,6 @@ class BASE_EXPORT TaskQueue : public SingleThreadTaskRunner { ...@@ -328,8 +334,6 @@ class BASE_EXPORT TaskQueue : public SingleThreadTaskRunner {
OnceClosure task, OnceClosure task,
TimeDelta delay) override; TimeDelta delay) override;
bool PostTaskWithMetadata(PostedTask task);
protected: protected:
TaskQueue(std::unique_ptr<internal::TaskQueueImpl> impl, TaskQueue(std::unique_ptr<internal::TaskQueueImpl> impl,
const TaskQueue::Spec& spec); const TaskQueue::Spec& spec);
...@@ -365,6 +369,7 @@ class BASE_EXPORT TaskQueue : public SingleThreadTaskRunner { ...@@ -365,6 +369,7 @@ class BASE_EXPORT TaskQueue : public SingleThreadTaskRunner {
graceful_queue_shutdown_helper_; graceful_queue_shutdown_helper_;
scoped_refptr<internal::AssociatedThreadId> associated_thread_; scoped_refptr<internal::AssociatedThreadId> associated_thread_;
scoped_refptr<SingleThreadTaskRunner> default_task_runner_;
DISALLOW_COPY_AND_ASSIGN(TaskQueue); DISALLOW_COPY_AND_ASSIGN(TaskQueue);
}; };
......
...@@ -9,6 +9,8 @@ ...@@ -9,6 +9,8 @@
#include "base/strings/stringprintf.h" #include "base/strings/stringprintf.h"
#include "base/task/sequence_manager/sequence_manager_impl.h" #include "base/task/sequence_manager/sequence_manager_impl.h"
#include "base/task/sequence_manager/task_queue_proxy.h"
#include "base/task/sequence_manager/task_queue_task_runner.h"
#include "base/task/sequence_manager/time_domain.h" #include "base/task/sequence_manager/time_domain.h"
#include "base/task/sequence_manager/work_queue.h" #include "base/task/sequence_manager/work_queue.h"
#include "base/time/time.h" #include "base/time/time.h"
...@@ -49,6 +51,7 @@ TaskQueueImpl::TaskQueueImpl(SequenceManagerImpl* sequence_manager, ...@@ -49,6 +51,7 @@ TaskQueueImpl::TaskQueueImpl(SequenceManagerImpl* sequence_manager,
: AssociatedThreadId::CreateBound()), : AssociatedThreadId::CreateBound()),
any_thread_(sequence_manager, time_domain), any_thread_(sequence_manager, time_domain),
main_thread_only_(sequence_manager, this, time_domain), main_thread_only_(sequence_manager, this, time_domain),
proxy_(MakeRefCounted<TaskQueueProxy>(this, associated_thread_)),
should_monitor_quiescence_(spec.should_monitor_quiescence), should_monitor_quiescence_(spec.should_monitor_quiescence),
should_notify_observers_(spec.should_notify_observers) { should_notify_observers_(spec.should_notify_observers) {
DCHECK(time_domain); DCHECK(time_domain);
...@@ -130,7 +133,16 @@ TaskQueueImpl::MainThreadOnly::MainThreadOnly( ...@@ -130,7 +133,16 @@ TaskQueueImpl::MainThreadOnly::MainThreadOnly(
TaskQueueImpl::MainThreadOnly::~MainThreadOnly() = default; TaskQueueImpl::MainThreadOnly::~MainThreadOnly() = default;
scoped_refptr<SingleThreadTaskRunner> TaskQueueImpl::CreateTaskRunner(
int task_type) const {
// |proxy_| pointer is const, hence no need for lock.
return MakeRefCounted<TaskQueueTaskRunner>(proxy_, task_type);
}
void TaskQueueImpl::UnregisterTaskQueue() { void TaskQueueImpl::UnregisterTaskQueue() {
// Detach task runners.
proxy_->DetachFromTaskQueueImpl();
TaskDeque immediate_incoming_queue; TaskDeque immediate_incoming_queue;
{ {
......
...@@ -36,6 +36,7 @@ class TimeDomain; ...@@ -36,6 +36,7 @@ class TimeDomain;
namespace internal { namespace internal {
class SequenceManagerImpl; class SequenceManagerImpl;
class TaskQueueProxy;
class WorkQueue; class WorkQueue;
class WorkQueueSets; class WorkQueueSets;
...@@ -181,6 +182,9 @@ class BASE_EXPORT TaskQueueImpl { ...@@ -181,6 +182,9 @@ class BASE_EXPORT TaskQueueImpl {
RepeatingCallback<void(const TaskQueue::Task&, RepeatingCallback<void(const TaskQueue::Task&,
const TaskQueue::TaskTiming&)>; const TaskQueue::TaskTiming&)>;
// May be called from any thread.
scoped_refptr<SingleThreadTaskRunner> CreateTaskRunner(int task_type) const;
// TaskQueue implementation. // TaskQueue implementation.
const char* GetName() const; const char* GetName() const;
bool RunsTasksInCurrentSequence() const; bool RunsTasksInCurrentSequence() const;
...@@ -448,6 +452,10 @@ class BASE_EXPORT TaskQueueImpl { ...@@ -448,6 +452,10 @@ class BASE_EXPORT TaskQueueImpl {
return main_thread_only_; return main_thread_only_;
} }
// Proxy which allows TaskQueueTaskRunner to dispatch tasks and it can be
// detached from TaskQueueImpl to leave dangling task runners behind sefely.
const scoped_refptr<TaskQueueProxy> proxy_;
mutable Lock immediate_incoming_queue_lock_; mutable Lock immediate_incoming_queue_lock_;
TaskDeque immediate_incoming_queue_; TaskDeque immediate_incoming_queue_;
TaskDeque& immediate_incoming_queue() { TaskDeque& immediate_incoming_queue() {
......
// Copyright 2018 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "base/task/sequence_manager/task_queue_proxy.h"
#include "base/task/sequence_manager/associated_thread_id.h"
#include "base/task/sequence_manager/sequence_manager_impl.h"
#include "base/task/sequence_manager/task_queue_impl.h"
namespace base {
namespace sequence_manager {
namespace internal {
TaskQueueProxy::TaskQueueProxy(
TaskQueueImpl* task_queue_impl,
scoped_refptr<AssociatedThreadId> associated_thread)
: task_queue_impl_(task_queue_impl),
associated_thread_(std::move(associated_thread)) {}
TaskQueueProxy::~TaskQueueProxy() = default;
bool TaskQueueProxy::PostTask(TaskQueue::PostedTask task) const {
Optional<MoveableAutoLock> lock(AcquireLockIfNeeded());
if (!task_queue_impl_)
return false;
TaskQueueImpl::PostTaskResult result(
task_queue_impl_->PostDelayedTask(std::move(task)));
// If posting task was unsuccessful then |result| will contain
// the original task which should be destructed outside of the lock
// because new tasks may be posted in the destrictor.
lock = nullopt;
return result.success;
}
bool TaskQueueProxy::RunsTasksInCurrentSequence() const {
return associated_thread_->thread_id == PlatformThread::CurrentId();
}
Optional<MoveableAutoLock> TaskQueueProxy::AcquireLockIfNeeded() const {
if (RunsTasksInCurrentSequence())
return nullopt;
return MoveableAutoLock(lock_);
}
void TaskQueueProxy::DetachFromTaskQueueImpl() {
DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
AutoLock lock(lock_);
// Main thread is the only thread where |task_queue_impl_| is being read
// without a lock, which is fine because this function is main thread only.
task_queue_impl_ = nullptr;
}
} // namespace internal
} // namespace sequence_manager
} // namespace base
// Copyright 2018 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef BASE_TASK_SEQUENCE_MANAGER_TASK_QUEUE_PROXY_H_
#define BASE_TASK_SEQUENCE_MANAGER_TASK_QUEUE_PROXY_H_
#include "base/memory/ref_counted.h"
#include "base/optional.h"
#include "base/synchronization/lock.h"
#include "base/task/sequence_manager/moveable_auto_lock.h"
#include "base/task/sequence_manager/task_queue.h"
namespace base {
namespace sequence_manager {
namespace internal {
struct AssociatedThreadId;
class TaskQueueImpl;
// Task runners are ref-counted and unaccountable, so we need a safe way
// to delete a task queue while associated task runners might be still around.
// When TaskQueueImpl goes away, this proxy becomes a stub and later on gets
// ref-count-destructed once no TaskQueueTaskRunner remains.
// NOTE: Instances must be constructed or detached only by TaskQueueImpl,
// unless |task_queue_impl| is null (which is useful for stub task runners).
class TaskQueueProxy : public RefCountedThreadSafe<TaskQueueProxy> {
public:
TaskQueueProxy(TaskQueueImpl* task_queue_impl,
scoped_refptr<AssociatedThreadId> associated_thread);
// May be called on any thread.
bool PostTask(TaskQueue::PostedTask task) const;
bool RunsTasksInCurrentSequence() const;
// PostTask will reject any task after this call.
// Must be called on main thread only.
void DetachFromTaskQueueImpl();
private:
friend class RefCountedThreadSafe<TaskQueueProxy>;
~TaskQueueProxy();
// Doesn't acquire lock on main thread.
Optional<MoveableAutoLock> AcquireLockIfNeeded() const;
mutable Lock lock_;
TaskQueueImpl* task_queue_impl_; // Not owned.
const scoped_refptr<AssociatedThreadId> associated_thread_;
};
} // namespace internal
} // namespace sequence_manager
} // namespace base
#endif // BASE_TASK_SEQUENCE_MANAGER_TASK_QUEUE_PROXY_H_
...@@ -5,34 +5,36 @@ ...@@ -5,34 +5,36 @@
#include "base/task/sequence_manager/task_queue_task_runner.h" #include "base/task/sequence_manager/task_queue_task_runner.h"
#include "base/task/sequence_manager/task_queue.h" #include "base/task/sequence_manager/task_queue.h"
#include "base/task/sequence_manager/task_queue_proxy.h"
namespace base { namespace base {
namespace sequence_manager { namespace sequence_manager {
namespace internal { namespace internal {
TaskQueueTaskRunner::TaskQueueTaskRunner(scoped_refptr<TaskQueue> task_queue, TaskQueueTaskRunner::TaskQueueTaskRunner(
int task_type) scoped_refptr<TaskQueueProxy> task_queue_proxy,
: task_queue_(task_queue), task_type_(task_type) {} int task_type)
: task_queue_proxy_(std::move(task_queue_proxy)), task_type_(task_type) {}
TaskQueueTaskRunner::~TaskQueueTaskRunner() {} TaskQueueTaskRunner::~TaskQueueTaskRunner() {}
bool TaskQueueTaskRunner::PostDelayedTask(const Location& location, bool TaskQueueTaskRunner::PostDelayedTask(const Location& location,
OnceClosure callback, OnceClosure callback,
TimeDelta delay) { TimeDelta delay) {
return task_queue_->PostTaskWithMetadata(TaskQueue::PostedTask( return task_queue_proxy_->PostTask(TaskQueue::PostedTask(
std::move(callback), location, delay, Nestable::kNestable, task_type_)); std::move(callback), location, delay, Nestable::kNestable, task_type_));
} }
bool TaskQueueTaskRunner::PostNonNestableDelayedTask(const Location& location, bool TaskQueueTaskRunner::PostNonNestableDelayedTask(const Location& location,
OnceClosure callback, OnceClosure callback,
TimeDelta delay) { TimeDelta delay) {
return task_queue_->PostTaskWithMetadata( return task_queue_proxy_->PostTask(
TaskQueue::PostedTask(std::move(callback), location, delay, TaskQueue::PostedTask(std::move(callback), location, delay,
Nestable::kNonNestable, task_type_)); Nestable::kNonNestable, task_type_));
} }
bool TaskQueueTaskRunner::RunsTasksInCurrentSequence() const { bool TaskQueueTaskRunner::RunsTasksInCurrentSequence() const {
return task_queue_->RunsTasksInCurrentSequence(); return task_queue_proxy_->RunsTasksInCurrentSequence();
} }
} // namespace internal } // namespace internal
......
...@@ -9,19 +9,17 @@ ...@@ -9,19 +9,17 @@
namespace base { namespace base {
namespace sequence_manager { namespace sequence_manager {
class TaskQueue;
namespace internal { namespace internal {
class TaskQueueProxy;
// TODO(kraynov): Post tasks to a TaskQueue solely using this task runner and // TODO(kraynov): Post tasks to a TaskQueue solely using this task runner and
// drop SingleThreadTaskRunner implementation in the TaskQueue class. // drop SingleThreadTaskRunner implementation in the TaskQueue class.
// See https://crbug.com/865411. // See https://crbug.com/865411.
class BASE_EXPORT TaskQueueTaskRunner : public SingleThreadTaskRunner { class BASE_EXPORT TaskQueueTaskRunner : public SingleThreadTaskRunner {
public: public:
// TODO(kraynov): Use TaskQueueTaskProxy that will be detachable TaskQueueTaskRunner(scoped_refptr<TaskQueueProxy> task_queue_proxy,
// from TaskQueue(Impl) when it's getting shutdown. int task_type);
TaskQueueTaskRunner(scoped_refptr<TaskQueue> task_queue, int task_type);
bool PostDelayedTask(const Location& location, bool PostDelayedTask(const Location& location,
OnceClosure callback, OnceClosure callback,
...@@ -34,7 +32,7 @@ class BASE_EXPORT TaskQueueTaskRunner : public SingleThreadTaskRunner { ...@@ -34,7 +32,7 @@ class BASE_EXPORT TaskQueueTaskRunner : public SingleThreadTaskRunner {
private: private:
~TaskQueueTaskRunner() override; // Ref-counted. ~TaskQueueTaskRunner() override; // Ref-counted.
scoped_refptr<TaskQueue> task_queue_; const scoped_refptr<TaskQueueProxy> task_queue_proxy_;
const int task_type_; const int task_type_;
DISALLOW_COPY_AND_ASSIGN(TaskQueueTaskRunner); DISALLOW_COPY_AND_ASSIGN(TaskQueueTaskRunner);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment