Commit d0095703 authored by Adam Rice's avatar Adam Rice Committed by Commit Bot

[WebSocket] Update renderer to use mojo writable datapipe

This commit updates the renderer to use mojo datapipe to transfer
WebSocket mesasge to the network service, which later sends out the
message with appropriate framing. It adds new ProducePendingData() and
ProduceData() functions to write pending messages to the datapipe. It
also adds mojo SimpleWatcher and OnWritable callback function in case
when the data pipe is unavailable at the moment.

This also updates the unittests to stop testing the quota system,
and start testing the new datapipe transfer.

This commit is a follow-up CL from the following CL:
https://chromium-review.googlesource.com/c/chromium/src/+/2071189

Design Doc:
https://docs.google.com/document/d/1YWj1z9r8wxemGdod6S2tkchudhp6PvNaH3qSO0oucfY/

This is a rebase of
https://chromium-review.googlesource.com/c/chromium/src/+/2089564,
originally by Keita Suzuki.

Bug: 1056030
Change-Id: Ic57abc92b95dad0765c55945439ced5adaf227b9
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2206556Reviewed-by: default avatarYutaka Hirano <yhirano@chromium.org>
Commit-Queue: Adam Rice <ricea@chromium.org>
Cr-Commit-Position: refs/heads/master@{#771697}
parent 41d5f7c6
......@@ -44,7 +44,6 @@ namespace {
using base::StreamingUtf8Validator;
const int kDefaultSendQuotaLowWaterMark = 1 << 16;
const int kDefaultSendQuotaHighWaterMark = 1 << 17;
const size_t kWebSocketCloseCodeLength = 2;
// Timeout for waiting for the server to acknowledge a closing handshake.
......@@ -286,7 +285,6 @@ WebSocketChannel::WebSocketChannel(
URLRequestContext* url_request_context)
: event_interface_(std::move(event_interface)),
url_request_context_(url_request_context),
send_quota_low_water_mark_(kDefaultSendQuotaLowWaterMark),
send_quota_high_water_mark_(kDefaultSendQuotaHighWaterMark),
current_send_quota_(0),
closing_handshake_timeout_(
......@@ -354,12 +352,6 @@ WebSocketChannel::ChannelState WebSocketChannel::SendFrame(
}
DCHECK_EQ(state_, CONNECTED);
if (buffer_size > base::checked_cast<size_t>(current_send_quota_)) {
// TODO(ricea): Kill renderer.
FailChannel("Send quota exceeded", kWebSocketErrorGoingAway, "");
return CHANNEL_DELETED;
// |this| has been deleted.
}
DCHECK(WebSocketFrameHeader::IsKnownDataOpCode(op_code))
<< "Got SendFrame with bogus op_code " << op_code << " fin=" << fin
......@@ -381,7 +373,6 @@ WebSocketChannel::ChannelState WebSocketChannel::SendFrame(
sending_text_message_ = !fin;
DCHECK(!fin || state == StreamingUtf8Validator::VALID_ENDPOINT);
}
current_send_quota_ -= buffer_size;
// TODO(ricea): If current_send_quota_ has dropped below
// send_quota_low_water_mark_, it might be good to increase the "low
// water mark" and "high water mark", but only if the link to the WebSocket
......@@ -599,21 +590,7 @@ ChannelState WebSocketChannel::OnWriteDone(bool synchronous, int result) {
return WriteFrames();
} else {
data_being_sent_.reset();
if (current_send_quota_ < send_quota_low_water_mark_) {
// TODO(ricea): Increase low_water_mark and high_water_mark if
// throughput is high, reduce them if throughput is low. Low water
// mark needs to be >= the bandwidth delay product *of the IPC
// channel*. Because factors like context-switch time, thread wake-up
// time, and bus speed come into play it is complex and probably needs
// to be determined empirically.
DCHECK_LE(send_quota_low_water_mark_, send_quota_high_water_mark_);
// TODO(ricea): Truncate quota by the quota specified by the remote
// server, if the protocol in use supports quota.
int fresh_quota = send_quota_high_water_mark_ - current_send_quota_;
current_send_quota_ += fresh_quota;
event_interface_->OnSendFlowControlQuotaAdded(fresh_quota);
return CHANNEL_ALIVE;
}
event_interface_->OnSendDataFrameDone();
}
return CHANNEL_ALIVE;
......
......@@ -359,10 +359,6 @@ class NET_EXPORT WebSocketChannel {
// during the connection process.
std::unique_ptr<WebSocketStreamRequest> stream_request_;
// If the renderer's send quota reaches this level, it is sent a quota
// refresh. "quota units" are currently bytes. TODO(ricea): Update the
// definition of quota units when necessary.
int send_quota_low_water_mark_;
// The level the quota is refreshed to when it reaches the low_water_mark
// (quota units).
int send_quota_high_water_mark_;
......
......@@ -135,10 +135,6 @@ const size_t kBinaryBlobSize = base::size(kBinaryBlob);
// TODO(ricea): If kDefaultSendQuotaHighWaterMark changes, then this value will
// need to be updated.
const size_t kDefaultInitialQuota = 1 << 17;
// The amount of bytes we need to send after the initial connection to trigger a
// quota refresh. TODO(ricea): Change this if kDefaultSendQuotaHighWaterMark or
// kDefaultSendQuotaLowWaterMark change.
const size_t kDefaultQuotaRefreshTrigger = (1 << 16) + 1;
const int kVeryBigTimeoutMillis = 60 * 60 * 24 * 1000;
......@@ -177,7 +173,7 @@ class MockWebSocketEventInterface : public WebSocketEventInterface {
WebSocketMessageType,
const std::vector<char>&)); // NOLINT
MOCK_METHOD0(HasPendingDataFrames, bool(void)); // NOLINT
MOCK_METHOD1(OnSendFlowControlQuotaAdded, void(int64_t)); // NOLINT
MOCK_METHOD0(OnSendDataFrameDone, void(void)); // NOLINT
MOCK_METHOD0(OnClosingHandshake, void(void)); // NOLINT
MOCK_METHOD1(OnFailChannel, void(const std::string&)); // NOLINT
MOCK_METHOD3(OnDropChannel,
......@@ -230,8 +226,8 @@ class FakeWebSocketEventInterface : public WebSocketEventInterface {
void OnDataFrame(bool fin,
WebSocketMessageType type,
base::span<const char> data_span) override {}
void OnSendDataFrameDone() override {}
bool HasPendingDataFrames() override { return false; }
void OnSendFlowControlQuotaAdded(int64_t quota) override {}
void OnClosingHandshake() override {}
void OnFailChannel(const std::string& message) override {}
void OnDropChannel(bool was_clean,
......@@ -935,6 +931,7 @@ class WebSocketChannelStreamTest : public WebSocketChannelEventInterfaceTest {
EXPECT_CALL(*event_interface_, OnDataFrameVector(_, _, _))
.Times(AnyNumber());
EXPECT_CALL(*event_interface_, OnClosingHandshake()).Times(AnyNumber());
EXPECT_CALL(*event_interface_, OnSendDataFrameDone()).Times(AnyNumber());
EXPECT_CALL(*event_interface_, OnFailChannel(_)).Times(AnyNumber());
EXPECT_CALL(*event_interface_, OnDropChannel(_, _, _)).Times(AnyNumber());
}
......@@ -963,6 +960,7 @@ class WebSocketChannelSendUtf8Test
// whether these methods are called or not.
EXPECT_CALL(*event_interface_, OnAddChannelResponse(_, _, _, _))
.Times(AnyNumber());
EXPECT_CALL(*event_interface_, OnSendDataFrameDone()).Times(AnyNumber());
}
};
......@@ -1411,98 +1409,6 @@ TEST_F(WebSocketChannelEventInterfaceTest, FrameAfterInvalidFrame) {
base::RunLoop().RunUntilIdle();
}
// If the renderer sends lots of small writes, we don't want to update the quota
// for each one.
TEST_F(WebSocketChannelEventInterfaceTest, SmallWriteDoesntUpdateQuota) {
set_stream(std::make_unique<WriteableFakeWebSocketStream>());
{
InSequence s;
EXPECT_CALL(*event_interface_, OnAddChannelResponse(_, _, _, _));
}
CreateChannelAndConnectSuccessfully();
EXPECT_EQ(channel_->SendFrame(true, WebSocketFrameHeader::kOpCodeText,
AsIOBuffer("B"), 1U),
WebSocketChannel::CHANNEL_ALIVE);
}
// If we send enough to go below |send_quota_low_water_mark_| we should get our
// quota refreshed.
TEST_F(WebSocketChannelEventInterfaceTest, LargeWriteUpdatesQuota) {
set_stream(std::make_unique<WriteableFakeWebSocketStream>());
// We use this checkpoint object to verify that the quota update comes after
// the write.
Checkpoint checkpoint;
{
InSequence s;
EXPECT_CALL(*event_interface_, OnAddChannelResponse(_, _, _, _));
EXPECT_CALL(checkpoint, Call(1));
EXPECT_CALL(*event_interface_, OnSendFlowControlQuotaAdded(_));
EXPECT_CALL(checkpoint, Call(2));
}
CreateChannelAndConnectSuccessfully();
checkpoint.Call(1);
EXPECT_EQ(
channel_->SendFrame(true, WebSocketFrameHeader::kOpCodeText,
AsIOBuffer(std::string(kDefaultInitialQuota, 'B')),
kDefaultInitialQuota),
WebSocketChannel::CHANNEL_ALIVE);
checkpoint.Call(2);
}
// Verify that our quota actually is refreshed when we are told it is.
TEST_F(WebSocketChannelEventInterfaceTest, QuotaReallyIsRefreshed) {
set_stream(std::make_unique<WriteableFakeWebSocketStream>());
Checkpoint checkpoint;
{
InSequence s;
EXPECT_CALL(*event_interface_, OnAddChannelResponse(_, _, _, _));
EXPECT_CALL(checkpoint, Call(1));
EXPECT_CALL(*event_interface_, OnSendFlowControlQuotaAdded(_));
EXPECT_CALL(checkpoint, Call(2));
// If quota was not really refreshed, we would get an OnDropChannel()
// message.
EXPECT_CALL(*event_interface_, OnSendFlowControlQuotaAdded(_));
EXPECT_CALL(checkpoint, Call(3));
}
CreateChannelAndConnectSuccessfully();
checkpoint.Call(1);
EXPECT_EQ(channel_->SendFrame(
true, WebSocketFrameHeader::kOpCodeText,
AsIOBuffer(std::string(kDefaultQuotaRefreshTrigger, 'D')),
kDefaultQuotaRefreshTrigger),
WebSocketChannel::CHANNEL_ALIVE);
checkpoint.Call(2);
// We should have received more quota at this point.
EXPECT_EQ(channel_->SendFrame(
true, WebSocketFrameHeader::kOpCodeText,
AsIOBuffer(std::string(kDefaultQuotaRefreshTrigger, 'E')),
kDefaultQuotaRefreshTrigger),
WebSocketChannel::CHANNEL_ALIVE);
checkpoint.Call(3);
}
// If we send more than the available quota then the connection will be closed
// with an error.
TEST_F(WebSocketChannelEventInterfaceTest, WriteOverQuotaIsRejected) {
set_stream(std::make_unique<WriteableFakeWebSocketStream>());
{
InSequence s;
EXPECT_CALL(*event_interface_,
OnAddChannelResponse(_, _, _, kDefaultInitialQuota));
EXPECT_CALL(*event_interface_, OnFailChannel("Send quota exceeded"));
}
CreateChannelAndConnectSuccessfully();
EXPECT_EQ(channel_->SendFrame(
true, WebSocketFrameHeader::kOpCodeText,
AsIOBuffer(std::string(kDefaultInitialQuota + 1, 'C')),
kDefaultInitialQuota + 1),
WebSocketChannel::CHANNEL_DELETED);
}
// If a write fails, the channel is dropped.
TEST_F(WebSocketChannelEventInterfaceTest, FailedWrite) {
set_stream(std::make_unique<UnWriteableFakeWebSocketStream>());
......@@ -1531,6 +1437,7 @@ TEST_F(WebSocketChannelEventInterfaceTest, SendCloseDropsChannel) {
{
InSequence s;
EXPECT_CALL(*event_interface_, OnAddChannelResponse(_, _, _, _));
EXPECT_CALL(*event_interface_, OnSendDataFrameDone());
EXPECT_CALL(*event_interface_,
OnDropChannel(true, kWebSocketNormalClosure, "Fred"));
}
......@@ -2283,26 +2190,6 @@ TEST_F(WebSocketChannelStreamTest, WaitingMessagesAreBatched) {
std::move(write_callback).Run(OK);
}
// When the renderer sends more on a channel than it has quota for, we send the
// remote server a kWebSocketErrorGoingAway error code.
TEST_F(WebSocketChannelStreamTest, SendGoingAwayOnRendererQuotaExceeded) {
static const InitFrame expected[] = {
{FINAL_FRAME, WebSocketFrameHeader::kOpCodeClose,
MASKED, CLOSE_DATA(GOING_AWAY, "")}};
EXPECT_CALL(*mock_stream_, ReadFramesInternal(_, _))
.WillOnce(Return(ERR_IO_PENDING));
EXPECT_CALL(*mock_stream_, WriteFramesInternal(EqualsFrames(expected), _))
.WillOnce(Return(OK));
EXPECT_CALL(*mock_stream_, Close());
CreateChannelAndConnectSuccessfully();
EXPECT_EQ(channel_->SendFrame(
true, WebSocketFrameHeader::kOpCodeText,
AsIOBuffer(std::string(kDefaultInitialQuota + 1, 'C')),
kDefaultInitialQuota + 1),
WebSocketChannel::CHANNEL_DELETED);
}
// For convenience, most of these tests use Text frames. However, the WebSocket
// protocol also has Binary frames and those need to be 8-bit clean. For the
// sake of completeness, this test verifies that they are.
......@@ -2925,30 +2812,5 @@ TEST_F(WebSocketChannelStreamTimeoutTest, ConnectionCloseTimesOut) {
completion.WaitForResult();
}
// Verify that current_send_quota() returns a non-zero value for a newly
// connected channel.
TEST_F(WebSocketChannelTest, CurrentSendQuotaNonZero) {
CreateChannelAndConnectSuccessfully();
EXPECT_GT(channel_->current_send_quota(), 0);
}
// Verify that current_send_quota() is updated when SendFrame() is called.
TEST_F(WebSocketChannelTest, CurrentSendQuotaUpdated) {
const int kMessageSize = 5;
set_stream(std::make_unique<WriteableFakeWebSocketStream>());
CreateChannelAndConnectSuccessfully();
int initial_send_quota = channel_->current_send_quota();
EXPECT_GE(initial_send_quota, kMessageSize);
EXPECT_EQ(channel_->SendFrame(
true, WebSocketFrameHeader::kOpCodeText,
AsIOBuffer(std::string(static_cast<size_t>(kMessageSize), 'a')),
static_cast<size_t>(kMessageSize)),
WebSocketChannel::CHANNEL_ALIVE);
int new_send_quota = channel_->current_send_quota();
EXPECT_EQ(kMessageSize, initial_send_quota - new_send_quota);
}
} // namespace
} // namespace net
......@@ -109,7 +109,7 @@ class ConnectTestingEventInterface : public WebSocketEventInterface {
bool HasPendingDataFrames() override { return false; }
void OnSendFlowControlQuotaAdded(int64_t quota) override;
void OnSendDataFrameDone() override;
void OnClosingHandshake() override;
......@@ -182,7 +182,7 @@ void ConnectTestingEventInterface::OnDataFrame(bool fin,
base::span<const char> payload) {
}
void ConnectTestingEventInterface::OnSendFlowControlQuotaAdded(int64_t quota) {}
void ConnectTestingEventInterface::OnSendDataFrameDone() {}
void ConnectTestingEventInterface::OnClosingHandshake() {}
......
......@@ -64,9 +64,9 @@ class NET_EXPORT WebSocketEventInterface {
// out. The network service should not read more from network until that.
virtual bool HasPendingDataFrames() = 0;
// Called to provide more send quota for this channel to the renderer
// process.
virtual void OnSendFlowControlQuotaAdded(int64_t quota) = 0;
// Called once for each call to SendFrame() once the frame has been passed to
// the OS.
virtual void OnSendDataFrameDone() = 0;
// Called when the remote server has Started the WebSocket Closing
// Handshake. The client should not attempt to send any more messages after
......
......@@ -123,9 +123,9 @@ class WebSocket::WebSocketEventHandler final
void OnDataFrame(bool fin,
WebSocketMessageType type,
base::span<const char> payload) override;
void OnSendDataFrameDone() override;
bool HasPendingDataFrames() override;
void OnClosingHandshake() override;
void OnSendFlowControlQuotaAdded(int64_t quota) override;
void OnDropChannel(bool was_clean,
uint16_t code,
const std::string& reason) override;
......@@ -242,8 +242,6 @@ void WebSocket::WebSocketEventHandler::OnAddChannelResponse(
impl_->header_client_.reset();
impl_->client_.set_disconnect_handler(base::BindOnce(
&WebSocket::OnConnectionError, base::Unretained(impl_), FROM_HERE));
impl_->client_->AddSendFlowControlQuota(send_flow_control_quota);
}
void WebSocket::WebSocketEventHandler::OnDataFrame(
......@@ -260,6 +258,11 @@ void WebSocket::WebSocketEventHandler::OnDataFrame(
impl_->SendPendingDataFrames();
}
void WebSocket::WebSocketEventHandler::OnSendDataFrameDone() {
impl_->ResumeDataPipeReading();
return;
}
bool WebSocket::WebSocketEventHandler::HasPendingDataFrames() {
return !impl_->pending_data_frames_.empty();
}
......@@ -271,14 +274,6 @@ void WebSocket::WebSocketEventHandler::OnClosingHandshake() {
impl_->client_->OnClosingHandshake();
}
void WebSocket::WebSocketEventHandler::OnSendFlowControlQuotaAdded(
int64_t quota) {
DVLOG(3) << "WebSocketEventHandler::OnSendFlowControlQuotaAdded @"
<< reinterpret_cast<void*>(this) << " quota=" << quota;
impl_->client_->AddSendFlowControlQuota(quota);
}
void WebSocket::WebSocketEventHandler::OnDropChannel(
bool was_clean,
uint16_t code,
......@@ -496,7 +491,8 @@ void WebSocket::SendMessage(mojom::WebSocketMessageType type,
// Safe if ReadAndSendFromDataPipe() deletes |this| because this method is
// only called from mojo.
ReadAndSendFromDataPipe();
if (!blocked_on_websocket_channel_)
ReadAndSendFromDataPipe();
}
void WebSocket::StartReceiving() {
......@@ -631,6 +627,7 @@ void WebSocket::SendPendingDataFrames() {
<< reinterpret_cast<void*>(this)
<< ", pending_data_frames_.size=" << pending_data_frames_.size()
<< ", wait_for_writable_?" << wait_for_writable_;
if (wait_for_writable_) {
return;
}
......@@ -717,7 +714,9 @@ void WebSocket::ReadAndSendFromDataPipe() {
&buffer, &readable_size, MOJO_READ_DATA_FLAG_NONE);
if (begin_result == MOJO_RESULT_SHOULD_WAIT) {
wait_for_readable_ = true;
readable_watcher_.ArmOrNotify();
if (!blocked_on_websocket_channel_) {
readable_watcher_.ArmOrNotify();
}
return;
}
if (begin_result == MOJO_RESULT_FAILED_PRECONDITION) {
......@@ -730,13 +729,13 @@ void WebSocket::ReadAndSendFromDataPipe() {
auto data_to_pass = base::MakeRefCounted<net::IOBuffer>(size_to_send);
const bool is_final = (size_to_send == data_frame.data_length);
memcpy(data_to_pass->data(), buffer, size_to_send);
blocked_on_websocket_channel_ = true;
if (channel_->SendFrame(is_final, MessageTypeToOpCode(data_frame.type),
std::move(data_to_pass), size_to_send) ==
net::WebSocketChannel::CHANNEL_DELETED) {
// |this| has been deleted.
return;
}
const MojoResult end_result = readable_->EndReadData(size_to_send);
DCHECK_EQ(end_result, MOJO_RESULT_OK);
......@@ -752,6 +751,11 @@ void WebSocket::ReadAndSendFromDataPipe() {
return;
}
void WebSocket::ResumeDataPipeReading() {
blocked_on_websocket_channel_ = false;
readable_watcher_.ArmOrNotify();
}
void WebSocket::OnSSLCertificateErrorResponse(
std::unique_ptr<net::WebSocketEventInterface::SSLErrorCallbacks> callbacks,
const net::SSLInfo& ssl_info,
......
......@@ -167,6 +167,7 @@ class COMPONENT_EXPORT(NETWORK_SERVICE) WebSocket : public mojom::WebSocket {
// ReadAndSendFromDataPipe() may indirectly delete |this|.
void ReadAndSendFromDataPipe();
void ResumeDataPipeReading();
// |factory_| owns |this|.
WebSocketFactory* const factory_;
......@@ -212,6 +213,7 @@ class COMPONENT_EXPORT(NETWORK_SERVICE) WebSocket : public mojom::WebSocket {
mojo::SimpleWatcher readable_watcher_;
base::queue<DataFrame> pending_send_data_frames_;
bool wait_for_readable_ = false;
bool blocked_on_websocket_channel_ = false;
DataPipeUseTracker data_pipe_use_tracker_;
......
......@@ -189,17 +189,14 @@ class MODULES_EXPORT WebSocketChannelImpl final
};
State GetState() const;
void SendInternal(network::mojom::blink::WebSocketMessageType,
const char* data,
size_t total_size,
uint64_t* consumed_buffered_amount);
void SendAndAdjustQuota(bool final,
network::mojom::blink::WebSocketMessageType,
base::span<const char>,
uint64_t* consumed_buffered_amount);
bool MaybeSendSynchronously(network::mojom::blink::WebSocketMessageType,
base::span<const char>);
base::span<const char>* data);
void ProcessSendQueue();
bool SendMessageData(base::span<const char>* data);
void FailAsError(const String& reason) {
Fail(reason, mojom::ConsoleMessageLevel::kError,
location_at_construction_->Clone());
......@@ -226,6 +223,10 @@ class MODULES_EXPORT WebSocketChannelImpl final
network::mojom::blink::WebSocketMessageType type,
const char* data,
size_t data_size);
// Called when |writable_| becomes writable.
void OnWritable(MojoResult result, const mojo::HandleSignalsState& state);
MojoResult ProduceData(base::span<const char>* data,
uint64_t* consumed_buffered_amount);
String GetTextMessage(const Vector<base::span<const char>>& chunks,
wtf_size_t size);
void OnConnectionError(const base::Location& set_from,
......@@ -275,6 +276,8 @@ class MODULES_EXPORT WebSocketChannelImpl final
WTF::Deque<DataFrame> pending_data_frames_;
mojo::ScopedDataPipeProducerHandle writable_;
mojo::SimpleWatcher writable_watcher_;
bool wait_for_writable_ = false;
const scoped_refptr<base::SingleThreadTaskRunner> file_reading_task_runner_;
};
......
<!doctype html>
<html>
<script src = "/resources/testharness.js"></script>
<script src = "/resources/testharnessreport.js"></script>
<script>
'use strict';
// Reproduce an issue where the renderer would crash if the remote server
// closed the connection in the middle of sending a large message. The test
// passes if it doesn't crash.
async_test(t => {
const ws = new WebSocket('ws://127.0.0.1:8880/close-on-frame');
ws.onopen = () => {
// Experimentally determined to be large enough to reliably trigger the
// issue.
const ab = new ArrayBuffer(8 << 20);
ws.send(ab);
};
ws.onerror = t.unreached_func('onerror should not be fired');
ws.onmessage = t.unreached_func('onmessage should not be fired');
ws.onclose = evt => t.done();
}, 'Renderer shouldn\'t crash if connection is closed during message send');
</script>
</html>
# Wait for a frame to be received from the client and then immediately close.
from mod_pywebsocket import common
from mod_pywebsocket import msgutil
def web_socket_do_extra_handshake(request):
pass
def web_socket_transfer_data(request):
# We need to use an internal function to read one frame without
# waiting for the rest of the message.
opcode, recv_payload, final, reserved1, reserved2, reserved3 = \
request.ws_stream._receive_frame()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment