Commit fcaa2a2c authored by Yoichi Osato's avatar Yoichi Osato Committed by Commit Bot

[WebSocket] Remove manual quota control for receiving.

Because we're using mojo data pipe, which has own quota control,
to transfer received data, we don't have manual one, or
mojom::WebSocket.AddReceiveFlowControlQuota().
This CL does a sort of work removing the function.

mojom::WebSocket.AddReceiveFlowControlQuota(quota) did 5 tasks:
browser side(mainly on WebSocketChannel)
 1. Trigger the first ReadFrame if renderer gets not throttled.
For #1, this patch moves the trigger to mojo WebSocket.StartReceiving.

 2. Trigger next ReadFrame if there is enough quota.
 3. Send pending dataframes based on added quota.
This patch moves task #2 and #3 to websocket.cc and datapipe itself.

 4. Dropchannel if all pending frames are received by renderer when closed.
Because we can care #4 w/o renderer ping back, this patch move
RespondToClosingHandshake()to WebSocketChannel::ReadFrames().

renderer side(mainly on WebSocketChannelImpl)
 5. Ping browser that backpressure is turned off.
For task #5, this patch changes not to call the mojo but throttle
datapipe reading with WebSocketHandleImpl::ConsumePendingDataFrames().

receive-arraybuffer-1MBx100.htmll?iteration=100 measurement on local build:
ToT:            144 MB/s (stdev: 4.56 MB/s)
Patch:          208 MB/s (stdev: 6.15 MB/s)
  (+44% to ToT, +41% to ReadOnlyBuffer)
ReadOnlyBuffer: 147 MB/s

Change-Id: I61feab06f0e3719520e6a47eea61e0322e4da01f
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/1763507
Commit-Queue: Yoichi Osato <yoichio@chromium.org>
Reviewed-by: default avatarKinuko Yasuda <kinuko@chromium.org>
Reviewed-by: default avatarKaran Bhatia <karandeepb@chromium.org>
Reviewed-by: default avatarAdam Rice <ricea@chromium.org>
Reviewed-by: default avatarYutaka Hirano <yhirano@chromium.org>
Cr-Commit-Position: refs/heads/master@{#691072}
parent 57627534
......@@ -195,7 +195,6 @@ void WebRequestProxyingWebSocket::OnConnectionEstablished(
mojo::PendingReceiver<network::mojom::WebSocketClient> client_receiver,
const std::string& selected_protocol,
const std::string& extensions,
uint64_t receive_quota_threshold,
mojo::ScopedDataPipeConsumerHandle readable) {
DCHECK(forwarding_handshake_client_);
DCHECK(!is_done_);
......@@ -205,7 +204,7 @@ void WebRequestProxyingWebSocket::OnConnectionEstablished(
forwarding_handshake_client_->OnConnectionEstablished(
std::move(websocket), std::move(client_receiver), selected_protocol,
extensions, receive_quota_threshold, std::move(readable));
extensions, std::move(readable));
// Deletes |this|.
proxies_->RemoveProxy(this);
......
......@@ -69,7 +69,6 @@ class WebRequestProxyingWebSocket
mojo::PendingReceiver<network::mojom::WebSocketClient> client_receiver,
const std::string& selected_protocol,
const std::string& extensions,
uint64_t receive_quota_threshold,
mojo::ScopedDataPipeConsumerHandle readable) override;
// network::mojom::AuthenticationHandler method:
......
......@@ -285,7 +285,6 @@ WebSocketChannel::WebSocketChannel(
send_quota_low_water_mark_(kDefaultSendQuotaLowWaterMark),
send_quota_high_water_mark_(kDefaultSendQuotaHighWaterMark),
current_send_quota_(0),
current_receive_quota_(0),
closing_handshake_timeout_(
base::TimeDelta::FromSeconds(kClosingHandshakeTimeoutSeconds)),
underlying_connection_close_timeout_(base::TimeDelta::FromSeconds(
......@@ -391,61 +390,6 @@ WebSocketChannel::ChannelState WebSocketChannel::SendFrame(
const char kWebSocketReceiveQuotaThreshold[] =
"websocket-renderer-receive-quota-max";
ChannelState WebSocketChannel::AddReceiveFlowControlQuota(int64_t quota) {
DCHECK(state_ == CONNECTED || state_ == SEND_CLOSED || state_ == CLOSE_WAIT);
// TODO(ricea): Kill the renderer if it tries to send us a negative quota
// value or > INT_MAX.
DCHECK_GE(quota, 0);
DCHECK_LE(quota, INT_MAX);
if (!pending_received_frames_.empty()) {
DCHECK_EQ(0u, current_receive_quota_);
}
while (!pending_received_frames_.empty() && quota > 0) {
PendingReceivedFrame& front = pending_received_frames_.front();
const uint64_t data_size = front.size() - front.offset();
const uint64_t bytes_to_send =
std::min(base::checked_cast<uint64_t>(quota), data_size);
const bool final = front.final() && data_size == bytes_to_send;
scoped_refptr<IOBuffer> buffer_to_pass;
if (front.data()) {
buffer_to_pass =
base::MakeRefCounted<DependentIOBuffer>(front.data(), front.offset());
} else {
DCHECK(!bytes_to_send) << "Non empty data should not be null.";
}
DVLOG(3) << "Sending frame previously split due to quota to the "
<< "renderer: quota=" << quota << " data_size=" << data_size
<< " bytes_to_send=" << bytes_to_send;
event_interface_->OnDataFrame(final, front.opcode(),
std::move(buffer_to_pass), bytes_to_send);
if (bytes_to_send < data_size) {
front.DidConsume(bytes_to_send);
front.ResetOpcode();
return CHANNEL_ALIVE;
}
quota -= bytes_to_send;
pending_received_frames_.pop();
}
if (!InClosingState() && pending_received_frames_.empty() &&
has_received_close_frame_) {
// We've been waiting for the client to consume the frames before
// responding to the closing handshake initiated by the server.
return RespondToClosingHandshake();
}
// If current_receive_quota_ == 0 then there is no pending ReadFrames()
// operation.
const bool start_read =
current_receive_quota_ == 0 && quota > 0 &&
(state_ == CONNECTED || state_ == SEND_CLOSED || state_ == CLOSE_WAIT);
current_receive_quota_ += quota;
if (start_read) {
return ReadFrames();
}
return CHANNEL_ALIVE;
}
ChannelState WebSocketChannel::StartClosingHandshake(
uint16_t code,
const std::string& reason) {
......@@ -575,7 +519,6 @@ void WebSocketChannel::OnConnectSuccess(
// |stream_request_| is not used once the connection has succeeded.
stream_request_.reset();
ignore_result(ReadFrames());
// |this| may have been deleted.
}
......@@ -690,7 +633,21 @@ ChannelState WebSocketChannel::OnWriteDone(bool synchronous, int result) {
ChannelState WebSocketChannel::ReadFrames() {
// TODO(crbug.com/994000) Remove this CHECK.
CHECK(stream_);
while (current_receive_quota_ > 0) {
DCHECK(state_ == CONNECTED || state_ == SEND_CLOSED || state_ == CLOSE_WAIT);
DCHECK(read_frames_.empty());
if (is_reading_) {
return CHANNEL_ALIVE;
}
// TODO(yoichio): Add test for this case.
if (!InClosingState() && has_received_close_frame_) {
DCHECK(!event_interface_->HasPendingDataFrames());
// We've been waiting for the client to consume the frames before
// responding to the closing handshake initiated by the server.
ignore_result(RespondToClosingHandshake());
}
while (!event_interface_->HasPendingDataFrames()) {
// TODO(crbug.com/994000) Remove this CHECK.
CHECK(stream_);
// This use of base::Unretained is safe because this object owns the
......@@ -701,6 +658,7 @@ ChannelState WebSocketChannel::ReadFrames() {
base::Bind(base::IgnoreResult(&WebSocketChannel::OnReadDone),
base::Unretained(this), false));
if (result == ERR_IO_PENDING) {
is_reading_ = true;
return CHANNEL_ALIVE;
}
if (OnReadDone(true, result) == CHANNEL_DELETED) {
......@@ -714,6 +672,9 @@ ChannelState WebSocketChannel::ReadFrames() {
}
ChannelState WebSocketChannel::OnReadDone(bool synchronous, int result) {
DVLOG(3) << "WebSocketChannel::OnReadDone synchronous?" << synchronous
<< ", result=" << result
<< ", read_frames_.size=" << read_frames_.size();
DCHECK_NE(FRESHLY_CONSTRUCTED, state_);
DCHECK_NE(CONNECTING, state_);
DCHECK_NE(ERR_IO_PENDING, result);
......@@ -728,11 +689,13 @@ ChannelState WebSocketChannel::OnReadDone(bool synchronous, int result) {
return CHANNEL_DELETED;
}
read_frames_.clear();
// There should always be a call to ReadFrames pending.
// TODO(ricea): Unless we are out of quota.
DCHECK_NE(CLOSED, state_);
if (!synchronous)
return ReadFrames();
if (!synchronous) {
is_reading_ = false;
if (!event_interface_->HasPendingDataFrames()) {
return ReadFrames();
}
}
return CHANNEL_ALIVE;
case ERR_WS_PROTOCOL_ERROR:
......@@ -842,7 +805,6 @@ ChannelState WebSocketChannel::HandleFrameByState(
}
// TODO(ricea): Find a way to safely log the message from the close
// message (escape control codes and so on).
DVLOG(1) << "Got Close with code " << code;
return HandleCloseFrame(code, reason);
}
......@@ -858,6 +820,9 @@ ChannelState WebSocketChannel::HandleDataFrame(
bool final,
scoped_refptr<IOBuffer> data_buffer,
uint64_t size) {
DVLOG(3) << "WebSocketChannel::HandleDataFrame opcode=" << opcode
<< ", final?" << final
<< ", data_buffer=" << (void*)data_buffer.get() << ", size=" << size;
if (state_ != CONNECTED) {
DVLOG(3) << "Ignored data packet received in state " << state_;
return CHANNEL_ALIVE;
......@@ -909,23 +874,6 @@ ChannelState WebSocketChannel::HandleDataFrame(
return CHANNEL_ALIVE;
initial_frame_forwarded_ = !final;
if (size > current_receive_quota_ || !pending_received_frames_.empty()) {
const bool no_quota = (current_receive_quota_ == 0);
DCHECK(no_quota || pending_received_frames_.empty());
DVLOG(3) << "Queueing frame to renderer due to quota. quota="
<< current_receive_quota_ << " size=" << size;
WebSocketFrameHeader::OpCode opcode_to_queue =
no_quota ? opcode_to_send : WebSocketFrameHeader::kOpCodeContinuation;
pending_received_frames_.push(PendingReceivedFrame(
final, opcode_to_queue, data_buffer, current_receive_quota_, size));
if (no_quota)
return CHANNEL_ALIVE;
size = current_receive_quota_;
final = false;
}
current_receive_quota_ -= size;
// Sends the received frame to the renderer process.
event_interface_->OnDataFrame(final, opcode_to_send, std::move(data_buffer),
size);
......@@ -940,7 +888,7 @@ ChannelState WebSocketChannel::HandleCloseFrame(uint16_t code,
has_received_close_frame_ = true;
received_close_code_ = code;
received_close_reason_ = reason;
if (!pending_received_frames_.empty()) {
if (event_interface_->HasPendingDataFrames()) {
// We have some data to be sent to the renderer before sending this
// frame.
return CHANNEL_ALIVE;
......
......@@ -96,14 +96,10 @@ class NET_EXPORT WebSocketChannel {
scoped_refptr<IOBuffer> buffer,
size_t buffer_size);
// Sends |quota| units of flow control to the remote side. If the underlying
// transport has a concept of |quota|, then it permits the remote server to
// send up to |quota| units of data.
//
// Calling this function may result in synchronous calls to |event_interface_|
// which may result in this object being deleted. In that case, the return
// value will be CHANNEL_DELETED.
ChannelState AddReceiveFlowControlQuota(int64_t quota) WARN_UNUSED_RESULT;
// Calls WebSocketStream::ReadFrames() with the appropriate arguments. Stops
// calling ReadFrames if no writable buffer in dataframe or WebSocketStream
// starts async read.
ChannelState ReadFrames() WARN_UNUSED_RESULT;
// Starts the closing handshake for a client-initiated shutdown of the
// connection. There is no API to close the connection without a closing
......@@ -241,10 +237,6 @@ class NET_EXPORT WebSocketChannel {
// WriteFrames() itself.
ChannelState OnWriteDone(bool synchronous, int result) WARN_UNUSED_RESULT;
// Calls WebSocketStream::ReadFrames() with the appropriate arguments. Stops
// calling ReadFrames if current_receive_quota_ is 0.
ChannelState ReadFrames() WARN_UNUSED_RESULT;
// Callback from WebSocketStream::ReadFrames. Handles any errors and processes
// the returned chunks appropriately to their type. |result| is a net error
// code. If |synchronous| is true, then OnReadDone() is being called from
......@@ -357,10 +349,6 @@ class NET_EXPORT WebSocketChannel {
// Destination for the current call to WebSocketStream::ReadFrames
std::vector<std::unique_ptr<WebSocketFrame>> read_frames_;
// Frames that have been read but not yet forwarded to the renderer due to
// lack of quota.
base::queue<PendingReceivedFrame> pending_received_frames_;
// Handle to an in-progress WebSocketStream creation request. Only non-NULL
// during the connection process.
std::unique_ptr<WebSocketStreamRequest> stream_request_;
......@@ -375,9 +363,6 @@ class NET_EXPORT WebSocketChannel {
// The current amount of quota that the renderer has available for sending
// on this logical channel (quota units).
int current_send_quota_;
// The remaining amount of quota that the renderer will allow us to send on
// this logical channel (quota units).
uint64_t current_receive_quota_;
// Timer for the closing handshake.
base::OneShotTimer close_timer_;
......@@ -415,6 +400,9 @@ class NET_EXPORT WebSocketChannel {
// message to the renderer. This can be false if the message is empty so far.
bool initial_frame_forwarded_;
// True if we're waiting for OnReadDone() callback.
bool is_reading_ = false;
DISALLOW_COPY_AND_ASSIGN(WebSocketChannel);
};
......
This diff is collapsed.
......@@ -104,6 +104,8 @@ class ConnectTestingEventInterface : public WebSocketEventInterface {
scoped_refptr<IOBuffer> data,
size_t data_size) override;
bool HasPendingDataFrames() override { return false; }
void OnSendFlowControlQuotaAdded(int64_t quota) override;
void OnClosingHandshake() override;
......
......@@ -55,6 +55,10 @@ class NET_EXPORT WebSocketEventInterface {
scoped_refptr<IOBuffer> buffer,
size_t buffer_size) = 0;
// Returns true if data pipe is full and waiting the renderer process read
// out. The network service should not read more from network until that.
virtual bool HasPendingDataFrames() = 0;
// Called to provide more send quota for this channel to the renderer
// process.
virtual void OnSendFlowControlQuotaAdded(int64_t quota) = 0;
......
......@@ -64,15 +64,11 @@ interface WebSocketHandshakeClient {
// sub-protocol the server selected, or empty if no sub-protocol was selected.
// |extensions| is the list of extensions negotiated for the connection.
// default threshold value
// |receive_quota_threshold| is the value that the renderer calls
// AddReceiveFlowControlQuota() to the browser per receiving this value
// so that the browser can continue sending remaining data to the renderer.
// |readable| is readable datapipe to receive data from browser.
OnConnectionEstablished(WebSocket socket,
pending_receiver<WebSocketClient> client_receiver,
string selected_protocol,
string extensions,
uint64 receive_quota_threshold,
handle<data_pipe_consumer> readable);
};
......@@ -137,11 +133,11 @@ interface WebSocket {
// UTF-8. If |fin| is not set, |data| must be non-empty.
SendFrame(bool fin, WebSocketMessageType type, array<uint8> data);
// Add |quota| bytes of receive quota. |quota| must be positive. Initial quota
// is 0. The browser will wait for an AddReceiveFlowControlQuota() message
// before forwarding any messages to the renderer. Total quota must never
// exceed 0x7FFFFFFFFFFFFFFF bytes.
AddReceiveFlowControlQuota(int64 quota);
// Let browser to start receiving WebSocket data frames from network stream.
// TODO(yoichio): Remove this by move Connect() after checking throttle at
// WebSocketChannelImpl::Connect so that OnAddChannelResponse is
// actual signal to start receive data frame.
StartReceiving();
// Close the channel gracefully.
//
......
......@@ -90,6 +90,7 @@ class WebSocket::WebSocketEventHandler final
WebSocketMessageType type,
scoped_refptr<net::IOBuffer> buffer,
size_t buffer_size) override;
bool HasPendingDataFrames() override;
void OnClosingHandshake() override;
void OnSendFlowControlQuotaAdded(int64_t quota) override;
void OnDropChannel(bool was_clean,
......@@ -147,8 +148,8 @@ void WebSocket::WebSocketEventHandler::OnAddChannelResponse(
mojom::WebSocketPtr websocket_to_pass;
impl_->binding_.Bind(mojo::MakeRequest(&websocket_to_pass));
impl_->binding_.set_connection_error_handler(
base::BindOnce(&WebSocket::OnConnectionError, base::Unretained(impl_)));
impl_->binding_.set_connection_error_handler(base::BindOnce(
&WebSocket::OnConnectionError, base::Unretained(impl_), FROM_HERE));
impl_->handshake_succeeded_ = true;
impl_->pending_connection_tracker_.OnCompleteHandshake();
......@@ -185,13 +186,12 @@ void WebSocket::WebSocketEventHandler::OnAddChannelResponse(
impl_->handshake_client_->OnConnectionEstablished(
std::move(websocket_to_pass), mojo::MakeRequest(&impl_->client_),
selected_protocol, extensions, receive_quota_threshold,
std::move(readable));
selected_protocol, extensions, std::move(readable));
impl_->handshake_client_.reset();
impl_->auth_handler_ = nullptr;
impl_->header_client_.reset();
impl_->client_.set_connection_error_handler(
base::BindOnce(&WebSocket::OnConnectionError, base::Unretained(impl_)));
impl_->client_.set_connection_error_handler(base::BindOnce(
&WebSocket::OnConnectionError, base::Unretained(impl_), FROM_HERE));
}
struct WebSocket::DataFrame {
......@@ -219,6 +219,10 @@ void WebSocket::WebSocketEventHandler::OnDataFrame(
impl_->SendPendingDataFrames();
}
bool WebSocket::WebSocketEventHandler::HasPendingDataFrames() {
return !impl_->pending_data_frames_.empty();
}
void WebSocket::WebSocketEventHandler::OnClosingHandshake() {
DVLOG(3) << "WebSocketEventHandler::OnClosingHandshake @"
<< reinterpret_cast<void*>(this);
......@@ -393,17 +397,17 @@ WebSocket::WebSocket(
if (auth_handler_) {
// Make sure the request dies if |auth_handler_| has an error, otherwise
// requests can hang.
auth_handler_.set_connection_error_handler(
base::BindOnce(&WebSocket::OnConnectionError, base::Unretained(this)));
auth_handler_.set_connection_error_handler(base::BindOnce(
&WebSocket::OnConnectionError, base::Unretained(this), FROM_HERE));
}
if (header_client_) {
// Make sure the request dies if |header_client_| has an error, otherwise
// requests can hang.
header_client_.set_disconnect_handler(
base::BindOnce(&WebSocket::OnConnectionError, base::Unretained(this)));
header_client_.set_disconnect_handler(base::BindOnce(
&WebSocket::OnConnectionError, base::Unretained(this), FROM_HERE));
}
handshake_client_.set_disconnect_handler(
base::BindOnce(&WebSocket::OnConnectionError, base::Unretained(this)));
handshake_client_.set_disconnect_handler(base::BindOnce(
&WebSocket::OnConnectionError, base::Unretained(this), FROM_HERE));
if (delay_ > base::TimeDelta()) {
base::ThreadTaskRunnerHandle::Get()->PostDelayedTask(
FROM_HERE,
......@@ -451,14 +455,9 @@ void WebSocket::SendFrame(bool fin,
data.size());
}
void WebSocket::AddReceiveFlowControlQuota(int64_t quota) {
DVLOG(3) << "WebSocket::AddReceiveFlowControlQuota @"
<< reinterpret_cast<void*>(this) << " quota=" << quota;
DCHECK(channel_) << "WebSocket::AddReceiveFlowControlQuota is called but "
"there is no active channel.";
DCHECK(handshake_succeeded_);
ignore_result(channel_->AddReceiveFlowControlQuota(quota));
void WebSocket::StartReceiving() {
DCHECK(pending_data_frames_.empty());
ignore_result(channel_->ReadFrames());
}
void WebSocket::StartClosingHandshake(uint16_t code,
......@@ -525,8 +524,9 @@ WebSocket* WebSocket::ForRequest(const net::URLRequest& request) {
return pointer->get();
}
void WebSocket::OnConnectionError() {
DVLOG(3) << "WebSocket::OnConnectionError @" << reinterpret_cast<void*>(this);
void WebSocket::OnConnectionError(const base::Location& set_from) {
DVLOG(3) << "WebSocket::OnConnectionError @" << reinterpret_cast<void*>(this)
<< ", set_from=" << set_from.ToString();
factory_->Remove(this);
}
......@@ -572,19 +572,28 @@ void WebSocket::OnWritable(MojoResult result,
Reset();
return;
}
wait_for_writable_ = false;
SendPendingDataFrames();
if (pending_data_frames_.empty()) {
ignore_result(channel_->ReadFrames());
}
}
void WebSocket::SendPendingDataFrames() {
DVLOG(3) << "WebSocket::SendPendingDataFrames @"
<< reinterpret_cast<void*>(this)
<< ", pending_data_frames_.size=" << pending_data_frames_.size();
<< ", pending_data_frames_.size=" << pending_data_frames_.size()
<< ", wait_for_writable_?" << wait_for_writable_;
if (wait_for_writable_) {
return;
}
while (!pending_data_frames_.empty()) {
WebSocket::DataFrame& data_frame = pending_data_frames_.front();
SendDataFrame(&data_frame);
if (data_frame.size > 0) {
// Mojo doesn't have any write buffer so far.
writable_watcher_.ArmOrNotify();
wait_for_writable_ = true;
return;
}
pending_data_frames_.pop();
......@@ -618,7 +627,7 @@ void WebSocket::SendDataFrame(DataFrame* data_frame) {
DCHECK_EQ(begin_result, MOJO_RESULT_FAILED_PRECONDITION);
base::SequencedTaskRunnerHandle::Get()->PostTask(
FROM_HERE, base::BindOnce(&WebSocket::OnConnectionError,
weak_ptr_factory_.GetWeakPtr()));
weak_ptr_factory_.GetWeakPtr(), FROM_HERE));
}
return;
}
......
......@@ -27,6 +27,10 @@
class GURL;
namespace base {
class Location;
} // namespace base
namespace net {
class SSLInfo;
class WebSocketChannel;
......@@ -64,7 +68,7 @@ class COMPONENT_EXPORT(NETWORK_SERVICE) WebSocket : public mojom::WebSocket {
void SendFrame(bool fin,
mojom::WebSocketMessageType type,
const std::vector<uint8_t>& data) override;
void AddReceiveFlowControlQuota(int64_t quota) override;
void StartReceiving() override;
void StartClosingHandshake(uint16_t code, const std::string& reason) override;
bool handshake_succeeded() const { return handshake_succeeded_; }
......@@ -109,7 +113,7 @@ class COMPONENT_EXPORT(NETWORK_SERVICE) WebSocket : public mojom::WebSocket {
DISALLOW_COPY_AND_ASSIGN(UnownedPointer);
};
void OnConnectionError();
void OnConnectionError(const base::Location& set_from);
void AddChannel(const GURL& socket_url,
const std::vector<std::string>& requested_protocols,
const GURL& site_for_cookies,
......@@ -176,6 +180,7 @@ class COMPONENT_EXPORT(NETWORK_SERVICE) WebSocket : public mojom::WebSocket {
mojo::ScopedDataPipeProducerHandle writable_;
mojo::SimpleWatcher writable_watcher_;
base::queue<DataFrame> pending_data_frames_;
bool wait_for_writable_ = false;
base::WeakPtrFactory<WebSocket> weak_ptr_factory_{this};
......
......@@ -190,9 +190,12 @@ DOMWebSocket::DOMWebSocket(ExecutionContext* context)
subprotocol_(""),
extensions_(""),
event_queue_(MakeGarbageCollected<EventQueue>(this)),
buffered_amount_update_task_pending_(false) {}
buffered_amount_update_task_pending_(false) {
NETWORK_DVLOG(1) << "DOMWebSocket " << this << " created";
}
DOMWebSocket::~DOMWebSocket() {
NETWORK_DVLOG(1) << "DOMWebSocket " << this << " destroyed";
DCHECK(!channel_);
}
......
......@@ -419,7 +419,7 @@ void WebSocketChannelImpl::ApplyBackpressure() {
void WebSocketChannelImpl::RemoveBackpressure() {
backpressure_ = false;
AddReceiveFlowControlIfNecessary();
handle_->ConsumePendingDataFrames();
}
WebSocketChannelImpl::Message::Message(const std::string& text,
......@@ -548,23 +548,6 @@ void WebSocketChannelImpl::ProcessSendQueue() {
client_->DidConsumeBufferedAmount(consumed_buffered_amount);
}
void WebSocketChannelImpl::AddReceiveFlowControlIfNecessary() {
DCHECK(receive_quota_threshold_.has_value());
if (!handle_ ||
received_data_size_for_flow_control_ < receive_quota_threshold_.value()) {
return;
}
handle_->AddReceiveFlowControlQuota(received_data_size_for_flow_control_);
received_data_size_for_flow_control_ = 0;
}
void WebSocketChannelImpl::InitialReceiveFlowControl() {
DCHECK(receive_quota_threshold_.has_value());
DCHECK_EQ(received_data_size_for_flow_control_, 0u);
DCHECK(handle_);
handle_->AddReceiveFlowControlQuota(receive_quota_threshold_.value() * 2);
}
void WebSocketChannelImpl::AbortAsyncOperations() {
if (blob_loader_) {
blob_loader_->Cancel();
......@@ -577,7 +560,6 @@ void WebSocketChannelImpl::HandleDidClose(bool was_clean,
const String& reason) {
handshake_throttle_.reset();
handle_.reset();
receive_quota_threshold_.reset();
AbortAsyncOperations();
if (!client_) {
return;
......@@ -592,25 +574,22 @@ void WebSocketChannelImpl::HandleDidClose(bool was_clean,
void WebSocketChannelImpl::DidConnect(WebSocketHandle* handle,
const String& selected_protocol,
const String& extensions,
uint64_t receive_quota_threshold) {
const String& extensions) {
NETWORK_DVLOG(1) << this << " DidConnect(" << handle << ", "
<< String(selected_protocol) << ", " << String(extensions)
<< ", " << receive_quota_threshold << ")";
<< "), throttle_passed_?" << throttle_passed_;
DCHECK(handle_);
DCHECK_EQ(handle, handle_.get());
DCHECK(client_);
receive_quota_threshold_ = receive_quota_threshold;
if (!throttle_passed_) {
connect_info_ =
std::make_unique<ConnectInfo>(selected_protocol, extensions);
return;
}
InitialReceiveFlowControl();
handle_->StartReceiving();
handshake_throttle_.reset();
......@@ -676,7 +655,7 @@ void WebSocketChannelImpl::DidReceiveData(WebSocketHandle* handle,
NETWORK_DVLOG(1) << this << " DidReceiveData(" << handle << ", " << fin
<< ", " << type << ", (" << static_cast<const void*>(data)
<< ", " << size << "))";
DCHECK(!backpressure_);
DCHECK(handle_);
DCHECK_EQ(handle, handle_.get());
DCHECK(client_);
......@@ -696,10 +675,6 @@ void WebSocketChannelImpl::DidReceiveData(WebSocketHandle* handle,
break;
}
received_data_size_for_flow_control_ += size;
if (!backpressure_)
AddReceiveFlowControlIfNecessary();
const size_t message_size_so_far = message_chunks_.GetSize();
if (message_size_so_far > std::numeric_limits<wtf_size_t>::max()) {
message_chunks_.Clear();
......@@ -810,10 +785,7 @@ void WebSocketChannelImpl::OnCompletion(
throttle_passed_ = true;
if (connect_info_) {
// No flow control quota is supplied to the browser until we are ready to
// receive messages. This fixes crbug.com/786776.
InitialReceiveFlowControl();
handle_->StartReceiving();
client_->DidConnect(std::move(connect_info_->selected_protocol),
std::move(connect_info_->extensions));
connect_info_.reset();
......
......@@ -107,8 +107,7 @@ class MODULES_EXPORT WebSocketChannelImpl final : public WebSocketChannel {
// Called when the handle is opened.
void DidConnect(WebSocketHandle* handle,
const String& selected_protocol,
const String& extensions,
uint64_t receive_quota_threshold);
const String& extensions);
// Called when the browser starts the opening handshake.
// This notification can be omitted when the inspector is not active.
......@@ -137,6 +136,7 @@ class MODULES_EXPORT WebSocketChannelImpl final : public WebSocketChannel {
WebSocketHandle::MessageType,
const char* data,
size_t);
bool HasBackPressureToReceiveData() { return backpressure_; }
// Called when the handle is closed.
// |handle| becomes unavailable once this notification arrives.
......@@ -192,8 +192,6 @@ class MODULES_EXPORT WebSocketChannelImpl final : public WebSocketChannel {
bool MaybeSendSynchronously(WebSocketHandle::MessageType,
base::span<const char>);
void ProcessSendQueue();
void AddReceiveFlowControlIfNecessary();
void InitialReceiveFlowControl();
void FailAsError(const String& reason) {
Fail(reason, mojom::ConsoleMessageLevel::kError,
location_at_construction_->Clone());
......@@ -231,7 +229,6 @@ class MODULES_EXPORT WebSocketChannelImpl final : public WebSocketChannel {
bool receiving_message_type_is_text_ = false;
bool throttle_passed_ = false;
uint64_t sending_quota_ = 0;
uint64_t received_data_size_for_flow_control_ = 0;
wtf_size_t sent_size_of_top_message_ = 0;
FrameScheduler::SchedulingAffectingFeatureHandle
feature_handle_for_scheduler_;
......@@ -244,8 +241,6 @@ class MODULES_EXPORT WebSocketChannelImpl final : public WebSocketChannel {
std::unique_ptr<ConnectInfo> connect_info_;
const scoped_refptr<base::SingleThreadTaskRunner> file_reading_task_runner_;
base::Optional<uint64_t> receive_quota_threshold_;
};
MODULES_EXPORT std::ostream& operator<<(std::ostream&,
......
......@@ -36,8 +36,6 @@ using testing::SaveArg;
namespace blink {
constexpr uint64_t kInitialReceiveFlowControlQuota = 65536;
typedef testing::StrictMock<testing::MockFunction<void(int)>> Checkpoint;
class MockWebSocketChannelClient
......@@ -104,7 +102,8 @@ class MockWebSocketHandle : public WebSocketHandle {
MOCK_METHOD4(
Send,
void(bool, WebSocketHandle::MessageType, const char*, wtf_size_t));
MOCK_METHOD1(AddReceiveFlowControlQuota, void(int64_t));
MOCK_METHOD0(StartReceiving, void());
MOCK_METHOD0(ConsumePendingDataFrames, void());
MOCK_METHOD2(Close, void(uint16_t, const String&));
};
......@@ -145,6 +144,7 @@ class WebSocketChannelImplTest : public PageTestBase {
channel_ = WebSocketChannelImpl::CreateForTesting(
&GetDocument(), channel_client_.Get(), SourceLocation::Capture(),
Handle(), base::WrapUnique(handshake_throttle_));
EXPECT_CALL(*Handle(), StartReceiving());
}
MockWebSocketChannelClient* ChannelClient() { return channel_client_.Get(); }
......@@ -166,13 +166,10 @@ class WebSocketChannelImplTest : public PageTestBase {
InSequence s;
EXPECT_CALL(*Handle(),
Connect(KURL("ws://localhost/"), _, _, _, ChannelImpl()));
EXPECT_CALL(*Handle(),
AddReceiveFlowControlQuota(kInitialReceiveFlowControlQuota));
EXPECT_CALL(*ChannelClient(), DidConnect(String("a"), String("b")));
}
EXPECT_TRUE(Channel()->Connect(KURL("ws://localhost/"), "x"));
ChannelImpl()->DidConnect(Handle(), String("a"), String("b"),
kDefaultReceiveQuotaThreshold);
ChannelImpl()->DidConnect(Handle(), String("a"), String("b"));
testing::Mock::VerifyAndClearExpectations(this);
}
......@@ -182,8 +179,6 @@ class WebSocketChannelImplTest : public PageTestBase {
MockWebSocketHandshakeThrottle* handshake_throttle_;
Persistent<WebSocketChannelImpl> channel_;
uint64_t sum_of_consumed_buffered_amount_;
static const uint64_t kDefaultReceiveQuotaThreshold = 1 << 15;
};
class CallTrackingClosure {
......@@ -236,8 +231,6 @@ TEST_F(WebSocketChannelImplTest, connectSuccess) {
KURLEq("http://example.com/"), _, ChannelImpl()))
.WillOnce(SaveArg<1>(&protocols));
EXPECT_CALL(checkpoint, Call(1));
EXPECT_CALL(*Handle(),
AddReceiveFlowControlQuota(kInitialReceiveFlowControlQuota));
EXPECT_CALL(*ChannelClient(), DidConnect(String("a"), String("b")));
}
......@@ -250,8 +243,7 @@ TEST_F(WebSocketChannelImplTest, connectSuccess) {
EXPECT_EQ("x", protocols[0]);
checkpoint.Call(1);
ChannelImpl()->DidConnect(Handle(), String("a"), String("b"),
kDefaultReceiveQuotaThreshold);
ChannelImpl()->DidConnect(Handle(), String("a"), String("b"));
}
TEST_F(WebSocketChannelImplTest, sendText) {
......@@ -724,23 +716,16 @@ TEST_F(WebSocketChannelImplTest, receiveBinaryNonUTF8) {
TEST_F(WebSocketChannelImplTest, receiveWithBackpressure) {
Connect();
std::string data(kInitialReceiveFlowControlQuota, 'a');
std::string data(100, 'a');
Checkpoint checkpoint;
{
InSequence s;
EXPECT_CALL(*ChannelClient(), DidReceiveTextMessage(_));
EXPECT_CALL(checkpoint, Call(1));
EXPECT_CALL(*Handle(),
AddReceiveFlowControlQuota(kInitialReceiveFlowControlQuota));
EXPECT_CALL(*Handle(),
AddReceiveFlowControlQuota(kInitialReceiveFlowControlQuota));
EXPECT_CALL(*Handle(), ConsumePendingDataFrames());
EXPECT_CALL(*ChannelClient(), DidReceiveTextMessage(_));
}
ChannelImpl()->ApplyBackpressure();
ChannelImpl()->DidReceiveData(Handle(), true,
WebSocketHandle::kMessageTypeText, data.data(),
data.size());
checkpoint.Call(1);
ChannelImpl()->RemoveBackpressure();
ChannelImpl()->DidReceiveData(Handle(), true,
......@@ -851,6 +836,15 @@ class WebSocketChannelImplHandshakeThrottleTest
handshake_throttle_ = MockWebSocketHandshakeThrottle::Create();
}
void SetUp() override {
PageTestBase::SetUp(IntSize());
const KURL page_url("http://example.com/");
NavigateTo(page_url);
channel_ = WebSocketChannelImpl::CreateForTesting(
&GetDocument(), channel_client_.Get(), SourceLocation::Capture(),
Handle(), base::WrapUnique(handshake_throttle_));
}
// Expectations for the normal result of calling Channel()->Connect() with a
// non-null throttle.
void NormalHandshakeExpectations() {
......@@ -876,15 +870,14 @@ TEST_F(WebSocketChannelImplHandshakeThrottleTest, ThrottleSucceedsFirst) {
EXPECT_CALL(checkpoint, Call(1));
EXPECT_CALL(*handshake_throttle_, Destructor());
EXPECT_CALL(checkpoint, Call(2));
EXPECT_CALL(*Handle(), AddReceiveFlowControlQuota(_));
EXPECT_CALL(*Handle(), StartReceiving());
EXPECT_CALL(*ChannelClient(), DidConnect(String("a"), String("b")));
}
Channel()->Connect(url(), "");
checkpoint.Call(1);
ChannelImpl()->OnCompletion(base::nullopt);
checkpoint.Call(2);
ChannelImpl()->DidConnect(Handle(), String("a"), String("b"),
kDefaultReceiveQuotaThreshold);
ChannelImpl()->DidConnect(Handle(), String("a"), String("b"));
}
TEST_F(WebSocketChannelImplHandshakeThrottleTest, HandshakeSucceedsFirst) {
......@@ -895,13 +888,12 @@ TEST_F(WebSocketChannelImplHandshakeThrottleTest, HandshakeSucceedsFirst) {
EXPECT_CALL(checkpoint, Call(1));
EXPECT_CALL(checkpoint, Call(2));
EXPECT_CALL(*handshake_throttle_, Destructor());
EXPECT_CALL(*Handle(), AddReceiveFlowControlQuota(_));
EXPECT_CALL(*Handle(), StartReceiving());
EXPECT_CALL(*ChannelClient(), DidConnect(String("a"), String("b")));
}
Channel()->Connect(url(), "");
checkpoint.Call(1);
ChannelImpl()->DidConnect(Handle(), String("a"), String("b"),
kDefaultReceiveQuotaThreshold);
ChannelImpl()->DidConnect(Handle(), String("a"), String("b"));
checkpoint.Call(2);
ChannelImpl()->OnCompletion(base::nullopt);
}
......@@ -938,8 +930,7 @@ TEST_F(WebSocketChannelImplHandshakeThrottleTest,
EXPECT_CALL(checkpoint, Call(1));
}
Channel()->Connect(url(), "");
ChannelImpl()->DidConnect(Handle(), String("a"), String("b"),
kDefaultReceiveQuotaThreshold);
ChannelImpl()->DidConnect(Handle(), String("a"), String("b"));
Channel()->Fail("close during handshake",
mojom::ConsoleMessageLevel::kWarning,
std::make_unique<SourceLocation>(String(), 0, 0, nullptr));
......@@ -972,8 +963,7 @@ TEST_F(WebSocketChannelImplHandshakeThrottleTest,
EXPECT_CALL(checkpoint, Call(1));
}
Channel()->Connect(url(), "");
ChannelImpl()->DidConnect(Handle(), String("a"), String("b"),
kDefaultReceiveQuotaThreshold);
ChannelImpl()->DidConnect(Handle(), String("a"), String("b"));
Channel()->Close(WebSocketChannelImpl::kCloseEventCodeGoingAway, "");
checkpoint.Call(1);
}
......@@ -1001,8 +991,7 @@ TEST_F(WebSocketChannelImplHandshakeThrottleTest,
EXPECT_CALL(checkpoint, Call(1));
}
Channel()->Connect(url(), "");
ChannelImpl()->DidConnect(Handle(), String("a"), String("b"),
kDefaultReceiveQuotaThreshold);
ChannelImpl()->DidConnect(Handle(), String("a"), String("b"));
Channel()->Disconnect();
checkpoint.Call(1);
}
......@@ -1030,8 +1019,7 @@ TEST_F(WebSocketChannelImplHandshakeThrottleTest,
EXPECT_CALL(*ChannelClient(), DidClose(_, _, _));
}
Channel()->Connect(url(), "");
ChannelImpl()->DidConnect(Handle(), String("a"), String("b"),
kDefaultReceiveQuotaThreshold);
ChannelImpl()->DidConnect(Handle(), String("a"), String("b"));
ChannelImpl()->OnCompletion("Connection blocked by throttle");
}
......
......@@ -68,7 +68,8 @@ class WebSocketHandle {
const String& user_agent_override,
WebSocketChannelImpl*) = 0;
virtual void Send(bool fin, MessageType, const char* data, wtf_size_t) = 0;
virtual void AddReceiveFlowControlQuota(int64_t quota) = 0;
virtual void StartReceiving() = 0;
virtual void ConsumePendingDataFrames() = 0;
virtual void Close(uint16_t code, const String& reason) = 0;
};
......
......@@ -4,6 +4,7 @@
#include "third_party/blink/renderer/modules/websockets/websocket_handle_impl.h"
#include "base/location.h"
#include "base/single_thread_task_runner.h"
#include "mojo/public/cpp/bindings/remote.h"
#include "third_party/blink/public/platform/platform.h"
......@@ -56,8 +57,9 @@ void WebSocketHandleImpl::Connect(
connector->Connect(
url, protocols, site_for_cookies, user_agent_override,
handshake_client_receiver_.BindNewPipeAndPassRemote(task_runner_));
handshake_client_receiver_.set_disconnect_with_reason_handler(WTF::Bind(
&WebSocketHandleImpl::OnConnectionError, WTF::Unretained(this)));
handshake_client_receiver_.set_disconnect_with_reason_handler(
WTF::Bind(&WebSocketHandleImpl::OnConnectionError, WTF::Unretained(this),
FROM_HERE));
}
void WebSocketHandleImpl::Send(bool fin,
......@@ -92,12 +94,8 @@ void WebSocketHandleImpl::Send(bool fin,
websocket_->SendFrame(fin, type_to_pass, data_to_pass);
}
void WebSocketHandleImpl::AddReceiveFlowControlQuota(int64_t quota) {
DCHECK(websocket_);
NETWORK_DVLOG(1) << this << " flowControl(" << quota << ")";
websocket_->AddReceiveFlowControlQuota(quota);
void WebSocketHandleImpl::StartReceiving() {
websocket_->StartReceiving();
}
void WebSocketHandleImpl::Close(uint16_t code, const String& reason) {
......@@ -114,10 +112,13 @@ void WebSocketHandleImpl::Disconnect() {
channel_ = nullptr;
}
void WebSocketHandleImpl::OnConnectionError(uint32_t custom_reason,
void WebSocketHandleImpl::OnConnectionError(const base::Location& set_from,
uint32_t custom_reason,
const std::string& description) {
NETWORK_DVLOG(1) << " OnConnectionError( reason: " << custom_reason
<< ", description:" << description;
NETWORK_DVLOG(1) << " OnConnectionError("
<< " reason: " << custom_reason
<< ", description:" << description
<< "), set_from:" << set_from.ToString();
String message = "Unknown reason";
if (custom_reason == network::mojom::blink::WebSocket::kInternalFailure) {
message = String::FromUTF8(description.c_str(), description.size());
......@@ -152,10 +153,9 @@ void WebSocketHandleImpl::OnConnectionEstablished(
client_receiver,
const String& protocol,
const String& extensions,
uint64_t receive_quota_threshold,
mojo::ScopedDataPipeConsumerHandle readable) {
NETWORK_DVLOG(1) << this << " OnConnectionEstablished(" << protocol << ", "
<< extensions << ", " << receive_quota_threshold << ")";
<< extensions << ")";
if (!channel_)
return;
......@@ -163,8 +163,9 @@ void WebSocketHandleImpl::OnConnectionEstablished(
// From now on, we will detect mojo errors via |client_binding_|.
handshake_client_receiver_.reset();
client_binding_.Bind(std::move(client_receiver), task_runner_);
client_binding_.set_connection_error_with_reason_handler(WTF::Bind(
&WebSocketHandleImpl::OnConnectionError, WTF::Unretained(this)));
client_binding_.set_connection_error_with_reason_handler(
WTF::Bind(&WebSocketHandleImpl::OnConnectionError, WTF::Unretained(this),
FROM_HERE));
DCHECK(!websocket_);
websocket_ = std::move(websocket);
......@@ -175,7 +176,7 @@ void WebSocketHandleImpl::OnConnectionEstablished(
WTF::BindRepeating(&WebSocketHandleImpl::OnReadable,
WTF::Unretained(this)));
DCHECK_EQ(mojo_result, MOJO_RESULT_OK);
channel_->DidConnect(this, protocol, extensions, receive_quota_threshold);
channel_->DidConnect(this, protocol, extensions);
// |this| can be deleted here.
}
......@@ -206,7 +207,10 @@ void WebSocketHandleImpl::OnDataFrame(
void WebSocketHandleImpl::ConsumePendingDataFrames() {
DCHECK(channel_);
while (!pending_data_frames_.empty()) {
if (channel_->HasBackPressureToReceiveData())
return;
while (!pending_data_frames_.empty() &&
!channel_->HasBackPressureToReceiveData()) {
DataFrame& data_frame = pending_data_frames_.front();
NETWORK_DVLOG(2) << " ConsumePendingDataFrame frame=(" << data_frame.fin
<< ", " << data_frame.type
......
......@@ -41,6 +41,7 @@
#include "third_party/blink/renderer/platform/wtf/wtf_size_t.h"
namespace base {
class Location;
class SingleThreadTaskRunner;
} // namespace base
......@@ -61,7 +62,8 @@ class WebSocketHandleImpl
const String& user_agent_override,
WebSocketChannelImpl*) override;
void Send(bool fin, MessageType, const char* data, wtf_size_t) override;
void AddReceiveFlowControlQuota(int64_t quota) override;
void StartReceiving() override;
void ConsumePendingDataFrames() override;
void Close(uint16_t code, const String& reason) override;
private:
......@@ -77,7 +79,8 @@ class WebSocketHandleImpl
};
void Disconnect();
void OnConnectionError(uint32_t custom_reason,
void OnConnectionError(const base::Location& set_from,
uint32_t custom_reason,
const std::string& description);
// network::mojom::blink::WebSocketHandshakeClient methods:
......@@ -91,7 +94,6 @@ class WebSocketHandleImpl
client_receiver,
const String& selected_protocol,
const String& extensions,
uint64_t receive_quota_threshold,
mojo::ScopedDataPipeConsumerHandle readable) override;
// network::mojom::blink::WebSocketClient methods:
......@@ -106,7 +108,6 @@ class WebSocketHandleImpl
// Datapipe functions to receive.
void OnReadable(MojoResult result, const mojo::HandleSignalsState& state);
void ConsumePendingDataFrames();
// Returns false if |this| is deleted.
bool ConsumeDataFrame(bool fin,
network::mojom::blink::WebSocketMessageType type,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment