Commit 9aebf3ac authored by Ben Kelly's avatar Ben Kelly Committed by Commit Bot

Fetch: Make blob() stream data directly to BlobRegistry.

This CL refactors the FetchDataLoader to use a mojo::DataPipe to stream
data directly to the BlobRegistry when loading to a blob.  Previously
it would accumulate the entire data set in memory before creating the
blob which would then send it down to the registry.  Streaming the data
is more memory efficient for very large blobs.  For example, with this
CL we will be able to stream data directly from a network response to
a file managed by the BlobRegistry.

Bug: 1127008
Change-Id: I9748690028ac67948b49a98f2e721c4ac4c88c19
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2404562
Commit-Queue: Ben Kelly <wanderview@chromium.org>
Reviewed-by: default avatarMarijn Kruisselbrink <mek@chromium.org>
Reviewed-by: default avatarYutaka Hirano <yhirano@chromium.org>
Cr-Commit-Position: refs/heads/master@{#807998}
parent 1d5f11f6
include_rules = [ include_rules = [
"+base/run_loop.h",
"+gin/public", "+gin/public",
"+mojo/public/cpp/system/data_pipe.h", "+mojo/public/cpp/system/data_pipe.h",
"+mojo/public/cpp/system/data_pipe_utils.h", "+mojo/public/cpp/system/data_pipe_utils.h",
......
...@@ -204,8 +204,10 @@ ScriptPromise Body::blob(ScriptState* script_state, ...@@ -204,8 +204,10 @@ ScriptPromise Body::blob(ScriptState* script_state,
auto* resolver = MakeGarbageCollected<ScriptPromiseResolver>(script_state); auto* resolver = MakeGarbageCollected<ScriptPromiseResolver>(script_state);
ScriptPromise promise = resolver->Promise(); ScriptPromise promise = resolver->Promise();
if (BodyBuffer()) { if (BodyBuffer()) {
ExecutionContext* context = ExecutionContext::From(script_state);
BodyBuffer()->StartLoading( BodyBuffer()->StartLoading(
FetchDataLoader::CreateLoaderAsBlobHandle(MimeType()), FetchDataLoader::CreateLoaderAsBlobHandle(
MimeType(), context->GetTaskRunner(TaskType::kNetworking)),
MakeGarbageCollected<BodyBlobConsumer>(resolver), exception_state); MakeGarbageCollected<BodyBlobConsumer>(resolver), exception_state);
if (exception_state.HadException()) { if (exception_state.HadException()) {
// Need to resolve the ScriptPromiseResolver to avoid a DCHECK(). // Need to resolve the ScriptPromiseResolver to avoid a DCHECK().
......
...@@ -23,12 +23,14 @@ ...@@ -23,12 +23,14 @@
#include "third_party/blink/renderer/platform/blob/blob_data.h" #include "third_party/blink/renderer/platform/blob/blob_data.h"
#include "third_party/blink/renderer/platform/blob/blob_url.h" #include "third_party/blink/renderer/platform/blob/blob_url.h"
#include "third_party/blink/renderer/platform/blob/testing/fake_blob.h" #include "third_party/blink/renderer/platform/blob/testing/fake_blob.h"
#include "third_party/blink/renderer/platform/blob/testing/fake_blob_registry.h"
#include "third_party/blink/renderer/platform/heap/garbage_collected.h" #include "third_party/blink/renderer/platform/heap/garbage_collected.h"
#include "third_party/blink/renderer/platform/heap/heap.h" #include "third_party/blink/renderer/platform/heap/heap.h"
#include "third_party/blink/renderer/platform/loader/fetch/bytes_consumer.h" #include "third_party/blink/renderer/platform/loader/fetch/bytes_consumer.h"
#include "third_party/blink/renderer/platform/loader/fetch/text_resource_decoder_options.h" #include "third_party/blink/renderer/platform/loader/fetch/text_resource_decoder_options.h"
#include "third_party/blink/renderer/platform/loader/testing/replaying_bytes_consumer.h" #include "third_party/blink/renderer/platform/loader/testing/replaying_bytes_consumer.h"
#include "third_party/blink/renderer/platform/network/encoded_form_data.h" #include "third_party/blink/renderer/platform/network/encoded_form_data.h"
#include "third_party/blink/renderer/platform/scheduler/test/fake_task_runner.h"
#include "third_party/blink/renderer/platform/testing/unit_test_helpers.h" #include "third_party/blink/renderer/platform/testing/unit_test_helpers.h"
namespace blink { namespace blink {
...@@ -417,7 +419,30 @@ TEST_F(BodyStreamBufferTest, LoadBodyStreamBufferAsArrayBuffer) { ...@@ -417,7 +419,30 @@ TEST_F(BodyStreamBufferTest, LoadBodyStreamBufferAsArrayBuffer) {
array_buffer->ByteLengthAsSizeT())); array_buffer->ByteLengthAsSizeT()));
} }
TEST_F(BodyStreamBufferTest, LoadBodyStreamBufferAsBlob) { class BodyStreamBufferBlobTest : public BodyStreamBufferTest {
public:
BodyStreamBufferBlobTest()
: fake_task_runner_(base::MakeRefCounted<scheduler::FakeTaskRunner>()),
blob_registry_receiver_(
&fake_blob_registry_,
blob_registry_remote_.BindNewPipeAndPassReceiver()) {
BlobDataHandle::SetBlobRegistryForTesting(blob_registry_remote_.get());
}
~BodyStreamBufferBlobTest() override {
BlobDataHandle::SetBlobRegistryForTesting(nullptr);
}
protected:
scoped_refptr<scheduler::FakeTaskRunner> fake_task_runner_;
private:
FakeBlobRegistry fake_blob_registry_;
mojo::Remote<mojom::blink::BlobRegistry> blob_registry_remote_;
mojo::Receiver<mojom::blink::BlobRegistry> blob_registry_receiver_;
};
TEST_F(BodyStreamBufferBlobTest, LoadBodyStreamBufferAsBlob) {
V8TestingScope scope; V8TestingScope scope;
Checkpoint checkpoint; Checkpoint checkpoint;
auto* client = MakeGarbageCollected<MockFetchDataLoaderClient>(); auto* client = MakeGarbageCollected<MockFetchDataLoaderClient>();
...@@ -439,7 +464,8 @@ TEST_F(BodyStreamBufferTest, LoadBodyStreamBufferAsBlob) { ...@@ -439,7 +464,8 @@ TEST_F(BodyStreamBufferTest, LoadBodyStreamBufferAsBlob) {
BodyStreamBuffer::Create(scope.GetScriptState(), src, BodyStreamBuffer::Create(scope.GetScriptState(), src,
/* abort_signal = */ nullptr, side_data_blob); /* abort_signal = */ nullptr, side_data_blob);
EXPECT_EQ(side_data_blob, buffer->GetSideDataBlobForTest()); EXPECT_EQ(side_data_blob, buffer->GetSideDataBlobForTest());
buffer->StartLoading(FetchDataLoader::CreateLoaderAsBlobHandle("text/plain"), buffer->StartLoading(FetchDataLoader::CreateLoaderAsBlobHandle(
"text/plain", fake_task_runner_),
client, ASSERT_NO_EXCEPTION); client, ASSERT_NO_EXCEPTION);
EXPECT_EQ(nullptr, buffer->GetSideDataBlobForTest()); EXPECT_EQ(nullptr, buffer->GetSideDataBlobForTest());
...@@ -448,6 +474,7 @@ TEST_F(BodyStreamBufferTest, LoadBodyStreamBufferAsBlob) { ...@@ -448,6 +474,7 @@ TEST_F(BodyStreamBufferTest, LoadBodyStreamBufferAsBlob) {
EXPECT_TRUE(buffer->HasPendingActivity()); EXPECT_TRUE(buffer->HasPendingActivity());
checkpoint.Call(1); checkpoint.Call(1);
fake_task_runner_->RunUntilIdle();
test::RunPendingTasks(); test::RunPendingTasks();
checkpoint.Call(2); checkpoint.Call(2);
......
...@@ -27,6 +27,8 @@ BytesConsumerTestUtil::MockBytesConsumer::MockBytesConsumer() { ...@@ -27,6 +27,8 @@ BytesConsumerTestUtil::MockBytesConsumer::MockBytesConsumer() {
ON_CALL(*this, GetPublicState()).WillByDefault(Return(PublicState::kErrored)); ON_CALL(*this, GetPublicState()).WillByDefault(Return(PublicState::kErrored));
ON_CALL(*this, DrainAsBlobDataHandle(_)) ON_CALL(*this, DrainAsBlobDataHandle(_))
.WillByDefault(Return(ByMove(nullptr))); .WillByDefault(Return(ByMove(nullptr)));
ON_CALL(*this, DrainAsDataPipe())
.WillByDefault(Return(ByMove(mojo::ScopedDataPipeConsumerHandle())));
ON_CALL(*this, DrainAsFormData()).WillByDefault(Return(ByMove(nullptr))); ON_CALL(*this, DrainAsFormData()).WillByDefault(Return(ByMove(nullptr)));
} }
......
...@@ -5,6 +5,7 @@ ...@@ -5,6 +5,7 @@
#ifndef THIRD_PARTY_BLINK_RENDERER_CORE_FETCH_BYTES_CONSUMER_TEST_UTIL_H_ #ifndef THIRD_PARTY_BLINK_RENDERER_CORE_FETCH_BYTES_CONSUMER_TEST_UTIL_H_
#define THIRD_PARTY_BLINK_RENDERER_CORE_FETCH_BYTES_CONSUMER_TEST_UTIL_H_ #define THIRD_PARTY_BLINK_RENDERER_CORE_FETCH_BYTES_CONSUMER_TEST_UTIL_H_
#include "mojo/public/cpp/system/data_pipe.h"
#include "testing/gmock/include/gmock/gmock.h" #include "testing/gmock/include/gmock/gmock.h"
#include "testing/gtest/include/gtest/gtest.h" #include "testing/gtest/include/gtest/gtest.h"
#include "third_party/blink/renderer/core/fetch/fetch_data_loader.h" #include "third_party/blink/renderer/core/fetch/fetch_data_loader.h"
...@@ -24,6 +25,7 @@ class BytesConsumerTestUtil { ...@@ -24,6 +25,7 @@ class BytesConsumerTestUtil {
MOCK_METHOD1(EndRead, Result(size_t)); MOCK_METHOD1(EndRead, Result(size_t));
MOCK_METHOD1(DrainAsBlobDataHandle, MOCK_METHOD1(DrainAsBlobDataHandle,
scoped_refptr<BlobDataHandle>(BlobSizePolicy)); scoped_refptr<BlobDataHandle>(BlobSizePolicy));
MOCK_METHOD0(DrainAsDataPipe, mojo::ScopedDataPipeConsumerHandle());
MOCK_METHOD0(DrainAsFormData, scoped_refptr<EncodedFormData>()); MOCK_METHOD0(DrainAsFormData, scoped_refptr<EncodedFormData>());
MOCK_METHOD1(SetClient, void(Client*)); MOCK_METHOD1(SetClient, void(Client*));
MOCK_METHOD0(ClearClient, void()); MOCK_METHOD0(ClearClient, void());
......
...@@ -9,6 +9,7 @@ ...@@ -9,6 +9,7 @@
#include "base/optional.h" #include "base/optional.h"
#include "mojo/public/cpp/system/simple_watcher.h" #include "mojo/public/cpp/system/simple_watcher.h"
#include "third_party/blink/public/mojom/blob/blob_registry.mojom-blink.h"
#include "third_party/blink/renderer/core/fetch/multipart_parser.h" #include "third_party/blink/renderer/core/fetch/multipart_parser.h"
#include "third_party/blink/renderer/core/fileapi/file.h" #include "third_party/blink/renderer/core/fileapi/file.h"
#include "third_party/blink/renderer/core/html/forms/form_data.h" #include "third_party/blink/renderer/core/html/forms/form_data.h"
...@@ -31,10 +32,12 @@ namespace blink { ...@@ -31,10 +32,12 @@ namespace blink {
namespace { namespace {
class FetchDataLoaderAsBlobHandle final : public FetchDataLoader, class FetchDataLoaderAsBlobHandle final : public FetchDataLoader,
public BytesConsumer::Client { public FetchDataLoader::Client {
public: public:
explicit FetchDataLoaderAsBlobHandle(const String& mime_type) FetchDataLoaderAsBlobHandle(
: mime_type_(mime_type) {} const String& mime_type,
scoped_refptr<base::SingleThreadTaskRunner> task_runner)
: mime_type_(mime_type), task_runner_(std::move(task_runner)) {}
void Start(BytesConsumer* consumer, void Start(BytesConsumer* consumer,
FetchDataLoader::Client* client) override { FetchDataLoader::Client* client) override {
...@@ -62,59 +65,60 @@ class FetchDataLoaderAsBlobHandle final : public FetchDataLoader, ...@@ -62,59 +65,60 @@ class FetchDataLoaderAsBlobHandle final : public FetchDataLoader,
return; return;
} }
blob_data_ = std::make_unique<BlobData>(); data_pipe_loader_ = CreateLoaderAsDataPipe(task_runner_);
blob_data_->SetContentType(mime_type_); data_pipe_loader_->Start(consumer_, this);
consumer_->SetClient(this);
OnStateChange();
} }
void Cancel() override { consumer_->Cancel(); } void Cancel() override { consumer_->Cancel(); }
void OnStateChange() override { void DidFetchDataStartedDataPipe(
while (true) { mojo::ScopedDataPipeConsumerHandle handle) override {
const char* buffer; DCHECK(BlobDataHandle::GetBlobRegistry());
size_t available; BlobDataHandle::GetBlobRegistry()->RegisterFromStream(
auto result = consumer_->BeginRead(&buffer, &available); mime_type_ ? mime_type_ : "", /*content_disposition=*/"",
if (result == BytesConsumer::Result::kShouldWait) /*length_hint=*/0, std::move(handle),
return; mojo::PendingAssociatedRemote<mojom::blink::ProgressClient>(),
if (result == BytesConsumer::Result::kOk) { WTF::Bind(&FetchDataLoaderAsBlobHandle::FinishedCreatingFromDataPipe,
blob_data_->AppendBytes(buffer, available); WrapWeakPersistent(this)));
result = consumer_->EndRead(available);
}
switch (result) {
case BytesConsumer::Result::kOk:
break;
case BytesConsumer::Result::kShouldWait:
NOTREACHED();
return;
case BytesConsumer::Result::kDone: {
auto size = blob_data_->length();
client_->DidFetchDataLoadedBlobHandle(
BlobDataHandle::Create(std::move(blob_data_), size));
return;
}
case BytesConsumer::Result::kError:
client_->DidFetchDataLoadFailed();
return;
}
}
} }
String DebugName() const override { return "FetchDataLoaderAsBlobHandle"; } void DidFetchDataLoadedDataPipe() override {
DCHECK(!load_complete_);
load_complete_ = true;
if (blob_handle_)
client_->DidFetchDataLoadedBlobHandle(std::move(blob_handle_));
}
void DidFetchDataLoadFailed() override { client_->DidFetchDataLoadFailed(); }
void Abort() override { client_->Abort(); }
void Trace(Visitor* visitor) const override { void Trace(Visitor* visitor) const override {
visitor->Trace(consumer_); visitor->Trace(consumer_);
visitor->Trace(client_); visitor->Trace(client_);
visitor->Trace(data_pipe_loader_);
FetchDataLoader::Trace(visitor); FetchDataLoader::Trace(visitor);
BytesConsumer::Client::Trace(visitor); FetchDataLoader::Client::Trace(visitor);
} }
private: private:
void FinishedCreatingFromDataPipe(
const scoped_refptr<BlobDataHandle>& blob_handle) {
if (!load_complete_) {
blob_handle_ = std::move(blob_handle);
return;
}
client_->DidFetchDataLoadedBlobHandle(blob_handle);
}
Member<BytesConsumer> consumer_; Member<BytesConsumer> consumer_;
Member<FetchDataLoader::Client> client_; Member<FetchDataLoader::Client> client_;
Member<FetchDataLoader> data_pipe_loader_;
String mime_type_; const String mime_type_;
std::unique_ptr<BlobData> blob_data_; const scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
scoped_refptr<BlobDataHandle> blob_handle_;
bool load_complete_ = false;
}; };
class FetchDataLoaderAsArrayBuffer final : public FetchDataLoader, class FetchDataLoaderAsArrayBuffer final : public FetchDataLoader,
...@@ -546,8 +550,8 @@ class FetchDataLoaderAsDataPipe final : public FetchDataLoader, ...@@ -546,8 +550,8 @@ class FetchDataLoaderAsDataPipe final : public FetchDataLoader,
options.struct_size = sizeof(MojoCreateDataPipeOptions); options.struct_size = sizeof(MojoCreateDataPipeOptions);
options.flags = MOJO_CREATE_DATA_PIPE_FLAG_NONE; options.flags = MOJO_CREATE_DATA_PIPE_FLAG_NONE;
options.element_num_bytes = 1; options.element_num_bytes = 1;
// Use the default pipe capacity since we don't know the total // Use the default pipe capacity since we don't know the total data
// data size to target. // size to target.
options.capacity_num_bytes = 0; options.capacity_num_bytes = 0;
MojoResult rv = MojoResult rv =
...@@ -659,8 +663,10 @@ class FetchDataLoaderAsDataPipe final : public FetchDataLoader, ...@@ -659,8 +663,10 @@ class FetchDataLoaderAsDataPipe final : public FetchDataLoader,
} // namespace } // namespace
FetchDataLoader* FetchDataLoader::CreateLoaderAsBlobHandle( FetchDataLoader* FetchDataLoader::CreateLoaderAsBlobHandle(
const String& mime_type) { const String& mime_type,
return MakeGarbageCollected<FetchDataLoaderAsBlobHandle>(mime_type); scoped_refptr<base::SingleThreadTaskRunner> task_runner) {
return MakeGarbageCollected<FetchDataLoaderAsBlobHandle>(
mime_type, std::move(task_runner));
} }
FetchDataLoader* FetchDataLoader::CreateLoaderAsArrayBuffer() { FetchDataLoader* FetchDataLoader::CreateLoaderAsArrayBuffer() {
......
...@@ -69,7 +69,11 @@ class CORE_EXPORT FetchDataLoader : public GarbageCollected<FetchDataLoader> { ...@@ -69,7 +69,11 @@ class CORE_EXPORT FetchDataLoader : public GarbageCollected<FetchDataLoader> {
void Trace(Visitor* visitor) const override {} void Trace(Visitor* visitor) const override {}
}; };
static FetchDataLoader* CreateLoaderAsBlobHandle(const String& mime_type); // The task runner is used to post tasks necessary for creating a blob
// from certain kinds of consumers.
static FetchDataLoader* CreateLoaderAsBlobHandle(
const String& mime_type,
scoped_refptr<base::SingleThreadTaskRunner>);
static FetchDataLoader* CreateLoaderAsArrayBuffer(); static FetchDataLoader* CreateLoaderAsArrayBuffer();
static FetchDataLoader* CreateLoaderAsFailure(); static FetchDataLoader* CreateLoaderAsFailure();
static FetchDataLoader* CreateLoaderAsFormData( static FetchDataLoader* CreateLoaderAsFormData(
......
...@@ -6,6 +6,7 @@ ...@@ -6,6 +6,7 @@
#include <memory> #include <memory>
#include "base/run_loop.h"
#include "base/stl_util.h" #include "base/stl_util.h"
#include "mojo/public/cpp/system/data_pipe_utils.h" #include "mojo/public/cpp/system/data_pipe_utils.h"
#include "testing/gmock/include/gmock/gmock.h" #include "testing/gmock/include/gmock/gmock.h"
...@@ -13,6 +14,7 @@ ...@@ -13,6 +14,7 @@
#include "third_party/blink/renderer/core/fetch/bytes_consumer_test_util.h" #include "third_party/blink/renderer/core/fetch/bytes_consumer_test_util.h"
#include "third_party/blink/renderer/core/fileapi/blob.h" #include "third_party/blink/renderer/core/fileapi/blob.h"
#include "third_party/blink/renderer/core/html/forms/form_data.h" #include "third_party/blink/renderer/core/html/forms/form_data.h"
#include "third_party/blink/renderer/platform/blob/testing/fake_blob_registry.h"
#include "third_party/blink/renderer/platform/loader/fetch/data_pipe_bytes_consumer.h" #include "third_party/blink/renderer/platform/loader/fetch/data_pipe_bytes_consumer.h"
#include "third_party/blink/renderer/platform/loader/fetch/text_resource_decoder_options.h" #include "third_party/blink/renderer/platform/loader/fetch/text_resource_decoder_options.h"
#include "third_party/blink/renderer/platform/loader/testing/bytes_consumer_test_reader.h" #include "third_party/blink/renderer/platform/loader/testing/bytes_consumer_test_reader.h"
...@@ -105,17 +107,46 @@ class FetchDataLoaderTest : public testing::Test { ...@@ -105,17 +107,46 @@ class FetchDataLoaderTest : public testing::Test {
}; };
}; };
TEST_F(FetchDataLoaderTest, LoadAsBlob) { class FetchDataLoaderBlobTest : public FetchDataLoaderTest {
public:
FetchDataLoaderBlobTest()
: fake_task_runner_(base::MakeRefCounted<scheduler::FakeTaskRunner>()),
blob_registry_receiver_(
&fake_blob_registry_,
blob_registry_remote_.BindNewPipeAndPassReceiver()) {
BlobDataHandle::SetBlobRegistryForTesting(blob_registry_remote_.get());
}
~FetchDataLoaderBlobTest() override {
BlobDataHandle::SetBlobRegistryForTesting(nullptr);
}
protected:
scoped_refptr<scheduler::FakeTaskRunner> fake_task_runner_;
private:
FakeBlobRegistry fake_blob_registry_;
mojo::Remote<mojom::blink::BlobRegistry> blob_registry_remote_;
mojo::Receiver<mojom::blink::BlobRegistry> blob_registry_receiver_;
};
ACTION_P(QUITLOOP, loop) {
loop->Quit();
}
TEST_F(FetchDataLoaderBlobTest, LoadAsBlob) {
Checkpoint checkpoint; Checkpoint checkpoint;
BytesConsumer::Client* client = nullptr; BytesConsumer::Client* client = nullptr;
auto* consumer = MakeGarbageCollected<MockBytesConsumer>(); auto* consumer = MakeGarbageCollected<MockBytesConsumer>();
FetchDataLoader* fetch_data_loader = FetchDataLoader* fetch_data_loader =
FetchDataLoader::CreateLoaderAsBlobHandle("text/test"); FetchDataLoader::CreateLoaderAsBlobHandle("text/test", fake_task_runner_);
auto* fetch_data_loader_client = auto* fetch_data_loader_client =
MakeGarbageCollected<MockFetchDataLoaderClient>(); MakeGarbageCollected<MockFetchDataLoaderClient>();
scoped_refptr<BlobDataHandle> blob_data_handle; scoped_refptr<BlobDataHandle> blob_data_handle;
base::RunLoop run_loop;
InSequence s; InSequence s;
EXPECT_CALL(checkpoint, Call(1)); EXPECT_CALL(checkpoint, Call(1));
EXPECT_CALL(*consumer, EXPECT_CALL(*consumer,
...@@ -123,10 +154,14 @@ TEST_F(FetchDataLoaderTest, LoadAsBlob) { ...@@ -123,10 +154,14 @@ TEST_F(FetchDataLoaderTest, LoadAsBlob) {
BytesConsumer::BlobSizePolicy::kDisallowBlobWithInvalidSize)) BytesConsumer::BlobSizePolicy::kDisallowBlobWithInvalidSize))
.WillOnce(Return(ByMove(nullptr))); .WillOnce(Return(ByMove(nullptr)));
EXPECT_CALL(*consumer, SetClient(_)).WillOnce(SaveArg<0>(&client)); EXPECT_CALL(*consumer, SetClient(_)).WillOnce(SaveArg<0>(&client));
EXPECT_CALL(*consumer, DrainAsDataPipe());
EXPECT_CALL(*consumer, GetPublicState())
.WillOnce(Return(BytesConsumer::PublicState::kReadableOrWaiting));
EXPECT_CALL(checkpoint, Call(2));
EXPECT_CALL(*consumer, BeginRead(_, _)) EXPECT_CALL(*consumer, BeginRead(_, _))
.WillOnce(DoAll(SetArgPointee<0>(nullptr), SetArgPointee<1>(0), .WillOnce(DoAll(SetArgPointee<0>(nullptr), SetArgPointee<1>(0),
Return(Result::kShouldWait))); Return(Result::kShouldWait)));
EXPECT_CALL(checkpoint, Call(2)); EXPECT_CALL(checkpoint, Call(3));
EXPECT_CALL(*consumer, BeginRead(_, _)) EXPECT_CALL(*consumer, BeginRead(_, _))
.WillOnce(DoAll(SetArgPointee<0>(kQuickBrownFox), .WillOnce(DoAll(SetArgPointee<0>(kQuickBrownFox),
SetArgPointee<1>(kQuickBrownFoxLengthWithTerminatingNull), SetArgPointee<1>(kQuickBrownFoxLengthWithTerminatingNull),
...@@ -134,19 +169,20 @@ TEST_F(FetchDataLoaderTest, LoadAsBlob) { ...@@ -134,19 +169,20 @@ TEST_F(FetchDataLoaderTest, LoadAsBlob) {
EXPECT_CALL(*consumer, EndRead(kQuickBrownFoxLengthWithTerminatingNull)) EXPECT_CALL(*consumer, EndRead(kQuickBrownFoxLengthWithTerminatingNull))
.WillOnce(Return(Result::kOk)); .WillOnce(Return(Result::kOk));
EXPECT_CALL(*consumer, BeginRead(_, _)).WillOnce(Return(Result::kDone)); EXPECT_CALL(*consumer, BeginRead(_, _)).WillOnce(Return(Result::kDone));
EXPECT_CALL(*fetch_data_loader_client, DidFetchDataLoadedBlobHandleMock(_))
.WillOnce(SaveArg<0>(&blob_data_handle));
EXPECT_CALL(checkpoint, Call(3));
EXPECT_CALL(*consumer, Cancel()); EXPECT_CALL(*consumer, Cancel());
EXPECT_CALL(*fetch_data_loader_client, DidFetchDataLoadedBlobHandleMock(_))
.WillOnce(DoAll(SaveArg<0>(&blob_data_handle), QUITLOOP(&run_loop)));
EXPECT_CALL(checkpoint, Call(4)); EXPECT_CALL(checkpoint, Call(4));
checkpoint.Call(1); checkpoint.Call(1);
fetch_data_loader->Start(consumer, fetch_data_loader_client); fetch_data_loader->Start(consumer, fetch_data_loader_client);
checkpoint.Call(2); checkpoint.Call(2);
ASSERT_TRUE(client); // Pump the |task_runner| to process the data pipe's task indicating its
client->OnStateChange(); // writable.
fake_task_runner_->RunUntilIdle();
checkpoint.Call(3); checkpoint.Call(3);
fetch_data_loader->Cancel(); client->OnStateChange();
run_loop.Run();
checkpoint.Call(4); checkpoint.Call(4);
ASSERT_TRUE(blob_data_handle); ASSERT_TRUE(blob_data_handle);
...@@ -154,13 +190,13 @@ TEST_F(FetchDataLoaderTest, LoadAsBlob) { ...@@ -154,13 +190,13 @@ TEST_F(FetchDataLoaderTest, LoadAsBlob) {
EXPECT_EQ(String("text/test"), blob_data_handle->GetType()); EXPECT_EQ(String("text/test"), blob_data_handle->GetType());
} }
TEST_F(FetchDataLoaderTest, LoadAsBlobFailed) { TEST_F(FetchDataLoaderBlobTest, LoadAsBlobFailed) {
Checkpoint checkpoint; Checkpoint checkpoint;
BytesConsumer::Client* client = nullptr; BytesConsumer::Client* client = nullptr;
auto* consumer = MakeGarbageCollected<MockBytesConsumer>(); auto* consumer = MakeGarbageCollected<MockBytesConsumer>();
FetchDataLoader* fetch_data_loader = FetchDataLoader* fetch_data_loader =
FetchDataLoader::CreateLoaderAsBlobHandle("text/test"); FetchDataLoader::CreateLoaderAsBlobHandle("text/test", fake_task_runner_);
auto* fetch_data_loader_client = auto* fetch_data_loader_client =
MakeGarbageCollected<MockFetchDataLoaderClient>(); MakeGarbageCollected<MockFetchDataLoaderClient>();
...@@ -171,10 +207,14 @@ TEST_F(FetchDataLoaderTest, LoadAsBlobFailed) { ...@@ -171,10 +207,14 @@ TEST_F(FetchDataLoaderTest, LoadAsBlobFailed) {
BytesConsumer::BlobSizePolicy::kDisallowBlobWithInvalidSize)) BytesConsumer::BlobSizePolicy::kDisallowBlobWithInvalidSize))
.WillOnce(Return(ByMove(nullptr))); .WillOnce(Return(ByMove(nullptr)));
EXPECT_CALL(*consumer, SetClient(_)).WillOnce(SaveArg<0>(&client)); EXPECT_CALL(*consumer, SetClient(_)).WillOnce(SaveArg<0>(&client));
EXPECT_CALL(*consumer, DrainAsDataPipe());
EXPECT_CALL(*consumer, GetPublicState())
.WillOnce(Return(BytesConsumer::PublicState::kReadableOrWaiting));
EXPECT_CALL(checkpoint, Call(2));
EXPECT_CALL(*consumer, BeginRead(_, _)) EXPECT_CALL(*consumer, BeginRead(_, _))
.WillOnce(DoAll(SetArgPointee<0>(nullptr), SetArgPointee<1>(0), .WillOnce(DoAll(SetArgPointee<0>(nullptr), SetArgPointee<1>(0),
Return(Result::kShouldWait))); Return(Result::kShouldWait)));
EXPECT_CALL(checkpoint, Call(2)); EXPECT_CALL(checkpoint, Call(3));
EXPECT_CALL(*consumer, BeginRead(_, _)) EXPECT_CALL(*consumer, BeginRead(_, _))
.WillOnce(DoAll(SetArgPointee<0>(kQuickBrownFox), .WillOnce(DoAll(SetArgPointee<0>(kQuickBrownFox),
SetArgPointee<1>(kQuickBrownFoxLengthWithTerminatingNull), SetArgPointee<1>(kQuickBrownFoxLengthWithTerminatingNull),
...@@ -182,28 +222,29 @@ TEST_F(FetchDataLoaderTest, LoadAsBlobFailed) { ...@@ -182,28 +222,29 @@ TEST_F(FetchDataLoaderTest, LoadAsBlobFailed) {
EXPECT_CALL(*consumer, EndRead(kQuickBrownFoxLengthWithTerminatingNull)) EXPECT_CALL(*consumer, EndRead(kQuickBrownFoxLengthWithTerminatingNull))
.WillOnce(Return(Result::kOk)); .WillOnce(Return(Result::kOk));
EXPECT_CALL(*consumer, BeginRead(_, _)).WillOnce(Return(Result::kError)); EXPECT_CALL(*consumer, BeginRead(_, _)).WillOnce(Return(Result::kError));
EXPECT_CALL(*fetch_data_loader_client, DidFetchDataLoadFailed());
EXPECT_CALL(checkpoint, Call(3));
EXPECT_CALL(*consumer, Cancel()); EXPECT_CALL(*consumer, Cancel());
EXPECT_CALL(*fetch_data_loader_client, DidFetchDataLoadFailed());
EXPECT_CALL(checkpoint, Call(4)); EXPECT_CALL(checkpoint, Call(4));
checkpoint.Call(1); checkpoint.Call(1);
fetch_data_loader->Start(consumer, fetch_data_loader_client); fetch_data_loader->Start(consumer, fetch_data_loader_client);
checkpoint.Call(2); checkpoint.Call(2);
// Pump the |task_runner| to process the data pipe's task indicating its
// writable.
fake_task_runner_->RunUntilIdle();
checkpoint.Call(3);
ASSERT_TRUE(client); ASSERT_TRUE(client);
client->OnStateChange(); client->OnStateChange();
checkpoint.Call(3);
fetch_data_loader->Cancel();
checkpoint.Call(4); checkpoint.Call(4);
} }
TEST_F(FetchDataLoaderTest, LoadAsBlobCancel) { TEST_F(FetchDataLoaderBlobTest, LoadAsBlobCancel) {
Checkpoint checkpoint; Checkpoint checkpoint;
BytesConsumer::Client* client = nullptr; BytesConsumer::Client* client = nullptr;
auto* consumer = MakeGarbageCollected<MockBytesConsumer>(); auto* consumer = MakeGarbageCollected<MockBytesConsumer>();
FetchDataLoader* fetch_data_loader = FetchDataLoader* fetch_data_loader =
FetchDataLoader::CreateLoaderAsBlobHandle("text/test"); FetchDataLoader::CreateLoaderAsBlobHandle("text/test", fake_task_runner_);
auto* fetch_data_loader_client = auto* fetch_data_loader_client =
MakeGarbageCollected<MockFetchDataLoaderClient>(); MakeGarbageCollected<MockFetchDataLoaderClient>();
...@@ -214,21 +255,29 @@ TEST_F(FetchDataLoaderTest, LoadAsBlobCancel) { ...@@ -214,21 +255,29 @@ TEST_F(FetchDataLoaderTest, LoadAsBlobCancel) {
BytesConsumer::BlobSizePolicy::kDisallowBlobWithInvalidSize)) BytesConsumer::BlobSizePolicy::kDisallowBlobWithInvalidSize))
.WillOnce(Return(ByMove(nullptr))); .WillOnce(Return(ByMove(nullptr)));
EXPECT_CALL(*consumer, SetClient(_)).WillOnce(SaveArg<0>(&client)); EXPECT_CALL(*consumer, SetClient(_)).WillOnce(SaveArg<0>(&client));
EXPECT_CALL(*consumer, DrainAsDataPipe());
EXPECT_CALL(*consumer, GetPublicState())
.WillOnce(Return(BytesConsumer::PublicState::kReadableOrWaiting));
EXPECT_CALL(checkpoint, Call(2));
EXPECT_CALL(*consumer, BeginRead(_, _)) EXPECT_CALL(*consumer, BeginRead(_, _))
.WillOnce(DoAll(SetArgPointee<0>(nullptr), SetArgPointee<1>(0), .WillOnce(DoAll(SetArgPointee<0>(nullptr), SetArgPointee<1>(0),
Return(Result::kShouldWait))); Return(Result::kShouldWait)));
EXPECT_CALL(checkpoint, Call(2));
EXPECT_CALL(*consumer, Cancel());
EXPECT_CALL(checkpoint, Call(3)); EXPECT_CALL(checkpoint, Call(3));
EXPECT_CALL(*consumer, Cancel());
EXPECT_CALL(checkpoint, Call(4));
checkpoint.Call(1); checkpoint.Call(1);
fetch_data_loader->Start(consumer, fetch_data_loader_client); fetch_data_loader->Start(consumer, fetch_data_loader_client);
checkpoint.Call(2); checkpoint.Call(2);
fetch_data_loader->Cancel(); // Pump the |task_runner| to process the data pipe's task indicating its
// writable.
fake_task_runner_->RunUntilIdle();
checkpoint.Call(3); checkpoint.Call(3);
fetch_data_loader->Cancel();
checkpoint.Call(4);
} }
TEST_F(FetchDataLoaderTest, TEST_F(FetchDataLoaderBlobTest,
LoadAsBlobViaDrainAsBlobDataHandleWithSameContentType) { LoadAsBlobViaDrainAsBlobDataHandleWithSameContentType) {
auto blob_data = std::make_unique<BlobData>(); auto blob_data = std::make_unique<BlobData>();
blob_data->AppendBytes(kQuickBrownFox, blob_data->AppendBytes(kQuickBrownFox,
...@@ -241,7 +290,7 @@ TEST_F(FetchDataLoaderTest, ...@@ -241,7 +290,7 @@ TEST_F(FetchDataLoaderTest,
auto* consumer = MakeGarbageCollected<MockBytesConsumer>(); auto* consumer = MakeGarbageCollected<MockBytesConsumer>();
FetchDataLoader* fetch_data_loader = FetchDataLoader* fetch_data_loader =
FetchDataLoader::CreateLoaderAsBlobHandle("text/test"); FetchDataLoader::CreateLoaderAsBlobHandle("text/test", fake_task_runner_);
auto* fetch_data_loader_client = auto* fetch_data_loader_client =
MakeGarbageCollected<MockFetchDataLoaderClient>(); MakeGarbageCollected<MockFetchDataLoaderClient>();
scoped_refptr<BlobDataHandle> blob_data_handle; scoped_refptr<BlobDataHandle> blob_data_handle;
...@@ -270,7 +319,7 @@ TEST_F(FetchDataLoaderTest, ...@@ -270,7 +319,7 @@ TEST_F(FetchDataLoaderTest,
EXPECT_EQ(String("text/test"), blob_data_handle->GetType()); EXPECT_EQ(String("text/test"), blob_data_handle->GetType());
} }
TEST_F(FetchDataLoaderTest, TEST_F(FetchDataLoaderBlobTest,
LoadAsBlobViaDrainAsBlobDataHandleWithDifferentContentType) { LoadAsBlobViaDrainAsBlobDataHandleWithDifferentContentType) {
auto blob_data = std::make_unique<BlobData>(); auto blob_data = std::make_unique<BlobData>();
blob_data->AppendBytes(kQuickBrownFox, blob_data->AppendBytes(kQuickBrownFox,
...@@ -283,7 +332,7 @@ TEST_F(FetchDataLoaderTest, ...@@ -283,7 +332,7 @@ TEST_F(FetchDataLoaderTest,
auto* consumer = MakeGarbageCollected<MockBytesConsumer>(); auto* consumer = MakeGarbageCollected<MockBytesConsumer>();
FetchDataLoader* fetch_data_loader = FetchDataLoader* fetch_data_loader =
FetchDataLoader::CreateLoaderAsBlobHandle("text/test"); FetchDataLoader::CreateLoaderAsBlobHandle("text/test", fake_task_runner_);
auto* fetch_data_loader_client = auto* fetch_data_loader_client =
MakeGarbageCollected<MockFetchDataLoaderClient>(); MakeGarbageCollected<MockFetchDataLoaderClient>();
scoped_refptr<BlobDataHandle> blob_data_handle; scoped_refptr<BlobDataHandle> blob_data_handle;
......
...@@ -1007,10 +1007,12 @@ ScriptPromise Cache::PutImpl(ScriptState* script_state, ...@@ -1007,10 +1007,12 @@ ScriptPromise Cache::PutImpl(ScriptState* script_state,
} }
if (buffer) { if (buffer) {
ExecutionContext* context = ExecutionContext::From(script_state);
// If the response has body, read the all data and create // If the response has body, read the all data and create
// the blob handle and dispatch the put batch asynchronously. // the blob handle and dispatch the put batch asynchronously.
FetchDataLoader* loader = FetchDataLoader::CreateLoaderAsBlobHandle( FetchDataLoader* loader = FetchDataLoader::CreateLoaderAsBlobHandle(
responses[i]->InternalMIMEType()); responses[i]->InternalMIMEType(),
context->GetTaskRunner(TaskType::kNetworking));
buffer->StartLoading(loader, buffer->StartLoading(loader,
MakeGarbageCollected<BlobHandleCallbackForPut>( MakeGarbageCollected<BlobHandleCallbackForPut>(
i, barrier_callback, requests[i], responses[i]), i, barrier_callback, requests[i], responses[i]),
......
...@@ -10,6 +10,37 @@ ...@@ -10,6 +10,37 @@
namespace blink { namespace blink {
class FakeBlobRegistry::DataPipeDrainerClient
: public mojo::DataPipeDrainer::Client {
public:
DataPipeDrainerClient(const String& uuid,
const String& content_type,
RegisterFromStreamCallback callback)
: uuid_(uuid),
content_type_(content_type),
callback_(std::move(callback)) {}
void OnDataAvailable(const void* data, size_t num_bytes) override {
length_ += num_bytes;
}
void OnDataComplete() override {
mojo::Remote<mojom::blink::Blob> blob;
mojo::MakeSelfOwnedReceiver(std::make_unique<FakeBlob>(uuid_),
blob.BindNewPipeAndPassReceiver());
auto handle =
BlobDataHandle::Create(uuid_, content_type_, length_, blob.Unbind());
std::move(callback_).Run(std::move(handle));
}
private:
const String uuid_;
const String content_type_;
RegisterFromStreamCallback callback_;
uint64_t length_ = 0;
};
FakeBlobRegistry::FakeBlobRegistry() = default;
FakeBlobRegistry::~FakeBlobRegistry() = default;
void FakeBlobRegistry::Register(mojo::PendingReceiver<mojom::blink::Blob> blob, void FakeBlobRegistry::Register(mojo::PendingReceiver<mojom::blink::Blob> blob,
const String& uuid, const String& uuid,
const String& content_type, const String& content_type,
...@@ -29,8 +60,13 @@ void FakeBlobRegistry::RegisterFromStream( ...@@ -29,8 +60,13 @@ void FakeBlobRegistry::RegisterFromStream(
uint64_t expected_length, uint64_t expected_length,
mojo::ScopedDataPipeConsumerHandle data, mojo::ScopedDataPipeConsumerHandle data,
mojo::PendingAssociatedRemote<mojom::blink::ProgressClient>, mojo::PendingAssociatedRemote<mojom::blink::ProgressClient>,
RegisterFromStreamCallback) { RegisterFromStreamCallback callback) {
NOTREACHED(); DCHECK(!drainer_);
DCHECK(!drainer_client_);
drainer_client_ = std::make_unique<DataPipeDrainerClient>(
"someuuid", content_type, std::move(callback));
drainer_ = std::make_unique<mojo::DataPipeDrainer>(drainer_client_.get(),
std::move(data));
} }
void FakeBlobRegistry::GetBlobFromUUID( void FakeBlobRegistry::GetBlobFromUUID(
......
...@@ -5,6 +5,7 @@ ...@@ -5,6 +5,7 @@
#ifndef THIRD_PARTY_BLINK_RENDERER_PLATFORM_BLOB_TESTING_FAKE_BLOB_REGISTRY_H_ #ifndef THIRD_PARTY_BLINK_RENDERER_PLATFORM_BLOB_TESTING_FAKE_BLOB_REGISTRY_H_
#define THIRD_PARTY_BLINK_RENDERER_PLATFORM_BLOB_TESTING_FAKE_BLOB_REGISTRY_H_ #define THIRD_PARTY_BLINK_RENDERER_PLATFORM_BLOB_TESTING_FAKE_BLOB_REGISTRY_H_
#include "mojo/public/cpp/system/data_pipe_drainer.h"
#include "third_party/blink/public/mojom/blob/blob_registry.mojom-blink.h" #include "third_party/blink/public/mojom/blob/blob_registry.mojom-blink.h"
namespace blink { namespace blink {
...@@ -14,6 +15,9 @@ namespace blink { ...@@ -14,6 +15,9 @@ namespace blink {
// FakeBlob instance with the correct uuid. // FakeBlob instance with the correct uuid.
class FakeBlobRegistry : public mojom::blink::BlobRegistry { class FakeBlobRegistry : public mojom::blink::BlobRegistry {
public: public:
FakeBlobRegistry();
~FakeBlobRegistry() override;
void Register(mojo::PendingReceiver<mojom::blink::Blob>, void Register(mojo::PendingReceiver<mojom::blink::Blob>,
const String& uuid, const String& uuid,
const String& content_type, const String& content_type,
...@@ -49,6 +53,11 @@ class FakeBlobRegistry : public mojom::blink::BlobRegistry { ...@@ -49,6 +53,11 @@ class FakeBlobRegistry : public mojom::blink::BlobRegistry {
String uuid; String uuid;
}; };
Vector<OwnedReceiver> owned_receivers; Vector<OwnedReceiver> owned_receivers;
std::unique_ptr<mojo::DataPipeDrainer> drainer_;
class DataPipeDrainerClient;
std::unique_ptr<DataPipeDrainerClient> drainer_client_;
}; };
} // namespace blink } // namespace blink
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment