Commit 37d1ee1a authored by Rakina Zata Amni's avatar Rakina Zata Amni Committed by Commit Bot

Reland BufferedBody with queue + fix to write body ASAP + behind flag

The BufferedBody CL got reverted at https://crrev.com/c/2532869 because
it introduced flakes. This CL relands it and adds a potential fix:
write the drained body gradually, as previously we won't write to the
newly created pipe until we've finished draining completely.

This CL also changes the buffered_body_ data type to use queue of chunks
so that we can free memory as soon as we finished writing a chunk of
the response body to prevent potential OOMs, and protects all of this
code behind a flag (later on we'll also add a check so that we'll buffer
the body only when we're deferring due to bfcache)

Bug: 1147081, 1148130, 1137682
Change-Id: I61d95c9c67220a296daa66f7409f27fdeb0e8182
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2531338Reviewed-by: default avatarYutaka Hirano <yhirano@chromium.org>
Reviewed-by: default avatarAlexander Timin <altimin@chromium.org>
Commit-Queue: Rakina Zata Amni <rakina@chromium.org>
Cr-Commit-Position: refs/heads/master@{#827172}
parent 5e1e2457
......@@ -14,6 +14,7 @@
#include "content/public/common/url_utils.h"
#include "content/public/renderer/content_renderer_client.h"
#include "content/renderer/loader/resource_dispatcher.h"
#include "mojo/public/cpp/system/data_pipe_drainer.h"
#include "net/url_request/redirect_info.h"
#include "services/network/public/cpp/features.h"
#include "services/network/public/mojom/url_response_head.mojom.h"
......@@ -143,6 +144,115 @@ class URLLoaderClientImpl::DeferredOnComplete final : public DeferredMessage {
const network::URLLoaderCompletionStatus status_;
};
class URLLoaderClientImpl::BodyBuffer final
: public mojo::DataPipeDrainer::Client {
public:
BodyBuffer(URLLoaderClientImpl* owner,
mojo::ScopedDataPipeConsumerHandle readable,
mojo::ScopedDataPipeProducerHandle writable,
scoped_refptr<base::SingleThreadTaskRunner> task_runner)
: owner_(owner),
writable_(std::move(writable)),
writable_watcher_(FROM_HERE,
mojo::SimpleWatcher::ArmingPolicy::MANUAL,
std::move(task_runner)) {
pipe_drainer_ =
std::make_unique<mojo::DataPipeDrainer>(this, std::move(readable));
writable_watcher_.Watch(
writable_.get(),
MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
base::BindRepeating(&BodyBuffer::WriteBufferedBody,
base::Unretained(this)));
}
bool active() const { return writable_watcher_.IsWatching(); }
// mojo::DataPipeDrainer::Client
void OnDataAvailable(const void* data, size_t num_bytes) override {
DCHECK(draining_);
SCOPED_CRASH_KEY_NUMBER(OnDataAvailable, buffered_body_size,
buffered_body_.size());
SCOPED_CRASH_KEY_NUMBER(OnDataAvailable, data_bytes, num_bytes);
SCOPED_CRASH_KEY_STRING256(
OnDataAvailable, last_loaded_url,
owner_->last_loaded_url().possibly_invalid_spec());
buffered_body_.emplace(static_cast<const char*>(data),
static_cast<const char*>(data) + num_bytes);
WriteBufferedBody(MOJO_RESULT_OK);
}
void OnDataComplete() override {
DCHECK(draining_);
draining_ = false;
WriteBufferedBody(MOJO_RESULT_OK);
}
private:
void WriteBufferedBody(MojoResult) {
// Try to write all the remaining chunks in |buffered_body_|.
while (!buffered_body_.empty()) {
// Write the chunk at the front of |buffered_body_|.
const std::vector<char>& current_chunk = buffered_body_.front();
DCHECK_LE(offset_in_current_chunk_, current_chunk.size());
uint32_t bytes_sent = base::saturated_cast<uint32_t>(
current_chunk.size() - offset_in_current_chunk_);
MojoResult result =
writable_->WriteData(current_chunk.data() + offset_in_current_chunk_,
&bytes_sent, MOJO_WRITE_DATA_FLAG_NONE);
switch (result) {
case MOJO_RESULT_OK:
break;
case MOJO_RESULT_FAILED_PRECONDITION:
// The pipe is closed unexpectedly, finish writing now.
draining_ = false;
Finish();
return;
case MOJO_RESULT_SHOULD_WAIT:
writable_watcher_.ArmOrNotify();
return;
default:
NOTREACHED();
return;
}
// We've sent |bytes_sent| bytes, update the current offset in the
// frontmost chunk.
offset_in_current_chunk_ += bytes_sent;
DCHECK_LE(offset_in_current_chunk_, current_chunk.size());
if (offset_in_current_chunk_ == current_chunk.size()) {
// We've finished writing the chunk at the front of the queue, pop it so
// that we'll write the next chunk next time.
buffered_body_.pop();
offset_in_current_chunk_ = 0;
}
}
// We're finished if we've drained the original pipe and sent all the
// buffered body.
if (!draining_)
Finish();
}
void Finish() {
DCHECK(!draining_);
// We've read and written all the data from the original pipe.
writable_watcher_.Cancel();
writable_.reset();
// There might be a deferred OnComplete message waiting for us to finish
// draining the response body, so flush the deferred messages in
// the owner URLLoaderClientImpl.
owner_->FlushDeferredMessages();
}
URLLoaderClientImpl* const owner_;
mojo::ScopedDataPipeProducerHandle writable_;
mojo::SimpleWatcher writable_watcher_;
std::unique_ptr<mojo::DataPipeDrainer> pipe_drainer_;
// We save the received response body as a queue of chunks so that we can free
// memory as soon as we finish sending a chunk completely.
base::queue<std::vector<char>> buffered_body_;
uint32_t offset_in_current_chunk_ = 0;
bool draining_ = true;
};
URLLoaderClientImpl::URLLoaderClientImpl(
int request_id,
ResourceDispatcher* resource_dispatcher,
......@@ -223,6 +333,15 @@ void URLLoaderClientImpl::FlushDeferredMessages() {
if (has_completion_message) {
DCHECK_GT(messages.size(), 0u);
DCHECK(messages.back()->IsCompletionMessage());
if (body_buffer_ && body_buffer_->active()) {
// If we still have an active body buffer, it means we haven't drained all
// of the contents of the response body yet. We shouldn't dispatch the
// completion message now, so
// put the message back into |deferred_messages_| to be sent later after
// the body buffer is no longer active.
deferred_messages_.emplace_back(std::move(messages.back()));
return;
}
messages.back()->HandleMessage(resource_dispatcher_, request_id_);
}
}
......@@ -321,13 +440,41 @@ void URLLoaderClientImpl::OnStartLoadingResponseBody(
base::TimeTicks::Now() - on_receive_response_time_);
}
if (NeedsStoringMessage()) {
StoreAndDispatch(
std::make_unique<DeferredOnStartLoadingResponseBody>(std::move(body)));
} else {
if (!NeedsStoringMessage()) {
// Send the message immediately.
resource_dispatcher_->OnStartLoadingResponseBody(request_id_,
std::move(body));
return;
}
if (!base::FeatureList::IsEnabled(
blink::features::kLoadingTasksUnfreezable)) {
// Defer the message, storing the original body pipe.
StoreAndDispatch(
std::make_unique<DeferredOnStartLoadingResponseBody>(std::move(body)));
return;
}
// We want to run loading tasks while deferred (but without dispatching the
// messages). Drain the original pipe containing the response body into a
// new pipe so that we won't block the network service if we're deferred for
// a long time.
mojo::ScopedDataPipeProducerHandle new_body_producer;
mojo::ScopedDataPipeConsumerHandle new_body_consumer;
MojoResult result =
mojo::CreateDataPipe(nullptr, &new_body_producer, &new_body_consumer);
if (result != MOJO_RESULT_OK) {
// We failed to make a new pipe, close the connections and dispatch an
// OnComplete message instead.
url_loader_.reset();
url_loader_client_receiver_.reset();
OnComplete(
network::URLLoaderCompletionStatus(net::ERR_INSUFFICIENT_RESOURCES));
return;
}
body_buffer_ = std::make_unique<BodyBuffer>(
this, std::move(body), std::move(new_body_producer), task_runner_);
StoreAndDispatch(std::make_unique<DeferredOnStartLoadingResponseBody>(
std::move(new_body_consumer)));
}
void URLLoaderClientImpl::OnComplete(
......
......@@ -79,6 +79,7 @@ class CONTENT_EXPORT URLLoaderClientImpl final
const GURL& last_loaded_url() const { return last_loaded_url_; }
private:
class BodyBuffer;
class DeferredMessage;
class DeferredOnReceiveResponse;
class DeferredOnReceiveRedirect;
......@@ -92,6 +93,7 @@ class CONTENT_EXPORT URLLoaderClientImpl final
void OnConnectionClosed();
std::vector<std::unique_ptr<DeferredMessage>> deferred_messages_;
std::unique_ptr<BodyBuffer> body_buffer_;
const int request_id_;
bool has_received_response_head_ = false;
bool has_received_response_body_ = false;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment