Mojo: Refactor some message pipe stuff.

Move handling of dispatchers when enqueueing messages into the endpoint
code. That handling will need to be very specific to the type of
endpoint.

R=darin@chromium.org

Review URL: https://codereview.chromium.org/147983009

git-svn-id: svn://svn.chromium.org/chrome/trunk/src@247753 0039d316-1c4b-4281-b951-d872f2087c98
parent 8a168d0e
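
For orientation before the diff: this change collapses the old CanEnqueueMessage()/EnqueueMessage() pair into a single EnqueueMessage() that returns a MojoResult and receives the original Dispatcher pointers, leaving each endpoint type to decide how attached dispatchers are handled (duplicated into a local queue, or rejected by the proxy until handle passing exists). Below is a minimal, self-contained sketch of that interface shape only, not the actual Chromium code; Endpoint, Message, SketchProxyEndpoint, and the result constants are invented for illustration.

// Hedged sketch only -- simplified stand-ins for MessagePipeEndpoint,
// MessageInTransit, and Dispatcher; all names and values here are invented.
#include <cstddef>
#include <vector>

typedef int MojoResult;
static const MojoResult MOJO_RESULT_OK = 0;             // arbitrary sketch value
static const MojoResult MOJO_RESULT_UNIMPLEMENTED = -1; // arbitrary sketch value

struct Message {};     // stands in for MessageInTransit
struct Dispatcher {};  // stands in for Dispatcher

// After this CL every endpoint validates *and* enqueues in one call, and
// reports failure via the return value instead of a separate preflight.
class Endpoint {
 public:
  virtual ~Endpoint() {}
  // Takes ownership of |message|; |dispatchers| is non-null only if nonempty.
  virtual MojoResult EnqueueMessage(
      Message* message,
      const std::vector<Dispatcher*>* dispatchers) = 0;
};

// A proxy-style endpoint can simply refuse dispatchers until handle passing
// over the wire is implemented (mirroring CanEnqueueDispatchers() below).
class SketchProxyEndpoint : public Endpoint {
 public:
  virtual MojoResult EnqueueMessage(
      Message* message,
      const std::vector<Dispatcher*>* dispatchers) {
    if (dispatchers) {
      delete message;  // The real code calls |message->Destroy()|.
      return MOJO_RESULT_UNIMPLEMENTED;
    }
    delete message;  // A real endpoint would queue or transmit it here.
    return MOJO_RESULT_OK;
  }
};

int main() {
  SketchProxyEndpoint endpoint;
  // Caller-side view: hand over the message and check the single result.
  MojoResult result = endpoint.EnqueueMessage(new Message, NULL);
  return (result == MOJO_RESULT_OK) ? 0 : 1;
}

The point of the shape is that validation and enqueueing happen in one virtual call, so per-endpoint policy (and its failure mode) lives with the endpoint rather than in MessagePipe.
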
@@ -14,26 +14,52 @@ namespace mojo {
 namespace system {
 
 LocalMessagePipeEndpoint::MessageQueueEntry::MessageQueueEntry()
-    : message(NULL) {
+    : message_(NULL) {
 }
 
 // See comment in header file.
 LocalMessagePipeEndpoint::MessageQueueEntry::MessageQueueEntry(
     const MessageQueueEntry& other)
-    : message(NULL) {
-  DCHECK(!other.message);
-  DCHECK(other.dispatchers.empty());
+    : message_(NULL) {
+  DCHECK(!other.message_);
+  DCHECK(other.dispatchers_.empty());
 }
 
 LocalMessagePipeEndpoint::MessageQueueEntry::~MessageQueueEntry() {
-  if (message)
-    message->Destroy();
+  if (message_)
+    message_->Destroy();
 
   // Close all the dispatchers.
-  for (size_t i = 0; i < dispatchers.size(); i++) {
+  for (size_t i = 0; i < dispatchers_.size(); i++) {
     // Note: Taking the |Dispatcher| locks is okay, since no one else should
     // have a reference to the dispatchers (and the locks shouldn't be held).
-    DCHECK(dispatchers[i]->HasOneRef());
-    dispatchers[i]->Close();
+    DCHECK(dispatchers_[i]->HasOneRef());
+    dispatchers_[i]->Close();
   }
 }
+
+void LocalMessagePipeEndpoint::MessageQueueEntry::Init(
+    MessageInTransit* message,
+    const std::vector<Dispatcher*>* dispatchers) {
+  DCHECK(message);
+  DCHECK(!dispatchers || !dispatchers->empty());
+  DCHECK(!message_);
+  DCHECK(dispatchers_.empty());
+
+  message_ = message;
+  if (dispatchers) {
+    dispatchers_.reserve(dispatchers->size());
+    for (size_t i = 0; i < dispatchers->size(); i++) {
+      dispatchers_.push_back(
+          (*dispatchers)[i]->CreateEquivalentDispatcherAndCloseNoLock());
+#ifndef NDEBUG
+      // It's important that we have "ownership" of these dispatchers. In
+      // particular, they must not be in the global handle table (i.e., have
+      // live handles referring to them). If we need to destroy any queued
+      // messages, we need to know that any handles in them should be closed.
+      DCHECK(dispatchers_[i]->HasOneRef());
+#endif
+    }
+  }
+}
@@ -69,36 +95,24 @@ void LocalMessagePipeEndpoint::OnPeerClose() {
   }
 }
 
-MojoResult LocalMessagePipeEndpoint::CanEnqueueMessage(
-    const MessageInTransit* /*message*/,
-    const std::vector<Dispatcher*>* /*dispatchers*/) {
-  return MOJO_RESULT_OK;
-}
-
-void LocalMessagePipeEndpoint::EnqueueMessage(
+MojoResult LocalMessagePipeEndpoint::EnqueueMessage(
     MessageInTransit* message,
-    std::vector<scoped_refptr<Dispatcher> >* dispatchers) {
+    const std::vector<Dispatcher*>* dispatchers) {
   DCHECK(is_open_);
   DCHECK(is_peer_open_);
+  DCHECK(!dispatchers || !dispatchers->empty());
 
   bool was_empty = message_queue_.empty();
+  // TODO(vtl): Use |emplace_back()| (and a suitable constructor, instead of
+  // |Init()|) when that becomes available.
   message_queue_.push_back(MessageQueueEntry());
-  message_queue_.back().message = message;
-  if (dispatchers) {
-#ifndef NDEBUG
-    // It's important that we're taking "ownership" of the dispatchers. In
-    // particular, they must not be in the global handle table (i.e., have live
-    // handles referring to them). If we need to destroy any queued messages, we
-    // need to know that any handles in them should be closed.
-    for (size_t i = 0; i < dispatchers->size(); i++)
-      DCHECK((*dispatchers)[i]->HasOneRef());
-#endif
-    message_queue_.back().dispatchers.swap(*dispatchers);
-  }
+  message_queue_.back().Init(message, dispatchers);
   if (was_empty) {
     waiter_list_.AwakeWaitersForStateChange(SatisfiedFlags(),
                                             SatisfiableFlags());
   }
+  return MOJO_RESULT_OK;
 }
 
 void LocalMessagePipeEndpoint::CancelAllWaiters() {
@@ -125,7 +139,7 @@ MojoResult LocalMessagePipeEndpoint::ReadMessage(
   // TODO(vtl): If |flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD|, we could pop
   // and release the lock immediately.
   bool enough_space = true;
-  const MessageInTransit* queued_message = message_queue_.front().message;
+  const MessageInTransit* queued_message = message_queue_.front().message();
   if (num_bytes)
     *num_bytes = queued_message->data_size();
   if (queued_message->data_size() <= max_bytes)
@@ -134,7 +148,7 @@ MojoResult LocalMessagePipeEndpoint::ReadMessage(
     enough_space = false;
 
   std::vector<scoped_refptr<Dispatcher> >* queued_dispatchers =
-      &message_queue_.front().dispatchers;
+      message_queue_.front().dispatchers();
   if (num_dispatchers)
     *num_dispatchers = static_cast<uint32_t>(queued_dispatchers->size());
   if (enough_space) {
......
@@ -27,12 +27,9 @@ class MOJO_SYSTEM_IMPL_EXPORT LocalMessagePipeEndpoint
   // |MessagePipeEndpoint| implementation:
   virtual void Close() OVERRIDE;
   virtual void OnPeerClose() OVERRIDE;
-  virtual MojoResult CanEnqueueMessage(
-      const MessageInTransit* message,
-      const std::vector<Dispatcher*>* dispatchers) OVERRIDE;
-  virtual void EnqueueMessage(
+  virtual MojoResult EnqueueMessage(
       MessageInTransit* message,
-      std::vector<scoped_refptr<Dispatcher> >* dispatchers) OVERRIDE;
+      const std::vector<Dispatcher*>* dispatchers) OVERRIDE;
 
   // There's a dispatcher for |LocalMessagePipeEndpoint|s, so we have to
   // implement/override these:
@@ -48,7 +45,8 @@ class MOJO_SYSTEM_IMPL_EXPORT LocalMessagePipeEndpoint
   virtual void RemoveWaiter(Waiter* waiter) OVERRIDE;
 
  private:
-  struct MessageQueueEntry {
+  class MessageQueueEntry {
+   public:
     MessageQueueEntry();
     // Provide an explicit copy constructor, so that we can use this directly in
     // a (C++03) STL container. However, we only allow the case where |other| is
@@ -57,10 +55,25 @@ class MOJO_SYSTEM_IMPL_EXPORT LocalMessagePipeEndpoint
     MessageQueueEntry(const MessageQueueEntry& other);
     ~MessageQueueEntry();
 
-    MessageInTransit* message;
-    std::vector<scoped_refptr<Dispatcher> > dispatchers;
+    // Initialize, taking ownership of |message| and creating equivalent
+    // "duplicate" |dispatchers|. |dispatchers| should be non-null only if
+    // nonempty.
+    // TODO(vtl): This would simply be a constructor, but we don't have C++11's
+    // emplace operations yet, and I don't want to copy |dispatchers_|.
+    void Init(MessageInTransit* message,
+              const std::vector<Dispatcher*>* dispatchers);
+
+    MessageInTransit* message() {
+      return message_;
+    }
+    std::vector<scoped_refptr<Dispatcher> >* dispatchers() {
+      return &dispatchers_;
+    }
+
+   private:
+    MessageInTransit* message_;
+    std::vector<scoped_refptr<Dispatcher> > dispatchers_;
 
     // We don't need assignment, however.
     DISALLOW_ASSIGN(MessageQueueEntry);
   };
......
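
An aside on the push_back()-then-Init() pattern the MessageQueueEntry change above relies on (see the TODO about emplace_back()): the sketch below illustrates the same C++03 idiom in isolation, with invented Payload/QueueEntry types rather than the real classes.

// Hedged illustration of the C++03 "push_back an empty entry, then Init() it
// in place" idiom used instead of C++11 emplace_back(); Payload and QueueEntry
// are invented names for this sketch.
#include <cassert>
#include <cstddef>
#include <deque>

struct Payload { int value; };

class QueueEntry {
 public:
  QueueEntry() : payload_(NULL) {}
  // Copying is only allowed while empty, so the entry can sit in a C++03
  // container (whose push_back copies its argument) without ever duplicating
  // ownership.
  QueueEntry(const QueueEntry& other) : payload_(NULL) {
    assert(other.payload_ == NULL);
  }
  ~QueueEntry() { delete payload_; }

  // Two-phase construction: the owned pointer is installed only after the
  // entry is already in the container, so it is never copied.
  void Init(Payload* payload) {
    assert(payload_ == NULL);
    payload_ = payload;
  }

  Payload* payload() { return payload_; }

 private:
  Payload* payload_;
  // (Assignment would be disallowed in real code, as with DISALLOW_ASSIGN.)
};

int main() {
  std::deque<QueueEntry> queue;
  queue.push_back(QueueEntry());     // cheap copy of an empty entry
  queue.back().Init(new Payload());  // take ownership in place
  return queue.back().payload() ? 0 : 1;
}

The sketch uses std::deque because its push_back never copies existing elements, so the only copies ever made are of empty entries; with C++11's emplace_back(), the two-phase Init() would be unnecessary.
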
@@ -114,6 +114,7 @@ MojoResult MessagePipe::EnqueueMessage(
     const std::vector<Dispatcher*>* dispatchers) {
   DCHECK(port == 0 || port == 1);
   DCHECK(message);
+  DCHECK(!dispatchers || !dispatchers->empty());
 
   if (message->type() == MessageInTransit::kTypeMessagePipe) {
     DCHECK(!dispatchers);
@@ -131,27 +132,7 @@ MojoResult MessagePipe::EnqueueMessage(
     return MOJO_RESULT_FAILED_PRECONDITION;
   }
 
-  MojoResult result = endpoints_[port]->CanEnqueueMessage(message, dispatchers);
-  if (result != MOJO_RESULT_OK) {
-    message->Destroy();
-    return result;
-  }
-
-  if (dispatchers) {
-    DCHECK(!dispatchers->empty());
-
-    std::vector<scoped_refptr<Dispatcher> > replacement_dispatchers;
-    for (size_t i = 0; i < dispatchers->size(); i++) {
-      replacement_dispatchers.push_back(
-          (*dispatchers)[i]->CreateEquivalentDispatcherAndCloseNoLock());
-    }
-
-    endpoints_[port]->EnqueueMessage(message, &replacement_dispatchers);
-  } else {
-    endpoints_[port]->EnqueueMessage(message, NULL);
-  }
-
-  return MOJO_RESULT_OK;
+  return endpoints_[port]->EnqueueMessage(message, dispatchers);
 }
 
 void MessagePipe::Attach(unsigned port,
......
@@ -66,7 +66,7 @@ class MOJO_SYSTEM_IMPL_EXPORT MessagePipe :
   // This is used internally by |WriteMessage()| and by |Channel| to enqueue
   // messages (typically to a |LocalMessagePipeEndpoint|). Unlike
   // |WriteMessage()|, |port| is the *destination* port. Takes ownership of
-  // |message|.
+  // |message|. |dispatchers| should be non-null only if it's nonempty.
   MojoResult EnqueueMessage(unsigned port,
                             MessageInTransit* message,
                             const std::vector<Dispatcher*>* dispatchers);
......
@@ -37,20 +37,11 @@ class MOJO_SYSTEM_IMPL_EXPORT MessagePipeEndpoint {
   // All implementations must implement these.
   virtual void Close() = 0;
   virtual void OnPeerClose() = 0;
-  // Checks if |EnqueueMessage()| will be able to enqueue the given message
-  // (with the given set of dispatchers). |dispatchers| should be non-null only
-  // if it's nonempty. Returns |MOJO_RESULT_OK| if it will and an appropriate
-  // error code if it won't.
-  virtual MojoResult CanEnqueueMessage(
-      const MessageInTransit* message,
-      const std::vector<Dispatcher*>* dispatchers) = 0;
-  // Takes ownership of |message| and the contents of |dispatchers| (leaving
-  // it empty). This should only be called after |CanEnqueueMessage()| has
-  // indicated success. (Unlike |CanEnqueueMessage()|, |dispatchers| may be
-  // non-null but empty.)
-  virtual void EnqueueMessage(
+  // Implements |MessagePipe::EnqueueMessage()| (see its description for
+  // details).
+  virtual MojoResult EnqueueMessage(
       MessageInTransit* message,
-      std::vector<scoped_refptr<Dispatcher> >* dispatchers) = 0;
+      const std::vector<Dispatcher*>* dispatchers) = 0;
 
   // Implementations must override these if they represent a local endpoint,
   // i.e., one for which there's a |MessagePipeDispatcher| (and thus a handle).
......
@@ -55,47 +55,22 @@ void ProxyMessagePipeEndpoint::OnPeerClose() {
       MessageInTransit::Create(MessageInTransit::kTypeMessagePipe,
                                MessageInTransit::kSubtypeMessagePipePeerClosed,
                                NULL, 0);
-  if (CanEnqueueMessage(message, NULL) == MOJO_RESULT_OK) {
-    EnqueueMessage(message, NULL);
-  } else {
-    message->Destroy();
-    // TODO(vtl): Do something more sensible on error here?
-    LOG(WARNING) << "Failed to send peer closed control message";
-  }
+  EnqueueMessageInternal(message, NULL);
 }
 
-MojoResult ProxyMessagePipeEndpoint::CanEnqueueMessage(
-    const MessageInTransit* /*message*/,
-    const std::vector<Dispatcher*>* dispatchers) {
-  // TODO(vtl): Support sending handles over OS pipes.
-  if (dispatchers) {
-    NOTIMPLEMENTED();
-    return MOJO_RESULT_UNIMPLEMENTED;
-  }
-  return MOJO_RESULT_OK;
-}
-
-// Note: We may have to enqueue messages even when our (local) peer isn't open
-// -- it may have been written to and closed immediately, before we were ready.
-// This case is handled in |Run()| (which will call us).
-void ProxyMessagePipeEndpoint::EnqueueMessage(
+MojoResult ProxyMessagePipeEndpoint::EnqueueMessage(
     MessageInTransit* message,
-    std::vector<scoped_refptr<Dispatcher> >* dispatchers) {
-  DCHECK(is_open_);
-  // TODO(vtl)
-  DCHECK(!dispatchers || dispatchers->empty());
+    const std::vector<Dispatcher*>* dispatchers) {
+  DCHECK(!dispatchers || !dispatchers->empty());
 
-  if (is_running()) {
-    message->set_source_id(local_id_);
-    message->set_destination_id(remote_id_);
-    // TODO(vtl): Figure out error handling here (where it's rather late) --
-    // maybe move whatever checks we can into |CanEnqueueMessage()|.
-    if (!channel_->WriteMessage(message))
-      LOG(WARNING) << "Failed to write message to channel";
-  } else {
-    paused_message_queue_.push_back(message);
+  MojoResult result = CanEnqueueDispatchers(dispatchers);
+  if (result != MOJO_RESULT_OK) {
+    message->Destroy();
+    return result;
   }
+
+  EnqueueMessageInternal(message, dispatchers);
+  return MOJO_RESULT_OK;
 }
 
 void ProxyMessagePipeEndpoint::Attach(scoped_refptr<Channel> channel,
@@ -124,21 +99,48 @@ void ProxyMessagePipeEndpoint::Run(MessageInTransit::EndpointId remote_id) {
   AssertConsistentState();
 
   for (std::deque<MessageInTransit*>::iterator it =
-           paused_message_queue_.begin();
-       it != paused_message_queue_.end();
-       ++it) {
-    if (CanEnqueueMessage(*it, NULL) == MOJO_RESULT_OK) {
-      EnqueueMessage(*it, NULL);
-    } else {
-      (*it)->Destroy();
-      // TODO(vtl): Do something more sensible on error here?
-      LOG(WARNING) << "Failed to send message";
-      // TODO(vtl): Abort?
-    }
-  }
+           paused_message_queue_.begin(); it != paused_message_queue_.end();
+       ++it)
+    EnqueueMessageInternal(*it, NULL);
   paused_message_queue_.clear();
 }
 
+MojoResult ProxyMessagePipeEndpoint::CanEnqueueDispatchers(
+    const std::vector<Dispatcher*>* dispatchers) {
+  // TODO(vtl): Support sending handles over OS pipes.
+  if (dispatchers) {
+    NOTIMPLEMENTED();
+    return MOJO_RESULT_UNIMPLEMENTED;
+  }
+  return MOJO_RESULT_OK;
+}
+
+// Note: We may have to enqueue messages even when our (local) peer isn't open
+// -- it may have been written to and closed immediately, before we were ready.
+// This case is handled in |Run()| (which will call us).
+void ProxyMessagePipeEndpoint::EnqueueMessageInternal(
+    MessageInTransit* message,
+    const std::vector<Dispatcher*>* dispatchers) {
+  DCHECK(is_open_);
+  DCHECK(!dispatchers || !dispatchers->empty());
+  // TODO(vtl): We don't actually support sending dispatchers yet. We shouldn't
+  // get here due to other checks.
+  DCHECK(!dispatchers) << "Not yet implemented";
+
+  if (is_running()) {
+    message->set_source_id(local_id_);
+    message->set_destination_id(remote_id_);
+    // If it fails at this point, the message gets dropped. (This is no
+    // different from any other in-transit errors.)
+    // Note: |WriteMessage()| will destroy the message even on failure.
+    if (!channel_->WriteMessage(message))
+      LOG(WARNING) << "Failed to write message to channel";
+  } else {
+    paused_message_queue_.push_back(message);
+  }
+}
+
 #ifndef NDEBUG
 void ProxyMessagePipeEndpoint::AssertConsistentState() const {
   if (is_attached()) {
......
@@ -46,12 +46,9 @@ class MOJO_SYSTEM_IMPL_EXPORT ProxyMessagePipeEndpoint
   // |MessagePipeEndpoint| implementation:
   virtual void Close() OVERRIDE;
  virtual void OnPeerClose() OVERRIDE;
-  virtual MojoResult CanEnqueueMessage(
-      const MessageInTransit* message,
-      const std::vector<Dispatcher*>* dispatchers) OVERRIDE;
-  virtual void EnqueueMessage(
+  virtual MojoResult EnqueueMessage(
      MessageInTransit* message,
-      std::vector<scoped_refptr<Dispatcher> >* dispatchers) OVERRIDE;
+      const std::vector<Dispatcher*>* dispatchers) OVERRIDE;
   virtual void Attach(scoped_refptr<Channel> channel,
                       MessageInTransit::EndpointId local_id) OVERRIDE;
   virtual void Run(MessageInTransit::EndpointId remote_id) OVERRIDE;
@@ -65,6 +62,12 @@ class MOJO_SYSTEM_IMPL_EXPORT ProxyMessagePipeEndpoint
     return remote_id_ != MessageInTransit::kInvalidEndpointId;
   }
 
+  MojoResult CanEnqueueDispatchers(const std::vector<Dispatcher*>* dispatchers);
+  // |dispatchers| should be non-null only if it's nonempty, in which case the
+  // dispatchers should have been preflighted by |CanEnqueueDispatchers()|.
+  void EnqueueMessageInternal(MessageInTransit* message,
+                              const std::vector<Dispatcher*>* dispatchers);
+
 #ifdef NDEBUG
   void AssertConsistentState() const {}
 #else
......