Commit 243f975c authored by Yi Gu's avatar Yi Gu Committed by Commit Bot

Revert "[MessageLoop] Lock-free ScheduleWork() scheme"

This reverts commit f7da13a2.

Reason for revert: As per comments by Findit

Original change's description:
> [MessageLoop] Lock-free ScheduleWork() scheme
> 
> The Lock is causing hangs because of priority inversion
> mixed with priority boosting (ScheduleWork() tends to
> boost the destination thread which may deschedule the
> posting thread; if the posting thread is a background
> thread this boost-induded-desched-while-holding-lock
> can cause a livelock). See https://crbug.com/890978#c10
> for example crashes catching this.
> 
> The Lock was only necessary for startup/shutdown and is
> being replaced by a lock-free atomic scheme in this CL.
> 
> MessagePump::ScheduleWork() itself was already thread-safe
> (but the Android impl did unnecessarily check a non-atomic bool)
> 
> This adds a WaitableEvent in ~MessageLoop(); hence the requirement
> for a wait-allowance in net's EmbeddedTestServer.
> 
> TBR=zhongyi@chromium.org (embedded_test_server.cc side-effects)
> 
> Bug: 890978, 874237
> Change-Id: I0916e5a99035a935b0a23a770af256f334e78c43
> Reviewed-on: https://chromium-review.googlesource.com/c/1278631
> Commit-Queue: Gabriel Charette <gab@chromium.org>
> Reviewed-by: François Doray <fdoray@chromium.org>
> Cr-Commit-Position: refs/heads/master@{#601600}

TBR=gab@chromium.org,fdoray@chromium.org,zhongyi@chromium.org

Change-Id: I521ef6ba2758f84d13a4f98d65cb41b276cb115e
No-Presubmit: true
No-Tree-Checks: true
No-Try: true
Bug: 890978, 874237
Reviewed-on: https://chromium-review.googlesource.com/c/1294717Reviewed-by: default avatarYi Gu <yigu@chromium.org>
Commit-Queue: Yi Gu <yigu@chromium.org>
Cr-Commit-Position: refs/heads/master@{#601731}
parent c75ec2a2
...@@ -5,7 +5,6 @@ ...@@ -5,7 +5,6 @@
#include "base/message_loop/message_loop.h" #include "base/message_loop/message_loop.h"
#include <algorithm> #include <algorithm>
#include <atomic>
#include <utility> #include <utility>
#include "base/bind.h" #include "base/bind.h"
...@@ -19,9 +18,7 @@ ...@@ -19,9 +18,7 @@
#include "base/message_loop/message_pump_for_io.h" #include "base/message_loop/message_pump_for_io.h"
#include "base/message_loop/message_pump_for_ui.h" #include "base/message_loop/message_pump_for_ui.h"
#include "base/message_loop/sequenced_task_source.h" #include "base/message_loop/sequenced_task_source.h"
#include "base/optional.h"
#include "base/run_loop.h" #include "base/run_loop.h"
#include "base/synchronization/waitable_event.h"
#include "base/threading/thread_id_name_manager.h" #include "base/threading/thread_id_name_manager.h"
#include "base/threading/thread_task_runner_handle.h" #include "base/threading/thread_task_runner_handle.h"
#include "base/trace_event/trace_event.h" #include "base/trace_event/trace_event.h"
...@@ -45,9 +42,9 @@ std::unique_ptr<MessagePump> ReturnPump(std::unique_ptr<MessagePump> pump) { ...@@ -45,9 +42,9 @@ std::unique_ptr<MessagePump> ReturnPump(std::unique_ptr<MessagePump> pump) {
class MessageLoop::Controller : public SequencedTaskSource::Observer { class MessageLoop::Controller : public SequencedTaskSource::Observer {
public: public:
// Constructs a MessageLoopController which controls |message_loop|, notifying // Constructs a MessageLoopController which controls |message_loop|, notifying
// |task_annotator_| when tasks are queued and scheduling work on // |task_annotator_| when tasks are queued scheduling work on |message_loop|
// |message_loop| as fits. |message_loop| and |task_annotator_| will not be // as fits. |message_loop| and |task_annotator_| will not be used after
// used after DisconnectFromParent() returns. // DisconnectFromParent() returns.
Controller(MessageLoop* message_loop); Controller(MessageLoop* message_loop);
~Controller() override; ~Controller() override;
...@@ -56,14 +53,10 @@ class MessageLoop::Controller : public SequencedTaskSource::Observer { ...@@ -56,14 +53,10 @@ class MessageLoop::Controller : public SequencedTaskSource::Observer {
void WillQueueTask(PendingTask* task) final; void WillQueueTask(PendingTask* task) final;
void DidQueueTask(bool was_empty) final; void DidQueueTask(bool was_empty) final;
// Informs this Controller that it can start invoking
// |message_loop_->ScheduleWork()|. Must be invoked only once on the thread
// |message_loop_| is bound to (when it is bound).
void StartScheduling(); void StartScheduling();
// Disconnects |message_loop_| from this Controller instance (DidQueueTask() // Disconnects |message_loop_| from this Controller instance (DidQueueTask()
// will no-op from this point forward). Must be invoked only once on the // will no-op from this point forward).
// thread |message_loop_| is bound to (when the thread is shutting down).
void DisconnectFromParent(); void DisconnectFromParent();
// Shares this Controller's TaskAnnotator with MessageLoop as TaskAnnotator // Shares this Controller's TaskAnnotator with MessageLoop as TaskAnnotator
...@@ -72,63 +65,35 @@ class MessageLoop::Controller : public SequencedTaskSource::Observer { ...@@ -72,63 +65,35 @@ class MessageLoop::Controller : public SequencedTaskSource::Observer {
debug::TaskAnnotator& task_annotator() { return task_annotator_; } debug::TaskAnnotator& task_annotator() { return task_annotator_; }
private: private:
// Helpers to be invoked before using |message_loop_| instead of operating
// directly on |operations_state_| below. BeforeOperation() returns true iff
// the operation is allowed to be performed (AfterOperation() should only be
// invoked, when done, if the operation was allowed).
bool BeforeOperation();
void AfterOperation();
// A TaskAnnotator which is owned by this Controller to be able to use it // A TaskAnnotator which is owned by this Controller to be able to use it
// without locking |message_loop_lock_|. It cannot be owned by MessageLoop // without locking |message_loop_lock_|. It cannot be owned by MessageLoop
// because this Controller cannot access |message_loop_| safely without the // because this Controller cannot access |message_loop_| safely without the
// lock. Note: the TaskAnnotator API itself is thread-safe. // lock. Note: the TaskAnnotator API itself is thread-safe.
debug::TaskAnnotator task_annotator_; debug::TaskAnnotator task_annotator_;
// An atomic representation of the ongoing operations and shutdown state. The // Lock that serializes |message_loop_->ScheduleWork()| and access to all
// lower bit (kDisconnectedBit) is used to indicate that // members below.
// DisconnectFromParent() was initiated. The other bits are used to indicate base::Lock message_loop_lock_;
// ongoing operations. As such this should be incremented by
// kOperationInProgress before making any operation on |message_loop_|. // Points to this Controller's outer MessageLoop instance. Null after
// Conversely DisconnectFromParent() will wait on |safe_to_shutdown_| if this // DisconnectFromParent().
// was non-zero when it set the lower bit. MessageLoop* message_loop_;
static constexpr int kDisconnectedBit = 1;
static constexpr int kOperationInProgress = 1 << kDisconnectedBit; // False until StartScheduling() is called.
std::atomic_int operations_state_{0}; bool is_ready_for_scheduling_ = false;
// DisconnectFromParent() will instantiate and wait on this event if // True if DidQueueTask() has been called before StartScheduling(); letting it
// |operations_state_| wasn't zero when it set the lower bit. Whoever then // know whether it needs to ScheduleWork() right away or not.
// completes the last in-progress operation needs to signal this event to bool pending_schedule_work_ = false;
// resume the disconnect.
Optional<WaitableEvent> safe_to_shutdown_;
enum InitializationState {
// Initial state : ScheduleWork() cannot be called yet.
kNotReady,
// ScheduleWork() cannot be called yet but should be when transitioning to
// kReadyForScheduling.
kPendingWork,
// ScheduleWork() can be called now.
kReadyForScheduling,
};
std::atomic_int initialization_state_{kNotReady};
// Points to this Controller's outer MessageLoop instance.
// |initialization_state_| must be set to kReadyForScheduling before using
// this. |operations_state_| must then be incremented per the above protocol
// to use this.
MessageLoop* const message_loop_;
DISALLOW_COPY_AND_ASSIGN(Controller); DISALLOW_COPY_AND_ASSIGN(Controller);
}; };
MessageLoop::Controller::Controller(MessageLoop* message_loop) MessageLoop::Controller::Controller(MessageLoop* message_loop)
: message_loop_(message_loop) { : message_loop_(message_loop) {}
DCHECK(message_loop_);
}
MessageLoop::Controller::~Controller() { MessageLoop::Controller::~Controller() {
DCHECK(safe_to_shutdown_) DCHECK(!message_loop_)
<< "DisconnectFromParent() needs to be invoked before destruction."; << "DisconnectFromParent() needs to be invoked before destruction.";
} }
...@@ -137,106 +102,30 @@ void MessageLoop::Controller::WillQueueTask(PendingTask* task) { ...@@ -137,106 +102,30 @@ void MessageLoop::Controller::WillQueueTask(PendingTask* task) {
} }
void MessageLoop::Controller::DidQueueTask(bool was_empty) { void MessageLoop::Controller::DidQueueTask(bool was_empty) {
// Avoid locking if we don't need to schedule.
if (!was_empty) if (!was_empty)
return; return;
// Perform a lock-less check that we are ready for scheduling. If not, AutoLock auto_lock(message_loop_lock_);
// atomically inform StartScheduling() about the pending work.
int previous_state = kNotReady;
if (initialization_state_.compare_exchange_strong(
previous_state, kPendingWork, std::memory_order_acquire) ||
previous_state == kPendingWork) {
return;
}
DCHECK_EQ(previous_state, kReadyForScheduling);
if (!BeforeOperation())
return;
// Some scenarios can result in getting to this point on multiple threads at
// once, e.g.:
//
// Two threads post a task and both make the queue non-empty because an
// unrelated event (A) (e.g. timer or system event) woke up the MessageLoop
// thread in between, allowing it to process the first task, before either
// thread got to ScheduleWork() :
//
// T0(ML) ---[]↘------(A)--------------[]↘--------------------↗↔(racy SchedW)
// T1 --↗---↘-------------------↗Post-→(was_empty=true)---↑--↑SchedW()
// T2 -↗Post--→(was_empty=true)------(descheduled)--------↑SchedW()
// MessageLoop/MessagePump::ScheduleWork() is thread-safe so this is fine. if (message_loop_ && is_ready_for_scheduling_)
message_loop_->ScheduleWork(); message_loop_->ScheduleWork();
else
AfterOperation(); pending_schedule_work_ = true;
} }
void MessageLoop::Controller::StartScheduling() { void MessageLoop::Controller::StartScheduling() {
DCHECK_CALLED_ON_VALID_THREAD(message_loop_->bound_thread_checker_); AutoLock lock(message_loop_lock_);
DCHECK(message_loop_);
// std::memory_order_release because any thread that acquires this value and DCHECK(!is_ready_for_scheduling_);
// sees kReadyForScheduling needs to also see the state initialized on this is_ready_for_scheduling_ = true;
// thread before StartScheduling(). (this is also why matching reads use if (pending_schedule_work_)
// std::memory_order_acquire)
auto previous_state = initialization_state_.exchange(
kReadyForScheduling, std::memory_order_release);
DCHECK_NE(previous_state, kReadyForScheduling);
if (previous_state == kPendingWork)
message_loop_->ScheduleWork(); message_loop_->ScheduleWork();
} }
void MessageLoop::Controller::DisconnectFromParent() { void MessageLoop::Controller::DisconnectFromParent() {
DCHECK_CALLED_ON_VALID_THREAD(message_loop_->bound_thread_checker_); AutoLock lock(message_loop_lock_);
DCHECK(!safe_to_shutdown_); message_loop_ = nullptr;
// Create the WaitableEvent before setting the disconnect bit as this
// guarantees it will be visible to the eventual signaling thread (per the
// release/acquire semantics used around the disconnect logic).
safe_to_shutdown_.emplace();
if (operations_state_.fetch_add(kDisconnectedBit,
std::memory_order_release) != 0) {
safe_to_shutdown_->Wait();
}
// Should always be disconnected with no operations in progress at this point.
DCHECK_EQ(operations_state_.load(), kDisconnectedBit);
}
bool MessageLoop::Controller::BeforeOperation() {
// std::memory_order_acquire is required to ensure that no operation on the
// current thread can be reordered before this one.
const bool allowed = (operations_state_.fetch_add(kOperationInProgress,
std::memory_order_acquire) &
kDisconnectedBit) == 0;
// Undo the increment if disallowed (and potentially signal if that racily
// ended up being the last operation).
if (!allowed)
AfterOperation();
return allowed;
}
void MessageLoop::Controller::AfterOperation() {
constexpr int kWasDisconnectedWithOnlyOneOperationLeft =
kOperationInProgress | kDisconnectedBit;
// std::memory_order_release is required to ensure that no operation on the
// current thread can be reordered after this one. Technically,
// acquire-semantics are only required if the conditional is true (to
// synchronize with DisconnectFromParent() before using |safe_to_shutdown_|).
// Per "Atomic-fence synchronization" semantics [1], it'd be sufficient to
// fetch_sub(std::memory_order_release) and only have a
// std::atomic_thread_fence(std::memory_order_acquire) inside the
// conditional's body. However, as documented in atomic_ref_count.h TSAN
// doesn't support fences at the moment. As such, this uses acq_rel for now.
// [1] https://en.cppreference.com/w/cpp/atomic/atomic_thread_fence
if (operations_state_.fetch_sub(kOperationInProgress,
std::memory_order_acq_rel) ==
kWasDisconnectedWithOnlyOneOperationLeft) {
safe_to_shutdown_->Signal();
}
} }
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
......
...@@ -260,10 +260,8 @@ class BASE_EXPORT MessageLoop : public MessagePump::Delegate, ...@@ -260,10 +260,8 @@ class BASE_EXPORT MessageLoop : public MessagePump::Delegate,
// destructor to make sure all the task's destructors get called. // destructor to make sure all the task's destructors get called.
void DeletePendingTasks(); void DeletePendingTasks();
// Wakes up the message pump. Thread-safe (and callers should avoid holding a // Wakes up the message pump. Can be called on any thread. The caller is
// Lock at all cost while making this call as some platforms' priority // responsible for synchronizing ScheduleWork() calls.
// boosting features have been observed to cause the caller to get descheduled
// : https://crbug.com/890978).
void ScheduleWork(); void ScheduleWork();
// Returns |next_run_time| capped at 1 day from |recent_time_|. This is used // Returns |next_run_time| capped at 1 day from |recent_time_|. This is used
......
...@@ -112,11 +112,9 @@ class BASE_EXPORT MessagePump { ...@@ -112,11 +112,9 @@ class BASE_EXPORT MessagePump {
virtual void Quit() = 0; virtual void Quit() = 0;
// Schedule a DoWork callback to happen reasonably soon. Does nothing if a // Schedule a DoWork callback to happen reasonably soon. Does nothing if a
// DoWork callback is already scheduled. Once this call is made, DoWork should // DoWork callback is already scheduled. This method may be called from any
// not be "starved" at least until it returns a value of false. Thread-safe // thread. Once this call is made, DoWork should not be "starved" at least
// (and callers should avoid holding a Lock at all cost while making this call // until it returns a value of false.
// as some platforms' priority boosting features have been observed to cause
// the caller to get descheduled : https://crbug.com/890978).
virtual void ScheduleWork() = 0; virtual void ScheduleWork() = 0;
// Schedule a DoDelayedWork callback to happen at the specified time, // Schedule a DoDelayedWork callback to happen at the specified time,
......
...@@ -278,6 +278,9 @@ void MessagePumpForUI::Quit() { ...@@ -278,6 +278,9 @@ void MessagePumpForUI::Quit() {
} }
void MessagePumpForUI::ScheduleWork() { void MessagePumpForUI::ScheduleWork() {
if (ShouldQuit())
return;
// Write (add) 1 to the eventfd. This tells the Looper to wake up and call our // Write (add) 1 to the eventfd. This tells the Looper to wake up and call our
// callback, allowing us to run tasks. This also allows us to detect, when we // callback, allowing us to run tasks. This also allows us to detect, when we
// clear the fd, whether additional work was scheduled after we finished // clear the fd, whether additional work was scheduled after we finished
......
...@@ -505,7 +505,7 @@ bool EmbeddedTestServer::PostTaskToIOThreadAndWait( ...@@ -505,7 +505,7 @@ bool EmbeddedTestServer::PostTaskToIOThreadAndWait(
// already. // already.
// //
// To handle this situation, create temporary message loop to support the // To handle this situation, create temporary message loop to support the
// PostTaskAndReply operation if the current thread has no message loop. // PostTaskAndReply operation if the current thread as no message loop.
std::unique_ptr<base::MessageLoop> temporary_loop; std::unique_ptr<base::MessageLoop> temporary_loop;
if (!base::MessageLoopCurrent::Get()) if (!base::MessageLoopCurrent::Get())
temporary_loop.reset(new base::MessageLoop()); temporary_loop.reset(new base::MessageLoop());
...@@ -517,9 +517,6 @@ bool EmbeddedTestServer::PostTaskToIOThreadAndWait( ...@@ -517,9 +517,6 @@ bool EmbeddedTestServer::PostTaskToIOThreadAndWait(
} }
run_loop.Run(); run_loop.Run();
base::ScopedAllowBaseSyncPrimitivesForTesting allow_wait_for_loop_destruction;
temporary_loop.reset();
return true; return true;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment