Commit 658b86b1 authored by darin@chromium.org's avatar darin@chromium.org

Adds a class named Connector that connects a MessagePipe to a MessageReceiver.

MessageReceiver is the interface used by the generated proxies and stubs to transmit messages. Connector is responsible for reading and writing those messages on a MessagePipe. A Connector "is a" MessageReceiver so that it can accept outbound messages, and "has a" MessageReceiver so it can dispatch incoming messages.

R=viettrungluu@chromium.org

Review URL: https://codereview.chromium.org/54743003

git-svn-id: svn://svn.chromium.org/chrome/trunk/src@233372 0039d316-1c4b-4281-b951-d872f2087c98
parent 88f10a25
...@@ -27,6 +27,8 @@ ...@@ -27,6 +27,8 @@
'mojo_system', 'mojo_system',
], ],
'sources': [ 'sources': [
'public/tests/simple_bindings_support.cc',
'public/tests/simple_bindings_support.h',
'public/tests/test_support.cc', 'public/tests/test_support.cc',
'public/tests/test_support.h', 'public/tests/test_support.h',
], ],
...@@ -37,10 +39,12 @@ ...@@ -37,10 +39,12 @@
'dependencies': [ 'dependencies': [
'../base/base.gyp:run_all_unittests', '../base/base.gyp:run_all_unittests',
'../testing/gtest.gyp:gtest', '../testing/gtest.gyp:gtest',
'mojo_bindings',
'mojo_public_test_support', 'mojo_public_test_support',
'mojo_system', 'mojo_system',
], ],
'sources': [ 'sources': [
'public/tests/bindings_connector_unittest.cc',
'public/tests/system_core_unittest.cc', 'public/tests/system_core_unittest.cc',
], ],
}, },
...@@ -251,12 +255,18 @@ ...@@ -251,12 +255,18 @@
'public/bindings/lib/bindings_internal.h', 'public/bindings/lib/bindings_internal.h',
'public/bindings/lib/bindings_serialization.cc', 'public/bindings/lib/bindings_serialization.cc',
'public/bindings/lib/bindings_serialization.h', 'public/bindings/lib/bindings_serialization.h',
'public/bindings/lib/bindings_support.cc',
'public/bindings/lib/bindings_support.h',
'public/bindings/lib/buffer.cc', 'public/bindings/lib/buffer.cc',
'public/bindings/lib/buffer.h', 'public/bindings/lib/buffer.h',
'public/bindings/lib/connector.cc',
'public/bindings/lib/connector.h',
'public/bindings/lib/message.cc', 'public/bindings/lib/message.cc',
'public/bindings/lib/message.h', 'public/bindings/lib/message.h',
'public/bindings/lib/message_builder.cc', 'public/bindings/lib/message_builder.cc',
'public/bindings/lib/message_builder.h', 'public/bindings/lib/message_builder.h',
'public/bindings/lib/message_queue.cc',
'public/bindings/lib/message_queue.h',
], ],
}, },
{ {
......
// Copyright 2013 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "mojo/public/bindings/lib/bindings_support.h"
#include <assert.h>
#include <stddef.h>
namespace mojo {
namespace {
BindingsSupport* g_bindings_support = NULL;
}
// static
void BindingsSupport::Set(BindingsSupport* support) {
g_bindings_support = support;
}
// static
BindingsSupport* BindingsSupport::Get() {
assert(g_bindings_support);
return g_bindings_support;
}
} // namespace mojo
// Copyright 2013 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef MOJO_PUBLIC_BINDINGS_LIB_BINDINGS_SUPPORT_H_
#define MOJO_PUBLIC_BINDINGS_LIB_BINDINGS_SUPPORT_H_
#include "mojo/public/system/core.h"
namespace mojo {
// An embedder of the bindings library MUST implement BindingsSupport and call
// BindingsSupport::Set prior to using the library.
class BindingsSupport {
public:
class AsyncWaitCallback {
public:
virtual void OnHandleReady(MojoResult result) = 0;
};
// Asynchronously call MojoWait on a background thread, and return the result
// to the current thread via the given AsyncWaitCallback.
virtual bool AsyncWait(Handle handle,
MojoWaitFlags flags,
MojoDeadline deadline,
AsyncWaitCallback* callback) = 0;
// Cancel an existing call to AsyncWait with the given callback. The
// callback's OnHandleReady method should not be called in this case.
virtual void CancelWait(AsyncWaitCallback* callback) = 0;
static void Set(BindingsSupport* support);
static BindingsSupport* Get();
};
} // namespace mojo
#endif // MOJO_PUBLIC_BINDINGS_LIB_BINDINGS_SUPPORT_H_
// Copyright 2013 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "mojo/public/bindings/lib/connector.h"
#include <assert.h>
#include <stdlib.h>
#include <algorithm>
namespace mojo {
// ----------------------------------------------------------------------------
Connector::Connector(Handle message_pipe)
: message_pipe_(message_pipe),
incoming_receiver_(NULL),
error_(false) {
}
Connector::~Connector() {
if (read_callback_.IsPending())
read_callback_.Cancel();
if (write_callback_.IsPending())
write_callback_.Cancel();
}
void Connector::SetIncomingReceiver(MessageReceiver* receiver) {
assert(!incoming_receiver_);
incoming_receiver_ = receiver;
if (incoming_receiver_)
WaitToReadMore();
}
bool Connector::Accept(Message* message) {
if (error_)
return false;
write_queue_.Push(message);
WriteMore();
return !error_;
}
void Connector::OnHandleReady(Callback* callback, MojoResult result) {
if (callback == &read_callback_)
ReadMore();
if (callback == &write_callback_)
WriteMore();
}
void Connector::WaitToReadMore() {
read_callback_.SetOwnerToNotify(this);
bool ok = BindingsSupport::Get()->AsyncWait(message_pipe_,
MOJO_WAIT_FLAG_READABLE,
MOJO_DEADLINE_INDEFINITE,
&read_callback_);
if (!ok)
error_ = true;
}
void Connector::WaitToWriteMore() {
write_callback_.SetOwnerToNotify(this);
bool ok = BindingsSupport::Get()->AsyncWait(message_pipe_,
MOJO_WAIT_FLAG_WRITABLE,
MOJO_DEADLINE_INDEFINITE,
&write_callback_);
if (!ok)
error_ = true;
}
void Connector::ReadMore() {
for (;;) {
MojoResult rv;
uint32_t num_bytes = 0, num_handles = 0;
rv = ReadMessage(message_pipe_,
NULL,
&num_bytes,
NULL,
&num_handles,
MOJO_READ_MESSAGE_FLAG_NONE);
if (rv == MOJO_RESULT_NOT_FOUND) {
WaitToReadMore();
break;
}
if (rv != MOJO_RESULT_RESOURCE_EXHAUSTED) {
error_ = true;
break;
}
Message message;
message.data = static_cast<MessageData*>(malloc(num_bytes));
message.handles.resize(num_handles);
rv = ReadMessage(message_pipe_,
message.data,
&num_bytes,
&message.handles[0],
&num_handles,
MOJO_READ_MESSAGE_FLAG_NONE);
if (rv != MOJO_RESULT_OK) {
error_ = true;
break;
}
incoming_receiver_->Accept(&message);
}
}
void Connector::WriteMore() {
while (!write_queue_.IsEmpty()) {
const Message* message = write_queue_.Peek();
MojoResult rv = WriteMessage(message_pipe_,
message->data,
message->data->header.num_bytes,
message->handles.data(),
message->handles.size(),
MOJO_WRITE_MESSAGE_FLAG_NONE);
if (rv == MOJO_RESULT_OK) {
// TODO(darin): Handles were successfully transferred, and so we need
// to take care not to Close them here.
write_queue_.Pop();
continue; // Write another message.
}
error_ = true;
break;
}
}
// ----------------------------------------------------------------------------
Connector::Callback::Callback()
: owner_(NULL) {
}
void Connector::Callback::Cancel() {
owner_ = NULL;
BindingsSupport::Get()->CancelWait(this);
}
void Connector::Callback::SetOwnerToNotify(Connector* owner) {
assert(!owner_);
owner_ = owner;
}
bool Connector::Callback::IsPending() const {
return owner_ != NULL;
}
void Connector::Callback::OnHandleReady(MojoResult result) {
assert(owner_);
Connector* owner = NULL;
std::swap(owner, owner_);
owner->OnHandleReady(this, result);
}
} // namespace mojo
// Copyright 2013 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef MOJO_PUBLIC_BINDINGS_LIB_CONNECTOR_H_
#define MOJO_PUBLIC_BINDINGS_LIB_CONNECTOR_H_
#include "mojo/public/bindings/lib/bindings_support.h"
#include "mojo/public/bindings/lib/message.h"
#include "mojo/public/bindings/lib/message_queue.h"
#include "mojo/public/system/core.h"
namespace mojo {
// The Connector class is responsible for performing read/write operations on a
// MessagePipe. It writes messages it receives through the MessageReceiver
// interface that it subclasses, and it forwards messages it reads through the
// MessageReceiver interface assigned as its incoming receiver.
//
// NOTE: MessagePipe I/O is non-blocking.
//
class Connector : public MessageReceiver {
public:
// The Connector does not take ownership of |message_pipe|.
// TODO(darin): Perhaps it should take ownership.
explicit Connector(Handle message_pipe);
virtual ~Connector();
// Sets the receiver to handle messages read from the message pipe. The
// Connector will only read messages from the pipe if an incoming receiver
// has been set.
void SetIncomingReceiver(MessageReceiver* receiver);
// Returns true if an error was encountered while reading from or writing to
// the message pipe.
bool EncounteredError() const { return error_; }
// MessageReceiver implementation:
virtual bool Accept(Message* message) MOJO_OVERRIDE;
private:
class Callback : public BindingsSupport::AsyncWaitCallback {
public:
Callback();
void Cancel();
void SetOwnerToNotify(Connector* owner);
bool IsPending() const;
virtual void OnHandleReady(MojoResult result) MOJO_OVERRIDE;
private:
Connector* owner_;
};
friend class Callback;
void OnHandleReady(Callback* callback, MojoResult result);
void WaitToReadMore();
void WaitToWriteMore();
void ReadMore();
void WriteMore();
Handle message_pipe_;
MessageReceiver* incoming_receiver_;
MessageQueue write_queue_;
Callback read_callback_;
Callback write_callback_;
bool error_;
};
} // namespace mojo
#endif // MOJO_PUBLIC_BINDINGS_LIB_CONNECTOR_H_
...@@ -6,6 +6,8 @@ ...@@ -6,6 +6,8 @@
#include <stdlib.h> #include <stdlib.h>
#include <algorithm>
namespace mojo { namespace mojo {
Message::Message() Message::Message()
...@@ -14,6 +16,12 @@ Message::Message() ...@@ -14,6 +16,12 @@ Message::Message()
Message::~Message() { Message::~Message() {
free(data); free(data);
// TODO(darin): Need to Close any handles so they don't leak.
}
void Message::Swap(Message* other) {
std::swap(data, other->data);
std::swap(handles, other->handles);
} }
} // namespace mojo } // namespace mojo
...@@ -27,18 +27,30 @@ MOJO_COMPILE_ASSERT(sizeof(MessageData) == 9, bad_sizeof_MessageData); ...@@ -27,18 +27,30 @@ MOJO_COMPILE_ASSERT(sizeof(MessageData) == 9, bad_sizeof_MessageData);
#pragma pack(pop) #pragma pack(pop)
struct Message { // Message is a holder for the data and handles to be sent over a MessagePipe.
// Message owns its data, but a consumer of Message is free to manipulate the
// data member or replace it. If replacing, then be sure to use |malloc| to
// allocate the memory.
class Message {
public:
Message(); Message();
~Message(); ~Message();
void Swap(Message* other);
MessageData* data; // Heap-allocated. MessageData* data; // Heap-allocated.
std::vector<Handle> handles; std::vector<Handle> handles;
private:
MOJO_DISALLOW_COPY_AND_ASSIGN(Message);
}; };
class MessageReceiver { class MessageReceiver {
public: public:
// The receiver may mutate the given message or take ownership of its // The receiver may mutate the given message or take ownership of its
// |message->data| member by setting it to NULL. // |message->data| member by setting it to NULL. Returns true if the message
// was accepted and false otherwise, indicating that the message was invalid
// or malformed.
virtual bool Accept(Message* message) = 0; virtual bool Accept(Message* message) = 0;
}; };
......
// Copyright 2013 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "mojo/public/bindings/lib/message_queue.h"
#include <assert.h>
#include <stddef.h>
#include "mojo/public/bindings/lib/message.h"
namespace mojo {
MessageQueue::MessageQueue() {
}
MessageQueue::~MessageQueue() {
while (!queue_.empty())
Pop();
}
bool MessageQueue::IsEmpty() const {
return queue_.empty();
}
const Message* MessageQueue::Peek() const {
assert(!queue_.empty());
return queue_.front();
}
void MessageQueue::Push(Message* message) {
queue_.push(new Message());
queue_.back()->Swap(message);
}
void MessageQueue::Pop(Message* message) {
assert(!queue_.empty());
queue_.front()->Swap(message);
Pop();
}
void MessageQueue::Pop() {
assert(!queue_.empty());
delete queue_.front();
queue_.pop();
}
} // namespace mojo
// Copyright 2013 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef MOJO_PUBLIC_BINDINGS_LIB_MESSAGE_QUEUE_H_
#define MOJO_PUBLIC_BINDINGS_LIB_MESSAGE_QUEUE_H_
#include <queue>
#include "mojo/public/system/macros.h"
namespace mojo {
class Message;
// A queue for Message objects.
class MessageQueue {
public:
MessageQueue();
~MessageQueue();
bool IsEmpty() const;
const Message* Peek() const;
// This method transfers ownership of |message->data| and |message->handles|
// to the message queue, resetting |message| in the process.
void Push(Message* message);
// Removes the next message from the queue, transferring ownership of its
// data and handles to the given |message|.
void Pop(Message* message);
// Removes the next message from the queue, discarding its data and handles.
// This is meant to be used in conjunction with |Peek|.
void Pop();
private:
std::queue<Message*> queue_;
MOJO_DISALLOW_COPY_AND_ASSIGN(MessageQueue);
};
} // namespace mojo
#endif // MOJO_PUBLIC_BINDINGS_LIB_MESSAGE_QUEUE_H_
// Copyright 2013 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include <stdlib.h>
#include <string.h>
#include "mojo/public/bindings/lib/bindings_support.h"
#include "mojo/public/bindings/lib/connector.h"
#include "mojo/public/bindings/lib/message_queue.h"
#include "mojo/public/tests/simple_bindings_support.h"
#include "mojo/public/tests/test_support.h"
namespace mojo {
namespace test {
class MessageAccumulator : public MessageReceiver {
public:
MessageAccumulator() {
}
virtual bool Accept(Message* message) MOJO_OVERRIDE {
queue_.Push(message);
return true;
}
bool IsEmpty() const {
return queue_.IsEmpty();
}
void Pop(Message* message) {
queue_.Pop(message);
}
private:
MessageQueue queue_;
};
class BindingsConnectorTest : public TestBase {
public:
BindingsConnectorTest()
: handle0_(kInvalidHandle),
handle1_(kInvalidHandle) {
}
virtual void SetUp() OVERRIDE {
CreateMessagePipe(&handle0_, &handle1_);
}
virtual void TearDown() OVERRIDE {
Close(handle0_);
Close(handle1_);
}
void AllocMessage(const char* text, Message* message) {
size_t payload_size = strlen(text) + 1; // Plus null terminator.
size_t num_bytes = sizeof(MessageHeader) + payload_size;
message->data = static_cast<MessageData*>(malloc(num_bytes));
message->data->header.num_bytes = num_bytes;
message->data->header.name = 1;
memcpy(message->data->payload, text, payload_size);
}
void PumpMessages() {
bindings_support_.Process();
}
protected:
Handle handle0_;
Handle handle1_;
private:
SimpleBindingsSupport bindings_support_;
};
TEST_F(BindingsConnectorTest, Basic) {
Connector connector0(handle0_);
Connector connector1(handle1_);
const char kText[] = "hello world";
Message message;
AllocMessage(kText, &message);
connector0.Accept(&message);
MessageAccumulator accumulator;
connector1.SetIncomingReceiver(&accumulator);
PumpMessages();
ASSERT_FALSE(accumulator.IsEmpty());
Message message_received;
accumulator.Pop(&message_received);
EXPECT_EQ(std::string(kText),
std::string(
reinterpret_cast<char*>(message_received.data->payload)));
}
TEST_F(BindingsConnectorTest, Basic_EarlyIncomingReceiver) {
Connector connector0(handle0_);
Connector connector1(handle1_);
MessageAccumulator accumulator;
connector1.SetIncomingReceiver(&accumulator);
const char kText[] = "hello world";
Message message;
AllocMessage(kText, &message);
connector0.Accept(&message);
PumpMessages();
ASSERT_FALSE(accumulator.IsEmpty());
Message message_received;
accumulator.Pop(&message_received);
EXPECT_EQ(std::string(kText),
std::string(
reinterpret_cast<char*>(message_received.data->payload)));
}
TEST_F(BindingsConnectorTest, Basic_TwoMessages) {
Connector connector0(handle0_);
Connector connector1(handle1_);
const char* kText[] = { "hello", "world" };
for (size_t i = 0; i < arraysize(kText); ++i) {
Message message;
AllocMessage(kText[i], &message);
connector0.Accept(&message);
}
MessageAccumulator accumulator;
connector1.SetIncomingReceiver(&accumulator);
PumpMessages();
for (size_t i = 0; i < arraysize(kText); ++i) {
ASSERT_FALSE(accumulator.IsEmpty());
Message message_received;
accumulator.Pop(&message_received);
EXPECT_EQ(std::string(kText[i]),
std::string(
reinterpret_cast<char*>(message_received.data->payload)));
}
}
TEST_F(BindingsConnectorTest, WriteToClosedPipe) {
Connector connector0(handle0_);
const char kText[] = "hello world";
Message message;
AllocMessage(kText, &message);
Close(handle0_); // Close the handle before writing to it.
bool ok = connector0.Accept(&message);
EXPECT_FALSE(ok);
EXPECT_TRUE(connector0.EncounteredError());
}
} // namespace test
} // namespace mojo
// Copyright 2013 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "mojo/public/tests/simple_bindings_support.h"
#include <stdlib.h>
namespace mojo {
namespace test {
SimpleBindingsSupport::SimpleBindingsSupport() {
BindingsSupport::Set(this);
}
SimpleBindingsSupport::~SimpleBindingsSupport() {
BindingsSupport::Set(NULL);
}
bool SimpleBindingsSupport::AsyncWait(Handle handle,
MojoWaitFlags flags,
MojoDeadline deadline,
AsyncWaitCallback* callback) {
Waiter waiter;
waiter.handle = handle;
waiter.flags = flags;
waiter.deadline = deadline;
waiter.callback = callback;
waiters_.push_back(waiter);
return true;
}
void SimpleBindingsSupport::CancelWait(AsyncWaitCallback* callback) {
std::list<Waiter>::iterator it = waiters_.begin();
while (it != waiters_.end()) {
if (it->callback == callback) {
std::list<Waiter>::iterator doomed = it++;
waiters_.erase(doomed);
} else {
++it;
}
}
}
void SimpleBindingsSupport::Process() {
typedef std::pair<AsyncWaitCallback*, MojoResult> Result;
std::list<Result> results;
// TODO(darin): Honor given deadline.
std::list<Waiter>::iterator it = waiters_.begin();
while (it != waiters_.end()) {
const Waiter& waiter = *it;
MojoResult result;
if (IsReady(waiter.handle, waiter.flags, &result)) {
results.push_back(std::make_pair(waiter.callback, result));
std::list<Waiter>::iterator doomed = it++;
waiters_.erase(doomed);
} else {
++it;
}
}
for (std::list<Result>::const_iterator it = results.begin();
it != results.end();
++it) {
it->first->OnHandleReady(it->second);
}
}
bool SimpleBindingsSupport::IsReady(Handle handle, MojoWaitFlags flags,
MojoResult* result) {
*result = Wait(handle, flags, 0);
return *result != MOJO_RESULT_DEADLINE_EXCEEDED;
}
} // namespace test
} // namespace mojo
// Copyright 2013 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef MOJO_PUBLIC_TESTS_SIMPLE_BINDINGS_SUPPORT_H_
#define MOJO_PUBLIC_TESTS_SIMPLE_BINDINGS_SUPPORT_H_
#include <list>
#include "mojo/public/bindings/lib/bindings_support.h"
namespace mojo {
namespace test {
class SimpleBindingsSupport : public BindingsSupport {
public:
SimpleBindingsSupport();
virtual ~SimpleBindingsSupport();
virtual bool AsyncWait(Handle handle,
MojoWaitFlags flags,
MojoDeadline deadline,
AsyncWaitCallback* callback) MOJO_OVERRIDE;
virtual void CancelWait(AsyncWaitCallback* callback) MOJO_OVERRIDE;
// This method is called by unit tests to check the status of any handles
// that we are asynchronously waiting on and to dispatch callbacks for any
// handles that are ready.
void Process();
private:
bool IsReady(Handle handle, MojoWaitFlags flags, MojoResult* result);
struct Waiter {
Handle handle;
MojoWaitFlags flags;
MojoDeadline deadline;
AsyncWaitCallback* callback;
};
std::list<Waiter> waiters_;
};
} // namespace test
} // namespace mojo
#endif // MOJO_PUBLIC_TESTS_SIMPLE_BINDINGS_SUPPORT_H_
...@@ -12,16 +12,13 @@ namespace mojo { ...@@ -12,16 +12,13 @@ namespace mojo {
namespace test { namespace test {
TestBase::TestBase() { TestBase::TestBase() {
if (!system::CoreImpl::Get())
system::CoreImpl::Init();
} }
TestBase::~TestBase() { TestBase::~TestBase() {
} }
void TestBase::SetUp() {
if (!system::CoreImpl::Get())
system::CoreImpl::Init();
}
void IterateAndReportPerf(const char* test_name, void IterateAndReportPerf(const char* test_name,
base::Callback<void()> single_iteration) { base::Callback<void()> single_iteration) {
// TODO(vtl): These should be specifiable using command-line flags. // TODO(vtl): These should be specifiable using command-line flags.
......
...@@ -18,8 +18,6 @@ class TestBase : public testing::Test { ...@@ -18,8 +18,6 @@ class TestBase : public testing::Test {
TestBase(); TestBase();
virtual ~TestBase(); virtual ~TestBase();
virtual void SetUp() OVERRIDE;
private: private:
DISALLOW_COPY_AND_ASSIGN(TestBase); DISALLOW_COPY_AND_ASSIGN(TestBase);
}; };
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment