Commit fc032c34 authored by Viet-Trung Luu's avatar Viet-Trung Luu

Mojo: Remove knowledge of Channel, etc. from ProxyMessagePipeEndpoint.

R=darin@chromium.org

Review URL: https://codereview.chromium.org/580123004

Cr-Commit-Position: refs/heads/master@{#295550}
parent c5fd1796
...@@ -119,7 +119,7 @@ MessageInTransit::EndpointId Channel::AttachMessagePipeEndpoint( ...@@ -119,7 +119,7 @@ MessageInTransit::EndpointId Channel::AttachMessagePipeEndpoint(
endpoint->AttachToChannel(this, local_id); endpoint->AttachToChannel(this, local_id);
// This might fail if that port got an |OnPeerClose()| before attaching. // This might fail if that port got an |OnPeerClose()| before attaching.
if (message_pipe->Attach(port, endpoint.get(), this, local_id)) if (message_pipe->Attach(port, endpoint.get()))
return local_id; return local_id;
// Note: If it failed, quite possibly the endpoint info was removed from that // Note: If it failed, quite possibly the endpoint info was removed from that
...@@ -180,7 +180,7 @@ bool Channel::RunMessagePipeEndpoint(MessageInTransit::EndpointId local_id, ...@@ -180,7 +180,7 @@ bool Channel::RunMessagePipeEndpoint(MessageInTransit::EndpointId local_id,
// running when we're here due to |kSubtypeChannelRunMessagePipeEndpoint|). // running when we're here due to |kSubtypeChannelRunMessagePipeEndpoint|).
endpoint->Run(remote_id); endpoint->Run(remote_id);
// TODO(vtl): Get rid of this. // TODO(vtl): Get rid of this.
message_pipe->Run(port, remote_id); message_pipe->Run(port);
return true; return true;
} }
......
...@@ -22,6 +22,45 @@ ChannelEndpoint::ChannelEndpoint(MessagePipe* message_pipe, unsigned port) ...@@ -22,6 +22,45 @@ ChannelEndpoint::ChannelEndpoint(MessagePipe* message_pipe, unsigned port)
DCHECK(port_ == 0 || port_ == 1); DCHECK(port_ == 0 || port_ == 1);
} }
bool ChannelEndpoint::EnqueueMessage(scoped_ptr<MessageInTransit> message) {
DCHECK(message);
base::AutoLock locker(lock_);
if (!channel_) {
// Generally, this should only happen if the channel is shut down for some
// reason (with live message pipes on it).
return false;
}
DCHECK_NE(local_id_, MessageInTransit::kInvalidEndpointId);
// TODO(vtl): Currently, we only support enqueueing messages when we're
// "running".
DCHECK_NE(remote_id_, MessageInTransit::kInvalidEndpointId);
message->SerializeAndCloseDispatchers(channel_);
message->set_source_id(local_id_);
message->set_destination_id(remote_id_);
return channel_->WriteMessage(message.Pass());
}
void ChannelEndpoint::DetachFromMessagePipe() {
// TODO(vtl): Once |message_pipe_| is under |lock_|, we should null it out
// here. For now, get the channel to do so for us.
scoped_refptr<Channel> channel;
{
base::AutoLock locker(lock_);
if (!channel_)
return;
DCHECK_NE(local_id_, MessageInTransit::kInvalidEndpointId);
// TODO(vtl): Once we combine "run" into "attach", |remote_id_| should valid
// here as well.
channel = channel_;
}
// Don't call this under |lock_|, since it'll call us back.
// TODO(vtl): This seems pretty suboptimal.
channel->DetachMessagePipeEndpoint(local_id_, remote_id_);
}
void ChannelEndpoint::AttachToChannel(Channel* channel, void ChannelEndpoint::AttachToChannel(Channel* channel,
MessageInTransit::EndpointId local_id) { MessageInTransit::EndpointId local_id) {
DCHECK(channel); DCHECK(channel);
......
...@@ -7,6 +7,7 @@ ...@@ -7,6 +7,7 @@
#include "base/macros.h" #include "base/macros.h"
#include "base/memory/ref_counted.h" #include "base/memory/ref_counted.h"
#include "base/memory/scoped_ptr.h"
#include "base/synchronization/lock.h" #include "base/synchronization/lock.h"
#include "mojo/system/message_in_transit.h" #include "mojo/system/message_in_transit.h"
#include "mojo/system/system_impl_export.h" #include "mojo/system/system_impl_export.h"
...@@ -116,7 +117,21 @@ class MOJO_SYSTEM_IMPL_EXPORT ChannelEndpoint ...@@ -116,7 +117,21 @@ class MOJO_SYSTEM_IMPL_EXPORT ChannelEndpoint
// |Channel|.) // |Channel|.)
ChannelEndpoint(MessagePipe* message_pipe, unsigned port); ChannelEndpoint(MessagePipe* message_pipe, unsigned port);
// Methods called by |MessagePipe| (via |ProxyMessagePipeEndpoint|):
// TODO(vtl): This currently only works if we're "running". We'll move the
// "paused message queue" here (will this be needed when we have
// locally-allocated remote IDs?).
bool EnqueueMessage(scoped_ptr<MessageInTransit> message);
void DetachFromMessagePipe();
// Methods called by |Channel|:
// Called by |Channel| when it takes a reference to this object.
void AttachToChannel(Channel* channel, MessageInTransit::EndpointId local_id); void AttachToChannel(Channel* channel, MessageInTransit::EndpointId local_id);
// TODO(vtl): Combine this with |AttachToChannel()|.
void Run(MessageInTransit::EndpointId remote_id); void Run(MessageInTransit::EndpointId remote_id);
// Called by |Channel| before it gives up its reference to this object. // Called by |Channel| before it gives up its reference to this object.
......
...@@ -176,31 +176,25 @@ MojoResult MessagePipe::EnqueueMessage(unsigned port, ...@@ -176,31 +176,25 @@ MojoResult MessagePipe::EnqueueMessage(unsigned port,
return EnqueueMessageInternal(port, message.Pass(), NULL); return EnqueueMessageInternal(port, message.Pass(), NULL);
} }
bool MessagePipe::Attach(unsigned port, bool MessagePipe::Attach(unsigned port, ChannelEndpoint* channel_endpoint) {
ChannelEndpoint* channel_endpoint,
Channel* channel,
MessageInTransit::EndpointId local_id) {
DCHECK(port == 0 || port == 1); DCHECK(port == 0 || port == 1);
DCHECK(channel_endpoint); DCHECK(channel_endpoint);
DCHECK(channel);
DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId);
base::AutoLock locker(lock_); base::AutoLock locker(lock_);
if (!endpoints_[port]) if (!endpoints_[port])
return false; return false;
DCHECK_EQ(endpoints_[port]->GetType(), MessagePipeEndpoint::kTypeProxy); DCHECK_EQ(endpoints_[port]->GetType(), MessagePipeEndpoint::kTypeProxy);
endpoints_[port]->Attach(channel_endpoint, channel, local_id); endpoints_[port]->Attach(channel_endpoint);
return true; return true;
} }
void MessagePipe::Run(unsigned port, MessageInTransit::EndpointId remote_id) { void MessagePipe::Run(unsigned port) {
DCHECK(port == 0 || port == 1); DCHECK(port == 0 || port == 1);
DCHECK_NE(remote_id, MessageInTransit::kInvalidEndpointId);
base::AutoLock locker(lock_); base::AutoLock locker(lock_);
DCHECK(endpoints_[port]); DCHECK(endpoints_[port]);
if (!endpoints_[port]->Run(remote_id)) if (!endpoints_[port]->Run())
endpoints_[port].reset(); endpoints_[port].reset();
} }
......
...@@ -25,7 +25,6 @@ ...@@ -25,7 +25,6 @@
namespace mojo { namespace mojo {
namespace system { namespace system {
class Channel;
class ChannelEndpoint; class ChannelEndpoint;
class Waiter; class Waiter;
...@@ -96,11 +95,8 @@ class MOJO_SYSTEM_IMPL_EXPORT MessagePipe ...@@ -96,11 +95,8 @@ class MOJO_SYSTEM_IMPL_EXPORT MessagePipe
scoped_ptr<MessageInTransit> message); scoped_ptr<MessageInTransit> message);
// These are used by |Channel|. // These are used by |Channel|.
bool Attach(unsigned port, bool Attach(unsigned port, ChannelEndpoint* channel_endpoint);
ChannelEndpoint* channel_endpoint, void Run(unsigned port);
Channel* channel,
MessageInTransit::EndpointId local_id);
void Run(unsigned port, MessageInTransit::EndpointId remote_id);
void OnRemove(unsigned port); void OnRemove(unsigned port);
private: private:
......
...@@ -48,13 +48,11 @@ void MessagePipeEndpoint::RemoveWaiter(Waiter* /*waiter*/, ...@@ -48,13 +48,11 @@ void MessagePipeEndpoint::RemoveWaiter(Waiter* /*waiter*/,
*signals_state = HandleSignalsState(); *signals_state = HandleSignalsState();
} }
void MessagePipeEndpoint::Attach(ChannelEndpoint* /*channel_endpoint*/, void MessagePipeEndpoint::Attach(ChannelEndpoint* /*channel_endpoint*/) {
Channel* /*channel*/,
MessageInTransit::EndpointId /*local_id*/) {
NOTREACHED(); NOTREACHED();
} }
bool MessagePipeEndpoint::Run(MessageInTransit::EndpointId /*remote_id*/) { bool MessagePipeEndpoint::Run() {
NOTREACHED(); NOTREACHED();
return true; return true;
} }
......
...@@ -22,7 +22,6 @@ ...@@ -22,7 +22,6 @@
namespace mojo { namespace mojo {
namespace system { namespace system {
class Channel;
class ChannelEndpoint; class ChannelEndpoint;
class Waiter; class Waiter;
...@@ -75,11 +74,9 @@ class MOJO_SYSTEM_IMPL_EXPORT MessagePipeEndpoint { ...@@ -75,11 +74,9 @@ class MOJO_SYSTEM_IMPL_EXPORT MessagePipeEndpoint {
// Implementations must override these if they represent a proxy endpoint. An // Implementations must override these if they represent a proxy endpoint. An
// implementation for a local endpoint needs not override these methods, since // implementation for a local endpoint needs not override these methods, since
// they should never be called. // they should never be called.
virtual void Attach(ChannelEndpoint* channel_endpoint, virtual void Attach(ChannelEndpoint* channel_endpoint);
Channel* channel,
MessageInTransit::EndpointId local_id);
// Returns false if the endpoint should be closed and destroyed, else true. // Returns false if the endpoint should be closed and destroyed, else true.
virtual bool Run(MessageInTransit::EndpointId remote_id); virtual bool Run();
virtual void OnRemove(); virtual void OnRemove();
protected: protected:
......
...@@ -7,7 +7,6 @@ ...@@ -7,7 +7,6 @@
#include <string.h> #include <string.h>
#include "base/logging.h" #include "base/logging.h"
#include "mojo/system/channel.h"
#include "mojo/system/channel_endpoint.h" #include "mojo/system/channel_endpoint.h"
#include "mojo/system/local_message_pipe_endpoint.h" #include "mojo/system/local_message_pipe_endpoint.h"
#include "mojo/system/message_pipe_dispatcher.h" #include "mojo/system/message_pipe_dispatcher.h"
...@@ -16,16 +15,13 @@ namespace mojo { ...@@ -16,16 +15,13 @@ namespace mojo {
namespace system { namespace system {
ProxyMessagePipeEndpoint::ProxyMessagePipeEndpoint() ProxyMessagePipeEndpoint::ProxyMessagePipeEndpoint()
: local_id_(MessageInTransit::kInvalidEndpointId), : is_running_(false), is_peer_open_(true) {
remote_id_(MessageInTransit::kInvalidEndpointId),
is_peer_open_(true) {
} }
ProxyMessagePipeEndpoint::ProxyMessagePipeEndpoint( ProxyMessagePipeEndpoint::ProxyMessagePipeEndpoint(
LocalMessagePipeEndpoint* local_message_pipe_endpoint, LocalMessagePipeEndpoint* local_message_pipe_endpoint,
bool is_peer_open) bool is_peer_open)
: local_id_(MessageInTransit::kInvalidEndpointId), : is_running_(false),
remote_id_(MessageInTransit::kInvalidEndpointId),
is_peer_open_(is_peer_open), is_peer_open_(is_peer_open),
paused_message_queue_(MessageInTransitQueue::PassContents(), paused_message_queue_(MessageInTransitQueue::PassContents(),
local_message_pipe_endpoint->message_queue()) { local_message_pipe_endpoint->message_queue()) {
...@@ -35,7 +31,6 @@ ProxyMessagePipeEndpoint::ProxyMessagePipeEndpoint( ...@@ -35,7 +31,6 @@ ProxyMessagePipeEndpoint::ProxyMessagePipeEndpoint(
ProxyMessagePipeEndpoint::~ProxyMessagePipeEndpoint() { ProxyMessagePipeEndpoint::~ProxyMessagePipeEndpoint() {
DCHECK(!is_running()); DCHECK(!is_running());
DCHECK(!is_attached()); DCHECK(!is_attached());
AssertConsistentState();
DCHECK(paused_message_queue_.IsEmpty()); DCHECK(paused_message_queue_.IsEmpty());
} }
...@@ -71,47 +66,33 @@ bool ProxyMessagePipeEndpoint::OnPeerClose() { ...@@ -71,47 +66,33 @@ bool ProxyMessagePipeEndpoint::OnPeerClose() {
void ProxyMessagePipeEndpoint::EnqueueMessage( void ProxyMessagePipeEndpoint::EnqueueMessage(
scoped_ptr<MessageInTransit> message) { scoped_ptr<MessageInTransit> message) {
if (is_running()) { if (is_running()) {
message->SerializeAndCloseDispatchers(channel_.get()); DCHECK(channel_endpoint_.get());
LOG_IF(WARNING, !channel_endpoint_->EnqueueMessage(message.Pass()))
message->set_source_id(local_id_); << "Failed to write enqueue message to channel";
message->set_destination_id(remote_id_);
if (!channel_->WriteMessage(message.Pass()))
LOG(WARNING) << "Failed to write message to channel";
} else { } else {
paused_message_queue_.AddMessage(message.Pass()); paused_message_queue_.AddMessage(message.Pass());
} }
} }
void ProxyMessagePipeEndpoint::Attach(ChannelEndpoint* channel_endpoint, void ProxyMessagePipeEndpoint::Attach(ChannelEndpoint* channel_endpoint) {
Channel* channel,
MessageInTransit::EndpointId local_id) {
DCHECK(channel_endpoint); DCHECK(channel_endpoint);
DCHECK(channel);
DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId);
DCHECK(!is_attached()); DCHECK(!is_attached());
AssertConsistentState();
channel_endpoint_ = channel_endpoint; channel_endpoint_ = channel_endpoint;
channel_ = channel;
local_id_ = local_id;
AssertConsistentState();
} }
bool ProxyMessagePipeEndpoint::Run(MessageInTransit::EndpointId remote_id) { bool ProxyMessagePipeEndpoint::Run() {
// Assertions about arguments:
DCHECK_NE(remote_id, MessageInTransit::kInvalidEndpointId);
// Assertions about current state: // Assertions about current state:
DCHECK(is_attached()); DCHECK(is_attached());
DCHECK(!is_running()); DCHECK(!is_running());
AssertConsistentState(); is_running_ = true;
remote_id_ = remote_id;
AssertConsistentState();
while (!paused_message_queue_.IsEmpty()) while (!paused_message_queue_.IsEmpty()) {
EnqueueMessage(paused_message_queue_.GetMessage()); LOG_IF(
WARNING,
!channel_endpoint_->EnqueueMessage(paused_message_queue_.GetMessage()))
<< "Failed to write enqueue message to channel";
}
if (is_peer_open_) if (is_peer_open_)
return true; // Stay alive. return true; // Stay alive.
...@@ -128,27 +109,11 @@ void ProxyMessagePipeEndpoint::OnRemove() { ...@@ -128,27 +109,11 @@ void ProxyMessagePipeEndpoint::OnRemove() {
void ProxyMessagePipeEndpoint::Detach() { void ProxyMessagePipeEndpoint::Detach() {
DCHECK(is_attached()); DCHECK(is_attached());
AssertConsistentState(); channel_endpoint_->DetachFromMessagePipe();
channel_->DetachMessagePipeEndpoint(local_id_, remote_id_);
channel_ = NULL;
// TODO(vtl): Inform |channel_endpoint_| that we were detached.
channel_endpoint_ = NULL; channel_endpoint_ = NULL;
local_id_ = MessageInTransit::kInvalidEndpointId; is_running_ = false;
remote_id_ = MessageInTransit::kInvalidEndpointId;
paused_message_queue_.Clear(); paused_message_queue_.Clear();
AssertConsistentState();
}
#ifndef NDEBUG
void ProxyMessagePipeEndpoint::AssertConsistentState() const {
if (is_attached()) {
DCHECK_NE(local_id_, MessageInTransit::kInvalidEndpointId);
} else { // Not attached.
DCHECK_EQ(local_id_, MessageInTransit::kInvalidEndpointId);
DCHECK_EQ(remote_id_, MessageInTransit::kInvalidEndpointId);
}
} }
#endif
} // namespace system } // namespace system
} // namespace mojo } // namespace mojo
...@@ -7,7 +7,6 @@ ...@@ -7,7 +7,6 @@
#include <stdint.h> #include <stdint.h>
#include "base/compiler_specific.h"
#include "base/macros.h" #include "base/macros.h"
#include "base/memory/ref_counted.h" #include "base/memory/ref_counted.h"
#include "mojo/system/message_in_transit.h" #include "mojo/system/message_in_transit.h"
...@@ -18,7 +17,6 @@ ...@@ -18,7 +17,6 @@
namespace mojo { namespace mojo {
namespace system { namespace system {
class Channel;
class ChannelEndpoint; class ChannelEndpoint;
class LocalMessagePipeEndpoint; class LocalMessagePipeEndpoint;
class MessagePipe; class MessagePipe;
...@@ -55,43 +53,22 @@ class MOJO_SYSTEM_IMPL_EXPORT ProxyMessagePipeEndpoint ...@@ -55,43 +53,22 @@ class MOJO_SYSTEM_IMPL_EXPORT ProxyMessagePipeEndpoint
virtual Type GetType() const OVERRIDE; virtual Type GetType() const OVERRIDE;
virtual bool OnPeerClose() OVERRIDE; virtual bool OnPeerClose() OVERRIDE;
virtual void EnqueueMessage(scoped_ptr<MessageInTransit> message) OVERRIDE; virtual void EnqueueMessage(scoped_ptr<MessageInTransit> message) OVERRIDE;
virtual void Attach(ChannelEndpoint* channel_endpoint, virtual void Attach(ChannelEndpoint* channel_endpoint) OVERRIDE;
Channel* channel, virtual bool Run() OVERRIDE;
MessageInTransit::EndpointId local_id) OVERRIDE;
virtual bool Run(MessageInTransit::EndpointId remote_id) OVERRIDE;
virtual void OnRemove() OVERRIDE; virtual void OnRemove() OVERRIDE;
private: private:
void Detach(); void Detach();
#ifdef NDEBUG // TODO(vtl): Get rid of these.
void AssertConsistentState() const {} bool is_attached() const { return !!channel_endpoint_.get(); }
#else bool is_running() const { return is_running_; }
void AssertConsistentState() const;
#endif
bool is_attached() const { return !!channel_.get(); }
bool is_running() const {
return remote_id_ != MessageInTransit::kInvalidEndpointId;
}
// This should only be set if we're attached. // This should only be set if we're attached.
scoped_refptr<ChannelEndpoint> channel_endpoint_; scoped_refptr<ChannelEndpoint> channel_endpoint_;
// TODO(vtl): Remove this, local_id_, and remote_id_. // TODO(vtl): Get rid of this.
// This should only be set if we're attached. bool is_running_;
scoped_refptr<Channel> channel_;
// |local_id_| should be set to something other than
// |MessageInTransit::kInvalidEndpointId| when we're attached.
MessageInTransit::EndpointId local_id_;
// |remote_id_| being set to anything other than
// |MessageInTransit::kInvalidEndpointId| indicates that we're "running",
// i.e., actively able to send messages. We should only ever be running if
// we're attached.
MessageInTransit::EndpointId remote_id_;
bool is_peer_open_; bool is_peer_open_;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment