Commit 244b6032 authored by Viet-Trung Luu's avatar Viet-Trung Luu

Mojo: Move the paused message queue from ProxyMessagePipeEndpoint to ChannelEndpoint.

R=brettw@chromium.org

Review URL: https://codereview.chromium.org/587153003

Cr-Commit-Position: refs/heads/master@{#296772}
parent 83566744
......@@ -22,15 +22,24 @@ ChannelEndpoint::ChannelEndpoint(MessagePipe* message_pipe, unsigned port)
DCHECK(port_ == 0 || port_ == 1);
}
void ChannelEndpoint::TakeMessages(MessageInTransitQueue* message_queue) {
DCHECK(paused_message_queue_.IsEmpty());
paused_message_queue_.Swap(message_queue);
}
bool ChannelEndpoint::EnqueueMessage(scoped_ptr<MessageInTransit> message) {
DCHECK(message);
base::AutoLock locker(lock_);
if (!channel_) {
// Generally, this should only happen if the channel is shut down for some
// reason (with live message pipes on it).
return false;
if (!channel_ || remote_id_ == MessageInTransit::kInvalidEndpointId) {
// We may reach here if we haven't been attached or run yet.
// TODO(vtl): We may also reach here if the channel is shut down early for
// some reason (with live message pipes on it). We can't check |state_| yet,
// until it's protected under lock, but in this case we should return false
// (and not enqueue any messages).
paused_message_queue_.AddMessage(message.Pass());
return true;
}
// TODO(vtl): Currently, this only works in the "running" case.
......@@ -77,6 +86,11 @@ void ChannelEndpoint::Run(MessageInTransit::EndpointId remote_id) {
DCHECK(channel_);
DCHECK_EQ(remote_id_, MessageInTransit::kInvalidEndpointId);
remote_id_ = remote_id;
while (!paused_message_queue_.IsEmpty()) {
LOG_IF(WARNING, !WriteMessageNoLock(paused_message_queue_.GetMessage()))
<< "Failed to write enqueue message to channel";
}
}
void ChannelEndpoint::DetachFromChannel() {
......
......@@ -10,6 +10,7 @@
#include "base/memory/scoped_ptr.h"
#include "base/synchronization/lock.h"
#include "mojo/system/message_in_transit.h"
#include "mojo/system/message_in_transit_queue.h"
#include "mojo/system/system_impl_export.h"
namespace mojo {
......@@ -112,6 +113,11 @@ class MOJO_SYSTEM_IMPL_EXPORT ChannelEndpoint
// TODO(vtl): More comments....
ChannelEndpoint(MessagePipe* message_pipe, unsigned port);
// Takes messages from the given |MessageInTransitQueue|. This must be called
// before this object is attached to a channel, and before anyone has a chance
// to enqueue any messages.
void TakeMessages(MessageInTransitQueue* message_queue);
// Methods called by |MessagePipe| (via |ProxyMessagePipeEndpoint|):
// TODO(vtl): This currently only works if we're "running". We'll move the
......@@ -170,6 +176,10 @@ class MOJO_SYSTEM_IMPL_EXPORT ChannelEndpoint
MessageInTransit::EndpointId local_id_;
MessageInTransit::EndpointId remote_id_;
// This queue is used before we're running on a channel and ready to send
// messages.
MessageInTransitQueue paused_message_queue_;
DISALLOW_COPY_AND_ASSIGN(ChannelEndpoint);
};
......
......@@ -44,7 +44,7 @@ class MOJO_SYSTEM_IMPL_EXPORT LocalMessagePipeEndpoint
virtual void RemoveWaiter(Waiter* waiter,
HandleSignalsState* signals_state) OVERRIDE;
// This is only to be used by |ProxyMessagePipeEndpoint|:
// This is only to be used by |MessagePipe|:
MessageInTransitQueue* message_queue() { return &message_queue_; }
private:
......
......@@ -170,14 +170,14 @@ scoped_refptr<ChannelEndpoint> MessagePipe::ConvertLocalToProxy(unsigned port) {
<< "Direct message pipe passing across multiple channels not yet "
"implemented; will proxy";
scoped_ptr<MessagePipeEndpoint> old_endpoint(endpoints_[port].Pass());
scoped_refptr<ChannelEndpoint> channel_endpoint(
new ChannelEndpoint(this, port));
scoped_ptr<MessagePipeEndpoint> replacement_endpoint(
new ProxyMessagePipeEndpoint(
channel_endpoint.get(),
static_cast<LocalMessagePipeEndpoint*>(endpoints_[port].get()),
is_peer_open));
endpoints_[port].swap(replacement_endpoint);
endpoints_[port].reset(
new ProxyMessagePipeEndpoint(channel_endpoint.get(), is_peer_open));
channel_endpoint->TakeMessages(static_cast<LocalMessagePipeEndpoint*>(
old_endpoint.get())->message_queue());
old_endpoint->Close();
return channel_endpoint;
}
......
......@@ -23,19 +23,15 @@ ProxyMessagePipeEndpoint::ProxyMessagePipeEndpoint(
ProxyMessagePipeEndpoint::ProxyMessagePipeEndpoint(
ChannelEndpoint* channel_endpoint,
LocalMessagePipeEndpoint* local_message_pipe_endpoint,
bool is_peer_open)
: channel_endpoint_(channel_endpoint),
is_running_(false),
is_peer_open_(is_peer_open) {
paused_message_queue_.Swap(local_message_pipe_endpoint->message_queue());
local_message_pipe_endpoint->Close();
}
ProxyMessagePipeEndpoint::~ProxyMessagePipeEndpoint() {
DCHECK(!is_running());
DCHECK(!is_attached());
DCHECK(paused_message_queue_.IsEmpty());
}
MessagePipeEndpoint::Type ProxyMessagePipeEndpoint::GetType() const {
......@@ -47,10 +43,6 @@ bool ProxyMessagePipeEndpoint::OnPeerClose() {
is_peer_open_ = false;
// If our outgoing message queue isn't empty, we shouldn't be destroyed yet.
if (!paused_message_queue_.IsEmpty())
return true;
if (is_attached()) {
if (!is_running()) {
// If we're not running yet, we can't be destroyed yet, because we're
......@@ -69,13 +61,9 @@ bool ProxyMessagePipeEndpoint::OnPeerClose() {
// This case is handled in |Run()| (which will call us).
void ProxyMessagePipeEndpoint::EnqueueMessage(
scoped_ptr<MessageInTransit> message) {
if (is_running()) {
DCHECK(channel_endpoint_.get());
LOG_IF(WARNING, !channel_endpoint_->EnqueueMessage(message.Pass()))
<< "Failed to write enqueue message to channel";
} else {
paused_message_queue_.AddMessage(message.Pass());
}
DCHECK(channel_endpoint_.get());
LOG_IF(WARNING, !channel_endpoint_->EnqueueMessage(message.Pass()))
<< "Failed to write enqueue message to channel";
}
bool ProxyMessagePipeEndpoint::Run() {
......@@ -85,13 +73,6 @@ bool ProxyMessagePipeEndpoint::Run() {
is_running_ = true;
while (!paused_message_queue_.IsEmpty()) {
LOG_IF(
WARNING,
!channel_endpoint_->EnqueueMessage(paused_message_queue_.GetMessage()))
<< "Failed to write enqueue message to channel";
}
if (is_peer_open_)
return true; // Stay alive.
......@@ -110,7 +91,6 @@ void ProxyMessagePipeEndpoint::Detach() {
channel_endpoint_->DetachFromMessagePipe();
channel_endpoint_ = nullptr;
is_running_ = false;
paused_message_queue_.Clear();
}
} // namespace system
......
......@@ -8,7 +8,6 @@
#include "base/macros.h"
#include "base/memory/ref_counted.h"
#include "mojo/system/message_in_transit.h"
#include "mojo/system/message_in_transit_queue.h"
#include "mojo/system/message_pipe_endpoint.h"
#include "mojo/system/system_impl_export.h"
......@@ -38,13 +37,11 @@ class MOJO_SYSTEM_IMPL_EXPORT ProxyMessagePipeEndpoint
: public MessagePipeEndpoint {
public:
explicit ProxyMessagePipeEndpoint(ChannelEndpoint* channel_endpoint);
// Constructs a |ProxyMessagePipeEndpoint| that replaces the given
// |LocalMessagePipeEndpoint| (which this constructor will close), taking its
// message queue's contents. This is done when transferring a message pipe
// handle over a remote message pipe.
// Constructs a |ProxyMessagePipeEndpoint|, whose peer may already be closed.
// This is used to construct one to replace a |LocalMessagePipeEndpoint|, when
// transferring a message pipe handle over a remote message pipe.
ProxyMessagePipeEndpoint(
ChannelEndpoint* channel_endpoint,
LocalMessagePipeEndpoint* local_message_pipe_endpoint,
bool is_peer_open);
virtual ~ProxyMessagePipeEndpoint();
......@@ -70,10 +67,6 @@ class MOJO_SYSTEM_IMPL_EXPORT ProxyMessagePipeEndpoint
bool is_peer_open_;
// This queue is only used while we're detached, to store messages while we're
// not ready to send them yet.
MessageInTransitQueue paused_message_queue_;
DISALLOW_COPY_AND_ASSIGN(ProxyMessagePipeEndpoint);
};
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment