Commit 2ee87b95 authored by Adam Rice's avatar Adam Rice Committed by Commit Bot

Turn on backpressure for blink::WebSocketStream sooner

Apply backpressure to blink::WebSocketChannel before the WebSocketStream
handshake starts, rather than after it completes. This avoids a race
condition where more of the first message is read than should be.

Also fix flakiness in the
external/wpt/websockets/stream-tentative/backpressure-receive.any.js by
doubling the size of message used.

BUG=1002780

Change-Id: I915c71a4b81b95375b7c4c584f7782d5336cba85
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/1844911Reviewed-by: default avatarYutaka Hirano <yhirano@chromium.org>
Commit-Queue: Adam Rice <ricea@chromium.org>
Cr-Commit-Position: refs/heads/master@{#703610}
parent 24617a01
...@@ -468,8 +468,6 @@ void WebSocketStream::DidConnect(const String& subprotocol, ...@@ -468,8 +468,6 @@ void WebSocketStream::DidConnect(const String& subprotocol,
return; return;
common_.SetState(WebSocketCommon::kOpen); common_.SetState(WebSocketCommon::kOpen);
was_ever_connected_ = true; was_ever_connected_ = true;
// Don't read all of a huge initial message before read() has been called.
channel_->ApplyBackpressure();
auto* connection = MakeGarbageCollected<WebSocketConnection>(); auto* connection = MakeGarbageCollected<WebSocketConnection>();
connection->setProtocol(subprotocol); connection->setProtocol(subprotocol);
connection->setExtensions(extensions); connection->setExtensions(extensions);
...@@ -599,6 +597,9 @@ void WebSocketStream::Connect(ScriptState* script_state, ...@@ -599,6 +597,9 @@ void WebSocketStream::Connect(ScriptState* script_state,
DVLOG(1) << "WebSocketStream " << this << " Connect() url=" << url DVLOG(1) << "WebSocketStream " << this << " Connect() url=" << url
<< " options=" << options; << " options=" << options;
// Don't read all of a huge initial message before read() has been called.
channel_->ApplyBackpressure();
auto* signal = options->signal(); auto* signal = options->signal();
if (signal && signal->aborted()) { if (signal && signal->aborted()) {
auto exception = V8ThrowDOMException::CreateOrEmpty( auto exception = V8ThrowDOMException::CreateOrEmpty(
......
...@@ -63,6 +63,9 @@ class WebSocketStreamTest : public ::testing::Test { ...@@ -63,6 +63,9 @@ class WebSocketStreamTest : public ::testing::Test {
TEST_F(WebSocketStreamTest, ConstructWithBadURL) { TEST_F(WebSocketStreamTest, ConstructWithBadURL) {
V8TestingScope scope; V8TestingScope scope;
auto& exception_state = scope.GetExceptionState(); auto& exception_state = scope.GetExceptionState();
EXPECT_CALL(Channel(), ApplyBackpressure());
auto* stream = Create(scope.GetScriptState(), "bad-scheme:", exception_state); auto* stream = Create(scope.GetScriptState(), "bad-scheme:", exception_state);
EXPECT_FALSE(stream); EXPECT_FALSE(stream);
...@@ -72,7 +75,7 @@ TEST_F(WebSocketStreamTest, ConstructWithBadURL) { ...@@ -72,7 +75,7 @@ TEST_F(WebSocketStreamTest, ConstructWithBadURL) {
EXPECT_EQ( EXPECT_EQ(
"The URL's scheme must be either 'ws' or 'wss'. 'bad-scheme' is not " "The URL's scheme must be either 'ws' or 'wss'. 'bad-scheme' is not "
"allowed.", "allowed.",
scope.GetExceptionState().Message()); exception_state.Message());
} }
// Most coverage for bad constructor arguments is provided by // Most coverage for bad constructor arguments is provided by
...@@ -84,6 +87,7 @@ TEST_F(WebSocketStreamTest, Connect) { ...@@ -84,6 +87,7 @@ TEST_F(WebSocketStreamTest, Connect) {
{ {
InSequence s; InSequence s;
EXPECT_CALL(Channel(), ApplyBackpressure());
EXPECT_CALL(Channel(), Connect(KURL("ws://example.com/hoge"), String())) EXPECT_CALL(Channel(), Connect(KURL("ws://example.com/hoge"), String()))
.WillOnce(Return(true)); .WillOnce(Return(true));
} }
...@@ -100,6 +104,7 @@ TEST_F(WebSocketStreamTest, ConnectWithProtocols) { ...@@ -100,6 +104,7 @@ TEST_F(WebSocketStreamTest, ConnectWithProtocols) {
{ {
InSequence s; InSequence s;
EXPECT_CALL(Channel(), ApplyBackpressure());
EXPECT_CALL(Channel(), EXPECT_CALL(Channel(),
Connect(KURL("ws://example.com/chat"), String("chat0, chat1"))) Connect(KURL("ws://example.com/chat"), String("chat0, chat1")))
.WillOnce(Return(true)); .WillOnce(Return(true));
...@@ -119,6 +124,7 @@ TEST_F(WebSocketStreamTest, ConnectWithFailedHandshake) { ...@@ -119,6 +124,7 @@ TEST_F(WebSocketStreamTest, ConnectWithFailedHandshake) {
{ {
InSequence s; InSequence s;
EXPECT_CALL(Channel(), ApplyBackpressure());
EXPECT_CALL(Channel(), Connect(KURL("ws://example.com/chat"), String())) EXPECT_CALL(Channel(), Connect(KURL("ws://example.com/chat"), String()))
.WillOnce(Return(true)); .WillOnce(Return(true));
EXPECT_CALL(Channel(), Disconnect()); EXPECT_CALL(Channel(), Disconnect());
...@@ -143,10 +149,10 @@ TEST_F(WebSocketStreamTest, ConnectWithSuccessfulHandshake) { ...@@ -143,10 +149,10 @@ TEST_F(WebSocketStreamTest, ConnectWithSuccessfulHandshake) {
{ {
InSequence s; InSequence s;
EXPECT_CALL(Channel(), ApplyBackpressure());
EXPECT_CALL(Channel(), EXPECT_CALL(Channel(),
Connect(KURL("ws://example.com/chat"), String("chat"))) Connect(KURL("ws://example.com/chat"), String("chat")))
.WillOnce(Return(true)); .WillOnce(Return(true));
EXPECT_CALL(Channel(), ApplyBackpressure());
EXPECT_CALL(checkpoint, Call(1)); EXPECT_CALL(checkpoint, Call(1));
EXPECT_CALL(Channel(), Close(1001, String())); EXPECT_CALL(Channel(), Close(1001, String()));
} }
...@@ -172,9 +178,9 @@ TEST_F(WebSocketStreamTest, ConnectThenCloseCleanly) { ...@@ -172,9 +178,9 @@ TEST_F(WebSocketStreamTest, ConnectThenCloseCleanly) {
{ {
InSequence s; InSequence s;
EXPECT_CALL(Channel(), ApplyBackpressure());
EXPECT_CALL(Channel(), Connect(KURL("ws://example.com/echo"), String())) EXPECT_CALL(Channel(), Connect(KURL("ws://example.com/echo"), String()))
.WillOnce(Return(true)); .WillOnce(Return(true));
EXPECT_CALL(Channel(), ApplyBackpressure());
EXPECT_CALL(Channel(), Close(-1, String(""))); EXPECT_CALL(Channel(), Close(-1, String("")));
EXPECT_CALL(Channel(), Disconnect()); EXPECT_CALL(Channel(), Disconnect());
} }
...@@ -195,6 +201,7 @@ TEST_F(WebSocketStreamTest, CloseDuringHandshake) { ...@@ -195,6 +201,7 @@ TEST_F(WebSocketStreamTest, CloseDuringHandshake) {
{ {
InSequence s; InSequence s;
EXPECT_CALL(Channel(), ApplyBackpressure());
EXPECT_CALL(Channel(), Connect(KURL("ws://example.com/echo"), String())) EXPECT_CALL(Channel(), Connect(KURL("ws://example.com/echo"), String()))
.WillOnce(Return(true)); .WillOnce(Return(true));
EXPECT_CALL( EXPECT_CALL(
......
...@@ -2074,11 +2074,6 @@ crbug.com/803276 [ Win ] inspector-protocol/memory/sampling-native-snapshot.js [ ...@@ -2074,11 +2074,6 @@ crbug.com/803276 [ Win ] inspector-protocol/memory/sampling-native-snapshot.js [
# Run these tests with under virtual/scalefactor... only. # Run these tests with under virtual/scalefactor... only.
crbug.com/567837 fast/hidpi/static [ Skip ] crbug.com/567837 fast/hidpi/static [ Skip ]
crbug.com/1002780 [ Linux ] external/wpt/websockets/stream-tentative/backpressure-receive.any.html [ Failure Pass ]
crbug.com/1002780 [ Linux ] external/wpt/websockets/stream-tentative/backpressure-receive.any.serviceworker.html [ Failure Pass ]
crbug.com/1002780 [ Linux ] external/wpt/websockets/stream-tentative/backpressure-receive.any.sharedworker.html [ Failure Pass ]
crbug.com/1002780 [ Linux ] external/wpt/websockets/stream-tentative/backpressure-receive.any.worker.html [ Failure Pass ]
# For win10, see crbug.com/955109 # For win10, see crbug.com/955109
crbug.com/538697 [ Win Win10 ] virtual/threaded/printing/webgl-oversized-printing.html [ Failure Crash ] crbug.com/538697 [ Win Win10 ] virtual/threaded/printing/webgl-oversized-printing.html [ Failure Crash ]
crbug.com/538697 [ Win ] printing/webgl-oversized-printing.html [ Failure Crash ] crbug.com/538697 [ Win ] printing/webgl-oversized-printing.html [ Failure Crash ]
......
...@@ -2,13 +2,18 @@ ...@@ -2,13 +2,18 @@
import time import time
# The amount of buffering a WebSocket connection has is not standardised, but # The amount of internal buffering a WebSocket connection has is not
# it's reasonable to expect that it will not be as large as 8MB. # standardised, and varies depending upon the OS. Setting this number too small
MESSAGE_SIZE = 8 * 1024 * 1024 # will result in false negatives, as the entire message gets buffered. Setting
# this number too large will result in false positives, when it takes more than
# 2 seconds to transmit the message anyway. This number was arrived at by
# trial-and-error.
MESSAGE_SIZE = 16 * 1024 * 1024
def web_socket_do_extra_handshake(request): def web_socket_do_extra_handshake(request):
# Turn off permessage-deflate, otherwise it shrinks our 8MB buffer to 8KB. # Turn off permessage-deflate, otherwise it shrinks our big message to a
# tiny message.
request.ws_extension_processors = [] request.ws_extension_processors = []
......
...@@ -3,11 +3,14 @@ ...@@ -3,11 +3,14 @@
// META: global=window,worker // META: global=window,worker
// META: timeout=long // META: timeout=long
// This test works by using a server WebSocket handler which sends an 8MB // Allow for this much timer jitter.
const JITTER_ALLOWANCE_MS = 200;
// This test works by using a server WebSocket handler which sends a large
// message, and then sends a second message with the time it measured the first // message, and then sends a second message with the time it measured the first
// message taking. On the browser side, we wait 2 seconds before reading from // message taking. On the browser side, we wait 2 seconds before reading from
// the socket. This should ensure it takes at least 2 seconds to finish sending // the socket. This should ensure it takes at least 2 seconds to finish sending
// the 8MB message. // the large message.
promise_test(async t => { promise_test(async t => {
const wss = new WebSocketStream(`${BASEURL}/send-backpressure`); const wss = new WebSocketStream(`${BASEURL}/send-backpressure`);
const { readable } = await wss.connection; const { readable } = await wss.connection;
...@@ -16,7 +19,7 @@ promise_test(async t => { ...@@ -16,7 +19,7 @@ promise_test(async t => {
// Create backpressure for 2 seconds. // Create backpressure for 2 seconds.
await new Promise(resolve => t.step_timeout(resolve, 2000)); await new Promise(resolve => t.step_timeout(resolve, 2000));
// Skip the 8MB message. // Skip the large message.
await reader.read(); await reader.read();
// Read the time it took. // Read the time it took.
...@@ -24,6 +27,6 @@ promise_test(async t => { ...@@ -24,6 +27,6 @@ promise_test(async t => {
// A browser can pass this test simply by being slow. This may be a source of // A browser can pass this test simply by being slow. This may be a source of
// flakiness for browsers that do not implement backpressure properly. // flakiness for browsers that do not implement backpressure properly.
assert_greater_than_equal(Number(value), 2, assert_greater_than_equal(Number(value), 2 - JITTER_ALLOWANCE_MS / 1000,
'data send should have taken at least 2 seconds'); 'data send should have taken at least 2 seconds');
}, 'backpressure should be applied to received messages'); }, 'backpressure should be applied to received messages');
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment