Mojo: Add a RawChannel::EnqueueMessage() that can be overridden by subclasses.

This mostly adds a TODO for POSIX. (We want to actually send "real"
messages with FDs attached, which means that we have to "split" any
message with too many FDs into multiple messages.)

R=yzshen@chromium.org

Review URL: https://codereview.chromium.org/297683012

git-svn-id: svn://svn.chromium.org/chrome/trunk/src@272471 0039d316-1c4b-4281-b951-d872f2087c98
parent afb72811
...@@ -219,11 +219,11 @@ bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) { ...@@ -219,11 +219,11 @@ bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) {
return false; return false;
if (!write_buffer_->message_queue_.empty()) { if (!write_buffer_->message_queue_.empty()) {
write_buffer_->message_queue_.push_back(message.release()); EnqueueMessageNoLock(message.Pass());
return true; return true;
} }
write_buffer_->message_queue_.push_front(message.release()); EnqueueMessageNoLock(message.Pass());
DCHECK_EQ(write_buffer_->data_offset_, 0u); DCHECK_EQ(write_buffer_->data_offset_, 0u);
size_t platform_handles_written = 0; size_t platform_handles_written = 0;
...@@ -254,16 +254,6 @@ bool RawChannel::IsWriteBufferEmpty() { ...@@ -254,16 +254,6 @@ bool RawChannel::IsWriteBufferEmpty() {
return write_buffer_->message_queue_.empty(); return write_buffer_->message_queue_.empty();
} }
RawChannel::ReadBuffer* RawChannel::read_buffer() {
DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
return read_buffer_.get();
}
RawChannel::WriteBuffer* RawChannel::write_buffer_no_lock() {
write_lock_.AssertAcquired();
return write_buffer_.get();
}
void RawChannel::OnReadCompleted(bool result, size_t bytes_read) { void RawChannel::OnReadCompleted(bool result, size_t bytes_read) {
DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
...@@ -423,6 +413,11 @@ void RawChannel::OnWriteCompleted(bool result, ...@@ -423,6 +413,11 @@ void RawChannel::OnWriteCompleted(bool result,
CallOnFatalError(Delegate::FATAL_ERROR_FAILED_WRITE); CallOnFatalError(Delegate::FATAL_ERROR_FAILED_WRITE);
} }
void RawChannel::EnqueueMessageNoLock(scoped_ptr<MessageInTransit> message) {
write_lock_.AssertAcquired();
write_buffer_->message_queue_.push_back(message.release());
}
void RawChannel::CallOnFatalError(Delegate::FatalError fatal_error) { void RawChannel::CallOnFatalError(Delegate::FatalError fatal_error) {
DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
// TODO(vtl): Add a "write_lock_.AssertNotAcquired()"? // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"?
......
...@@ -185,14 +185,29 @@ class MOJO_SYSTEM_IMPL_EXPORT RawChannel { ...@@ -185,14 +185,29 @@ class MOJO_SYSTEM_IMPL_EXPORT RawChannel {
RawChannel(); RawChannel();
// Must be called on the I/O thread WITHOUT |write_lock_| held.
void OnReadCompleted(bool result, size_t bytes_read);
// Must be called on the I/O thread WITHOUT |write_lock_| held.
void OnWriteCompleted(bool result,
size_t platform_handles_written,
size_t bytes_written);
base::MessageLoopForIO* message_loop_for_io() { return message_loop_for_io_; } base::MessageLoopForIO* message_loop_for_io() { return message_loop_for_io_; }
base::Lock& write_lock() { return write_lock_; } base::Lock& write_lock() { return write_lock_; }
// Only accessed on the I/O thread. // Should only be called on the I/O thread.
ReadBuffer* read_buffer(); ReadBuffer* read_buffer() { return read_buffer_.get(); }
// Only called under |write_lock_|.
WriteBuffer* write_buffer_no_lock() {
write_lock_.AssertAcquired();
return write_buffer_.get();
}
// Only accessed under |write_lock_|. // Adds |message| to the write message queue. Implementation subclasses may
WriteBuffer* write_buffer_no_lock(); // override this to add any additional "control" messages needed. This is
// called (on any thread) with |write_lock_| held.
virtual void EnqueueMessageNoLock(scoped_ptr<MessageInTransit> message);
// Reads into |read_buffer()|. // Reads into |read_buffer()|.
// This class guarantees that: // This class guarantees that:
...@@ -249,13 +264,6 @@ class MOJO_SYSTEM_IMPL_EXPORT RawChannel { ...@@ -249,13 +264,6 @@ class MOJO_SYSTEM_IMPL_EXPORT RawChannel {
scoped_ptr<ReadBuffer> read_buffer, scoped_ptr<ReadBuffer> read_buffer,
scoped_ptr<WriteBuffer> write_buffer) = 0; scoped_ptr<WriteBuffer> write_buffer) = 0;
// Must be called on the I/O thread WITHOUT |write_lock_| held.
void OnReadCompleted(bool result, size_t bytes_read);
// Must be called on the I/O thread WITHOUT |write_lock_| held.
void OnWriteCompleted(bool result,
size_t platform_handles_written,
size_t bytes_written);
private: private:
// Calls |delegate_->OnFatalError(fatal_error)|. Must be called on the I/O // Calls |delegate_->OnFatalError(fatal_error)|. Must be called on the I/O
// thread WITHOUT |write_lock_| held. // thread WITHOUT |write_lock_| held.
......
...@@ -41,6 +41,10 @@ class RawChannelPosix : public RawChannel, ...@@ -41,6 +41,10 @@ class RawChannelPosix : public RawChannel,
private: private:
// |RawChannel| protected methods: // |RawChannel| protected methods:
// Actually override |EnqueueMessageNoLock()| so that we can send multiple
// messages with FDs if necessary.
virtual void EnqueueMessageNoLock(
scoped_ptr<MessageInTransit> message) OVERRIDE;
virtual IOResult Read(size_t* bytes_read) OVERRIDE; virtual IOResult Read(size_t* bytes_read) OVERRIDE;
virtual IOResult ScheduleRead() OVERRIDE; virtual IOResult ScheduleRead() OVERRIDE;
virtual embedder::ScopedPlatformHandleVectorPtr GetReadPlatformHandles( virtual embedder::ScopedPlatformHandleVectorPtr GetReadPlatformHandles(
...@@ -112,6 +116,13 @@ size_t RawChannelPosix::GetSerializedPlatformHandleSize() const { ...@@ -112,6 +116,13 @@ size_t RawChannelPosix::GetSerializedPlatformHandleSize() const {
return 0; return 0;
} }
void RawChannelPosix::EnqueueMessageNoLock(
scoped_ptr<MessageInTransit> message) {
// TODO(vtl): Split any message with too many platform handles into multiple
// messages.
RawChannel::EnqueueMessageNoLock(message.Pass());
}
RawChannel::IOResult RawChannelPosix::Read(size_t* bytes_read) { RawChannel::IOResult RawChannelPosix::Read(size_t* bytes_read) {
DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
DCHECK(!pending_read_); DCHECK(!pending_read_);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment