Commit ced0d0ef authored by yhirano@chromium.org's avatar yhirano@chromium.org

[WebSocket] bufferedAmount should not decrease inside a task.

The spec says that bufferedAmount must not decrease in the task that
send() is called.
Fixed both the new and the old implementation although the old implementaion
is at any rate broken (bufferedAmount in the old implementation includes
the protocol overhead).

BUG=159563

Review URL: https://codereview.chromium.org/311993006

git-svn-id: svn://svn.chromium.org/blink/trunk@176298 bbb929c8-8fbe-4397-9dbb-9b2b20218538
parent 288f4f0f
WebSocket bufferedAmount after send
On success, you will see a series of "PASS" messages, followed by "TEST COMPLETE".
onopen
PASS ws.bufferedAmount is 5
PASS ws.bufferedAmount is 37
PASS ws.bufferedAmount is 42
PASS ws.bufferedAmount is 45
PASS ws.bufferedAmount is 0
onclose
PASS ws.bufferedAmount is 0
PASS successfullyParsed is true
TEST COMPLETE
<!DOCTYPE HTML PUBLIC "-//IETF//DTD HTML//EN">
<html>
<head>
<script src="/js-test-resources/js-test.js"></script>
</head>
<body>
<script type="text/javascript">
description('WebSocket bufferedAmount after send');
window.jsTestIsAsync = true;
var ws = new WebSocket('ws://localhost:8880/echo');
ws.onopen = function()
{
debug('onopen');
ws.send('hello');
shouldBe('ws.bufferedAmount', '5');
ws.send(new ArrayBuffer(32));
shouldBe('ws.bufferedAmount', '37');
ws.send(new Blob(['abc', 'de']));
shouldBe('ws.bufferedAmount', '42');
ws.send('bye');
shouldBe('ws.bufferedAmount', '45');
// FIXME: This is a bit flaky but I don't know how to fix it.
setTimeout(function() {
shouldBe('ws.bufferedAmount', '0');
ws.close();
}, 50);
};
ws.onclose = function()
{
debug('onclose');
shouldBe('ws.bufferedAmount', '0');
finishJSTest();
};
</script>
</body>
</html>
......@@ -72,11 +72,11 @@ MainThreadWebSocketChannel::MainThreadWebSocketChannel(Document* document, WebSo
, m_closingTimer(this, &MainThreadWebSocketChannel::closingTimerFired)
, m_state(ChannelIdle)
, m_shouldDiscardReceivedData(false)
, m_unhandledBufferedAmount(0)
, m_identifier(0)
, m_hasContinuousFrame(false)
, m_closeEventCode(CloseEventCodeAbnormalClosure)
, m_outgoingFrameQueueStatus(OutgoingFrameQueueOpen)
, m_numConsumedBytesInCurrentFrame(0)
, m_blobLoaderStatus(BlobLoaderNotStarted)
, m_sourceURLAtConstruction(sourceURL)
, m_lineNumberAtConstruction(lineNumber)
......@@ -160,14 +160,6 @@ WebSocketChannel::SendResult MainThreadWebSocketChannel::send(PassOwnPtr<Vector<
return WebSocketChannel::SendSuccess;
}
unsigned long MainThreadWebSocketChannel::bufferedAmount() const
{
WTF_LOG(Network, "MainThreadWebSocketChannel %p bufferedAmount()", this);
ASSERT(m_handle);
ASSERT(!m_suspended);
return m_handle->bufferedAmount();
}
void MainThreadWebSocketChannel::close(int code, const String& reason)
{
WTF_LOG(Network, "MainThreadWebSocketChannel %p close() code=%d reason='%s'", this, code, reason.utf8().data());
......@@ -296,13 +288,12 @@ void MainThreadWebSocketChannel::didCloseSocketStream(SocketStreamHandle* handle
if (m_outgoingFrameQueueStatus != OutgoingFrameQueueClosed)
abortOutgoingFrameQueue();
if (m_handle) {
m_unhandledBufferedAmount = m_handle->bufferedAmount();
WebSocketChannelClient* client = m_client;
m_client = 0;
clearDocument();
m_handle = nullptr;
if (client)
client->didClose(m_unhandledBufferedAmount, m_receivedClosingHandshake ? WebSocketChannelClient::ClosingHandshakeComplete : WebSocketChannelClient::ClosingHandshakeIncomplete, m_closeEventCode, m_closeEventReason);
client->didClose(m_receivedClosingHandshake ? WebSocketChannelClient::ClosingHandshakeComplete : WebSocketChannelClient::ClosingHandshakeIncomplete, m_closeEventCode, m_closeEventReason);
}
deref();
}
......@@ -333,10 +324,35 @@ void MainThreadWebSocketChannel::didReceiveSocketStreamData(SocketStreamHandle*
processBuffer();
}
void MainThreadWebSocketChannel::didUpdateBufferedAmount(SocketStreamHandle*, size_t bufferedAmount)
void MainThreadWebSocketChannel::didConsumeBufferedAmount(SocketStreamHandle*, size_t consumed)
{
if (m_client)
m_client->didUpdateBufferedAmount(bufferedAmount);
if (m_framingOverheadQueue.isEmpty()) {
// Ignore the handshake consumption.
return;
}
if (!m_client || m_state == ChannelClosed)
return;
size_t remain = consumed;
while (remain > 0) {
ASSERT(!m_framingOverheadQueue.isEmpty());
const FramingOverhead& frame = m_framingOverheadQueue.first();
ASSERT(m_numConsumedBytesInCurrentFrame <= frame.frameDataSize());
size_t consumedInThisFrame = std::min(remain, frame.frameDataSize() - m_numConsumedBytesInCurrentFrame);
remain -= consumedInThisFrame;
m_numConsumedBytesInCurrentFrame += consumedInThisFrame;
if (m_numConsumedBytesInCurrentFrame == frame.frameDataSize()) {
if (m_client && WebSocketFrame::isNonControlOpCode(frame.opcode())) {
// FIXME: As |consumed| is the number of possibly compressed
// bytes, we can't determine the number of consumed original
// bytes in the middle of a frame.
m_client->didConsumeBufferedAmount(frame.originalPayloadLength());
}
m_framingOverheadQueue.takeFirst();
m_numConsumedBytesInCurrentFrame = 0;
}
}
}
void MainThreadWebSocketChannel::didFailSocketStream(SocketStreamHandle* handle, const SocketStreamError& error)
......@@ -845,6 +861,7 @@ bool MainThreadWebSocketChannel::sendFrame(WebSocketFrame::OpCode opCode, const
Vector<char> frameData;
frame.makeFrameData(frameData);
m_framingOverheadQueue.append(FramingOverhead(opCode, frameData.size(), dataLength));
m_perMessageDeflate.resetDeflateBuffer();
return m_handle->send(frameData.data(), frameData.size());
......
......@@ -75,7 +75,6 @@ public:
virtual WebSocketChannel::SendResult send(const ArrayBuffer&, unsigned byteOffset, unsigned byteLength) OVERRIDE;
virtual WebSocketChannel::SendResult send(PassRefPtr<BlobDataHandle>) OVERRIDE;
virtual WebSocketChannel::SendResult send(PassOwnPtr<Vector<char> > data) OVERRIDE;
virtual unsigned long bufferedAmount() const OVERRIDE;
// Start closing handshake. Use the CloseEventCodeNotSpecified for the code
// argument to omit payload.
virtual void close(int code, const String& reason) OVERRIDE;
......@@ -89,7 +88,7 @@ public:
virtual void didOpenSocketStream(SocketStreamHandle*) OVERRIDE;
virtual void didCloseSocketStream(SocketStreamHandle*) OVERRIDE;
virtual void didReceiveSocketStreamData(SocketStreamHandle*, const char*, int) OVERRIDE;
virtual void didUpdateBufferedAmount(SocketStreamHandle*, size_t bufferedAmount) OVERRIDE;
virtual void didConsumeBufferedAmount(SocketStreamHandle*, size_t consumed) OVERRIDE;
virtual void didFailSocketStream(SocketStreamHandle*, const SocketStreamError&) OVERRIDE;
// FileReaderLoaderClient functions.
......@@ -101,6 +100,25 @@ public:
private:
MainThreadWebSocketChannel(Document*, WebSocketChannelClient*, const String&, unsigned);
class FramingOverhead {
public:
FramingOverhead(WebSocketFrame::OpCode opcode, size_t frameDataSize, size_t originalPayloadLength)
: m_opcode(opcode)
, m_frameDataSize(frameDataSize)
, m_originalPayloadLength(originalPayloadLength)
{
}
WebSocketFrame::OpCode opcode() const { return m_opcode; }
size_t frameDataSize() const { return m_frameDataSize; }
size_t originalPayloadLength() const { return m_originalPayloadLength; }
private:
WebSocketFrame::OpCode m_opcode;
size_t m_frameDataSize;
size_t m_originalPayloadLength;
};
void clearDocument();
void disconnectHandle();
......@@ -196,7 +214,6 @@ private:
Timer<MainThreadWebSocketChannel> m_closingTimer;
ChannelState m_state;
bool m_shouldDiscardReceivedData;
unsigned long m_unhandledBufferedAmount;
unsigned long m_identifier; // m_identifier == 0 means that we could not obtain a valid identifier.
......@@ -209,6 +226,10 @@ private:
Deque<OwnPtr<QueuedFrame> > m_outgoingFrameQueue;
OutgoingFrameQueueStatus m_outgoingFrameQueueStatus;
Deque<FramingOverhead> m_framingOverheadQueue;
// The number of bytes that are already consumed (i.e. sent) in the
// current frame.
size_t m_numConsumedBytesInCurrentFrame;
// FIXME: Load two or more Blobs simultaneously for better performance.
OwnPtr<FileReaderLoader> m_blobLoader;
......
......@@ -115,7 +115,6 @@ NewWebSocketChannelImpl::NewWebSocketChannelImpl(ExecutionContext* context, WebS
, m_identifier(0)
, m_sendingQuota(0)
, m_receivedDataSizeForFlowControl(receivedDataSizeForFlowControlHighWaterMark * 2) // initial quota
, m_bufferedAmount(0)
, m_sentSizeOfTopMessage(0)
, m_sourceURLAtConstruction(sourceURL)
, m_lineNumberAtConstruction(lineNumber)
......@@ -229,12 +228,6 @@ WebSocketChannel::SendResult NewWebSocketChannelImpl::send(PassOwnPtr<Vector<cha
return SendSuccess;
}
unsigned long NewWebSocketChannelImpl::bufferedAmount() const
{
WTF_LOG(Network, "NewWebSocketChannelImpl %p bufferedAmount()", this);
return m_bufferedAmount;
}
void NewWebSocketChannelImpl::close(int code, const String& reason)
{
WTF_LOG(Network, "NewWebSocketChannelImpl %p close(%d, %s)", this, code, reason.utf8().data());
......@@ -305,7 +298,7 @@ NewWebSocketChannelImpl::Message::Message(PassOwnPtr<Vector<char> > vectorData)
void NewWebSocketChannelImpl::sendInternal()
{
ASSERT(m_handle);
unsigned long bufferedAmount = m_bufferedAmount;
unsigned long consumedBufferedAmount = 0;
while (!m_messages.isEmpty() && m_sendingQuota > 0 && !m_blobLoader) {
bool final = false;
Message* message = m_messages.first().get();
......@@ -318,6 +311,7 @@ void NewWebSocketChannelImpl::sendInternal()
m_handle->send(final, type, message->text.data() + m_sentSizeOfTopMessage, size);
m_sentSizeOfTopMessage += size;
m_sendingQuota -= size;
consumedBufferedAmount += size;
break;
}
case MessageTypeBlob:
......@@ -332,6 +326,7 @@ void NewWebSocketChannelImpl::sendInternal()
m_handle->send(final, type, static_cast<const char*>(message->arrayBuffer->data()) + m_sentSizeOfTopMessage, size);
m_sentSizeOfTopMessage += size;
m_sendingQuota -= size;
consumedBufferedAmount += size;
break;
}
case MessageTypeVector: {
......@@ -342,6 +337,7 @@ void NewWebSocketChannelImpl::sendInternal()
m_handle->send(final, type, message->vectorData->data() + m_sentSizeOfTopMessage, size);
m_sentSizeOfTopMessage += size;
m_sendingQuota -= size;
consumedBufferedAmount += size;
break;
}
}
......@@ -350,9 +346,8 @@ void NewWebSocketChannelImpl::sendInternal()
m_sentSizeOfTopMessage = 0;
}
}
if (m_client && m_bufferedAmount != bufferedAmount) {
m_client->didUpdateBufferedAmount(m_bufferedAmount);
}
if (m_client && consumedBufferedAmount > 0)
m_client->didConsumeBufferedAmount(consumedBufferedAmount);
}
void NewWebSocketChannelImpl::flowControlIfNecessary()
......@@ -383,7 +378,7 @@ void NewWebSocketChannelImpl::handleDidClose(bool wasClean, unsigned short code,
m_client = 0;
WebSocketChannelClient::ClosingHandshakeCompletionStatus status =
wasClean ? WebSocketChannelClient::ClosingHandshakeComplete : WebSocketChannelClient::ClosingHandshakeIncomplete;
client->didClose(m_bufferedAmount, status, code, reason);
client->didClose(status, code, reason);
// client->didClose may delete this object.
}
......
......@@ -82,7 +82,6 @@ public:
virtual WebSocketChannel::SendResult send(const ArrayBuffer&, unsigned byteOffset, unsigned byteLength) OVERRIDE;
virtual WebSocketChannel::SendResult send(PassRefPtr<BlobDataHandle>) OVERRIDE;
virtual WebSocketChannel::SendResult send(PassOwnPtr<Vector<char> > data) OVERRIDE;
virtual unsigned long bufferedAmount() const OVERRIDE;
// Start closing handshake. Use the CloseEventCodeNotSpecified for the code
// argument to omit payload.
virtual void close(int code, const String& reason) OVERRIDE;
......@@ -166,7 +165,6 @@ private:
bool m_receivingMessageTypeIsText;
int64_t m_sendingQuota;
int64_t m_receivedDataSizeForFlowControl;
unsigned long m_bufferedAmount;
size_t m_sentSizeOfTopMessage;
String m_sourceURLAtConstruction;
......
......@@ -80,9 +80,9 @@ void ThreadableWebSocketChannelClientWrapper::didReceiveBinaryData(PassOwnPtr<Ve
processPendingTasks();
}
void ThreadableWebSocketChannelClientWrapper::didUpdateBufferedAmount(unsigned long bufferedAmount)
void ThreadableWebSocketChannelClientWrapper::didConsumeBufferedAmount(unsigned long consumed)
{
m_pendingTasks.append(createCallbackTask(&didUpdateBufferedAmountCallback, this, bufferedAmount));
m_pendingTasks.append(createCallbackTask(&didConsumeBufferedAmountCallback, this, consumed));
if (!m_suspended)
processPendingTasks();
}
......@@ -94,9 +94,9 @@ void ThreadableWebSocketChannelClientWrapper::didStartClosingHandshake()
processPendingTasks();
}
void ThreadableWebSocketChannelClientWrapper::didClose(unsigned long unhandledBufferedAmount, WebSocketChannelClient::ClosingHandshakeCompletionStatus closingHandshakeCompletion, unsigned short code, const String& reason)
void ThreadableWebSocketChannelClientWrapper::didClose(WebSocketChannelClient::ClosingHandshakeCompletionStatus closingHandshakeCompletion, unsigned short code, const String& reason)
{
m_pendingTasks.append(createCallbackTask(&didCloseCallback, this, unhandledBufferedAmount, closingHandshakeCompletion, code, reason));
m_pendingTasks.append(createCallbackTask(&didCloseCallback, this, closingHandshakeCompletion, code, reason));
if (!m_suspended)
processPendingTasks();
}
......@@ -150,11 +150,11 @@ void ThreadableWebSocketChannelClientWrapper::didReceiveBinaryDataCallback(Execu
wrapper->m_client->didReceiveBinaryData(binaryData);
}
void ThreadableWebSocketChannelClientWrapper::didUpdateBufferedAmountCallback(ExecutionContext* context, PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper> wrapper, unsigned long bufferedAmount)
void ThreadableWebSocketChannelClientWrapper::didConsumeBufferedAmountCallback(ExecutionContext* context, PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper> wrapper, unsigned long consumed)
{
ASSERT_UNUSED(context, !context);
if (wrapper->m_client)
wrapper->m_client->didUpdateBufferedAmount(bufferedAmount);
wrapper->m_client->didConsumeBufferedAmount(consumed);
}
void ThreadableWebSocketChannelClientWrapper::didStartClosingHandshakeCallback(ExecutionContext* context, PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper> wrapper)
......@@ -164,11 +164,11 @@ void ThreadableWebSocketChannelClientWrapper::didStartClosingHandshakeCallback(E
wrapper->m_client->didStartClosingHandshake();
}
void ThreadableWebSocketChannelClientWrapper::didCloseCallback(ExecutionContext* context, PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper> wrapper, unsigned long unhandledBufferedAmount, WebSocketChannelClient::ClosingHandshakeCompletionStatus closingHandshakeCompletion, unsigned short code, const String& reason)
void ThreadableWebSocketChannelClientWrapper::didCloseCallback(ExecutionContext* context, PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper> wrapper, WebSocketChannelClient::ClosingHandshakeCompletionStatus closingHandshakeCompletion, unsigned short code, const String& reason)
{
ASSERT_UNUSED(context, !context);
if (wrapper->m_client)
wrapper->m_client->didClose(unhandledBufferedAmount, closingHandshakeCompletion, code, reason);
wrapper->m_client->didClose(closingHandshakeCompletion, code, reason);
}
void ThreadableWebSocketChannelClientWrapper::didReceiveMessageErrorCallback(ExecutionContext* context, PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper> wrapper)
......
......@@ -55,9 +55,9 @@ public:
void didConnect(const String& subprotocol, const String& extensions);
void didReceiveMessage(const String& message);
void didReceiveBinaryData(PassOwnPtr<Vector<char> >);
void didUpdateBufferedAmount(unsigned long bufferedAmount);
void didConsumeBufferedAmount(unsigned long);
void didStartClosingHandshake();
void didClose(unsigned long unhandledBufferedAmount, WebSocketChannelClient::ClosingHandshakeCompletionStatus, unsigned short code, const String& reason);
void didClose(WebSocketChannelClient::ClosingHandshakeCompletionStatus, unsigned short code, const String& reason);
void didReceiveMessageError();
void suspend();
......@@ -73,9 +73,9 @@ private:
static void didConnectCallback(ExecutionContext*, PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper>, const String& subprotocol, const String& extensions);
static void didReceiveMessageCallback(ExecutionContext*, PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper>, const String& message);
static void didReceiveBinaryDataCallback(ExecutionContext*, PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper>, PassOwnPtr<Vector<char> >);
static void didUpdateBufferedAmountCallback(ExecutionContext*, PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper>, unsigned long bufferedAmount);
static void didConsumeBufferedAmountCallback(ExecutionContext*, PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper>, unsigned long);
static void didStartClosingHandshakeCallback(ExecutionContext*, PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper>);
static void didCloseCallback(ExecutionContext*, PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper>, unsigned long unhandledBufferedAmount, WebSocketChannelClient::ClosingHandshakeCompletionStatus, unsigned short code, const String& reason);
static void didCloseCallback(ExecutionContext*, PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper>, WebSocketChannelClient::ClosingHandshakeCompletionStatus, unsigned short code, const String& reason);
static void didReceiveMessageErrorCallback(ExecutionContext*, PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper>);
WebSocketChannelClient* m_client;
......
......@@ -228,11 +228,13 @@ WebSocket::WebSocket(ExecutionContext* context)
: ActiveDOMObject(context)
, m_state(CONNECTING)
, m_bufferedAmount(0)
, m_consumedBufferedAmount(0)
, m_bufferedAmountAfterClose(0)
, m_binaryType(BinaryTypeBlob)
, m_subprotocol("")
, m_extensions("")
, m_eventQueue(EventQueue::create(this))
, m_bufferedAmountConsumeTimer(this, &WebSocket::reflectBufferedAmountConsumption)
{
ScriptWrappable::init(this);
}
......@@ -373,6 +375,15 @@ void WebSocket::updateBufferedAmountAfterClose(unsigned long payloadSize)
logError("WebSocket is already in CLOSING or CLOSED state.");
}
void WebSocket::reflectBufferedAmountConsumption(Timer<WebSocket>*)
{
ASSERT(m_bufferedAmount >= m_consumedBufferedAmount);
WTF_LOG(Network, "WebSocket %p reflectBufferedAmountConsumption() %lu => %lu", this, m_bufferedAmount, m_bufferedAmount - m_consumedBufferedAmount);
m_bufferedAmount -= m_consumedBufferedAmount;
m_consumedBufferedAmount = 0;
}
void WebSocket::releaseChannel()
{
ASSERT(m_channel);
......@@ -393,6 +404,7 @@ void WebSocket::send(const String& message, ExceptionState& exceptionState)
return;
}
ASSERT(m_channel);
m_bufferedAmount += message.utf8().length();
handleSendResult(m_channel->send(message), exceptionState, WebSocketSendTypeString);
}
......@@ -409,6 +421,7 @@ void WebSocket::send(ArrayBuffer* binaryData, ExceptionState& exceptionState)
return;
}
ASSERT(m_channel);
m_bufferedAmount += binaryData->byteLength();
handleSendResult(m_channel->send(*binaryData, 0, binaryData->byteLength()), exceptionState, WebSocketSendTypeArrayBuffer);
}
......@@ -425,6 +438,7 @@ void WebSocket::send(ArrayBufferView* arrayBufferView, ExceptionState& exception
return;
}
ASSERT(m_channel);
m_bufferedAmount += arrayBufferView->byteLength();
RefPtr<ArrayBuffer> arrayBuffer(arrayBufferView->buffer());
handleSendResult(m_channel->send(*arrayBuffer, arrayBufferView->byteOffset(), arrayBufferView->byteLength()), exceptionState, WebSocketSendTypeArrayBufferView);
}
......@@ -441,6 +455,7 @@ void WebSocket::send(Blob* binaryData, ExceptionState& exceptionState)
updateBufferedAmountAfterClose(static_cast<unsigned long>(binaryData->size()));
return;
}
m_bufferedAmount += binaryData->size();
ASSERT(m_channel);
handleSendResult(m_channel->send(binaryData->blobDataHandle()), exceptionState, WebSocketSendTypeBlob);
}
......@@ -640,12 +655,15 @@ void WebSocket::didReceiveMessageError()
m_eventQueue->dispatch(Event::create(EventTypeNames::error));
}
void WebSocket::didUpdateBufferedAmount(unsigned long bufferedAmount)
void WebSocket::didConsumeBufferedAmount(unsigned long consumed)
{
WTF_LOG(Network, "WebSocket %p didUpdateBufferedAmount() New bufferedAmount is %lu", this, bufferedAmount);
ASSERT(m_bufferedAmount >= consumed);
WTF_LOG(Network, "WebSocket %p didConsumeBufferedAmount(%lu)", this, consumed);
if (m_state == CLOSED)
return;
m_bufferedAmount = bufferedAmount;
m_consumedBufferedAmount += consumed;
if (!m_bufferedAmountConsumeTimer.isActive())
m_bufferedAmountConsumeTimer.startOneShot(0, FROM_HERE);
}
void WebSocket::didStartClosingHandshake()
......@@ -654,15 +672,15 @@ void WebSocket::didStartClosingHandshake()
m_state = CLOSING;
}
void WebSocket::didClose(unsigned long unhandledBufferedAmount, ClosingHandshakeCompletionStatus closingHandshakeCompletion, unsigned short code, const String& reason)
void WebSocket::didClose(ClosingHandshakeCompletionStatus closingHandshakeCompletion, unsigned short code, const String& reason)
{
WTF_LOG(Network, "WebSocket %p didClose()", this);
if (!m_channel)
return;
bool wasClean = m_state == CLOSING && !unhandledBufferedAmount && closingHandshakeCompletion == ClosingHandshakeComplete && code != WebSocketChannel::CloseEventCodeAbnormalClosure;
bool hasAllDataConsumed = m_bufferedAmount == m_consumedBufferedAmount;
bool wasClean = m_state == CLOSING && hasAllDataConsumed && closingHandshakeCompletion == ClosingHandshakeComplete && code != WebSocketChannel::CloseEventCodeAbnormalClosure;
m_state = CLOSED;
m_bufferedAmount = unhandledBufferedAmount;
m_eventQueue->dispatch(CloseEvent::create(wasClean, code, reason));
releaseChannel();
}
......
......@@ -120,9 +120,9 @@ public:
virtual void didReceiveMessage(const String& message) OVERRIDE;
virtual void didReceiveBinaryData(PassOwnPtr<Vector<char> >) OVERRIDE;
virtual void didReceiveMessageError() OVERRIDE;
virtual void didUpdateBufferedAmount(unsigned long bufferedAmount) OVERRIDE;
virtual void didConsumeBufferedAmount(unsigned long) OVERRIDE;
virtual void didStartClosingHandshake() OVERRIDE;
virtual void didClose(unsigned long unhandledBufferedAmount, ClosingHandshakeCompletionStatus, unsigned short code, const String& reason) OVERRIDE;
virtual void didClose(ClosingHandshakeCompletionStatus, unsigned short code, const String& reason) OVERRIDE;
virtual void trace(Visitor*) OVERRIDE;
......@@ -208,6 +208,7 @@ private:
// Updates m_bufferedAmountAfterClose given the amount of data passed to
// send() method after the state changed to CLOSING or CLOSED.
void updateBufferedAmountAfterClose(unsigned long);
void reflectBufferedAmountConsumption(Timer<WebSocket>*);
void releaseChannel();
......@@ -221,6 +222,9 @@ private:
State m_state;
KURL m_url;
unsigned long m_bufferedAmount;
// The consumed buffered amount that will be reflected to m_bufferedAmount
// later. It will be cleared once reflected.
unsigned long m_consumedBufferedAmount;
unsigned long m_bufferedAmountAfterClose;
BinaryType m_binaryType;
// The subprotocol the server selected.
......@@ -228,6 +232,7 @@ private:
String m_extensions;
RefPtrWillBeMember<EventQueue> m_eventQueue;
Timer<WebSocket> m_bufferedAmountConsumeTimer;
};
} // namespace WebCore
......
......@@ -88,7 +88,6 @@ public:
// For WorkerThreadableWebSocketChannel.
virtual SendResult send(PassOwnPtr<Vector<char> >) = 0;
virtual unsigned long bufferedAmount() const = 0;
virtual void close(int code, const String& reason) = 0;
// Log the reason text and close the connection. Will call didClose().
......
......@@ -37,24 +37,24 @@
namespace WebCore {
class WebSocketChannelClient {
public:
virtual ~WebSocketChannelClient() { }
virtual void didConnect(const String& subprotocol, const String& extensions) { }
virtual void didReceiveMessage(const String&) { }
virtual void didReceiveBinaryData(PassOwnPtr<Vector<char> >) { }
virtual void didReceiveMessageError() { }
virtual void didUpdateBufferedAmount(unsigned long /* bufferedAmount */) { }
virtual void didStartClosingHandshake() { }
enum ClosingHandshakeCompletionStatus {
ClosingHandshakeIncomplete,
ClosingHandshakeComplete
};
virtual void didClose(unsigned long /* unhandledBufferedAmount */, ClosingHandshakeCompletionStatus, unsigned short /* code */, const String& /* reason */) { }
protected:
WebSocketChannelClient() { }
class WebSocketChannelClient {
public:
virtual ~WebSocketChannelClient() { }
virtual void didConnect(const String& subprotocol, const String& extensions) { }
virtual void didReceiveMessage(const String&) { }
virtual void didReceiveBinaryData(PassOwnPtr<Vector<char> >) { }
virtual void didReceiveMessageError() { }
virtual void didConsumeBufferedAmount(unsigned long consumed) { }
virtual void didStartClosingHandshake() { }
enum ClosingHandshakeCompletionStatus {
ClosingHandshakeIncomplete,
ClosingHandshakeComplete
};
virtual void didClose(ClosingHandshakeCompletionStatus, unsigned short /* code */, const String& /* reason */) { }
protected:
WebSocketChannelClient() { }
};
} // namespace WebCore
......
......@@ -116,7 +116,7 @@ public:
::testing::Mock::VerifyAndClear(m_websocket->channel());
EXPECT_CALL(channel(), disconnect()).Times(AnyNumber());
m_websocket->didClose(0, WebSocketChannelClient::ClosingHandshakeIncomplete, 1006, "");
m_websocket->didClose(WebSocketChannelClient::ClosingHandshakeIncomplete, 1006, "");
m_websocket.clear();
Heap::collectAllGarbage();
}
......@@ -294,7 +294,7 @@ TEST_F(WebSocketTest, didClose)
EXPECT_FALSE(m_exceptionState.hadException());
EXPECT_EQ(WebSocket::CONNECTING, m_websocket->readyState());
m_websocket->didClose(0, WebSocketChannelClient::ClosingHandshakeIncomplete, 1006, "");
m_websocket->didClose(WebSocketChannelClient::ClosingHandshakeIncomplete, 1006, "");
EXPECT_EQ(WebSocket::CLOSED, m_websocket->readyState());
}
......@@ -463,7 +463,7 @@ TEST_F(WebSocketTest, closeWhenClosed)
EXPECT_FALSE(m_exceptionState.hadException());
EXPECT_EQ(WebSocket::CLOSING, m_websocket->readyState());
m_websocket->didClose(0, WebSocketChannelClient::ClosingHandshakeComplete, 1000, String());
m_websocket->didClose(WebSocketChannelClient::ClosingHandshakeComplete, 1000, String());
EXPECT_EQ(WebSocket::CLOSED, m_websocket->readyState());
m_websocket->close(m_exceptionState);
......@@ -523,7 +523,7 @@ TEST_F(WebSocketTest, sendStringWhenClosed)
EXPECT_FALSE(m_exceptionState.hadException());
m_websocket->didClose(0, WebSocketChannelClient::ClosingHandshakeIncomplete, 1006, "");
m_websocket->didClose(WebSocketChannelClient::ClosingHandshakeIncomplete, 1006, "");
checkpoint.Call(1);
m_websocket->send("hello", m_exceptionState);
......@@ -642,7 +642,7 @@ TEST_F(WebSocketTest, sendArrayBufferWhenClosed)
EXPECT_FALSE(m_exceptionState.hadException());
m_websocket->didClose(0, WebSocketChannelClient::ClosingHandshakeIncomplete, 1006, "");
m_websocket->didClose(WebSocketChannelClient::ClosingHandshakeIncomplete, 1006, "");
checkpoint.Call(1);
m_websocket->send(view->buffer().get(), m_exceptionState);
......
......@@ -74,10 +74,6 @@ public:
{
m_sendRequestResult = sendRequestResult;
}
void setBufferedAmount(unsigned long bufferedAmount)
{
m_bufferedAmount = bufferedAmount;
}
// All getter are called on the worker thread.
bool connectRequestResult() const
......@@ -88,10 +84,6 @@ public:
{
return m_sendRequestResult;
}
unsigned long bufferedAmount() const
{
return m_bufferedAmount;
}
// This should be called after all setters are called and before any
// getters are called.
......@@ -110,14 +102,12 @@ private:
: m_event(event)
, m_connectRequestResult(false)
, m_sendRequestResult(WebSocketChannel::SendFail)
, m_bufferedAmount(0)
{
}
OwnPtr<blink::WebWaitableEvent> m_event;
bool m_connectRequestResult;
WebSocketChannel::SendResult m_sendRequestResult;
unsigned long m_bufferedAmount;
};
WorkerThreadableWebSocketChannel::WorkerThreadableWebSocketChannel(WorkerGlobalScope& workerGlobalScope, WebSocketChannelClient* client, const String& sourceURL, unsigned lineNumber)
......@@ -164,13 +154,6 @@ WebSocketChannel::SendResult WorkerThreadableWebSocketChannel::send(PassRefPtr<B
return m_bridge->send(blobData);
}
unsigned long WorkerThreadableWebSocketChannel::bufferedAmount() const
{
if (!m_bridge)
return 0;
return m_bridge->bufferedAmount();
}
void WorkerThreadableWebSocketChannel::close(int code, const String& reason)
{
if (m_bridge)
......@@ -311,18 +294,6 @@ void WorkerThreadableWebSocketChannel::Peer::sendBlob(PassRefPtr<BlobDataHandle>
m_syncHelper->signalWorkerThread();
}
void WorkerThreadableWebSocketChannel::Peer::bufferedAmount()
{
ASSERT(isMainThread());
if (!m_mainWebSocketChannel) {
m_syncHelper->setBufferedAmount(0);
} else {
unsigned long bufferedAmount = m_mainWebSocketChannel->bufferedAmount();
m_syncHelper->setBufferedAmount(bufferedAmount);
}
m_syncHelper->signalWorkerThread();
}
void WorkerThreadableWebSocketChannel::Peer::close(int code, const String& reason)
{
ASSERT(isMainThread());
......@@ -400,16 +371,16 @@ void WorkerThreadableWebSocketChannel::Peer::didReceiveBinaryData(PassOwnPtr<Vec
m_loaderProxy.postTaskToWorkerGlobalScope(createCallbackTask(&workerGlobalScopeDidReceiveBinaryData, m_workerClientWrapper.get(), binaryData));
}
static void workerGlobalScopeDidUpdateBufferedAmount(ExecutionContext* context, PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, unsigned long bufferedAmount)
static void workerGlobalScopeDidConsumeBufferedAmount(ExecutionContext* context, PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, unsigned long consumed)
{
ASSERT_UNUSED(context, context->isWorkerGlobalScope());
workerClientWrapper->didUpdateBufferedAmount(bufferedAmount);
workerClientWrapper->didConsumeBufferedAmount(consumed);
}
void WorkerThreadableWebSocketChannel::Peer::didUpdateBufferedAmount(unsigned long bufferedAmount)
void WorkerThreadableWebSocketChannel::Peer::didConsumeBufferedAmount(unsigned long consumed)
{
ASSERT(isMainThread());
m_loaderProxy.postTaskToWorkerGlobalScope(createCallbackTask(&workerGlobalScopeDidUpdateBufferedAmount, m_workerClientWrapper.get(), bufferedAmount));
m_loaderProxy.postTaskToWorkerGlobalScope(createCallbackTask(&workerGlobalScopeDidConsumeBufferedAmount, m_workerClientWrapper.get(), consumed));
}
static void workerGlobalScopeDidStartClosingHandshake(ExecutionContext* context, PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper)
......@@ -424,17 +395,17 @@ void WorkerThreadableWebSocketChannel::Peer::didStartClosingHandshake()
m_loaderProxy.postTaskToWorkerGlobalScope(createCallbackTask(&workerGlobalScopeDidStartClosingHandshake, m_workerClientWrapper.get()));
}
static void workerGlobalScopeDidClose(ExecutionContext* context, PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, unsigned long unhandledBufferedAmount, WebSocketChannelClient::ClosingHandshakeCompletionStatus closingHandshakeCompletion, unsigned short code, const String& reason)
static void workerGlobalScopeDidClose(ExecutionContext* context, PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, WebSocketChannelClient::ClosingHandshakeCompletionStatus closingHandshakeCompletion, unsigned short code, const String& reason)
{
ASSERT_UNUSED(context, context->isWorkerGlobalScope());
workerClientWrapper->didClose(unhandledBufferedAmount, closingHandshakeCompletion, code, reason);
workerClientWrapper->didClose(closingHandshakeCompletion, code, reason);
}
void WorkerThreadableWebSocketChannel::Peer::didClose(unsigned long unhandledBufferedAmount, ClosingHandshakeCompletionStatus closingHandshakeCompletion, unsigned short code, const String& reason)
void WorkerThreadableWebSocketChannel::Peer::didClose(ClosingHandshakeCompletionStatus closingHandshakeCompletion, unsigned short code, const String& reason)
{
ASSERT(isMainThread());
m_mainWebSocketChannel = nullptr;
m_loaderProxy.postTaskToWorkerGlobalScope(createCallbackTask(&workerGlobalScopeDidClose, m_workerClientWrapper.get(), unhandledBufferedAmount, closingHandshakeCompletion, code, reason));
m_loaderProxy.postTaskToWorkerGlobalScope(createCallbackTask(&workerGlobalScopeDidClose, m_workerClientWrapper.get(), closingHandshakeCompletion, code, reason));
}
static void workerGlobalScopeDidReceiveMessageError(ExecutionContext* context, PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper)
......@@ -532,18 +503,6 @@ WebSocketChannel::SendResult WorkerThreadableWebSocketChannel::Bridge::send(Pass
return m_syncHelper->sendRequestResult();
}
unsigned long WorkerThreadableWebSocketChannel::Bridge::bufferedAmount()
{
if (hasTerminatedPeer())
return 0;
RefPtr<Bridge> protect(this);
if (!waitForMethodCompletion(CallClosureTask::create(bind(&Peer::bufferedAmount, m_peer))))
return 0;
return m_syncHelper->bufferedAmount();
}
void WorkerThreadableWebSocketChannel::Bridge::close(int code, const String& reason)
{
if (hasTerminatedPeer())
......
......@@ -81,7 +81,6 @@ public:
ASSERT_NOT_REACHED();
return WebSocketChannel::SendFail;
}
virtual unsigned long bufferedAmount() const OVERRIDE;
virtual void close(int code, const String& reason) OVERRIDE;
virtual void fail(const String& reason, MessageLevel, const String&, unsigned) OVERRIDE;
virtual void disconnect() OVERRIDE; // Will suppress didClose().
......@@ -118,9 +117,9 @@ public:
virtual void didConnect(const String& subprotocol, const String& extensions) OVERRIDE;
virtual void didReceiveMessage(const String& message) OVERRIDE;
virtual void didReceiveBinaryData(PassOwnPtr<Vector<char> >) OVERRIDE;
virtual void didUpdateBufferedAmount(unsigned long bufferedAmount) OVERRIDE;
virtual void didConsumeBufferedAmount(unsigned long) OVERRIDE;
virtual void didStartClosingHandshake() OVERRIDE;
virtual void didClose(unsigned long unhandledBufferedAmount, ClosingHandshakeCompletionStatus, unsigned short code, const String& reason) OVERRIDE;
virtual void didClose(ClosingHandshakeCompletionStatus, unsigned short code, const String& reason) OVERRIDE;
virtual void didReceiveMessageError() OVERRIDE;
private:
......
......@@ -194,8 +194,6 @@ bool SocketStreamHandle::send(const char* data, int length)
return false;
}
m_buffer.append(data, length);
if (m_client)
m_client->didUpdateBufferedAmount(this, bufferedAmount());
return true;
}
int bytesWritten = 0;
......@@ -203,14 +201,14 @@ bool SocketStreamHandle::send(const char* data, int length)
bytesWritten = sendInternal(data, length);
if (bytesWritten < 0)
return false;
if (m_client)
m_client->didConsumeBufferedAmount(this, bytesWritten);
if (m_buffer.size() + length - bytesWritten > bufferSize) {
// FIXME: report error to indicate that buffer has no more space.
return false;
}
if (bytesWritten < length) {
m_buffer.append(data + bytesWritten, length - bytesWritten);
if (m_client)
m_client->didUpdateBufferedAmount(this, bufferedAmount());
}
return true;
}
......@@ -259,9 +257,9 @@ bool SocketStreamHandle::sendPendingData()
return false;
ASSERT(m_buffer.size() - bytesWritten <= bufferSize);
m_buffer.consume(bytesWritten);
if (m_client)
m_client->didConsumeBufferedAmount(this, bytesWritten);
} while (!pending && !m_buffer.isEmpty());
if (m_client)
m_client->didUpdateBufferedAmount(this, bufferedAmount());
return true;
}
......
......@@ -56,7 +56,6 @@ public:
bool send(const char* data, int length);
void close(); // Disconnect after all data in buffer are sent.
void disconnect();
size_t bufferedAmount() const { return m_buffer.size(); }
SocketStreamHandleClient* client() const { return m_client; }
void setClient(SocketStreamHandleClient*);
......
......@@ -45,7 +45,7 @@ public:
virtual void didOpenSocketStream(SocketStreamHandle*) { }
virtual void didCloseSocketStream(SocketStreamHandle*) { }
virtual void didReceiveSocketStreamData(SocketStreamHandle*, const char* /*data*/, int /*length*/) { }
virtual void didUpdateBufferedAmount(SocketStreamHandle*, size_t /*bufferedAmount*/) { }
virtual void didConsumeBufferedAmount(SocketStreamHandle*, size_t amount) { }
virtual void didFailSocketStream(SocketStreamHandle*, const SocketStreamError&) { }
};
......
......@@ -42,6 +42,7 @@
#include "public/platform/WebURL.h"
#include "public/web/WebDocument.h"
#include "wtf/ArrayBuffer.h"
#include "wtf/text/CString.h"
#include "wtf/text/WTFString.h"
using namespace WebCore;
......@@ -51,6 +52,7 @@ namespace blink {
WebSocketImpl::WebSocketImpl(const WebDocument& document, WebSocketClient* client)
: m_client(client)
, m_binaryType(BinaryTypeBlob)
, m_bufferedAmount(0)
{
RefPtrWillBeRawPtr<Document> coreDocument = PassRefPtrWillBeRawPtr<Document>(document);
if (RuntimeEnabledFeatures::experimentalWebSocketEnabled()) {
......@@ -95,17 +97,19 @@ WebString WebSocketImpl::extensions()
bool WebSocketImpl::sendText(const WebString& message)
{
m_bufferedAmount += message.utf8().length();
return m_private->send(message) == WebSocketChannel::SendSuccess;
}
bool WebSocketImpl::sendArrayBuffer(const WebArrayBuffer& webArrayBuffer)
{
m_bufferedAmount += webArrayBuffer.byteLength();
return m_private->send(*PassRefPtr<ArrayBuffer>(webArrayBuffer), 0, webArrayBuffer.byteLength()) == WebSocketChannel::SendSuccess;
}
unsigned long WebSocketImpl::bufferedAmount() const
{
return m_private->bufferedAmount();
return m_bufferedAmount;
}
void WebSocketImpl::close(int code, const WebString& reason)
......@@ -153,9 +157,9 @@ void WebSocketImpl::didReceiveMessageError()
m_client->didReceiveMessageError();
}
void WebSocketImpl::didUpdateBufferedAmount(unsigned long bufferedAmount)
void WebSocketImpl::didConsumeBufferedAmount(unsigned long consumed)
{
m_client->didUpdateBufferedAmount(bufferedAmount);
m_bufferedAmount -= consumed;
}
void WebSocketImpl::didStartClosingHandshake()
......@@ -163,9 +167,9 @@ void WebSocketImpl::didStartClosingHandshake()
m_client->didStartClosingHandshake();
}
void WebSocketImpl::didClose(unsigned long bufferedAmount, ClosingHandshakeCompletionStatus status, unsigned short code, const String& reason)
void WebSocketImpl::didClose(ClosingHandshakeCompletionStatus status, unsigned short code, const String& reason)
{
m_client->didClose(bufferedAmount, static_cast<WebSocketClient::ClosingHandshakeCompletionStatus>(status), code, WebString(reason));
m_client->didClose(m_bufferedAmount, static_cast<WebSocketClient::ClosingHandshakeCompletionStatus>(status), code, WebString(reason));
}
} // namespace blink
......@@ -71,9 +71,9 @@ public:
virtual void didReceiveMessage(const String& message) OVERRIDE;
virtual void didReceiveBinaryData(PassOwnPtr<Vector<char> > binaryData) OVERRIDE;
virtual void didReceiveMessageError() OVERRIDE;
virtual void didUpdateBufferedAmount(unsigned long bufferedAmount) OVERRIDE;
virtual void didConsumeBufferedAmount(unsigned long consumed) OVERRIDE;
virtual void didStartClosingHandshake() OVERRIDE;
virtual void didClose(unsigned long bufferedAmount, ClosingHandshakeCompletionStatus, unsigned short code, const String& reason) OVERRIDE;
virtual void didClose(ClosingHandshakeCompletionStatus, unsigned short code, const String& reason) OVERRIDE;
private:
RefPtrWillBePersistent<WebCore::WebSocketChannel> m_private;
......@@ -81,6 +81,7 @@ private:
BinaryType m_binaryType;
WebString m_subprotocol;
WebString m_extensions;
unsigned long m_bufferedAmount;
};
} // namespace blink
......
......@@ -50,9 +50,8 @@ public:
virtual void didReceiveMessage(const WebString& message) { }
virtual void didReceiveArrayBuffer(const WebArrayBuffer& arrayBuffer) { }
virtual void didReceiveMessageError() { }
virtual void didUpdateBufferedAmount(unsigned long bufferedAmount) { }
virtual void didStartClosingHandshake() { }
virtual void didClose(unsigned long bufferedAmount, ClosingHandshakeCompletionStatus, unsigned short code, const WebString& reason) { }
virtual void didClose(unsigned long unhandledBufferedAmount, ClosingHandshakeCompletionStatus, unsigned short code, const WebString& reason) { }
};
} // namespace blink
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment