Commit 2b9d4ce1 authored by Adam Rice's avatar Adam Rice Committed by Commit Bot

[WebSocket] Reassemble small messages

The quota-based WebSocket implementation would never fragment outgoing
messages of 64KB or less as long as quota had time to be updated.

Emulate this behaviour with the new datapipe-based implementation, to
avoid the risk of breaking applications dependent on the old behaviour.

The behaviour is controlled by the feature flag
"WebSocketReassembleShortMessages" which defaults to enabled. This can
be disabled for experiments to determine if the feature is really
needed.

No extra bytes are copied and no extra allocations are done. The main
overhead of the feature is in extra code complexity.

Add a web test, expect-unfragmented.html. This has been tested against
the old quota-based implementation (passes) and with
--disable-feature=WebSocketReassembleShortMessages (fails).

BUG=1086273

Change-Id: I958dd5d99931e3023ee339481f101ffa56ed274e
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2215383
Commit-Queue: Adam Rice <ricea@chromium.org>
Reviewed-by: default avatarYutaka Hirano <yhirano@chromium.org>
Cr-Commit-Position: refs/heads/master@{#772575}
parent 59866be7
...@@ -234,5 +234,8 @@ bool ShouldEnableOutOfBlinkCorsForTesting() { ...@@ -234,5 +234,8 @@ bool ShouldEnableOutOfBlinkCorsForTesting() {
return base::FeatureList::IsEnabled(features::kOutOfBlinkCors); return base::FeatureList::IsEnabled(features::kOutOfBlinkCors);
} }
const base::Feature kWebSocketReassembleShortMessages{
"WebSocketReassembleShortMessages", base::FEATURE_ENABLED_BY_DEFAULT};
} // namespace features } // namespace features
} // namespace network } // namespace network
...@@ -90,6 +90,9 @@ extern const base::FeatureParam<TrustTokenOriginTrialSpec> ...@@ -90,6 +90,9 @@ extern const base::FeatureParam<TrustTokenOriginTrialSpec>
COMPONENT_EXPORT(NETWORK_CPP) COMPONENT_EXPORT(NETWORK_CPP)
bool ShouldEnableOutOfBlinkCorsForTesting(); bool ShouldEnableOutOfBlinkCorsForTesting();
COMPONENT_EXPORT(NETWORK_CPP)
extern const base::Feature kWebSocketReassembleShortMessages;
} // namespace features } // namespace features
} // namespace network } // namespace network
......
...@@ -11,9 +11,11 @@ ...@@ -11,9 +11,11 @@
#include "base/bind.h" #include "base/bind.h"
#include "base/bind_helpers.h" #include "base/bind_helpers.h"
#include "base/feature_list.h"
#include "base/location.h" #include "base/location.h"
#include "base/logging.h" #include "base/logging.h"
#include "base/macros.h" #include "base/macros.h"
#include "base/numerics/safe_conversions.h"
#include "base/single_thread_task_runner.h" #include "base/single_thread_task_runner.h"
#include "base/strings/strcat.h" #include "base/strings/strcat.h"
#include "base/strings/string_number_conversions.h" #include "base/strings/string_number_conversions.h"
...@@ -36,11 +38,16 @@ ...@@ -36,11 +38,16 @@
#include "net/websockets/websocket_frame.h" // for WebSocketFrameHeader::OpCode #include "net/websockets/websocket_frame.h" // for WebSocketFrameHeader::OpCode
#include "net/websockets/websocket_handshake_request_info.h" #include "net/websockets/websocket_handshake_request_info.h"
#include "net/websockets/websocket_handshake_response_info.h" #include "net/websockets/websocket_handshake_response_info.h"
#include "services/network/public/cpp/features.h"
#include "services/network/websocket_factory.h" #include "services/network/websocket_factory.h"
namespace network { namespace network {
namespace { namespace {
// What is considered a "small message" for the purposes of small message
// reassembly.
constexpr uint64_t kSmallMessageThreshhold = 1 << 16;
// Convert a mojom::WebSocketMessageType to a // Convert a mojom::WebSocketMessageType to a
// net::WebSocketFrameHeader::OpCode // net::WebSocketFrameHeader::OpCode
net::WebSocketFrameHeader::OpCode MessageTypeToOpCode( net::WebSocketFrameHeader::OpCode MessageTypeToOpCode(
...@@ -400,7 +407,9 @@ WebSocket::WebSocket( ...@@ -400,7 +407,9 @@ WebSocket::WebSocket(
readable_watcher_(FROM_HERE, readable_watcher_(FROM_HERE,
mojo::SimpleWatcher::ArmingPolicy::MANUAL, mojo::SimpleWatcher::ArmingPolicy::MANUAL,
base::ThreadTaskRunnerHandle::Get()), base::ThreadTaskRunnerHandle::Get()),
data_pipe_use_tracker_(std::move(data_pipe_use_tracker)) { data_pipe_use_tracker_(std::move(data_pipe_use_tracker)),
reassemble_short_messages_(base::FeatureList::IsEnabled(
network::features::kWebSocketReassembleShortMessages)) {
DCHECK(handshake_client_); DCHECK(handshake_client_);
// If |require_network_isolation_key| is set on the URLRequestContext, // If |require_network_isolation_key| is set on the URLRequestContext,
// |isolation_info| must not be empty. // |isolation_info| must not be empty.
...@@ -459,7 +468,10 @@ void WebSocket::SendMessage(mojom::WebSocketMessageType type, ...@@ -459,7 +468,10 @@ void WebSocket::SendMessage(mojom::WebSocketMessageType type,
} }
DCHECK(IsKnownEnumValue(type)); DCHECK(IsKnownEnumValue(type));
pending_send_data_frames_.push(DataFrame(type, data_length)); const bool do_not_fragment =
reassemble_short_messages_ && data_length <= kSmallMessageThreshhold;
pending_send_data_frames_.push(DataFrame(type, data_length, do_not_fragment));
// Safe if ReadAndSendFromDataPipe() deletes |this| because this method is // Safe if ReadAndSendFromDataPipe() deletes |this| because this method is
// only called from mojo. // only called from mojo.
...@@ -696,6 +708,43 @@ void WebSocket::ReadAndSendFromDataPipe() { ...@@ -696,6 +708,43 @@ void WebSocket::ReadAndSendFromDataPipe() {
} }
DCHECK_EQ(begin_result, MOJO_RESULT_OK); DCHECK_EQ(begin_result, MOJO_RESULT_OK);
if (readable_size < data_frame.data_length && data_frame.do_not_fragment &&
!message_under_reassembly_) {
// The cast is needed to unambiguously select a constructor on 32-bit
// platforms.
message_under_reassembly_ = base::MakeRefCounted<net::IOBuffer>(
base::checked_cast<size_t>(data_frame.data_length));
DCHECK_EQ(bytes_reassembled_, 0u);
}
if (message_under_reassembly_) {
const size_t bytes_to_copy =
std::min(static_cast<uint64_t>(readable_size),
data_frame.data_length - bytes_reassembled_);
memcpy(message_under_reassembly_->data() + bytes_reassembled_, buffer,
bytes_to_copy);
bytes_reassembled_ += bytes_to_copy;
const MojoResult end_result = readable_->EndReadData(bytes_to_copy);
DCHECK_EQ(end_result, MOJO_RESULT_OK);
DCHECK_LE(bytes_reassembled_, data_frame.data_length);
if (bytes_reassembled_ == data_frame.data_length) {
bytes_reassembled_ = 0;
blocked_on_websocket_channel_ = true;
if (channel_->SendFrame(
/*fin=*/true, MessageTypeToOpCode(data_frame.type),
std::move(message_under_reassembly_), data_frame.data_length) ==
net::WebSocketChannel::CHANNEL_DELETED) {
// |this| has been deleted.
return;
}
pending_send_data_frames_.pop();
}
continue;
}
const size_t size_to_send = const size_t size_to_send =
std::min(static_cast<uint64_t>(readable_size), data_frame.data_length); std::min(static_cast<uint64_t>(readable_size), data_frame.data_length);
auto data_to_pass = base::MakeRefCounted<net::IOBuffer>(size_to_send); auto data_to_pass = base::MakeRefCounted<net::IOBuffer>(size_to_send);
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
#include "base/containers/queue.h" #include "base/containers/queue.h"
#include "base/containers/span.h" #include "base/containers/span.h"
#include "base/macros.h" #include "base/macros.h"
#include "base/memory/scoped_refptr.h"
#include "base/memory/weak_ptr.h" #include "base/memory/weak_ptr.h"
#include "base/optional.h" #include "base/optional.h"
#include "base/time/time.h" #include "base/time/time.h"
...@@ -35,9 +36,10 @@ class Location; ...@@ -35,9 +36,10 @@ class Location;
} // namespace base } // namespace base
namespace net { namespace net {
class IOBuffer;
class IsolationInfo; class IsolationInfo;
class SiteForCookies;
class SSLInfo; class SSLInfo;
class SiteForCookies;
class WebSocketChannel; class WebSocketChannel;
} // namespace net } // namespace net
...@@ -119,10 +121,15 @@ class COMPONENT_EXPORT(NETWORK_SERVICE) WebSocket : public mojom::WebSocket { ...@@ -119,10 +121,15 @@ class COMPONENT_EXPORT(NETWORK_SERVICE) WebSocket : public mojom::WebSocket {
}; };
struct DataFrame final { struct DataFrame final {
DataFrame(mojom::WebSocketMessageType type, uint64_t data_length) DataFrame(mojom::WebSocketMessageType type,
: type(type), data_length(data_length) {} uint64_t data_length,
bool do_not_fragment)
: type(type),
data_length(data_length),
do_not_fragment(do_not_fragment) {}
mojom::WebSocketMessageType type; mojom::WebSocketMessageType type;
uint64_t data_length; uint64_t data_length;
const bool do_not_fragment;
}; };
void OnConnectionError(const base::Location& set_from); void OnConnectionError(const base::Location& set_from);
...@@ -214,6 +221,21 @@ class COMPONENT_EXPORT(NETWORK_SERVICE) WebSocket : public mojom::WebSocket { ...@@ -214,6 +221,21 @@ class COMPONENT_EXPORT(NETWORK_SERVICE) WebSocket : public mojom::WebSocket {
DataPipeUseTracker data_pipe_use_tracker_; DataPipeUseTracker data_pipe_use_tracker_;
// True if we should preserve the old behaviour where <=64KB messages were
// never fragmented.
// TODO(ricea): Remove the flag once we know whether we really need this or
// not. See https://crbug.com/1086273.
const bool reassemble_short_messages_;
// Temporary buffer for storage of short messages that have been fragmented by
// the data pipe. Only messages that are actually fragmented are copied into
// here.
scoped_refptr<net::IOBuffer> message_under_reassembly_;
// Number of bytes that have been written to |message_under_reassembly_| so
// far.
size_t bytes_reassembled_ = 0;
base::WeakPtrFactory<WebSocket> weak_ptr_factory_{this}; base::WeakPtrFactory<WebSocket> weak_ptr_factory_{this};
DISALLOW_COPY_AND_ASSIGN(WebSocket); DISALLOW_COPY_AND_ASSIGN(WebSocket);
......
<!doctype html>
<html>
<script src = "/resources/testharness.js"></script>
<script src = "/resources/testharnessreport.js"></script>
<script>
'use strict';
// The old quota-based flow control system would never fragment messages less
// than 64KB as long as there was sufficient time between messages to refresh
// quota. Some servers may have come to rely on this behaviour. The new
// datapipe-based flow control system has code to emulate the behaviour.
// This test verifies that small messages are reassembled correctly.
// This behaviour is Chromium-specific.
// TODO(ricea): If we decide this behaviour is not needed we should remove
// this test. See https://crbug.com/1086273.
// Prime number under 65536. Messages of 65536 bytes or less should not be
// fragmented. Prime so that it is extremely unlikely to fit exactly into
// any data pipe size that might be used.
const MESSAGE_SIZE = 65521;
// 32 * 65521 = 2096672 is probably larger than any data pipe that would be
// used. We can't go much bigger without making the test too slow.
const NUMBER_OF_MESSAGES = 32;
async_test(t => {
const ws = new WebSocket('ws://127.0.0.1:8880/expect-unfragmented');
let finished = false;
let sent_messages = 0;
const message = new ArrayBuffer(MESSAGE_SIZE);
ws.onopen = () => {
// We wait for acknowledgement of each message before sending the next one.
// This ensures that the quota would have been updated with the old
// quota-based flow control.
ws.send(message);
sent_messages = 1;
};
ws.onerror = t.unreached_func('onerror should not be fired');
ws.onmessage = t.step_func(evt => {
if (sent_messages < NUMBER_OF_MESSAGES) {
assert_equals(evt.data,
`OK: message ${sent_messages - 1} not fragmented`);
ws.send(message);
++sent_messages;
return;
}
assert_equals(evt.data, 'OK: message 31 not fragmented');
finished = true;
});
ws.onclose = t.step_func_done(() => {
assert_true(finished, 'Test must finish');
});
}, 'Small messages should not be fragmented');
</script>
</html>
# Read 32 messages and verify that they are not fragmented.
# This can be removed if the "reassemble small messages" feature is removed. See
# https://crbug.com/1086273.
from mod_pywebsocket import common
from mod_pywebsocket import msgutil
NUMBER_OF_MESSAGES = 32
def web_socket_do_extra_handshake(request):
# Disable permessage-deflate because it may reassemble messages.
request.ws_extension_processors = []
def web_socket_transfer_data(request):
for i in range(NUMBER_OF_MESSAGES):
# We need to use an internal function to verify that the frame has the
# "final" flag set.
opcode, recv_payload, final, reserved1, reserved2, reserved3 = \
request.ws_stream._receive_frame()
# We assume that the browser will not send any control messages.
if opcode != common.OPCODE_BINARY:
msgutil.send_message(request, 'FAIL: message %r was not opcode binary' % i)
return
if not final:
msgutil.send_message(request, 'FAIL: message %r was fragmented' % i)
return
msgutil.send_message(request, 'OK: message %r not fragmented' % i)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment