Commit 9d52dc48 authored by Steve Anton's avatar Steve Anton Committed by Commit Bot

Implement RTCQuicStream.readInto()

Bug: 874296
Tbr: hbos@chromium.org
Change-Id: Id2905574237fe9221d49a1e772c9ba0f3e7957e1
Reviewed-on: https://chromium-review.googlesource.com/c/1289436
Commit-Queue: Steve Anton <steveanton@chromium.org>
Reviewed-by: default avatarSteve Anton <steveanton@chromium.org>
Cr-Commit-Position: refs/heads/master@{#608878}
parent 8d112e9c
...@@ -55,10 +55,8 @@ promise_test(async t => { ...@@ -55,10 +55,8 @@ promise_test(async t => {
promise_test(async t => { promise_test(async t => {
const [ localQuicTransport, remoteQuicTransport ] = const [ localQuicTransport, remoteQuicTransport ] =
await makeTwoConnectedQuicTransports(t); await makeTwoConnectedQuicTransports(t);
const firstLocalStream = localQuicTransport.createStream(); localQuicTransport.createStream().finish();
firstLocalStream.finish(); localQuicTransport.createStream().finish();
const secondLocalStream = localQuicTransport.createStream();
secondLocalStream.finish();
const remoteWatcher = const remoteWatcher =
new EventWatcher(t, remoteQuicTransport, [ 'quicstream', 'statechange' ]); new EventWatcher(t, remoteQuicTransport, [ 'quicstream', 'statechange' ]);
const { stream: firstRemoteStream } = const { stream: firstRemoteStream } =
...@@ -105,20 +103,6 @@ promise_test(async t => { ...@@ -105,20 +103,6 @@ promise_test(async t => {
assert_equals(localStream.state, 'closed'); assert_equals(localStream.state, 'closed');
}, `reset() following finish() changes state to 'closed'.`); }, `reset() following finish() changes state to 'closed'.`);
promise_test(async t => {
const [ localQuicTransport, remoteQuicTransport ] =
await makeTwoConnectedQuicTransports(t);
const localStream = localQuicTransport.createStream();
localStream.finish();
const remoteWatcher = new EventWatcher(t, remoteQuicTransport, 'quicstream');
const { stream: remoteStream } = await remoteWatcher.wait_for('quicstream');
assert_equals(remoteStream.state, 'open');
const remoteStreamWatcher = new EventWatcher(t, remoteStream, 'statechange');
await remoteStreamWatcher.wait_for('statechange');
assert_equals(remoteStream.state, 'closing');
}, 'createStream() followed by finish() fires a quicstream event followed by ' +
`a statechange event to 'closing' on the remote side.`);
promise_test(async t => { promise_test(async t => {
const [ localQuicTransport, remoteQuicTransport ] = const [ localQuicTransport, remoteQuicTransport ] =
await makeTwoConnectedQuicTransports(t); await makeTwoConnectedQuicTransports(t);
...@@ -133,33 +117,6 @@ promise_test(async t => { ...@@ -133,33 +117,6 @@ promise_test(async t => {
}, 'createStream() followed by reset() fires a quicstream event followed ' + }, 'createStream() followed by reset() fires a quicstream event followed ' +
`by a statechange event to 'closed' on the remote side.`); `by a statechange event to 'closed' on the remote side.`);
promise_test(async t => {
const [ localQuicTransport, remoteQuicTransport ] =
await makeTwoConnectedQuicTransports(t);
remoteQuicTransport.onquicstream = ({ stream }) => stream.reset();
const localStream = localQuicTransport.createStream();
localStream.finish();
const localWatcher = new EventWatcher(t, localStream, 'statechange');
await localWatcher.wait_for('statechange');
assert_equals(localStream.state, 'closed');
}, 'finish() on a remote stream that has already finished fires a ' +
`statechange event to 'closed' on the remote side.`);
promise_test(async t => {
const [ localQuicTransport, remoteQuicTransport ] =
await makeTwoConnectedQuicTransports(t);
const localStream = localQuicTransport.createStream();
localStream.finish();
localStream.reset();
const remoteWatcher = new EventWatcher(t, remoteQuicTransport, 'quicstream');
const { stream: remoteStream } = await remoteWatcher.wait_for('quicstream');
const remoteStreamWatcher = new EventWatcher(t, remoteStream, 'statechange');
await remoteStreamWatcher.wait_for('statechange');
assert_equals(remoteStream.state, 'closing');
await remoteStreamWatcher.wait_for('statechange');
assert_equals(remoteStream.state, 'closed');
}, 'finish() then reset() fires two statechange events on the remote side.');
promise_test(async t => { promise_test(async t => {
const [ localQuicTransport, remoteQuicTransport ] = const [ localQuicTransport, remoteQuicTransport ] =
await makeTwoConnectedQuicTransports(t); await makeTwoConnectedQuicTransports(t);
...@@ -213,6 +170,24 @@ promise_test(async t => { ...@@ -213,6 +170,24 @@ promise_test(async t => {
}, 'write() throws if total write buffered amount would be greater than ' + }, 'write() throws if total write buffered amount would be greater than ' +
'maxWriteBufferedAmount.'); 'maxWriteBufferedAmount.');
promise_test(async t => {
const [ localQuicTransport, remoteQuicTransport ] =
await makeTwoConnectedQuicTransports(t);
const localStream = localQuicTransport.createStream();
localStream.write(new Uint8Array(10));
const remoteWatcher = new EventWatcher(t, remoteQuicTransport, 'quicstream');
await remoteWatcher.wait_for('quicstream');
}, 'write() causes quicstream event to fire on the remote transport.');
promise_test(async t => {
const [ localQuicTransport, remoteQuicTransport ] =
await makeTwoConnectedQuicTransports(t);
const localStream = localQuicTransport.createStream();
localStream.finish();
const remoteWatcher = new EventWatcher(t, remoteQuicTransport, 'quicstream');
await remoteWatcher.wait_for('quicstream');
}, 'finish() causes quicstream event to fire on the remote transport.');
promise_test(async t => { promise_test(async t => {
const [ localQuicTransport, remoteQuicTransport ] = const [ localQuicTransport, remoteQuicTransport ] =
await makeTwoConnectedQuicTransports(t); await makeTwoConnectedQuicTransports(t);
...@@ -302,4 +277,17 @@ closed_stream_test(async (t, stream) => { ...@@ -302,4 +277,17 @@ closed_stream_test(async (t, stream) => {
stream.waitForWriteBufferedAmountBelow(0)); stream.waitForWriteBufferedAmountBelow(0));
}, 'waitForWriteBufferedBelow() rejects with InvalidStateError.'); }, 'waitForWriteBufferedBelow() rejects with InvalidStateError.');
promise_test(async t => {
const [ localQuicTransport, remoteQuicTransport ] =
await makeTwoConnectedQuicTransports(t);
const localStream = localQuicTransport.createStream();
assert_object_equals(
localStream.readInto(new Uint8Array(10)),
{ amount: 0, finished: false });
}, 'readInto() on new local stream returns amount 0.');
closed_stream_test(async (t, stream) => {
assert_throws('InvalidStateError', () => stream.readInto(new Uint8Array(1)));
}, 'readInto() throws InvalidStateError.');
</script> </script>
...@@ -5591,6 +5591,7 @@ interface RTCPeerConnectionIceEvent : Event ...@@ -5591,6 +5591,7 @@ interface RTCPeerConnectionIceEvent : Event
method constructor method constructor
interface RTCQuicStream : EventTarget interface RTCQuicStream : EventTarget
attribute @@toStringTag attribute @@toStringTag
getter maxReadBufferedAmount
getter maxWriteBufferedAmount getter maxWriteBufferedAmount
getter onstatechange getter onstatechange
getter readBufferedAmount getter readBufferedAmount
...@@ -5599,6 +5600,7 @@ interface RTCQuicStream : EventTarget ...@@ -5599,6 +5600,7 @@ interface RTCQuicStream : EventTarget
getter writeBufferedAmount getter writeBufferedAmount
method constructor method constructor
method finish method finish
method readInto
method reset method reset
method waitForWriteBufferedAmountBelow method waitForWriteBufferedAmountBelow
method write method write
......
...@@ -623,6 +623,7 @@ modules_dictionary_idl_files = ...@@ -623,6 +623,7 @@ modules_dictionary_idl_files =
"peerconnection/rtc_peer_connection_ice_event_init.idl", "peerconnection/rtc_peer_connection_ice_event_init.idl",
"peerconnection/rtc_quic_parameters.idl", "peerconnection/rtc_quic_parameters.idl",
"peerconnection/rtc_quic_stream_event_init.idl", "peerconnection/rtc_quic_stream_event_init.idl",
"peerconnection/rtc_quic_stream_read_result.idl",
"peerconnection/rtc_rtcp_parameters.idl", "peerconnection/rtc_rtcp_parameters.idl",
"peerconnection/rtc_rtp_capabilities.idl", "peerconnection/rtc_rtp_capabilities.idl",
"peerconnection/rtc_rtp_codec_capability.idl", "peerconnection/rtc_rtp_codec_capability.idl",
......
...@@ -3,6 +3,7 @@ ...@@ -3,6 +3,7 @@
// found in the LICENSE file. // found in the LICENSE file.
#include "third_party/blink/renderer/modules/peerconnection/rtc_quic_stream.h" #include "third_party/blink/renderer/modules/peerconnection/rtc_quic_stream.h"
#include "base/containers/span.h"
#include "third_party/blink/renderer/bindings/core/v8/script_promise_resolver.h" #include "third_party/blink/renderer/bindings/core/v8/script_promise_resolver.h"
#include "third_party/blink/renderer/core/dom/events/event.h" #include "third_party/blink/renderer/core/dom/events/event.h"
#include "third_party/blink/renderer/platform/bindings/exception_state.h" #include "third_party/blink/renderer/platform/bindings/exception_state.h"
...@@ -10,6 +11,7 @@ ...@@ -10,6 +11,7 @@
namespace blink { namespace blink {
const uint32_t RTCQuicStream::kWriteBufferSize = 4 * 1024; const uint32_t RTCQuicStream::kWriteBufferSize = 4 * 1024;
const uint32_t RTCQuicStream::kReadBufferSize = 4 * 1024;
class RTCQuicStream::PendingWriteBufferedAmountPromise class RTCQuicStream::PendingWriteBufferedAmountPromise
: public GarbageCollected<PendingWriteBufferedAmountPromise> { : public GarbageCollected<PendingWriteBufferedAmountPromise> {
...@@ -59,7 +61,11 @@ String RTCQuicStream::state() const { ...@@ -59,7 +61,11 @@ String RTCQuicStream::state() const {
} }
uint32_t RTCQuicStream::readBufferedAmount() const { uint32_t RTCQuicStream::readBufferedAmount() const {
return read_buffered_amount_; return receive_buffer_.size();
}
uint32_t RTCQuicStream::maxReadBufferedAmount() const {
return kReadBufferSize;
} }
uint32_t RTCQuicStream::writeBufferedAmount() const { uint32_t RTCQuicStream::writeBufferedAmount() const {
...@@ -70,6 +76,33 @@ uint32_t RTCQuicStream::maxWriteBufferedAmount() const { ...@@ -70,6 +76,33 @@ uint32_t RTCQuicStream::maxWriteBufferedAmount() const {
return kWriteBufferSize; return kWriteBufferSize;
} }
RTCQuicStreamReadResult* RTCQuicStream::readInto(
NotShared<DOMUint8Array> data,
ExceptionState& exception_state) {
if (RaiseIfNotReadable(exception_state)) {
return 0;
}
uint32_t read_amount = static_cast<uint32_t>(receive_buffer_.ReadInto(
base::make_span(data.View()->Data(), data.View()->length())));
if (!received_fin_ && read_amount > 0) {
proxy_->MarkReceivedDataConsumed(read_amount);
}
if (receive_buffer_.empty() && received_fin_) {
read_fin_ = true;
if (wrote_fin_) {
DCHECK_EQ(state_, RTCQuicStreamState::kClosing);
Close(CloseReason::kReadWriteFinished);
} else {
DCHECK_EQ(state_, RTCQuicStreamState::kOpen);
state_ = RTCQuicStreamState::kClosing;
}
}
auto* result = RTCQuicStreamReadResult::Create();
result->setAmount(read_amount);
result->setFinished(read_fin_);
return result;
}
void RTCQuicStream::write(NotShared<DOMUint8Array> data, void RTCQuicStream::write(NotShared<DOMUint8Array> data,
ExceptionState& exception_state) { ExceptionState& exception_state) {
if (RaiseIfNotWritable(exception_state)) { if (RaiseIfNotWritable(exception_state)) {
...@@ -103,7 +136,7 @@ void RTCQuicStream::finish() { ...@@ -103,7 +136,7 @@ void RTCQuicStream::finish() {
} }
proxy_->WriteData({}, /*fin=*/true); proxy_->WriteData({}, /*fin=*/true);
wrote_fin_ = true; wrote_fin_ = true;
if (readable_) { if (!read_fin_) {
DCHECK_EQ(state_, RTCQuicStreamState::kOpen); DCHECK_EQ(state_, RTCQuicStreamState::kOpen);
state_ = RTCQuicStreamState::kClosing; state_ = RTCQuicStreamState::kClosing;
RejectPendingWaitForWriteBufferedAmountBelowPromises(); RejectPendingWaitForWriteBufferedAmountBelowPromises();
...@@ -139,6 +172,22 @@ ScriptPromise RTCQuicStream::waitForWriteBufferedAmountBelow( ...@@ -139,6 +172,22 @@ ScriptPromise RTCQuicStream::waitForWriteBufferedAmountBelow(
return promise; return promise;
} }
bool RTCQuicStream::RaiseIfNotReadable(ExceptionState& exception_state) {
if (read_fin_) {
exception_state.ThrowDOMException(
DOMExceptionCode::kInvalidStateError,
"The stream is not readable: The end of the stream has been read.");
return true;
}
if (IsClosed()) {
exception_state.ThrowDOMException(
DOMExceptionCode::kInvalidStateError,
"The stream is not readable: The stream is closed.");
return true;
}
return false;
}
bool RTCQuicStream::RaiseIfNotWritable(ExceptionState& exception_state) { bool RTCQuicStream::RaiseIfNotWritable(ExceptionState& exception_state) {
if (wrote_fin_) { if (wrote_fin_) {
exception_state.ThrowDOMException( exception_state.ThrowDOMException(
...@@ -176,20 +225,10 @@ void RTCQuicStream::OnRemoteReset() { ...@@ -176,20 +225,10 @@ void RTCQuicStream::OnRemoteReset() {
} }
void RTCQuicStream::OnDataReceived(Vector<uint8_t> data, bool fin) { void RTCQuicStream::OnDataReceived(Vector<uint8_t> data, bool fin) {
if (!fin) { DCHECK(!received_fin_);
return; DCHECK_LE(data.size(), kReadBufferSize - receive_buffer_.size());
} received_fin_ = fin;
DCHECK_NE(state_, RTCQuicStreamState::kClosed); receive_buffer_.Append(std::move(data));
DCHECK(readable_);
readable_ = false;
if (!wrote_fin_) {
DCHECK_EQ(state_, RTCQuicStreamState::kOpen);
state_ = RTCQuicStreamState::kClosing;
} else {
DCHECK_EQ(state_, RTCQuicStreamState::kClosing);
Close(CloseReason::kReadWriteFinished);
}
DispatchEvent(*Event::Create(event_type_names::kStatechange));
} }
void RTCQuicStream::OnWriteDataConsumed(uint32_t amount) { void RTCQuicStream::OnWriteDataConsumed(uint32_t amount) {
...@@ -250,7 +289,7 @@ void RTCQuicStream::Close(CloseReason reason) { ...@@ -250,7 +289,7 @@ void RTCQuicStream::Close(CloseReason reason) {
} }
// Clear observable state. // Clear observable state.
readable_ = false; receive_buffer_.Clear();
write_buffered_amount_ = 0; write_buffered_amount_ = 0;
// It's illegal to resolve or reject promises when the ExecutionContext is // It's illegal to resolve or reject promises when the ExecutionContext is
......
...@@ -11,6 +11,8 @@ ...@@ -11,6 +11,8 @@
#include "third_party/blink/renderer/modules/event_target_modules.h" #include "third_party/blink/renderer/modules/event_target_modules.h"
#include "third_party/blink/renderer/modules/modules_export.h" #include "third_party/blink/renderer/modules/modules_export.h"
#include "third_party/blink/renderer/modules/peerconnection/adapters/quic_stream_proxy.h" #include "third_party/blink/renderer/modules/peerconnection/adapters/quic_stream_proxy.h"
#include "third_party/blink/renderer/modules/peerconnection/byte_buffer_queue.h"
#include "third_party/blink/renderer/modules/peerconnection/rtc_quic_stream_read_result.h"
#include "third_party/blink/renderer/modules/peerconnection/rtc_quic_transport.h" #include "third_party/blink/renderer/modules/peerconnection/rtc_quic_transport.h"
namespace blink { namespace blink {
...@@ -30,6 +32,7 @@ class MODULES_EXPORT RTCQuicStream final : public EventTargetWithInlineData, ...@@ -30,6 +32,7 @@ class MODULES_EXPORT RTCQuicStream final : public EventTargetWithInlineData,
public: public:
// TODO(steveanton): These maybe should be adjustable. // TODO(steveanton): These maybe should be adjustable.
static const uint32_t kWriteBufferSize; static const uint32_t kWriteBufferSize;
static const uint32_t kReadBufferSize;
enum class CloseReason { enum class CloseReason {
// Both read and write sides have been finished. // Both read and write sides have been finished.
...@@ -56,8 +59,11 @@ class MODULES_EXPORT RTCQuicStream final : public EventTargetWithInlineData, ...@@ -56,8 +59,11 @@ class MODULES_EXPORT RTCQuicStream final : public EventTargetWithInlineData,
RTCQuicTransport* transport() const; RTCQuicTransport* transport() const;
String state() const; String state() const;
uint32_t readBufferedAmount() const; uint32_t readBufferedAmount() const;
uint32_t maxReadBufferedAmount() const;
uint32_t writeBufferedAmount() const; uint32_t writeBufferedAmount() const;
uint32_t maxWriteBufferedAmount() const; uint32_t maxWriteBufferedAmount() const;
RTCQuicStreamReadResult* readInto(NotShared<DOMUint8Array> data,
ExceptionState& exception_state);
void write(NotShared<DOMUint8Array> data, ExceptionState& exception_state); void write(NotShared<DOMUint8Array> data, ExceptionState& exception_state);
void finish(); void finish();
void reset(); void reset();
...@@ -82,6 +88,7 @@ class MODULES_EXPORT RTCQuicStream final : public EventTargetWithInlineData, ...@@ -82,6 +88,7 @@ class MODULES_EXPORT RTCQuicStream final : public EventTargetWithInlineData,
void OnDataReceived(Vector<uint8_t> data, bool fin) override; void OnDataReceived(Vector<uint8_t> data, bool fin) override;
void OnWriteDataConsumed(uint32_t amount) override; void OnWriteDataConsumed(uint32_t amount) override;
bool RaiseIfNotReadable(ExceptionState&);
bool RaiseIfNotWritable(ExceptionState&); bool RaiseIfNotWritable(ExceptionState&);
// Permenantly closes the RTCQuicStream with the given reason. // Permenantly closes the RTCQuicStream with the given reason.
...@@ -95,8 +102,16 @@ class MODULES_EXPORT RTCQuicStream final : public EventTargetWithInlineData, ...@@ -95,8 +102,16 @@ class MODULES_EXPORT RTCQuicStream final : public EventTargetWithInlineData,
Member<RTCQuicTransport> transport_; Member<RTCQuicTransport> transport_;
RTCQuicStreamState state_ = RTCQuicStreamState::kOpen; RTCQuicStreamState state_ = RTCQuicStreamState::kOpen;
bool readable_ = true;
uint32_t read_buffered_amount_ = 0; // Data that has been received but not read.
// OnDataReceived() appends to the read buffer.
// readInto() will read out from the front of the buffer.
ByteBufferQueue receive_buffer_;
// True if the fin has been received from the network.
bool received_fin_ = false;
// True if the fin has been read out via readInto(). This signifies that the
// RTCQuicStream is closed for reading.
bool read_fin_ = false;
// Amount of bytes written but may not yet have been sent by the underlying // Amount of bytes written but may not yet have been sent by the underlying
// P2PQuicStream. // P2PQuicStream.
......
...@@ -19,8 +19,10 @@ enum RTCQuicStreamState { ...@@ -19,8 +19,10 @@ enum RTCQuicStreamState {
readonly attribute RTCQuicTransport transport; readonly attribute RTCQuicTransport transport;
readonly attribute RTCQuicStreamState state; readonly attribute RTCQuicStreamState state;
readonly attribute unsigned long readBufferedAmount; readonly attribute unsigned long readBufferedAmount;
readonly attribute unsigned long maxReadBufferedAmount;
readonly attribute unsigned long writeBufferedAmount; readonly attribute unsigned long writeBufferedAmount;
readonly attribute unsigned long maxWriteBufferedAmount; readonly attribute unsigned long maxWriteBufferedAmount;
[RaisesException] RTCQuicStreamReadResult readInto(Uint8Array data);
[RaisesException] void write(Uint8Array data); [RaisesException] void write(Uint8Array data);
void finish(); void finish();
void reset(); void reset();
......
// Copyright 2018 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
// https://w3c.github.io/webrtc-quic/#dom-rtcquicstreamreadresult
dictionary RTCQuicStreamReadResult {
unsigned long amount;
boolean finished;
};
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment