Commit 885b7c48 authored by Alexander Timin's avatar Alexander Timin Committed by Commit Bot

[sequence_manager] Improve ThreadControllerWithMessagePump.

Fixes required for SequenceManager/ThreadControllerWithMessagePump to
work properly on all platforms:

Always call MessagePump::ScheduleWork from EnsureWorkScheduled to avoid
deadlocks on some platforms.

Switch from implicit quit_do_work to quit_pending, which is set to false
when the current run loop (and ThreadController::Run) call ends.

Improve test coverage.

R=alexclarke@chromium.org
СС=gab@chromium.org
BUG=891670

Change-Id: I9fb554ed91cdca6982e773f4b29055b4ae99dc95
Reviewed-on: https://chromium-review.googlesource.com/c/1337501
Commit-Queue: Alexander Timin <altimin@chromium.org>
Reviewed-by: default avatarSami Kyöstilä <skyostil@chromium.org>
Reviewed-by: default avatarAlex Clarke <alexclarke@chromium.org>
Cr-Commit-Position: refs/heads/master@{#608521}
parent 4f899bbe
...@@ -41,6 +41,10 @@ struct BASE_EXPORT AssociatedThreadId ...@@ -41,6 +41,10 @@ struct BASE_EXPORT AssociatedThreadId
// the SequenceManager and TaskQueues on a different thread/sequence than the // the SequenceManager and TaskQueues on a different thread/sequence than the
// one it will manage. Should only be called once. // one it will manage. Should only be called once.
void BindToCurrentThread() { void BindToCurrentThread() {
// TODO(altimin): Remove this after MessageLoopImpl is gone and
// initialisation is simplified.
if (thread_id == PlatformThread::CurrentId())
return;
DCHECK_EQ(kInvalidThreadId, thread_id); DCHECK_EQ(kInvalidThreadId, thread_id);
thread_id = PlatformThread::CurrentId(); thread_id = PlatformThread::CurrentId();
......
...@@ -5,8 +5,7 @@ ...@@ -5,8 +5,7 @@
#ifndef BASE_TASK_SEQUENCE_MANAGER_THREAD_CONTROLLER_H_ #ifndef BASE_TASK_SEQUENCE_MANAGER_THREAD_CONTROLLER_H_
#define BASE_TASK_SEQUENCE_MANAGER_THREAD_CONTROLLER_H_ #define BASE_TASK_SEQUENCE_MANAGER_THREAD_CONTROLLER_H_
#include "base/message_loop/timer_slack.h" #include "base/message_loop/message_pump.h"
#include "base/run_loop.h"
#include "base/single_thread_task_runner.h" #include "base/single_thread_task_runner.h"
#include "base/task/sequence_manager/lazy_now.h" #include "base/task/sequence_manager/lazy_now.h"
#include "base/time/time.h" #include "base/time/time.h"
...@@ -81,8 +80,11 @@ class ThreadController { ...@@ -81,8 +80,11 @@ class ThreadController {
virtual void BindToCurrentThread( virtual void BindToCurrentThread(
std::unique_ptr<MessagePump> message_pump) = 0; std::unique_ptr<MessagePump> message_pump) = 0;
// Explicitly allow or disallow task execution. Implicitly disallowed when
// entering a nested runloop.
virtual void SetTaskExecutionAllowed(bool allowed) = 0; virtual void SetTaskExecutionAllowed(bool allowed) = 0;
// Whether task execution is allowed or not.
virtual bool IsTaskExecutionAllowed() const = 0; virtual bool IsTaskExecutionAllowed() const = 0;
// Returns the MessagePump we're bound to if any. // Returns the MessagePump we're bound to if any.
......
...@@ -84,9 +84,10 @@ void ThreadControllerWithMessagePumpImpl::BindToCurrentThread( ...@@ -84,9 +84,10 @@ void ThreadControllerWithMessagePumpImpl::BindToCurrentThread(
scoped_set_sequence_local_storage_map_for_current_thread_ = std::make_unique< scoped_set_sequence_local_storage_map_for_current_thread_ = std::make_unique<
base::internal::ScopedSetSequenceLocalStorageMapForCurrentThread>( base::internal::ScopedSetSequenceLocalStorageMapForCurrentThread>(
&sequence_local_storage_map_); &sequence_local_storage_map_);
if (task_runner_to_set_) { {
SetDefaultTaskRunner(task_runner_to_set_); AutoLock task_runner_lock(task_runner_lock_);
task_runner_to_set_ = nullptr; if (task_runner_)
InitializeThreadTaskRunnerHandle();
} }
if (should_schedule_work_after_bind_) if (should_schedule_work_after_bind_)
ScheduleWork(); ScheduleWork();
...@@ -138,15 +139,14 @@ void ThreadControllerWithMessagePumpImpl::SetNextDelayedDoWork( ...@@ -138,15 +139,14 @@ void ThreadControllerWithMessagePumpImpl::SetNextDelayedDoWork(
if (main_thread_only().next_delayed_do_work == run_time) if (main_thread_only().next_delayed_do_work == run_time)
return; return;
// Don't post a DoWork if there's an immediate DoWork in flight or if we're run_time = CapAtOneDay(run_time, lazy_now);
// inside a top level DoWork. We can rely on a continuation being posted as main_thread_only().next_delayed_do_work = run_time;
// needed.
// Do not call ScheduleDelayedWork if there is an immediate DoWork scheduled.
// We can rely on calling ScheduleDelayedWork from the next DoWork call.
if (main_thread_only().immediate_do_work_posted || InTopLevelDoWork()) if (main_thread_only().immediate_do_work_posted || InTopLevelDoWork())
return; return;
run_time = CapAtOneDay(run_time, lazy_now);
main_thread_only().next_delayed_do_work = run_time;
// |pump_| can't be null as all postTasks are cross-thread before binding, // |pump_| can't be null as all postTasks are cross-thread before binding,
// and delayed cross-thread postTasks do the thread hop through an immediate // and delayed cross-thread postTasks do the thread hop through an immediate
// task. // task.
...@@ -163,18 +163,22 @@ bool ThreadControllerWithMessagePumpImpl::RunsTasksInCurrentSequence() { ...@@ -163,18 +163,22 @@ bool ThreadControllerWithMessagePumpImpl::RunsTasksInCurrentSequence() {
void ThreadControllerWithMessagePumpImpl::SetDefaultTaskRunner( void ThreadControllerWithMessagePumpImpl::SetDefaultTaskRunner(
scoped_refptr<SingleThreadTaskRunner> task_runner) { scoped_refptr<SingleThreadTaskRunner> task_runner) {
if (associated_thread_->thread_id == kInvalidThreadId) { DCHECK(associated_thread_->thread_id == kInvalidThreadId ||
// Save task runner, it will be set in BindToCurrentThread. associated_thread_->thread_id == PlatformThread::CurrentId());
task_runner_to_set_ = std::move(task_runner); AutoLock lock(task_runner_lock_);
} else { task_runner_ = task_runner;
if (associated_thread_->thread_id != kInvalidThreadId) {
// Thread task runner handle will be created in BindToCurrentThread().
InitializeThreadTaskRunnerHandle();
}
}
void ThreadControllerWithMessagePumpImpl::InitializeThreadTaskRunnerHandle() {
// Only one ThreadTaskRunnerHandle can exist at any time, // Only one ThreadTaskRunnerHandle can exist at any time,
// so reset the old one. // so reset the old one.
main_thread_only().thread_task_runner_handle.reset(); main_thread_only().thread_task_runner_handle.reset();
main_thread_only().thread_task_runner_handle = main_thread_only().thread_task_runner_handle =
std::make_unique<ThreadTaskRunnerHandle>(task_runner); std::make_unique<ThreadTaskRunnerHandle>(task_runner_);
AutoLock lock(task_runner_lock_);
task_runner_ = std::move(task_runner);
}
} }
scoped_refptr<SingleThreadTaskRunner> scoped_refptr<SingleThreadTaskRunner>
...@@ -222,6 +226,8 @@ bool ThreadControllerWithMessagePumpImpl::DoWorkImpl( ...@@ -222,6 +226,8 @@ bool ThreadControllerWithMessagePumpImpl::DoWorkImpl(
base::TimeTicks* next_run_time) { base::TimeTicks* next_run_time) {
if (!main_thread_only().task_execution_allowed) if (!main_thread_only().task_execution_allowed)
return false; return false;
if (main_thread_only().quit_pending)
return false;
DCHECK(main_thread_only().task_source); DCHECK(main_thread_only().task_source);
bool task_ran = false; bool task_ran = false;
...@@ -244,22 +250,19 @@ bool ThreadControllerWithMessagePumpImpl::DoWorkImpl( ...@@ -244,22 +250,19 @@ bool ThreadControllerWithMessagePumpImpl::DoWorkImpl(
main_thread_only().task_source->DidRunTask(); main_thread_only().task_source->DidRunTask();
// If we have executed a delayed task, reset the next delayed do work. // If we have executed a task, reset the next delayed do work.
if (next_run_time)
main_thread_only().next_delayed_do_work = TimeTicks(); main_thread_only().next_delayed_do_work = TimeTicks();
// When Quit() is called we must stop running the batch because the caller // When Quit() is called we must stop running the batch because the caller
// expects per-task granularity. // expects per-task granularity.
if (main_thread_only().quit_do_work) if (main_thread_only().quit_pending)
break; break;
} }
main_thread_only().do_work_running_count--; main_thread_only().do_work_running_count--;
if (main_thread_only().quit_do_work) { if (main_thread_only().quit_pending)
main_thread_only().quit_do_work = false;
return task_ran; return task_ran;
}
LazyNow lazy_now(time_source_); LazyNow lazy_now(time_source_);
TimeDelta do_work_delay = TimeDelta do_work_delay =
...@@ -321,6 +324,10 @@ bool ThreadControllerWithMessagePumpImpl::DoIdleWork() { ...@@ -321,6 +324,10 @@ bool ThreadControllerWithMessagePumpImpl::DoIdleWork() {
void ThreadControllerWithMessagePumpImpl::Run(bool application_tasks_allowed) { void ThreadControllerWithMessagePumpImpl::Run(bool application_tasks_allowed) {
DCHECK(RunsTasksInCurrentSequence()); DCHECK(RunsTasksInCurrentSequence());
// Quit may have been called outside of a Run(), so |quit_pending| might be
// true here. We can't use InTopLevelDoWork() in Quit() as this call may be
// outside top-level DoWork but still in Run().
main_thread_only().quit_pending = false;
if (application_tasks_allowed && !main_thread_only().task_execution_allowed) { if (application_tasks_allowed && !main_thread_only().task_execution_allowed) {
// Allow nested task execution as explicitly requested. // Allow nested task execution as explicitly requested.
DCHECK(RunLoop::IsNestedOnCurrentThread()); DCHECK(RunLoop::IsNestedOnCurrentThread());
...@@ -330,6 +337,7 @@ void ThreadControllerWithMessagePumpImpl::Run(bool application_tasks_allowed) { ...@@ -330,6 +337,7 @@ void ThreadControllerWithMessagePumpImpl::Run(bool application_tasks_allowed) {
} else { } else {
pump_->Run(this); pump_->Run(this);
} }
main_thread_only().quit_pending = false;
} }
void ThreadControllerWithMessagePumpImpl::OnBeginNestedRunLoop() { void ThreadControllerWithMessagePumpImpl::OnBeginNestedRunLoop() {
...@@ -348,15 +356,14 @@ void ThreadControllerWithMessagePumpImpl::OnExitNestedRunLoop() { ...@@ -348,15 +356,14 @@ void ThreadControllerWithMessagePumpImpl::OnExitNestedRunLoop() {
void ThreadControllerWithMessagePumpImpl::Quit() { void ThreadControllerWithMessagePumpImpl::Quit() {
DCHECK(RunsTasksInCurrentSequence()); DCHECK(RunsTasksInCurrentSequence());
// Interrupt a batch of work. // Interrupt a batch of work.
if (InTopLevelDoWork()) main_thread_only().quit_pending = true;
main_thread_only().quit_do_work = true;
// If we're in a nested RunLoop, continuation will be posted if necessary. // If we're in a nested RunLoop, continuation will be posted if necessary.
pump_->Quit(); pump_->Quit();
} }
void ThreadControllerWithMessagePumpImpl::EnsureWorkScheduled() { void ThreadControllerWithMessagePumpImpl::EnsureWorkScheduled() {
pump_->ScheduleWork();
main_thread_only().immediate_do_work_posted = true; main_thread_only().immediate_do_work_posted = true;
ScheduleWork();
} }
void ThreadControllerWithMessagePumpImpl::SetTaskExecutionAllowed( void ThreadControllerWithMessagePumpImpl::SetTaskExecutionAllowed(
......
...@@ -10,6 +10,7 @@ ...@@ -10,6 +10,7 @@
#include "base/debug/task_annotator.h" #include "base/debug/task_annotator.h"
#include "base/message_loop/message_pump.h" #include "base/message_loop/message_pump.h"
#include "base/optional.h" #include "base/optional.h"
#include "base/run_loop.h"
#include "base/task/sequence_manager/associated_thread_id.h" #include "base/task/sequence_manager/associated_thread_id.h"
#include "base/task/sequence_manager/moveable_auto_lock.h" #include "base/task/sequence_manager/moveable_auto_lock.h"
#include "base/task/sequence_manager/sequenced_task_source.h" #include "base/task/sequence_manager/sequenced_task_source.h"
...@@ -76,19 +77,22 @@ class BASE_EXPORT ThreadControllerWithMessagePumpImpl ...@@ -76,19 +77,22 @@ class BASE_EXPORT ThreadControllerWithMessagePumpImpl
bool DoDelayedWork(TimeTicks* next_run_time) override; bool DoDelayedWork(TimeTicks* next_run_time) override;
bool DoIdleWork() override; bool DoIdleWork() override;
private:
friend class DoWorkScope;
friend class RunScope;
// RunLoop::Delegate implementation. // RunLoop::Delegate implementation.
void Run(bool application_tasks_allowed) override; void Run(bool application_tasks_allowed) override;
void Quit() override; void Quit() override;
void EnsureWorkScheduled() override; void EnsureWorkScheduled() override;
private:
friend class DoWorkScope;
friend class RunScope;
bool DoWorkImpl(base::TimeTicks* next_run_time); bool DoWorkImpl(base::TimeTicks* next_run_time);
bool InTopLevelDoWork() const; bool InTopLevelDoWork() const;
void InitializeThreadTaskRunnerHandle()
EXCLUSIVE_LOCKS_REQUIRED(task_runner_lock_);
struct MainThreadOnly { struct MainThreadOnly {
MainThreadOnly(); MainThreadOnly();
~MainThreadOnly(); ~MainThreadOnly();
...@@ -98,7 +102,7 @@ class BASE_EXPORT ThreadControllerWithMessagePumpImpl ...@@ -98,7 +102,7 @@ class BASE_EXPORT ThreadControllerWithMessagePumpImpl
std::unique_ptr<ThreadTaskRunnerHandle> thread_task_runner_handle; std::unique_ptr<ThreadTaskRunnerHandle> thread_task_runner_handle;
// Indicates that we should yield DoWork ASAP. // Indicates that we should yield DoWork ASAP.
bool quit_do_work = false; bool quit_pending = false;
// Whether high resolution timing is enabled or not. // Whether high resolution timing is enabled or not.
bool in_high_res_mode = false; bool in_high_res_mode = false;
...@@ -154,7 +158,6 @@ class BASE_EXPORT ThreadControllerWithMessagePumpImpl ...@@ -154,7 +158,6 @@ class BASE_EXPORT ThreadControllerWithMessagePumpImpl
debug::TaskAnnotator task_annotator_; debug::TaskAnnotator task_annotator_;
const TickClock* time_source_; // Not owned. const TickClock* time_source_; // Not owned.
scoped_refptr<SingleThreadTaskRunner> task_runner_to_set_;
// Required to register the current thread as a sequence. // Required to register the current thread as a sequence.
base::internal::SequenceLocalStorageMap sequence_local_storage_map_; base::internal::SequenceLocalStorageMap sequence_local_storage_map_;
......
...@@ -4,6 +4,7 @@ ...@@ -4,6 +4,7 @@
#include "base/task/sequence_manager/thread_controller_with_message_pump_impl.h" #include "base/task/sequence_manager/thread_controller_with_message_pump_impl.h"
#include "base/bind_helpers.h"
#include "base/memory/scoped_refptr.h" #include "base/memory/scoped_refptr.h"
#include "base/single_thread_task_runner.h" #include "base/single_thread_task_runner.h"
#include "base/test/mock_callback.h" #include "base/test/mock_callback.h"
...@@ -32,9 +33,11 @@ class ThreadControllerForTest ...@@ -32,9 +33,11 @@ class ThreadControllerForTest
const TickClock* clock) const TickClock* clock)
: ThreadControllerWithMessagePumpImpl(std::move(pump), clock) {} : ThreadControllerWithMessagePumpImpl(std::move(pump), clock) {}
using ThreadControllerWithMessagePumpImpl::DoWork;
using ThreadControllerWithMessagePumpImpl::DoDelayedWork; using ThreadControllerWithMessagePumpImpl::DoDelayedWork;
using ThreadControllerWithMessagePumpImpl::DoIdleWork; using ThreadControllerWithMessagePumpImpl::DoIdleWork;
using ThreadControllerWithMessagePumpImpl::DoWork;
using ThreadControllerWithMessagePumpImpl::EnsureWorkScheduled;
using ThreadControllerWithMessagePumpImpl::Quit;
}; };
class MockMessagePump : public MessagePump { class MockMessagePump : public MessagePump {
...@@ -301,6 +304,8 @@ TEST_F(ThreadControllerWithMessagePumpTest, ...@@ -301,6 +304,8 @@ TEST_F(ThreadControllerWithMessagePumpTest,
EXPECT_FALSE(delegate->DoWork()); EXPECT_FALSE(delegate->DoWork());
log.push_back("exiting nested runloop"); log.push_back("exiting nested runloop");
})); }));
// An extra schedule work will be called when entering a nested runloop.
EXPECT_CALL(*message_pump_, ScheduleWork());
task_source_.AddTask( task_source_.AddTask(
PendingTask(FROM_HERE, PendingTask(FROM_HERE,
...@@ -357,5 +362,109 @@ TEST_F(ThreadControllerWithMessagePumpTest, SetDefaultTaskRunner) { ...@@ -357,5 +362,109 @@ TEST_F(ThreadControllerWithMessagePumpTest, SetDefaultTaskRunner) {
EXPECT_EQ(task_runner2, ThreadTaskRunnerHandle::Get()); EXPECT_EQ(task_runner2, ThreadTaskRunnerHandle::Get());
} }
TEST_F(ThreadControllerWithMessagePumpTest, EnsureWorkScheduled) {
task_source_.AddTask(PendingTask(FROM_HERE, DoNothing(), TimeTicks()));
// Ensure that the first ScheduleWork() call results in the pump being called.
EXPECT_CALL(*message_pump_, ScheduleWork());
thread_controller_.ScheduleWork();
testing::Mock::VerifyAndClearExpectations(message_pump_);
// Ensure that the subsequent ScheduleWork() does not call the pump.
thread_controller_.ScheduleWork();
testing::Mock::VerifyAndClearExpectations(message_pump_);
// Ensure that EnsureWorkScheduled() forces a call to a pump.
EXPECT_CALL(*message_pump_, ScheduleWork());
thread_controller_.EnsureWorkScheduled();
testing::Mock::VerifyAndClearExpectations(message_pump_);
}
TEST_F(ThreadControllerWithMessagePumpTest, NoWorkAfterQuit) {
// Ensure that multiple DoWork calls are no-ops after Quit() is called
// in the current RunLoop.
ThreadTaskRunnerHandle handle(MakeRefCounted<FakeTaskRunner>());
std::vector<std::string> log;
EXPECT_CALL(*message_pump_, Run(_))
.WillOnce(Invoke([this](MessagePump::Delegate* delegate) {
EXPECT_EQ(delegate, &thread_controller_);
base::TimeTicks next_time;
EXPECT_TRUE(delegate->DoWork());
EXPECT_FALSE(delegate->DoDelayedWork(&next_time));
// We still have work to do, but subsequent calls to DoWork should
// do nothing as we're in the process of quitting the current loop.
EXPECT_FALSE(delegate->DoWork());
EXPECT_FALSE(delegate->DoDelayedWork(&next_time));
EXPECT_FALSE(delegate->DoWork());
EXPECT_FALSE(delegate->DoDelayedWork(&next_time));
}));
EXPECT_CALL(*message_pump_, Quit());
RunLoop run_loop;
task_source_.AddTask(
PendingTask(FROM_HERE,
base::BindOnce(
[](std::vector<std::string>* log, RunLoop* run_loop) {
log->push_back("task1");
run_loop->Quit();
},
&log, &run_loop),
TimeTicks()));
task_source_.AddTask(PendingTask(
FROM_HERE,
base::BindOnce(
[](std::vector<std::string>* log) { log->push_back("task2"); }, &log),
TimeTicks()));
run_loop.Run();
EXPECT_THAT(log, ElementsAre("task1"));
testing::Mock::VerifyAndClearExpectations(message_pump_);
}
TEST_F(ThreadControllerWithMessagePumpTest, EarlyQuit) {
// This test ensures that a opt-of-runloop Quit() (which is possible with some
// pump implementations) doesn't affect the next RunLoop::Run call.
ThreadTaskRunnerHandle handle(MakeRefCounted<FakeTaskRunner>());
std::vector<std::string> log;
// This quit should be a no-op for future calls.
EXPECT_CALL(*message_pump_, Quit());
thread_controller_.Quit();
testing::Mock::VerifyAndClearExpectations(message_pump_);
EXPECT_CALL(*message_pump_, Run(_))
.WillOnce(Invoke([this](MessagePump::Delegate* delegate) {
EXPECT_EQ(delegate, &thread_controller_);
EXPECT_TRUE(delegate->DoWork());
EXPECT_TRUE(delegate->DoWork());
EXPECT_FALSE(delegate->DoWork());
}));
RunLoop run_loop;
task_source_.AddTask(PendingTask(
FROM_HERE,
base::BindOnce(
[](std::vector<std::string>* log) { log->push_back("task1"); }, &log),
TimeTicks()));
task_source_.AddTask(PendingTask(
FROM_HERE,
base::BindOnce(
[](std::vector<std::string>* log) { log->push_back("task2"); }, &log),
TimeTicks()));
run_loop.RunUntilIdle();
EXPECT_THAT(log, ElementsAre("task1", "task2"));
testing::Mock::VerifyAndClearExpectations(message_pump_);
}
} // namespace sequence_manager } // namespace sequence_manager
} // namespace base } // namespace base
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment