Commit b1559135 authored by viettrungluu's avatar viettrungluu Committed by Commit bot

Mojo: Give ChannelEndpoint the remote ID and ProxyMessagePipeEndpoint the ChannelEndpoint.

Witht this, I can begin removing knowledge of the Channel, etc. from the
ProxyMessagePipeEndpoint. (Eventually, ProxyMessagePipeEndpoint will
really just look like a stupid proxy, and relative sanity will prevail.)

R=darin@chromium.org

Review URL: https://codereview.chromium.org/577313002

Cr-Commit-Position: refs/heads/master@{#295497}
parent e42f2a59
...@@ -113,12 +113,13 @@ MessageInTransit::EndpointId Channel::AttachMessagePipeEndpoint( ...@@ -113,12 +113,13 @@ MessageInTransit::EndpointId Channel::AttachMessagePipeEndpoint(
local_id = next_local_id_; local_id = next_local_id_;
next_local_id_++; next_local_id_++;
endpoint = new ChannelEndpoint(message_pipe.get(), port, this, local_id); endpoint = new ChannelEndpoint(message_pipe.get(), port);
local_id_to_endpoint_map_[local_id] = endpoint; local_id_to_endpoint_map_[local_id] = endpoint;
} }
endpoint->AttachToChannel(this, local_id);
// This might fail if that port got an |OnPeerClose()| before attaching. // This might fail if that port got an |OnPeerClose()| before attaching.
if (message_pipe->Attach(port, scoped_refptr<Channel>(this), local_id)) if (message_pipe->Attach(port, endpoint.get(), this, local_id))
return local_id; return local_id;
// Note: If it failed, quite possibly the endpoint info was removed from that // Note: If it failed, quite possibly the endpoint info was removed from that
...@@ -147,6 +148,7 @@ MessageInTransit::EndpointId Channel::AttachMessagePipeEndpoint( ...@@ -147,6 +148,7 @@ MessageInTransit::EndpointId Channel::AttachMessagePipeEndpoint(
bool Channel::RunMessagePipeEndpoint(MessageInTransit::EndpointId local_id, bool Channel::RunMessagePipeEndpoint(MessageInTransit::EndpointId local_id,
MessageInTransit::EndpointId remote_id) { MessageInTransit::EndpointId remote_id) {
scoped_refptr<ChannelEndpoint> endpoint;
ChannelEndpoint::State state; ChannelEndpoint::State state;
scoped_refptr<MessagePipe> message_pipe; scoped_refptr<MessagePipe> message_pipe;
unsigned port; unsigned port;
...@@ -160,6 +162,7 @@ bool Channel::RunMessagePipeEndpoint(MessageInTransit::EndpointId local_id, ...@@ -160,6 +162,7 @@ bool Channel::RunMessagePipeEndpoint(MessageInTransit::EndpointId local_id,
local_id_to_endpoint_map_.find(local_id); local_id_to_endpoint_map_.find(local_id);
if (it == local_id_to_endpoint_map_.end()) if (it == local_id_to_endpoint_map_.end())
return false; return false;
endpoint = it->second;
state = it->second->state_; state = it->second->state_;
message_pipe = it->second->message_pipe_; message_pipe = it->second->message_pipe_;
port = it->second->port_; port = it->second->port_;
...@@ -175,6 +178,8 @@ bool Channel::RunMessagePipeEndpoint(MessageInTransit::EndpointId local_id, ...@@ -175,6 +178,8 @@ bool Channel::RunMessagePipeEndpoint(MessageInTransit::EndpointId local_id,
// TODO(vtl): FIXME -- We need to handle the case that message pipe is already // TODO(vtl): FIXME -- We need to handle the case that message pipe is already
// running when we're here due to |kSubtypeChannelRunMessagePipeEndpoint|). // running when we're here due to |kSubtypeChannelRunMessagePipeEndpoint|).
endpoint->Run(remote_id);
// TODO(vtl): Get rid of this.
message_pipe->Run(port, remote_id); message_pipe->Run(port, remote_id);
return true; return true;
} }
......
...@@ -11,29 +11,53 @@ ...@@ -11,29 +11,53 @@
namespace mojo { namespace mojo {
namespace system { namespace system {
ChannelEndpoint::ChannelEndpoint(MessagePipe* message_pipe, ChannelEndpoint::ChannelEndpoint(MessagePipe* message_pipe, unsigned port)
unsigned port,
Channel* channel,
MessageInTransit::EndpointId local_id)
: state_(STATE_NORMAL), : state_(STATE_NORMAL),
message_pipe_(message_pipe), message_pipe_(message_pipe),
port_(port), port_(port),
channel_(channel), channel_(),
local_id_(local_id) { local_id_(MessageInTransit::kInvalidEndpointId),
remote_id_(MessageInTransit::kInvalidEndpointId) {
DCHECK(message_pipe_.get()); DCHECK(message_pipe_.get());
DCHECK(port_ == 0 || port_ == 1); DCHECK(port_ == 0 || port_ == 1);
}
void ChannelEndpoint::AttachToChannel(Channel* channel,
MessageInTransit::EndpointId local_id) {
DCHECK(channel);
DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId);
base::AutoLock locker(lock_);
DCHECK(!channel_);
DCHECK_EQ(local_id_, MessageInTransit::kInvalidEndpointId);
channel_ = channel;
local_id_ = local_id;
}
void ChannelEndpoint::Run(MessageInTransit::EndpointId remote_id) {
DCHECK_NE(remote_id, MessageInTransit::kInvalidEndpointId);
base::AutoLock locker(lock_);
DCHECK(channel_); DCHECK(channel_);
DCHECK_NE(local_id_, MessageInTransit::kInvalidEndpointId); DCHECK_EQ(remote_id_, MessageInTransit::kInvalidEndpointId);
remote_id_ = remote_id;
} }
void ChannelEndpoint::DetachFromChannel() { void ChannelEndpoint::DetachFromChannel() {
base::AutoLock locker(lock_); base::AutoLock locker(lock_);
DCHECK(channel_); DCHECK(channel_);
DCHECK_NE(local_id_, MessageInTransit::kInvalidEndpointId);
// TODO(vtl): Once we combine "run" into "attach", |remote_id_| should valid
// here as well.
channel_ = NULL; channel_ = NULL;
local_id_ = MessageInTransit::kInvalidEndpointId;
remote_id_ = MessageInTransit::kInvalidEndpointId;
} }
ChannelEndpoint::~ChannelEndpoint() { ChannelEndpoint::~ChannelEndpoint() {
DCHECK(!channel_); DCHECK(!channel_);
DCHECK_EQ(local_id_, MessageInTransit::kInvalidEndpointId);
DCHECK_EQ(remote_id_, MessageInTransit::kInvalidEndpointId);
} }
} // namespace system } // namespace system
......
...@@ -114,10 +114,10 @@ class MOJO_SYSTEM_IMPL_EXPORT ChannelEndpoint ...@@ -114,10 +114,10 @@ class MOJO_SYSTEM_IMPL_EXPORT ChannelEndpoint
// |Channel::AttachMessagePipeEndpoint()| to a |Channel::AttachEndpoint()| // |Channel::AttachMessagePipeEndpoint()| to a |Channel::AttachEndpoint()|
// that takes a |ChannelEndpoint|, and move |ChannelEndpoint| creation out of // that takes a |ChannelEndpoint|, and move |ChannelEndpoint| creation out of
// |Channel|.) // |Channel|.)
ChannelEndpoint(MessagePipe* message_pipe, ChannelEndpoint(MessagePipe* message_pipe, unsigned port);
unsigned port,
Channel* channel, void AttachToChannel(Channel* channel, MessageInTransit::EndpointId local_id);
MessageInTransit::EndpointId local_id); void Run(MessageInTransit::EndpointId remote_id);
// Called by |Channel| before it gives up its reference to this object. // Called by |Channel| before it gives up its reference to this object.
void DetachFromChannel(); void DetachFromChannel();
...@@ -142,6 +142,7 @@ class MOJO_SYSTEM_IMPL_EXPORT ChannelEndpoint ...@@ -142,6 +142,7 @@ class MOJO_SYSTEM_IMPL_EXPORT ChannelEndpoint
~ChannelEndpoint(); ~ChannelEndpoint();
State state_; State state_;
// TODO(vtl): When moved under lock, this can/should be made a raw pointer.
scoped_refptr<MessagePipe> message_pipe_; scoped_refptr<MessagePipe> message_pipe_;
unsigned port_; unsigned port_;
...@@ -149,12 +150,11 @@ class MOJO_SYSTEM_IMPL_EXPORT ChannelEndpoint ...@@ -149,12 +150,11 @@ class MOJO_SYSTEM_IMPL_EXPORT ChannelEndpoint
// Protects the members below. // Protects the members below.
base::Lock lock_; base::Lock lock_;
// |channel_| must be alive whenever this is nonnull. Before the |channel_| // |channel_| must be alive whenever this is non-null. Before the |channel_|
// gives up its reference to this object, it will call |DetachFromChannel()|. // gives up its reference to this object, it will call |DetachFromChannel()|.
Channel* channel_; Channel* channel_;
MessageInTransit::EndpointId local_id_; MessageInTransit::EndpointId local_id_;
// TODO(vtl): MessageInTransit::EndpointId remote_id_;
// MessageInTransit::EndpointId remote_id_;
DISALLOW_COPY_AND_ASSIGN(ChannelEndpoint); DISALLOW_COPY_AND_ASSIGN(ChannelEndpoint);
}; };
......
...@@ -5,7 +5,6 @@ ...@@ -5,7 +5,6 @@
#include "mojo/system/message_pipe.h" #include "mojo/system/message_pipe.h"
#include "base/logging.h" #include "base/logging.h"
#include "mojo/system/channel.h"
#include "mojo/system/local_message_pipe_endpoint.h" #include "mojo/system/local_message_pipe_endpoint.h"
#include "mojo/system/message_in_transit.h" #include "mojo/system/message_in_transit.h"
#include "mojo/system/message_pipe_dispatcher.h" #include "mojo/system/message_pipe_dispatcher.h"
...@@ -178,10 +177,12 @@ MojoResult MessagePipe::EnqueueMessage(unsigned port, ...@@ -178,10 +177,12 @@ MojoResult MessagePipe::EnqueueMessage(unsigned port,
} }
bool MessagePipe::Attach(unsigned port, bool MessagePipe::Attach(unsigned port,
scoped_refptr<Channel> channel, ChannelEndpoint* channel_endpoint,
Channel* channel,
MessageInTransit::EndpointId local_id) { MessageInTransit::EndpointId local_id) {
DCHECK(port == 0 || port == 1); DCHECK(port == 0 || port == 1);
DCHECK(channel.get()); DCHECK(channel_endpoint);
DCHECK(channel);
DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId); DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId);
base::AutoLock locker(lock_); base::AutoLock locker(lock_);
...@@ -189,7 +190,7 @@ bool MessagePipe::Attach(unsigned port, ...@@ -189,7 +190,7 @@ bool MessagePipe::Attach(unsigned port,
return false; return false;
DCHECK_EQ(endpoints_[port]->GetType(), MessagePipeEndpoint::kTypeProxy); DCHECK_EQ(endpoints_[port]->GetType(), MessagePipeEndpoint::kTypeProxy);
endpoints_[port]->Attach(channel, local_id); endpoints_[port]->Attach(channel_endpoint, channel, local_id);
return true; return true;
} }
......
...@@ -26,6 +26,7 @@ namespace mojo { ...@@ -26,6 +26,7 @@ namespace mojo {
namespace system { namespace system {
class Channel; class Channel;
class ChannelEndpoint;
class Waiter; class Waiter;
// |MessagePipe| is the secondary object implementing a message pipe (see the // |MessagePipe| is the secondary object implementing a message pipe (see the
...@@ -96,7 +97,8 @@ class MOJO_SYSTEM_IMPL_EXPORT MessagePipe ...@@ -96,7 +97,8 @@ class MOJO_SYSTEM_IMPL_EXPORT MessagePipe
// These are used by |Channel|. // These are used by |Channel|.
bool Attach(unsigned port, bool Attach(unsigned port,
scoped_refptr<Channel> channel, ChannelEndpoint* channel_endpoint,
Channel* channel,
MessageInTransit::EndpointId local_id); MessageInTransit::EndpointId local_id);
void Run(unsigned port, MessageInTransit::EndpointId remote_id); void Run(unsigned port, MessageInTransit::EndpointId remote_id);
void OnRemove(unsigned port); void OnRemove(unsigned port);
......
...@@ -5,7 +5,6 @@ ...@@ -5,7 +5,6 @@
#include "mojo/system/message_pipe_endpoint.h" #include "mojo/system/message_pipe_endpoint.h"
#include "base/logging.h" #include "base/logging.h"
#include "mojo/system/channel.h"
namespace mojo { namespace mojo {
namespace system { namespace system {
...@@ -49,7 +48,8 @@ void MessagePipeEndpoint::RemoveWaiter(Waiter* /*waiter*/, ...@@ -49,7 +48,8 @@ void MessagePipeEndpoint::RemoveWaiter(Waiter* /*waiter*/,
*signals_state = HandleSignalsState(); *signals_state = HandleSignalsState();
} }
void MessagePipeEndpoint::Attach(scoped_refptr<Channel> /*channel*/, void MessagePipeEndpoint::Attach(ChannelEndpoint* /*channel_endpoint*/,
Channel* /*channel*/,
MessageInTransit::EndpointId /*local_id*/) { MessageInTransit::EndpointId /*local_id*/) {
NOTREACHED(); NOTREACHED();
} }
......
...@@ -23,6 +23,7 @@ namespace mojo { ...@@ -23,6 +23,7 @@ namespace mojo {
namespace system { namespace system {
class Channel; class Channel;
class ChannelEndpoint;
class Waiter; class Waiter;
// This is an interface to one of the ends of a message pipe, and is used by // This is an interface to one of the ends of a message pipe, and is used by
...@@ -74,7 +75,8 @@ class MOJO_SYSTEM_IMPL_EXPORT MessagePipeEndpoint { ...@@ -74,7 +75,8 @@ class MOJO_SYSTEM_IMPL_EXPORT MessagePipeEndpoint {
// Implementations must override these if they represent a proxy endpoint. An // Implementations must override these if they represent a proxy endpoint. An
// implementation for a local endpoint needs not override these methods, since // implementation for a local endpoint needs not override these methods, since
// they should never be called. // they should never be called.
virtual void Attach(scoped_refptr<Channel> channel, virtual void Attach(ChannelEndpoint* channel_endpoint,
Channel* channel,
MessageInTransit::EndpointId local_id); MessageInTransit::EndpointId local_id);
// Returns false if the endpoint should be closed and destroyed, else true. // Returns false if the endpoint should be closed and destroyed, else true.
virtual bool Run(MessageInTransit::EndpointId remote_id); virtual bool Run(MessageInTransit::EndpointId remote_id);
......
...@@ -8,6 +8,7 @@ ...@@ -8,6 +8,7 @@
#include "base/logging.h" #include "base/logging.h"
#include "mojo/system/channel.h" #include "mojo/system/channel.h"
#include "mojo/system/channel_endpoint.h"
#include "mojo/system/local_message_pipe_endpoint.h" #include "mojo/system/local_message_pipe_endpoint.h"
#include "mojo/system/message_pipe_dispatcher.h" #include "mojo/system/message_pipe_dispatcher.h"
...@@ -81,14 +82,17 @@ void ProxyMessagePipeEndpoint::EnqueueMessage( ...@@ -81,14 +82,17 @@ void ProxyMessagePipeEndpoint::EnqueueMessage(
} }
} }
void ProxyMessagePipeEndpoint::Attach(scoped_refptr<Channel> channel, void ProxyMessagePipeEndpoint::Attach(ChannelEndpoint* channel_endpoint,
Channel* channel,
MessageInTransit::EndpointId local_id) { MessageInTransit::EndpointId local_id) {
DCHECK(channel.get()); DCHECK(channel_endpoint);
DCHECK(channel);
DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId); DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId);
DCHECK(!is_attached()); DCHECK(!is_attached());
AssertConsistentState(); AssertConsistentState();
channel_endpoint_ = channel_endpoint;
channel_ = channel; channel_ = channel;
local_id_ = local_id; local_id_ = local_id;
AssertConsistentState(); AssertConsistentState();
...@@ -127,6 +131,8 @@ void ProxyMessagePipeEndpoint::Detach() { ...@@ -127,6 +131,8 @@ void ProxyMessagePipeEndpoint::Detach() {
AssertConsistentState(); AssertConsistentState();
channel_->DetachMessagePipeEndpoint(local_id_, remote_id_); channel_->DetachMessagePipeEndpoint(local_id_, remote_id_);
channel_ = NULL; channel_ = NULL;
// TODO(vtl): Inform |channel_endpoint_| that we were detached.
channel_endpoint_ = NULL;
local_id_ = MessageInTransit::kInvalidEndpointId; local_id_ = MessageInTransit::kInvalidEndpointId;
remote_id_ = MessageInTransit::kInvalidEndpointId; remote_id_ = MessageInTransit::kInvalidEndpointId;
paused_message_queue_.Clear(); paused_message_queue_.Clear();
......
...@@ -19,6 +19,7 @@ namespace mojo { ...@@ -19,6 +19,7 @@ namespace mojo {
namespace system { namespace system {
class Channel; class Channel;
class ChannelEndpoint;
class LocalMessagePipeEndpoint; class LocalMessagePipeEndpoint;
class MessagePipe; class MessagePipe;
...@@ -54,7 +55,8 @@ class MOJO_SYSTEM_IMPL_EXPORT ProxyMessagePipeEndpoint ...@@ -54,7 +55,8 @@ class MOJO_SYSTEM_IMPL_EXPORT ProxyMessagePipeEndpoint
virtual Type GetType() const OVERRIDE; virtual Type GetType() const OVERRIDE;
virtual bool OnPeerClose() OVERRIDE; virtual bool OnPeerClose() OVERRIDE;
virtual void EnqueueMessage(scoped_ptr<MessageInTransit> message) OVERRIDE; virtual void EnqueueMessage(scoped_ptr<MessageInTransit> message) OVERRIDE;
virtual void Attach(scoped_refptr<Channel> channel, virtual void Attach(ChannelEndpoint* channel_endpoint,
Channel* channel,
MessageInTransit::EndpointId local_id) OVERRIDE; MessageInTransit::EndpointId local_id) OVERRIDE;
virtual bool Run(MessageInTransit::EndpointId remote_id) OVERRIDE; virtual bool Run(MessageInTransit::EndpointId remote_id) OVERRIDE;
virtual void OnRemove() OVERRIDE; virtual void OnRemove() OVERRIDE;
...@@ -74,6 +76,10 @@ class MOJO_SYSTEM_IMPL_EXPORT ProxyMessagePipeEndpoint ...@@ -74,6 +76,10 @@ class MOJO_SYSTEM_IMPL_EXPORT ProxyMessagePipeEndpoint
return remote_id_ != MessageInTransit::kInvalidEndpointId; return remote_id_ != MessageInTransit::kInvalidEndpointId;
} }
// This should only be set if we're attached.
scoped_refptr<ChannelEndpoint> channel_endpoint_;
// TODO(vtl): Remove this, local_id_, and remote_id_.
// This should only be set if we're attached. // This should only be set if we're attached.
scoped_refptr<Channel> channel_; scoped_refptr<Channel> channel_;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment