Commit 0c8acefd authored by Yutaka Hirano's avatar Yutaka Hirano Committed by Chromium LUCI CQ

[WebTransport] Provide backpressure information for outgoing datagrams

Currently web developers don't know whether the browser is able to
send more datagrams. This CL will enable that by providing the
backpressure information.

Bug: 1150350
Change-Id: I20dad8e8ca0bd1f84c4b9ecf935e14e341af2e2f
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2563017
Commit-Queue: Yutaka Hirano <yhirano@chromium.org>
Reviewed-by: default avatarVictor Vasiliev <vasilvv@chromium.org>
Reviewed-by: default avatarAdam Rice <ricea@chromium.org>
Cr-Commit-Position: refs/heads/master@{#832222}
parent 6751f661
...@@ -350,14 +350,14 @@ void QuicTransport::SendDatagram(base::span<const uint8_t> data, ...@@ -350,14 +350,14 @@ void QuicTransport::SendDatagram(base::span<const uint8_t> data,
base::OnceCallback<void(bool)> callback) { base::OnceCallback<void(bool)> callback) {
DCHECK(!torn_down_); DCHECK(!torn_down_);
datagram_callbacks_.emplace(std::move(callback));
auto buffer = base::MakeRefCounted<net::IOBuffer>(data.size()); auto buffer = base::MakeRefCounted<net::IOBuffer>(data.size());
memcpy(buffer->data(), data.data(), data.size()); memcpy(buffer->data(), data.data(), data.size());
quic::QuicMemSlice slice( quic::QuicMemSlice slice(
quic::QuicMemSliceImpl(std::move(buffer), data.size())); quic::QuicMemSliceImpl(std::move(buffer), data.size()));
const quic::MessageStatus status = transport_->session()->datagram_queue()->SendOrQueueDatagram(
transport_->session()->datagram_queue()->SendOrQueueDatagram( std::move(slice));
std::move(slice));
std::move(callback).Run(status == quic::MESSAGE_STATUS_SUCCESS);
} }
void QuicTransport::CreateStream( void QuicTransport::CreateStream(
...@@ -588,7 +588,11 @@ void QuicTransport::OnCanCreateNewOutgoingUnidirectionalStream() { ...@@ -588,7 +588,11 @@ void QuicTransport::OnCanCreateNewOutgoingUnidirectionalStream() {
void QuicTransport::OnDatagramProcessed( void QuicTransport::OnDatagramProcessed(
base::Optional<quic::MessageStatus> status) { base::Optional<quic::MessageStatus> status) {
// TODO(yhirano): Implement this. DCHECK(!datagram_callbacks_.empty());
std::move(datagram_callbacks_.front())
.Run(status == quic::MESSAGE_STATUS_SUCCESS);
datagram_callbacks_.pop();
} }
void QuicTransport::TearDown() { void QuicTransport::TearDown() {
......
...@@ -100,6 +100,7 @@ class COMPONENT_EXPORT(NETWORK_SERVICE) QuicTransport final ...@@ -100,6 +100,7 @@ class COMPONENT_EXPORT(NETWORK_SERVICE) QuicTransport final
mojo::Receiver<mojom::QuicTransport> receiver_; mojo::Receiver<mojom::QuicTransport> receiver_;
mojo::Remote<mojom::QuicTransportHandshakeClient> handshake_client_; mojo::Remote<mojom::QuicTransportHandshakeClient> handshake_client_;
mojo::Remote<mojom::QuicTransportClient> client_; mojo::Remote<mojom::QuicTransportClient> client_;
base::queue<base::OnceCallback<void(bool)>> datagram_callbacks_;
bool torn_down_ = false; bool torn_down_ = false;
......
...@@ -76,8 +76,8 @@ bool CreateStreamDataPipe(mojo::ScopedDataPipeProducerHandle* producer, ...@@ -76,8 +76,8 @@ bool CreateStreamDataPipe(mojo::ScopedDataPipeProducerHandle* producer,
// Sends a datagram on write(). // Sends a datagram on write().
class QuicTransport::DatagramUnderlyingSink final : public UnderlyingSinkBase { class QuicTransport::DatagramUnderlyingSink final : public UnderlyingSinkBase {
public: public:
explicit DatagramUnderlyingSink(QuicTransport* quic_transport) DatagramUnderlyingSink(QuicTransport* quic_transport, int high_water_mark)
: quic_transport_(quic_transport) {} : quic_transport_(quic_transport), high_water_mark_(high_water_mark) {}
ScriptPromise start(ScriptState* script_state, ScriptPromise start(ScriptState* script_state,
WritableStreamDefaultController*, WritableStreamDefaultController*,
...@@ -131,6 +131,7 @@ class QuicTransport::DatagramUnderlyingSink final : public UnderlyingSinkBase { ...@@ -131,6 +131,7 @@ class QuicTransport::DatagramUnderlyingSink final : public UnderlyingSinkBase {
void Trace(Visitor* visitor) const override { void Trace(Visitor* visitor) const override {
visitor->Trace(quic_transport_); visitor->Trace(quic_transport_);
visitor->Trace(pending_datagrams_);
UnderlyingSinkBase::Trace(visitor); UnderlyingSinkBase::Trace(visitor);
} }
...@@ -145,18 +146,31 @@ class QuicTransport::DatagramUnderlyingSink final : public UnderlyingSinkBase { ...@@ -145,18 +146,31 @@ class QuicTransport::DatagramUnderlyingSink final : public UnderlyingSinkBase {
auto* resolver = MakeGarbageCollected<ScriptPromiseResolver>( auto* resolver = MakeGarbageCollected<ScriptPromiseResolver>(
quic_transport_->script_state_); quic_transport_->script_state_);
pending_datagrams_.push_back(resolver);
quic_transport_->quic_transport_->SendDatagram( quic_transport_->quic_transport_->SendDatagram(
data, WTF::Bind(&DatagramSent, WrapPersistent(resolver))); data, WTF::Bind(&DatagramUnderlyingSink::OnDatagramProcessed,
WrapWeakPersistent(this)));
if (pending_datagrams_.size() < static_cast<wtf_size_t>(high_water_mark_)) {
// In this case we pretend that the datagram is processed immediately, to
// get more requests from the stream.
return ScriptPromise::CastUndefined(quic_transport_->script_state_);
}
return resolver->Promise(); return resolver->Promise();
} }
// |sent| indicates whether the datagram was sent or dropped. Currently we void OnDatagramProcessed(bool sent) {
// |don't do anything with this information. DCHECK(!pending_datagrams_.empty());
static void DatagramSent(ScriptPromiseResolver* resolver, bool sent) {
ScriptPromiseResolver* resolver = pending_datagrams_.front();
pending_datagrams_.pop_front();
resolver->Resolve(); resolver->Resolve();
} }
Member<QuicTransport> quic_transport_; Member<QuicTransport> quic_transport_;
const int high_water_mark_;
HeapDeque<Member<ScriptPromiseResolver>> pending_datagrams_;
}; };
// Captures a pointer to the ReadableStreamDefaultControllerWithScriptScope in // Captures a pointer to the ReadableStreamDefaultControllerWithScriptScope in
...@@ -713,8 +727,25 @@ void QuicTransport::Init(const String& url, ...@@ -713,8 +727,25 @@ void QuicTransport::Init(const String& url,
received_datagrams_ = ReadableStream::CreateWithCountQueueingStrategy( received_datagrams_ = ReadableStream::CreateWithCountQueueingStrategy(
script_state_, script_state_,
MakeGarbageCollected<DatagramUnderlyingSource>(script_state_, this), 1); MakeGarbageCollected<DatagramUnderlyingSource>(script_state_, this), 1);
int outgoing_datagrams_high_water_mark = 1;
if (options.hasDatagramWritableHighWaterMark()) {
outgoing_datagrams_high_water_mark =
options.datagramWritableHighWaterMark();
}
// We create a WritableStream with high water mark 1 and try to mimic the
// given high water mark in the Sink, from two reasons:
// 1. This is better because we can hide the RTT between the renderer and the
// network service.
// 2. Keeping datagrams in the renderer would be confusing for the timer for
// the datagram
// queue in the network service, because the timestamp is taken when the
// datagram is added to the queue.
outgoing_datagrams_ = WritableStream::CreateWithCountQueueingStrategy( outgoing_datagrams_ = WritableStream::CreateWithCountQueueingStrategy(
script_state_, MakeGarbageCollected<DatagramUnderlyingSink>(this), 1); script_state_,
MakeGarbageCollected<DatagramUnderlyingSink>(
this, outgoing_datagrams_high_water_mark),
1);
received_streams_underlying_source_ = received_streams_underlying_source_ =
StreamVendingUnderlyingSource::CreateWithVendor<ReceiveStreamVendor>( StreamVendingUnderlyingSource::CreateWithVendor<ReceiveStreamVendor>(
......
...@@ -6,4 +6,9 @@ ...@@ -6,4 +6,9 @@
dictionary QuicTransportOptions { dictionary QuicTransportOptions {
sequence<RTCDtlsFingerprint> serverCertificateFingerprints; sequence<RTCDtlsFingerprint> serverCertificateFingerprints;
// This is an experimental and non-standard parameter. This controls the high
// water mark (https://streams.spec.whatwg.org/#high-water-mark) for the
// datagramWritable stream.
long datagramWritableHighWaterMark;
}; };
...@@ -160,9 +160,11 @@ class QuicTransportTest : public ::testing::Test { ...@@ -160,9 +160,11 @@ class QuicTransportTest : public ::testing::Test {
} }
// Creates a QuicTransport object with the given |url|. // Creates a QuicTransport object with the given |url|.
QuicTransport* Create(const V8TestingScope& scope, const String& url) { QuicTransport* Create(const V8TestingScope& scope,
const String& url,
QuicTransportOptions* options) {
AddBinder(scope); AddBinder(scope);
return QuicTransport::Create(scope.GetScriptState(), url, EmptyOptions(), return QuicTransport::Create(scope.GetScriptState(), url, options,
ASSERT_NO_EXCEPTION); ASSERT_NO_EXCEPTION);
} }
...@@ -212,9 +214,11 @@ class QuicTransportTest : public ::testing::Test { ...@@ -212,9 +214,11 @@ class QuicTransportTest : public ::testing::Test {
// Creates, connects and returns a QuicTransport object with the given |url|. // Creates, connects and returns a QuicTransport object with the given |url|.
// Runs the event loop. // Runs the event loop.
QuicTransport* CreateAndConnectSuccessfully(const V8TestingScope& scope, QuicTransport* CreateAndConnectSuccessfully(
const String& url) { const V8TestingScope& scope,
auto* quic_transport = Create(scope, url); const String& url,
QuicTransportOptions* options = EmptyOptions()) {
auto* quic_transport = Create(scope, url, options);
ConnectSuccessfully(quic_transport); ConnectSuccessfully(quic_transport);
return quic_transport; return quic_transport;
} }
...@@ -617,9 +621,77 @@ TEST_F(QuicTransportTest, SendDatagram) { ...@@ -617,9 +621,77 @@ TEST_F(QuicTransportTest, SendDatagram) {
EXPECT_TRUE(tester.Value().IsUndefined()); EXPECT_TRUE(tester.Value().IsUndefined());
} }
TEST_F(QuicTransportTest, BackpressureForOutgoingDatagrams) {
V8TestingScope scope;
auto* const options = MakeGarbageCollected<QuicTransportOptions>();
options->setDatagramWritableHighWaterMark(3);
auto* quic_transport = CreateAndConnectSuccessfully(
scope, "quic-transport://example.com", options);
EXPECT_CALL(*mock_quic_transport_, SendDatagram(_, _))
.Times(4)
.WillRepeatedly(
Invoke([](base::span<const uint8_t>,
MockQuicTransport::SendDatagramCallback callback) {
std::move(callback).Run(true);
}));
auto* writable = quic_transport->sendDatagrams();
auto* script_state = scope.GetScriptState();
auto* writer = writable->getWriter(script_state, ASSERT_NO_EXCEPTION);
ScriptPromise promise1;
ScriptPromise promise2;
ScriptPromise promise3;
ScriptPromise promise4;
{
auto* chunk = DOMUint8Array::Create(1);
*chunk->Data() = 'A';
promise1 =
writer->write(script_state, ScriptValue::From(script_state, chunk),
ASSERT_NO_EXCEPTION);
}
{
auto* chunk = DOMUint8Array::Create(1);
*chunk->Data() = 'B';
promise2 =
writer->write(script_state, ScriptValue::From(script_state, chunk),
ASSERT_NO_EXCEPTION);
}
{
auto* chunk = DOMUint8Array::Create(1);
*chunk->Data() = 'C';
promise3 =
writer->write(script_state, ScriptValue::From(script_state, chunk),
ASSERT_NO_EXCEPTION);
}
{
auto* chunk = DOMUint8Array::Create(1);
*chunk->Data() = 'D';
promise4 =
writer->write(script_state, ScriptValue::From(script_state, chunk),
ASSERT_NO_EXCEPTION);
}
// The first two promises are resolved immediately.
v8::MicrotasksScope::PerformCheckpoint(scope.GetIsolate());
EXPECT_EQ(promise1.V8Promise()->State(), v8::Promise::kFulfilled);
EXPECT_EQ(promise2.V8Promise()->State(), v8::Promise::kFulfilled);
EXPECT_EQ(promise3.V8Promise()->State(), v8::Promise::kPending);
EXPECT_EQ(promise4.V8Promise()->State(), v8::Promise::kPending);
// The rest are resolved by the callback.
test::RunPendingTasks();
v8::MicrotasksScope::PerformCheckpoint(scope.GetIsolate());
EXPECT_EQ(promise3.V8Promise()->State(), v8::Promise::kFulfilled);
EXPECT_EQ(promise4.V8Promise()->State(), v8::Promise::kFulfilled);
}
TEST_F(QuicTransportTest, SendDatagramBeforeConnect) { TEST_F(QuicTransportTest, SendDatagramBeforeConnect) {
V8TestingScope scope; V8TestingScope scope;
auto* quic_transport = Create(scope, "quic-transport://example.com"); auto* quic_transport =
Create(scope, "quic-transport://example.com", EmptyOptions());
auto* writable = quic_transport->sendDatagrams(); auto* writable = quic_transport->sendDatagrams();
auto* script_state = scope.GetScriptState(); auto* script_state = scope.GetScriptState();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment