Commit cd34266e authored by johnmccutchan's avatar johnmccutchan Committed by Commit bot

Close data pipes when TCP network connection goes down.

- Close data pipes when TCP network connection goes down.

Review URL: https://codereview.chromium.org/1015733002

Cr-Commit-Position: refs/heads/master@{#321456}
parent 7fd718c4
...@@ -36,7 +36,8 @@ void TCPConnectedSocketImpl::ReceiveMore() { ...@@ -36,7 +36,8 @@ void TCPConnectedSocketImpl::ReceiveMore() {
// The pipe is full. We need to wait for it to have more space. // The pipe is full. We need to wait for it to have more space.
receive_handle_watcher_.Start( receive_handle_watcher_.Start(
receive_stream_.get(), receive_stream_.get(),
MOJO_HANDLE_SIGNAL_WRITABLE, MOJO_DEADLINE_INDEFINITE, MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
MOJO_DEADLINE_INDEFINITE,
base::Bind(&TCPConnectedSocketImpl::OnReceiveStreamReady, base::Bind(&TCPConnectedSocketImpl::OnReceiveStreamReady,
weak_ptr_factory_.GetWeakPtr())); weak_ptr_factory_.GetWeakPtr()));
return; return;
...@@ -46,12 +47,17 @@ void TCPConnectedSocketImpl::ReceiveMore() { ...@@ -46,12 +47,17 @@ void TCPConnectedSocketImpl::ReceiveMore() {
// It's valid that the user of this class consumed the data they care about // It's valid that the user of this class consumed the data they care about
// and closed their data pipe handles after writing data. This class should // and closed their data pipe handles after writing data. This class should
// still write out all the data. // still write out all the data.
ShutdownReceive();
// TODO(johnmccutchan): Notify socket direction is closed along with
// net_result and mojo_result.
return; return;
} }
if (result != MOJO_RESULT_OK) { if (result != MOJO_RESULT_OK) {
// The receive stream is in a bad state. // The receive stream is in a bad state.
// TODO(darin): How should this be communicated to our client? ShutdownReceive();
// TODO(johnmccutchan): Notify socket direction is closed along with
// net_result and mojo_result.
return; return;
} }
...@@ -65,17 +71,25 @@ void TCPConnectedSocketImpl::ReceiveMore() { ...@@ -65,17 +71,25 @@ void TCPConnectedSocketImpl::ReceiveMore() {
false)); false));
if (read_result == net::ERR_IO_PENDING) { if (read_result == net::ERR_IO_PENDING) {
// Pending I/O, wait for result in DidReceive(). // Pending I/O, wait for result in DidReceive().
} else if (read_result >= 0) { } else if (read_result > 0) {
// Synchronous data ready. // Synchronous data ready.
DidReceive(true, read_result); DidReceive(true, read_result);
} else { } else {
// Some kind of error. // read_result == 0 indicates EOF.
// TODO(brettw) notify caller of error. // read_result < 0 indicates error.
ShutdownReceive();
// TODO(johnmccutchan): Notify socket direction is closed along with
// net_result and mojo_result.
} }
} }
void TCPConnectedSocketImpl::OnReceiveStreamReady(MojoResult result) { void TCPConnectedSocketImpl::OnReceiveStreamReady(MojoResult result) {
// TODO(darin): Handle a bad |result| value. if (result != MOJO_RESULT_OK) {
ShutdownReceive();
// TODO(johnmccutchan): Notify socket direction is closed along with
// net_result and mojo_result.
return;
}
ReceiveMore(); ReceiveMore();
} }
...@@ -83,13 +97,14 @@ void TCPConnectedSocketImpl::DidReceive(bool completed_synchronously, ...@@ -83,13 +97,14 @@ void TCPConnectedSocketImpl::DidReceive(bool completed_synchronously,
int result) { int result) {
if (result < 0) { if (result < 0) {
// Error. // Error.
pending_receive_ = NULL; // Closes the pipe (owned by the pending write). ShutdownReceive();
// TODO(brettw) notify the caller of an error? // TODO(johnmccutchan): Notify socket direction is closed along with
// net_result and mojo_result.
return; return;
} }
receive_stream_ = pending_receive_->Complete(result); receive_stream_ = pending_receive_->Complete(result);
pending_receive_ = NULL; pending_receive_ = nullptr;
// Schedule more reading. // Schedule more reading.
if (completed_synchronously) { if (completed_synchronously) {
...@@ -103,6 +118,11 @@ void TCPConnectedSocketImpl::DidReceive(bool completed_synchronously, ...@@ -103,6 +118,11 @@ void TCPConnectedSocketImpl::DidReceive(bool completed_synchronously,
} }
} }
void TCPConnectedSocketImpl::ShutdownReceive() {
pending_receive_ = nullptr;
receive_stream_.reset();
}
void TCPConnectedSocketImpl::SendMore() { void TCPConnectedSocketImpl::SendMore() {
uint32_t num_bytes = 0; uint32_t num_bytes = 0;
MojoResult result = MojoToNetPendingBuffer::BeginRead( MojoResult result = MojoToNetPendingBuffer::BeginRead(
...@@ -111,12 +131,15 @@ void TCPConnectedSocketImpl::SendMore() { ...@@ -111,12 +131,15 @@ void TCPConnectedSocketImpl::SendMore() {
// Data not ready, wait for it. // Data not ready, wait for it.
send_handle_watcher_.Start( send_handle_watcher_.Start(
send_stream_.get(), send_stream_.get(),
MOJO_HANDLE_SIGNAL_READABLE, MOJO_DEADLINE_INDEFINITE, MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
MOJO_DEADLINE_INDEFINITE,
base::Bind(&TCPConnectedSocketImpl::OnSendStreamReady, base::Bind(&TCPConnectedSocketImpl::OnSendStreamReady,
weak_ptr_factory_.GetWeakPtr())); weak_ptr_factory_.GetWeakPtr()));
return; return;
} else if (result != MOJO_RESULT_OK) { } else if (result != MOJO_RESULT_OK) {
// TODO(brettw) notify caller of error. ShutdownSend();
// TODO(johnmccutchan): Notify socket direction is closed along with
// net_result and mojo_result.
return; return;
} }
...@@ -132,25 +155,36 @@ void TCPConnectedSocketImpl::SendMore() { ...@@ -132,25 +155,36 @@ void TCPConnectedSocketImpl::SendMore() {
} else if (write_result >= 0) { } else if (write_result >= 0) {
// Synchronous data consumed. // Synchronous data consumed.
DidSend(true, write_result); DidSend(true, write_result);
} else {
// write_result < 0 indicates error.
ShutdownSend();
// TODO(johnmccutchan): Notify socket direction is closed along with
// net_result and mojo_result.
} }
} }
void TCPConnectedSocketImpl::OnSendStreamReady(MojoResult result) { void TCPConnectedSocketImpl::OnSendStreamReady(MojoResult result) {
// TODO(brettw): Handle a bad |result| value. if (result != MOJO_RESULT_OK) {
ShutdownSend();
// TODO(johnmccutchan): Notify socket direction is closed along with
// net_result and mojo_result.
return;
}
SendMore(); SendMore();
} }
void TCPConnectedSocketImpl::DidSend(bool completed_synchronously, void TCPConnectedSocketImpl::DidSend(bool completed_synchronously,
int result) { int result) {
if (result < 0) { if (result < 0) {
// TODO(brettw) report error. ShutdownSend();
pending_send_ = NULL; // TODO(johnmccutchan): Notify socket direction is closed along with
// net_result and mojo_result.
return; return;
} }
// Take back ownership of the stream and free the IOBuffer. // Take back ownership of the stream and free the IOBuffer.
send_stream_ = pending_send_->Complete(result); send_stream_ = pending_send_->Complete(result);
pending_send_ = NULL; pending_send_ = nullptr;
// Schedule more writing. // Schedule more writing.
if (completed_synchronously) { if (completed_synchronously) {
...@@ -164,4 +198,9 @@ void TCPConnectedSocketImpl::DidSend(bool completed_synchronously, ...@@ -164,4 +198,9 @@ void TCPConnectedSocketImpl::DidSend(bool completed_synchronously,
} }
} }
void TCPConnectedSocketImpl::ShutdownSend() {
pending_send_ = nullptr;
send_stream_.reset();
}
} // namespace mojo } // namespace mojo
...@@ -30,12 +30,14 @@ class TCPConnectedSocketImpl : public InterfaceImpl<TCPConnectedSocket> { ...@@ -30,12 +30,14 @@ class TCPConnectedSocketImpl : public InterfaceImpl<TCPConnectedSocket> {
void ReceiveMore(); void ReceiveMore();
void OnReceiveStreamReady(MojoResult result); void OnReceiveStreamReady(MojoResult result);
void DidReceive(bool completed_synchronously, int result); void DidReceive(bool completed_synchronously, int result);
void ShutdownReceive();
// "Writing" is reading from the Mojo send_stream and writing to the // "Writing" is reading from the Mojo send_stream and writing to the
// TCPSocket. // TCPSocket.
void SendMore(); void SendMore();
void OnSendStreamReady(MojoResult result); void OnSendStreamReady(MojoResult result);
void DidSend(bool completed_asynchronously, int result); void DidSend(bool completed_asynchronously, int result);
void ShutdownSend();
scoped_ptr<net::TCPSocket> socket_; scoped_ptr<net::TCPSocket> socket_;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment