Commit c8a1dc54 authored by Gabriel Charette's avatar Gabriel Charette Committed by Commit Bot

Reland "[MessageLoop] Lock-free ScheduleWork() scheme"

This is a reland of f7da13a2
(original change in PS1)

The original change was missing acquire ordering in
DisconnectFromParent(). This is necessary in order for
the disconnecting thread to see all memory side-effects
previously made by other threads (or some side-effects
of message_loop_->ScheduleWork() could racily come in
after ~MessageLoop()).

Also removed the DCHECK that
|operations_state_ == kDisconnectedBit| at the end of
DisconnectFromParent() as it was incorrect. A racy
BeforeOperation() call can make it 1->3 (and no-op) after
DisconnectFromParent() made it 0->1.

And lastly, added a scoped allowance to always allow the
very fast wait instead of requiring callers to know about
this implementation detail of MessageLoop (and reverted
changes to //net).

Original change's description:
> [MessageLoop] Lock-free ScheduleWork() scheme
>
> The Lock is causing hangs because of priority inversion
> mixed with priority boosting (ScheduleWork() tends to
> boost the destination thread which may deschedule the
> posting thread; if the posting thread is a background
> thread this boost-induded-desched-while-holding-lock
> can cause a livelock). See https://crbug.com/890978#c10
> for example crashes catching this.
>
> The Lock was only necessary for startup/shutdown and is
> being replaced by a lock-free atomic scheme in this CL.
>
> MessagePump::ScheduleWork() itself was already thread-safe
> (but the Android impl did unnecessarily check a non-atomic bool)
>
> This adds a WaitableEvent in ~MessageLoop(); hence the requirement
> for a wait-allowance in net's EmbeddedTestServer.
>
> TBR=zhongyi@chromium.org (embedded_test_server.cc side-effects)
>
> Bug: 890978, 874237
> Change-Id: I0916e5a99035a935b0a23a770af256f334e78c43
> Reviewed-on: https://chromium-review.googlesource.com/c/1278631
> Commit-Queue: Gabriel Charette <gab@chromium.org>
> Reviewed-by: François Doray <fdoray@chromium.org>
> Cr-Commit-Position: refs/heads/master@{#601600}

Bug: 890978, 874237, 897925
Change-Id: I17c515f9a3169bbdfc303a4b259f34097e31730d
Reviewed-on: https://chromium-review.googlesource.com/c/1297129Reviewed-by: default avatarFrançois Doray <fdoray@chromium.org>
Commit-Queue: Gabriel Charette <gab@chromium.org>
Cr-Commit-Position: refs/heads/master@{#602166}
parent e72bb599
......@@ -5,6 +5,7 @@
#include "base/message_loop/message_loop.h"
#include <algorithm>
#include <atomic>
#include <utility>
#include "base/bind.h"
......@@ -18,8 +19,11 @@
#include "base/message_loop/message_pump_for_io.h"
#include "base/message_loop/message_pump_for_ui.h"
#include "base/message_loop/sequenced_task_source.h"
#include "base/optional.h"
#include "base/run_loop.h"
#include "base/synchronization/waitable_event.h"
#include "base/threading/thread_id_name_manager.h"
#include "base/threading/thread_restrictions.h"
#include "base/threading/thread_task_runner_handle.h"
#include "base/trace_event/trace_event.h"
......@@ -42,9 +46,9 @@ std::unique_ptr<MessagePump> ReturnPump(std::unique_ptr<MessagePump> pump) {
class MessageLoop::Controller : public SequencedTaskSource::Observer {
public:
// Constructs a MessageLoopController which controls |message_loop|, notifying
// |task_annotator_| when tasks are queued scheduling work on |message_loop|
// as fits. |message_loop| and |task_annotator_| will not be used after
// DisconnectFromParent() returns.
// |task_annotator_| when tasks are queued and scheduling work on
// |message_loop| as fits. |message_loop| and |task_annotator_| will not be
// used after DisconnectFromParent() returns.
Controller(MessageLoop* message_loop);
~Controller() override;
......@@ -53,10 +57,14 @@ class MessageLoop::Controller : public SequencedTaskSource::Observer {
void WillQueueTask(PendingTask* task) final;
void DidQueueTask(bool was_empty) final;
// Informs this Controller that it can start invoking
// |message_loop_->ScheduleWork()|. Must be invoked only once on the thread
// |message_loop_| is bound to (when it is bound).
void StartScheduling();
// Disconnects |message_loop_| from this Controller instance (DidQueueTask()
// will no-op from this point forward).
// will no-op from this point forward). Must be invoked only once on the
// thread |message_loop_| is bound to (when the thread is shutting down).
void DisconnectFromParent();
// Shares this Controller's TaskAnnotator with MessageLoop as TaskAnnotator
......@@ -65,35 +73,63 @@ class MessageLoop::Controller : public SequencedTaskSource::Observer {
debug::TaskAnnotator& task_annotator() { return task_annotator_; }
private:
// Helpers to be invoked before using |message_loop_| instead of operating
// directly on |operations_state_| below. BeforeOperation() returns true iff
// the operation is allowed to be performed (in which case, AfterOperation()
// must be invoked when done).
bool BeforeOperation();
void AfterOperation();
// A TaskAnnotator which is owned by this Controller to be able to use it
// without locking |message_loop_lock_|. It cannot be owned by MessageLoop
// because this Controller cannot access |message_loop_| safely without the
// lock. Note: the TaskAnnotator API itself is thread-safe.
debug::TaskAnnotator task_annotator_;
// Lock that serializes |message_loop_->ScheduleWork()| and access to all
// members below.
base::Lock message_loop_lock_;
// Points to this Controller's outer MessageLoop instance. Null after
// DisconnectFromParent().
MessageLoop* message_loop_;
// False until StartScheduling() is called.
bool is_ready_for_scheduling_ = false;
// True if DidQueueTask() has been called before StartScheduling(); letting it
// know whether it needs to ScheduleWork() right away or not.
bool pending_schedule_work_ = false;
// An atomic representation of the ongoing operations and shutdown state. The
// lower bit (kDisconnectedBit) is used to indicate that
// DisconnectFromParent() was initiated. The other bits are used to indicate
// ongoing operations. As such this should be incremented by
// kOperationInProgress before making any operation on |message_loop_|.
// Conversely DisconnectFromParent() will wait on |safe_to_shutdown_| if this
// was non-zero when it set the lower bit.
static constexpr int kDisconnectedBit = 1;
static constexpr int kOperationInProgress = 1 << kDisconnectedBit;
std::atomic_int operations_state_{0};
// DisconnectFromParent() will instantiate and wait on this event if
// |operations_state_| wasn't zero when it set the lower bit. Whoever then
// completes the last in-progress operation needs to signal this event to
// resume the disconnect.
Optional<WaitableEvent> safe_to_shutdown_;
enum InitializationState {
// Initial state : ScheduleWork() cannot be called yet.
kNotReady,
// ScheduleWork() cannot be called yet but should be when transitioning to
// kReadyForScheduling.
kPendingWork,
// ScheduleWork() can be called now.
kReadyForScheduling,
};
std::atomic_int initialization_state_{kNotReady};
// Points to this Controller's outer MessageLoop instance.
// |initialization_state_| must be set to kReadyForScheduling before using
// this. |operations_state_| must then be incremented per the above protocol
// to use this.
MessageLoop* const message_loop_;
DISALLOW_COPY_AND_ASSIGN(Controller);
};
MessageLoop::Controller::Controller(MessageLoop* message_loop)
: message_loop_(message_loop) {}
: message_loop_(message_loop) {
DCHECK(message_loop_);
}
MessageLoop::Controller::~Controller() {
DCHECK(!message_loop_)
DCHECK(safe_to_shutdown_)
<< "DisconnectFromParent() needs to be invoked before destruction.";
}
......@@ -102,30 +138,107 @@ void MessageLoop::Controller::WillQueueTask(PendingTask* task) {
}
void MessageLoop::Controller::DidQueueTask(bool was_empty) {
// Avoid locking if we don't need to schedule.
if (!was_empty)
return;
AutoLock auto_lock(message_loop_lock_);
// Perform a lock-less check that we are ready for scheduling. If not,
// atomically inform StartScheduling() about the pending work.
// std::memory_order_acquire as documented in StartScheduling().
int previous_state = kNotReady;
if (initialization_state_.compare_exchange_strong(
previous_state, kPendingWork, std::memory_order_acquire) ||
previous_state == kPendingWork) {
return;
}
DCHECK_EQ(previous_state, kReadyForScheduling);
if (message_loop_ && is_ready_for_scheduling_)
message_loop_->ScheduleWork();
else
pending_schedule_work_ = true;
if (!BeforeOperation())
return;
// Some scenarios can result in getting to this point on multiple threads at
// once, e.g.:
//
// Two threads post a task and both make the queue non-empty because an
// unrelated event (A) (e.g. timer or system event) woke up the MessageLoop
// thread in between, allowing it to process the first task, before either
// thread got to ScheduleWork() :
//
// T0(ML) ---[]↘------(A)--------------[]↘--------------------↗↔(racy SchedW)
// T1 --↗---↘-------------------↗Post-→(was_empty=true)---↑--↑SchedW()
// T2 -↗Post--→(was_empty=true)------(descheduled)--------↑SchedW()
// MessageLoop/MessagePump::ScheduleWork() is thread-safe so this is fine.
message_loop_->ScheduleWork();
AfterOperation();
}
void MessageLoop::Controller::StartScheduling() {
AutoLock lock(message_loop_lock_);
DCHECK(message_loop_);
DCHECK(!is_ready_for_scheduling_);
is_ready_for_scheduling_ = true;
if (pending_schedule_work_)
DCHECK_CALLED_ON_VALID_THREAD(message_loop_->bound_thread_checker_);
// std::memory_order_release because any thread that acquires this value and
// sees kReadyForScheduling needs to also see the state initialized on this
// thread before StartScheduling(). (this is also why matching reads use
// std::memory_order_acquire)
auto previous_state = initialization_state_.exchange(
kReadyForScheduling, std::memory_order_release);
DCHECK_NE(previous_state, kReadyForScheduling);
if (previous_state == kPendingWork)
message_loop_->ScheduleWork();
}
void MessageLoop::Controller::DisconnectFromParent() {
AutoLock lock(message_loop_lock_);
message_loop_ = nullptr;
DCHECK_CALLED_ON_VALID_THREAD(message_loop_->bound_thread_checker_);
DCHECK(!safe_to_shutdown_);
safe_to_shutdown_.emplace();
// Acquire semantics are required to guarantee that all memory side-effects
// made to |message_loop_| by other threads are visible to this thread before
// it returns from this method. Release semantics are required to guarantee
// that threads seeing the disconnect bit will also see a fully initialized
// |safe_to_shutdown_|.
if (operations_state_.fetch_add(kDisconnectedBit,
std::memory_order_acq_rel) != 0) {
ScopedAllowBaseSyncPrimitivesOutsideBlockingScope allow_wait_for_fast_ops;
safe_to_shutdown_->Wait();
}
}
bool MessageLoop::Controller::BeforeOperation() {
// Acquire semantics are required to ensure that no operation on the current
// thread can be reordered before this one.
const bool allowed = (operations_state_.fetch_add(kOperationInProgress,
std::memory_order_acquire) &
kDisconnectedBit) == 0;
// Undo the increment if disallowed (and potentially signal if that racily
// ended up being the last operation).
if (!allowed)
AfterOperation();
return allowed;
}
void MessageLoop::Controller::AfterOperation() {
constexpr int kWasDisconnectedWithOnlyOneOperationLeft =
kOperationInProgress | kDisconnectedBit;
// Release semantics are required to ensure that no operation on the current
// thread can be reordered after this one. Technically, acquire semantics are
// only required if the conditional is true (to synchronize with
// DisconnectFromParent() before using |safe_to_shutdown_|). As such, per
// "Atomic-fence synchronization" semantics [1], it'd be sufficient to
// fetch_sub(std::memory_order_release) and only have a
// std::atomic_thread_fence(std::memory_order_acquire) inside the
// conditional's body. However, as documented in atomic_ref_count.h TSAN
// doesn't support fences at the moment. Hence, this uses acq_rel for now.
// [1] https://en.cppreference.com/w/cpp/atomic/atomic_thread_fence
if (operations_state_.fetch_sub(kOperationInProgress,
std::memory_order_acq_rel) ==
kWasDisconnectedWithOnlyOneOperationLeft) {
safe_to_shutdown_->Signal();
}
}
//------------------------------------------------------------------------------
......
......@@ -260,8 +260,10 @@ class BASE_EXPORT MessageLoop : public MessagePump::Delegate,
// destructor to make sure all the task's destructors get called.
void DeletePendingTasks();
// Wakes up the message pump. Can be called on any thread. The caller is
// responsible for synchronizing ScheduleWork() calls.
// Wakes up the message pump. Thread-safe (and callers should avoid holding a
// Lock at all cost while making this call as some platforms' priority
// boosting features have been observed to cause the caller to get descheduled
// : https://crbug.com/890978).
void ScheduleWork();
// Returns |next_run_time| capped at 1 day from |recent_time_|. This is used
......
......@@ -112,9 +112,11 @@ class BASE_EXPORT MessagePump {
virtual void Quit() = 0;
// Schedule a DoWork callback to happen reasonably soon. Does nothing if a
// DoWork callback is already scheduled. This method may be called from any
// thread. Once this call is made, DoWork should not be "starved" at least
// until it returns a value of false.
// DoWork callback is already scheduled. Once this call is made, DoWork should
// not be "starved" at least until it returns a value of false. Thread-safe
// (and callers should avoid holding a Lock at all cost while making this call
// as some platforms' priority boosting features have been observed to cause
// the caller to get descheduled : https://crbug.com/890978).
virtual void ScheduleWork() = 0;
// Schedule a DoDelayedWork callback to happen at the specified time,
......
......@@ -278,9 +278,6 @@ void MessagePumpForUI::Quit() {
}
void MessagePumpForUI::ScheduleWork() {
if (ShouldQuit())
return;
// Write (add) 1 to the eventfd. This tells the Looper to wake up and call our
// callback, allowing us to run tasks. This also allows us to detect, when we
// clear the fd, whether additional work was scheduled after we finished
......
......@@ -149,6 +149,7 @@ class TaskTracker;
class AdjustOOMScoreHelper;
class GetAppOutputScopedAllowBaseSyncPrimitives;
class MessageLoop;
class SimpleThread;
class StackSamplingProfiler;
class Thread;
......@@ -353,6 +354,7 @@ class BASE_EXPORT ScopedAllowBaseSyncPrimitivesOutsideBlockingScope {
ThreadRestrictionsTest,
ScopedAllowBaseSyncPrimitivesOutsideBlockingScopeResetsState);
friend class ::KeyStorageLinux;
friend class base::MessageLoop;
friend class content::SynchronousCompositor;
friend class content::SynchronousCompositorHost;
friend class content::SynchronousCompositorSyncCallBridge;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment