Commit 6f7b2afb authored by Adam Rice's avatar Adam Rice Committed by Commit Bot

Update buffered amount when async callbacks are called

If an asynchronous callback is called, it means we must have returned to
the start of the event loop. Ensure that any consumed bufferedAmount is
reflected in that case. Do not reflect bufferedAmount if the EventQueue
is paused, as that means that we may be in a nested event loop.

Add a unit test for this case. Also add a unit test for normal
bufferedAmount behaviour, as there wasn't one.

Add a web platform test for what happens if a sync XHR is performed
between calling send() and looking at bufferedAmount.

BUG=856651

Change-Id: Iafa2d619a1eb5284b64500ac03d336fb6380193b
Reviewed-on: https://chromium-review.googlesource.com/1151086
Commit-Queue: Adam Rice <ricea@chromium.org>
Reviewed-by: default avatarYutaka Hirano <yhirano@chromium.org>
Cr-Commit-Position: refs/heads/master@{#580078}
parent 46bed525
// META: script=websocket.sub.js
// META: global=sharedworker
async_test(t => {
const url = 'wss://' + __SERVER__NAME + ':' + __SECURE__PORT + '/echo';
const ws = new WebSocket(url);
ws.onopen = t.step_func(() => {
ws.onclose = ws.onerror = null;
assert_equals(ws.bufferedAmount, 0);
ws.send('hello');
assert_equals(ws.bufferedAmount, 5);
// Stop execution for 1s with a sync XHR.
const xhr = new XMLHttpRequest();
xhr.open('GET', '/common/blank.html?pipe=trickle(d1)', false);
xhr.send();
assert_equals(ws.bufferedAmount, 5);
ws.close();
t.done();
});
ws.onerror = ws.onclose = t.unreached_func('open should succeed');
}, 'bufferedAmount should not be updated during a sync XHR');
done();
......@@ -100,7 +100,7 @@ bool DOMWebSocket::EventQueue::IsEmpty() const {
}
void DOMWebSocket::EventQueue::Pause() {
if (state_ != kActive && state_ != kUnpausePosted)
if (state_ == kStopped || state_ == kPaused)
return;
state_ = kPaused;
......@@ -125,6 +125,10 @@ void DOMWebSocket::EventQueue::ContextDestroyed() {
events_.clear();
}
bool DOMWebSocket::EventQueue::IsPaused() {
return state_ == kPaused || state_ == kUnpausePosted;
}
void DOMWebSocket::EventQueue::DispatchQueuedEvents() {
if (state_ != kActive)
return;
......@@ -391,15 +395,25 @@ void DOMWebSocket::UpdateBufferedAmountAfterClose(uint64_t payload_size) {
LogError("WebSocket is already in CLOSING or CLOSED state.");
}
void DOMWebSocket::PostBufferedAmountUpdateTask() {
if (buffered_amount_update_task_pending_)
return;
buffered_amount_update_task_pending_ = true;
GetExecutionContext()
->GetTaskRunner(TaskType::kWebSocket)
->PostTask(FROM_HERE, WTF::Bind(&DOMWebSocket::BufferedAmountUpdateTask,
WrapWeakPersistent(this)));
}
void DOMWebSocket::BufferedAmountUpdateTask() {
buffered_amount_update_task_pending_ = false;
ReflectBufferedAmountConsumption();
}
void DOMWebSocket::ReflectBufferedAmountConsumption() {
if (event_queue_->IsPaused())
return;
DCHECK_GE(buffered_amount_, consumed_buffered_amount_);
// Cast to unsigned long long is required since clang doesn't accept
// combination of %llu and uint64_t (known as unsigned long).
NETWORK_DVLOG(1) << "WebSocket " << this
<< " reflectBufferedAmountConsumption() " << buffered_amount_
<< " => " << (buffered_amount_ - consumed_buffered_amount_);
......@@ -653,6 +667,11 @@ void DOMWebSocket::Pause() {
void DOMWebSocket::Unpause() {
event_queue_->Unpause();
// If |consumed_buffered_amount_| was updated while the object was paused then
// the changes to |buffered_amount_| will not yet have been applied. Post
// another task to update it.
PostBufferedAmountUpdateTask();
}
void DOMWebSocket::DidConnect(const String& subprotocol,
......@@ -669,7 +688,7 @@ void DOMWebSocket::DidConnect(const String& subprotocol,
void DOMWebSocket::DidReceiveTextMessage(const String& msg) {
NETWORK_DVLOG(1) << "WebSocket " << this
<< " DidReceiveTextMessage() Text message " << msg;
ReflectBufferedAmountConsumption();
DCHECK_NE(state_, kConnecting);
if (state_ != kOpen)
return;
......@@ -683,7 +702,7 @@ void DOMWebSocket::DidReceiveBinaryMessage(
std::unique_ptr<Vector<char>> binary_data) {
NETWORK_DVLOG(1) << "WebSocket " << this << " DidReceiveBinaryMessage() "
<< binary_data->size() << " byte binary message";
ReflectBufferedAmountConsumption();
DCHECK(!origin_string_.IsNull());
DCHECK_NE(state_, kConnecting);
......@@ -719,6 +738,7 @@ void DOMWebSocket::DidReceiveBinaryMessage(
void DOMWebSocket::DidError() {
NETWORK_DVLOG(1) << "WebSocket " << this << " DidError()";
ReflectBufferedAmountConsumption();
state_ = kClosed;
event_queue_->Dispatch(Event::Create(EventTypeNames::error));
}
......@@ -730,16 +750,12 @@ void DOMWebSocket::DidConsumeBufferedAmount(uint64_t consumed) {
if (state_ == kClosed)
return;
consumed_buffered_amount_ += consumed;
if (buffered_amount_update_task_pending_)
return;
GetExecutionContext()
->GetTaskRunner(TaskType::kWebSocket)
->PostTask(FROM_HERE, WTF::Bind(&DOMWebSocket::BufferedAmountUpdateTask,
WrapWeakPersistent(this)));
PostBufferedAmountUpdateTask();
}
void DOMWebSocket::DidStartClosingHandshake() {
NETWORK_DVLOG(1) << "WebSocket " << this << " DidStartClosingHandshake()";
ReflectBufferedAmountConsumption();
state_ = kClosing;
}
......@@ -748,6 +764,7 @@ void DOMWebSocket::DidClose(
unsigned short code,
const String& reason) {
NETWORK_DVLOG(1) << "WebSocket " << this << " DidClose()";
ReflectBufferedAmountConsumption();
if (!channel_)
return;
bool all_data_has_been_consumed =
......
......@@ -168,6 +168,8 @@ class MODULES_EXPORT DOMWebSocket : public EventTargetWithInlineData,
void Unpause();
void ContextDestroyed();
bool IsPaused();
void Trace(blink::Visitor*);
private:
......@@ -225,7 +227,17 @@ class MODULES_EXPORT DOMWebSocket : public EventTargetWithInlineData,
// Updates |buffered_amount_after_close_| given the amount of data passed to
// send() method after the state changed to CLOSING or CLOSED.
void UpdateBufferedAmountAfterClose(uint64_t);
// Causes |buffered_amount_| to be updated asynchronously after returning to
// the event loop. Uses |buffered_amount_update_task_pending_| to avoid
// posting multiple tasks simultaneously.
void PostBufferedAmountUpdateTask();
// Updates |buffered_amount_| and resets
// |buffered_amount_update_task_pending_|.
void BufferedAmountUpdateTask();
// Updates |buffered_amount_| provided the object is not currently paused.
void ReflectBufferedAmountConsumption();
void ReleaseChannel();
......
......@@ -19,6 +19,7 @@
#include "third_party/blink/renderer/core/typed_arrays/dom_typed_array.h"
#include "third_party/blink/renderer/platform/bindings/exception_state.h"
#include "third_party/blink/renderer/platform/heap/handle.h"
#include "third_party/blink/renderer/platform/testing/unit_test_helpers.h"
#include "third_party/blink/renderer/platform/wtf/text/cstring.h"
#include "third_party/blink/renderer/platform/wtf/text/string_builder.h"
#include "third_party/blink/renderer/platform/wtf/text/wtf_string.h"
......@@ -809,7 +810,68 @@ TEST(DOMWebSocketTest, sendArrayBufferSuccess) {
// FIXME: We should have Blob tests here.
// We can't create a Blob because the blob registration cannot be mocked yet.
// FIXME: We should add tests for bufferedAmount.
TEST(DOMWebSocketTest, bufferedAmountUpdated) {
V8TestingScope scope;
DOMWebSocketTestScope websocket_scope(scope.GetExecutionContext());
{
InSequence s;
EXPECT_CALL(websocket_scope.Channel(),
Connect(KURL("ws://example.com/"), String()))
.WillOnce(Return(true));
EXPECT_CALL(websocket_scope.Channel(), Send(CString("hello")));
EXPECT_CALL(websocket_scope.Channel(), Send(CString("world")));
}
websocket_scope.Socket().Connect("ws://example.com/", Vector<String>(),
scope.GetExceptionState());
EXPECT_FALSE(scope.GetExceptionState().HadException());
websocket_scope.Socket().DidConnect("", "");
websocket_scope.Socket().send("hello", scope.GetExceptionState());
EXPECT_EQ(websocket_scope.Socket().bufferedAmount(), 5u);
websocket_scope.Socket().send("world", scope.GetExceptionState());
EXPECT_EQ(websocket_scope.Socket().bufferedAmount(), 10u);
websocket_scope.Socket().DidConsumeBufferedAmount(5);
websocket_scope.Socket().DidConsumeBufferedAmount(5);
EXPECT_EQ(websocket_scope.Socket().bufferedAmount(), 10u);
blink::test::RunPendingTasks();
EXPECT_EQ(websocket_scope.Socket().bufferedAmount(), 0u);
EXPECT_FALSE(scope.GetExceptionState().HadException());
}
TEST(DOMWebSocketTest, bufferedAmountUpdatedBeforeOnMessage) {
V8TestingScope scope;
DOMWebSocketTestScope websocket_scope(scope.GetExecutionContext());
{
InSequence s;
EXPECT_CALL(websocket_scope.Channel(),
Connect(KURL("ws://example.com/"), String()))
.WillOnce(Return(true));
EXPECT_CALL(websocket_scope.Channel(), Send(CString("hello")));
}
websocket_scope.Socket().Connect("ws://example.com/", Vector<String>(),
scope.GetExceptionState());
EXPECT_FALSE(scope.GetExceptionState().HadException());
websocket_scope.Socket().DidConnect("", "");
// send() is called from onopen
websocket_scope.Socket().send("hello", scope.GetExceptionState());
// (return to event loop)
websocket_scope.Socket().DidConsumeBufferedAmount(5);
EXPECT_EQ(websocket_scope.Socket().bufferedAmount(), 5ul);
// New message was already queued, is processed before task posted from
// DidConsumeBufferedAmount().
websocket_scope.Socket().DidReceiveTextMessage("hello");
// bufferedAmount is observed inside onmessage event handler.
EXPECT_EQ(websocket_scope.Socket().bufferedAmount(), 0ul);
blink::test::RunPendingTasks();
EXPECT_EQ(websocket_scope.Socket().bufferedAmount(), 0ul);
EXPECT_FALSE(scope.GetExceptionState().HadException());
}
// FIXME: We should add tests for data receiving.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment