Commit 567da340 authored by Yutaka Hirano's avatar Yutaka Hirano Committed by Commit Bot

[QuicTransport] Introduce mojo messages for stream closure

Introduce the following messages:
 - QuicTransport.SendFin
 - QuicTransportClient.OnIncomingStreamClosed

Bug: 1011392
Change-Id: If15ffb5bc7e619489cfc6e748300247a56ec888b
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/1993075Reviewed-by: default avatarVictor Vasiliev <vasilvv@chromium.org>
Reviewed-by: default avatarAdam Rice <ricea@chromium.org>
Reviewed-by: default avatarKinuko Yasuda <kinuko@chromium.org>
Commit-Queue: Yutaka Hirano <yhirano@chromium.org>
Cr-Commit-Position: refs/heads/master@{#732220}
parent 4e23e8e8
...@@ -21,18 +21,29 @@ interface QuicTransport { ...@@ -21,18 +21,29 @@ interface QuicTransport {
handle<data_pipe_producer>? writable) => handle<data_pipe_producer>? writable) =>
(bool succeeded, uint32 stream_id); (bool succeeded, uint32 stream_id);
// Accept a bidirectional stream created by the server. // Accepts a bidirectional stream created by the server.
AcceptBidirectionalStream() => (uint32 stream_id, AcceptBidirectionalStream() => (uint32 stream_id,
handle<data_pipe_consumer> readable, handle<data_pipe_consumer> readable,
handle<data_pipe_producer> writable); handle<data_pipe_producer> writable);
// Accept a unidirectional stream created by the server. // Accepts a unidirectional stream created by the server.
AcceptUnidirectionalStream() => (uint32 stream_id, AcceptUnidirectionalStream() => (uint32 stream_id,
handle<data_pipe_consumer> readable); handle<data_pipe_consumer> readable);
// Expresses that the client will not write data to the stream for
// |stream_id|. After calling this function on a stream, the client will not
// be able to write any data to the stream, but it may be able to use other
// functions such as reading data from the stream.
SendFin(uint32 stream_id);
}; };
// A mojo interface for the client of QuicTransport. // A mojo interface for the client of QuicTransport.
interface QuicTransportClient { interface QuicTransportClient {
// Notifies that the server will not write data to the Stream for |stream_id|.
// |fin_received| is true when FIN is received from the server.
// Note that OnIncomingStreamClosed and OnOutgoingStreamClosed can both be
// dispatched to the same stream, if it is a bidirectional stream.
OnIncomingStreamClosed(uint32 stream_id, bool fin_received);
}; };
// Used to create a QuicTransport connection. This is split from QuicTransport // Used to create a QuicTransport connection. This is split from QuicTransport
......
...@@ -27,8 +27,20 @@ class QuicTransport::Stream final { ...@@ -27,8 +27,20 @@ class QuicTransport::Stream final {
: stream_(stream->weak_factory_.GetWeakPtr()) {} : stream_(stream->weak_factory_.GetWeakPtr()) {}
~StreamVisitor() override { ~StreamVisitor() override {
if (stream_) { if (stream_) {
stream_->incoming_ = nullptr; if (stream_->incoming_) {
stream_->outgoing_ = nullptr; stream_->writable_watcher_.Cancel();
stream_->writable_.reset();
stream_->transport_->client_->OnIncomingStreamClosed(
stream_->id_,
/*fin_received=*/false);
stream_->incoming_ = nullptr;
}
if (stream_->outgoing_) {
stream_->readable_watcher_.Cancel();
stream_->readable_.reset();
stream_->outgoing_ = nullptr;
}
stream_->MayDisposeLater();
} }
} }
...@@ -101,6 +113,11 @@ class QuicTransport::Stream final { ...@@ -101,6 +113,11 @@ class QuicTransport::Stream final {
Init(); Init();
} }
void NotifyFinFromClient() {
has_received_fin_from_client_ = true;
MaySendFin();
}
~Stream() { transport_->transport_->session()->CloseStream(id_); } ~Stream() { transport_->transport_->session()->CloseStream(id_); }
private: private:
...@@ -135,6 +152,7 @@ class QuicTransport::Stream final { ...@@ -135,6 +152,7 @@ class QuicTransport::Stream final {
} }
void Send() { void Send() {
MaySendFin();
while (outgoing_ && outgoing_->CanWrite()) { while (outgoing_ && outgoing_->CanWrite()) {
const void* data = nullptr; const void* data = nullptr;
uint32_t available = 0; uint32_t available = 0;
...@@ -144,15 +162,8 @@ class QuicTransport::Stream final { ...@@ -144,15 +162,8 @@ class QuicTransport::Stream final {
return; return;
} }
if (result == MOJO_RESULT_FAILED_PRECONDITION) { if (result == MOJO_RESULT_FAILED_PRECONDITION) {
const bool result = outgoing_->SendFin(); has_seen_end_of_pipe_for_readable_ = true;
// |SendFin| must succeed when CanWrite() returns true. MaySendFin();
DCHECK(result);
outgoing_ = nullptr;
readable_watcher_.Cancel();
readable_.reset();
// We need an explicit signal to close the stream.
// TODO(yhirano): Add CloseStream mojo message.
MayDisposeLater();
return; return;
} }
DCHECK_EQ(result, MOJO_RESULT_OK); DCHECK_EQ(result, MOJO_RESULT_OK);
...@@ -173,6 +184,22 @@ class QuicTransport::Stream final { ...@@ -173,6 +184,22 @@ class QuicTransport::Stream final {
Receive(); Receive();
} }
void MaySendFin() {
if (!outgoing_) {
return;
}
if (!has_seen_end_of_pipe_for_readable_ || !has_received_fin_from_client_) {
return;
}
if (outgoing_->SendFin()) {
outgoing_ = nullptr;
readable_watcher_.Cancel();
readable_.reset();
MayDisposeLater();
}
// Otherwise, retry in Send().
}
void Receive() { void Receive() {
while (incoming_ && incoming_->ReadableBytes() > 0) { while (incoming_ && incoming_->ReadableBytes() > 0) {
void* buffer = nullptr; void* buffer = nullptr;
...@@ -184,8 +211,11 @@ class QuicTransport::Stream final { ...@@ -184,8 +211,11 @@ class QuicTransport::Stream final {
return; return;
} }
if (result == MOJO_RESULT_FAILED_PRECONDITION) { if (result == MOJO_RESULT_FAILED_PRECONDITION) {
// We need an explicit signal to close the stream. // The client doesn't want further data.
// TODO(yhirano): Add CloseStream mojo message. writable_watcher_.Cancel();
writable_.reset();
incoming_ = nullptr;
MayDisposeLater();
return; return;
} }
DCHECK_EQ(result, MOJO_RESULT_OK); DCHECK_EQ(result, MOJO_RESULT_OK);
...@@ -206,6 +236,7 @@ class QuicTransport::Stream final { ...@@ -206,6 +236,7 @@ class QuicTransport::Stream final {
void OnFinRead() { void OnFinRead() {
incoming_ = nullptr; incoming_ = nullptr;
transport_->client_->OnIncomingStreamClosed(id_, /*fin_received=*/true);
if (in_two_phase_write_) { if (in_two_phase_write_) {
return; return;
} }
...@@ -242,6 +273,8 @@ class QuicTransport::Stream final { ...@@ -242,6 +273,8 @@ class QuicTransport::Stream final {
mojo::SimpleWatcher writable_watcher_; mojo::SimpleWatcher writable_watcher_;
bool in_two_phase_write_ = false; bool in_two_phase_write_ = false;
bool has_seen_end_of_pipe_for_readable_ = false;
bool has_received_fin_from_client_ = false;
// This must be the last member. // This must be the last member.
base::WeakPtrFactory<Stream> weak_factory_{this}; base::WeakPtrFactory<Stream> weak_factory_{this};
...@@ -349,6 +382,14 @@ void QuicTransport::AcceptUnidirectionalStream( ...@@ -349,6 +382,14 @@ void QuicTransport::AcceptUnidirectionalStream(
OnIncomingUnidirectionalStreamAvailable(); OnIncomingUnidirectionalStreamAvailable();
} }
void QuicTransport::SendFin(uint32_t stream) {
auto it = streams_.find(stream);
if (it == streams_.end()) {
return;
}
it->second->NotifyFinFromClient();
}
void QuicTransport::OnConnected() { void QuicTransport::OnConnected() {
if (torn_down_) { if (torn_down_) {
return; return;
......
...@@ -61,6 +61,7 @@ class COMPONENT_EXPORT(NETWORK_SERVICE) QuicTransport final ...@@ -61,6 +61,7 @@ class COMPONENT_EXPORT(NETWORK_SERVICE) QuicTransport final
BidirectionalStreamAcceptanceCallback callback) override; BidirectionalStreamAcceptanceCallback callback) override;
void AcceptUnidirectionalStream( void AcceptUnidirectionalStream(
UnidirectionalStreamAcceptanceCallback callback) override; UnidirectionalStreamAcceptanceCallback callback) override;
void SendFin(uint32_t stream_id) override;
// net::QuicTransportClient::Visitor implementation: // net::QuicTransportClient::Visitor implementation:
void OnConnected() override; void OnConnected() override;
......
...@@ -4,6 +4,7 @@ ...@@ -4,6 +4,7 @@
#include "services/network/quic_transport.h" #include "services/network/quic_transport.h"
#include <set>
#include <vector> #include <vector>
#include "base/test/bind_test_util.h" #include "base/test/bind_test_util.h"
...@@ -29,7 +30,10 @@ std::string Read(mojo::ScopedDataPipeConsumerHandle readable) { ...@@ -29,7 +30,10 @@ std::string Read(mojo::ScopedDataPipeConsumerHandle readable) {
MojoResult result = MojoResult result =
readable->ReadData(buffer, &size, MOJO_READ_DATA_FLAG_NONE); readable->ReadData(buffer, &size, MOJO_READ_DATA_FLAG_NONE);
if (result == MOJO_RESULT_SHOULD_WAIT) { if (result == MOJO_RESULT_SHOULD_WAIT) {
base::RunLoop().RunUntilIdle(); base::RunLoop run_loop;
base::SequencedTaskRunnerHandle::Get()->PostTask(FROM_HERE,
run_loop.QuitClosure());
run_loop.Run();
continue; continue;
} }
if (result == MOJO_RESULT_FAILED_PRECONDITION) { if (result == MOJO_RESULT_FAILED_PRECONDITION) {
...@@ -105,17 +109,62 @@ class TestClient final : public mojom::QuicTransportClient { ...@@ -105,17 +109,62 @@ class TestClient final : public mojom::QuicTransportClient {
public: public:
explicit TestClient( explicit TestClient(
mojo::PendingReceiver<mojom::QuicTransportClient> pending_receiver) mojo::PendingReceiver<mojom::QuicTransportClient> pending_receiver)
: receiver_(this, std::move(pending_receiver)) {} : receiver_(this, std::move(pending_receiver)) {
receiver_.set_disconnect_handler(base::BindOnce(
&TestClient::OnMojoConnectionError, base::Unretained(this)));
}
// mojom::QuicTransportClient implementation.
void OnIncomingStreamClosed(uint32_t stream_id, bool fin_received) override {
closed_incoming_streams_.insert(std::make_pair(stream_id, fin_received));
if (quit_closure_for_incoming_stream_closure_) {
std::move(quit_closure_for_incoming_stream_closure_).Run();
}
}
void WaitUntilMojoConnectionError() { void WaitUntilMojoConnectionError() {
base::RunLoop run_loop; base::RunLoop run_loop;
receiver_.set_disconnect_handler(run_loop.QuitClosure()); quit_closure_for_mojo_connection_error_ = run_loop.QuitClosure();
run_loop.Run(); run_loop.Run();
} }
void WaitUntilIncomingStreamIsClosed(uint32_t stream_id) {
while (!stream_is_closed_as_incoming_stream(stream_id)) {
base::RunLoop run_loop;
quit_closure_for_incoming_stream_closure_ = run_loop.QuitClosure();
run_loop.Run();
}
}
bool has_received_fin_for(uint32_t stream_id) {
auto it = closed_incoming_streams_.find(stream_id);
return it != closed_incoming_streams_.end() && it->second;
}
bool stream_is_closed_as_incoming_stream(uint32_t stream_id) {
return closed_incoming_streams_.find(stream_id) !=
closed_incoming_streams_.end();
}
bool has_seen_mojo_connection_error() const {
return has_seen_mojo_connection_error_;
}
private: private:
void OnMojoConnectionError() {
has_seen_mojo_connection_error_ = true;
if (quit_closure_for_mojo_connection_error_) {
std::move(quit_closure_for_mojo_connection_error_).Run();
}
}
mojo::Receiver<mojom::QuicTransportClient> receiver_; mojo::Receiver<mojom::QuicTransportClient> receiver_;
base::OnceClosure quit_closure_for_mojo_connection_error_;
base::OnceClosure quit_closure_for_incoming_stream_closure_;
std::map<uint32_t, bool> closed_incoming_streams_;
bool has_seen_mojo_connection_error_ = false;
}; };
class QuicTransportTest : public testing::Test { class QuicTransportTest : public testing::Test {
...@@ -172,6 +221,13 @@ class QuicTransportTest : public testing::Test { ...@@ -172,6 +221,13 @@ class QuicTransportTest : public testing::Test {
const url::Origin& origin() const { return origin_; } const url::Origin& origin() const { return origin_; }
const NetworkContext& network_context() const { return network_context_; } const NetworkContext& network_context() const { return network_context_; }
void RunPendingTasks() {
base::RunLoop run_loop;
base::SequencedTaskRunnerHandle::Get()->PostTask(FROM_HERE,
run_loop.QuitClosure());
run_loop.Run();
}
private: private:
const url::Origin origin_; const url::Origin origin_;
base::test::TaskEnvironment task_environment_; base::test::TaskEnvironment task_environment_;
...@@ -300,6 +356,7 @@ TEST_F(QuicTransportTest, EchoOnUnidirectionalStreams) { ...@@ -300,6 +356,7 @@ TEST_F(QuicTransportTest, EchoOnUnidirectionalStreams) {
ASSERT_TRUE(test_handshake_client.has_seen_connection_establishment()); ASSERT_TRUE(test_handshake_client.has_seen_connection_establishment());
TestClient client(test_handshake_client.PassClientReceiver());
mojo::Remote<mojom::QuicTransport> transport_remote( mojo::Remote<mojom::QuicTransport> transport_remote(
test_handshake_client.PassTransport()); test_handshake_client.PassTransport());
...@@ -313,8 +370,6 @@ TEST_F(QuicTransportTest, EchoOnUnidirectionalStreams) { ...@@ -313,8 +370,6 @@ TEST_F(QuicTransportTest, EchoOnUnidirectionalStreams) {
uint32_t size = 5; uint32_t size = 5;
ASSERT_EQ(MOJO_RESULT_OK, writable_for_outgoing->WriteData( ASSERT_EQ(MOJO_RESULT_OK, writable_for_outgoing->WriteData(
"hello", &size, MOJO_WRITE_DATA_FLAG_NONE)); "hello", &size, MOJO_WRITE_DATA_FLAG_NONE));
// Signal the end-of-data.
writable_for_outgoing.reset();
base::RunLoop run_loop_for_stream_creation; base::RunLoop run_loop_for_stream_creation;
uint32_t stream_id; uint32_t stream_id;
...@@ -329,6 +384,10 @@ TEST_F(QuicTransportTest, EchoOnUnidirectionalStreams) { ...@@ -329,6 +384,10 @@ TEST_F(QuicTransportTest, EchoOnUnidirectionalStreams) {
run_loop_for_stream_creation.Run(); run_loop_for_stream_creation.Run();
ASSERT_TRUE(stream_created); ASSERT_TRUE(stream_created);
// Signal the end-of-data.
writable_for_outgoing.reset();
transport_remote->SendFin(stream_id);
mojo::ScopedDataPipeConsumerHandle readable_for_incoming; mojo::ScopedDataPipeConsumerHandle readable_for_incoming;
uint32_t incoming_stream_id = stream_id; uint32_t incoming_stream_id = stream_id;
base::RunLoop run_loop_for_incoming_stream; base::RunLoop run_loop_for_incoming_stream;
...@@ -345,6 +404,12 @@ TEST_F(QuicTransportTest, EchoOnUnidirectionalStreams) { ...@@ -345,6 +404,12 @@ TEST_F(QuicTransportTest, EchoOnUnidirectionalStreams) {
std::string echo_back = Read(std::move(readable_for_incoming)); std::string echo_back = Read(std::move(readable_for_incoming));
EXPECT_EQ("hello", echo_back); EXPECT_EQ("hello", echo_back);
client.WaitUntilIncomingStreamIsClosed(incoming_stream_id);
EXPECT_FALSE(client.has_received_fin_for(stream_id));
EXPECT_TRUE(client.has_received_fin_for(incoming_stream_id));
EXPECT_FALSE(client.has_seen_mojo_connection_error());
} }
TEST_F(QuicTransportTest, EchoOnBidirectionalStream) { TEST_F(QuicTransportTest, EchoOnBidirectionalStream) {
...@@ -362,6 +427,7 @@ TEST_F(QuicTransportTest, EchoOnBidirectionalStream) { ...@@ -362,6 +427,7 @@ TEST_F(QuicTransportTest, EchoOnBidirectionalStream) {
ASSERT_TRUE(test_handshake_client.has_seen_connection_establishment()); ASSERT_TRUE(test_handshake_client.has_seen_connection_establishment());
TestClient client(test_handshake_client.PassClientReceiver());
mojo::Remote<mojom::QuicTransport> transport_remote( mojo::Remote<mojom::QuicTransport> transport_remote(
test_handshake_client.PassTransport()); test_handshake_client.PassTransport());
...@@ -380,8 +446,6 @@ TEST_F(QuicTransportTest, EchoOnBidirectionalStream) { ...@@ -380,8 +446,6 @@ TEST_F(QuicTransportTest, EchoOnBidirectionalStream) {
uint32_t size = 5; uint32_t size = 5;
ASSERT_EQ(MOJO_RESULT_OK, writable_for_outgoing->WriteData( ASSERT_EQ(MOJO_RESULT_OK, writable_for_outgoing->WriteData(
"hello", &size, MOJO_WRITE_DATA_FLAG_NONE)); "hello", &size, MOJO_WRITE_DATA_FLAG_NONE));
// Signal the end-of-data.
writable_for_outgoing.reset();
base::RunLoop run_loop_for_stream_creation; base::RunLoop run_loop_for_stream_creation;
uint32_t stream_id; uint32_t stream_id;
...@@ -396,8 +460,17 @@ TEST_F(QuicTransportTest, EchoOnBidirectionalStream) { ...@@ -396,8 +460,17 @@ TEST_F(QuicTransportTest, EchoOnBidirectionalStream) {
run_loop_for_stream_creation.Run(); run_loop_for_stream_creation.Run();
ASSERT_TRUE(stream_created); ASSERT_TRUE(stream_created);
// Signal the end-of-data.
writable_for_outgoing.reset();
transport_remote->SendFin(stream_id);
std::string echo_back = Read(std::move(readable_for_incoming)); std::string echo_back = Read(std::move(readable_for_incoming));
EXPECT_EQ("hello", echo_back); EXPECT_EQ("hello", echo_back);
client.WaitUntilIncomingStreamIsClosed(stream_id);
EXPECT_FALSE(client.has_seen_mojo_connection_error());
EXPECT_TRUE(client.has_received_fin_for(stream_id));
EXPECT_TRUE(client.stream_is_closed_as_incoming_stream(stream_id));
} }
} // namespace } // namespace
......
...@@ -177,6 +177,13 @@ void QuicTransport::OnHandshakeFailed() { ...@@ -177,6 +177,13 @@ void QuicTransport::OnHandshakeFailed() {
Dispose(); Dispose();
} }
void QuicTransport::OnIncomingStreamClosed(uint32_t stream_id,
bool fin_received) {
DVLOG(1) << "QuicTransport::OnIncomingStreamClosed(" << stream_id << ", "
<< fin_received << ") this=" << this;
// TODO(ricea): Implement this.
}
void QuicTransport::ContextDestroyed(ExecutionContext* execution_context) { void QuicTransport::ContextDestroyed(ExecutionContext* execution_context) {
DVLOG(1) << "QuicTransport::ContextDestroyed() this=" << this; DVLOG(1) << "QuicTransport::ContextDestroyed() this=" << this;
Dispose(); Dispose();
......
...@@ -60,7 +60,7 @@ class MODULES_EXPORT QuicTransport final ...@@ -60,7 +60,7 @@ class MODULES_EXPORT QuicTransport final
void OnHandshakeFailed() override; void OnHandshakeFailed() override;
// QuicTransportClient implementation // QuicTransportClient implementation
// TODO(ricea): Add methods. void OnIncomingStreamClosed(uint32_t stream_id, bool fin_received) override;
// Implementation of ContextLifecycleObserver // Implementation of ContextLifecycleObserver
void ContextDestroyed(ExecutionContext*) final; void ContextDestroyed(ExecutionContext*) final;
......
...@@ -106,6 +106,8 @@ class MockQuicTransport : public network::mojom::blink::QuicTransport { ...@@ -106,6 +106,8 @@ class MockQuicTransport : public network::mojom::blink::QuicTransport {
void(base::OnceCallback< void(base::OnceCallback<
void(uint32_t, mojo::ScopedDataPipeConsumerHandle)>)); void(uint32_t, mojo::ScopedDataPipeConsumerHandle)>));
void SendFin(uint32_t stream_id) override {}
private: private:
mojo::Receiver<network::mojom::blink::QuicTransport> receiver_; mojo::Receiver<network::mojom::blink::QuicTransport> receiver_;
}; };
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment