Commit 3d5a60bf authored by brettw@chromium.org's avatar brettw@chromium.org

Separate out the platform-independent parts of Channel reading.

I'm planning on comsolidating the platform-independent management of the
overflow buffer and message dispatch between the posix and windows channels.
This patch separates out the behavior into the functions I'm planning on adding
on the virtual interface.

Basically, ProcessIncomingMessages and DispatchInputData will be the main
shared code. In future patches, I'll refactor Windows in a similar way and then
combine them into a shared base class.


Review URL: http://codereview.chromium.org/9570001

git-svn-id: svn://svn.chromium.org/chrome/trunk/src@124489 0039d316-1c4b-4281-b951-d872f2087c98
parent 5f887613
...@@ -479,48 +479,68 @@ bool Channel::ChannelImpl::Connect() { ...@@ -479,48 +479,68 @@ bool Channel::ChannelImpl::Connect() {
} }
bool Channel::ChannelImpl::ProcessIncomingMessages() { bool Channel::ChannelImpl::ProcessIncomingMessages() {
for (;;) { while (true) {
if (pipe_ == -1) int bytes_read = 0;
ReadState read_state = ReadData(input_buf_, Channel::kReadBufferSize,
&bytes_read);
if (read_state == READ_FAILED)
return false; return false;
if (read_state == READ_PENDING)
return true;
const char* p = NULL; DCHECK(bytes_read > 0);
const char* end = NULL; if (!DispatchInputData(input_buf_, bytes_read))
if (!ReadDataFromPipe(&p, &end)) return false;
return false; // Pipe error. }
if (!p) }
return true; // No data waiting.
// Dispatch all complete messages in the data buffer.
while (p < end) {
const char* message_tail = Message::FindNext(p, end);
if (message_tail) {
int len = static_cast<int>(message_tail - p);
Message m(p, len);
if (!PopulateMessageFileDescriptors(&m))
return false;
DVLOG(2) << "received message on channel @" << this bool Channel::ChannelImpl::DispatchInputData(const char* input_data,
<< " with type " << m.type() << " on fd " << pipe_; int input_data_len) {
if (IsHelloMessage(&m)) const char* p;
HandleHelloMessage(m); const char* end;
else
listener_->OnMessageReceived(m);
p = message_tail;
} else {
// Last message is partial.
break;
}
}
input_overflow_buf_.assign(p, end - p);
// When the input data buffer is empty, the fds should be too. If this is // Possibly combine with the overflow buffer to make a larger buffer.
// not the case, we probably have a rogue renderer which is trying to fill if (input_overflow_buf_.empty()) {
// our descriptor table. p = input_data;
if (input_overflow_buf_.empty() && !input_fds_.empty()) { end = input_data + input_data_len;
// We close these descriptors in Close() } else {
if (input_overflow_buf_.size() >
kMaximumMessageSize - input_data_len) {
input_overflow_buf_.clear();
LOG(ERROR) << "IPC message is too big";
return false; return false;
} }
input_overflow_buf_.append(input_data, input_data_len);
p = input_overflow_buf_.data();
end = p + input_overflow_buf_.size();
} }
// Dispatch all complete messages in the data buffer.
while (p < end) {
const char* message_tail = Message::FindNext(p, end);
if (message_tail) {
int len = static_cast<int>(message_tail - p);
Message m(p, len);
if (!WillDispatchInputMessage(&m))
return false;
if (IsHelloMessage(&m))
HandleHelloMessage(m);
else
listener_->OnMessageReceived(m);
p = message_tail;
} else {
// Last message is partial.
break;
}
}
// Save any partial data in the overflow buffer.
input_overflow_buf_.assign(p, end - p);
if (input_overflow_buf_.empty() && !DidEmptyInputBuffers())
return false;
return true;
} }
bool Channel::ChannelImpl::ProcessOutgoingMessages() { bool Channel::ChannelImpl::ProcessOutgoingMessages() {
...@@ -937,11 +957,16 @@ bool Channel::ChannelImpl::IsHelloMessage(const Message* m) const { ...@@ -937,11 +957,16 @@ bool Channel::ChannelImpl::IsHelloMessage(const Message* m) const {
return m->routing_id() == MSG_ROUTING_NONE && m->type() == HELLO_MESSAGE_TYPE; return m->routing_id() == MSG_ROUTING_NONE && m->type() == HELLO_MESSAGE_TYPE;
} }
bool Channel::ChannelImpl::ReadDataFromPipe(const char** buffer_begin, Channel::ChannelImpl::ReadState Channel::ChannelImpl::ReadData(
const char** buffer_end) { char* buffer,
int buffer_len,
int* bytes_read) {
if (pipe_ == -1)
return READ_FAILED;
struct msghdr msg = {0}; struct msghdr msg = {0};
struct iovec iov = {input_buf_, Channel::kReadBufferSize}; struct iovec iov = {buffer, buffer_len};
msg.msg_iov = &iov; msg.msg_iov = &iov;
msg.msg_iovlen = 1; msg.msg_iovlen = 1;
...@@ -949,62 +974,44 @@ bool Channel::ChannelImpl::ReadDataFromPipe(const char** buffer_begin, ...@@ -949,62 +974,44 @@ bool Channel::ChannelImpl::ReadDataFromPipe(const char** buffer_begin,
// recvmsg() returns 0 if the connection has closed or EAGAIN if no data // recvmsg() returns 0 if the connection has closed or EAGAIN if no data
// is waiting on the pipe. // is waiting on the pipe.
ssize_t bytes_read = 0;
#if defined(IPC_USES_READWRITE) #if defined(IPC_USES_READWRITE)
if (fd_pipe_ >= 0) { if (fd_pipe_ >= 0) {
bytes_read = HANDLE_EINTR(read(pipe_, input_buf_, *bytes_read = HANDLE_EINTR(read(pipe_, buffer, buffer_len));
Channel::kReadBufferSize));
msg.msg_controllen = 0; msg.msg_controllen = 0;
} else } else
#endif // IPC_USES_READWRITE #endif // IPC_USES_READWRITE
{ {
msg.msg_controllen = sizeof(input_cmsg_buf_); msg.msg_controllen = sizeof(input_cmsg_buf_);
bytes_read = HANDLE_EINTR(recvmsg(pipe_, &msg, MSG_DONTWAIT)); *bytes_read = HANDLE_EINTR(recvmsg(pipe_, &msg, MSG_DONTWAIT));
} }
if (bytes_read < 0) { if (*bytes_read < 0) {
if (errno == EAGAIN) { if (errno == EAGAIN) {
*buffer_begin = *buffer_end = NULL; // Signal no data was read. return READ_PENDING;
return true;
#if defined(OS_MACOSX) #if defined(OS_MACOSX)
} else if (errno == EPERM) { } else if (errno == EPERM) {
// On OSX, reading from a pipe with no listener returns EPERM // On OSX, reading from a pipe with no listener returns EPERM
// treat this as a special case to prevent spurious error messages // treat this as a special case to prevent spurious error messages
// to the console. // to the console.
return false; return READ_FAILED;
#endif // OS_MACOSX #endif // OS_MACOSX
} else if (errno == ECONNRESET || errno == EPIPE) { } else if (errno == ECONNRESET || errno == EPIPE) {
return false; return READ_FAILED;
} else { } else {
PLOG(ERROR) << "pipe error (" << pipe_ << ")"; PLOG(ERROR) << "pipe error (" << pipe_ << ")";
return false; return READ_FAILED;
} }
} else if (bytes_read == 0) { } else if (*bytes_read == 0) {
// The pipe has closed... // The pipe has closed...
return false; return READ_FAILED;
} }
DCHECK(bytes_read); DCHECK(*bytes_read);
CloseClientFileDescriptor(); CloseClientFileDescriptor();
// Read any file descriptors from the message. // Read any file descriptors from the message.
if (!ExtractFileDescriptorsFromMsghdr(&msg)) if (!ExtractFileDescriptorsFromMsghdr(&msg))
return false; return READ_FAILED;
return READ_SUCCEEDED;
// Possibly combine with the overflow buffer to make a larger buffer.
if (input_overflow_buf_.empty()) {
*buffer_begin = input_buf_;
*buffer_end = *buffer_begin + bytes_read;
} else {
if (input_overflow_buf_.size() > (kMaximumMessageSize - bytes_read)) {
input_overflow_buf_.clear();
LOG(ERROR) << "IPC message is too big";
return false;
}
input_overflow_buf_.append(input_buf_, bytes_read);
*buffer_begin = input_overflow_buf_.data();
*buffer_end = *buffer_begin + input_overflow_buf_.size();
}
return true;
} }
#if defined(IPC_USES_READWRITE) #if defined(IPC_USES_READWRITE)
...@@ -1028,7 +1035,7 @@ bool Channel::ChannelImpl::ReadFileDescriptorsFromFDPipe() { ...@@ -1028,7 +1035,7 @@ bool Channel::ChannelImpl::ReadFileDescriptorsFromFDPipe() {
} }
#endif #endif
bool Channel::ChannelImpl::PopulateMessageFileDescriptors(Message* msg) { bool Channel::ChannelImpl::WillDispatchInputMessage(Message* msg) {
uint16 header_fds = msg->header()->num_fds; uint16 header_fds = msg->header()->num_fds;
if (!header_fds) if (!header_fds)
return true; // Nothing to do. return true; // Nothing to do.
...@@ -1070,6 +1077,13 @@ bool Channel::ChannelImpl::PopulateMessageFileDescriptors(Message* msg) { ...@@ -1070,6 +1077,13 @@ bool Channel::ChannelImpl::PopulateMessageFileDescriptors(Message* msg) {
return true; return true;
} }
bool Channel::ChannelImpl::DidEmptyInputBuffers() {
// When the input data buffer is empty, the fds should be too. If this is
// not the case, we probably have a rogue renderer which is trying to fill
// our descriptor table.
return input_fds_.empty();
}
bool Channel::ChannelImpl::ExtractFileDescriptorsFromMsghdr(msghdr* msg) { bool Channel::ChannelImpl::ExtractFileDescriptorsFromMsghdr(msghdr* msg) {
// Check that there are any control messages. On OSX, CMSG_FIRSTHDR will // Check that there are any control messages. On OSX, CMSG_FIRSTHDR will
// return an invalid non-NULL pointer in the case that controllen == 0. // return an invalid non-NULL pointer in the case that controllen == 0.
......
...@@ -70,6 +70,8 @@ class Channel::ChannelImpl : public MessageLoopForIO::Watcher { ...@@ -70,6 +70,8 @@ class Channel::ChannelImpl : public MessageLoopForIO::Watcher {
#endif // OS_LINUX #endif // OS_LINUX
private: private:
enum ReadState { READ_SUCCEEDED, READ_FAILED, READ_PENDING };
bool CreatePipe(const IPC::ChannelHandle& channel_handle); bool CreatePipe(const IPC::ChannelHandle& channel_handle);
bool ProcessIncomingMessages(); bool ProcessIncomingMessages();
...@@ -81,18 +83,25 @@ class Channel::ChannelImpl : public MessageLoopForIO::Watcher { ...@@ -81,18 +83,25 @@ class Channel::ChannelImpl : public MessageLoopForIO::Watcher {
void QueueHelloMessage(); void QueueHelloMessage();
bool IsHelloMessage(const Message* m) const; bool IsHelloMessage(const Message* m) const;
// Reads data from the "regular" (non FD) pipe into the input buffers. The // Populates the given buffer with data from the pipe.
// two output params will identify the data received. //
// Returns the state of the read. On READ_SUCCESS, the number of bytes
// read will be placed into |*bytes_read| (which can be less than the
// buffer size). On READ_FAILED, the channel will be closed.
// //
// On success, returns true. If there is no data waiting, the pointers will // If the return value is READ_PENDING, it means that there was no data
// both be set to NULL. Otherwise, they'll indicate the data read. This will // ready for reading. The implementation is then responsible for either
// be inside the input_buf_ for short messages, and for long messages will // calling AsyncReadComplete with the number of bytes read into the
// automatically spill into the input_overflow_buf_. When in non-READWRITE // buffer, or ProcessIncomingMessages to try the read again (depending
// mode this will also load any handles from the message into input_fds_. // on whether the platform's async I/O is "try again" or "write
// asynchronously into your buffer").
ReadState ReadData(char* buffer, int buffer_len, int* bytes_read);
// Takes the given data received from the IPC channel and dispatches any
// fully completed messages.
// //
// On failure, returns false. This means there was some kind of pipe error // Returns true on success. False means channel error.
// and we should not continue. bool DispatchInputData(const char* input_data, int input_data_len);
bool ReadDataFromPipe(const char** begin, const char** end);
#if defined(IPC_USES_READWRITE) #if defined(IPC_USES_READWRITE)
// Reads the next message from the fd_pipe_ and appends them to the // Reads the next message from the fd_pipe_ and appends them to the
...@@ -107,7 +116,11 @@ class Channel::ChannelImpl : public MessageLoopForIO::Watcher { ...@@ -107,7 +116,11 @@ class Channel::ChannelImpl : public MessageLoopForIO::Watcher {
// //
// This will read from the input_fds_ and read more handles from the FD // This will read from the input_fds_ and read more handles from the FD
// pipe if necessary. // pipe if necessary.
bool PopulateMessageFileDescriptors(Message* msg); bool WillDispatchInputMessage(Message* msg);
// Performs post-dispatch checks. Called when all input buffers are empty,
// though there could be more data ready to be read from the OS.
bool DidEmptyInputBuffers();
// Finds the set of file descriptors in the given message. On success, // Finds the set of file descriptors in the given message. On success,
// appends the descriptors to the input_fds_ member and returns true // appends the descriptors to the input_fds_ member and returns true
...@@ -170,7 +183,7 @@ class Channel::ChannelImpl : public MessageLoopForIO::Watcher { ...@@ -170,7 +183,7 @@ class Channel::ChannelImpl : public MessageLoopForIO::Watcher {
// Messages to be sent are queued here. // Messages to be sent are queued here.
std::queue<Message*> output_queue_; std::queue<Message*> output_queue_;
// We read from the pipe into this buffer. Managed by ReadDataFromPipe, do // We read from the pipe into this buffer. Managed by DispatchInputData, do
// not access directly outside that function. // not access directly outside that function.
char input_buf_[Channel::kReadBufferSize]; char input_buf_[Channel::kReadBufferSize];
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment