Commit a8255fec authored by Marijn Kruisselbrink's avatar Marijn Kruisselbrink Committed by Commit Bot

[Blobs] Make task runner used by BlobBytesProvider more explicit.

Rather than just documenting that the class should be bound to a mojo
pipe on a different thread than the main/worker threads, explicitly make
the API such that it creates its own task runner to bind on.

That same task runner is then also passed on to BlobBytesStreamer, to
replace its usage of SequencedTaskRunnerHandle::Get().

Bug: 786332
Change-Id: Ic2e38a53514bc612ffaf579c3eb1bc4a88b29efe
Reviewed-on: https://chromium-review.googlesource.com/976098Reviewed-by: default avatarHajime Hoshi <hajimehoshi@chromium.org>
Reviewed-by: default avatarAlexander Timin <altimin@chromium.org>
Commit-Queue: Marijn Kruisselbrink <mek@chromium.org>
Cr-Commit-Position: refs/heads/master@{#545863}
parent cf7360dc
......@@ -5,6 +5,8 @@
#include "platform/blob/BlobBytesProvider.h"
#include "base/numerics/safe_conversions.h"
#include "base/task_scheduler/post_task.h"
#include "mojo/public/cpp/bindings/strong_binding.h"
#include "platform/CrossThreadFunctional.h"
#include "platform/Histogram.h"
#include "platform/WebTaskRunner.h"
......@@ -21,12 +23,13 @@ namespace {
class BlobBytesStreamer {
public:
BlobBytesStreamer(Vector<scoped_refptr<RawData>> data,
mojo::ScopedDataPipeProducerHandle pipe)
mojo::ScopedDataPipeProducerHandle pipe,
scoped_refptr<base::SequencedTaskRunner> task_runner)
: data_(std::move(data)),
pipe_(std::move(pipe)),
watcher_(FROM_HERE,
mojo::SimpleWatcher::ArmingPolicy::AUTOMATIC,
base::SequencedTaskRunnerHandle::Get()) {
std::move(task_runner)) {
watcher_.Watch(pipe_.get(), MOJO_HANDLE_SIGNAL_WRITABLE,
WTF::BindRepeating(&BlobBytesStreamer::OnWritable,
WTF::Unretained(this)));
......@@ -110,13 +113,30 @@ void DecreaseChildProcessRefCount() {
constexpr size_t BlobBytesProvider::kMaxConsolidatedItemSizeInBytes;
BlobBytesProvider::BlobBytesProvider() {
IncreaseChildProcessRefCount();
// static
BlobBytesProvider* BlobBytesProvider::CreateAndBind(
mojom::blink::BytesProviderRequest request) {
auto task_runner = base::CreateSequencedTaskRunnerWithTraits(
{base::MayBlock(), base::TaskPriority::USER_VISIBLE});
auto provider = base::WrapUnique(new BlobBytesProvider(task_runner));
auto* result = provider.get();
// TODO(mek): Consider binding BytesProvider on the IPC thread instead, only
// using the MayBlock taskrunner for actual file operations.
PostCrossThreadTask(
*task_runner, FROM_HERE,
CrossThreadBind(
[](std::unique_ptr<BlobBytesProvider> provider,
mojom::blink::BytesProviderRequest request) {
mojo::MakeStrongBinding(std::move(provider), std::move(request));
},
WTF::Passed(std::move(provider)), WTF::Passed(std::move(request))));
return result;
}
BlobBytesProvider::BlobBytesProvider(scoped_refptr<RawData> data)
: BlobBytesProvider() {
AppendData(std::move(data));
// static
std::unique_ptr<BlobBytesProvider> BlobBytesProvider::CreateForTesting(
scoped_refptr<base::SequencedTaskRunner> task_runner) {
return base::WrapUnique(new BlobBytesProvider(std::move(task_runner)));
}
BlobBytesProvider::~BlobBytesProvider() {
......@@ -140,6 +160,7 @@ void BlobBytesProvider::AppendData(base::span<const char> data) {
}
void BlobBytesProvider::RequestAsReply(RequestAsReplyCallback callback) {
DCHECK(task_runner_->RunsTasksInCurrentSequence());
// TODO(mek): Once better metrics are created we could experiment with ways
// to reduce the number of copies of data that are made here.
Vector<uint8_t> result;
......@@ -150,8 +171,9 @@ void BlobBytesProvider::RequestAsReply(RequestAsReplyCallback callback) {
void BlobBytesProvider::RequestAsStream(
mojo::ScopedDataPipeProducerHandle pipe) {
DCHECK(task_runner_->RunsTasksInCurrentSequence());
// BlobBytesStreamer will self delete when done.
new BlobBytesStreamer(std::move(data_), std::move(pipe));
new BlobBytesStreamer(std::move(data_), std::move(pipe), task_runner_);
}
void BlobBytesProvider::RequestAsFile(uint64_t source_offset,
......@@ -159,8 +181,7 @@ void BlobBytesProvider::RequestAsFile(uint64_t source_offset,
base::File file,
uint64_t file_offset,
RequestAsFileCallback callback) {
DCHECK(!Platform::Current()->FileTaskRunner() ||
Platform::Current()->FileTaskRunner()->RunsTasksInCurrentSequence());
DCHECK(task_runner_->RunsTasksInCurrentSequence());
DEFINE_THREAD_SAFE_STATIC_LOCAL(BooleanHistogram, seek_histogram,
("Storage.Blob.RendererFileSeekFailed"));
DEFINE_THREAD_SAFE_STATIC_LOCAL(BooleanHistogram, write_histogram,
......@@ -231,4 +252,10 @@ void BlobBytesProvider::RequestAsFile(uint64_t source_offset,
std::move(callback).Run(info.last_modified);
}
BlobBytesProvider::BlobBytesProvider(
scoped_refptr<base::SequencedTaskRunner> task_runner)
: task_runner_(std::move(task_runner)) {
IncreaseChildProcessRefCount();
}
} // namespace blink
......@@ -23,8 +23,13 @@ class PLATFORM_EXPORT BlobBytesProvider : public mojom::blink::BytesProvider {
// data appended to the same item.
static constexpr size_t kMaxConsolidatedItemSizeInBytes = 15 * 1024;
BlobBytesProvider();
explicit BlobBytesProvider(scoped_refptr<RawData>);
// Creates a new instance, and binds it on a new SequencedTaskRunner. The
// returned instance should only be considered valid as long as the request
// passed in to this method is still known to be valid.
static BlobBytesProvider* CreateAndBind(mojom::blink::BytesProviderRequest);
static std::unique_ptr<BlobBytesProvider> CreateForTesting(
scoped_refptr<base::SequencedTaskRunner>);
~BlobBytesProvider() override;
void AppendData(scoped_refptr<RawData>);
......@@ -42,6 +47,12 @@ class PLATFORM_EXPORT BlobBytesProvider : public mojom::blink::BytesProvider {
private:
FRIEND_TEST_ALL_PREFIXES(BlobBytesProviderTest, Consolidation);
BlobBytesProvider(scoped_refptr<base::SequencedTaskRunner>);
// The task runner this class is bound on, as well as what is used by the
// RequestAsStream method to monitor the data pipe.
scoped_refptr<base::SequencedTaskRunner> task_runner_;
Vector<scoped_refptr<RawData>> data_;
// |offsets_| always contains exactly one fewer item than |data_| (except when
// |data_| itself is empty).
......
......@@ -41,6 +41,15 @@ class BlobBytesProviderTest : public ::testing::Test {
combined_bytes_.AppendVector(test_bytes3_);
}
std::unique_ptr<BlobBytesProvider> CreateProvider(
scoped_refptr<RawData> data = nullptr) {
auto result = BlobBytesProvider::CreateForTesting(
blink::scheduler::GetSequencedTaskRunnerForTesting());
if (data)
result->AppendData(std::move(data));
return result;
}
protected:
base::test::ScopedTaskEnvironment scoped_task_environment_;
......@@ -54,29 +63,29 @@ class BlobBytesProviderTest : public ::testing::Test {
};
TEST_F(BlobBytesProviderTest, Consolidation) {
BlobBytesProvider data;
data.AppendData(base::make_span("abc", 3));
data.AppendData(base::make_span("def", 3));
data.AppendData(base::make_span("ps1", 3));
data.AppendData(base::make_span("ps2", 3));
auto data = CreateProvider();
data->AppendData(base::make_span("abc", 3));
data->AppendData(base::make_span("def", 3));
data->AppendData(base::make_span("ps1", 3));
data->AppendData(base::make_span("ps2", 3));
EXPECT_EQ(1u, data.data_.size());
EXPECT_EQ(12u, data.data_[0]->length());
EXPECT_EQ(0, memcmp(data.data_[0]->data(), "abcdefps1ps2", 12));
EXPECT_EQ(1u, data->data_.size());
EXPECT_EQ(12u, data->data_[0]->length());
EXPECT_EQ(0, memcmp(data->data_[0]->data(), "abcdefps1ps2", 12));
auto large_data = std::make_unique<char[]>(
BlobBytesProvider::kMaxConsolidatedItemSizeInBytes);
data.AppendData(base::make_span(
data->AppendData(base::make_span(
large_data.get(), BlobBytesProvider::kMaxConsolidatedItemSizeInBytes));
EXPECT_EQ(2u, data.data_.size());
EXPECT_EQ(12u, data.data_[0]->length());
EXPECT_EQ(2u, data->data_.size());
EXPECT_EQ(12u, data->data_[0]->length());
EXPECT_EQ(BlobBytesProvider::kMaxConsolidatedItemSizeInBytes,
data.data_[1]->length());
data->data_[1]->length());
}
TEST_F(BlobBytesProviderTest, RequestAsReply) {
auto provider = std::make_unique<BlobBytesProvider>(test_data1_);
auto provider = CreateProvider(test_data1_);
Vector<uint8_t> received_bytes;
provider->RequestAsReply(
base::BindOnce([](Vector<uint8_t>* bytes_out,
......@@ -85,7 +94,8 @@ TEST_F(BlobBytesProviderTest, RequestAsReply) {
EXPECT_EQ(test_bytes1_, received_bytes);
received_bytes.clear();
provider = std::make_unique<BlobBytesProvider>(test_data1_);
provider = CreateProvider();
provider->AppendData(test_data1_);
provider->AppendData(test_data2_);
provider->AppendData(test_data3_);
provider->RequestAsReply(
......@@ -111,7 +121,8 @@ class RequestAsFile : public BlobBytesProviderTest,
public:
void SetUp() override {
BlobBytesProviderTest::SetUp();
test_provider_ = std::make_unique<BlobBytesProvider>(test_data1_);
test_provider_ = CreateProvider();
test_provider_->AppendData(test_data1_);
test_provider_->AppendData(test_data2_);
test_provider_->AppendData(test_data3_);
......@@ -241,7 +252,8 @@ INSTANTIATE_TEST_CASE_P(BlobBytesProviderTest,
::testing::ValuesIn(file_tests));
TEST_F(BlobBytesProviderTest, RequestAsFile_MultipleChunks) {
auto provider = std::make_unique<BlobBytesProvider>(test_data1_);
auto provider = CreateProvider();
provider->AppendData(test_data1_);
provider->AppendData(test_data2_);
provider->AppendData(test_data3_);
......@@ -273,7 +285,7 @@ TEST_F(BlobBytesProviderTest, RequestAsFile_MultipleChunks) {
}
TEST_F(BlobBytesProviderTest, RequestAsFile_InvaldFile) {
auto provider = std::make_unique<BlobBytesProvider>(test_data1_);
auto provider = CreateProvider(test_data1_);
provider->RequestAsFile(
0, 16, base::File(), 0,
......@@ -283,7 +295,7 @@ TEST_F(BlobBytesProviderTest, RequestAsFile_InvaldFile) {
}
TEST_F(BlobBytesProviderTest, RequestAsFile_UnwritableFile) {
auto provider = std::make_unique<BlobBytesProvider>(test_data1_);
auto provider = CreateProvider(test_data1_);
base::FilePath path;
base::CreateTemporaryFile(&path);
......@@ -301,7 +313,8 @@ TEST_F(BlobBytesProviderTest, RequestAsFile_UnwritableFile) {
}
TEST_F(BlobBytesProviderTest, RequestAsStream) {
auto provider = std::make_unique<BlobBytesProvider>(test_data1_);
auto provider = CreateProvider();
provider->AppendData(test_data1_);
provider->AppendData(test_data2_);
provider->AppendData(test_data3_);
......
......@@ -82,11 +82,6 @@ bool IsValidBlobType(const String& type) {
return true;
}
void BindBytesProvider(std::unique_ptr<BlobBytesProvider> provider,
BytesProviderRequest request) {
mojo::MakeStrongBinding(std::move(provider), std::move(request));
}
mojom::blink::BlobRegistry* g_blob_registry_for_testing = nullptr;
mojom::blink::BlobRegistry* GetThreadSpecificRegistry() {
......@@ -291,21 +286,9 @@ void BlobData::AppendDataInternal(base::span<const char> data,
}
} else {
BytesProviderPtrInfo bytes_provider_info;
auto provider = std::make_unique<BlobBytesProvider>();
last_bytes_provider_ = provider.get();
scoped_refptr<base::SingleThreadTaskRunner> file_runner =
Platform::Current()->FileTaskRunner();
if (file_runner) {
// TODO(mek): Considering binding BytesProvider on the IO thread
// instead, only using the File thread for actual file operations.
PostCrossThreadTask(
*file_runner, FROM_HERE,
CrossThreadBind(&BindBytesProvider, WTF::Passed(std::move(provider)),
WTF::Passed(MakeRequest(&bytes_provider_info))));
} else {
BindBytesProvider(std::move(provider), MakeRequest(&bytes_provider_info));
}
last_bytes_provider_ =
BlobBytesProvider::CreateAndBind(MakeRequest(&bytes_provider_info));
auto bytes_element = DataElementBytes::New(data.length(), WTF::nullopt,
std::move(bytes_provider_info));
if (should_embed_bytes) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment