Commit 7fd0a1f1 authored by yhirano@chromium.org's avatar yhirano@chromium.org

Add 'stream' to XMLHttpRequest response type.

Make XMLHttpRequest return a ReadableStream when the response type is set to
'stream'. This CL also modifies ReadableStream interface and implementation to
make it enable for XMLHttpRequest to use ReadableStream functionalities.

BUG=401396

Review URL: https://codereview.chromium.org/455303002

git-svn-id: svn://svn.chromium.org/blink/trunk@180432 bbb929c8-8fbe-4397-9dbb-9b2b20218538
parent 4b5aff85
<!DOCTYPE html>
<html>
<body>
<script src="../resources/testharness.js"></script>
<script src="../resources/testharnessreport.js"></script>
<script type="text/javascript">
var testInLoadingState = async_test('Test aborting XMLHttpRequest with responseType set to "stream" in LOADING state.');
testInLoadingState.step(function()
{
var xhr = new XMLHttpRequest;
xhr.responseType = 'stream';
var seenStates = [];
xhr.onreadystatechange = testInLoadingState.step_func(function() {
// onreadystatechange can be invoked multiple times in LOADING state.
if (seenStates.length == 0 || xhr.readyState != seenStates[seenStates.length - 1])
seenStates.push(xhr.readyState);
switch (xhr.readyState) {
case xhr.UNSENT:
assert_unreached('Unexpected readyState: UNSENT');
return;
case xhr.OPENED:
case xhr.HEADERS_RECEIVED:
return;
case xhr.LOADING:
var stream = xhr.response;
assert_true(stream instanceof ReadableStream, 'xhr.response shoud be ReadableStream');
assert_equals(stream.state, 'readable', 'stream state before abort() call');
assert_array_equals(seenStates, [xhr.OPENED, xhr.HEADERS_RECEIVED, xhr.LOADING]);
xhr.abort();
assert_equals(stream.state, 'errored', 'stream state after abort() call');
assert_equals(xhr.readyState, xhr.UNSENT, 'xhr.readyState after abort() call');
assert_equals(xhr.response, null, 'xhr.response after abort() call');
assert_array_equals(seenStates, [xhr.OPENED, xhr.HEADERS_RECEIVED, xhr.LOADING, xhr.DONE]);
testInLoadingState.done();
return;
case xhr.DONE:
return;
default:
assert_unreached('Unexpected readyState: ' + xhr.readyState);
return;
}
});
xhr.open('GET', '../resources/test.ogv', true);
xhr.send();
});
var testInDoneState = async_test('Test aborting XMLHttpRequest with responseType set to "stream" in DONE state.');
testInDoneState.step(function()
{
var xhr = new XMLHttpRequest;
xhr.responseType = 'stream';
var seenStates = [];
xhr.onreadystatechange = testInDoneState.step_func(function() {
// onreadystatechange can be invoked multiple times in LOADING state.
if (seenStates.length == 0 || xhr.readyState != seenStates[seenStates.length - 1])
seenStates.push(xhr.readyState);
switch (xhr.readyState) {
case xhr.UNSENT:
case xhr.OPENED:
case xhr.HEADERS_RECEIVED:
case xhr.LOADING:
return;
case xhr.DONE:
var stream = xhr.response;
assert_true(stream instanceof ReadableStream, 'xhr.response shoud be ReadableStream');
assert_equals(stream.state, 'readable', 'stream state before abort() call');
assert_equals(xhr.status, 200, 'xhr.status');
assert_not_equals(xhr.response, null, 'xhr.response during DONE');
xhr.abort();
assert_equals(stream.state, 'errored', 'stream state after abort() call');
assert_equals(xhr.readyState, xhr.UNSENT, 'xhr.readyState after abort() call');
assert_equals(xhr.response, null, 'xhr.response after abort() call');
assert_array_equals(seenStates, [xhr.OPENED, xhr.HEADERS_RECEIVED, xhr.LOADING, xhr.DONE]);
testInDoneState.done();
return;
default:
assert_unreached('Unexpected readyState: ' + xhr.readyState);
return;
}
});
xhr.open('GET', '../resources/test.ogv', true);
xhr.send();
});
</script>
</body>
</html>
<!DOCTYPE html>
<script src="../resources/testharness.js"></script>
<script src="../resources/testharnessreport.js"></script>
<script type="text/javascript">
var test = async_test('Test canceling XMLHttpRequest with responseType set to "stream".');
test.step(function()
{
var xhr = new XMLHttpRequest;
xhr.responseType = 'stream';
var seenStates = [];
xhr.onreadystatechange = test.step_func(function() {
// onreadystatechange can be invoked multiple times in LOADING state.
if (seenStates.length == 0 || xhr.readyState != seenStates[seenStates.length - 1])
seenStates.push(xhr.readyState);
switch (xhr.readyState) {
case xhr.UNSENT:
assert_unreached('Unexpected readyState: UNSENT');
return;
case xhr.OPENED:
assert_equals(xhr.response, null, 'xhr.response during OPENED');
return;
case xhr.HEADERS_RECEIVED:
assert_equals(xhr.response, null, 'xhr.response during HEADERS_RECEIVED');
return;
case xhr.LOADING:
var stream = xhr.response;
assert_true(stream instanceof ReadableStream,
'xhr.response should be ReadableStream during LOADING');
stream.cancel('canceled via ReadableStream.cancel');
assert_equals(stream.state, 'closed', 'stream.state after cancel');
// Check that we saw all states.
assert_array_equals(seenStates,
[xhr.OPENED, xhr.HEADERS_RECEIVED, xhr.LOADING, xhr.DONE]);
assert_equals(xhr.readyState, xhr.UNSENT, 'xhr.readyState after cancel');
assert_equals(xhr.response, null, 'xhr.response after cancel');
stream.closed.then(test.step_func(function(value) {
assert_equals(value, undefined,
'stream.closed should be resolved with undefined');
test.done();
}), test.step_func(function() {
assert_unreached('stream.closed should not be rejected');
}));
case xhr.DONE:
return;
default:
assert_unreached('Unexpected readyState: ' + xhr.readyState)
return;
}
});
xhr.open('GET', '../resources/load-and-stall.php?name=test.ogv&stallAt=32768&stallFor=10', true);
xhr.send();
});
</script>
<!DOCTYPE html>
<script src="../resources/testharness.js"></script>
<script src="../resources/testharnessreport.js"></script>
<script type="text/javascript">
var test = async_test('Test response of XMLHttpRequest with responseType set to "stream" for various readyState.');
test.step(function()
{
var xhr = new XMLHttpRequest;
xhr.responseType = 'stream';
assert_equals(xhr.responseType, 'stream', 'xhr.responseType');
assert_equals(xhr.readyState, xhr.UNSENT, 'xhr.readyState');
assert_equals(xhr.response, null, 'xhr.response during UNSENT');
var seenStates = [];
function readStream(stream) {
var chunks = [];
function rec(resolve, reject) {
while (stream.state === 'readable') {
chunks.push(stream.read());
}
if (stream.state === 'closed') {
resolve(chunks);
return;
}
stream.wait().then(function() {
rec(resolve, reject);
}).catch(reject);
}
return new Promise(rec);
}
var streamPromise = undefined;
xhr.onreadystatechange = test.step_func(function() {
// onreadystatechange can be invoked multiple times in LOADING state.
if (seenStates.length == 0 || xhr.readyState != seenStates[seenStates.length - 1])
seenStates.push(xhr.readyState);
switch (xhr.readyState) {
case xhr.UNSENT:
assert_unreached('Unexpected readyState: UNSENT');
return;
case xhr.OPENED:
assert_equals(xhr.response, null, 'xhr.response during OPENED');
return;
case xhr.HEADERS_RECEIVED:
assert_equals(xhr.response, null, 'xhr.response during HEADERS_RECEIVED');
return;
case xhr.LOADING:
assert_not_equals(xhr.response, null, 'xhr.response during LOADING');
assert_true(xhr.response instanceof ReadableStream,
'xhr.response should be ReadableStream during LOADING');
if (streamPromise === undefined) {
streamPromise = readStream(xhr.response);
}
return;
case xhr.DONE:
assert_equals(xhr.status, 200, 'xhr.status');
// Check that we saw all states.
assert_array_equals(seenStates,
[xhr.OPENED, xhr.HEADERS_RECEIVED, xhr.LOADING, xhr.DONE]);
assert_not_equals(streamPromise, undefined, 'streamPromise');
streamPromise.then(test.step_func(function(chunks) {
assert_equals(xhr.response.state, 'closed', 'stream status');
var size = 0;
for (var i = 0; i < chunks.length; ++i) {
size += chunks[i].byteLength;
}
assert_equals(size, 103746, 'response size');
test.done();
}), test.step_func(function(e) {
assert_unreached('failed to read the response stream: ' + e);
}));
return;
default:
assert_unreached('Unexpected readyState: ' + xhr.readyState)
return;
}
});
xhr.open('GET', '../resources/test.ogv', true);
xhr.send();
});
</script>
......@@ -38,11 +38,13 @@
#include "bindings/core/v8/V8Document.h"
#include "bindings/core/v8/V8FormData.h"
#include "bindings/core/v8/V8HTMLDocument.h"
#include "bindings/core/v8/V8ReadableStream.h"
#include "bindings/core/v8/V8Stream.h"
#include "bindings/core/v8/custom/V8ArrayBufferCustom.h"
#include "bindings/core/v8/custom/V8ArrayBufferViewCustom.h"
#include "core/dom/Document.h"
#include "core/inspector/InspectorInstrumentation.h"
#include "core/streams/ReadableStream.h"
#include "core/streams/Stream.h"
#include "core/workers/WorkerGlobalScope.h"
#include "core/xml/XMLHttpRequest.h"
......@@ -132,7 +134,14 @@ void V8XMLHttpRequest::responseAttributeGetterCustom(const v8::PropertyCallbackI
case XMLHttpRequest::ResponseTypeLegacyStream:
{
Stream* stream = xmlHttpRequest->responseStream();
Stream* stream = xmlHttpRequest->responseLegacyStream();
v8SetReturnValueFast(info, stream, xmlHttpRequest);
return;
}
case XMLHttpRequest::ResponseTypeStream:
{
ReadableStream* stream = xmlHttpRequest->responseStream();
v8SetReturnValueFast(info, stream, xmlHttpRequest);
return;
}
......
......@@ -16,43 +16,39 @@
namespace blink {
class ReadableStream::OnStarted : public ScriptFunction {
public:
OnStarted(v8::Isolate* isolate, ReadableStream* stream)
: ScriptFunction(isolate)
, m_stream(stream) { }
virtual ScriptValue call(ScriptValue value) OVERRIDE
{
m_stream->onStarted();
return value;
}
private:
Persistent<ReadableStream> m_stream;
};
ReadableStream::ReadableStream(ScriptState* scriptState, UnderlyingSource* source, ExceptionState* exceptionState)
: ContextLifecycleObserver(scriptState->executionContext())
, m_source(source)
ReadableStream::ReadableStream(ExecutionContext* executionContext, UnderlyingSource* source)
: m_source(source)
, m_isStarted(false)
, m_isDraining(false)
, m_isPulling(false)
, m_isSchedulingPull(false)
, m_state(Waiting)
, m_wait(new WaitPromise(scriptState->executionContext(), this, WaitPromise::Ready))
, m_closed(new ClosedPromise(scriptState->executionContext(), this, ClosedPromise::Closed))
, m_wait(new WaitPromise(executionContext, this, WaitPromise::Ready))
, m_closed(new ClosedPromise(executionContext, this, ClosedPromise::Closed))
{
ScriptWrappable::init(this);
ScriptPromise promise = source->startSource(exceptionState);
// The underlying source calls |this->error| on failure.
promise.then(adoptPtr(new OnStarted(scriptState->isolate(), this)));
}
ReadableStream::~ReadableStream()
{
}
String ReadableStream::stateString() const
{
switch (m_state) {
case Readable:
return "readable";
case Waiting:
return "waiting";
case Closed:
return "closed";
case Errored:
return "errored";
}
ASSERT(false);
return String();
}
bool ReadableStream::enqueuePreliminaryCheck(size_t chunkSize)
{
if (m_state == Errored || m_state == Closed || m_isDraining)
......@@ -88,18 +84,18 @@ void ReadableStream::close()
}
}
void ReadableStream::readPreliminaryCheck(ExceptionState* exceptionState)
void ReadableStream::readPreliminaryCheck(ExceptionState& exceptionState)
{
if (m_state == Waiting) {
exceptionState->throwTypeError("read is called while state is waiting");
exceptionState.throwTypeError("read is called while state is waiting");
return;
}
if (m_state == Closed) {
exceptionState->throwTypeError("read is called while state is closed");
exceptionState.throwTypeError("read is called while state is closed");
return;
}
if (m_state == Errored) {
exceptionState->throwDOMException(m_exception->code(), m_exception->message());
exceptionState.throwDOMException(m_exception->code(), m_exception->message());
return;
}
}
......@@ -174,7 +170,7 @@ void ReadableStream::error(PassRefPtrWillBeRawPtr<DOMException> exception)
}
}
void ReadableStream::onStarted()
void ReadableStream::didSourceStart()
{
m_isStarted = true;
if (m_isSchedulingPull)
......
......@@ -23,7 +23,7 @@ class DOMException;
class ExceptionState;
class UnderlyingSource;
class ReadableStream : public GarbageCollectedFinalized<ReadableStream>, public ScriptWrappable, public ContextLifecycleObserver {
class ReadableStream : public GarbageCollectedFinalized<ReadableStream>, public ScriptWrappable {
public:
enum State {
Readable,
......@@ -34,15 +34,19 @@ public:
// FIXME: Define Strategy here.
// FIXME: Add |strategy| constructor parameter.
ReadableStream(ScriptState*, UnderlyingSource*, ExceptionState*);
// After ReadableStream construction, |didSourceStart| must be called when
// |source| initialization succeeds and |error| must be called when
// |source| initialization fails.
ReadableStream(ExecutionContext*, UnderlyingSource* /* source */);
virtual ~ReadableStream();
bool isStarted() const { return m_isStarted; }
bool isDraining() const { return m_isDraining; }
bool isPulling() const { return m_isPulling; }
State state() const { return m_state; }
String stateString() const;
virtual ScriptValue read(ScriptState*, ExceptionState*) = 0;
virtual ScriptValue read(ScriptState*, ExceptionState&) = 0;
ScriptPromise wait(ScriptState*);
ScriptPromise cancel(ScriptState*, ScriptValue reason);
ScriptPromise closed(ScriptState*);
......@@ -50,23 +54,23 @@ public:
void close();
void error(PassRefPtrWillBeRawPtr<DOMException>);
void didSourceStart();
virtual void trace(Visitor*);
protected:
bool enqueuePreliminaryCheck(size_t chunkSize);
bool enqueuePostAction(size_t totalQueueSize);
void readPreliminaryCheck(ExceptionState*);
void readPreliminaryCheck(ExceptionState&);
void readPostAction();
private:
class OnStarted;
typedef ScriptPromiseProperty<Member<ReadableStream>, V8UndefinedType, RefPtrWillBeMember<DOMException> > WaitPromise;
typedef ScriptPromiseProperty<Member<ReadableStream>, V8UndefinedType, RefPtrWillBeMember<DOMException> > ClosedPromise;
virtual bool isQueueEmpty() const = 0;
virtual void clearQueue() = 0;
void onStarted(void);
void callOrSchedulePull();
Member<UnderlyingSource> m_source;
......
......@@ -2,8 +2,22 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
enum ReadableStreamState {
"readable",
"waiting",
"closed",
"errored"
};
[
RuntimeEnabled=Stream,
GarbageCollected
] interface ReadableStream {
[CallWith=ScriptState, RaisesException] any read();
[CallWith=ScriptState] Promise wait();
[ImplementedAs=stateString] readonly attribute ReadableStreamState state;
[CallWith=ScriptState] Promise cancel(any reason);
[CallWith=ScriptState] readonly attribute Promise closed;
};
......@@ -42,7 +42,7 @@ public:
typedef RefPtr<ArrayBuffer> HoldType;
typedef PassRefPtr<ArrayBuffer> PassType;
static size_t size(PassType value) { return value->byteLength(); }
static size_t size(const PassType& value) { return value->byteLength(); }
static size_t size(const HoldType& value) { return value->byteLength(); }
static ScriptValue toScriptValue(ScriptState* scriptState, const HoldType& value)
{
......@@ -56,13 +56,13 @@ public:
template <typename ChunkTypeTraits>
class ReadableStreamImpl : public ReadableStream {
public:
ReadableStreamImpl(ScriptState* scriptState, UnderlyingSource* source, ExceptionState* exceptionState)
: ReadableStream(scriptState, source, exceptionState)
ReadableStreamImpl(ExecutionContext* executionContext, UnderlyingSource* source)
: ReadableStream(executionContext, source)
, m_totalQueueSize(0) { }
virtual ~ReadableStreamImpl() { }
// ReadableStream methods
virtual ScriptValue read(ScriptState*, ExceptionState*) OVERRIDE;
virtual ScriptValue read(ScriptState*, ExceptionState&) OVERRIDE;
bool enqueue(typename ChunkTypeTraits::PassType);
......@@ -96,10 +96,10 @@ bool ReadableStreamImpl<ChunkTypeTraits>::enqueue(typename ChunkTypeTraits::Pass
}
template <typename ChunkTypeTraits>
ScriptValue ReadableStreamImpl<ChunkTypeTraits>::read(ScriptState* scriptState, ExceptionState* exceptionState)
ScriptValue ReadableStreamImpl<ChunkTypeTraits>::read(ScriptState* scriptState, ExceptionState& exceptionState)
{
readPreliminaryCheck(exceptionState);
if (exceptionState->hadException())
if (exceptionState.hadException())
return ScriptValue();
ASSERT(state() == Readable);
ASSERT(!m_queue.isEmpty());
......
......@@ -49,11 +49,11 @@ private:
String* m_value;
};
class MockUnderlyingSource : public UnderlyingSource {
class MockUnderlyingSource : public GarbageCollectedFinalized<MockUnderlyingSource>, public UnderlyingSource {
USING_GARBAGE_COLLECTED_MIXIN(MockUnderlyingSource);
public:
virtual ~MockUnderlyingSource() { }
MOCK_METHOD1(startSource, ScriptPromise(ExceptionState*));
MOCK_METHOD0(pullSource, void());
MOCK_METHOD2(cancelSource, ScriptPromise(ScriptState*, ScriptValue));
};
......@@ -96,16 +96,10 @@ public:
return StringCapturingFunction::create(isolate(), value);
}
// Note: This function calls RunMicrotasks.
StringStream* construct()
{
RefPtr<ScriptPromiseResolver> resolver = ScriptPromiseResolver::create(scriptState());
ScriptPromise promise = resolver->promise();
resolver->resolve();
EXPECT_CALL(*m_underlyingSource, startSource(&m_exceptionState)).WillOnce(Return(promise));
StringStream* stream = new StringStream(scriptState(), m_underlyingSource, &m_exceptionState);
isolate()->RunMicrotasks();
StringStream* stream = new StringStream(scriptState()->executionContext(), m_underlyingSource);
stream->didSourceStart();
return stream;
}
......@@ -115,27 +109,16 @@ public:
ExceptionState m_exceptionState;
};
TEST_F(ReadableStreamTest, Construct)
TEST_F(ReadableStreamTest, Start)
{
RefPtr<ScriptPromiseResolver> resolver = ScriptPromiseResolver::create(scriptState());
ScriptPromise promise = resolver->promise();
{
InSequence s;
EXPECT_CALL(*m_underlyingSource, startSource(&m_exceptionState)).WillOnce(Return(promise));
}
StringStream* stream = new StringStream(scriptState(), m_underlyingSource, &m_exceptionState);
StringStream* stream = new StringStream(scriptState()->executionContext(), m_underlyingSource);
EXPECT_FALSE(m_exceptionState.hadException());
EXPECT_FALSE(stream->isStarted());
EXPECT_FALSE(stream->isDraining());
EXPECT_FALSE(stream->isPulling());
EXPECT_EQ(stream->state(), ReadableStream::Waiting);
isolate()->RunMicrotasks();
EXPECT_FALSE(stream->isStarted());
resolver->resolve();
isolate()->RunMicrotasks();
stream->didSourceStart();
EXPECT_TRUE(stream->isStarted());
EXPECT_FALSE(stream->isDraining());
......@@ -143,42 +126,16 @@ TEST_F(ReadableStreamTest, Construct)
EXPECT_EQ(stream->state(), ReadableStream::Waiting);
}
TEST_F(ReadableStreamTest, ConstructError)
TEST_F(ReadableStreamTest, StartFail)
{
{
InSequence s;
EXPECT_CALL(*m_underlyingSource, startSource(&m_exceptionState))
.WillOnce(DoAll(Invoke(ThrowError("hello")), Return(ScriptPromise())));
}
new StringStream(scriptState(), m_underlyingSource, &m_exceptionState);
EXPECT_TRUE(m_exceptionState.hadException());
}
TEST_F(ReadableStreamTest, StartFailAsynchronously)
{
RefPtr<ScriptPromiseResolver> resolver = ScriptPromiseResolver::create(scriptState());
ScriptPromise promise = resolver->promise();
{
InSequence s;
EXPECT_CALL(*m_underlyingSource, startSource(&m_exceptionState)).WillOnce(Return(promise));
}
StringStream* stream = new StringStream(scriptState(), m_underlyingSource, &m_exceptionState);
StringStream* stream = new StringStream(scriptState()->executionContext(), m_underlyingSource);
EXPECT_FALSE(m_exceptionState.hadException());
EXPECT_FALSE(stream->isStarted());
EXPECT_FALSE(stream->isDraining());
EXPECT_FALSE(stream->isPulling());
EXPECT_EQ(stream->state(), ReadableStream::Waiting);
isolate()->RunMicrotasks();
EXPECT_FALSE(stream->isStarted());
EXPECT_FALSE(stream->isDraining());
EXPECT_FALSE(stream->isPulling());
EXPECT_EQ(stream->state(), ReadableStream::Waiting);
resolver->reject();
stream->error(DOMException::create(NotFoundError));
isolate()->RunMicrotasks();
EXPECT_FALSE(stream->isStarted());
EXPECT_FALSE(stream->isDraining());
......@@ -214,13 +171,7 @@ TEST_F(ReadableStreamTest, WaitOnWaiting)
TEST_F(ReadableStreamTest, WaitDuringStarting)
{
RefPtr<ScriptPromiseResolver> resolver = ScriptPromiseResolver::create(scriptState());
ScriptPromise promise = resolver->promise();
{
InSequence s;
EXPECT_CALL(*m_underlyingSource, startSource(&m_exceptionState)).WillOnce(Return(promise));
}
StringStream* stream = new StringStream(scriptState(), m_underlyingSource, &m_exceptionState);
StringStream* stream = new StringStream(scriptState()->executionContext(), m_underlyingSource);
Checkpoint checkpoint;
EXPECT_EQ(ReadableStream::Waiting, stream->state());
......@@ -240,8 +191,7 @@ TEST_F(ReadableStreamTest, WaitDuringStarting)
EXPECT_TRUE(stream->isPulling());
resolver->resolve();
isolate()->RunMicrotasks();
stream->didSourceStart();
EXPECT_EQ(ReadableStream::Waiting, stream->state());
EXPECT_TRUE(stream->isPulling());
......@@ -442,7 +392,7 @@ TEST_F(ReadableStreamTest, ReadWhenWaiting)
EXPECT_EQ(ReadableStream::Waiting, stream->state());
EXPECT_FALSE(m_exceptionState.hadException());
stream->read(scriptState(), &m_exceptionState);
stream->read(scriptState(), m_exceptionState);
EXPECT_EQ(ReadableStream::Waiting, stream->state());
EXPECT_TRUE(m_exceptionState.hadException());
EXPECT_EQ(V8TypeError, m_exceptionState.code());
......@@ -457,7 +407,7 @@ TEST_F(ReadableStreamTest, ReadWhenClosed)
EXPECT_EQ(ReadableStream::Closed, stream->state());
EXPECT_FALSE(m_exceptionState.hadException());
stream->read(scriptState(), &m_exceptionState);
stream->read(scriptState(), m_exceptionState);
EXPECT_EQ(ReadableStream::Closed, stream->state());
EXPECT_TRUE(m_exceptionState.hadException());
EXPECT_EQ(V8TypeError, m_exceptionState.code());
......@@ -475,7 +425,7 @@ TEST_F(ReadableStreamTest, ReadWhenErrored)
EXPECT_EQ(ReadableStream::Errored, stream->state());
EXPECT_FALSE(m_exceptionState.hadException());
stream->read(scriptState(), &m_exceptionState);
stream->read(scriptState(), m_exceptionState);
EXPECT_EQ(ReadableStream::Errored, stream->state());
EXPECT_TRUE(m_exceptionState.hadException());
EXPECT_EQ(notFoundExceptionCode, m_exceptionState.code());
......@@ -502,7 +452,7 @@ TEST_F(ReadableStreamTest, EnqueuedAndRead)
checkpoint.Call(0);
String chunk;
EXPECT_TRUE(stream->read(scriptState(), &m_exceptionState).toString(chunk));
EXPECT_TRUE(stream->read(scriptState(), m_exceptionState).toString(chunk));
checkpoint.Call(1);
EXPECT_FALSE(m_exceptionState.hadException());
EXPECT_EQ("hello", chunk);
......@@ -537,7 +487,7 @@ TEST_F(ReadableStreamTest, EnqueTwiceAndRead)
checkpoint.Call(0);
String chunk;
EXPECT_TRUE(stream->read(scriptState(), &m_exceptionState).toString(chunk));
EXPECT_TRUE(stream->read(scriptState(), m_exceptionState).toString(chunk));
checkpoint.Call(1);
EXPECT_FALSE(m_exceptionState.hadException());
EXPECT_EQ("hello", chunk);
......@@ -567,7 +517,7 @@ TEST_F(ReadableStreamTest, CloseWhenReadable)
EXPECT_TRUE(stream->isDraining());
String chunk;
EXPECT_TRUE(stream->read(scriptState(), &m_exceptionState).toString(chunk));
EXPECT_TRUE(stream->read(scriptState(), m_exceptionState).toString(chunk));
EXPECT_EQ("hello", chunk);
EXPECT_EQ(promise, stream->wait(scriptState()));
......@@ -577,7 +527,7 @@ TEST_F(ReadableStreamTest, CloseWhenReadable)
EXPECT_FALSE(stream->isPulling());
EXPECT_TRUE(stream->isDraining());
EXPECT_TRUE(stream->read(scriptState(), &m_exceptionState).toString(chunk));
EXPECT_TRUE(stream->read(scriptState(), m_exceptionState).toString(chunk));
EXPECT_EQ("bye", chunk);
EXPECT_FALSE(m_exceptionState.hadException());
......@@ -698,12 +648,7 @@ TEST_F(ReadableStreamTest, CancelWhenReadable)
TEST_F(ReadableStreamTest, ReadableArrayBufferCompileTest)
{
// This test tests if ReadableStreamImpl<ArrayBuffer> can be instantiated.
{
InSequence s;
EXPECT_CALL(*m_underlyingSource, startSource(&m_exceptionState)).WillOnce(Return(ScriptPromise()));
}
new ReadableStreamImpl<ReadableStreamChunkTypeTraits<ArrayBuffer> >(scriptState(), m_underlyingSource, &m_exceptionState);
new ReadableStreamImpl<ReadableStreamChunkTypeTraits<ArrayBuffer> >(scriptState()->executionContext(), m_underlyingSource);
}
} // namespace blink
......
......@@ -5,6 +5,7 @@
#ifndef UnderlyingSource_h
#define UnderlyingSource_h
#include "bindings/core/v8/ScriptPromise.h"
#include "bindings/core/v8/ScriptValue.h"
#include "platform/heap/Heap.h"
......@@ -13,13 +14,10 @@ namespace blink {
class ExceptionState;
class ScriptState;
class UnderlyingSource : public GarbageCollectedFinalized<UnderlyingSource> {
class UnderlyingSource : public GarbageCollectedMixin {
public:
virtual ~UnderlyingSource() { }
// When startSource fails asynchronously, it must call
// ReadableStream::error with a DOM exception.
virtual ScriptPromise startSource(ExceptionState*) = 0;
virtual void pullSource() = 0;
virtual ScriptPromise cancelSource(ScriptState*, ScriptValue reason) = 0;
virtual void trace(Visitor*) { }
......
......@@ -26,6 +26,7 @@
#include "bindings/core/v8/ExceptionState.h"
#include "core/FetchInitiatorTypeNames.h"
#include "core/dom/ContextFeatures.h"
#include "core/dom/DOMException.h"
#include "core/dom/DOMImplementation.h"
#include "core/dom/ExceptionCode.h"
#include "core/dom/XMLDocument.h"
......@@ -44,7 +45,10 @@
#include "core/inspector/InspectorInstrumentation.h"
#include "core/inspector/InspectorTraceEvents.h"
#include "core/loader/ThreadableLoader.h"
#include "core/streams/ReadableStream.h"
#include "core/streams/ReadableStreamImpl.h"
#include "core/streams/Stream.h"
#include "core/streams/UnderlyingSource.h"
#include "core/xml/XMLHttpRequestProgressEvent.h"
#include "core/xml/XMLHttpRequestUpload.h"
#include "platform/Logging.h"
......@@ -111,6 +115,34 @@ static void logConsoleError(ExecutionContext* context, const String& message)
context->addConsoleMessage(ConsoleMessage::create(JSMessageSource, ErrorMessageLevel, message));
}
namespace {
class ReadableStreamSource : public GarbageCollectedFinalized<ReadableStreamSource>, public UnderlyingSource {
USING_GARBAGE_COLLECTED_MIXIN(ReadableStreamSource);
public:
ReadableStreamSource(XMLHttpRequest* owner) : m_owner(owner) { }
virtual ~ReadableStreamSource() { }
virtual void pullSource() OVERRIDE { }
virtual ScriptPromise cancelSource(ScriptState* scriptState, ScriptValue reason) OVERRIDE
{
m_owner->abort();
return ScriptPromise::cast(scriptState, v8::Undefined(scriptState->isolate()));
}
virtual void trace(Visitor* visitor) OVERRIDE
{
visitor->trace(m_owner);
UnderlyingSource::trace(visitor);
}
private:
// This is RawPtr in non-oilpan build to avoid the reference cycle. To
// avoid use-after free, the associated ReadableStream must be closed
// or errored when m_owner is gone.
RawPtrWillBeMember<XMLHttpRequest> m_owner;
};
} // namespace
PassRefPtrWillBeRawPtr<XMLHttpRequest> XMLHttpRequest::create(ExecutionContext* context, PassRefPtr<SecurityOrigin> securityOrigin)
{
RefPtrWillBeRawPtr<XMLHttpRequest> xmlHttpRequest = adoptRefWillBeRefCountedGarbageCollected(new XMLHttpRequest(context, securityOrigin));
......@@ -295,14 +327,23 @@ ArrayBuffer* XMLHttpRequest::responseArrayBuffer()
return m_responseArrayBuffer.get();
}
Stream* XMLHttpRequest::responseStream()
Stream* XMLHttpRequest::responseLegacyStream()
{
ASSERT(m_responseTypeCode == ResponseTypeLegacyStream);
if (m_error || (m_state != LOADING && m_state != DONE))
return 0;
return m_responseStream.get();
return m_responseLegacyStream.get();
}
ReadableStream* XMLHttpRequest::responseStream()
{
ASSERT(m_responseTypeCode == ResponseTypeStream);
if (m_error || (m_state != LOADING && m_state != DONE))
return 0;
return m_responseStream;
}
void XMLHttpRequest::setTimeout(unsigned long timeout, ExceptionState& exceptionState)
......@@ -356,6 +397,11 @@ void XMLHttpRequest::setResponseType(const String& responseType, ExceptionState&
m_responseTypeCode = ResponseTypeLegacyStream;
else
return;
} else if (responseType == "stream") {
if (RuntimeEnabledFeatures::streamEnabled())
m_responseTypeCode = ResponseTypeStream;
else
return;
} else {
ASSERT_NOT_REACHED();
}
......@@ -378,6 +424,8 @@ String XMLHttpRequest::responseType()
return "arraybuffer";
case ResponseTypeLegacyStream:
return "legacystream";
case ResponseTypeStream:
return "stream";
}
return "";
}
......@@ -893,8 +941,15 @@ bool XMLHttpRequest::internalAbort()
InspectorInstrumentation::didFailXHRLoading(executionContext(), this, this);
if (m_responseStream && m_state != DONE)
m_responseStream->abort();
if (m_responseLegacyStream && m_state != DONE)
m_responseLegacyStream->abort();
if (m_responseStream) {
// When the stream is already closed (including canceled from the
// user), |error| does nothing.
// FIXME: Create a more specific error.
m_responseStream->error(DOMException::create(!m_async && m_exceptionCode ? m_exceptionCode : AbortError, "XMLHttpRequest::abort"));
}
if (!m_loader)
return true;
......@@ -936,6 +991,7 @@ void XMLHttpRequest::clearResponse()
m_responseBlob = nullptr;
m_downloadedBlobLength = 0;
m_responseLegacyStream = nullptr;
m_responseStream = nullptr;
// These variables may referred by the response accessors. So, we can clear
......@@ -1229,8 +1285,11 @@ void XMLHttpRequest::didFinishLoading(unsigned long identifier, double)
if (m_decoder)
m_responseText = m_responseText.concatenateWith(m_decoder->flush());
if (m_responseLegacyStream)
m_responseLegacyStream->finalize();
if (m_responseStream)
m_responseStream->finalize();
m_responseStream->close();
clearVariablesForLoading();
......@@ -1326,9 +1385,15 @@ void XMLHttpRequest::didReceiveData(const char* data, int len)
m_binaryResponseBuilder = SharedBuffer::create();
m_binaryResponseBuilder->append(data, len);
} else if (m_responseTypeCode == ResponseTypeLegacyStream) {
if (!m_responseStream)
m_responseStream = Stream::create(executionContext(), finalResponseMIMEType());
m_responseStream->addData(data, len);
if (!m_responseLegacyStream)
m_responseLegacyStream = Stream::create(executionContext(), responseType());
m_responseLegacyStream->addData(data, len);
} else if (m_responseTypeCode == ResponseTypeStream) {
if (!m_responseStream) {
m_responseStream = new ReadableStreamImpl<ReadableStreamChunkTypeTraits<ArrayBuffer> >(executionContext(), new ReadableStreamSource(this));
m_responseStream->didSourceStart();
}
m_responseStream->enqueue(ArrayBuffer::create(data, len));
}
if (m_error)
......@@ -1417,7 +1482,9 @@ ExecutionContext* XMLHttpRequest::executionContext() const
void XMLHttpRequest::trace(Visitor* visitor)
{
visitor->trace(m_responseBlob);
visitor->trace(m_responseLegacyStream);
visitor->trace(m_responseStream);
visitor->trace(m_streamSource);
visitor->trace(m_responseDocument);
visitor->trace(m_progressEventThrottle);
visitor->trace(m_upload);
......
......@@ -26,6 +26,7 @@
#include "core/dom/ActiveDOMObject.h"
#include "core/events/EventListener.h"
#include "core/loader/ThreadableLoaderClient.h"
#include "core/streams/ReadableStreamImpl.h"
#include "core/xml/XMLHttpRequestEventTarget.h"
#include "core/xml/XMLHttpRequestProgressEventThrottle.h"
#include "platform/heap/Handle.h"
......@@ -48,6 +49,7 @@ class SharedBuffer;
class Stream;
class TextResourceDecoder;
class ThreadableLoader;
class UnderlyingSource;
typedef int ExceptionCode;
......@@ -79,7 +81,8 @@ public:
ResponseTypeDocument,
ResponseTypeBlob,
ResponseTypeArrayBuffer,
ResponseTypeLegacyStream
ResponseTypeLegacyStream,
ResponseTypeStream,
};
virtual void contextDestroyed() OVERRIDE;
......@@ -117,7 +120,8 @@ public:
ScriptString responseJSONSource();
Document* responseXML(ExceptionState&);
Blob* responseBlob();
Stream* responseStream();
Stream* responseLegacyStream();
ReadableStream* responseStream();
unsigned long timeout() const { return m_timeoutMilliseconds; }
void setTimeout(unsigned long timeout, ExceptionState&);
......@@ -230,7 +234,9 @@ private:
AtomicString m_mimeTypeOverride;
unsigned long m_timeoutMilliseconds;
RefPtrWillBeMember<Blob> m_responseBlob;
RefPtrWillBeMember<Stream> m_responseStream;
RefPtrWillBeMember<Stream> m_responseLegacyStream;
PersistentWillBeMember<ReadableStreamImpl<ReadableStreamChunkTypeTraits<ArrayBuffer> > > m_responseStream;
PersistentWillBeMember<UnderlyingSource> m_streamSource;
RefPtr<ThreadableLoader> m_loader;
State m_state;
......
......@@ -33,7 +33,8 @@ enum XMLHttpRequestResponseType {
"document",
"json",
"text",
"legacystream"
"legacystream",
"stream"
};
[
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment