Commit 707ebb06 authored by Yutaka Hirano's avatar Yutaka Hirano Committed by Commit Bot

Introduce ResponseBodyLoader::DrainAsBytesConsumer

... in order to drain a BytesConsumer, and use it for Fetch API in the
future. As the BytesConsumer passed to ResponseBodyLoader can be
exposed, we need to create
ResourceLoader::CancelReportingBytesConsumer, a BytesConsumer that
translates a cancelling request to the ResourceLoader.

Bug: 894819
Change-Id: I1700635dbb9bed6f2b34a80ed2b4c8532733e4c4
Reviewed-on: https://chromium-review.googlesource.com/c/1442431Reviewed-by: default avatarMakoto Shimazu <shimazu@chromium.org>
Commit-Queue: Yutaka Hirano <yhirano@chromium.org>
Cr-Commit-Position: refs/heads/master@{#629112}
parent 4502ddc7
......@@ -443,6 +443,10 @@ void ResourceLoader::DidFailLoadingBody() {
DidFail(ResourceError::Failure(resource_->Url()), 0, 0, 0);
}
void ResourceLoader::DidCancelLoadingBody() {
Cancel();
}
void ResourceLoader::StartWith(const ResourceRequest& request) {
DCHECK_NE(ResourceLoadScheduler::kInvalidClientId, scheduler_client_id_);
DCHECK(loader_);
......
......@@ -169,6 +169,7 @@ class PLATFORM_EXPORT ResourceLoader final
void DidReceiveData(base::span<const char> data) override;
void DidFinishLoadingBody() override;
void DidFailLoadingBody() override;
void DidCancelLoadingBody() override;
bool ShouldFetchCodeCache();
void StartWith(const ResourceRequest&);
......
......@@ -6,6 +6,7 @@
#include <algorithm>
#include <utility>
#include "base/auto_reset.h"
#include "third_party/blink/renderer/platform/loader/fetch/fetch_context.h"
#include "third_party/blink/renderer/platform/loader/fetch/resource.h"
#include "third_party/blink/renderer/platform/loader/fetch/resource_loader.h"
......@@ -15,6 +16,243 @@ namespace blink {
constexpr size_t ResponseBodyLoader::kMaxNumConsumedBytesInTask;
class ResponseBodyLoader::DelegatingBytesConsumer final
: public BytesConsumer,
public BytesConsumer::Client {
USING_GARBAGE_COLLECTED_MIXIN(DelegatingBytesConsumer);
public:
DelegatingBytesConsumer(
BytesConsumer& bytes_consumer,
ResponseBodyLoader& loader,
scoped_refptr<base::SingleThreadTaskRunner> task_runner)
: bytes_consumer_(bytes_consumer),
loader_(loader),
task_runner_(std::move(task_runner)) {}
Result BeginRead(const char** buffer, size_t* available) override {
*buffer = nullptr;
*available = 0;
if (loader_->IsAborted()) {
return Result::kError;
}
if (loader_->IsSuspended()) {
return Result::kShouldWait;
}
auto result = bytes_consumer_->BeginRead(buffer, available);
if (result == Result::kOk) {
*available = std::min(*available, lookahead_bytes_);
if (*available == 0) {
*buffer = nullptr;
result = Result::kShouldWait;
task_runner_->PostTask(
FROM_HERE, base::BindOnce(&DelegatingBytesConsumer::OnStateChange,
WrapPersistent(this)));
}
}
HandleResult(result);
return result;
}
Result EndRead(size_t read_size) override {
DCHECK_LE(read_size, lookahead_bytes_);
lookahead_bytes_ -= read_size;
auto result = bytes_consumer_->EndRead(read_size);
if (loader_->IsAborted()) {
return Result::kError;
}
HandleResult(result);
return result;
}
scoped_refptr<BlobDataHandle> DrainAsBlobDataHandle(
BlobSizePolicy policy) override {
if (loader_->IsAborted()) {
return nullptr;
}
auto handle = bytes_consumer_->DrainAsBlobDataHandle(policy);
if (handle) {
HandleResult(Result::kDone);
}
return handle;
}
scoped_refptr<EncodedFormData> DrainAsFormData() override {
if (loader_->IsAborted()) {
return nullptr;
}
auto form_data = bytes_consumer_->DrainAsFormData();
if (form_data) {
HandleResult(Result::kDone);
}
return form_data;
}
mojo::ScopedDataPipeConsumerHandle DrainAsDataPipe() override {
if (loader_->IsAborted()) {
return {};
}
auto handle = bytes_consumer_->DrainAsDataPipe();
if (handle) {
HandleResult(Result::kDone);
}
return handle;
}
void SetClient(BytesConsumer::Client* client) override {
DCHECK(!bytes_consumer_client_);
DCHECK(client);
if (state_ != State::kLoading) {
return;
}
bytes_consumer_client_ = client;
}
void ClearClient() override { bytes_consumer_client_ = nullptr; }
void Cancel() override {
if (state_ != State::kLoading) {
return;
}
state_ = State::kCancelled;
bytes_consumer_->Cancel();
if (in_on_state_change_) {
has_pending_signal_ = true;
return;
}
task_runner_->PostTask(
FROM_HERE, base::BindOnce(&ResponseBodyLoader::DidCancelLoadingBody,
WrapWeakPersistent(loader_.Get())));
}
PublicState GetPublicState() const override {
if (loader_->IsAborted())
return PublicState::kErrored;
return bytes_consumer_->GetPublicState();
}
Error GetError() const override { return bytes_consumer_->GetError(); }
String DebugName() const override { return "DelegatingBytesConsumer"; }
void Abort() {
if (state_ != State::kLoading) {
return;
}
if (bytes_consumer_client_) {
bytes_consumer_client_->OnStateChange();
}
}
void OnStateChange() override {
DCHECK(!in_on_state_change_);
base::AutoReset<bool> auto_reset(&in_on_state_change_, true);
if (loader_->IsAborted() || loader_->IsSuspended() ||
state_ == State::kCancelled) {
return;
}
// Peek available bytes from |bytes_consumer_| and report them to
// |loader_|.
const char* buffer = nullptr;
size_t available = 0;
// Poissible state change caused by BeginRead will be realized by the
// following logic, so we don't need to worry about it here.
auto result = bytes_consumer_->BeginRead(&buffer, &available);
if (result == Result::kOk) {
if (lookahead_bytes_ < available) {
loader_->DidReceiveData(base::make_span(buffer + lookahead_bytes_,
available - lookahead_bytes_));
lookahead_bytes_ = available;
}
// Poissible state change caused by EndRead will be realized by the
// following logic, so we don't need to worry about it here.
Result unused = bytes_consumer_->EndRead(0);
ALLOW_UNUSED_LOCAL(unused);
}
if (bytes_consumer_client_) {
bytes_consumer_client_->OnStateChange();
}
switch (GetPublicState()) {
case PublicState::kReadableOrWaiting:
break;
case PublicState::kClosed:
HandleResult(Result::kDone);
break;
case PublicState::kErrored:
HandleResult(Result::kError);
break;
}
if (has_pending_signal_) {
has_pending_signal_ = false;
switch (state_) {
case State::kLoading:
NOTREACHED();
break;
case State::kDone:
loader_->DidFinishLoadingBody();
break;
case State::kErrored:
loader_->DidFailLoadingBody();
break;
case State::kCancelled:
loader_->DidCancelLoadingBody();
break;
}
}
}
void Trace(Visitor* visitor) override {
visitor->Trace(bytes_consumer_);
visitor->Trace(loader_);
visitor->Trace(bytes_consumer_client_);
BytesConsumer::Trace(visitor);
}
private:
enum class State {
kLoading,
kDone,
kErrored,
kCancelled,
};
void HandleResult(Result result) {
if (state_ != State::kLoading) {
return;
}
if (result == Result::kDone) {
state_ = State::kDone;
if (in_on_state_change_) {
has_pending_signal_ = true;
} else {
task_runner_->PostTask(
FROM_HERE, base::BindOnce(&ResponseBodyLoader::DidFinishLoadingBody,
WrapWeakPersistent(loader_.Get())));
}
}
if (result == Result::kError) {
state_ = State::kErrored;
if (in_on_state_change_) {
has_pending_signal_ = true;
} else {
task_runner_->PostTask(
FROM_HERE, base::BindOnce(&ResponseBodyLoader::DidFailLoadingBody,
WrapWeakPersistent(loader_.Get())));
}
}
}
const Member<BytesConsumer> bytes_consumer_;
const Member<ResponseBodyLoader> loader_;
Member<BytesConsumer::Client> bytes_consumer_client_;
const scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
// The size of body which has been reported to |loader_|.
size_t lookahead_bytes_ = 0;
State state_ = State::kLoading;
bool in_on_state_change_ = false;
bool has_pending_signal_ = false;
};
ResponseBodyLoader::ResponseBodyLoader(
BytesConsumer& bytes_consumer,
ResponseBodyLoaderClient& client,
......@@ -28,24 +266,38 @@ ResponseBodyLoader::ResponseBodyLoader(
mojo::ScopedDataPipeConsumerHandle ResponseBodyLoader::DrainAsDataPipe(
ResponseBodyLoaderClient** client) {
DCHECK(!started_);
DCHECK(!drained_);
DCHECK(!aborted_);
*client = nullptr;
if (drained_ || aborted_) {
return {};
}
DCHECK(bytes_consumer_);
auto data_pipe = bytes_consumer_->DrainAsDataPipe();
if (!data_pipe) {
return data_pipe;
}
if (data_pipe) {
drained_ = true;
bytes_consumer_ = nullptr;
*client = this;
}
return data_pipe;
}
BytesConsumer& ResponseBodyLoader::DrainAsBytesConsumer() {
DCHECK(!started_);
DCHECK(!drained_);
DCHECK(!aborted_);
DCHECK(bytes_consumer_);
DCHECK(!delegating_bytes_consumer_);
delegating_bytes_consumer_ = MakeGarbageCollected<DelegatingBytesConsumer>(
*bytes_consumer_, *this, task_runner_);
bytes_consumer_->ClearClient();
bytes_consumer_->SetClient(delegating_bytes_consumer_);
bytes_consumer_ = nullptr;
drained_ = true;
return *delegating_bytes_consumer_;
}
void ResponseBodyLoader::DidReceiveData(base::span<const char> data) {
if (aborted_)
return;
......@@ -79,6 +331,19 @@ void ResponseBodyLoader::DidFailLoadingBody() {
client_->DidFailLoadingBody();
}
void ResponseBodyLoader::DidCancelLoadingBody() {
if (aborted_)
return;
if (suspended_) {
cancel_signal_is_pending_ = true;
return;
}
cancel_signal_is_pending_ = false;
client_->DidCancelLoadingBody();
}
void ResponseBodyLoader::Start() {
DCHECK(!started_);
DCHECK(!drained_);
......@@ -88,13 +353,19 @@ void ResponseBodyLoader::Start() {
}
void ResponseBodyLoader::Abort() {
DCHECK(!suspended_);
if (aborted_)
return;
aborted_ = true;
if (bytes_consumer_ && !in_two_phase_read_)
if (bytes_consumer_ && !in_two_phase_read_) {
bytes_consumer_->Cancel();
}
if (delegating_bytes_consumer_) {
delegating_bytes_consumer_->Abort();
}
}
void ResponseBodyLoader::Suspend() {
......@@ -120,6 +391,10 @@ void ResponseBodyLoader::Resume() {
task_runner_->PostTask(
FROM_HERE, base::BindOnce(&ResponseBodyLoader::DidFailLoadingBody,
WrapPersistent(this)));
} else if (cancel_signal_is_pending_) {
task_runner_->PostTask(
FROM_HERE, base::BindOnce(&ResponseBodyLoader::DidCancelLoadingBody,
WrapPersistent(this)));
} else {
task_runner_->PostTask(FROM_HERE,
base::BindOnce(&ResponseBodyLoader::OnStateChange,
......@@ -178,6 +453,7 @@ void ResponseBodyLoader::OnStateChange() {
void ResponseBodyLoader::Trace(Visitor* visitor) {
visitor->Trace(bytes_consumer_);
visitor->Trace(delegating_bytes_consumer_);
visitor->Trace(client_);
ResponseBodyLoaderDrainableInterface::Trace(visitor);
ResponseBodyLoaderClient::Trace(visitor);
......
......@@ -24,22 +24,35 @@ namespace blink {
class ResponseBodyLoader;
// See ResponseBodyLoader for details. This is a virtual interface to expose
// only DrainAsDataPipe function.
// only Drain functions.
class PLATFORM_EXPORT ResponseBodyLoaderDrainableInterface
: public GarbageCollectedFinalized<ResponseBodyLoaderDrainableInterface> {
public:
virtual ~ResponseBodyLoaderDrainableInterface() = default;
// Drains the response body and returns it. This function must be called
// before calling Start(). This function may return an invalid handle when
// it is unable to convert the body to a data pipe, even when the body itself
// is valid. In that case, this function is no-op.
// If this function returns a valid handle, the caller is responsible for
// reading the body and providing the information to the client this
// function provides.
// Drains the response body and returns it. This function must not be called
// when the load has already been started or aborted, or the body has already
// been drained. This function may return an invalid handle when it is
// unable to convert the body to a data pipe, even when the body itself is
// valid. In that case, this function is no-op. If this function returns a
// valid handle, the caller is responsible for reading the body and providing
// the information to the client this function provides.
// Note that the notification from the client is *synchronously* propergated
// to the original client ResponseBodyLoader owns, e.g., when the caller
// calls ResponseBodyLoaderClient::DidCancelLoadingBody, it synchronously
// cancels the resource loading (if |this| is associated with
// blink::ResourceLoader). A user of this function should ensure that calling
// the client's method doesn't lead to a reentrant problem.
virtual mojo::ScopedDataPipeConsumerHandle DrainAsDataPipe(
ResponseBodyLoaderClient** client) = 0;
// Drains the response body and returns it. This function must not be called
// when the load has already been started or aborted, or the body has already
// been drained. Unlike DrainAsDataPipe, this function always succeeds.
// This ResponseBodyLoader will still monitor the loading signals, and report
// them back to the associated client asynchronously.
virtual BytesConsumer& DrainAsBytesConsumer() = 0;
virtual void Trace(Visitor*) {}
};
......@@ -50,6 +63,8 @@ class PLATFORM_EXPORT ResponseBodyLoaderDrainableInterface
// - By calling DrainAsDataPipe, a user can "drain" the contents from
// ResponseBodyLoader. The caller is responsible for reading the body and
// providing the information to the client this function provides.
// - By calling DrainBytesConsumer, a user can "drain" the contents from
// ResponseBodyLoader.
// A ResponseBodyLoader is bound to the thread on which it is created.
class PLATFORM_EXPORT ResponseBodyLoader final
: public ResponseBodyLoaderDrainableInterface,
......@@ -65,13 +80,14 @@ class PLATFORM_EXPORT ResponseBodyLoader final
// ResponseBodyLoaderDrainableInterface implementation.
mojo::ScopedDataPipeConsumerHandle DrainAsDataPipe(
ResponseBodyLoaderClient**) override;
BytesConsumer& DrainAsBytesConsumer() override;
// Starts loading.
void Start();
// Aborts loading. This is expected to be called from the client's side, and
// does not report the failure to the client. This doesn't affect a
// drained data pipe.
// drained data pipe. This function cannot be called when suspended.
void Abort();
// Suspendes loading.
......@@ -94,16 +110,20 @@ class PLATFORM_EXPORT ResponseBodyLoader final
static constexpr size_t kMaxNumConsumedBytesInTask = 64 * 1024;
private:
class DelegatingBytesConsumer;
// ResponseBodyLoaderClient implementation.
void DidReceiveData(base::span<const char> data) override;
void DidFinishLoadingBody() override;
void DidFailLoadingBody() override;
void DidCancelLoadingBody() override;
// BytesConsumer::Client implementation.
void OnStateChange() override;
String DebugName() const override { return "ResponseBodyLoader"; }
Member<BytesConsumer> bytes_consumer_;
Member<DelegatingBytesConsumer> delegating_bytes_consumer_;
const Member<ResponseBodyLoaderClient> client_;
const scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
bool started_ = false;
......@@ -112,6 +132,7 @@ class PLATFORM_EXPORT ResponseBodyLoader final
bool drained_ = false;
bool finish_signal_is_pending_ = false;
bool fail_signal_is_pending_ = false;
bool cancel_signal_is_pending_ = false;
bool in_two_phase_read_ = false;
};
......
......@@ -22,6 +22,9 @@ class ResponseBodyLoaderClient : public GarbageCollectedMixin {
// Called when seeing an error while reading the body. This must be the last
// signal.
virtual void DidFailLoadingBody() = 0;
// Called when the loader cancelled loading the body.
virtual void DidCancelLoadingBody() = 0;
};
} // namespace blink
......
......@@ -10,6 +10,7 @@
#include "testing/gtest/include/gtest/gtest.h"
#include "third_party/blink/renderer/platform/heap/persistent.h"
#include "third_party/blink/renderer/platform/loader/fetch/data_pipe_bytes_consumer.h"
#include "third_party/blink/renderer/platform/loader/testing/bytes_consumer_test_reader.h"
#include "third_party/blink/renderer/platform/loader/testing/replaying_bytes_consumer.h"
#include "third_party/blink/renderer/platform/scheduler/test/fake_task_runner.h"
......@@ -20,6 +21,8 @@ namespace {
class ResponseBodyLoaderTest : public testing::Test {
protected:
using Command = ReplayingBytesConsumer::Command;
using PublicState = BytesConsumer::PublicState;
using Result = BytesConsumer::Result;
class TestClient final : public GarbageCollectedFinalized<TestClient>,
public ResponseBodyLoaderClient {
USING_GARBAGE_COLLECTED_MIXIN(TestClient);
......@@ -36,8 +39,9 @@ class ResponseBodyLoaderTest : public testing::Test {
virtual ~TestClient() {}
String GetData() const { return data_; }
bool LoadingIsFinished() { return finished_; }
bool LoadingIsFailed() { return failed_; }
bool LoadingIsFinished() const { return finished_; }
bool LoadingIsFailed() const { return failed_; }
bool LoadingIsCancelled() const { return cancelled_; }
void DidReceiveData(base::span<const char> data) override {
DCHECK(!finished_);
......@@ -64,6 +68,11 @@ class ResponseBodyLoaderTest : public testing::Test {
DCHECK(!failed_);
failed_ = true;
}
void DidCancelLoadingBody() override {
DCHECK(!finished_);
DCHECK(!failed_);
cancelled_ = true;
}
void SetLoader(ResponseBodyLoader& loader) { loader_ = loader; }
void Trace(Visitor* visitor) override { visitor->Trace(loader_); }
......@@ -74,9 +83,65 @@ class ResponseBodyLoaderTest : public testing::Test {
String data_;
bool finished_ = false;
bool failed_ = false;
bool cancelled_ = false;
};
class ReadingClient final : public GarbageCollectedFinalized<ReadingClient>,
public BytesConsumer::Client {
USING_GARBAGE_COLLECTED_MIXIN(ReadingClient);
public:
ReadingClient(BytesConsumer& bytes_consumer,
TestClient& test_response_body_loader_client)
: bytes_consumer_(bytes_consumer),
test_response_body_loader_client_(test_response_body_loader_client) {}
void OnStateChangeInternal() {
while (true) {
const char* buffer = nullptr;
size_t available = 0;
Result result = bytes_consumer_->BeginRead(&buffer, &available);
if (result == Result::kShouldWait)
return;
if (result == Result::kOk) {
result = bytes_consumer_->EndRead(available);
}
if (result != Result::kOk)
return;
}
}
// BytesConsumer::Client implementation
void OnStateChange() override {
on_state_change_called_ = true;
OnStateChangeInternal();
// Notification is done asynchronously.
EXPECT_FALSE(test_response_body_loader_client_->LoadingIsCancelled());
EXPECT_FALSE(test_response_body_loader_client_->LoadingIsFinished());
EXPECT_FALSE(test_response_body_loader_client_->LoadingIsFailed());
}
String DebugName() const override { return "ReadingClient"; }
void Trace(Visitor* visitor) override {
visitor->Trace(bytes_consumer_);
visitor->Trace(test_response_body_loader_client_);
BytesConsumer::Client::Trace(visitor);
}
bool IsOnStateChangeCalled() const { return on_state_change_called_; }
private:
bool on_state_change_called_ = false;
const Member<BytesConsumer> bytes_consumer_;
const Member<TestClient> test_response_body_loader_client_;
};
};
class ResponseBodyLoaderDrainedBytesConsumerNotificationOutOfOnStateChangeTest
: public ResponseBodyLoaderTest {};
class ResponseBodyLoaderDrainedBytesConsumerNotificationInOnStateChangeTest
: public ResponseBodyLoaderTest {};
TEST_F(ResponseBodyLoaderTest, Load) {
auto task_runner = base::MakeRefCounted<scheduler::FakeTaskRunner>();
auto* consumer = MakeGarbageCollected<ReplayingBytesConsumer>(task_runner);
......@@ -407,6 +472,491 @@ TEST_F(ResponseBodyLoaderTest, DrainAsDataPipeAndReportError) {
EXPECT_EQ("xyzabc", client->GetData());
}
TEST_F(ResponseBodyLoaderTest, DrainAsBytesConsumer) {
auto task_runner = base::MakeRefCounted<scheduler::FakeTaskRunner>();
auto* original_consumer =
MakeGarbageCollected<ReplayingBytesConsumer>(task_runner);
original_consumer->Add(Command(Command::kData, "he"));
original_consumer->Add(Command(Command::kWait));
original_consumer->Add(Command(Command::kData, "l"));
original_consumer->Add(Command(Command::kData, "lo"));
original_consumer->Add(Command(Command::kDone));
auto* client = MakeGarbageCollected<TestClient>();
auto* body_loader = MakeGarbageCollected<ResponseBodyLoader>(
*original_consumer, *client, task_runner);
BytesConsumer& consumer = body_loader->DrainAsBytesConsumer();
EXPECT_TRUE(body_loader->IsDrained());
EXPECT_NE(&consumer, original_consumer);
auto* reader = MakeGarbageCollected<BytesConsumerTestReader>(&consumer);
auto result = reader->Run(task_runner.get());
EXPECT_EQ(result.first, BytesConsumer::Result::kDone);
EXPECT_EQ(String(result.second.data(), result.second.size()), "hello");
EXPECT_FALSE(client->LoadingIsCancelled());
EXPECT_TRUE(client->LoadingIsFinished());
EXPECT_FALSE(client->LoadingIsFailed());
EXPECT_EQ("hello", client->GetData());
}
TEST_F(ResponseBodyLoaderTest, CancelDrainedBytesConsumer) {
auto task_runner = base::MakeRefCounted<scheduler::FakeTaskRunner>();
auto* original_consumer =
MakeGarbageCollected<ReplayingBytesConsumer>(task_runner);
original_consumer->Add(Command(Command::kData, "he"));
original_consumer->Add(Command(Command::kWait));
original_consumer->Add(Command(Command::kData, "llo"));
original_consumer->Add(Command(Command::kDone));
auto* client = MakeGarbageCollected<TestClient>();
auto* body_loader = MakeGarbageCollected<ResponseBodyLoader>(
*original_consumer, *client, task_runner);
BytesConsumer& consumer = body_loader->DrainAsBytesConsumer();
EXPECT_TRUE(body_loader->IsDrained());
EXPECT_NE(&consumer, original_consumer);
consumer.Cancel();
auto* reader = MakeGarbageCollected<BytesConsumerTestReader>(&consumer);
auto result = reader->Run(task_runner.get());
EXPECT_EQ(result.first, BytesConsumer::Result::kDone);
EXPECT_EQ(String(result.second.data(), result.second.size()), String());
EXPECT_FALSE(client->LoadingIsCancelled());
EXPECT_FALSE(client->LoadingIsFinished());
EXPECT_FALSE(client->LoadingIsFailed());
task_runner->RunUntilIdle();
EXPECT_TRUE(client->LoadingIsCancelled());
EXPECT_FALSE(client->LoadingIsFinished());
EXPECT_FALSE(client->LoadingIsFailed());
}
TEST_F(ResponseBodyLoaderTest, DrainAsBytesConsumerWithError) {
auto task_runner = base::MakeRefCounted<scheduler::FakeTaskRunner>();
auto* original_consumer =
MakeGarbageCollected<ReplayingBytesConsumer>(task_runner);
original_consumer->Add(Command(Command::kData, "he"));
original_consumer->Add(Command(Command::kWait));
original_consumer->Add(Command(Command::kData, "llo"));
original_consumer->Add(Command(Command::kError));
auto* client = MakeGarbageCollected<TestClient>();
auto* body_loader = MakeGarbageCollected<ResponseBodyLoader>(
*original_consumer, *client, task_runner);
BytesConsumer& consumer = body_loader->DrainAsBytesConsumer();
EXPECT_TRUE(body_loader->IsDrained());
EXPECT_NE(&consumer, original_consumer);
auto* reader = MakeGarbageCollected<BytesConsumerTestReader>(&consumer);
auto result = reader->Run(task_runner.get());
EXPECT_EQ(result.first, BytesConsumer::Result::kError);
EXPECT_EQ(String(result.second.data(), result.second.size()), "hello");
EXPECT_FALSE(client->LoadingIsCancelled());
EXPECT_FALSE(client->LoadingIsFinished());
EXPECT_TRUE(client->LoadingIsFailed());
}
TEST_F(ResponseBodyLoaderTest, AbortAfterBytesConsumerIsDrained) {
auto task_runner = base::MakeRefCounted<scheduler::FakeTaskRunner>();
auto* original_consumer =
MakeGarbageCollected<ReplayingBytesConsumer>(task_runner);
original_consumer->Add(Command(Command::kData, "he"));
original_consumer->Add(Command(Command::kWait));
original_consumer->Add(Command(Command::kData, "llo"));
original_consumer->Add(Command(Command::kDone));
auto* client = MakeGarbageCollected<TestClient>();
auto* body_loader = MakeGarbageCollected<ResponseBodyLoader>(
*original_consumer, *client, task_runner);
BytesConsumer& consumer = body_loader->DrainAsBytesConsumer();
auto* bytes_consumer_client =
MakeGarbageCollected<ReadingClient>(consumer, *client);
consumer.SetClient(bytes_consumer_client);
EXPECT_TRUE(body_loader->IsDrained());
EXPECT_NE(&consumer, original_consumer);
EXPECT_EQ(PublicState::kReadableOrWaiting, consumer.GetPublicState());
EXPECT_FALSE(bytes_consumer_client->IsOnStateChangeCalled());
body_loader->Abort();
EXPECT_EQ(PublicState::kErrored, consumer.GetPublicState());
EXPECT_TRUE(bytes_consumer_client->IsOnStateChangeCalled());
task_runner->RunUntilIdle();
EXPECT_FALSE(client->LoadingIsCancelled());
EXPECT_FALSE(client->LoadingIsFinished());
EXPECT_FALSE(client->LoadingIsFailed());
}
TEST_F(ResponseBodyLoaderTest, AbortAfterBytesConsumerIsDrainedIsNotified) {
auto task_runner = base::MakeRefCounted<scheduler::FakeTaskRunner>();
auto* original_consumer =
MakeGarbageCollected<ReplayingBytesConsumer>(task_runner);
auto* client = MakeGarbageCollected<TestClient>();
auto* body_loader = MakeGarbageCollected<ResponseBodyLoader>(
*original_consumer, *client, task_runner);
BytesConsumer& consumer = body_loader->DrainAsBytesConsumer();
EXPECT_TRUE(body_loader->IsDrained());
EXPECT_NE(&consumer, original_consumer);
EXPECT_EQ(PublicState::kReadableOrWaiting, consumer.GetPublicState());
body_loader->Abort();
EXPECT_EQ(PublicState::kErrored, consumer.GetPublicState());
task_runner->RunUntilIdle();
EXPECT_FALSE(client->LoadingIsCancelled());
EXPECT_FALSE(client->LoadingIsFinished());
EXPECT_FALSE(client->LoadingIsFailed());
}
TEST_F(ResponseBodyLoaderDrainedBytesConsumerNotificationOutOfOnStateChangeTest,
BeginReadAndDone) {
auto task_runner = base::MakeRefCounted<scheduler::FakeTaskRunner>();
auto* original_consumer =
MakeGarbageCollected<ReplayingBytesConsumer>(task_runner);
original_consumer->Add(Command(Command::kWait));
original_consumer->Add(Command(Command::kDataAndDone, "hello"));
original_consumer->Add(Command(Command::kDone));
auto* client = MakeGarbageCollected<TestClient>();
auto* body_loader = MakeGarbageCollected<ResponseBodyLoader>(
*original_consumer, *client, task_runner);
BytesConsumer& consumer = body_loader->DrainAsBytesConsumer();
const char* buffer = nullptr;
size_t available = 0;
Result result = consumer.BeginRead(&buffer, &available);
EXPECT_EQ(result, Result::kShouldWait);
EXPECT_FALSE(client->LoadingIsCancelled());
EXPECT_FALSE(client->LoadingIsFinished());
EXPECT_FALSE(client->LoadingIsFailed());
task_runner->RunUntilIdle();
EXPECT_FALSE(client->LoadingIsCancelled());
EXPECT_FALSE(client->LoadingIsFinished());
EXPECT_FALSE(client->LoadingIsFailed());
result = consumer.BeginRead(&buffer, &available);
EXPECT_EQ(result, Result::kOk);
ASSERT_EQ(available, 5u);
EXPECT_FALSE(client->LoadingIsCancelled());
EXPECT_FALSE(client->LoadingIsFinished());
EXPECT_FALSE(client->LoadingIsFailed());
result = consumer.EndRead(available);
EXPECT_EQ(result, Result::kDone);
EXPECT_FALSE(client->LoadingIsCancelled());
EXPECT_FALSE(client->LoadingIsFinished());
EXPECT_FALSE(client->LoadingIsFailed());
task_runner->RunUntilIdle();
EXPECT_FALSE(client->LoadingIsCancelled());
EXPECT_TRUE(client->LoadingIsFinished());
EXPECT_FALSE(client->LoadingIsFailed());
}
TEST_F(ResponseBodyLoaderDrainedBytesConsumerNotificationOutOfOnStateChangeTest,
BeginReadAndError) {
auto task_runner = base::MakeRefCounted<scheduler::FakeTaskRunner>();
auto* original_consumer =
MakeGarbageCollected<ReplayingBytesConsumer>(task_runner);
original_consumer->Add(Command(Command::kWait));
original_consumer->Add(Command(Command::kData, "hello"));
original_consumer->Add(Command(Command::kError));
auto* client = MakeGarbageCollected<TestClient>();
auto* body_loader = MakeGarbageCollected<ResponseBodyLoader>(
*original_consumer, *client, task_runner);
BytesConsumer& consumer = body_loader->DrainAsBytesConsumer();
const char* buffer = nullptr;
size_t available = 0;
Result result = consumer.BeginRead(&buffer, &available);
EXPECT_EQ(result, Result::kShouldWait);
EXPECT_FALSE(client->LoadingIsCancelled());
EXPECT_FALSE(client->LoadingIsFinished());
EXPECT_FALSE(client->LoadingIsFailed());
task_runner->RunUntilIdle();
EXPECT_FALSE(client->LoadingIsCancelled());
EXPECT_FALSE(client->LoadingIsFinished());
EXPECT_FALSE(client->LoadingIsFailed());
result = consumer.BeginRead(&buffer, &available);
EXPECT_EQ(result, Result::kOk);
ASSERT_EQ(available, 5u);
EXPECT_FALSE(client->LoadingIsCancelled());
EXPECT_FALSE(client->LoadingIsFinished());
EXPECT_FALSE(client->LoadingIsFailed());
result = consumer.EndRead(available);
EXPECT_EQ(result, Result::kOk);
EXPECT_FALSE(client->LoadingIsCancelled());
EXPECT_FALSE(client->LoadingIsFinished());
EXPECT_FALSE(client->LoadingIsFailed());
result = consumer.BeginRead(&buffer, &available);
EXPECT_EQ(result, Result::kError);
EXPECT_FALSE(client->LoadingIsCancelled());
EXPECT_FALSE(client->LoadingIsFinished());
EXPECT_FALSE(client->LoadingIsFailed());
task_runner->RunUntilIdle();
EXPECT_FALSE(client->LoadingIsCancelled());
EXPECT_FALSE(client->LoadingIsFinished());
EXPECT_TRUE(client->LoadingIsFailed());
}
TEST_F(ResponseBodyLoaderDrainedBytesConsumerNotificationOutOfOnStateChangeTest,
EndReadAndDone) {
auto task_runner = base::MakeRefCounted<scheduler::FakeTaskRunner>();
auto* original_consumer =
MakeGarbageCollected<ReplayingBytesConsumer>(task_runner);
original_consumer->Add(Command(Command::kWait));
original_consumer->Add(Command(Command::kDataAndDone, "hello"));
auto* client = MakeGarbageCollected<TestClient>();
auto* body_loader = MakeGarbageCollected<ResponseBodyLoader>(
*original_consumer, *client, task_runner);
BytesConsumer& consumer = body_loader->DrainAsBytesConsumer();
const char* buffer = nullptr;
size_t available = 0;
Result result = consumer.BeginRead(&buffer, &available);
EXPECT_EQ(result, Result::kShouldWait);
EXPECT_FALSE(client->LoadingIsCancelled());
EXPECT_FALSE(client->LoadingIsFinished());
EXPECT_FALSE(client->LoadingIsFailed());
task_runner->RunUntilIdle();
EXPECT_FALSE(client->LoadingIsCancelled());
EXPECT_FALSE(client->LoadingIsFinished());
EXPECT_FALSE(client->LoadingIsFailed());
result = consumer.BeginRead(&buffer, &available);
EXPECT_EQ(result, Result::kOk);
EXPECT_FALSE(client->LoadingIsCancelled());
EXPECT_FALSE(client->LoadingIsFinished());
EXPECT_FALSE(client->LoadingIsFailed());
ASSERT_EQ(5u, available);
EXPECT_EQ(String(buffer, available), "hello");
task_runner->RunUntilIdle();
EXPECT_FALSE(client->LoadingIsCancelled());
EXPECT_FALSE(client->LoadingIsFinished());
EXPECT_FALSE(client->LoadingIsFailed());
result = consumer.EndRead(available);
EXPECT_EQ(result, Result::kDone);
EXPECT_FALSE(client->LoadingIsCancelled());
EXPECT_FALSE(client->LoadingIsFinished());
EXPECT_FALSE(client->LoadingIsFailed());
task_runner->RunUntilIdle();
EXPECT_FALSE(client->LoadingIsCancelled());
EXPECT_TRUE(client->LoadingIsFinished());
EXPECT_FALSE(client->LoadingIsFailed());
}
TEST_F(ResponseBodyLoaderDrainedBytesConsumerNotificationOutOfOnStateChangeTest,
DrainAsDataPipe) {
mojo::ScopedDataPipeConsumerHandle consumer_end;
mojo::ScopedDataPipeProducerHandle producer_end;
auto result = mojo::CreateDataPipe(nullptr, &producer_end, &consumer_end);
ASSERT_EQ(result, MOJO_RESULT_OK);
DataPipeBytesConsumer::CompletionNotifier* completion_notifier = nullptr;
auto task_runner = base::MakeRefCounted<scheduler::FakeTaskRunner>();
auto* original_consumer = MakeGarbageCollected<DataPipeBytesConsumer>(
task_runner, std::move(consumer_end), &completion_notifier);
auto* client = MakeGarbageCollected<TestClient>();
auto* body_loader = MakeGarbageCollected<ResponseBodyLoader>(
*original_consumer, *client, task_runner);
BytesConsumer& consumer = body_loader->DrainAsBytesConsumer();
EXPECT_TRUE(consumer.DrainAsDataPipe());
EXPECT_FALSE(client->LoadingIsCancelled());
EXPECT_FALSE(client->LoadingIsFinished());
EXPECT_FALSE(client->LoadingIsFailed());
task_runner->RunUntilIdle();
EXPECT_FALSE(client->LoadingIsCancelled());
EXPECT_TRUE(client->LoadingIsFinished());
EXPECT_FALSE(client->LoadingIsFailed());
}
TEST_F(ResponseBodyLoaderDrainedBytesConsumerNotificationOutOfOnStateChangeTest,
Cancel) {
auto task_runner = base::MakeRefCounted<scheduler::FakeTaskRunner>();
auto* original_consumer =
MakeGarbageCollected<ReplayingBytesConsumer>(task_runner);
original_consumer->Add(Command(Command::kWait));
auto* client = MakeGarbageCollected<TestClient>();
auto* body_loader = MakeGarbageCollected<ResponseBodyLoader>(
*original_consumer, *client, task_runner);
BytesConsumer& consumer = body_loader->DrainAsBytesConsumer();
task_runner->RunUntilIdle();
EXPECT_FALSE(client->LoadingIsCancelled());
EXPECT_FALSE(client->LoadingIsFinished());
EXPECT_FALSE(client->LoadingIsFailed());
consumer.Cancel();
EXPECT_FALSE(client->LoadingIsCancelled());
EXPECT_FALSE(client->LoadingIsFinished());
EXPECT_FALSE(client->LoadingIsFailed());
task_runner->RunUntilIdle();
EXPECT_TRUE(client->LoadingIsCancelled());
EXPECT_FALSE(client->LoadingIsFinished());
EXPECT_FALSE(client->LoadingIsFailed());
}
TEST_F(ResponseBodyLoaderDrainedBytesConsumerNotificationInOnStateChangeTest,
BeginReadAndDone) {
auto task_runner = base::MakeRefCounted<scheduler::FakeTaskRunner>();
auto* original_consumer =
MakeGarbageCollected<ReplayingBytesConsumer>(task_runner);
original_consumer->Add(Command(Command::kWait));
original_consumer->Add(Command(Command::kDone));
auto* client = MakeGarbageCollected<TestClient>();
auto* body_loader = MakeGarbageCollected<ResponseBodyLoader>(
*original_consumer, *client, task_runner);
BytesConsumer& consumer = body_loader->DrainAsBytesConsumer();
auto* reading_client = MakeGarbageCollected<ReadingClient>(consumer, *client);
consumer.SetClient(reading_client);
const char* buffer = nullptr;
size_t available = 0;
// This BeginRead posts a task which calls OnStateChange.
Result result = consumer.BeginRead(&buffer, &available);
EXPECT_EQ(result, Result::kShouldWait);
// We'll see the change without waiting for another task.
task_runner->PostTask(FROM_HERE,
base::BindOnce(
[](TestClient* client) {
EXPECT_FALSE(client->LoadingIsCancelled());
EXPECT_TRUE(client->LoadingIsFinished());
EXPECT_FALSE(client->LoadingIsFailed());
},
WrapPersistent(client)));
task_runner->RunUntilIdle();
EXPECT_TRUE(reading_client->IsOnStateChangeCalled());
EXPECT_FALSE(client->LoadingIsCancelled());
EXPECT_TRUE(client->LoadingIsFinished());
EXPECT_FALSE(client->LoadingIsFailed());
}
TEST_F(ResponseBodyLoaderDrainedBytesConsumerNotificationInOnStateChangeTest,
BeginReadAndError) {
auto task_runner = base::MakeRefCounted<scheduler::FakeTaskRunner>();
auto* original_consumer =
MakeGarbageCollected<ReplayingBytesConsumer>(task_runner);
original_consumer->Add(Command(Command::kWait));
original_consumer->Add(Command(Command::kError));
auto* client = MakeGarbageCollected<TestClient>();
auto* body_loader = MakeGarbageCollected<ResponseBodyLoader>(
*original_consumer, *client, task_runner);
BytesConsumer& consumer = body_loader->DrainAsBytesConsumer();
auto* reading_client = MakeGarbageCollected<ReadingClient>(consumer, *client);
consumer.SetClient(reading_client);
const char* buffer = nullptr;
size_t available = 0;
// This BeginRead posts a task which calls OnStateChange.
Result result = consumer.BeginRead(&buffer, &available);
EXPECT_EQ(result, Result::kShouldWait);
// We'll see the change without waiting for another task.
task_runner->PostTask(FROM_HERE,
base::BindOnce(
[](TestClient* client) {
EXPECT_FALSE(client->LoadingIsCancelled());
EXPECT_FALSE(client->LoadingIsFinished());
EXPECT_TRUE(client->LoadingIsFailed());
},
WrapPersistent(client)));
task_runner->RunUntilIdle();
EXPECT_TRUE(reading_client->IsOnStateChangeCalled());
EXPECT_FALSE(client->LoadingIsCancelled());
EXPECT_FALSE(client->LoadingIsFinished());
EXPECT_TRUE(client->LoadingIsFailed());
}
TEST_F(ResponseBodyLoaderDrainedBytesConsumerNotificationInOnStateChangeTest,
EndReadAndDone) {
auto task_runner = base::MakeRefCounted<scheduler::FakeTaskRunner>();
auto* original_consumer =
MakeGarbageCollected<ReplayingBytesConsumer>(task_runner);
original_consumer->Add(Command(Command::kWait));
original_consumer->Add(Command(Command::kDataAndDone, "hahaha"));
auto* client = MakeGarbageCollected<TestClient>();
auto* body_loader = MakeGarbageCollected<ResponseBodyLoader>(
*original_consumer, *client, task_runner);
BytesConsumer& consumer = body_loader->DrainAsBytesConsumer();
auto* reading_client = MakeGarbageCollected<ReadingClient>(consumer, *client);
consumer.SetClient(reading_client);
const char* buffer = nullptr;
size_t available = 0;
// This BeginRead posts a task which calls OnStateChange.
Result result = consumer.BeginRead(&buffer, &available);
EXPECT_EQ(result, Result::kShouldWait);
// We'll see the change without waiting for another task.
task_runner->PostTask(FROM_HERE,
base::BindOnce(
[](TestClient* client) {
EXPECT_FALSE(client->LoadingIsCancelled());
EXPECT_TRUE(client->LoadingIsFinished());
EXPECT_FALSE(client->LoadingIsFailed());
},
WrapPersistent(client)));
task_runner->RunUntilIdle();
EXPECT_TRUE(reading_client->IsOnStateChangeCalled());
EXPECT_FALSE(client->LoadingIsCancelled());
EXPECT_TRUE(client->LoadingIsFinished());
EXPECT_FALSE(client->LoadingIsFailed());
}
} // namespace
} // namespace blink
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment