Mojo: Make MessageInTransit more opaque.

Still more to do, but incremental change is less scary.

R=yzshen@chromium.org

Review URL: https://codereview.chromium.org/172953003

git-svn-id: svn://svn.chromium.org/chrome/trunk/src@252128 0039d316-1c4b-4281-b951-d872f2087c98
parent 264498db
...@@ -193,9 +193,7 @@ void Channel::OnReadMessageForDownstream(const MessageInTransit& message) { ...@@ -193,9 +193,7 @@ void Channel::OnReadMessageForDownstream(const MessageInTransit& message) {
// We need to duplicate the message, because |EnqueueMessage()| will take // We need to duplicate the message, because |EnqueueMessage()| will take
// ownership of it. // ownership of it.
// TODO(vtl): Need to enforce limits on message size and handle count. // TODO(vtl): Need to enforce limits on message size and handle count.
MessageInTransit* own_message = MessageInTransit::Create( MessageInTransit* own_message = message.Clone();
message.type(), message.subtype(), message.bytes(), message.num_bytes(),
message.num_handles());
std::vector<DispatcherTransport> transports(message.num_handles()); std::vector<DispatcherTransport> transports(message.num_handles());
// TODO(vtl): Create dispatchers for handles. // TODO(vtl): Create dispatchers for handles.
// TODO(vtl): It's bad that the current API will create equivalent dispatchers // TODO(vtl): It's bad that the current API will create equivalent dispatchers
......
...@@ -51,21 +51,27 @@ MessageInTransit* MessageInTransit::Create(Type type, ...@@ -51,21 +51,27 @@ MessageInTransit* MessageInTransit::Create(Type type,
const size_t data_size = num_bytes; const size_t data_size = num_bytes;
const size_t size_with_header = sizeof(MessageInTransit) + data_size; const size_t size_with_header = sizeof(MessageInTransit) + data_size;
const size_t size_with_header_and_padding = const size_t buffer_size = RoundUpMessageAlignment(size_with_header);
RoundUpMessageAlignment(size_with_header);
char* buffer = static_cast<char*>( char* buffer = static_cast<char*>(base::AlignedAlloc(buffer_size,
base::AlignedAlloc(size_with_header_and_padding, kMessageAlignment)); kMessageAlignment));
// The buffer consists of the header (a |MessageInTransit|, constructed using // The buffer consists of the header (a |MessageInTransit|, constructed using
// a placement new), followed by the data, followed by padding (of zeros). // a placement new), followed by the data, followed by padding (of zeros).
MessageInTransit* rv = new (buffer) MessageInTransit( MessageInTransit* rv = new (buffer) MessageInTransit(
static_cast<uint32_t>(data_size), type, subtype, num_bytes, num_handles); static_cast<uint32_t>(data_size), type, subtype, num_bytes, num_handles);
memcpy(buffer + sizeof(MessageInTransit), bytes, num_bytes); memcpy(buffer + sizeof(MessageInTransit), bytes, num_bytes);
memset(buffer + size_with_header, 0, memset(buffer + size_with_header, 0, buffer_size - size_with_header);
size_with_header_and_padding - size_with_header);
return rv; return rv;
} }
MessageInTransit* MessageInTransit::Clone() const {
size_t buffer_size = main_buffer_size();
char* buffer = static_cast<char*>(base::AlignedAlloc(buffer_size,
kMessageAlignment));
memcpy(buffer, main_buffer(), buffer_size);
return reinterpret_cast<MessageInTransit*>(buffer);
}
void MessageInTransit::Destroy() { void MessageInTransit::Destroy() {
// No need to call the destructor, since we're POD. // No need to call the destructor, since we're POD.
base::AlignedFree(this); base::AlignedFree(this);
......
...@@ -48,9 +48,25 @@ class MOJO_SYSTEM_IMPL_EXPORT MessageInTransit { ...@@ -48,9 +48,25 @@ class MOJO_SYSTEM_IMPL_EXPORT MessageInTransit {
uint32_t num_bytes, uint32_t num_bytes,
uint32_t num_handles); uint32_t num_handles);
// Destroys a |MessageInTransit| created using |Create()|. MessageInTransit* Clone() const;
// Destroys a |MessageInTransit| created using |Create()| or |Clone()|.
void Destroy(); void Destroy();
// Gets the "main" buffer for a |MessageInTransit|. A |MessageInTransit| can
// be serialized by writing the main buffer. The returned pointer will be
// aligned to a multiple of |kMessageAlignment| bytes, and the size of the
// buffer (see below) will also be a multiple of |kMessageAlignment|.
// TODO(vtl): Add a "secondary" buffer, so that this makes more sense.
const void* main_buffer() const {
return static_cast<const void*>(this);
}
// Gets the size of the main buffer (in number of bytes).
size_t main_buffer_size() const {
return RoundUpMessageAlignment(sizeof(*this) + header()->data_size);
}
// Gets the size of the data (in number of bytes). This is the full size of // Gets the size of the data (in number of bytes). This is the full size of
// the data that follows the header, and may include data other than the // the data that follows the header, and may include data other than the
// message data. (See also |num_bytes()|.) // message data. (See also |num_bytes()|.)
...@@ -77,10 +93,6 @@ class MOJO_SYSTEM_IMPL_EXPORT MessageInTransit { ...@@ -77,10 +93,6 @@ class MOJO_SYSTEM_IMPL_EXPORT MessageInTransit {
return header()->num_handles; return header()->num_handles;
} }
size_t size_with_header_and_padding() const {
return RoundUpMessageAlignment(sizeof(*this) + header()->data_size);
}
Type type() const { return header()->type; } Type type() const { return header()->type; }
Subtype subtype() const { return header()->subtype; } Subtype subtype() const { return header()->subtype; }
EndpointId source_id() const { return header()->source_id; } EndpointId source_id() const { return header()->source_id; }
......
...@@ -258,8 +258,7 @@ void RawChannelPosix::OnFileCanReadWithoutBlocking(int fd) { ...@@ -258,8 +258,7 @@ void RawChannelPosix::OnFileCanReadWithoutBlocking(int fd) {
DCHECK_EQ(reinterpret_cast<size_t>(message) % DCHECK_EQ(reinterpret_cast<size_t>(message) %
MessageInTransit::kMessageAlignment, 0u); MessageInTransit::kMessageAlignment, 0u);
// If we have the header, not the whole message.... // If we have the header, not the whole message....
if (read_buffer_num_valid_bytes_ < if (read_buffer_num_valid_bytes_ < message->main_buffer_size())
message->size_with_header_and_padding())
break; break;
// Dispatch the message. // Dispatch the message.
...@@ -272,8 +271,8 @@ void RawChannelPosix::OnFileCanReadWithoutBlocking(int fd) { ...@@ -272,8 +271,8 @@ void RawChannelPosix::OnFileCanReadWithoutBlocking(int fd) {
did_dispatch_message = true; did_dispatch_message = true;
// Update our state. // Update our state.
read_buffer_start += message->size_with_header_and_padding(); read_buffer_start += message->main_buffer_size();
read_buffer_num_valid_bytes_ -= message->size_with_header_and_padding(); read_buffer_num_valid_bytes_ -= message->main_buffer_size();
} }
// If we dispatched any messages, stop reading for now (and let the message // If we dispatched any messages, stop reading for now (and let the message
...@@ -353,12 +352,12 @@ bool RawChannelPosix::WriteFrontMessageNoLock() { ...@@ -353,12 +352,12 @@ bool RawChannelPosix::WriteFrontMessageNoLock() {
DCHECK(!write_message_queue_.empty()); DCHECK(!write_message_queue_.empty());
MessageInTransit* message = write_message_queue_.front(); MessageInTransit* message = write_message_queue_.front();
DCHECK_LT(write_message_offset_, message->size_with_header_and_padding()); DCHECK_LT(write_message_offset_, message->main_buffer_size());
size_t bytes_to_write = size_t bytes_to_write = message->main_buffer_size() - write_message_offset_;
message->size_with_header_and_padding() - write_message_offset_;
ssize_t bytes_written = HANDLE_EINTR( ssize_t bytes_written = HANDLE_EINTR(
write(fd_.get().fd, write(fd_.get().fd,
reinterpret_cast<char*>(message) + write_message_offset_, static_cast<const char*>(message->main_buffer()) +
write_message_offset_,
bytes_to_write)); bytes_to_write));
if (bytes_written < 0) { if (bytes_written < 0) {
if (errno != EAGAIN && errno != EWOULDBLOCK) { if (errno != EAGAIN && errno != EWOULDBLOCK) {
......
...@@ -67,9 +67,8 @@ bool WriteTestMessageToHandle(const embedder::PlatformHandle& handle, ...@@ -67,9 +67,8 @@ bool WriteTestMessageToHandle(const embedder::PlatformHandle& handle,
MessageInTransit* message = MakeTestMessage(num_bytes); MessageInTransit* message = MakeTestMessage(num_bytes);
ssize_t write_size = HANDLE_EINTR( ssize_t write_size = HANDLE_EINTR(
write(handle.fd, message, message->size_with_header_and_padding())); write(handle.fd, message->main_buffer(), message->main_buffer_size()));
bool result = write_size == bool result = write_size == static_cast<ssize_t>(message->main_buffer_size());
static_cast<ssize_t>(message->size_with_header_and_padding());
message->Destroy(); message->Destroy();
return result; return result;
} }
...@@ -159,7 +158,7 @@ class TestMessageReaderAndChecker { ...@@ -159,7 +158,7 @@ class TestMessageReaderAndChecker {
} }
// If we've read the whole message.... // If we've read the whole message....
if (bytes_.size() >= message->size_with_header_and_padding()) { if (bytes_.size() >= message->main_buffer_size()) {
if (!CheckMessageData(message->data(), message->data_size())) { if (!CheckMessageData(message->data(), message->data_size())) {
LOG(ERROR) << "Incorrect message data."; LOG(ERROR) << "Incorrect message data.";
return false; return false;
...@@ -168,7 +167,7 @@ class TestMessageReaderAndChecker { ...@@ -168,7 +167,7 @@ class TestMessageReaderAndChecker {
// Erase message data. // Erase message data.
bytes_.erase(bytes_.begin(), bytes_.erase(bytes_.begin(),
bytes_.begin() + bytes_.begin() +
message->size_with_header_and_padding()); message->main_buffer_size());
return true; return true;
} }
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment