Commit ee3ef33f authored by Adam Rice's avatar Adam Rice Committed by Commit Bot

WebSocketStream: Use a hwm of 1 for the readable

Initially WebSocketStream used a hwm of 0 for the ReadableStream that is
returned from the `connected` promise, but that resulted in poor
throughput when the application wasn't continuously reading.

Change the hwm of 1 so that up to one message will still be transferred
to the render process even when it is not inside a read() call.
Backpressure will still be applied if more than 1 message is in transit.

BUG=983030

Change-Id: I883d9e62b8465d45bc9b383db8f639fbf791c78d
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2054605Reviewed-by: default avatarYutaka Hirano <yhirano@chromium.org>
Commit-Queue: Adam Rice <ricea@chromium.org>
Cr-Commit-Position: refs/heads/master@{#742029}
parent 372f9000
...@@ -424,12 +424,16 @@ void WebSocketChannelImpl::CancelHandshake() { ...@@ -424,12 +424,16 @@ void WebSocketChannelImpl::CancelHandshake() {
} }
void WebSocketChannelImpl::ApplyBackpressure() { void WebSocketChannelImpl::ApplyBackpressure() {
NETWORK_DVLOG(1) << this << " ApplyBackpressure";
backpressure_ = true; backpressure_ = true;
} }
void WebSocketChannelImpl::RemoveBackpressure() { void WebSocketChannelImpl::RemoveBackpressure() {
backpressure_ = false; NETWORK_DVLOG(1) << this << " RemoveBackpressure";
ConsumePendingDataFrames(); if (backpressure_) {
backpressure_ = false;
ConsumePendingDataFrames();
}
} }
void WebSocketChannelImpl::OnOpeningHandshakeStarted( void WebSocketChannelImpl::OnOpeningHandshakeStarted(
......
...@@ -485,7 +485,7 @@ void WebSocketStream::DidConnect(const String& subprotocol, ...@@ -485,7 +485,7 @@ void WebSocketStream::DidConnect(const String& subprotocol,
connection->setExtensions(extensions); connection->setExtensions(extensions);
source_ = MakeGarbageCollected<UnderlyingSource>(script_state_, this); source_ = MakeGarbageCollected<UnderlyingSource>(script_state_, this);
auto* readable = ReadableStream::CreateWithCountQueueingStrategy( auto* readable = ReadableStream::CreateWithCountQueueingStrategy(
script_state_, source_, 0); script_state_, source_, 1);
sink_ = MakeGarbageCollected<UnderlyingSink>(this); sink_ = MakeGarbageCollected<UnderlyingSink>(this);
auto* writable = auto* writable =
WritableStream::CreateWithCountQueueingStrategy(script_state_, sink_, 1); WritableStream::CreateWithCountQueueingStrategy(script_state_, sink_, 1);
......
...@@ -19,8 +19,15 @@ def web_socket_do_extra_handshake(request): ...@@ -19,8 +19,15 @@ def web_socket_do_extra_handshake(request):
def web_socket_transfer_data(request): def web_socket_transfer_data(request):
# Send empty message to fill the ReadableStream queue
request.ws_stream.send_message(b'', binary=True)
# TODO(ricea@chromium.org): Use time.perf_counter() when migration to python # TODO(ricea@chromium.org): Use time.perf_counter() when migration to python
# 3 is complete. time.time() can go backwards. # 3 is complete. time.time() can go backwards.
start_time = time.time() start_time = time.time()
# The large message that will be blocked by backpressure.
request.ws_stream.send_message(b' ' * MESSAGE_SIZE, binary=True) request.ws_stream.send_message(b' ' * MESSAGE_SIZE, binary=True)
# Report the time taken to send the large message.
request.ws_stream.send_message(six.text_type(time.time() - start_time), binary=False) request.ws_stream.send_message(six.text_type(time.time() - start_time), binary=False)
...@@ -19,6 +19,9 @@ promise_test(async t => { ...@@ -19,6 +19,9 @@ promise_test(async t => {
// Create backpressure for 2 seconds. // Create backpressure for 2 seconds.
await new Promise(resolve => t.step_timeout(resolve, 2000)); await new Promise(resolve => t.step_timeout(resolve, 2000));
// Skip the empty message used to fill the readable queue.
await reader.read();
// Skip the large message. // Skip the large message.
await reader.read(); await reader.read();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment