Commit c88ab65c authored by Nidhi Jaju's avatar Nidhi Jaju Committed by Commit Bot

Implement Polymorphism for Readable Stream Readers

This CL aims to add polymorphism to the readable stream reader
classes. In particular, it makes the ReadableStreamGenericReader
the base class, and the ReadableStreamDefaultReader and
ReadableStreamBYOBReader classes inherit from the GenericReader.

This is based on the approach discussed in
https://docs.google.com/document/d/1rvKpGjppeqRSWntokY-ft_hU2i2us8gis6zgrrLWSTI/view

Bug: 614302
Change-Id: I4dde2976938325c21aeb87c6980ac4ad1def483f
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2528557Reviewed-by: default avatarAdam Rice <ricea@chromium.org>
Reviewed-by: default avatarYutaka Hirano <yhirano@chromium.org>
Commit-Queue: Nidhi Jaju <nidhijaju@google.com>
Cr-Commit-Position: refs/heads/master@{#827034}
parent 0d4a6638
...@@ -27,6 +27,7 @@ blink_core_sources_streams = [ ...@@ -27,6 +27,7 @@ blink_core_sources_streams = [
"readable_stream_default_controller.h", "readable_stream_default_controller.h",
"readable_stream_default_controller_with_script_scope.cc", "readable_stream_default_controller_with_script_scope.cc",
"readable_stream_default_controller_with_script_scope.h", "readable_stream_default_controller_with_script_scope.h",
"readable_stream_default_reader.cc",
"readable_stream_default_reader.h", "readable_stream_default_reader.h",
"readable_stream_generic_reader.cc", "readable_stream_generic_reader.cc",
"readable_stream_generic_reader.h", "readable_stream_generic_reader.h",
......
...@@ -689,7 +689,7 @@ class ReadableStream::PipeToEngine final ...@@ -689,7 +689,7 @@ class ReadableStream::PipeToEngine final
Member<ScriptState> script_state_; Member<ScriptState> script_state_;
Member<PipeOptions> pipe_options_; Member<PipeOptions> pipe_options_;
Member<ReadableStreamGenericReader> reader_; Member<ReadableStreamDefaultReader> reader_;
Member<WritableStreamDefaultWriter> writer_; Member<WritableStreamDefaultWriter> writer_;
Member<StreamPromiseResolver> promise_; Member<StreamPromiseResolver> promise_;
TraceWrapperV8Reference<v8::Promise> last_write_; TraceWrapperV8Reference<v8::Promise> last_write_;
...@@ -729,7 +729,7 @@ class ReadableStream::TeeEngine final : public GarbageCollected<TeeEngine> { ...@@ -729,7 +729,7 @@ class ReadableStream::TeeEngine final : public GarbageCollected<TeeEngine> {
class CancelAlgorithm; class CancelAlgorithm;
Member<ReadableStream> stream_; Member<ReadableStream> stream_;
Member<ReadableStreamGenericReader> reader_; Member<ReadableStreamDefaultReader> reader_;
Member<StreamPromiseResolver> cancel_promise_; Member<StreamPromiseResolver> cancel_promise_;
bool closed_ = false; bool closed_ = false;
...@@ -759,7 +759,7 @@ class ReadableStream::TeeEngine::PullAlgorithm final : public StreamAlgorithm { ...@@ -759,7 +759,7 @@ class ReadableStream::TeeEngine::PullAlgorithm final : public StreamAlgorithm {
// and performs the following steps: // and performs the following steps:
return StreamThenPromise( return StreamThenPromise(
script_state->GetContext(), script_state->GetContext(),
ReadableStreamGenericReader::Read(script_state, engine_->reader_) ReadableStreamDefaultReader::Read(script_state, engine_->reader_)
->V8Promise(script_state->GetIsolate()), ->V8Promise(script_state->GetIsolate()),
MakeGarbageCollected<ResolveFunction>(script_state, engine_)); MakeGarbageCollected<ResolveFunction>(script_state, engine_));
} }
...@@ -1596,7 +1596,10 @@ StreamPromiseResolver* ReadableStream::AddReadRequest(ScriptState* script_state, ...@@ -1596,7 +1596,10 @@ StreamPromiseResolver* ReadableStream::AddReadRequest(ScriptState* script_state,
ReadableStream* stream) { ReadableStream* stream) {
// https://streams.spec.whatwg.org/#readable-stream-add-read-request // https://streams.spec.whatwg.org/#readable-stream-add-read-request
// 1. Assert: ! IsReadableStreamDefaultReader(stream.[[reader]]) is true. // 1. Assert: ! IsReadableStreamDefaultReader(stream.[[reader]]) is true.
DCHECK(stream->reader_); ReadableStreamGenericReader* reader = stream->reader_;
ReadableStreamDefaultReader* default_reader =
static_cast<ReadableStreamDefaultReader*>(reader);
DCHECK(default_reader);
// 2. Assert: stream.[[state]] is "readable". // 2. Assert: stream.[[state]] is "readable".
CHECK_EQ(stream->state_, kReadable); CHECK_EQ(stream->state_, kReadable);
...@@ -1609,7 +1612,7 @@ StreamPromiseResolver* ReadableStream::AddReadRequest(ScriptState* script_state, ...@@ -1609,7 +1612,7 @@ StreamPromiseResolver* ReadableStream::AddReadRequest(ScriptState* script_state,
// 4. Let readRequest be Record {[[promise]]: promise}. // 4. Let readRequest be Record {[[promise]]: promise}.
// 5. Append readRequest as the last element of stream.[[reader]]. // 5. Append readRequest as the last element of stream.[[reader]].
// [[readRequests]]. // [[readRequests]].
stream->reader_->read_requests_.push_back(promise); default_reader->read_requests_.push_back(promise);
// 6. Return promise. // 6. Return promise.
return promise; return promise;
...@@ -1679,18 +1682,23 @@ void ReadableStream::Close(ScriptState* script_state, ReadableStream* stream) { ...@@ -1679,18 +1682,23 @@ void ReadableStream::Close(ScriptState* script_state, ReadableStream* stream) {
// TODO(ricea): Support BYOB readers. // TODO(ricea): Support BYOB readers.
// 5. If ! IsReadableStreamDefaultReader(reader) is true, // 5. If ! IsReadableStreamDefaultReader(reader) is true,
// TODO(nidhijaju): Add a check that the reader is a default reader once BYOB
// readers have been implemented, so the static_cast can be changed later on.
// a. Repeat for each readRequest that is an element of reader. // a. Repeat for each readRequest that is an element of reader.
// [[readRequests]], // [[readRequests]],
HeapDeque<Member<StreamPromiseResolver>> requests; HeapDeque<Member<StreamPromiseResolver>> requests;
requests.Swap(reader->read_requests_); requests.Swap(
static_cast<ReadableStreamDefaultReader*>(reader)->read_requests_);
for (StreamPromiseResolver* promise : requests) { for (StreamPromiseResolver* promise : requests) {
// i. Resolve readRequest.[[promise]] with ! // i. Resolve readRequest.[[promise]] with !
// ReadableStreamCreateReadResult(undefined, true, reader. // ReadableStreamCreateReadResult(undefined, true, reader.
// [[forAuthorCode]]). // [[forAuthorCode]]).
promise->Resolve(script_state, promise->Resolve(
CreateReadResult(script_state, script_state,
v8::Undefined(script_state->GetIsolate()), CreateReadResult(script_state,
true, reader->for_author_code_)); v8::Undefined(script_state->GetIsolate()), true,
static_cast<ReadableStreamDefaultReader*>(reader)
->for_author_code_));
} }
// b. Set reader.[[readRequests]] to an empty List. // b. Set reader.[[readRequests]] to an empty List.
...@@ -1772,13 +1780,15 @@ void ReadableStream::Error(ScriptState* script_state, ...@@ -1772,13 +1780,15 @@ void ReadableStream::Error(ScriptState* script_state,
// TODO(ricea): Support BYOB readers. // TODO(ricea): Support BYOB readers.
// a. Repeat for each readRequest that is an element of reader. // a. Repeat for each readRequest that is an element of reader.
// [[readRequests]], // [[readRequests]],
for (StreamPromiseResolver* promise : reader->read_requests_) { ReadableStreamDefaultReader* default_reader =
static_cast<ReadableStreamDefaultReader*>(reader);
for (StreamPromiseResolver* promise : default_reader->read_requests_) {
// i. Reject readRequest.[[promise]] with e. // i. Reject readRequest.[[promise]] with e.
promise->Reject(script_state, e); promise->Reject(script_state, e);
} }
// b. Set reader.[[readRequests]] to a new empty List. // b. Set reader.[[readRequests]] to a new empty List.
reader->read_requests_.clear(); default_reader->read_requests_.clear();
// 9. Reject reader.[[closedPromise]] with e. // 9. Reject reader.[[closedPromise]] with e.
reader->closed_promise_->Reject(script_state, e); reader->closed_promise_->Reject(script_state, e);
...@@ -1794,26 +1804,30 @@ void ReadableStream::FulfillReadRequest(ScriptState* script_state, ...@@ -1794,26 +1804,30 @@ void ReadableStream::FulfillReadRequest(ScriptState* script_state,
// https://streams.spec.whatwg.org/#readable-stream-fulfill-read-request // https://streams.spec.whatwg.org/#readable-stream-fulfill-read-request
// 1. Let reader be stream.[[reader]]. // 1. Let reader be stream.[[reader]].
ReadableStreamGenericReader* reader = stream->reader_; ReadableStreamGenericReader* reader = stream->reader_;
ReadableStreamDefaultReader* default_reader =
static_cast<ReadableStreamDefaultReader*>(reader);
// 2. Let readRequest be the first element of reader.[[readRequests]]. // 2. Let readRequest be the first element of reader.[[readRequests]].
StreamPromiseResolver* read_request = reader->read_requests_.front(); StreamPromiseResolver* read_request = default_reader->read_requests_.front();
// 3. Remove readIntoRequest from reader.[[readIntoRequests]], shifting all // 3. Remove readIntoRequest from reader.[[readIntoRequests]], shifting all
// other elements downward (so that the second becomes the first, and so // other elements downward (so that the second becomes the first, and so
// on). // on).
reader->read_requests_.pop_front(); default_reader->read_requests_.pop_front();
// 4. Resolve readIntoRequest.[[promise]] with ! // 4. Resolve readIntoRequest.[[promise]] with !
// ReadableStreamCreateReadResult(chunk, done, reader.[[forAuthorCode]]). // ReadableStreamCreateReadResult(chunk, done, reader.[[forAuthorCode]]).
read_request->Resolve( read_request->Resolve(script_state, ReadableStream::CreateReadResult(
script_state, ReadableStream::CreateReadResult(script_state, chunk, done, script_state, chunk, done,
reader->for_author_code_)); default_reader->for_author_code_));
} }
int ReadableStream::GetNumReadRequests(const ReadableStream* stream) { int ReadableStream::GetNumReadRequests(const ReadableStream* stream) {
// https://streams.spec.whatwg.org/#readable-stream-get-num-read-requests // https://streams.spec.whatwg.org/#readable-stream-get-num-read-requests
// 1. Return the number of elements in stream.[[reader]].[[readRequests]]. // 1. Return the number of elements in stream.[[reader]].[[readRequests]].
return stream->reader_->read_requests_.size(); ReadableStreamGenericReader* reader = stream->reader_;
return static_cast<ReadableStreamDefaultReader*>(reader)
->read_requests_.size();
} }
// //
......
...@@ -224,6 +224,7 @@ class CORE_EXPORT ReadableStream : public ScriptWrappable { ...@@ -224,6 +224,7 @@ class CORE_EXPORT ReadableStream : public ScriptWrappable {
private: private:
friend class ReadableStreamDefaultController; friend class ReadableStreamDefaultController;
friend class ReadableStreamDefaultReader;
friend class ReadableStreamGenericReader; friend class ReadableStreamGenericReader;
class PipeToEngine; class PipeToEngine;
......
...@@ -43,24 +43,6 @@ void ReadableStreamBYOBReader::releaseLock(ScriptState* script_state, ...@@ -43,24 +43,6 @@ void ReadableStreamBYOBReader::releaseLock(ScriptState* script_state,
return; return;
} }
ScriptPromise ReadableStreamBYOBReader::closed(
ScriptState* script_state) const {
return RejectUnimplemented(script_state);
}
ScriptPromise ReadableStreamBYOBReader::cancel(
ScriptState* script_state,
ExceptionState& exception_state) {
return RejectUnimplemented(script_state);
}
ScriptPromise ReadableStreamBYOBReader::cancel(
ScriptState* script_state,
ScriptValue reason,
ExceptionState& exception_state) {
return RejectUnimplemented(script_state);
}
void ReadableStreamBYOBReader::ThrowUnimplemented( void ReadableStreamBYOBReader::ThrowUnimplemented(
ExceptionState& exception_state) { ExceptionState& exception_state) {
exception_state.ThrowTypeError("unimplemented"); exception_state.ThrowTypeError("unimplemented");
......
...@@ -7,6 +7,7 @@ ...@@ -7,6 +7,7 @@
#include "third_party/blink/renderer/bindings/core/v8/script_value.h" #include "third_party/blink/renderer/bindings/core/v8/script_value.h"
#include "third_party/blink/renderer/core/core_export.h" #include "third_party/blink/renderer/core/core_export.h"
#include "third_party/blink/renderer/core/streams/readable_stream_generic_reader.h"
#include "third_party/blink/renderer/core/typed_arrays/array_buffer_view_helpers.h" #include "third_party/blink/renderer/core/typed_arrays/array_buffer_view_helpers.h"
#include "third_party/blink/renderer/platform/bindings/script_wrappable.h" #include "third_party/blink/renderer/platform/bindings/script_wrappable.h"
...@@ -18,7 +19,8 @@ class ScriptState; ...@@ -18,7 +19,8 @@ class ScriptState;
class ReadableStream; class ReadableStream;
class DOMArrayBufferView; class DOMArrayBufferView;
class CORE_EXPORT ReadableStreamBYOBReader : public ScriptWrappable { class CORE_EXPORT ReadableStreamBYOBReader
: public ReadableStreamGenericReader {
DEFINE_WRAPPERTYPEINFO(); DEFINE_WRAPPERTYPEINFO();
public: public:
...@@ -40,13 +42,6 @@ class CORE_EXPORT ReadableStreamBYOBReader : public ScriptWrappable { ...@@ -40,13 +42,6 @@ class CORE_EXPORT ReadableStreamBYOBReader : public ScriptWrappable {
// https://streams.spec.whatwg.org/#byob-reader-release-lock // https://streams.spec.whatwg.org/#byob-reader-release-lock
void releaseLock(ScriptState*, ExceptionState&); void releaseLock(ScriptState*, ExceptionState&);
// https://streams.spec.whatwg.org/#generic-reader-closed
ScriptPromise closed(ScriptState*) const;
// https://streams.spec.whatwg.org/#generic-reader-cancel
ScriptPromise cancel(ScriptState*, ExceptionState&);
ScriptPromise cancel(ScriptState*, ScriptValue reason, ExceptionState&);
private: private:
friend class ReadableStream; friend class ReadableStream;
static void ThrowUnimplemented(ExceptionState&); static void ThrowUnimplemented(ExceptionState&);
......
...@@ -9,7 +9,7 @@ ...@@ -9,7 +9,7 @@
#include "third_party/blink/renderer/core/streams/promise_handler.h" #include "third_party/blink/renderer/core/streams/promise_handler.h"
#include "third_party/blink/renderer/core/streams/queue_with_sizes.h" #include "third_party/blink/renderer/core/streams/queue_with_sizes.h"
#include "third_party/blink/renderer/core/streams/readable_stream.h" #include "third_party/blink/renderer/core/streams/readable_stream.h"
#include "third_party/blink/renderer/core/streams/readable_stream_generic_reader.h" #include "third_party/blink/renderer/core/streams/readable_stream_default_reader.h"
#include "third_party/blink/renderer/core/streams/stream_algorithms.h" #include "third_party/blink/renderer/core/streams/stream_algorithms.h"
#include "third_party/blink/renderer/core/streams/stream_promise_resolver.h" #include "third_party/blink/renderer/core/streams/stream_promise_resolver.h"
#include "third_party/blink/renderer/platform/bindings/exception_state.h" #include "third_party/blink/renderer/platform/bindings/exception_state.h"
...@@ -314,10 +314,12 @@ StreamPromiseResolver* ReadableStreamDefaultController::PullSteps( ...@@ -314,10 +314,12 @@ StreamPromiseResolver* ReadableStreamDefaultController::PullSteps(
// d. Return a promise resolved with ! // d. Return a promise resolved with !
// ReadableStreamCreateReadResult(chunk, false, // ReadableStreamCreateReadResult(chunk, false,
// stream.[[reader]].[[forAuthorCode]]). // stream.[[reader]].[[forAuthorCode]]).
ReadableStreamGenericReader* reader = stream->reader_;
return StreamPromiseResolver::CreateResolved( return StreamPromiseResolver::CreateResolved(
script_state, script_state, ReadableStream::CreateReadResult(
ReadableStream::CreateReadResult(script_state, chunk, false, script_state, chunk, false,
stream->reader_->for_author_code_)); static_cast<ReadableStreamDefaultReader*>(reader)
->for_author_code_));
} }
// 3. Let pendingPromise be ! ReadableStreamAddReadRequest(stream). // 3. Let pendingPromise be ! ReadableStreamAddReadRequest(stream).
......
...@@ -74,7 +74,7 @@ class ReadableStreamDefaultController : public ScriptWrappable { ...@@ -74,7 +74,7 @@ class ReadableStreamDefaultController : public ScriptWrappable {
private: private:
friend class ReadableStream; friend class ReadableStream;
friend class ReadableStreamGenericReader; friend class ReadableStreamDefaultReader;
// https://streams.spec.whatwg.org/#rs-default-controller-private-cancel // https://streams.spec.whatwg.org/#rs-default-controller-private-cancel
v8::Local<v8::Promise> CancelSteps(ScriptState*, v8::Local<v8::Value> reason); v8::Local<v8::Promise> CancelSteps(ScriptState*, v8::Local<v8::Value> reason);
......
// Copyright 2020 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "third_party/blink/renderer/core/streams/readable_stream_default_reader.h"
#include "third_party/blink/renderer/bindings/core/v8/script_promise.h"
#include "third_party/blink/renderer/core/streams/readable_stream.h"
#include "third_party/blink/renderer/core/streams/readable_stream_default_controller.h"
#include "third_party/blink/renderer/core/streams/stream_promise_resolver.h"
#include "third_party/blink/renderer/platform/bindings/exception_state.h"
#include "third_party/blink/renderer/platform/bindings/script_state.h"
namespace blink {
ReadableStreamDefaultReader* ReadableStreamDefaultReader::Create(
ScriptState* script_state,
ReadableStream* stream,
ExceptionState& exception_state) {
auto* reader = MakeGarbageCollected<ReadableStreamDefaultReader>(
script_state, stream, exception_state);
if (exception_state.HadException()) {
return nullptr;
}
return reader;
}
ReadableStreamDefaultReader::ReadableStreamDefaultReader(
ScriptState* script_state,
ReadableStream* stream,
ExceptionState& exception_state) {
// https://streams.spec.whatwg.org/#default-reader-constructor
// 1. Perform ? SetUpReadableStreamDefaultReader(this, stream).
SetUpDefaultReader(script_state, this, stream, exception_state);
}
ReadableStreamDefaultReader::~ReadableStreamDefaultReader() = default;
ScriptPromise ReadableStreamDefaultReader::read(
ScriptState* script_state,
ExceptionState& exception_state) {
// https://streams.spec.whatwg.org/#default-reader-read
// 2. If this.[[ownerReadableStream]] is undefined, return a promise rejected
// with a TypeError exception.
if (!owner_readable_stream_) {
exception_state.ThrowTypeError(
"This readable stream reader has been released and cannot be used to "
"read from its previous owner stream");
return ScriptPromise();
}
// 3. Return ! ReadableStreamReaderRead(this).
return Read(script_state, this)->GetScriptPromise(script_state);
}
StreamPromiseResolver* ReadableStreamDefaultReader::Read(
ScriptState* script_state,
ReadableStreamDefaultReader* reader) {
auto* isolate = script_state->GetIsolate();
// https://streams.spec.whatwg.org/#readable-stream-default-reader-read
// 1. Let stream be reader.[[ownerReadableStream]].
ReadableStream* stream = reader->owner_readable_stream_;
// 2. Assert: stream is not undefined.
DCHECK(stream);
// 3. Set stream.[[disturbed]] to true.
stream->is_disturbed_ = true;
switch (stream->state_) {
// 4. If stream.[[state]] is "closed", return a promise resolved with !
// ReadableStreamCreateReadResult(undefined, true,
// reader.[[forAuthorCode]]).
case ReadableStream::kClosed:
return StreamPromiseResolver::CreateResolved(
script_state,
ReadableStream::CreateReadResult(script_state, v8::Undefined(isolate),
true, reader->for_author_code_));
// 5. If stream.[[state]] is "errored", return a promise rejected with
// stream.[[storedError]].
case ReadableStream::kErrored:
return StreamPromiseResolver::CreateRejected(
script_state, stream->GetStoredError(isolate));
case ReadableStream::kReadable:
// 6. Assert: stream.[[state]] is "readable".
DCHECK_EQ(stream->state_, ReadableStream::kReadable);
// 7. Return ! stream.[[readableStreamController]].[[PullSteps]]().
return stream->GetController()->PullSteps(script_state);
}
}
void ReadableStreamDefaultReader::releaseLock(ScriptState* script_state,
ExceptionState& exception_state) {
// https://streams.spec.whatwg.org/#default-reader-release-lock
// 2. If this.[[ownerReadableStream]] is undefined, return.
if (!owner_readable_stream_) {
return;
}
// 3. If this.[[readRequests]] is not empty, throw a TypeError exception.
if (read_requests_.size() > 0) {
exception_state.ThrowTypeError(
"Cannot release a readable stream reader when it still has outstanding "
"read() calls that have not yet settled");
return;
}
// 4. Perform ! ReadableStreamReaderGenericRelease(this).
GenericRelease(script_state, this);
}
void ReadableStreamDefaultReader::SetUpDefaultReader(
ScriptState* script_state,
ReadableStreamDefaultReader* reader,
ReadableStream* stream,
ExceptionState& exception_state) {
// https://streams.spec.whatwg.org/#set-up-readable-stream-default-reader
// 1. If ! IsReadableStreamLocked(stream) is true, throw a TypeError
// exception.
if (ReadableStream::IsLocked(stream)) {
exception_state.ThrowTypeError(
"ReadableStreamDefaultReader constructor can only accept readable "
"streams "
"that are not yet locked to a reader");
return;
}
DCHECK(reader->for_author_code_);
// 2. Perform ! ReadableStreamReaderGenericInitialize(reader, stream).
ReadableStreamGenericReader::GenericInitialize(script_state, reader, stream);
// 3. Set reader.[[readRequests]] to a new empty List.
DCHECK_EQ(reader->read_requests_.size(), 0u);
}
void ReadableStreamDefaultReader::Trace(Visitor* visitor) const {
visitor->Trace(read_requests_);
ReadableStreamGenericReader::Trace(visitor);
}
} // namespace blink
...@@ -5,14 +5,61 @@ ...@@ -5,14 +5,61 @@
#ifndef THIRD_PARTY_BLINK_RENDERER_CORE_STREAMS_READABLE_STREAM_DEFAULT_READER_H_ #ifndef THIRD_PARTY_BLINK_RENDERER_CORE_STREAMS_READABLE_STREAM_DEFAULT_READER_H_
#define THIRD_PARTY_BLINK_RENDERER_CORE_STREAMS_READABLE_STREAM_DEFAULT_READER_H_ #define THIRD_PARTY_BLINK_RENDERER_CORE_STREAMS_READABLE_STREAM_DEFAULT_READER_H_
#include "third_party/blink/renderer/core/core_export.h"
#include "third_party/blink/renderer/core/streams/readable_stream_generic_reader.h" #include "third_party/blink/renderer/core/streams/readable_stream_generic_reader.h"
#include "third_party/blink/renderer/platform/bindings/script_wrappable.h"
namespace blink { namespace blink {
// ReadableStreamDefaultReader is implemented by the ReadableStreamReader class. class ExceptionState;
// See the header file for the reasoning. class ReadableStream;
class ScriptPromise;
class ScriptState;
class StreamPromiseResolver;
using ReadableStreamDefaultReader = ReadableStreamGenericReader; class CORE_EXPORT ReadableStreamDefaultReader
: public ReadableStreamGenericReader {
DEFINE_WRAPPERTYPEINFO();
public:
static ReadableStreamDefaultReader* Create(ScriptState*,
ReadableStream* stream,
ExceptionState&);
// https://streams.spec.whatwg.org/#default-reader-constructor
ReadableStreamDefaultReader(ScriptState*,
ReadableStream* stream,
ExceptionState&);
~ReadableStreamDefaultReader() override;
// https://streams.spec.whatwg.org/#default-reader-read
ScriptPromise read(ScriptState*, ExceptionState&);
// https://streams.spec.whatwg.org/#default-reader-release-lock
void releaseLock(ScriptState*, ExceptionState&);
static void SetUpDefaultReader(ScriptState*,
ReadableStreamDefaultReader* reader,
ReadableStream* stream,
ExceptionState&);
//
// Readable stream reader abstract operations
//
// https://streams.spec.whatwg.org/#readable-stream-default-reader-read
static StreamPromiseResolver* Read(ScriptState*,
ReadableStreamDefaultReader* reader);
void Trace(Visitor*) const override;
private:
friend class ReadableStreamDefaultController;
friend class ReadableStream;
HeapDeque<Member<StreamPromiseResolver>> read_requests_;
bool for_author_code_ = true;
};
} // namespace blink } // namespace blink
......
...@@ -4,8 +4,7 @@ ...@@ -4,8 +4,7 @@
// https://streams.spec.whatwg.org/#default-reader-class-definition // https://streams.spec.whatwg.org/#default-reader-class-definition
[ [
Exposed=(Window,Worker,Worklet), Exposed=(Window,Worker,Worklet)
ImplementedAs=ReadableStreamGenericReader
] interface ReadableStreamDefaultReader { ] interface ReadableStreamDefaultReader {
[CallWith=ScriptState, RaisesException] constructor(ReadableStream stream); [CallWith=ScriptState, RaisesException] constructor(ReadableStream stream);
[CallWith=ScriptState] readonly attribute Promise<void> [CallWith=ScriptState] readonly attribute Promise<void>
......
...@@ -15,46 +15,14 @@ ...@@ -15,46 +15,14 @@
namespace blink { namespace blink {
ReadableStreamGenericReader* ReadableStreamGenericReader::Create( ReadableStreamGenericReader::ReadableStreamGenericReader() = default;
ScriptState* script_state,
ReadableStream* stream,
ExceptionState& exception_state) {
auto* reader = MakeGarbageCollected<ReadableStreamGenericReader>(
script_state, stream, exception_state);
if (exception_state.HadException()) {
return nullptr;
}
return reader;
}
ReadableStreamGenericReader::ReadableStreamGenericReader(
ScriptState* script_state,
ReadableStream* stream,
ExceptionState& exception_state) {
// https://streams.spec.whatwg.org/#default-reader-constructor
// 2. If ! IsReadableStreamLocked(stream) is true, throw a TypeError
// exception.
if (ReadableStream::IsLocked(stream)) {
exception_state.ThrowTypeError(
"ReadableStreamDefaultReader constructor can only accept readable "
"streams "
"that are not yet locked to a reader");
return;
}
// 3. Perform ! ReadableStreamReaderGenericInitialize(this, stream).
GenericInitialize(script_state, this, stream);
// 4. Set this.[[readRequests]] to a new empty List.
DCHECK_EQ(read_requests_.size(), 0u);
}
ReadableStreamGenericReader::~ReadableStreamGenericReader() = default; ReadableStreamGenericReader::~ReadableStreamGenericReader() = default;
ScriptPromise ReadableStreamGenericReader::closed( ScriptPromise ReadableStreamGenericReader::closed(
ScriptState* script_state) const { ScriptState* script_state) const {
// https://streams.spec.whatwg.org/#default-reader-closed // https://streams.spec.whatwg.org/#default-reader-closed
// 2. Return this.[[closedPromise]]. // 1. Return this.[[closedPromise]].
return closed_promise_->GetScriptPromise(script_state); return closed_promise_->GetScriptPromise(script_state);
} }
...@@ -87,83 +55,6 @@ ScriptPromise ReadableStreamGenericReader::cancel( ...@@ -87,83 +55,6 @@ ScriptPromise ReadableStreamGenericReader::cancel(
return ScriptPromise(script_state, result); return ScriptPromise(script_state, result);
} }
ScriptPromise ReadableStreamGenericReader::read(
ScriptState* script_state,
ExceptionState& exception_state) {
// https://streams.spec.whatwg.org/#default-reader-read
// 2. If this.[[ownerReadableStream]] is undefined, return a promise rejected
// with a TypeError exception.
if (!owner_readable_stream_) {
exception_state.ThrowTypeError(
"This readable stream reader has been released and cannot be used to "
"read from its previous owner stream");
return ScriptPromise();
}
// 3. Return ! ReadableStreamReaderRead(this).
return ReadableStreamGenericReader::Read(script_state, this)
->GetScriptPromise(script_state);
}
void ReadableStreamGenericReader::releaseLock(ScriptState* script_state,
ExceptionState& exception_state) {
// https://streams.spec.whatwg.org/#default-reader-release-lock
// 2. If this.[[ownerReadableStream]] is undefined, return.
if (!owner_readable_stream_) {
return;
}
// 3. If this.[[readRequests]] is not empty, throw a TypeError exception.
if (read_requests_.size() > 0) {
exception_state.ThrowTypeError(
"Cannot release a readable stream reader when it still has outstanding "
"read() calls that have not yet settled");
return;
}
// 4. Perform ! ReadableStreamReaderGenericRelease(this).
GenericRelease(script_state, this);
}
StreamPromiseResolver* ReadableStreamGenericReader::Read(
ScriptState* script_state,
ReadableStreamGenericReader* reader) {
auto* isolate = script_state->GetIsolate();
// https://streams.spec.whatwg.org/#readable-stream-default-reader-read
// 1. Let stream be reader.[[ownerReadableStream]].
ReadableStream* stream = reader->owner_readable_stream_;
// 2. Assert: stream is not undefined.
DCHECK(stream);
// 3. Set stream.[[disturbed]] to true.
stream->is_disturbed_ = true;
switch (stream->state_) {
// 4. If stream.[[state]] is "closed", return a promise resolved with !
// ReadableStreamCreateReadResult(undefined, true,
// reader.[[forAuthorCode]]).
case ReadableStream::kClosed:
return StreamPromiseResolver::CreateResolved(
script_state,
ReadableStream::CreateReadResult(script_state, v8::Undefined(isolate),
true, reader->for_author_code_));
// 5. If stream.[[state]] is "errored", return a promise rejected with
// stream.[[storedError]].
case ReadableStream::kErrored:
return StreamPromiseResolver::CreateRejected(
script_state, stream->GetStoredError(isolate));
case ReadableStream::kReadable:
// 6. Assert: stream.[[state]] is "readable".
DCHECK_EQ(stream->state_, ReadableStream::kReadable);
// 7. Return ! stream.[[readableStreamController]].[[PullSteps]]().
return stream->GetController()->PullSteps(script_state);
}
}
void ReadableStreamGenericReader::GenericRelease( void ReadableStreamGenericReader::GenericRelease(
ScriptState* script_state, ScriptState* script_state,
ReadableStreamGenericReader* reader) { ReadableStreamGenericReader* reader) {
...@@ -208,7 +99,6 @@ void ReadableStreamGenericReader::GenericRelease( ...@@ -208,7 +99,6 @@ void ReadableStreamGenericReader::GenericRelease(
void ReadableStreamGenericReader::Trace(Visitor* visitor) const { void ReadableStreamGenericReader::Trace(Visitor* visitor) const {
visitor->Trace(closed_promise_); visitor->Trace(closed_promise_);
visitor->Trace(owner_readable_stream_); visitor->Trace(owner_readable_stream_);
visitor->Trace(read_requests_);
ScriptWrappable::Trace(visitor); ScriptWrappable::Trace(visitor);
} }
...@@ -234,31 +124,28 @@ void ReadableStreamGenericReader::GenericInitialize( ...@@ -234,31 +124,28 @@ void ReadableStreamGenericReader::GenericInitialize(
auto* isolate = script_state->GetIsolate(); auto* isolate = script_state->GetIsolate();
// https://streams.spec.whatwg.org/#readable-stream-reader-generic-initialize // https://streams.spec.whatwg.org/#readable-stream-reader-generic-initialize
// 1. Set reader.[[forAuthorCode]] to true. // 1. Set reader.[[ownerReadableStream]] to stream.
DCHECK(reader->for_author_code_);
// 2. Set reader.[[ownerReadableStream]] to stream.
reader->owner_readable_stream_ = stream; reader->owner_readable_stream_ = stream;
// 3. Set stream.[[reader]] to reader. // 2. Set stream.[[reader]] to reader.
stream->reader_ = reader; stream->reader_ = static_cast<ReadableStreamDefaultReader*>(reader);
switch (stream->state_) { switch (stream->state_) {
// 4. If stream.[[state]] is "readable", // 3. If stream.[[state]] is "readable",
case ReadableStream::kReadable: case ReadableStream::kReadable:
// a. Set reader.[[closedPromise]] to a new promise. // a. Set reader.[[closedPromise]] to a new promise.
reader->closed_promise_ = reader->closed_promise_ =
MakeGarbageCollected<StreamPromiseResolver>(script_state); MakeGarbageCollected<StreamPromiseResolver>(script_state);
break; break;
// 5. Otherwise, if stream.[[state]] is "closed", // 4. Otherwise, if stream.[[state]] is "closed",
case ReadableStream::kClosed: case ReadableStream::kClosed:
// a. Set reader.[[closedPromise]] to a promise resolved with undefined. // a. Set reader.[[closedPromise]] to a promise resolved with undefined.
reader->closed_promise_ = reader->closed_promise_ =
StreamPromiseResolver::CreateResolvedWithUndefined(script_state); StreamPromiseResolver::CreateResolvedWithUndefined(script_state);
break; break;
// 6. Otherwise, // 5. Otherwise,
case ReadableStream::kErrored: case ReadableStream::kErrored:
// a. Assert: stream.[[state]] is "errored". // a. Assert: stream.[[state]] is "errored".
DCHECK_EQ(stream->state_, ReadableStream::kErrored); DCHECK_EQ(stream->state_, ReadableStream::kErrored);
......
...@@ -13,9 +13,9 @@ ...@@ -13,9 +13,9 @@
namespace blink { namespace blink {
class ExceptionState; class ExceptionState;
class ReadableStream;
class ScriptPromise; class ScriptPromise;
class ScriptState; class ScriptState;
class ReadableStream;
class StreamPromiseResolver; class StreamPromiseResolver;
class Visitor; class Visitor;
...@@ -26,51 +26,25 @@ class Visitor; ...@@ -26,51 +26,25 @@ class Visitor;
// ReadableStreamGenericReader class. // ReadableStreamGenericReader class.
// TODO(ricea): Refactor this when implementing ReadableStreamBYOBReader. // TODO(ricea): Refactor this when implementing ReadableStreamBYOBReader.
class CORE_EXPORT ReadableStreamGenericReader : public ScriptWrappable { class CORE_EXPORT ReadableStreamGenericReader : public ScriptWrappable {
DEFINE_WRAPPERTYPEINFO();
public: public:
static ReadableStreamGenericReader* Create(ScriptState*, ReadableStreamGenericReader();
ReadableStream* stream,
ExceptionState&);
// https://streams.spec.whatwg.org/#default-reader-constructor
ReadableStreamGenericReader(ScriptState*,
ReadableStream* stream,
ExceptionState&);
~ReadableStreamGenericReader() override; ~ReadableStreamGenericReader() override;
// https://streams.spec.whatwg.org/#default-reader-closed // https://streams.spec.whatwg.org/#generic-reader-closed
ScriptPromise closed(ScriptState*) const; ScriptPromise closed(ScriptState*) const;
// https://streams.spec.whatwg.org/#default-reader-cancel // https://streams.spec.whatwg.org/#generic-reader-cancel
ScriptPromise cancel(ScriptState*, ExceptionState&); ScriptPromise cancel(ScriptState*, ExceptionState&);
ScriptPromise cancel(ScriptState*, ScriptValue reason, ExceptionState&); ScriptPromise cancel(ScriptState*, ScriptValue reason, ExceptionState&);
// https://streams.spec.whatwg.org/#default-reader-read
ScriptPromise read(ScriptState*, ExceptionState&);
// https://streams.spec.whatwg.org/#default-reader-release-lock
void releaseLock(ScriptState*, ExceptionState&);
// //
// Readable stream reader abstract operations // Readable stream reader abstract operations
// //
// https://streams.spec.whatwg.org/#readable-stream-default-reader-read
static StreamPromiseResolver* Read(ScriptState* script_state,
ReadableStreamGenericReader* reader);
// https://streams.spec.whatwg.org/#readable-stream-reader-generic-release // https://streams.spec.whatwg.org/#readable-stream-reader-generic-release
static void GenericRelease(ScriptState*, ReadableStreamGenericReader*); static void GenericRelease(ScriptState*, ReadableStreamGenericReader*);
StreamPromiseResolver* ClosedPromise() { return closed_promise_; }
void Trace(Visitor*) const override;
private:
friend class ReadableStreamDefaultController;
friend class ReadableStream;
// https://streams.spec.whatwg.org/#readable-stream-reader-generic-cancel // https://streams.spec.whatwg.org/#readable-stream-reader-generic-cancel
static v8::Local<v8::Promise> GenericCancel(ScriptState*, static v8::Local<v8::Promise> GenericCancel(ScriptState*,
ReadableStreamGenericReader*, ReadableStreamGenericReader*,
...@@ -81,10 +55,18 @@ class CORE_EXPORT ReadableStreamGenericReader : public ScriptWrappable { ...@@ -81,10 +55,18 @@ class CORE_EXPORT ReadableStreamGenericReader : public ScriptWrappable {
ReadableStreamGenericReader*, ReadableStreamGenericReader*,
ReadableStream*); ReadableStream*);
StreamPromiseResolver* ClosedPromise() const { return closed_promise_; }
void Trace(Visitor*) const override;
private:
friend class ReadableStreamDefaultController;
friend class ReadableStream;
Member<StreamPromiseResolver> closed_promise_; Member<StreamPromiseResolver> closed_promise_;
bool for_author_code_ = true;
protected:
Member<ReadableStream> owner_readable_stream_; Member<ReadableStream> owner_readable_stream_;
HeapDeque<Member<StreamPromiseResolver>> read_requests_;
}; };
} // namespace blink } // namespace blink
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment