Commit c2680c95 authored by mpcomplete's avatar mpcomplete Committed by Commit bot

Mojo: WebSocket interface now reuses the DataPipe for subsequent sends or

receives, rather then allocating a new DataPipe for each chunk.

BUG=403930

Review URL: https://codereview.chromium.org/550003005

Cr-Commit-Position: refs/heads/master@{#296293}
parent 861ff750
...@@ -121,6 +121,10 @@ ...@@ -121,6 +121,10 @@
'services/html_viewer/weburlloader_impl.h', 'services/html_viewer/weburlloader_impl.h',
'services/html_viewer/weblayertreeview_impl.cc', 'services/html_viewer/weblayertreeview_impl.cc',
'services/html_viewer/weblayertreeview_impl.h', 'services/html_viewer/weblayertreeview_impl.h',
'services/public/cpp/network/web_socket_read_queue.cc',
'services/public/cpp/network/web_socket_read_queue.h',
'services/public/cpp/network/web_socket_write_queue.cc',
'services/public/cpp/network/web_socket_write_queue.h',
], ],
}, },
{ {
...@@ -490,6 +494,10 @@ ...@@ -490,6 +494,10 @@
'services/network/url_loader_impl.h', 'services/network/url_loader_impl.h',
'services/network/web_socket_impl.cc', 'services/network/web_socket_impl.cc',
'services/network/web_socket_impl.h', 'services/network/web_socket_impl.h',
'services/public/cpp/network/web_socket_read_queue.cc',
'services/public/cpp/network/web_socket_read_queue.h',
'services/public/cpp/network/web_socket_write_queue.cc',
'services/public/cpp/network/web_socket_write_queue.h',
], ],
}, },
{ {
......
...@@ -56,8 +56,9 @@ shared_library("html_viewer") { ...@@ -56,8 +56,9 @@ shared_library("html_viewer") {
"//mojo/public/c/system:for_shared_library", "//mojo/public/c/system:for_shared_library",
"//mojo/public/cpp/bindings", "//mojo/public/cpp/bindings",
"//mojo/public/cpp/utility", "//mojo/public/cpp/utility",
"//mojo/services/public/cpp/view_manager",
"//mojo/public/interfaces/application", "//mojo/public/interfaces/application",
"//mojo/services/public/cpp/network",
"//mojo/services/public/cpp/view_manager",
"//mojo/services/public/interfaces/clipboard", "//mojo/services/public/interfaces/clipboard",
"//mojo/services/public/interfaces/content_handler", "//mojo/services/public/interfaces/content_handler",
"//mojo/services/public/interfaces/gpu", "//mojo/services/public/interfaces/gpu",
......
...@@ -6,7 +6,11 @@ ...@@ -6,7 +6,11 @@
#include <vector> #include <vector>
#include "base/bind.h"
#include "base/memory/scoped_vector.h"
#include "mojo/services/html_viewer/blink_basic_type_converters.h" #include "mojo/services/html_viewer/blink_basic_type_converters.h"
#include "mojo/services/public/cpp/network/web_socket_read_queue.h"
#include "mojo/services/public/cpp/network/web_socket_write_queue.h"
#include "mojo/services/public/interfaces/network/network_service.mojom.h" #include "mojo/services/public/interfaces/network/network_service.mojom.h"
#include "third_party/WebKit/public/platform/WebSerializedOrigin.h" #include "third_party/WebKit/public/platform/WebSerializedOrigin.h"
#include "third_party/WebKit/public/platform/WebSocketHandleClient.h" #include "third_party/WebKit/public/platform/WebSocketHandleClient.h"
...@@ -67,12 +71,15 @@ class WebSocketClientImpl : public InterfaceImpl<WebSocketClient> { ...@@ -67,12 +71,15 @@ class WebSocketClientImpl : public InterfaceImpl<WebSocketClient> {
private: private:
// WebSocketClient methods: // WebSocketClient methods:
virtual void DidConnect( virtual void DidConnect(bool fail,
bool fail, const String& selected_subprotocol,
const String& selected_subprotocol, const String& extensions,
const String& extensions) OVERRIDE { ScopedDataPipeConsumerHandle receive_stream)
OVERRIDE {
blink::WebSocketHandleClient* client = client_; blink::WebSocketHandleClient* client = client_;
WebSocketHandleImpl* handle = handle_; WebSocketHandleImpl* handle = handle_;
receive_stream_ = receive_stream.Pass();
read_queue_.reset(new WebSocketReadQueue(receive_stream_.get()));
if (fail) if (fail)
handle->Disconnect(); // deletes |this| handle->Disconnect(); // deletes |this|
client->didConnect(handle, client->didConnect(handle,
...@@ -84,19 +91,11 @@ class WebSocketClientImpl : public InterfaceImpl<WebSocketClient> { ...@@ -84,19 +91,11 @@ class WebSocketClientImpl : public InterfaceImpl<WebSocketClient> {
virtual void DidReceiveData(bool fin, virtual void DidReceiveData(bool fin,
WebSocket::MessageType type, WebSocket::MessageType type,
ScopedDataPipeConsumerHandle data_pipe) OVERRIDE { uint32_t num_bytes) OVERRIDE {
uint32_t num_bytes; read_queue_->Read(num_bytes,
ReadDataRaw(data_pipe.get(), NULL, &num_bytes, MOJO_READ_DATA_FLAG_QUERY); base::Bind(&WebSocketClientImpl::DidReadFromReceiveStream,
std::vector<char> data(num_bytes); base::Unretained(this),
ReadDataRaw( fin, type, num_bytes));
data_pipe.get(), &data[0], &num_bytes, MOJO_READ_DATA_FLAG_NONE);
const char* data_ptr = data.empty() ? NULL : &data[0];
client_->didReceiveData(handle_,
fin,
ConvertTo<WebSocketHandle::MessageType>(type),
data_ptr,
data.size());
// |handle| can be deleted here.
} }
virtual void DidReceiveFlowControl(int64_t quota) OVERRIDE { virtual void DidReceiveFlowControl(int64_t quota) OVERRIDE {
...@@ -122,8 +121,23 @@ class WebSocketClientImpl : public InterfaceImpl<WebSocketClient> { ...@@ -122,8 +121,23 @@ class WebSocketClientImpl : public InterfaceImpl<WebSocketClient> {
// |handle| can be deleted here. // |handle| can be deleted here.
} }
void DidReadFromReceiveStream(bool fin,
WebSocket::MessageType type,
uint32_t num_bytes,
const char* data) {
client_->didReceiveData(handle_,
fin,
ConvertTo<WebSocketHandle::MessageType>(type),
data,
num_bytes);
// |handle_| can be deleted here.
}
// |handle_| owns this object, so it is guaranteed to outlive us.
WebSocketHandleImpl* handle_; WebSocketHandleImpl* handle_;
blink::WebSocketHandleClient* client_; blink::WebSocketHandleClient* client_;
ScopedDataPipeConsumerHandle receive_stream_;
scoped_ptr<WebSocketReadQueue> read_queue_;
DISALLOW_COPY_AND_ASSIGN(WebSocketClientImpl); DISALLOW_COPY_AND_ASSIGN(WebSocketClientImpl);
}; };
...@@ -150,9 +164,14 @@ void WebSocketHandleImpl::connect(const WebURL& url, ...@@ -150,9 +164,14 @@ void WebSocketHandleImpl::connect(const WebURL& url,
// TODO(mpcomplete): Is this the right ownership model? Or should mojo own // TODO(mpcomplete): Is this the right ownership model? Or should mojo own
// |client_|? // |client_|?
WeakBindToProxy(client_.get(), &client_ptr); WeakBindToProxy(client_.get(), &client_ptr);
DataPipe data_pipe;
send_stream_ = data_pipe.producer_handle.Pass();
write_queue_.reset(new WebSocketWriteQueue(send_stream_.get()));
web_socket_->Connect(url.string().utf8(), web_socket_->Connect(url.string().utf8(),
Array<String>::From(protocols), Array<String>::From(protocols),
origin.string().utf8(), origin.string().utf8(),
data_pipe.consumer_handle.Pass(),
client_ptr.Pass()); client_ptr.Pass());
} }
...@@ -163,22 +182,12 @@ void WebSocketHandleImpl::send(bool fin, ...@@ -163,22 +182,12 @@ void WebSocketHandleImpl::send(bool fin,
if (!client_) if (!client_)
return; return;
// TODO(mpcomplete): reuse the data pipe for subsequent sends. uint32_t size32 = static_cast<uint32_t>(size);
uint32_t num_bytes = static_cast<uint32_t>(size); write_queue_->Write(
MojoCreateDataPipeOptions options; data, size32,
options.struct_size = sizeof(MojoCreateDataPipeOptions); base::Bind(&WebSocketHandleImpl::DidWriteToSendStream,
options.flags = MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE; base::Unretained(this),
options.element_num_bytes = 1; fin, type, size32));
options.capacity_num_bytes = num_bytes;
DataPipe data_pipe(options);
WriteDataRaw(data_pipe.producer_handle.get(),
data,
&num_bytes,
MOJO_WRITE_DATA_FLAG_ALL_OR_NONE);
web_socket_->Send(
fin,
ConvertTo<WebSocket::MessageType>(type),
data_pipe.consumer_handle.Pass());
} }
void WebSocketHandleImpl::flowControl(int64_t quota) { void WebSocketHandleImpl::flowControl(int64_t quota) {
...@@ -192,6 +201,14 @@ void WebSocketHandleImpl::close(unsigned short code, const WebString& reason) { ...@@ -192,6 +201,14 @@ void WebSocketHandleImpl::close(unsigned short code, const WebString& reason) {
web_socket_->Close(code, reason.utf8()); web_socket_->Close(code, reason.utf8());
} }
void WebSocketHandleImpl::DidWriteToSendStream(
bool fin,
WebSocketHandle::MessageType type,
uint32_t num_bytes,
const char* data) {
web_socket_->Send(fin, ConvertTo<WebSocket::MessageType>(type), num_bytes);
}
void WebSocketHandleImpl::Disconnect() { void WebSocketHandleImpl::Disconnect() {
did_close_ = true; did_close_ = true;
client_.reset(); client_.reset();
......
...@@ -6,7 +6,6 @@ ...@@ -6,7 +6,6 @@
#define MOJO_SERVICES_HTML_VIEWER_WEBSOCKETHANDLE_IMPL_H_ #define MOJO_SERVICES_HTML_VIEWER_WEBSOCKETHANDLE_IMPL_H_
#include "base/memory/scoped_ptr.h" #include "base/memory/scoped_ptr.h"
#include "base/memory/weak_ptr.h"
#include "mojo/common/handle_watcher.h" #include "mojo/common/handle_watcher.h"
#include "mojo/services/public/interfaces/network/web_socket.mojom.h" #include "mojo/services/public/interfaces/network/web_socket.mojom.h"
#include "third_party/WebKit/public/platform/WebSocketHandle.h" #include "third_party/WebKit/public/platform/WebSocketHandle.h"
...@@ -14,6 +13,7 @@ ...@@ -14,6 +13,7 @@
namespace mojo { namespace mojo {
class NetworkService; class NetworkService;
class WebSocketClientImpl; class WebSocketClientImpl;
class WebSocketWriteQueue;
// Implements WebSocketHandle by talking to the mojo WebSocket interface. // Implements WebSocketHandle by talking to the mojo WebSocket interface.
class WebSocketHandleImpl : public blink::WebSocketHandle { class WebSocketHandleImpl : public blink::WebSocketHandle {
...@@ -38,11 +38,19 @@ class WebSocketHandleImpl : public blink::WebSocketHandle { ...@@ -38,11 +38,19 @@ class WebSocketHandleImpl : public blink::WebSocketHandle {
virtual void close(unsigned short code, virtual void close(unsigned short code,
const blink::WebString& reason) OVERRIDE; const blink::WebString& reason) OVERRIDE;
// Called when we finished writing to |send_stream_|.
void DidWriteToSendStream(bool fin,
WebSocketHandle::MessageType type,
uint32_t num_bytes,
const char* data);
// Called when the socket is closed. // Called when the socket is closed.
void Disconnect(); void Disconnect();
WebSocketPtr web_socket_; WebSocketPtr web_socket_;
scoped_ptr<WebSocketClientImpl> client_; scoped_ptr<WebSocketClientImpl> client_;
ScopedDataPipeProducerHandle send_stream_;
scoped_ptr<WebSocketWriteQueue> write_queue_;
// True if close() was called. // True if close() was called.
bool did_close_; bool did_close_;
......
...@@ -12,6 +12,7 @@ shared_library("network") { ...@@ -12,6 +12,7 @@ shared_library("network") {
"//mojo/application", "//mojo/application",
"//mojo/public/c/system:for_shared_library", "//mojo/public/c/system:for_shared_library",
"//mojo/public/cpp/bindings:bindings", "//mojo/public/cpp/bindings:bindings",
"//mojo/services/public/cpp/network",
"//mojo/services/public/interfaces/network", "//mojo/services/public/interfaces/network",
] ]
...@@ -25,6 +26,7 @@ source_set("lib") { ...@@ -25,6 +26,7 @@ source_set("lib") {
"//mojo/application", "//mojo/application",
"//mojo/common", "//mojo/common",
"//mojo/environment:chromium", "//mojo/environment:chromium",
"//mojo/services/public/cpp/network",
"//mojo/services/public/interfaces/network", "//mojo/services/public/interfaces/network",
"//net", "//net",
"//url", "//url",
......
...@@ -5,7 +5,11 @@ ...@@ -5,7 +5,11 @@
#include "mojo/services/network/web_socket_impl.h" #include "mojo/services/network/web_socket_impl.h"
#include "base/logging.h" #include "base/logging.h"
#include "base/message_loop/message_loop.h"
#include "mojo/common/handle_watcher.h"
#include "mojo/services/network/network_context.h" #include "mojo/services/network/network_context.h"
#include "mojo/services/public/cpp/network/web_socket_read_queue.h"
#include "mojo/services/public/cpp/network/web_socket_write_queue.h"
#include "net/websockets/websocket_channel.h" #include "net/websockets/websocket_channel.h"
#include "net/websockets/websocket_errors.h" #include "net/websockets/websocket_errors.h"
#include "net/websockets/websocket_event_interface.h" #include "net/websockets/websocket_event_interface.h"
...@@ -88,7 +92,14 @@ struct WebSocketEventHandler : public net::WebSocketEventInterface { ...@@ -88,7 +92,14 @@ struct WebSocketEventHandler : public net::WebSocketEventInterface {
const net::SSLInfo& ssl_info, const net::SSLInfo& ssl_info,
bool fatal) OVERRIDE; bool fatal) OVERRIDE;
// Called once we've written to |receive_stream_|.
void DidWriteToReceiveStream(bool fin,
net::WebSocketFrameHeader::OpCode type,
uint32_t num_bytes,
const char* buffer);
WebSocketClientPtr client_; WebSocketClientPtr client_;
ScopedDataPipeProducerHandle receive_stream_;
scoped_ptr<WebSocketWriteQueue> write_queue_;
DISALLOW_COPY_AND_ASSIGN(WebSocketEventHandler); DISALLOW_COPY_AND_ASSIGN(WebSocketEventHandler);
}; };
...@@ -97,7 +108,11 @@ ChannelState WebSocketEventHandler::OnAddChannelResponse( ...@@ -97,7 +108,11 @@ ChannelState WebSocketEventHandler::OnAddChannelResponse(
bool fail, bool fail,
const std::string& selected_protocol, const std::string& selected_protocol,
const std::string& extensions) { const std::string& extensions) {
client_->DidConnect(fail, selected_protocol, extensions); DataPipe data_pipe;
receive_stream_ = data_pipe.producer_handle.Pass();
write_queue_.reset(new WebSocketWriteQueue(receive_stream_.get()));
client_->DidConnect(
fail, selected_protocol, extensions, data_pipe.consumer_handle.Pass());
if (fail) if (fail)
return WebSocketEventInterface::CHANNEL_DELETED; return WebSocketEventInterface::CHANNEL_DELETED;
return WebSocketEventInterface::CHANNEL_ALIVE; return WebSocketEventInterface::CHANNEL_ALIVE;
...@@ -107,20 +122,12 @@ ChannelState WebSocketEventHandler::OnDataFrame( ...@@ -107,20 +122,12 @@ ChannelState WebSocketEventHandler::OnDataFrame(
bool fin, bool fin,
net::WebSocketFrameHeader::OpCode type, net::WebSocketFrameHeader::OpCode type,
const std::vector<char>& data) { const std::vector<char>& data) {
// TODO(mpcomplete): reuse the data pipe for subsequent frames. uint32_t size = static_cast<uint32_t>(data.size());
uint32_t num_bytes = static_cast<uint32_t>(data.size()); write_queue_->Write(
MojoCreateDataPipeOptions options; &data[0], size,
options.struct_size = sizeof(MojoCreateDataPipeOptions); base::Bind(&WebSocketEventHandler::DidWriteToReceiveStream,
options.flags = MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE; base::Unretained(this),
options.element_num_bytes = 1; fin, type, size));
options.capacity_num_bytes = num_bytes;
DataPipe data_pipe(options);
WriteDataRaw(data_pipe.producer_handle.get(),
&data[0],
&num_bytes,
MOJO_WRITE_DATA_FLAG_ALL_OR_NONE);
client_->DidReceiveData(fin, ConvertTo<WebSocket::MessageType>(type),
data_pipe.consumer_handle.Pass());
return WebSocketEventInterface::CHANNEL_ALIVE; return WebSocketEventInterface::CHANNEL_ALIVE;
} }
...@@ -164,6 +171,15 @@ ChannelState WebSocketEventHandler::OnSSLCertificateError( ...@@ -164,6 +171,15 @@ ChannelState WebSocketEventHandler::OnSSLCertificateError(
return WebSocketEventInterface::CHANNEL_DELETED; return WebSocketEventInterface::CHANNEL_DELETED;
} }
void WebSocketEventHandler::DidWriteToReceiveStream(
bool fin,
net::WebSocketFrameHeader::OpCode type,
uint32_t num_bytes,
const char* buffer) {
client_->DidReceiveData(
fin, ConvertTo<WebSocket::MessageType>(type), num_bytes);
}
} // namespace mojo } // namespace mojo
WebSocketImpl::WebSocketImpl(NetworkContext* context) : context_(context) { WebSocketImpl::WebSocketImpl(NetworkContext* context) : context_(context) {
...@@ -175,8 +191,11 @@ WebSocketImpl::~WebSocketImpl() { ...@@ -175,8 +191,11 @@ WebSocketImpl::~WebSocketImpl() {
void WebSocketImpl::Connect(const String& url, void WebSocketImpl::Connect(const String& url,
Array<String> protocols, Array<String> protocols,
const String& origin, const String& origin,
ScopedDataPipeConsumerHandle send_stream,
WebSocketClientPtr client) { WebSocketClientPtr client) {
DCHECK(!channel_); DCHECK(!channel_);
send_stream_ = send_stream.Pass();
read_queue_.reset(new WebSocketReadQueue(send_stream_.get()));
scoped_ptr<net::WebSocketEventInterface> event_interface( scoped_ptr<net::WebSocketEventInterface> event_interface(
new WebSocketEventHandler(client.Pass())); new WebSocketEventHandler(client.Pass()));
channel_.reset(new net::WebSocketChannel(event_interface.Pass(), channel_.reset(new net::WebSocketChannel(event_interface.Pass(),
...@@ -188,14 +207,12 @@ void WebSocketImpl::Connect(const String& url, ...@@ -188,14 +207,12 @@ void WebSocketImpl::Connect(const String& url,
void WebSocketImpl::Send(bool fin, void WebSocketImpl::Send(bool fin,
WebSocket::MessageType type, WebSocket::MessageType type,
ScopedDataPipeConsumerHandle data_pipe) { uint32_t num_bytes) {
DCHECK(channel_); DCHECK(channel_);
uint32_t num_bytes; read_queue_->Read(num_bytes,
ReadDataRaw(data_pipe.get(), NULL, &num_bytes, MOJO_READ_DATA_FLAG_QUERY); base::Bind(&WebSocketImpl::DidReadFromSendStream,
std::vector<char> data(num_bytes); base::Unretained(this),
ReadDataRaw(data_pipe.get(), &data[0], &num_bytes, MOJO_READ_DATA_FLAG_NONE); fin, type, num_bytes));
channel_->SendFrame(
fin, ConvertTo<net::WebSocketFrameHeader::OpCode>(type), data);
} }
void WebSocketImpl::FlowControl(int64_t quota) { void WebSocketImpl::FlowControl(int64_t quota) {
...@@ -208,4 +225,15 @@ void WebSocketImpl::Close(uint16_t code, const String& reason) { ...@@ -208,4 +225,15 @@ void WebSocketImpl::Close(uint16_t code, const String& reason) {
channel_->StartClosingHandshake(code, reason); channel_->StartClosingHandshake(code, reason);
} }
void WebSocketImpl::DidReadFromSendStream(bool fin,
WebSocket::MessageType type,
uint32_t num_bytes,
const char* data) {
std::vector<char> buffer(num_bytes);
memcpy(&buffer[0], data, num_bytes);
DCHECK(channel_);
channel_->SendFrame(
fin, ConvertTo<net::WebSocketFrameHeader::OpCode>(type), buffer);
}
} // namespace mojo } // namespace mojo
...@@ -16,6 +16,7 @@ class WebSocketChannel; ...@@ -16,6 +16,7 @@ class WebSocketChannel;
namespace mojo { namespace mojo {
class NetworkContext; class NetworkContext;
class WebSocketReadQueue;
// Forms a bridge between the WebSocket mojo interface and the net::WebSocket // Forms a bridge between the WebSocket mojo interface and the net::WebSocket
// implementation. // implementation.
...@@ -25,22 +26,28 @@ class WebSocketImpl : public InterfaceImpl<WebSocket> { ...@@ -25,22 +26,28 @@ class WebSocketImpl : public InterfaceImpl<WebSocket> {
virtual ~WebSocketImpl(); virtual ~WebSocketImpl();
private: private:
class PendingWriteToDataPipe;
class DependentIOBuffer;
// WebSocket methods: // WebSocket methods:
virtual void Connect(const String& url, virtual void Connect(const String& url,
Array<String> protocols, Array<String> protocols,
const String& origin, const String& origin,
ScopedDataPipeConsumerHandle send_stream,
WebSocketClientPtr client) OVERRIDE; WebSocketClientPtr client) OVERRIDE;
virtual void Send(bool fin, virtual void Send(bool fin,
WebSocket::MessageType type, WebSocket::MessageType type,
ScopedDataPipeConsumerHandle data) OVERRIDE; uint32_t num_bytes) OVERRIDE;
virtual void FlowControl(int64_t quota) OVERRIDE; virtual void FlowControl(int64_t quota) OVERRIDE;
virtual void Close(uint16_t code, const String& reason) OVERRIDE; virtual void Close(uint16_t code, const String& reason) OVERRIDE;
// Called with the data to send once it has been read from |send_stream_|.
void DidReadFromSendStream(bool fin,
WebSocket::MessageType type,
uint32_t num_bytes,
const char* data);
// The channel we use to send events to the network. // The channel we use to send events to the network.
scoped_ptr<net::WebSocketChannel> channel_; scoped_ptr<net::WebSocketChannel> channel_;
ScopedDataPipeConsumerHandle send_stream_;
scoped_ptr<WebSocketReadQueue> read_queue_;
NetworkContext* context_; NetworkContext* context_;
}; };
......
# Copyright 2014 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
# GYP version: mojo/mojo_services.gypi:mojo_network_utility
component("network") {
deps = [
"//base",
"//mojo/application",
"//mojo/common",
"//mojo/environment:chromium",
"//mojo/public/c/system:for_component",
]
sources = [
"web_socket_read_queue.cc",
"web_socket_read_queue.h",
"web_socket_write_queue.cc",
"web_socket_write_queue.h",
]
}
// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "mojo/services/public/cpp/network/web_socket_read_queue.h"
#include "base/bind.h"
namespace mojo {
struct WebSocketReadQueue::Operation {
uint32_t num_bytes_;
base::Callback<void(const char*)> callback_;
};
WebSocketReadQueue::WebSocketReadQueue(DataPipeConsumerHandle handle)
: handle_(handle), is_waiting_(false) {
}
WebSocketReadQueue::~WebSocketReadQueue() {
}
void WebSocketReadQueue::Read(uint32_t num_bytes,
base::Callback<void(const char*)> callback) {
Operation* op = new Operation;
op->num_bytes_ = num_bytes;
op->callback_ = callback;
queue_.push_back(op);
if (!is_waiting_)
TryToRead();
}
void WebSocketReadQueue::TryToRead() {
Operation* op = queue_[0];
const void* buffer = NULL;
uint32_t bytes_read = op->num_bytes_;
MojoResult result = BeginReadDataRaw(
handle_, &buffer, &bytes_read, MOJO_READ_DATA_FLAG_ALL_OR_NONE);
if (result == MOJO_RESULT_SHOULD_WAIT) {
EndReadDataRaw(handle_, bytes_read);
Wait();
return;
}
// Ensure |op| is deleted, whether or not |this| goes away.
scoped_ptr<Operation> op_deleter(op);
queue_.weak_erase(queue_.begin());
if (result != MOJO_RESULT_OK)
return;
DataPipeConsumerHandle handle = handle_;
op->callback_.Run(static_cast<const char*>(buffer)); // may delete |this|
EndReadDataRaw(handle, bytes_read);
}
void WebSocketReadQueue::Wait() {
is_waiting_ = true;
handle_watcher_.Start(
handle_,
MOJO_HANDLE_SIGNAL_READABLE,
MOJO_DEADLINE_INDEFINITE,
base::Bind(&WebSocketReadQueue::OnHandleReady, base::Unretained(this)));
}
void WebSocketReadQueue::OnHandleReady(MojoResult result) {
is_waiting_ = false;
TryToRead();
}
} // namespace mojo
// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef MOJO_SERVICES_PUBLIC_CPP_NETWORK_WEB_SOCKET_READ_QUEUE_H_
#define MOJO_SERVICES_PUBLIC_CPP_NETWORK_WEB_SOCKET_READ_QUEUE_H_
#include "base/callback.h"
#include "base/memory/scoped_vector.h"
#include "mojo/common/handle_watcher.h"
#include "mojo/public/cpp/system/data_pipe.h"
namespace mojo {
// This class simplifies the handling of multiple Reads on a DataPipe. It reads
// the data in the expected chunk size, calling the callback once a full chunk
// is ready. Callbacks are owned by this class, and are guaranteed not to be
// called after this class is destroyed.
// See also: WebSocketWriteQueue
class WebSocketReadQueue {
public:
WebSocketReadQueue(DataPipeConsumerHandle handle);
~WebSocketReadQueue();
void Read(uint32_t num_bytes, base::Callback<void(const char*)> callback);
private:
struct Operation;
void TryToRead();
void Wait();
void OnHandleReady(MojoResult result);
DataPipeConsumerHandle handle_;
common::HandleWatcher handle_watcher_;
ScopedVector<Operation> queue_;
bool is_waiting_;
};
} // namespace mojo
#endif // MOJO_SERVICES_PUBLIC_CPP_NETWORK_WEB_SOCKET_READ_QUEUE_H_
// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "mojo/services/public/cpp/network/web_socket_write_queue.h"
#include "base/bind.h"
namespace mojo {
struct WebSocketWriteQueue::Operation {
uint32_t num_bytes_;
base::Callback<void(const char*)> callback_;
const char* data_;
// Only initialized if the initial Write fails. This saves a copy in
// the common case.
std::vector<char> data_copy_;
};
WebSocketWriteQueue::WebSocketWriteQueue(DataPipeProducerHandle handle)
: handle_(handle), is_waiting_(false) {
}
WebSocketWriteQueue::~WebSocketWriteQueue() {
}
void WebSocketWriteQueue::Write(const char* data,
uint32_t num_bytes,
base::Callback<void(const char*)> callback) {
Operation* op = new Operation;
op->num_bytes_ = num_bytes;
op->callback_ = callback;
op->data_ = data;
queue_.push_back(op);
MojoResult result = MOJO_RESULT_SHOULD_WAIT;
if (!is_waiting_)
result = TryToWrite();
// If we have to wait, make a local copy of the data so we know it will
// live until we need it.
if (result == MOJO_RESULT_SHOULD_WAIT) {
op->data_copy_.resize(num_bytes);
memcpy(&op->data_copy_[0], data, num_bytes);
op->data_ = &op->data_copy_[0];
}
}
MojoResult WebSocketWriteQueue::TryToWrite() {
Operation* op = queue_[0];
uint32_t bytes_written = op->num_bytes_;
MojoResult result = WriteDataRaw(
handle_, op->data_, &bytes_written, MOJO_WRITE_DATA_FLAG_ALL_OR_NONE);
if (result == MOJO_RESULT_SHOULD_WAIT) {
Wait();
return result;
}
// Ensure |op| is deleted, whether or not |this| goes away.
scoped_ptr<Operation> op_deleter(op);
queue_.weak_erase(queue_.begin());
if (result != MOJO_RESULT_OK)
return result;
op->callback_.Run(op->data_); // may delete |this|
return result;
}
void WebSocketWriteQueue::Wait() {
is_waiting_ = true;
handle_watcher_.Start(handle_,
MOJO_HANDLE_SIGNAL_WRITABLE,
MOJO_DEADLINE_INDEFINITE,
base::Bind(&WebSocketWriteQueue::OnHandleReady,
base::Unretained(this)));
}
void WebSocketWriteQueue::OnHandleReady(MojoResult result) {
is_waiting_ = false;
TryToWrite();
}
} // namespace mojo
// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef MOJO_SERVICES_PUBLIC_CPP_NETWORK_WEB_SOCKET_WRITE_QUEUE_H_
#define MOJO_SERVICES_PUBLIC_CPP_NETWORK_WEB_SOCKET_WRITE_QUEUE_H_
#include "base/callback.h"
#include "base/memory/scoped_vector.h"
#include "mojo/common/handle_watcher.h"
#include "mojo/public/cpp/system/data_pipe.h"
namespace mojo {
// This class simplifies the handling of multiple Writes on a DataPipe. It
// writes each chunk all at once (or waits until the pipe is ready before
// writing), calling the callback when finished. Callbacks are owned by this
// class, and are guaranteed not to be called after this class is destroyed.
// See also: WebSocketReadQueue
class WebSocketWriteQueue {
public:
WebSocketWriteQueue(DataPipeProducerHandle handle);
~WebSocketWriteQueue();
void Write(const char* data,
uint32_t num_bytes,
base::Callback<void(const char*)> callback);
private:
struct Operation;
MojoResult TryToWrite();
void Wait();
void OnHandleReady(MojoResult result);
DataPipeProducerHandle handle_;
common::HandleWatcher handle_watcher_;
ScopedVector<Operation> queue_;
bool is_waiting_;
};
} // namespace mojo
#endif // MOJO_SERVICES_PUBLIC_CPP_NETWORK_WEB_SOCKET_WRITE_QUEUE_H_
...@@ -14,19 +14,42 @@ interface WebSocket { ...@@ -14,19 +14,42 @@ interface WebSocket {
}; };
const uint16 kAbnormalCloseCode = 1006; // stolen from websocket_bridge const uint16 kAbnormalCloseCode = 1006; // stolen from websocket_bridge
Connect( // Initiates a WebSocket connection to the given url. |send_stream| is a data
string url, string[] protocols, string origin, WebSocketClient client); // pipe which should remain open for the lifetime of the WebSocket. Data
Send(bool fin, MessageType type, handle<data_pipe_consumer> data); // to send over the WebSocket should be written to the producer end of the
// |send_stream|.
Connect(string url,
string[] protocols,
string origin,
handle<data_pipe_consumer> send_stream,
WebSocketClient client);
// Called after writing |num_bytes| worth of data to the WebSocket's
// |send_stream|.
Send(bool fin, MessageType type, uint32 num_bytes);
FlowControl(int64 quota); FlowControl(int64 quota);
Close(uint16 code, string reason); Close(uint16 code, string reason);
}; };
interface WebSocketClient { interface WebSocketClient {
DidConnect(bool fail, string selected_subprotocol, string extensions); // Called in response to a WebSocket.Connect call to indicate success or
DidReceiveData( // failure. |receive_stream| is a data pipe which where incoming data from
bool fin, WebSocket.MessageType type, handle<data_pipe_consumer> data); // the server is written.
DidConnect(bool fail,
string selected_subprotocol,
string extensions,
handle<data_pipe_consumer> receive_stream);
// Called when there is |num_bytes| worth of incoming data available on the
// |receive_stream|.
DidReceiveData(bool fin, WebSocket.MessageType type, uint32 num_bytes);
DidReceiveFlowControl(int64 quota); DidReceiveFlowControl(int64 quota);
DidFail(string message); DidFail(string message);
DidClose(bool was_clean, uint16 code, string reason); DidClose(bool was_clean, uint16 code, string reason);
// Blink has 3 extra methods that we don't implement, because they are used // Blink has 3 extra methods that we don't implement, because they are used
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment