Commit 61720441 authored by Steve Anton's avatar Steve Anton Committed by Commit Bot

Implement RTCQuicStream.write()

Bug: 874296
Change-Id: I1786acb3b64fa0450e76fe27300c178081d1e191
Reviewed-on: https://chromium-review.googlesource.com/c/1285528
Commit-Queue: Steve Anton <steveanton@chromium.org>
Reviewed-by: default avatarHenrik Boström <hbos@chromium.org>
Cr-Commit-Position: refs/heads/master@{#607708}
parent b5b99113
'use strict';
// This file depends on RTCQuicTransport-helper.js which should be loaded from
// the main HTML file.
// The following helper methods are called from RTCQuicTransport-helper.js:
// makeTwoConnectedQuicTransports
// Run a test function for as many ways as an RTCQuicStream can transition to
// the 'closed' state.
// |test_func| will be called with the test as the first argument and the closed
// RTCQuicStream as the second argument.
function closed_stream_test(test_func, description) {
promise_test(async t => {
const [ localQuicTransport, remoteQuicTransport ] =
await makeTwoConnectedQuicTransports(t);
const localStream = localQuicTransport.createStream();
localStream.reset();
assert_equals(localStream.state, 'closed');
return test_func(t, localStream);
}, 'Stream closed by local reset(): ' + description);
promise_test(async t => {
const [ localQuicTransport, remoteQuicTransport ] =
await makeTwoConnectedQuicTransports(t);
const localStream = localQuicTransport.createStream();
localStream.write(new Uint8Array(1));
const remoteWatcher =
new EventWatcher(t, remoteQuicTransport, 'quicstream');
const { stream: remoteStream } = await remoteWatcher.wait_for('quicstream');
localStream.reset();
const remoteStreamWatcher =
new EventWatcher(t, remoteStream, 'statechange');
await remoteStreamWatcher.wait_for('statechange');
assert_equals(remoteStream.state, 'closed');
return test_func(t, remoteStream);
}, 'Stream closed by remote reset(): ' + description);
promise_test(async t => {
const [ localQuicTransport, remoteQuicTransport ] =
await makeTwoConnectedQuicTransports(t);
const localStream = localQuicTransport.createStream();
localQuicTransport.stop();
assert_equals(localStream.state, 'closed');
return test_func(t, localStream);
}, 'Stream closed by local RTCQuicTransport stop(): ' + description);
promise_test(async t => {
const [ localQuicTransport, remoteQuicTransport ] =
await makeTwoConnectedQuicTransports(t);
const localStream = localQuicTransport.createStream();
localStream.write(new Uint8Array(1));
const remoteWatcher =
new EventWatcher(t, remoteQuicTransport,
[ 'quicstream', 'statechange' ]);
const { stream: remoteStream } = await remoteWatcher.wait_for('quicstream');
localQuicTransport.stop();
await remoteWatcher.wait_for('statechange');
assert_equals(localStream.state, 'closed');
return test_func(t, localStream);
}, 'Stream closed by remote RTCQuicTransport stop(): ' + description);
}
......@@ -5,6 +5,7 @@
<script src="/resources/testharnessreport.js"></script>
<script src="RTCIceTransport-extension-helper.js"></script>
<script src="RTCQuicTransport-helper.js"></script>
<script src="RTCQuicStream-helper.js"></script>
<script>
'use strict';
......@@ -14,6 +15,8 @@
// The following helper functions are called from RTCQuicTransport-helper.js:
// makeStandaloneQuicTransport
// makeTwoConnectedQuicTransports
// The following helper functions are called from RTCQuicStream-helper.js:
// closed_stream_test
promise_test(async t => {
const [ quicTransport, ] = await makeTwoConnectedQuicTransports(t);
......@@ -25,6 +28,8 @@ promise_test(async t => {
'Expect read buffered amount to be 0.');
assert_equals(quicStream.writeBufferedAmount, 0,
'Expect write buffered amount to be 0.');
assert_greater_than(quicStream.maxWriteBufferedAmount, 0,
'Expect max write buffered amount to be greater than 0.');
}, 'createStream() returns an RTCQuicStream with initial properties set.');
promise_test(async t => {
......@@ -155,4 +160,97 @@ promise_test(async t => {
assert_equals(remoteStream.state, 'closed');
}, 'finish() then reset() fires two statechange events on the remote side.');
promise_test(async t => {
const [ localQuicTransport, remoteQuicTransport ] =
await makeTwoConnectedQuicTransports(t);
const localStream = localQuicTransport.createStream();
localStream.write(new Uint8Array(0));
assert_equals(localStream.writeBufferedAmount, 0);
}, 'write() with an empty array does nothing.');
promise_test(async t => {
const [ localQuicTransport, remoteQuicTransport ] =
await makeTwoConnectedQuicTransports(t);
const localStream = localQuicTransport.createStream();
localStream.write(new Uint8Array([65]));
assert_equals(localStream.writeBufferedAmount, 1);
localStream.write(new Uint8Array([66, 67]));
assert_equals(localStream.writeBufferedAmount, 3);
localStream.write(new Uint8Array([68, 69, 70]));
assert_equals(localStream.writeBufferedAmount, 6);
}, 'write() adds to writeBufferedAmount each call.');
promise_test(async t => {
const [ localQuicTransport, remoteQuicTransport ] =
await makeTwoConnectedQuicTransports(t);
const localStream = localQuicTransport.createStream();
localStream.write(new Uint8Array(localStream.maxWriteBufferedAmount));
assert_equals(localStream.writeBufferedAmount,
localStream.maxWriteBufferedAmount);
}, 'write() can write exactly maxWriteBufferedAmount.');
promise_test(async t => {
const [ localQuicTransport, remoteQuicTransport ] =
await makeTwoConnectedQuicTransports(t);
const localStream = localQuicTransport.createStream();
assert_throws('OperationError',
() =>
localStream.write(
new Uint8Array(localStream.maxWriteBufferedAmount + 1)));
assert_equals(localStream.writeBufferedAmount, 0);
}, 'write() throws if data longer than maxWriteBufferedAmount.');
promise_test(async t => {
const [ localQuicTransport, remoteQuicTransport ] =
await makeTwoConnectedQuicTransports(t);
const localStream = localQuicTransport.createStream();
localStream.write(new Uint8Array(10));
assert_throws('OperationError',
() =>
localStream.write(
new Uint8Array(localStream.maxWriteBufferedAmount)));
assert_equals(localStream.writeBufferedAmount, 10);
}, 'write() throws if total write buffered amount would be greater than ' +
'maxWriteBufferedAmount.');
promise_test(async t => {
const [ localQuicTransport, remoteQuicTransport ] =
await makeTwoConnectedQuicTransports(t);
const localStream = localQuicTransport.createStream();
localStream.finish();
assert_throws('InvalidStateError',
() => localStream.write(new Uint8Array()));
}, 'write() throws InvalidStateError if finish() has been called.');
closed_stream_test(async (t, stream) => {
assert_throws('InvalidStateError', () => stream.write(new Uint8Array()));
}, 'write() throws InvalidStateError.');
promise_test(async t => {
const [ localQuicTransport, remoteQuicTransport ] =
await makeTwoConnectedQuicTransports(t);
const localStream = localQuicTransport.createStream();
localStream.write(new Uint8Array(10));
localStream.reset();
assert_equals(localStream.writeBufferedAmount, 0);
}, 'writeBufferedAmount set to 0 after local reset().');
promise_test(async t => {
const [ localQuicTransport, remoteQuicTransport ] =
await makeTwoConnectedQuicTransports(t);
const localStream = localQuicTransport.createStream();
localStream.write(new Uint8Array(10));
localQuicTransport.stop();
assert_equals(localStream.writeBufferedAmount, 0);
}, 'writeBufferedAmount set to 0 after local RTCQuicTransport stop().');
promise_test(async t => {
const [ localQuicTransport, remoteQuicTransport ] =
await makeTwoConnectedQuicTransports(t);
const localStream = localQuicTransport.createStream();
localStream.write(new Uint8Array(10));
localStream.finish();
assert_equals(localStream.writeBufferedAmount, 10);
}, 'writeBufferedAmount maintained after finish() has been called.');
</script>
......@@ -5592,6 +5592,7 @@ interface RTCPeerConnectionIceEvent : Event
method constructor
interface RTCQuicStream : EventTarget
attribute @@toStringTag
getter maxWriteBufferedAmount
getter onstatechange
getter readBufferedAmount
getter state
......@@ -5600,6 +5601,7 @@ interface RTCQuicStream : EventTarget
method constructor
method finish
method reset
method write
setter onstatechange
interface RTCQuicStreamEvent : Event
attribute @@toStringTag
......
......@@ -12,11 +12,23 @@ namespace blink {
class MockP2PQuicStream : public testing::NiceMock<P2PQuicStream> {
public:
explicit MockP2PQuicStream(P2PQuicStream::Delegate** delegate_out = nullptr) {
if (delegate_out) {
// Ensure the caller has not left the delegate_out value floating.
DCHECK_EQ(nullptr, *delegate_out);
EXPECT_CALL(*this, SetDelegate(testing::_))
.WillOnce(testing::Invoke(
[delegate_out](P2PQuicStream::Delegate* delegate) {
*delegate_out = delegate;
}));
}
}
// P2PQuicStream overrides.
MOCK_METHOD0(Reset, void());
MOCK_METHOD1(SetDelegate, void(Delegate*));
MOCK_METHOD2(WriteData, void(Vector<uint8_t>, bool));
MOCK_METHOD1(MarkReceivedDataConsumed, void(uint32_t));
MOCK_METHOD2(WriteData, void(Vector<uint8_t>, bool));
MOCK_METHOD1(SetDelegate, void(Delegate*));
};
} // namespace blink
......
......@@ -5,9 +5,12 @@
#include "third_party/blink/renderer/core/dom/events/event.h"
#include "third_party/blink/renderer/modules/peerconnection/rtc_quic_transport.h"
#include "third_party/blink/renderer/platform/bindings/exception_state.h"
namespace blink {
const uint32_t RTCQuicStream::kWriteBufferSize = 4 * 1024;
RTCQuicStream::RTCQuicStream(ExecutionContext* context,
RTCQuicTransport* transport,
QuicStreamProxy* stream_proxy)
......@@ -49,12 +52,45 @@ uint32_t RTCQuicStream::writeBufferedAmount() const {
return write_buffered_amount_;
}
uint32_t RTCQuicStream::maxWriteBufferedAmount() const {
return kWriteBufferSize;
}
void RTCQuicStream::write(NotShared<DOMUint8Array> data,
ExceptionState& exception_state) {
if (IsClosed() || wrote_fin_) {
exception_state.ThrowDOMException(DOMExceptionCode::kInvalidStateError,
"The stream is not writable.");
return;
}
if (data.View()->length() == 0) {
return;
}
uint32_t remaining_write_buffer_size =
kWriteBufferSize - writeBufferedAmount();
if (data.View()->length() > remaining_write_buffer_size) {
exception_state.ThrowDOMException(
DOMExceptionCode::kOperationError,
"The write data size of " + String::Number(data.View()->length()) +
" bytes would exceed the remaining write buffer size of " +
String::Number(remaining_write_buffer_size) + " bytes.");
return;
}
Vector<uint8_t> data_vector(data.View()->length());
memcpy(data_vector.data(), data.View()->Data(), data.View()->length());
proxy_->WriteData(std::move(data_vector), /*fin=*/false);
write_buffered_amount_ += data.View()->length();
}
void RTCQuicStream::finish() {
if (!writeable_) {
if (IsClosed()) {
return;
}
if (wrote_fin_) {
return;
}
proxy_->WriteData({}, /*fin=*/true);
writeable_ = false;
wrote_fin_ = true;
if (readable_) {
DCHECK_EQ(state_, RTCQuicStreamState::kOpen);
state_ = RTCQuicStreamState::kClosing;
......@@ -69,15 +105,14 @@ void RTCQuicStream::reset() {
return;
}
proxy_->Reset();
writeable_ = false;
readable_ = false;
Close();
}
void RTCQuicStream::Stop() {
readable_ = false;
writeable_ = false;
state_ = RTCQuicStreamState::kClosed;
write_buffered_amount_ = 0;
proxy_ = nullptr;
}
......@@ -99,7 +134,7 @@ void RTCQuicStream::OnDataReceived(Vector<uint8_t> data, bool fin) {
DCHECK_NE(state_, RTCQuicStreamState::kClosed);
DCHECK(readable_);
readable_ = false;
if (writeable_) {
if (!wrote_fin_) {
DCHECK_EQ(state_, RTCQuicStreamState::kOpen);
state_ = RTCQuicStreamState::kClosing;
} else {
......@@ -109,7 +144,10 @@ void RTCQuicStream::OnDataReceived(Vector<uint8_t> data, bool fin) {
DispatchEvent(*Event::Create(event_type_names::kStatechange));
}
void RTCQuicStream::OnWriteDataConsumed(uint32_t amount) {}
void RTCQuicStream::OnWriteDataConsumed(uint32_t amount) {
DCHECK_GE(write_buffered_amount_, amount);
write_buffered_amount_ -= amount;
}
const AtomicString& RTCQuicStream::InterfaceName() const {
return event_target_names::kRTCQuicStream;
......
......@@ -6,6 +6,8 @@
#define THIRD_PARTY_BLINK_RENDERER_MODULES_PEERCONNECTION_RTC_QUIC_STREAM_H_
#include "third_party/blink/renderer/core/dom/context_lifecycle_observer.h"
#include "third_party/blink/renderer/core/typed_arrays/array_buffer_view_helpers.h"
#include "third_party/blink/renderer/core/typed_arrays/dom_typed_array.h"
#include "third_party/blink/renderer/modules/event_target_modules.h"
#include "third_party/blink/renderer/modules/modules_export.h"
#include "third_party/blink/renderer/modules/peerconnection/adapters/quic_stream_proxy.h"
......@@ -22,6 +24,9 @@ class MODULES_EXPORT RTCQuicStream final : public EventTargetWithInlineData,
DEFINE_WRAPPERTYPEINFO();
public:
// TODO(steveanton): These maybe should be adjustable.
static const uint32_t kWriteBufferSize;
RTCQuicStream(ExecutionContext* context,
RTCQuicTransport* transport,
QuicStreamProxy* stream_proxy);
......@@ -36,6 +41,8 @@ class MODULES_EXPORT RTCQuicStream final : public EventTargetWithInlineData,
String state() const;
uint32_t readBufferedAmount() const;
uint32_t writeBufferedAmount() const;
uint32_t maxWriteBufferedAmount() const;
void write(NotShared<DOMUint8Array> data, ExceptionState& exception_state);
void finish();
void reset();
DEFINE_ATTRIBUTE_EVENT_LISTENER(statechange, kStatechange);
......@@ -61,9 +68,9 @@ class MODULES_EXPORT RTCQuicStream final : public EventTargetWithInlineData,
Member<RTCQuicTransport> transport_;
RTCQuicStreamState state_ = RTCQuicStreamState::kOpen;
bool readable_ = true;
bool writeable_ = true;
uint32_t read_buffered_amount_ = 0;
uint32_t write_buffered_amount_ = 0;
bool wrote_fin_ = false;
QuicStreamProxy* proxy_;
};
......
......@@ -20,6 +20,8 @@ enum RTCQuicStreamState {
readonly attribute RTCQuicStreamState state;
readonly attribute unsigned long readBufferedAmount;
readonly attribute unsigned long writeBufferedAmount;
readonly attribute unsigned long maxWriteBufferedAmount;
[RaisesException] void write(Uint8Array data);
void finish();
void reset();
attribute EventHandler onstatechange;
......
......@@ -16,13 +16,34 @@ namespace blink {
namespace {
using testing::_;
using testing::ElementsAre;
using testing::InvokeWithoutArgs;
using testing::Return;
using testing::SaveArg;
NotShared<DOMUint8Array> CreateUint8Array(const Vector<uint8_t>& data) {
return NotShared<DOMUint8Array>(
DOMUint8Array::Create(data.data(), data.size()));
}
NotShared<DOMUint8Array> CreateUint8ArrayOfLength(uint32_t length) {
return NotShared<DOMUint8Array>(DOMUint8Array::Create(length));
}
} // namespace
class RTCQuicStreamTest : public RTCQuicTransportTest {};
class RTCQuicStreamTest : public RTCQuicTransportTest {
public:
RTCQuicStream* CreateQuicStream(V8TestingScope& scope,
MockP2PQuicStream* mock_stream) {
auto p2p_quic_transport = std::make_unique<MockP2PQuicTransport>();
EXPECT_CALL(*p2p_quic_transport, CreateStream())
.WillOnce(Return(mock_stream));
Persistent<RTCQuicTransport> quic_transport =
CreateConnectedQuicTransport(scope, std::move(p2p_quic_transport));
return quic_transport->createStream(ASSERT_NO_EXCEPTION);
}
};
// Test that RTCQuicTransport.createStream causes CreateStream to be called on
// the underlying transport and a P2PQuicStream::Delegate to be set on the
......@@ -171,4 +192,146 @@ TEST_F(RTCQuicStreamTest, PendingOnRemoteResetIgnoredAfterReset) {
EXPECT_EQ("closed", quic_stream->state());
}
// The following group tests write(), finish(), writeBufferedAmount(), and
// maxWriteBufferdAmount().
// Test that write() adds to writeBufferedAmount().
TEST_F(RTCQuicStreamTest, WriteAddsToWriteBufferedAmount) {
V8TestingScope scope;
auto p2p_quic_stream = std::make_unique<MockP2PQuicStream>();
Persistent<RTCQuicStream> stream =
CreateQuicStream(scope, p2p_quic_stream.get());
stream->write(CreateUint8Array({1, 2}), ASSERT_NO_EXCEPTION);
EXPECT_EQ(2u, stream->writeBufferedAmount());
stream->write(CreateUint8Array({3, 4, 5}), ASSERT_NO_EXCEPTION);
EXPECT_EQ(5u, stream->writeBufferedAmount());
RunUntilIdle();
}
// Test that write() calls WriteData() on the underlying P2PQuicStream.
TEST_F(RTCQuicStreamTest, WriteCallsWriteData) {
V8TestingScope scope;
auto p2p_quic_stream = std::make_unique<MockP2PQuicStream>();
EXPECT_CALL(*p2p_quic_stream, WriteData(_, _))
.WillOnce(testing::Invoke([](Vector<uint8_t> data, bool fin) {
EXPECT_THAT(data, ElementsAre(1, 2, 3, 4));
EXPECT_FALSE(fin);
}));
Persistent<RTCQuicStream> stream =
CreateQuicStream(scope, p2p_quic_stream.get());
stream->write(CreateUint8Array({1, 2, 3, 4}), ASSERT_NO_EXCEPTION);
RunUntilIdle();
}
// Test that write() with no data succeeds but does not post a WriteData() to
// the underlying P2PQuicStream.
TEST_F(RTCQuicStreamTest, WriteWithEmptyArrayDoesNotCallWriteData) {
V8TestingScope scope;
auto p2p_quic_stream = std::make_unique<MockP2PQuicStream>();
EXPECT_CALL(*p2p_quic_stream, WriteData(_, _)).Times(0);
Persistent<RTCQuicStream> stream =
CreateQuicStream(scope, p2p_quic_stream.get());
stream->write(CreateUint8Array({}), ASSERT_NO_EXCEPTION);
RunUntilIdle();
}
// Test that finish() calls WriteData() on the underlying P2PQuicStream.
TEST_F(RTCQuicStreamTest, FinishCallsWriteData) {
V8TestingScope scope;
auto p2p_quic_stream = std::make_unique<MockP2PQuicStream>();
EXPECT_CALL(*p2p_quic_stream, WriteData(_, _))
.WillOnce(testing::Invoke([](Vector<uint8_t> data, bool fin) {
EXPECT_THAT(data, ElementsAre());
EXPECT_TRUE(fin);
}));
Persistent<RTCQuicStream> stream =
CreateQuicStream(scope, p2p_quic_stream.get());
stream->finish();
RunUntilIdle();
}
// Test that writeBufferedAmount is decreased when receiving OnWriteDataConsumed
// from the underlying P2PQuicStream.
TEST_F(RTCQuicStreamTest, OnWriteDataConsumedSubtractsFromWriteBufferedAmount) {
V8TestingScope scope;
P2PQuicStream::Delegate* stream_delegate = nullptr;
auto p2p_quic_stream = std::make_unique<MockP2PQuicStream>(&stream_delegate);
Persistent<RTCQuicStream> stream =
CreateQuicStream(scope, p2p_quic_stream.get());
stream->write(CreateUint8ArrayOfLength(4), ASSERT_NO_EXCEPTION);
RunUntilIdle();
ASSERT_TRUE(stream_delegate);
stream_delegate->OnWriteDataConsumed(4);
RunUntilIdle();
EXPECT_EQ(0u, stream->writeBufferedAmount());
}
// Test that writeBufferedAmount is set to 0 if the stream was reset by the
// remote peer.
TEST_F(RTCQuicStreamTest, OnRemoteResetSetsWriteBufferedAmountToZero) {
V8TestingScope scope;
P2PQuicStream::Delegate* stream_delegate = nullptr;
auto p2p_quic_stream = std::make_unique<MockP2PQuicStream>(&stream_delegate);
Persistent<RTCQuicStream> stream =
CreateQuicStream(scope, p2p_quic_stream.get());
stream->write(CreateUint8ArrayOfLength(4), ASSERT_NO_EXCEPTION);
RunUntilIdle();
ASSERT_TRUE(stream_delegate);
stream_delegate->OnRemoteReset();
RunUntilIdle();
EXPECT_EQ(0u, stream->writeBufferedAmount());
RunUntilIdle();
}
// Test that write throws an InvalidStateError if the stream was reset by the
// remote peer.
TEST_F(RTCQuicStreamTest, WriteThrowsIfRemoteReset) {
V8TestingScope scope;
P2PQuicStream::Delegate* stream_delegate = nullptr;
auto p2p_quic_stream = std::make_unique<MockP2PQuicStream>(&stream_delegate);
Persistent<RTCQuicStream> stream =
CreateQuicStream(scope, p2p_quic_stream.get());
RunUntilIdle();
ASSERT_TRUE(stream_delegate);
stream_delegate->OnRemoteReset();
RunUntilIdle();
stream->write(CreateUint8ArrayOfLength(1), scope.GetExceptionState());
EXPECT_EQ(DOMExceptionCode::kInvalidStateError,
scope.GetExceptionState().CodeAs<DOMExceptionCode>());
RunUntilIdle();
}
} // namespace blink
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment