Commit 02b1cd6a authored by Gabriel Charette's avatar Gabriel Charette Committed by Commit Bot

[MessageLoop] MessageLoopTaskRunner as multi-threaded incoming queue.

Introduces the SequencedTaskSource interface to //base. This should ultimately
be coalesced with a similar interface used by SequenceManager but this CL was
complex enough in isolation that it was easier to introduce only the basic API
for now.

MessageLoop will as a follow-up expose an API that will allow it to be created
with a custom SequencedTaskSource. This will enable
ScopedTaskEnvironment::MOCK_TIME on custom MessageLoops (i.e. ForUI/ForIO) and
soon after on all threads (not just main thread), since TaskScheduler's delayed
tasks are managed on a MessageLoopForIO. Finally this will allow MOCK_TIME on
TestBrowserThreadBundle (which uses a MessageLoopForUI); nicely rounding out the
MOCK_TIME APIs.

In this CL the following semantics have changed in MessageLoop :
 * MessageLoop::unbound_task_runner_ ->
    - MessageLoop::underlying_task_runner_
    - MessageLoop::task_runner_ still initially set to it
    - but now const to clarify lifetime (and not set to null in
      ~MessageLoop() as this was pointless)
    - Implements SequencedTaskSource
    - Now has a Shutdown() phase so that PostTask() that happen-after
      ~MessageLoop() returns false
      (MessageLoop::Controller::DisconnectFromParent() still required,
      see comment in ~MessageLoop())
 * IncomingTaskQueue (to be renamed to PendingTaskQueue in follow-up) :
    - no longer has a TriageQueue (MessageLoop communicates directly with
      SequencedTaskSource)
    - no longer ref-counted, strictly a member of MessageLoop
    - MessageLoop uses it as storage for tasks it must delay/defer
    - IncomingTaskQueue no longer has a Shutdown() phase, becomes
      dead storage on shutdown and dies with MessageLoop
    - Racy PostTask() on shutdown are now strictly handled between
      MessageLoop and its Controller
 * SequencedTaskSource :
    - the interface MessageLoop feeds from, initially only
      implemented by MessageLoopTaskRunner but will be made customizable
      as a follow-up
 * MessageLoop::SetTaskRunner()
    - still a thing to keep existing use cases alive but will ultimately
      die in favor of providing an external SequencedTaskSource (and
      killing all MessageLoop task_runner dances performed today :))


Note: most new logic in message_loop_task_runner.cc was moved as-is from
incoming_task_queue.cc

Bug: 708584, 860252
Change-Id: I23f40d293d2df6fe41374fad127745f2ff72fbe0
Reviewed-on: https://chromium-review.googlesource.com/1088762
Commit-Queue: Gabriel Charette <gab@chromium.org>
Reviewed-by: default avatarkylechar <kylechar@chromium.org>
Cr-Commit-Position: refs/heads/master@{#578809}
parent 77605f5e
...@@ -574,6 +574,7 @@ jumbo_component("base") { ...@@ -574,6 +574,7 @@ jumbo_component("base") {
"message_loop/message_pump_mac.mm", "message_loop/message_pump_mac.mm",
"message_loop/message_pump_win.cc", "message_loop/message_pump_win.cc",
"message_loop/message_pump_win.h", "message_loop/message_pump_win.h",
"message_loop/sequenced_task_source.h",
"message_loop/timer_slack.h", "message_loop/timer_slack.h",
"metrics/bucket_ranges.cc", "metrics/bucket_ranges.cc",
"metrics/bucket_ranges.h", "metrics/bucket_ranges.h",
......
...@@ -4,157 +4,28 @@ ...@@ -4,157 +4,28 @@
#include "base/message_loop/incoming_task_queue.h" #include "base/message_loop/incoming_task_queue.h"
#include <limits>
#include <utility> #include <utility>
#include "base/bind.h" #include "base/logging.h"
#include "base/callback_helpers.h"
#include "base/location.h"
#include "base/metrics/histogram_macros.h" #include "base/metrics/histogram_macros.h"
#include "base/synchronization/waitable_event.h"
#include "base/time/time.h"
#include "build/build_config.h" #include "build/build_config.h"
namespace base { namespace base {
namespace internal { namespace internal {
namespace { IncomingTaskQueue::IncomingTaskQueue() = default;
#if DCHECK_IS_ON()
// Delays larger than this are often bogus, and a warning should be emitted in
// debug builds to warn developers. http://crbug.com/450045
constexpr TimeDelta kTaskDelayWarningThreshold = TimeDelta::FromDays(14);
#endif
TimeTicks CalculateDelayedRuntime(TimeDelta delay) {
TimeTicks delayed_run_time;
if (delay > TimeDelta())
delayed_run_time = TimeTicks::Now() + delay;
else
DCHECK_EQ(delay.InMilliseconds(), 0) << "delay should not be negative";
return delayed_run_time;
}
} // namespace
IncomingTaskQueue::IncomingTaskQueue(
std::unique_ptr<Observer> task_queue_observer)
: task_queue_observer_(std::move(task_queue_observer)),
triage_tasks_(this) {
// The constructing sequence is not necessarily the running sequence, e.g. in
// the case of a MessageLoop created unbound.
DETACH_FROM_SEQUENCE(sequence_checker_);
}
IncomingTaskQueue::~IncomingTaskQueue() = default; IncomingTaskQueue::~IncomingTaskQueue() = default;
bool IncomingTaskQueue::AddToIncomingQueue(const Location& from_here,
OnceClosure task,
TimeDelta delay,
Nestable nestable) {
// Use CHECK instead of DCHECK to crash earlier. See http://crbug.com/711167
// for details.
CHECK(task);
DLOG_IF(WARNING, delay > kTaskDelayWarningThreshold)
<< "Requesting super-long task delay period of " << delay.InSeconds()
<< " seconds from here: " << from_here.ToString();
PendingTask pending_task(from_here, std::move(task),
CalculateDelayedRuntime(delay), nestable);
#if defined(OS_WIN)
// We consider the task needs a high resolution timer if the delay is
// more than 0 and less than 32ms. This caps the relative error to
// less than 50% : a 33ms wait can wake at 48ms since the default
// resolution on Windows is between 10 and 15ms.
if (delay > TimeDelta() &&
delay.InMilliseconds() < (2 * Time::kMinLowResolutionThresholdMs)) {
pending_task.is_high_res = true;
}
#endif
return PostPendingTask(&pending_task);
}
void IncomingTaskQueue::Shutdown() {
AutoLock auto_lock(incoming_queue_lock_);
accept_new_tasks_ = false;
}
void IncomingTaskQueue::ReportMetricsOnIdle() const { void IncomingTaskQueue::ReportMetricsOnIdle() const {
UMA_HISTOGRAM_COUNTS_1M( UMA_HISTOGRAM_COUNTS_1M(
"MessageLoop.DelayedTaskQueueForUI.PendingTasksCountOnIdle", "MessageLoop.DelayedTaskQueueForUI.PendingTasksCountOnIdle",
delayed_tasks_.Size()); delayed_tasks_.Size());
} }
IncomingTaskQueue::TriageQueue::TriageQueue(IncomingTaskQueue* outer)
: outer_(outer) {}
IncomingTaskQueue::TriageQueue::~TriageQueue() = default;
const PendingTask& IncomingTaskQueue::TriageQueue::Peek() {
DCHECK_CALLED_ON_VALID_SEQUENCE(outer_->sequence_checker_);
ReloadFromIncomingQueueIfEmpty();
DCHECK(!queue_.empty());
return queue_.front();
}
PendingTask IncomingTaskQueue::TriageQueue::Pop() {
DCHECK_CALLED_ON_VALID_SEQUENCE(outer_->sequence_checker_);
ReloadFromIncomingQueueIfEmpty();
DCHECK(!queue_.empty());
PendingTask pending_task = std::move(queue_.front());
queue_.pop();
return pending_task;
}
bool IncomingTaskQueue::TriageQueue::HasTasks() {
DCHECK_CALLED_ON_VALID_SEQUENCE(outer_->sequence_checker_);
ReloadFromIncomingQueueIfEmpty();
return !queue_.empty();
}
void IncomingTaskQueue::TriageQueue::Clear() {
DCHECK_CALLED_ON_VALID_SEQUENCE(outer_->sequence_checker_);
// Clear() should be invoked before WillDestroyCurrentMessageLoop().
DCHECK(outer_->accept_new_tasks_);
// Delete all currently pending tasks but not tasks potentially posted from
// their destructors. See ~MessageLoop() for the full logic mitigating against
// infite loops when clearing pending tasks. The ScopedClosureRunner below
// will be bound to a task posted at the end of the queue. After it is posted,
// tasks will be deleted one by one, when the bound ScopedClosureRunner is
// deleted and sets |deleted_all_originally_pending|, we know we've deleted
// all originally pending tasks.
bool deleted_all_originally_pending = false;
ScopedClosureRunner capture_deleted_all_originally_pending(BindOnce(
[](bool* deleted_all_originally_pending) {
*deleted_all_originally_pending = true;
},
Unretained(&deleted_all_originally_pending)));
outer_->AddToIncomingQueue(
FROM_HERE,
BindOnce([](ScopedClosureRunner) {},
std::move(capture_deleted_all_originally_pending)),
TimeDelta(), Nestable::kNestable);
while (!deleted_all_originally_pending) {
PendingTask pending_task = Pop();
if (!pending_task.delayed_run_time.is_null()) {
outer_->delayed_tasks().Push(std::move(pending_task));
}
}
}
void IncomingTaskQueue::TriageQueue::ReloadFromIncomingQueueIfEmpty() {
DCHECK_CALLED_ON_VALID_SEQUENCE(outer_->sequence_checker_);
if (queue_.empty()) {
outer_->ReloadWorkQueue(&queue_);
}
}
IncomingTaskQueue::DelayedQueue::DelayedQueue() { IncomingTaskQueue::DelayedQueue::DelayedQueue() {
// The constructing sequence is not necessarily the running sequence, e.g. in
// the case of a MessageLoop created unbound.
DETACH_FROM_SEQUENCE(sequence_checker_); DETACH_FROM_SEQUENCE(sequence_checker_);
} }
...@@ -209,6 +80,8 @@ size_t IncomingTaskQueue::DelayedQueue::Size() const { ...@@ -209,6 +80,8 @@ size_t IncomingTaskQueue::DelayedQueue::Size() const {
} }
IncomingTaskQueue::DeferredQueue::DeferredQueue() { IncomingTaskQueue::DeferredQueue::DeferredQueue() {
// The constructing sequence is not necessarily the running sequence, e.g. in
// the case of a MessageLoop created unbound.
DETACH_FROM_SEQUENCE(sequence_checker_); DETACH_FROM_SEQUENCE(sequence_checker_);
} }
...@@ -244,63 +117,5 @@ void IncomingTaskQueue::DeferredQueue::Clear() { ...@@ -244,63 +117,5 @@ void IncomingTaskQueue::DeferredQueue::Clear() {
Pop(); Pop();
} }
bool IncomingTaskQueue::PostPendingTask(PendingTask* pending_task) {
// Warning: Don't try to short-circuit, and handle this thread's tasks more
// directly, as it could starve handling of foreign threads. Put every task
// into this queue.
bool accept_new_tasks;
bool was_empty = false;
{
AutoLock auto_lock(incoming_queue_lock_);
accept_new_tasks = accept_new_tasks_;
if (accept_new_tasks) {
was_empty =
PostPendingTaskLockRequired(pending_task) && triage_queue_empty_;
}
}
if (!accept_new_tasks) {
// Clear the pending task outside of |incoming_queue_lock_| to prevent any
// chance of self-deadlock if destroying a task also posts a task to this
// queue.
pending_task->task.Reset();
return false;
}
// Let |task_queue_observer_| know of the queued task. This is done outside
// |incoming_queue_lock_| to avoid conflating locks (DidQueueTask() can also
// use a lock).
task_queue_observer_->DidQueueTask(was_empty);
return true;
}
bool IncomingTaskQueue::PostPendingTaskLockRequired(PendingTask* pending_task) {
incoming_queue_lock_.AssertAcquired();
// Initialize the sequence number. The sequence number is used for delayed
// tasks (to facilitate FIFO sorting when two tasks have the same
// delayed_run_time value) and for identifying the task in about:tracing.
pending_task->sequence_num = next_sequence_num_++;
task_queue_observer_->WillQueueTask(pending_task);
bool was_empty = incoming_queue_.empty();
incoming_queue_.push(std::move(*pending_task));
return was_empty;
}
void IncomingTaskQueue::ReloadWorkQueue(TaskQueue* work_queue) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
// Make sure no tasks are lost.
DCHECK(work_queue->empty());
// Acquire all we can from the inter-thread queue with one lock acquisition.
AutoLock lock(incoming_queue_lock_);
incoming_queue_.swap(*work_queue);
triage_queue_empty_ = work_queue->empty();
}
} // namespace internal } // namespace internal
} // namespace base } // namespace base
...@@ -5,51 +5,24 @@ ...@@ -5,51 +5,24 @@
#ifndef BASE_MESSAGE_LOOP_INCOMING_TASK_QUEUE_H_ #ifndef BASE_MESSAGE_LOOP_INCOMING_TASK_QUEUE_H_
#define BASE_MESSAGE_LOOP_INCOMING_TASK_QUEUE_H_ #define BASE_MESSAGE_LOOP_INCOMING_TASK_QUEUE_H_
#include "base/base_export.h"
#include "base/callback.h"
#include "base/macros.h" #include "base/macros.h"
#include "base/memory/ref_counted.h"
#include "base/pending_task.h" #include "base/pending_task.h"
#include "base/sequence_checker.h" #include "base/sequence_checker.h"
#include "base/synchronization/lock.h"
#include "base/time/time.h"
namespace base { namespace base {
class BasicPostTaskPerfTest;
namespace internal { namespace internal {
// Implements a queue of tasks posted to the message loop running on the current // Provides storage for tasks deferred by MessageLoop via DelayedQueue and
// thread. This class takes care of synchronizing posting tasks from different // DeferredQueue.
// threads and together with MessageLoop ensures clean shutdown. // TODO(gab): Rename to PendingTaskQueue after landing
class BASE_EXPORT IncomingTaskQueue // https://crrev.com/1088762.
: public RefCountedThreadSafe<IncomingTaskQueue> { class IncomingTaskQueue {
public: public:
// TODO(gab): Move this to SequencedTaskSource::Observer in // Provides a read-write task queue.
// https://chromium-review.googlesource.com/c/chromium/src/+/1088762. class Queue {
class Observer {
public:
virtual ~Observer() = default;
// Notifies this Observer that it is about to enqueue |task|. The Observer
// may alter |task| as a result (e.g. add metadata to the PendingTask
// struct). This may be called while holding a lock and shouldn't perform
// logic requiring synchronization (override DidQueueTask() for that).
virtual void WillQueueTask(PendingTask* task) = 0;
// Notifies this Observer that a task was queued in the IncomingTaskQueue it
// observes. |was_empty| is true if the task source was empty (i.e.
// |!HasTasks()|) before this task was posted. DidQueueTask() can be invoked
// from any thread.
virtual void DidQueueTask(bool was_empty) = 0;
};
// Provides a read and remove only view into a task queue.
class ReadAndRemoveOnlyQueue {
public: public:
ReadAndRemoveOnlyQueue() = default; Queue() = default;
virtual ~ReadAndRemoveOnlyQueue() = default; virtual ~Queue() = default;
// Returns the next task. HasTasks() is assumed to be true. // Returns the next task. HasTasks() is assumed to be true.
virtual const PendingTask& Peek() = 0; virtual const PendingTask& Peek() = 0;
...@@ -63,16 +36,6 @@ class BASE_EXPORT IncomingTaskQueue ...@@ -63,16 +36,6 @@ class BASE_EXPORT IncomingTaskQueue
// Removes all tasks. // Removes all tasks.
virtual void Clear() = 0; virtual void Clear() = 0;
private:
DISALLOW_COPY_AND_ASSIGN(ReadAndRemoveOnlyQueue);
};
// Provides a read-write task queue.
class Queue : public ReadAndRemoveOnlyQueue {
public:
Queue() = default;
~Queue() override = default;
// Adds the task to the end of the queue. // Adds the task to the end of the queue.
virtual void Push(PendingTask pending_task) = 0; virtual void Push(PendingTask pending_task) = 0;
...@@ -80,40 +43,14 @@ class BASE_EXPORT IncomingTaskQueue ...@@ -80,40 +43,14 @@ class BASE_EXPORT IncomingTaskQueue
DISALLOW_COPY_AND_ASSIGN(Queue); DISALLOW_COPY_AND_ASSIGN(Queue);
}; };
// Constructs an IncomingTaskQueue which will invoke |task_queue_observer| IncomingTaskQueue();
// when tasks are queued. |task_queue_observer| will be bound to this ~IncomingTaskQueue();
// IncomingTaskQueue's lifetime. Ownership is required as opposed to a raw
// pointer since IncomingTaskQueue is ref-counted. For the same reasons,
// |task_queue_observer| needs to support being invoked racily during
// shutdown).
explicit IncomingTaskQueue(std::unique_ptr<Observer> task_queue_observer);
// Appends a task to the incoming queue. Posting of all tasks is routed though
// AddToIncomingQueue() or TryAddToIncomingQueue() to make sure that posting
// task is properly synchronized between different threads.
//
// Returns true if the task was successfully added to the queue, otherwise
// returns false. In all cases, the ownership of |task| is transferred to the
// called method.
bool AddToIncomingQueue(const Location& from_here,
OnceClosure task,
TimeDelta delay,
Nestable nestable);
// Instructs this IncomingTaskQueue to stop accepting tasks, this cannot be
// undone. Note that the registered IncomingTaskQueue::Observer may still
// racily receive a few DidQueueTask() calls while the Shutdown() signal
// propagates to other threads and it needs to support that.
void Shutdown();
ReadAndRemoveOnlyQueue& triage_tasks() { return triage_tasks_; }
Queue& delayed_tasks() { return delayed_tasks_; } Queue& delayed_tasks() { return delayed_tasks_; }
Queue& deferred_tasks() { return deferred_tasks_; } Queue& deferred_tasks() { return deferred_tasks_; }
bool HasPendingHighResolutionTasks() const { bool HasPendingHighResolutionTasks() const {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
return delayed_tasks_.HasPendingHighResolutionTasks(); return delayed_tasks_.HasPendingHighResolutionTasks();
} }
...@@ -122,54 +59,8 @@ class BASE_EXPORT IncomingTaskQueue ...@@ -122,54 +59,8 @@ class BASE_EXPORT IncomingTaskQueue
void ReportMetricsOnIdle() const; void ReportMetricsOnIdle() const;
private: private:
friend class base::BasicPostTaskPerfTest;
friend class RefCountedThreadSafe<IncomingTaskQueue>;
// These queues below support the previous MessageLoop behavior of
// maintaining three queue queues to process tasks:
//
// TriageQueue
// The first queue to receive all tasks for the processing sequence (when
// reloading from the thread-safe |incoming_queue_|). Tasks are generally
// either dispatched immediately or sent to the queues below.
//
// DelayedQueue
// The queue for holding tasks that should be run later and sorted by expected // The queue for holding tasks that should be run later and sorted by expected
// run time. // run time.
//
// DeferredQueue
// The queue for holding tasks that couldn't be run while the MessageLoop was
// nested. These are generally processed during the idle stage.
//
// Many of these do not share implementations even though they look like they
// could because of small quirks (reloading semantics) or differing underlying
// data strucutre (TaskQueue vs DelayedTaskQueue).
// The starting point for all tasks on the sequence processing the tasks.
class TriageQueue : public ReadAndRemoveOnlyQueue {
public:
TriageQueue(IncomingTaskQueue* outer);
~TriageQueue() override;
// ReadAndRemoveOnlyQueue:
// The methods below will attempt to reload from the incoming queue if the
// queue itself is empty (Clear() has special logic to reload only once
// should destructors post more tasks).
const PendingTask& Peek() override;
PendingTask Pop() override;
// Whether this queue has tasks after reloading from the incoming queue.
bool HasTasks() override;
void Clear() override;
private:
void ReloadFromIncomingQueueIfEmpty();
IncomingTaskQueue* const outer_;
TaskQueue queue_;
DISALLOW_COPY_AND_ASSIGN(TriageQueue);
};
class DelayedQueue : public Queue { class DelayedQueue : public Queue {
public: public:
DelayedQueue(); DelayedQueue();
...@@ -185,6 +76,7 @@ class BASE_EXPORT IncomingTaskQueue ...@@ -185,6 +76,7 @@ class BASE_EXPORT IncomingTaskQueue
size_t Size() const; size_t Size() const;
bool HasPendingHighResolutionTasks() const { bool HasPendingHighResolutionTasks() const {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
return pending_high_res_tasks_ > 0; return pending_high_res_tasks_ > 0;
} }
...@@ -199,6 +91,8 @@ class BASE_EXPORT IncomingTaskQueue ...@@ -199,6 +91,8 @@ class BASE_EXPORT IncomingTaskQueue
DISALLOW_COPY_AND_ASSIGN(DelayedQueue); DISALLOW_COPY_AND_ASSIGN(DelayedQueue);
}; };
// The queue for holding tasks that couldn't be run while the MessageLoop was
// nested. These are generally processed during the idle stage.
class DeferredQueue : public Queue { class DeferredQueue : public Queue {
public: public:
DeferredQueue(); DeferredQueue();
...@@ -219,57 +113,9 @@ class BASE_EXPORT IncomingTaskQueue ...@@ -219,57 +113,9 @@ class BASE_EXPORT IncomingTaskQueue
DISALLOW_COPY_AND_ASSIGN(DeferredQueue); DISALLOW_COPY_AND_ASSIGN(DeferredQueue);
}; };
virtual ~IncomingTaskQueue();
// Adds a task to |incoming_queue_|. The caller retains ownership of
// |pending_task|, but this function will reset the value of
// |pending_task->task|. This is needed to ensure that the posting call stack
// does not retain |pending_task->task| beyond this function call.
bool PostPendingTask(PendingTask* pending_task);
// Does the real work of posting a pending task. Returns true if
// |incoming_queue_| was empty before |pending_task| was posted.
bool PostPendingTaskLockRequired(PendingTask* pending_task);
// Loads tasks from the |incoming_queue_| into |*work_queue|. Must be called
// from the sequence processing the tasks.
void ReloadWorkQueue(TaskQueue* work_queue);
// Checks calls made only on the MessageLoop thread.
SEQUENCE_CHECKER(sequence_checker_);
const std::unique_ptr<Observer> task_queue_observer_;
// Queue for initial triaging of tasks on the |sequence_checker_| sequence.
TriageQueue triage_tasks_;
// Queue for delayed tasks on the |sequence_checker_| sequence.
DelayedQueue delayed_tasks_; DelayedQueue delayed_tasks_;
// Queue for non-nestable deferred tasks on the |sequence_checker_| sequence.
DeferredQueue deferred_tasks_; DeferredQueue deferred_tasks_;
// Synchronizes access to all members below this line.
base::Lock incoming_queue_lock_;
// An incoming queue of tasks that are acquired under a mutex for processing
// on this instance's thread. These tasks have not yet been been pushed to
// |triage_tasks_|.
TaskQueue incoming_queue_;
// True if new tasks should be accepted.
bool accept_new_tasks_ = true;
// The next sequence number to use for delayed tasks.
int next_sequence_num_ = 0;
// True if the outgoing queue (|triage_tasks_|) is empty. Toggled under
// |incoming_queue_lock_| in ReloadWorkQueue() so that
// PostPendingTaskLockRequired() can tell, without accessing the thread unsafe
// |triage_tasks_|, if the IncomingTaskQueue has been made non-empty by a
// PostTask() (and needs to inform its Observer).
bool triage_queue_empty_ = true;
DISALLOW_COPY_AND_ASSIGN(IncomingTaskQueue); DISALLOW_COPY_AND_ASSIGN(IncomingTaskQueue);
}; };
......
...@@ -8,13 +8,16 @@ ...@@ -8,13 +8,16 @@
#include <utility> #include <utility>
#include "base/bind.h" #include "base/bind.h"
#include "base/callback_helpers.h"
#include "base/compiler_specific.h" #include "base/compiler_specific.h"
#include "base/debug/task_annotator.h" #include "base/debug/task_annotator.h"
#include "base/logging.h" #include "base/logging.h"
#include "base/memory/ptr_util.h" #include "base/memory/ptr_util.h"
#include "base/message_loop/message_loop_task_runner.h"
#include "base/message_loop/message_pump_default.h" #include "base/message_loop/message_pump_default.h"
#include "base/message_loop/message_pump_for_io.h" #include "base/message_loop/message_pump_for_io.h"
#include "base/message_loop/message_pump_for_ui.h" #include "base/message_loop/message_pump_for_ui.h"
#include "base/message_loop/sequenced_task_source.h"
#include "base/run_loop.h" #include "base/run_loop.h"
#include "base/third_party/dynamic_annotations/dynamic_annotations.h" #include "base/third_party/dynamic_annotations/dynamic_annotations.h"
#include "base/threading/thread_id_name_manager.h" #include "base/threading/thread_id_name_manager.h"
...@@ -37,7 +40,7 @@ std::unique_ptr<MessagePump> ReturnPump(std::unique_ptr<MessagePump> pump) { ...@@ -37,7 +40,7 @@ std::unique_ptr<MessagePump> ReturnPump(std::unique_ptr<MessagePump> pump) {
} // namespace } // namespace
class MessageLoop::Controller : public internal::IncomingTaskQueue::Observer { class MessageLoop::Controller : public SequencedTaskSource::Observer {
public: public:
// Constructs a MessageLoopController which controls |message_loop|, notifying // Constructs a MessageLoopController which controls |message_loop|, notifying
// |task_annotator_| when tasks are queued scheduling work on |message_loop| // |task_annotator_| when tasks are queued scheduling work on |message_loop|
...@@ -47,7 +50,7 @@ class MessageLoop::Controller : public internal::IncomingTaskQueue::Observer { ...@@ -47,7 +50,7 @@ class MessageLoop::Controller : public internal::IncomingTaskQueue::Observer {
~Controller() override; ~Controller() override;
// IncomingTaskQueue::Observer: // SequencedTaskSource::Observer:
void WillQueueTask(PendingTask* task) final; void WillQueueTask(PendingTask* task) final;
void DidQueueTask(bool was_empty) final; void DidQueueTask(bool was_empty) final;
...@@ -171,7 +174,7 @@ MessageLoop::~MessageLoop() { ...@@ -171,7 +174,7 @@ MessageLoop::~MessageLoop() {
for (int i = 0; i < 100; ++i) { for (int i = 0; i < 100; ++i) {
DeletePendingTasks(); DeletePendingTasks();
// If we end up with empty queues, then break out of the loop. // If we end up with empty queues, then break out of the loop.
tasks_remain = incoming_task_queue_->triage_tasks().HasTasks(); tasks_remain = sequenced_task_source_->HasTasks();
if (!tasks_remain) if (!tasks_remain)
break; break;
} }
...@@ -183,12 +186,22 @@ MessageLoop::~MessageLoop() { ...@@ -183,12 +186,22 @@ MessageLoop::~MessageLoop() {
thread_task_runner_handle_.reset(); thread_task_runner_handle_.reset();
// Tell the incoming queue that we are dying. // Detach this instance's Controller from |this|. After this point,
// |underlying_task_runner_| may still receive tasks and notify the controller
// but the controller will no-op (and not use this MessageLoop after free).
// |underlying_task_runner_| being ref-counted and potentially kept alive by
// many SingleThreadTaskRunner refs, the best we can do is tell it to shutdown
// after which it will start returning false to PostTasks that happen-after
// this point (note that invoking Shutdown() first would not remove the need
// to DisconnectFromParent() since the controller is invoked *after* a task is
// enqueued and the incoming queue's lock is released (see
// MessageLoopTaskRunner::AddToIncomingQueue()).
// Details : while an "in-progress post tasks" refcount in Controller in lieu
// of |message_loop_lock_| would be an option to handle the "pending post
// tasks on shutdown" case, |message_loop_lock_| would still be required to
// serialize ScheduleWork() call and as such that optimization isn't worth it.
message_loop_controller_->DisconnectFromParent(); message_loop_controller_->DisconnectFromParent();
incoming_task_queue_->Shutdown(); underlying_task_runner_->Shutdown();
incoming_task_queue_ = nullptr;
unbound_task_runner_ = nullptr;
task_runner_ = nullptr;
// OK, now make it so that no one can find us. // OK, now make it so that no one can find us.
if (MessageLoopCurrent::IsBoundToCurrentThreadInternal(this)) if (MessageLoopCurrent::IsBoundToCurrentThreadInternal(this))
...@@ -262,11 +275,11 @@ void MessageLoop::RemoveTaskObserver(TaskObserver* task_observer) { ...@@ -262,11 +275,11 @@ void MessageLoop::RemoveTaskObserver(TaskObserver* task_observer) {
bool MessageLoop::IsIdleForTesting() { bool MessageLoop::IsIdleForTesting() {
// Have unprocessed tasks? (this reloads the work queue if necessary) // Have unprocessed tasks? (this reloads the work queue if necessary)
if (incoming_task_queue_->triage_tasks().HasTasks()) if (sequenced_task_source_->HasTasks())
return false; return false;
// Have unprocessed deferred tasks which can be processed at this run-level? // Have unprocessed deferred tasks which can be processed at this run-level?
if (incoming_task_queue_->deferred_tasks().HasTasks() && if (pending_task_queue_.deferred_tasks().HasTasks() &&
!RunLoop::IsNestedOnCurrentThread()) { !RunLoop::IsNestedOnCurrentThread()) {
return false; return false;
} }
...@@ -283,19 +296,16 @@ std::unique_ptr<MessageLoop> MessageLoop::CreateUnbound( ...@@ -283,19 +296,16 @@ std::unique_ptr<MessageLoop> MessageLoop::CreateUnbound(
return WrapUnique(new MessageLoop(type, std::move(pump_factory))); return WrapUnique(new MessageLoop(type, std::move(pump_factory)));
} }
// TODO(gab): Avoid bare new + WrapUnique below when introducing
// SequencedTaskSource in follow-up @
// https://chromium-review.googlesource.com/c/chromium/src/+/1088762.
MessageLoop::MessageLoop(Type type, MessagePumpFactoryCallback pump_factory) MessageLoop::MessageLoop(Type type, MessagePumpFactoryCallback pump_factory)
: MessageLoopCurrent(this), : MessageLoopCurrent(this),
type_(type), type_(type),
pump_factory_(std::move(pump_factory)), pump_factory_(std::move(pump_factory)),
message_loop_controller_(new Controller(this)), message_loop_controller_(
incoming_task_queue_(MakeRefCounted<internal::IncomingTaskQueue>( new Controller(this)), // Ownership transferred on the next line.
underlying_task_runner_(MakeRefCounted<internal::MessageLoopTaskRunner>(
WrapUnique(message_loop_controller_))), WrapUnique(message_loop_controller_))),
unbound_task_runner_(MakeRefCounted<internal::MessageLoopTaskRunner>( sequenced_task_source_(underlying_task_runner_.get()),
incoming_task_queue_)), task_runner_(underlying_task_runner_) {
task_runner_(unbound_task_runner_) {
// If type is TYPE_CUSTOM non-null pump_factory must be given. // If type is TYPE_CUSTOM non-null pump_factory must be given.
DCHECK(type_ != TYPE_CUSTOM || !pump_factory_.is_null()); DCHECK(type_ != TYPE_CUSTOM || !pump_factory_.is_null());
...@@ -316,9 +326,8 @@ void MessageLoop::BindToCurrentThread() { ...@@ -316,9 +326,8 @@ void MessageLoop::BindToCurrentThread() {
<< "should only have one message loop per thread"; << "should only have one message loop per thread";
MessageLoopCurrent::BindToCurrentThreadInternal(this); MessageLoopCurrent::BindToCurrentThreadInternal(this);
underlying_task_runner_->BindToCurrentThread();
message_loop_controller_->StartScheduling(); message_loop_controller_->StartScheduling();
unbound_task_runner_->BindToCurrentThread();
unbound_task_runner_ = nullptr;
SetThreadTaskRunnerHandle(); SetThreadTaskRunnerHandle();
thread_id_ = PlatformThread::CurrentId(); thread_id_ = PlatformThread::CurrentId();
...@@ -345,25 +354,19 @@ std::string MessageLoop::GetThreadName() const { ...@@ -345,25 +354,19 @@ std::string MessageLoop::GetThreadName() const {
void MessageLoop::SetTaskRunner( void MessageLoop::SetTaskRunner(
scoped_refptr<SingleThreadTaskRunner> task_runner) { scoped_refptr<SingleThreadTaskRunner> task_runner) {
DCHECK(task_runner); DCHECK(task_runner);
if (!unbound_task_runner_) { if (thread_id_ == kInvalidThreadId) {
// ThreadTaskRunnerHandle will be set during BindToCurrentThread().
task_runner_ = std::move(task_runner);
} else {
// Once MessageLoop is bound, |task_runner_| may only be altered on the
// bound thread.
DCHECK_CALLED_ON_VALID_THREAD(bound_thread_checker_); DCHECK_CALLED_ON_VALID_THREAD(bound_thread_checker_);
DCHECK(task_runner->BelongsToCurrentThread()); DCHECK(task_runner->BelongsToCurrentThread());
task_runner_ = std::move(task_runner); task_runner_ = std::move(task_runner);
SetThreadTaskRunnerHandle(); SetThreadTaskRunnerHandle();
} else {
// ThreadTaskRunnerHandle will be set during BindToCurrentThread().
task_runner_ = std::move(task_runner);
} }
} }
void MessageLoop::ClearTaskRunnerForTesting() {
DCHECK_CALLED_ON_VALID_THREAD(bound_thread_checker_);
DCHECK(!unbound_task_runner_);
task_runner_ = nullptr;
thread_task_runner_handle_.reset();
}
void MessageLoop::Run(bool application_tasks_allowed) { void MessageLoop::Run(bool application_tasks_allowed) {
DCHECK_CALLED_ON_VALID_THREAD(bound_thread_checker_); DCHECK_CALLED_ON_VALID_THREAD(bound_thread_checker_);
if (application_tasks_allowed && !task_execution_allowed_) { if (application_tasks_allowed && !task_execution_allowed_) {
...@@ -384,7 +387,7 @@ void MessageLoop::Quit() { ...@@ -384,7 +387,7 @@ void MessageLoop::Quit() {
void MessageLoop::EnsureWorkScheduled() { void MessageLoop::EnsureWorkScheduled() {
DCHECK_CALLED_ON_VALID_THREAD(bound_thread_checker_); DCHECK_CALLED_ON_VALID_THREAD(bound_thread_checker_);
if (incoming_task_queue_->triage_tasks().HasTasks()) if (sequenced_task_source_->HasTasks())
pump_->ScheduleWork(); pump_->ScheduleWork();
} }
...@@ -400,8 +403,8 @@ bool MessageLoop::ProcessNextDelayedNonNestableTask() { ...@@ -400,8 +403,8 @@ bool MessageLoop::ProcessNextDelayedNonNestableTask() {
if (RunLoop::IsNestedOnCurrentThread()) if (RunLoop::IsNestedOnCurrentThread())
return false; return false;
while (incoming_task_queue_->deferred_tasks().HasTasks()) { while (pending_task_queue_.deferred_tasks().HasTasks()) {
PendingTask pending_task = incoming_task_queue_->deferred_tasks().Pop(); PendingTask pending_task = pending_task_queue_.deferred_tasks().Pop();
if (!pending_task.task.IsCancelled()) { if (!pending_task.task.IsCancelled()) {
RunTask(&pending_task); RunTask(&pending_task);
return true; return true;
...@@ -440,17 +443,41 @@ bool MessageLoop::DeferOrRunPendingTask(PendingTask pending_task) { ...@@ -440,17 +443,41 @@ bool MessageLoop::DeferOrRunPendingTask(PendingTask pending_task) {
// We couldn't run the task now because we're in a nested run loop // We couldn't run the task now because we're in a nested run loop
// and the task isn't nestable. // and the task isn't nestable.
incoming_task_queue_->deferred_tasks().Push(std::move(pending_task)); pending_task_queue_.deferred_tasks().Push(std::move(pending_task));
return false; return false;
} }
void MessageLoop::DeletePendingTasks() { void MessageLoop::DeletePendingTasks() {
incoming_task_queue_->triage_tasks().Clear(); // Delete all currently pending tasks but not tasks potentially posted from
incoming_task_queue_->deferred_tasks().Clear(); // their destructors. See ~MessageLoop() for the full logic mitigating against
// infite loops when clearing pending tasks. The ScopedClosureRunner below
// will be bound to a task posted at the end of the queue. After it is posted,
// tasks will be deleted one by one, when the bound ScopedClosureRunner is
// deleted and sets |deleted_all_originally_pending|, we know we've deleted
// all originally pending tasks.
bool deleted_all_originally_pending = false;
ScopedClosureRunner capture_deleted_all_originally_pending(BindOnce(
[](bool* deleted_all_originally_pending) {
*deleted_all_originally_pending = true;
},
Unretained(&deleted_all_originally_pending)));
sequenced_task_source_->InjectTask(
BindOnce([](ScopedClosureRunner) {},
std::move(capture_deleted_all_originally_pending)));
while (!deleted_all_originally_pending) {
PendingTask pending_task = sequenced_task_source_->TakeTask();
// New delayed tasks should be deleted after older ones.
if (!pending_task.delayed_run_time.is_null())
pending_task_queue_.delayed_tasks().Push(std::move(pending_task));
}
pending_task_queue_.deferred_tasks().Clear();
// TODO(robliao): Determine if we can move delayed task destruction before // TODO(robliao): Determine if we can move delayed task destruction before
// deferred tasks to maintain the MessagePump DoWork, DoDelayedWork, and // deferred tasks to maintain the MessagePump DoWork, DoDelayedWork, and
// DoIdleWork processing order. // DoIdleWork processing order.
incoming_task_queue_->delayed_tasks().Clear(); pending_task_queue_.delayed_tasks().Clear();
} }
void MessageLoop::ScheduleWork() { void MessageLoop::ScheduleWork() {
...@@ -466,17 +493,17 @@ bool MessageLoop::DoWork() { ...@@ -466,17 +493,17 @@ bool MessageLoop::DoWork() {
return false; return false;
// Execute oldest task. // Execute oldest task.
while (incoming_task_queue_->triage_tasks().HasTasks()) { while (sequenced_task_source_->HasTasks()) {
PendingTask pending_task = incoming_task_queue_->triage_tasks().Pop(); PendingTask pending_task = sequenced_task_source_->TakeTask();
if (pending_task.task.IsCancelled()) if (pending_task.task.IsCancelled())
continue; continue;
if (!pending_task.delayed_run_time.is_null()) { if (!pending_task.delayed_run_time.is_null()) {
int sequence_num = pending_task.sequence_num; int sequence_num = pending_task.sequence_num;
TimeTicks delayed_run_time = pending_task.delayed_run_time; TimeTicks delayed_run_time = pending_task.delayed_run_time;
incoming_task_queue_->delayed_tasks().Push(std::move(pending_task)); pending_task_queue_.delayed_tasks().Push(std::move(pending_task));
// If we changed the topmost task, then it is time to reschedule. // If we changed the topmost task, then it is time to reschedule.
if (incoming_task_queue_->delayed_tasks().Peek().sequence_num == if (pending_task_queue_.delayed_tasks().Peek().sequence_num ==
sequence_num) { sequence_num) {
pump_->ScheduleDelayedWork(delayed_run_time); pump_->ScheduleDelayedWork(delayed_run_time);
} }
...@@ -491,7 +518,7 @@ bool MessageLoop::DoWork() { ...@@ -491,7 +518,7 @@ bool MessageLoop::DoWork() {
bool MessageLoop::DoDelayedWork(TimeTicks* next_delayed_work_time) { bool MessageLoop::DoDelayedWork(TimeTicks* next_delayed_work_time) {
if (!task_execution_allowed_ || if (!task_execution_allowed_ ||
!incoming_task_queue_->delayed_tasks().HasTasks()) { !pending_task_queue_.delayed_tasks().HasTasks()) {
*next_delayed_work_time = TimeTicks(); *next_delayed_work_time = TimeTicks();
return false; return false;
} }
...@@ -504,7 +531,7 @@ bool MessageLoop::DoDelayedWork(TimeTicks* next_delayed_work_time) { ...@@ -504,7 +531,7 @@ bool MessageLoop::DoDelayedWork(TimeTicks* next_delayed_work_time) {
// efficient we'll be at handling the tasks. // efficient we'll be at handling the tasks.
TimeTicks next_run_time = TimeTicks next_run_time =
incoming_task_queue_->delayed_tasks().Peek().delayed_run_time; pending_task_queue_.delayed_tasks().Peek().delayed_run_time;
if (next_run_time > recent_time_) { if (next_run_time > recent_time_) {
recent_time_ = TimeTicks::Now(); // Get a better view of Now(); recent_time_ = TimeTicks::Now(); // Get a better view of Now();
...@@ -514,11 +541,11 @@ bool MessageLoop::DoDelayedWork(TimeTicks* next_delayed_work_time) { ...@@ -514,11 +541,11 @@ bool MessageLoop::DoDelayedWork(TimeTicks* next_delayed_work_time) {
} }
} }
PendingTask pending_task = incoming_task_queue_->delayed_tasks().Pop(); PendingTask pending_task = pending_task_queue_.delayed_tasks().Pop();
if (incoming_task_queue_->delayed_tasks().HasTasks()) { if (pending_task_queue_.delayed_tasks().HasTasks()) {
*next_delayed_work_time = CapAtOneDay( *next_delayed_work_time = CapAtOneDay(
incoming_task_queue_->delayed_tasks().Peek().delayed_run_time); pending_task_queue_.delayed_tasks().Peek().delayed_run_time);
} }
return DeferOrRunPendingTask(std::move(pending_task)); return DeferOrRunPendingTask(std::move(pending_task));
...@@ -544,15 +571,14 @@ bool MessageLoop::DoIdleWork() { ...@@ -544,15 +571,14 @@ bool MessageLoop::DoIdleWork() {
// only one UI thread per process and, for practical purposes, restricting // only one UI thread per process and, for practical purposes, restricting
// the MessageLoop diagnostic metrics to it yields similar information. // the MessageLoop diagnostic metrics to it yields similar information.
if (type_ == TYPE_UI) if (type_ == TYPE_UI)
incoming_task_queue_->ReportMetricsOnIdle(); pending_task_queue_.ReportMetricsOnIdle();
#if defined(OS_WIN) #if defined(OS_WIN)
// On Windows we activate the high resolution timer so that the wait // On Windows we activate the high resolution timer so that the wait
// _if_ triggered by the timer happens with good resolution. If we don't // _if_ triggered by the timer happens with good resolution. If we don't
// do this the default resolution is 15ms which might not be acceptable // do this the default resolution is 15ms which might not be acceptable
// for some tasks. // for some tasks.
need_high_res_timers = need_high_res_timers = pending_task_queue_.HasPendingHighResolutionTasks();
incoming_task_queue_->HasPendingHighResolutionTasks();
#endif #endif
} }
......
...@@ -16,7 +16,6 @@ ...@@ -16,7 +16,6 @@
#include "base/memory/scoped_refptr.h" #include "base/memory/scoped_refptr.h"
#include "base/message_loop/incoming_task_queue.h" #include "base/message_loop/incoming_task_queue.h"
#include "base/message_loop/message_loop_current.h" #include "base/message_loop/message_loop_current.h"
#include "base/message_loop/message_loop_task_runner.h"
#include "base/message_loop/message_pump.h" #include "base/message_loop/message_pump.h"
#include "base/message_loop/timer_slack.h" #include "base/message_loop/timer_slack.h"
#include "base/observer_list.h" #include "base/observer_list.h"
...@@ -30,8 +29,13 @@ ...@@ -30,8 +29,13 @@
namespace base { namespace base {
class SequencedTaskSource;
class ThreadTaskRunnerHandle; class ThreadTaskRunnerHandle;
namespace internal {
class MessageLoopTaskRunner;
}
// A MessageLoop is used to process events for a particular thread. There is // A MessageLoop is used to process events for a particular thread. There is
// at most one MessageLoop instance per thread. // at most one MessageLoop instance per thread.
// //
...@@ -169,10 +173,6 @@ class BASE_EXPORT MessageLoop : public MessagePump::Delegate, ...@@ -169,10 +173,6 @@ class BASE_EXPORT MessageLoop : public MessagePump::Delegate,
// already bound, this must be called on the thread to which it is bound. // already bound, this must be called on the thread to which it is bound.
void SetTaskRunner(scoped_refptr<SingleThreadTaskRunner> task_runner); void SetTaskRunner(scoped_refptr<SingleThreadTaskRunner> task_runner);
// Clears task_runner() and the ThreadTaskRunnerHandle for the target thread.
// Must be called on the thread to which the message loop is bound.
void ClearTaskRunnerForTesting();
// TODO(https://crbug.com/825327): Remove users of TaskObservers through // TODO(https://crbug.com/825327): Remove users of TaskObservers through
// MessageLoop::current() and migrate the type back here. // MessageLoop::current() and migrate the type back here.
using TaskObserver = MessageLoopCurrent::TaskObserver; using TaskObserver = MessageLoopCurrent::TaskObserver;
...@@ -211,7 +211,6 @@ class BASE_EXPORT MessageLoop : public MessagePump::Delegate, ...@@ -211,7 +211,6 @@ class BASE_EXPORT MessageLoop : public MessagePump::Delegate,
void BindToCurrentThread(); void BindToCurrentThread();
private: private:
friend class internal::IncomingTaskQueue;
friend class MessageLoopCurrent; friend class MessageLoopCurrent;
friend class MessageLoopCurrentForIO; friend class MessageLoopCurrentForIO;
friend class MessageLoopCurrentForUI; friend class MessageLoopCurrentForUI;
...@@ -302,15 +301,26 @@ class BASE_EXPORT MessageLoop : public MessagePump::Delegate, ...@@ -302,15 +301,26 @@ class BASE_EXPORT MessageLoop : public MessagePump::Delegate,
ObserverList<TaskObserver> task_observers_; ObserverList<TaskObserver> task_observers_;
// Pointer to this MessageLoop's Controller, valid until the reference to // Pointer to this MessageLoop's Controller, valid throughout this
// |incoming_task_queue_| is dropped below. // MessageLoop's lifetime (until |underlying_task_runner_| is released at the
// end of ~MessageLoop()).
Controller* const message_loop_controller_; Controller* const message_loop_controller_;
scoped_refptr<internal::IncomingTaskQueue> incoming_task_queue_;
// A task runner which we haven't bound to a thread yet. // The task runner this MessageLoop will extract its tasks from. By default,
scoped_refptr<internal::MessageLoopTaskRunner> unbound_task_runner_; // it will also be bound as the ThreadTaskRunnerHandle on the current thread.
// That default can be overridden by SetTaskRunner() but this MessageLoop will
// nonetheless take its tasks from |underlying_task_runner_| (the overrider is
// responsible for doing the routing). This member must be before
// |pending_task_queue| as it must outlive it.
const scoped_refptr<internal::MessageLoopTaskRunner> underlying_task_runner_;
// The source of tasks for this MessageLoop. Currently this is always
// |underlying_task_runner_|. TODO(gab): Make this customizable.
SequencedTaskSource* const sequenced_task_source_;
internal::IncomingTaskQueue pending_task_queue_;
// The task runner associated with this message loop. // The task runner exposed by this message loop.
scoped_refptr<SingleThreadTaskRunner> task_runner_; scoped_refptr<SingleThreadTaskRunner> task_runner_;
std::unique_ptr<ThreadTaskRunnerHandle> thread_task_runner_handle_; std::unique_ptr<ThreadTaskRunnerHandle> thread_task_runner_handle_;
......
...@@ -8,37 +8,60 @@ ...@@ -8,37 +8,60 @@
#include "base/location.h" #include "base/location.h"
#include "base/logging.h" #include "base/logging.h"
#include "base/message_loop/incoming_task_queue.h" #include "base/metrics/histogram_macros.h"
#include "build/build_config.h"
namespace base { namespace base {
namespace internal { namespace internal {
MessageLoopTaskRunner::MessageLoopTaskRunner( namespace {
scoped_refptr<IncomingTaskQueue> incoming_queue)
: incoming_queue_(incoming_queue), valid_thread_id_(kInvalidThreadId) { #if DCHECK_IS_ON()
// Delays larger than this are often bogus, and a warning should be emitted in
// debug builds to warn developers. http://crbug.com/450045
constexpr TimeDelta kTaskDelayWarningThreshold = TimeDelta::FromDays(14);
#endif
TimeTicks CalculateDelayedRuntime(const Location& from_here, TimeDelta delay) {
DLOG_IF(WARNING, delay > kTaskDelayWarningThreshold)
<< "Requesting super-long task delay period of " << delay.InSeconds()
<< " seconds from here: " << from_here.ToString();
DCHECK_GE(delay, TimeDelta()) << "delay should not be negative";
return delay > TimeDelta() ? TimeTicks::Now() + delay : TimeTicks();
} }
} // namespace
MessageLoopTaskRunner::MessageLoopTaskRunner(
std::unique_ptr<SequencedTaskSource::Observer> task_source_observer)
: task_source_observer_(std::move(task_source_observer)) {}
void MessageLoopTaskRunner::BindToCurrentThread() { void MessageLoopTaskRunner::BindToCurrentThread() {
AutoLock lock(valid_thread_id_lock_); AutoLock lock(valid_thread_id_lock_);
DCHECK_EQ(kInvalidThreadId, valid_thread_id_); DCHECK_EQ(kInvalidThreadId, valid_thread_id_);
valid_thread_id_ = PlatformThread::CurrentId(); valid_thread_id_ = PlatformThread::CurrentId();
} }
void MessageLoopTaskRunner::Shutdown() {
AutoLock lock(incoming_queue_lock_);
accept_new_tasks_ = false;
}
bool MessageLoopTaskRunner::PostDelayedTask(const Location& from_here, bool MessageLoopTaskRunner::PostDelayedTask(const Location& from_here,
OnceClosure task, OnceClosure task,
base::TimeDelta delay) { base::TimeDelta delay) {
DCHECK(!task.is_null()) << from_here.ToString(); return AddToIncomingQueue(from_here, std::move(task), delay,
return incoming_queue_->AddToIncomingQueue(from_here, std::move(task), delay, Nestable::kNestable);
Nestable::kNestable);
} }
bool MessageLoopTaskRunner::PostNonNestableDelayedTask( bool MessageLoopTaskRunner::PostNonNestableDelayedTask(
const Location& from_here, const Location& from_here,
OnceClosure task, OnceClosure task,
base::TimeDelta delay) { base::TimeDelta delay) {
DCHECK(!task.is_null()) << from_here.ToString(); return AddToIncomingQueue(from_here, std::move(task), delay,
return incoming_queue_->AddToIncomingQueue(from_here, std::move(task), delay, Nestable::kNonNestable);
Nestable::kNonNestable);
} }
bool MessageLoopTaskRunner::RunsTasksInCurrentSequence() const { bool MessageLoopTaskRunner::RunsTasksInCurrentSequence() const {
...@@ -46,8 +69,108 @@ bool MessageLoopTaskRunner::RunsTasksInCurrentSequence() const { ...@@ -46,8 +69,108 @@ bool MessageLoopTaskRunner::RunsTasksInCurrentSequence() const {
return valid_thread_id_ == PlatformThread::CurrentId(); return valid_thread_id_ == PlatformThread::CurrentId();
} }
PendingTask MessageLoopTaskRunner::TakeTask() {
// Must be called on execution sequence, unless clearing tasks from an unbound
// MessageLoop.
DCHECK(RunsTasksInCurrentSequence() || valid_thread_id_ == kInvalidThreadId);
// HasTasks() will reload the queue if necessary (there should always be
// pending tasks by contract).
const bool has_tasks = HasTasks();
DCHECK(has_tasks);
PendingTask pending_task = std::move(outgoing_queue_.front());
outgoing_queue_.pop();
return pending_task;
}
bool MessageLoopTaskRunner::HasTasks() {
// Must be called on execution sequence, unless clearing tasks from an unbound
// MessageLoop.
DCHECK(RunsTasksInCurrentSequence() || valid_thread_id_ == kInvalidThreadId);
if (outgoing_queue_.empty()) {
AutoLock lock(incoming_queue_lock_);
incoming_queue_.swap(outgoing_queue_);
outgoing_queue_empty_ = outgoing_queue_.empty();
}
return !outgoing_queue_.empty();
}
void MessageLoopTaskRunner::InjectTask(OnceClosure task) {
// Must be called on execution sequence, unless clearing tasks from an unbound
// MessageLoop.
DCHECK(RunsTasksInCurrentSequence() || valid_thread_id_ == kInvalidThreadId);
bool success = this->PostTask(FROM_HERE, std::move(task));
DCHECK(success) << "Injected a task in a dead task runner.";
}
MessageLoopTaskRunner::~MessageLoopTaskRunner() = default; MessageLoopTaskRunner::~MessageLoopTaskRunner() = default;
bool MessageLoopTaskRunner::AddToIncomingQueue(const Location& from_here,
OnceClosure task,
TimeDelta delay,
Nestable nestable) {
DCHECK(task_source_observer_)
<< "SetObserver() must be called before posting tasks";
// Use CHECK instead of DCHECK to crash earlier. See http://crbug.com/711167
// for details.
CHECK(task);
PendingTask pending_task(from_here, std::move(task),
CalculateDelayedRuntime(from_here, delay), nestable);
#if defined(OS_WIN)
// We consider the task needs a high resolution timer if the delay is
// more than 0 and less than 32ms. This caps the relative error to
// less than 50% : a 33ms wait can wake at 48ms since the default
// resolution on Windows is between 10 and 15ms.
if (delay > TimeDelta() &&
delay.InMilliseconds() < (2 * Time::kMinLowResolutionThresholdMs)) {
pending_task.is_high_res = true;
}
#endif
if (!delay.is_zero())
UMA_HISTOGRAM_LONG_TIMES("MessageLoop.DelayedTaskQueue.PostedDelay", delay);
bool did_queue_task = false;
bool was_empty;
{
AutoLock auto_lock(incoming_queue_lock_);
if (accept_new_tasks_) {
// Initialize the sequence number. The sequence number is used for delayed
// tasks (to facilitate FIFO sorting when two tasks have the same
// delayed_run_time value) and for identifying the task in about:tracing.
pending_task.sequence_num = next_sequence_num_++;
task_source_observer_->WillQueueTask(&pending_task);
was_empty = outgoing_queue_empty_ && incoming_queue_.empty();
incoming_queue_.push(std::move(pending_task));
did_queue_task = true;
}
}
if (!did_queue_task) {
// Clear the pending task outside of |incoming_queue_lock_| to prevent any
// chance of self-deadlock if destroying a task also posts a task to this
// queue.
pending_task.task.Reset();
return false;
}
// Let |task_source_observer_| know about the task just queued. It's important
// to do this outside of |incoming_queue_lock_| to avoid blocking tasks
// incoming from other threads on the resolution of DidQueueTask().
task_source_observer_->DidQueueTask(was_empty);
return true;
}
} // namespace internal } // namespace internal
} // namespace base } // namespace base
...@@ -9,28 +9,42 @@ ...@@ -9,28 +9,42 @@
#include "base/callback.h" #include "base/callback.h"
#include "base/macros.h" #include "base/macros.h"
#include "base/memory/ref_counted.h" #include "base/memory/ref_counted.h"
#include "base/message_loop/sequenced_task_source.h"
#include "base/pending_task.h" #include "base/pending_task.h"
#include "base/single_thread_task_runner.h" #include "base/single_thread_task_runner.h"
#include "base/synchronization/lock.h" #include "base/synchronization/lock.h"
#include "base/threading/platform_thread.h" #include "base/threading/platform_thread.h"
#include "base/time/time.h"
namespace base { namespace base {
namespace internal { namespace internal {
class IncomingTaskQueue; // A SingleThreadTaskRunner which receives and queues tasks destined to its
// owning MessageLoop (vending them back to its IncomingTaskQueue over the
// A stock implementation of SingleThreadTaskRunner that is created and managed // SequencedTaskSource interface). It does not manage delays (i.e. tasks
// by a MessageLoop. For now a MessageLoopTaskRunner can only be created as // returned by TakeTask() might have a non-zero delay).
// part of a MessageLoop. class BASE_EXPORT MessageLoopTaskRunner : public SingleThreadTaskRunner,
class BASE_EXPORT MessageLoopTaskRunner : public SingleThreadTaskRunner { public SequencedTaskSource {
public: public:
explicit MessageLoopTaskRunner( // Constructs a MessageLoopTaskRunner which will notify |task_source_observer|
scoped_refptr<IncomingTaskQueue> incoming_queue); // about tasks it receives. |task_source_observer| will be bound to this
// MessageLoopTaskRunner's lifetime. Ownership is required as opposed to a raw
// pointer since MessageLoopTaskRunner impls tend to receive tasks beyond the
// destination's lifetime. For the same reasons, |task_source_observer| needs
// to support being invoked racily during shutdown.
MessageLoopTaskRunner(
std::unique_ptr<SequencedTaskSource::Observer> task_source_observer);
// Initialize this message loop task runner on the current thread. // Initialize this MessageLoopTaskRunner on the current thread.
void BindToCurrentThread(); void BindToCurrentThread();
// SingleThreadTaskRunner implementation // Instructs this task runner to stop accepting tasks, this cannot be undone.
// Note that the registered SequencedTaskSource::Observer may still racily
// receive a few DidQueueTask() calls while the Shutdown() signal propagates
// to other threads and it needs to support that.
void Shutdown();
// SingleThreadTaskRunner:
bool PostDelayedTask(const Location& from_here, bool PostDelayedTask(const Location& from_here,
OnceClosure task, OnceClosure task,
TimeDelta delay) override; TimeDelta delay) override;
...@@ -39,18 +53,54 @@ class BASE_EXPORT MessageLoopTaskRunner : public SingleThreadTaskRunner { ...@@ -39,18 +53,54 @@ class BASE_EXPORT MessageLoopTaskRunner : public SingleThreadTaskRunner {
TimeDelta delay) override; TimeDelta delay) override;
bool RunsTasksInCurrentSequence() const override; bool RunsTasksInCurrentSequence() const override;
// SequencedTaskSource:
PendingTask TakeTask() override;
bool HasTasks() override;
void InjectTask(OnceClosure task) override;
private: private:
friend class RefCountedThreadSafe<MessageLoopTaskRunner>; friend class RefCountedThreadSafe<MessageLoopTaskRunner>;
~MessageLoopTaskRunner() override; ~MessageLoopTaskRunner() override;
// The incoming queue receiving all posted tasks. // Appends a task to the incoming queue. Thread-safe.
scoped_refptr<IncomingTaskQueue> incoming_queue_; // Returns true if the task was successfully added to the queue.
bool AddToIncomingQueue(const Location& from_here,
OnceClosure task,
TimeDelta delay,
Nestable nestable);
// ID of the thread |this| was created on. Could be accessed on multiple // ID of the thread |this| was created on. Could be accessed on multiple
// threads, protected by |valid_thread_id_lock_|. // threads, protected by |valid_thread_id_lock_|.
PlatformThreadId valid_thread_id_; PlatformThreadId valid_thread_id_ = kInvalidThreadId;
mutable Lock valid_thread_id_lock_; mutable Lock valid_thread_id_lock_;
std::unique_ptr<SequencedTaskSource::Observer> task_source_observer_;
// Tasks to be returned to TakeTask(). Reloaded from |incoming_queue_| when
// it becomes empty.
TaskQueue outgoing_queue_;
// Synchronizes access to all members below this line.
base::Lock incoming_queue_lock_;
// An incoming queue of tasks that are acquired under a mutex for processing
// on this instance's thread. These tasks have not yet been been pushed to
// |outgoing_queue_|.
TaskQueue incoming_queue_;
// True if the |outgoing_queue_| is empty. Toggled under
// |incoming_queue_lock_| when reloading the work queue so that
// PostPendingTaskLockRequired() can tell, without accessing the thread unsafe
// |outgoing_queue_|, if this SequencedTaskSource has been made non-empty by a
// PostTask() (and needs to inform its Observer).
bool outgoing_queue_empty_ = true;
// True if new tasks should be accepted.
bool accept_new_tasks_ = true;
// The next sequence number to use for delayed tasks.
int next_sequence_num_ = 0;
DISALLOW_COPY_AND_ASSIGN(MessageLoopTaskRunner); DISALLOW_COPY_AND_ASSIGN(MessageLoopTaskRunner);
}; };
......
...@@ -12,10 +12,10 @@ ...@@ -12,10 +12,10 @@
#include "base/debug/task_annotator.h" #include "base/debug/task_annotator.h"
#include "base/macros.h" #include "base/macros.h"
#include "base/memory/scoped_refptr.h" #include "base/memory/scoped_refptr.h"
#include "base/message_loop/incoming_task_queue.h"
#include "base/message_loop/message_loop.h" #include "base/message_loop/message_loop.h"
#include "base/message_loop/message_loop_task_runner.h" #include "base/message_loop/message_loop_task_runner.h"
#include "base/message_loop/message_pump.h" #include "base/message_loop/message_pump.h"
#include "base/message_loop/sequenced_task_source.h"
#include "base/run_loop.h" #include "base/run_loop.h"
#include "base/strings/stringprintf.h" #include "base/strings/stringprintf.h"
#include "base/time/time.h" #include "base/time/time.h"
...@@ -33,9 +33,9 @@ constexpr TimeDelta kPostTaskPerfTestDuration = ...@@ -33,9 +33,9 @@ constexpr TimeDelta kPostTaskPerfTestDuration =
} // namespace } // namespace
class FakeObserver : public internal::IncomingTaskQueue::Observer { class FakeObserver : public SequencedTaskSource::Observer {
public: public:
// IncomingTaskQueue::Observer // SequencedTaskSource::Observer
void WillQueueTask(PendingTask* task) override {} void WillQueueTask(PendingTask* task) override {}
void DidQueueTask(bool was_empty) override {} void DidQueueTask(bool was_empty) override {}
...@@ -51,24 +51,21 @@ class BasicPostTaskPerfTest : public testing::Test { ...@@ -51,24 +51,21 @@ class BasicPostTaskPerfTest : public testing::Test {
base::TimeTicks start = base::TimeTicks::Now(); base::TimeTicks start = base::TimeTicks::Now();
base::TimeTicks now; base::TimeTicks now;
FakeObserver* task_source_observer_raw = task_source_observer.get(); FakeObserver* task_source_observer_raw = task_source_observer.get();
scoped_refptr<internal::IncomingTaskQueue> queue( auto message_loop_task_runner =
base::MakeRefCounted<internal::IncomingTaskQueue>( MakeRefCounted<internal::MessageLoopTaskRunner>(
std::move(task_source_observer))); std::move(task_source_observer));
scoped_refptr<SingleThreadTaskRunner> task_runner(
base::MakeRefCounted<internal::MessageLoopTaskRunner>(queue));
uint32_t num_posted = 0; uint32_t num_posted = 0;
do { do {
for (int i = 0; i < batch_size; ++i) { for (int i = 0; i < batch_size; ++i) {
for (int j = 0; j < tasks_per_reload; ++j) { for (int j = 0; j < tasks_per_reload; ++j) {
task_runner->PostTask(FROM_HERE, DoNothing()); message_loop_task_runner->PostTask(FROM_HERE, DoNothing());
num_posted++; num_posted++;
} }
TaskQueue loop_local_queue; // The outgoing queue will only be reloaded when first entering this
queue->ReloadWorkQueue(&loop_local_queue); // loop.
while (!loop_local_queue.empty()) { while (message_loop_task_runner->HasTasks()) {
PendingTask t = std::move(loop_local_queue.front()); auto task = message_loop_task_runner->TakeTask();
loop_local_queue.pop(); task_source_observer_raw->RunTask(&task);
task_source_observer_raw->RunTask(&t);
} }
} }
......
// Copyright 2018 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef BASE_MESSAGE_LOOP_SEQUENCED_TASK_SOURCE_H_
#define BASE_MESSAGE_LOOP_SEQUENCED_TASK_SOURCE_H_
#include "base/callback.h"
#include "base/pending_task.h"
#include "base/time/time.h"
namespace base {
// A source of tasks to be executed sequentially. Unless specified otherwise,
// methods below are not thread-safe (must be called from the executing
// sequence).
// TODO(scheduler-dev): Coalesce with
// base::sequence_manager::SequencedTaskSource.
class SequencedTaskSource {
public:
class Observer {
public:
virtual ~Observer() = default;
// Notifies this Observer that |task| is about to be enqueued in the
// SequencedTaskSource it observes.
// WillQueueTask() may be invoked from any thread.
virtual void WillQueueTask(PendingTask* task) = 0;
// Notifies this Observer that a task was enqueued in the
// SequencedTaskSource it observes. |was_empty| is true if the task source
// was empty (i.e. |!HasTasks()|) before this task was posted.
// DidQueueTask() may be invoked from any thread.
virtual void DidQueueTask(bool was_empty) = 0;
};
virtual ~SequencedTaskSource() = default;
// Take a next task to run from a sequence. Must only be called if
// HasTasks() returns true.
virtual PendingTask TakeTask() = 0;
// Returns true if this SequencedTaskSource will return a task from the next
// TakeTask() call.
virtual bool HasTasks() = 0;
// Injects |task| at the end of this SequencedTaskSource (such that it will be
// the last task returned by TakeTask() if no other task are posted after this
// point). TODO(gab): This is only required to support clearing tasks on
// shutdown, maybe leaking tasks on shutdown is a better alternative.
virtual void InjectTask(OnceClosure task) = 0;
};
} // namespace base
#endif // BASE_MESSAGE_LOOP_SEQUENCED_TASK_SOURCE_H_
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment