Commit cc6bd5c9 authored by fdoray's avatar fdoray Committed by Commit bot

Make WaitableEventWatcher TaskScheduler-friendly.

With this CL, WaitableEventWatcher uses SequencedTaskRunnerHandle
instead of MessageLoop::current() to post back to the sequence that
called StartWatching().

Also, WaitableEventWatcher no longer registers itself as a destruction
observer of the MessageLoop from which StartWatching() is called. If
the watched WaitableEvent is signaled after the MessageLoop is
destroyed, the task posted by WaitableEventWatcher to the
SequencedTaskRunner will simply not run (no crash).

MessageLoop::current() and destruction observer do not work with
TaskScheduler.

BUG=650318

Review-Url: https://codereview.chromium.org/2368423002
Cr-Commit-Position: refs/heads/master@{#422856}
parent 70d50e7a
...@@ -6,13 +6,14 @@ ...@@ -6,13 +6,14 @@
#define BASE_SYNCHRONIZATION_WAITABLE_EVENT_WATCHER_H_ #define BASE_SYNCHRONIZATION_WAITABLE_EVENT_WATCHER_H_
#include "base/base_export.h" #include "base/base_export.h"
#include "base/macros.h"
#include "base/sequence_checker.h"
#include "build/build_config.h" #include "build/build_config.h"
#if defined(OS_WIN) #if defined(OS_WIN)
#include "base/win/object_watcher.h" #include "base/win/object_watcher.h"
#else #else
#include "base/callback.h" #include "base/callback.h"
#include "base/message_loop/message_loop.h"
#include "base/synchronization/waitable_event.h" #include "base/synchronization/waitable_event.h"
#endif #endif
...@@ -26,8 +27,8 @@ class WaitableEvent; ...@@ -26,8 +27,8 @@ class WaitableEvent;
// This class provides a way to wait on a WaitableEvent asynchronously. // This class provides a way to wait on a WaitableEvent asynchronously.
// //
// Each instance of this object can be waiting on a single WaitableEvent. When // Each instance of this object can be waiting on a single WaitableEvent. When
// the waitable event is signaled, a callback is made in the thread of a given // the waitable event is signaled, a callback is invoked on the sequence that
// MessageLoop. This callback can be deleted by deleting the waiter. // called StartWatching(). This callback can be deleted by deleting the waiter.
// //
// Typical usage: // Typical usage:
// //
...@@ -60,53 +61,56 @@ class WaitableEvent; ...@@ -60,53 +61,56 @@ class WaitableEvent;
class BASE_EXPORT WaitableEventWatcher class BASE_EXPORT WaitableEventWatcher
#if defined(OS_WIN) #if defined(OS_WIN)
: public win::ObjectWatcher::Delegate { : public win::ObjectWatcher::Delegate
#else
: public MessageLoop::DestructionObserver {
#endif #endif
{
public: public:
typedef Callback<void(WaitableEvent*)> EventCallback; typedef Callback<void(WaitableEvent*)> EventCallback;
WaitableEventWatcher(); WaitableEventWatcher();
#if defined(OS_WIN)
~WaitableEventWatcher() override; ~WaitableEventWatcher() override;
#else
~WaitableEventWatcher();
#endif
// When @event is signaled, the given callback is called on the thread of the // When |event| is signaled, |callback| is called on the sequence that called
// current message loop when StartWatching is called. // StartWatching().
bool StartWatching(WaitableEvent* event, const EventCallback& callback); bool StartWatching(WaitableEvent* event, const EventCallback& callback);
// Cancel the current watch. Must be called from the same thread which // Cancel the current watch. Must be called from the same sequence which
// started the watch. // started the watch.
// //
// Does nothing if no event is being watched, nor if the watch has completed. // Does nothing if no event is being watched, nor if the watch has completed.
// The callback will *not* be called for the current watch after this // The callback will *not* be called for the current watch after this
// function returns. Since the callback runs on the same thread as this // function returns. Since the callback runs on the same sequence as this
// function, it cannot be called during this function either. // function, it cannot be called during this function either.
void StopWatching(); void StopWatching();
// Return the currently watched event, or NULL if no object is currently being
// watched.
WaitableEvent* GetWatchedEvent();
// Return the callback that will be invoked when the event is
// signaled.
const EventCallback& callback() const { return callback_; }
private: private:
#if defined(OS_WIN) #if defined(OS_WIN)
void OnObjectSignaled(HANDLE h) override; void OnObjectSignaled(HANDLE h) override;
win::ObjectWatcher watcher_; win::ObjectWatcher watcher_;
EventCallback callback_;
WaitableEvent* event_ = nullptr;
#else #else
// Implementation of MessageLoop::DestructionObserver // Instantiated in StartWatching(). Set before the callback runs. Reset in
void WillDestroyCurrentMessageLoop() override; // StopWatching() or StartWatching().
MessageLoop* message_loop_;
scoped_refptr<Flag> cancel_flag_; scoped_refptr<Flag> cancel_flag_;
AsyncWaiter* waiter_;
base::Closure internal_callback_; // Enqueued in the wait list of the watched WaitableEvent.
AsyncWaiter* waiter_ = nullptr;
// Kernel of the watched WaitableEvent.
scoped_refptr<WaitableEvent::WaitableEventKernel> kernel_; scoped_refptr<WaitableEvent::WaitableEventKernel> kernel_;
// Ensures that StartWatching() and StopWatching() are called on the same
// sequence.
SequenceChecker sequence_checker_;
#endif #endif
WaitableEvent* event_; DISALLOW_COPY_AND_ASSIGN(WaitableEventWatcher);
EventCallback callback_;
}; };
} // namespace base } // namespace base
......
...@@ -4,12 +4,12 @@ ...@@ -4,12 +4,12 @@
#include "base/synchronization/waitable_event_watcher.h" #include "base/synchronization/waitable_event_watcher.h"
#include <utility>
#include "base/bind.h" #include "base/bind.h"
#include "base/location.h" #include "base/logging.h"
#include "base/macros.h"
#include "base/single_thread_task_runner.h"
#include "base/synchronization/lock.h" #include "base/synchronization/lock.h"
#include "base/synchronization/waitable_event.h" #include "base/threading/sequenced_task_runner_handle.h"
namespace base { namespace base {
...@@ -17,14 +17,15 @@ namespace base { ...@@ -17,14 +17,15 @@ namespace base {
// WaitableEventWatcher (async waits). // WaitableEventWatcher (async waits).
// //
// The basic design is that we add an AsyncWaiter to the wait-list of the event. // The basic design is that we add an AsyncWaiter to the wait-list of the event.
// That AsyncWaiter has a pointer to MessageLoop, and a Task to be posted to it. // That AsyncWaiter has a pointer to SequencedTaskRunner, and a Task to be
// The MessageLoop ends up running the task, which calls the delegate. // posted to it. The task ends up calling the callback when it runs on the
// sequence.
// //
// Since the wait can be canceled, we have a thread-safe Flag object which is // Since the wait can be canceled, we have a thread-safe Flag object which is
// set when the wait has been canceled. At each stage in the above, we check the // set when the wait has been canceled. At each stage in the above, we check the
// flag before going onto the next stage. Since the wait may only be canceled in // flag before going onto the next stage. Since the wait may only be canceled in
// the MessageLoop which runs the Task, we are assured that the delegate cannot // the sequence which runs the Task, we are assured that the callback cannot be
// be called after canceling... // called after canceling...
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
// A thread-safe, reference-counted, write-once flag. // A thread-safe, reference-counted, write-once flag.
...@@ -54,23 +55,22 @@ class Flag : public RefCountedThreadSafe<Flag> { ...@@ -54,23 +55,22 @@ class Flag : public RefCountedThreadSafe<Flag> {
}; };
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
// This is an asynchronous waiter which posts a task to a MessageLoop when // This is an asynchronous waiter which posts a task to a SequencedTaskRunner
// fired. An AsyncWaiter may only be in a single wait-list. // when fired. An AsyncWaiter may only be in a single wait-list.
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
class AsyncWaiter : public WaitableEvent::Waiter { class AsyncWaiter : public WaitableEvent::Waiter {
public: public:
AsyncWaiter(MessageLoop* message_loop, AsyncWaiter(scoped_refptr<SequencedTaskRunner> task_runner,
const base::Closure& callback, const base::Closure& callback,
Flag* flag) Flag* flag)
: message_loop_(message_loop), : task_runner_(std::move(task_runner)),
callback_(callback), callback_(callback),
flag_(flag) { } flag_(flag) {}
bool Fire(WaitableEvent* event) override { bool Fire(WaitableEvent* event) override {
// Post the callback if we haven't been cancelled. // Post the callback if we haven't been cancelled.
if (!flag_->value()) { if (!flag_->value())
message_loop_->task_runner()->PostTask(FROM_HERE, callback_); task_runner_->PostTask(FROM_HERE, callback_);
}
// We are removed from the wait-list by the WaitableEvent itself. It only // We are removed from the wait-list by the WaitableEvent itself. It only
// remains to delete ourselves. // remains to delete ourselves.
...@@ -85,37 +85,37 @@ class AsyncWaiter : public WaitableEvent::Waiter { ...@@ -85,37 +85,37 @@ class AsyncWaiter : public WaitableEvent::Waiter {
bool Compare(void* tag) override { return tag == flag_.get(); } bool Compare(void* tag) override { return tag == flag_.get(); }
private: private:
MessageLoop *const message_loop_; const scoped_refptr<SequencedTaskRunner> task_runner_;
base::Closure callback_; const base::Closure callback_;
scoped_refptr<Flag> flag_; const scoped_refptr<Flag> flag_;
}; };
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
// For async waits we need to make a callback in a MessageLoop thread. We do // For async waits we need to run a callback on a sequence. We do this by
// this by posting a callback, which calls the delegate and keeps track of when // posting an AsyncCallbackHelper task, which calls the callback and keeps track
// the event is canceled. // of when the event is canceled.
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void AsyncCallbackHelper(Flag* flag, void AsyncCallbackHelper(Flag* flag,
const WaitableEventWatcher::EventCallback& callback, const WaitableEventWatcher::EventCallback& callback,
WaitableEvent* event) { WaitableEvent* event) {
// Runs in MessageLoop thread. // Runs on the sequence that called StartWatching().
if (!flag->value()) { if (!flag->value()) {
// This is to let the WaitableEventWatcher know that the event has occured // This is to let the WaitableEventWatcher know that the event has occured.
// because it needs to be able to return NULL from GetWatchedObject
flag->Set(); flag->Set();
callback.Run(event); callback.Run(event);
} }
} }
WaitableEventWatcher::WaitableEventWatcher() WaitableEventWatcher::WaitableEventWatcher() {
: message_loop_(NULL), sequence_checker_.DetachFromSequence();
cancel_flag_(NULL),
waiter_(NULL),
event_(NULL) {
} }
WaitableEventWatcher::~WaitableEventWatcher() { WaitableEventWatcher::~WaitableEventWatcher() {
StopWatching(); // The destructor may be called from a different sequence than StartWatching()
// when there is no active watch. To avoid triggering a DCHECK in
// StopWatching(), do not call it when there is no active watch.
if (cancel_flag_ && !cancel_flag_->value())
StopWatching();
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
...@@ -125,61 +125,44 @@ WaitableEventWatcher::~WaitableEventWatcher() { ...@@ -125,61 +125,44 @@ WaitableEventWatcher::~WaitableEventWatcher() {
bool WaitableEventWatcher::StartWatching( bool WaitableEventWatcher::StartWatching(
WaitableEvent* event, WaitableEvent* event,
const EventCallback& callback) { const EventCallback& callback) {
MessageLoop *const current_ml = MessageLoop::current(); DCHECK(sequence_checker_.CalledOnValidSequence());
DCHECK(current_ml) << "Cannot create WaitableEventWatcher without a " DCHECK(SequencedTaskRunnerHandle::Get());
"current MessageLoop";
// A user may call StartWatching from within the callback function. In this // A user may call StartWatching from within the callback function. In this
// case, we won't know that we have finished watching, expect that the Flag // case, we won't know that we have finished watching, expect that the Flag
// will have been set in AsyncCallbackHelper(). // will have been set in AsyncCallbackHelper().
if (cancel_flag_.get() && cancel_flag_->value()) { if (cancel_flag_.get() && cancel_flag_->value())
if (message_loop_) { cancel_flag_ = nullptr;
message_loop_->RemoveDestructionObserver(this);
message_loop_ = NULL;
}
cancel_flag_ = NULL;
}
DCHECK(!cancel_flag_.get()) << "StartWatching called while still watching"; DCHECK(!cancel_flag_) << "StartWatching called while still watching";
cancel_flag_ = new Flag; cancel_flag_ = new Flag;
callback_ = callback; const Closure internal_callback = base::Bind(
internal_callback_ = base::Bind( &AsyncCallbackHelper, base::RetainedRef(cancel_flag_), callback, event);
&AsyncCallbackHelper, base::RetainedRef(cancel_flag_), callback_, event);
WaitableEvent::WaitableEventKernel* kernel = event->kernel_.get(); WaitableEvent::WaitableEventKernel* kernel = event->kernel_.get();
AutoLock locked(kernel->lock_); AutoLock locked(kernel->lock_);
event_ = event;
if (kernel->signaled_) { if (kernel->signaled_) {
if (!kernel->manual_reset_) if (!kernel->manual_reset_)
kernel->signaled_ = false; kernel->signaled_ = false;
// No hairpinning - we can't call the delegate directly here. We have to // No hairpinning - we can't call the delegate directly here. We have to
// enqueue a task on the MessageLoop as normal. // post a task to the SequencedTaskRunnerHandle as usual.
current_ml->task_runner()->PostTask(FROM_HERE, internal_callback_); SequencedTaskRunnerHandle::Get()->PostTask(FROM_HERE, internal_callback);
return true; return true;
} }
message_loop_ = current_ml;
current_ml->AddDestructionObserver(this);
kernel_ = kernel; kernel_ = kernel;
waiter_ = new AsyncWaiter(current_ml, internal_callback_, cancel_flag_.get()); waiter_ = new AsyncWaiter(SequencedTaskRunnerHandle::Get(), internal_callback,
cancel_flag_.get());
event->Enqueue(waiter_); event->Enqueue(waiter_);
return true; return true;
} }
void WaitableEventWatcher::StopWatching() { void WaitableEventWatcher::StopWatching() {
callback_.Reset(); DCHECK(sequence_checker_.CalledOnValidSequence());
if (message_loop_) {
message_loop_->RemoveDestructionObserver(this);
message_loop_ = NULL;
}
if (!cancel_flag_.get()) // if not currently watching... if (!cancel_flag_.get()) // if not currently watching...
return; return;
...@@ -227,44 +210,24 @@ void WaitableEventWatcher::StopWatching() { ...@@ -227,44 +210,24 @@ void WaitableEventWatcher::StopWatching() {
// have been enqueued with the MessageLoop because the waiter was never // have been enqueued with the MessageLoop because the waiter was never
// signaled) // signaled)
delete waiter_; delete waiter_;
internal_callback_.Reset();
cancel_flag_ = NULL; cancel_flag_ = NULL;
return; return;
} }
// Case 3: the waiter isn't on the wait-list, thus it was signaled. It may // Case 3: the waiter isn't on the wait-list, thus it was signaled. It may not
// not have run yet, so we set the flag to tell it not to bother enqueuing the // have run yet, so we set the flag to tell it not to bother enqueuing the
// task on the MessageLoop, but to delete it instead. The Waiter deletes // task on the SequencedTaskRunner, but to delete it instead. The Waiter
// itself once run. // deletes itself once run.
cancel_flag_->Set(); cancel_flag_->Set();
cancel_flag_ = NULL; cancel_flag_ = NULL;
// If the waiter has already run then the task has been enqueued. If the Task // If the waiter has already run then the task has been enqueued. If the Task
// hasn't yet run, the flag will stop the delegate from getting called. (This // hasn't yet run, the flag will stop the delegate from getting called. (This
// is thread safe because one may only delete a Handle from the MessageLoop // is thread safe because one may only delete a Handle from the sequence that
// thread.) // called StartWatching()).
// //
// If the delegate has already been called then we have nothing to do. The // If the delegate has already been called then we have nothing to do. The
// task has been deleted by the MessageLoop. // task has been deleted by the MessageLoop.
} }
WaitableEvent* WaitableEventWatcher::GetWatchedEvent() {
if (!cancel_flag_.get())
return NULL;
if (cancel_flag_->value())
return NULL;
return event_;
}
// -----------------------------------------------------------------------------
// This is called when the MessageLoop which the callback will be run it is
// deleted. We need to cancel the callback as if we had been deleted, but we
// will still be deleted at some point in the future.
// -----------------------------------------------------------------------------
void WaitableEventWatcher::WillDestroyCurrentMessageLoop() {
StopWatching();
}
} // namespace base } // namespace base
...@@ -52,16 +52,11 @@ void RunTest_BasicSignal(MessageLoop::Type message_loop_type) { ...@@ -52,16 +52,11 @@ void RunTest_BasicSignal(MessageLoop::Type message_loop_type) {
WaitableEvent::InitialState::NOT_SIGNALED); WaitableEvent::InitialState::NOT_SIGNALED);
WaitableEventWatcher watcher; WaitableEventWatcher watcher;
EXPECT_TRUE(watcher.GetWatchedEvent() == NULL);
watcher.StartWatching(&event, Bind(&QuitWhenSignaled)); watcher.StartWatching(&event, Bind(&QuitWhenSignaled));
EXPECT_EQ(&event, watcher.GetWatchedEvent());
event.Signal(); event.Signal();
RunLoop().Run(); RunLoop().Run();
EXPECT_TRUE(watcher.GetWatchedEvent() == NULL);
} }
void RunTest_BasicCancel(MessageLoop::Type message_loop_type) { void RunTest_BasicCancel(MessageLoop::Type message_loop_type) {
......
...@@ -10,9 +10,7 @@ ...@@ -10,9 +10,7 @@
namespace base { namespace base {
WaitableEventWatcher::WaitableEventWatcher() WaitableEventWatcher::WaitableEventWatcher() = default;
: event_(NULL) {
}
WaitableEventWatcher::~WaitableEventWatcher() { WaitableEventWatcher::~WaitableEventWatcher() {
} }
...@@ -31,10 +29,6 @@ void WaitableEventWatcher::StopWatching() { ...@@ -31,10 +29,6 @@ void WaitableEventWatcher::StopWatching() {
watcher_.StopWatching(); watcher_.StopWatching();
} }
WaitableEvent* WaitableEventWatcher::GetWatchedEvent() {
return event_;
}
void WaitableEventWatcher::OnObjectSignaled(HANDLE h) { void WaitableEventWatcher::OnObjectSignaled(HANDLE h) {
WaitableEvent* event = event_; WaitableEvent* event = event_;
EventCallback callback = callback_; EventCallback callback = callback_;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment