Commit 81ee0759 authored by Alex Clarke's avatar Alex Clarke Committed by Commit Bot

Add deduplication logic to ThreadControllerWithMessagePumpImpl

This patch adds ScheduleWork / ScheduleDelayedWork deduplication which
improves the perf test results.

Unlike ThreadControllerImpl this deduplication logic is main thread only
because I'm assuming the overhead of the locks isn't worth it.

Bug: 897751
Change-Id: I879ad904e41c820180712702c66c4afd66cb3fae
Reviewed-on: https://chromium-review.googlesource.com/c/1301462
Commit-Queue: Alex Clarke <alexclarke@chromium.org>
Reviewed-by: default avatarSami Kyöstilä <skyostil@chromium.org>
Reviewed-by: default avatarAlexander Timin <altimin@chromium.org>
Cr-Commit-Position: refs/heads/master@{#603077}
parent fcce12fe
...@@ -53,7 +53,7 @@ void ThreadControllerWithMessagePumpImpl::SetMessageLoop( ...@@ -53,7 +53,7 @@ void ThreadControllerWithMessagePumpImpl::SetMessageLoop(
void ThreadControllerWithMessagePumpImpl::SetWorkBatchSize( void ThreadControllerWithMessagePumpImpl::SetWorkBatchSize(
int work_batch_size) { int work_batch_size) {
DCHECK_GE(work_batch_size, 1); DCHECK_GE(work_batch_size, 1);
main_thread_only().batch_size = work_batch_size; main_thread_only().work_batch_size = work_batch_size;
} }
void ThreadControllerWithMessagePumpImpl::SetTimerSlack( void ThreadControllerWithMessagePumpImpl::SetTimerSlack(
...@@ -67,18 +67,34 @@ void ThreadControllerWithMessagePumpImpl::WillQueueTask( ...@@ -67,18 +67,34 @@ void ThreadControllerWithMessagePumpImpl::WillQueueTask(
} }
void ThreadControllerWithMessagePumpImpl::ScheduleWork() { void ThreadControllerWithMessagePumpImpl::ScheduleWork() {
// This assumes that cross thread ScheduleWork isn't frequent enough to
// warrant ScheduleWork deduplication.
if (RunsTasksInCurrentSequence()) {
// Don't post a DoWork if there's an immediate DoWork in flight or if we're
// inside a top level DoWork. We can rely on a continuation being posted as
// needed.
if (main_thread_only().immediate_do_work_posted || InTopLevelDoWork())
return;
main_thread_only().immediate_do_work_posted = true;
}
pump_->ScheduleWork(); pump_->ScheduleWork();
} }
void ThreadControllerWithMessagePumpImpl::SetNextDelayedDoWork( void ThreadControllerWithMessagePumpImpl::SetNextDelayedDoWork(
LazyNow* lazy_now, LazyNow* lazy_now,
TimeTicks run_time) { TimeTicks run_time) {
// Since this method must be called on the main thread, we're probably
// inside of DoWork() except some initialization code.
// DoWork() will schedule next wake-up if necessary.
if (is_doing_work())
return;
DCHECK_LT(time_source_->NowTicks(), run_time); DCHECK_LT(time_source_->NowTicks(), run_time);
if (main_thread_only().next_delayed_do_work == run_time)
return;
// Don't post a DoWork if there's an immediate DoWork in flight or if we're
// inside a top level DoWork. We can rely on a continuation being posted as
// needed.
if (main_thread_only().immediate_do_work_posted || InTopLevelDoWork())
return;
main_thread_only().next_delayed_do_work = run_time;
pump_->ScheduleDelayedWork(run_time); pump_->ScheduleDelayedWork(run_time);
} }
...@@ -103,16 +119,17 @@ void ThreadControllerWithMessagePumpImpl::RestoreDefaultTaskRunner() { ...@@ -103,16 +119,17 @@ void ThreadControllerWithMessagePumpImpl::RestoreDefaultTaskRunner() {
void ThreadControllerWithMessagePumpImpl::AddNestingObserver( void ThreadControllerWithMessagePumpImpl::AddNestingObserver(
RunLoop::NestingObserver* observer) { RunLoop::NestingObserver* observer) {
DCHECK_LE(main_thread_only().run_depth, 1);
DCHECK(!main_thread_only().nesting_observer); DCHECK(!main_thread_only().nesting_observer);
DCHECK(observer); DCHECK(observer);
main_thread_only().nesting_observer = observer; main_thread_only().nesting_observer = observer;
RunLoop::AddNestingObserverOnCurrentThread(this);
} }
void ThreadControllerWithMessagePumpImpl::RemoveNestingObserver( void ThreadControllerWithMessagePumpImpl::RemoveNestingObserver(
RunLoop::NestingObserver* observer) { RunLoop::NestingObserver* observer) {
DCHECK_EQ(main_thread_only().nesting_observer, observer); DCHECK_EQ(main_thread_only().nesting_observer, observer);
main_thread_only().nesting_observer = nullptr; main_thread_only().nesting_observer = nullptr;
RunLoop::RemoveNestingObserverOnCurrentThread(this);
} }
const scoped_refptr<AssociatedThreadId>& const scoped_refptr<AssociatedThreadId>&
...@@ -122,11 +139,13 @@ ThreadControllerWithMessagePumpImpl::GetAssociatedThread() const { ...@@ -122,11 +139,13 @@ ThreadControllerWithMessagePumpImpl::GetAssociatedThread() const {
bool ThreadControllerWithMessagePumpImpl::DoWork() { bool ThreadControllerWithMessagePumpImpl::DoWork() {
base::TimeTicks next_run_time; base::TimeTicks next_run_time;
main_thread_only().immediate_do_work_posted = false;
return DoWorkImpl(&next_run_time); return DoWorkImpl(&next_run_time);
} }
bool ThreadControllerWithMessagePumpImpl::DoDelayedWork( bool ThreadControllerWithMessagePumpImpl::DoDelayedWork(
TimeTicks* next_run_time) { TimeTicks* next_run_time) {
main_thread_only().next_delayed_do_work = TimeTicks::Max();
return DoWorkImpl(next_run_time); return DoWorkImpl(next_run_time);
} }
...@@ -135,29 +154,31 @@ bool ThreadControllerWithMessagePumpImpl::DoWorkImpl( ...@@ -135,29 +154,31 @@ bool ThreadControllerWithMessagePumpImpl::DoWorkImpl(
DCHECK(main_thread_only().task_source); DCHECK(main_thread_only().task_source);
bool task_ran = false; bool task_ran = false;
{ main_thread_only().do_work_running_count++;
AutoReset<int> do_work_scope(&main_thread_only().do_work_depth,
main_thread_only().do_work_depth + 1); for (int i = 0; i < main_thread_only().work_batch_size; i++) {
Optional<PendingTask> task = main_thread_only().task_source->TakeTask();
if (!task)
break;
TRACE_TASK_EXECUTION("ThreadController::Task", *task);
task_annotator_.RunTask("ThreadController::Task", &*task);
task_ran = true;
for (int i = 0; i < main_thread_only().batch_size; i++) { main_thread_only().task_source->DidRunTask();
Optional<PendingTask> task = main_thread_only().task_source->TakeTask();
if (!task)
break;
TRACE_TASK_EXECUTION("ThreadController::Task", *task); // When Quit() is called we must stop running the batch because the caller
task_annotator_.RunTask("ThreadController::Task", &*task); // expects per-task granularity.
task_ran = true; if (main_thread_only().quit_do_work)
break;
}
main_thread_only().task_source->DidRunTask(); main_thread_only().do_work_running_count--;
if (main_thread_only().quit_do_work) { if (main_thread_only().quit_do_work) {
// When Quit() is called we must stop running the batch because main_thread_only().quit_do_work = false;
// caller expects per-task granularity. return task_ran;
main_thread_only().quit_do_work = false; }
return true;
}
}
} // DoWorkScope.
LazyNow lazy_now(time_source_); LazyNow lazy_now(time_source_);
TimeDelta do_work_delay = TimeDelta do_work_delay =
...@@ -171,6 +192,7 @@ bool ThreadControllerWithMessagePumpImpl::DoWorkImpl( ...@@ -171,6 +192,7 @@ bool ThreadControllerWithMessagePumpImpl::DoWorkImpl(
// Need to run new work immediately, but due to the contract of DoWork we // Need to run new work immediately, but due to the contract of DoWork we
// only need to return true to ensure that happens. // only need to return true to ensure that happens.
*next_run_time = lazy_now.Now(); *next_run_time = lazy_now.Now();
main_thread_only().immediate_do_work_posted = true;
return true; return true;
} else if (do_work_delay != TimeDelta::Max()) { } else if (do_work_delay != TimeDelta::Max()) {
*next_run_time = lazy_now.Now() + do_work_delay; *next_run_time = lazy_now.Now() + do_work_delay;
...@@ -183,6 +205,11 @@ bool ThreadControllerWithMessagePumpImpl::DoWorkImpl( ...@@ -183,6 +205,11 @@ bool ThreadControllerWithMessagePumpImpl::DoWorkImpl(
return task_ran; return task_ran;
} }
bool ThreadControllerWithMessagePumpImpl::InTopLevelDoWork() const {
return main_thread_only().do_work_running_count >
main_thread_only().nesting_depth;
}
bool ThreadControllerWithMessagePumpImpl::DoIdleWork() { bool ThreadControllerWithMessagePumpImpl::DoIdleWork() {
// RunLoop::Delegate knows whether we called Run() or RunUntilIdle(). // RunLoop::Delegate knows whether we called Run() or RunUntilIdle().
if (ShouldQuitWhenIdle()) if (ShouldQuitWhenIdle())
...@@ -206,32 +233,34 @@ void ThreadControllerWithMessagePumpImpl::Run(bool application_tasks_allowed) { ...@@ -206,32 +233,34 @@ void ThreadControllerWithMessagePumpImpl::Run(bool application_tasks_allowed) {
// No system messages are being processed by this class. // No system messages are being processed by this class.
DCHECK(application_tasks_allowed); DCHECK(application_tasks_allowed);
// We already have a MessagePump::Run() running, so we're in a nested RunLoop. // MessagePump::Run() blocks until Quit() called, but previously started
if (main_thread_only().run_depth > 0 && main_thread_only().nesting_observer) // Run() calls continue to block.
main_thread_only().nesting_observer->OnBeginNestedRunLoop(); pump_->Run(this);
}
{ void ThreadControllerWithMessagePumpImpl::OnBeginNestedRunLoop() {
AutoReset<int> run_scope(&main_thread_only().run_depth, main_thread_only().nesting_depth++;
main_thread_only().run_depth + 1); if (main_thread_only().nesting_observer)
// MessagePump::Run() blocks until Quit() called, but previously started main_thread_only().nesting_observer->OnBeginNestedRunLoop();
// Run() calls continue to block. }
pump_->Run(this);
}
// We'll soon continue to run an outer MessagePump::Run() loop. void ThreadControllerWithMessagePumpImpl::OnExitNestedRunLoop() {
if (main_thread_only().run_depth > 0 && main_thread_only().nesting_observer) main_thread_only().nesting_depth--;
DCHECK_GE(main_thread_only().nesting_depth, 0);
if (main_thread_only().nesting_observer)
main_thread_only().nesting_observer->OnExitNestedRunLoop(); main_thread_only().nesting_observer->OnExitNestedRunLoop();
} }
void ThreadControllerWithMessagePumpImpl::Quit() { void ThreadControllerWithMessagePumpImpl::Quit() {
// Interrupt a batch of work. // Interrupt a batch of work.
if (is_doing_work()) if (InTopLevelDoWork())
main_thread_only().quit_do_work = true; main_thread_only().quit_do_work = true;
// If we're in a nested RunLoop, continuation will be posted if necessary. // If we're in a nested RunLoop, continuation will be posted if necessary.
pump_->Quit(); pump_->Quit();
} }
void ThreadControllerWithMessagePumpImpl::EnsureWorkScheduled() { void ThreadControllerWithMessagePumpImpl::EnsureWorkScheduled() {
main_thread_only().immediate_do_work_posted = true;
ScheduleWork(); ScheduleWork();
} }
......
...@@ -26,7 +26,8 @@ namespace internal { ...@@ -26,7 +26,8 @@ namespace internal {
class BASE_EXPORT ThreadControllerWithMessagePumpImpl class BASE_EXPORT ThreadControllerWithMessagePumpImpl
: public ThreadController, : public ThreadController,
public MessagePump::Delegate, public MessagePump::Delegate,
public RunLoop::Delegate { public RunLoop::Delegate,
public RunLoop::NestingObserver {
public: public:
ThreadControllerWithMessagePumpImpl(std::unique_ptr<MessagePump> message_pump, ThreadControllerWithMessagePumpImpl(std::unique_ptr<MessagePump> message_pump,
const TickClock* time_source); const TickClock* time_source);
...@@ -49,6 +50,10 @@ class BASE_EXPORT ThreadControllerWithMessagePumpImpl ...@@ -49,6 +50,10 @@ class BASE_EXPORT ThreadControllerWithMessagePumpImpl
void RemoveNestingObserver(RunLoop::NestingObserver* observer) override; void RemoveNestingObserver(RunLoop::NestingObserver* observer) override;
const scoped_refptr<AssociatedThreadId>& GetAssociatedThread() const override; const scoped_refptr<AssociatedThreadId>& GetAssociatedThread() const override;
// RunLoop::NestingObserver:
void OnBeginNestedRunLoop() override;
void OnExitNestedRunLoop() override;
protected: protected:
// MessagePump::Delegate implementation. // MessagePump::Delegate implementation.
bool DoWork() override; bool DoWork() override;
...@@ -66,6 +71,8 @@ class BASE_EXPORT ThreadControllerWithMessagePumpImpl ...@@ -66,6 +71,8 @@ class BASE_EXPORT ThreadControllerWithMessagePumpImpl
bool DoWorkImpl(base::TimeTicks* next_run_time); bool DoWorkImpl(base::TimeTicks* next_run_time);
bool InTopLevelDoWork() const;
struct MainThreadOnly { struct MainThreadOnly {
MainThreadOnly(); MainThreadOnly();
~MainThreadOnly(); ~MainThreadOnly();
...@@ -77,18 +84,23 @@ class BASE_EXPORT ThreadControllerWithMessagePumpImpl ...@@ -77,18 +84,23 @@ class BASE_EXPORT ThreadControllerWithMessagePumpImpl
// Indicates that we should yield DoWork ASAP. // Indicates that we should yield DoWork ASAP.
bool quit_do_work = false; bool quit_do_work = false;
// Whether high resolution timing is enabled or not.
bool in_high_res_mode = false;
// Used to prevent redundant calls to ScheduleWork / ScheduleDelayedWork.
bool immediate_do_work_posted = false;
// Number of tasks processed in a single DoWork invocation. // Number of tasks processed in a single DoWork invocation.
int batch_size = 1; int work_batch_size = 1;
// Number of RunLoop layers currently running. // Number of DoWorks on the stack. Must be >= |nesting_depth|.
int run_depth = 0; int do_work_running_count = 0;
// Number of DoWork running, but only the inner-most one can take tasks. // Number of nested RunLoops on the stack.
// Must be equal to |run_depth| or |run_depth - 1|. int nesting_depth = 0;
int do_work_depth = 0;
// Whether high resolution timing is enabled or not. // When the next scheduled delayed work should run, if any.
bool in_high_res_mode = false; TimeTicks next_delayed_do_work = TimeTicks::Max();
}; };
MainThreadOnly& main_thread_only() { MainThreadOnly& main_thread_only() {
...@@ -96,11 +108,9 @@ class BASE_EXPORT ThreadControllerWithMessagePumpImpl ...@@ -96,11 +108,9 @@ class BASE_EXPORT ThreadControllerWithMessagePumpImpl
return main_thread_only_; return main_thread_only_;
} }
// Returns true if there's a DoWork running on the inner-most nesting layer. const MainThreadOnly& main_thread_only() const {
bool is_doing_work() const {
DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker); DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
return main_thread_only_.do_work_depth == main_thread_only_.run_depth && return main_thread_only_;
main_thread_only_.do_work_depth != 0;
} }
scoped_refptr<AssociatedThreadId> associated_thread_; scoped_refptr<AssociatedThreadId> associated_thread_;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment