Commit abeaa5ea authored by Adam Rice's avatar Adam Rice Committed by Commit Bot

Re-order ReadableStream.js

Change the order of functions in ReadableStream.js to match the standard. This
will make it easier to update the implementation to match changes to the
standard.

No functional changes.

This corresponds to step 3 of issue 710728.

Verified identical to the previous version once comments and blank lines are
removed and the file is sorted by function.

Bug: 710728
Change-Id: I8b0ae4ad41aa5e84f234901560ad6f97e6319001
Reviewed-on: https://chromium-review.googlesource.com/821955Reviewed-by: default avatarTakeshi Yoshino <tyoshino@chromium.org>
Commit-Queue: Adam Rice <ricea@chromium.org>
Cr-Commit-Position: refs/heads/master@{#523739}
parent 594a5f31
......@@ -427,149 +427,196 @@
return promise;
}
class ReadableStreamDefaultController {
constructor(
stream, underlyingSource, size, highWaterMark, isExternallyControlled) {
if (IsReadableStream(stream) === false) {
throw new TypeError(streamErrors.illegalConstructor);
//
// Readable stream abstract operations
//
function AcquireReadableStreamDefaultReader(stream) {
return new ReadableStreamDefaultReader(stream);
}
if (stream[_controller] !== undefined) {
throw new TypeError(streamErrors.illegalConstructor);
function IsReadableStream(x) {
return hasOwnPropertyNoThrow(x, _controller);
}
this[_controlledReadableStream] = stream;
function IsReadableStreamDisturbed(stream) {
return stream[_readableStreamBits] & DISTURBED;
}
this[_underlyingSource] = underlyingSource;
function IsReadableStreamLocked(stream) {
return stream[_reader] !== undefined;
}
this[_queue] = new binding.SimpleQueue();
this[_queueTotalSize] = 0;
// Potential future optimization: use class instances for the underlying
// sources, so that we don't re-create
// closures every time.
this[_readableStreamDefaultControllerBits] = 0b0;
if (isExternallyControlled === true) {
this[_readableStreamDefaultControllerBits] |= EXTERNALLY_CONTROLLED;
}
// TODO(domenic): shouldClone argument from spec not supported yet
function ReadableStreamTee(stream) {
const reader = AcquireReadableStreamDefaultReader(stream);
const normalizedStrategy =
ValidateAndNormalizeQueuingStrategy(size, highWaterMark);
this[_strategySize] = normalizedStrategy.size;
this[_strategyHWM] = normalizedStrategy.highWaterMark;
let closedOrErrored = false;
let canceled1 = false;
let canceled2 = false;
let reason1;
let reason2;
const promise = v8.createPromise();
const controller = this;
const branch1Stream = new ReadableStream({pull, cancel: cancel1});
const startResult = CallOrNoop1(
underlyingSource, 'start', this, 'underlyingSource.start');
thenPromise(
Promise_resolve(startResult),
() => {
controller[_readableStreamDefaultControllerBits] |= STARTED;
ReadableStreamDefaultControllerCallPullIfNeeded(controller);
},
r => {
if (ReadableStreamGetState(stream) === STATE_READABLE) {
ReadableStreamDefaultControllerError(controller, r);
const branch2Stream = new ReadableStream({pull, cancel: cancel2});
const branch1 = branch1Stream[_controller];
const branch2 = branch2Stream[_controller];
thenPromise(reader[_closedPromise], undefined, function(r) {
if (closedOrErrored === true) {
return;
}
ReadableStreamDefaultControllerError(branch1, r);
ReadableStreamDefaultControllerError(branch2, r);
closedOrErrored = true;
});
}
get desiredSize() {
if (IsReadableStreamDefaultController(this) === false) {
throw new TypeError(streamErrors.illegalInvocation);
}
return [branch1Stream, branch2Stream];
return ReadableStreamDefaultControllerGetDesiredSize(this);
}
function pull() {
return thenPromise(
ReadableStreamDefaultReaderRead(reader), function(result) {
const value = result.value;
const done = result.done;
close() {
if (IsReadableStreamDefaultController(this) === false) {
throw new TypeError(streamErrors.illegalInvocation);
if (done === true && closedOrErrored === false) {
if (canceled1 === false) {
ReadableStreamDefaultControllerClose(branch1);
}
if (canceled2 === false) {
ReadableStreamDefaultControllerClose(branch2);
}
closedOrErrored = true;
}
const stream = this[_controlledReadableStream];
if (closedOrErrored === true) {
return;
}
if (this[_readableStreamDefaultControllerBits] & CLOSE_REQUESTED) {
throw new TypeError(errCloseCloseRequestedStream);
if (canceled1 === false) {
ReadableStreamDefaultControllerEnqueue(branch1, value);
}
const state = ReadableStreamGetState(stream);
if (state === STATE_ERRORED) {
throw new TypeError(errCloseErroredStream);
if (canceled2 === false) {
ReadableStreamDefaultControllerEnqueue(branch2, value);
}
if (state === STATE_CLOSED) {
throw new TypeError(errCloseClosedStream);
});
}
return ReadableStreamDefaultControllerClose(this);
function cancel1(reason) {
canceled1 = true;
reason1 = reason;
if (canceled2 === true) {
const compositeReason = [reason1, reason2];
const cancelResult = ReadableStreamCancel(stream, compositeReason);
resolvePromise(promise, cancelResult);
}
enqueue(chunk) {
if (IsReadableStreamDefaultController(this) === false) {
throw new TypeError(streamErrors.illegalInvocation);
return promise;
}
if (!ReadableStreamDefaultControllerCanCloseOrEnqueue(this)) {
const stream = this[_controlledReadableStream];
throw getReadableStreamEnqueueError(stream, this);
function cancel2(reason) {
canceled2 = true;
reason2 = reason;
if (canceled1 === true) {
const compositeReason = [reason1, reason2];
const cancelResult = ReadableStreamCancel(stream, compositeReason);
resolvePromise(promise, cancelResult);
}
return ReadableStreamDefaultControllerEnqueue(this, chunk);
return promise;
}
}
error(e) {
if (IsReadableStreamDefaultController(this) === false) {
throw new TypeError(streamErrors.illegalInvocation);
//
// Abstract Operations Used By Controllers
//
function ReadableStreamAddReadRequest(stream) {
const promise = v8.createPromise();
stream[_reader][_readRequests].push(promise);
return promise;
}
const stream = this[_controlledReadableStream];
function ReadableStreamCancel(stream, reason) {
stream[_readableStreamBits] |= DISTURBED;
const state = ReadableStreamGetState(stream);
if (state === STATE_ERRORED) {
throw new TypeError(errErrorErroredStream);
}
if (state === STATE_CLOSED) {
throw new TypeError(errErrorClosedStream);
return Promise_resolve(undefined);
}
if (state === STATE_ERRORED) {
return Promise_reject(stream[_storedError]);
}
return ReadableStreamDefaultControllerError(this, e);
ReadableStreamClose(stream);
const sourceCancelPromise =
ReadableStreamDefaultControllerCancel(stream[_controller], reason);
return thenPromise(sourceCancelPromise, () => undefined);
}
function ReadableStreamClose(stream) {
ReadableStreamSetState(stream, STATE_CLOSED);
const reader = stream[_reader];
if (reader === undefined) {
return undefined;
}
function ReadableStreamDefaultControllerCancel(controller, reason) {
controller[_queue] = new binding.SimpleQueue();
if (IsReadableStreamDefaultReader(reader) === true) {
reader[_readRequests].forEach(
request =>
resolvePromise(request, CreateIterResultObject(undefined, true)));
reader[_readRequests] = new binding.SimpleQueue();
}
const underlyingSource = controller[_underlyingSource];
return PromiseCallOrNoop1(
underlyingSource, 'cancel', reason, 'underlyingSource.cancel');
resolvePromise(reader[_closedPromise], undefined);
}
function ReadableStreamDefaultControllerPull(controller) {
const stream = controller[_controlledReadableStream];
function ReadableStreamError(stream, e) {
stream[_storedError] = e;
ReadableStreamSetState(stream, STATE_ERRORED);
if (controller[_queue].length > 0) {
const chunk = DequeueValue(controller);
const reader = stream[_reader];
if (reader === undefined) {
return undefined;
}
if ((controller[_readableStreamDefaultControllerBits] &
CLOSE_REQUESTED) &&
controller[_queue].length === 0) {
ReadableStreamClose(stream);
} else {
ReadableStreamDefaultControllerCallPullIfNeeded(controller);
if (IsReadableStreamDefaultReader(reader) === true) {
reader[_readRequests].forEach(request => rejectPromise(request, e));
reader[_readRequests] = new binding.SimpleQueue();
}
return Promise_resolve(CreateIterResultObject(chunk, false));
rejectPromise(reader[_closedPromise], e);
markPromiseAsHandled(reader[_closedPromise]);
}
const pendingPromise = ReadableStreamAddReadRequest(stream);
ReadableStreamDefaultControllerCallPullIfNeeded(controller);
return pendingPromise;
function ReadableStreamFulfillReadRequest(stream, chunk, done) {
const readRequest = stream[_reader][_readRequests].shift();
resolvePromise(readRequest, CreateIterResultObject(chunk, done));
}
function ReadableStreamAddReadRequest(stream) {
const promise = v8.createPromise();
stream[_reader][_readRequests].push(promise);
return promise;
function ReadableStreamGetNumReadRequests(stream) {
const reader = stream[_reader];
const readRequests = reader[_readRequests];
return readRequests.length;
}
//
// Class ReadableStreamDefaultReader
//
class ReadableStreamDefaultReader {
constructor(stream) {
if (IsReadableStream(stream) === false) {
......@@ -635,267 +682,238 @@
}
}
function ReadableStreamReaderGenericCancel(reader, reason) {
return ReadableStreamCancel(reader[_ownerReadableStream], reason);
}
//
// Readable stream abstract operations
// Readable Stream Reader Abstract Operations
//
function AcquireReadableStreamDefaultReader(stream) {
return new ReadableStreamDefaultReader(stream);
function IsReadableStreamDefaultReader(x) {
return hasOwnPropertyNoThrow(x, _readRequests);
}
function ReadableStreamCancel(stream, reason) {
stream[_readableStreamBits] |= DISTURBED;
const state = ReadableStreamGetState(stream);
if (state === STATE_CLOSED) {
return Promise_resolve(undefined);
}
if (state === STATE_ERRORED) {
return Promise_reject(stream[_storedError]);
function ReadableStreamReaderGenericCancel(reader, reason) {
return ReadableStreamCancel(reader[_ownerReadableStream], reason);
}
ReadableStreamClose(stream);
const sourceCancelPromise =
ReadableStreamDefaultControllerCancel(stream[_controller], reason);
return thenPromise(sourceCancelPromise, () => undefined);
function ReadableStreamReaderGenericInitialize(reader, stream) {
// TODO(yhirano): Remove this when we don't need hasPendingActivity in
// blink::UnderlyingSourceBase.
const controller = stream[_controller];
if (controller[_readableStreamDefaultControllerBits] &
EXTERNALLY_CONTROLLED) {
// The stream is created with an external controller (i.e. made in
// Blink).
const underlyingSource = controller[_underlyingSource];
callFunction(underlyingSource.notifyLockAcquired, underlyingSource);
}
function ReadableStreamDefaultControllerClose(controller) {
const stream = controller[_controlledReadableStream];
controller[_readableStreamDefaultControllerBits] |= CLOSE_REQUESTED;
reader[_ownerReadableStream] = stream;
stream[_reader] = reader;
if (controller[_queue].length === 0) {
ReadableStreamClose(stream);
switch (ReadableStreamGetState(stream)) {
case STATE_READABLE:
reader[_closedPromise] = v8.createPromise();
break;
case STATE_CLOSED:
reader[_closedPromise] = Promise_resolve(undefined);
break;
case STATE_ERRORED:
reader[_closedPromise] = Promise_reject(stream[_storedError]);
markPromiseAsHandled(reader[_closedPromise]);
break;
}
}
function ReadableStreamFulfillReadRequest(stream, chunk, done) {
const readRequest = stream[_reader][_readRequests].shift();
resolvePromise(readRequest, CreateIterResultObject(chunk, done));
function ReadableStreamReaderGenericRelease(reader) {
// TODO(yhirano): Remove this when we don't need hasPendingActivity in
// blink::UnderlyingSourceBase.
const controller = reader[_ownerReadableStream][_controller];
if (controller[_readableStreamDefaultControllerBits] &
EXTERNALLY_CONTROLLED) {
// The stream is created with an external controller (i.e. made in
// Blink).
const underlyingSource = controller[_underlyingSource];
callFunction(underlyingSource.notifyLockReleased, underlyingSource);
}
function ReadableStreamDefaultControllerEnqueue(controller, chunk) {
const stream = controller[_controlledReadableStream];
if (IsReadableStreamLocked(stream) === true &&
ReadableStreamGetNumReadRequests(stream) > 0) {
ReadableStreamFulfillReadRequest(stream, chunk, false);
if (ReadableStreamGetState(reader[_ownerReadableStream]) ===
STATE_READABLE) {
rejectPromise(
reader[_closedPromise],
new TypeError(errReleasedReaderClosedPromise));
} else {
let chunkSize = 1;
const strategySize = controller[_strategySize];
if (strategySize !== undefined) {
try {
chunkSize = strategySize(chunk);
} catch (chunkSizeE) {
if (ReadableStreamGetState(stream) === STATE_READABLE) {
ReadableStreamDefaultControllerError(controller, chunkSizeE);
}
throw chunkSizeE;
}
reader[_closedPromise] =
Promise_reject(new TypeError(errReleasedReaderClosedPromise));
}
markPromiseAsHandled(reader[_closedPromise]);
try {
EnqueueValueWithSize(controller, chunk, chunkSize);
} catch (enqueueE) {
if (ReadableStreamGetState(stream) === STATE_READABLE) {
ReadableStreamDefaultControllerError(controller, enqueueE);
}
throw enqueueE;
}
reader[_ownerReadableStream][_reader] = undefined;
reader[_ownerReadableStream] = undefined;
}
ReadableStreamDefaultControllerCallPullIfNeeded(controller);
}
function ReadableStreamDefaultReaderRead(reader) {
const stream = reader[_ownerReadableStream];
stream[_readableStreamBits] |= DISTURBED;
function ReadableStreamGetState(stream) {
return (stream[_readableStreamBits] & STATE_MASK) >> STATE_BITS_OFFSET;
if (ReadableStreamGetState(stream) === STATE_CLOSED) {
return Promise_resolve(CreateIterResultObject(undefined, true));
}
function ReadableStreamSetState(stream, state) {
stream[_readableStreamBits] = (stream[_readableStreamBits] & ~STATE_MASK) |
(state << STATE_BITS_OFFSET);
if (ReadableStreamGetState(stream) === STATE_ERRORED) {
return Promise_reject(stream[_storedError]);
}
function ReadableStreamDefaultControllerError(controller, e) {
controller[_queue] = new binding.SimpleQueue();
const stream = controller[_controlledReadableStream];
ReadableStreamError(stream, e);
return ReadableStreamDefaultControllerPull(stream[_controller]);
}
function ReadableStreamError(stream, e) {
stream[_storedError] = e;
ReadableStreamSetState(stream, STATE_ERRORED);
//
// Class ReadableStreamDefaultController
//
const reader = stream[_reader];
if (reader === undefined) {
return undefined;
class ReadableStreamDefaultController {
constructor(
stream, underlyingSource, size, highWaterMark, isExternallyControlled) {
if (IsReadableStream(stream) === false) {
throw new TypeError(streamErrors.illegalConstructor);
}
if (IsReadableStreamDefaultReader(reader) === true) {
reader[_readRequests].forEach(request => rejectPromise(request, e));
reader[_readRequests] = new binding.SimpleQueue();
if (stream[_controller] !== undefined) {
throw new TypeError(streamErrors.illegalConstructor);
}
rejectPromise(reader[_closedPromise], e);
markPromiseAsHandled(reader[_closedPromise]);
}
this[_controlledReadableStream] = stream;
function ReadableStreamClose(stream) {
ReadableStreamSetState(stream, STATE_CLOSED);
this[_underlyingSource] = underlyingSource;
const reader = stream[_reader];
if (reader === undefined) {
return undefined;
}
this[_queue] = new binding.SimpleQueue();
this[_queueTotalSize] = 0;
if (IsReadableStreamDefaultReader(reader) === true) {
reader[_readRequests].forEach(
request =>
resolvePromise(request, CreateIterResultObject(undefined, true)));
reader[_readRequests] = new binding.SimpleQueue();
this[_readableStreamDefaultControllerBits] = 0b0;
if (isExternallyControlled === true) {
this[_readableStreamDefaultControllerBits] |= EXTERNALLY_CONTROLLED;
}
resolvePromise(reader[_closedPromise], undefined);
}
const normalizedStrategy =
ValidateAndNormalizeQueuingStrategy(size, highWaterMark);
this[_strategySize] = normalizedStrategy.size;
this[_strategyHWM] = normalizedStrategy.highWaterMark;
function ReadableStreamDefaultControllerGetDesiredSize(controller) {
return controller[_strategyHWM] - controller[_queueTotalSize];
}
const controller = this;
function ReadableStreamDefaultControllerHasBackpressure(controller) {
return !ReadableStreamDefaultControllerShouldCallPull(controller);
const startResult = CallOrNoop1(
underlyingSource, 'start', this, 'underlyingSource.start');
thenPromise(
Promise_resolve(startResult),
() => {
controller[_readableStreamDefaultControllerBits] |= STARTED;
ReadableStreamDefaultControllerCallPullIfNeeded(controller);
},
r => {
if (ReadableStreamGetState(stream) === STATE_READABLE) {
ReadableStreamDefaultControllerError(controller, r);
}
});
}
function ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) {
if (controller[_readableStreamDefaultControllerBits] & CLOSE_REQUESTED) {
return false;
get desiredSize() {
if (IsReadableStreamDefaultController(this) === false) {
throw new TypeError(streamErrors.illegalInvocation);
}
const state = ReadableStreamGetState(controller[_controlledReadableStream]);
return state === STATE_READABLE;
return ReadableStreamDefaultControllerGetDesiredSize(this);
}
function IsReadableStream(x) {
return hasOwnPropertyNoThrow(x, _controller);
close() {
if (IsReadableStreamDefaultController(this) === false) {
throw new TypeError(streamErrors.illegalInvocation);
}
function IsReadableStreamDisturbed(stream) {
return stream[_readableStreamBits] & DISTURBED;
const stream = this[_controlledReadableStream];
if (this[_readableStreamDefaultControllerBits] & CLOSE_REQUESTED) {
throw new TypeError(errCloseCloseRequestedStream);
}
function IsReadableStreamLocked(stream) {
return stream[_reader] !== undefined;
const state = ReadableStreamGetState(stream);
if (state === STATE_ERRORED) {
throw new TypeError(errCloseErroredStream);
}
if (state === STATE_CLOSED) {
throw new TypeError(errCloseClosedStream);
}
function IsReadableStreamDefaultController(x) {
return hasOwnPropertyNoThrow(x, _controlledReadableStream);
return ReadableStreamDefaultControllerClose(this);
}
function IsReadableStreamDefaultReader(x) {
return hasOwnPropertyNoThrow(x, _readRequests);
enqueue(chunk) {
if (IsReadableStreamDefaultController(this) === false) {
throw new TypeError(streamErrors.illegalInvocation);
}
function IsReadableStreamReadable(stream) {
return ReadableStreamGetState(stream) === STATE_READABLE;
if (!ReadableStreamDefaultControllerCanCloseOrEnqueue(this)) {
const stream = this[_controlledReadableStream];
throw getReadableStreamEnqueueError(stream, this);
}
function IsReadableStreamClosed(stream) {
return ReadableStreamGetState(stream) === STATE_CLOSED;
return ReadableStreamDefaultControllerEnqueue(this, chunk);
}
function IsReadableStreamErrored(stream) {
return ReadableStreamGetState(stream) === STATE_ERRORED;
error(e) {
if (IsReadableStreamDefaultController(this) === false) {
throw new TypeError(streamErrors.illegalInvocation);
}
// Used internally by enqueue() and also by TransformStream.
function getReadableStreamEnqueueError(stream, controller) {
if (controller[_readableStreamDefaultControllerBits] & CLOSE_REQUESTED) {
return new TypeError(errEnqueueCloseRequestedStream);
}
const stream = this[_controlledReadableStream];
const state = ReadableStreamGetState(stream);
if (state === STATE_ERRORED) {
return new TypeError(errEnqueueErroredStream);
}
// assert(state === STATE_CLOSED, 'state is "closed"');
return new TypeError(errEnqueueClosedStream);
throw new TypeError(errErrorErroredStream);
}
function ReadableStreamReaderGenericInitialize(reader, stream) {
// TODO(yhirano): Remove this when we don't need hasPendingActivity in
// blink::UnderlyingSourceBase.
const controller = stream[_controller];
if (controller[_readableStreamDefaultControllerBits] &
EXTERNALLY_CONTROLLED) {
// The stream is created with an external controller (i.e. made in
// Blink).
const underlyingSource = controller[_underlyingSource];
callFunction(underlyingSource.notifyLockAcquired, underlyingSource);
if (state === STATE_CLOSED) {
throw new TypeError(errErrorClosedStream);
}
reader[_ownerReadableStream] = stream;
stream[_reader] = reader;
switch (ReadableStreamGetState(stream)) {
case STATE_READABLE:
reader[_closedPromise] = v8.createPromise();
break;
case STATE_CLOSED:
reader[_closedPromise] = Promise_resolve(undefined);
break;
case STATE_ERRORED:
reader[_closedPromise] = Promise_reject(stream[_storedError]);
markPromiseAsHandled(reader[_closedPromise]);
break;
return ReadableStreamDefaultControllerError(this, e);
}
}
function ReadableStreamReaderGenericRelease(reader) {
// TODO(yhirano): Remove this when we don't need hasPendingActivity in
// blink::UnderlyingSourceBase.
const controller = reader[_ownerReadableStream][_controller];
if (controller[_readableStreamDefaultControllerBits] &
EXTERNALLY_CONTROLLED) {
// The stream is created with an external controller (i.e. made in
// Blink).
// [[CancelSteps]] in the standard.
function ReadableStreamDefaultControllerCancel(controller, reason) {
controller[_queue] = new binding.SimpleQueue();
const underlyingSource = controller[_underlyingSource];
callFunction(underlyingSource.notifyLockReleased, underlyingSource);
return PromiseCallOrNoop1(
underlyingSource, 'cancel', reason, 'underlyingSource.cancel');
}
if (ReadableStreamGetState(reader[_ownerReadableStream]) ===
STATE_READABLE) {
rejectPromise(
reader[_closedPromise],
new TypeError(errReleasedReaderClosedPromise));
// [[PullSteps]] in the standard.
function ReadableStreamDefaultControllerPull(controller) {
const stream = controller[_controlledReadableStream];
if (controller[_queue].length > 0) {
const chunk = DequeueValue(controller);
if ((controller[_readableStreamDefaultControllerBits] &
CLOSE_REQUESTED) &&
controller[_queue].length === 0) {
ReadableStreamClose(stream);
} else {
reader[_closedPromise] =
Promise_reject(new TypeError(errReleasedReaderClosedPromise));
ReadableStreamDefaultControllerCallPullIfNeeded(controller);
}
markPromiseAsHandled(reader[_closedPromise]);
reader[_ownerReadableStream][_reader] = undefined;
reader[_ownerReadableStream] = undefined;
return Promise_resolve(CreateIterResultObject(chunk, false));
}
function ReadableStreamDefaultReaderRead(reader) {
const stream = reader[_ownerReadableStream];
stream[_readableStreamBits] |= DISTURBED;
if (ReadableStreamGetState(stream) === STATE_CLOSED) {
return Promise_resolve(CreateIterResultObject(undefined, true));
const pendingPromise = ReadableStreamAddReadRequest(stream);
ReadableStreamDefaultControllerCallPullIfNeeded(controller);
return pendingPromise;
}
if (ReadableStreamGetState(stream) === STATE_ERRORED) {
return Promise_reject(stream[_storedError]);
}
//
// Readable Stream Default Controller Abstract Operations
//
return ReadableStreamDefaultControllerPull(stream[_controller]);
function IsReadableStreamDefaultController(x) {
return hasOwnPropertyNoThrow(x, _controlledReadableStream);
}
function ReadableStreamDefaultControllerCallPullIfNeeded(controller) {
......@@ -964,101 +982,113 @@
return false;
}
function ReadableStreamGetNumReadRequests(stream) {
const reader = stream[_reader];
const readRequests = reader[_readRequests];
return readRequests.length;
}
// Potential future optimization: use class instances for the underlying
// sources, so that we don't re-create
// closures every time.
function ReadableStreamDefaultControllerClose(controller) {
const stream = controller[_controlledReadableStream];
// TODO(domenic): shouldClone argument from spec not supported yet
function ReadableStreamTee(stream) {
const reader = AcquireReadableStreamDefaultReader(stream);
controller[_readableStreamDefaultControllerBits] |= CLOSE_REQUESTED;
let closedOrErrored = false;
let canceled1 = false;
let canceled2 = false;
let reason1;
let reason2;
const promise = v8.createPromise();
if (controller[_queue].length === 0) {
ReadableStreamClose(stream);
}
}
const branch1Stream = new ReadableStream({pull, cancel: cancel1});
function ReadableStreamDefaultControllerEnqueue(controller, chunk) {
const stream = controller[_controlledReadableStream];
const branch2Stream = new ReadableStream({pull, cancel: cancel2});
if (IsReadableStreamLocked(stream) === true &&
ReadableStreamGetNumReadRequests(stream) > 0) {
ReadableStreamFulfillReadRequest(stream, chunk, false);
} else {
let chunkSize = 1;
const branch1 = branch1Stream[_controller];
const branch2 = branch2Stream[_controller];
const strategySize = controller[_strategySize];
if (strategySize !== undefined) {
try {
chunkSize = strategySize(chunk);
} catch (chunkSizeE) {
if (ReadableStreamGetState(stream) === STATE_READABLE) {
ReadableStreamDefaultControllerError(controller, chunkSizeE);
}
throw chunkSizeE;
}
}
thenPromise(reader[_closedPromise], undefined, function(r) {
if (closedOrErrored === true) {
return;
try {
EnqueueValueWithSize(controller, chunk, chunkSize);
} catch (enqueueE) {
if (ReadableStreamGetState(stream) === STATE_READABLE) {
ReadableStreamDefaultControllerError(controller, enqueueE);
}
throw enqueueE;
}
}
ReadableStreamDefaultControllerError(branch1, r);
ReadableStreamDefaultControllerError(branch2, r);
closedOrErrored = true;
});
ReadableStreamDefaultControllerCallPullIfNeeded(controller);
}
return [branch1Stream, branch2Stream];
function ReadableStreamDefaultControllerError(controller, e) {
controller[_queue] = new binding.SimpleQueue();
const stream = controller[_controlledReadableStream];
ReadableStreamError(stream, e);
}
function pull() {
return thenPromise(
ReadableStreamDefaultReaderRead(reader), function(result) {
const value = result.value;
const done = result.done;
function ReadableStreamDefaultControllerGetDesiredSize(controller) {
return controller[_strategyHWM] - controller[_queueTotalSize];
}
if (done === true && closedOrErrored === false) {
if (canceled1 === false) {
ReadableStreamDefaultControllerClose(branch1);
function ReadableStreamDefaultControllerHasBackpressure(controller) {
return !ReadableStreamDefaultControllerShouldCallPull(controller);
}
if (canceled2 === false) {
ReadableStreamDefaultControllerClose(branch2);
function ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) {
if (controller[_readableStreamDefaultControllerBits] & CLOSE_REQUESTED) {
return false;
}
closedOrErrored = true;
const state = ReadableStreamGetState(controller[_controlledReadableStream]);
return state === STATE_READABLE;
}
if (closedOrErrored === true) {
return;
}
//
// Internal functions. Not part of the standard.
//
if (canceled1 === false) {
ReadableStreamDefaultControllerEnqueue(branch1, value);
function ReadableStreamGetState(stream) {
return (stream[_readableStreamBits] & STATE_MASK) >> STATE_BITS_OFFSET;
}
if (canceled2 === false) {
ReadableStreamDefaultControllerEnqueue(branch2, value);
}
});
function ReadableStreamSetState(stream, state) {
stream[_readableStreamBits] = (stream[_readableStreamBits] & ~STATE_MASK) |
(state << STATE_BITS_OFFSET);
}
function cancel1(reason) {
canceled1 = true;
reason1 = reason;
//
// Functions exported for use by TransformStream. Not part of the standard.
//
if (canceled2 === true) {
const compositeReason = [reason1, reason2];
const cancelResult = ReadableStreamCancel(stream, compositeReason);
resolvePromise(promise, cancelResult);
function IsReadableStreamReadable(stream) {
return ReadableStreamGetState(stream) === STATE_READABLE;
}
return promise;
function IsReadableStreamClosed(stream) {
return ReadableStreamGetState(stream) === STATE_CLOSED;
}
function cancel2(reason) {
canceled2 = true;
reason2 = reason;
function IsReadableStreamErrored(stream) {
return ReadableStreamGetState(stream) === STATE_ERRORED;
}
if (canceled1 === true) {
const compositeReason = [reason1, reason2];
const cancelResult = ReadableStreamCancel(stream, compositeReason);
resolvePromise(promise, cancelResult);
// Used internally by enqueue() and also by TransformStream.
function getReadableStreamEnqueueError(stream, controller) {
if (controller[_readableStreamDefaultControllerBits] & CLOSE_REQUESTED) {
return new TypeError(errEnqueueCloseRequestedStream);
}
return promise;
const state = ReadableStreamGetState(stream);
if (state === STATE_ERRORED) {
return new TypeError(errEnqueueErroredStream);
}
// assert(state === STATE_CLOSED, 'state is "closed"');
return new TypeError(errEnqueueClosedStream);
}
//
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment