Commit 23ae8173 authored by viettrungluu's avatar viettrungluu Committed by Commit bot

Mojo: Have |ProxyMessagePipeEndpoint|s constructed with a |ChannelEndpoint|.

This eliminates the need for |ProxyMessagePipeEndpoint::Attach()|.

R=brettw@chromium.org

Review URL: https://codereview.chromium.org/588193004

Cr-Commit-Position: refs/heads/master@{#296485}
parent faa99f8c
......@@ -14,7 +14,6 @@
#include "mojo/system/core.h"
#include "mojo/system/entrypoints.h"
#include "mojo/system/message_in_transit.h"
#include "mojo/system/message_pipe.h"
#include "mojo/system/message_pipe_dispatcher.h"
#include "mojo/system/platform_handle_dispatcher.h"
#include "mojo/system/raw_channel.h"
......@@ -42,7 +41,7 @@ namespace {
scoped_refptr<system::Channel> MakeChannel(
system::Core* core,
ScopedPlatformHandle platform_handle,
scoped_refptr<system::MessagePipe> message_pipe) {
scoped_refptr<system::ChannelEndpoint> channel_endpoint) {
DCHECK(platform_handle.is_valid());
// Create and initialize a |system::Channel|.
......@@ -58,9 +57,9 @@ scoped_refptr<system::Channel> MakeChannel(
// Once |Init()| has succeeded, we have to return |channel| (since
// |Shutdown()| will have to be called on it).
// Attach the message pipe endpoint.
system::MessageInTransit::EndpointId endpoint_id = channel->AttachEndpoint(
make_scoped_refptr(new system::ChannelEndpoint(message_pipe.get(), 1)));
// Attach the endpoint.
system::MessageInTransit::EndpointId endpoint_id =
channel->AttachEndpoint(channel_endpoint);
if (endpoint_id == system::MessageInTransit::kInvalidEndpointId) {
// This means that, e.g., the other endpoint of the message pipe was closed
// first. But it's not necessarily an error per se.
......@@ -83,11 +82,11 @@ void CreateChannelHelper(
system::Core* core,
ScopedPlatformHandle platform_handle,
scoped_ptr<ChannelInfo> channel_info,
scoped_refptr<system::MessagePipe> message_pipe,
scoped_refptr<system::ChannelEndpoint> channel_endpoint,
DidCreateChannelCallback callback,
scoped_refptr<base::TaskRunner> callback_thread_task_runner) {
channel_info->channel =
MakeChannel(core, platform_handle.Pass(), message_pipe);
MakeChannel(core, platform_handle.Pass(), channel_endpoint);
// Hand the channel back to the embedder.
if (callback_thread_task_runner.get()) {
......@@ -111,18 +110,18 @@ ScopedMessagePipeHandle CreateChannelOnIOThread(
DCHECK(platform_handle.is_valid());
DCHECK(channel_info);
std::pair<scoped_refptr<system::MessagePipeDispatcher>,
scoped_refptr<system::MessagePipe> > remote_message_pipe =
system::MessagePipeDispatcher::CreateRemoteMessagePipe();
scoped_refptr<system::ChannelEndpoint> channel_endpoint;
scoped_refptr<system::MessagePipeDispatcher> dispatcher =
system::MessagePipeDispatcher::CreateRemoteMessagePipe(&channel_endpoint);
system::Core* core = system::entrypoints::GetCore();
DCHECK(core);
ScopedMessagePipeHandle rv(
MessagePipeHandle(core->AddDispatcher(remote_message_pipe.first)));
MessagePipeHandle(core->AddDispatcher(dispatcher)));
*channel_info = new ChannelInfo();
(*channel_info)->channel =
MakeChannel(core, platform_handle.Pass(), remote_message_pipe.second);
MakeChannel(core, platform_handle.Pass(), channel_endpoint);
return rv.Pass();
}
......@@ -134,14 +133,14 @@ ScopedMessagePipeHandle CreateChannel(
scoped_refptr<base::TaskRunner> callback_thread_task_runner) {
DCHECK(platform_handle.is_valid());
std::pair<scoped_refptr<system::MessagePipeDispatcher>,
scoped_refptr<system::MessagePipe> > remote_message_pipe =
system::MessagePipeDispatcher::CreateRemoteMessagePipe();
scoped_refptr<system::ChannelEndpoint> channel_endpoint;
scoped_refptr<system::MessagePipeDispatcher> dispatcher =
system::MessagePipeDispatcher::CreateRemoteMessagePipe(&channel_endpoint);
system::Core* core = system::entrypoints::GetCore();
DCHECK(core);
ScopedMessagePipeHandle rv(
MessagePipeHandle(core->AddDispatcher(remote_message_pipe.first)));
MessagePipeHandle(core->AddDispatcher(dispatcher)));
scoped_ptr<ChannelInfo> channel_info(new ChannelInfo());
channel_info->io_thread_task_runner = io_thread_task_runner;
......@@ -152,7 +151,7 @@ ScopedMessagePipeHandle CreateChannel(
base::Unretained(core),
base::Passed(&platform_handle),
base::Passed(&channel_info),
remote_message_pipe.second,
channel_endpoint,
callback,
callback_thread_task_runner));
} else {
......
......@@ -194,10 +194,12 @@ TEST_F(ChannelTest, CloseBeforeRun) {
base::Bind(&ChannelTest::InitChannelOnIOThread, base::Unretained(this)));
EXPECT_EQ(TRISTATE_TRUE, init_result());
scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalProxy());
scoped_refptr<ChannelEndpoint> channel_endpoint;
scoped_refptr<MessagePipe> mp(
MessagePipe::CreateLocalProxy(&channel_endpoint));
MessageInTransit::EndpointId local_id = channel()->AttachEndpoint(
make_scoped_refptr(new ChannelEndpoint(mp.get(), 1)));
MessageInTransit::EndpointId local_id =
channel()->AttachEndpoint(channel_endpoint);
EXPECT_EQ(Channel::kBootstrapEndpointId, local_id);
mp->Close(0);
......@@ -232,10 +234,12 @@ TEST_F(ChannelTest, ShutdownAfterAttach) {
base::Bind(&ChannelTest::InitChannelOnIOThread, base::Unretained(this)));
EXPECT_EQ(TRISTATE_TRUE, init_result());
scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalProxy());
scoped_refptr<ChannelEndpoint> channel_endpoint;
scoped_refptr<MessagePipe> mp(
MessagePipe::CreateLocalProxy(&channel_endpoint));
MessageInTransit::EndpointId local_id = channel()->AttachEndpoint(
make_scoped_refptr(new ChannelEndpoint(mp.get(), 1)));
MessageInTransit::EndpointId local_id =
channel()->AttachEndpoint(channel_endpoint);
EXPECT_EQ(Channel::kBootstrapEndpointId, local_id);
// TODO(vtl): Currently, we always "expect" a |RunMessagePipeEndpoint()| after
......@@ -282,10 +286,12 @@ TEST_F(ChannelTest, WaitAfterAttachRunAndShutdown) {
base::Bind(&ChannelTest::InitChannelOnIOThread, base::Unretained(this)));
EXPECT_EQ(TRISTATE_TRUE, init_result());
scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalProxy());
scoped_refptr<ChannelEndpoint> channel_endpoint;
scoped_refptr<MessagePipe> mp(
MessagePipe::CreateLocalProxy(&channel_endpoint));
MessageInTransit::EndpointId local_id = channel()->AttachEndpoint(
make_scoped_refptr(new ChannelEndpoint(mp.get(), 1)));
MessageInTransit::EndpointId local_id =
channel()->AttachEndpoint(channel_endpoint);
EXPECT_EQ(Channel::kBootstrapEndpointId, local_id);
EXPECT_TRUE(channel()->RunMessagePipeEndpoint(local_id,
......
......@@ -15,31 +15,36 @@
namespace mojo {
namespace system {
MessagePipe::MessagePipe(scoped_ptr<MessagePipeEndpoint> endpoint0,
scoped_ptr<MessagePipeEndpoint> endpoint1) {
endpoints_[0].reset(endpoint0.release());
endpoints_[1].reset(endpoint1.release());
}
// static
MessagePipe* MessagePipe::CreateLocalLocal() {
return new MessagePipe(
scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint),
scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint));
MessagePipe* message_pipe = new MessagePipe();
message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint());
message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint());
return message_pipe;
}
// static
MessagePipe* MessagePipe::CreateLocalProxy() {
return new MessagePipe(
scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint),
scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint));
MessagePipe* MessagePipe::CreateLocalProxy(
scoped_refptr<ChannelEndpoint>* channel_endpoint) {
DCHECK(!channel_endpoint->get()); // Not technically wrong, but unlikely.
MessagePipe* message_pipe = new MessagePipe();
message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint());
*channel_endpoint = new ChannelEndpoint(message_pipe, 1);
message_pipe->endpoints_[1].reset(
new ProxyMessagePipeEndpoint(channel_endpoint->get()));
return message_pipe;
}
// static
MessagePipe* MessagePipe::CreateProxyLocal() {
return new MessagePipe(
scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint),
scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint));
MessagePipe* MessagePipe::CreateProxyLocal(
scoped_refptr<ChannelEndpoint>* channel_endpoint) {
DCHECK(!channel_endpoint->get()); // Not technically wrong, but unlikely.
MessagePipe* message_pipe = new MessagePipe();
*channel_endpoint = new ChannelEndpoint(message_pipe, 0);
message_pipe->endpoints_[0].reset(
new ProxyMessagePipeEndpoint(channel_endpoint->get()));
message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint());
return message_pipe;
}
// static
......@@ -165,13 +170,16 @@ scoped_refptr<ChannelEndpoint> MessagePipe::ConvertLocalToProxy(unsigned port) {
<< "Direct message pipe passing across multiple channels not yet "
"implemented; will proxy";
scoped_refptr<ChannelEndpoint> channel_endpoint(
new ChannelEndpoint(this, port));
scoped_ptr<MessagePipeEndpoint> replacement_endpoint(
new ProxyMessagePipeEndpoint(
channel_endpoint.get(),
static_cast<LocalMessagePipeEndpoint*>(endpoints_[port].get()),
is_peer_open));
endpoints_[port].swap(replacement_endpoint);
return make_scoped_refptr(new ChannelEndpoint(this, port));
return channel_endpoint;
}
MojoResult MessagePipe::EnqueueMessage(unsigned port,
......@@ -188,7 +196,6 @@ bool MessagePipe::Attach(unsigned port, ChannelEndpoint* channel_endpoint) {
return false;
DCHECK_EQ(endpoints_[port]->GetType(), MessagePipeEndpoint::kTypeProxy);
endpoints_[port]->Attach(channel_endpoint);
return true;
}
......@@ -217,6 +224,9 @@ void MessagePipe::OnRemove(unsigned port) {
endpoints_[port].reset();
}
MessagePipe::MessagePipe() {
}
MessagePipe::~MessagePipe() {
// Owned by the dispatchers. The owning dispatchers should only release us via
// their |Close()| method, which should inform us of being closed via our
......
......@@ -34,22 +34,23 @@ class Waiter;
class MOJO_SYSTEM_IMPL_EXPORT MessagePipe
: public base::RefCountedThreadSafe<MessagePipe> {
public:
MessagePipe(scoped_ptr<MessagePipeEndpoint> endpoint0,
scoped_ptr<MessagePipeEndpoint> endpoint1);
// Creates a |MessagePipe| with two new |LocalMessagePipeEndpoint|s.
static MessagePipe* CreateLocalLocal();
// Creates a |MessagePipe| with a |LocalMessagePipeEndpoint| on port 0 and a
// |ProxyMessagePipeEndpoint| on port 1.
static MessagePipe* CreateLocalProxy();
// |ProxyMessagePipeEndpoint| on port 1. |*channel_endpoint| is set to the
// (newly-created) |ChannelEndpoint| for the latter.
static MessagePipe* CreateLocalProxy(
scoped_refptr<ChannelEndpoint>* channel_endpoint);
// Creates a |MessagePipe| with a |ProxyMessagePipeEndpoint| on port 0 and a
// |LocalMessagePipeEndpoint| on port 1.
// |LocalMessagePipeEndpoint| on port 1. |*channel_endpoint| is set to the
// (newly-created) |ChannelEndpoint| for the former.
// Note: This is really only needed in tests (outside of tests, this
// configuration arises from a local message pipe having its port 0
// "converted" using |ConvertLocalToProxy()|).
static MessagePipe* CreateProxyLocal();
static MessagePipe* CreateProxyLocal(
scoped_refptr<ChannelEndpoint>* channel_endpoint);
// Gets the other port number (i.e., 0 -> 1, 1 -> 0).
static unsigned GetPeerPort(unsigned port);
......@@ -95,11 +96,14 @@ class MOJO_SYSTEM_IMPL_EXPORT MessagePipe
scoped_ptr<MessageInTransit> message);
// These are used by |Channel|.
// TODO(vtl): Remove |Attach()|.
bool Attach(unsigned port, ChannelEndpoint* channel_endpoint);
void Run(unsigned port);
void OnRemove(unsigned port);
private:
MessagePipe();
friend class base::RefCountedThreadSafe<MessagePipe>;
virtual ~MessagePipe();
......
......@@ -83,14 +83,15 @@ Dispatcher::Type MessagePipeDispatcher::GetType() const {
}
// static
std::pair<scoped_refptr<MessagePipeDispatcher>, scoped_refptr<MessagePipe> >
MessagePipeDispatcher::CreateRemoteMessagePipe() {
scoped_refptr<MessagePipe> message_pipe(MessagePipe::CreateLocalProxy());
scoped_refptr<MessagePipeDispatcher>
MessagePipeDispatcher::CreateRemoteMessagePipe(
scoped_refptr<ChannelEndpoint>* channel_endpoint) {
scoped_refptr<MessagePipe> message_pipe(
MessagePipe::CreateLocalProxy(channel_endpoint));
scoped_refptr<MessagePipeDispatcher> dispatcher(
new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions));
dispatcher->Init(message_pipe, 0);
return std::make_pair(dispatcher, message_pipe);
return dispatcher;
}
// static
......@@ -103,8 +104,9 @@ scoped_refptr<MessagePipeDispatcher> MessagePipeDispatcher::Deserialize(
return scoped_refptr<MessagePipeDispatcher>();
}
std::pair<scoped_refptr<MessagePipeDispatcher>, scoped_refptr<MessagePipe> >
remote_message_pipe = CreateRemoteMessagePipe();
scoped_refptr<ChannelEndpoint> channel_endpoint;
scoped_refptr<MessagePipeDispatcher> dispatcher =
CreateRemoteMessagePipe(&channel_endpoint);
MessageInTransit::EndpointId remote_id =
static_cast<const SerializedMessagePipeDispatcher*>(source)->endpoint_id;
......@@ -117,8 +119,7 @@ scoped_refptr<MessagePipeDispatcher> MessagePipeDispatcher::Deserialize(
return scoped_refptr<MessagePipeDispatcher>();
}
MessageInTransit::EndpointId local_id =
channel->AttachEndpoint(make_scoped_refptr(
new ChannelEndpoint(remote_message_pipe.second.get(), 1)));
channel->AttachEndpoint(channel_endpoint);
if (local_id == MessageInTransit::kInvalidEndpointId) {
LOG(ERROR) << "Failed to deserialize message pipe dispatcher (failed to "
"attach; remote ID = " << remote_id << ")";
......@@ -135,7 +136,7 @@ scoped_refptr<MessagePipeDispatcher> MessagePipeDispatcher::Deserialize(
// TODO(vtl): FIXME -- Need some error handling here.
channel->RunRemoteMessagePipeEndpoint(local_id, remote_id);
return remote_message_pipe.first;
return dispatcher;
}
MessagePipeDispatcher::~MessagePipeDispatcher() {
......
......@@ -5,8 +5,6 @@
#ifndef MOJO_SYSTEM_MESSAGE_PIPE_DISPATCHER_H_
#define MOJO_SYSTEM_MESSAGE_PIPE_DISPATCHER_H_
#include <utility>
#include "base/macros.h"
#include "base/memory/ref_counted.h"
#include "mojo/system/dispatcher.h"
......@@ -16,6 +14,7 @@
namespace mojo {
namespace system {
class ChannelEndpoint;
class MessagePipe;
class MessagePipeDispatcherTransport;
......@@ -51,9 +50,8 @@ class MOJO_SYSTEM_IMPL_EXPORT MessagePipeDispatcher : public Dispatcher {
// the message pipe, port 0).
// TODO(vtl): This currently uses |kDefaultCreateOptions|, which is okay since
// there aren't any options, but eventually options should be plumbed through.
static std::pair<scoped_refptr<MessagePipeDispatcher>,
scoped_refptr<MessagePipe> >
CreateRemoteMessagePipe();
static scoped_refptr<MessagePipeDispatcher> CreateRemoteMessagePipe(
scoped_refptr<ChannelEndpoint>* channel_endpoint);
// The "opposite" of |SerializeAndClose()|. (Typically this is called by
// |Dispatcher::Deserialize()|.)
......
......@@ -110,10 +110,9 @@ MOJO_MULTIPROCESS_TEST_CHILD_MAIN(PingPongClient) {
embedder::ScopedPlatformHandle client_platform_handle =
mojo::test::MultiprocessTestHelper::client_platform_handle.Pass();
CHECK(client_platform_handle.is_valid());
scoped_refptr<MessagePipe> mp(new MessagePipe(
scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
channel_thread.Start(client_platform_handle.Pass(), mp);
scoped_refptr<ChannelEndpoint> ep;
scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalProxy(&ep));
channel_thread.Start(client_platform_handle.Pass(), ep);
std::string buffer(1000000, '\0');
int rv = 0;
......@@ -158,10 +157,9 @@ MOJO_MULTIPROCESS_TEST_CHILD_MAIN(PingPongClient) {
TEST_F(MultiprocessMessagePipePerfTest, PingPong) {
helper()->StartChild("PingPongClient");
scoped_refptr<MessagePipe> mp(new MessagePipe(
scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
Init(mp);
scoped_refptr<ChannelEndpoint> ep;
scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalProxy(&ep));
Init(ep);
// This values are set to align with one at ipc_pertests.cc for comparison.
const size_t kMsgSize[5] = {12, 144, 1728, 20736, 248832};
......
......@@ -6,7 +6,9 @@
#include "base/bind.h"
#include "base/threading/platform_thread.h" // For |Sleep()|.
#include "mojo/system/channel.h"
#include "mojo/system/channel_endpoint.h"
#include "mojo/system/message_pipe.h"
#include "mojo/system/waiter.h"
namespace mojo {
......@@ -40,14 +42,14 @@ ChannelThread::~ChannelThread() {
}
void ChannelThread::Start(embedder::ScopedPlatformHandle platform_handle,
scoped_refptr<MessagePipe> message_pipe) {
scoped_refptr<ChannelEndpoint> channel_endpoint) {
test_io_thread_.Start();
test_io_thread_.PostTaskAndWait(
FROM_HERE,
base::Bind(&ChannelThread::InitChannelOnIOThread,
base::Unretained(this),
base::Passed(&platform_handle),
message_pipe));
channel_endpoint));
}
void ChannelThread::Stop() {
......@@ -68,7 +70,7 @@ void ChannelThread::Stop() {
void ChannelThread::InitChannelOnIOThread(
embedder::ScopedPlatformHandle platform_handle,
scoped_refptr<MessagePipe> message_pipe) {
scoped_refptr<ChannelEndpoint> channel_endpoint) {
CHECK_EQ(base::MessageLoop::current(), test_io_thread_.message_loop());
CHECK(platform_handle.is_valid());
......@@ -83,8 +85,7 @@ void ChannelThread::InitChannelOnIOThread(
// receive/process messages (which it can do as soon as it's hooked up to
// the IO thread message loop, and that message loop runs) before the
// message pipe endpoint is attached.
CHECK_EQ(channel_->AttachEndpoint(
make_scoped_refptr(new ChannelEndpoint(message_pipe.get(), 1))),
CHECK_EQ(channel_->AttachEndpoint(channel_endpoint),
Channel::kBootstrapEndpointId);
CHECK(channel_->RunMessagePipeEndpoint(Channel::kBootstrapEndpointId,
Channel::kBootstrapEndpointId));
......@@ -104,8 +105,8 @@ MultiprocessMessagePipeTestBase::MultiprocessMessagePipeTestBase()
MultiprocessMessagePipeTestBase::~MultiprocessMessagePipeTestBase() {
}
void MultiprocessMessagePipeTestBase::Init(scoped_refptr<MessagePipe> mp) {
channel_thread_.Start(helper_.server_platform_handle.Pass(), mp);
void MultiprocessMessagePipeTestBase::Init(scoped_refptr<ChannelEndpoint> ep) {
channel_thread_.Start(helper_.server_platform_handle.Pass(), ep);
}
#endif
......
......@@ -9,11 +9,15 @@
#include "mojo/common/test/multiprocess_test_helper.h"
#include "mojo/embedder/simple_platform_support.h"
#include "mojo/system/channel.h"
#include "mojo/system/message_pipe.h"
#include "mojo/system/test_utils.h"
namespace mojo {
namespace system {
class Channel;
class ChannelEndpoint;
class MessagePipe;
namespace test {
MojoResult WaitIfNecessary(scoped_refptr<MessagePipe> mp,
......@@ -26,12 +30,12 @@ class ChannelThread {
~ChannelThread();
void Start(embedder::ScopedPlatformHandle platform_handle,
scoped_refptr<MessagePipe> message_pipe);
scoped_refptr<ChannelEndpoint> channel_endpoint);
void Stop();
private:
void InitChannelOnIOThread(embedder::ScopedPlatformHandle platform_handle,
scoped_refptr<MessagePipe> message_pipe);
scoped_refptr<ChannelEndpoint> channel_endpoint);
void ShutdownChannelOnIOThread();
embedder::PlatformSupport* const platform_support_;
......@@ -48,7 +52,7 @@ class MultiprocessMessagePipeTestBase : public testing::Test {
virtual ~MultiprocessMessagePipeTestBase();
protected:
void Init(scoped_refptr<MessagePipe> mp);
void Init(scoped_refptr<ChannelEndpoint> ep);
embedder::PlatformSupport* platform_support() { return &platform_support_; }
mojo::test::MultiprocessTestHelper* helper() { return &helper_; }
......
......@@ -48,8 +48,9 @@ MOJO_MULTIPROCESS_TEST_CHILD_MAIN(EchoEcho) {
embedder::ScopedPlatformHandle client_platform_handle =
mojo::test::MultiprocessTestHelper::client_platform_handle.Pass();
CHECK(client_platform_handle.is_valid());
scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalProxy());
channel_thread.Start(client_platform_handle.Pass(), mp);
scoped_refptr<ChannelEndpoint> ep;
scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalProxy(&ep));
channel_thread.Start(client_platform_handle.Pass(), ep);
const std::string quitquitquit("quitquitquit");
int rv = 0;
......@@ -103,8 +104,9 @@ MOJO_MULTIPROCESS_TEST_CHILD_MAIN(EchoEcho) {
TEST_F(MultiprocessMessagePipeTest, Basic) {
helper()->StartChild("EchoEcho");
scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalProxy());
Init(mp);
scoped_refptr<ChannelEndpoint> ep;
scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalProxy(&ep));
Init(ep);
std::string hello("hello");
EXPECT_EQ(MOJO_RESULT_OK,
......@@ -147,8 +149,9 @@ TEST_F(MultiprocessMessagePipeTest, Basic) {
TEST_F(MultiprocessMessagePipeTest, QueueMessages) {
helper()->StartChild("EchoEcho");
scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalProxy());
Init(mp);
scoped_refptr<ChannelEndpoint> ep;
scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalProxy(&ep));
Init(ep);
static const size_t kNumMessages = 1001;
for (size_t i = 0; i < kNumMessages; i++) {
......@@ -213,8 +216,9 @@ MOJO_MULTIPROCESS_TEST_CHILD_MAIN(CheckSharedBuffer) {
embedder::ScopedPlatformHandle client_platform_handle =
mojo::test::MultiprocessTestHelper::client_platform_handle.Pass();
CHECK(client_platform_handle.is_valid());
scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalProxy());
channel_thread.Start(client_platform_handle.Pass(), mp);
scoped_refptr<ChannelEndpoint> ep;
scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalProxy(&ep));
channel_thread.Start(client_platform_handle.Pass(), ep);
// Wait for the first message from our parent.
HandleSignalsState hss;
......@@ -312,8 +316,9 @@ MOJO_MULTIPROCESS_TEST_CHILD_MAIN(CheckSharedBuffer) {
TEST_F(MultiprocessMessagePipeTest, MAYBE_SharedBufferPassing) {
helper()->StartChild("CheckSharedBuffer");
scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalProxy());
Init(mp);
scoped_refptr<ChannelEndpoint> ep;
scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalProxy(&ep));
Init(ep);
// Make a shared buffer.
scoped_refptr<SharedBufferDispatcher> dispatcher;
......@@ -407,8 +412,9 @@ MOJO_MULTIPROCESS_TEST_CHILD_MAIN(CheckPlatformHandleFile) {
embedder::ScopedPlatformHandle client_platform_handle =
mojo::test::MultiprocessTestHelper::client_platform_handle.Pass();
CHECK(client_platform_handle.is_valid());
scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalProxy());
channel_thread.Start(client_platform_handle.Pass(), mp);
scoped_refptr<ChannelEndpoint> ep;
scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalProxy(&ep));
channel_thread.Start(client_platform_handle.Pass(), ep);
HandleSignalsState hss;
CHECK_EQ(test::WaitIfNecessary(mp, MOJO_HANDLE_SIGNAL_READABLE, &hss),
......@@ -465,8 +471,9 @@ TEST_F(MultiprocessMessagePipeTest, MAYBE_PlatformHandlePassing) {
helper()->StartChild("CheckPlatformHandleFile");
scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalProxy());
Init(mp);
scoped_refptr<ChannelEndpoint> ep;
scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalProxy(&ep));
Init(ep);
base::FilePath unused;
base::ScopedFILE fp(
......
......@@ -14,14 +14,20 @@
namespace mojo {
namespace system {
ProxyMessagePipeEndpoint::ProxyMessagePipeEndpoint()
: is_running_(false), is_peer_open_(true) {
ProxyMessagePipeEndpoint::ProxyMessagePipeEndpoint(
ChannelEndpoint* channel_endpoint)
: channel_endpoint_(channel_endpoint),
is_running_(false),
is_peer_open_(true) {
}
ProxyMessagePipeEndpoint::ProxyMessagePipeEndpoint(
ChannelEndpoint* channel_endpoint,
LocalMessagePipeEndpoint* local_message_pipe_endpoint,
bool is_peer_open)
: is_running_(false), is_peer_open_(is_peer_open) {
: channel_endpoint_(channel_endpoint),
is_running_(false),
is_peer_open_(is_peer_open) {
paused_message_queue_.Swap(local_message_pipe_endpoint->message_queue());
local_message_pipe_endpoint->Close();
}
......@@ -72,12 +78,6 @@ void ProxyMessagePipeEndpoint::EnqueueMessage(
}
}
void ProxyMessagePipeEndpoint::Attach(ChannelEndpoint* channel_endpoint) {
DCHECK(channel_endpoint);
DCHECK(!is_attached());
channel_endpoint_ = channel_endpoint;
}
bool ProxyMessagePipeEndpoint::Run() {
// Assertions about current state:
DCHECK(is_attached());
......
......@@ -37,12 +37,13 @@ class MessagePipe;
class MOJO_SYSTEM_IMPL_EXPORT ProxyMessagePipeEndpoint
: public MessagePipeEndpoint {
public:
ProxyMessagePipeEndpoint();
explicit ProxyMessagePipeEndpoint(ChannelEndpoint* channel_endpoint);
// Constructs a |ProxyMessagePipeEndpoint| that replaces the given
// |LocalMessagePipeEndpoint| (which this constructor will close), taking its
// message queue's contents. This is done when transferring a message pipe
// handle over a remote message pipe.
ProxyMessagePipeEndpoint(
ChannelEndpoint* channel_endpoint,
LocalMessagePipeEndpoint* local_message_pipe_endpoint,
bool is_peer_open);
virtual ~ProxyMessagePipeEndpoint();
......@@ -51,7 +52,6 @@ class MOJO_SYSTEM_IMPL_EXPORT ProxyMessagePipeEndpoint
virtual Type GetType() const OVERRIDE;
virtual bool OnPeerClose() OVERRIDE;
virtual void EnqueueMessage(scoped_ptr<MessageInTransit> message) OVERRIDE;
virtual void Attach(ChannelEndpoint* channel_endpoint) OVERRIDE;
virtual bool Run() OVERRIDE;
virtual void OnRemove() OVERRIDE;
......
......@@ -60,31 +60,29 @@ class RemoteMessagePipeTest : public testing::Test {
}
protected:
// This connects MP 0, port 1 and MP 1, port 0 (leaving MP 0, port 0 and MP 1,
// port 1 as the user-visible endpoints) to channel 0 and 1, respectively. MP
// 0, port 1 and MP 1, port 0 must have |ProxyMessagePipeEndpoint|s.
void ConnectMessagePipes(scoped_refptr<MessagePipe> mp0,
scoped_refptr<MessagePipe> mp1) {
// This connects the two given |ChannelEndpoint|s.
void ConnectChannelEndpoints(scoped_refptr<ChannelEndpoint> ep0,
scoped_refptr<ChannelEndpoint> ep1) {
io_thread_.PostTaskAndWait(
FROM_HERE,
base::Bind(&RemoteMessagePipeTest::ConnectMessagePipesOnIOThread,
base::Bind(&RemoteMessagePipeTest::ConnectChannelEndpointsOnIOThread,
base::Unretained(this),
mp0,
mp1));
ep0,
ep1));
}
// This connects |mp|'s port |channel_index ^ 1| to channel |channel_index|.
// It assumes/requires that this is the bootstrap case, i.e., that the
// endpoint IDs are both/will both be |Channel::kBootstrapEndpointId|. This
// returns *without* waiting for it to finish connecting.
void BootstrapMessagePipeNoWait(unsigned channel_index,
scoped_refptr<MessagePipe> mp) {
// This bootstraps |ep| on |channels_[channel_index]|. It assumes/requires
// that this is the bootstrap case, i.e., that the endpoint IDs are both/will
// both be |Channel::kBootstrapEndpointId|. This returns *without* waiting for
// it to finish connecting.
void BootstrapChannelEndpointNoWait(unsigned channel_index,
scoped_refptr<ChannelEndpoint> ep) {
io_thread_.PostTask(
FROM_HERE,
base::Bind(&RemoteMessagePipeTest::BootstrapMessagePipeOnIOThread,
base::Bind(&RemoteMessagePipeTest::BootstrapChannelEndpointOnIOThread,
base::Unretained(this),
channel_index,
mp));
ep));
}
void RestoreInitialState() {
......@@ -129,8 +127,8 @@ class RemoteMessagePipeTest : public testing::Test {
RawChannel::Create(platform_handles_[channel_index].Pass())));
}
void ConnectMessagePipesOnIOThread(scoped_refptr<MessagePipe> mp0,
scoped_refptr<MessagePipe> mp1) {
void ConnectChannelEndpointsOnIOThread(scoped_refptr<ChannelEndpoint> ep0,
scoped_refptr<ChannelEndpoint> ep1) {
CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
if (!channels_[0].get())
......@@ -138,26 +136,21 @@ class RemoteMessagePipeTest : public testing::Test {
if (!channels_[1].get())
CreateAndInitChannel(1);
MessageInTransit::EndpointId local_id0 = channels_[0]->AttachEndpoint(
make_scoped_refptr(new ChannelEndpoint(mp0.get(), 1)));
MessageInTransit::EndpointId local_id1 = channels_[1]->AttachEndpoint(
make_scoped_refptr(new ChannelEndpoint(mp1.get(), 0)));
MessageInTransit::EndpointId local_id0 = channels_[0]->AttachEndpoint(ep0);
MessageInTransit::EndpointId local_id1 = channels_[1]->AttachEndpoint(ep1);
CHECK(channels_[0]->RunMessagePipeEndpoint(local_id0, local_id1));
CHECK(channels_[1]->RunMessagePipeEndpoint(local_id1, local_id0));
}
void BootstrapMessagePipeOnIOThread(unsigned channel_index,
scoped_refptr<MessagePipe> mp) {
void BootstrapChannelEndpointOnIOThread(unsigned channel_index,
scoped_refptr<ChannelEndpoint> ep) {
CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
CHECK(channel_index == 0 || channel_index == 1);
unsigned port = channel_index ^ 1u;
CreateAndInitChannel(channel_index);
MessageInTransit::EndpointId endpoint_id =
channels_[channel_index]->AttachEndpoint(
make_scoped_refptr(new ChannelEndpoint(mp.get(), port)));
channels_[channel_index]->AttachEndpoint(ep);
if (endpoint_id == MessageInTransit::kInvalidEndpointId)
return;
......@@ -194,9 +187,11 @@ TEST_F(RemoteMessagePipeTest, Basic) {
// connected to MP 1, port 0, which will be attached to channel 1. This leaves
// MP 0, port 0 and MP 1, port 1 as the "user-facing" endpoints.
scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy());
scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal());
ConnectMessagePipes(mp0, mp1);
scoped_refptr<ChannelEndpoint> ep0;
scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy(&ep0));
scoped_refptr<ChannelEndpoint> ep1;
scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal(&ep1));
ConnectChannelEndpoints(ep0, ep1);
// Write in one direction: MP 0, port 0 -> ... -> MP 1, port 1.
......@@ -303,15 +298,19 @@ TEST_F(RemoteMessagePipeTest, Multiplex) {
// Connect message pipes as in the |Basic| test.
scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy());
scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal());
ConnectMessagePipes(mp0, mp1);
scoped_refptr<ChannelEndpoint> ep0;
scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy(&ep0));
scoped_refptr<ChannelEndpoint> ep1;
scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal(&ep1));
ConnectChannelEndpoints(ep0, ep1);
// Now put another message pipe on the channel.
scoped_refptr<MessagePipe> mp2(MessagePipe::CreateLocalProxy());
scoped_refptr<MessagePipe> mp3(MessagePipe::CreateProxyLocal());
ConnectMessagePipes(mp2, mp3);
scoped_refptr<ChannelEndpoint> ep2;
scoped_refptr<MessagePipe> mp2(MessagePipe::CreateLocalProxy(&ep2));
scoped_refptr<ChannelEndpoint> ep3;
scoped_refptr<MessagePipe> mp3(MessagePipe::CreateProxyLocal(&ep3));
ConnectChannelEndpoints(ep2, ep3);
// Write: MP 2, port 0 -> MP 3, port 1.
......@@ -450,7 +449,8 @@ TEST_F(RemoteMessagePipeTest, CloseBeforeConnect) {
// connected to MP 1, port 0, which will be attached to channel 1. This leaves
// MP 0, port 0 and MP 1, port 1 as the "user-facing" endpoints.
scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy());
scoped_refptr<ChannelEndpoint> ep0;
scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy(&ep0));
// Write to MP 0, port 0.
EXPECT_EQ(MOJO_RESULT_OK,
......@@ -460,12 +460,13 @@ TEST_F(RemoteMessagePipeTest, CloseBeforeConnect) {
NULL,
MOJO_WRITE_MESSAGE_FLAG_NONE));
BootstrapMessagePipeNoWait(0, mp0);
BootstrapChannelEndpointNoWait(0, ep0);
// Close MP 0, port 0 before channel 1 is even connected.
mp0->Close(0);
scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal());
scoped_refptr<ChannelEndpoint> ep1;
scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal(&ep1));
// Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
// it later, it might already be readable.)
......@@ -473,7 +474,7 @@ TEST_F(RemoteMessagePipeTest, CloseBeforeConnect) {
ASSERT_EQ(MOJO_RESULT_OK,
mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, NULL));
BootstrapMessagePipeNoWait(1, mp1);
BootstrapChannelEndpointNoWait(1, ep1);
// Wait.
EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
......@@ -508,9 +509,11 @@ TEST_F(RemoteMessagePipeTest, HandlePassing) {
HandleSignalsState hss;
uint32_t context = 0;
scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy());
scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal());
ConnectMessagePipes(mp0, mp1);
scoped_refptr<ChannelEndpoint> ep0;
scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy(&ep0));
scoped_refptr<ChannelEndpoint> ep1;
scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal(&ep1));
ConnectChannelEndpoints(ep0, ep1);
// We'll try to pass this dispatcher.
scoped_refptr<MessagePipeDispatcher> dispatcher(
......@@ -676,9 +679,11 @@ TEST_F(RemoteMessagePipeTest, MAYBE_SharedBufferPassing) {
HandleSignalsState hss;
uint32_t context = 0;
scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy());
scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal());
ConnectMessagePipes(mp0, mp1);
scoped_refptr<ChannelEndpoint> ep0;
scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy(&ep0));
scoped_refptr<ChannelEndpoint> ep1;
scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal(&ep1));
ConnectChannelEndpoints(ep0, ep1);
// We'll try to pass this dispatcher.
scoped_refptr<SharedBufferDispatcher> dispatcher;
......@@ -810,9 +815,11 @@ TEST_F(RemoteMessagePipeTest, MAYBE_PlatformHandlePassing) {
uint32_t context = 0;
HandleSignalsState hss;
scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy());
scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal());
ConnectMessagePipes(mp0, mp1);
scoped_refptr<ChannelEndpoint> ep0;
scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy(&ep0));
scoped_refptr<ChannelEndpoint> ep1;
scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal(&ep1));
ConnectChannelEndpoints(ep0, ep1);
base::FilePath unused;
base::ScopedFILE fp(
......@@ -913,11 +920,13 @@ TEST_F(RemoteMessagePipeTest, RacingClosesStress) {
for (unsigned i = 0; i < 256; i++) {
DVLOG(2) << "---------------------------------------- " << i;
scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy());
BootstrapMessagePipeNoWait(0, mp0);
scoped_refptr<ChannelEndpoint> ep0;
scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy(&ep0));
BootstrapChannelEndpointNoWait(0, ep0);
scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal());
BootstrapMessagePipeNoWait(1, mp1);
scoped_refptr<ChannelEndpoint> ep1;
scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal(&ep1));
BootstrapChannelEndpointNoWait(1, ep1);
if (i & 1u) {
io_thread()->task_runner()->PostTask(
......@@ -951,9 +960,11 @@ TEST_F(RemoteMessagePipeTest, PassMessagePipeHandleAcrossAndBack) {
HandleSignalsState hss;
uint32_t context = 0;
scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy());
scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal());
ConnectMessagePipes(mp0, mp1);
scoped_refptr<ChannelEndpoint> ep0;
scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy(&ep0));
scoped_refptr<ChannelEndpoint> ep1;
scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal(&ep1));
ConnectChannelEndpoints(ep0, ep1);
// We'll try to pass this dispatcher.
scoped_refptr<MessagePipeDispatcher> dispatcher(
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment