Commit 53e071ee authored by Adam Rice's avatar Adam Rice Committed by Commit Bot

QuicTransport: Implement createSendStream()

Implement the SendStream interface and OutgoingStream mixin from the
WebTransport specification. Implement the QuicTransport
createSendStream() method to create a SendStream() object.

SendStream is trivial as all interfaces are inherited from
OutgoingStream.

A new interface, WebTransportCloseProxy acts a bridge between
QuicTransport and OutgoingStream. It abstracts away that a single
stream_id can be shared between an IncomingStream and OutgoingStream in
the case of a bidirectional stream.

Also add a browser test for the new API.

Bug: 1011392
Change-Id: I68731e606527d5dd94d54f9bfed05a22fde4d59e
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2024393Reviewed-by: default avatarKinuko Yasuda <kinuko@chromium.org>
Reviewed-by: default avatarYutaka Hirano <yhirano@chromium.org>
Commit-Queue: Adam Rice <ricea@chromium.org>
Cr-Commit-Position: refs/heads/master@{#736817}
parent 21bb72f4
......@@ -203,5 +203,33 @@ IN_PROC_BROWSER_TEST_F(QuicTransportTest, ClientIndicationFailure) {
ASSERT_TRUE(WaitForTitle(ASCIIToUTF16("PASS"), {ASCIIToUTF16("FAIL")}));
}
IN_PROC_BROWSER_TEST_F(QuicTransportTest, CreateSendStream) {
ASSERT_TRUE(embedded_test_server()->Start());
ASSERT_TRUE(
NavigateToURL(shell(), embedded_test_server()->GetURL("/title2.html")));
ASSERT_TRUE(WaitForTitle(ASCIIToUTF16("Title Of Awesomeness")));
ASSERT_TRUE(ExecuteScript(
shell(), base::StringPrintf(R"JS(
async function run() {
const transport = new QuicTransport('quic-transport://localhost:%d/echo');
await transport.ready;
const sendStream = await transport.createSendStream();
const writer = sendStream.writable.getWriter();
await writer.write(new Uint8Array([65, 66, 67]));
await writer.close();
}
run().then(() => { document.title = 'PASS'; },
(e) => { console.log(e); document.title = 'FAIL'; });
)JS",
server_.server_address().port())));
ASSERT_TRUE(WaitForTitle(ASCIIToUTF16("PASS"), {ASCIIToUTF16("FAIL")}));
}
} // namespace
} // namespace content
......@@ -498,7 +498,9 @@ modules_idl_files =
"websockets/close_event.idl",
"websockets/websocket.idl",
"websockets/websocket_stream.idl",
"webtransport/outgoing_stream.idl",
"webtransport/quic_transport.idl",
"webtransport/send_stream.idl",
"webusb/usb.idl",
"webusb/usb_alternate_interface.idl",
"webusb/usb_configuration.idl",
......@@ -899,6 +901,7 @@ modules_dictionary_idl_files =
"websockets/websocket_connection.idl",
"websockets/websocket_stream_options.idl",
"webtransport/web_transport_close_info.idl",
"webtransport/stream_abort_info.idl",
"webusb/usb_connection_event_init.idl",
"webusb/usb_control_transfer_parameters.idl",
"webusb/usb_device_filter.idl",
......
......@@ -6,14 +6,24 @@ import("//third_party/blink/renderer/modules/modules.gni")
blink_modules_sources("webtransport") {
sources = [
"outgoing_stream.cc",
"outgoing_stream.h",
"quic_transport.cc",
"quic_transport.h",
"send_stream.cc",
"send_stream.h",
"web_transport_close_proxy.h",
]
}
jumbo_source_set("unit_tests") {
testonly = true
sources = [ "quic_transport_test.cc" ]
sources = [
"mock_web_transport_close_proxy.cc",
"mock_web_transport_close_proxy.h",
"outgoing_stream_test.cc",
"quic_transport_test.cc",
]
configs += [
"//third_party/blink/renderer:config",
......
// Copyright 2020 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "third_party/blink/renderer/modules/webtransport/mock_web_transport_close_proxy.h"
namespace blink {
MockWebTransportCloseProxy::MockWebTransportCloseProxy() = default;
MockWebTransportCloseProxy::~MockWebTransportCloseProxy() = default;
} // namespace blink
// Copyright 2020 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef THIRD_PARTY_BLINK_RENDERER_MODULES_WEBTRANSPORT_MOCK_WEB_TRANSPORT_CLOSE_PROXY_H_
#define THIRD_PARTY_BLINK_RENDERER_MODULES_WEBTRANSPORT_MOCK_WEB_TRANSPORT_CLOSE_PROXY_H_
#include "testing/gmock/include/gmock/gmock.h"
#include "third_party/blink/renderer/modules/webtransport/web_transport_close_proxy.h"
namespace blink {
// A mock implementation of WebTransportCloseProxy.
class MockWebTransportCloseProxy : public WebTransportCloseProxy {
public:
// Constructor and destructor are out-of-line to reduce compile time:
// https://github.com/google/googletest/blob/master/googlemock/docs/cook_book.md#making-the-compilation-faster.
MockWebTransportCloseProxy();
~MockWebTransportCloseProxy() override;
MOCK_METHOD1(OnIncomingStreamClosed, void(bool));
MOCK_METHOD0(SendFin, void());
MOCK_METHOD0(Reset, void());
};
} // namespace blink
#endif // THIRD_PARTY_BLINK_RENDERER_MODULES_WEBTRANSPORT_MOCK_WEB_TRANSPORT_CLOSE_PROXY_H_
// Copyright 2020 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef THIRD_PARTY_BLINK_RENDERER_MODULES_WEBTRANSPORT_OUTGOING_STREAM_H_
#define THIRD_PARTY_BLINK_RENDERER_MODULES_WEBTRANSPORT_OUTGOING_STREAM_H_
#include <stdint.h>
#include "base/containers/span.h"
#include "base/util/type_safety/strong_alias.h"
#include "mojo/public/cpp/system/data_pipe.h"
#include "mojo/public/cpp/system/simple_watcher.h"
#include "third_party/blink/renderer/bindings/core/v8/active_script_wrappable.h"
#include "third_party/blink/renderer/bindings/core/v8/script_promise.h"
#include "third_party/blink/renderer/bindings/core/v8/script_promise_resolver.h"
#include "third_party/blink/renderer/core/execution_context/context_lifecycle_observer.h"
#include "third_party/blink/renderer/modules/modules_export.h"
#include "third_party/blink/renderer/platform/bindings/script_wrappable.h"
#include "third_party/blink/renderer/platform/heap/thread_state.h"
#include "third_party/blink/renderer/platform/wtf/wtf_size_t.h"
namespace blink {
class ArrayBuffer;
class ScriptState;
class StreamAbortInfo;
class WebTransportCloseProxy;
class WritableStream;
class WritableStreamDefaultController;
// Implementation of the OutgoingStream mixin from the standard. SendStream and
// BidirectionalStream inherit from this.
class MODULES_EXPORT OutgoingStream
: public ScriptWrappable,
public ActiveScriptWrappable<OutgoingStream>,
public ContextLifecycleObserver {
DEFINE_WRAPPERTYPEINFO();
USING_PRE_FINALIZER(OutgoingStream, Dispose);
USING_GARBAGE_COLLECTED_MIXIN(OutgoingStream);
public:
OutgoingStream(ScriptState*,
WebTransportCloseProxy*,
mojo::ScopedDataPipeProducerHandle);
~OutgoingStream() override;
// Init() must be called before the stream is used.
virtual void Init();
WebTransportCloseProxy* GetWebTransportCloseProxy() { return close_proxy_; }
// Implementation of outgoing_stream.idl.
WritableStream* writable() const {
DVLOG(1) << "OutgoingStream::writable() called";
return writable_;
}
ScriptPromise writingAborted() const { return writing_aborted_; }
void abortWriting();
void abortWriting(StreamAbortInfo*);
// Called via WebTransportCloseProxy. Expects a JavaScript scope to be
// entered.
void Reset();
// OutgoingStream cannot be collected until it is explicitly closed, either
// remotely or locally.
bool HasPendingActivity() const final { return writing_aborted_resolver_; }
// Implementation of ContextLifecycleObserver.
void ContextDestroyed(ExecutionContext*) override;
void Trace(Visitor*) override;
private:
class UnderlyingSink;
using IsLocalAbort = util::StrongAlias<class IsLocalAbortTag, bool>;
// Called when |data_pipe_| becomes writable or errored.
void OnHandleReady(MojoResult, const mojo::HandleSignalsState&);
// Called when |data_pipe_| is closed.
void OnPeerClosed(MojoResult, const mojo::HandleSignalsState&);
// Rejects any unfinished write() calls and resets |data_pipe_|.
void HandlePipeClosed();
// Implements UnderlyingSink::write().
ScriptPromise SinkWrite(ScriptState*, ScriptValue chunk, ExceptionState&);
// Writes |data| to |data_pipe_|, possible saving unwritten data to
// |cached_data_|.
ScriptPromise WriteOrCacheData(ScriptState*, base::span<const uint8_t> data);
// Attempts to write some more of |cached_data_| to |data_pipe_|.
void WriteCachedData();
// Writes zero or more bytes of |data| synchronously to |data_pipe_|,
// returning the number of bytes that were written.
size_t WriteDataSynchronously(base::span<const uint8_t> data);
// Creates a DOMException indicating that the stream has been aborted.
// If IsLocalAbort it true it will indicate a locally-initiated abort,
// otherwise it will indicate a remote-initiated abort.
ScriptValue CreateAbortException(IsLocalAbort);
// Errors |writable_|, resolves |writing_aborted_| and resets |data_pipe_|.
// The error message used to error |writable_| depends on whether IsLocalAbort
// is true or not.
void ErrorStreamAbortAndReset(IsLocalAbort);
// Resolve the |writing_aborted_| promise and reset the |data_pipe_|.
void AbortAndReset();
// Resets |data_pipe_| and clears the watchers. Also discards |cached_data_|.
// If the pipe is open it will be closed as a side-effect.
void ResetPipe();
// Prepares the object for destruction.
void Dispose();
const Member<ScriptState> script_state_;
const Member<WebTransportCloseProxy> close_proxy_;
mojo::ScopedDataPipeProducerHandle data_pipe_;
// Only armed when we need to write something.
mojo::SimpleWatcher write_watcher_;
// Always armed to detect close.
mojo::SimpleWatcher close_watcher_;
// Data which has been passed to write() but still needs to be written
// asynchronously.
// Uses an ArrayBuffer rather than a Vector because WTF::Vector is currently
// limited to 2GB.
// TODO(ricea): Change this to a Vector when it becomes 64-bit safe.
scoped_refptr<ArrayBuffer> cached_data_;
// The offset into |cached_data_| of the first byte that still needs to be
// written.
wtf_size_t offset_ = 0;
Member<WritableStream> writable_;
Member<WritableStreamDefaultController> controller_;
// Promise returned by the |writingAborted| attribute.
ScriptPromise writing_aborted_;
Member<ScriptPromiseResolver> writing_aborted_resolver_;
// If an asynchronous write() on the underlying sink object is pending, this
// will be non-null.
Member<ScriptPromiseResolver> write_promise_resolver_;
};
} // namespace blink
#endif // THIRD_PARTY_BLINK_RENDERER_MODULES_WEBTRANSPORT_OUTGOING_STREAM_H_
// Copyright 2020 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
// https://wicg.github.io/web-transport/#outgoingstream
[ActiveScriptWrappable]
interface mixin OutgoingStream {
readonly attribute WritableStream writable;
readonly attribute Promise<StreamAbortInfo> writingAborted;
void abortWriting(optional StreamAbortInfo abortInfo);
};
......@@ -16,6 +16,7 @@
#include "third_party/blink/renderer/bindings/core/v8/script_promise_resolver.h"
#include "third_party/blink/renderer/bindings/core/v8/v8_array_buffer.h"
#include "third_party/blink/renderer/bindings/core/v8/v8_array_buffer_view.h"
#include "third_party/blink/renderer/bindings/core/v8/v8_throw_dom_exception.h"
#include "third_party/blink/renderer/core/execution_context/execution_context.h"
#include "third_party/blink/renderer/core/frame/csp/content_security_policy.h"
#include "third_party/blink/renderer/core/streams/readable_stream.h"
......@@ -24,6 +25,7 @@
#include "third_party/blink/renderer/core/streams/underlying_source_base.h"
#include "third_party/blink/renderer/core/streams/writable_stream.h"
#include "third_party/blink/renderer/core/typed_arrays/dom_typed_array.h"
#include "third_party/blink/renderer/modules/webtransport/send_stream.h"
#include "third_party/blink/renderer/platform/bindings/exception_code.h"
#include "third_party/blink/renderer/platform/bindings/exception_state.h"
#include "third_party/blink/renderer/platform/bindings/script_state.h"
......@@ -142,6 +144,7 @@ class QuicTransport::DatagramUnderlyingSource final
ScriptPromise Cancel(ScriptState* script_state, ScriptValue reason) override {
// Stop Enqueue() from being called again.
quic_transport_->received_datagrams_controller_->NoteHasBeenCanceled();
quic_transport_->received_datagrams_controller_ = nullptr;
quic_transport_ = nullptr;
return ScriptPromise::CastUndefined(script_state);
......@@ -173,12 +176,61 @@ QuicTransport::QuicTransport(PassKey,
script_state_(script_state),
url_(NullURL(), url) {}
ScriptPromise QuicTransport::createSendStream(ScriptState* script_state,
ExceptionState& exception_state) {
DVLOG(1) << "QuicTransport::createSendStream() this=" << this;
if (!quic_transport_) {
// TODO(ricea): Should we wait if we're still connecting?
exception_state.ThrowDOMException(DOMExceptionCode::kNetworkError,
"No connection.");
return ScriptPromise();
}
MojoCreateDataPipeOptions options;
options.struct_size = sizeof(MojoCreateDataPipeOptions);
options.flags = MOJO_CREATE_DATA_PIPE_FLAG_NONE;
options.element_num_bytes = 1;
// TODO(ricea): Find an appropriate value for capacity_num_bytes.
options.capacity_num_bytes = 0;
mojo::ScopedDataPipeProducerHandle data_pipe_producer;
mojo::ScopedDataPipeConsumerHandle data_pipe_consumer;
MojoResult result =
mojo::CreateDataPipe(&options, &data_pipe_producer, &data_pipe_consumer);
if (result != MOJO_RESULT_OK) {
// Probably out of resources.
exception_state.ThrowDOMException(DOMExceptionCode::kUnknownError,
"Insufficient resources.");
return ScriptPromise();
}
auto* resolver = MakeGarbageCollected<ScriptPromiseResolver>(script_state);
create_send_stream_resolvers_.insert(resolver);
quic_transport_->CreateStream(
std::move(data_pipe_consumer), mojo::ScopedDataPipeProducerHandle(),
WTF::Bind(&QuicTransport::OnCreateStreamResponse,
WrapWeakPersistent(this), WrapWeakPersistent(resolver),
std::move(data_pipe_producer)));
return resolver->Promise();
}
void QuicTransport::close(const WebTransportCloseInfo* close_info) {
DVLOG(1) << "QuicTransport::close() this=" << this;
// TODO(ricea): Send |close_info| to the network service.
if (cleanly_closed_) {
// close() has already been called. Ignore it.
return;
}
cleanly_closed_ = true;
received_datagrams_controller_->Close();
if (received_datagrams_controller_) {
received_datagrams_controller_->Close();
received_datagrams_controller_ = nullptr;
}
// If we don't manage to close the writable stream here, then it will
// error when a write() is attempted.
if (!WritableStream::IsLocked(outgoing_datagrams_) &&
......@@ -191,7 +243,8 @@ void QuicTransport::close(const WebTransportCloseInfo* close_info) {
v8::Local<v8::Value> reason = V8ThrowException::CreateTypeError(
script_state_->GetIsolate(), "Connection closed.");
ready_resolver_->Reject(reason);
Dispose();
RejectPendingStreamResolvers();
ResetAll();
}
void QuicTransport::OnConnectionEstablished(
......@@ -220,14 +273,14 @@ QuicTransport::~QuicTransport() = default;
void QuicTransport::OnHandshakeFailed() {
DVLOG(1) << "QuicTransport::OnHandshakeFailed() this=" << this;
ScriptState::Scope scope(script_state_);
{
ScriptState::Scope scope(script_state_);
v8::Local<v8::Value> reason = V8ThrowException::CreateTypeError(
script_state_->GetIsolate(), "Connection lost.");
ready_resolver_->Reject(reason);
closed_resolver_->Reject(reason);
}
Dispose();
ResetAll();
}
void QuicTransport::OnDatagramReceived(base::span<const uint8_t> data) {
......@@ -255,7 +308,8 @@ void QuicTransport::OnIncomingStreamClosed(uint32_t stream_id,
bool fin_received) {
DVLOG(1) << "QuicTransport::OnIncomingStreamClosed(" << stream_id << ", "
<< fin_received << ") this=" << this;
// TODO(ricea): Implement this.
WebTransportCloseProxy* stream = stream_map_.Take(stream_id);
stream->OnIncomingStreamClosed(fin_received);
}
void QuicTransport::ContextDestroyed(ExecutionContext* execution_context) {
......@@ -268,15 +322,22 @@ bool QuicTransport::HasPendingActivity() const {
return handshake_client_receiver_.is_bound() || client_receiver_.is_bound();
}
void QuicTransport::SendFin(uint32_t stream_id) {
quic_transport_->SendFin(stream_id);
stream_map_.erase(stream_id);
}
void QuicTransport::Trace(Visitor* visitor) {
visitor->Trace(received_datagrams_);
visitor->Trace(received_datagrams_controller_);
visitor->Trace(outgoing_datagrams_);
visitor->Trace(script_state_);
visitor->Trace(create_send_stream_resolvers_);
visitor->Trace(ready_resolver_);
visitor->Trace(ready_);
visitor->Trace(closed_resolver_);
visitor->Trace(closed_);
visitor->Trace(stream_map_);
ContextLifecycleObserver::Trace(visitor);
ScriptWrappable::Trace(visitor);
}
......@@ -356,28 +417,91 @@ void QuicTransport::Init(const String& url, ExceptionState& exception_state) {
script_state_, MakeGarbageCollected<DatagramUnderlyingSink>(this), 1);
}
void QuicTransport::ResetAll() {
DVLOG(1) << "QuicTransport::ResetAll() this=" << this;
// This loop is safe even if re-entered. It will always terminate because
// every iteration erases one entry from the map.
while (!stream_map_.IsEmpty()) {
auto it = stream_map_.begin();
auto close_proxy = it->value;
stream_map_.erase(it);
close_proxy->Reset();
}
Dispose();
}
void QuicTransport::Dispose() {
DVLOG(1) << "QuicTransport::Dispose() this=" << this;
quic_transport_.reset();
handshake_client_receiver_.reset();
client_receiver_.reset();
stream_map_.clear();
}
void QuicTransport::OnConnectionError() {
DVLOG(1) << "QuicTransport::OnConnectionError() this=" << this;
ScriptState::Scope scope(script_state_);
if (!cleanly_closed_) {
ScriptState::Scope scope(script_state_);
v8::Local<v8::Value> reason = V8ThrowException::CreateTypeError(
script_state_->GetIsolate(), "Connection lost.");
received_datagrams_controller_->Error(reason);
if (received_datagrams_controller_) {
received_datagrams_controller_->Error(reason);
received_datagrams_controller_ = nullptr;
}
WritableStreamDefaultController::Error(
script_state_, outgoing_datagrams_->Controller(), reason);
ready_resolver_->Reject(reason);
closed_resolver_->Reject(reason);
}
Dispose();
RejectPendingStreamResolvers();
ResetAll();
}
void QuicTransport::RejectPendingStreamResolvers() {
v8::Local<v8::Value> reason = V8ThrowException::CreateTypeError(
script_state_->GetIsolate(), "Connection lost.");
for (ScriptPromiseResolver* resolver : create_send_stream_resolvers_) {
resolver->Reject(reason);
}
create_send_stream_resolvers_.clear();
}
void QuicTransport::OnCreateStreamResponse(
ScriptPromiseResolver* resolver,
mojo::ScopedDataPipeProducerHandle producer,
bool succeeded,
uint32_t stream_id) {
DVLOG(1) << "QuicTransport::OnCreateStreamResponse() this=" << this
<< " succeeded=" << succeeded << " stream_id=" << stream_id;
// Shouldn't resolve the promise if the execution context has gone away.
if (!GetExecutionContext())
return;
// Shouldn't resolve the promise if the mojo interface is disconnected.
if (!resolver || !create_send_stream_resolvers_.Take(resolver))
return;
ScriptState::Scope scope(script_state_);
if (!succeeded) {
resolver->Reject(V8ThrowDOMException::CreateOrEmpty(
script_state_->GetIsolate(), DOMExceptionCode::kNetworkError,
"Failed to create send stream."));
return;
}
auto* send_stream = MakeGarbageCollected<SendStream>(
script_state_, this, stream_id, std::move(producer));
send_stream->Init();
// 0xfffffffe and 0xffffffff are reserved values in stream_map_.
CHECK_LT(stream_id, 0xfffffffe);
stream_map_.insert(stream_id, send_stream->GetWebTransportCloseProxy());
resolver->Resolve(send_stream);
}
} // namespace blink
......@@ -5,6 +5,8 @@
#ifndef THIRD_PARTY_BLINK_RENDERER_MODULES_WEBTRANSPORT_QUIC_TRANSPORT_H_
#define THIRD_PARTY_BLINK_RENDERER_MODULES_WEBTRANSPORT_QUIC_TRANSPORT_H_
#include <stdint.h>
#include "base/containers/span.h"
#include "base/util/type_safety/pass_key.h"
#include "mojo/public/cpp/bindings/pending_receiver.h"
......@@ -18,6 +20,7 @@
#include "third_party/blink/renderer/core/execution_context/context_lifecycle_observer.h"
#include "third_party/blink/renderer/modules/modules_export.h"
#include "third_party/blink/renderer/platform/bindings/script_wrappable.h"
#include "third_party/blink/renderer/platform/heap/heap_allocator.h"
#include "third_party/blink/renderer/platform/weborigin/kurl.h"
#include "third_party/blink/renderer/platform/wtf/forward.h"
......@@ -31,6 +34,9 @@ class ScriptPromiseResolver;
class ScriptState;
class WebTransportCloseInfo;
class WritableStream;
class ScriptPromise;
class ScriptPromiseResolver;
class WebTransportCloseProxy;
// https://wicg.github.io/web-transport/#quic-transport
class MODULES_EXPORT QuicTransport final
......@@ -53,6 +59,8 @@ class MODULES_EXPORT QuicTransport final
~QuicTransport() override;
// QuicTransport IDL implementation.
ScriptPromise createSendStream(ScriptState*, ExceptionState&);
WritableStream* sendDatagrams() { return outgoing_datagrams_; }
ReadableStream* receiveDatagrams() { return received_datagrams_; }
void close(const WebTransportCloseInfo*);
......@@ -76,6 +84,9 @@ class MODULES_EXPORT QuicTransport final
// Implementation of ActiveScriptWrappable
bool HasPendingActivity() const final;
// Forwards a SendFin() message to the mojo interface.
void SendFin(uint32_t stream_id);
// ScriptWrappable implementation
void Trace(Visitor* visitor) override;
......@@ -84,8 +95,16 @@ class MODULES_EXPORT QuicTransport final
class DatagramUnderlyingSource;
void Init(const String& url, ExceptionState&);
// Reset the QuicTransport object and all associated streams.
void ResetAll();
void Dispose();
void OnConnectionError();
void RejectPendingStreamResolvers();
void OnCreateStreamResponse(ScriptPromiseResolver*,
mojo::ScopedDataPipeProducerHandle producer,
bool succeeded,
uint32_t stream_id);
bool cleanly_closed_ = false;
Member<ReadableStream> received_datagrams_;
......@@ -95,9 +114,20 @@ class MODULES_EXPORT QuicTransport final
// This corresponds to the [[SentDatagrams]] internal slot in the standard.
Member<WritableStream> outgoing_datagrams_;
Member<ScriptState> script_state_;
const Member<ScriptState> script_state_;
const KURL url_;
// Map from stream_id to SendStream, ReceiveStream or BidirectionalStream.
// Intentionally keeps streams reachable by GC as long as they are open.
// This doesn't support stream ids of 0xfffffffe or larger.
// TODO(ricea): Find out if such large stream ids are possible.
HeapHashMap<uint32_t,
Member<WebTransportCloseProxy>,
WTF::DefaultHash<uint32_t>::Hash,
WTF::UnsignedWithZeroKeyHashTraits<uint32_t>>
stream_map_;
mojo::Remote<network::mojom::blink::QuicTransport> quic_transport_;
mojo::Receiver<network::mojom::blink::QuicTransportHandshakeClient>
handshake_client_receiver_{this};
......@@ -107,6 +137,10 @@ class MODULES_EXPORT QuicTransport final
ScriptPromise ready_;
Member<ScriptPromiseResolver> closed_resolver_;
ScriptPromise closed_;
// Tracks resolvers for in-progress createSendStream() operations so they can
// be rejected
HeapHashSet<Member<ScriptPromiseResolver>> create_send_stream_resolvers_;
};
} // namespace blink
......
......@@ -14,6 +14,12 @@
// QuicTransport includes, but we define all their methods/attributes here
// for simplicity.
// From UnidirectionalStreamsTransport
// TODO(ricea): Change this to Promise<SendStream> once the IDL compiler is
// fixed. See https://crbug.com/1047113.
[CallWith=ScriptState, RaisesException] Promise<any>
createSendStream();
// From DatagramTransport mixin
WritableStream sendDatagrams();
ReadableStream receiveDatagrams();
......
......@@ -20,16 +20,19 @@
#include "third_party/blink/renderer/bindings/core/v8/script_value.h"
#include "third_party/blink/renderer/bindings/core/v8/to_v8_for_core.h"
#include "third_party/blink/renderer/bindings/core/v8/v8_binding_for_testing.h"
#include "third_party/blink/renderer/bindings/core/v8/v8_dom_exception.h"
#include "third_party/blink/renderer/bindings/core/v8/v8_gc_controller.h"
#include "third_party/blink/renderer/bindings/core/v8/v8_iterator_result_value.h"
#include "third_party/blink/renderer/bindings/core/v8/v8_uint8_array.h"
#include "third_party/blink/renderer/bindings/modules/v8/v8_web_transport_close_info.h"
#include "third_party/blink/renderer/bindings/modules/v8/v8_send_stream.h"
#include "third_party/blink/renderer/core/frame/csp/content_security_policy.h"
#include "third_party/blink/renderer/core/streams/readable_stream.h"
#include "third_party/blink/renderer/core/streams/readable_stream_default_reader.h"
#include "third_party/blink/renderer/core/streams/writable_stream.h"
#include "third_party/blink/renderer/core/streams/writable_stream_default_writer.h"
#include "third_party/blink/renderer/core/typed_arrays/dom_typed_array.h"
#include "third_party/blink/renderer/modules/webtransport/send_stream.h"
#include "third_party/blink/renderer/modules/webtransport/web_transport_close_info.h"
#include "third_party/blink/renderer/platform/bindings/exception_code.h"
#include "third_party/blink/renderer/platform/bindings/exception_state.h"
#include "third_party/blink/renderer/platform/testing/unit_test_helpers.h"
......@@ -46,6 +49,7 @@ using ::testing::Invoke;
using ::testing::Mock;
using ::testing::StrictMock;
using ::testing::Truly;
using ::testing::Unused;
class QuicTransportConnector final
: public mojom::blink::QuicTransportConnector {
......@@ -170,6 +174,28 @@ class QuicTransportTest : public ::testing::Test {
return quic_transport;
}
SendStream* CreateSendStreamSuccessfully(const V8TestingScope& scope,
QuicTransport* quic_transport) {
EXPECT_CALL(*mock_quic_transport_, CreateStream(_, _, _))
.WillOnce([this](Unused, Unused,
base::OnceCallback<void(bool, uint32_t)> callback) {
std::move(callback).Run(true, next_stream_id_++);
});
auto* script_state = scope.GetScriptState();
ScriptPromise send_stream_promise =
quic_transport->createSendStream(script_state, ASSERT_NO_EXCEPTION);
ScriptPromiseTester tester(script_state, send_stream_promise);
tester.WaitUntilSettled();
EXPECT_TRUE(tester.IsFulfilled());
auto* send_stream = V8SendStream::ToImplWithTypeCheck(
scope.GetIsolate(), tester.Value().V8Value());
EXPECT_TRUE(send_stream);
return send_stream;
}
void BindConnector(mojo::ScopedMessagePipeHandle handle) {
connector_.Bind(mojo::PendingReceiver<mojom::blink::QuicTransportConnector>(
std::move(handle)));
......@@ -186,6 +212,7 @@ class QuicTransportTest : public ::testing::Test {
QuicTransportConnector connector_;
std::unique_ptr<MockQuicTransport> mock_quic_transport_;
mojo::Remote<network::mojom::blink::QuicTransportClient> client_remote_;
uint32_t next_stream_id_ = 0;
base::WeakPtrFactory<QuicTransportTest> weak_ptr_factory_{this};
};
......@@ -642,6 +669,144 @@ TEST_F(QuicTransportTest, DatagramsAreDropped) {
ElementsAre('C'));
}
bool ValidProducerHandle(const mojo::ScopedDataPipeProducerHandle& handle) {
return handle.is_valid();
}
bool ValidConsumerHandle(const mojo::ScopedDataPipeConsumerHandle& handle) {
return handle.is_valid();
}
TEST_F(QuicTransportTest, CreateSendStream) {
V8TestingScope scope;
auto* quic_transport =
CreateAndConnectSuccessfully(scope, "quic-transport://example.com");
EXPECT_CALL(*mock_quic_transport_,
CreateStream(Truly(ValidConsumerHandle),
Not(Truly(ValidProducerHandle)), _))
.WillOnce([](Unused, Unused,
base::OnceCallback<void(bool, uint32_t)> callback) {
std::move(callback).Run(true, 0);
});
auto* script_state = scope.GetScriptState();
ScriptPromise send_stream_promise =
quic_transport->createSendStream(script_state, ASSERT_NO_EXCEPTION);
ScriptPromiseTester tester(script_state, send_stream_promise);
tester.WaitUntilSettled();
EXPECT_TRUE(tester.IsFulfilled());
auto* send_stream = V8SendStream::ToImplWithTypeCheck(
scope.GetIsolate(), tester.Value().V8Value());
EXPECT_TRUE(send_stream);
}
TEST_F(QuicTransportTest, CreateSendStreamBeforeConnect) {
V8TestingScope scope;
auto* script_state = scope.GetScriptState();
auto* quic_transport = QuicTransport::Create(
script_state, "quic-transport://example.com", ASSERT_NO_EXCEPTION);
auto& exception_state = scope.GetExceptionState();
ScriptPromise send_stream_promise =
quic_transport->createSendStream(script_state, exception_state);
EXPECT_TRUE(send_stream_promise.IsEmpty());
EXPECT_TRUE(exception_state.HadException());
EXPECT_EQ(static_cast<int>(DOMExceptionCode::kNetworkError),
exception_state.Code());
}
TEST_F(QuicTransportTest, CreateSendStreamFailure) {
V8TestingScope scope;
auto* quic_transport =
CreateAndConnectSuccessfully(scope, "quic-transport://example.com");
EXPECT_CALL(*mock_quic_transport_, CreateStream(_, _, _))
.WillOnce([](Unused, Unused,
base::OnceCallback<void(bool, uint32_t)> callback) {
std::move(callback).Run(false, 0);
});
auto* script_state = scope.GetScriptState();
ScriptPromise send_stream_promise =
quic_transport->createSendStream(script_state, ASSERT_NO_EXCEPTION);
ScriptPromiseTester tester(script_state, send_stream_promise);
tester.WaitUntilSettled();
EXPECT_TRUE(tester.IsRejected());
DOMException* exception = V8DOMException::ToImplWithTypeCheck(
scope.GetIsolate(), tester.Value().V8Value());
EXPECT_EQ(exception->name(), "NetworkError");
EXPECT_EQ(exception->message(), "Failed to create send stream.");
}
// Every active stream is kept alive by the QuicTransport object.
TEST_F(QuicTransportTest, SendStreamGarbageCollection) {
V8TestingScope scope;
WeakPersistent<QuicTransport> quic_transport;
WeakPersistent<SendStream> send_stream;
{
// The streams created when creating a QuicTransport or SendStream create
// some v8 handles. To ensure these are collected, we need to create a
// handle scope. This is not a problem for garbage collection in normal
// operation.
v8::HandleScope handle_scope(scope.GetIsolate());
quic_transport =
CreateAndConnectSuccessfully(scope, "quic-transport://example.com");
send_stream = CreateSendStreamSuccessfully(scope, quic_transport);
}
V8GCController::CollectAllGarbageForTesting(
scope.GetIsolate(), v8::EmbedderHeapTracer::EmbedderStackState::kEmpty);
EXPECT_TRUE(quic_transport);
EXPECT_TRUE(send_stream);
quic_transport->close(nullptr);
test::RunPendingTasks();
V8GCController::CollectAllGarbageForTesting(
scope.GetIsolate(), v8::EmbedderHeapTracer::EmbedderStackState::kEmpty);
EXPECT_FALSE(quic_transport);
EXPECT_FALSE(send_stream);
}
TEST_F(QuicTransportTest, CreateSendStreamAbortedByClose) {
V8TestingScope scope;
auto* script_state = scope.GetScriptState();
auto* quic_transport =
CreateAndConnectSuccessfully(scope, "quic-transport://example.com");
base::OnceCallback<void(bool, uint32_t)> create_stream_callback;
EXPECT_CALL(*mock_quic_transport_, CreateStream(_, _, _))
.WillOnce([&](Unused, Unused,
base::OnceCallback<void(bool, uint32_t)> callback) {
create_stream_callback = std::move(callback);
});
ScriptPromise send_stream_promise =
quic_transport->createSendStream(script_state, ASSERT_NO_EXCEPTION);
ScriptPromiseTester tester(script_state, send_stream_promise);
test::RunPendingTasks();
quic_transport->close(nullptr);
std::move(create_stream_callback).Run(true, 0);
tester.WaitUntilSettled();
EXPECT_TRUE(tester.IsRejected());
}
} // namespace
} // namespace blink
// Copyright 2020 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "third_party/blink/renderer/modules/webtransport/send_stream.h"
#include <utility>
#include "third_party/blink/renderer/modules/webtransport/quic_transport.h"
#include "third_party/blink/renderer/platform/heap/visitor.h"
namespace blink {
namespace {
class CloseProxy : public WebTransportCloseProxy {
public:
CloseProxy(QuicTransport* quic_transport,
OutgoingStream* outgoing_stream,
uint32_t stream_id)
: quic_transport_(quic_transport),
outgoing_stream_(outgoing_stream),
stream_id_(stream_id) {}
void OnIncomingStreamClosed(bool fin_received) override {
// OnIncomingStreamClosed only applies to IncomingStreams.
}
void SendFin() override { quic_transport_->SendFin(stream_id_); }
void Reset() override { outgoing_stream_->Reset(); }
void Trace(Visitor* visitor) override {
visitor->Trace(quic_transport_);
visitor->Trace(outgoing_stream_);
WebTransportCloseProxy::Trace(visitor);
}
private:
const Member<QuicTransport> quic_transport_;
const Member<OutgoingStream> outgoing_stream_;
const uint32_t stream_id_;
};
} // namespace
SendStream::SendStream(ScriptState* script_state,
QuicTransport* quic_transport,
uint32_t stream_id,
mojo::ScopedDataPipeProducerHandle handle)
: OutgoingStream(
script_state,
MakeGarbageCollected<CloseProxy>(quic_transport, this, stream_id),
std::move(handle)) {}
} // namespace blink
// Copyright 2020 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef THIRD_PARTY_BLINK_RENDERER_MODULES_WEBTRANSPORT_SEND_STREAM_H_
#define THIRD_PARTY_BLINK_RENDERER_MODULES_WEBTRANSPORT_SEND_STREAM_H_
#include <stdint.h>
#include "mojo/public/cpp/system/data_pipe.h"
#include "third_party/blink/renderer/modules/modules_export.h"
#include "third_party/blink/renderer/modules/webtransport/outgoing_stream.h"
#include "third_party/blink/renderer/modules/webtransport/web_transport_close_proxy.h"
#include "third_party/blink/renderer/platform/heap/garbage_collected.h"
namespace blink {
class ScriptState;
class QuicTransport;
class MODULES_EXPORT SendStream final : public OutgoingStream {
DEFINE_WRAPPERTYPEINFO();
USING_GARBAGE_COLLECTED_MIXIN(SendStream);
public:
// SendStream doesn't have a JavaScript constructor. It is only constructed
// from C++.
explicit SendStream(ScriptState*,
QuicTransport*,
uint32_t stream_id,
mojo::ScopedDataPipeProducerHandle);
};
} // namespace blink
#endif // THIRD_PARTY_BLINK_RENDERER_MODULES_WEBTRANSPORT_SEND_STREAM_H_
// Copyright 2020 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
// https://wicg.github.io/web-transport/#send-stream
[ Exposed=(Window,Worker),
ActiveScriptWrappable,
RuntimeEnabled=QuicTransport ]
interface SendStream {
};
SendStream includes OutgoingStream;
// Copyright 2020 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
// https://wicg.github.io/web-transport/#stream-abort-info
dictionary StreamAbortInfo {
unsigned short errorCode = 0;
};
// Copyright 2020 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef THIRD_PARTY_BLINK_RENDERER_MODULES_WEBTRANSPORT_WEB_TRANSPORT_CLOSE_PROXY_H_
#define THIRD_PARTY_BLINK_RENDERER_MODULES_WEBTRANSPORT_WEB_TRANSPORT_CLOSE_PROXY_H_
#include "third_party/blink/renderer/platform/heap/heap.h"
namespace blink {
class Visitor;
// This is an internal proxy type for passing close messages between
// QuicTransport and the IncomingStream and OutgoingStream mixins. It is not
// part of the standard. It exists to abstract over the fact that a single
// stream_id can correspond to one or two JavaScript streams.
class WebTransportCloseProxy : public GarbageCollected<WebTransportCloseProxy> {
public:
virtual ~WebTransportCloseProxy() = default;
// These match the mojo interfaces, but without the stream_id argument.
// Process an IncomingStreamClosed message from the network service. This is
// called by QuicTransport objects. May execute user JavaScript.
virtual void OnIncomingStreamClosed(bool fin_received) = 0;
// Send a Fin signal to the network service. This is used by OutgoingStream
// and IncomingStream.
virtual void SendFin() = 0;
// Called from QuicTransport whenever the mojo connection is torn down. Should
// close and free data pipes. May execute user JavaScript.
virtual void Reset() = 0;
virtual void Trace(Visitor*) {}
};
} // namespace blink
#endif // THIRD_PARTY_BLINK_RENDERER_MODULES_WEBTRANSPORT_WEB_TRANSPORT_CLOSE_PROXY_H_
......@@ -1238,6 +1238,7 @@ interface QuicTransport
getter ready
method close
method constructor
method createSendStream
method receiveDatagrams
method sendDatagrams
interface ReadableStream
......@@ -1315,6 +1316,12 @@ interface SecurityPolicyViolationEvent : Event
getter statusCode
getter violatedDirective
method constructor
interface SendStream
attribute @@toStringTag
getter writable
getter writingAborted
method abortWriting
method constructor
interface ServiceWorker : EventTarget
attribute @@toStringTag
getter onerror
......
......@@ -1168,6 +1168,7 @@ Starting worker: resources/global-interface-listing-worker.js
[Worker] getter ready
[Worker] method close
[Worker] method constructor
[Worker] method createSendStream
[Worker] method receiveDatagrams
[Worker] method sendDatagrams
[Worker] interface ReadableStream
......@@ -1249,6 +1250,12 @@ Starting worker: resources/global-interface-listing-worker.js
[Worker] getter statusCode
[Worker] getter violatedDirective
[Worker] method constructor
[Worker] interface SendStream
[Worker] attribute @@toStringTag
[Worker] getter writable
[Worker] getter writingAborted
[Worker] method abortWriting
[Worker] method constructor
[Worker] interface Serial : EventTarget
[Worker] attribute @@toStringTag
[Worker] getter onconnect
......
......@@ -6213,6 +6213,7 @@ interface QuicTransport
getter ready
method close
method constructor
method createSendStream
method receiveDatagrams
method sendDatagrams
interface RTCCertificate
......@@ -7895,6 +7896,12 @@ interface Selection
method setBaseAndExtent
method setPosition
method toString
interface SendStream
attribute @@toStringTag
getter writable
getter writingAborted
method abortWriting
method constructor
interface Sensor : EventTarget
attribute @@toStringTag
getter activated
......
......@@ -1150,6 +1150,7 @@ Starting worker: resources/global-interface-listing-worker.js
[Worker] getter ready
[Worker] method close
[Worker] method constructor
[Worker] method createSendStream
[Worker] method receiveDatagrams
[Worker] method sendDatagrams
[Worker] interface ReadableStream
......@@ -1227,6 +1228,12 @@ Starting worker: resources/global-interface-listing-worker.js
[Worker] getter statusCode
[Worker] getter violatedDirective
[Worker] method constructor
[Worker] interface SendStream
[Worker] attribute @@toStringTag
[Worker] getter writable
[Worker] getter writingAborted
[Worker] method abortWriting
[Worker] method constructor
[Worker] interface ServiceWorkerRegistration : EventTarget
[Worker] attribute @@toStringTag
[Worker] getter active
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment