Commit e6bc5ffa authored by Brian Geffon's avatar Brian Geffon Committed by Chromium LUCI CQ

Mojo: Add a ChannelPosix header so it can be extended

This CL adds a declaration for ChannelPosix so it can be extended. This
will keep things cleaner when we add a Fast PosixChannel, otherwise
things would become unweildy in a single file.

Bug: b:173022729
Change-Id: Ie4ed21b885a62e4d755ae393484aa2717e0668b3
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2586045
Commit-Queue: Ken Rockot <rockot@google.com>
Auto-Submit: Brian Geffon <bgeffon@chromium.org>
Reviewed-by: default avatarKen Rockot <rockot@google.com>
Cr-Commit-Position: refs/heads/master@{#840261}
parent 861cfb97
...@@ -121,6 +121,7 @@ template("core_impl_source_set") { ...@@ -121,6 +121,7 @@ template("core_impl_source_set") {
sources += [ sources += [
"broker_posix.cc", "broker_posix.cc",
"channel_posix.cc", "channel_posix.cc",
"channel_posix.h",
] ]
} }
} }
......
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
// Use of this source code is governed by a BSD-style license that can be // Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. // found in the LICENSE file.
#include "mojo/core/channel.h" #include "mojo/core/channel_posix.h"
#include <errno.h> #include <errno.h>
#include <sys/socket.h> #include <sys/socket.h>
...@@ -37,12 +37,12 @@ namespace mojo { ...@@ -37,12 +37,12 @@ namespace mojo {
namespace core { namespace core {
namespace { namespace {
#if !defined(OS_NACL) #if !defined(OS_NACL)
std::atomic<bool> g_use_writev{false}; std::atomic<bool> g_use_writev{false};
#endif #endif
const size_t kMaxBatchReadCapacity = 256 * 1024; const size_t kMaxBatchReadCapacity = 256 * 1024;
} // namespace
// A view over a Channel::Message object. The write queue uses these since // A view over a Channel::Message object. The write queue uses these since
// large messages may need to be sent in chunks. // large messages may need to be sent in chunks.
...@@ -110,176 +110,168 @@ class MessageView { ...@@ -110,176 +110,168 @@ class MessageView {
DISALLOW_COPY_AND_ASSIGN(MessageView); DISALLOW_COPY_AND_ASSIGN(MessageView);
}; };
class ChannelPosix : public Channel, ChannelPosix::ChannelPosix(
public base::CurrentThread::DestructionObserver, Delegate* delegate,
public base::MessagePumpForIO::FdWatcher { ConnectionParams connection_params,
public: HandlePolicy handle_policy,
ChannelPosix(Delegate* delegate, scoped_refptr<base::SingleThreadTaskRunner> io_task_runner)
ConnectionParams connection_params, : Channel(delegate, handle_policy),
HandlePolicy handle_policy, self_(this),
scoped_refptr<base::SingleThreadTaskRunner> io_task_runner) io_task_runner_(io_task_runner) {
: Channel(delegate, handle_policy), if (connection_params.server_endpoint().is_valid())
self_(this), server_ = connection_params.TakeServerEndpoint();
io_task_runner_(io_task_runner) { else
if (connection_params.server_endpoint().is_valid()) socket_ = connection_params.TakeEndpoint().TakePlatformHandle().TakeFD();
server_ = connection_params.TakeServerEndpoint();
else CHECK(server_.is_valid() || socket_.is_valid());
socket_ = connection_params.TakeEndpoint().TakePlatformHandle().TakeFD(); }
CHECK(server_.is_valid() || socket_.is_valid());
}
void Start() override { ChannelPosix::~ChannelPosix() {
if (io_task_runner_->RunsTasksInCurrentSequence()) { DCHECK(!read_watcher_);
StartOnIOThread(); DCHECK(!write_watcher_);
} else { }
io_task_runner_->PostTask(
FROM_HERE, base::BindOnce(&ChannelPosix::StartOnIOThread, this));
}
}
void ShutDownImpl() override { void ChannelPosix::Start() {
// Always shut down asynchronously when called through the public interface. if (io_task_runner_->RunsTasksInCurrentSequence()) {
StartOnIOThread();
} else {
io_task_runner_->PostTask( io_task_runner_->PostTask(
FROM_HERE, base::BindOnce(&ChannelPosix::ShutDownOnIOThread, this)); FROM_HERE, base::BindOnce(&ChannelPosix::StartOnIOThread, this));
} }
}
void Write(MessagePtr message) override { void ChannelPosix::ShutDownImpl() {
UMA_HISTOGRAM_COUNTS_100000("Mojo.Channel.WriteMessageSize", // Always shut down asynchronously when called through the public interface.
message->data_num_bytes()); io_task_runner_->PostTask(
UMA_HISTOGRAM_COUNTS_100("Mojo.Channel.WriteMessageHandles", FROM_HERE, base::BindOnce(&ChannelPosix::ShutDownOnIOThread, this));
message->NumHandlesForTransit()); }
bool write_error = false; void ChannelPosix::Write(MessagePtr message) {
bool queued = false; UMA_HISTOGRAM_COUNTS_100000("Mojo.Channel.WriteMessageSize",
{ message->data_num_bytes());
base::AutoLock lock(write_lock_); UMA_HISTOGRAM_COUNTS_100("Mojo.Channel.WriteMessageHandles",
if (reject_writes_) message->NumHandlesForTransit());
return;
if (outgoing_messages_.empty()) { bool write_error = false;
if (!WriteNoLock(MessageView(std::move(message), 0))) bool queued = false;
reject_writes_ = write_error = true; {
} else { base::AutoLock lock(write_lock_);
outgoing_messages_.emplace_back(std::move(message), 0); if (reject_writes_)
} return;
queued = !outgoing_messages_.empty(); if (outgoing_messages_.empty()) {
} if (!WriteNoLock(MessageView(std::move(message), 0)))
if (write_error) { reject_writes_ = write_error = true;
// Invoke OnWriteError() asynchronously on the IO thread, in case Write() } else {
// was called by the delegate, in which case we should not re-enter it. outgoing_messages_.emplace_back(std::move(message), 0);
io_task_runner_->PostTask(
FROM_HERE, base::BindOnce(&ChannelPosix::OnWriteError, this,
Error::kDisconnected));
} }
UMA_HISTOGRAM_BOOLEAN("Mojo.Channel.WriteQueued", queued); queued = !outgoing_messages_.empty();
} }
if (write_error) {
void LeakHandle() override { // Invoke OnWriteError() asynchronously on the IO thread, in case Write()
DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); // was called by the delegate, in which case we should not re-enter it.
leak_handle_ = true; io_task_runner_->PostTask(FROM_HERE,
base::BindOnce(&ChannelPosix::OnWriteError, this,
Error::kDisconnected));
} }
UMA_HISTOGRAM_BOOLEAN("Mojo.Channel.WriteQueued", queued);
}
bool GetReadPlatformHandles(const void* payload, void ChannelPosix::LeakHandle() {
size_t payload_size, DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
size_t num_handles, leak_handle_ = true;
const void* extra_header, }
size_t extra_header_size,
std::vector<PlatformHandle>* handles,
bool* deferred) override {
if (num_handles > std::numeric_limits<uint16_t>::max())
return false;
if (incoming_fds_.size() < num_handles)
return true;
handles->resize(num_handles);
for (size_t i = 0; i < num_handles; ++i) {
handles->at(i) = PlatformHandle(std::move(incoming_fds_.front()));
incoming_fds_.pop_front();
}
bool ChannelPosix::GetReadPlatformHandles(const void* payload,
size_t payload_size,
size_t num_handles,
const void* extra_header,
size_t extra_header_size,
std::vector<PlatformHandle>* handles,
bool* deferred) {
if (num_handles > std::numeric_limits<uint16_t>::max())
return false;
if (incoming_fds_.size() < num_handles)
return true; return true;
}
private: handles->resize(num_handles);
~ChannelPosix() override { for (size_t i = 0; i < num_handles; ++i) {
DCHECK(!read_watcher_); handles->at(i) = PlatformHandle(std::move(incoming_fds_.front()));
DCHECK(!write_watcher_); incoming_fds_.pop_front();
} }
void StartOnIOThread() { return true;
DCHECK(!read_watcher_); }
DCHECK(!write_watcher_);
read_watcher_.reset(
new base::MessagePumpForIO::FdWatchController(FROM_HERE));
base::CurrentThread::Get()->AddDestructionObserver(this);
if (server_.is_valid()) {
base::CurrentIOThread::Get()->WatchFileDescriptor(
server_.platform_handle().GetFD().get(), false /* persistent */,
base::MessagePumpForIO::WATCH_READ, read_watcher_.get(), this);
} else {
write_watcher_.reset(
new base::MessagePumpForIO::FdWatchController(FROM_HERE));
base::CurrentIOThread::Get()->WatchFileDescriptor(
socket_.get(), true /* persistent */,
base::MessagePumpForIO::WATCH_READ, read_watcher_.get(), this);
base::AutoLock lock(write_lock_);
FlushOutgoingMessagesNoLock();
}
}
void WaitForWriteOnIOThread() { void ChannelPosix::StartOnIOThread() {
DCHECK(!read_watcher_);
DCHECK(!write_watcher_);
read_watcher_.reset(new base::MessagePumpForIO::FdWatchController(FROM_HERE));
base::CurrentThread::Get()->AddDestructionObserver(this);
if (server_.is_valid()) {
base::CurrentIOThread::Get()->WatchFileDescriptor(
server_.platform_handle().GetFD().get(), false /* persistent */,
base::MessagePumpForIO::WATCH_READ, read_watcher_.get(), this);
} else {
write_watcher_.reset(
new base::MessagePumpForIO::FdWatchController(FROM_HERE));
base::CurrentIOThread::Get()->WatchFileDescriptor(
socket_.get(), true /* persistent */,
base::MessagePumpForIO::WATCH_READ, read_watcher_.get(), this);
base::AutoLock lock(write_lock_); base::AutoLock lock(write_lock_);
WaitForWriteOnIOThreadNoLock(); FlushOutgoingMessagesNoLock();
} }
}
void WaitForWriteOnIOThreadNoLock() { void ChannelPosix::WaitForWriteOnIOThread() {
if (pending_write_) base::AutoLock lock(write_lock_);
return; WaitForWriteOnIOThreadNoLock();
if (!write_watcher_) }
return;
if (io_task_runner_->RunsTasksInCurrentSequence()) {
pending_write_ = true;
base::CurrentIOThread::Get()->WatchFileDescriptor(
socket_.get(), false /* persistent */,
base::MessagePumpForIO::WATCH_WRITE, write_watcher_.get(), this);
} else {
io_task_runner_->PostTask(
FROM_HERE,
base::BindOnce(&ChannelPosix::WaitForWriteOnIOThread, this));
}
}
void ShutDownOnIOThread() { void ChannelPosix::WaitForWriteOnIOThreadNoLock() {
base::CurrentThread::Get()->RemoveDestructionObserver(this); if (pending_write_)
return;
if (!write_watcher_)
return;
if (io_task_runner_->RunsTasksInCurrentSequence()) {
pending_write_ = true;
base::CurrentIOThread::Get()->WatchFileDescriptor(
socket_.get(), false /* persistent */,
base::MessagePumpForIO::WATCH_WRITE, write_watcher_.get(), this);
} else {
io_task_runner_->PostTask(
FROM_HERE, base::BindOnce(&ChannelPosix::WaitForWriteOnIOThread, this));
}
}
read_watcher_.reset(); void ChannelPosix::ShutDownOnIOThread() {
write_watcher_.reset(); base::CurrentThread::Get()->RemoveDestructionObserver(this);
if (leak_handle_) {
ignore_result(socket_.release()); read_watcher_.reset();
server_.TakePlatformHandle().release(); write_watcher_.reset();
} else { if (leak_handle_) {
socket_.reset(); ignore_result(socket_.release());
ignore_result(server_.TakePlatformHandle()); server_.TakePlatformHandle().release();
} } else {
socket_.reset();
ignore_result(server_.TakePlatformHandle());
}
#if defined(OS_IOS) #if defined(OS_IOS)
fds_to_close_.clear(); fds_to_close_.clear();
#endif #endif
// May destroy the |this| if it was the last reference. // May destroy the |this| if it was the last reference.
self_ = nullptr; self_ = nullptr;
} }
// base::CurrentThread::DestructionObserver: void ChannelPosix::WillDestroyCurrentMessageLoop() {
void WillDestroyCurrentMessageLoop() override { DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); if (self_)
if (self_) ShutDownOnIOThread();
ShutDownOnIOThread(); }
}
// base::MessagePumpForIO::FdWatcher: void ChannelPosix::OnFileCanReadWithoutBlocking(int fd) {
void OnFileCanReadWithoutBlocking(int fd) override { if (server_.is_valid()) {
if (server_.is_valid()) { CHECK_EQ(fd, server_.platform_handle().GetFD().get());
CHECK_EQ(fd, server_.platform_handle().GetFD().get());
#if !defined(OS_NACL) #if !defined(OS_NACL)
read_watcher_.reset(); read_watcher_.reset();
base::CurrentThread::Get()->RemoveDestructionObserver(this); base::CurrentThread::Get()->RemoveDestructionObserver(this);
...@@ -295,7 +287,7 @@ class ChannelPosix : public Channel, ...@@ -295,7 +287,7 @@ class ChannelPosix : public Channel,
NOTREACHED(); NOTREACHED();
#endif #endif
return; return;
} }
CHECK_EQ(fd, socket_.get()); CHECK_EQ(fd, socket_.get());
bool validation_error = false; bool validation_error = false;
...@@ -343,47 +335,47 @@ class ChannelPosix : public Channel, ...@@ -343,47 +335,47 @@ class ChannelPosix : public Channel,
else else
OnError(Error::kDisconnected); OnError(Error::kDisconnected);
} }
} }
void OnFileCanWriteWithoutBlocking(int fd) override { void ChannelPosix::OnFileCanWriteWithoutBlocking(int fd) {
bool write_error = false; bool write_error = false;
{ {
base::AutoLock lock(write_lock_); base::AutoLock lock(write_lock_);
pending_write_ = false; pending_write_ = false;
if (!FlushOutgoingMessagesNoLock()) if (!FlushOutgoingMessagesNoLock())
reject_writes_ = write_error = true; reject_writes_ = write_error = true;
}
if (write_error)
OnWriteError(Error::kDisconnected);
} }
if (write_error)
OnWriteError(Error::kDisconnected);
}
// Attempts to write a message directly to the channel. If the full message // Attempts to write a message directly to the channel. If the full message
// cannot be written, it's queued and a wait is initiated to write the message // cannot be written, it's queued and a wait is initiated to write the message
// ASAP on the I/O thread. // ASAP on the I/O thread.
bool WriteNoLock(MessageView message_view) { bool ChannelPosix::WriteNoLock(MessageView message_view) {
if (server_.is_valid()) { if (server_.is_valid()) {
outgoing_messages_.emplace_front(std::move(message_view)); outgoing_messages_.emplace_front(std::move(message_view));
return true; return true;
} }
size_t bytes_written = 0; size_t bytes_written = 0;
std::vector<PlatformHandleInTransit> handles = message_view.TakeHandles(); std::vector<PlatformHandleInTransit> handles = message_view.TakeHandles();
size_t num_handles = handles.size(); size_t num_handles = handles.size();
size_t handles_written = message_view.num_handles_sent(); size_t handles_written = message_view.num_handles_sent();
do { do {
message_view.advance_data_offset(bytes_written); message_view.advance_data_offset(bytes_written);
ssize_t result; ssize_t result;
if (handles_written < num_handles) { if (handles_written < num_handles) {
iovec iov = {const_cast<void*>(message_view.data()), iovec iov = {const_cast<void*>(message_view.data()),
message_view.data_num_bytes()}; message_view.data_num_bytes()};
size_t num_handles_to_send = size_t num_handles_to_send =
std::min(num_handles - handles_written, kMaxSendmsgHandles); std::min(num_handles - handles_written, kMaxSendmsgHandles);
std::vector<base::ScopedFD> fds(num_handles_to_send); std::vector<base::ScopedFD> fds(num_handles_to_send);
for (size_t i = 0; i < num_handles_to_send; ++i) for (size_t i = 0; i < num_handles_to_send; ++i)
fds[i] = handles[i + handles_written].TakeHandle().TakeFD(); fds[i] = handles[i + handles_written].TakeHandle().TakeFD();
// TODO: Handle lots of handles. // TODO: Handle lots of handles.
result = SendmsgWithHandles(socket_.get(), &iov, 1, fds); result = SendmsgWithHandles(socket_.get(), &iov, 1, fds);
if (result >= 0) { if (result >= 0) {
#if defined(OS_IOS) #if defined(OS_IOS)
// There is a bug in XNU which makes it dangerous to close // There is a bug in XNU which makes it dangerous to close
// a file descriptor while it is in transit. So instead we // a file descriptor while it is in transit. So instead we
...@@ -408,18 +400,18 @@ class ChannelPosix : public Channel, ...@@ -408,18 +400,18 @@ class ChannelPosix : public Channel,
handles_written += num_handles_to_send; handles_written += num_handles_to_send;
DCHECK_LE(handles_written, num_handles); DCHECK_LE(handles_written, num_handles);
message_view.set_num_handles_sent(handles_written); message_view.set_num_handles_sent(handles_written);
} else {
// Message transmission failed, so pull the FDs back into |handles|
// so they can be held by the Message again.
for (size_t i = 0; i < fds.size(); ++i) {
handles[i + handles_written] =
PlatformHandleInTransit(PlatformHandle(std::move(fds[i])));
}
}
} else { } else {
result = SocketWrite(socket_.get(), message_view.data(), // Message transmission failed, so pull the FDs back into |handles|
message_view.data_num_bytes()); // so they can be held by the Message again.
for (size_t i = 0; i < fds.size(); ++i) {
handles[i + handles_written] =
PlatformHandleInTransit(PlatformHandle(std::move(fds[i])));
}
} }
} else {
result = SocketWrite(socket_.get(), message_view.data(),
message_view.data_num_bytes());
}
if (result < 0) { if (result < 0) {
if (errno != EAGAIN && if (errno != EAGAIN &&
...@@ -450,13 +442,13 @@ class ChannelPosix : public Channel, ...@@ -450,13 +442,13 @@ class ChannelPosix : public Channel,
} }
bytes_written = static_cast<size_t>(result); bytes_written = static_cast<size_t>(result);
} while (handles_written < num_handles || } while (handles_written < num_handles ||
bytes_written < message_view.data_num_bytes()); bytes_written < message_view.data_num_bytes());
return FlushOutgoingMessagesNoLock(); return FlushOutgoingMessagesNoLock();
} }
bool FlushOutgoingMessagesNoLock() { bool ChannelPosix::FlushOutgoingMessagesNoLock() {
#if !defined(OS_NACL) #if !defined(OS_NACL)
if (g_use_writev) if (g_use_writev)
return FlushOutgoingMessagesWritevNoLock(); return FlushOutgoingMessagesWritevNoLock();
...@@ -491,236 +483,198 @@ class ChannelPosix : public Channel, ...@@ -491,236 +483,198 @@ class ChannelPosix : public Channel,
} }
return true; return true;
} }
#if !defined(OS_NACL)
bool WriteOutgoingMessagesWithWritev() {
if (outgoing_messages_.empty())
return true;
// If all goes well we can submit a writev(2) with a iovec of size void ChannelPosix::OnWriteError(Error error) {
// outgoing_messages_.size() but never more than the kernel allows. DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
size_t num_messages_to_send = DCHECK(reject_writes_);
std::min<size_t>(IOV_MAX, outgoing_messages_.size());
iovec iov[num_messages_to_send];
memset(&iov[0], 0, sizeof(iov));
// Populate the iov.
size_t num_iovs_set = 0;
for (auto it = outgoing_messages_.begin();
num_iovs_set < num_messages_to_send; ++it) {
if (it->num_handles_remaining() > 0) {
// We can't send handles with writev(2) so stop at this message.
break;
}
iov[num_iovs_set].iov_base = const_cast<void*>(it->data()); if (error == Error::kDisconnected) {
iov[num_iovs_set].iov_len = it->data_num_bytes(); // If we can't write because the pipe is disconnected then continue
num_iovs_set++; // reading to fetch any in-flight messages, relying on end-of-stream to
// signal the actual disconnection.
if (read_watcher_) {
write_watcher_.reset();
return;
} }
}
UMA_HISTOGRAM_COUNTS_1000("Mojo.Channel.WritevBatchedMessages", OnError(error);
num_iovs_set); }
size_t iov_offset = 0;
while (iov_offset < num_iovs_set) {
ssize_t bytes_written = SocketWritev(socket_.get(), &iov[iov_offset],
num_iovs_set - iov_offset);
if (bytes_written < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
WaitForWriteOnIOThreadNoLock();
return true;
}
return false;
}
// Let's walk our outgoing_messages_ popping off outgoing_messages_ #if !defined(OS_NACL)
// that were fully written. bool ChannelPosix::WriteOutgoingMessagesWithWritev() {
size_t bytes_remaining = bytes_written; if (outgoing_messages_.empty())
while (bytes_remaining > 0) { return true;
if (bytes_remaining >= outgoing_messages_.front().data_num_bytes()) {
// This message was fully written. // If all goes well we can submit a writev(2) with a iovec of size
bytes_remaining -= outgoing_messages_.front().data_num_bytes(); // outgoing_messages_.size() but never more than the kernel allows.
outgoing_messages_.pop_front(); size_t num_messages_to_send =
iov_offset++; std::min<size_t>(IOV_MAX, outgoing_messages_.size());
} else { iovec iov[num_messages_to_send];
// This message was partially written, account for what was memset(&iov[0], 0, sizeof(iov));
// already written.
outgoing_messages_.front().advance_data_offset(bytes_remaining); // Populate the iov.
bytes_remaining = 0; size_t num_iovs_set = 0;
for (auto it = outgoing_messages_.begin();
// Update the iov too as we will call writev again. num_iovs_set < num_messages_to_send; ++it) {
iov[iov_offset].iov_base = if (it->num_handles_remaining() > 0) {
const_cast<void*>(outgoing_messages_.front().data()); // We can't send handles with writev(2) so stop at this message.
iov[iov_offset].iov_len = outgoing_messages_.front().data_num_bytes(); break;
}
}
} }
return true; iov[num_iovs_set].iov_base = const_cast<void*>(it->data());
iov[num_iovs_set].iov_len = it->data_num_bytes();
num_iovs_set++;
} }
// FlushOutgoingMessagesWritevNoLock is equivalent to UMA_HISTOGRAM_COUNTS_1000("Mojo.Channel.WritevBatchedMessages", num_iovs_set);
// FlushOutgoingMessagesNoLock except it looks for opportunities to make only
// a single write syscall by using writev(2) instead of write(2). In most
// situations this is very straight forward; however, when a handle needs to
// be transferred we cannot use writev(2) and instead will fall back to the
// standard write.
bool FlushOutgoingMessagesWritevNoLock() {
do {
// If the first message contains a handle we will flush it first using a
// standard write, we will also use the standard write if we only have a
// single message.
while (!outgoing_messages_.empty() &&
(outgoing_messages_.front().num_handles_remaining() > 0 ||
outgoing_messages_.size() == 1)) {
MessageView message = std::move(outgoing_messages_.front());
outgoing_messages_.pop_front();
size_t messages_before_write = outgoing_messages_.size();
if (!WriteNoLock(std::move(message)))
return false;
if (outgoing_messages_.size() > messages_before_write) { size_t iov_offset = 0;
// It was re-queued by WriteNoLock. while (iov_offset < num_iovs_set) {
return true; ssize_t bytes_written = SocketWritev(socket_.get(), &iov[iov_offset],
} num_iovs_set - iov_offset);
if (bytes_written < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
WaitForWriteOnIOThreadNoLock();
return true;
} }
return false;
}
if (!WriteOutgoingMessagesWithWritev()) // Let's walk our outgoing_messages_ popping off outgoing_messages_
return false; // that were fully written.
size_t bytes_remaining = bytes_written;
// At this point if we have more messages then it's either because we while (bytes_remaining > 0) {
// exceeded IOV_MAX OR it's because we ran into a FileHandle. Either way if (bytes_remaining >= outgoing_messages_.front().data_num_bytes()) {
// we just start the process all over again and it will flush any // This message was fully written.
// FileHandles before attempting writev(2) again. bytes_remaining -= outgoing_messages_.front().data_num_bytes();
} while (!outgoing_messages_.empty()); outgoing_messages_.pop_front();
return true; iov_offset++;
} else {
// This message was partially written, account for what was
// already written.
outgoing_messages_.front().advance_data_offset(bytes_remaining);
bytes_remaining = 0;
// Update the iov too as we will call writev again.
iov[iov_offset].iov_base =
const_cast<void*>(outgoing_messages_.front().data());
iov[iov_offset].iov_len = outgoing_messages_.front().data_num_bytes();
}
}
} }
#endif // !defined(OS_NACL)
#if defined(OS_IOS) return true;
bool OnControlMessage(Message::MessageType message_type, }
const void* payload,
size_t payload_size,
std::vector<PlatformHandle> handles) override {
switch (message_type) {
case Message::MessageType::HANDLES_SENT: {
if (payload_size == 0)
break;
MessagePtr message(new Channel::Message(
payload_size, 0, Message::MessageType::HANDLES_SENT_ACK));
memcpy(message->mutable_payload(), payload, payload_size);
Write(std::move(message));
return true;
}
case Message::MessageType::HANDLES_SENT_ACK: { // FlushOutgoingMessagesWritevNoLock is equivalent to
size_t num_fds = payload_size / sizeof(int); // FlushOutgoingMessagesNoLock except it looks for opportunities to make only
if (num_fds == 0 || payload_size % sizeof(int) != 0) // a single write syscall by using writev(2) instead of write(2). In most
break; // situations this is very straight forward; however, when a handle needs to
// be transferred we cannot use writev(2) and instead will fall back to the
// standard write.
bool ChannelPosix::FlushOutgoingMessagesWritevNoLock() {
do {
// If the first message contains a handle we will flush it first using a
// standard write, we will also use the standard write if we only have a
// single message.
while (!outgoing_messages_.empty() &&
(outgoing_messages_.front().num_handles_remaining() > 0 ||
outgoing_messages_.size() == 1)) {
MessageView message = std::move(outgoing_messages_.front());
outgoing_messages_.pop_front();
size_t messages_before_write = outgoing_messages_.size();
if (!WriteNoLock(std::move(message)))
return false;
const int* fds = reinterpret_cast<const int*>(payload); if (outgoing_messages_.size() > messages_before_write) {
if (!CloseHandles(fds, num_fds)) // It was re-queued by WriteNoLock.
break;
return true; return true;
} }
default:
break;
} }
return false; if (!WriteOutgoingMessagesWithWritev())
}
// Closes handles referenced by |fds|. Returns false if |num_fds| is 0, or if
// |fds| does not match a sequence of handles in |fds_to_close_|.
bool CloseHandles(const int* fds, size_t num_fds) {
base::AutoLock l(fds_to_close_lock_);
if (!num_fds)
return false; return false;
auto start = std::find_if( // At this point if we have more messages then it's either because we
fds_to_close_.begin(), fds_to_close_.end(), // exceeded IOV_MAX OR it's because we ran into a FileHandle. Either way
[&fds](const base::ScopedFD& fd) { return fd.get() == fds[0]; }); // we just start the process all over again and it will flush any
if (start == fds_to_close_.end()) // FileHandles before attempting writev(2) again.
return false; } while (!outgoing_messages_.empty());
return true;
}
#endif // !defined(OS_NACL)
auto it = start; #if defined(OS_IOS)
size_t i = 0; bool ChannelPosix::OnControlMessage(Message::MessageType message_type,
// The FDs in the message should match a sequence of handles in const void* payload,
// |fds_to_close_|. size_t payload_size,
// TODO(wez): Consider making |fds_to_close_| a circular_deque<> std::vector<PlatformHandle> handles) {
// for greater efficiency? Or assign a unique Id to each FD-containing switch (message_type) {
// message, and map that to a vector of FDs to close, to avoid the case Message::MessageType::HANDLES_SENT: {
// need for this traversal? Id could even be the first FD in the message. if (payload_size == 0)
for (; i < num_fds && it != fds_to_close_.end(); i++, ++it) { break;
if (it->get() != fds[i]) MessagePtr message(new Channel::Message(
return false; payload_size, 0, Message::MessageType::HANDLES_SENT_ACK));
memcpy(message->mutable_payload(), payload, payload_size);
Write(std::move(message));
return true;
} }
if (i != num_fds)
return false;
// Close the FDs by erase()ing their ScopedFDs. case Message::MessageType::HANDLES_SENT_ACK: {
fds_to_close_.erase(start, it); size_t num_fds = payload_size / sizeof(int);
return true; if (num_fds == 0 || payload_size % sizeof(int) != 0)
} break;
#endif // defined(OS_IOS)
void OnWriteError(Error error) {
DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
DCHECK(reject_writes_);
if (error == Error::kDisconnected) { const int* fds = reinterpret_cast<const int*>(payload);
// If we can't write because the pipe is disconnected then continue if (!CloseHandles(fds, num_fds))
// reading to fetch any in-flight messages, relying on end-of-stream to break;
// signal the actual disconnection. return true;
if (read_watcher_) {
write_watcher_.reset();
return;
}
} }
OnError(error); default:
break;
} }
// Keeps the Channel alive at least until explicit shutdown on the IO thread. return false;
scoped_refptr<Channel> self_; }
// We may be initialized with a server socket, in which case this will be
// valid until it accepts an incoming connection.
PlatformChannelServerEndpoint server_;
// The socket over which to communicate. May be passed in at construction time
// or accepted over |server_|.
base::ScopedFD socket_;
scoped_refptr<base::SingleThreadTaskRunner> io_task_runner_;
// These watchers must only be accessed on the IO thread.
std::unique_ptr<base::MessagePumpForIO::FdWatchController> read_watcher_;
std::unique_ptr<base::MessagePumpForIO::FdWatchController> write_watcher_;
base::circular_deque<base::ScopedFD> incoming_fds_; // Closes handles referenced by |fds|. Returns false if |num_fds| is 0, or if
// |fds| does not match a sequence of handles in |fds_to_close_|.
bool ChannelPosix::CloseHandles(const int* fds, size_t num_fds) {
base::AutoLock l(fds_to_close_lock_);
if (!num_fds)
return false;
// Protects |pending_write_| and |outgoing_messages_|. auto start = std::find_if(
base::Lock write_lock_; fds_to_close_.begin(), fds_to_close_.end(),
bool pending_write_ = false; [&fds](const base::ScopedFD& fd) { return fd.get() == fds[0]; });
bool reject_writes_ = false; if (start == fds_to_close_.end())
base::circular_deque<MessageView> outgoing_messages_; return false;
bool leak_handle_ = false; auto it = start;
size_t i = 0;
// The FDs in the message should match a sequence of handles in
// |fds_to_close_|.
// TODO(wez): Consider making |fds_to_close_| a circular_deque<>
// for greater efficiency? Or assign a unique Id to each FD-containing
// message, and map that to a vector of FDs to close, to avoid the
// need for this traversal? Id could even be the first FD in the message.
for (; i < num_fds && it != fds_to_close_.end(); i++, ++it) {
if (it->get() != fds[i])
return false;
}
if (i != num_fds)
return false;
#if defined(OS_IOS) // Close the FDs by erase()ing their ScopedFDs.
base::Lock fds_to_close_lock_; fds_to_close_.erase(start, it);
std::vector<base::ScopedFD> fds_to_close_; return true;
}
#endif // defined(OS_IOS) #endif // defined(OS_IOS)
DISALLOW_COPY_AND_ASSIGN(ChannelPosix);
};
} // namespace
// static // static
#if !defined(OS_NACL) #if !defined(OS_NACL)
void Channel::set_posix_use_writev(bool use_writev) { void Channel::set_posix_use_writev(bool use_writev) {
......
// Copyright 2020 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef MOJO_CORE_CHANNEL_POSIX_H_
#define MOJO_CORE_CHANNEL_POSIX_H_
#include "mojo/core/channel.h"
#include "base/containers/queue.h"
#include "base/logging.h"
#include "base/macros.h"
#include "base/memory/ref_counted.h"
#include "base/message_loop/message_pump_for_io.h"
#include "base/synchronization/lock.h"
#include "base/task/current_thread.h"
#include "base/task_runner.h"
#include "build/build_config.h"
#include "mojo/core/core.h"
namespace mojo {
namespace core {
class MessageView;
class ChannelPosix : public Channel,
public base::CurrentThread::DestructionObserver,
public base::MessagePumpForIO::FdWatcher {
public:
ChannelPosix(Delegate* delegate,
ConnectionParams connection_params,
HandlePolicy handle_policy,
scoped_refptr<base::SingleThreadTaskRunner> io_task_runner);
void Start() override;
void ShutDownImpl() override;
void Write(MessagePtr message) override;
void LeakHandle() override;
bool GetReadPlatformHandles(const void* payload,
size_t payload_size,
size_t num_handles,
const void* extra_header,
size_t extra_header_size,
std::vector<PlatformHandle>* handles,
bool* deferred) override;
private:
~ChannelPosix() override;
void StartOnIOThread();
void WaitForWriteOnIOThread();
void WaitForWriteOnIOThreadNoLock();
void ShutDownOnIOThread();
// base::CurrentThread::DestructionObserver:
void WillDestroyCurrentMessageLoop() override;
// base::MessagePumpForIO::FdWatcher:
void OnFileCanReadWithoutBlocking(int fd) override;
void OnFileCanWriteWithoutBlocking(int fd) override;
// Attempts to write a message directly to the channel. If the full message
// cannot be written, it's queued and a wait is initiated to write the message
// ASAP on the I/O thread.
bool WriteNoLock(MessageView message_view);
bool FlushOutgoingMessagesNoLock();
#if !defined(OS_NACL)
bool WriteOutgoingMessagesWithWritev();
// FlushOutgoingMessagesWritevNoLock is equivalent to
// FlushOutgoingMessagesNoLock except it looks for opportunities to make
// only a single write syscall by using writev(2) instead of write(2). In
// most situations this is very straight forward; however, when a handle
// needs to be transferred we cannot use writev(2) and instead will fall
// back to the standard write.
bool FlushOutgoingMessagesWritevNoLock();
#endif // !defined(OS_NACL)
#if defined(OS_IOS)
bool OnControlMessage(Message::MessageType message_type,
const void* payload,
size_t payload_size,
std::vector<PlatformHandle> handles) override;
bool CloseHandles(const int* fds, size_t num_fds);
#endif // defined(OS_IOS)
void OnWriteError(Error error);
// Keeps the Channel alive at least until explicit shutdown on the IO thread.
scoped_refptr<Channel> self_;
// We may be initialized with a server socket, in which case this will be
// valid until it accepts an incoming connection.
PlatformChannelServerEndpoint server_;
// The socket over which to communicate. May be passed in at construction time
// or accepted over |server_|.
base::ScopedFD socket_;
scoped_refptr<base::SingleThreadTaskRunner> io_task_runner_;
// These watchers must only be accessed on the IO thread.
std::unique_ptr<base::MessagePumpForIO::FdWatchController> read_watcher_;
std::unique_ptr<base::MessagePumpForIO::FdWatchController> write_watcher_;
base::circular_deque<base::ScopedFD> incoming_fds_;
// Protects |pending_write_| and |outgoing_messages_|.
base::Lock write_lock_;
bool pending_write_ = false;
bool reject_writes_ = false;
base::circular_deque<MessageView> outgoing_messages_;
bool leak_handle_ = false;
#if defined(OS_IOS)
base::Lock fds_to_close_lock_;
std::vector<base::ScopedFD> fds_to_close_;
#endif // defined(OS_IOS)
DISALLOW_COPY_AND_ASSIGN(ChannelPosix);
};
} // namespace core
} // namespace mojo
#endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment