Commit 9822f6b1 authored by yhirano@chromium.org's avatar yhirano@chromium.org

Make ReadableStream implementation up to date.

Make already-implemented functions up to date. Unimplemented features such
as piping and strategy are still unimplemented.

BUG=393911

Review URL: https://codereview.chromium.org/712043002

git-svn-id: svn://svn.chromium.org/blink/trunk@185181 bbb929c8-8fbe-4397-9dbb-9b2b20218538
parent 0e495071
......@@ -16,12 +16,30 @@
namespace blink {
namespace {
class ConstUndefined : public ScriptFunction {
public:
static v8::Handle<v8::Function> create(ScriptState* scriptState)
{
return (new ConstUndefined(scriptState))->bindToV8Function();
}
private:
explicit ConstUndefined(ScriptState* scriptState) : ScriptFunction(scriptState) { }
ScriptValue call(ScriptValue value) override
{
return ScriptValue(scriptState(), v8::Undefined(scriptState()->isolate()));
}
};
} // namespace
ReadableStream::ReadableStream(ExecutionContext* executionContext, UnderlyingSource* source)
: m_source(source)
, m_isStarted(false)
, m_isDraining(false)
, m_isPulling(false)
, m_isSchedulingPull(false)
, m_state(Waiting)
, m_wait(new WaitPromise(executionContext, this, WaitPromise::Ready))
, m_closed(new ClosedPromise(executionContext, this, ClosedPromise::Closed))
......@@ -50,10 +68,12 @@ String ReadableStream::stateString() const
bool ReadableStream::enqueuePreliminaryCheck(size_t chunkSize)
{
// This is a bit different from what spec says: it says we should throw
// an exception here. But sometimes a caller is not in any JavaScript
// context, and we don't want to throw an exception in such a case.
if (m_state == Errored || m_state == Closed || m_isDraining)
return false;
// FIXME: Query strategy.
return true;
}
......@@ -61,15 +81,15 @@ bool ReadableStream::enqueuePostAction(size_t totalQueueSize)
{
m_isPulling = false;
// FIXME: Set needsMore correctly.
bool needsMore = true;
// FIXME: Set |shouldApplyBackpressure| correctly.
bool shouldApplyBackpressure = false;
if (m_state == Waiting) {
m_state = Readable;
m_wait->resolve(V8UndefinedType());
}
return needsMore;
return !shouldApplyBackpressure;
}
void ReadableStream::close()
......@@ -105,47 +125,34 @@ void ReadableStream::readPostAction()
if (isQueueEmpty()) {
if (m_isDraining) {
m_state = Closed;
m_wait->reset();
m_wait->resolve(V8UndefinedType());
m_closed->resolve(V8UndefinedType());
} else {
m_state = Waiting;
m_wait->reset();
callOrSchedulePull();
}
}
callPullIfNeeded();
}
ScriptPromise ReadableStream::wait(ScriptState* scriptState)
{
if (m_state == Waiting)
callOrSchedulePull();
return m_wait->promise(scriptState->world());
}
ScriptPromise ReadableStream::cancel(ScriptState* scriptState, ScriptValue reason)
{
if (m_state == Errored) {
RefPtr<ScriptPromiseResolver> resolver = ScriptPromiseResolver::create(scriptState);
ScriptPromise promise = resolver->promise();
resolver->reject(m_exception);
return promise;
}
if (m_state == Closed)
return ScriptPromise::cast(scriptState, v8::Undefined(scriptState->isolate()));
if (m_state == Errored)
return ScriptPromise::rejectWithDOMException(scriptState, m_exception);
if (m_state == Waiting) {
m_wait->resolve(V8UndefinedType());
} else {
ASSERT(m_state == Readable);
m_wait->reset();
ASSERT(m_state == Readable || m_state == Waiting);
if (m_state == Waiting)
m_wait->resolve(V8UndefinedType());
}
clearQueue();
m_state = Closed;
m_closed->resolve(V8UndefinedType());
return m_source->cancelSource(scriptState, reason);
return m_source->cancelSource(scriptState, reason).then(ConstUndefined::create(scriptState));
}
ScriptPromise ReadableStream::closed(ScriptState* scriptState)
......@@ -155,36 +162,42 @@ ScriptPromise ReadableStream::closed(ScriptState* scriptState)
void ReadableStream::error(PassRefPtrWillBeRawPtr<DOMException> exception)
{
if (m_state == Readable) {
switch (m_state) {
case Waiting:
m_state = Errored;
m_exception = exception;
m_wait->reject(m_exception);
m_closed->reject(m_exception);
break;
case Readable:
clearQueue();
m_wait->reset();
}
if (m_state == Waiting || m_state == Readable) {
m_state = Errored;
m_exception = exception;
if (m_wait->state() == m_wait->Pending)
m_wait->reject(m_exception);
m_wait->reset();
m_wait->reject(m_exception);
m_closed->reject(m_exception);
break;
default:
break;
}
}
void ReadableStream::didSourceStart()
{
m_isStarted = true;
if (m_isSchedulingPull)
m_source->pullSource();
callPullIfNeeded();
}
void ReadableStream::callOrSchedulePull()
void ReadableStream::callPullIfNeeded()
{
if (m_isPulling)
if (m_isPulling || m_isDraining || !m_isStarted || m_state == Closed || m_state == Errored)
return;
// FIXME: Set shouldApplyBackpressure correctly.
bool shouldApplyBackpressure = false;
if (shouldApplyBackpressure)
return;
m_isPulling = true;
if (m_isStarted)
m_source->pullSource();
else
m_isSchedulingPull = true;
m_source->pullSource();
}
void ReadableStream::trace(Visitor* visitor)
......
......@@ -72,13 +72,12 @@ private:
virtual bool isQueueEmpty() const = 0;
virtual void clearQueue() = 0;
void callOrSchedulePull();
void callPullIfNeeded();
Member<UnderlyingSource> m_source;
bool m_isStarted;
bool m_isDraining;
bool m_isPulling;
bool m_isSchedulingPull;
State m_state;
Member<WaitPromise> m_wait;
......
......@@ -105,8 +105,17 @@ public:
StringStream* construct()
{
Checkpoint checkpoint;
{
InSequence s;
EXPECT_CALL(checkpoint, Call(0));
EXPECT_CALL(*m_underlyingSource, pullSource()).Times(1);
EXPECT_CALL(checkpoint, Call(1));
}
StringStream* stream = new StringStream(scriptState()->executionContext(), m_underlyingSource);
checkpoint.Call(0);
stream->didSourceStart();
checkpoint.Call(1);
return stream;
}
......@@ -118,6 +127,14 @@ public:
TEST_F(ReadableStreamTest, Start)
{
Checkpoint checkpoint;
{
InSequence s;
EXPECT_CALL(checkpoint, Call(0));
EXPECT_CALL(*m_underlyingSource, pullSource()).Times(1);
EXPECT_CALL(checkpoint, Call(1));
}
StringStream* stream = new StringStream(scriptState()->executionContext(), m_underlyingSource);
EXPECT_FALSE(m_exceptionState.hadException());
EXPECT_FALSE(stream->isStarted());
......@@ -125,11 +142,13 @@ TEST_F(ReadableStreamTest, Start)
EXPECT_FALSE(stream->isPulling());
EXPECT_EQ(stream->state(), ReadableStream::Waiting);
checkpoint.Call(0);
stream->didSourceStart();
checkpoint.Call(1);
EXPECT_TRUE(stream->isStarted());
EXPECT_FALSE(stream->isDraining());
EXPECT_FALSE(stream->isPulling());
EXPECT_TRUE(stream->isPulling());
EXPECT_EQ(stream->state(), ReadableStream::Waiting);
}
......@@ -157,22 +176,12 @@ TEST_F(ReadableStreamTest, WaitOnWaiting)
EXPECT_EQ(ReadableStream::Waiting, stream->state());
EXPECT_TRUE(stream->isStarted());
EXPECT_FALSE(stream->isPulling());
{
InSequence s;
EXPECT_CALL(checkpoint, Call(0));
EXPECT_CALL(*m_underlyingSource, pullSource()).Times(1);
EXPECT_CALL(checkpoint, Call(1));
}
EXPECT_TRUE(stream->isPulling());
checkpoint.Call(0);
ScriptPromise p = stream->wait(scriptState());
ScriptPromise q = stream->wait(scriptState());
checkpoint.Call(1);
EXPECT_EQ(ReadableStream::Waiting, stream->state());
EXPECT_TRUE(stream->isPulling());
EXPECT_EQ(q, p);
}
......@@ -188,19 +197,17 @@ TEST_F(ReadableStreamTest, WaitDuringStarting)
{
InSequence s;
EXPECT_CALL(checkpoint, Call(0));
EXPECT_CALL(checkpoint, Call(1));
EXPECT_CALL(*m_underlyingSource, pullSource()).Times(1);
EXPECT_CALL(checkpoint, Call(1));
}
checkpoint.Call(0);
stream->wait(scriptState());
checkpoint.Call(1);
EXPECT_TRUE(stream->isPulling());
checkpoint.Call(0);
stream->didSourceStart();
checkpoint.Call(1);
EXPECT_EQ(ReadableStream::Waiting, stream->state());
EXPECT_TRUE(stream->isStarted());
EXPECT_TRUE(stream->isPulling());
}
......@@ -209,11 +216,6 @@ TEST_F(ReadableStreamTest, WaitAndError)
StringStream* stream = construct();
String onFulfilled, onRejected;
{
InSequence s;
EXPECT_CALL(*m_underlyingSource, pullSource()).Times(1);
}
ScriptPromise promise = stream->wait(scriptState());
promise.then(createCaptor(&onFulfilled), createCaptor(&onRejected));
EXPECT_EQ(ReadableStream::Waiting, stream->state());
......@@ -283,11 +285,6 @@ TEST_F(ReadableStreamTest, WaitAndEnqueue)
String onFulfilled, onRejected;
EXPECT_EQ(ReadableStream::Waiting, stream->state());
{
InSequence s;
EXPECT_CALL(*m_underlyingSource, pullSource()).Times(1);
}
stream->wait(scriptState()).then(createCaptor(&onFulfilled), createCaptor(&onRejected));
isolate()->RunMicrotasks();
......@@ -314,11 +311,6 @@ TEST_F(ReadableStreamTest, WaitAndEnqueueAndError)
String onFulfilled, onRejected;
EXPECT_EQ(ReadableStream::Waiting, stream->state());
{
InSequence s;
EXPECT_CALL(*m_underlyingSource, pullSource()).Times(1);
}
ScriptPromise promise = stream->wait(scriptState());
promise.then(createCaptor(&onFulfilled), createCaptor(&onRejected));
isolate()->RunMicrotasks();
......@@ -352,11 +344,6 @@ TEST_F(ReadableStreamTest, CloseWhenWaiting)
StringStream* stream = construct();
{
InSequence s;
EXPECT_CALL(*m_underlyingSource, pullSource()).Times(1);
}
EXPECT_EQ(ReadableStream::Waiting, stream->state());
stream->wait(scriptState()).then(createCaptor(&onWaitFulfilled), createCaptor(&onWaitRejected));
stream->closed(scriptState()).then(createCaptor(&onClosedFulfilled), createCaptor(&onClosedRejected));
......@@ -475,7 +462,7 @@ TEST_F(ReadableStreamTest, EnqueuedAndRead)
EXPECT_TRUE(onRejected.isNull());
}
TEST_F(ReadableStreamTest, EnqueTwiceAndRead)
TEST_F(ReadableStreamTest, EnqueueTwiceAndRead)
{
StringStream* stream = construct();
Checkpoint checkpoint;
......@@ -483,6 +470,7 @@ TEST_F(ReadableStreamTest, EnqueTwiceAndRead)
{
InSequence s;
EXPECT_CALL(checkpoint, Call(0));
EXPECT_CALL(*m_underlyingSource, pullSource()).Times(1);
EXPECT_CALL(checkpoint, Call(1));
}
......@@ -492,14 +480,14 @@ TEST_F(ReadableStreamTest, EnqueTwiceAndRead)
EXPECT_EQ(ReadableStream::Readable, stream->state());
EXPECT_FALSE(stream->isPulling());
checkpoint.Call(0);
String chunk;
checkpoint.Call(0);
EXPECT_TRUE(stream->read(scriptState(), m_exceptionState).toString(chunk));
checkpoint.Call(1);
EXPECT_FALSE(m_exceptionState.hadException());
EXPECT_EQ("hello", chunk);
EXPECT_EQ(ReadableStream::Readable, stream->state());
EXPECT_FALSE(stream->isPulling());
EXPECT_TRUE(stream->isPulling());
EXPECT_FALSE(stream->isDraining());
ScriptPromise newPromise = stream->wait(scriptState());
......@@ -509,7 +497,6 @@ TEST_F(ReadableStreamTest, EnqueTwiceAndRead)
TEST_F(ReadableStreamTest, CloseWhenReadable)
{
StringStream* stream = construct();
String onWaitFulfilled, onWaitRejected;
String onClosedFulfilled, onClosedRejected;
stream->closed(scriptState()).then(createCaptor(&onClosedFulfilled), createCaptor(&onClosedRejected));
......@@ -538,21 +525,16 @@ TEST_F(ReadableStreamTest, CloseWhenReadable)
EXPECT_EQ("bye", chunk);
EXPECT_FALSE(m_exceptionState.hadException());
EXPECT_NE(promise, stream->wait(scriptState()));
stream->wait(scriptState()).then(createCaptor(&onWaitFulfilled), createCaptor(&onWaitRejected));
EXPECT_EQ(promise, stream->wait(scriptState()));
EXPECT_EQ(ReadableStream::Closed, stream->state());
EXPECT_FALSE(stream->isPulling());
EXPECT_TRUE(stream->isDraining());
EXPECT_TRUE(onWaitFulfilled.isNull());
EXPECT_TRUE(onWaitRejected.isNull());
EXPECT_TRUE(onClosedFulfilled.isNull());
EXPECT_TRUE(onClosedRejected.isNull());
isolate()->RunMicrotasks();
EXPECT_EQ("undefined", onWaitFulfilled);
EXPECT_TRUE(onWaitRejected.isNull());
EXPECT_EQ("undefined", onClosedFulfilled);
EXPECT_TRUE(onClosedRejected.isNull());
}
......@@ -604,13 +586,12 @@ TEST_F(ReadableStreamTest, CancelWhenWaiting)
{
InSequence s;
EXPECT_CALL(*m_underlyingSource, pullSource()).Times(1);
EXPECT_CALL(*m_underlyingSource, cancelSource(scriptState(), reason)).WillOnce(Return(promise));
}
EXPECT_EQ(ReadableStream::Waiting, stream->state());
ScriptPromise wait = stream->wait(scriptState());
EXPECT_EQ(promise, stream->cancel(scriptState(), reason));
EXPECT_NE(promise, stream->cancel(scriptState(), reason));
EXPECT_EQ(ReadableStream::Closed, stream->state());
EXPECT_EQ(stream->wait(scriptState()), wait);
......@@ -627,6 +608,7 @@ TEST_F(ReadableStreamTest, CancelWhenReadable)
{
StringStream* stream = construct();
String onFulfilled, onRejected;
String onCancelFulfilled, onCancelRejected;
ScriptValue reason(scriptState(), v8String(scriptState()->isolate(), "reason"));
ScriptPromise promise = ScriptPromise::cast(scriptState(), v8String(scriptState()->isolate(), "hello"));
......@@ -638,18 +620,21 @@ TEST_F(ReadableStreamTest, CancelWhenReadable)
stream->enqueue("hello");
ScriptPromise wait = stream->wait(scriptState());
EXPECT_EQ(ReadableStream::Readable, stream->state());
EXPECT_EQ(promise, stream->cancel(scriptState(), reason));
ScriptPromise cancelResult = stream->cancel(scriptState(), reason);
cancelResult.then(createCaptor(&onCancelFulfilled), createCaptor(&onCancelRejected));
EXPECT_NE(promise, cancelResult);
EXPECT_EQ(ReadableStream::Closed, stream->state());
EXPECT_NE(stream->wait(scriptState()), wait);
EXPECT_EQ(stream->wait(scriptState()), wait);
stream->wait(scriptState()).then(createCaptor(&onFulfilled), createCaptor(&onRejected));
EXPECT_TRUE(onFulfilled.isNull());
EXPECT_TRUE(onRejected.isNull());
EXPECT_TRUE(onCancelFulfilled.isNull());
EXPECT_TRUE(onCancelRejected.isNull());
isolate()->RunMicrotasks();
EXPECT_EQ("undefined", onFulfilled);
EXPECT_TRUE(onRejected.isNull());
EXPECT_EQ("undefined", onCancelFulfilled);
EXPECT_TRUE(onCancelRejected.isNull());
}
TEST_F(ReadableStreamTest, ReadableArrayBufferCompileTest)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment