Commit 4714a9fc authored by Gabriel Charette's avatar Gabriel Charette Committed by Commit Bot

[MessageLoop] Refactor ScheduleWork and TaskAnnotator logic out of IncomingTaskQueue

This was extracted from
https://chromium-review.googlesource.com/c/chromium/src/+/1088762/17
in an attempt to simplify it (and is now a precursor to it).
(MessageLoop::Controller was IncomingTaskQueue::MessageLoopController
there but after grokking the resulting CL it's simpler under MessageLoop
as done in this CL)

This CL splits the logic in IncomingTaskQueue which took care of
scheduling the MessageLoop into a dedicated class. In that follow-up CL:
MessageLoopTaskRunner will interact directly with the task Observer and
take IncomingTaskQueue completely out of the picture (merely a data
structure holding various task queues at that point).

IncomingTaskQueue also took care of detaching on MessageLoop shutdown
which was moved to the new Controller class as well.

message_loop.cc is the best place for this extracted logic as it all
pertains precisely to MessageLoop's implementation detail (how
ScheduleWork should be invoked).

This CL simplifies locking as well by having a clear separation between
the two locks instead of two locks in the same class used
interchangibly. |incoming_queue_lock_| is now strictly for incoming
tasks. |message_loop_lock_| is now strictly for
ScheduleWork()/DisconnectFromParent().

Note: |message_loop_scheduled_| was dropped as it was redundant (always
equal to |!was_empty|).

Performance wise, the perf tests show that this change is a noop :
 * While BasicPostTaskPerfTest became simpler (executed less code) with
   this CL :
     The new BasicPostTaskPerfTest w/ MockObserverSimulatingOverhead
     reintroduces that overhead to show that it's still the same (or
     slightly in favor of this CL).
 * And the IntegratedPostTaskPerfTest are the same.
 * Augmented perf tests to 30 seconds which yields more reliable results.
   (and ran old ones under 30s mode too when comparing)
 * Results :
     https://docs.google.com/spreadsheets/d/100wYvbCI_dJ7gRnQiSsYaTb5OJnbF_muL6LyQWJLXSU/edit

Bug: 860252
Change-Id: I22de2409d52414524cc125b0e2fe08e2c516fcbe
Reviewed-on: https://chromium-review.googlesource.com/1127262
Commit-Queue: Gabriel Charette <gab@chromium.org>
Reviewed-by: default avatardanakj <danakj@chromium.org>
Reviewed-by: default avatarkylechar <kylechar@chromium.org>
Cr-Commit-Position: refs/heads/master@{#574048}
parent 2f6a3595
...@@ -10,7 +10,6 @@ ...@@ -10,7 +10,6 @@
#include "base/bind.h" #include "base/bind.h"
#include "base/callback_helpers.h" #include "base/callback_helpers.h"
#include "base/location.h" #include "base/location.h"
#include "base/message_loop/message_loop.h"
#include "base/metrics/histogram_macros.h" #include "base/metrics/histogram_macros.h"
#include "base/synchronization/waitable_event.h" #include "base/synchronization/waitable_event.h"
#include "base/time/time.h" #include "base/time/time.h"
...@@ -38,13 +37,17 @@ TimeTicks CalculateDelayedRuntime(TimeDelta delay) { ...@@ -38,13 +37,17 @@ TimeTicks CalculateDelayedRuntime(TimeDelta delay) {
} // namespace } // namespace
IncomingTaskQueue::IncomingTaskQueue(MessageLoop* message_loop) IncomingTaskQueue::IncomingTaskQueue(
: triage_tasks_(this), message_loop_(message_loop) { std::unique_ptr<Observer> task_queue_observer)
: task_queue_observer_(std::move(task_queue_observer)),
triage_tasks_(this) {
// The constructing sequence is not necessarily the running sequence, e.g. in // The constructing sequence is not necessarily the running sequence, e.g. in
// the case of a MessageLoop created unbound. // the case of a MessageLoop created unbound.
DETACH_FROM_SEQUENCE(sequence_checker_); DETACH_FROM_SEQUENCE(sequence_checker_);
} }
IncomingTaskQueue::~IncomingTaskQueue() = default;
bool IncomingTaskQueue::AddToIncomingQueue(const Location& from_here, bool IncomingTaskQueue::AddToIncomingQueue(const Location& from_here,
OnceClosure task, OnceClosure task,
TimeDelta delay, TimeDelta delay,
...@@ -75,43 +78,9 @@ bool IncomingTaskQueue::AddToIncomingQueue(const Location& from_here, ...@@ -75,43 +78,9 @@ bool IncomingTaskQueue::AddToIncomingQueue(const Location& from_here,
return PostPendingTask(&pending_task); return PostPendingTask(&pending_task);
} }
void IncomingTaskQueue::WillDestroyCurrentMessageLoop() { void IncomingTaskQueue::Shutdown() {
{ AutoLock auto_lock(incoming_queue_lock_);
AutoLock auto_lock(incoming_queue_lock_); accept_new_tasks_ = false;
accept_new_tasks_ = false;
}
{
AutoLock auto_lock(message_loop_lock_);
message_loop_ = nullptr;
}
}
void IncomingTaskQueue::StartScheduling() {
bool schedule_work;
{
AutoLock lock(incoming_queue_lock_);
DCHECK(!is_ready_for_scheduling_);
DCHECK(!message_loop_scheduled_);
is_ready_for_scheduling_ = true;
schedule_work = !incoming_queue_.empty();
if (schedule_work)
message_loop_scheduled_ = true;
}
if (schedule_work) {
DCHECK(message_loop_);
AutoLock auto_lock(message_loop_lock_);
message_loop_->ScheduleWork();
}
}
IncomingTaskQueue::~IncomingTaskQueue() {
// Verify that WillDestroyCurrentMessageLoop() has been called.
DCHECK(!message_loop_);
}
void IncomingTaskQueue::RunTask(PendingTask* pending_task) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
task_annotator_.RunTask("MessageLoop::PostTask", pending_task);
} }
void IncomingTaskQueue::ReportMetricsOnIdle() const { void IncomingTaskQueue::ReportMetricsOnIdle() const {
...@@ -283,35 +252,26 @@ bool IncomingTaskQueue::PostPendingTask(PendingTask* pending_task) { ...@@ -283,35 +252,26 @@ bool IncomingTaskQueue::PostPendingTask(PendingTask* pending_task) {
// directly, as it could starve handling of foreign threads. Put every task // directly, as it could starve handling of foreign threads. Put every task
// into this queue. // into this queue.
bool accept_new_tasks; bool accept_new_tasks;
bool schedule_work = false; bool was_empty = false;
{ {
AutoLock auto_lock(incoming_queue_lock_); AutoLock auto_lock(incoming_queue_lock_);
accept_new_tasks = accept_new_tasks_; accept_new_tasks = accept_new_tasks_;
if (accept_new_tasks) if (accept_new_tasks)
schedule_work = PostPendingTaskLockRequired(pending_task); was_empty = PostPendingTaskLockRequired(pending_task);
} }
if (!accept_new_tasks) { if (!accept_new_tasks) {
// Clear the pending task outside of |incoming_queue_lock_| to prevent any // Clear the pending task outside of |incoming_queue_lock_| to prevent any
// chance of self-deadlock if destroying a task also posts a task to this // chance of self-deadlock if destroying a task also posts a task to this
// queue. // queue.
DCHECK(!schedule_work);
pending_task->task.Reset(); pending_task->task.Reset();
return false; return false;
} }
// Wake up the message loop and schedule work. This is done outside // Let |task_queue_observer_| know of the queued task. This is done outside
// |incoming_queue_lock_| to allow for multiple post tasks to occur while // |incoming_queue_lock_| to avoid conflating locks (DidQueueTask() can also
// ScheduleWork() is running. For platforms (e.g. Android) that require one // use a lock).
// call to ScheduleWork() for each task, all pending tasks may serialize task_queue_observer_->DidQueueTask(was_empty);
// within the ScheduleWork() call. As a result, holding a lock to maintain the
// lifetime of |message_loop_| is less of a concern.
if (schedule_work) {
// Ensures |message_loop_| isn't destroyed while running.
AutoLock auto_lock(message_loop_lock_);
if (message_loop_)
message_loop_->ScheduleWork();
}
return true; return true;
} }
...@@ -324,21 +284,11 @@ bool IncomingTaskQueue::PostPendingTaskLockRequired(PendingTask* pending_task) { ...@@ -324,21 +284,11 @@ bool IncomingTaskQueue::PostPendingTaskLockRequired(PendingTask* pending_task) {
// delayed_run_time value) and for identifying the task in about:tracing. // delayed_run_time value) and for identifying the task in about:tracing.
pending_task->sequence_num = next_sequence_num_++; pending_task->sequence_num = next_sequence_num_++;
task_annotator_.WillQueueTask("MessageLoop::PostTask", pending_task); task_queue_observer_->WillQueueTask(pending_task);
bool was_empty = incoming_queue_.empty(); bool was_empty = incoming_queue_.empty();
incoming_queue_.push(std::move(*pending_task)); incoming_queue_.push(std::move(*pending_task));
return was_empty;
if (is_ready_for_scheduling_ && !message_loop_scheduled_ && was_empty) {
// After we've scheduled the message loop, we do not need to do so again
// until we know it has processed all of the work in our queue and is
// waiting for more work again. The message loop will always attempt to
// reload from the incoming queue before waiting again so we clear this
// flag in ReloadWorkQueue().
message_loop_scheduled_ = true;
return true;
}
return false;
} }
void IncomingTaskQueue::ReloadWorkQueue(TaskQueue* work_queue) { void IncomingTaskQueue::ReloadWorkQueue(TaskQueue* work_queue) {
...@@ -349,14 +299,7 @@ void IncomingTaskQueue::ReloadWorkQueue(TaskQueue* work_queue) { ...@@ -349,14 +299,7 @@ void IncomingTaskQueue::ReloadWorkQueue(TaskQueue* work_queue) {
// Acquire all we can from the inter-thread queue with one lock acquisition. // Acquire all we can from the inter-thread queue with one lock acquisition.
AutoLock lock(incoming_queue_lock_); AutoLock lock(incoming_queue_lock_);
if (incoming_queue_.empty()) { incoming_queue_.swap(*work_queue);
// If the loop attempts to reload but there are no tasks in the incoming
// queue, that means it will go to sleep waiting for more work. If the
// incoming queue becomes nonempty we need to schedule it again.
message_loop_scheduled_ = false;
} else {
incoming_queue_.swap(*work_queue);
}
} }
} // namespace internal } // namespace internal
......
...@@ -7,7 +7,6 @@ ...@@ -7,7 +7,6 @@
#include "base/base_export.h" #include "base/base_export.h"
#include "base/callback.h" #include "base/callback.h"
#include "base/debug/task_annotator.h"
#include "base/macros.h" #include "base/macros.h"
#include "base/memory/ref_counted.h" #include "base/memory/ref_counted.h"
#include "base/pending_task.h" #include "base/pending_task.h"
...@@ -17,7 +16,6 @@ ...@@ -17,7 +16,6 @@
namespace base { namespace base {
class MessageLoop;
class BasicPostTaskPerfTest; class BasicPostTaskPerfTest;
namespace internal { namespace internal {
...@@ -28,6 +26,25 @@ namespace internal { ...@@ -28,6 +26,25 @@ namespace internal {
class BASE_EXPORT IncomingTaskQueue class BASE_EXPORT IncomingTaskQueue
: public RefCountedThreadSafe<IncomingTaskQueue> { : public RefCountedThreadSafe<IncomingTaskQueue> {
public: public:
// TODO(gab): Move this to SequencedTaskSource::Observer in
// https://chromium-review.googlesource.com/c/chromium/src/+/1088762.
class Observer {
public:
virtual ~Observer() = default;
// Notifies this Observer that it is about to enqueue |task|. The Observer
// may alter |task| as a result (e.g. add metadata to the PendingTask
// struct). This may be called while holding a lock and shouldn't perform
// logic requiring synchronization (override DidQueueTask() for that).
virtual void WillQueueTask(PendingTask* task) = 0;
// Notifies this Observer that a task was queued in the IncomingTaskQueue it
// observes. |was_empty| is true if the task source was empty (i.e.
// |!HasTasks()|) before this task was posted. DidQueueTask() can be invoked
// from any thread.
virtual void DidQueueTask(bool was_empty) = 0;
};
// Provides a read and remove only view into a task queue. // Provides a read and remove only view into a task queue.
class ReadAndRemoveOnlyQueue { class ReadAndRemoveOnlyQueue {
public: public:
...@@ -63,7 +80,13 @@ class BASE_EXPORT IncomingTaskQueue ...@@ -63,7 +80,13 @@ class BASE_EXPORT IncomingTaskQueue
DISALLOW_COPY_AND_ASSIGN(Queue); DISALLOW_COPY_AND_ASSIGN(Queue);
}; };
explicit IncomingTaskQueue(MessageLoop* message_loop); // Constructs an IncomingTaskQueue which will invoke |task_queue_observer|
// when tasks are queued. |task_queue_observer| will be bound to this
// IncomingTaskQueue's lifetime. Ownership is required as opposed to a raw
// pointer since IncomingTaskQueue is ref-counted. For the same reasons,
// |task_queue_observer| needs to support being invoked racily during
// shutdown).
explicit IncomingTaskQueue(std::unique_ptr<Observer> task_queue_observer);
// Appends a task to the incoming queue. Posting of all tasks is routed though // Appends a task to the incoming queue. Posting of all tasks is routed though
// AddToIncomingQueue() or TryAddToIncomingQueue() to make sure that posting // AddToIncomingQueue() or TryAddToIncomingQueue() to make sure that posting
...@@ -77,15 +100,11 @@ class BASE_EXPORT IncomingTaskQueue ...@@ -77,15 +100,11 @@ class BASE_EXPORT IncomingTaskQueue
TimeDelta delay, TimeDelta delay,
Nestable nestable); Nestable nestable);
// Disconnects |this| from the parent message loop. // Instructs this IncomingTaskQueue to stop accepting tasks, this cannot be
void WillDestroyCurrentMessageLoop(); // undone. Note that the registered IncomingTaskQueue::Observer may still
// racily receive a few DidQueueTask() calls while the Shutdown() signal
// This should be called when the message loop becomes ready for // propagates to other threads and it needs to support that.
// scheduling work. void Shutdown();
void StartScheduling();
// Runs |pending_task|.
void RunTask(PendingTask* pending_task);
ReadAndRemoveOnlyQueue& triage_tasks() { return triage_tasks_; } ReadAndRemoveOnlyQueue& triage_tasks() { return triage_tasks_; }
...@@ -208,8 +227,8 @@ class BASE_EXPORT IncomingTaskQueue ...@@ -208,8 +227,8 @@ class BASE_EXPORT IncomingTaskQueue
// does not retain |pending_task->task| beyond this function call. // does not retain |pending_task->task| beyond this function call.
bool PostPendingTask(PendingTask* pending_task); bool PostPendingTask(PendingTask* pending_task);
// Does the real work of posting a pending task. Returns true if the caller // Does the real work of posting a pending task. Returns true if
// should call ScheduleWork() on the message loop. // |incoming_queue_| was empty before |pending_task| was posted.
bool PostPendingTaskLockRequired(PendingTask* pending_task); bool PostPendingTaskLockRequired(PendingTask* pending_task);
// Loads tasks from the |incoming_queue_| into |*work_queue|. Must be called // Loads tasks from the |incoming_queue_| into |*work_queue|. Must be called
...@@ -219,7 +238,7 @@ class BASE_EXPORT IncomingTaskQueue ...@@ -219,7 +238,7 @@ class BASE_EXPORT IncomingTaskQueue
// Checks calls made only on the MessageLoop thread. // Checks calls made only on the MessageLoop thread.
SEQUENCE_CHECKER(sequence_checker_); SEQUENCE_CHECKER(sequence_checker_);
debug::TaskAnnotator task_annotator_; const std::unique_ptr<Observer> task_queue_observer_;
// Queue for initial triaging of tasks on the |sequence_checker_| sequence. // Queue for initial triaging of tasks on the |sequence_checker_| sequence.
TriageQueue triage_tasks_; TriageQueue triage_tasks_;
...@@ -230,13 +249,6 @@ class BASE_EXPORT IncomingTaskQueue ...@@ -230,13 +249,6 @@ class BASE_EXPORT IncomingTaskQueue
// Queue for non-nestable deferred tasks on the |sequence_checker_| sequence. // Queue for non-nestable deferred tasks on the |sequence_checker_| sequence.
DeferredQueue deferred_tasks_; DeferredQueue deferred_tasks_;
// Lock that serializes |message_loop_->ScheduleWork()| calls as well as
// prevents |message_loop_| from being made nullptr during such a call.
base::Lock message_loop_lock_;
// Points to the message loop that owns |this|.
MessageLoop* message_loop_;
// Synchronizes access to all members below this line. // Synchronizes access to all members below this line.
base::Lock incoming_queue_lock_; base::Lock incoming_queue_lock_;
...@@ -251,13 +263,6 @@ class BASE_EXPORT IncomingTaskQueue ...@@ -251,13 +263,6 @@ class BASE_EXPORT IncomingTaskQueue
// The next sequence number to use for delayed tasks. // The next sequence number to use for delayed tasks.
int next_sequence_num_ = 0; int next_sequence_num_ = 0;
// True if our message loop has already been scheduled and does not need to be
// scheduled again until an empty reload occurs.
bool message_loop_scheduled_ = false;
// False until StartScheduling() is called.
bool is_ready_for_scheduling_ = false;
DISALLOW_COPY_AND_ASSIGN(IncomingTaskQueue); DISALLOW_COPY_AND_ASSIGN(IncomingTaskQueue);
}; };
......
...@@ -9,6 +9,7 @@ ...@@ -9,6 +9,7 @@
#include "base/bind.h" #include "base/bind.h"
#include "base/compiler_specific.h" #include "base/compiler_specific.h"
#include "base/debug/task_annotator.h"
#include "base/logging.h" #include "base/logging.h"
#include "base/memory/ptr_util.h" #include "base/memory/ptr_util.h"
#include "base/message_loop/message_pump_default.h" #include "base/message_loop/message_pump_default.h"
...@@ -68,6 +69,95 @@ void ReportScheduledWakeupResult(ScheduledWakeupResult result, ...@@ -68,6 +69,95 @@ void ReportScheduledWakeupResult(ScheduledWakeupResult result,
} // namespace } // namespace
class MessageLoop::Controller : public internal::IncomingTaskQueue::Observer {
public:
// Constructs a MessageLoopController which controls |message_loop|, notifying
// |task_annotator_| when tasks are queued scheduling work on |message_loop|
// as fits. |message_loop| and |task_annotator_| will not be used after
// DisconnectFromParent() returns.
Controller(MessageLoop* message_loop);
~Controller() override;
// IncomingTaskQueue::Observer:
void WillQueueTask(PendingTask* task) final;
void DidQueueTask(bool was_empty) final;
void StartScheduling();
// Disconnects |message_loop_| from this Controller instance (DidQueueTask()
// will no-op from this point forward).
void DisconnectFromParent();
// Shares this Controller's TaskAnnotator with MessageLoop as TaskAnnotator
// requires DidQueueTask(x)/RunTask(x) to be invoked on the same TaskAnnotator
// instance.
debug::TaskAnnotator& task_annotator() { return task_annotator_; }
private:
// A TaskAnnotator which is owned by this Controller to be able to use it
// without locking |message_loop_lock_|. It cannot be owned by MessageLoop
// because this Controller cannot access |message_loop_| safely without the
// lock. Note: the TaskAnnotator API itself is thread-safe.
debug::TaskAnnotator task_annotator_;
// Lock that serializes |message_loop_->ScheduleWork()| and access to all
// members below.
base::Lock message_loop_lock_;
// Points to this Controller's outer MessageLoop instance. Null after
// DisconnectFromParent().
MessageLoop* message_loop_;
// False until StartScheduling() is called.
bool is_ready_for_scheduling_ = false;
// True if DidQueueTask() has been called before StartScheduling(); letting it
// know whether it needs to ScheduleWork() right away or not.
bool pending_schedule_work_ = false;
DISALLOW_COPY_AND_ASSIGN(Controller);
};
MessageLoop::Controller::Controller(MessageLoop* message_loop)
: message_loop_(message_loop) {}
MessageLoop::Controller::~Controller() {
DCHECK(!message_loop_)
<< "DisconnectFromParent() needs to be invoked before destruction.";
}
void MessageLoop::Controller::WillQueueTask(PendingTask* task) {
task_annotator_.WillQueueTask("MessageLoop::PostTask", task);
}
void MessageLoop::Controller::DidQueueTask(bool was_empty) {
// Avoid locking if we don't need to schedule.
if (!was_empty)
return;
AutoLock auto_lock(message_loop_lock_);
if (message_loop_ && is_ready_for_scheduling_)
message_loop_->ScheduleWork();
else
pending_schedule_work_ = true;
}
void MessageLoop::Controller::StartScheduling() {
AutoLock lock(message_loop_lock_);
DCHECK(message_loop_);
DCHECK(!is_ready_for_scheduling_);
is_ready_for_scheduling_ = true;
if (pending_schedule_work_)
message_loop_->ScheduleWork();
}
void MessageLoop::Controller::DisconnectFromParent() {
AutoLock lock(message_loop_lock_);
message_loop_ = nullptr;
}
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
MessageLoop::MessageLoop(Type type) MessageLoop::MessageLoop(Type type)
...@@ -126,7 +216,8 @@ MessageLoop::~MessageLoop() { ...@@ -126,7 +216,8 @@ MessageLoop::~MessageLoop() {
thread_task_runner_handle_.reset(); thread_task_runner_handle_.reset();
// Tell the incoming queue that we are dying. // Tell the incoming queue that we are dying.
incoming_task_queue_->WillDestroyCurrentMessageLoop(); message_loop_controller_->DisconnectFromParent();
incoming_task_queue_->Shutdown();
incoming_task_queue_ = nullptr; incoming_task_queue_ = nullptr;
unbound_task_runner_ = nullptr; unbound_task_runner_ = nullptr;
task_runner_ = nullptr; task_runner_ = nullptr;
...@@ -224,13 +315,18 @@ std::unique_ptr<MessageLoop> MessageLoop::CreateUnbound( ...@@ -224,13 +315,18 @@ std::unique_ptr<MessageLoop> MessageLoop::CreateUnbound(
return WrapUnique(new MessageLoop(type, std::move(pump_factory))); return WrapUnique(new MessageLoop(type, std::move(pump_factory)));
} }
// TODO(gab): Avoid bare new + WrapUnique below when introducing
// SequencedTaskSource in follow-up @
// https://chromium-review.googlesource.com/c/chromium/src/+/1088762.
MessageLoop::MessageLoop(Type type, MessagePumpFactoryCallback pump_factory) MessageLoop::MessageLoop(Type type, MessagePumpFactoryCallback pump_factory)
: MessageLoopCurrent(this), : MessageLoopCurrent(this),
type_(type), type_(type),
pump_factory_(std::move(pump_factory)), pump_factory_(std::move(pump_factory)),
incoming_task_queue_(new internal::IncomingTaskQueue(this)), message_loop_controller_(new Controller(this)),
unbound_task_runner_( incoming_task_queue_(MakeRefCounted<internal::IncomingTaskQueue>(
new internal::MessageLoopTaskRunner(incoming_task_queue_)), WrapUnique(message_loop_controller_))),
unbound_task_runner_(MakeRefCounted<internal::MessageLoopTaskRunner>(
incoming_task_queue_)),
task_runner_(unbound_task_runner_) { task_runner_(unbound_task_runner_) {
// If type is TYPE_CUSTOM non-null pump_factory must be given. // If type is TYPE_CUSTOM non-null pump_factory must be given.
DCHECK(type_ != TYPE_CUSTOM || !pump_factory_.is_null()); DCHECK(type_ != TYPE_CUSTOM || !pump_factory_.is_null());
...@@ -252,7 +348,7 @@ void MessageLoop::BindToCurrentThread() { ...@@ -252,7 +348,7 @@ void MessageLoop::BindToCurrentThread() {
<< "should only have one message loop per thread"; << "should only have one message loop per thread";
MessageLoopCurrent::BindToCurrentThreadInternal(this); MessageLoopCurrent::BindToCurrentThreadInternal(this);
incoming_task_queue_->StartScheduling(); message_loop_controller_->StartScheduling();
unbound_task_runner_->BindToCurrentThread(); unbound_task_runner_->BindToCurrentThread();
unbound_task_runner_ = nullptr; unbound_task_runner_ = nullptr;
SetThreadTaskRunnerHandle(); SetThreadTaskRunnerHandle();
...@@ -354,7 +450,8 @@ void MessageLoop::RunTask(PendingTask* pending_task) { ...@@ -354,7 +450,8 @@ void MessageLoop::RunTask(PendingTask* pending_task) {
for (auto& observer : task_observers_) for (auto& observer : task_observers_)
observer.WillProcessTask(*pending_task); observer.WillProcessTask(*pending_task);
incoming_task_queue_->RunTask(pending_task); message_loop_controller_->task_annotator().RunTask("MessageLoop::PostTask",
pending_task);
for (auto& observer : task_observers_) for (auto& observer : task_observers_)
observer.DidProcessTask(*pending_task); observer.DidProcessTask(*pending_task);
......
...@@ -222,6 +222,8 @@ class BASE_EXPORT MessageLoop : public MessagePump::Delegate, ...@@ -222,6 +222,8 @@ class BASE_EXPORT MessageLoop : public MessagePump::Delegate,
friend class Thread; friend class Thread;
FRIEND_TEST_ALL_PREFIXES(MessageLoopTest, DeleteUnboundLoop); FRIEND_TEST_ALL_PREFIXES(MessageLoopTest, DeleteUnboundLoop);
class Controller;
// Creates a MessageLoop without binding to a thread. // Creates a MessageLoop without binding to a thread.
// If |type| is TYPE_CUSTOM non-null |pump_factory| must be also given // If |type| is TYPE_CUSTOM non-null |pump_factory| must be also given
// to create a message pump for this message loop. Otherwise a default // to create a message pump for this message loop. Otherwise a default
...@@ -304,6 +306,9 @@ class BASE_EXPORT MessageLoop : public MessagePump::Delegate, ...@@ -304,6 +306,9 @@ class BASE_EXPORT MessageLoop : public MessagePump::Delegate,
ObserverList<TaskObserver> task_observers_; ObserverList<TaskObserver> task_observers_;
// Pointer to this MessageLoop's Controller, valid until the reference to
// |incoming_task_queue_| is dropped below.
Controller* const message_loop_controller_;
scoped_refptr<internal::IncomingTaskQueue> incoming_task_queue_; scoped_refptr<internal::IncomingTaskQueue> incoming_task_queue_;
// A task runner which we haven't bound to a thread yet. // A task runner which we haven't bound to a thread yet.
......
...@@ -8,7 +8,11 @@ ...@@ -8,7 +8,11 @@
#include <utility> #include <utility>
#include "base/bind_helpers.h" #include "base/bind_helpers.h"
#include "base/callback.h"
#include "base/debug/task_annotator.h"
#include "base/macros.h"
#include "base/memory/scoped_refptr.h" #include "base/memory/scoped_refptr.h"
#include "base/message_loop/incoming_task_queue.h"
#include "base/message_loop/message_loop.h" #include "base/message_loop/message_loop.h"
#include "base/message_loop/message_loop_task_runner.h" #include "base/message_loop/message_loop_task_runner.h"
#include "base/message_loop/message_pump.h" #include "base/message_loop/message_pump.h"
...@@ -20,27 +24,36 @@ ...@@ -20,27 +24,36 @@
namespace base { namespace base {
class StubMessagePump : public MessagePump { namespace {
public:
StubMessagePump() = default;
~StubMessagePump() override = default;
void Run(Delegate* delegate) override {} // Tests below will post tasks in a loop until |kPostTaskPerfTestDuration| has
// elapsed.
constexpr TimeDelta kPostTaskPerfTestDuration =
base::TimeDelta::FromSeconds(30);
void Quit() override {} } // namespace
void ScheduleWork() override {}
void ScheduleDelayedWork(const TimeTicks& delayed_work_time) override {} class FakeObserver : public internal::IncomingTaskQueue::Observer {
public:
// IncomingTaskQueue::Observer
void WillQueueTask(PendingTask* task) override {}
void DidQueueTask(bool was_empty) override {}
virtual void RunTask(PendingTask* task) { std::move(task->task).Run(); }
}; };
// Exercises MessageLoopTaskRunner+IncomingTaskQueue w/ a mock MessageLoop. // Exercises MessageLoopTaskRunner's multi-threaded queue in isolation.
class BasicPostTaskPerfTest : public testing::Test { class BasicPostTaskPerfTest : public testing::Test {
public: public:
void Run(int batch_size, int tasks_per_reload) { void Run(int batch_size,
int tasks_per_reload,
std::unique_ptr<FakeObserver> task_source_observer) {
base::TimeTicks start = base::TimeTicks::Now(); base::TimeTicks start = base::TimeTicks::Now();
base::TimeTicks now; base::TimeTicks now;
MessageLoop loop(std::unique_ptr<MessagePump>(new StubMessagePump)); FakeObserver* task_source_observer_raw = task_source_observer.get();
scoped_refptr<internal::IncomingTaskQueue> queue( scoped_refptr<internal::IncomingTaskQueue> queue(
base::MakeRefCounted<internal::IncomingTaskQueue>(&loop)); base::MakeRefCounted<internal::IncomingTaskQueue>(
std::move(task_source_observer)));
scoped_refptr<SingleThreadTaskRunner> task_runner( scoped_refptr<SingleThreadTaskRunner> task_runner(
base::MakeRefCounted<internal::MessageLoopTaskRunner>(queue)); base::MakeRefCounted<internal::MessageLoopTaskRunner>(queue));
uint32_t num_posted = 0; uint32_t num_posted = 0;
...@@ -55,31 +68,85 @@ class BasicPostTaskPerfTest : public testing::Test { ...@@ -55,31 +68,85 @@ class BasicPostTaskPerfTest : public testing::Test {
while (!loop_local_queue.empty()) { while (!loop_local_queue.empty()) {
PendingTask t = std::move(loop_local_queue.front()); PendingTask t = std::move(loop_local_queue.front());
loop_local_queue.pop(); loop_local_queue.pop();
loop.RunTask(&t); task_source_observer_raw->RunTask(&t);
} }
} }
now = base::TimeTicks::Now(); now = base::TimeTicks::Now();
} while (now - start < base::TimeDelta::FromSeconds(5)); } while (now - start < kPostTaskPerfTestDuration);
std::string trace = StringPrintf("%d_tasks_per_reload", tasks_per_reload); std::string trace = StringPrintf("%d_tasks_per_reload", tasks_per_reload);
perf_test::PrintResult( perf_test::PrintResult(
"task", "", trace, "task", "", trace,
(now - start).InMicroseconds() / static_cast<double>(num_posted), (now - start).InMicroseconds() / static_cast<double>(num_posted),
"us/task", true); "us/task", true);
queue->WillDestroyCurrentMessageLoop();
} }
}; };
TEST_F(BasicPostTaskPerfTest, OneTaskPerReload) { TEST_F(BasicPostTaskPerfTest, OneTaskPerReload) {
Run(10000, 1); Run(10000, 1, std::make_unique<FakeObserver>());
} }
TEST_F(BasicPostTaskPerfTest, TenTasksPerReload) { TEST_F(BasicPostTaskPerfTest, TenTasksPerReload) {
Run(10000, 10); Run(10000, 10, std::make_unique<FakeObserver>());
} }
TEST_F(BasicPostTaskPerfTest, OneHundredTasksPerReload) { TEST_F(BasicPostTaskPerfTest, OneHundredTasksPerReload) {
Run(1000, 100); Run(1000, 100, std::make_unique<FakeObserver>());
}
class StubMessagePump : public MessagePump {
public:
StubMessagePump() = default;
~StubMessagePump() override = default;
// MessagePump:
void Run(Delegate* delegate) override {}
void Quit() override {}
void ScheduleWork() override {}
void ScheduleDelayedWork(const TimeTicks& delayed_work_time) override {}
};
// Simulates the overhead of hooking TaskAnnotator and ScheduleWork() to the
// post task machinery.
class FakeObserverSimulatingOverhead : public FakeObserver {
public:
FakeObserverSimulatingOverhead() = default;
// FakeObserver:
void WillQueueTask(PendingTask* task) final {
task_annotator_.WillQueueTask("MessageLoop::PostTask", task);
}
void DidQueueTask(bool was_empty) final {
AutoLock scoped_lock(message_loop_lock_);
pump_->ScheduleWork();
}
void RunTask(PendingTask* task) final {
task_annotator_.RunTask("MessageLoop::PostTask", task);
}
private:
// Simulates overhead from ScheduleWork() and TaskAnnotator calls involved in
// a real PostTask (stores the StubMessagePump in a pointer to force a virtual
// dispatch for ScheduleWork() and be closer to reality).
Lock message_loop_lock_;
std::unique_ptr<MessagePump> pump_{std::make_unique<StubMessagePump>()};
debug::TaskAnnotator task_annotator_;
DISALLOW_COPY_AND_ASSIGN(FakeObserverSimulatingOverhead);
};
TEST_F(BasicPostTaskPerfTest, OneTaskPerReloadWithOverhead) {
Run(10000, 1, std::make_unique<FakeObserverSimulatingOverhead>());
}
TEST_F(BasicPostTaskPerfTest, TenTasksPerReloadWithOverhead) {
Run(10000, 10, std::make_unique<FakeObserverSimulatingOverhead>());
}
TEST_F(BasicPostTaskPerfTest, OneHundredTasksPerReloadWithOverhead) {
Run(1000, 100, std::make_unique<FakeObserverSimulatingOverhead>());
} }
// Exercises the full MessageLoop/RunLoop machinery. // Exercises the full MessageLoop/RunLoop machinery.
...@@ -100,7 +167,7 @@ class IntegratedPostTaskPerfTest : public testing::Test { ...@@ -100,7 +167,7 @@ class IntegratedPostTaskPerfTest : public testing::Test {
} }
now = base::TimeTicks::Now(); now = base::TimeTicks::Now();
} while (now - start < base::TimeDelta::FromSeconds(5)); } while (now - start < kPostTaskPerfTestDuration);
std::string trace = StringPrintf("%d_tasks_per_reload", tasks_per_reload); std::string trace = StringPrintf("%d_tasks_per_reload", tasks_per_reload);
perf_test::PrintResult( perf_test::PrintResult(
"task", "", trace, "task", "", trace,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment