Commit d4d4982b authored by Steve Anton's avatar Steve Anton Committed by Commit Bot

Implement RTCQuicStream.waitForWriteBufferedAmountBelow()

Bug: 874296
Change-Id: I694325a0cc85ad520c18762e34ba05e2204e2c74
Reviewed-on: https://chromium-review.googlesource.com/c/1286902
Commit-Queue: Steve Anton <steveanton@chromium.org>
Reviewed-by: default avatarHenrik Boström <hbos@chromium.org>
Cr-Commit-Position: refs/heads/master@{#608648}
parent 2e01e692
......@@ -253,4 +253,53 @@ promise_test(async t => {
assert_equals(localStream.writeBufferedAmount, 10);
}, 'writeBufferedAmount maintained after finish() has been called.');
promise_test(async t => {
const [ localQuicTransport, remoteQuicTransport ] =
await makeTwoConnectedQuicTransports(t);
const localStream = localQuicTransport.createStream();
await localStream.waitForWriteBufferedAmountBelow(0);
}, 'waitForWriteBufferedAmountBelow(0) resolves immediately.');
promise_test(async t => {
const [ localQuicTransport, remoteQuicTransport ] =
await makeTwoConnectedQuicTransports(t);
const localStream = localQuicTransport.createStream();
await localStream.waitForWriteBufferedAmountBelow(
localStream.maxWriteBufferedAmount);
}, 'waitForWriteBufferedAmountBelow(maxWriteBufferedAmount) resolves ' +
'immediately.');
promise_test(async t => {
const [ localQuicTransport, remoteQuicTransport ] =
await makeTwoConnectedQuicTransports(t);
const localStream = localQuicTransport.createStream();
localStream.write(new Uint8Array(localStream.maxWriteBufferedAmount));
const promise1 = localStream.waitForWriteBufferedAmountBelow(0);
const promise2 = localStream.waitForWriteBufferedAmountBelow(0);
localStream.finish();
await Promise.all([
promise_rejects(t, 'InvalidStateError', promise1),
promise_rejects(t, 'InvalidStateError', promise2)]);
}, 'Pending waitForWriteBufferedAmountBelow() promises rejected after ' +
'finish().');
promise_test(async t => {
const [ localQuicTransport, remoteQuicTransport ] =
await makeTwoConnectedQuicTransports(t);
const localStream = localQuicTransport.createStream();
localStream.write(new Uint8Array(localStream.maxWriteBufferedAmount));
const promise1 = localStream.waitForWriteBufferedAmountBelow(0);
const promise2 = localStream.waitForWriteBufferedAmountBelow(0);
localStream.reset();
await Promise.all([
promise_rejects(t, 'InvalidStateError', promise1),
promise_rejects(t, 'InvalidStateError', promise2)]);
}, 'Pending waitForWriteBufferedAmountBelow() promises rejected after ' +
'reset().');
closed_stream_test(async (t, stream) => {
await promise_rejects(t, 'InvalidStateError',
stream.waitForWriteBufferedAmountBelow(0));
}, 'waitForWriteBufferedBelow() rejects with InvalidStateError.');
</script>
......@@ -5600,6 +5600,7 @@ interface RTCQuicStream : EventTarget
method constructor
method finish
method reset
method waitForWriteBufferedAmountBelow
method write
setter onstatechange
interface RTCQuicStreamEvent : Event
......
......@@ -3,6 +3,7 @@
// found in the LICENSE file.
#include "third_party/blink/renderer/modules/peerconnection/rtc_quic_stream.h"
#include "third_party/blink/renderer/bindings/core/v8/script_promise_resolver.h"
#include "third_party/blink/renderer/core/dom/events/event.h"
#include "third_party/blink/renderer/platform/bindings/exception_state.h"
......@@ -10,6 +11,23 @@ namespace blink {
const uint32_t RTCQuicStream::kWriteBufferSize = 4 * 1024;
class RTCQuicStream::PendingWriteBufferedAmountPromise
: public GarbageCollected<PendingWriteBufferedAmountPromise> {
public:
PendingWriteBufferedAmountPromise(ScriptPromiseResolver* promise_resolver,
uint32_t threshold)
: promise_resolver_(promise_resolver), threshold_(threshold) {}
ScriptPromiseResolver* promise_resolver() const { return promise_resolver_; }
uint32_t threshold() const { return threshold_; }
void Trace(Visitor* visitor) { visitor->Trace(promise_resolver_); }
private:
Member<ScriptPromiseResolver> promise_resolver_;
uint32_t threshold_;
};
RTCQuicStream::RTCQuicStream(ExecutionContext* context,
RTCQuicTransport* transport,
QuicStreamProxy* stream_proxy)
......@@ -54,9 +72,7 @@ uint32_t RTCQuicStream::maxWriteBufferedAmount() const {
void RTCQuicStream::write(NotShared<DOMUint8Array> data,
ExceptionState& exception_state) {
if (IsClosed() || wrote_fin_) {
exception_state.ThrowDOMException(DOMExceptionCode::kInvalidStateError,
"The stream is not writable.");
if (RaiseIfNotWritable(exception_state)) {
return;
}
if (data.View()->length() == 0) {
......@@ -90,6 +106,7 @@ void RTCQuicStream::finish() {
if (readable_) {
DCHECK_EQ(state_, RTCQuicStreamState::kOpen);
state_ = RTCQuicStreamState::kClosing;
RejectPendingWaitForWriteBufferedAmountBelowPromises();
} else {
DCHECK_EQ(state_, RTCQuicStreamState::kClosing);
Close(CloseReason::kReadWriteFinished);
......@@ -103,6 +120,57 @@ void RTCQuicStream::reset() {
Close(CloseReason::kLocalReset);
}
ScriptPromise RTCQuicStream::waitForWriteBufferedAmountBelow(
ScriptState* script_state,
uint32_t threshold,
ExceptionState& exception_state) {
if (RaiseIfNotWritable(exception_state)) {
return ScriptPromise();
}
ScriptPromiseResolver* promise_resolver =
ScriptPromiseResolver::Create(script_state);
ScriptPromise promise = promise_resolver->Promise();
if (write_buffered_amount_ <= threshold) {
promise_resolver->Resolve();
} else {
pending_write_buffered_amount_promises_.push_back(
new PendingWriteBufferedAmountPromise(promise_resolver, threshold));
}
return promise;
}
bool RTCQuicStream::RaiseIfNotWritable(ExceptionState& exception_state) {
if (wrote_fin_) {
exception_state.ThrowDOMException(
DOMExceptionCode::kInvalidStateError,
"The stream is not writable: finish() has been called.");
return true;
}
if (IsClosed()) {
exception_state.ThrowDOMException(
DOMExceptionCode::kInvalidStateError,
"The stream is not writable: The stream is closed.");
return true;
}
return false;
}
void RTCQuicStream::RejectPendingWaitForWriteBufferedAmountBelowPromises() {
// TODO(https://github.com/w3c/webrtc-quic/issues/81): The promise resolve
// order is under specified.
for (PendingWriteBufferedAmountPromise* pending_promise :
pending_write_buffered_amount_promises_) {
ExceptionState exception_state(
pending_promise->promise_resolver()->GetScriptState()->GetIsolate(),
ExceptionState::kExecutionContext, "RTCQuicStream",
"waitForWriteBufferedAmountBelow");
exception_state.ThrowDOMException(DOMExceptionCode::kInvalidStateError,
"The stream is no longer writable.");
pending_promise->promise_resolver()->Reject(exception_state);
}
pending_write_buffered_amount_promises_.clear();
}
void RTCQuicStream::OnRemoteReset() {
Close(CloseReason::kRemoteReset);
}
......@@ -127,6 +195,18 @@ void RTCQuicStream::OnDataReceived(Vector<uint8_t> data, bool fin) {
void RTCQuicStream::OnWriteDataConsumed(uint32_t amount) {
DCHECK_GE(write_buffered_amount_, amount);
write_buffered_amount_ -= amount;
// TODO(https://github.com/w3c/webrtc-quic/issues/81): The promise resolve
// order is under specified.
for (auto* it = pending_write_buffered_amount_promises_.begin();
it != pending_write_buffered_amount_promises_.end();) {
PendingWriteBufferedAmountPromise* pending_promise = *it;
if (write_buffered_amount_ <= pending_promise->threshold()) {
pending_promise->promise_resolver()->Resolve();
it = pending_write_buffered_amount_promises_.erase(it);
} else {
++it;
}
}
}
void RTCQuicStream::OnQuicTransportClosed(
......@@ -173,6 +253,12 @@ void RTCQuicStream::Close(CloseReason reason) {
readable_ = false;
write_buffered_amount_ = 0;
// It's illegal to resolve or reject promises when the ExecutionContext is
// being destroyed.
if (reason != CloseReason::kContextDestroyed) {
RejectPendingWaitForWriteBufferedAmountBelowPromises();
}
// Change the state. Fire the statechange event only if the close is caused by
// a remote stream event.
state_ = RTCQuicStreamState::kClosed;
......@@ -191,6 +277,7 @@ ExecutionContext* RTCQuicStream::GetExecutionContext() const {
void RTCQuicStream::Trace(blink::Visitor* visitor) {
visitor->Trace(transport_);
visitor->Trace(pending_write_buffered_amount_promises_);
EventTargetWithInlineData::Trace(visitor);
ContextClient::Trace(visitor);
}
......
......@@ -15,6 +15,8 @@
namespace blink {
class ScriptPromise;
enum class RTCQuicStreamState { kNew, kOpening, kOpen, kClosing, kClosed };
// The RTCQuicStream does not need to be ActiveScriptWrappable since the
......@@ -59,6 +61,10 @@ class MODULES_EXPORT RTCQuicStream final : public EventTargetWithInlineData,
void write(NotShared<DOMUint8Array> data, ExceptionState& exception_state);
void finish();
void reset();
ScriptPromise waitForWriteBufferedAmountBelow(
ScriptState* script_state,
uint32_t threshold,
ExceptionState& exception_state);
DEFINE_ATTRIBUTE_EVENT_LISTENER(statechange, kStatechange);
// EventTarget overrides.
......@@ -66,14 +72,18 @@ class MODULES_EXPORT RTCQuicStream final : public EventTargetWithInlineData,
ExecutionContext* GetExecutionContext() const override;
// For garbage collection.
void Trace(blink::Visitor* visitor) override;
void Trace(Visitor* visitor) override;
private:
class PendingWriteBufferedAmountPromise;
// QuicStreamProxy::Delegate overrides.
void OnRemoteReset() override;
void OnDataReceived(Vector<uint8_t> data, bool fin) override;
void OnWriteDataConsumed(uint32_t amount) override;
bool RaiseIfNotWritable(ExceptionState&);
// Permenantly closes the RTCQuicStream with the given reason.
// The RTCQuicStream must not already be closed.
// This will transition the state to closed.
......@@ -81,12 +91,24 @@ class MODULES_EXPORT RTCQuicStream final : public EventTargetWithInlineData,
bool IsClosed() const { return state_ == RTCQuicStreamState::kClosed; }
void RejectPendingWaitForWriteBufferedAmountBelowPromises();
Member<RTCQuicTransport> transport_;
RTCQuicStreamState state_ = RTCQuicStreamState::kOpen;
bool readable_ = true;
uint32_t read_buffered_amount_ = 0;
// Amount of bytes written but may not yet have been sent by the underlying
// P2PQuicStream.
// write() increases this number.
// OnDataSent() decreases this number.
uint32_t write_buffered_amount_ = 0;
// True if finish() has been called.
bool wrote_fin_ = false;
// Pending waitForWriteBufferedAmountBelow Promises.
HeapVector<Member<PendingWriteBufferedAmountPromise>>
pending_write_buffered_amount_promises_;
QuicStreamProxy* proxy_;
};
......
......@@ -24,6 +24,8 @@ enum RTCQuicStreamState {
[RaisesException] void write(Uint8Array data);
void finish();
void reset();
[CallWith=ScriptState, RaisesException]
Promise<void> waitForWriteBufferedAmountBelow(unsigned long amount);
attribute EventHandler onstatechange;
// TODO(crbug.com/868068): Implement remaining methods, attributes, and events.
};
......
......@@ -8,6 +8,7 @@
// for the main thread / worker thread.
#include "third_party/blink/renderer/modules/peerconnection/rtc_quic_stream.h"
#include "third_party/blink/renderer/bindings/core/v8/script_promise.h"
#include "third_party/blink/renderer/modules/peerconnection/adapters/test/mock_p2p_quic_stream.h"
#include "third_party/blink/renderer/modules/peerconnection/rtc_quic_stream_event.h"
#include "third_party/blink/renderer/modules/peerconnection/rtc_quic_transport_test.h"
......@@ -334,4 +335,179 @@ TEST_F(RTCQuicStreamTest, WriteThrowsIfRemoteReset) {
RunUntilIdle();
}
// The following group tests waitForWriteBufferedAmountBelow().
// Test that a waitForWriteBufferedAmountBelow() promise resolves once
// OnWriteDataConsumed() frees up enough write buffer space.
TEST_F(RTCQuicStreamTest, WaitForWriteBufferedAmountBelowResolves) {
V8TestingScope scope;
P2PQuicStream::Delegate* stream_delegate = nullptr;
auto p2p_quic_stream = std::make_unique<MockP2PQuicStream>(&stream_delegate);
Persistent<RTCQuicStream> stream =
CreateQuicStream(scope, p2p_quic_stream.get());
stream->write(CreateUint8ArrayOfLength(stream->maxWriteBufferedAmount()),
ASSERT_NO_EXCEPTION);
ScriptPromise promise = stream->waitForWriteBufferedAmountBelow(
scope.GetScriptState(), stream->maxWriteBufferedAmount() - 1,
ASSERT_NO_EXCEPTION);
EXPECT_EQ(v8::Promise::kPending,
promise.V8Value().As<v8::Promise>()->State());
RunUntilIdle();
ASSERT_TRUE(stream_delegate);
stream_delegate->OnWriteDataConsumed(1);
RunUntilIdle();
EXPECT_EQ(v8::Promise::kFulfilled,
promise.V8Value().As<v8::Promise>()->State());
}
// Test that a waitForWriteBufferedAmount() promise does not resolve until
// OnWriteDataConsumed() frees up exactly the threshold amount.
TEST_F(RTCQuicStreamTest,
WaitForWriteBufferedAmountBelowDoesNotResolveUntilExceedsThreshold) {
V8TestingScope scope;
P2PQuicStream::Delegate* stream_delegate = nullptr;
auto p2p_quic_stream = std::make_unique<MockP2PQuicStream>(&stream_delegate);
Persistent<RTCQuicStream> stream =
CreateQuicStream(scope, p2p_quic_stream.get());
stream->write(CreateUint8ArrayOfLength(stream->maxWriteBufferedAmount()),
ASSERT_NO_EXCEPTION);
ScriptPromise promise = stream->waitForWriteBufferedAmountBelow(
scope.GetScriptState(), stream->maxWriteBufferedAmount() - 10,
ASSERT_NO_EXCEPTION);
EXPECT_EQ(v8::Promise::kPending,
promise.V8Value().As<v8::Promise>()->State());
RunUntilIdle();
// Post OnWriteDataConsumed(9) -- this should not resolve the promise since
// we're waiting for 10 bytes to be available.
ASSERT_TRUE(stream_delegate);
stream_delegate->OnWriteDataConsumed(9);
RunUntilIdle();
EXPECT_EQ(v8::Promise::kPending,
promise.V8Value().As<v8::Promise>()->State());
// Post OnWriteDataConsumed(1) -- this should resolve the promise since now we
// have 9 + 1 = 10 bytes available.
stream_delegate->OnWriteDataConsumed(1);
RunUntilIdle();
EXPECT_EQ(v8::Promise::kFulfilled,
promise.V8Value().As<v8::Promise>()->State());
}
// Test that if two waitForWriteBufferedAmount() promises are waiting on
// different thresholds, OnWriteDataConsumed() which satisfies the first
// threshold but not the second will only resolve the first promise. Once
// OnWriteDataConsumed() is received again past the second threshold then the
// second promise will be resolved.
TEST_F(RTCQuicStreamTest,
TwoWaitForWriteBufferedAmountBelowPromisesResolveInSequence) {
V8TestingScope scope;
P2PQuicStream::Delegate* stream_delegate = nullptr;
auto p2p_quic_stream = std::make_unique<MockP2PQuicStream>(&stream_delegate);
Persistent<RTCQuicStream> stream =
CreateQuicStream(scope, p2p_quic_stream.get());
stream->write(CreateUint8ArrayOfLength(stream->maxWriteBufferedAmount()),
ASSERT_NO_EXCEPTION);
ScriptPromise promise_10 = stream->waitForWriteBufferedAmountBelow(
scope.GetScriptState(), stream->maxWriteBufferedAmount() - 10,
ASSERT_NO_EXCEPTION);
ScriptPromise promise_90 = stream->waitForWriteBufferedAmountBelow(
scope.GetScriptState(), stream->maxWriteBufferedAmount() - 90,
ASSERT_NO_EXCEPTION);
RunUntilIdle();
ASSERT_TRUE(stream_delegate);
stream_delegate->OnWriteDataConsumed(10);
RunUntilIdle();
EXPECT_EQ(v8::Promise::kFulfilled,
promise_10.V8Value().As<v8::Promise>()->State());
EXPECT_EQ(v8::Promise::kPending,
promise_90.V8Value().As<v8::Promise>()->State());
stream_delegate->OnWriteDataConsumed(80);
RunUntilIdle();
EXPECT_EQ(v8::Promise::kFulfilled,
promise_90.V8Value().As<v8::Promise>()->State());
}
// Test that if two waitForWriteBufferedAmount() promises are waiting on
// different thresholds and a single OnWriteDataConsumed() is received such that
// the buffered amount is below both thresholds then both promises are resolved.
TEST_F(RTCQuicStreamTest,
TwoWaitForWriteBufferedAmountBelowPromisesResolveTogether) {
V8TestingScope scope;
P2PQuicStream::Delegate* stream_delegate = nullptr;
auto p2p_quic_stream = std::make_unique<MockP2PQuicStream>(&stream_delegate);
Persistent<RTCQuicStream> stream =
CreateQuicStream(scope, p2p_quic_stream.get());
stream->write(CreateUint8ArrayOfLength(stream->maxWriteBufferedAmount()),
ASSERT_NO_EXCEPTION);
ScriptPromise promise_10 = stream->waitForWriteBufferedAmountBelow(
scope.GetScriptState(), stream->maxWriteBufferedAmount() - 10,
ASSERT_NO_EXCEPTION);
ScriptPromise promise_90 = stream->waitForWriteBufferedAmountBelow(
scope.GetScriptState(), stream->maxWriteBufferedAmount() - 90,
ASSERT_NO_EXCEPTION);
RunUntilIdle();
ASSERT_TRUE(stream_delegate);
stream_delegate->OnWriteDataConsumed(90);
RunUntilIdle();
EXPECT_EQ(v8::Promise::kFulfilled,
promise_10.V8Value().As<v8::Promise>()->State());
EXPECT_EQ(v8::Promise::kFulfilled,
promise_90.V8Value().As<v8::Promise>()->State());
}
// Test that there is no crash when the ExecutionContext is being destroyed and
// there are pending waitForWriteBufferedAmountBelow() promises. If the
// RTCQuicStream attempts to resolve the promise in ContextDestroyed, it will
// likely crash since the v8::Isolate is being torn down.
TEST_F(
RTCQuicStreamTest,
NoCrashIfPendingWaitForWriteBufferedAmountBelowPromisesOnContextDestroyed) {
V8TestingScope scope;
auto p2p_quic_stream = std::make_unique<MockP2PQuicStream>();
Persistent<RTCQuicStream> stream =
CreateQuicStream(scope, p2p_quic_stream.get());
stream->write(CreateUint8ArrayOfLength(stream->maxWriteBufferedAmount()),
ASSERT_NO_EXCEPTION);
stream->waitForWriteBufferedAmountBelow(scope.GetScriptState(), 0,
ASSERT_NO_EXCEPTION);
RunUntilIdle();
}
} // namespace blink
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment