Commit 68ea9d19 authored by Adam Rice's avatar Adam Rice Committed by Commit Bot

Make ReadableStreamSerialize operation take a |port| parameter

It turns out to better to create the MessagePort early in C++ and then
pass it to ReadableStreamSerialize. Make ReadableStreamSerialize take a
second argument instead of creating its own MessageChannel.

This also enables us to remove MessageChannel from the binding object.

Also add wrapper methods on ReadableStream to abstract out the V8 Extras
implementation.

No user-visible changes.

BUG=894838

Change-Id: I2616855d34a925915919245ec10338463ac5ef37
Reviewed-on: https://chromium-review.googlesource.com/c/1329813Reviewed-by: default avatarYutaka Hirano <yhirano@chromium.org>
Reviewed-by: default avatarYuki Shiino <yukishiino@chromium.org>
Commit-Queue: Adam Rice <ricea@chromium.org>
Cr-Commit-Position: refs/heads/master@{#607498}
parent e94d2968
...@@ -130,29 +130,19 @@ void AddOriginals(ScriptState* script_state, v8::Local<v8::Object> binding) { ...@@ -130,29 +130,19 @@ void AddOriginals(ScriptState* script_state, v8::Local<v8::Object> binding) {
DCHECK(result); DCHECK(result);
}; };
v8::Local<v8::Value> message_channel = ObjectGet(global, "MessageChannel"); v8::Local<v8::Value> message_port = ObjectGet(global, "MessagePort");
// Some Worklets don't have MessageChannel. In this case, serialization will // Some Worklets don't have MessagePort. In this case, serialization will
// be disabled. // be disabled.
if (message_channel->IsUndefined()) if (message_port->IsUndefined())
return; return;
Bind("MessageChannel", message_channel);
v8::Local<v8::Value> message_channel_prototype =
GetPrototype(message_channel);
Bind("MessageChannel_port1_get",
GetOwnPDGet(message_channel_prototype, "port1"));
Bind("MessageChannel_port2_get",
GetOwnPDGet(message_channel_prototype, "port2"));
v8::Local<v8::Value> event_target_prototype = v8::Local<v8::Value> event_target_prototype =
GetPrototype(ObjectGet(global, "EventTarget")); GetPrototype(ObjectGet(global, "EventTarget"));
Bind("EventTarget_addEventListener", Bind("EventTarget_addEventListener",
ObjectGet(event_target_prototype, "addEventListener")); ObjectGet(event_target_prototype, "addEventListener"));
v8::Local<v8::Value> message_port_prototype = v8::Local<v8::Value> message_port_prototype = GetPrototype(message_port);
GetPrototype(ObjectGet(global, "MessagePort"));
Bind("MessagePort_postMessage", Bind("MessagePort_postMessage",
ObjectGet(message_port_prototype, "postMessage")); ObjectGet(message_port_prototype, "postMessage"));
Bind("MessagePort_close", ObjectGet(message_port_prototype, "close")); Bind("MessagePort_close", ObjectGet(message_port_prototype, "close"));
......
...@@ -1041,24 +1041,24 @@ ...@@ -1041,24 +1041,24 @@
// Functions for transferable streams. // Functions for transferable streams.
// //
function ReadableStreamSerialize(readable) { // The |port| which is passed to this function must be a MessagePort which is
// attached by a MessageChannel to the |port| that will be passed to
// ReadableStreamDeserialize.
function ReadableStreamSerialize(readable, port) {
// assert(IsReadableStream(readable), // assert(IsReadableStream(readable),
// `! IsReadableStream(_readable_) is true`); // `! IsReadableStream(_readable_) is true`);
if (IsReadableStreamLocked(readable)) { if (IsReadableStreamLocked(readable)) {
throw new TypeError(errCannotTransferLockedStream); throw new TypeError(errCannotTransferLockedStream);
} }
if (!binding.MessageChannel) { if (!binding.MessagePort_postMessage) {
throw new TypeError(errCannotTransferUnsupportedContext); throw new TypeError(errCannotTransferUnsupportedContext);
} }
const mc = new binding.MessageChannel(); const writable = CreateCrossRealmTransformWritable(port);
const writable = CreateCrossRealmTransformWritable(
callFunction(binding.MessageChannel_port1_get, mc));
const promise = const promise =
ReadableStreamPipeTo(readable, writable, false, false, false); ReadableStreamPipeTo(readable, writable, false, false, false);
markPromiseAsHandled(promise); markPromiseAsHandled(promise);
return callFunction(binding.MessageChannel_port2_get, mc);
} }
function ReadableStreamDeserialize(port) { function ReadableStreamDeserialize(port) {
......
...@@ -448,6 +448,25 @@ void ReadableStream::LockAndDisturb(ScriptState* script_state, ...@@ -448,6 +448,25 @@ void ReadableStream::LockAndDisturb(ScriptState* script_state,
ReadableStreamOperations::DefaultReaderRead(script_state, reader); ReadableStreamOperations::DefaultReaderRead(script_state, reader);
} }
void ReadableStream::Serialize(ScriptState* script_state,
MessagePort* port,
ExceptionState& exception_state) {
ReadableStreamOperations::Serialize(
script_state, GetInternalStream(script_state), port, exception_state);
}
// static
ReadableStream* ReadableStream::Deserialize(ScriptState* script_state,
MessagePort* port,
ExceptionState& exception_state) {
ScriptValue internal_stream = ReadableStreamOperations::Deserialize(
script_state, port, exception_state);
if (exception_state.HadException())
return nullptr;
return CreateFromInternalStream(script_state, internal_stream,
exception_state);
}
ScriptValue ReadableStream::GetInternalStream(ScriptState* script_state) const { ScriptValue ReadableStream::GetInternalStream(ScriptState* script_state) const {
return ScriptValue(script_state, return ScriptValue(script_state,
object_.NewLocal(script_state->GetIsolate())); object_.NewLocal(script_state->GetIsolate()));
......
...@@ -17,6 +17,7 @@ class ExceptionState; ...@@ -17,6 +17,7 @@ class ExceptionState;
class ScriptPromise; class ScriptPromise;
class ScriptState; class ScriptState;
class UnderlyingSourceBase; class UnderlyingSourceBase;
class MessagePort;
// This is an implementation of the corresponding IDL interface. // This is an implementation of the corresponding IDL interface.
// Use TraceWrapperMember to hold a reference to an instance of this class. // Use TraceWrapperMember to hold a reference to an instance of this class.
...@@ -86,6 +87,17 @@ class CORE_EXPORT ReadableStream : public ScriptWrappable { ...@@ -86,6 +87,17 @@ class CORE_EXPORT ReadableStream : public ScriptWrappable {
// Makes this stream locked and disturbed. // Makes this stream locked and disturbed.
void LockAndDisturb(ScriptState*, ExceptionState&); void LockAndDisturb(ScriptState*, ExceptionState&);
// Serialize this stream to |port|. The stream will be locked by this
// operation.
void Serialize(ScriptState*, MessagePort* port, ExceptionState&);
// Given a |port| which is entangled with a MessagePort that was previously
// passed to Serialize(), returns a new ReadableStream which behaves like it
// was the original.
static ReadableStream* Deserialize(ScriptState*,
MessagePort* port,
ExceptionState&);
ScriptValue GetInternalStream(ScriptState* script_state) const; ScriptValue GetInternalStream(ScriptState* script_state) const;
// In some cases we are known to fail to trace the stream correctly. In such // In some cases we are known to fail to trace the stream correctly. In such
......
...@@ -268,43 +268,37 @@ ScriptValue ReadableStreamOperations::Tee(ScriptState* script_state, ...@@ -268,43 +268,37 @@ ScriptValue ReadableStreamOperations::Tee(ScriptState* script_state,
return ScriptValue(script_state, result); return ScriptValue(script_state, result);
} }
MessagePort* ReadableStreamOperations::ReadableStreamSerialize( void ReadableStreamOperations::Serialize(ScriptState* script_state,
ScriptState* script_state, ScriptValue stream,
ScriptValue stream, MessagePort* port,
ExceptionState& exception_state) { ExceptionState& exception_state) {
DCHECK(port);
DCHECK(IsReadableStreamForDCheck(script_state, stream)); DCHECK(IsReadableStreamForDCheck(script_state, stream));
DCHECK(RuntimeEnabledFeatures::TransferableStreamsEnabled()); DCHECK(RuntimeEnabledFeatures::TransferableStreamsEnabled());
v8::TryCatch block(script_state->GetIsolate()); v8::TryCatch block(script_state->GetIsolate());
v8::Local<v8::Value> args[] = {stream.V8Value()}; v8::Local<v8::Value> port_v8_value = ToV8(port, script_state);
DCHECK(!port_v8_value.IsEmpty());
v8::Local<v8::Value> args[] = {stream.V8Value(), port_v8_value};
ScriptValue result( ScriptValue result(
script_state, script_state,
V8ScriptRunner::CallExtra(script_state, "ReadableStreamSerialize", args)); V8ScriptRunner::CallExtra(script_state, "ReadableStreamSerialize", args));
if (block.HasCaught()) { if (block.HasCaught()) {
exception_state.RethrowV8Exception(block.Exception()); exception_state.RethrowV8Exception(block.Exception());
return nullptr; return;
}
if (result.IsEmpty()) {
DCHECK(script_state->GetIsolate()->IsExecutionTerminating());
exception_state.ThrowDOMException(
DOMExceptionCode::kInvalidStateError,
"Serialize failed because execution is terminating");
return nullptr;
} }
return;
return V8MessagePort::ToImpl(
result.V8Value()->ToObject(script_state->GetContext()).ToLocalChecked());
} }
ScriptValue ReadableStreamOperations::ReadableStreamDeserialize( ScriptValue ReadableStreamOperations::Deserialize(
ScriptState* script_state, ScriptState* script_state,
MessagePort* port, MessagePort* port,
ExceptionState& exception_state) { ExceptionState& exception_state) {
DCHECK(port); DCHECK(port);
DCHECK(RuntimeEnabledFeatures::TransferableStreamsEnabled()); DCHECK(RuntimeEnabledFeatures::TransferableStreamsEnabled());
auto* isolate = script_state->GetIsolate(); auto* isolate = script_state->GetIsolate();
v8::Local<v8::Context> context = script_state->GetContext();
v8::Local<v8::Value> port_v8 = ToV8(port, context->Global(), isolate);
v8::TryCatch block(isolate); v8::TryCatch block(isolate);
v8::Local<v8::Value> port_v8 = ToV8(port, script_state);
DCHECK(!port_v8.IsEmpty());
v8::Local<v8::Value> args[] = {port_v8}; v8::Local<v8::Value> args[] = {port_v8};
ScriptValue result(script_state, ScriptValue result(script_state,
V8ScriptRunner::CallExtra( V8ScriptRunner::CallExtra(
...@@ -314,7 +308,7 @@ ScriptValue ReadableStreamOperations::ReadableStreamDeserialize( ...@@ -314,7 +308,7 @@ ScriptValue ReadableStreamOperations::ReadableStreamDeserialize(
return ScriptValue(); return ScriptValue();
} }
if (result.IsEmpty()) { if (result.IsEmpty()) {
DCHECK(script_state->GetIsolate()->IsExecutionTerminating()); DCHECK(isolate->IsExecutionTerminating());
return ScriptValue(); return ScriptValue();
} }
DCHECK(IsReadableStreamForDCheck(script_state, result)); DCHECK(IsReadableStreamForDCheck(script_state, result));
......
...@@ -132,19 +132,18 @@ class CORE_EXPORT ReadableStreamOperations { ...@@ -132,19 +132,18 @@ class CORE_EXPORT ReadableStreamOperations {
// This function assumes |IsReadableStream(stream)| and |!IsLocked(stream)| // This function assumes |IsReadableStream(stream)| and |!IsLocked(stream)|
static ScriptValue Tee(ScriptState*, ScriptValue stream, ExceptionState&); static ScriptValue Tee(ScriptState*, ScriptValue stream, ExceptionState&);
// ReadableStreamSerialize. Returns a MessagePort which can be passed to // ReadableStreamSerialize. The MessagePort passed in must be one half of a
// ReadableStreamDeserialize to produce an equivalent ReadableStream in a // MessageChannel. The other half can later be passed to Deserialize to
// different context. // produce an equivalent ReadableStream in a different context.
static MessagePort* ReadableStreamSerialize(ScriptState*, static void Serialize(ScriptState*,
ScriptValue stream, ScriptValue stream,
ExceptionState&); MessagePort* port,
ExceptionState&);
// ReadableStreamDeserialize returns a new ReadableStream in the current // ReadableStreamDeserialize returns a new ReadableStream in the current
// context given a MessagePort which has possibly be transferred from another // context given a MessagePort which is bound to one which was previously
// context. // passed to Serialize().
static ScriptValue ReadableStreamDeserialize(ScriptState*, static ScriptValue Deserialize(ScriptState*, MessagePort*, ExceptionState&);
MessagePort*,
ExceptionState&);
// ReadableStreamCancel // ReadableStreamCancel
// This function assumes |IsReadableStream(stream)| // This function assumes |IsReadableStream(stream)|
......
...@@ -13,6 +13,7 @@ ...@@ -13,6 +13,7 @@
#include "third_party/blink/renderer/bindings/core/v8/v8_iterator_result_value.h" #include "third_party/blink/renderer/bindings/core/v8/v8_iterator_result_value.h"
#include "third_party/blink/renderer/bindings/core/v8/v8_readable_stream.h" #include "third_party/blink/renderer/bindings/core/v8/v8_readable_stream.h"
#include "third_party/blink/renderer/core/dom/document.h" #include "third_party/blink/renderer/core/dom/document.h"
#include "third_party/blink/renderer/core/messaging/message_channel.h"
#include "third_party/blink/renderer/core/streams/readable_stream.h" #include "third_party/blink/renderer/core/streams/readable_stream.h"
#include "third_party/blink/renderer/core/streams/readable_stream_default_controller_wrapper.h" #include "third_party/blink/renderer/core/streams/readable_stream_default_controller_wrapper.h"
#include "third_party/blink/renderer/core/streams/test_underlying_source.h" #include "third_party/blink/renderer/core/streams/test_underlying_source.h"
...@@ -588,13 +589,13 @@ TEST(ReadableStreamOperationsTest, Serialize) { ...@@ -588,13 +589,13 @@ TEST(ReadableStreamOperationsTest, Serialize) {
V8String(scope.GetIsolate(), "hello"))); V8String(scope.GetIsolate(), "hello")));
ScriptValue internal_stream = ScriptValue internal_stream =
stream->GetInternalStream(scope.GetScriptState()); stream->GetInternalStream(scope.GetScriptState());
MessagePort* port = ReadableStreamOperations::ReadableStreamSerialize( MessageChannel* channel = MessageChannel::Create(scope.GetExecutionContext());
scope.GetScriptState(), internal_stream, ASSERT_NO_EXCEPTION); ReadableStreamOperations::Serialize(scope.GetScriptState(), internal_stream,
EXPECT_TRUE(port); channel->port1(), ASSERT_NO_EXCEPTION);
EXPECT_TRUE(ReadableStreamOperations::IsLocked( EXPECT_TRUE(ReadableStreamOperations::IsLocked(
scope.GetScriptState(), internal_stream, ASSERT_NO_EXCEPTION)); scope.GetScriptState(), internal_stream, ASSERT_NO_EXCEPTION));
ScriptValue transferred = ReadableStreamOperations::ReadableStreamDeserialize( ScriptValue transferred = ReadableStreamOperations::Deserialize(
scope.GetScriptState(), port, ASSERT_NO_EXCEPTION); scope.GetScriptState(), channel->port2(), ASSERT_NO_EXCEPTION);
ASSERT_FALSE(transferred.IsEmpty()); ASSERT_FALSE(transferred.IsEmpty());
ScriptValue reader = ReadableStreamOperations::GetReader( ScriptValue reader = ReadableStreamOperations::GetReader(
scope.GetScriptState(), transferred, ASSERT_NO_EXCEPTION); scope.GetScriptState(), transferred, ASSERT_NO_EXCEPTION);
......
...@@ -9,12 +9,14 @@ ...@@ -9,12 +9,14 @@
#include "third_party/blink/renderer/bindings/core/v8/v8_binding_for_core.h" #include "third_party/blink/renderer/bindings/core/v8/v8_binding_for_core.h"
#include "third_party/blink/renderer/bindings/core/v8/v8_binding_for_testing.h" #include "third_party/blink/renderer/bindings/core/v8/v8_binding_for_testing.h"
#include "third_party/blink/renderer/bindings/core/v8/v8_extras_test_utils.h" #include "third_party/blink/renderer/bindings/core/v8/v8_extras_test_utils.h"
#include "third_party/blink/renderer/core/messaging/message_channel.h"
#include "third_party/blink/renderer/core/streams/readable_stream_default_controller_wrapper.h" #include "third_party/blink/renderer/core/streams/readable_stream_default_controller_wrapper.h"
#include "third_party/blink/renderer/core/streams/test_underlying_source.h" #include "third_party/blink/renderer/core/streams/test_underlying_source.h"
#include "third_party/blink/renderer/core/streams/underlying_source_base.h" #include "third_party/blink/renderer/core/streams/underlying_source_base.h"
#include "third_party/blink/renderer/platform/bindings/exception_state.h" #include "third_party/blink/renderer/platform/bindings/exception_state.h"
#include "third_party/blink/renderer/platform/bindings/script_state.h" #include "third_party/blink/renderer/platform/bindings/script_state.h"
#include "third_party/blink/renderer/platform/heap/handle.h" #include "third_party/blink/renderer/platform/heap/handle.h"
#include "third_party/blink/renderer/platform/testing/unit_test_helpers.h"
#include "v8/include/v8.h" #include "v8/include/v8.h"
namespace blink { namespace blink {
...@@ -73,6 +75,11 @@ readAll(stream); ...@@ -73,6 +75,11 @@ readAll(stream);
return ToCoreString(result.As<v8::String>()); return ToCoreString(result.As<v8::String>());
} }
// Need to run the event loop for the Serialize test to pass messages
// through the MessagePort.
test::RunPendingTasks();
// Allow Promises to resolve.
v8::MicrotasksScope::PerformCheckpoint(isolate); v8::MicrotasksScope::PerformCheckpoint(isolate);
} }
NOTREACHED(); NOTREACHED();
...@@ -392,6 +399,37 @@ TEST_F(ReadableStreamTest, LockAndDisturb) { ...@@ -392,6 +399,37 @@ TEST_F(ReadableStreamTest, LockAndDisturb) {
base::make_optional(true)); base::make_optional(true));
} }
TEST_F(ReadableStreamTest, Serialize) {
RuntimeEnabledFeatures::SetTransferableStreamsEnabled(true);
V8TestingScope scope;
auto* script_state = scope.GetScriptState();
auto* underlying_source =
MakeGarbageCollected<TestUnderlyingSource>(script_state);
auto* stream = ReadableStream::CreateWithCountQueueingStrategy(
script_state, underlying_source, 0);
ASSERT_TRUE(stream);
MessageChannel* channel = MessageChannel::Create(scope.GetExecutionContext());
stream->Serialize(script_state, channel->port1(), ASSERT_NO_EXCEPTION);
EXPECT_TRUE(stream->IsLocked(script_state, ASSERT_NO_EXCEPTION));
auto* transferred = ReadableStream::Deserialize(
script_state, channel->port2(), ASSERT_NO_EXCEPTION);
ASSERT_TRUE(transferred);
underlying_source->Enqueue(
ScriptValue(script_state, V8String(script_state->GetIsolate(), "hello")));
underlying_source->Enqueue(
ScriptValue(script_state, V8String(script_state->GetIsolate(), ", bye")));
underlying_source->Close();
EXPECT_EQ(ReadAll(scope, transferred),
base::make_optional<String>("hello, bye"));
}
} // namespace } // namespace
} // namespace blink } // namespace blink
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment