Commit c43eba93 authored by Ben Kelly's avatar Ben Kelly Committed by Commit Bot

Make DataPipeBytesConsumer support ReadableStream loading better.

This CL fixes some edge condition interactions when being loaded as
a ReadableStream.  In particular, it:

1. Properly distinguishes between the end of the DataPipe and a closed
   DataPipe with bytes left to be read.  A ReadableStream that is not
   actively draining the pipe could get closed too early.
2. Responses must support explicit completion in order to handle error
   conditions properly.  This CL makes DataPipeBytesConsumer wait
   for an explicit signal before closing.
3. Service worker navigation preload is updated to provide the explicit
   completion signals.

Bug: 894815
Change-Id: I8cff3de94aa2dcbc8deb4a9601a95c13b8ab94d9
Reviewed-on: https://chromium-review.googlesource.com/c/1272715
Commit-Queue: Ben Kelly <wanderview@chromium.org>
Reviewed-by: default avatarYutaka Hirano <yhirano@chromium.org>
Reviewed-by: default avatarMarijn Kruisselbrink <mek@chromium.org>
Reviewed-by: default avatarKinuko Yasuda <kinuko@chromium.org>
Cr-Commit-Position: refs/heads/master@{#599192}
parent b85f543a
......@@ -2295,7 +2295,94 @@ IN_PROC_BROWSER_TEST_F(ServiceWorkerNavigationPreloadTest,
EXPECT_EQ(body, LoadNavigationPreloadTestPage(page_url, worker_url, "PASS"));
// The page request must be sent only once, since the worker responded with
// "Hello world".
// a synthetic Response.
EXPECT_EQ(1, GetRequestCount(kPageUrl));
}
IN_PROC_BROWSER_TEST_F(ServiceWorkerNavigationPreloadTest,
GetLargeResponseCloneText) {
const char kPageUrl[] = "/service_worker/navigation_preload.html";
const char kWorkerUrl[] = "/service_worker/navigation_preload.js";
std::string title = "<title>PASS</title>";
// A large body that exceeds the default size of a mojo::DataPipe.
constexpr size_t kBodySize = 128 * 1024;
// Randomly generate the body data
int index = 0;
std::string body;
for (size_t i = 0; i < kBodySize; ++i) {
body += static_cast<char>(index + 'a');
index = (37 * index + 11) % 26;
}
const std::string kScript =
kEnableNavigationPreloadScript +
"self.addEventListener('fetch', event => {\n"
" event.respondWith(\n"
" event.preloadResponse\n"
" .then(response => response.clone())\n"
" .then(response => response.text())\n"
" .then(text =>\n"
" new Response(\n"
" text,\n"
" {headers: [['content-type', 'text/html']]})));\n"
" });";
const GURL page_url = embedded_test_server()->GetURL(kPageUrl);
const GURL worker_url = embedded_test_server()->GetURL(kWorkerUrl);
RegisterStaticFile(kPageUrl, title + body, "text/html");
RegisterStaticFile(kWorkerUrl, kScript, "text/javascript");
EXPECT_EQ(body, LoadNavigationPreloadTestPage(page_url, worker_url, "PASS"));
// The page request must be sent only once, since the worker responded with
// a synthetic Response.
EXPECT_EQ(1, GetRequestCount(kPageUrl));
}
IN_PROC_BROWSER_TEST_F(ServiceWorkerNavigationPreloadTest,
GetLargeResponseReadableStream) {
const char kPageUrl[] = "/service_worker/navigation_preload.html";
const char kWorkerUrl[] = "/service_worker/navigation_preload.js";
std::string title = "<title>PASS</title>";
// A large body that exceeds the default size of a mojo::DataPipe.
constexpr size_t kBodySize = 128 * 1024;
// Randomly generate the body data
int index = 0;
std::string body;
for (size_t i = 0; i < kBodySize; ++i) {
body += static_cast<char>(index + 'a');
index = (37 * index + 11) % 26;
}
const std::string kScript =
kEnableNavigationPreloadScript +
"function drain(reader) {\n"
" var data = [];\n"
" var decoder = new TextDecoder();\n"
" function nextChunk(chunk) {\n"
" if (chunk.done)\n"
" return data.join();\n"
" data.push(decoder.decode(chunk.value));\n"
" return reader.read().then(nextChunk);\n"
" }\n"
" return reader.read().then(nextChunk);\n"
"}\n"
"self.addEventListener('fetch', event => {\n"
" event.respondWith(\n"
" event.preloadResponse\n"
" .then(response => response.body.getReader())\n"
" .then(reader => drain(reader))\n"
" .then(text =>\n"
" new Response(\n"
" text,\n"
" {headers: [['content-type', 'text/html']]})));\n"
" });";
const GURL page_url = embedded_test_server()->GetURL(kPageUrl);
const GURL worker_url = embedded_test_server()->GetURL(kWorkerUrl);
RegisterStaticFile(kPageUrl, title + body, "text/html");
RegisterStaticFile(kWorkerUrl, kScript, "text/javascript");
EXPECT_EQ(body, LoadNavigationPreloadTestPage(page_url, worker_url, "PASS"));
// The page request must be sent only once, since the worker responded with
// a synthetic Response.
EXPECT_EQ(1, GetRequestCount(kPageUrl));
}
......
......@@ -132,8 +132,12 @@ class CORE_EXPORT BytesConsumer
// Drains the data as a ScopedDataPipeConsumerHandle.
// When this function returns a valid handle, the returned pipe handle
// contains bytes that would be read through the BeginRead and
// EndRead functions without calling this function. In such a case, this
// object becomes closed.
// EndRead functions without calling this function. The consumer may
// become closed or remain in the open state depending on if it has
// received an explicit completion signal. If the consumer becomes
// closed OnstateChange() will *not* be called. Instead manually
// call GetPublicState() to check if draining closed the consumer.
//
// When this function returns an invalid handle, this function does nothing.
virtual mojo::ScopedDataPipeConsumerHandle DrainAsDataPipe() {
return mojo::ScopedDataPipeConsumerHandle();
......
......@@ -21,7 +21,14 @@ DataPipeBytesConsumer::DataPipeBytesConsumer(
data_pipe_(std::move(data_pipe)),
watcher_(FROM_HERE,
mojo::SimpleWatcher::ArmingPolicy::MANUAL,
execution_context->GetTaskRunner(TaskType::kNetworking)) {}
execution_context->GetTaskRunner(TaskType::kNetworking)) {
if (!data_pipe_.is_valid())
return;
watcher_.Watch(
data_pipe_.get(),
MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
WTF::BindRepeating(&DataPipeBytesConsumer::Notify, WrapPersistent(this)));
}
DataPipeBytesConsumer::~DataPipeBytesConsumer() {}
......@@ -35,6 +42,11 @@ BytesConsumer::Result DataPipeBytesConsumer::BeginRead(const char** buffer,
if (state_ == InternalState::kErrored)
return Result::kError;
// If we have already reached the end of the pipe then we are simply
// waiting for either SignalComplete() or SignalError() to be called.
if (!data_pipe_.is_valid())
return Result::kShouldWait;
uint32_t pipe_available = 0;
MojoResult rv =
data_pipe_->BeginReadData(reinterpret_cast<const void**>(buffer),
......@@ -46,11 +58,15 @@ BytesConsumer::Result DataPipeBytesConsumer::BeginRead(const char** buffer,
*available = pipe_available;
return Result::kOk;
case MOJO_RESULT_SHOULD_WAIT:
MaybeStartWatching();
watcher_.ArmOrNotify();
return Result::kShouldWait;
case MOJO_RESULT_FAILED_PRECONDITION:
Close();
ClearDataPipe();
MaybeClose();
// We hit the end of the pipe, but we may still need to wait for
// SignalComplete() or SignalError() to be called.
if (IsReadableOrWaiting())
return Result::kShouldWait;
return Result::kDone;
default:
SetError();
......@@ -63,14 +79,22 @@ BytesConsumer::Result DataPipeBytesConsumer::BeginRead(const char** buffer,
BytesConsumer::Result DataPipeBytesConsumer::EndRead(size_t read) {
DCHECK(is_in_two_phase_read_);
is_in_two_phase_read_ = false;
DCHECK(state_ == InternalState::kReadable ||
state_ == InternalState::kWaiting);
DCHECK(IsReadableOrWaiting());
MojoResult rv = data_pipe_->EndReadData(read);
if (rv != MOJO_RESULT_OK) {
SetError();
return Result::kError;
}
total_read_ += read;
if (has_pending_complete_) {
has_pending_complete_ = false;
SignalComplete();
return Result::kOk;
}
if (has_pending_error_) {
has_pending_error_ = false;
SignalError();
return Result::kError;
}
if (has_pending_notification_) {
has_pending_notification_ = false;
execution_context_->GetTaskRunner(TaskType::kNetworking)
......@@ -82,15 +106,18 @@ BytesConsumer::Result DataPipeBytesConsumer::EndRead(size_t read) {
mojo::ScopedDataPipeConsumerHandle DataPipeBytesConsumer::DrainAsDataPipe() {
DCHECK(!is_in_two_phase_read_);
watcher_.Cancel();
mojo::ScopedDataPipeConsumerHandle data_pipe = std::move(data_pipe_);
Cancel();
MaybeClose();
// The caller is responsible for calling GetPublicState to determine if
// the consumer has closed due to draining.
return data_pipe;
}
void DataPipeBytesConsumer::SetClient(BytesConsumer::Client* client) {
DCHECK(!client_);
DCHECK(client);
if (state_ == InternalState::kReadable || state_ == InternalState::kWaiting)
if (IsReadableOrWaiting())
client_ = client;
}
......@@ -99,13 +126,9 @@ void DataPipeBytesConsumer::ClearClient() {
}
void DataPipeBytesConsumer::Cancel() {
if (state_ == InternalState::kReadable || state_ == InternalState::kWaiting) {
// We don't want the client to be notified in this case.
BytesConsumer::Client* client = client_;
client_ = nullptr;
Close();
client_ = client;
}
DCHECK(!is_in_two_phase_read_);
ClearDataPipe();
SignalComplete();
}
BytesConsumer::PublicState DataPipeBytesConsumer::GetPublicState() const {
......@@ -118,66 +141,107 @@ void DataPipeBytesConsumer::Trace(blink::Visitor* visitor) {
BytesConsumer::Trace(visitor);
}
void DataPipeBytesConsumer::Close() {
bool DataPipeBytesConsumer::IsReadableOrWaiting() const {
return state_ == InternalState::kReadable ||
state_ == InternalState::kWaiting;
}
void DataPipeBytesConsumer::MaybeClose() {
DCHECK(!is_in_two_phase_read_);
if (state_ == InternalState::kClosed)
if (!completion_signaled_ || data_pipe_.is_valid() || !IsReadableOrWaiting())
return;
DCHECK(state_ == InternalState::kReadable ||
state_ == InternalState::kWaiting);
DCHECK(!watcher_.IsWatching());
state_ = InternalState::kClosed;
data_pipe_ = mojo::ScopedDataPipeConsumerHandle();
watcher_.Cancel();
ClearClient();
}
void DataPipeBytesConsumer::SignalComplete() {
if (!IsReadableOrWaiting() || has_pending_complete_ || has_pending_error_)
return;
if (is_in_two_phase_read_) {
has_pending_complete_ = true;
return;
}
completion_signaled_ = true;
Client* client = client_;
MaybeClose();
if (!IsReadableOrWaiting()) {
if (client)
client->OnStateChange();
return;
}
// We have the explicit completion signal, but we may still need to wait
// to hit the end of the pipe. Arm the watcher to make sure we see the
// pipe close even if the stream is not being actively read.
watcher_.ArmOrNotify();
}
void DataPipeBytesConsumer::SignalError() {
if (!IsReadableOrWaiting() || has_pending_complete_ || has_pending_error_)
return;
if (is_in_two_phase_read_) {
has_pending_error_ = true;
return;
}
Client* client = client_;
// When we hit an error we switch states immediately. We don't wait for the
// end of the pipe to be read.
SetError();
if (client)
client->OnStateChange();
}
void DataPipeBytesConsumer::SetError() {
DCHECK(!is_in_two_phase_read_);
if (state_ == InternalState::kErrored)
if (!IsReadableOrWaiting())
return;
DCHECK(state_ == InternalState::kReadable ||
state_ == InternalState::kWaiting);
ClearDataPipe();
state_ = InternalState::kErrored;
data_pipe_ = mojo::ScopedDataPipeConsumerHandle();
watcher_.Cancel();
error_ = Error("error");
ClearClient();
}
void DataPipeBytesConsumer::Notify(MojoResult) {
if (state_ == InternalState::kClosed || state_ == InternalState::kErrored) {
if (!IsReadableOrWaiting())
return;
}
// If the pipe signals us in the middle of our client reading, then delay
// processing the signal until the read is complete.
if (is_in_two_phase_read_) {
has_pending_notification_ = true;
return;
}
uint32_t read_size = 0;
MojoResult rv =
data_pipe_->ReadData(nullptr, &read_size, MOJO_READ_DATA_FLAG_NONE);
// Use QuerySignalsState() instead of a zero-length read so that we can
// detect a closed pipe with data left to read. A zero-length read cannot
// distinguish that case from the end of the pipe.
mojo::HandleSignalsState state = data_pipe_->QuerySignalsState();
BytesConsumer::Client* client = client_;
switch (rv) {
case MOJO_RESULT_OK:
case MOJO_RESULT_FAILED_PRECONDITION:
break;
case MOJO_RESULT_SHOULD_WAIT:
watcher_.ArmOrNotify();
if (state.never_readable()) {
// We've reached the end of the pipe.
ClearDataPipe();
MaybeClose();
// If we're still waiting for the explicit completion signal then
// return immediately. The client needs to keep waiting.
if (IsReadableOrWaiting())
return;
default:
SetError();
break;
} else if (!state.readable()) {
// We were signaled, but the pipe is still not readable. Continue to wait.
// We don't need to notify the client.
watcher_.ArmOrNotify();
return;
}
if (client)
client->OnStateChange();
}
void DataPipeBytesConsumer::MaybeStartWatching() {
if (watcher_.IsWatching())
return;
watcher_.Watch(
data_pipe_.get(),
MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
WTF::BindRepeating(&DataPipeBytesConsumer::Notify, WrapPersistent(this)));
void DataPipeBytesConsumer::ClearDataPipe() {
DCHECK(!is_in_two_phase_read_);
watcher_.Cancel();
data_pipe_.reset();
}
} // namespace blink
......@@ -39,12 +39,19 @@ class CORE_EXPORT DataPipeBytesConsumer final : public BytesConsumer {
void Trace(blink::Visitor*) override;
// One of these methods must be called to signal the end of the data
// stream. We cannot assume that the end of the pipe completes the
// stream successfully since errors can occur after the last byte is
// written into the pipe.
void SignalComplete();
void SignalError();
private:
void Close();
bool IsReadableOrWaiting() const;
void MaybeClose();
void SetError();
void Notify(MojoResult);
void MaybeStartWatching();
void ClearDataPipe();
Member<ExecutionContext> execution_context_;
mojo::ScopedDataPipeConsumerHandle data_pipe_;
......@@ -54,7 +61,9 @@ class CORE_EXPORT DataPipeBytesConsumer final : public BytesConsumer {
Error error_;
bool is_in_two_phase_read_ = false;
bool has_pending_notification_ = false;
uint64_t total_read_ = 0;
bool has_pending_complete_ = false;
bool has_pending_error_ = false;
bool completion_signaled_ = false;
};
} // namespace blink
......
......@@ -11,6 +11,8 @@
namespace blink {
class DataPipeBytesConsumerTest : public PageTestBase {
public:
using PublicState = BytesConsumer::PublicState;
using Result = BytesConsumer::Result;
void SetUp() override { PageTestBase::SetUp(IntSize()); }
};
......@@ -26,16 +28,218 @@ TEST_F(DataPipeBytesConsumerTest, TwoPhaseRead) {
ASSERT_EQ(MOJO_RESULT_OK, rv);
ASSERT_EQ(kData.size(), write_size);
// Close the producer so the consumer will reach the kDone state.
// Close the producer so the consumer will reach the kDone state after
// completion is signaled below.
pipe.producer_handle.reset();
BytesConsumer* consumer = new DataPipeBytesConsumer(
DataPipeBytesConsumer* consumer = new DataPipeBytesConsumer(
&GetDocument(), std::move(pipe.consumer_handle));
consumer->SignalComplete();
auto result = (new BytesConsumerTestUtil::TwoPhaseReader(consumer))->Run();
EXPECT_EQ(BytesConsumer::Result::kDone, result.first);
EXPECT_EQ(Result::kDone, result.first);
EXPECT_EQ(
kData,
BytesConsumerTestUtil::CharVectorToString(result.second).Utf8().data());
}
TEST_F(DataPipeBytesConsumerTest, TwoPhaseRead_SignalError) {
mojo::DataPipe pipe;
ASSERT_TRUE(pipe.producer_handle.is_valid());
const std::string kData = "Such hospitality. I'm underwhelmed.";
uint32_t write_size = kData.size();
MojoResult rv = pipe.producer_handle->WriteData(kData.c_str(), &write_size,
MOJO_WRITE_DATA_FLAG_NONE);
ASSERT_EQ(MOJO_RESULT_OK, rv);
ASSERT_EQ(kData.size(), write_size);
pipe.producer_handle.reset();
DataPipeBytesConsumer* consumer = new DataPipeBytesConsumer(
&GetDocument(), std::move(pipe.consumer_handle));
// Then explicitly signal an error. This should override the pipe completion
// and result in kError.
consumer->SignalError();
auto result = (new BytesConsumerTestUtil::TwoPhaseReader(consumer))->Run();
EXPECT_EQ(Result::kError, result.first);
EXPECT_TRUE(result.second.IsEmpty());
}
// Verify that both the DataPipe must close and SignalComplete()
// must be called for the DataPipeBytesConsumer to reach the closed
// state.
TEST_F(DataPipeBytesConsumerTest, EndOfPipeBeforeComplete) {
mojo::DataPipe pipe;
ASSERT_TRUE(pipe.producer_handle.is_valid());
DataPipeBytesConsumer* consumer = new DataPipeBytesConsumer(
&GetDocument(), std::move(pipe.consumer_handle));
EXPECT_EQ(PublicState::kReadableOrWaiting, consumer->GetPublicState());
const char* buffer = nullptr;
size_t available = 0;
Result rv = consumer->BeginRead(&buffer, &available);
EXPECT_EQ(Result::kShouldWait, rv);
pipe.producer_handle.reset();
rv = consumer->BeginRead(&buffer, &available);
EXPECT_EQ(Result::kShouldWait, rv);
EXPECT_EQ(PublicState::kReadableOrWaiting, consumer->GetPublicState());
consumer->SignalComplete();
EXPECT_EQ(PublicState::kClosed, consumer->GetPublicState());
rv = consumer->BeginRead(&buffer, &available);
EXPECT_EQ(Result::kDone, rv);
}
TEST_F(DataPipeBytesConsumerTest, CompleteBeforeEndOfPipe) {
mojo::DataPipe pipe;
ASSERT_TRUE(pipe.producer_handle.is_valid());
DataPipeBytesConsumer* consumer = new DataPipeBytesConsumer(
&GetDocument(), std::move(pipe.consumer_handle));
EXPECT_EQ(PublicState::kReadableOrWaiting, consumer->GetPublicState());
const char* buffer = nullptr;
size_t available = 0;
Result rv = consumer->BeginRead(&buffer, &available);
EXPECT_EQ(Result::kShouldWait, rv);
consumer->SignalComplete();
EXPECT_EQ(PublicState::kReadableOrWaiting, consumer->GetPublicState());
rv = consumer->BeginRead(&buffer, &available);
EXPECT_EQ(Result::kShouldWait, rv);
pipe.producer_handle.reset();
rv = consumer->BeginRead(&buffer, &available);
EXPECT_EQ(Result::kDone, rv);
EXPECT_EQ(PublicState::kClosed, consumer->GetPublicState());
}
// Verify that SignalError moves the DataPipeBytesConsumer to the
// errored state immediately without waiting for the end of the
// DataPipe.
TEST_F(DataPipeBytesConsumerTest, EndOfPipeBeforeError) {
mojo::DataPipe pipe;
ASSERT_TRUE(pipe.producer_handle.is_valid());
DataPipeBytesConsumer* consumer = new DataPipeBytesConsumer(
&GetDocument(), std::move(pipe.consumer_handle));
EXPECT_EQ(PublicState::kReadableOrWaiting, consumer->GetPublicState());
const char* buffer = nullptr;
size_t available = 0;
Result rv = consumer->BeginRead(&buffer, &available);
EXPECT_EQ(Result::kShouldWait, rv);
pipe.producer_handle.reset();
rv = consumer->BeginRead(&buffer, &available);
EXPECT_EQ(Result::kShouldWait, rv);
EXPECT_EQ(PublicState::kReadableOrWaiting, consumer->GetPublicState());
consumer->SignalError();
EXPECT_EQ(PublicState::kErrored, consumer->GetPublicState());
rv = consumer->BeginRead(&buffer, &available);
EXPECT_EQ(Result::kError, rv);
}
TEST_F(DataPipeBytesConsumerTest, ErrorBeforeEndOfPipe) {
mojo::DataPipe pipe;
ASSERT_TRUE(pipe.producer_handle.is_valid());
DataPipeBytesConsumer* consumer = new DataPipeBytesConsumer(
&GetDocument(), std::move(pipe.consumer_handle));
EXPECT_EQ(PublicState::kReadableOrWaiting, consumer->GetPublicState());
const char* buffer = nullptr;
size_t available = 0;
Result rv = consumer->BeginRead(&buffer, &available);
EXPECT_EQ(Result::kShouldWait, rv);
consumer->SignalError();
EXPECT_EQ(PublicState::kErrored, consumer->GetPublicState());
rv = consumer->BeginRead(&buffer, &available);
EXPECT_EQ(Result::kError, rv);
pipe.producer_handle.reset();
rv = consumer->BeginRead(&buffer, &available);
EXPECT_EQ(Result::kError, rv);
EXPECT_EQ(PublicState::kErrored, consumer->GetPublicState());
}
// Verify that draining the DataPipe and SignalComplete() will
// close the DataPipeBytesConsumer.
TEST_F(DataPipeBytesConsumerTest, DrainPipeBeforeComplete) {
mojo::DataPipe pipe;
ASSERT_TRUE(pipe.producer_handle.is_valid());
DataPipeBytesConsumer* consumer = new DataPipeBytesConsumer(
&GetDocument(), std::move(pipe.consumer_handle));
EXPECT_EQ(PublicState::kReadableOrWaiting, consumer->GetPublicState());
const char* buffer = nullptr;
size_t available = 0;
Result rv = consumer->BeginRead(&buffer, &available);
EXPECT_EQ(Result::kShouldWait, rv);
mojo::ScopedDataPipeConsumerHandle drained = consumer->DrainAsDataPipe();
EXPECT_EQ(PublicState::kReadableOrWaiting, consumer->GetPublicState());
rv = consumer->BeginRead(&buffer, &available);
EXPECT_EQ(Result::kShouldWait, rv);
EXPECT_EQ(PublicState::kReadableOrWaiting, consumer->GetPublicState());
consumer->SignalComplete();
EXPECT_EQ(PublicState::kClosed, consumer->GetPublicState());
rv = consumer->BeginRead(&buffer, &available);
EXPECT_EQ(Result::kDone, rv);
}
TEST_F(DataPipeBytesConsumerTest, CompleteBeforeDrainPipe) {
mojo::DataPipe pipe;
ASSERT_TRUE(pipe.producer_handle.is_valid());
DataPipeBytesConsumer* consumer = new DataPipeBytesConsumer(
&GetDocument(), std::move(pipe.consumer_handle));
EXPECT_EQ(PublicState::kReadableOrWaiting, consumer->GetPublicState());
const char* buffer = nullptr;
size_t available = 0;
Result rv = consumer->BeginRead(&buffer, &available);
EXPECT_EQ(Result::kShouldWait, rv);
consumer->SignalComplete();
EXPECT_EQ(PublicState::kReadableOrWaiting, consumer->GetPublicState());
rv = consumer->BeginRead(&buffer, &available);
EXPECT_EQ(Result::kShouldWait, rv);
mojo::ScopedDataPipeConsumerHandle drained = consumer->DrainAsDataPipe();
EXPECT_EQ(PublicState::kClosed, consumer->GetPublicState());
rv = consumer->BeginRead(&buffer, &available);
EXPECT_EQ(Result::kDone, rv);
EXPECT_EQ(PublicState::kClosed, consumer->GetPublicState());
}
} // namespace blink
......@@ -506,12 +506,7 @@ class FetchDataLoaderAsDataPipe final : public FetchDataLoader,
// the pipe to be closed to signal completion.
mojo::ScopedDataPipeConsumerHandle pipe_consumer =
consumer->DrainAsDataPipe();
if (pipe_consumer.is_valid()) {
data_pipe_watcher_.Watch(
pipe_consumer.get(), MOJO_HANDLE_SIGNAL_PEER_CLOSED,
WTF::BindRepeating(&FetchDataLoaderAsDataPipe::OnClosed,
WrapWeakPersistent(this)));
} else {
if (!pipe_consumer.is_valid()) {
// If we cannot drain the pipe from the consumer then we must copy
// data from the consumer into a new pipe.
MojoCreateDataPipeOptions options;
......@@ -535,18 +530,20 @@ class FetchDataLoaderAsDataPipe final : public FetchDataLoader,
out_data_pipe_.get(), MOJO_HANDLE_SIGNAL_WRITABLE,
WTF::BindRepeating(&FetchDataLoaderAsDataPipe::OnWritable,
WrapWeakPersistent(this)));
data_pipe_watcher_.ArmOrNotify();
}
// Give the resulting pipe consumer handle to the client.
DCHECK(pipe_consumer.is_valid());
client_->DidFetchDataStartedDataPipe(std::move(pipe_consumer));
data_pipe_watcher_.ArmOrNotify();
}
void OnClosed(MojoResult) {
StopInternal();
client_->DidFetchDataLoadedDataPipe();
// Its possible that the consumer is immediately closed after draining
// the DataPipe.
if (consumer->GetPublicState() == BytesConsumer::PublicState::kClosed) {
StopInternal();
client_->DidFetchDataLoadedDataPipe();
}
}
void OnWritable(MojoResult) { OnStateChange(); }
......
......@@ -53,6 +53,8 @@ class CORE_EXPORT FetchDataLoader
}
// This is called after all data are read from |handle| and written
// to |out_data_pipe|, and |out_data_pipe| is closed or aborted.
// This may be called synchronously from FetchDataLoader::Start() or
// delayed to a later task.
virtual void DidFetchDataLoadedDataPipe() { NOTREACHED(); }
// This function is called when a "custom" FetchDataLoader (none of the
......
......@@ -114,13 +114,15 @@ void FetchEvent::OnNavigationPreloadResponse(
DCHECK(!preload_response_);
ScriptState::Scope scope(script_state);
preload_response_ = std::move(response);
if (data_pipe.is_valid()) {
data_pipe_consumer_ = new DataPipeBytesConsumer(
ExecutionContext::From(script_state), std::move(data_pipe));
}
// TODO(ricea): Verify that this response can't be aborted from JS.
FetchResponseData* response_data =
data_pipe.is_valid()
data_pipe_consumer_
? FetchResponseData::CreateWithBuffer(new BodyStreamBuffer(
script_state,
new DataPipeBytesConsumer(ExecutionContext::From(script_state),
std::move(data_pipe)),
script_state, data_pipe_consumer_,
new AbortSignal(ExecutionContext::From(script_state))))
: FetchResponseData::Create();
Vector<KURL> url_list(1);
......@@ -148,6 +150,10 @@ void FetchEvent::OnNavigationPreloadError(
std::unique_ptr<WebServiceWorkerError> error) {
if (!script_state->ContextIsValid())
return;
if (data_pipe_consumer_) {
data_pipe_consumer_->SignalError();
data_pipe_consumer_ = nullptr;
}
DCHECK(preload_response_property_);
if (preload_response_property_->GetState() !=
PreloadResponseProperty::kPending) {
......@@ -164,6 +170,10 @@ void FetchEvent::OnNavigationPreloadComplete(
int64_t encoded_body_length,
int64_t decoded_body_length) {
DCHECK(preload_response_);
if (data_pipe_consumer_) {
data_pipe_consumer_->SignalComplete();
data_pipe_consumer_ = nullptr;
}
std::unique_ptr<WebURLResponse> response = std::move(preload_response_);
ResourceResponse resource_response = response->ToResourceResponse();
resource_response.SetEncodedDataLength(encoded_data_length);
......@@ -187,6 +197,7 @@ void FetchEvent::Trace(blink::Visitor* visitor) {
visitor->Trace(observer_);
visitor->Trace(request_);
visitor->Trace(preload_response_property_);
visitor->Trace(data_pipe_consumer_);
ExtendableEvent::Trace(visitor);
ContextClient::Trace(visitor);
}
......
......@@ -22,6 +22,7 @@
namespace blink {
class DataPipeBytesConsumer;
class ExceptionState;
class FetchRespondWithObserver;
class Request;
......@@ -95,6 +96,7 @@ class MODULES_EXPORT FetchEvent final
TraceWrapperMember<Request> request_;
Member<PreloadResponseProperty> preload_response_property_;
std::unique_ptr<WebURLResponse> preload_response_;
Member<DataPipeBytesConsumer> data_pipe_consumer_;
String client_id_;
bool is_reload_;
};
......
......@@ -138,9 +138,25 @@ class FetchLoaderClient final
}
void DidFetchDataLoadedDataPipe() override {
DCHECK(handle_);
// If this method is called synchronously from StartLoading() then we need
// to delay notifying the handle until after
// RespondToFetchEventWithResponseStream() is called.
if (!started_) {
pending_complete_ = true;
return;
}
pending_complete_ = false;
handle_->Completed();
}
void DidFetchDataLoadFailed() override {
// If this method is called synchronously from StartLoading() then we need
// to delay notifying the handle until after
// RespondToFetchEventWithResponseStream() is called.
if (!started_) {
pending_failure_ = true;
return;
}
pending_failure_ = false;
if (handle_)
handle_->Aborted();
}
......@@ -152,6 +168,17 @@ class FetchLoaderClient final
handle_->Aborted();
}
void SetStarted() {
DCHECK(!started_);
// Note that RespondToFetchEventWithResponseStream() has been called and
// flush any pending operation.
started_ = true;
if (pending_complete_)
DidFetchDataLoadedDataPipe();
else if (pending_failure_)
DidFetchDataLoadFailed();
}
WebServiceWorkerStreamHandle* Handle() const { return handle_.get(); }
void Trace(blink::Visitor* visitor) override {
......@@ -160,6 +187,9 @@ class FetchLoaderClient final
private:
std::unique_ptr<WebServiceWorkerStreamHandle> handle_;
bool started_ = false;
bool pending_complete_ = false;
bool pending_failure_ = false;
};
} // namespace
......@@ -324,6 +354,8 @@ void FetchRespondWithObserver::OnResponseFulfilled(
->RespondToFetchEventWithResponseStream(
event_id_, web_response, fetch_loader_client->Handle(),
event_dispatch_time_, base::TimeTicks::Now());
fetch_loader_client->SetStarted();
return;
}
ServiceWorkerGlobalScopeClient::From(GetExecutionContext())
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment