Commit b34518fa authored by yzshen's avatar yzshen Committed by Commit bot

Fix WebSocket{Read,Write}Queue.

This CL fixes:
- WebSocketReadQueue may discard data unexpectedly.
- WebSocket{Read,Write}Queue: may stop processing queued operations.

BUG=490196,490200
TEST=None

Review URL: https://codereview.chromium.org/1148913002

Cr-Commit-Position: refs/heads/master@{#330846}
parent 1fc345cd
......@@ -4,7 +4,6 @@ include_rules = [
"-net",
"-services",
"-testing",
"!base",
"!mojo/common",
"+mojo/services/network/public",
"+third_party/mojo/src/mojo/public",
......
......@@ -5,6 +5,7 @@
#include "network/public/cpp/web_socket_read_queue.h"
#include "base/bind.h"
#include "base/logging.h"
namespace mojo {
......@@ -14,7 +15,7 @@ struct WebSocketReadQueue::Operation {
};
WebSocketReadQueue::WebSocketReadQueue(DataPipeConsumerHandle handle)
: handle_(handle), is_waiting_(false) {
: handle_(handle), is_busy_(false), weak_factory_(this) {
}
WebSocketReadQueue::~WebSocketReadQueue() {
......@@ -27,18 +28,23 @@ void WebSocketReadQueue::Read(uint32_t num_bytes,
op->callback_ = callback;
queue_.push_back(op);
if (!is_waiting_)
if (is_busy_)
return;
is_busy_ = true;
TryToRead();
}
void WebSocketReadQueue::TryToRead() {
DCHECK(is_busy_);
DCHECK(!queue_.empty());
do {
Operation* op = queue_[0];
const void* buffer = NULL;
uint32_t bytes_read = op->num_bytes_;
MojoResult result = BeginReadDataRaw(
handle_, &buffer, &bytes_read, MOJO_READ_DATA_FLAG_ALL_OR_NONE);
if (result == MOJO_RESULT_SHOULD_WAIT) {
EndReadDataRaw(handle_, bytes_read);
Wait();
return;
}
......@@ -46,15 +52,33 @@ void WebSocketReadQueue::TryToRead() {
// Ensure |op| is deleted, whether or not |this| goes away.
scoped_ptr<Operation> op_deleter(op);
queue_.weak_erase(queue_.begin());
// http://crbug.com/490193 This should run callback as well. May need to
// change the callback signature.
if (result != MOJO_RESULT_OK)
return;
uint32_t num_bytes = op_deleter->num_bytes_;
DCHECK_LE(num_bytes, bytes_read);
DataPipeConsumerHandle handle = handle_;
op->callback_.Run(static_cast<const char*>(buffer)); // may delete |this|
EndReadDataRaw(handle, bytes_read);
base::WeakPtr<WebSocketReadQueue> self(weak_factory_.GetWeakPtr());
// This call may delete |this|. In that case, |self| will be invalidated.
// It may re-enter Read() too. Because |is_busy_| is true during the whole
// process, TryToRead() won't be re-entered.
op->callback_.Run(static_cast<const char*>(buffer));
EndReadDataRaw(handle, num_bytes);
if (!self)
return;
} while (!queue_.empty());
is_busy_ = false;
}
void WebSocketReadQueue::Wait() {
is_waiting_ = true;
DCHECK(is_busy_);
handle_watcher_.Start(
handle_,
MOJO_HANDLE_SIGNAL_READABLE,
......@@ -63,7 +87,7 @@ void WebSocketReadQueue::Wait() {
}
void WebSocketReadQueue::OnHandleReady(MojoResult result) {
is_waiting_ = false;
DCHECK(is_busy_);
TryToRead();
}
......
......@@ -7,6 +7,7 @@
#include "base/callback.h"
#include "base/memory/scoped_vector.h"
#include "base/memory/weak_ptr.h"
#include "mojo/common/handle_watcher.h"
#include "third_party/mojo/src/mojo/public/cpp/system/data_pipe.h"
......@@ -19,7 +20,7 @@ namespace mojo {
// See also: WebSocketWriteQueue
class WebSocketReadQueue {
public:
WebSocketReadQueue(DataPipeConsumerHandle handle);
explicit WebSocketReadQueue(DataPipeConsumerHandle handle);
~WebSocketReadQueue();
void Read(uint32_t num_bytes, base::Callback<void(const char*)> callback);
......@@ -34,7 +35,8 @@ class WebSocketReadQueue {
DataPipeConsumerHandle handle_;
common::HandleWatcher handle_watcher_;
ScopedVector<Operation> queue_;
bool is_waiting_;
bool is_busy_;
base::WeakPtrFactory<WebSocketReadQueue> weak_factory_;
};
} // namespace mojo
......
......@@ -5,6 +5,7 @@
#include "network/public/cpp/web_socket_write_queue.h"
#include "base/bind.h"
#include "base/logging.h"
namespace mojo {
......@@ -19,7 +20,7 @@ struct WebSocketWriteQueue::Operation {
};
WebSocketWriteQueue::WebSocketWriteQueue(DataPipeProducerHandle handle)
: handle_(handle), is_waiting_(false) {
: handle_(handle), is_busy_(false), weak_factory_(this) {
}
WebSocketWriteQueue::~WebSocketWriteQueue() {
......@@ -34,41 +35,58 @@ void WebSocketWriteQueue::Write(const char* data,
op->data_ = data;
queue_.push_back(op);
MojoResult result = MOJO_RESULT_SHOULD_WAIT;
if (!is_waiting_)
result = TryToWrite();
if (!is_busy_) {
is_busy_ = true;
// This call may reset |is_busy_| to false.
TryToWrite();
}
if (is_busy_) {
// If we have to wait, make a local copy of the data so we know it will
// live until we need it.
if (result == MOJO_RESULT_SHOULD_WAIT) {
op->data_copy_.resize(num_bytes);
memcpy(&op->data_copy_[0], data, num_bytes);
op->data_ = &op->data_copy_[0];
}
}
MojoResult WebSocketWriteQueue::TryToWrite() {
void WebSocketWriteQueue::TryToWrite() {
DCHECK(is_busy_);
DCHECK(!queue_.empty());
do {
Operation* op = queue_[0];
uint32_t bytes_written = op->num_bytes_;
MojoResult result = WriteDataRaw(
handle_, op->data_, &bytes_written, MOJO_WRITE_DATA_FLAG_ALL_OR_NONE);
if (result == MOJO_RESULT_SHOULD_WAIT) {
Wait();
return result;
return;
}
// Ensure |op| is deleted, whether or not |this| goes away.
scoped_ptr<Operation> op_deleter(op);
queue_.weak_erase(queue_.begin());
// http://crbug.com/490193 This should run callback as well. May need to
// change the callback signature.
if (result != MOJO_RESULT_OK)
return result;
return;
base::WeakPtr<WebSocketWriteQueue> self(weak_factory_.GetWeakPtr());
// This call may delete |this|. In that case, |self| will be invalidated.
// It may re-enter Write() too. Because |is_busy_| is true during the whole
// process, TryToWrite() won't be re-entered.
op->callback_.Run(op->data_);
op->callback_.Run(op->data_); // may delete |this|
return result;
if (!self)
return;
} while (!queue_.empty());
is_busy_ = false;
}
void WebSocketWriteQueue::Wait() {
is_waiting_ = true;
DCHECK(is_busy_);
handle_watcher_.Start(handle_,
MOJO_HANDLE_SIGNAL_WRITABLE,
MOJO_DEADLINE_INDEFINITE,
......@@ -77,7 +95,7 @@ void WebSocketWriteQueue::Wait() {
}
void WebSocketWriteQueue::OnHandleReady(MojoResult result) {
is_waiting_ = false;
DCHECK(is_busy_);
TryToWrite();
}
......
......@@ -7,6 +7,7 @@
#include "base/callback.h"
#include "base/memory/scoped_vector.h"
#include "base/memory/weak_ptr.h"
#include "mojo/common/handle_watcher.h"
#include "third_party/mojo/src/mojo/public/cpp/system/data_pipe.h"
......@@ -19,7 +20,7 @@ namespace mojo {
// See also: WebSocketReadQueue
class WebSocketWriteQueue {
public:
WebSocketWriteQueue(DataPipeProducerHandle handle);
explicit WebSocketWriteQueue(DataPipeProducerHandle handle);
~WebSocketWriteQueue();
void Write(const char* data,
......@@ -29,14 +30,15 @@ class WebSocketWriteQueue {
private:
struct Operation;
MojoResult TryToWrite();
void TryToWrite();
void Wait();
void OnHandleReady(MojoResult result);
DataPipeProducerHandle handle_;
common::HandleWatcher handle_watcher_;
ScopedVector<Operation> queue_;
bool is_waiting_;
bool is_busy_;
base::WeakPtrFactory<WebSocketWriteQueue> weak_factory_;
};
} // namespace mojo
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment