Commit c852d472 authored by Steve Anton's avatar Steve Anton Committed by Commit Bot

Implement RTCQuicStream.waitForReadable()

Bug: 874296
Tbr: hbos@chromium.org
Change-Id: I1d76f979d9ff4cfcec0b5d5a34663dc48bb94056
Reviewed-on: https://chromium-review.googlesource.com/c/1289698
Commit-Queue: Steve Anton <steveanton@chromium.org>
Reviewed-by: default avatarSteve Anton <steveanton@chromium.org>
Cr-Commit-Position: refs/heads/master@{#608921}
parent 83789b82
...@@ -117,6 +117,15 @@ promise_test(async t => { ...@@ -117,6 +117,15 @@ promise_test(async t => {
}, 'createStream() followed by reset() fires a quicstream event followed ' + }, 'createStream() followed by reset() fires a quicstream event followed ' +
`by a statechange event to 'closed' on the remote side.`); `by a statechange event to 'closed' on the remote side.`);
promise_test(async t => {
const [ localQuicTransport, remoteQuicTransport ] =
await makeTwoConnectedQuicTransports(t);
const localStream = localQuicTransport.createStream();
const promise = localStream.waitForReadable(1);
localStream.reset();
await promise_rejects(t, 'InvalidStateError', promise);
}, 'reset() rejects pending waitForReadable() promises.');
promise_test(async t => { promise_test(async t => {
const [ localQuicTransport, remoteQuicTransport ] = const [ localQuicTransport, remoteQuicTransport ] =
await makeTwoConnectedQuicTransports(t); await makeTwoConnectedQuicTransports(t);
...@@ -290,4 +299,116 @@ closed_stream_test(async (t, stream) => { ...@@ -290,4 +299,116 @@ closed_stream_test(async (t, stream) => {
assert_throws('InvalidStateError', () => stream.readInto(new Uint8Array(1))); assert_throws('InvalidStateError', () => stream.readInto(new Uint8Array(1)));
}, 'readInto() throws InvalidStateError.'); }, 'readInto() throws InvalidStateError.');
promise_test(async t => {
const [ localQuicTransport, remoteQuicTransport ] =
await makeTwoConnectedQuicTransports(t);
const localStream = localQuicTransport.createStream();
localStream.write(new Uint8Array([ 65 ]));
const remoteWatcher = new EventWatcher(t, remoteQuicTransport, 'quicstream');
const { stream: remoteStream } = await remoteWatcher.wait_for('quicstream');
await remoteStream.waitForReadable(1);
assert_equals(remoteStream.readBufferedAmount, 1);
const readBuffer = new Uint8Array(3);
assert_object_equals(
remoteStream.readInto(readBuffer),
{ amount: 1, finished: false });
assert_array_equals(readBuffer, [ 65, 0, 0 ]);
assert_equals(remoteStream.readBufferedAmount, 0);
}, 'Read 1 byte.');
// Returns a Uint8Array of length |amount| with generated data.
function generateData(amount) {
const data = new Uint8Array(amount);
for (let i = 0; i < data.length; i++) {
data[i] = i % 256;
}
return data;
}
// Writes |amount| of bytes to the given RTCQuicStream in maxWriteBufferedAmount
// chunks.
async function writeGeneratedData(stream, amount) {
const data = generateData(Math.min(stream.maxWriteBufferedAmount, amount));
while (amount > 0) {
const chunkSize = Math.min(stream.maxWriteBufferedAmount, amount);
await stream.waitForWriteBufferedAmountBelow(0);
stream.write(data.subarray(0, chunkSize));
amount -= chunkSize;
}
}
promise_test(async t => {
const [ localQuicTransport, remoteQuicTransport ] =
await makeTwoConnectedQuicTransports(t);
const localStream = localQuicTransport.createStream();
const remoteWatcher = new EventWatcher(t, remoteQuicTransport, 'quicstream');
writeGeneratedData(localStream, localStream.maxReadBufferedAmount);
const { stream: remoteStream } = await remoteWatcher.wait_for('quicstream');
await remoteStream.waitForReadable(localStream.maxReadBufferedAmount);
const readBuffer = new Uint8Array(localStream.maxReadBufferedAmount);
assert_object_equals(
remoteStream.readInto(readBuffer),
{ amount: localStream.maxReadBufferedAmount, finished: false });
assert_array_equals(
readBuffer, generateData(localStream.maxReadBufferedAmount));
}, 'Read maxReadBufferedAmount bytes all at once.');
promise_test(async t => {
const [ localQuicTransport, remoteQuicTransport ] =
await makeTwoConnectedQuicTransports(t);
const localStream = localQuicTransport.createStream();
const writeData = generateData(10);
localStream.write(writeData);
localStream.finish();
const remoteWatcher = new EventWatcher(t, remoteQuicTransport, 'quicstream');
const { stream: remoteStream } = await remoteWatcher.wait_for('quicstream');
await remoteStream.waitForReadable(11);
assert_equals(remoteStream.readBufferedAmount, 10);
const readBuffer = new Uint8Array(10);
assert_object_equals(
remoteStream.readInto(readBuffer), { amount: 10, finished: true });
assert_array_equals(readBuffer, writeData);
}, 'waitForReadable() resolves early if remote fin is received.');
promise_test(async t => {
const [ localQuicTransport, remoteQuicTransport ] =
await makeTwoConnectedQuicTransports(t);
const localStream = localQuicTransport.createStream();
await promise_rejects(t, new TypeError(),
localStream.waitForReadable(localStream.maxReadBufferedAmount + 1));
}, 'waitForReadable() rejects with TypeError if amount is more than ' +
'maxReadBufferedAmount.');
promise_test(async t => {
const [ localQuicTransport, remoteQuicTransport ] =
await makeTwoConnectedQuicTransports(t);
const localStream = localQuicTransport.createStream();
const promise1 = localStream.waitForReadable(10);
const promise2 = localStream.waitForReadable(10);
localStream.reset();
await Promise.all([
promise_rejects(t, 'InvalidStateError', promise1),
promise_rejects(t, 'InvalidStateError', promise2)]);
}, 'Pending waitForReadable() promises rejected after reset().');
promise_test(async t => {
const [ localQuicTransport, remoteQuicTransport ] =
await makeTwoConnectedQuicTransports(t);
const localStream = localQuicTransport.createStream();
localStream.write(new Uint8Array(1));
const remoteWatcher = new EventWatcher(t, remoteQuicTransport, 'quicstream');
const { stream : remoteStream} = await remoteWatcher.wait_for('quicstream');
const promise1 = remoteStream.waitForReadable(10);
const promise2 = remoteStream.waitForReadable(10);
localStream.reset();
await Promise.all([
promise_rejects(t, 'InvalidStateError', promise1),
promise_rejects(t, 'InvalidStateError', promise2)]);
}, 'Pending waitForReadable() promises rejected after remote reset().');
closed_stream_test(async (t, stream) => {
await promise_rejects(t, 'InvalidStateError',
stream.waitForReadable(1));
}, 'waitForReadable() rejects with InvalidStateError.');
</script> </script>
...@@ -5602,6 +5602,7 @@ interface RTCQuicStream : EventTarget ...@@ -5602,6 +5602,7 @@ interface RTCQuicStream : EventTarget
method finish method finish
method readInto method readInto
method reset method reset
method waitForReadable
method waitForWriteBufferedAmountBelow method waitForWriteBufferedAmountBelow
method write method write
setter onstatechange setter onstatechange
......
...@@ -13,6 +13,24 @@ namespace blink { ...@@ -13,6 +13,24 @@ namespace blink {
const uint32_t RTCQuicStream::kWriteBufferSize = 4 * 1024; const uint32_t RTCQuicStream::kWriteBufferSize = 4 * 1024;
const uint32_t RTCQuicStream::kReadBufferSize = 4 * 1024; const uint32_t RTCQuicStream::kReadBufferSize = 4 * 1024;
class RTCQuicStream::PendingReadBufferedAmountPromise
: public GarbageCollected<PendingReadBufferedAmountPromise> {
public:
PendingReadBufferedAmountPromise(ScriptPromiseResolver* promise_resolver,
uint32_t readable_amount)
: promise_resolver_(promise_resolver),
readable_amount_(readable_amount) {}
ScriptPromiseResolver* promise_resolver() const { return promise_resolver_; }
uint32_t readable_amount() const { return readable_amount_; }
void Trace(Visitor* visitor) { visitor->Trace(promise_resolver_); }
private:
Member<ScriptPromiseResolver> promise_resolver_;
uint32_t readable_amount_;
};
class RTCQuicStream::PendingWriteBufferedAmountPromise class RTCQuicStream::PendingWriteBufferedAmountPromise
: public GarbageCollected<PendingWriteBufferedAmountPromise> { : public GarbageCollected<PendingWriteBufferedAmountPromise> {
public: public:
...@@ -153,6 +171,31 @@ void RTCQuicStream::reset() { ...@@ -153,6 +171,31 @@ void RTCQuicStream::reset() {
Close(CloseReason::kLocalReset); Close(CloseReason::kLocalReset);
} }
ScriptPromise RTCQuicStream::waitForReadable(ScriptState* script_state,
uint32_t amount,
ExceptionState& exception_state) {
if (RaiseIfNotReadable(exception_state)) {
return ScriptPromise();
}
if (amount > kReadBufferSize) {
exception_state.ThrowTypeError(
"The amount " + String::Number(amount) +
" is greater than the maximum read buffer size of " +
String::Number(kReadBufferSize) + ".");
return ScriptPromise();
}
ScriptPromiseResolver* promise_resolver =
ScriptPromiseResolver::Create(script_state);
ScriptPromise promise = promise_resolver->Promise();
if (received_fin_ || receive_buffer_.size() >= amount) {
promise_resolver->Resolve();
} else {
pending_read_buffered_amount_promises_.push_back(
new PendingReadBufferedAmountPromise(promise_resolver, amount));
}
return promise;
}
ScriptPromise RTCQuicStream::waitForWriteBufferedAmountBelow( ScriptPromise RTCQuicStream::waitForWriteBufferedAmountBelow(
ScriptState* script_state, ScriptState* script_state,
uint32_t threshold, uint32_t threshold,
...@@ -204,11 +247,30 @@ bool RTCQuicStream::RaiseIfNotWritable(ExceptionState& exception_state) { ...@@ -204,11 +247,30 @@ bool RTCQuicStream::RaiseIfNotWritable(ExceptionState& exception_state) {
return false; return false;
} }
void RTCQuicStream::RejectPendingWaitForReadablePromises() {
// TODO(https://github.com/w3c/webrtc-quic/issues/81): The promise resolve
// order is under specified.
for (PendingReadBufferedAmountPromise* pending_promise :
pending_read_buffered_amount_promises_) {
ScriptState::Scope scope(
pending_promise->promise_resolver()->GetScriptState());
ExceptionState exception_state(
pending_promise->promise_resolver()->GetScriptState()->GetIsolate(),
ExceptionState::kExecutionContext, "RTCQuicStream", "waitForReadable");
exception_state.ThrowDOMException(DOMExceptionCode::kInvalidStateError,
"The RTCQuicStream is not readable.");
pending_promise->promise_resolver()->Reject(exception_state);
}
pending_read_buffered_amount_promises_.clear();
}
void RTCQuicStream::RejectPendingWaitForWriteBufferedAmountBelowPromises() { void RTCQuicStream::RejectPendingWaitForWriteBufferedAmountBelowPromises() {
// TODO(https://github.com/w3c/webrtc-quic/issues/81): The promise resolve // TODO(https://github.com/w3c/webrtc-quic/issues/81): The promise resolve
// order is under specified. // order is under specified.
for (PendingWriteBufferedAmountPromise* pending_promise : for (PendingWriteBufferedAmountPromise* pending_promise :
pending_write_buffered_amount_promises_) { pending_write_buffered_amount_promises_) {
ScriptState::Scope scope(
pending_promise->promise_resolver()->GetScriptState());
ExceptionState exception_state( ExceptionState exception_state(
pending_promise->promise_resolver()->GetScriptState()->GetIsolate(), pending_promise->promise_resolver()->GetScriptState()->GetIsolate(),
ExceptionState::kExecutionContext, "RTCQuicStream", ExceptionState::kExecutionContext, "RTCQuicStream",
...@@ -229,6 +291,20 @@ void RTCQuicStream::OnDataReceived(Vector<uint8_t> data, bool fin) { ...@@ -229,6 +291,20 @@ void RTCQuicStream::OnDataReceived(Vector<uint8_t> data, bool fin) {
DCHECK_LE(data.size(), kReadBufferSize - receive_buffer_.size()); DCHECK_LE(data.size(), kReadBufferSize - receive_buffer_.size());
received_fin_ = fin; received_fin_ = fin;
receive_buffer_.Append(std::move(data)); receive_buffer_.Append(std::move(data));
// TODO(https://github.com/w3c/webrtc-quic/issues/81): The promise resolve
// order is under specified.
for (auto* it = pending_read_buffered_amount_promises_.begin();
it != pending_read_buffered_amount_promises_.end();
/* incremented manually */) {
PendingReadBufferedAmountPromise* pending_promise = *it;
if (received_fin_ ||
receive_buffer_.size() >= pending_promise->readable_amount()) {
pending_promise->promise_resolver()->Resolve();
it = pending_read_buffered_amount_promises_.erase(it);
} else {
++it;
}
}
} }
void RTCQuicStream::OnWriteDataConsumed(uint32_t amount) { void RTCQuicStream::OnWriteDataConsumed(uint32_t amount) {
...@@ -237,7 +313,8 @@ void RTCQuicStream::OnWriteDataConsumed(uint32_t amount) { ...@@ -237,7 +313,8 @@ void RTCQuicStream::OnWriteDataConsumed(uint32_t amount) {
// TODO(https://github.com/w3c/webrtc-quic/issues/81): The promise resolve // TODO(https://github.com/w3c/webrtc-quic/issues/81): The promise resolve
// order is under specified. // order is under specified.
for (auto* it = pending_write_buffered_amount_promises_.begin(); for (auto* it = pending_write_buffered_amount_promises_.begin();
it != pending_write_buffered_amount_promises_.end();) { it != pending_write_buffered_amount_promises_.end();
/* incremented manually */) {
PendingWriteBufferedAmountPromise* pending_promise = *it; PendingWriteBufferedAmountPromise* pending_promise = *it;
if (write_buffered_amount_ <= pending_promise->threshold()) { if (write_buffered_amount_ <= pending_promise->threshold()) {
pending_promise->promise_resolver()->Resolve(); pending_promise->promise_resolver()->Resolve();
...@@ -295,6 +372,7 @@ void RTCQuicStream::Close(CloseReason reason) { ...@@ -295,6 +372,7 @@ void RTCQuicStream::Close(CloseReason reason) {
// It's illegal to resolve or reject promises when the ExecutionContext is // It's illegal to resolve or reject promises when the ExecutionContext is
// being destroyed. // being destroyed.
if (reason != CloseReason::kContextDestroyed) { if (reason != CloseReason::kContextDestroyed) {
RejectPendingWaitForReadablePromises();
RejectPendingWaitForWriteBufferedAmountBelowPromises(); RejectPendingWaitForWriteBufferedAmountBelowPromises();
} }
...@@ -316,6 +394,7 @@ ExecutionContext* RTCQuicStream::GetExecutionContext() const { ...@@ -316,6 +394,7 @@ ExecutionContext* RTCQuicStream::GetExecutionContext() const {
void RTCQuicStream::Trace(blink::Visitor* visitor) { void RTCQuicStream::Trace(blink::Visitor* visitor) {
visitor->Trace(transport_); visitor->Trace(transport_);
visitor->Trace(pending_read_buffered_amount_promises_);
visitor->Trace(pending_write_buffered_amount_promises_); visitor->Trace(pending_write_buffered_amount_promises_);
EventTargetWithInlineData::Trace(visitor); EventTargetWithInlineData::Trace(visitor);
ContextClient::Trace(visitor); ContextClient::Trace(visitor);
......
...@@ -72,6 +72,9 @@ class MODULES_EXPORT RTCQuicStream final : public EventTargetWithInlineData, ...@@ -72,6 +72,9 @@ class MODULES_EXPORT RTCQuicStream final : public EventTargetWithInlineData,
ScriptState* script_state, ScriptState* script_state,
uint32_t threshold, uint32_t threshold,
ExceptionState& exception_state); ExceptionState& exception_state);
ScriptPromise waitForReadable(ScriptState* script_state,
uint32_t amount,
ExceptionState& exception_state);
DEFINE_ATTRIBUTE_EVENT_LISTENER(statechange, kStatechange); DEFINE_ATTRIBUTE_EVENT_LISTENER(statechange, kStatechange);
// EventTarget overrides. // EventTarget overrides.
...@@ -82,6 +85,7 @@ class MODULES_EXPORT RTCQuicStream final : public EventTargetWithInlineData, ...@@ -82,6 +85,7 @@ class MODULES_EXPORT RTCQuicStream final : public EventTargetWithInlineData,
void Trace(Visitor* visitor) override; void Trace(Visitor* visitor) override;
private: private:
class PendingReadBufferedAmountPromise;
class PendingWriteBufferedAmountPromise; class PendingWriteBufferedAmountPromise;
// QuicStreamProxy::Delegate overrides. // QuicStreamProxy::Delegate overrides.
...@@ -99,6 +103,7 @@ class MODULES_EXPORT RTCQuicStream final : public EventTargetWithInlineData, ...@@ -99,6 +103,7 @@ class MODULES_EXPORT RTCQuicStream final : public EventTargetWithInlineData,
bool IsClosed() const { return state_ == RTCQuicStreamState::kClosed; } bool IsClosed() const { return state_ == RTCQuicStreamState::kClosed; }
void RejectPendingWaitForReadablePromises();
void RejectPendingWaitForWriteBufferedAmountBelowPromises(); void RejectPendingWaitForWriteBufferedAmountBelowPromises();
Member<RTCQuicTransport> transport_; Member<RTCQuicTransport> transport_;
...@@ -113,6 +118,9 @@ class MODULES_EXPORT RTCQuicStream final : public EventTargetWithInlineData, ...@@ -113,6 +118,9 @@ class MODULES_EXPORT RTCQuicStream final : public EventTargetWithInlineData,
// True if the fin has been read out via readInto(). This signifies that the // True if the fin has been read out via readInto(). This signifies that the
// RTCQuicStream is closed for reading. // RTCQuicStream is closed for reading.
bool read_fin_ = false; bool read_fin_ = false;
// Pending waitForReadable promises.
HeapVector<Member<PendingReadBufferedAmountPromise>>
pending_read_buffered_amount_promises_;
// Amount of bytes written but may not yet have been sent by the underlying // Amount of bytes written but may not yet have been sent by the underlying
// P2PQuicStream. // P2PQuicStream.
......
...@@ -28,6 +28,8 @@ enum RTCQuicStreamState { ...@@ -28,6 +28,8 @@ enum RTCQuicStreamState {
void reset(); void reset();
[CallWith=ScriptState, RaisesException] [CallWith=ScriptState, RaisesException]
Promise<void> waitForWriteBufferedAmountBelow(unsigned long amount); Promise<void> waitForWriteBufferedAmountBelow(unsigned long amount);
[CallWith=ScriptState, RaisesException]
Promise<void> waitForReadable(unsigned long amount);
attribute EventHandler onstatechange; attribute EventHandler onstatechange;
// TODO(crbug.com/868068): Implement remaining methods, attributes, and events. // TODO(crbug.com/868068): Implement remaining methods, attributes, and events.
}; };
......
...@@ -792,6 +792,212 @@ TEST_F(RTCQuicStreamTest, ReadIntoThrowsIfClosed) { ...@@ -792,6 +792,212 @@ TEST_F(RTCQuicStreamTest, ReadIntoThrowsIfClosed) {
RunUntilIdle(); RunUntilIdle();
} }
// The following group tests waitForReadable().
// Test that a waitForReadable() promise resolves once OnDataReceived() delivers
// enough data.
TEST_F(RTCQuicStreamTest, WaitForReadableResolves) {
V8TestingScope scope;
P2PQuicStream::Delegate* stream_delegate = nullptr;
auto p2p_quic_stream = std::make_unique<MockP2PQuicStream>(&stream_delegate);
Persistent<RTCQuicStream> stream =
CreateQuicStream(scope, p2p_quic_stream.get());
ScriptPromise promise =
stream->waitForReadable(scope.GetScriptState(), 3, ASSERT_NO_EXCEPTION);
EXPECT_EQ(v8::Promise::kPending,
promise.V8Value().As<v8::Promise>()->State());
RunUntilIdle();
ASSERT_TRUE(stream_delegate);
stream_delegate->OnDataReceived({1, 2, 3}, /*fin=*/false);
RunUntilIdle();
EXPECT_EQ(v8::Promise::kFulfilled,
promise.V8Value().As<v8::Promise>()->State());
}
// Test that a waitForReadable() promise resolves immediately if sufficient data
// is already in the receive buffer.
TEST_F(RTCQuicStreamTest, WaitForReadableResolveImmediately) {
V8TestingScope scope;
P2PQuicStream::Delegate* stream_delegate = nullptr;
auto p2p_quic_stream = std::make_unique<MockP2PQuicStream>(&stream_delegate);
Persistent<RTCQuicStream> stream =
CreateQuicStream(scope, p2p_quic_stream.get());
RunUntilIdle();
ASSERT_TRUE(stream_delegate);
stream_delegate->OnDataReceived({1, 2, 3}, /*fin=*/false);
RunUntilIdle();
ScriptPromise promise =
stream->waitForReadable(scope.GetScriptState(), 3, ASSERT_NO_EXCEPTION);
EXPECT_EQ(v8::Promise::kFulfilled,
promise.V8Value().As<v8::Promise>()->State());
RunUntilIdle();
}
// Test that a waitForReadable() promise does not resolve until OnDataReceived()
// delivers at least the readable amount.
TEST_F(RTCQuicStreamTest, WaitForReadableDoesNotResolveUntilExceedsThreshold) {
V8TestingScope scope;
P2PQuicStream::Delegate* stream_delegate = nullptr;
auto p2p_quic_stream = std::make_unique<MockP2PQuicStream>(&stream_delegate);
Persistent<RTCQuicStream> stream =
CreateQuicStream(scope, p2p_quic_stream.get());
ScriptPromise promise =
stream->waitForReadable(scope.GetScriptState(), 5, ASSERT_NO_EXCEPTION);
EXPECT_EQ(v8::Promise::kPending,
promise.V8Value().As<v8::Promise>()->State());
RunUntilIdle();
ASSERT_TRUE(stream_delegate);
stream_delegate->OnDataReceived({1, 2, 3}, /*fin=*/false);
RunUntilIdle();
EXPECT_EQ(v8::Promise::kPending,
promise.V8Value().As<v8::Promise>()->State());
stream_delegate->OnDataReceived({4, 5}, /*fin=*/false);
RunUntilIdle();
EXPECT_EQ(v8::Promise::kFulfilled,
promise.V8Value().As<v8::Promise>()->State());
}
// Test that if two waitForReadable() promises are waiting on different readable
// amounts, OnDataReceived() which satisfies the first readable amount but not
// the second will only resolve the first promise. Once OnDataReceived() is
// received again with readable amount satisfying the second promise then it
// will be resolved.
TEST_F(RTCQuicStreamTest, TwoWaitForReadablePromisesResolveInSequence) {
V8TestingScope scope;
P2PQuicStream::Delegate* stream_delegate = nullptr;
auto p2p_quic_stream = std::make_unique<MockP2PQuicStream>(&stream_delegate);
Persistent<RTCQuicStream> stream =
CreateQuicStream(scope, p2p_quic_stream.get());
ScriptPromise promise_3 =
stream->waitForReadable(scope.GetScriptState(), 3, ASSERT_NO_EXCEPTION);
ScriptPromise promise_5 =
stream->waitForReadable(scope.GetScriptState(), 5, ASSERT_NO_EXCEPTION);
RunUntilIdle();
ASSERT_TRUE(stream_delegate);
stream_delegate->OnDataReceived({1, 2, 3}, /*fin=*/false);
RunUntilIdle();
EXPECT_EQ(v8::Promise::kFulfilled,
promise_3.V8Value().As<v8::Promise>()->State());
EXPECT_EQ(v8::Promise::kPending,
promise_5.V8Value().As<v8::Promise>()->State());
stream_delegate->OnDataReceived({4, 5}, /*fin=*/false);
RunUntilIdle();
EXPECT_EQ(v8::Promise::kFulfilled,
promise_5.V8Value().As<v8::Promise>()->State());
}
// Test that if two waitForReadable() promises are waiting on different
// thresholds and a single OnDataReceived() is received such that the readable
// amount satisfies both promises then they are both resolved.
TEST_F(RTCQuicStreamTest, TwoWaitForReadablePromisesResolveTogether) {
V8TestingScope scope;
P2PQuicStream::Delegate* stream_delegate = nullptr;
auto p2p_quic_stream = std::make_unique<MockP2PQuicStream>(&stream_delegate);
Persistent<RTCQuicStream> stream =
CreateQuicStream(scope, p2p_quic_stream.get());
ScriptPromise promise_3 =
stream->waitForReadable(scope.GetScriptState(), 3, ASSERT_NO_EXCEPTION);
ScriptPromise promise_5 =
stream->waitForReadable(scope.GetScriptState(), 5, ASSERT_NO_EXCEPTION);
RunUntilIdle();
ASSERT_TRUE(stream_delegate);
stream_delegate->OnDataReceived({1, 2, 3, 4, 5}, /*fin=*/false);
RunUntilIdle();
EXPECT_EQ(v8::Promise::kFulfilled,
promise_3.V8Value().As<v8::Promise>()->State());
EXPECT_EQ(v8::Promise::kFulfilled,
promise_5.V8Value().As<v8::Promise>()->State());
}
// Test that a remote finish immediately resolves all pending waitForReadable()
// promises.
TEST_F(RTCQuicStreamTest, RemoteFinishResolvesPendingWaitForReadablePromises) {
V8TestingScope scope;
P2PQuicStream::Delegate* stream_delegate = nullptr;
auto p2p_quic_stream = std::make_unique<MockP2PQuicStream>(&stream_delegate);
Persistent<RTCQuicStream> stream =
CreateQuicStream(scope, p2p_quic_stream.get());
ScriptPromise promise_3 =
stream->waitForReadable(scope.GetScriptState(), 3, ASSERT_NO_EXCEPTION);
ScriptPromise promise_5 =
stream->waitForReadable(scope.GetScriptState(), 5, ASSERT_NO_EXCEPTION);
RunUntilIdle();
ASSERT_TRUE(stream_delegate);
stream_delegate->OnDataReceived({}, /*fin=*/true);
RunUntilIdle();
EXPECT_EQ(v8::Promise::kFulfilled,
promise_3.V8Value().As<v8::Promise>()->State());
EXPECT_EQ(v8::Promise::kFulfilled,
promise_5.V8Value().As<v8::Promise>()->State());
}
// Test that calling waitForReadable() resolves immediately if the finish has
// been received via OnDataReceived() but not yet read out via readInto().
// Note: If the finish has been read out via readInto(), waitForReadable() will
// throw an exception since the stream is no longer readable.
TEST_F(RTCQuicStreamTest, WaitForReadableResolvesImmediatelyIfRemoteFinished) {
V8TestingScope scope;
P2PQuicStream::Delegate* stream_delegate = nullptr;
auto p2p_quic_stream = std::make_unique<MockP2PQuicStream>(&stream_delegate);
Persistent<RTCQuicStream> stream =
CreateQuicStream(scope, p2p_quic_stream.get());
RunUntilIdle();
ASSERT_TRUE(stream_delegate);
stream_delegate->OnDataReceived({}, /*fin=*/true);
RunUntilIdle();
ScriptPromise promise =
stream->waitForReadable(scope.GetScriptState(), 5, ASSERT_NO_EXCEPTION);
EXPECT_EQ(v8::Promise::kFulfilled,
promise.V8Value().As<v8::Promise>()->State());
}
// The following group tests state transitions with reset(), finish(), remote // The following group tests state transitions with reset(), finish(), remote
// reset() and remote finish() // reset() and remote finish()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment