Commit 4b7b3265 authored by Peter Kasting's avatar Peter Kasting Committed by Commit Bot

Convert SyncHandleRegistry to use a CallbackList.

This changes the API to look like CallbackList's: instead of calling
UnregisterEvent(), callers destroy a subscription object they get from
registering.  This is a bit easier for callers to deal with.  However,
the necessity of also synchronously clearing the event from the WaitSet
makes the implementation a bit irritating, since while CallbackList does
provide a hook to be called back on callback removal, that happens after
notification is finished, rather than right at subscription destruction.
Instead of using the hook, wrap the CallbackList::Subscriptions in an
object that will also remove the WaitSet item if need be.

Bug: none
Change-Id: Ideb74b1435bfaad1647324e5ba5f90976c268e2e
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2343956
Commit-Queue: Peter Kasting <pkasting@chromium.org>
Reviewed-by: default avatarKen Rockot <rockot@google.com>
Cr-Commit-Position: refs/heads/master@{#796728}
parent a482e677
...@@ -667,28 +667,27 @@ void SyncChannel::WaitForReply(mojo::SyncHandleRegistry* registry, ...@@ -667,28 +667,27 @@ void SyncChannel::WaitForReply(mojo::SyncHandleRegistry* registry,
while (true) { while (true) {
bool dispatch = false; bool dispatch = false;
bool send_done = false;
bool should_pump_messages = false; bool should_pump_messages = false;
base::RepeatingClosure on_send_done_callback = {
base::BindRepeating(&OnEventReady, &send_done); bool send_done = false;
registry->RegisterEvent(context->GetSendDoneEvent(), on_send_done_callback); mojo::SyncHandleRegistry::EventCallbackSubscription
send_done_subscription = registry->RegisterEvent(
base::RepeatingClosure on_pump_messages_callback; context->GetSendDoneEvent(),
if (pump_messages_event) { base::BindRepeating(&OnEventReady, &send_done));
on_pump_messages_callback =
base::BindRepeating(&OnEventReady, &should_pump_messages); mojo::SyncHandleRegistry::EventCallbackSubscription
registry->RegisterEvent(pump_messages_event, on_pump_messages_callback); pump_messages_subsciption;
} if (pump_messages_event) {
pump_messages_subsciption = registry->RegisterEvent(
const bool* stop_flags[] = { &dispatch, &send_done, &should_pump_messages }; pump_messages_event,
context->received_sync_msgs()->BlockDispatch(&dispatch); base::BindRepeating(&OnEventReady, &should_pump_messages));
registry->Wait(stop_flags, 3); }
context->received_sync_msgs()->UnblockDispatch();
registry->UnregisterEvent(context->GetSendDoneEvent(), const bool* stop_flags[] = {&dispatch, &send_done, &should_pump_messages};
on_send_done_callback); context->received_sync_msgs()->BlockDispatch(&dispatch);
if (pump_messages_event) registry->Wait(stop_flags, 3);
registry->UnregisterEvent(pump_messages_event, on_pump_messages_callback); context->received_sync_msgs()->UnblockDispatch();
}
if (dispatch) { if (dispatch) {
// We're waiting for a reply, but we received a blocking synchronous call. // We're waiting for a reply, but we received a blocking synchronous call.
......
...@@ -69,25 +69,26 @@ bool SyncMessageFilter::Send(Message* message) { ...@@ -69,25 +69,26 @@ bool SyncMessageFilter::Send(Message* message) {
} }
} }
bool done = false; {
bool shutdown = false; bool done = false;
scoped_refptr<mojo::SyncHandleRegistry> registry = bool shutdown = false;
mojo::SyncHandleRegistry::current(); scoped_refptr<mojo::SyncHandleRegistry> registry =
auto on_shutdown_callback = base::BindRepeating(&OnEventReady, &shutdown); mojo::SyncHandleRegistry::current();
auto on_done_callback = base::BindRepeating(&OnEventReady, &done); mojo::SyncHandleRegistry::EventCallbackSubscription shutdown_subscription =
registry->RegisterEvent(shutdown_event_, on_shutdown_callback); registry->RegisterEvent(shutdown_event_,
registry->RegisterEvent(&done_event, on_done_callback); base::BindRepeating(&OnEventReady, &shutdown));
mojo::SyncHandleRegistry::EventCallbackSubscription done_subscription =
const bool* stop_flags[] = { &done, &shutdown }; registry->RegisterEvent(&done_event,
registry->Wait(stop_flags, 2); base::BindRepeating(&OnEventReady, &done));
if (done) {
TRACE_EVENT_FLOW_END0("toplevel.flow", "SyncMessageFilter::Send", const bool* stop_flags[] = {&done, &shutdown};
&done_event); registry->Wait(stop_flags, 2);
if (done) {
TRACE_EVENT_FLOW_END0("toplevel.flow", "SyncMessageFilter::Send",
&done_event);
}
} }
registry->UnregisterEvent(shutdown_event_, on_shutdown_callback);
registry->UnregisterEvent(&done_event, on_done_callback);
{ {
base::AutoLock auto_lock(lock_); base::AutoLock auto_lock(lock_);
delete pending_message.deserializer; delete pending_message.deserializer;
......
...@@ -20,8 +20,6 @@ SyncEventWatcher::SyncEventWatcher(base::WaitableEvent* event, ...@@ -20,8 +20,6 @@ SyncEventWatcher::SyncEventWatcher(base::WaitableEvent* event,
SyncEventWatcher::~SyncEventWatcher() { SyncEventWatcher::~SyncEventWatcher() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
if (registered_)
registry_->UnregisterEvent(event_, callback_);
destroyed_->data = true; destroyed_->data = true;
} }
...@@ -34,10 +32,6 @@ bool SyncEventWatcher::SyncWatch(const bool** stop_flags, ...@@ -34,10 +32,6 @@ bool SyncEventWatcher::SyncWatch(const bool** stop_flags,
size_t num_stop_flags) { size_t num_stop_flags) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
IncrementRegisterCount(); IncrementRegisterCount();
if (!registered_) {
DecrementRegisterCount();
return false;
}
// This object may be destroyed during the Wait() call. So we have to preserve // This object may be destroyed during the Wait() call. So we have to preserve
// the boolean that Wait uses. // the boolean that Wait uses.
...@@ -60,20 +54,14 @@ bool SyncEventWatcher::SyncWatch(const bool** stop_flags, ...@@ -60,20 +54,14 @@ bool SyncEventWatcher::SyncWatch(const bool** stop_flags,
} }
void SyncEventWatcher::IncrementRegisterCount() { void SyncEventWatcher::IncrementRegisterCount() {
register_request_count_++; if (register_request_count_++ == 0)
if (!registered_) { subscription_ = registry_->RegisterEvent(event_, callback_);
registry_->RegisterEvent(event_, callback_);
registered_ = true;
}
} }
void SyncEventWatcher::DecrementRegisterCount() { void SyncEventWatcher::DecrementRegisterCount() {
DCHECK_GT(register_request_count_, 0u); DCHECK_GT(register_request_count_, 0u);
register_request_count_--; if (--register_request_count_ == 0)
if (register_request_count_ == 0 && registered_) { subscription_.reset();
registry_->UnregisterEvent(event_, callback_);
registered_ = false;
}
} }
} // namespace mojo } // namespace mojo
...@@ -5,7 +5,9 @@ ...@@ -5,7 +5,9 @@
#include "mojo/public/cpp/bindings/sync_handle_registry.h" #include "mojo/public/cpp/bindings/sync_handle_registry.h"
#include <algorithm> #include <algorithm>
#include <utility>
#include "base/auto_reset.h"
#include "base/check_op.h" #include "base/check_op.h"
#include "base/no_destructor.h" #include "base/no_destructor.h"
#include "base/stl_util.h" #include "base/stl_util.h"
...@@ -15,6 +17,19 @@ ...@@ -15,6 +17,19 @@
namespace mojo { namespace mojo {
SyncHandleRegistry::Subscription::Subscription(base::OnceClosure remove_closure,
EventCallbackList* callbacks,
EventCallback event_callback)
: remove_runner_(std::move(remove_closure)),
subscription_(callbacks->Add(std::move(event_callback))) {}
SyncHandleRegistry::Subscription::Subscription(Subscription&&) = default;
SyncHandleRegistry::Subscription& SyncHandleRegistry::Subscription::operator=(
Subscription&&) = default;
SyncHandleRegistry::Subscription::~Subscription() = default;
// static // static
scoped_refptr<SyncHandleRegistry> SyncHandleRegistry::current() { scoped_refptr<SyncHandleRegistry> SyncHandleRegistry::current() {
static base::NoDestructor< static base::NoDestructor<
...@@ -57,11 +72,12 @@ void SyncHandleRegistry::UnregisterHandle(const Handle& handle) { ...@@ -57,11 +72,12 @@ void SyncHandleRegistry::UnregisterHandle(const Handle& handle) {
handles_.erase(handle); handles_.erase(handle);
} }
void SyncHandleRegistry::RegisterEvent(base::WaitableEvent* event, SyncHandleRegistry::EventCallbackSubscription SyncHandleRegistry::RegisterEvent(
base::RepeatingClosure callback) { base::WaitableEvent* event,
EventCallback callback) {
auto it = events_.find(event); auto it = events_.find(event);
if (it == events_.end()) { if (it == events_.end()) {
auto result = events_.emplace(event, EventCallbackList{}); auto result = events_.emplace(event, std::make_unique<EventCallbackList>());
it = result.first; it = result.first;
} }
...@@ -70,43 +86,29 @@ void SyncHandleRegistry::RegisterEvent(base::WaitableEvent* event, ...@@ -70,43 +86,29 @@ void SyncHandleRegistry::RegisterEvent(base::WaitableEvent* event,
// callbacks to see if any are valid. // callbacks to see if any are valid.
wait_set_.AddEvent(event); wait_set_.AddEvent(event);
it->second.container().push_back(std::move(callback)); // Return an object that will synchronously clear the entry for |event| when
} // its last callback is destroyed.
const auto remove_closure = [](EventCallbackList* callbacks,
void SyncHandleRegistry::UnregisterEvent(base::WaitableEvent* event, WaitSet* wait_set,
base::RepeatingClosure callback) { base::WaitableEvent* event) {
auto it = events_.find(event); // |callbacks| is guaranteed to be valid here. The callbacks are repeating
if (it == events_.end()) // and are thus only removed by their subscriptions being destroyed, so it's
return; // impossible for empty() to be true until the last subscription has been
// destroyed. Since Wait() only deletes a callback list once it's empty,
bool has_valid_callbacks = false; // and this callback runs synchronously with subscription destruction, it's
auto& callbacks = it->second.container(); // impossible for |callbacks| to be deleted before this gets to run at the
if (is_dispatching_event_callbacks_) { // destruction of the last remaining subscription.
// Not safe to remove any elements from |callbacks| here since an outer if (callbacks->empty()) {
// stack frame is currently iterating over it in Wait(). // If this was the last callback registered for |event|, ensure that it's
for (auto& cb : callbacks) { // removed from the WaitSet before returning. Otherwise a nested Wait()
if (cb == callback) // call inside the scope that destroys the subscription will fail.
cb.Reset(); const MojoResult rv = wait_set->RemoveEvent(event);
else if (cb) DCHECK_EQ(MOJO_RESULT_OK, rv);
has_valid_callbacks = true;
} }
remove_invalid_event_callbacks_after_dispatch_ = true; };
} else { return std::make_unique<Subscription>(
callbacks.erase(std::remove(callbacks.begin(), callbacks.end(), callback), base::BindOnce(remove_closure, it->second.get(), &wait_set_, event),
callbacks.end()); it->second.get(), std::move(callback));
if (callbacks.empty())
events_.erase(it);
else
has_valid_callbacks = true;
}
if (!has_valid_callbacks) {
// Regardless of whether or not we're nested within a Wait(), we need to
// ensure that |event| is removed from the WaitSet before returning if this
// was the last callback registered for it.
MojoResult rv = wait_set_.RemoveEvent(event);
DCHECK_EQ(MOJO_RESULT_OK, rv);
}
} }
bool SyncHandleRegistry::Wait(const bool* should_stop[], size_t count) { bool SyncHandleRegistry::Wait(const bool* should_stop[], size_t count) {
...@@ -138,28 +140,18 @@ bool SyncHandleRegistry::Wait(const bool* should_stop[], size_t count) { ...@@ -138,28 +140,18 @@ bool SyncHandleRegistry::Wait(const bool* should_stop[], size_t count) {
if (ready_event) { if (ready_event) {
const auto iter = events_.find(ready_event); const auto iter = events_.find(ready_event);
DCHECK(iter != events_.end()); DCHECK(iter != events_.end());
bool was_dispatching_event_callbacks = is_dispatching_event_callbacks_;
is_dispatching_event_callbacks_ = true; {
base::AutoReset<bool> in_nested_wait(&in_nested_wait_, true);
// NOTE: It's possible for the container to be extended by any of these iter->second->Notify();
// callbacks if they call RegisterEvent, so we are careful to iterate by
// index. Also note that conversely, elements cannot be *removed* from the
// container, by any of these callbacks, so it is safe to assume the size
// only stays the same or increases, with no elements changing position.
auto& callbacks = iter->second.container();
for (size_t i = 0; i < callbacks.size(); ++i) {
auto& callback = callbacks[i];
if (callback)
callback.Run();
} }
is_dispatching_event_callbacks_ = was_dispatching_event_callbacks; // Notify() above may have both added and removed event registrations, for
if (!was_dispatching_event_callbacks && // any event. If we're in the outermost frame, prune any empty map
remove_invalid_event_callbacks_after_dispatch_) { // entries to avoid unbounded growth.
// If we've had events unregistered within any callback dispatch, now is if (!in_nested_wait_) {
// a good time to prune them from the map. base::EraseIf(events_,
RemoveInvalidEventCallbacks(); [](const auto& entry) { return entry.second->empty(); });
remove_invalid_event_callbacks_after_dispatch_ = false;
} }
} }
}; };
...@@ -171,19 +163,4 @@ SyncHandleRegistry::SyncHandleRegistry() = default; ...@@ -171,19 +163,4 @@ SyncHandleRegistry::SyncHandleRegistry() = default;
SyncHandleRegistry::~SyncHandleRegistry() = default; SyncHandleRegistry::~SyncHandleRegistry() = default;
void SyncHandleRegistry::RemoveInvalidEventCallbacks() {
for (auto it = events_.begin(); it != events_.end();) {
auto& callbacks = it->second.container();
callbacks.erase(std::remove_if(callbacks.begin(), callbacks.end(),
[](const base::RepeatingClosure& callback) {
return !callback;
}),
callbacks.end());
if (callbacks.empty())
events_.erase(it++);
else
++it;
}
}
} // namespace mojo } // namespace mojo
...@@ -53,8 +53,7 @@ class COMPONENT_EXPORT(MOJO_CPP_BINDINGS) SyncEventWatcher { ...@@ -53,8 +53,7 @@ class COMPONENT_EXPORT(MOJO_CPP_BINDINGS) SyncEventWatcher {
base::WaitableEvent* const event_; base::WaitableEvent* const event_;
const base::RepeatingClosure callback_; const base::RepeatingClosure callback_;
// Whether |event_| has been registered with SyncHandleRegistry. SyncHandleRegistry::EventCallbackSubscription subscription_;
bool registered_ = false;
// If non-zero, |event_| should be registered with SyncHandleRegistry. // If non-zero, |event_| should be registered with SyncHandleRegistry.
size_t register_request_count_ = 0; size_t register_request_count_ = 0;
......
...@@ -6,10 +6,12 @@ ...@@ -6,10 +6,12 @@
#define MOJO_PUBLIC_CPP_BINDINGS_SYNC_HANDLE_REGISTRY_H_ #define MOJO_PUBLIC_CPP_BINDINGS_SYNC_HANDLE_REGISTRY_H_
#include <map> #include <map>
#include <memory>
#include "base/callback.h" #include "base/callback.h"
#include "base/callback_helpers.h"
#include "base/callback_list.h"
#include "base/component_export.h" #include "base/component_export.h"
#include "base/containers/stack_container.h"
#include "base/macros.h" #include "base/macros.h"
#include "base/memory/ref_counted.h" #include "base/memory/ref_counted.h"
#include "base/sequence_checker.h" #include "base/sequence_checker.h"
...@@ -26,11 +28,31 @@ namespace mojo { ...@@ -26,11 +28,31 @@ namespace mojo {
class COMPONENT_EXPORT(MOJO_CPP_BINDINGS) SyncHandleRegistry class COMPONENT_EXPORT(MOJO_CPP_BINDINGS) SyncHandleRegistry
: public base::RefCounted<SyncHandleRegistry> { : public base::RefCounted<SyncHandleRegistry> {
public: public:
// Returns a sequence-local object. using EventCallbackList = base::RepeatingClosureList;
static scoped_refptr<SyncHandleRegistry> current(); using EventCallback = EventCallbackList::CallbackType;
// Wrapper class that runs a closure after a CallbackList subscription is
// destroyed.
class COMPONENT_EXPORT(MOJO_CPP_BINDINGS) Subscription {
public:
Subscription(base::OnceClosure remove_closure,
EventCallbackList* callbacks,
EventCallback event_callback);
Subscription(Subscription&&);
Subscription& operator=(Subscription&&);
~Subscription();
private:
base::ScopedClosureRunner remove_runner_;
std::unique_ptr<EventCallbackList::Subscription> subscription_;
};
using EventCallbackSubscription = std::unique_ptr<Subscription>;
using HandleCallback = base::RepeatingCallback<void(MojoResult)>; using HandleCallback = base::RepeatingCallback<void(MojoResult)>;
// Returns a sequence-local object.
static scoped_refptr<SyncHandleRegistry> current();
// Registers a |Handle| to be watched for |handle_signals|. If any such // Registers a |Handle| to be watched for |handle_signals|. If any such
// signals are satisfied during a Wait(), the Wait() is woken up and // signals are satisfied during a Wait(), the Wait() is woken up and
// |callback| is run. // |callback| is run.
...@@ -44,12 +66,8 @@ class COMPONENT_EXPORT(MOJO_CPP_BINDINGS) SyncHandleRegistry ...@@ -44,12 +66,8 @@ class COMPONENT_EXPORT(MOJO_CPP_BINDINGS) SyncHandleRegistry
// Wait() before any handle signals. |event| is not owned, and if it signals // Wait() before any handle signals. |event| is not owned, and if it signals
// during Wait(), |callback| is invoked. Note that |event| may be registered // during Wait(), |callback| is invoked. Note that |event| may be registered
// multiple times with different callbacks. // multiple times with different callbacks.
void RegisterEvent(base::WaitableEvent* event, EventCallbackSubscription RegisterEvent(base::WaitableEvent* event,
base::RepeatingClosure callback); EventCallback callback);
// Unregisters a specific |event|+|callback| pair.
void UnregisterEvent(base::WaitableEvent* event,
base::RepeatingClosure callback);
// Waits on all the registered handles and events and runs callbacks // Waits on all the registered handles and events and runs callbacks
// synchronously for any that become ready. // synchronously for any that become ready.
...@@ -61,26 +79,17 @@ class COMPONENT_EXPORT(MOJO_CPP_BINDINGS) SyncHandleRegistry ...@@ -61,26 +79,17 @@ class COMPONENT_EXPORT(MOJO_CPP_BINDINGS) SyncHandleRegistry
private: private:
friend class base::RefCounted<SyncHandleRegistry>; friend class base::RefCounted<SyncHandleRegistry>;
using EventCallbackList = base::StackVector<base::RepeatingClosure, 1>;
using EventMap = std::map<base::WaitableEvent*, EventCallbackList>;
SyncHandleRegistry(); SyncHandleRegistry();
~SyncHandleRegistry(); ~SyncHandleRegistry();
void RemoveInvalidEventCallbacks();
WaitSet wait_set_; WaitSet wait_set_;
std::map<Handle, HandleCallback> handles_; std::map<Handle, HandleCallback> handles_;
EventMap events_; std::map<base::WaitableEvent*, std::unique_ptr<EventCallbackList>> events_;
// |true| iff this registry is currently dispatching event callbacks in
// Wait(). Used to allow for safe event registration/unregistration from event
// callbacks.
bool is_dispatching_event_callbacks_ = false;
// Indicates if one or more event callbacks was unregistered during the most // True when the registry is dispatching event callbacks in Wait(). This is
// recent event callback dispatch. // used to improve the safety and efficiency of pruning unused entries in
bool remove_invalid_event_callbacks_after_dispatch_ = false; // |events_| if Wait() results in reentrancy.
bool in_nested_wait_ = false;
SEQUENCE_CHECKER(sequence_checker_); SEQUENCE_CHECKER(sequence_checker_);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment