Commit 54ad225c authored by Ken Rockot's avatar Ken Rockot Committed by Commit Bot

Mojo: Optimize Node::DestroyAllPortsWithPeer

Node::DestroyAllPortsWithPeer is called any time a connection is lost to
a remote Node, allowing the local process to recursively back-propagate
awareness of broken port cycles (and thus disconnected Mojo primitives).

The current implementation of DestroyAllPortsWithPeer turns out to be
O(n^2) *gasp* and the frequency with which this can be called has gone
overlooked for some time. Eep.

This CL has Node maintain a reverse mapping from peer NodeNode+PortName
to the set of all local ports which reference that NodeName+PortName
combo as their peer. This allows for a much more efficient
implementation of DestroyAllPortsWithPeer, particularly in the recursive
branches which typically only affect at most a single local port.

Bug: 922650
Change-Id: Ic0baf822f8735cce5f59a7820a729b748a6875ec
Reviewed-on: https://chromium-review.googlesource.com/c/1433134
Commit-Queue: Ken Rockot <rockot@google.com>
Reviewed-by: default avatarErik Chen <erikchen@chromium.org>
Cr-Commit-Position: refs/heads/master@{#625841}
parent 04ca4d8a
...@@ -8,6 +8,7 @@ ...@@ -8,6 +8,7 @@
#include <algorithm> #include <algorithm>
#include <utility> #include <utility>
#include <vector>
#include "base/atomicops.h" #include "base/atomicops.h"
#include "base/containers/stack_container.h" #include "base/containers/stack_container.h"
...@@ -180,14 +181,18 @@ int Node::InitializePort(const PortRef& port_ref, ...@@ -180,14 +181,18 @@ int Node::InitializePort(const PortRef& port_ref,
const NodeName& peer_node_name, const NodeName& peer_node_name,
const PortName& peer_port_name) { const PortName& peer_port_name) {
{ {
// Must be acquired for UpdatePortPeerAddress below.
PortLocker::AssertNoPortsLockedOnCurrentThread();
base::AutoLock ports_locker(ports_lock_);
SinglePortLocker locker(&port_ref); SinglePortLocker locker(&port_ref);
auto* port = locker.port(); auto* port = locker.port();
if (port->state != Port::kUninitialized) if (port->state != Port::kUninitialized)
return ERROR_PORT_STATE_UNEXPECTED; return ERROR_PORT_STATE_UNEXPECTED;
port->state = Port::kReceiving; port->state = Port::kReceiving;
port->peer_node_name = peer_node_name; UpdatePortPeerAddress(port_ref.name(), port, peer_node_name,
port->peer_port_name = peer_port_name; peer_port_name);
} }
delegate_->PortStatusChanged(port_ref); delegate_->PortStatusChanged(port_ref);
...@@ -400,6 +405,10 @@ int Node::MergePorts(const PortRef& port_ref, ...@@ -400,6 +405,10 @@ int Node::MergePorts(const PortRef& port_ref,
PortName new_port_name; PortName new_port_name;
Event::PortDescriptor new_port_descriptor; Event::PortDescriptor new_port_descriptor;
{ {
// Must be held for ConvertToProxy.
PortLocker::AssertNoPortsLockedOnCurrentThread();
base::AutoLock ports_locker(ports_lock_);
SinglePortLocker locker(&port_ref); SinglePortLocker locker(&port_ref);
DVLOG(1) << "Sending MergePort from " << port_ref.name() << "@" << name_ DVLOG(1) << "Sending MergePort from " << port_ref.name() << "@" << name_
...@@ -588,14 +597,19 @@ int Node::OnObserveProxy(std::unique_ptr<ObserveProxyEvent> event) { ...@@ -588,14 +597,19 @@ int Node::OnObserveProxy(std::unique_ptr<ObserveProxyEvent> event) {
ScopedEvent event_to_forward; ScopedEvent event_to_forward;
NodeName event_target_node; NodeName event_target_node;
{ {
// Must be acquired for UpdatePortPeerAddress below.
PortLocker::AssertNoPortsLockedOnCurrentThread();
base::AutoLock ports_locker(ports_lock_);
SinglePortLocker locker(&port_ref); SinglePortLocker locker(&port_ref);
auto* port = locker.port(); auto* port = locker.port();
if (port->peer_node_name == event->proxy_node_name() && if (port->peer_node_name == event->proxy_node_name() &&
port->peer_port_name == event->proxy_port_name()) { port->peer_port_name == event->proxy_port_name()) {
if (port->state == Port::kReceiving) { if (port->state == Port::kReceiving) {
port->peer_node_name = event->proxy_target_node_name(); UpdatePortPeerAddress(port_ref.name(), port,
port->peer_port_name = event->proxy_target_port_name(); event->proxy_target_node_name(),
event->proxy_target_port_name());
event_target_node = event->proxy_node_name(); event_target_node = event->proxy_node_name();
event_to_forward = std::make_unique<ObserveProxyAckEvent>( event_to_forward = std::make_unique<ObserveProxyAckEvent>(
event->proxy_port_name(), port->next_sequence_num_to_send - 1); event->proxy_port_name(), port->next_sequence_num_to_send - 1);
...@@ -788,6 +802,11 @@ int Node::OnMergePort(std::unique_ptr<MergePortEvent> event) { ...@@ -788,6 +802,11 @@ int Node::OnMergePort(std::unique_ptr<MergePortEvent> event) {
int Node::AddPortWithName(const PortName& port_name, scoped_refptr<Port> port) { int Node::AddPortWithName(const PortName& port_name, scoped_refptr<Port> port) {
PortLocker::AssertNoPortsLockedOnCurrentThread(); PortLocker::AssertNoPortsLockedOnCurrentThread();
base::AutoLock lock(ports_lock_); base::AutoLock lock(ports_lock_);
if (port->peer_port_name != kInvalidPortName) {
DCHECK_NE(kInvalidNodeName, port->peer_node_name);
peer_port_maps_[port->peer_node_name][port->peer_port_name].emplace(
port_name, PortRef(port_name, port));
}
if (!ports_.emplace(port_name, std::move(port)).second) if (!ports_.emplace(port_name, std::move(port)).second)
return OOPS(ERROR_PORT_EXISTS); // Suggests a bad UUID generator. return OOPS(ERROR_PORT_EXISTS); // Suggests a bad UUID generator.
DVLOG(2) << "Created port " << port_name << "@" << name_; DVLOG(2) << "Created port " << port_name << "@" << name_;
...@@ -804,6 +823,8 @@ void Node::ErasePort(const PortName& port_name) { ...@@ -804,6 +823,8 @@ void Node::ErasePort(const PortName& port_name) {
return; return;
port = std::move(it->second); port = std::move(it->second);
ports_.erase(it); ports_.erase(it);
RemoveFromPeerPortMap(port_name, port.get());
} }
// NOTE: We are careful not to release the port's messages while holding any // NOTE: We are careful not to release the port's messages while holding any
// locks, since they may run arbitrary user code upon destruction. // locks, since they may run arbitrary user code upon destruction.
...@@ -856,6 +877,10 @@ int Node::MergePortsInternal(const PortRef& port0_ref, ...@@ -856,6 +877,10 @@ int Node::MergePortsInternal(const PortRef& port0_ref,
bool allow_close_on_bad_state) { bool allow_close_on_bad_state) {
const PortRef* port_refs[2] = {&port0_ref, &port1_ref}; const PortRef* port_refs[2] = {&port0_ref, &port1_ref};
{ {
// Needed to swap peer map entries below.
PortLocker::AssertNoPortsLockedOnCurrentThread();
base::Optional<base::AutoLock> ports_locker(base::in_place, ports_lock_);
base::Optional<PortLocker> locker(base::in_place, port_refs, 2); base::Optional<PortLocker> locker(base::in_place, port_refs, 2);
auto* port0 = locker->GetPort(port0_ref); auto* port0 = locker->GetPort(port0_ref);
auto* port1 = locker->GetPort(port1_ref); auto* port1 = locker->GetPort(port1_ref);
...@@ -885,6 +910,7 @@ int Node::MergePortsInternal(const PortRef& port0_ref, ...@@ -885,6 +910,7 @@ int Node::MergePortsInternal(const PortRef& port0_ref,
const bool close_port1 = const bool close_port1 =
port1->state == Port::kReceiving || allow_close_on_bad_state; port1->state == Port::kReceiving || allow_close_on_bad_state;
locker.reset(); locker.reset();
ports_locker.reset();
if (close_port0) if (close_port0)
ClosePort(port0_ref); ClosePort(port0_ref);
if (close_port1) if (close_port1)
...@@ -893,8 +919,7 @@ int Node::MergePortsInternal(const PortRef& port0_ref, ...@@ -893,8 +919,7 @@ int Node::MergePortsInternal(const PortRef& port0_ref,
} }
// Swap the ports' peer information and switch them both to proxying mode. // Swap the ports' peer information and switch them both to proxying mode.
std::swap(port0->peer_node_name, port1->peer_node_name); SwapPortPeers(port0_ref.name(), port0, port1_ref.name(), port1);
std::swap(port0->peer_port_name, port1->peer_port_name);
port0->state = Port::kProxying; port0->state = Port::kProxying;
port1->state = Port::kProxying; port1->state = Port::kProxying;
if (port0->peer_closed) if (port0->peer_closed)
...@@ -941,11 +966,12 @@ int Node::MergePortsInternal(const PortRef& port0_ref, ...@@ -941,11 +966,12 @@ int Node::MergePortsInternal(const PortRef& port0_ref,
// If we failed to forward proxied messages, we keep the system in a // If we failed to forward proxied messages, we keep the system in a
// consistent state by undoing the peer swap and closing the ports. // consistent state by undoing the peer swap and closing the ports.
{ {
PortLocker::AssertNoPortsLockedOnCurrentThread();
base::AutoLock ports_locker(ports_lock_);
PortLocker locker(port_refs, 2); PortLocker locker(port_refs, 2);
auto* port0 = locker.GetPort(port0_ref); auto* port0 = locker.GetPort(port0_ref);
auto* port1 = locker.GetPort(port1_ref); auto* port1 = locker.GetPort(port1_ref);
std::swap(port0->peer_node_name, port1->peer_node_name); SwapPortPeers(port0_ref.name(), port0, port1_ref.name(), port1);
std::swap(port0->peer_port_name, port1->peer_port_name);
port0->remove_proxy_on_last_message = false; port0->remove_proxy_on_last_message = false;
port1->remove_proxy_on_last_message = false; port1->remove_proxy_on_last_message = false;
DCHECK_EQ(Port::kProxying, port0->state); DCHECK_EQ(Port::kProxying, port0->state);
...@@ -994,8 +1020,7 @@ void Node::ConvertToProxy(Port* port, ...@@ -994,8 +1020,7 @@ void Node::ConvertToProxy(Port* port,
memset(port_descriptor->padding, 0, sizeof(port_descriptor->padding)); memset(port_descriptor->padding, 0, sizeof(port_descriptor->padding));
// Configure the local port to point to the new port. // Configure the local port to point to the new port.
port->peer_node_name = to_node_name; UpdatePortPeerAddress(local_port_name, port, to_node_name, new_port_name);
port->peer_port_name = new_port_name;
} }
int Node::AcceptPort(const PortName& port_name, int Node::AcceptPort(const PortName& port_name,
...@@ -1052,6 +1077,10 @@ int Node::PrepareToForwardUserMessage(const PortRef& forwarding_port_ref, ...@@ -1052,6 +1077,10 @@ int Node::PrepareToForwardUserMessage(const PortRef& forwarding_port_ref,
} }
} }
// Must be held because ConvertToProxy needs to update |peer_port_maps_|.
PortLocker::AssertNoPortsLockedOnCurrentThread();
base::AutoLock ports_locker(ports_lock_);
// Simultaneously lock the forwarding port as well as all attached ports. // Simultaneously lock the forwarding port as well as all attached ports.
base::StackVector<PortRef, 4> attached_port_refs; base::StackVector<PortRef, 4> attached_port_refs;
base::StackVector<const PortRef*, 5> ports_to_lock; base::StackVector<const PortRef*, 5> ports_to_lock;
...@@ -1059,8 +1088,10 @@ int Node::PrepareToForwardUserMessage(const PortRef& forwarding_port_ref, ...@@ -1059,8 +1088,10 @@ int Node::PrepareToForwardUserMessage(const PortRef& forwarding_port_ref,
ports_to_lock.container().resize(message->num_ports() + 1); ports_to_lock.container().resize(message->num_ports() + 1);
ports_to_lock[0] = &forwarding_port_ref; ports_to_lock[0] = &forwarding_port_ref;
for (size_t i = 0; i < message->num_ports(); ++i) { for (size_t i = 0; i < message->num_ports(); ++i) {
GetPort(message->ports()[i], &attached_port_refs[i]); const PortName& attached_port_name = message->ports()[i];
DCHECK(attached_port_refs[i].is_valid()); auto iter = ports_.find(attached_port_name);
DCHECK(iter != ports_.end());
attached_port_refs[i] = PortRef(attached_port_name, iter->second);
ports_to_lock[i + 1] = &attached_port_refs[i]; ports_to_lock[i + 1] = &attached_port_refs[i];
} }
PortLocker locker(ports_to_lock.container().data(), PortLocker locker(ports_to_lock.container().data(),
...@@ -1297,15 +1328,38 @@ void Node::DestroyAllPortsWithPeer(const NodeName& node_name, ...@@ -1297,15 +1328,38 @@ void Node::DestroyAllPortsWithPeer(const NodeName& node_name,
PortLocker::AssertNoPortsLockedOnCurrentThread(); PortLocker::AssertNoPortsLockedOnCurrentThread();
base::AutoLock ports_lock(ports_lock_); base::AutoLock ports_lock(ports_lock_);
for (auto iter = ports_.begin(); iter != ports_.end(); ++iter) { auto node_peer_port_map_iter = peer_port_maps_.find(node_name);
PortRef port_ref(iter->first, iter->second); if (node_peer_port_map_iter == peer_port_maps_.end())
{ return;
SinglePortLocker locker(&port_ref);
auto& node_peer_port_map = node_peer_port_map_iter->second;
auto peer_ports_begin = node_peer_port_map.begin();
auto peer_ports_end = node_peer_port_map.end();
if (port_name != kInvalidPortName) {
// If |port_name| is given, we limit the set of local ports to the ones
// with that specific port as their peer.
peer_ports_begin = node_peer_port_map.find(port_name);
if (peer_ports_begin == node_peer_port_map.end())
return;
peer_ports_end = peer_ports_begin;
++peer_ports_end;
}
for (auto peer_port_iter = peer_ports_begin;
peer_port_iter != peer_ports_end; ++peer_port_iter) {
auto& local_ports = peer_port_iter->second;
// NOTE: This inner loop almost always has only one element. There are
// relatively short-lived cases where more than one local port points to
// the same peer, and this only happens when extra ports are bypassed
// proxies waiting to be torn down.
for (auto local_port_iter = local_ports.begin();
local_port_iter != local_ports.end(); ++local_port_iter) {
auto& local_port_ref = local_port_iter->second;
SinglePortLocker locker(&local_port_ref);
auto* port = locker.port(); auto* port = locker.port();
if (port->peer_node_name == node_name &&
(port_name == kInvalidPortName ||
port->peer_port_name == port_name)) {
if (!port->peer_closed) { if (!port->peer_closed) {
// Treat this as immediate peer closure. It's an exceptional // Treat this as immediate peer closure. It's an exceptional
// condition akin to a broken pipe, so we don't care about losing // condition akin to a broken pipe, so we don't care about losing
...@@ -1316,7 +1370,7 @@ void Node::DestroyAllPortsWithPeer(const NodeName& node_name, ...@@ -1316,7 +1370,7 @@ void Node::DestroyAllPortsWithPeer(const NodeName& node_name,
port->message_queue.next_sequence_num() - 1; port->message_queue.next_sequence_num() - 1;
if (port->state == Port::kReceiving) if (port->state == Port::kReceiving)
ports_to_notify.push_back(PortRef(iter->first, port)); ports_to_notify.push_back(local_port_ref);
} }
// We don't expect to forward any further messages, and we don't // We don't expect to forward any further messages, and we don't
...@@ -1326,16 +1380,15 @@ void Node::DestroyAllPortsWithPeer(const NodeName& node_name, ...@@ -1326,16 +1380,15 @@ void Node::DestroyAllPortsWithPeer(const NodeName& node_name,
// broadcast our own death so it can be back-propagated. This is // broadcast our own death so it can be back-propagated. This is
// inefficient but rare. // inefficient but rare.
if (port->state != Port::kReceiving) { if (port->state != Port::kReceiving) {
dead_proxies_to_broadcast.push_back(iter->first); dead_proxies_to_broadcast.push_back(local_port_ref.name());
std::vector<std::unique_ptr<UserMessageEvent>> messages; std::vector<std::unique_ptr<UserMessageEvent>> messages;
iter->second->message_queue.TakeAllMessages(&messages); port->message_queue.TakeAllMessages(&messages);
for (auto& message : messages) for (auto& message : messages)
undelivered_messages.emplace_back(std::move(message)); undelivered_messages.emplace_back(std::move(message));
} }
} }
} }
} }
}
for (const auto& proxy_name : dead_proxies_to_broadcast) { for (const auto& proxy_name : dead_proxies_to_broadcast) {
ErasePort(proxy_name); ErasePort(proxy_name);
...@@ -1369,6 +1422,68 @@ void Node::DestroyAllPortsWithPeer(const NodeName& node_name, ...@@ -1369,6 +1422,68 @@ void Node::DestroyAllPortsWithPeer(const NodeName& node_name,
} }
} }
void Node::UpdatePortPeerAddress(const PortName& local_port_name,
Port* local_port,
const NodeName& new_peer_node,
const PortName& new_peer_port) {
ports_lock_.AssertAcquired();
local_port->AssertLockAcquired();
RemoveFromPeerPortMap(local_port_name, local_port);
local_port->peer_node_name = new_peer_node;
local_port->peer_port_name = new_peer_port;
if (new_peer_port != kInvalidPortName) {
peer_port_maps_[new_peer_node][new_peer_port].emplace(
local_port_name,
PortRef(local_port_name, base::WrapRefCounted<Port>(local_port)));
}
}
void Node::RemoveFromPeerPortMap(const PortName& local_port_name,
Port* local_port) {
if (local_port->peer_port_name == kInvalidPortName)
return;
auto node_iter = peer_port_maps_.find(local_port->peer_node_name);
if (node_iter == peer_port_maps_.end())
return;
auto& node_peer_port_map = node_iter->second;
auto ports_iter = node_peer_port_map.find(local_port->peer_port_name);
if (ports_iter == node_peer_port_map.end())
return;
auto& local_ports_with_this_peer = ports_iter->second;
local_ports_with_this_peer.erase(local_port_name);
if (local_ports_with_this_peer.empty())
node_peer_port_map.erase(ports_iter);
if (node_peer_port_map.empty())
peer_port_maps_.erase(node_iter);
}
void Node::SwapPortPeers(const PortName& port0_name,
Port* port0,
const PortName& port1_name,
Port* port1) {
ports_lock_.AssertAcquired();
port0->AssertLockAcquired();
port1->AssertLockAcquired();
auto& peer0_ports =
peer_port_maps_[port0->peer_node_name][port0->peer_port_name];
auto& peer1_ports =
peer_port_maps_[port1->peer_node_name][port1->peer_port_name];
peer0_ports.erase(port0_name);
peer1_ports.erase(port1_name);
peer0_ports.emplace(port1_name,
PortRef(port1_name, base::WrapRefCounted<Port>(port1)));
peer1_ports.emplace(port0_name,
PortRef(port0_name, base::WrapRefCounted<Port>(port0)));
std::swap(port0->peer_node_name, port1->peer_node_name);
std::swap(port0->peer_port_name, port1->peer_port_name);
}
Node::DelegateHolder::DelegateHolder(Node* node, NodeDelegate* delegate) Node::DelegateHolder::DelegateHolder(Node* node, NodeDelegate* delegate)
: node_(node), delegate_(delegate) { : node_(node), delegate_(delegate) {
DCHECK(node_); DCHECK(node_);
......
...@@ -12,6 +12,7 @@ ...@@ -12,6 +12,7 @@
#include <unordered_map> #include <unordered_map>
#include "base/component_export.h" #include "base/component_export.h"
#include "base/containers/flat_map.h"
#include "base/macros.h" #include "base/macros.h"
#include "base/memory/ref_counted.h" #include "base/memory/ref_counted.h"
#include "base/synchronization/lock.h" #include "base/synchronization/lock.h"
...@@ -227,18 +228,59 @@ class COMPONENT_EXPORT(MOJO_CORE_PORTS) Node { ...@@ -227,18 +228,59 @@ class COMPONENT_EXPORT(MOJO_CORE_PORTS) Node {
void DestroyAllPortsWithPeer(const NodeName& node_name, void DestroyAllPortsWithPeer(const NodeName& node_name,
const PortName& port_name); const PortName& port_name);
// Changes the peer node and port name referenced by |port|. Note that both
// |ports_lock_| MUST be held through the extent of this method.
// |local_port|'s lock must be held if and only if a reference to |local_port|
// exist in |ports_|.
void UpdatePortPeerAddress(const PortName& local_port_name,
Port* local_port,
const NodeName& new_peer_node,
const PortName& new_peer_port);
// Removes an entry from |peer_port_map_| corresponding to |local_port|'s peer
// address, if valid.
void RemoveFromPeerPortMap(const PortName& local_port_name, Port* local_port);
// Swaps the peer information for two local ports. Used during port merges.
// Note that |ports_lock_| must be held along with each of the two port's own
// locks, through the extent of this method.
void SwapPortPeers(const PortName& port0_name,
Port* port0,
const PortName& port1_name,
Port* port1);
const NodeName name_; const NodeName name_;
const DelegateHolder delegate_; const DelegateHolder delegate_;
// Guards |ports_|. This must never be acquired while an individual port's // Just to clarify readability of the types below.
// lock is held on the same thread. Conversely, individual port locks may be using LocalPortName = PortName;
// acquired while this one is held. using PeerPortName = PortName;
// Guards access to |ports_| and |peer_port_maps_| below.
//
// This must never be acquired while an individual port's lock is held on the
// same thread. Conversely, individual port locks may be acquired while this
// one is held.
// //
// Because UserMessage events may execute arbitrary user code during // Because UserMessage events may execute arbitrary user code during
// destruction, it is also important to ensure that such events are never // destruction, it is also important to ensure that such events are never
// destroyed while this (or any individual Port) lock is held. // destroyed while this (or any individual Port) lock is held.
base::Lock ports_lock_; base::Lock ports_lock_;
std::unordered_map<PortName, scoped_refptr<Port>> ports_; std::unordered_map<LocalPortName, scoped_refptr<Port>> ports_;
// Maps a peer port name to a list of PortRefs for all local ports which have
// the port name key designated as their peer port. The set of local ports
// which have the same peer port is expected to always be relatively small and
// usually 1. Hence we just use a flat_map of local PortRefs keyed on each
// local port's name.
using PeerPortMap =
std::unordered_map<PeerPortName, base::flat_map<LocalPortName, PortRef>>;
// A reverse mapping which can be used to find all local ports that reference
// a given peer node or a local port that references a specific given peer
// port on a peer node. The key to this map is the corresponding peer node
// name.
std::unordered_map<NodeName, PeerPortMap> peer_port_maps_;
DISALLOW_COPY_AND_ASSIGN(Node); DISALLOW_COPY_AND_ASSIGN(Node);
}; };
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment