Commit 96d4072f authored by Rakina Zata Amni's avatar Rakina Zata Amni Committed by Commit Bot

Use deque for ResponseBodyLoader's buffered_data_ and protect with flag

To use memory more efficiently, we're using a deque of chunks so that
sent can be removed immediately after we're finished sending it. Also,
protects the buffering behind a flag (later on we'll do it only when
we're deferring due to bfcache, when it's possible to know that).

Bug: 1148064
Change-Id: I13902906675a337bf8e02653f0a7f23e4f579ee4
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2531803
Commit-Queue: Rakina Zata Amni <rakina@chromium.org>
Auto-Submit: Rakina Zata Amni <rakina@chromium.org>
Reviewed-by: default avatarYutaka Hirano <yhirano@chromium.org>
Reviewed-by: default avatarAlexander Timin <altimin@chromium.org>
Cr-Commit-Position: refs/heads/master@{#827193}
parent fc9d822f
......@@ -7,6 +7,7 @@
#include <algorithm>
#include <utility>
#include "base/auto_reset.h"
#include "third_party/blink/public/common/features.h"
#include "third_party/blink/renderer/platform/loader/fetch/fetch_context.h"
#include "third_party/blink/renderer/platform/loader/fetch/resource.h"
#include "third_party/blink/renderer/platform/loader/fetch/resource_loader.h"
......@@ -277,14 +278,65 @@ class ResponseBodyLoader::DelegatingBytesConsumer final
bool waiting_for_lookahead_bytes_ = false;
};
class ResponseBodyLoader::Buffer final
: public GarbageCollected<ResponseBodyLoader::Buffer> {
public:
explicit Buffer(ResponseBodyLoader* owner) : owner_(owner) {}
bool IsEmpty() const { return buffered_data_.IsEmpty(); }
void AddChunk(const char* buffer, size_t available) {
Vector<char> new_chunk;
new_chunk.Append(buffer, available);
buffered_data_.emplace_back(new_chunk);
}
// Dispatches the frontmost chunk in |buffered_data_|. Returns the size of
// the data that got dispatched.
size_t DispatchChunk(size_t max_chunk_size) {
// Dispatch the chunk at the front of the queue.
const Vector<char>& current_chunk = buffered_data_.front();
DCHECK_LT(offset_in_current_chunk_, current_chunk.size());
// Send as much of the chunk as possible without exceeding |max_chunk_size|.
base::span<const char> span(current_chunk);
span = span.subspan(offset_in_current_chunk_);
span = span.subspan(0, std::min(span.size(), max_chunk_size));
owner_->DidReceiveData(span);
size_t sent_size = span.size();
offset_in_current_chunk_ += sent_size;
if (offset_in_current_chunk_ == current_chunk.size()) {
// We've finished sending the chunk at the front of the queue, pop it so
// that we'll send the next chunk next time.
offset_in_current_chunk_ = 0;
buffered_data_.pop_front();
}
return sent_size;
}
void Trace(Visitor* visitor) const { visitor->Trace(owner_); }
private:
const Member<ResponseBodyLoader> owner_;
// We save the response body read when suspended as a queue of chunks so that
// we can free memory as soon as we finish sending a chunk completely.
Deque<Vector<char>> buffered_data_;
size_t offset_in_current_chunk_ = 0;
};
ResponseBodyLoader::ResponseBodyLoader(
BytesConsumer& bytes_consumer,
ResponseBodyLoaderClient& client,
scoped_refptr<base::SingleThreadTaskRunner> task_runner)
: bytes_consumer_(bytes_consumer),
client_(client),
task_runner_(std::move(task_runner)) {
task_runner_(std::move(task_runner)),
buffer_data_while_suspended_(
base::FeatureList::IsEnabled(features::kLoadingTasksUnfreezable)) {
bytes_consumer_->SetClient(this);
if (buffer_data_while_suspended_)
body_buffer_ = MakeGarbageCollected<Buffer>(this);
}
mojo::ScopedDataPipeConsumerHandle ResponseBodyLoader::DrainAsDataPipe(
......@@ -433,7 +485,7 @@ void ResponseBodyLoader::OnStateChange() {
size_t num_bytes_consumed = 0;
while (!aborted_) {
while (!aborted_ && (!suspended_ || buffer_data_while_suspended_)) {
if (kMaxNumConsumedBytesInTask == num_bytes_consumed) {
// We've already consumed many bytes in this task. Defer the remaining
// to the next task.
......@@ -443,19 +495,12 @@ void ResponseBodyLoader::OnStateChange() {
return;
}
if (!suspended_ && bytes_remaining_in_buffer_ > 0) {
// We need to empty |buffered_data_| first before reading more from
if (!suspended_ && body_buffer_ && !body_buffer_->IsEmpty()) {
DCHECK(buffer_data_while_suspended_);
// We need to empty |body_buffer_| first before reading more from
// |bytes_consumer_|.
auto* start_position = buffered_data_.end() - bytes_remaining_in_buffer_;
size_t size_to_send =
std::min(bytes_remaining_in_buffer_,
kMaxNumConsumedBytesInTask - num_bytes_consumed);
DidReceiveData(
base::make_span(start_position, start_position + size_to_send));
bytes_remaining_in_buffer_ -= size_to_send;
num_bytes_consumed += size_to_send;
if (bytes_remaining_in_buffer_ == 0)
buffered_data_.clear();
num_bytes_consumed += body_buffer_->DispatchChunk(
kMaxNumConsumedBytesInTask - num_bytes_consumed);
continue;
}
......@@ -472,9 +517,9 @@ void ResponseBodyLoader::OnStateChange() {
available =
std::min(available, kMaxNumConsumedBytesInTask - num_bytes_consumed);
if (suspended_) {
// When suspended, save the read data into |buffered_data_| instead.
buffered_data_.insert(buffered_data_.size(), buffer, available);
bytes_remaining_in_buffer_ += available;
DCHECK(buffer_data_while_suspended_);
// When suspended, save the read data into |body_buffer_| instead.
body_buffer_->AddChunk(buffer, available);
} else {
DidReceiveData(base::make_span(buffer, available));
}
......@@ -509,6 +554,7 @@ void ResponseBodyLoader::Trace(Visitor* visitor) const {
visitor->Trace(bytes_consumer_);
visitor->Trace(delegating_bytes_consumer_);
visitor->Trace(client_);
visitor->Trace(body_buffer_);
ResponseBodyLoaderDrainableInterface::Trace(visitor);
ResponseBodyLoaderClient::Trace(visitor);
BytesConsumer::Client::Trace(visitor);
......
......@@ -109,6 +109,7 @@ class PLATFORM_EXPORT ResponseBodyLoader final
static constexpr size_t kMaxNumConsumedBytesInTask = 64 * 1024;
private:
class Buffer;
class DelegatingBytesConsumer;
// ResponseBodyLoaderClient implementation.
......@@ -120,13 +121,13 @@ class PLATFORM_EXPORT ResponseBodyLoader final
// BytesConsumer::Client implementation.
void OnStateChange() override;
String DebugName() const override { return "ResponseBodyLoader"; }
// When |buffer_data_while_suspended_| is true, we'll save the response body
// read when suspended.
Member<Buffer> body_buffer_;
Member<BytesConsumer> bytes_consumer_;
Member<DelegatingBytesConsumer> delegating_bytes_consumer_;
const Member<ResponseBodyLoaderClient> client_;
const scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
Vector<char> buffered_data_;
size_t bytes_remaining_in_buffer_ = 0;
bool started_ = false;
bool aborted_ = false;
bool suspended_ = false;
......@@ -135,6 +136,7 @@ class PLATFORM_EXPORT ResponseBodyLoader final
bool fail_signal_is_pending_ = false;
bool cancel_signal_is_pending_ = false;
bool in_two_phase_read_ = false;
const bool buffer_data_while_suspended_;
};
} // namespace blink
......
......@@ -7,7 +7,9 @@
#include <memory>
#include <string>
#include <utility>
#include "base/test/scoped_feature_list.h"
#include "testing/gtest/include/gtest/gtest.h"
#include "third_party/blink/public/common/features.h"
#include "third_party/blink/renderer/platform/heap/persistent.h"
#include "third_party/blink/renderer/platform/loader/fetch/data_pipe_bytes_consumer.h"
#include "third_party/blink/renderer/platform/loader/testing/bytes_consumer_test_reader.h"
......@@ -431,7 +433,25 @@ TEST_F(ResponseBodyLoaderTest, DrainAsDataPipe) {
EXPECT_EQ("xyzabc", client->GetData());
}
TEST_F(ResponseBodyLoaderTest, ReadDataFromConsumerWhileSuspended) {
class ResponseBodyLoaderLoadingTasksUnfreezableTest
: public ResponseBodyLoaderTest,
public ::testing::WithParamInterface<bool> {
protected:
ResponseBodyLoaderLoadingTasksUnfreezableTest() {
if (BufferDataWhileSuspended()) {
scoped_feature_list_.InitAndEnableFeature(
features::kLoadingTasksUnfreezable);
}
}
bool BufferDataWhileSuspended() { return GetParam(); }
private:
base::test::ScopedFeatureList scoped_feature_list_;
};
TEST_P(ResponseBodyLoaderLoadingTasksUnfreezableTest,
ReadDataFromConsumerWhileSuspended) {
auto task_runner = base::MakeRefCounted<scheduler::FakeTaskRunner>();
auto* consumer = MakeGarbageCollected<ReplayingBytesConsumer>(task_runner);
auto* client = MakeGarbageCollected<TestClient>();
......@@ -449,12 +469,17 @@ TEST_F(ResponseBodyLoaderTest, ReadDataFromConsumerWhileSuspended) {
consumer->Add(Command(Command::kData, "llo"));
consumer->Add(Command(Command::kWait));
consumer->Add(Command(Command::kData, "wo"));
// ResponseBodyLoader will buffer data when deferred, and won't notify the
// client until it's resumed.
// If kLoadingTasksUnfreezable is enabled, ResponseBodyLoader will buffer data
// when deferred, and won't notify the client until it's resumed.
EXPECT_FALSE(consumer->IsCommandsEmpty());
consumer->TriggerOnStateChange();
while (!consumer->IsCommandsEmpty()) {
task_runner->RunUntilIdle();
if (BufferDataWhileSuspended()) {
while (!consumer->IsCommandsEmpty()) {
task_runner->RunUntilIdle();
}
} else {
EXPECT_FALSE(consumer->IsCommandsEmpty());
}
EXPECT_EQ("he", client->GetData());
......@@ -473,7 +498,8 @@ TEST_F(ResponseBodyLoaderTest, ReadDataFromConsumerWhileSuspended) {
EXPECT_FALSE(client->LoadingIsFailed());
}
TEST_F(ResponseBodyLoaderTest, ReadDataFromConsumerWhileSuspendedLong) {
TEST_P(ResponseBodyLoaderLoadingTasksUnfreezableTest,
ReadDataFromConsumerWhileSuspendedLong) {
auto task_runner = base::MakeRefCounted<scheduler::FakeTaskRunner>();
auto* consumer = MakeGarbageCollected<ReplayingBytesConsumer>(task_runner);
auto* client = MakeGarbageCollected<TestClient>();
......@@ -489,13 +515,19 @@ TEST_F(ResponseBodyLoaderTest, ReadDataFromConsumerWhileSuspendedLong) {
body_loader->Suspend();
std::string body(70000, '*');
consumer->Add(Command(Command::kDataAndDone, body.c_str()));
// ResponseBodyLoader will buffer data when deferred, and won't notify the
// client until it's resumed.
// If kLoadingTasksUnfreezable is enabled, ResponseBodyLoader will buffer data
// when deferred, and won't notify the client until it's resumed.
EXPECT_FALSE(consumer->IsCommandsEmpty());
consumer->TriggerOnStateChange();
while (!consumer->IsCommandsEmpty()) {
task_runner->RunUntilIdle();
if (BufferDataWhileSuspended()) {
while (!consumer->IsCommandsEmpty()) {
task_runner->RunUntilIdle();
}
} else {
EXPECT_FALSE(consumer->IsCommandsEmpty());
}
EXPECT_EQ("", client->GetData());
EXPECT_FALSE(client->LoadingIsFinished());
EXPECT_FALSE(client->LoadingIsFailed());
......@@ -508,6 +540,10 @@ TEST_F(ResponseBodyLoaderTest, ReadDataFromConsumerWhileSuspendedLong) {
EXPECT_FALSE(client->LoadingIsFailed());
}
INSTANTIATE_TEST_SUITE_P(All,
ResponseBodyLoaderLoadingTasksUnfreezableTest,
::testing::Bool());
TEST_F(ResponseBodyLoaderTest, DrainAsDataPipeAndReportError) {
mojo::ScopedDataPipeConsumerHandle consumer_end;
mojo::ScopedDataPipeProducerHandle producer_end;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment