Commit 8b00bf4e authored by Josh Nohle's avatar Josh Nohle Committed by Commit Bot

[Nearby] Unblock BT socket input/output stream read/write after close

The Bluetooth socket implements its own input/output streams. During
Read()/Write(), we wait until a base::WaitableEvent is notified that
the read/write finished. Previously, if Close() is called while the
read/write is waiting, the base::WaitableEvent will never be notified.
In this CL, we notify the base::WaitableEvent after closing the stream.

In practice, this CL fixes the blocking Read() on the receiving
Chromebook after the first successful file share. The Read() was waiting
for more bytes to appear in the receive stream, which would trigger
ReceiveMore(), but those bytes never came because the transfer already
completed. This blocking Read() prevented receiving again after the
first successful file share. See the following debugging logs from the
receiving Chromebook at the end of a successful transfer; the fix is
explicitly noted in the logs:
https://paste.googleplex.com/4940385051738112.

Fixed: 1137673
Change-Id: I0e15aeb5ece9609faea7549b8617a404cb2c145c
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2467336
Commit-Queue: James Vecore <vecore@google.com>
Reviewed-by: default avatarJames Vecore <vecore@google.com>
Auto-Submit: Josh Nohle <nohle@chromium.org>
Cr-Commit-Position: refs/heads/master@{#817056}
parent 9a3e43f4
...@@ -53,16 +53,21 @@ class InputStreamImpl : public InputStream { ...@@ -53,16 +53,21 @@ class InputStreamImpl : public InputStream {
pending_read_buffer_ = std::make_unique<ByteArray>(size); pending_read_buffer_ = std::make_unique<ByteArray>(size);
pending_read_buffer_pos_ = 0; pending_read_buffer_pos_ = 0;
task_run_.emplace(); read_waitable_event_.emplace();
task_runner_->PostTask( task_runner_->PostTask(
FROM_HERE, base::BindOnce(&mojo::SimpleWatcher::ArmOrNotify, FROM_HERE, base::BindOnce(&mojo::SimpleWatcher::ArmOrNotify,
base::Unretained(&receive_stream_watcher_))); base::Unretained(&receive_stream_watcher_)));
task_run_->Wait(); read_waitable_event_->Wait();
task_run_.reset(); read_waitable_event_.reset();
pending_read_buffer_.reset(); pending_read_buffer_.reset();
pending_read_buffer_pos_ = 0; pending_read_buffer_pos_ = 0;
// |receive_stream_| might have been reset in Close() while
// |read_waitable_event_| was waiting.
if (!receive_stream_)
return {Exception::kIo};
return exception_or_received_byte_array_; return exception_or_received_byte_array_;
} }
Exception Close() override { Exception Close() override {
...@@ -84,6 +89,15 @@ class InputStreamImpl : public InputStream { ...@@ -84,6 +89,15 @@ class InputStreamImpl : public InputStream {
receive_stream_.reset(); receive_stream_.reset();
// It is possible that a Read() call could still be blocking a different
// sequence via |read_waitable_event_| when Close() is called. Notably, this
// happens on the receiving device when a Nearby Share transfer finishes. If
// we only cancel the stream watcher, the Read() call will block forever. We
// trigger the event manually here, which will cause an IO exception to be
// returned from Read().
if (read_waitable_event_)
read_waitable_event_->Signal();
return {Exception::kSuccess}; return {Exception::kSuccess};
} }
...@@ -94,12 +108,12 @@ class InputStreamImpl : public InputStream { ...@@ -94,12 +108,12 @@ class InputStreamImpl : public InputStream {
DCHECK(receive_stream_.is_valid()); DCHECK(receive_stream_.is_valid());
DCHECK(pending_read_buffer_); DCHECK(pending_read_buffer_);
DCHECK_LT(pending_read_buffer_pos_, pending_read_buffer_->size()); DCHECK_LT(pending_read_buffer_pos_, pending_read_buffer_->size());
DCHECK(task_run_); DCHECK(read_waitable_event_);
if (state.peer_closed()) { if (state.peer_closed()) {
exception_or_received_byte_array_ = exception_or_received_byte_array_ =
ExceptionOr<ByteArray>(Exception::kIo); ExceptionOr<ByteArray>(Exception::kIo);
task_run_->Signal(); read_waitable_event_->Signal();
return; return;
} }
...@@ -126,7 +140,7 @@ class InputStreamImpl : public InputStream { ...@@ -126,7 +140,7 @@ class InputStreamImpl : public InputStream {
exception_or_received_byte_array_ = exception_or_received_byte_array_ =
ExceptionOr<ByteArray>(Exception::kIo); ExceptionOr<ByteArray>(Exception::kIo);
} }
task_run_->Signal(); read_waitable_event_->Signal();
} }
scoped_refptr<base::SequencedTaskRunner> task_runner_; scoped_refptr<base::SequencedTaskRunner> task_runner_;
...@@ -136,7 +150,7 @@ class InputStreamImpl : public InputStream { ...@@ -136,7 +150,7 @@ class InputStreamImpl : public InputStream {
std::unique_ptr<ByteArray> pending_read_buffer_; std::unique_ptr<ByteArray> pending_read_buffer_;
uint32_t pending_read_buffer_pos_ = 0; uint32_t pending_read_buffer_pos_ = 0;
ExceptionOr<ByteArray> exception_or_received_byte_array_; ExceptionOr<ByteArray> exception_or_received_byte_array_;
base::Optional<base::WaitableEvent> task_run_; base::Optional<base::WaitableEvent> read_waitable_event_;
}; };
// Concrete OutputStream implementation, tightly coupled to BluetoothSocket. // Concrete OutputStream implementation, tightly coupled to BluetoothSocket.
...@@ -172,18 +186,23 @@ class OutputStreamImpl : public OutputStream { ...@@ -172,18 +186,23 @@ class OutputStreamImpl : public OutputStream {
pending_write_buffer_ = std::make_unique<ByteArray>(data); pending_write_buffer_ = std::make_unique<ByteArray>(data);
pending_write_buffer_pos_ = 0; pending_write_buffer_pos_ = 0;
task_run_.emplace(); write_waitable_event_.emplace();
task_runner_->PostTask( task_runner_->PostTask(
FROM_HERE, base::BindOnce(&mojo::SimpleWatcher::ArmOrNotify, FROM_HERE, base::BindOnce(&mojo::SimpleWatcher::ArmOrNotify,
base::Unretained(&send_stream_watcher_))); base::Unretained(&send_stream_watcher_)));
task_run_->Wait(); write_waitable_event_->Wait();
Exception result = {write_success_ ? Exception::kSuccess : Exception::kIo}; Exception result = {write_success_ ? Exception::kSuccess : Exception::kIo};
write_success_ = false; write_success_ = false;
pending_write_buffer_.reset(); pending_write_buffer_.reset();
pending_write_buffer_pos_ = 0; pending_write_buffer_pos_ = 0;
task_run_.reset(); write_waitable_event_.reset();
// |send_stream_| might have been reset in Close() while
// |write_waitable_event_| was waiting.
if (!send_stream_)
return {Exception::kIo};
return result; return result;
} }
...@@ -211,6 +230,14 @@ class OutputStreamImpl : public OutputStream { ...@@ -211,6 +230,14 @@ class OutputStreamImpl : public OutputStream {
send_stream_.reset(); send_stream_.reset();
// It is possible that a Write() call could still be blocking a different
// sequence via |write_waitable_event_| when Close() is called. If we only
// cancel the stream watcher, the Write() call will block forever. We
// trigger the event manually here, which will cause an IO exception to be
// returned from Write().
if (write_waitable_event_)
write_waitable_event_->Signal();
return {Exception::kSuccess}; return {Exception::kSuccess};
} }
...@@ -221,11 +248,11 @@ class OutputStreamImpl : public OutputStream { ...@@ -221,11 +248,11 @@ class OutputStreamImpl : public OutputStream {
DCHECK(send_stream_.is_valid()); DCHECK(send_stream_.is_valid());
DCHECK(pending_write_buffer_); DCHECK(pending_write_buffer_);
DCHECK_LT(pending_write_buffer_pos_, pending_write_buffer_->size()); DCHECK_LT(pending_write_buffer_pos_, pending_write_buffer_->size());
DCHECK(task_run_); DCHECK(write_waitable_event_);
if (state.peer_closed()) { if (state.peer_closed()) {
write_success_ = false; write_success_ = false;
task_run_->Signal(); write_waitable_event_->Signal();
return; return;
} }
...@@ -246,7 +273,7 @@ class OutputStreamImpl : public OutputStream { ...@@ -246,7 +273,7 @@ class OutputStreamImpl : public OutputStream {
} }
write_success_ = result == MOJO_RESULT_OK; write_success_ = result == MOJO_RESULT_OK;
task_run_->Signal(); write_waitable_event_->Signal();
} }
scoped_refptr<base::SequencedTaskRunner> task_runner_; scoped_refptr<base::SequencedTaskRunner> task_runner_;
...@@ -256,7 +283,7 @@ class OutputStreamImpl : public OutputStream { ...@@ -256,7 +283,7 @@ class OutputStreamImpl : public OutputStream {
std::unique_ptr<ByteArray> pending_write_buffer_; std::unique_ptr<ByteArray> pending_write_buffer_;
uint32_t pending_write_buffer_pos_ = 0; uint32_t pending_write_buffer_pos_ = 0;
bool write_success_ = false; bool write_success_ = false;
base::Optional<base::WaitableEvent> task_run_; base::Optional<base::WaitableEvent> write_waitable_event_;
}; };
} // namespace } // namespace
......
...@@ -219,7 +219,7 @@ TEST_F(BluetoothSocketTest, TestInputStream_MultipleChunks) { ...@@ -219,7 +219,7 @@ TEST_F(BluetoothSocketTest, TestInputStream_MultipleChunks) {
uint32_t message_size = 1024 * 1024; uint32_t message_size = 1024 * 1024;
std::string message(message_size, 'A'); std::string message(message_size, 'A');
// Post to a thead pool because both InputStream::Read() and // Post to a thread pool because both InputStream::Read() and
// WriteDataBlocking() below are blocking on each other. // WriteDataBlocking() below are blocking on each other.
base::RunLoop run_loop; base::RunLoop run_loop;
base::ThreadPool::CreateSequencedTaskRunner({})->PostTaskAndReply( base::ThreadPool::CreateSequencedTaskRunner({})->PostTaskAndReply(
...@@ -237,6 +237,46 @@ TEST_F(BluetoothSocketTest, TestInputStream_MultipleChunks) { ...@@ -237,6 +237,46 @@ TEST_F(BluetoothSocketTest, TestInputStream_MultipleChunks) {
run_loop.Run(); run_loop.Run();
} }
TEST_F(BluetoothSocketTest, TestInputStream_CloseBeforeRead) {
InputStream& input_stream = bluetooth_socket_->GetInputStream();
EXPECT_EQ(Exception::kSuccess, input_stream.Close().value);
EXPECT_EQ(Exception::kIo, input_stream.Read(1u).exception());
}
TEST_F(BluetoothSocketTest, TestInputStream_CloseWhileReading) {
InputStream& input_stream = bluetooth_socket_->GetInputStream();
base::RunLoop run_loop;
// Start waiting for 1 byte to be read from the |receive_stream_|. Note: We
// run on a separate thread because Read() is blocking.
ExceptionOr<ByteArray> read_exception_or_byte_array;
base::ThreadPool::CreateSequencedTaskRunner({})->PostTaskAndReply(
FROM_HERE,
base::BindLambdaForTesting(
[&input_stream, &read_exception_or_byte_array] {
base::ScopedAllowBaseSyncPrimitivesForTesting allow;
read_exception_or_byte_array = input_stream.Read(1u);
}),
run_loop.QuitClosure());
// While Read() is waiting, close the stream. Note: We delay closing the
// stream by 100 ms to ensure that Read() is in fact waiting when Close() is
// posted. Because Read() is blocking, I think this is the best we can do.
// Even if Close() somehow completes before Read(), an IO exception should
// still be thrown.
base::ThreadPool::CreateSequencedTaskRunner({})->PostDelayedTask(
FROM_HERE, base::BindLambdaForTesting([&input_stream] {
base::ScopedAllowBaseSyncPrimitivesForTesting allow;
EXPECT_EQ(Exception::kSuccess, input_stream.Close().value);
}),
base::TimeDelta::FromMilliseconds(100));
run_loop.Run();
EXPECT_EQ(Exception::kIo, read_exception_or_byte_array.exception());
}
TEST_F(BluetoothSocketTest, TestOutputStream) { TEST_F(BluetoothSocketTest, TestOutputStream) {
OutputStream& output_stream = bluetooth_socket_->GetOutputStream(); OutputStream& output_stream = bluetooth_socket_->GetOutputStream();
...@@ -266,7 +306,7 @@ TEST_F(BluetoothSocketTest, TestOutputStream_MultipleChunks) { ...@@ -266,7 +306,7 @@ TEST_F(BluetoothSocketTest, TestOutputStream_MultipleChunks) {
uint32_t message_size = 1024 * 1024; uint32_t message_size = 1024 * 1024;
std::string message(message_size, 'A'); std::string message(message_size, 'A');
// Post to a thead pool because both InputStream::Write() and // Post to a thread pool because both InputStream::Write() and
// ReadDataBlocking() below are blocking on each other. // ReadDataBlocking() below are blocking on each other.
base::RunLoop run_loop; base::RunLoop run_loop;
base::ThreadPool::CreateSequencedTaskRunner({})->PostTaskAndReply( base::ThreadPool::CreateSequencedTaskRunner({})->PostTaskAndReply(
...@@ -282,6 +322,50 @@ TEST_F(BluetoothSocketTest, TestOutputStream_MultipleChunks) { ...@@ -282,6 +322,50 @@ TEST_F(BluetoothSocketTest, TestOutputStream_MultipleChunks) {
run_loop.Run(); run_loop.Run();
} }
TEST_F(BluetoothSocketTest, TestOutputStream_CloseBeforeWrite) {
OutputStream& output_stream = bluetooth_socket_->GetOutputStream();
EXPECT_EQ(Exception::kSuccess, output_stream.Close().value);
EXPECT_EQ(Exception::kIo, output_stream.Write(ByteArray("message")).value);
}
TEST_F(BluetoothSocketTest, TestOutputStream_CloseWhileWriting) {
OutputStream& output_stream = bluetooth_socket_->GetOutputStream();
base::RunLoop run_loop;
// Start waiting for the bytes to be written from the |send_stream_|. Note: We
// run on a separate thread because Write() is blocking.
Exception write_exception;
base::ThreadPool::CreateSequencedTaskRunner({})->PostTaskAndReply(
FROM_HERE, base::BindLambdaForTesting([&output_stream, &write_exception] {
base::ScopedAllowBaseSyncPrimitivesForTesting allow;
// Expect a total message size of 1MB delivered in chunks because a mojo
// pipe has a maximum buffer size and only accepts a certain amount of
// data per call. The default is 64KB defined in //mojo/core/core.cc. We
// want a large message so the Write() will be forced to wait.
uint32_t message_size = 1024 * 1024;
std::string message(message_size, 'A');
write_exception = output_stream.Write(ByteArray(message));
}),
run_loop.QuitClosure());
// While Write() is waiting, close the stream. Note: We delay closing the
// stream by 100 ms to ensure that Write() is in fact waiting when Close() is
// posted. Because Write() is blocking, I think this is the best we can do.
// Even if Close() somehow completes before Write(), an IO exception should
// still be thrown.
base::ThreadPool::CreateSequencedTaskRunner({})->PostDelayedTask(
FROM_HERE, base::BindLambdaForTesting([&output_stream] {
base::ScopedAllowBaseSyncPrimitivesForTesting allow;
EXPECT_EQ(Exception::kSuccess, output_stream.Close().value);
}),
base::TimeDelta::FromMilliseconds(100));
run_loop.Run();
EXPECT_EQ(Exception::kIo, write_exception.value);
}
TEST_F(BluetoothSocketTest, TestInputStreamResetHandler) { TEST_F(BluetoothSocketTest, TestInputStreamResetHandler) {
InputStream& input_stream = bluetooth_socket_->GetInputStream(); InputStream& input_stream = bluetooth_socket_->GetInputStream();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment