Commit 5d983728 authored by Ken Rockot's avatar Ken Rockot Committed by Commit Bot

Revert "Revert "Mojo EDK: Refactor layered message serialization""

This reverts commit 9a9457b3.

Reason for revert: Relanding this change as the broken test was invalid anyway and has since been removed. Sorry for the churn!

Original change's description:
> Revert "Mojo EDK: Refactor layered message serialization"
> 
> This reverts commit 77c59a87.
> 
> Reason for revert: Broke build. See the comment on the original review.
> 
> Original change's description:
> > Mojo EDK: Refactor layered message serialization
> > 
> > Consolidates and simplifies user message serialization within the EDK.
> > Namely:
> > 
> >   * The ports layer no longer operates under the assumption that all
> >   events exist exclusively in serialized form. This eliminates the
> >   complexity of layered message allocation at the cost of a small amount
> >   of extra copying in some cases.
> > 
> >   * ports::Message is now more aptly named ports::Event, and different
> >   event types are subclasses.
> > 
> >   * Introduces a ports::UserMessage base class for ports embedder
> >   message types. These can be attached to ports::UserMessageEvent
> >   objects.
> > 
> >   * The MessageForTransit and PortsMessage types have been flattened
> >   into a single UserMessageImpl type, a subclass of ports::UserMessage.
> > 
> >   * Nearly all user message serialization and deserialization logic has
> >   been consolidated into UserMessageImpl. A few small pieces remain
> >   isolated in node_controller.cc and will eventually move elsewhere
> >   after additional refactoring.
> > 
> > All of this work is a precursor to supporting lazy serialization.
> > 
> > BUG=725321
> > 
> > Change-Id: Ib9e66cd0d5b88ac0e6dc898f248c958039078023
> > Reviewed-on: https://chromium-review.googlesource.com/516402
> > Commit-Queue: Ken Rockot <rockot@chromium.org>
> > Reviewed-by: Jay Civelli <jcivelli@chromium.org>
> > Cr-Commit-Position: refs/heads/master@{#475731}
> 
> TBR=jcivelli@chromium.org,rockot@chromium.org
> No-Presubmit: true
> No-Tree-Checks: true
> No-Try: true
> BUG=725321
> 
> Change-Id: I84f950290e6c45dc830f2bc3177af09ae955f432
> Reviewed-on: https://chromium-review.googlesource.com/518742
> Reviewed-by: Takeshi Yoshino <tyoshino@chromium.org>
> Commit-Queue: Takeshi Yoshino <tyoshino@chromium.org>
> Cr-Commit-Position: refs/heads/master@{#475805}

TBR=abarth@chromium.org,jcivelli@chromium.org,aa@chromium.org,rockot@chromium.org,tyoshino@chromium.org,darin@chromium.org,chromium-reviews@chromium.org,findit-for-me@appspot.gserviceaccount.com
No-Presubmit: true
No-Tree-Checks: true
No-Try: true
BUG=725321

Change-Id: I5a2dbe4abf2e1f576bcf3f7f651f3557583d9033
Reviewed-on: https://chromium-review.googlesource.com/518427Reviewed-by: default avatarKen Rockot <rockot@chromium.org>
Commit-Queue: Ken Rockot <rockot@chromium.org>
Cr-Commit-Position: refs/heads/master@{#475825}
parent fa78f836
......@@ -42,8 +42,6 @@ component("system") {
"handle_table.h",
"mapping_table.cc",
"mapping_table.h",
"message_for_transit.cc",
"message_for_transit.h",
"message_pipe_dispatcher.cc",
"message_pipe_dispatcher.h",
"node_channel.cc",
......@@ -53,12 +51,12 @@ component("system") {
"options_validation.h",
"platform_handle_dispatcher.cc",
"platform_handle_dispatcher.h",
"ports_message.cc",
"ports_message.h",
"request_context.cc",
"request_context.h",
"shared_buffer_dispatcher.cc",
"shared_buffer_dispatcher.h",
"user_message_impl.cc",
"user_message_impl.h",
"watch.cc",
"watch.h",
"watcher_dispatcher.cc",
......
......@@ -26,13 +26,14 @@
#include "mojo/edk/system/data_pipe_consumer_dispatcher.h"
#include "mojo/edk/system/data_pipe_producer_dispatcher.h"
#include "mojo/edk/system/handle_signals_state.h"
#include "mojo/edk/system/message_for_transit.h"
#include "mojo/edk/system/message_pipe_dispatcher.h"
#include "mojo/edk/system/platform_handle_dispatcher.h"
#include "mojo/edk/system/ports/event.h"
#include "mojo/edk/system/ports/name.h"
#include "mojo/edk/system/ports/node.h"
#include "mojo/edk/system/request_context.h"
#include "mojo/edk/system/shared_buffer_dispatcher.h"
#include "mojo/edk/system/user_message_impl.h"
#include "mojo/edk/system/watcher_dispatcher.h"
namespace mojo {
......@@ -435,17 +436,18 @@ MojoResult Core::AllocMessage(uint32_t num_bytes,
const MojoHandle* handles,
uint32_t num_handles,
MojoAllocMessageFlags flags,
MojoMessageHandle* message) {
if (!message)
MojoMessageHandle* message_handle) {
if (!message_handle)
return MOJO_RESULT_INVALID_ARGUMENT;
if (num_handles == 0) { // Fast path: no handles.
std::unique_ptr<MessageForTransit> msg;
MojoResult rv = MessageForTransit::Create(&msg, num_bytes, nullptr, 0);
std::unique_ptr<ports::UserMessageEvent> message;
MojoResult rv = UserMessageImpl::CreateEventForNewSerializedMessage(
num_bytes, nullptr, 0, &message);
if (rv != MOJO_RESULT_OK)
return rv;
*message = reinterpret_cast<MojoMessageHandle>(msg.release());
*message_handle = reinterpret_cast<MojoMessageHandle>(message.release());
return MOJO_RESULT_OK;
}
......@@ -470,15 +472,15 @@ MojoResult Core::AllocMessage(uint32_t num_bytes,
}
DCHECK_EQ(num_handles, dispatchers.size());
std::unique_ptr<MessageForTransit> msg;
MojoResult rv = MessageForTransit::Create(
&msg, num_bytes, dispatchers.data(), num_handles);
std::unique_ptr<ports::UserMessageEvent> message;
MojoResult rv = UserMessageImpl::CreateEventForNewSerializedMessage(
num_bytes, dispatchers.data(), num_handles, &message);
{
base::AutoLock lock(handles_lock_);
if (rv == MOJO_RESULT_OK) {
handles_.CompleteTransitAndClose(dispatchers);
*message = reinterpret_cast<MojoMessageHandle>(msg.release());
*message_handle = reinterpret_cast<MojoMessageHandle>(message.release());
} else {
handles_.CancelTransit(dispatchers);
}
......@@ -487,22 +489,26 @@ MojoResult Core::AllocMessage(uint32_t num_bytes,
return rv;
}
MojoResult Core::FreeMessage(MojoMessageHandle message) {
if (!message)
MojoResult Core::FreeMessage(MojoMessageHandle message_handle) {
if (!message_handle)
return MOJO_RESULT_INVALID_ARGUMENT;
RequestContext request_context;
delete reinterpret_cast<MessageForTransit*>(message);
delete reinterpret_cast<ports::UserMessageEvent*>(message_handle);
return MOJO_RESULT_OK;
}
MojoResult Core::GetMessageBuffer(MojoMessageHandle message, void** buffer) {
if (!message)
MojoResult Core::GetMessageBuffer(MojoMessageHandle message_handle,
void** buffer) {
if (!message_handle)
return MOJO_RESULT_INVALID_ARGUMENT;
*buffer = reinterpret_cast<MessageForTransit*>(message)->mutable_bytes();
auto* message = reinterpret_cast<ports::UserMessageEvent*>(message_handle)
->GetMessage<UserMessageImpl>();
if (!message->IsSerialized())
return MOJO_RESULT_NOT_FOUND;
*buffer = message->user_payload();
return MOJO_RESULT_OK;
}
......@@ -525,8 +531,8 @@ MojoResult Core::CreateMessagePipe(
ports::PortRef port0, port1;
GetNodeController()->node()->CreatePortPair(&port0, &port1);
CHECK(message_pipe_handle0);
CHECK(message_pipe_handle1);
DCHECK(message_pipe_handle0);
DCHECK(message_pipe_handle1);
uint64_t pipe_id = base::RandUint64();
......@@ -575,16 +581,15 @@ MojoResult Core::WriteMessage(MojoHandle message_pipe_handle,
}
MojoResult Core::WriteMessageNew(MojoHandle message_pipe_handle,
MojoMessageHandle message,
MojoMessageHandle message_handle,
MojoWriteMessageFlags flags) {
RequestContext request_context;
std::unique_ptr<MessageForTransit> message_for_transit(
reinterpret_cast<MessageForTransit*>(message));
auto message = base::WrapUnique(
reinterpret_cast<ports::UserMessageEvent*>(message_handle));
auto dispatcher = GetDispatcher(message_pipe_handle);
if (!dispatcher)
return MOJO_RESULT_INVALID_ARGUMENT;
return dispatcher->WriteMessage(std::move(message_for_transit), flags);
return dispatcher->WriteMessage(std::move(message), flags);
}
MojoResult Core::ReadMessage(MojoHandle message_pipe_handle,
......@@ -593,44 +598,55 @@ MojoResult Core::ReadMessage(MojoHandle message_pipe_handle,
MojoHandle* handles,
uint32_t* num_handles,
MojoReadMessageFlags flags) {
CHECK((!num_handles || !*num_handles || handles) &&
(!num_bytes || !*num_bytes || bytes));
DCHECK((!num_handles || !*num_handles || handles) &&
(!num_bytes || !*num_bytes || bytes));
RequestContext request_context;
auto dispatcher = GetDispatcher(message_pipe_handle);
if (!dispatcher)
return MOJO_RESULT_INVALID_ARGUMENT;
std::unique_ptr<MessageForTransit> message;
std::unique_ptr<ports::UserMessageEvent> message_event;
MojoResult rv =
dispatcher->ReadMessage(&message, num_bytes, handles, num_handles, flags,
false /* ignore_num_bytes */);
dispatcher->ReadMessage(&message_event, num_bytes, handles, num_handles,
flags, false /* ignore_num_bytes */);
if (rv != MOJO_RESULT_OK)
return rv;
if (message && message->num_bytes())
memcpy(bytes, message->bytes(), message->num_bytes());
// Some tests use fake message pipe dispatchers which return null events.
//
// TODO(rockot): Fix the tests, because this is weird.
if (!message_event)
return MOJO_RESULT_OK;
auto* message = message_event->GetMessage<UserMessageImpl>();
if (!message->IsSerialized())
return MOJO_RESULT_UNKNOWN; // Maybe we need a better result code.
if (message->user_payload_size())
memcpy(bytes, message->user_payload(), message->user_payload_size());
return MOJO_RESULT_OK;
}
MojoResult Core::ReadMessageNew(MojoHandle message_pipe_handle,
MojoMessageHandle* message,
MojoMessageHandle* message_handle,
uint32_t* num_bytes,
MojoHandle* handles,
uint32_t* num_handles,
MojoReadMessageFlags flags) {
CHECK(message);
CHECK(!num_handles || !*num_handles || handles);
DCHECK(message_handle);
DCHECK(!num_handles || !*num_handles || handles);
RequestContext request_context;
auto dispatcher = GetDispatcher(message_pipe_handle);
if (!dispatcher)
return MOJO_RESULT_INVALID_ARGUMENT;
std::unique_ptr<MessageForTransit> msg;
std::unique_ptr<ports::UserMessageEvent> message_event;
MojoResult rv =
dispatcher->ReadMessage(&msg, num_bytes, handles, num_handles, flags,
true /* ignore_num_bytes */);
dispatcher->ReadMessage(&message_event, num_bytes, handles, num_handles,
flags, true /* ignore_num_bytes */);
if (rv != MOJO_RESULT_OK)
return rv;
*message = reinterpret_cast<MojoMessageHandle>(msg.release());
*message_handle =
reinterpret_cast<MojoMessageHandle>(message_event.release());
return MOJO_RESULT_OK;
}
......@@ -669,15 +685,16 @@ MojoResult Core::FuseMessagePipes(MojoHandle handle0, MojoHandle handle1) {
return MOJO_RESULT_OK;
}
MojoResult Core::NotifyBadMessage(MojoMessageHandle message,
MojoResult Core::NotifyBadMessage(MojoMessageHandle message_handle,
const char* error,
size_t error_num_bytes) {
if (!message)
if (!message_handle)
return MOJO_RESULT_INVALID_ARGUMENT;
const PortsMessage& ports_message =
reinterpret_cast<MessageForTransit*>(message)->ports_message();
if (ports_message.source_node() == ports::kInvalidNodeName) {
auto* message_event =
reinterpret_cast<ports::UserMessageEvent*>(message_handle);
auto* message = message_event->GetMessage<UserMessageImpl>();
if (message->source_node() == ports::kInvalidNodeName) {
DVLOG(1) << "Received invalid message from unknown node.";
if (!default_process_error_callback_.is_null())
default_process_error_callback_.Run(std::string(error, error_num_bytes));
......@@ -685,7 +702,7 @@ MojoResult Core::NotifyBadMessage(MojoMessageHandle message,
}
GetNodeController()->NotifyBadMessageFrom(
ports_message.source_node(), std::string(error, error_num_bytes));
message->source_node(), std::string(error, error_num_bytes));
return MOJO_RESULT_OK;
}
......@@ -991,12 +1008,12 @@ MojoResult Core::UnwrapPlatformSharedBufferHandle(
static_cast<SharedBufferDispatcher*>(dispatcher.get());
scoped_refptr<PlatformSharedBuffer> platform_shared_buffer =
shm_dispatcher->PassPlatformSharedBuffer();
CHECK(platform_shared_buffer);
DCHECK(platform_shared_buffer);
CHECK(size);
DCHECK(size);
*size = platform_shared_buffer->GetNumBytes();
CHECK(flags);
DCHECK(flags);
*flags = MOJO_PLATFORM_SHARED_BUFFER_HANDLE_FLAG_NONE;
if (platform_shared_buffer->IsReadOnly())
*flags |= MOJO_PLATFORM_SHARED_BUFFER_HANDLE_FLAG_READ_ONLY;
......
......@@ -175,9 +175,9 @@ class MOJO_SYSTEM_IMPL_EXPORT Core {
const MojoHandle* handles,
uint32_t num_handles,
MojoAllocMessageFlags flags,
MojoMessageHandle* message);
MojoResult FreeMessage(MojoMessageHandle message);
MojoResult GetMessageBuffer(MojoMessageHandle message, void** buffer);
MojoMessageHandle* message_handle);
MojoResult FreeMessage(MojoMessageHandle message_handle);
MojoResult GetMessageBuffer(MojoMessageHandle message_handle, void** buffer);
MojoResult GetProperty(MojoPropertyType type, void* value);
// These methods correspond to the API functions defined in
......@@ -193,7 +193,7 @@ class MOJO_SYSTEM_IMPL_EXPORT Core {
uint32_t num_handles,
MojoWriteMessageFlags flags);
MojoResult WriteMessageNew(MojoHandle message_pipe_handle,
MojoMessageHandle message,
MojoMessageHandle message_handle,
MojoWriteMessageFlags flags);
MojoResult ReadMessage(MojoHandle message_pipe_handle,
void* bytes,
......@@ -202,13 +202,13 @@ class MOJO_SYSTEM_IMPL_EXPORT Core {
uint32_t* num_handles,
MojoReadMessageFlags flags);
MojoResult ReadMessageNew(MojoHandle message_pipe_handle,
MojoMessageHandle* message,
MojoMessageHandle* message_handle,
uint32_t* num_bytes,
MojoHandle* handles,
uint32_t* num_handles,
MojoReadMessageFlags flags);
MojoResult FuseMessagePipes(MojoHandle handle0, MojoHandle handle1);
MojoResult NotifyBadMessage(MojoMessageHandle message,
MojoResult NotifyBadMessage(MojoMessageHandle message_handle,
const char* error,
size_t error_num_bytes);
......
......@@ -16,7 +16,7 @@
#include "mojo/edk/system/configuration.h"
#include "mojo/edk/system/core.h"
#include "mojo/edk/system/dispatcher.h"
#include "mojo/edk/system/message_for_transit.h"
#include "mojo/edk/system/user_message_impl.h"
namespace mojo {
namespace edk {
......@@ -42,11 +42,12 @@ class MockDispatcher : public Dispatcher {
}
MojoResult WriteMessage(
std::unique_ptr<MessageForTransit> message,
std::unique_ptr<ports::UserMessageEvent> message_event,
MojoWriteMessageFlags /*flags*/) override {
info_->IncrementWriteMessageCallCount();
if (message->num_bytes() > GetConfiguration().max_message_num_bytes)
auto* message = message_event->GetMessage<UserMessageImpl>();
if (message->user_payload_size() > GetConfiguration().max_message_num_bytes)
return MOJO_RESULT_RESOURCE_EXHAUSTED;
if (message->num_handles())
......@@ -55,12 +56,13 @@ class MockDispatcher : public Dispatcher {
return MOJO_RESULT_OK;
}
MojoResult ReadMessage(std::unique_ptr<MessageForTransit>* message,
uint32_t* num_bytes,
MojoHandle* handle,
uint32_t* num_handles,
MojoReadMessageFlags /*flags*/,
bool ignore_num_bytes) override {
MojoResult ReadMessage(
std::unique_ptr<ports::UserMessageEvent>* message_event,
uint32_t* num_bytes,
MojoHandle* handle,
uint32_t* num_handles,
MojoReadMessageFlags /*flags*/,
bool ignore_num_bytes) override {
info_->IncrementReadMessageCallCount();
if (num_handles)
......
......@@ -20,8 +20,8 @@
#include "mojo/edk/system/core.h"
#include "mojo/edk/system/data_pipe_control_message.h"
#include "mojo/edk/system/node_controller.h"
#include "mojo/edk/system/ports_message.h"
#include "mojo/edk/system/request_context.h"
#include "mojo/edk/system/user_message_impl.h"
#include "mojo/public/c/system/data_pipe.h"
namespace mojo {
......@@ -527,21 +527,21 @@ void DataPipeConsumerDispatcher::UpdateSignalsStateNoLock() {
<< " [control_port=" << control_port_.name() << "]";
peer_closed_ = true;
} else if (rv == ports::OK && port_status.has_messages && !in_transit_) {
ports::ScopedMessage message;
std::unique_ptr<ports::UserMessageEvent> message_event;
do {
int rv = node_controller_->node()->GetMessage(
control_port_, &message, nullptr);
int rv = node_controller_->node()->GetMessage(control_port_,
&message_event, nullptr);
if (rv != ports::OK)
peer_closed_ = true;
if (message) {
if (message->num_payload_bytes() < sizeof(DataPipeControlMessage)) {
if (message_event) {
auto* message = message_event->GetMessage<UserMessageImpl>();
if (message->user_payload_size() < sizeof(DataPipeControlMessage)) {
peer_closed_ = true;
break;
}
const DataPipeControlMessage* m =
static_cast<const DataPipeControlMessage*>(
message->payload_bytes());
static_cast<const DataPipeControlMessage*>(message->user_payload());
if (m->command != DataPipeCommand::DATA_WAS_WRITTEN) {
DLOG(ERROR) << "Unexpected control message from producer.";
......@@ -562,7 +562,7 @@ void DataPipeConsumerDispatcher::UpdateSignalsStateNoLock() {
bytes_available_ += m->num_bytes;
}
} while (message);
} while (message_event);
}
bool has_new_data = bytes_available_ != previous_bytes_available;
......
......@@ -6,7 +6,8 @@
#include "mojo/edk/embedder/platform_handle_vector.h"
#include "mojo/edk/system/node_controller.h"
#include "mojo/edk/system/ports_message.h"
#include "mojo/edk/system/ports/event.h"
#include "mojo/edk/system/user_message_impl.h"
namespace mojo {
namespace edk {
......@@ -15,16 +16,18 @@ void SendDataPipeControlMessage(NodeController* node_controller,
const ports::PortRef& port,
DataPipeCommand command,
uint32_t num_bytes) {
std::unique_ptr<PortsMessage> message =
PortsMessage::NewUserMessage(sizeof(DataPipeControlMessage), 0, 0);
CHECK(message);
std::unique_ptr<ports::UserMessageEvent> event;
MojoResult result = UserMessageImpl::CreateEventForNewSerializedMessage(
sizeof(DataPipeControlMessage), nullptr, 0, &event);
DCHECK_EQ(MOJO_RESULT_OK, result);
DCHECK(event);
DataPipeControlMessage* data =
static_cast<DataPipeControlMessage*>(message->mutable_payload_bytes());
DataPipeControlMessage* data = static_cast<DataPipeControlMessage*>(
event->GetMessage<UserMessageImpl>()->user_payload());
data->command = command;
data->num_bytes = num_bytes;
int rv = node_controller->SendMessage(port, std::move(message));
int rv = node_controller->SendUserMessage(port, std::move(event));
if (rv != ports::OK && rv != ports::ERROR_PORT_PEER_CLOSED) {
DLOG(ERROR) << "Unexpected failure sending data pipe control message: "
<< rv;
......
......@@ -19,8 +19,8 @@
#include "mojo/edk/system/core.h"
#include "mojo/edk/system/data_pipe_control_message.h"
#include "mojo/edk/system/node_controller.h"
#include "mojo/edk/system/ports_message.h"
#include "mojo/edk/system/request_context.h"
#include "mojo/edk/system/user_message_impl.h"
#include "mojo/public/c/system/data_pipe.h"
namespace mojo {
......@@ -476,21 +476,21 @@ void DataPipeProducerDispatcher::UpdateSignalsStateNoLock() {
<< " [control_port=" << control_port_.name() << "]";
peer_closed_ = true;
} else if (rv == ports::OK && port_status.has_messages && !in_transit_) {
ports::ScopedMessage message;
std::unique_ptr<ports::UserMessageEvent> message_event;
do {
int rv = node_controller_->node()->GetMessage(
control_port_, &message, nullptr);
int rv = node_controller_->node()->GetMessage(control_port_,
&message_event, nullptr);
if (rv != ports::OK)
peer_closed_ = true;
if (message) {
if (message->num_payload_bytes() < sizeof(DataPipeControlMessage)) {
if (message_event) {
auto* message = message_event->GetMessage<UserMessageImpl>();
if (message->user_payload_size() < sizeof(DataPipeControlMessage)) {
peer_closed_ = true;
break;
}
const DataPipeControlMessage* m =
static_cast<const DataPipeControlMessage*>(
message->payload_bytes());
static_cast<const DataPipeControlMessage*>(message->user_payload());
if (m->command != DataPipeCommand::DATA_WAS_READ) {
DLOG(ERROR) << "Unexpected message from consumer.";
......@@ -510,7 +510,7 @@ void DataPipeProducerDispatcher::UpdateSignalsStateNoLock() {
available_capacity_ += m->num_bytes;
}
} while (message);
} while (message_event);
}
if (peer_closed_ != was_peer_closed ||
......
......@@ -10,6 +10,7 @@
#include "mojo/edk/system/data_pipe_producer_dispatcher.h"
#include "mojo/edk/system/message_pipe_dispatcher.h"
#include "mojo/edk/system/platform_handle_dispatcher.h"
#include "mojo/edk/system/ports/event.h"
#include "mojo/edk/system/shared_buffer_dispatcher.h"
namespace mojo {
......@@ -39,17 +40,19 @@ MojoResult Dispatcher::Arm(uint32_t* num_ready_contexts,
return MOJO_RESULT_INVALID_ARGUMENT;
}
MojoResult Dispatcher::WriteMessage(std::unique_ptr<MessageForTransit> message,
MojoWriteMessageFlags flags) {
MojoResult Dispatcher::WriteMessage(
std::unique_ptr<ports::UserMessageEvent> message,
MojoWriteMessageFlags flags) {
return MOJO_RESULT_INVALID_ARGUMENT;
}
MojoResult Dispatcher::ReadMessage(std::unique_ptr<MessageForTransit>* message,
uint32_t* num_bytes,
MojoHandle* handles,
uint32_t* num_handles,
MojoReadMessageFlags flags,
bool read_any_size) {
MojoResult Dispatcher::ReadMessage(
std::unique_ptr<ports::UserMessageEvent>* message,
uint32_t* num_bytes,
MojoHandle* handles,
uint32_t* num_handles,
MojoReadMessageFlags flags,
bool read_any_size) {
return MOJO_RESULT_INVALID_ARGUMENT;
}
......
......@@ -29,8 +29,11 @@
namespace mojo {
namespace edk {
namespace ports {
class UserMessageEvent;
}
class Dispatcher;
class MessageForTransit;
using DispatcherVector = std::vector<scoped_refptr<Dispatcher>>;
......@@ -78,15 +81,17 @@ class MOJO_SYSTEM_IMPL_EXPORT Dispatcher
///////////// Message pipe API /////////////
virtual MojoResult WriteMessage(std::unique_ptr<MessageForTransit> message,
MojoWriteMessageFlags flags);
virtual MojoResult ReadMessage(std::unique_ptr<MessageForTransit>* message,
uint32_t* num_bytes,
MojoHandle* handles,
uint32_t* num_handles,
MojoReadMessageFlags flags,
bool read_any_size);
virtual MojoResult WriteMessage(
std::unique_ptr<ports::UserMessageEvent> message,
MojoWriteMessageFlags flags);
virtual MojoResult ReadMessage(
std::unique_ptr<ports::UserMessageEvent>* message,
uint32_t* num_bytes,
MojoHandle* handles,
uint32_t* num_handles,
MojoReadMessageFlags flags,
bool read_any_size);
///////////// Shared buffer API /////////////
......
// Copyright 2016 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "mojo/edk/system/message_for_transit.h"
#include <vector>
#include "mojo/edk/embedder/platform_handle_vector.h"
namespace mojo {
namespace edk {
namespace {
static_assert(sizeof(MessageForTransit::MessageHeader) % 8 == 0,
"Invalid MessageHeader size.");
static_assert(sizeof(MessageForTransit::DispatcherHeader) % 8 == 0,
"Invalid DispatcherHeader size.");
} // namespace
MessageForTransit::~MessageForTransit() {}
// static
MojoResult MessageForTransit::Create(
std::unique_ptr<MessageForTransit>* message,
uint32_t num_bytes,
const Dispatcher::DispatcherInTransit* dispatchers,
uint32_t num_dispatchers) {
// A structure for retaining information about every Dispatcher that will be
// sent with this message.
struct DispatcherInfo {
uint32_t num_bytes;
uint32_t num_ports;
uint32_t num_handles;
};
// This is only the base header size. It will grow as we accumulate the
// size of serialized state for each dispatcher.
size_t header_size = sizeof(MessageHeader) +
num_dispatchers * sizeof(DispatcherHeader);
size_t num_ports = 0;
size_t num_handles = 0;
std::vector<DispatcherInfo> dispatcher_info(num_dispatchers);
for (size_t i = 0; i < num_dispatchers; ++i) {
Dispatcher* d = dispatchers[i].dispatcher.get();
d->StartSerialize(&dispatcher_info[i].num_bytes,
&dispatcher_info[i].num_ports,
&dispatcher_info[i].num_handles);
header_size += dispatcher_info[i].num_bytes;
num_ports += dispatcher_info[i].num_ports;
num_handles += dispatcher_info[i].num_handles;
}
// We now have enough information to fully allocate the message storage.
std::unique_ptr<PortsMessage> msg = PortsMessage::NewUserMessage(
header_size + num_bytes, num_ports, num_handles);
if (!msg)
return MOJO_RESULT_RESOURCE_EXHAUSTED;
// Populate the message header with information about serialized dispatchers.
//
// The front of the message is always a MessageHeader followed by a
// DispatcherHeader for each dispatcher to be sent.
MessageHeader* header =
static_cast<MessageHeader*>(msg->mutable_payload_bytes());
DispatcherHeader* dispatcher_headers =
reinterpret_cast<DispatcherHeader*>(header + 1);
// Serialized dispatcher state immediately follows the series of
// DispatcherHeaders.
char* dispatcher_data =
reinterpret_cast<char*>(dispatcher_headers + num_dispatchers);
header->num_dispatchers = num_dispatchers;
// |header_size| is the total number of bytes preceding the message payload,
// including all dispatcher headers and serialized dispatcher state.
DCHECK_LE(header_size, std::numeric_limits<uint32_t>::max());
header->header_size = static_cast<uint32_t>(header_size);
if (num_dispatchers > 0) {
ScopedPlatformHandleVectorPtr handles(
new PlatformHandleVector(num_handles));
size_t port_index = 0;
size_t handle_index = 0;
bool fail = false;
for (size_t i = 0; i < num_dispatchers; ++i) {
Dispatcher* d = dispatchers[i].dispatcher.get();
DispatcherHeader* dh = &dispatcher_headers[i];
const DispatcherInfo& info = dispatcher_info[i];
// Fill in the header for this dispatcher.
dh->type = static_cast<int32_t>(d->GetType());
dh->num_bytes = info.num_bytes;
dh->num_ports = info.num_ports;
dh->num_platform_handles = info.num_handles;
// Fill in serialized state, ports, and platform handles. We'll cancel
// the send if the dispatcher implementation rejects for some reason.
if (!d->EndSerialize(static_cast<void*>(dispatcher_data),
msg->mutable_ports() + port_index,
handles->data() + handle_index)) {
fail = true;
break;
}
dispatcher_data += info.num_bytes;
port_index += info.num_ports;
handle_index += info.num_handles;
}
if (fail) {
// Release any platform handles we've accumulated. Their dispatchers
// retain ownership when message creation fails, so these are not actually
// leaking.
handles->clear();
return MOJO_RESULT_INVALID_ARGUMENT;
}
// Take ownership of all the handles and move them into message storage.
msg->SetHandles(std::move(handles));
}
message->reset(new MessageForTransit(std::move(msg)));
return MOJO_RESULT_OK;
}
MessageForTransit::MessageForTransit(std::unique_ptr<PortsMessage> message)
: message_(std::move(message)) {
}
} // namespace edk
} // namespace mojo
// Copyright 2016 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef MOJO_EDK_SYSTEM_MESSAGE_FOR_TRANSIT_H_
#define MOJO_EDK_SYSTEM_MESSAGE_FOR_TRANSIT_H_
#include <stdint.h>
#include <memory>
#include "base/macros.h"
#include "base/memory/ptr_util.h"
#include "mojo/edk/system/dispatcher.h"
#include "mojo/edk/system/ports_message.h"
#include "mojo/edk/system/system_impl_export.h"
namespace mojo {
namespace edk {
// MessageForTransit holds onto a PortsMessage which may be sent via
// |MojoWriteMessage()| or which may have been received on a pipe endpoint.
// Instances of this class are exposed to Mojo system API consumers via the
// opaque pointers used with |MojoCreateMessage()|, |MojoDestroyMessage()|,
// |MojoWriteMessageNew()|, and |MojoReadMessageNew()|.
class MOJO_SYSTEM_IMPL_EXPORT MessageForTransit {
public:
#pragma pack(push, 1)
// Header attached to every message.
struct MessageHeader {
// The number of serialized dispatchers included in this header.
uint32_t num_dispatchers;
// Total size of the header, including serialized dispatcher data.
uint32_t header_size;
};
// Header for each dispatcher in a message, immediately following the message
// header.
struct DispatcherHeader {
// The type of the dispatcher, correpsonding to the Dispatcher::Type enum.
int32_t type;
// The size of the serialized dispatcher, not including this header.
uint32_t num_bytes;
// The number of ports needed to deserialize this dispatcher.
uint32_t num_ports;
// The number of platform handles needed to deserialize this dispatcher.
uint32_t num_platform_handles;
};
#pragma pack(pop)
~MessageForTransit();
// A static constructor for building outbound messages.
static MojoResult Create(
std::unique_ptr<MessageForTransit>* message,
uint32_t num_bytes,
const Dispatcher::DispatcherInTransit* dispatchers,
uint32_t num_dispatchers);
// A static constructor for wrapping inbound messages.
static std::unique_ptr<MessageForTransit> WrapPortsMessage(
std::unique_ptr<PortsMessage> message) {
return base::WrapUnique(new MessageForTransit(std::move(message)));
}
const void* bytes() const {
DCHECK(message_);
return static_cast<const void*>(
static_cast<const char*>(message_->payload_bytes()) +
header()->header_size);
}
void* mutable_bytes() {
DCHECK(message_);
return static_cast<void*>(
static_cast<char*>(message_->mutable_payload_bytes()) +
header()->header_size);
}
size_t num_bytes() const {
size_t header_size = header()->header_size;
DCHECK_GE(message_->num_payload_bytes(), header_size);
return message_->num_payload_bytes() - header_size;
}
size_t num_handles() const { return header()->num_dispatchers; }
const PortsMessage& ports_message() const { return *message_; }
std::unique_ptr<PortsMessage> TakePortsMessage() {
return std::move(message_);
}
private:
explicit MessageForTransit(std::unique_ptr<PortsMessage> message);
const MessageForTransit::MessageHeader* header() const {
DCHECK(message_);
return static_cast<const MessageForTransit::MessageHeader*>(
message_->payload_bytes());
}
std::unique_ptr<PortsMessage> message_;
DISALLOW_COPY_AND_ASSIGN(MessageForTransit);
};
} // namespace edk
} // namespace mojo
#endif // MOJO_EDK_SYSTEM_MESSAGE_FOR_TRANSIT_H_
This diff is collapsed.
......@@ -13,7 +13,6 @@
#include "base/macros.h"
#include "mojo/edk/system/atomic_flag.h"
#include "mojo/edk/system/dispatcher.h"
#include "mojo/edk/system/message_for_transit.h"
#include "mojo/edk/system/ports/port_ref.h"
#include "mojo/edk/system/watcher_set.h"
......@@ -48,9 +47,9 @@ class MessagePipeDispatcher : public Dispatcher {
// Dispatcher:
Type GetType() const override;
MojoResult Close() override;
MojoResult WriteMessage(std::unique_ptr<MessageForTransit> message,
MojoResult WriteMessage(std::unique_ptr<ports::UserMessageEvent> message,
MojoWriteMessageFlags flags) override;
MojoResult ReadMessage(std::unique_ptr<MessageForTransit>* message,
MojoResult ReadMessage(std::unique_ptr<ports::UserMessageEvent>* message,
uint32_t* num_bytes,
MojoHandle* handles,
uint32_t* num_handles,
......
This diff is collapsed.
......@@ -56,7 +56,7 @@ class NodeChannel : public base::RefCountedThreadSafe<NodeChannel>,
virtual void OnAcceptBrokerClient(const ports::NodeName& from_node,
const ports::NodeName& broker_name,
ScopedPlatformHandle broker_channel) = 0;
virtual void OnPortsMessage(const ports::NodeName& from_node,
virtual void OnEventMessage(const ports::NodeName& from_node,
Channel::MessagePtr message) = 0;
virtual void OnRequestPortMerge(const ports::NodeName& from_node,
const ports::PortName& connector_port_name,
......@@ -69,11 +69,11 @@ class NodeChannel : public base::RefCountedThreadSafe<NodeChannel>,
virtual void OnBroadcast(const ports::NodeName& from_node,
Channel::MessagePtr message) = 0;
#if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS))
virtual void OnRelayPortsMessage(const ports::NodeName& from_node,
virtual void OnRelayEventMessage(const ports::NodeName& from_node,
base::ProcessHandle from_process,
const ports::NodeName& destination,
Channel::MessagePtr message) = 0;
virtual void OnPortsMessageFromRelay(const ports::NodeName& from_node,
virtual void OnEventMessageFromRelay(const ports::NodeName& from_node,
const ports::NodeName& source_node,
Channel::MessagePtr message) = 0;
#endif
......@@ -95,11 +95,12 @@ class NodeChannel : public base::RefCountedThreadSafe<NodeChannel>,
scoped_refptr<base::TaskRunner> io_task_runner,
const ProcessErrorCallback& process_error_callback);
static Channel::MessagePtr CreatePortsMessage(size_t payload_size,
static Channel::MessagePtr CreateEventMessage(size_t payload_size,
void** payload,
size_t num_handles);
static void GetPortsMessageData(Channel::Message* message, void** data,
static void GetEventMessageData(Channel::Message* message,
void** data,
size_t* num_data_bytes);
// Start receiving messages.
......@@ -137,12 +138,12 @@ class NodeChannel : public base::RefCountedThreadSafe<NodeChannel>,
ScopedPlatformHandle broker_channel);
void AcceptBrokerClient(const ports::NodeName& broker_name,
ScopedPlatformHandle broker_channel);
void PortsMessage(Channel::MessagePtr message);
void RequestPortMerge(const ports::PortName& connector_port_name,
const std::string& token);
void RequestIntroduction(const ports::NodeName& name);
void Introduce(const ports::NodeName& name,
ScopedPlatformHandle channel_handle);
void SendChannelMessage(Channel::MessagePtr message);
void Broadcast(Channel::MessagePtr message);
#if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS))
......@@ -150,13 +151,13 @@ class NodeChannel : public base::RefCountedThreadSafe<NodeChannel>,
// pass windows handles between two processes that do not have permission to
// duplicate handles into the other's address space. The relay process is
// assumed to have that permission.
void RelayPortsMessage(const ports::NodeName& destination,
void RelayEventMessage(const ports::NodeName& destination,
Channel::MessagePtr message);
// Sends a message to its destination from a relay. This is interpreted by the
// receiver similarly to PortsMessage, but the original source node is
// receiver similarly to EventMessage, but the original source node is
// provided as additional message metadata from the (trusted) relay node.
void PortsMessageFromRelay(const ports::NodeName& source,
void EventMessageFromRelay(const ports::NodeName& source,
Channel::MessagePtr message);
#endif
......
This diff is collapsed.
......@@ -24,6 +24,7 @@
#include "mojo/edk/embedder/scoped_platform_handle.h"
#include "mojo/edk/system/atomic_flag.h"
#include "mojo/edk/system/node_channel.h"
#include "mojo/edk/system/ports/event.h"
#include "mojo/edk/system/ports/name.h"
#include "mojo/edk/system/ports/node.h"
#include "mojo/edk/system/ports/node_delegate.h"
......@@ -38,7 +39,6 @@ namespace edk {
class Broker;
class Core;
class MachPortRelay;
class PortsMessage;
// The owner of ports::Node which facilitates core EDK implementation. All
// public interface methods are safe to call from any thread.
......@@ -103,8 +103,8 @@ class NodeController : public ports::NodeDelegate,
void ClosePort(const ports::PortRef& port);
// Sends a message on a port to its peer.
int SendMessage(const ports::PortRef& port_ref,
std::unique_ptr<PortsMessage> message);
int SendUserMessage(const ports::PortRef& port_ref,
std::unique_ptr<ports::UserMessageEvent> message);
// Merges a local port |port| into a port reserved by |name| in the parent.
void MergePortIntoParent(const std::string& name, const ports::PortRef& port);
......@@ -176,19 +176,16 @@ class NodeController : public ports::NodeDelegate,
scoped_refptr<NodeChannel> channel,
bool start_channel);
void DropPeer(const ports::NodeName& name, NodeChannel* channel);
void SendPeerMessage(const ports::NodeName& name,
ports::ScopedMessage message);
void SendPeerEvent(const ports::NodeName& name, ports::ScopedEvent event);
void AcceptIncomingMessages();
void ProcessIncomingMessages();
void DropAllPeers();
// ports::NodeDelegate:
void GenerateRandomPortName(ports::PortName* port_name) override;
void AllocMessage(size_t num_header_bytes,
ports::ScopedMessage* message) override;
void ForwardMessage(const ports::NodeName& node,
ports::ScopedMessage message) override;
void BroadcastMessage(ports::ScopedMessage message) override;
void ForwardEvent(const ports::NodeName& node,
ports::ScopedEvent event) override;
void BroadcastEvent(ports::ScopedEvent event) override;
void PortStatusChanged(const ports::PortRef& port) override;
// NodeChannel::Delegate:
......@@ -207,7 +204,7 @@ class NodeController : public ports::NodeDelegate,
void OnAcceptBrokerClient(const ports::NodeName& from_node,
const ports::NodeName& broker_name,
ScopedPlatformHandle broker_channel) override;
void OnPortsMessage(const ports::NodeName& from_node,
void OnEventMessage(const ports::NodeName& from_node,
Channel::MessagePtr message) override;
void OnRequestPortMerge(const ports::NodeName& from_node,
const ports::PortName& connector_port_name,
......@@ -220,11 +217,11 @@ class NodeController : public ports::NodeDelegate,
void OnBroadcast(const ports::NodeName& from_node,
Channel::MessagePtr message) override;
#if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS))
void OnRelayPortsMessage(const ports::NodeName& from_node,
void OnRelayEventMessage(const ports::NodeName& from_node,
base::ProcessHandle from_process,
const ports::NodeName& destination,
Channel::MessagePtr message) override;
void OnPortsMessageFromRelay(const ports::NodeName& from_node,
void OnEventMessageFromRelay(const ports::NodeName& from_node,
const ports::NodeName& source_node,
Channel::MessagePtr message) override;
#endif
......@@ -311,14 +308,14 @@ class NodeController : public ports::NodeDelegate,
std::unordered_map<ports::NodeName, OutgoingMessageQueue>
pending_relay_messages_;
// Guards |incoming_messages_| and |incoming_messages_task_posted_|.
base::Lock messages_lock_;
std::queue<ports::ScopedMessage> incoming_messages_;
// Guards |incoming_events_| and |incoming_events_task_posted_|.
base::Lock events_lock_;
std::vector<ports::ScopedEvent> incoming_events_;
// Ensures that there is only one incoming messages task posted to the IO
// thread.
bool incoming_messages_task_posted_ = false;
// Flag to fast-path checking |incoming_messages_|.
AtomicFlag incoming_messages_flag_;
bool incoming_events_task_posted_ = false;
// Flag to fast-path checking |incoming_events_|.
AtomicFlag incoming_events_flag_;
// Guards |shutdown_callback_|.
base::Lock shutdown_lock_;
......
......@@ -8,8 +8,6 @@ source_set("ports") {
sources = [
"event.cc",
"event.h",
"message.cc",
"message.h",
"message_filter.h",
"message_queue.cc",
"message_queue.h",
......@@ -23,6 +21,7 @@ source_set("ports") {
"port_ref.cc",
"port_ref.h",
"user_data.h",
"user_message.h",
]
public_deps = [
......
This diff is collapsed.
......@@ -7,102 +7,247 @@
#include <stdint.h>
#include "mojo/edk/system/ports/message.h"
#include <vector>
#include "base/macros.h"
#include "base/memory/ptr_util.h"
#include "mojo/edk/system/ports/name.h"
#include "mojo/edk/system/ports/user_message.h"
namespace mojo {
namespace edk {
namespace ports {
class Event;
using ScopedEvent = std::unique_ptr<Event>;
class Event {
public:
enum Type : uint32_t {
kUserMessage,
kPortAccepted,
kObserveProxy,
kObserveProxyAck,
kObserveClosure,
kMergePort,
};
#pragma pack(push, 1)
struct PortDescriptor {
PortDescriptor();
// TODO: Add static assertions of alignment.
NodeName peer_node_name;
PortName peer_port_name;
NodeName referring_node_name;
PortName referring_port_name;
uint64_t next_sequence_num_to_send;
uint64_t next_sequence_num_to_receive;
uint64_t last_sequence_num_to_receive;
bool peer_closed;
char padding[7];
};
#pragma pack(pop)
virtual ~Event();
struct PortDescriptor {
PortDescriptor();
static ScopedEvent Deserialize(const void* buffer, size_t num_bytes);
NodeName peer_node_name;
PortName peer_port_name;
NodeName referring_node_name;
PortName referring_port_name;
uint64_t next_sequence_num_to_send;
uint64_t next_sequence_num_to_receive;
uint64_t last_sequence_num_to_receive;
bool peer_closed;
char padding[7];
};
template <typename T>
static std::unique_ptr<T> Cast(ScopedEvent* event) {
return base::WrapUnique(static_cast<T*>(event->release()));
}
enum struct EventType : uint32_t {
kUser,
kPortAccepted,
kObserveProxy,
kObserveProxyAck,
kObserveClosure,
kMergePort,
};
Type type() const { return type_; }
const PortName& port_name() const { return port_name_; }
void set_port_name(const PortName& port_name) { port_name_ = port_name; }
size_t GetSerializedSize() const;
void Serialize(void* buffer) const;
virtual ScopedEvent Clone() const;
protected:
Event(Type type, const PortName& port_name);
struct EventHeader {
EventType type;
uint32_t padding;
PortName port_name;
virtual size_t GetSerializedDataSize() const = 0;
virtual void SerializeData(void* buffer) const = 0;
private:
const Type type_;
PortName port_name_;
DISALLOW_COPY_AND_ASSIGN(Event);
};
struct UserEventData {
uint64_t sequence_num;
uint32_t num_ports;
uint32_t padding;
class UserMessageEvent : public Event {
public:
explicit UserMessageEvent(size_t num_ports);
~UserMessageEvent() override;
bool HasMessage() const { return !!message_; }
void AttachMessage(std::unique_ptr<UserMessage> message);
template <typename T>
T* GetMessage() {
DCHECK(HasMessage());
DCHECK_EQ(&T::kUserMessageTypeInfo, message_->type_info());
return static_cast<T*>(message_.get());
}
template <typename T>
const T* GetMessage() const {
DCHECK(HasMessage());
DCHECK_EQ(&T::kUserMessageTypeInfo, message_->type_info());
return static_cast<const T*>(message_.get());
}
void ReservePorts(size_t num_ports);
uint32_t sequence_num() const { return sequence_num_; }
void set_sequence_num(uint32_t sequence_num) { sequence_num_ = sequence_num; }
size_t num_ports() const { return ports_.size(); }
PortDescriptor* port_descriptors() { return port_descriptors_.data(); }
PortName* ports() { return ports_.data(); }
static ScopedEvent Deserialize(const PortName& port_name,
const void* buffer,
size_t num_bytes);
private:
UserMessageEvent(const PortName& port_name, uint64_t sequence_num);
size_t GetSerializedDataSize() const override;
void SerializeData(void* buffer) const override;
uint64_t sequence_num_ = 0;
std::vector<PortDescriptor> port_descriptors_;
std::vector<PortName> ports_;
std::unique_ptr<UserMessage> message_;
DISALLOW_COPY_AND_ASSIGN(UserMessageEvent);
};
struct ObserveProxyEventData {
NodeName proxy_node_name;
PortName proxy_port_name;
NodeName proxy_to_node_name;
PortName proxy_to_port_name;
class PortAcceptedEvent : public Event {
public:
explicit PortAcceptedEvent(const PortName& port_name);
~PortAcceptedEvent() override;
static ScopedEvent Deserialize(const PortName& port_name,
const void* buffer,
size_t num_bytes);
private:
size_t GetSerializedDataSize() const override;
void SerializeData(void* buffer) const override;
DISALLOW_COPY_AND_ASSIGN(PortAcceptedEvent);
};
struct ObserveProxyAckEventData {
uint64_t last_sequence_num;
class ObserveProxyEvent : public Event {
public:
ObserveProxyEvent(const PortName& port_name,
const NodeName& proxy_node_name,
const PortName& proxy_port_name,
const NodeName& proxy_target_node_name,
const PortName& proxy_target_port_name);
~ObserveProxyEvent() override;
const NodeName& proxy_node_name() const { return proxy_node_name_; }
const PortName& proxy_port_name() const { return proxy_port_name_; }
const NodeName& proxy_target_node_name() const {
return proxy_target_node_name_;
}
const PortName& proxy_target_port_name() const {
return proxy_target_port_name_;
}
static ScopedEvent Deserialize(const PortName& port_name,
const void* buffer,
size_t num_bytes);
private:
size_t GetSerializedDataSize() const override;
void SerializeData(void* buffer) const override;
ScopedEvent Clone() const override;
const NodeName proxy_node_name_;
const PortName proxy_port_name_;
const NodeName proxy_target_node_name_;
const PortName proxy_target_port_name_;
DISALLOW_COPY_AND_ASSIGN(ObserveProxyEvent);
};
struct ObserveClosureEventData {
uint64_t last_sequence_num;
class ObserveProxyAckEvent : public Event {
public:
ObserveProxyAckEvent(const PortName& port_name, uint64_t last_sequence_num);
~ObserveProxyAckEvent() override;
uint64_t last_sequence_num() const { return last_sequence_num_; }
static ScopedEvent Deserialize(const PortName& port_name,
const void* buffer,
size_t num_bytes);
private:
size_t GetSerializedDataSize() const override;
void SerializeData(void* buffer) const override;
ScopedEvent Clone() const override;
const uint64_t last_sequence_num_;
DISALLOW_COPY_AND_ASSIGN(ObserveProxyAckEvent);
};
struct MergePortEventData {
PortName new_port_name;
PortDescriptor new_port_descriptor;
class ObserveClosureEvent : public Event {
public:
ObserveClosureEvent(const PortName& port_name, uint64_t last_sequence_num);
~ObserveClosureEvent() override;
uint64_t last_sequence_num() const { return last_sequence_num_; }
void set_last_sequence_num(uint64_t last_sequence_num) {
last_sequence_num_ = last_sequence_num;
}
static ScopedEvent Deserialize(const PortName& port_name,
const void* buffer,
size_t num_bytes);
private:
size_t GetSerializedDataSize() const override;
void SerializeData(void* buffer) const override;
ScopedEvent Clone() const override;
uint64_t last_sequence_num_;
DISALLOW_COPY_AND_ASSIGN(ObserveClosureEvent);
};
#pragma pack(pop)
class MergePortEvent : public Event {
public:
MergePortEvent(const PortName& port_name,
const PortName& new_port_name,
const PortDescriptor& new_port_descriptor);
~MergePortEvent() override;
inline const EventHeader* GetEventHeader(const Message& message) {
return static_cast<const EventHeader*>(message.header_bytes());
}
inline EventHeader* GetMutableEventHeader(Message* message) {
return static_cast<EventHeader*>(message->mutable_header_bytes());
}
template <typename EventData>
inline const EventData* GetEventData(const Message& message) {
return reinterpret_cast<const EventData*>(
reinterpret_cast<const char*>(GetEventHeader(message) + 1));
}
template <typename EventData>
inline EventData* GetMutableEventData(Message* message) {
return reinterpret_cast<EventData*>(
reinterpret_cast<char*>(GetMutableEventHeader(message) + 1));
}
inline const PortDescriptor* GetPortDescriptors(const UserEventData* event) {
return reinterpret_cast<const PortDescriptor*>(
reinterpret_cast<const char*>(event + 1));
}
inline PortDescriptor* GetMutablePortDescriptors(UserEventData* event) {
return reinterpret_cast<PortDescriptor*>(reinterpret_cast<char*>(event + 1));
}
const PortName& new_port_name() const { return new_port_name_; }
const PortDescriptor& new_port_descriptor() const {
return new_port_descriptor_;
}
static ScopedEvent Deserialize(const PortName& port_name,
const void* buffer,
size_t num_bytes);
private:
size_t GetSerializedDataSize() const override;
void SerializeData(void* buffer) const override;
const PortName new_port_name_;
const PortDescriptor new_port_descriptor_;
DISALLOW_COPY_AND_ASSIGN(MergePortEvent);
};
} // namespace ports
} // namespace edk
......
// Copyright 2016 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include <stdlib.h>
#include <limits>
#include "base/logging.h"
#include "mojo/edk/system/ports/event.h"
namespace mojo {
namespace edk {
namespace ports {
// static
bool Message::Parse(const void* bytes,
size_t num_bytes,
size_t* num_header_bytes,
size_t* num_payload_bytes,
size_t* num_ports_bytes) {
if (num_bytes < sizeof(EventHeader))
return false;
const EventHeader* header = static_cast<const EventHeader*>(bytes);
switch (header->type) {
case EventType::kUser:
// See below.
break;
case EventType::kPortAccepted:
*num_header_bytes = sizeof(EventHeader);
break;
case EventType::kObserveProxy:
*num_header_bytes = sizeof(EventHeader) + sizeof(ObserveProxyEventData);
break;
case EventType::kObserveProxyAck:
*num_header_bytes =
sizeof(EventHeader) + sizeof(ObserveProxyAckEventData);
break;
case EventType::kObserveClosure:
*num_header_bytes = sizeof(EventHeader) + sizeof(ObserveClosureEventData);
break;
case EventType::kMergePort:
*num_header_bytes = sizeof(EventHeader) + sizeof(MergePortEventData);
break;
default:
return false;
}
if (header->type == EventType::kUser) {
if (num_bytes < sizeof(EventHeader) + sizeof(UserEventData))
return false;
const UserEventData* event_data =
reinterpret_cast<const UserEventData*>(
reinterpret_cast<const char*>(header + 1));
if (event_data->num_ports > std::numeric_limits<uint16_t>::max())
return false;
*num_header_bytes = sizeof(EventHeader) +
sizeof(UserEventData) +
event_data->num_ports * sizeof(PortDescriptor);
*num_ports_bytes = event_data->num_ports * sizeof(PortName);
if (num_bytes < *num_header_bytes + *num_ports_bytes)
return false;
*num_payload_bytes = num_bytes - *num_header_bytes - *num_ports_bytes;
} else {
if (*num_header_bytes != num_bytes)
return false;
*num_payload_bytes = 0;
*num_ports_bytes = 0;
}
return true;
}
Message::Message(size_t num_payload_bytes, size_t num_ports)
: Message(sizeof(EventHeader) + sizeof(UserEventData) +
num_ports * sizeof(PortDescriptor),
num_payload_bytes, num_ports * sizeof(PortName)) {
num_ports_ = num_ports;
}
Message::Message(size_t num_header_bytes,
size_t num_payload_bytes,
size_t num_ports_bytes)
: start_(nullptr),
num_header_bytes_(num_header_bytes),
num_ports_bytes_(num_ports_bytes),
num_payload_bytes_(num_payload_bytes) {
}
void Message::InitializeUserMessageHeader(void* start) {
start_ = static_cast<char*>(start);
memset(start_, 0, num_header_bytes_);
GetMutableEventHeader(this)->type = EventType::kUser;
GetMutableEventData<UserEventData>(this)->num_ports =
static_cast<uint32_t>(num_ports_);
}
} // namespace ports
} // namespace edk
} // namespace mojo
// Copyright 2016 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef MOJO_EDK_SYSTEM_PORTS_MESSAGE_H_
#define MOJO_EDK_SYSTEM_PORTS_MESSAGE_H_
#include <stddef.h>
#include <memory>
#include "mojo/edk/system/ports/name.h"
namespace mojo {
namespace edk {
namespace ports {
// A message consists of a header (array of bytes), payload (array of bytes)
// and an array of ports. The header is used by the Node implementation.
//
// This class is designed to be subclassed, and the subclass is responsible for
// providing the underlying storage. The header size will be aligned, and it
// should be followed in memory by the array of ports and finally the payload.
//
// NOTE: This class does not manage the lifetime of the ports it references.
class Message {
public:
virtual ~Message() {}
// Inspect the message at |bytes| and return the size of each section. Returns
// |false| if the message is malformed and |true| otherwise.
static bool Parse(const void* bytes,
size_t num_bytes,
size_t* num_header_bytes,
size_t* num_payload_bytes,
size_t* num_ports_bytes);
void* mutable_header_bytes() { return start_; }
const void* header_bytes() const { return start_; }
size_t num_header_bytes() const { return num_header_bytes_; }
void* mutable_payload_bytes() {
return start_ + num_header_bytes_ + num_ports_bytes_;
}
const void* payload_bytes() const {
return const_cast<Message*>(this)->mutable_payload_bytes();
}
size_t num_payload_bytes() const { return num_payload_bytes_; }
PortName* mutable_ports() {
return reinterpret_cast<PortName*>(start_ + num_header_bytes_);
}
const PortName* ports() const {
return const_cast<Message*>(this)->mutable_ports();
}
size_t num_ports_bytes() const { return num_ports_bytes_; }
size_t num_ports() const { return num_ports_bytes_ / sizeof(PortName); }
protected:
// Constructs a new Message base for a user message.
//
// Note: You MUST call InitializeUserMessageHeader() before this Message is
// ready for transmission.
Message(size_t num_payload_bytes, size_t num_ports);
// Constructs a new Message base for an internal message. Do NOT call
// InitializeUserMessageHeader() when using this constructor.
Message(size_t num_header_bytes,
size_t num_payload_bytes,
size_t num_ports_bytes);
Message(const Message& other) = delete;
void operator=(const Message& other) = delete;
// Initializes the header in a newly allocated message buffer to carry a
// user message.
void InitializeUserMessageHeader(void* start);
// Note: storage is [header][ports][payload].
char* start_ = nullptr;
size_t num_ports_ = 0;
size_t num_header_bytes_ = 0;
size_t num_ports_bytes_ = 0;
size_t num_payload_bytes_ = 0;
};
using ScopedMessage = std::unique_ptr<Message>;
} // namespace ports
} // namespace edk
} // namespace mojo
#endif // MOJO_EDK_SYSTEM_PORTS_MESSAGE_H_
......@@ -9,17 +9,17 @@ namespace mojo {
namespace edk {
namespace ports {
class Message;
class UserMessageEvent;
// An interface which can be implemented to filter port messages according to
// An interface which can be implemented to user message events according to
// arbitrary policy.
class MessageFilter {
public:
virtual ~MessageFilter() {}
// Returns true of |message| should be accepted by whomever is applying this
// Returns true if |message| should be accepted by whomever is applying this
// filter. See MessageQueue::GetNextMessage(), for example.
virtual bool Match(const Message& message) = 0;
virtual bool Match(const UserMessageEvent& message) = 0;
};
} // namespace ports
......
......@@ -7,20 +7,16 @@
#include <algorithm>
#include "base/logging.h"
#include "mojo/edk/system/ports/event.h"
#include "mojo/edk/system/ports/message_filter.h"
namespace mojo {
namespace edk {
namespace ports {
inline uint64_t GetSequenceNum(const ScopedMessage& message) {
return GetEventData<UserEventData>(*message)->sequence_num;
}
// Used by std::{push,pop}_heap functions
inline bool operator<(const ScopedMessage& a, const ScopedMessage& b) {
return GetSequenceNum(a) > GetSequenceNum(b);
inline bool operator<(const std::unique_ptr<UserMessageEvent>& a,
const std::unique_ptr<UserMessageEvent>& b) {
return a->sequence_num() > b->sequence_num();
}
MessageQueue::MessageQueue() : MessageQueue(kInitialSequenceNum) {}
......@@ -42,12 +38,12 @@ MessageQueue::~MessageQueue() {
}
bool MessageQueue::HasNextMessage() const {
return !heap_.empty() && GetSequenceNum(heap_[0]) == next_sequence_num_;
return !heap_.empty() && heap_[0]->sequence_num() == next_sequence_num_;
}
void MessageQueue::GetNextMessage(ScopedMessage* message,
void MessageQueue::GetNextMessage(std::unique_ptr<UserMessageEvent>* message,
MessageFilter* filter) {
if (!HasNextMessage() || (filter && !filter->Match(*heap_[0].get()))) {
if (!HasNextMessage() || (filter && !filter->Match(*heap_[0]))) {
message->reset();
return;
}
......@@ -59,10 +55,8 @@ void MessageQueue::GetNextMessage(ScopedMessage* message,
next_sequence_num_++;
}
void MessageQueue::AcceptMessage(ScopedMessage message,
void MessageQueue::AcceptMessage(std::unique_ptr<UserMessageEvent> message,
bool* has_next_message) {
DCHECK(GetEventHeader(*message)->type == EventType::kUser);
// TODO: Handle sequence number roll-over.
heap_.emplace_back(std::move(message));
......@@ -71,7 +65,7 @@ void MessageQueue::AcceptMessage(ScopedMessage message,
if (!signalable_) {
*has_next_message = false;
} else {
*has_next_message = (GetSequenceNum(heap_[0]) == next_sequence_num_);
*has_next_message = (heap_[0]->sequence_num() == next_sequence_num_);
}
}
......
......@@ -8,12 +8,12 @@
#include <stdint.h>
#include <deque>
#include <functional>
#include <limits>
#include <memory>
#include <vector>
#include "base/macros.h"
#include "mojo/edk/system/ports/message.h"
#include "mojo/edk/system/ports/event.h"
namespace mojo {
namespace edk {
......@@ -42,7 +42,8 @@ class MessageQueue {
// Gives ownership of the message. If |filter| is non-null, the next message
// will only be retrieved if the filter successfully matches it.
void GetNextMessage(ScopedMessage* message, MessageFilter* filter);
void GetNextMessage(std::unique_ptr<UserMessageEvent>* message,
MessageFilter* filter);
// Takes ownership of the message. Note: Messages are ordered, so while we
// have added a message to the queue, we may still be waiting on a message
......@@ -53,13 +54,14 @@ class MessageQueue {
// until GetNextMessage is called enough times to return a null message.
// In other words, has_next_message acts like an edge trigger.
//
void AcceptMessage(ScopedMessage message, bool* has_next_message);
void AcceptMessage(std::unique_ptr<UserMessageEvent> message,
bool* has_next_message);
// Returns all of the ports referenced by messages in this message queue.
void GetReferencedPorts(std::deque<PortName>* ports);
private:
std::vector<ScopedMessage> heap_;
std::vector<std::unique_ptr<UserMessageEvent>> heap_;
uint64_t next_sequence_num_;
bool signalable_ = true;
......
This diff is collapsed.
......@@ -15,14 +15,11 @@
#include "base/memory/ref_counted.h"
#include "base/synchronization/lock.h"
#include "mojo/edk/system/ports/event.h"
#include "mojo/edk/system/ports/message.h"
#include "mojo/edk/system/ports/name.h"
#include "mojo/edk/system/ports/port.h"
#include "mojo/edk/system/ports/port_ref.h"
#include "mojo/edk/system/ports/user_data.h"
#undef SendMessage // Gah, windows
namespace mojo {
namespace edk {
namespace ports {
......@@ -114,16 +111,17 @@ class Node {
// available. Ownership of |filter| is not taken, and it must outlive the
// extent of this call.
int GetMessage(const PortRef& port_ref,
ScopedMessage* message,
std::unique_ptr<UserMessageEvent>* message,
MessageFilter* filter);
// Sends a message from the specified port to its peer. Note that the message
// notification may arrive synchronously (via PortStatusChanged() on the
// delegate) if the peer is local to this Node.
int SendMessage(const PortRef& port_ref, ScopedMessage message);
int SendUserMessage(const PortRef& port_ref,
std::unique_ptr<UserMessageEvent> message);
// Corresponding to NodeDelegate::ForwardMessage.
int AcceptMessage(ScopedMessage message);
// Corresponding to NodeDelegate::ForwardEvent.
int AcceptEvent(ScopedEvent event);
// Called to merge two ports with each other. If you have two independent
// port pairs A <=> B and C <=> D, the net result of merging B and C is a
......@@ -155,13 +153,12 @@ class Node {
// Note: Functions that end with _Locked require |ports_lock_| to be held
// before calling.
int OnUserMessage(ScopedMessage message);
int OnPortAccepted(const PortName& port_name);
int OnObserveProxy(const PortName& port_name,
const ObserveProxyEventData& event);
int OnObserveProxyAck(const PortName& port_name, uint64_t last_sequence_num);
int OnObserveClosure(const PortName& port_name, uint64_t last_sequence_num);
int OnMergePort(const PortName& port_name, const MergePortEventData& event);
int OnUserMessage(std::unique_ptr<UserMessageEvent> message);
int OnPortAccepted(std::unique_ptr<PortAcceptedEvent> event);
int OnObserveProxy(std::unique_ptr<ObserveProxyEvent> event);
int OnObserveProxyAck(std::unique_ptr<ObserveProxyAckEvent> event);
int OnObserveClosure(std::unique_ptr<ObserveClosureEvent> event);
int OnMergePort(std::unique_ptr<MergePortEvent> event);
int AddPortWithName(const PortName& port_name, scoped_refptr<Port> port);
void ErasePort(const PortName& port_name);
......@@ -169,18 +166,19 @@ class Node {
scoped_refptr<Port> GetPort(const PortName& port_name);
scoped_refptr<Port> GetPort_Locked(const PortName& port_name);
int SendMessageInternal(const PortRef& port_ref, ScopedMessage* message);
int SendUserMessageInternal(const PortRef& port_ref,
std::unique_ptr<UserMessageEvent>* message);
int MergePorts_Locked(const PortRef& port0_ref, const PortRef& port1_ref);
void WillSendPort(const LockedPort& port,
const NodeName& to_node_name,
PortName* port_name,
PortDescriptor* port_descriptor);
Event::PortDescriptor* port_descriptor);
int AcceptPort(const PortName& port_name,
const PortDescriptor& port_descriptor);
const Event::PortDescriptor& port_descriptor);
int WillSendMessage_Locked(const LockedPort& port,
const PortName& port_name,
Message* message);
int WillForwardUserMessage_Locked(const LockedPort& port,
const PortName& port_name,
UserMessageEvent* message);
int BeginProxying_Locked(const LockedPort& port, const PortName& port_name);
int BeginProxying(PortRef port_ref);
int ForwardMessages_Locked(const LockedPort& port, const PortName& port_name);
......@@ -191,30 +189,13 @@ class Node {
void DestroyAllPortsWithPeer(const NodeName& node_name,
const PortName& port_name);
ScopedMessage NewInternalMessage_Helper(const PortName& port_name,
const EventType& type,
const void* data,
size_t num_data_bytes);
ScopedMessage NewInternalMessage(const PortName& port_name,
const EventType& type) {
return NewInternalMessage_Helper(port_name, type, nullptr, 0);
}
template <typename EventData>
ScopedMessage NewInternalMessage(const PortName& port_name,
const EventType& type,
const EventData& data) {
return NewInternalMessage_Helper(port_name, type, &data, sizeof(data));
}
const NodeName name_;
NodeDelegate* const delegate_;
// Guards |ports_| as well as any operation which needs to hold multiple port
// locks simultaneously. Usage of this is subtle: it must NEVER be acquired
// after a Port lock is acquired, and it must ALWAYS be acquired before
// calling WillSendMessage_Locked or ForwardMessages_Locked.
// calling WillForwardUserMessage_Locked or ForwardMessages_Locked.
base::Lock ports_lock_;
std::unordered_map<PortName, scoped_refptr<Port>> ports_;
......
......@@ -7,7 +7,7 @@
#include <stddef.h>
#include "mojo/edk/system/ports/message.h"
#include "mojo/edk/system/ports/event.h"
#include "mojo/edk/system/ports/name.h"
#include "mojo/edk/system/ports/port_ref.h"
......@@ -22,18 +22,12 @@ class NodeDelegate {
// Port names should be difficult to guess.
virtual void GenerateRandomPortName(PortName* port_name) = 0;
// Allocate a message, including a header that can be used by the Node
// implementation. |num_header_bytes| will be aligned. The newly allocated
// memory need not be zero-filled.
virtual void AllocMessage(size_t num_header_bytes,
ScopedMessage* message) = 0;
// Forward an event asynchronously to the specified node. This method MUST NOT
// synchronously call any methods on Node.
virtual void ForwardEvent(const NodeName& node, ScopedEvent event) = 0;
// Forward a message asynchronously to the specified node. This method MUST
// NOT synchronously call any methods on Node.
virtual void ForwardMessage(const NodeName& node, ScopedMessage message) = 0;
// Broadcast a message to all nodes.
virtual void BroadcastMessage(ScopedMessage message) = 0;
// Broadcast an event to all nodes.
virtual void BroadcastEvent(ScopedEvent event) = 0;
// Indicates that the port's status has changed recently. Use Node::GetStatus
// to query the latest status of the port. Note, this event could be spurious
......
......@@ -13,6 +13,7 @@
#include "base/macros.h"
#include "base/memory/ref_counted.h"
#include "base/synchronization/lock.h"
#include "mojo/edk/system/ports/event.h"
#include "mojo/edk/system/ports/message_queue.h"
#include "mojo/edk/system/ports/user_data.h"
......@@ -37,7 +38,7 @@ class Port : public base::RefCountedThreadSafe<Port> {
uint64_t next_sequence_num_to_send;
uint64_t last_sequence_num_to_receive;
MessageQueue message_queue;
std::unique_ptr<std::pair<NodeName, ScopedMessage>> send_on_proxy_removal;
std::unique_ptr<std::pair<NodeName, ScopedEvent>> send_on_proxy_removal;
scoped_refptr<UserData> user_data;
bool remove_proxy_on_last_message;
bool peer_closed;
......
This diff is collapsed.
// Copyright 2017 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef MOJO_EDK_SYSTEM_PORTS_USER_MESSAGE_H_
#define MOJO_EDK_SYSTEM_PORTS_USER_MESSAGE_H_
#include "base/macros.h"
namespace mojo {
namespace edk {
namespace ports {
// Base type to use for any embedder-defined user message implementation. This
// class is intentionally empty.
//
// Provides a bit of type-safety help to subclasses since by design downcasting
// from this type is a common operation in embedders.
//
// Each subclass should define a static const instance of TypeInfo named
// |kUserMessageTypeInfo| and pass its address down to the UserMessage
// constructor. The type of a UserMessage can then be dynamically inspected by
// comparing |type_info()| to any subclass's |&kUserMessageTypeInfo|.
class UserMessage {
public:
struct TypeInfo {};
virtual ~UserMessage() {}
explicit UserMessage(const TypeInfo* type_info) : type_info_(type_info) {}
const TypeInfo* type_info() const { return type_info_; }
private:
const TypeInfo* const type_info_;
DISALLOW_COPY_AND_ASSIGN(UserMessage);
};
} // namespace ports
} // namespace edk
} // namespace mojo
#endif // MOJO_EDK_SYSTEM_PORTS_USER_MESSAGE_H_
// Copyright 2016 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "mojo/edk/system/ports_message.h"
#include "base/memory/ptr_util.h"
#include "mojo/edk/system/node_channel.h"
namespace mojo {
namespace edk {
// static
std::unique_ptr<PortsMessage> PortsMessage::NewUserMessage(
size_t num_payload_bytes,
size_t num_ports,
size_t num_handles) {
return base::WrapUnique(
new PortsMessage(num_payload_bytes, num_ports, num_handles));
}
PortsMessage::~PortsMessage() {}
PortsMessage::PortsMessage(size_t num_payload_bytes,
size_t num_ports,
size_t num_handles)
: ports::Message(num_payload_bytes, num_ports) {
size_t size = num_header_bytes_ + num_ports_bytes_ + num_payload_bytes;
void* ptr;
channel_message_ = NodeChannel::CreatePortsMessage(size, &ptr, num_handles);
InitializeUserMessageHeader(ptr);
}
PortsMessage::PortsMessage(size_t num_header_bytes,
size_t num_payload_bytes,
size_t num_ports_bytes,
Channel::MessagePtr channel_message)
: ports::Message(num_header_bytes,
num_payload_bytes,
num_ports_bytes) {
if (channel_message) {
channel_message_ = std::move(channel_message);
void* data;
size_t num_data_bytes;
NodeChannel::GetPortsMessageData(channel_message_.get(), &data,
&num_data_bytes);
start_ = static_cast<char*>(data);
} else {
// TODO: Clean this up. In practice this branch of the constructor should
// only be reached from Node-internal calls to AllocMessage, which never
// carry ports or non-header bytes.
CHECK_EQ(num_payload_bytes, 0u);
CHECK_EQ(num_ports_bytes, 0u);
void* ptr;
channel_message_ =
NodeChannel::CreatePortsMessage(num_header_bytes, &ptr, 0);
start_ = static_cast<char*>(ptr);
}
}
} // namespace edk
} // namespace mojo
// Copyright 2016 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef MOJO_EDK_SYSTEM_PORTS_MESSAGE_H__
#define MOJO_EDK_SYSTEM_PORTS_MESSAGE_H__
#include <memory>
#include <utility>
#include "mojo/edk/embedder/platform_handle_vector.h"
#include "mojo/edk/system/channel.h"
#include "mojo/edk/system/ports/message.h"
#include "mojo/edk/system/ports/name.h"
namespace mojo {
namespace edk {
class NodeController;
class PortsMessage : public ports::Message {
public:
static std::unique_ptr<PortsMessage> NewUserMessage(size_t num_payload_bytes,
size_t num_ports,
size_t num_handles);
~PortsMessage() override;
size_t num_handles() const { return channel_message_->num_handles(); }
bool has_handles() const { return channel_message_->has_handles(); }
void SetHandles(ScopedPlatformHandleVectorPtr handles) {
channel_message_->SetHandles(std::move(handles));
}
ScopedPlatformHandleVectorPtr TakeHandles() {
return channel_message_->TakeHandles();
}
Channel::MessagePtr TakeChannelMessage() {
return std::move(channel_message_);
}
void set_source_node(const ports::NodeName& name) { source_node_ = name; }
const ports::NodeName& source_node() const { return source_node_; }
private:
friend class NodeController;
// Construct a new user PortsMessage backed by a new Channel::Message.
PortsMessage(size_t num_payload_bytes, size_t num_ports, size_t num_handles);
// Construct a new PortsMessage backed by a Channel::Message. If
// |channel_message| is null, a new one is allocated internally.
PortsMessage(size_t num_header_bytes,
size_t num_payload_bytes,
size_t num_ports_bytes,
Channel::MessagePtr channel_message);
Channel::MessagePtr channel_message_;
// The node name from which this message was received, if known.
ports::NodeName source_node_ = ports::kInvalidNodeName;
};
} // namespace edk
} // namespace mojo
#endif // MOJO_EDK_SYSTEM_PORTS_MESSAGE_H__
This diff is collapsed.
// Copyright 2017 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef MOJO_EDK_SYSTEM_USER_MESSAGE_IMPL_H_
#define MOJO_EDK_SYSTEM_USER_MESSAGE_IMPL_H_
#include <memory>
#include <utility>
#include "base/macros.h"
#include "mojo/edk/embedder/platform_handle_vector.h"
#include "mojo/edk/system/channel.h"
#include "mojo/edk/system/dispatcher.h"
#include "mojo/edk/system/ports/event.h"
#include "mojo/edk/system/ports/name.h"
#include "mojo/edk/system/ports/port_ref.h"
#include "mojo/edk/system/ports/user_message.h"
#include "mojo/edk/system/system_impl_export.h"
#include "mojo/public/c/system/message_pipe.h"
#include "mojo/public/c/system/types.h"
namespace mojo {
namespace edk {
class NodeController;
// UserMessageImpl is the sole implementation of ports::UserMessage used to
// attach message data to any ports::UserMessageEvent.
//
// A UserMessageImpl may be either serialized or unserialized. Unserialized
// instances are serialized lazily only when necessary, i.e., if and when
// Serialize() is called to obtain a serialized message for wire transfer.
//
// TODO(crbug.com/725321): Implement support for unserialized messages.
class MOJO_SYSTEM_IMPL_EXPORT UserMessageImpl
: public NON_EXPORTED_BASE(ports::UserMessage) {
public:
static const TypeInfo kUserMessageTypeInfo;
~UserMessageImpl() override;
// Creates a new ports::UserMessageEvent with an attached serialized
// UserMessageImpl. May fail iff one or more |dispatchers| fails to serialize
// (e.g. due to it being in an invalid state.)
//
// Upon success, MOJO_RESULT_OK is returned and the new UserMessageEvent is
// stored in |*out_event|.
static MojoResult CreateEventForNewSerializedMessage(
uint32_t num_bytes,
const Dispatcher::DispatcherInTransit* dispatchers,
uint32_t num_dispatchers,
std::unique_ptr<ports::UserMessageEvent>* out_event);
// Creates a new UserMessageImpl from an existing serialized message buffer
// which was read from a Channel. Takes ownership of |channel_message|.
// |payload| and |payload_size| represent the range of bytes within
// |channel_message| which should be parsed by this call.
static std::unique_ptr<UserMessageImpl> CreateFromChannelMessage(
Channel::MessagePtr channel_message,
void* payload,
size_t payload_size);
// Reads a message from |port| on |node_controller|'s Node.
//
// The message may or may not require deserialization. If the read message is
// unserialized, it must have been sent from within the same process that's
// receiving it and this call merely passes ownership of the message object
// back out of the ports layer. In this case, |read_any_size| must be true,
// |*out_event| will own the read message upon return, and all other arguments
// are ignored.
//
// If the read message is still serialized, it must have been created by
// CreateFromChannelMessage() above whenever its bytes were first read from a
// Channel. In this case, the message will be taken form the port and returned
// in |*out_event|, if and only iff |read_any_size| is true or both
// |*num_bytes| and |*num_handles| are sufficiently large to contain the
// contents of the message. Upon success this returns |MOJO_RESULT_OK|, and
// updates |*num_bytes| and |*num_handles| with the actual size of the read
// message.
//
// Upon failure this returns any of various error codes detailed by the
// documentation for MojoReadMessage/MojoReadMessageNew in
// src/mojo/public/c/system/message_pipe.h.
static MojoResult ReadMessageEventFromPort(
NodeController* node_controller,
const ports::PortRef& port,
bool read_any_size,
bool may_discard,
uint32_t* num_bytes,
MojoHandle* handles,
uint32_t* num_handles,
std::unique_ptr<ports::UserMessageEvent>* out_event);
// Produces a serialized Channel::Message from the UserMessageEvent in
// |event|. |event| must have a UserMessageImpl instance attached.
//
// If the attached message is not already serialized, it is serialized into a
// new Channel::Message; otherwise this simply passes ownership of the
// internally owned serialized data.
//
// In any case, |message_event| is serialized into the front of the message
// payload before returning.
static Channel::MessagePtr SerializeEventMessage(
std::unique_ptr<ports::UserMessageEvent> event);
// TODO(crbug.com/725321): Support unserialized messages.
bool HasContext() const { return false; }
uintptr_t context() const { return 0; }
bool IsSerialized() const { return true; }
void* user_payload() {
DCHECK(IsSerialized());
return user_payload_;
}
const void* user_payload() const {
DCHECK(IsSerialized());
return user_payload_;
}
size_t user_payload_size() const {
DCHECK(IsSerialized());
return user_payload_size_;
}
size_t num_handles() const;
void set_source_node(const ports::NodeName& name) { source_node_ = name; }
const ports::NodeName& source_node() const { return source_node_; }
private:
class ReadMessageFilter;
// Constructs a serialized UserMessageImpl backed by a new Channel::Message
// with enough storage for the given number of serialized event, header, and
// payload bytes; transferred ports; and system handles.
UserMessageImpl(size_t event_size,
size_t header_size,
size_t payload_size,
size_t num_ports,
size_t num_handles);
// Creates a serialized UserMessageImpl backed by an existing Channel::Message
// object. |header| and |user_payload| must be pointers into
// |channel_message|'s own storage, and |user_payload_size| is the number of
// bytes comprising the user message contents at |user_payload|.
UserMessageImpl(Channel::MessagePtr channel_message,
void* header,
void* user_payload,
size_t user_payload_size);
// Serialized message contents. May be null if this is not a serialized
// message.
Channel::MessagePtr channel_message_;
// Only valid if |channel_message_| is non-null. |header_| is the address
// of the UserMessageImpl's internal MessageHeader structure within the
// serialized message buffer. |user_payload_| is the address of the first byte
// after any serialized dispatchers, with the payload comprising the remaining
// |user_payload_size_| bytes of the message.
void* header_ = nullptr;
void* user_payload_ = nullptr;
size_t user_payload_size_ = 0;
// The node name from which this message was received, iff it came from
// out-of-process and the source is known.
ports::NodeName source_node_ = ports::kInvalidNodeName;
DISALLOW_COPY_AND_ASSIGN(UserMessageImpl);
};
} // namespace edk
} // namespace mojo
#endif // MOJO_EDK_SYSTEM_USER_MESSAGE_IMPL_H_
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment