Commit c4e604aa authored by Yutaka Hirano's avatar Yutaka Hirano Committed by Commit Bot

Introduce DataPipeBytesConsumer::CompletionNotifier

Move SignalComplete and SignalError from DataPipeBytesConsumer to the
class to separate them from BytesConsumer interface.

Bug: 894815
Change-Id: Ib592ba7615c9c0d0fa24e9a9e1a7a07f56cca8ab
Reviewed-on: https://chromium-review.googlesource.com/c/1304115Reviewed-by: default avatarBen Kelly <wanderview@chromium.org>
Reviewed-by: default avatarMatt Falkenhagen <falken@chromium.org>
Commit-Queue: Matt Falkenhagen <falken@chromium.org>
Cr-Commit-Position: refs/heads/master@{#604530}
parent 44108518
......@@ -14,16 +14,32 @@
namespace blink {
void DataPipeBytesConsumer::CompletionNotifier::SignalComplete() {
if (bytes_consumer_)
bytes_consumer_->SignalComplete();
}
void DataPipeBytesConsumer::CompletionNotifier::SignalError(
const BytesConsumer::Error& error) {
if (bytes_consumer_)
bytes_consumer_->SignalError(error);
}
void DataPipeBytesConsumer::CompletionNotifier::Trace(Visitor* visitor) {
visitor->Trace(bytes_consumer_);
}
DataPipeBytesConsumer::DataPipeBytesConsumer(
ExecutionContext* execution_context,
mojo::ScopedDataPipeConsumerHandle data_pipe)
mojo::ScopedDataPipeConsumerHandle data_pipe,
CompletionNotifier** notifier)
: execution_context_(execution_context),
data_pipe_(std::move(data_pipe)),
watcher_(FROM_HERE,
mojo::SimpleWatcher::ArmingPolicy::MANUAL,
execution_context->GetTaskRunner(TaskType::kNetworking)) {
if (!data_pipe_.is_valid())
return;
DCHECK(data_pipe_.is_valid());
*notifier = new CompletionNotifier(this);
watcher_.Watch(
data_pipe_.get(),
MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
......@@ -69,7 +85,7 @@ BytesConsumer::Result DataPipeBytesConsumer::BeginRead(const char** buffer,
return Result::kShouldWait;
return Result::kDone;
default:
SetError();
SetError(Error("error"));
return Result::kError;
}
......@@ -82,7 +98,7 @@ BytesConsumer::Result DataPipeBytesConsumer::EndRead(size_t read) {
DCHECK(IsReadableOrWaiting());
MojoResult rv = data_pipe_->EndReadData(read);
if (rv != MOJO_RESULT_OK) {
SetError();
SetError(Error("error"));
return Result::kError;
}
if (has_pending_complete_) {
......@@ -92,7 +108,7 @@ BytesConsumer::Result DataPipeBytesConsumer::EndRead(size_t read) {
}
if (has_pending_error_) {
has_pending_error_ = false;
SignalError();
SignalError(Error("error"));
return Result::kError;
}
if (has_pending_notification_) {
......@@ -176,7 +192,7 @@ void DataPipeBytesConsumer::SignalComplete() {
watcher_.ArmOrNotify();
}
void DataPipeBytesConsumer::SignalError() {
void DataPipeBytesConsumer::SignalError(const Error& error) {
if (!IsReadableOrWaiting() || has_pending_complete_ || has_pending_error_)
return;
if (is_in_two_phase_read_) {
......@@ -186,18 +202,18 @@ void DataPipeBytesConsumer::SignalError() {
Client* client = client_;
// When we hit an error we switch states immediately. We don't wait for the
// end of the pipe to be read.
SetError();
SetError(error);
if (client)
client->OnStateChange();
}
void DataPipeBytesConsumer::SetError() {
void DataPipeBytesConsumer::SetError(const Error& error) {
DCHECK(!is_in_two_phase_read_);
if (!IsReadableOrWaiting())
return;
ClearDataPipe();
state_ = InternalState::kErrored;
error_ = Error("error");
error_ = error;
ClearClient();
}
......
......@@ -18,9 +18,32 @@ namespace blink {
class ExecutionContext;
// An adapter for mojo::DataPipe. As mojo::DataPipe lacks signals completion and
// error signals, we define another interface, CompletionNotifier, for the
// signals.
class CORE_EXPORT DataPipeBytesConsumer final : public BytesConsumer {
public:
DataPipeBytesConsumer(ExecutionContext*, mojo::ScopedDataPipeConsumerHandle);
class CORE_EXPORT CompletionNotifier final
: public GarbageCollected<CompletionNotifier> {
public:
explicit CompletionNotifier(DataPipeBytesConsumer* bytes_consumer)
: bytes_consumer_(bytes_consumer) {}
// One of these methods must be called to signal the end of the data
// stream. We cannot assume that the end of the pipe completes the
// stream successfully since errors can occur after the last byte is
// written into the pipe.
void SignalComplete();
void SignalError(const BytesConsumer::Error& error);
void Trace(blink::Visitor*);
private:
const WeakMember<DataPipeBytesConsumer> bytes_consumer_;
};
DataPipeBytesConsumer(ExecutionContext*,
mojo::ScopedDataPipeConsumerHandle,
CompletionNotifier** notifier);
~DataPipeBytesConsumer() override;
Result BeginRead(const char** buffer, size_t* available) override;
......@@ -39,19 +62,14 @@ class CORE_EXPORT DataPipeBytesConsumer final : public BytesConsumer {
void Trace(blink::Visitor*) override;
// One of these methods must be called to signal the end of the data
// stream. We cannot assume that the end of the pipe completes the
// stream successfully since errors can occur after the last byte is
// written into the pipe.
void SignalComplete();
void SignalError();
private:
bool IsReadableOrWaiting() const;
void MaybeClose();
void SetError();
void SetError(const Error& error);
void Notify(MojoResult);
void ClearDataPipe();
void SignalComplete();
void SignalError(const Error& error);
Member<ExecutionContext> execution_context_;
mojo::ScopedDataPipeConsumerHandle data_pipe_;
......
......@@ -32,9 +32,10 @@ TEST_F(DataPipeBytesConsumerTest, TwoPhaseRead) {
// completion is signaled below.
pipe.producer_handle.reset();
DataPipeBytesConsumer::CompletionNotifier* notifier = nullptr;
DataPipeBytesConsumer* consumer = new DataPipeBytesConsumer(
&GetDocument(), std::move(pipe.consumer_handle));
consumer->SignalComplete();
&GetDocument(), std::move(pipe.consumer_handle), &notifier);
notifier->SignalComplete();
auto result = (new BytesConsumerTestUtil::TwoPhaseReader(consumer))->Run();
EXPECT_EQ(Result::kDone, result.first);
EXPECT_EQ(
......@@ -56,12 +57,13 @@ TEST_F(DataPipeBytesConsumerTest, TwoPhaseRead_SignalError) {
pipe.producer_handle.reset();
DataPipeBytesConsumer::CompletionNotifier* notifier = nullptr;
DataPipeBytesConsumer* consumer = new DataPipeBytesConsumer(
&GetDocument(), std::move(pipe.consumer_handle));
&GetDocument(), std::move(pipe.consumer_handle), &notifier);
// Then explicitly signal an error. This should override the pipe completion
// and result in kError.
consumer->SignalError();
notifier->SignalError(BytesConsumer::Error());
auto result = (new BytesConsumerTestUtil::TwoPhaseReader(consumer))->Run();
EXPECT_EQ(Result::kError, result.first);
......@@ -75,8 +77,9 @@ TEST_F(DataPipeBytesConsumerTest, EndOfPipeBeforeComplete) {
mojo::DataPipe pipe;
ASSERT_TRUE(pipe.producer_handle.is_valid());
DataPipeBytesConsumer::CompletionNotifier* notifier = nullptr;
DataPipeBytesConsumer* consumer = new DataPipeBytesConsumer(
&GetDocument(), std::move(pipe.consumer_handle));
&GetDocument(), std::move(pipe.consumer_handle), &notifier);
EXPECT_EQ(PublicState::kReadableOrWaiting, consumer->GetPublicState());
......@@ -91,7 +94,7 @@ TEST_F(DataPipeBytesConsumerTest, EndOfPipeBeforeComplete) {
EXPECT_EQ(Result::kShouldWait, rv);
EXPECT_EQ(PublicState::kReadableOrWaiting, consumer->GetPublicState());
consumer->SignalComplete();
notifier->SignalComplete();
EXPECT_EQ(PublicState::kClosed, consumer->GetPublicState());
rv = consumer->BeginRead(&buffer, &available);
......@@ -102,8 +105,9 @@ TEST_F(DataPipeBytesConsumerTest, CompleteBeforeEndOfPipe) {
mojo::DataPipe pipe;
ASSERT_TRUE(pipe.producer_handle.is_valid());
DataPipeBytesConsumer::CompletionNotifier* notifier = nullptr;
DataPipeBytesConsumer* consumer = new DataPipeBytesConsumer(
&GetDocument(), std::move(pipe.consumer_handle));
&GetDocument(), std::move(pipe.consumer_handle), &notifier);
EXPECT_EQ(PublicState::kReadableOrWaiting, consumer->GetPublicState());
......@@ -113,7 +117,7 @@ TEST_F(DataPipeBytesConsumerTest, CompleteBeforeEndOfPipe) {
Result rv = consumer->BeginRead(&buffer, &available);
EXPECT_EQ(Result::kShouldWait, rv);
consumer->SignalComplete();
notifier->SignalComplete();
EXPECT_EQ(PublicState::kReadableOrWaiting, consumer->GetPublicState());
rv = consumer->BeginRead(&buffer, &available);
......@@ -132,8 +136,9 @@ TEST_F(DataPipeBytesConsumerTest, EndOfPipeBeforeError) {
mojo::DataPipe pipe;
ASSERT_TRUE(pipe.producer_handle.is_valid());
DataPipeBytesConsumer::CompletionNotifier* notifier = nullptr;
DataPipeBytesConsumer* consumer = new DataPipeBytesConsumer(
&GetDocument(), std::move(pipe.consumer_handle));
&GetDocument(), std::move(pipe.consumer_handle), &notifier);
EXPECT_EQ(PublicState::kReadableOrWaiting, consumer->GetPublicState());
......@@ -148,7 +153,7 @@ TEST_F(DataPipeBytesConsumerTest, EndOfPipeBeforeError) {
EXPECT_EQ(Result::kShouldWait, rv);
EXPECT_EQ(PublicState::kReadableOrWaiting, consumer->GetPublicState());
consumer->SignalError();
notifier->SignalError(BytesConsumer::Error());
EXPECT_EQ(PublicState::kErrored, consumer->GetPublicState());
rv = consumer->BeginRead(&buffer, &available);
......@@ -159,8 +164,9 @@ TEST_F(DataPipeBytesConsumerTest, ErrorBeforeEndOfPipe) {
mojo::DataPipe pipe;
ASSERT_TRUE(pipe.producer_handle.is_valid());
DataPipeBytesConsumer::CompletionNotifier* notifier = nullptr;
DataPipeBytesConsumer* consumer = new DataPipeBytesConsumer(
&GetDocument(), std::move(pipe.consumer_handle));
&GetDocument(), std::move(pipe.consumer_handle), &notifier);
EXPECT_EQ(PublicState::kReadableOrWaiting, consumer->GetPublicState());
......@@ -170,7 +176,7 @@ TEST_F(DataPipeBytesConsumerTest, ErrorBeforeEndOfPipe) {
Result rv = consumer->BeginRead(&buffer, &available);
EXPECT_EQ(Result::kShouldWait, rv);
consumer->SignalError();
notifier->SignalError(BytesConsumer::Error());
EXPECT_EQ(PublicState::kErrored, consumer->GetPublicState());
rv = consumer->BeginRead(&buffer, &available);
......@@ -188,8 +194,9 @@ TEST_F(DataPipeBytesConsumerTest, DrainPipeBeforeComplete) {
mojo::DataPipe pipe;
ASSERT_TRUE(pipe.producer_handle.is_valid());
DataPipeBytesConsumer::CompletionNotifier* notifier = nullptr;
DataPipeBytesConsumer* consumer = new DataPipeBytesConsumer(
&GetDocument(), std::move(pipe.consumer_handle));
&GetDocument(), std::move(pipe.consumer_handle), &notifier);
EXPECT_EQ(PublicState::kReadableOrWaiting, consumer->GetPublicState());
......@@ -206,7 +213,7 @@ TEST_F(DataPipeBytesConsumerTest, DrainPipeBeforeComplete) {
EXPECT_EQ(Result::kShouldWait, rv);
EXPECT_EQ(PublicState::kReadableOrWaiting, consumer->GetPublicState());
consumer->SignalComplete();
notifier->SignalComplete();
EXPECT_EQ(PublicState::kClosed, consumer->GetPublicState());
rv = consumer->BeginRead(&buffer, &available);
......@@ -217,8 +224,9 @@ TEST_F(DataPipeBytesConsumerTest, CompleteBeforeDrainPipe) {
mojo::DataPipe pipe;
ASSERT_TRUE(pipe.producer_handle.is_valid());
DataPipeBytesConsumer::CompletionNotifier* notifier = nullptr;
DataPipeBytesConsumer* consumer = new DataPipeBytesConsumer(
&GetDocument(), std::move(pipe.consumer_handle));
&GetDocument(), std::move(pipe.consumer_handle), &notifier);
EXPECT_EQ(PublicState::kReadableOrWaiting, consumer->GetPublicState());
......@@ -228,7 +236,7 @@ TEST_F(DataPipeBytesConsumerTest, CompleteBeforeDrainPipe) {
Result rv = consumer->BeginRead(&buffer, &available);
EXPECT_EQ(Result::kShouldWait, rv);
consumer->SignalComplete();
notifier->SignalComplete();
EXPECT_EQ(PublicState::kReadableOrWaiting, consumer->GetPublicState());
rv = consumer->BeginRead(&buffer, &available);
......
......@@ -10,7 +10,6 @@
#include "third_party/blink/renderer/bindings/core/v8/to_v8_for_core.h"
#include "third_party/blink/renderer/core/dom/abort_signal.h"
#include "third_party/blink/renderer/core/execution_context/execution_context.h"
#include "third_party/blink/renderer/core/fetch/data_pipe_bytes_consumer.h"
#include "third_party/blink/renderer/core/fetch/request.h"
#include "third_party/blink/renderer/core/fetch/response.h"
#include "third_party/blink/renderer/core/frame/use_counter.h"
......@@ -114,15 +113,19 @@ void FetchEvent::OnNavigationPreloadResponse(
DCHECK(!preload_response_);
ScriptState::Scope scope(script_state);
preload_response_ = std::move(response);
DataPipeBytesConsumer* bytes_consumer = nullptr;
if (data_pipe.is_valid()) {
data_pipe_consumer_ = new DataPipeBytesConsumer(
ExecutionContext::From(script_state), std::move(data_pipe));
DataPipeBytesConsumer::CompletionNotifier* completion_notifier = nullptr;
bytes_consumer =
new DataPipeBytesConsumer(ExecutionContext::From(script_state),
std::move(data_pipe), &completion_notifier);
body_completion_notifier_ = completion_notifier;
}
// TODO(ricea): Verify that this response can't be aborted from JS.
FetchResponseData* response_data =
data_pipe_consumer_
bytes_consumer
? FetchResponseData::CreateWithBuffer(new BodyStreamBuffer(
script_state, data_pipe_consumer_,
script_state, bytes_consumer,
new AbortSignal(ExecutionContext::From(script_state))))
: FetchResponseData::Create();
Vector<KURL> url_list(1);
......@@ -150,9 +153,9 @@ void FetchEvent::OnNavigationPreloadError(
std::unique_ptr<WebServiceWorkerError> error) {
if (!script_state->ContextIsValid())
return;
if (data_pipe_consumer_) {
data_pipe_consumer_->SignalError();
data_pipe_consumer_ = nullptr;
if (body_completion_notifier_) {
body_completion_notifier_->SignalError(BytesConsumer::Error());
body_completion_notifier_ = nullptr;
}
DCHECK(preload_response_property_);
if (preload_response_property_->GetState() !=
......@@ -170,9 +173,9 @@ void FetchEvent::OnNavigationPreloadComplete(
int64_t encoded_body_length,
int64_t decoded_body_length) {
DCHECK(preload_response_);
if (data_pipe_consumer_) {
data_pipe_consumer_->SignalComplete();
data_pipe_consumer_ = nullptr;
if (body_completion_notifier_) {
body_completion_notifier_->SignalComplete();
body_completion_notifier_ = nullptr;
}
std::unique_ptr<WebURLResponse> response = std::move(preload_response_);
ResourceResponse resource_response = response->ToResourceResponse();
......@@ -197,7 +200,7 @@ void FetchEvent::Trace(blink::Visitor* visitor) {
visitor->Trace(observer_);
visitor->Trace(request_);
visitor->Trace(preload_response_property_);
visitor->Trace(data_pipe_consumer_);
visitor->Trace(body_completion_notifier_);
ExtendableEvent::Trace(visitor);
ContextClient::Trace(visitor);
}
......
......@@ -11,6 +11,7 @@
#include "third_party/blink/renderer/bindings/core/v8/script_promise.h"
#include "third_party/blink/renderer/bindings/core/v8/script_promise_property.h"
#include "third_party/blink/renderer/core/dom/context_lifecycle_observer.h"
#include "third_party/blink/renderer/core/fetch/data_pipe_bytes_consumer.h"
#include "third_party/blink/renderer/core/fetch/request.h"
#include "third_party/blink/renderer/modules/event_modules.h"
#include "third_party/blink/renderer/modules/modules_export.h"
......@@ -22,7 +23,6 @@
namespace blink {
class DataPipeBytesConsumer;
class ExceptionState;
class FetchRespondWithObserver;
class Request;
......@@ -96,7 +96,7 @@ class MODULES_EXPORT FetchEvent final
TraceWrapperMember<Request> request_;
Member<PreloadResponseProperty> preload_response_property_;
std::unique_ptr<WebURLResponse> preload_response_;
Member<DataPipeBytesConsumer> data_pipe_consumer_;
Member<DataPipeBytesConsumer::CompletionNotifier> body_completion_notifier_;
String client_id_;
bool is_reload_;
};
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment