Commit 51da4f94 authored by Ken Rockot's avatar Ken Rockot Committed by Commit Bot

[mojo] Support clients rejoining a process network

This change allows Mojo client processes to connect to a new process
network if disconnected from a previous one. Prior to this change, a
process could only belong to one network for its entire lifetime.

This new behavior is useful in cases where a client process may outlive
its original broker process (for example, if the broker crashes) and
should be able to continue operating normally when a new broker process
is established in the system.

Bug: 813112
Change-Id: Ifb5f16f1b39127cf7b469ee798f2d3da159fa432
Tbr: jam@chromium.org
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/1821743Reviewed-by: default avatarWez <wez@chromium.org>
Reviewed-by: default avatarOksana Zhuravlova <oksamyt@chromium.org>
Commit-Queue: Ken Rockot <rockot@google.com>
Cr-Commit-Position: refs/heads/master@{#700414}
parent bc6ed27b
......@@ -233,7 +233,8 @@ mojo::IncomingInvitation InitializeMojoIPCChannel() {
service_manager::kMojoIPCChannel))));
#endif
return mojo::IncomingInvitation::Accept(std::move(endpoint));
return mojo::IncomingInvitation::Accept(
std::move(endpoint), MOJO_ACCEPT_INVITATION_FLAG_LEAK_TRANSPORT_ENDPOINT);
}
class ChannelBootstrapFilter : public ConnectionFilter {
......
......@@ -39,8 +39,12 @@ class MOJO_SYSTEM_IMPL_EXPORT ConnectionParams {
void set_is_async(bool is_async) { is_async_ = is_async; }
bool is_async() const { return is_async_; }
void set_leak_endpoint(bool leak_endpoint) { leak_endpoint_ = leak_endpoint; }
bool leak_endpoint() const { return leak_endpoint_; }
private:
bool is_async_ = false;
bool leak_endpoint_ = false;
PlatformChannelEndpoint endpoint_;
PlatformChannelServerEndpoint server_endpoint_;
......
......@@ -199,12 +199,6 @@ void Core::SendBrokerClientInvitation(
process_error_callback);
}
void Core::AcceptBrokerClientInvitation(ConnectionParams connection_params) {
RequestContext request_context;
GetNodeController()->AcceptBrokerClientInvitation(
std::move(connection_params));
}
void Core::ConnectIsolated(ConnectionParams connection_params,
const ports::PortRef& port,
base::StringPiece connection_name) {
......@@ -1437,6 +1431,10 @@ MojoResult Core::AcceptInvitation(
connection_params =
ConnectionParams(PlatformChannelEndpoint(std::move(endpoint)));
}
if (options &&
options->flags & MOJO_ACCEPT_INVITATION_FLAG_LEAK_TRANSPORT_ENDPOINT) {
connection_params.set_leak_endpoint(true);
}
bool is_isolated =
options && (options->flags & MOJO_ACCEPT_INVITATION_FLAG_ISOLATED);
......
......@@ -83,11 +83,6 @@ class MOJO_SYSTEM_IMPL_EXPORT Core {
const std::vector<std::pair<std::string, ports::PortRef>>& attached_ports,
const ProcessErrorCallback& process_error_callback);
// Accepts an invitation via |connection_params|. The other end of the
// connection medium in |connection_params| must have been used by some other
// process to send an invitation.
void AcceptBrokerClientInvitation(ConnectionParams connection_params);
// Extracts a named message pipe endpoint from the broker client invitation
// accepted by this process. Must only be called after
// AcceptBrokerClientInvitation.
......
......@@ -14,16 +14,21 @@
#include "base/macros.h"
#include "base/optional.h"
#include "base/path_service.h"
#include "base/process/process.h"
#include "base/run_loop.h"
#include "base/synchronization/lock.h"
#include "base/test/bind_test_util.h"
#include "base/test/multiprocess_test.h"
#include "base/test/task_environment.h"
#include "base/threading/sequenced_task_runner_handle.h"
#include "build/build_config.h"
#include "mojo/core/core.h"
#include "mojo/core/node_controller.h"
#include "mojo/core/test/mojo_test_base.h"
#include "mojo/public/c/system/invitation.h"
#include "mojo/public/cpp/platform/named_platform_channel.h"
#include "mojo/public/cpp/platform/platform_channel.h"
#include "mojo/public/cpp/system/invitation.h"
#include "mojo/public/cpp/system/platform_handle.h"
namespace mojo {
......@@ -686,6 +691,89 @@ DEFINE_TEST_CLIENT(ProcessErrorsClient) {
EXPECT_EQ(kDisconnectMessage, ReadMessage(pipe));
}
TEST_F(InvitationTest, Reinvitation) {
// The gist of this test is that a process should be able to accept an
// invitation, lose its connection to the process network, and then accept a
// new invitation to re-establish communication.
// We pass an extra PlatformChannel endpoint to the child process which it
// will use to accept a secondary invitation after we sever its first
// connection.
PlatformChannel secondary_channel;
auto command_line = base::GetMultiProcessTestChildBaseCommandLine();
base::LaunchOptions launch_options;
PrepareToPassRemoteEndpoint(&secondary_channel, &launch_options,
&command_line, kSecondaryChannelHandleSwitch);
MojoHandle pipe;
base::Process child_process = LaunchChildTestClient(
"ReinvitationClient", &pipe, 1, TransportType::kChannel,
MOJO_SEND_INVITATION_FLAG_NONE, nullptr, 0, &command_line,
&launch_options);
secondary_channel.RemoteProcessLaunchAttempted();
// Synchronize end-to-end communication first to ensure the process connection
// is fully established.
WriteMessage(pipe, kTestMessage1);
EXPECT_EQ(kTestMessage2, ReadMessage(pipe));
// Force-disconnect the child process.
Core::Get()->GetNodeController()->ForceDisconnectProcessForTesting(
child_process.Pid());
// The above disconnection should force pipe closure eventually.
WaitForSignals(pipe, MOJO_HANDLE_SIGNAL_PEER_CLOSED);
MojoClose(pipe);
// Now use our secondary channel to send a new invitation to the same process.
// It should be able to accept the new invitation and re-establish
// communication.
mojo::OutgoingInvitation new_invitation;
auto new_pipe = new_invitation.AttachMessagePipe(0);
mojo::OutgoingInvitation::Send(std::move(new_invitation),
child_process.Handle(),
secondary_channel.TakeLocalEndpoint());
WriteMessage(new_pipe.get().value(), kTestMessage3);
EXPECT_EQ(kTestMessage4, ReadMessage(new_pipe.get().value()));
WriteMessage(new_pipe.get().value(), kDisconnectMessage);
int wait_result = -1;
base::WaitForMultiprocessTestChildExit(
child_process, TestTimeouts::action_timeout(), &wait_result);
child_process.Close();
EXPECT_EQ(0, wait_result);
}
DEFINE_TEST_CLIENT(ReinvitationClient) {
MojoHandle pipe;
MojoHandle invitation = AcceptInvitation(MOJO_ACCEPT_INVITATION_FLAG_NONE);
const uint32_t pipe_name = 0;
ASSERT_EQ(MOJO_RESULT_OK, MojoExtractMessagePipeFromInvitation(
invitation, &pipe_name, 4, nullptr, &pipe));
ASSERT_EQ(MOJO_RESULT_OK, MojoClose(invitation));
EXPECT_EQ(kTestMessage1, ReadMessage(pipe));
WriteMessage(pipe, kTestMessage2);
// Wait for the pipe to break due to forced process disconnection.
WaitForSignals(pipe, MOJO_HANDLE_SIGNAL_PEER_CLOSED);
MojoClose(pipe);
// Now grab the secondary channel and accept a new invitation from it.
PlatformChannelEndpoint new_endpoint =
PlatformChannel::RecoverPassedEndpointFromString(
base::CommandLine::ForCurrentProcess()->GetSwitchValueASCII(
kSecondaryChannelHandleSwitch));
auto secondary_invitation =
mojo::IncomingInvitation::Accept(std::move(new_endpoint));
auto new_pipe = secondary_invitation.ExtractMessagePipe(0);
// Ensure that the new connection is working end-to-end.
EXPECT_EQ(kTestMessage3, ReadMessage(new_pipe.get().value()));
WriteMessage(new_pipe.get().value(), kTestMessage4);
EXPECT_EQ(kDisconnectMessage, ReadMessage(new_pipe.get().value()));
}
TEST_F(InvitationTest, SendIsolatedInvitation) {
MojoHandle primordial_pipe;
base::Process child_process = LaunchChildTestClient(
......
......@@ -213,7 +213,10 @@ void NodeController::AcceptBrokerClientInvitation(
CancelPendingPortMerges();
return;
}
const bool leak_endpoint = connection_params.leak_endpoint();
connection_params = ConnectionParams(std::move(endpoint));
connection_params.set_leak_endpoint(leak_endpoint);
} else {
// For async connections, we instead create a new channel for the broker and
// send a request for the inviting process to bind to it. This avoids doing
......@@ -226,6 +229,10 @@ void NodeController::AcceptBrokerClientInvitation(
broker_host_handle = channel.TakeRemoteEndpoint().TakePlatformHandle();
}
#endif
// Re-enable port merge operations, which may have been disabled if this isn't
// the first invitation accepted by this process.
base::AutoLock lock(pending_port_merges_lock_);
reject_pending_merges_ = false;
io_task_runner_->PostTask(
FROM_HERE,
......@@ -322,6 +329,15 @@ void NodeController::NotifyBadMessageFrom(const ports::NodeName& source_node,
peer->NotifyBadMessage(error);
}
void NodeController::ForceDisconnectProcessForTesting(
base::ProcessId process_id) {
io_task_runner_->PostTask(
FROM_HERE,
base::BindOnce(
&NodeController::ForceDisconnectProcessForTestingOnIOThread,
base::Unretained(this), process_id));
}
// static
void NodeController::DeserializeRawBytesAsEventForFuzzer(
base::span<const unsigned char> data) {
......@@ -412,7 +428,17 @@ void NodeController::AcceptBrokerClientInvitationOnIOThread(
{
base::AutoLock lock(inviter_lock_);
DCHECK(inviter_name_ == ports::kInvalidNodeName);
if (inviter_name_ != ports::kInvalidNodeName) {
// We've already accepted an invitation before and are already part of
// a different Mojo process network. In order to accept this new one and
// remain in a consistent state, we have to purge all peer connections and
// start from scratch.
{
base::AutoUnlock unlock(inviter_lock_);
DropAllPeers();
}
inviter_name_ = ports::kInvalidNodeName;
}
// At this point we don't know the inviter's name, so we can't yet insert it
// into our |peers_| map. That will happen as soon as we receive an
......@@ -421,10 +447,18 @@ void NodeController::AcceptBrokerClientInvitationOnIOThread(
NodeChannel::Create(this, std::move(connection_params),
Channel::HandlePolicy::kAcceptHandles,
io_task_runner_, ProcessErrorCallback());
// Prevent the inviter pipe handle from being closed on shutdown. Pipe
// closure may be used by the inviter to detect the invitee process has
// exited.
bootstrap_inviter_channel_->LeakHandleOnShutdown();
if (connection_params.leak_endpoint()) {
// Prevent the inviter pipe handle from being closed on shutdown. Pipe
// closure may be used by the inviter to detect that the invited process
// has terminated. In such cases, the invited process must not be invited
// more than once in its lifetime; otherwise this leak matters.
//
// Note that this behavior is supported primarily to help adapt legacy
// Chrome IPC to Mojo, since channel disconnection is used there as a
// signal for normal child process termination.
bootstrap_inviter_channel_->LeakHandleOnShutdown();
}
}
bootstrap_inviter_channel_->Start();
if (broker_host_handle)
......@@ -1259,6 +1293,31 @@ void NodeController::AttemptShutdownIfRequested() {
callback.Run();
}
void NodeController::ForceDisconnectProcessForTestingOnIOThread(
base::ProcessId process_id) {
#if defined(OS_NACL) || defined(OS_IOS)
NOTREACHED();
#else
DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
RequestContext request_context;
// A channel may have multiple aliases since we generate one for any we
// invite and then only later refer to it by its own chosen name.
NodeMap peers_to_drop;
for (auto& peer : peers_) {
NodeChannel* channel = peer.second.get();
if (channel->HasRemoteProcessHandle()) {
base::Process process(channel->CloneRemoteProcessHandle().release());
if (process.Pid() == process_id)
peers_to_drop.emplace(peer.first, peer.second);
}
}
for (auto& peer : peers_to_drop)
DropPeer(peer.first, peer.second.get());
#endif
}
NodeController::IsolatedConnection::IsolatedConnection() = default;
NodeController::IsolatedConnection::IsolatedConnection(
......
......@@ -121,6 +121,11 @@ class MOJO_SYSTEM_IMPL_EXPORT NodeController : public ports::NodeDelegate,
void NotifyBadMessageFrom(const ports::NodeName& source_node,
const std::string& error);
// Force-closes the connection to another process to simulate connection
// failures for testing. |process_id| must correspond to a process to which
// this node has an active NodeChannel.
void ForceDisconnectProcessForTesting(base::ProcessId process_id);
static void DeserializeRawBytesAsEventForFuzzer(
base::span<const unsigned char> data);
static void DeserializeMessageAsEventForFuzzer(Channel::MessagePtr message);
......@@ -240,6 +245,9 @@ class MOJO_SYSTEM_IMPL_EXPORT NodeController : public ports::NodeDelegate,
// possible. If so, shutdown is performed and the shutdown callback is run.
void AttemptShutdownIfRequested();
// See |ForceDisconnectProcessForTesting()|.
void ForceDisconnectProcessForTestingOnIOThread(base::ProcessId process_id);
// These are safe to access from any thread as long as the Node is alive.
Core* const core_;
const ports::NodeName name_;
......
......@@ -10,6 +10,10 @@
#include <windows.h>
#endif
#if defined(OS_FUCHSIA)
#include <zircon/syscalls.h>
#endif
namespace mojo {
namespace core {
......@@ -30,7 +34,12 @@ base::ProcessHandle GetCurrentProcessHandle() {
ScopedProcessHandle::ScopedProcessHandle() = default;
ScopedProcessHandle::ScopedProcessHandle(base::ProcessHandle handle)
: handle_(handle) {
#if defined(OS_FUCHSIA)
: process_(handle)
#else
: handle_(handle)
#endif
{
DCHECK_NE(handle, GetCurrentProcessHandle());
}
......@@ -49,8 +58,20 @@ ScopedProcessHandle ScopedProcessHandle::CloneFrom(base::ProcessHandle handle) {
GetCurrentProcessHandle(), &handle, 0, FALSE,
DUPLICATE_SAME_ACCESS);
DCHECK(ok);
#endif
return ScopedProcessHandle(handle);
#elif defined(OS_FUCHSIA)
base::ProcessHandle new_handle;
zx_status_t status =
zx_handle_duplicate(handle, ZX_RIGHT_SAME_RIGHTS, &new_handle);
if (status != ZX_OK)
return ScopedProcessHandle();
return ScopedProcessHandle(new_handle);
#elif defined(OS_POSIX)
return ScopedProcessHandle(handle);
#else
#error "Unsupported platform."
return ScopedProcessHandle();
#endif
}
ScopedProcessHandle& ScopedProcessHandle::operator=(ScopedProcessHandle&&) =
......@@ -59,6 +80,8 @@ ScopedProcessHandle& ScopedProcessHandle::operator=(ScopedProcessHandle&&) =
bool ScopedProcessHandle::is_valid() const {
#if defined(OS_WIN)
return handle_.IsValid();
#elif defined(OS_FUCHSIA)
return process_.is_valid();
#else
return handle_ != base::kNullProcessHandle;
#endif
......@@ -67,6 +90,8 @@ bool ScopedProcessHandle::is_valid() const {
base::ProcessHandle ScopedProcessHandle::get() const {
#if defined(OS_WIN)
return handle_.Get();
#elif defined(OS_FUCHSIA)
return process_.get();
#else
return handle_;
#endif
......@@ -75,6 +100,8 @@ base::ProcessHandle ScopedProcessHandle::get() const {
base::ProcessHandle ScopedProcessHandle::release() {
#if defined(OS_WIN)
return handle_.Take();
#elif defined(OS_FUCHSIA)
return process_.release();
#else
return handle_;
#endif
......
......@@ -13,6 +13,10 @@
#include "base/win/scoped_handle.h"
#endif
#if defined(OS_FUCHSIA)
#include <lib/zx/process.h>
#endif
namespace mojo {
namespace core {
......@@ -52,6 +56,8 @@ class ScopedProcessHandle {
private:
#if defined(OS_WIN)
base::win::ScopedHandle handle_;
#elif defined(OS_FUCHSIA)
zx::process process_;
#else
base::ProcessHandle handle_ = base::kNullProcessHandle;
#endif
......
......@@ -229,6 +229,17 @@ typedef uint32_t MojoAcceptInvitationFlags;
// |MOJO_SEND_INVITATION_FLAG_ISOLATED| for details.
#define MOJO_ACCEPT_INVITATION_FLAG_ISOLATED ((MojoAcceptInvitationFlags)1)
// The transport endpoint used to accept this invitation should be leaked, i.e.
// never closed until it's implicitly closed on process death. This exists to
// support adaptation of legacy code to Mojo IPC so that, e.g., a broken pipe
// can be used as a reliable indication of remote process death.
//
// This flag should generally not be used unless strictly necessary, and it is
// unsafe to use in any situation where a process may accept multiple
// invitations over the course of its lifetime.
#define MOJO_ACCEPT_INVITATION_FLAG_LEAK_TRANSPORT_ENDPOINT \
((MojoAcceptInvitationFlags)2)
// Options passed to |MojoAcceptInvitation()|.
struct MOJO_ALIGNAS(8) MojoAcceptInvitationOptions {
// The size of this structure, used for versioning.
......
......@@ -225,7 +225,8 @@ IncomingInvitation& IncomingInvitation::operator=(IncomingInvitation&& other) =
// static
IncomingInvitation IncomingInvitation::Accept(
PlatformChannelEndpoint channel_endpoint) {
PlatformChannelEndpoint channel_endpoint,
MojoAcceptInvitationFlags flags) {
MojoPlatformHandle endpoint_handle;
PlatformHandle::ToMojoPlatformHandle(channel_endpoint.TakePlatformHandle(),
&endpoint_handle);
......@@ -237,9 +238,13 @@ IncomingInvitation IncomingInvitation::Accept(
transport_endpoint.num_platform_handles = 1;
transport_endpoint.platform_handles = &endpoint_handle;
MojoAcceptInvitationOptions options;
options.struct_size = sizeof(options);
options.flags = flags;
MojoHandle invitation_handle;
MojoResult result =
MojoAcceptInvitation(&transport_endpoint, nullptr, &invitation_handle);
MojoAcceptInvitation(&transport_endpoint, &options, &invitation_handle);
if (result != MOJO_RESULT_OK)
return IncomingInvitation();
......
......@@ -13,6 +13,7 @@
#include "base/macros.h"
#include "base/process/process_handle.h"
#include "base/strings/string_piece.h"
#include "mojo/public/c/system/invitation.h"
#include "mojo/public/cpp/platform/platform_channel_endpoint.h"
#include "mojo/public/cpp/platform/platform_channel_server_endpoint.h"
#include "mojo/public/cpp/system/handle.h"
......@@ -169,7 +170,9 @@ class MOJO_CPP_SYSTEM_EXPORT IncomingInvitation {
// by |NamedPlatformChannel::ConnectToServer|.
//
// Note that this performs blocking I/O on the calling thread.
static IncomingInvitation Accept(PlatformChannelEndpoint channel_endpoint);
static IncomingInvitation Accept(
PlatformChannelEndpoint channel_endpoint,
MojoAcceptInvitationFlags flags = MOJO_ACCEPT_INVITATION_FLAG_NONE);
// Like above, but does not perform any blocking I/O. Not all platforms and
// sandbox configurations are compatible with this API. In such cases, the
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment