Commit f7da13a2 authored by Gabriel Charette, committed by Commit Bot

[MessageLoop] Lock-free ScheduleWork() scheme

The Lock is causing hangs because of priority inversion
mixed with priority boosting (ScheduleWork() tends to
boost the destination thread which may deschedule the
posting thread; if the posting thread is a background
thread this boost-induced-desched-while-holding-lock
can cause a livelock). See https://crbug.com/890978#c10
for example crashes catching this.

The Lock was only necessary for startup/shutdown and is
being replaced by a lock-free atomic scheme in this CL.

MessagePump::ScheduleWork() itself was already thread-safe
(but the Android impl unnecessarily checked a non-atomic bool).

This adds a WaitableEvent in ~MessageLoop(); hence the requirement
for a wait-allowance in net's EmbeddedTestServer.

TBR=zhongyi@chromium.org (embedded_test_server.cc side-effects)

Bug: 890978, 874237
Change-Id: I0916e5a99035a935b0a23a770af256f334e78c43
Reviewed-on: https://chromium-review.googlesource.com/c/1278631
Commit-Queue: Gabriel Charette <gab@chromium.org>
Reviewed-by: François Doray <fdoray@chromium.org>
Cr-Commit-Position: refs/heads/master@{#601600}
parent dd3415c5
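The startup half of the lock-free scheme described in the commit message can be sketched in standard C++ roughly as follows. This is a simplified illustration, not Chromium code: `StartupGate`, `OnWorkQueued()` and `Start()` are hypothetical names, and the boolean return values stand in for the calls to `message_loop_->ScheduleWork()` made by the real Controller.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical stand-in for the Controller's startup handshake. The state
// names mirror the commit; everything else is an assumption for illustration.
class StartupGate {
 public:
  enum State : int { kNotReady, kPendingWork, kReadyForScheduling };

  // Called from any posting thread when the queue went from empty to
  // non-empty. Returns true if the caller should wake the loop itself.
  bool OnWorkQueued() {
    int previous = kNotReady;
    if (state_.compare_exchange_strong(previous, kPendingWork,
                                       std::memory_order_acquire) ||
        previous == kPendingWork) {
      // The loop is not bound yet; Start() will perform the wake-up later.
      return false;
    }
    assert(previous == kReadyForScheduling);
    return true;
  }

  // Called once from the loop's own thread when it gets bound. Returns true
  // if a wake-up was requested before the loop was ready.
  bool Start() {
    // Release pairs with the acquire above so that posting threads observing
    // kReadyForScheduling also observe state initialized before Start().
    const int previous =
        state_.exchange(kReadyForScheduling, std::memory_order_release);
    assert(previous != kReadyForScheduling);
    return previous == kPendingWork;
  }

 private:
  std::atomic<int> state_{kNotReady};
};
```

In this sketch, a poster that arrives before `Start()` only records that work is pending, and the loop thread picks that up in a single atomic exchange, so no lock is held around the wake-up.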
......@@ -5,6 +5,7 @@
#include "base/message_loop/message_loop.h"
#include <algorithm>
#include <atomic>
#include <utility>
#include "base/bind.h"
......@@ -18,7 +19,9 @@
#include "base/message_loop/message_pump_for_io.h"
#include "base/message_loop/message_pump_for_ui.h"
#include "base/message_loop/sequenced_task_source.h"
#include "base/optional.h"
#include "base/run_loop.h"
#include "base/synchronization/waitable_event.h"
#include "base/threading/thread_id_name_manager.h"
#include "base/threading/thread_task_runner_handle.h"
#include "base/trace_event/trace_event.h"
......@@ -42,9 +45,9 @@ std::unique_ptr<MessagePump> ReturnPump(std::unique_ptr<MessagePump> pump) {
class MessageLoop::Controller : public SequencedTaskSource::Observer {
public:
// Constructs a MessageLoopController which controls |message_loop|, notifying
// |task_annotator_| when tasks are queued scheduling work on |message_loop|
// as fits. |message_loop| and |task_annotator_| will not be used after
// DisconnectFromParent() returns.
// |task_annotator_| when tasks are queued and scheduling work on
// |message_loop| as fits. |message_loop| and |task_annotator_| will not be
// used after DisconnectFromParent() returns.
Controller(MessageLoop* message_loop);
~Controller() override;
......@@ -53,10 +56,14 @@ class MessageLoop::Controller : public SequencedTaskSource::Observer {
void WillQueueTask(PendingTask* task) final;
void DidQueueTask(bool was_empty) final;
// Informs this Controller that it can start invoking
// |message_loop_->ScheduleWork()|. Must be invoked only once on the thread
// |message_loop_| is bound to (when it is bound).
void StartScheduling();
// Disconnects |message_loop_| from this Controller instance (DidQueueTask()
// will no-op from this point forward).
// will no-op from this point forward). Must be invoked only once on the
// thread |message_loop_| is bound to (when the thread is shutting down).
void DisconnectFromParent();
// Shares this Controller's TaskAnnotator with MessageLoop as TaskAnnotator
......@@ -65,35 +72,63 @@ class MessageLoop::Controller : public SequencedTaskSource::Observer {
debug::TaskAnnotator& task_annotator() { return task_annotator_; }
private:
// Helpers to be invoked before using |message_loop_| instead of operating
// directly on |operations_state_| below. BeforeOperation() returns true iff
// the operation is allowed to be performed (AfterOperation() should only be
// invoked, when done, if the operation was allowed).
bool BeforeOperation();
void AfterOperation();
// A TaskAnnotator which is owned by this Controller to be able to use it
// without locking |message_loop_lock_|. It cannot be owned by MessageLoop
// because this Controller cannot access |message_loop_| safely without the
// lock. Note: the TaskAnnotator API itself is thread-safe.
debug::TaskAnnotator task_annotator_;
// Lock that serializes |message_loop_->ScheduleWork()| and access to all
// members below.
base::Lock message_loop_lock_;
// Points to this Controller's outer MessageLoop instance. Null after
// DisconnectFromParent().
MessageLoop* message_loop_;
// False until StartScheduling() is called.
bool is_ready_for_scheduling_ = false;
// True if DidQueueTask() has been called before StartScheduling(); letting it
// know whether it needs to ScheduleWork() right away or not.
bool pending_schedule_work_ = false;
// An atomic representation of the ongoing operations and shutdown state. The
// lower bit (kDisconnectedBit) is used to indicate that
// DisconnectFromParent() was initiated. The other bits are used to indicate
// ongoing operations. As such this should be incremented by
// kOperationInProgress before making any operation on |message_loop_|.
// Conversely DisconnectFromParent() will wait on |safe_to_shutdown_| if this
// was non-zero when it set the lower bit.
static constexpr int kDisconnectedBit = 1;
static constexpr int kOperationInProgress = 1 << kDisconnectedBit;
std::atomic_int operations_state_{0};
// DisconnectFromParent() will instantiate and wait on this event if
// |operations_state_| wasn't zero when it set the lower bit. Whoever then
// completes the last in-progress operation needs to signal this event to
// resume the disconnect.
Optional<WaitableEvent> safe_to_shutdown_;
enum InitializationState {
// Initial state : ScheduleWork() cannot be called yet.
kNotReady,
// ScheduleWork() cannot be called yet but should be when transitioning to
// kReadyForScheduling.
kPendingWork,
// ScheduleWork() can be called now.
kReadyForScheduling,
};
std::atomic_int initialization_state_{kNotReady};
// Points to this Controller's outer MessageLoop instance.
// |initialization_state_| must be set to kReadyForScheduling before using
// this. |operations_state_| must then be incremented per the above protocol
// to use this.
MessageLoop* const message_loop_;
DISALLOW_COPY_AND_ASSIGN(Controller);
};
MessageLoop::Controller::Controller(MessageLoop* message_loop)
: message_loop_(message_loop) {}
: message_loop_(message_loop) {
DCHECK(message_loop_);
}
MessageLoop::Controller::~Controller() {
DCHECK(!message_loop_)
DCHECK(safe_to_shutdown_)
<< "DisconnectFromParent() needs to be invoked before destruction.";
}
......@@ -102,30 +137,106 @@ void MessageLoop::Controller::WillQueueTask(PendingTask* task) {
}
void MessageLoop::Controller::DidQueueTask(bool was_empty) {
// Avoid locking if we don't need to schedule.
if (!was_empty)
return;
AutoLock auto_lock(message_loop_lock_);
// Perform a lock-less check that we are ready for scheduling. If not,
// atomically inform StartScheduling() about the pending work.
int previous_state = kNotReady;
if (initialization_state_.compare_exchange_strong(
previous_state, kPendingWork, std::memory_order_acquire) ||
previous_state == kPendingWork) {
return;
}
DCHECK_EQ(previous_state, kReadyForScheduling);
if (message_loop_ && is_ready_for_scheduling_)
message_loop_->ScheduleWork();
else
pending_schedule_work_ = true;
if (!BeforeOperation())
return;
// Some scenarios can result in getting to this point on multiple threads at
// once, e.g.:
//
// Two threads post a task and both make the queue non-empty because an
// unrelated event (A) (e.g. timer or system event) woke up the MessageLoop
// thread in between, allowing it to process the first task, before either
// thread got to ScheduleWork() :
//
// T0(ML) ---[]↘------(A)--------------[]↘--------------------↗↔(racy SchedW)
// T1 --↗---↘-------------------↗Post-→(was_empty=true)---↑--↑SchedW()
// T2 -↗Post--→(was_empty=true)------(descheduled)--------↑SchedW()
// MessageLoop/MessagePump::ScheduleWork() is thread-safe so this is fine.
message_loop_->ScheduleWork();
AfterOperation();
}
void MessageLoop::Controller::StartScheduling() {
AutoLock lock(message_loop_lock_);
DCHECK(message_loop_);
DCHECK(!is_ready_for_scheduling_);
is_ready_for_scheduling_ = true;
if (pending_schedule_work_)
DCHECK_CALLED_ON_VALID_THREAD(message_loop_->bound_thread_checker_);
// std::memory_order_release because any thread that acquires this value and
// sees kReadyForScheduling needs to also see the state initialized on this
// thread before StartScheduling(). (this is also why matching reads use
// std::memory_order_acquire)
auto previous_state = initialization_state_.exchange(
kReadyForScheduling, std::memory_order_release);
DCHECK_NE(previous_state, kReadyForScheduling);
if (previous_state == kPendingWork)
message_loop_->ScheduleWork();
}
void MessageLoop::Controller::DisconnectFromParent() {
AutoLock lock(message_loop_lock_);
message_loop_ = nullptr;
DCHECK_CALLED_ON_VALID_THREAD(message_loop_->bound_thread_checker_);
DCHECK(!safe_to_shutdown_);
// Create the WaitableEvent before setting the disconnect bit as this
// guarantees it will be visible to the eventual signaling thread (per the
// release/acquire semantics used around the disconnect logic).
safe_to_shutdown_.emplace();
if (operations_state_.fetch_add(kDisconnectedBit,
std::memory_order_release) != 0) {
safe_to_shutdown_->Wait();
}
// Should always be disconnected with no operations in progress at this point.
DCHECK_EQ(operations_state_.load(), kDisconnectedBit);
}
bool MessageLoop::Controller::BeforeOperation() {
// std::memory_order_acquire is required to ensure that no operation on the
// current thread can be reordered before this one.
const bool allowed = (operations_state_.fetch_add(kOperationInProgress,
std::memory_order_acquire) &
kDisconnectedBit) == 0;
// Undo the increment if disallowed (and potentially signal if that racily
// ended up being the last operation).
if (!allowed)
AfterOperation();
return allowed;
}
void MessageLoop::Controller::AfterOperation() {
constexpr int kWasDisconnectedWithOnlyOneOperationLeft =
kOperationInProgress | kDisconnectedBit;
// std::memory_order_release is required to ensure that no operation on the
// current thread can be reordered after this one. Technically,
// acquire-semantics are only required if the conditional is true (to
// synchronize with DisconnectFromParent() before using |safe_to_shutdown_|).
// Per "Atomic-fence synchronization" semantics [1], it'd be sufficient to
// fetch_sub(std::memory_order_release) and only have a
// std::atomic_thread_fence(std::memory_order_acquire) inside the
// conditional's body. However, as documented in atomic_ref_count.h TSAN
// doesn't support fences at the moment. As such, this uses acq_rel for now.
// [1] https://en.cppreference.com/w/cpp/atomic/atomic_thread_fence
if (operations_state_.fetch_sub(kOperationInProgress,
std::memory_order_acq_rel) ==
kWasDisconnectedWithOnlyOneOperationLeft) {
safe_to_shutdown_->Signal();
}
}
//------------------------------------------------------------------------------
......
......@@ -260,8 +260,10 @@ class BASE_EXPORT MessageLoop : public MessagePump::Delegate,
// destructor to make sure all the task's destructors get called.
void DeletePendingTasks();
// Wakes up the message pump. Can be called on any thread. The caller is
// responsible for synchronizing ScheduleWork() calls.
// Wakes up the message pump. Thread-safe (and callers should avoid holding a
// Lock at all cost while making this call as some platforms' priority
// boosting features have been observed to cause the caller to get descheduled
// : https://crbug.com/890978).
void ScheduleWork();
// Returns |next_run_time| capped at 1 day from |recent_time_|. This is used
......
......@@ -112,9 +112,11 @@ class BASE_EXPORT MessagePump {
virtual void Quit() = 0;
// Schedule a DoWork callback to happen reasonably soon. Does nothing if a
// DoWork callback is already scheduled. This method may be called from any
// thread. Once this call is made, DoWork should not be "starved" at least
// until it returns a value of false.
// DoWork callback is already scheduled. Once this call is made, DoWork should
// not be "starved" at least until it returns a value of false. Thread-safe
// (and callers should avoid holding a Lock at all cost while making this call
// as some platforms' priority boosting features have been observed to cause
// the caller to get descheduled : https://crbug.com/890978).
virtual void ScheduleWork() = 0;
// Schedule a DoDelayedWork callback to happen at the specified time,
......
......@@ -278,9 +278,6 @@ void MessagePumpForUI::Quit() {
}
void MessagePumpForUI::ScheduleWork() {
if (ShouldQuit())
return;
// Write (add) 1 to the eventfd. This tells the Looper to wake up and call our
// callback, allowing us to run tasks. This also allows us to detect, when we
// clear the fd, whether additional work was scheduled after we finished
......
......@@ -501,7 +501,7 @@ bool EmbeddedTestServer::PostTaskToIOThreadAndWait(
// already.
//
// To handle this situation, create temporary message loop to support the
// PostTaskAndReply operation if the current thread as no message loop.
// PostTaskAndReply operation if the current thread has no message loop.
std::unique_ptr<base::MessageLoop> temporary_loop;
if (!base::MessageLoopCurrent::Get())
temporary_loop.reset(new base::MessageLoop());
......@@ -513,6 +513,9 @@ bool EmbeddedTestServer::PostTaskToIOThreadAndWait(
}
run_loop.Run();
base::ScopedAllowBaseSyncPrimitivesForTesting allow_wait_for_loop_destruction;
temporary_loop.reset();
return true;
}
......
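The shutdown half of the scheme in the diff above (an operation counter sharing an atomic with a disconnect bit) can be sketched in standard C++ as follows. This is a minimal sketch under stated assumptions: `OperationGate` and `OneShotEvent` are hypothetical names, `OneShotEvent` is a small stand-in for `base::WaitableEvent` built on a mutex and condition variable, and the event is a plain member rather than an `Optional` created at disconnect time.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <mutex>

constexpr int kGateDisconnectedBit = 1;           // Lowest bit: shutdown began.
constexpr int kGateOperationInProgress = 1 << 1;  // Counter lives above it.

// Hypothetical one-shot event, standing in for base::WaitableEvent.
class OneShotEvent {
 public:
  void Signal() {
    std::lock_guard<std::mutex> lock(mutex_);
    signaled_ = true;
    cv_.notify_all();  // Notify under the lock so a waiter cannot race ahead.
  }
  void Wait() {
    std::unique_lock<std::mutex> lock(mutex_);
    cv_.wait(lock, [this] { return signaled_; });
  }

 private:
  std::mutex mutex_;
  std::condition_variable cv_;
  bool signaled_ = false;
};

class OperationGate {
 public:
  // Returns true iff the guarded resource may be used. On true, the caller
  // must call AfterOperation() when done.
  bool BeforeOperation() {
    const bool allowed =
        (state_.fetch_add(kGateOperationInProgress,
                          std::memory_order_acquire) &
         kGateDisconnectedBit) == 0;
    if (!allowed)
      AfterOperation();  // Undo the increment; may signal the waiter.
    return allowed;
  }

  void AfterOperation() {
    // The last operation to finish after the disconnect bit was set wakes
    // the thread blocked in Disconnect().
    if (state_.fetch_sub(kGateOperationInProgress,
                         std::memory_order_acq_rel) ==
        (kGateOperationInProgress | kGateDisconnectedBit)) {
      safe_to_shutdown_.Signal();
    }
  }

  // Called once by the owning thread. Blocks until in-flight operations end.
  void Disconnect() {
    if (state_.fetch_add(kGateDisconnectedBit,
                         std::memory_order_release) != 0) {
      safe_to_shutdown_.Wait();
    }
  }

  int state() const { return state_.load(); }  // For inspection only.

 private:
  std::atomic<int> state_{0};
  OneShotEvent safe_to_shutdown_;
};
```

A posting thread would bracket its wake-up call with `BeforeOperation()` and `AfterOperation()`, while the owning thread calls `Disconnect()` before tearing the resource down. Only the shutdown path can block here; the common path is a pair of atomic read-modify-write operations.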