Commit d54c0c1e authored by Robert Sesek's avatar Robert Sesek Committed by Commit Bot

Reimplement base::WaitableEvent with Mach messaging on Mac.

A WaitableEvent is now a Mach port with a message queue length of one.
Signaling the event enqueues an empty message on the port, or times out
if one is already queued. Waiting on the event receives a message on the
port, dequeuing it if the event is auto-reset, or just peeking it if the
event is manual-reset.

WaitMany is implemented by adding all the events' ports to a port set and
receiving on it.

WaitableEventWatcher is implemented using a TYPE_MACH_RECV dispatch source
for auto-reset events and manual-reset events on 10.12. For manual-reset
events on macOS prior to 10.12, a lock-protected list of watcher callbacks is
used instead of dispatch, because dispatch does not provide timely and
reliable invocation callbacks if the event were to be Reset() immediately after
Signal().

Bug: 681167
Change-Id: I22a9294ad0ae8900d16716d8033285fe91510eda
Reviewed-on: https://chromium-review.googlesource.com/592516
Commit-Queue: Robert Sesek <rsesek@chromium.org>
Reviewed-by: default avatarMark Mentovai <mark@chromium.org>
Cr-Commit-Position: refs/heads/master@{#491860}
parent aed698af
...@@ -761,8 +761,10 @@ component("base") { ...@@ -761,8 +761,10 @@ component("base") {
"synchronization/read_write_lock_win.cc", "synchronization/read_write_lock_win.cc",
"synchronization/spin_wait.h", "synchronization/spin_wait.h",
"synchronization/waitable_event.h", "synchronization/waitable_event.h",
"synchronization/waitable_event_mac.cc",
"synchronization/waitable_event_posix.cc", "synchronization/waitable_event_posix.cc",
"synchronization/waitable_event_watcher.h", "synchronization/waitable_event_watcher.h",
"synchronization/waitable_event_watcher_mac.cc",
"synchronization/waitable_event_watcher_posix.cc", "synchronization/waitable_event_watcher_posix.cc",
"synchronization/waitable_event_watcher_win.cc", "synchronization/waitable_event_watcher_win.cc",
"synchronization/waitable_event_win.cc", "synchronization/waitable_event_win.cc",
...@@ -1499,6 +1501,8 @@ component("base") { ...@@ -1499,6 +1501,8 @@ component("base") {
"memory/shared_memory_posix.cc", "memory/shared_memory_posix.cc",
"native_library_posix.cc", "native_library_posix.cc",
"strings/sys_string_conversions_posix.cc", "strings/sys_string_conversions_posix.cc",
"synchronization/waitable_event_posix.cc",
"synchronization/waitable_event_watcher_posix.cc",
"threading/platform_thread_internal_posix.cc", "threading/platform_thread_internal_posix.cc",
] ]
...@@ -1620,6 +1624,8 @@ component("base") { ...@@ -1620,6 +1624,8 @@ component("base") {
"power_monitor/power_monitor_device_source_ios.mm", "power_monitor/power_monitor_device_source_ios.mm",
"process/memory_stubs.cc", "process/memory_stubs.cc",
"strings/sys_string_conversions_mac.mm", "strings/sys_string_conversions_mac.mm",
"synchronization/waitable_event_mac.cc",
"synchronization/waitable_event_watcher_mac.cc",
"threading/platform_thread_mac.mm", "threading/platform_thread_mac.mm",
"time/time_conversion_posix.cc", "time/time_conversion_posix.cc",
"time/time_mac.cc", "time/time_mac.cc",
......
...@@ -13,11 +13,20 @@ ...@@ -13,11 +13,20 @@
#if defined(OS_WIN) #if defined(OS_WIN)
#include "base/win/scoped_handle.h" #include "base/win/scoped_handle.h"
#endif #elif defined(OS_MACOSX)
#include <mach/mach.h>
#include <list>
#include <memory>
#if defined(OS_POSIX) #include "base/callback_forward.h"
#include "base/mac/scoped_mach_port.h"
#include "base/memory/ref_counted.h"
#include "base/synchronization/lock.h"
#elif defined(OS_POSIX)
#include <list> #include <list>
#include <utility> #include <utility>
#include "base/memory/ref_counted.h" #include "base/memory/ref_counted.h"
#include "base/synchronization/lock.h" #include "base/synchronization/lock.h"
#endif #endif
...@@ -154,6 +163,72 @@ class BASE_EXPORT WaitableEvent { ...@@ -154,6 +163,72 @@ class BASE_EXPORT WaitableEvent {
#if defined(OS_WIN) #if defined(OS_WIN)
win::ScopedHandle handle_; win::ScopedHandle handle_;
#elif defined(OS_MACOSX)
// Prior to macOS 10.12, a TYPE_MACH_RECV dispatch source may not be invoked
// immediately. If a WaitableEventWatcher is used on a manual-reset event,
// and another thread that is Wait()ing on the event calls Reset()
// immediately after waking up, the watcher may not receive the callback.
// On macOS 10.12 and higher, dispatch delivery is reliable. But for OSes
// prior, a lock-protected list of callbacks is used for manual-reset event
// watchers. Automatic-reset events are not prone to this issue, since the
// first thread to wake will claim the event.
static bool UseSlowWatchList(ResetPolicy policy);
// Peeks the message queue named by |port| and returns true if a message
// is present and false if not. If |dequeue| is true, the messsage will be
// drained from the queue. If |dequeue| is false, the queue will only be
// peeked. |port| must be a receive right.
static bool PeekPort(mach_port_t port, bool dequeue);
// The Mach receive right is waited on by both WaitableEvent and
// WaitableEventWatcher. It is valid to signal and then delete an event, and
// a watcher should still be notified. If the right were to be destroyed
// immediately, the watcher would not receive the signal. Because Mach
// receive rights cannot have a user refcount greater than one, the right
// must be reference-counted manually.
class ReceiveRight : public RefCountedThreadSafe<ReceiveRight> {
public:
ReceiveRight(mach_port_t name, bool create_slow_watch_list);
mach_port_t Name() const { return right_.get(); };
// This structure is used iff UseSlowWatchList() is true. See the comment
// in Signal() for details.
struct WatchList {
WatchList();
~WatchList();
// The lock protects a list of closures to be run when the event is
// Signal()ed. The closures are invoked on the signaling thread, so they
// must be safe to be called from any thread.
Lock lock;
std::list<OnceClosure> list;
};
WatchList* SlowWatchList() const { return slow_watch_list_.get(); }
private:
friend class RefCountedThreadSafe<ReceiveRight>;
~ReceiveRight();
mac::ScopedMachReceiveRight right_;
// This is allocated iff UseSlowWatchList() is true. It is created on the
// heap to avoid performing initialization when not using the slow path.
std::unique_ptr<WatchList> slow_watch_list_;
DISALLOW_COPY_AND_ASSIGN(ReceiveRight);
};
const ResetPolicy policy_;
// The receive right for the event.
scoped_refptr<ReceiveRight> receive_right_;
// The send right used to signal the event. This can be disposed of with
// the event, unlike the receive right, since a deleted event cannot be
// signaled.
mac::ScopedMachSendRight send_right_;
#else #else
// On Windows, you must not close a HANDLE which is currently being waited on. // On Windows, you must not close a HANDLE which is currently being waited on.
// The MSDN documentation says that the resulting behaviour is 'undefined'. // The MSDN documentation says that the resulting behaviour is 'undefined'.
......
// Copyright 2017 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "base/synchronization/waitable_event.h"
#include <mach/mach.h>
#include "base/debug/activity_tracker.h"
#include "base/mac/mac_util.h"
#include "base/mac/mach_logging.h"
#include "base/threading/thread_restrictions.h"
#include "build/build_config.h"
namespace base {
WaitableEvent::WaitableEvent(ResetPolicy reset_policy,
InitialState initial_state)
: policy_(reset_policy) {
mach_port_options_t options{};
options.flags = MPO_INSERT_SEND_RIGHT;
options.mpl.mpl_qlimit = 1;
mach_port_t name;
kern_return_t kr = mach_port_construct(mach_task_self(), &options, 0, &name);
MACH_CHECK(kr == KERN_SUCCESS, kr) << "mach_port_construct";
receive_right_ = new ReceiveRight(name, UseSlowWatchList(policy_));
send_right_.reset(name);
if (initial_state == InitialState::SIGNALED)
Signal();
}
WaitableEvent::~WaitableEvent() = default;
void WaitableEvent::Reset() {
PeekPort(receive_right_->Name(), true);
}
void WaitableEvent::Signal() {
// If using the slow watch-list, copy the watchers to a local. After
// mach_msg(), the event object may be deleted by an awoken thread.
const bool use_slow_path = UseSlowWatchList(policy_);
ReceiveRight* receive_right = nullptr; // Manually reference counted.
std::unique_ptr<std::list<OnceClosure>> watch_list;
if (use_slow_path) {
// To avoid a race condition of a WaitableEventWatcher getting added
// while another thread is in this method, hold the watch-list lock for
// the duration of mach_msg(). This requires ref-counting the
// |receive_right_| object that contains it, in case the event is deleted
// by a waiting thread after mach_msg().
receive_right = receive_right_.get();
receive_right->AddRef();
ReceiveRight::WatchList* slow_watch_list = receive_right->SlowWatchList();
slow_watch_list->lock.Acquire();
if (!slow_watch_list->list.empty()) {
watch_list.reset(new std::list<OnceClosure>());
std::swap(*watch_list, slow_watch_list->list);
}
}
mach_msg_empty_send_t msg{};
msg.header.msgh_bits = MACH_MSGH_BITS_REMOTE(MACH_MSG_TYPE_COPY_SEND);
msg.header.msgh_size = sizeof(&msg);
msg.header.msgh_remote_port = send_right_.get();
// If the event is already signaled, this will time out because the queue
// has a length of one.
kern_return_t kr =
mach_msg(&msg.header, MACH_SEND_MSG | MACH_SEND_TIMEOUT, sizeof(msg), 0,
MACH_PORT_NULL, 0, MACH_PORT_NULL);
MACH_CHECK(kr == KERN_SUCCESS || kr == MACH_SEND_TIMED_OUT, kr) << "mach_msg";
if (use_slow_path) {
// If a WaitableEventWatcher were to start watching when the event is
// signaled, it runs the callback immediately without adding it to the
// list. Therefore the watch list can only be non-empty if the event is
// newly signaled.
if (watch_list.get()) {
MACH_CHECK(kr == KERN_SUCCESS, kr);
for (auto& watcher : *watch_list) {
std::move(watcher).Run();
}
}
receive_right->SlowWatchList()->lock.Release();
receive_right->Release();
}
}
bool WaitableEvent::IsSignaled() {
return PeekPort(receive_right_->Name(), policy_ == ResetPolicy::AUTOMATIC);
}
void WaitableEvent::Wait() {
bool result = TimedWaitUntil(TimeTicks::Max());
DCHECK(result) << "TimedWait() should never fail with infinite timeout";
}
bool WaitableEvent::TimedWait(const TimeDelta& wait_delta) {
return TimedWaitUntil(TimeTicks::Now() + wait_delta);
}
bool WaitableEvent::TimedWaitUntil(const TimeTicks& end_time) {
ThreadRestrictions::AssertWaitAllowed();
// Record the event that this thread is blocking upon (for hang diagnosis).
debug::ScopedEventWaitActivity event_activity(this);
TimeDelta wait_time = end_time - TimeTicks::Now();
if (wait_time < TimeDelta()) {
// A negative delta would be treated by the system as indefinite, but
// it needs to be treated as a poll instead.
wait_time = TimeDelta();
}
mach_msg_empty_rcv_t msg{};
msg.header.msgh_local_port = receive_right_->Name();
mach_msg_option_t options = MACH_RCV_MSG;
mach_msg_timeout_t timeout = 0;
if (!end_time.is_max()) {
options |= MACH_RCV_TIMEOUT;
timeout = wait_time.InMillisecondsRoundedUp();
}
mach_msg_size_t rcv_size = sizeof(msg);
if (policy_ == ResetPolicy::MANUAL) {
// To avoid dequeing the message, receive with a size of 0 and set
// MACH_RCV_LARGE to keep the message in the queue.
options |= MACH_RCV_LARGE;
rcv_size = 0;
}
kern_return_t kr = mach_msg(&msg.header, options, 0, rcv_size,
receive_right_->Name(), timeout, MACH_PORT_NULL);
if (kr == KERN_SUCCESS) {
return true;
} else if (rcv_size == 0 && kr == MACH_RCV_TOO_LARGE) {
return true;
} else {
MACH_CHECK(kr == MACH_RCV_TIMED_OUT, kr) << "mach_msg";
return false;
}
}
// static
bool WaitableEvent::UseSlowWatchList(ResetPolicy policy) {
#if defined(OS_IOS)
const bool use_slow_path = false;
#else
static bool use_slow_path = !mac::IsAtLeastOS10_12();
#endif
return policy == ResetPolicy::MANUAL && use_slow_path;
}
// static
size_t WaitableEvent::WaitMany(WaitableEvent** raw_waitables, size_t count) {
ThreadRestrictions::AssertWaitAllowed();
DCHECK(count) << "Cannot wait on no events";
kern_return_t kr;
mac::ScopedMachPortSet port_set;
{
mach_port_t name;
kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_PORT_SET, &name);
MACH_CHECK(kr == KERN_SUCCESS, kr) << "mach_port_allocate";
port_set.reset(name);
}
for (size_t i = 0; i < count; ++i) {
kr = mach_port_insert_member(mach_task_self(),
raw_waitables[i]->receive_right_->Name(),
port_set.get());
MACH_CHECK(kr == KERN_SUCCESS, kr) << "index " << i;
}
mach_msg_empty_rcv_t msg{};
// Wait on the port set. Only specify space enough for the header, to
// identify which port in the set is signaled. Otherwise, receiving from the
// port set may dequeue a message for a manual-reset event object, which
// would cause it to be reset.
kr = mach_msg(&msg.header,
MACH_RCV_MSG | MACH_RCV_LARGE | MACH_RCV_LARGE_IDENTITY, 0,
sizeof(msg.header), port_set.get(), 0, MACH_PORT_NULL);
MACH_CHECK(kr == MACH_RCV_TOO_LARGE, kr) << "mach_msg";
for (size_t i = 0; i < count; ++i) {
WaitableEvent* event = raw_waitables[i];
if (msg.header.msgh_local_port == event->receive_right_->Name()) {
if (event->policy_ == ResetPolicy::AUTOMATIC) {
// The message needs to be dequeued to reset the event.
PeekPort(msg.header.msgh_local_port, true);
}
return i;
}
}
NOTREACHED();
return 0;
}
// static
bool WaitableEvent::PeekPort(mach_port_t port, bool dequeue) {
if (dequeue) {
mach_msg_empty_rcv_t msg{};
msg.header.msgh_local_port = port;
kern_return_t kr = mach_msg(&msg.header, MACH_RCV_MSG | MACH_RCV_TIMEOUT, 0,
sizeof(msg), port, 0, MACH_PORT_NULL);
if (kr == KERN_SUCCESS) {
return true;
} else {
MACH_CHECK(kr == MACH_RCV_TIMED_OUT, kr) << "mach_msg";
return false;
}
} else {
mach_port_seqno_t seqno = 0;
mach_msg_size_t size;
mach_msg_id_t id;
mach_msg_trailer_t trailer;
mach_msg_type_number_t trailer_size = sizeof(trailer);
kern_return_t kr = mach_port_peek(
mach_task_self(), port, MACH_RCV_TRAILER_TYPE(MACH_RCV_TRAILER_NULL),
&seqno, &size, &id, reinterpret_cast<mach_msg_trailer_info_t>(&trailer),
&trailer_size);
if (kr == KERN_SUCCESS) {
return true;
} else {
MACH_CHECK(kr == KERN_FAILURE, kr) << "mach_port_peek";
return false;
}
}
}
WaitableEvent::ReceiveRight::ReceiveRight(mach_port_t name,
bool create_slow_watch_list)
: right_(name),
slow_watch_list_(create_slow_watch_list ? new WatchList() : nullptr) {}
WaitableEvent::ReceiveRight::~ReceiveRight() = default;
WaitableEvent::ReceiveRight::WatchList::WatchList() = default;
WaitableEvent::ReceiveRight::WatchList::~WatchList() = default;
} // namespace base
...@@ -12,12 +12,21 @@ ...@@ -12,12 +12,21 @@
#if defined(OS_WIN) #if defined(OS_WIN)
#include "base/win/object_watcher.h" #include "base/win/object_watcher.h"
#include "base/win/scoped_handle.h" #include "base/win/scoped_handle.h"
#elif defined(OS_MACOSX)
#include <dispatch/dispatch.h>
#include "base/mac/scoped_dispatch_object.h"
#include "base/memory/weak_ptr.h"
#include "base/synchronization/waitable_event.h"
#else #else
#include "base/callback.h"
#include "base/sequence_checker.h" #include "base/sequence_checker.h"
#include "base/synchronization/waitable_event.h" #include "base/synchronization/waitable_event.h"
#endif #endif
#if !defined(OS_WIN)
#include "base/callback.h"
#endif
namespace base { namespace base {
class Flag; class Flag;
...@@ -101,6 +110,27 @@ class BASE_EXPORT WaitableEventWatcher ...@@ -101,6 +110,27 @@ class BASE_EXPORT WaitableEventWatcher
EventCallback callback_; EventCallback callback_;
WaitableEvent* event_ = nullptr; WaitableEvent* event_ = nullptr;
#elif defined(OS_MACOSX)
// Invokes the callback and resets the source. Must be called on the task
// runner on which StartWatching() was called.
void InvokeCallback();
// Closure bound to the event being watched. This will be is_null() if
// nothing is being watched.
OnceClosure callback_;
// A reference to the receive right that is kept alive while a watcher
// is waiting. Null if no event is being watched.
scoped_refptr<WaitableEvent::ReceiveRight> receive_right_;
// A TYPE_MACH_RECV dispatch source on |receive_right_|. When a receive event
// is delivered, the message queue will be peeked and the bound |callback_|
// may be run. This will be null if nothing is currently being watched.
ScopedDispatchObject<dispatch_source_t> source_;
// Used to vend a weak pointer for calling InvokeCallback() from the
// |source_| event handler.
WeakPtrFactory<WaitableEventWatcher> weak_ptr_factory_;
#else #else
// Instantiated in StartWatching(). Set before the callback runs. Reset in // Instantiated in StartWatching(). Set before the callback runs. Reset in
// StopWatching() or StartWatching(). // StopWatching() or StartWatching().
......
// Copyright 2017 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "base/synchronization/waitable_event_watcher.h"
#include "base/bind.h"
#include "base/callback.h"
#include "base/threading/sequenced_task_runner_handle.h"
namespace base {
WaitableEventWatcher::WaitableEventWatcher() : weak_ptr_factory_(this) {}
WaitableEventWatcher::~WaitableEventWatcher() {
StopWatching();
}
bool WaitableEventWatcher::StartWatching(WaitableEvent* event,
EventCallback callback) {
DCHECK(!source_ || dispatch_source_testcancel(source_));
// Keep a reference to the receive right, so that if the event is deleted
// out from under the watcher, a signal can still be observed.
receive_right_ = event->receive_right_;
callback_ = BindOnce(std::move(callback), event);
// Locals for capture by the block. Accessing anything through the |this| or
// |event| pointers is not safe, since either may have been deleted by the
// time the handler block is invoked.
scoped_refptr<SequencedTaskRunner> task_runner =
SequencedTaskRunnerHandle::Get();
WeakPtr<WaitableEventWatcher> weak_this = weak_ptr_factory_.GetWeakPtr();
const bool auto_reset =
event->policy_ == WaitableEvent::ResetPolicy::AUTOMATIC;
// Auto-reset events always use a dispatch source. Manual-reset events
// only do so if dispatch provides reliable delivery, otherwise a manual
// watcher list is used.
if (!WaitableEvent::UseSlowWatchList(event->policy_)) {
// Use the global concurrent queue here, since it is only used to thunk
// to the real callback on the target task runner.
source_.reset(dispatch_source_create(
DISPATCH_SOURCE_TYPE_MACH_RECV, receive_right_->Name(), 0,
dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0)));
// Additional locals for block capture.
dispatch_source_t source = source_.get();
mach_port_t name = receive_right_->Name();
dispatch_source_set_event_handler(source_, ^{
// For automatic-reset events, only fire the callback if this watcher
// can claim/dequeue the event. For manual-reset events, all watchers can
// be called back.
if (auto_reset && !WaitableEvent::PeekPort(name, true)) {
return;
}
// The event has been consumed. A watcher is one-shot, so cancel the
// source to prevent receiving future event signals.
dispatch_source_cancel(source);
task_runner->PostTask(
FROM_HERE,
BindOnce(&WaitableEventWatcher::InvokeCallback, weak_this));
});
dispatch_resume(source_);
} else {
// The |event->watch_list_| closures can be run from any thread, so bind
// the callback as an invocation of PostTask.
OnceClosure watcher =
BindOnce(IgnoreResult(&TaskRunner::PostTask), task_runner, FROM_HERE,
BindOnce(&WaitableEventWatcher::InvokeCallback, weak_this));
// Hold an additional reference to the ReceiveRight, in case |watcher|
// runs and deletes the event while the lock is held.
// Hold the lock for the duration of IsSignaled() so that if Signal()
// is called by another thread, it waits for this to be added to the
// watch list.
scoped_refptr<WaitableEvent::ReceiveRight> receive_right(receive_right_);
AutoLock lock(receive_right->SlowWatchList()->lock);
if (event->IsSignaled()) {
std::move(watcher).Run();
return true;
}
receive_right_->SlowWatchList()->list.push_back(std::move(watcher));
}
return true;
}
void WaitableEventWatcher::StopWatching() {
callback_.Reset();
receive_right_ = nullptr;
if (source_) {
dispatch_source_cancel(source_);
source_.reset();
}
}
void WaitableEventWatcher::InvokeCallback() {
// The callback can be null if StopWatching() is called between signaling
// and the |callback_| getting run on the target task runner.
if (callback_.is_null())
return;
source_.reset();
receive_right_ = nullptr;
std::move(callback_).Run();
}
} // namespace base
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment