Commit 5f909874 authored by Adam Rice's avatar Adam Rice Committed by Commit Bot

Add WebSocketChannel ApplyBackpressure() and RemoveBackpressure()

Previously flow control was not exposed to clients of
blink::WebSocketChannel. Expose it by adding ApplyBackpressure() and
RemoveBackpressure() methods. These are called by clients to start and
stop the additional of new receiving flow control quota.

Because there may still be outstanding receive quota,
DidReceive*Message() can be called again even after ApplyBackpressure()
is called. Once available receive quota is exhausted the messages will
stop until RemoveBackpressure() is called.

This feature will be used by the WebSocketStream implementation. See the
design doc at

https://docs.google.com/document/d/1XuxEshh5VYBYm1qRVKordTamCOsR-uGQBCYFcHXP4L0/edit#heading=h.7nki9mck5t64

Also move some initialisers for simple member variables to the header in
WebSocketChannelImpl.

BUG=983030

Change-Id: If3423f43adf7b7225d0d89eda2a94ce2cef4cb4f
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/1718966
Commit-Queue: Adam Rice <ricea@chromium.org>
Reviewed-by: default avatarYutaka Hirano <yhirano@chromium.org>
Reviewed-by: default avatarYoichi Osato <yoichio@chromium.org>
Cr-Commit-Position: refs/heads/master@{#682248}
parent 246c8db4
......@@ -73,6 +73,8 @@ class MockWebSocketChannel : public WebSocketChannel {
FailMock(reason, level, location.get());
}
MOCK_METHOD0(Disconnect, void());
MOCK_METHOD0(ApplyBackpressure, void());
MOCK_METHOD0(RemoveBackpressure, void());
MockWebSocketChannel() = default;
};
......
......@@ -52,6 +52,7 @@ class MODULES_EXPORT WebSocketChannel
enum class SendResult { SENT_SYNCHRONOUSLY, CALLBACK_WILL_BE_CALLED };
WebSocketChannel() = default;
virtual ~WebSocketChannel() = default;
enum CloseEventCode {
kCloseEventCodeNotSpecified = -1,
......@@ -101,7 +102,14 @@ class MODULES_EXPORT WebSocketChannel
// Do not call any methods after calling this method.
virtual void Disconnect() = 0; // Will suppress didClose().
virtual ~WebSocketChannel() = default;
// Clients can call ApplyBackpressure() to indicate that they want to stop
// receiving new messages. WebSocketChannelClient::DidReceive*Message() may
// still be called after this, until existing flow control quota is used up.
virtual void ApplyBackpressure() = 0;
// Clients should call RemoveBackpressure() after calling ApplyBackpressure()
// to indicate that they are ready to receive new messages.
virtual void RemoveBackpressure() = 0;
virtual void Trace(blink::Visitor* visitor) {}
......
......@@ -194,11 +194,7 @@ WebSocketChannelImpl::WebSocketChannelImpl(
client_(client),
identifier_(CreateUniqueIdentifier()),
execution_context_(execution_context),
sending_quota_(0),
received_data_size_for_flow_control_(0),
sent_size_of_top_message_(0),
location_at_construction_(std::move(location)),
throttle_passed_(false),
file_reading_task_runner_(
execution_context->GetTaskRunner(TaskType::kFileReading)) {
if (auto* scope = DynamicTo<WorkerGlobalScope>(*execution_context_))
......@@ -416,6 +412,15 @@ void WebSocketChannelImpl::Disconnect() {
identifier_ = 0;
}
void WebSocketChannelImpl::ApplyBackpressure() {
backpressure_ = true;
}
void WebSocketChannelImpl::RemoveBackpressure() {
backpressure_ = false;
AddReceiveFlowControlIfNecessary();
}
WebSocketChannelImpl::Message::Message(const std::string& text,
base::OnceClosure completion_callback)
: type(kMessageTypeText),
......@@ -692,7 +697,8 @@ void WebSocketChannelImpl::DidReceiveData(WebSocketHandle* handle,
}
received_data_size_for_flow_control_ += size;
AddReceiveFlowControlIfNecessary();
if (!backpressure_)
AddReceiveFlowControlIfNecessary();
const size_t message_size_so_far =
(receiving_message_data_ ? receiving_message_data_->size() : 0) + size;
......
......@@ -105,6 +105,8 @@ class MODULES_EXPORT WebSocketChannelImpl final : public WebSocketChannel {
mojom::ConsoleMessageLevel,
std::unique_ptr<SourceLocation>) override;
void Disconnect() override;
void ApplyBackpressure() override;
void RemoveBackpressure() override;
ExecutionContext* GetExecutionContext();
......@@ -227,24 +229,25 @@ class MODULES_EXPORT WebSocketChannelImpl final : public WebSocketChannel {
Member<BlobLoader> blob_loader_;
HeapDeque<Member<Message>> messages_;
scoped_refptr<SharedBuffer> receiving_message_data_;
Member<ExecutionContext> execution_context_;
bool receiving_message_type_is_text_;
uint64_t sending_quota_;
uint64_t received_data_size_for_flow_control_;
wtf_size_t sent_size_of_top_message_;
const Member<ExecutionContext> execution_context_;
bool backpressure_ = false;
bool receiving_message_type_is_text_ = false;
bool throttle_passed_ = false;
uint64_t sending_quota_ = 0;
uint64_t received_data_size_for_flow_control_ = 0;
wtf_size_t sent_size_of_top_message_ = 0;
FrameScheduler::SchedulingAffectingFeatureHandle
feature_handle_for_scheduler_;
std::unique_ptr<SourceLocation> location_at_construction_;
const std::unique_ptr<const SourceLocation> location_at_construction_;
network::mojom::blink::WebSocketHandshakeRequestPtr handshake_request_;
std::unique_ptr<WebSocketHandshakeThrottle> handshake_throttle_;
// This field is only initialised if the object is still waiting for a
// throttle response when DidConnect is called.
std::unique_ptr<ConnectInfo> connect_info_;
bool throttle_passed_;
scoped_refptr<base::SingleThreadTaskRunner> file_reading_task_runner_;
const scoped_refptr<base::SingleThreadTaskRunner> file_reading_task_runner_;
base::Optional<uint64_t> receive_quota_threshold_;
};
......
......@@ -36,6 +36,8 @@ using testing::SaveArg;
namespace blink {
constexpr uint64_t kInitialReceiveFlowControlQuota = 65536;
typedef testing::StrictMock<testing::MockFunction<void(int)>> Checkpoint;
class MockWebSocketChannelClient
......@@ -165,7 +167,8 @@ class WebSocketChannelImplTest : public PageTestBase {
InSequence s;
EXPECT_CALL(*Handle(),
Connect(KURL("ws://localhost/"), _, _, _, ChannelImpl()));
EXPECT_CALL(*Handle(), AddReceiveFlowControlQuota(65536));
EXPECT_CALL(*Handle(),
AddReceiveFlowControlQuota(kInitialReceiveFlowControlQuota));
EXPECT_CALL(*ChannelClient(), DidConnect(String("a"), String("b")));
}
EXPECT_TRUE(Channel()->Connect(KURL("ws://localhost/"), "x"));
......@@ -234,7 +237,8 @@ TEST_F(WebSocketChannelImplTest, connectSuccess) {
KURLEq("http://example.com/"), _, ChannelImpl()))
.WillOnce(SaveArg<1>(&protocols));
EXPECT_CALL(checkpoint, Call(1));
EXPECT_CALL(*Handle(), AddReceiveFlowControlQuota(65536));
EXPECT_CALL(*Handle(),
AddReceiveFlowControlQuota(kInitialReceiveFlowControlQuota));
EXPECT_CALL(*ChannelClient(), DidConnect(String("a"), String("b")));
}
......@@ -719,6 +723,32 @@ TEST_F(WebSocketChannelImplTest, receiveBinaryNonUTF8) {
Handle(), true, WebSocketHandle::kMessageTypeBinary, "\x80\xff", 2);
}
TEST_F(WebSocketChannelImplTest, receiveWithBackpressure) {
Connect();
std::string data(kInitialReceiveFlowControlQuota, 'a');
Checkpoint checkpoint;
{
InSequence s;
EXPECT_CALL(*ChannelClient(), DidReceiveTextMessage(_));
EXPECT_CALL(checkpoint, Call(1));
EXPECT_CALL(*Handle(),
AddReceiveFlowControlQuota(kInitialReceiveFlowControlQuota));
EXPECT_CALL(*Handle(),
AddReceiveFlowControlQuota(kInitialReceiveFlowControlQuota));
EXPECT_CALL(*ChannelClient(), DidReceiveTextMessage(_));
}
ChannelImpl()->ApplyBackpressure();
ChannelImpl()->DidReceiveData(Handle(), true,
WebSocketHandle::kMessageTypeText, data.data(),
data.size());
checkpoint.Call(1);
ChannelImpl()->RemoveBackpressure();
ChannelImpl()->DidReceiveData(Handle(), true,
WebSocketHandle::kMessageTypeText, data.data(),
data.size());
}
TEST_F(WebSocketChannelImplTest, closeFromBrowser) {
Connect();
Checkpoint checkpoint;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment