Commit 5732c6d4 authored by Ken Rockot's avatar Ken Rockot Committed by Commit Bot

Move more stuff out of mojo/edk/embedder

Moves some POSIX socket utilities into mojo/public/cpp/platform, moves
ConnectionParams into mojo/edk/system, deletes TransportProtocol.

Bug: 753541
Change-Id: I1575af268e89c0424e618f2a0974693f610f5969
Reviewed-on: https://chromium-review.googlesource.com/1109199
Commit-Queue: Ken Rockot <rockot@chromium.org>
Reviewed-by: default avatarJay Civelli <jcivelli@chromium.org>
Cr-Commit-Position: refs/heads/master@{#569686}
parent a9cd34ec
......@@ -65,7 +65,6 @@ template("core_impl_source_set") {
public = [
"embedder/configuration.h",
"embedder/connection_params.h",
"embedder/named_platform_channel_pair.h",
"embedder/named_platform_handle.h",
"embedder/named_platform_handle_utils.h",
......@@ -73,9 +72,9 @@ template("core_impl_source_set") {
"embedder/platform_handle.h",
"embedder/process_error_callback.h",
"embedder/scoped_platform_handle.h",
"embedder/transport_protocol.h",
"system/channel.h",
"system/configuration.h",
"system/connection_params.h",
"system/core.h",
"system/data_pipe_consumer_dispatcher.h",
"system/data_pipe_control_message.h",
......@@ -104,7 +103,6 @@ template("core_impl_source_set") {
}
sources = [
"embedder/connection_params.cc",
"embedder/named_platform_handle_utils_win.cc",
"embedder/platform_channel_pair.cc",
"embedder/platform_channel_pair_win.cc",
......@@ -115,6 +113,7 @@ template("core_impl_source_set") {
"system/channel.cc",
"system/channel_win.cc",
"system/configuration.cc",
"system/connection_params.cc",
"system/core.cc",
"system/data_pipe_consumer_dispatcher.cc",
"system/data_pipe_control_message.cc",
......@@ -163,9 +162,7 @@ template("core_impl_source_set") {
sources += [ "embedder/platform_channel_pair_posix.cc" ]
if (!is_nacl || is_nacl_nonsfi) {
public += [ "embedder/platform_channel_utils_posix.h" ]
sources += [
"embedder/platform_channel_utils_posix.cc",
"system/broker_posix.cc",
"system/channel_posix.cc",
]
......
// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "mojo/edk/embedder/platform_channel_utils_posix.h"
#include <stddef.h>
#include <sys/socket.h>
#include <unistd.h>
#include <utility>
#include "base/containers/queue.h"
#include "base/files/file_util.h"
#include "base/logging.h"
#include "base/posix/eintr_wrapper.h"
#include "build/build_config.h"
#include "mojo/edk/embedder/scoped_platform_handle.h"
#if !defined(OS_NACL)
#include <sys/uio.h>
#endif
#if !defined(SO_PEEK_OFF)
#define SO_PEEK_OFF 42
#endif
namespace mojo {
namespace edk {
namespace {
#if !defined(OS_NACL)
bool IsRecoverableError() {
return errno == ECONNABORTED || errno == EMFILE || errno == ENFILE ||
errno == ENOMEM || errno == ENOBUFS;
}
bool GetPeerEuid(InternalPlatformHandle handle, uid_t* peer_euid) {
DCHECK(peer_euid);
#if defined(OS_MACOSX) || defined(OS_OPENBSD) || defined(OS_FREEBSD)
uid_t socket_euid;
gid_t socket_gid;
if (getpeereid(handle.handle, &socket_euid, &socket_gid) < 0) {
PLOG(ERROR) << "getpeereid " << handle.handle;
return false;
}
*peer_euid = socket_euid;
return true;
#else
struct ucred cred;
socklen_t cred_len = sizeof(cred);
if (getsockopt(handle.handle, SOL_SOCKET, SO_PEERCRED, &cred, &cred_len) <
0) {
PLOG(ERROR) << "getsockopt " << handle.handle;
return false;
}
if (static_cast<unsigned>(cred_len) < sizeof(cred)) {
NOTREACHED() << "Truncated ucred from SO_PEERCRED?";
return false;
}
*peer_euid = cred.uid;
return true;
#endif
}
bool IsPeerAuthorized(InternalPlatformHandle peer_handle) {
uid_t peer_euid;
if (!GetPeerEuid(peer_handle, &peer_euid))
return false;
if (peer_euid != geteuid()) {
DLOG(ERROR) << "Client euid is not authorised";
return false;
}
return true;
}
#endif // !defined(OS_NACL)
} // namespace
// On Linux, |SIGPIPE| is suppressed by passing |MSG_NOSIGNAL| to
// |send()|/|sendmsg()|. (There is no way of suppressing |SIGPIPE| on
// |write()|/|writev().) On Mac, |SIGPIPE| is suppressed by setting the
// |SO_NOSIGPIPE| option on the socket.
//
// Performance notes:
// - On Linux, we have to use |send()|/|sendmsg()| rather than
// |write()|/|writev()| in order to suppress |SIGPIPE|. This is okay, since
// |send()| is (slightly) faster than |write()| (!), while |sendmsg()| is
// quite comparable to |writev()|.
// - On Mac, we may use |write()|/|writev()|. Here, |write()| is considerably
// faster than |send()|, whereas |sendmsg()| is quite comparable to
// |writev()|.
// - On both platforms, an appropriate |sendmsg()|/|writev()| is considerably
// faster than two |send()|s/|write()|s.
// - Relative numbers (minimum real times from 10 runs) for one |write()| of
// 1032 bytes, one |send()| of 1032 bytes, one |writev()| of 32+1000 bytes,
// one |sendmsg()| of 32+1000 bytes, two |write()|s of 32 and 1000 bytes, two
// |send()|s of 32 and 1000 bytes:
// - Linux: 0.81 s, 0.77 s, 0.87 s, 0.89 s, 1.31 s, 1.22 s
// - Mac: 2.21 s, 2.91 s, 2.98 s, 3.08 s, 3.59 s, 4.74 s
// Flags to use with calling |send()| or |sendmsg()| (see above).
#if defined(OS_MACOSX) || defined(OS_FUCHSIA)
const int kSendFlags = 0;
#else
const int kSendFlags = MSG_NOSIGNAL;
#endif
ssize_t PlatformChannelWrite(const ScopedInternalPlatformHandle& h,
const void* bytes,
size_t num_bytes) {
DCHECK(h.is_valid());
DCHECK(bytes);
DCHECK_GT(num_bytes, 0u);
#if defined(OS_MACOSX) || defined(OS_NACL_NONSFI)
// send() is not available under NaCl-nonsfi.
return HANDLE_EINTR(write(h.get().handle, bytes, num_bytes));
#else
return send(h.get().handle, bytes, num_bytes, kSendFlags);
#endif
}
ssize_t PlatformChannelWritev(const ScopedInternalPlatformHandle& h,
struct iovec* iov,
size_t num_iov) {
DCHECK(h.is_valid());
DCHECK(iov);
DCHECK_GT(num_iov, 0u);
#if defined(OS_MACOSX)
return HANDLE_EINTR(writev(h.get().handle, iov, static_cast<int>(num_iov)));
#else
struct msghdr msg = {};
msg.msg_iov = iov;
msg.msg_iovlen = num_iov;
return HANDLE_EINTR(sendmsg(h.get().handle, &msg, kSendFlags));
#endif
}
ssize_t PlatformChannelSendmsgWithHandles(
const ScopedInternalPlatformHandle& h,
struct iovec* iov,
size_t num_iov,
const std::vector<ScopedInternalPlatformHandle>& platform_handles) {
DCHECK(iov);
DCHECK_GT(num_iov, 0u);
DCHECK(!platform_handles.empty());
DCHECK_LE(platform_handles.size(), kPlatformChannelMaxNumHandles);
char cmsg_buf[CMSG_SPACE(kPlatformChannelMaxNumHandles * sizeof(int))];
struct msghdr msg = {};
msg.msg_iov = iov;
msg.msg_iovlen = num_iov;
msg.msg_control = cmsg_buf;
msg.msg_controllen = CMSG_LEN(platform_handles.size() * sizeof(int));
struct cmsghdr* cmsg = CMSG_FIRSTHDR(&msg);
cmsg->cmsg_level = SOL_SOCKET;
cmsg->cmsg_type = SCM_RIGHTS;
cmsg->cmsg_len = CMSG_LEN(platform_handles.size() * sizeof(int));
for (size_t i = 0; i < platform_handles.size(); i++) {
DCHECK(platform_handles[i].is_valid());
reinterpret_cast<int*>(CMSG_DATA(cmsg))[i] =
platform_handles[i].get().handle;
}
return HANDLE_EINTR(sendmsg(h.get().handle, &msg, kSendFlags));
}
ssize_t PlatformChannelRecvmsg(
const ScopedInternalPlatformHandle& h,
void* buf,
size_t num_bytes,
base::circular_deque<ScopedInternalPlatformHandle>* platform_handles,
bool block) {
DCHECK(buf);
DCHECK_GT(num_bytes, 0u);
DCHECK(platform_handles);
struct iovec iov = {buf, num_bytes};
char cmsg_buf[CMSG_SPACE(kPlatformChannelMaxNumHandles * sizeof(int))];
struct msghdr msg = {};
msg.msg_iov = &iov;
msg.msg_iovlen = 1;
msg.msg_control = cmsg_buf;
msg.msg_controllen = sizeof(cmsg_buf);
ssize_t result =
HANDLE_EINTR(recvmsg(h.get().handle, &msg, block ? 0 : MSG_DONTWAIT));
if (result < 0)
return result;
// Success; no control messages.
if (msg.msg_controllen == 0)
return result;
DCHECK(!(msg.msg_flags & MSG_CTRUNC));
for (cmsghdr* cmsg = CMSG_FIRSTHDR(&msg); cmsg;
cmsg = CMSG_NXTHDR(&msg, cmsg)) {
if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS) {
size_t payload_length = cmsg->cmsg_len - CMSG_LEN(0);
DCHECK_EQ(payload_length % sizeof(int), 0u);
size_t num_fds = payload_length / sizeof(int);
const int* fds = reinterpret_cast<int*>(CMSG_DATA(cmsg));
for (size_t i = 0; i < num_fds; i++) {
platform_handles->push_back(
ScopedInternalPlatformHandle(InternalPlatformHandle(fds[i])));
DCHECK(platform_handles->back().is_valid());
}
}
}
return result;
}
bool ServerAcceptConnection(const ScopedInternalPlatformHandle& server_handle,
ScopedInternalPlatformHandle* connection_handle,
bool check_peer_user) {
DCHECK(server_handle.is_valid());
connection_handle->reset();
#if defined(OS_NACL)
NOTREACHED();
return false;
#else
ScopedInternalPlatformHandle accept_handle(InternalPlatformHandle(
HANDLE_EINTR(accept(server_handle.get().handle, NULL, 0))));
if (!accept_handle.is_valid())
return IsRecoverableError();
// Verify that the IPC channel peer is running as the same user.
if (check_peer_user && !IsPeerAuthorized(accept_handle.get())) {
return true;
}
if (!base::SetNonBlocking(accept_handle.get().handle)) {
PLOG(ERROR) << "base::SetNonBlocking() failed "
<< accept_handle.get().handle;
// It's safe to keep listening on |server_handle| even if the attempt to set
// O_NONBLOCK failed on the client fd.
return true;
}
*connection_handle = std::move(accept_handle);
return true;
#endif // defined(OS_NACL)
}
} // namespace edk
} // namespace mojo
// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef MOJO_EDK_EMBEDDER_PLATFORM_CHANNEL_UTILS_POSIX_H_
#define MOJO_EDK_EMBEDDER_PLATFORM_CHANNEL_UTILS_POSIX_H_
#include <stddef.h>
#include <sys/types.h> // For |ssize_t|.
#include <vector>
#include "base/containers/circular_deque.h"
#include "mojo/edk/embedder/platform_handle.h"
#include "mojo/edk/embedder/scoped_platform_handle.h"
#include "mojo/edk/system/system_impl_export.h"
struct iovec; // Declared in <sys/uio.h>.
namespace mojo {
namespace edk {
class ScopedInternalPlatformHandle;
// The maximum number of handles that can be sent "at once" using
// |PlatformChannelSendmsgWithHandles()|. This must be less than the Linux
// kernel's SCM_MAX_FD which is 253.
const size_t kPlatformChannelMaxNumHandles = 128;
// Use these to write to a socket created using |PlatformChannelPair| (or
// equivalent). These are like |write()| and |writev()|, but handle |EINTR| and
// never raise |SIGPIPE|. (Note: On Mac, the suppression of |SIGPIPE| is set up
// by |PlatformChannelPair|.)
MOJO_SYSTEM_IMPL_EXPORT ssize_t
PlatformChannelWrite(const ScopedInternalPlatformHandle& h,
const void* bytes,
size_t num_bytes);
MOJO_SYSTEM_IMPL_EXPORT ssize_t
PlatformChannelWritev(const ScopedInternalPlatformHandle& h,
struct iovec* iov,
size_t num_iov);
// Writes data, and the given set of |InternalPlatformHandle|s (i.e., file
// descriptors) over the Unix domain socket given by |h| (e.g., created using
// |PlatformChannelPair()|). All the handles must be valid, and there must be at
// least one and at most |kPlatformChannelMaxNumHandles| handles. The return
// value is as for |sendmsg()|, namely -1 on failure and otherwise the number of
// bytes of data sent on success (note that this may not be all the data
// specified by |iov|). (The handles are not closed, regardless of success or
// failure.)
MOJO_SYSTEM_IMPL_EXPORT ssize_t PlatformChannelSendmsgWithHandles(
const ScopedInternalPlatformHandle& h,
struct iovec* iov,
size_t num_iov,
const std::vector<ScopedInternalPlatformHandle>& platform_handles);
// Wrapper around |recvmsg()|, which will extract any attached file descriptors
// (in the control message) to |InternalPlatformHandle|s (and append them to
// |platform_handles|). (This also handles |EINTR|.)
MOJO_SYSTEM_IMPL_EXPORT ssize_t PlatformChannelRecvmsg(
const ScopedInternalPlatformHandle& h,
void* buf,
size_t num_bytes,
base::circular_deque<ScopedInternalPlatformHandle>* platform_handles,
bool block = false);
// Returns false if |server_handle| encounters an unrecoverable error.
// Returns true if it's valid to keep listening on |server_handle|. In this
// case, it's possible that a connection wasn't successfully established; then,
// |connection_handle| will be invalid. If |check_peer_user| is True, the
// connection will be rejected if the peer is running as a different user.
MOJO_SYSTEM_IMPL_EXPORT bool ServerAcceptConnection(
const ScopedInternalPlatformHandle& server_handle,
ScopedInternalPlatformHandle* connection_handle,
bool check_peer_user = true);
} // namespace edk
} // namespace mojo
#endif // MOJO_EDK_EMBEDDER_PLATFORM_CHANNEL_UTILS_POSIX_H_
// Copyright 2017 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef MOJO_EDK_EMBEDDER_TRANSPORT_PROTOCOL_H_
#define MOJO_EDK_EMBEDDER_TRANSPORT_PROTOCOL_H_
#include <stdint.h>
namespace mojo {
namespace edk {
// Selects the transport protocol to use when connecting to the remote
// process.
//
// New code should generally prefer to use |kAutomatic| to allow the protocol
// to be negotiated by the two processes involved in the connection. As noted
// below, this is not possible when either one of the two processes only
// supports the legacy protocol. In such cases both ends of the connection
// must be aware and explicitly use |kLegacy|, which is the default for now.
enum class TransportProtocol : int32_t {
// Legacy transport protocol. Should only be used when connecting to
// embedders which may not support at least |kVersion0|. Deprecated.
kLegacy = -2,
// Automatically negotiate the transport protocol. This will ultimately
// select the highest protocol version supported by all of the calling
// process, the broker process (if any), and the target process.
//
// This may only be used when connecting to embedders which support at least
// |kVersion0|. Otherwise the deprecated |kLegacy| protocol must be used.
kAutomatic = -1,
// Protocol version 0. This is NOT backwards compatible with the legacy
// protocol. If this is used to establish a connection, the remote embedder
// (and the broker, if applicable) must also support at least version 0.
kVersion0 = 0,
// The maximum version supported by the current process. This must be
// updated if a new version is added.
kMaxSupportedVersion = kVersion0,
};
} // namespace edk
} // namespace mojo
#endif // MOJO_EDK_EMBEDDER_TRANSPORT_PROTOCOL_H_
......@@ -33,10 +33,8 @@ BrokerHost::BrokerHost(base::ProcessHandle client_process,
base::MessageLoopCurrent::Get()->AddDestructionObserver(this);
channel_ = Channel::Create(
this,
ConnectionParams(TransportProtocol::kLegacy, std::move(platform_handle)),
base::ThreadTaskRunnerHandle::Get());
channel_ = Channel::Create(this, ConnectionParams(std::move(platform_handle)),
base::ThreadTaskRunnerHandle::Get());
channel_->Start();
}
......
......@@ -8,15 +8,16 @@
#include <unistd.h>
#include <utility>
#include <vector>
#include "base/logging.h"
#include "base/memory/platform_shared_memory_region.h"
#include "build/build_config.h"
#include "mojo/edk/embedder/platform_channel_utils_posix.h"
#include "mojo/edk/embedder/scoped_platform_handle.h"
#include "mojo/edk/system/broker_messages.h"
#include "mojo/edk/system/channel.h"
#include "mojo/edk/system/platform_handle_utils.h"
#include "mojo/public/cpp/platform/socket_utils_posix.h"
namespace mojo {
namespace edk {
......@@ -31,10 +32,10 @@ Channel::MessagePtr WaitForBrokerMessage(
std::vector<ScopedInternalPlatformHandle>* incoming_handles) {
Channel::MessagePtr message(new Channel::Message(
sizeof(BrokerMessageHeader) + expected_data_size, expected_num_handles));
base::circular_deque<ScopedInternalPlatformHandle> incoming_platform_handles;
ssize_t read_result = PlatformChannelRecvmsg(
platform_handle, const_cast<void*>(message->data()),
message->data_num_bytes(), &incoming_platform_handles, true /* block */);
std::vector<base::ScopedFD> incoming_fds;
ssize_t read_result = SocketRecvmsg(
platform_handle.get().handle, const_cast<void*>(message->data()),
message->data_num_bytes(), &incoming_fds, true /* block */);
bool error = false;
if (read_result < 0) {
PLOG(ERROR) << "Recvmsg error";
......@@ -42,7 +43,7 @@ Channel::MessagePtr WaitForBrokerMessage(
} else if (static_cast<size_t>(read_result) != message->data_num_bytes()) {
LOG(ERROR) << "Invalid node channel message";
error = true;
} else if (incoming_platform_handles.size() != expected_num_handles) {
} else if (incoming_fds.size() != expected_num_handles) {
LOG(ERROR) << "Received unexpected number of handles";
error = true;
}
......@@ -57,9 +58,11 @@ Channel::MessagePtr WaitForBrokerMessage(
return nullptr;
}
incoming_handles->resize(incoming_platform_handles.size());
std::move(incoming_platform_handles.begin(), incoming_platform_handles.end(),
incoming_handles->begin());
incoming_handles->resize(incoming_fds.size());
for (size_t i = 0; i < incoming_fds.size(); ++i) {
incoming_handles->at(i) = ScopedInternalPlatformHandle(
InternalPlatformHandle(incoming_fds[i].release()));
}
return message;
}
......@@ -98,8 +101,9 @@ base::WritableSharedMemoryRegion Broker::GetWritableSharedMemoryRegion(
Channel::MessagePtr out_message = CreateBrokerMessage(
BrokerMessageType::BUFFER_REQUEST, 0, 0, &buffer_request);
buffer_request->size = num_bytes;
ssize_t write_result = PlatformChannelWrite(
sync_channel_, out_message->data(), out_message->data_num_bytes());
ssize_t write_result =
SocketWrite(sync_channel_.get().handle, out_message->data(),
out_message->data_num_bytes());
if (write_result < 0) {
PLOG(ERROR) << "Error sending sync broker message";
return base::WritableSharedMemoryRegion();
......
......@@ -12,8 +12,8 @@
#include "base/process/process_handle.h"
#include "base/task_runner.h"
#include "build/build_config.h"
#include "mojo/edk/embedder/connection_params.h"
#include "mojo/edk/embedder/scoped_platform_handle.h"
#include "mojo/edk/system/connection_params.h"
namespace mojo {
namespace edk {
......
......@@ -20,8 +20,8 @@
#include "base/message_loop/message_pump_for_io.h"
#include "base/synchronization/lock.h"
#include "base/task_runner.h"
#include "mojo/edk/embedder/platform_channel_utils_posix.h"
#include "mojo/edk/embedder/scoped_platform_handle.h"
#include "mojo/public/cpp/platform/socket_utils_posix.h"
#if !defined(OS_NACL)
#include <sys/uio.h>
......@@ -281,13 +281,14 @@ class ChannelPosix : public Channel,
read_watcher_.reset();
base::MessageLoopCurrent::Get()->RemoveDestructionObserver(this);
ScopedInternalPlatformHandle accept_fd;
ServerAcceptConnection(handle_, &accept_fd);
base::ScopedFD accept_fd;
AcceptSocketConnection(handle_.get().handle, &accept_fd);
if (!accept_fd.is_valid()) {
OnError(Error::kConnectionFailed);
return;
}
handle_ = std::move(accept_fd);
handle_ = ScopedInternalPlatformHandle(
InternalPlatformHandle(accept_fd.release()));
StartOnIOThread();
#else
NOTREACHED();
......@@ -306,8 +307,13 @@ class ChannelPosix : public Channel,
char* buffer = GetReadBuffer(&buffer_capacity);
DCHECK_GT(buffer_capacity, 0u);
ssize_t read_result = PlatformChannelRecvmsg(
handle_, buffer, buffer_capacity, &incoming_platform_handles_);
std::vector<base::ScopedFD> incoming_fds;
ssize_t read_result = SocketRecvmsg(handle_.get().handle, buffer,
buffer_capacity, &incoming_fds);
for (auto& fd : incoming_fds) {
incoming_platform_handles_.emplace_back(
InternalPlatformHandle(fd.release()));
}
if (read_result > 0) {
bytes_read = static_cast<size_t>(read_result);
......@@ -364,8 +370,11 @@ class ChannelPosix : public Channel,
if (!handles.empty()) {
iovec iov = {const_cast<void*>(message_view.data()),
message_view.data_num_bytes()};
std::vector<base::ScopedFD> fds(handles.size());
for (size_t i = 0; i < handles.size(); ++i)
fds[i] = base::ScopedFD(handles[i].release().handle);
// TODO: Handle lots of handles.
result = PlatformChannelSendmsgWithHandles(handle_, &iov, 1, handles);
result = SendmsgWithHandles(handle_.get().handle, &iov, 1, fds);
if (result >= 0) {
#if defined(OS_MACOSX)
// There is a bug on OSX which makes it dangerous to close
......@@ -393,8 +402,8 @@ class ChannelPosix : public Channel,
#endif // defined(OS_MACOSX)
}
} else {
result = PlatformChannelWrite(handle_, message_view.data(),
message_view.data_num_bytes());
result = SocketWrite(handle_.get().handle, message_view.data(),
message_view.data_num_bytes());
}
if (result < 0) {
......
......@@ -189,9 +189,8 @@ class ChannelTestShutdownAndWriteDelegate : public Channel::Delegate {
: quit_closure_(std::move(quit_closure)),
client_channel_(std::move(client_channel)),
client_thread_(std::move(client_thread)) {
channel_ = Channel::Create(
this, ConnectionParams(TransportProtocol::kLegacy, std::move(handle)),
std::move(task_runner));
channel_ = Channel::Create(this, ConnectionParams(std::move(handle)),
std::move(task_runner));
channel_->Start();
}
~ChannelTestShutdownAndWriteDelegate() override { channel_->ShutDown(); }
......@@ -248,11 +247,9 @@ TEST(ChannelTest, PeerShutdownDuringRead) {
client_thread->StartWithOptions(
base::Thread::Options(base::MessageLoop::TYPE_IO, 0));
scoped_refptr<Channel> client_channel =
Channel::Create(nullptr,
ConnectionParams(TransportProtocol::kLegacy,
channel_pair.PassClientHandle()),
client_thread->task_runner());
scoped_refptr<Channel> client_channel = Channel::Create(
nullptr, ConnectionParams(channel_pair.PassClientHandle()),
client_thread->task_runner());
client_channel->Start();
// On the "client" IO thread, create and write a message.
......
......@@ -2,7 +2,7 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "mojo/edk/embedder/connection_params.h"
#include "mojo/edk/system/connection_params.h"
#include <utility>
......@@ -11,12 +11,8 @@
namespace mojo {
namespace edk {
ConnectionParams::ConnectionParams(TransportProtocol protocol,
ScopedInternalPlatformHandle channel)
: protocol_(protocol), channel_(std::move(channel)) {
// TODO(rockot): Support other protocols.
DCHECK_EQ(TransportProtocol::kLegacy, protocol);
}
ConnectionParams::ConnectionParams(ScopedInternalPlatformHandle channel)
: channel_(std::move(channel)) {}
ConnectionParams::ConnectionParams(ConnectionParams&& params) {
*this = std::move(params);
......
......@@ -2,13 +2,12 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef MOJO_EDK_EMBEDDER_CONNECTION_PARAMS_H_
#define MOJO_EDK_EMBEDDER_CONNECTION_PARAMS_H_
#ifndef MOJO_EDK_SYSTEM_CONNECTION_PARAMS_H_
#define MOJO_EDK_SYSTEM_CONNECTION_PARAMS_H_
#include "base/macros.h"
#include "build/build_config.h"
#include "mojo/edk/embedder/scoped_platform_handle.h"
#include "mojo/edk/embedder/transport_protocol.h"
#include "mojo/edk/system/system_impl_export.h"
namespace mojo {
......@@ -17,20 +16,14 @@ namespace edk {
// A set of parameters used when establishing a connection to another process.
class MOJO_SYSTEM_IMPL_EXPORT ConnectionParams {
public:
// Configures an OS pipe-based connection of type |type| to the remote process
// using the given transport |protocol|.
ConnectionParams(TransportProtocol protocol,
ScopedInternalPlatformHandle channel);
explicit ConnectionParams(ScopedInternalPlatformHandle channel);
ConnectionParams(ConnectionParams&& params);
ConnectionParams& operator=(ConnectionParams&& params);
TransportProtocol protocol() const { return protocol_; }
ScopedInternalPlatformHandle TakeChannelHandle();
private:
TransportProtocol protocol_;
ScopedInternalPlatformHandle channel_;
DISALLOW_COPY_AND_ASSIGN(ConnectionParams);
......@@ -39,4 +32,4 @@ class MOJO_SYSTEM_IMPL_EXPORT ConnectionParams {
} // namespace edk
} // namespace mojo
#endif // MOJO_EDK_EMBEDDER_CONNECTION_PARAMS_H_
#endif // MOJO_EDK_SYSTEM_CONNECTION_PARAMS_H_
......@@ -1358,8 +1358,7 @@ MojoResult Core::SendInvitation(
DCHECK_EQ(removed_dispatcher.get(), invitation_dispatcher);
}
ConnectionParams connection_params(TransportProtocol::kLegacy,
std::move(endpoint_handle));
ConnectionParams connection_params(std::move(endpoint_handle));
std::vector<std::pair<std::string, ports::PortRef>> attached_ports;
InvitationDispatcher::PortMapping attached_port_map =
......@@ -1436,8 +1435,7 @@ MojoResult Core::AcceptInvitation(
options && (options->flags & MOJO_ACCEPT_INVITATION_FLAG_ISOLATED);
NodeController* const node_controller = GetNodeController();
RequestContext request_context;
ConnectionParams connection_params(TransportProtocol::kLegacy,
std::move(endpoint_handle));
ConnectionParams connection_params(std::move(endpoint_handle));
if (is_isolated) {
// For an isolated invitation, we simply mint a new port pair here and send
// one name to the remote endpoint while stashing the other in the accepted
......
......@@ -16,10 +16,10 @@
#include "base/synchronization/lock.h"
#include "base/task_runner.h"
#include "build/build_config.h"
#include "mojo/edk/embedder/connection_params.h"
#include "mojo/edk/embedder/process_error_callback.h"
#include "mojo/edk/embedder/scoped_platform_handle.h"
#include "mojo/edk/system/channel.h"
#include "mojo/edk/system/connection_params.h"
#include "mojo/edk/system/ports/name.h"
#include "mojo/edk/system/scoped_process_handle.h"
......
......@@ -214,8 +214,7 @@ void NodeController::AcceptBrokerClientInvitation(
CancelPendingPortMerges();
return;
}
connection_params = ConnectionParams(connection_params.protocol(),
std::move(platform_handle));
connection_params = ConnectionParams(std::move(platform_handle));
#endif
io_task_runner_->PostTask(
......@@ -340,10 +339,9 @@ void NodeController::SendBrokerClientInvitationOnIOThread(
CHECK(channel_ok);
#endif // defined(OS_WIN)
scoped_refptr<NodeChannel> channel = NodeChannel::Create(
this,
ConnectionParams(connection_params.protocol(), std::move(server_handle)),
io_task_runner_, process_error_callback);
scoped_refptr<NodeChannel> channel =
NodeChannel::Create(this, ConnectionParams(std::move(server_handle)),
io_task_runner_, process_error_callback);
#else // !defined(OS_MACOSX) && !defined(OS_NACL)
scoped_refptr<NodeChannel> channel =
......@@ -825,8 +823,7 @@ void NodeController::OnAddBrokerClient(const ports::NodeName& from_node,
}
PlatformChannelPair broker_channel;
ConnectionParams connection_params(TransportProtocol::kLegacy,
broker_channel.PassServerHandle());
ConnectionParams connection_params(broker_channel.PassServerHandle());
scoped_refptr<NodeChannel> client =
NodeChannel::Create(this, std::move(connection_params), io_task_runner_,
ProcessErrorCallback());
......@@ -907,10 +904,9 @@ void NodeController::OnAcceptBrokerClient(
broker = inviter;
} else {
DCHECK(broker_channel.is_valid());
broker = NodeChannel::Create(
this,
ConnectionParams(TransportProtocol::kLegacy, std::move(broker_channel)),
io_task_runner_, ProcessErrorCallback());
broker =
NodeChannel::Create(this, ConnectionParams(std::move(broker_channel)),
io_task_runner_, ProcessErrorCallback());
AddPeer(broker_name, broker, true /* start_channel */);
}
......@@ -1049,10 +1045,9 @@ void NodeController::OnIntroduce(const ports::NodeName& from_node,
return;
}
scoped_refptr<NodeChannel> channel = NodeChannel::Create(
this,
ConnectionParams(TransportProtocol::kLegacy, std::move(channel_handle)),
io_task_runner_, ProcessErrorCallback());
scoped_refptr<NodeChannel> channel =
NodeChannel::Create(this, ConnectionParams(std::move(channel_handle)),
io_task_runner_, ProcessErrorCallback());
DVLOG(1) << "Adding new peer " << name << " via broker introduction.";
AddPeer(name, channel, true /* start_channel */);
......
......@@ -24,10 +24,10 @@
#include "base/logging.h"
#include "base/macros.h"
#include "build/build_config.h"
#include "mojo/edk/embedder/platform_channel_utils_posix.h"
#include "mojo/edk/embedder/platform_handle.h"
#include "mojo/edk/embedder/scoped_platform_handle.h"
#include "mojo/edk/test/test_utils.h"
#include "mojo/public/cpp/platform/socket_utils_posix.h"
#include "testing/gtest/include/gtest/gtest.h"
namespace mojo {
......@@ -91,7 +91,7 @@ TEST_F(PlatformChannelPairPosixTest, NoSigPipe) {
PLOG(WARNING) << "read (expected 0 for EOF)";
// Test our replacement for |write()|/|send()|.
result = PlatformChannelWrite(server_handle, kHello, sizeof(kHello));
result = SocketWrite(server_handle.get().handle, kHello, sizeof(kHello));
EXPECT_EQ(-1, result);
if (errno != EPIPE)
PLOG(WARNING) << "write (expected EPIPE)";
......@@ -99,7 +99,7 @@ TEST_F(PlatformChannelPairPosixTest, NoSigPipe) {
// Test our replacement for |writev()|/|sendv()|.
struct iovec iov[2] = {{const_cast<char*>(kHello), sizeof(kHello)},
{const_cast<char*>(kHello), sizeof(kHello)}};
result = PlatformChannelWritev(server_handle, iov, 2);
result = SocketWritev(server_handle.get().handle, iov, 2);
EXPECT_EQ(-1, result);
if (errno != EPIPE)
PLOG(WARNING) << "write (expected EPIPE)";
......@@ -114,18 +114,18 @@ TEST_F(PlatformChannelPairPosixTest, SendReceiveData) {
std::string send_string(1 << i, 'A' + i);
EXPECT_EQ(static_cast<ssize_t>(send_string.size()),
PlatformChannelWrite(server_handle, send_string.data(),
send_string.size()));
SocketWrite(server_handle.get().handle, send_string.data(),
send_string.size()));
WaitReadable(client_handle.get());
char buf[10000] = {};
base::circular_deque<ScopedInternalPlatformHandle> received_handles;
ssize_t result = PlatformChannelRecvmsg(client_handle, buf, sizeof(buf),
&received_handles);
std::vector<base::ScopedFD> received_fds;
ssize_t result = SocketRecvmsg(client_handle.get().handle, buf, sizeof(buf),
&received_fds);
EXPECT_EQ(static_cast<ssize_t>(send_string.size()), result);
EXPECT_EQ(send_string, std::string(buf, static_cast<size_t>(result)));
EXPECT_TRUE(received_handles.empty());
EXPECT_TRUE(received_fds.empty());
}
}
......@@ -139,53 +139,43 @@ TEST_F(PlatformChannelPairPosixTest, SendReceiveFDs) {
ScopedInternalPlatformHandle server_handle = channel_pair.PassServerHandle();
ScopedInternalPlatformHandle client_handle = channel_pair.PassClientHandle();
// Reduce the number of FDs opened on OS X to avoid test flake.
#if defined(OS_MACOSX)
const size_t kNumHandlesToSend = kPlatformChannelMaxNumHandles / 2;
#else
const size_t kNumHandlesToSend = kPlatformChannelMaxNumHandles;
#endif
const size_t kNumHandlesToSend = 64;
for (size_t i = 1; i < kNumHandlesToSend; i++) {
// Make |i| files, with the j-th file consisting of j copies of the digit
// |c|.
const char c = '0' + (i % 10);
std::vector<ScopedInternalPlatformHandle> platform_handles;
std::vector<base::ScopedFD> fds;
for (size_t j = 1; j <= i; j++) {
base::FilePath unused;
base::ScopedFILE fp(
base::CreateAndOpenTemporaryFileInDir(temp_dir.GetPath(), &unused));
ASSERT_TRUE(fp);
ASSERT_EQ(j, fwrite(std::string(j, c).data(), 1, j, fp.get()));
platform_handles.emplace_back(InternalPlatformHandle(
test::PlatformHandleFromFILE(std::move(fp)).ReleaseFD()));
ASSERT_TRUE(platform_handles.back().is_valid());
fds.emplace_back(test::PlatformHandleFromFILE(std::move(fp)).TakeFD());
ASSERT_TRUE(fds.back().is_valid());
}
// Send the FDs (+ "hello").
struct iovec iov = {const_cast<char*>(kHello), sizeof(kHello)};
// We assume that the |sendmsg()| actually sends all the data.
EXPECT_EQ(static_cast<ssize_t>(sizeof(kHello)),
PlatformChannelSendmsgWithHandles(server_handle, &iov, 1,
std::move(platform_handles)));
SendmsgWithHandles(server_handle.get().handle, &iov, 1,
std::move(fds)));
WaitReadable(client_handle.get());
char buf[10000] = {};
base::circular_deque<ScopedInternalPlatformHandle> received_handles;
std::vector<base::ScopedFD> received_fds;
// We assume that the |recvmsg()| actually reads all the data.
EXPECT_EQ(static_cast<ssize_t>(sizeof(kHello)),
PlatformChannelRecvmsg(client_handle, buf, sizeof(buf),
&received_handles));
SocketRecvmsg(client_handle.get().handle, buf, sizeof(buf),
&received_fds));
EXPECT_STREQ(kHello, buf);
EXPECT_EQ(i, received_handles.size());
EXPECT_EQ(i, received_fds.size());
for (size_t j = 0; j < received_handles.size(); j++) {
for (size_t j = 0; j < received_fds.size(); j++) {
base::ScopedFILE fp(test::FILEFromPlatformHandle(
PlatformHandle(
base::ScopedFD(received_handles.front().release().handle)),
"rb"));
received_handles.pop_front();
PlatformHandle(std::move(received_fds[j])), "rb"));
ASSERT_TRUE(fp);
rewind(fp.get());
char read_buf[kNumHandlesToSend];
......@@ -215,39 +205,32 @@ TEST_F(PlatformChannelPairPosixTest, AppendReceivedFDs) {
ASSERT_TRUE(fp);
ASSERT_EQ(file_contents.size(),
fwrite(file_contents.data(), 1, file_contents.size(), fp.get()));
std::vector<ScopedInternalPlatformHandle> platform_handles(1);
platform_handles[0].reset(InternalPlatformHandle(
test::PlatformHandleFromFILE(std::move(fp)).ReleaseFD()));
ASSERT_TRUE(platform_handles.back().is_valid());
std::vector<base::ScopedFD> fds(1);
fds[0] = test::PlatformHandleFromFILE(std::move(fp)).TakeFD();
ASSERT_TRUE(fds[0].is_valid());
// Send the FD (+ "hello").
struct iovec iov = {const_cast<char*>(kHello), sizeof(kHello)};
// We assume that the |sendmsg()| actually sends all the data.
EXPECT_EQ(static_cast<ssize_t>(sizeof(kHello)),
PlatformChannelSendmsgWithHandles(server_handle, &iov, 1,
std::move(platform_handles)));
SendmsgWithHandles(server_handle.get().handle, &iov, 1, fds));
}
WaitReadable(client_handle.get());
// Start with an invalid handle in the vector.
base::circular_deque<ScopedInternalPlatformHandle> received_handles;
received_handles.push_back(ScopedInternalPlatformHandle());
std::vector<base::ScopedFD> fds;
char buf[100] = {};
// We assume that the |recvmsg()| actually reads all the data.
EXPECT_EQ(static_cast<ssize_t>(sizeof(kHello)),
PlatformChannelRecvmsg(client_handle, buf, sizeof(buf),
&received_handles));
SocketRecvmsg(client_handle.get().handle, buf, sizeof(buf), &fds));
EXPECT_STREQ(kHello, buf);
ASSERT_EQ(2u, received_handles.size());
EXPECT_FALSE(received_handles[0].is_valid());
EXPECT_TRUE(received_handles[1].is_valid());
ASSERT_EQ(1u, fds.size());
EXPECT_TRUE(fds[0].is_valid());
{
base::ScopedFILE fp(test::FILEFromPlatformHandle(
PlatformHandle(base::ScopedFD(received_handles[1].release().handle)),
"rb"));
base::ScopedFILE fp(
test::FILEFromPlatformHandle(PlatformHandle(std::move(fds[0])), "rb"));
ASSERT_TRUE(fp);
rewind(fp.get());
char read_buf[100];
......
......@@ -77,6 +77,29 @@ constexpr size_t kMaxSendmsgHandles = 128;
} // namespace
ssize_t SocketWrite(base::PlatformFile socket,
const void* bytes,
size_t num_bytes) {
#if defined(OS_MACOSX) || defined(OS_NACL_NONSFI)
return HANDLE_EINTR(write(socket, bytes, num_bytes));
#else
return send(socket, bytes, num_bytes, kSendmsgFlags);
#endif
}
ssize_t SocketWritev(base::PlatformFile socket,
struct iovec* iov,
size_t num_iov) {
#if defined(OS_MACOSX)
return HANDLE_EINTR(writev(socket, iov, static_cast<int>(num_iov)));
#else
struct msghdr msg = {};
msg.msg_iov = iov;
msg.msg_iovlen = num_iov;
return HANDLE_EINTR(sendmsg(socket, &msg, kSendmsgFlags));
#endif
}
ssize_t SendmsgWithHandles(base::PlatformFile socket,
struct iovec* iov,
size_t num_iov,
......@@ -103,6 +126,47 @@ ssize_t SendmsgWithHandles(base::PlatformFile socket,
return HANDLE_EINTR(sendmsg(socket, &msg, kSendmsgFlags));
}
ssize_t SocketRecvmsg(base::PlatformFile socket,
void* buf,
size_t num_bytes,
std::vector<base::ScopedFD>* descriptors,
bool block) {
struct iovec iov = {buf, num_bytes};
char cmsg_buf[CMSG_SPACE(kMaxSendmsgHandles * sizeof(int))];
struct msghdr msg = {};
msg.msg_iov = &iov;
msg.msg_iovlen = 1;
msg.msg_control = cmsg_buf;
msg.msg_controllen = sizeof(cmsg_buf);
ssize_t result =
HANDLE_EINTR(recvmsg(socket, &msg, block ? 0 : MSG_DONTWAIT));
if (result < 0)
return result;
if (msg.msg_controllen == 0)
return result;
DCHECK(!(msg.msg_flags & MSG_CTRUNC));
descriptors->clear();
for (cmsghdr* cmsg = CMSG_FIRSTHDR(&msg); cmsg;
cmsg = CMSG_NXTHDR(&msg, cmsg)) {
if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS) {
size_t payload_length = cmsg->cmsg_len - CMSG_LEN(0);
DCHECK_EQ(payload_length % sizeof(int), 0u);
size_t num_fds = payload_length / sizeof(int);
const int* fds = reinterpret_cast<int*>(CMSG_DATA(cmsg));
for (size_t i = 0; i < num_fds; ++i) {
base::ScopedFD fd(fds[i]);
DCHECK(fd.is_valid());
descriptors->emplace_back(std::move(fd));
}
}
}
return result;
}
bool AcceptSocketConnection(base::PlatformFile server_fd,
base::ScopedFD* connection_fd,
bool check_peer_user) {
......
......@@ -23,6 +23,18 @@ namespace mojo {
// and there wasn't a much better home for them. Consider moving them to
// src/base or something.
// Like |write()| but handles |EINTR| and never raises |SIGPIPE|.
COMPONENT_EXPORT(MOJO_CPP_PLATFORM)
ssize_t SocketWrite(base::PlatformFile socket,
const void* bytes,
size_t num_bytes);
// Like |writev()| but handles |EINTR| and never raises |SIGPIPE|.
COMPONENT_EXPORT(MOJO_CPP_PLATFORM)
ssize_t SocketWritev(base::PlatformFile socket,
struct iovec* iov,
size_t num_iov);
// Wrapper around |sendmsg()| which makes it convenient to send attached file
// descriptors. All entries in |descriptors| must be valid and |descriptors|
// must be non-empty.
......@@ -39,6 +51,14 @@ ssize_t SendmsgWithHandles(base::PlatformFile socket,
size_t num_iov,
const std::vector<base::ScopedFD>& descriptors);
// Like |recvmsg()|, but handles |EINTR|.
COMPONENT_EXPORT(MOJO_CPP_PLATFORM)
ssize_t SocketRecvmsg(base::PlatformFile socket,
void* buf,
size_t num_bytes,
std::vector<base::ScopedFD>* descriptors,
bool block = false);
// Treats |server_fd| as a socket listening for new connections. Returns |false|
// if it encounters an unrecoverable error.
//
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment