Commit b5decd74 authored by Rakina Zata Amni's avatar Rakina Zata Amni Committed by Commit Bot

Make DeferredOnStartLoadingResponseBody drain the body pipe

Currently DeferredOnStartLoadingResponseBody keeps the body's
ScopedDataPipeConsumerHandle around until it gets flushed. This might
result in the pipe blocking the network service if we're deferred for a
long time, which is likely if a page is put into the back-forward cache.

With this CL, DeferredOnStartLoadingResponseBody will drain the original
body pipe and channel the data into a new pipe, so that data coming from
the network service will be handled quickly even when deferred.

Bug: 1137682
Change-Id: Id4dadf4b85657c57821696443e72725fe188cfe1
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2513314
Commit-Queue: Rakina Zata Amni <rakina@chromium.org>
Reviewed-by: default avatarKinuko Yasuda <kinuko@chromium.org>
Reviewed-by: default avatarYutaka Hirano <yhirano@chromium.org>
Cr-Commit-Position: refs/heads/master@{#825090}
parent 0f50adb6
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
#include "content/public/common/url_utils.h" #include "content/public/common/url_utils.h"
#include "content/public/renderer/content_renderer_client.h" #include "content/public/renderer/content_renderer_client.h"
#include "content/renderer/loader/resource_dispatcher.h" #include "content/renderer/loader/resource_dispatcher.h"
#include "mojo/public/cpp/system/data_pipe_drainer.h"
#include "net/url_request/redirect_info.h" #include "net/url_request/redirect_info.h"
#include "services/network/public/cpp/features.h" #include "services/network/public/cpp/features.h"
#include "services/network/public/mojom/url_response_head.mojom.h" #include "services/network/public/mojom/url_response_head.mojom.h"
...@@ -143,6 +144,100 @@ class URLLoaderClientImpl::DeferredOnComplete final : public DeferredMessage { ...@@ -143,6 +144,100 @@ class URLLoaderClientImpl::DeferredOnComplete final : public DeferredMessage {
const network::URLLoaderCompletionStatus status_; const network::URLLoaderCompletionStatus status_;
}; };
class URLLoaderClientImpl::BodyBuffer final
: public mojo::DataPipeDrainer::Client {
public:
BodyBuffer(URLLoaderClientImpl* owner,
mojo::ScopedDataPipeConsumerHandle readable,
mojo::ScopedDataPipeProducerHandle writable,
scoped_refptr<base::SingleThreadTaskRunner> task_runner)
: owner_(owner),
writable_(std::move(writable)),
writable_watcher_(FROM_HERE,
mojo::SimpleWatcher::ArmingPolicy::MANUAL,
std::move(task_runner)) {
pipe_drainer_ =
std::make_unique<mojo::DataPipeDrainer>(this, std::move(readable));
}
bool active() const { return draining_ || writable_watcher_.IsWatching(); }
// mojo::DataPipeDrainer::Client
void OnDataAvailable(const void* data, size_t num_bytes) override {
DCHECK(draining_);
const auto span =
base::make_span(static_cast<const char*>(data), num_bytes);
buffered_body_.insert(buffered_body_.end(), span.begin(), span.end());
bytes_remaining_in_buffer_ += num_bytes;
}
void OnDataComplete() override {
DCHECK(draining_);
draining_ = false;
// We've finished draining from the original response body pipe, now wait
// until we can write the buffered body to the new pipe.
writable_watcher_.Watch(
writable_.get(),
MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
base::BindRepeating(&BodyBuffer::WriteBufferedBody,
base::Unretained(this)));
writable_watcher_.ArmOrNotify();
}
private:
void WriteBufferedBody(MojoResult) {
DCHECK(!draining_);
while (bytes_remaining_in_buffer_ > 0) {
// Try to write all the remaining parts of |buffered_body_|.
size_t start_position =
buffered_body_.size() - bytes_remaining_in_buffer_;
uint32_t bytes_sent =
base::saturated_cast<uint32_t>(bytes_remaining_in_buffer_);
MojoResult result =
writable_->WriteData(buffered_body_.data() + start_position,
&bytes_sent, MOJO_WRITE_DATA_FLAG_NONE);
switch (result) {
case MOJO_RESULT_OK:
break;
case MOJO_RESULT_FAILED_PRECONDITION:
// The pipe is closed unexpectedly, finish writing now.
Finish();
return;
case MOJO_RESULT_SHOULD_WAIT:
writable_watcher_.ArmOrNotify();
return;
default:
NOTREACHED();
return;
}
DCHECK_GE(bytes_remaining_in_buffer_, bytes_sent);
bytes_remaining_in_buffer_ -= bytes_sent;
}
Finish();
}
void Finish() {
DCHECK(!draining_);
// We've read and written all the data from the original pipe.
writable_watcher_.Cancel();
writable_.reset();
// There might be a deferred OnComplete message waiting for us to finish
// draining the response body, so flush the deferred messages in
// the owner URLLoaderClientImpl.
owner_->FlushDeferredMessages();
}
URLLoaderClientImpl* const owner_;
mojo::ScopedDataPipeProducerHandle writable_;
mojo::SimpleWatcher writable_watcher_;
std::unique_ptr<mojo::DataPipeDrainer> pipe_drainer_;
std::vector<char> buffered_body_;
size_t bytes_remaining_in_buffer_ = 0;
bool draining_ = true;
};
URLLoaderClientImpl::URLLoaderClientImpl( URLLoaderClientImpl::URLLoaderClientImpl(
int request_id, int request_id,
ResourceDispatcher* resource_dispatcher, ResourceDispatcher* resource_dispatcher,
...@@ -223,6 +318,15 @@ void URLLoaderClientImpl::FlushDeferredMessages() { ...@@ -223,6 +318,15 @@ void URLLoaderClientImpl::FlushDeferredMessages() {
if (has_completion_message) { if (has_completion_message) {
DCHECK_GT(messages.size(), 0u); DCHECK_GT(messages.size(), 0u);
DCHECK(messages.back()->IsCompletionMessage()); DCHECK(messages.back()->IsCompletionMessage());
if (body_buffer_ && body_buffer_->active()) {
// If we still have an active body buffer, it means we haven't drained all
// of the contents of the response body yet. We shouldn't dispatch the
// completion message now, so
// put the message back into |deferred_messages_| to be sent later after
// the body buffer is no longer active.
deferred_messages_.emplace_back(std::move(messages.back()));
return;
}
messages.back()->HandleMessage(resource_dispatcher_, request_id_); messages.back()->HandleMessage(resource_dispatcher_, request_id_);
} }
} }
...@@ -322,8 +426,20 @@ void URLLoaderClientImpl::OnStartLoadingResponseBody( ...@@ -322,8 +426,20 @@ void URLLoaderClientImpl::OnStartLoadingResponseBody(
} }
if (NeedsStoringMessage()) { if (NeedsStoringMessage()) {
StoreAndDispatch( // When deferring OnStartLoadingResponseBody, we should drain the original
std::make_unique<DeferredOnStartLoadingResponseBody>(std::move(body))); // pipe containing the response body into a new pipe so that we won't block
// the network service if we're deferred for a long time.
mojo::ScopedDataPipeProducerHandle new_body_producer;
mojo::ScopedDataPipeConsumerHandle new_body_consumer;
MojoResult result =
mojo::CreateDataPipe(nullptr, &new_body_producer, &new_body_consumer);
// If we fail to make a pipe, we'll treat it as an OOM error.
CHECK_EQ(result, MOJO_RESULT_OK);
body_buffer_ = std::make_unique<BodyBuffer>(
this, std::move(body), std::move(new_body_producer), task_runner_);
StoreAndDispatch(std::make_unique<DeferredOnStartLoadingResponseBody>(
std::move(new_body_consumer)));
} else { } else {
resource_dispatcher_->OnStartLoadingResponseBody(request_id_, resource_dispatcher_->OnStartLoadingResponseBody(request_id_,
std::move(body)); std::move(body));
......
...@@ -77,6 +77,7 @@ class CONTENT_EXPORT URLLoaderClientImpl final ...@@ -77,6 +77,7 @@ class CONTENT_EXPORT URLLoaderClientImpl final
void OnComplete(const network::URLLoaderCompletionStatus& status) override; void OnComplete(const network::URLLoaderCompletionStatus& status) override;
private: private:
class BodyBuffer;
class DeferredMessage; class DeferredMessage;
class DeferredOnReceiveResponse; class DeferredOnReceiveResponse;
class DeferredOnReceiveRedirect; class DeferredOnReceiveRedirect;
...@@ -90,6 +91,7 @@ class CONTENT_EXPORT URLLoaderClientImpl final ...@@ -90,6 +91,7 @@ class CONTENT_EXPORT URLLoaderClientImpl final
void OnConnectionClosed(); void OnConnectionClosed();
std::vector<std::unique_ptr<DeferredMessage>> deferred_messages_; std::vector<std::unique_ptr<DeferredMessage>> deferred_messages_;
std::unique_ptr<BodyBuffer> body_buffer_;
const int request_id_; const int request_id_;
bool has_received_response_head_ = false; bool has_received_response_head_ = false;
bool has_received_response_body_ = false; bool has_received_response_body_ = false;
......
...@@ -313,6 +313,98 @@ TEST_F(URLLoaderClientImplTest, DeferWithResponseBody) { ...@@ -313,6 +313,98 @@ TEST_F(URLLoaderClientImplTest, DeferWithResponseBody) {
EXPECT_EQ("hello", GetRequestPeerContextBody(&request_peer_context_)); EXPECT_EQ("hello", GetRequestPeerContextBody(&request_peer_context_));
} }
TEST_F(URLLoaderClientImplTest, StoppedDeferringBeforeResponseBodyDrained) {
// Call OnReceiveResponse, OnStartLoadingResponseBody, OnComplete while
// deferred.
dispatcher_->SetDefersLoading(request_id_, true);
url_loader_client_->OnReceiveResponse(network::mojom::URLResponseHead::New());
mojo::ScopedDataPipeProducerHandle producer_handle;
mojo::ScopedDataPipeConsumerHandle consumer_handle;
ASSERT_EQ(MOJO_RESULT_OK,
mojo::CreateDataPipe(nullptr, &producer_handle, &consumer_handle));
url_loader_client_->OnStartLoadingResponseBody(std::move(consumer_handle));
network::URLLoaderCompletionStatus status;
url_loader_client_->OnComplete(status);
base::RunLoop().RunUntilIdle();
EXPECT_FALSE(request_peer_context_.received_response);
EXPECT_FALSE(request_peer_context_.complete);
EXPECT_EQ("", GetRequestPeerContextBody(&request_peer_context_));
// Write data to the response body pipe, but don't close the connection yet.
uint32_t size = 5;
ASSERT_EQ(MOJO_RESULT_OK, producer_handle->WriteData(
"hello", &size, MOJO_WRITE_DATA_FLAG_NONE));
EXPECT_EQ(5u, size);
// Stop deferring. OnComplete message shouldn't be dispatched yet because
// we're still waiting for the response body pipe to be closed.
dispatcher_->SetDefersLoading(request_id_, false);
base::RunLoop().RunUntilIdle();
EXPECT_TRUE(request_peer_context_.received_response);
EXPECT_FALSE(request_peer_context_.complete);
EXPECT_EQ("", GetRequestPeerContextBody(&request_peer_context_));
// Close the response body pipe.
producer_handle.reset();
base::RunLoop().RunUntilIdle();
EXPECT_TRUE(request_peer_context_.received_response);
EXPECT_TRUE(request_peer_context_.complete);
EXPECT_EQ("hello", GetRequestPeerContextBody(&request_peer_context_));
}
TEST_F(URLLoaderClientImplTest, DeferredWithLongResponseBody) {
// Call OnReceiveResponse, OnStartLoadingResponseBody, OnComplete while
// deferred.
dispatcher_->SetDefersLoading(request_id_, true);
url_loader_client_->OnReceiveResponse(network::mojom::URLResponseHead::New());
mojo::ScopedDataPipeProducerHandle producer_handle;
mojo::ScopedDataPipeConsumerHandle consumer_handle;
ASSERT_EQ(MOJO_RESULT_OK,
mojo::CreateDataPipe(nullptr, &producer_handle, &consumer_handle));
url_loader_client_->OnStartLoadingResponseBody(std::move(consumer_handle));
network::URLLoaderCompletionStatus status;
url_loader_client_->OnComplete(status);
base::RunLoop().RunUntilIdle();
EXPECT_FALSE(request_peer_context_.received_response);
EXPECT_FALSE(request_peer_context_.complete);
EXPECT_EQ("", GetRequestPeerContextBody(&request_peer_context_));
// Write to the response body pipe. It will take several writes.
uint32_t body_size = 70000;
uint32_t bytes_remaining = body_size;
std::string body(body_size, '*');
while (bytes_remaining > 0) {
uint32_t start_position = body_size - bytes_remaining;
uint32_t bytes_sent = bytes_remaining;
MojoResult result = producer_handle->WriteData(
body.c_str() + start_position, &bytes_sent, MOJO_WRITE_DATA_FLAG_NONE);
if (result == MOJO_RESULT_SHOULD_WAIT) {
base::RunLoop().RunUntilIdle();
continue;
}
EXPECT_GE(bytes_remaining, bytes_sent);
bytes_remaining -= bytes_sent;
}
producer_handle.reset();
// Stop deferring.
dispatcher_->SetDefersLoading(request_id_, false);
base::RunLoop().RunUntilIdle();
EXPECT_TRUE(request_peer_context_.received_response);
// BodyBuffer hasn't finished writing to the new response body pipe.
EXPECT_FALSE(request_peer_context_.complete);
// Calling GetRequestPeerContextBody to read data from the new response body
// pipe will make BodyBuffer write the rest of the body to the pipe.
uint32_t read_size = 0;
while (read_size < body_size) {
read_size = GetRequestPeerContextBody(&request_peer_context_).size();
base::RunLoop().RunUntilIdle();
}
EXPECT_EQ(body_size, read_size);
EXPECT_TRUE(request_peer_context_.complete);
}
// As "transfer size update" message is handled specially in the implementation, // As "transfer size update" message is handled specially in the implementation,
// we have a separate test. // we have a separate test.
TEST_F(URLLoaderClientImplTest, DeferWithTransferSizeUpdated) { TEST_F(URLLoaderClientImplTest, DeferWithTransferSizeUpdated) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment