Commit 298d8e0c authored by Ben Kelly's avatar Ben Kelly Committed by Commit Bot

Fetch: Delay buffering Response data.

When a service worker executes code like `evt.respondWith(fetch(r))` we
should be able to directly pass the fetch's resulting mojo::DataPipe
through without any data copying.  This previously did not work,
however, since the BufferingBytesConsumer would immediately start
buffering the Response body data.

This CL fixes this issue by delaying the start of the buffering by a
short amount of time.  This gives the service worker time to drain the
pipe.

The delay is currently disabled by default behind the
"BufferingBytesConsumerDelay" feature.

Based on yhirano's draft CL at crrev.com/c/1383755.

Bug: 911036
Change-Id: I65675ce62a7ce593c8994b3e1258634840ba6c2d
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/1679669
Commit-Queue: Ben Kelly <wanderview@chromium.org>
Reviewed-by: default avatarYutaka Hirano <yhirano@chromium.org>
Reviewed-by: default avatarKinuko Yasuda <kinuko@chromium.org>
Reviewed-by: default avatarRobert Kaplow <rkaplow@chromium.org>
Cr-Commit-Position: refs/heads/master@{#676053}
parent 8bdba37d
......@@ -14,8 +14,9 @@ function delayedUpload() {
function send_fetch() {
fetch("done.html")
.then(res => res.text())
.then(
function(response) {
function() {
if (navigate) {
window.top.location.href = "done.html";
} else {
......
......@@ -1345,6 +1345,28 @@
]
}
],
"BufferingBytesConsumerDelay": [
{
"platforms": [
"windows",
"mac",
"chromeos",
"linux",
"android"
],
"experiments": [
{
"name": "Enabled",
"params": {
"milliseconds": "50"
},
"enable_features": [
"BufferingBytesConsumerDelay"
]
}
]
}
],
"CertDualVerificationTrial": [
{
"platforms": [
......
......@@ -343,5 +343,12 @@ const base::Feature kBlinkHeapIncrementalMarkingStress{
const base::Feature kBlinkHeapUnifiedGCScheduling{
"BlinkHeapUnifiedGCScheduling", base::FEATURE_ENABLED_BY_DEFAULT};
// Enables a delay before BufferingBytesConsumer begins reading from its
// underlying consumer when instantiated with CreateWithDelay().
const base::Feature kBufferingBytesConsumerDelay{
"BufferingBytesConsumerDelay", base::FEATURE_DISABLED_BY_DEFAULT};
const base::FeatureParam<int> kBufferingBytesConsumerDelayMilliseconds{
&kBufferingBytesConsumerDelay, "milliseconds", 50};
} // namespace features
} // namespace blink
......@@ -6,6 +6,7 @@
#define THIRD_PARTY_BLINK_PUBLIC_COMMON_FEATURES_H_
#include "base/feature_list.h"
#include "base/metrics/field_trial_params.h"
#include "third_party/blink/public/common/common_export.h"
namespace blink {
......@@ -106,6 +107,10 @@ BLINK_COMMON_EXPORT extern const base::Feature
kBlinkHeapIncrementalMarkingStress;
BLINK_COMMON_EXPORT extern const base::Feature kBlinkHeapUnifiedGCScheduling;
BLINK_COMMON_EXPORT extern const base::Feature kBufferingBytesConsumerDelay;
BLINK_COMMON_EXPORT extern const base::FeatureParam<int>
kBufferingBytesConsumerDelayMilliseconds;
} // namespace features
} // namespace blink
......
......@@ -480,13 +480,15 @@ void FetchManager::Loader::DidStartLoadingResponseBody(BytesConsumer& body) {
if (fetch_request_data_->Integrity().IsEmpty() &&
!response_has_no_store_header_) {
// BufferingBytesConsumer reads chunks from |bytes_consumer| as soon as
// they get available to relieve backpressure.
// they get available to relieve backpressure. Buffering starts after
// a short delay, however, to allow the Response to be drained; e.g.
// when the Response is passed to FetchEvent.respondWith(), etc.
//
// https://fetch.spec.whatwg.org/#fetching
// The user agent should ignore the suspension request if the ongoing
// fetch is updating the response in the HTTP cache for the request.
place_holder_body_->Update(
MakeGarbageCollected<BufferingBytesConsumer>(&body));
place_holder_body_->Update(BufferingBytesConsumer::CreateWithDelay(
&body, GetExecutionContext()->GetTaskRunner(TaskType::kNetworking)));
} else {
place_holder_body_->Update(&body);
}
......
......@@ -4,22 +4,72 @@
#include "third_party/blink/renderer/platform/loader/fetch/buffering_bytes_consumer.h"
#include "base/feature_list.h"
#include "base/metrics/field_trial_params.h"
#include "third_party/blink/public/common/features.h"
#include "third_party/blink/renderer/platform/wtf/std_lib_extras.h"
namespace blink {
BufferingBytesConsumer::BufferingBytesConsumer(BytesConsumer* bytes_consumer)
: bytes_consumer_(bytes_consumer) {
// static
BufferingBytesConsumer* BufferingBytesConsumer::CreateWithDelay(
BytesConsumer* bytes_consumer,
scoped_refptr<base::SingleThreadTaskRunner> timer_task_runner) {
if (!base::FeatureList::IsEnabled(features::kBufferingBytesConsumerDelay))
return Create(bytes_consumer);
return MakeGarbageCollected<BufferingBytesConsumer>(
bytes_consumer, std::move(timer_task_runner),
base::TimeDelta::FromMilliseconds(
features::kBufferingBytesConsumerDelayMilliseconds.Get()));
}
// static
BufferingBytesConsumer* BufferingBytesConsumer::Create(
BytesConsumer* bytes_consumer) {
return MakeGarbageCollected<BufferingBytesConsumer>(bytes_consumer, nullptr,
base::TimeDelta());
}
BufferingBytesConsumer::BufferingBytesConsumer(
BytesConsumer* bytes_consumer,
scoped_refptr<base::SingleThreadTaskRunner> timer_task_runner,
base::TimeDelta buffering_start_delay)
: bytes_consumer_(bytes_consumer),
timer_(std::move(timer_task_runner),
this,
&BufferingBytesConsumer::OnTimerFired) {
bytes_consumer_->SetClient(this);
BufferData();
if (buffering_start_delay.is_zero()) {
MaybeStartBuffering();
return;
}
timer_.StartOneShot(buffering_start_delay, FROM_HERE);
}
BufferingBytesConsumer::~BufferingBytesConsumer() = default;
void BufferingBytesConsumer::MaybeStartBuffering() {
if (buffering_state_ != BufferingState::kDelayed)
return;
timer_.Stop();
buffering_state_ = BufferingState::kStarted;
BufferData();
}
void BufferingBytesConsumer::StopBuffering() {
timer_.Stop();
buffering_state_ = BufferingState::kStopped;
}
BytesConsumer::Result BufferingBytesConsumer::BeginRead(const char** buffer,
size_t* available) {
// Stop delaying buffering on the first read as it will no longer be safe to
// drain the underlying |bytes_consumer_| anyway.
MaybeStartBuffering();
if (buffer_.IsEmpty()) {
if (!is_buffering_)
if (buffering_state_ != BufferingState::kStarted)
return bytes_consumer_->BeginRead(buffer, available);
if (has_seen_error_)
......@@ -47,7 +97,7 @@ BytesConsumer::Result BufferingBytesConsumer::BeginRead(const char** buffer,
BytesConsumer::Result BufferingBytesConsumer::EndRead(size_t read_size) {
if (buffer_.IsEmpty()) {
if (!is_buffering_)
if (buffering_state_ != BufferingState::kStarted)
return bytes_consumer_->EndRead(read_size);
DCHECK(has_seen_error_);
......@@ -79,7 +129,7 @@ scoped_refptr<EncodedFormData> BufferingBytesConsumer::DrainAsFormData() {
}
mojo::ScopedDataPipeConsumerHandle BufferingBytesConsumer::DrainAsDataPipe() {
if (!is_buffering_)
if (buffering_state_ != BufferingState::kStarted)
return bytes_consumer_->DrainAsDataPipe();
// We intentionally return an empty handle here, because returning a DataPipe
......@@ -117,6 +167,10 @@ void BufferingBytesConsumer::Trace(Visitor* visitor) {
BytesConsumer::Client::Trace(visitor);
}
void BufferingBytesConsumer::OnTimerFired(TimerBase*) {
MaybeStartBuffering();
}
void BufferingBytesConsumer::OnStateChange() {
BytesConsumer::Client* client = client_;
BufferData();
......@@ -125,7 +179,7 @@ void BufferingBytesConsumer::OnStateChange() {
}
void BufferingBytesConsumer::BufferData() {
if (!is_buffering_)
if (buffering_state_ != BufferingState::kStarted)
return;
while (true) {
......
......@@ -11,6 +11,7 @@
#include "third_party/blink/renderer/platform/heap/handle.h"
#include "third_party/blink/renderer/platform/loader/fetch/bytes_consumer.h"
#include "third_party/blink/renderer/platform/platform_export.h"
#include "third_party/blink/renderer/platform/timer.h"
#include "third_party/blink/renderer/platform/wtf/deque.h"
#include "third_party/blink/renderer/platform/wtf/text/wtf_string.h"
#include "third_party/blink/renderer/platform/wtf/vector.h"
......@@ -30,17 +31,39 @@ class PLATFORM_EXPORT BufferingBytesConsumer final
USING_GARBAGE_COLLECTED_MIXIN(BufferingBytesConsumer);
public:
// Creates a BufferingBytesConsumer. |bytes_consumer| is the original
// BytesConsumer.
// |bytes_consumer| must not have a client.
explicit BufferingBytesConsumer(BytesConsumer* bytes_consumer);
// Creates a BufferingBytesConsumer that waits some delay before beginning
// to buffer data from the underlying consumer. This delay provides an
// opportunity for the data to be drained before buffering begins. The
// |bytes_consumer| is the original BytesConsumer. |bytes_consumer| must
// not have a client.
static BufferingBytesConsumer* CreateWithDelay(
BytesConsumer* bytes_consumer,
scoped_refptr<base::SingleThreadTaskRunner> timer_task_runner);
// Creates a BufferingBytesConsumer that buffers immediately without any
// delay. |bytes_consumer| is the original BytesConsumer. |bytes_consumer|
// must not have a client.
static BufferingBytesConsumer* Create(BytesConsumer* bytes_consumer);
// Use the Create*() factory methods instead of direct instantiation.
// TODO(crbug/954442): Use util::PassKey to prevent external callers.
BufferingBytesConsumer(
BytesConsumer* bytes_consumer,
scoped_refptr<base::SingleThreadTaskRunner> timer_task_runner,
base::TimeDelta buffering_start_delay);
~BufferingBytesConsumer() override;
// Attempt to start buffering data from the underlying consumer. This will
// only have an effect if we're currently in the kDelayed state. If
// buffering has already started or been explicitly stopped then this method
// has no effect.
void MaybeStartBuffering();
// After this function is called, |this| will not do buffering. Already
// buffered data still waits to be consumed, but after all the buffered data
// is consumed, BeginRead and EndRead will result in BeginRead and EndRead
// calls to the original BytesConsumer.
void StopBuffering() { is_buffering_ = false; }
void StopBuffering();
// BufferingBytesConsumer
Result BeginRead(const char** buffer, size_t* available) override;
......@@ -58,14 +81,24 @@ class PLATFORM_EXPORT BufferingBytesConsumer final
void Trace(blink::Visitor*) override;
private:
void OnTimerFired(TimerBase*);
// BufferingBytesConsumer::Client
void OnStateChange() override;
void BufferData();
const Member<BytesConsumer> bytes_consumer_;
TaskRunnerTimer<BufferingBytesConsumer> timer_;
Deque<Vector<char>> buffer_;
size_t offset_for_first_chunk_ = 0;
bool is_buffering_ = true;
enum class BufferingState {
kDelayed,
kStarted,
kStopped,
};
BufferingState buffering_state_ = BufferingState::kDelayed;
bool has_seen_end_of_data_ = false;
bool has_seen_error_ = false;
Member<BytesConsumer::Client> client_;
......
......@@ -268,8 +268,8 @@ void RawResource::ResponseBodyReceived(
if (!client && GetResourceRequest().UseStreamOnResponse()) {
// For preload, we want to store the body while dispatching
// onload and onerror events.
bytes_consumer_for_preload_ = MakeGarbageCollected<BufferingBytesConsumer>(
&body_loader.DrainAsBytesConsumer());
bytes_consumer_for_preload_ =
BufferingBytesConsumer::Create(&body_loader.DrainAsBytesConsumer());
return;
}
......
......@@ -34,7 +34,7 @@ let frame_content = "data:text/html;utf8,<body>" +
"};" +
"let observer = new PerformanceObserver(observe);" +
"observer.observe({ entryTypes: ['resource'] });" +
"fetch(url);" +
"fetch(url).then(r => r.text());" +
"</" + "script></body>";
document.getElementById("frameContext").src = frame_content;
</script>
......
......@@ -34,7 +34,7 @@ let frame_content = "data:text/html;utf8,<body>" +
"};" +
"let observer = new PerformanceObserver(observe);" +
"observer.observe({ entryTypes: ['resource'] });" +
"fetch(url);" +
"fetch(url).then(r => r.text());" +
"</" + "script></body>";
document.getElementById("frameContext").src = frame_content;
</script>
......
......@@ -12,7 +12,7 @@
<script>
let eventFired = false;
let loadRandomResource = () => {
return fetch(window.location.href + "?" + Math.random());
return fetch(window.location.href + "?" + Math.random()).then(r => r.text());
}
setup(() => {
......
......@@ -23,7 +23,7 @@
}
let observer = new PerformanceObserver(observe);
observer.observe({ entryTypes: ["resource"] });
fetch(url);
fetch(url).then(r => r.text());
</script>
</body>
</html>
......
......@@ -11,7 +11,7 @@
async function logResponse(url, encoding, quality, sizeOnly) {
testRunner.log(`\nResults for ${url} encoding=${encoding} q=${quality} sizeOnly=${sizeOnly}`);
session.evaluate(`fetch(${JSON.stringify(url)})`);
session.evaluate(`fetch(${JSON.stringify(url)}).then(r => r.text())`);
const requestId = (await dp.Network.onceResponseReceived()).params.requestId;
const result = (await dp.Audits.getEncodedResponse({requestId, encoding, quality, sizeOnly})).result;
......
......@@ -10,7 +10,7 @@
var pendingRequests = 0;
function sendRequest(url) {
dp.Runtime.evaluate({expression: `fetch('${url}')`});
dp.Runtime.evaluate({expression: `fetch('${url}').then(r => r.text())`});
pendingRequests++;
}
......
......@@ -5,7 +5,7 @@
await dp.Network.enable();
async function logResponseBody(url) {
session.evaluate(`fetch(${JSON.stringify(url)});`);
session.evaluate(`fetch(${JSON.stringify(url)}).then(r => r.text());`);
var requestWillBeSent = (await dp.Network.onceRequestWillBeSent()).params;
testRunner.log(`Request for ${requestWillBeSent.request.url}`);
......
......@@ -18,7 +18,7 @@
]});
testRunner.log('Request interception patterns sent.');
session.evaluate(`fetch('${testRunner.url('../resources/redirect1.php')}')`);
session.evaluate(`fetch('${testRunner.url('../resources/redirect1.php')}').then(r => r.text())`);
await waitForInterceptionEventAndContinue("/redirect1.php");
await waitForInterceptionEventAndContinue("/redirect1.php");
......
......@@ -65,7 +65,7 @@
await new Promise(resolve => {
session.protocol.Network.onResponseReceived(resolve);
session.evaluate(`
fetch('${testRunner.url('../resources/ping-redirect.php')}');
fetch('${testRunner.url('../resources/ping-redirect.php')}').then(r => r.text());
`);
});
......
......@@ -73,11 +73,11 @@
*/
async function testUrls() {
requestInterceptionWaitingMap.clear();
session.evaluate(`fetch('../network/resources/small-test-1.txt')`);
session.evaluate(`fetch('../network/resources/small-test-1.txt').then(r => r.text())`);
await new Promise(resolve => responseWasReceivedCallback = resolve);
session.evaluate(`fetch('../network/resources/small-test-2.txt')`);
session.evaluate(`fetch('../network/resources/small-test-2.txt').then(r => r.text())`);
await new Promise(resolve => responseWasReceivedCallback = resolve);
session.evaluate(`fetch('../resources/test-page.html')`);
session.evaluate(`fetch('../resources/test-page.html').then(r => r.text())`);
await new Promise(resolve => responseWasReceivedCallback = resolve);
testRunner.log('');
}
......
......@@ -4,7 +4,7 @@
await dp.Network.enable();
var url = testRunner.url('./resources/final.js');
session.evaluate(`fetch("${url}");`);
session.evaluate(`fetch("${url}").then(r => r.text());`);
var requestWillBeSent = (await dp.Network.onceRequestWillBeSent()).params;
testRunner.log(`Request for ${requestWillBeSent.request.url}`);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment