Commit 533f5276 authored by yzshen's avatar yzshen Committed by Commit bot

Mojo service implementation for HTTP server - part 3

This CL adds WebSocket support and correspnoding tests.

BUG=478249
TEST=Newly added tests.

Review URL: https://codereview.chromium.org/1144843002

Cr-Commit-Position: refs/heads/master@{#330884}
parent 87896a75
...@@ -14,6 +14,9 @@ ...@@ -14,6 +14,9 @@
#include "mojo/common/handle_watcher.h" #include "mojo/common/handle_watcher.h"
#include "mojo/services/network/http_server_impl.h" #include "mojo/services/network/http_server_impl.h"
#include "mojo/services/network/net_adapters.h" #include "mojo/services/network/net_adapters.h"
#include "mojo/services/network/public/cpp/web_socket_read_queue.h"
#include "mojo/services/network/public/cpp/web_socket_write_queue.h"
#include "mojo/services/network/public/interfaces/web_socket.mojom.h"
#include "net/base/net_errors.h" #include "net/base/net_errors.h"
#include "net/http/http_request_headers.h" #include "net/http/http_request_headers.h"
#include "net/http/http_status_code.h" #include "net/http/http_status_code.h"
...@@ -82,6 +85,143 @@ class HttpConnectionImpl::SimpleDataPipeReader { ...@@ -82,6 +85,143 @@ class HttpConnectionImpl::SimpleDataPipeReader {
DISALLOW_COPY_AND_ASSIGN(SimpleDataPipeReader); DISALLOW_COPY_AND_ASSIGN(SimpleDataPipeReader);
}; };
class HttpConnectionImpl::WebSocketImpl : public WebSocket,
public ErrorHandler {
public:
// |connection| must outlive this object.
WebSocketImpl(HttpConnectionImpl* connection,
InterfaceRequest<WebSocket> request,
ScopedDataPipeConsumerHandle send_stream,
WebSocketClientPtr client)
: connection_(connection),
binding_(this, request.Pass()),
client_(client.Pass()),
send_stream_(send_stream.Pass()),
read_send_stream_(new WebSocketReadQueue(send_stream_.get())),
pending_send_count_(0) {
DCHECK(binding_.is_bound());
DCHECK(client_);
DCHECK(send_stream_.is_valid());
binding_.set_error_handler(this);
client_.set_error_handler(this);
DataPipe data_pipe;
receive_stream_ = data_pipe.producer_handle.Pass();
write_receive_stream_.reset(new WebSocketWriteQueue(receive_stream_.get()));
client_->DidConnect("", "", data_pipe.consumer_handle.Pass());
}
~WebSocketImpl() override {}
void Close() {
DCHECK(!IsClosing());
binding_.Close();
client_.reset();
NotifyOwnerCloseIfAllDone();
}
void OnReceivedWebSocketMessage(const std::string& data) {
if (IsClosing())
return;
// TODO(yzshen): It shouldn't be an issue to pass an empty message. However,
// WebSocket{Read,Write}Queue doesn't handle that correctly.
if (data.empty())
return;
uint32_t size = static_cast<uint32_t>(data.size());
write_receive_stream_->Write(
&data[0], size,
base::Bind(&WebSocketImpl::OnFinishedWritingReceiveStream,
base::Unretained(this), size));
}
private:
// WebSocket implementation.
void Connect(const String& url,
Array<String> protocols,
const String& origin,
ScopedDataPipeConsumerHandle send_stream,
WebSocketClientPtr client) override {
NOTREACHED();
}
void Send(bool fin, MessageType type, uint32_t num_bytes) override {
if (!fin || type != MESSAGE_TYPE_TEXT) {
NOTIMPLEMENTED();
Close();
}
// TODO(yzshen): It shouldn't be an issue to pass an empty message. However,
// WebSocket{Read,Write}Queue doesn't handle that correctly.
if (num_bytes == 0)
return;
pending_send_count_++;
read_send_stream_->Read(
num_bytes, base::Bind(&WebSocketImpl::OnFinishedReadingSendStream,
base::Unretained(this), num_bytes));
}
void FlowControl(int64_t quota) override { NOTIMPLEMENTED(); }
void Close(uint16_t code, const String& reason) override {
Close();
}
// ErrorHandler implementation.
void OnConnectionError() override { Close(); }
void OnFinishedReadingSendStream(uint32_t num_bytes, const char* data) {
DCHECK_GT(pending_send_count_, 0u);
pending_send_count_--;
if (data) {
connection_->server_->server()->SendOverWebSocket(
connection_->connection_id_, std::string(data, num_bytes));
}
if (IsClosing())
NotifyOwnerCloseIfAllDone();
}
void OnFinishedWritingReceiveStream(uint32_t num_bytes, const char* buffer) {
if (IsClosing())
return;
if (buffer)
client_->DidReceiveData(true, MESSAGE_TYPE_TEXT, num_bytes);
}
// Checks whether Close() has been called.
bool IsClosing() const { return !binding_.is_bound(); }
void NotifyOwnerCloseIfAllDone() {
DCHECK(IsClosing());
if (pending_send_count_ == 0)
connection_->OnWebSocketClosed();
}
HttpConnectionImpl* const connection_;
Binding<WebSocket> binding_;
WebSocketClientPtr client_;
ScopedDataPipeConsumerHandle send_stream_;
scoped_ptr<WebSocketReadQueue> read_send_stream_;
size_t pending_send_count_;
ScopedDataPipeProducerHandle receive_stream_;
scoped_ptr<WebSocketWriteQueue> write_receive_stream_;
DISALLOW_COPY_AND_ASSIGN(WebSocketImpl);
};
template <> template <>
struct TypeConverter<HttpRequestPtr, net::HttpServerRequestInfo> { struct TypeConverter<HttpRequestPtr, net::HttpServerRequestInfo> {
static HttpRequestPtr Convert(const net::HttpServerRequestInfo& obj) { static HttpRequestPtr Convert(const net::HttpServerRequestInfo& obj) {
...@@ -115,11 +255,11 @@ struct TypeConverter<HttpRequestPtr, net::HttpServerRequestInfo> { ...@@ -115,11 +255,11 @@ struct TypeConverter<HttpRequestPtr, net::HttpServerRequestInfo> {
}; };
HttpConnectionImpl::HttpConnectionImpl(int connection_id, HttpConnectionImpl::HttpConnectionImpl(int connection_id,
HttpServerImpl* owner, HttpServerImpl* server,
HttpConnectionDelegatePtr delegate, HttpConnectionDelegatePtr delegate,
HttpConnectionPtr* connection) HttpConnectionPtr* connection)
: connection_id_(connection_id), : connection_id_(connection_id),
owner_(owner), server_(server),
delegate_(delegate.Pass()), delegate_(delegate.Pass()),
binding_(this, connection) { binding_(this, connection) {
DCHECK(delegate_); DCHECK(delegate_);
...@@ -133,7 +273,7 @@ HttpConnectionImpl::~HttpConnectionImpl() { ...@@ -133,7 +273,7 @@ HttpConnectionImpl::~HttpConnectionImpl() {
void HttpConnectionImpl::OnReceivedHttpRequest( void HttpConnectionImpl::OnReceivedHttpRequest(
const net::HttpServerRequestInfo& info) { const net::HttpServerRequestInfo& info) {
if (EncounteredConnectionError()) if (IsClosing())
return; return;
delegate_->OnReceivedRequest( delegate_->OnReceivedRequest(
...@@ -154,11 +294,32 @@ void HttpConnectionImpl::OnReceivedHttpRequest( ...@@ -154,11 +294,32 @@ void HttpConnectionImpl::OnReceivedHttpRequest(
void HttpConnectionImpl::OnReceivedWebSocketRequest( void HttpConnectionImpl::OnReceivedWebSocketRequest(
const net::HttpServerRequestInfo& info) { const net::HttpServerRequestInfo& info) {
// TODO(yzshen): implement it. if (IsClosing())
return;
delegate_->OnReceivedWebSocketRequest(
HttpRequest::From(info),
[this, info](InterfaceRequest<WebSocket> web_socket,
ScopedDataPipeConsumerHandle send_stream,
WebSocketClientPtr web_socket_client) {
if (!web_socket.is_pending() || !send_stream.is_valid() ||
!web_socket_client) {
Close();
return;
}
web_socket_.reset(new WebSocketImpl(this, web_socket.Pass(),
send_stream.Pass(),
web_socket_client.Pass()));
server_->server()->AcceptWebSocket(connection_id_, info);
});
} }
void HttpConnectionImpl::OnReceivedWebSocketMessage(const std::string& data) { void HttpConnectionImpl::OnReceivedWebSocketMessage(const std::string& data) {
// TODO(yzshen): implement it. if (IsClosing())
return;
web_socket_->OnReceivedWebSocketMessage(data);
} }
void HttpConnectionImpl::SetSendBufferSize( void HttpConnectionImpl::SetSendBufferSize(
...@@ -167,8 +328,8 @@ void HttpConnectionImpl::SetSendBufferSize( ...@@ -167,8 +328,8 @@ void HttpConnectionImpl::SetSendBufferSize(
if (size > static_cast<uint32_t>(std::numeric_limits<int32_t>::max())) if (size > static_cast<uint32_t>(std::numeric_limits<int32_t>::max()))
size = std::numeric_limits<int32_t>::max(); size = std::numeric_limits<int32_t>::max();
owner_->server()->SetSendBufferSize( server_->server()->SetSendBufferSize(connection_id_,
connection_id_, static_cast<int32_t>(size)); static_cast<int32_t>(size));
callback.Run(MakeNetworkError(net::OK)); callback.Run(MakeNetworkError(net::OK));
} }
...@@ -178,8 +339,8 @@ void HttpConnectionImpl::SetReceiveBufferSize( ...@@ -178,8 +339,8 @@ void HttpConnectionImpl::SetReceiveBufferSize(
if (size > static_cast<uint32_t>(std::numeric_limits<int32_t>::max())) if (size > static_cast<uint32_t>(std::numeric_limits<int32_t>::max()))
size = std::numeric_limits<int32_t>::max(); size = std::numeric_limits<int32_t>::max();
owner_->server()->SetReceiveBufferSize( server_->server()->SetReceiveBufferSize(connection_id_,
connection_id_, static_cast<int32_t>(size)); static_cast<int32_t>(size));
callback.Run(MakeNetworkError(net::OK)); callback.Run(MakeNetworkError(net::OK));
} }
...@@ -188,14 +349,7 @@ void HttpConnectionImpl::OnConnectionError() { ...@@ -188,14 +349,7 @@ void HttpConnectionImpl::OnConnectionError() {
// |delegate_| has closed the pipe. Although it is set as error handler for // |delegate_| has closed the pipe. Although it is set as error handler for
// both |binding_| and |delegate_|, it will only be called at most once // both |binding_| and |delegate_|, it will only be called at most once
// because when called it closes/resets |binding_| and |delegate_|. // because when called it closes/resets |binding_| and |delegate_|.
DCHECK(!EncounteredConnectionError()); Close();
binding_.Close();
delegate_.reset();
// Don't close the connection until all pending responses are sent.
if (response_body_readers_.empty())
owner_->server()->Close(connection_id_);
} }
void HttpConnectionImpl::OnFinishedReadingResponseBody( void HttpConnectionImpl::OnFinishedReadingResponseBody(
...@@ -235,10 +389,44 @@ void HttpConnectionImpl::OnFinishedReadingResponseBody( ...@@ -235,10 +389,44 @@ void HttpConnectionImpl::OnFinishedReadingResponseBody(
if (body) if (body)
info.SetBody(*body, content_type); info.SetBody(*body, content_type);
owner_->server()->SendResponse(connection_id_, info); server_->server()->SendResponse(connection_id_, info);
if (IsClosing())
NotifyOwnerCloseIfAllDone();
}
void HttpConnectionImpl::Close() {
DCHECK(!IsClosing());
binding_.Close();
delegate_.reset();
if (web_socket_)
web_socket_->Close();
NotifyOwnerCloseIfAllDone();
}
void HttpConnectionImpl::NotifyOwnerCloseIfAllDone() {
DCHECK(IsClosing());
if (response_body_readers_.empty() && EncounteredConnectionError()) // Don't close the connection until all pending sends are done.
owner_->server()->Close(connection_id_); bool should_wait = !response_body_readers_.empty() || web_socket_;
if (!should_wait)
server_->server()->Close(connection_id_);
}
void HttpConnectionImpl::OnWebSocketClosed() {
web_socket_.reset();
if (IsClosing()) {
// The close operation is initiated by this object.
NotifyOwnerCloseIfAllDone();
} else {
// The close operation is initiated by |web_socket_|; start closing this
// object.
Close();
}
} }
} // namespace mojo } // namespace mojo
...@@ -26,9 +26,9 @@ class HttpServerImpl; ...@@ -26,9 +26,9 @@ class HttpServerImpl;
class HttpConnectionImpl : public HttpConnection, class HttpConnectionImpl : public HttpConnection,
public ErrorHandler { public ErrorHandler {
public: public:
// |owner| must outlive this object. // |server| must outlive this object.
HttpConnectionImpl(int connection_id, HttpConnectionImpl(int connection_id,
HttpServerImpl* owner, HttpServerImpl* server,
HttpConnectionDelegatePtr delegate, HttpConnectionDelegatePtr delegate,
HttpConnectionPtr* connection); HttpConnectionPtr* connection);
...@@ -40,6 +40,7 @@ class HttpConnectionImpl : public HttpConnection, ...@@ -40,6 +40,7 @@ class HttpConnectionImpl : public HttpConnection,
private: private:
class SimpleDataPipeReader; class SimpleDataPipeReader;
class WebSocketImpl;
// HttpConnection implementation. // HttpConnection implementation.
void SetSendBufferSize(uint32_t size, void SetSendBufferSize(uint32_t size,
...@@ -55,17 +56,27 @@ class HttpConnectionImpl : public HttpConnection, ...@@ -55,17 +56,27 @@ class HttpConnectionImpl : public HttpConnection,
SimpleDataPipeReader* reader, SimpleDataPipeReader* reader,
scoped_ptr<std::string> body); scoped_ptr<std::string> body);
bool EncounteredConnectionError() const { void Close();
return !binding_.is_bound() || !delegate_;
} // Checks whether Close() has been called.
bool IsClosing() const { return !binding_.is_bound(); }
// Checks whether all wrap-up work has been done during the closing process.
// If yes, notifies the owner, which may result in the destruction of this
// object.
void NotifyOwnerCloseIfAllDone();
void OnWebSocketClosed();
const int connection_id_; const int connection_id_;
HttpServerImpl* const owner_; HttpServerImpl* const server_;
HttpConnectionDelegatePtr delegate_; HttpConnectionDelegatePtr delegate_;
Binding<HttpConnection> binding_; Binding<HttpConnection> binding_;
// Owns its elements. // Owns its elements.
std::set<SimpleDataPipeReader*> response_body_readers_; std::set<SimpleDataPipeReader*> response_body_readers_;
scoped_ptr<WebSocketImpl> web_socket_;
DISALLOW_COPY_AND_ASSIGN(HttpConnectionImpl); DISALLOW_COPY_AND_ASSIGN(HttpConnectionImpl);
}; };
......
...@@ -2,19 +2,27 @@ ...@@ -2,19 +2,27 @@
// Use of this source code is governed by a BSD-style license that can be // Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. // found in the LICENSE file.
#include "base/logging.h"
#include "base/macros.h" #include "base/macros.h"
#include "base/memory/linked_ptr.h" #include "base/memory/linked_ptr.h"
#include "base/memory/ref_counted.h" #include "base/memory/ref_counted.h"
#include "base/memory/scoped_ptr.h" #include "base/memory/scoped_ptr.h"
#include "base/run_loop.h" #include "base/run_loop.h"
#include "base/strings/string_util.h" #include "base/strings/string_util.h"
#include "base/strings/stringprintf.h"
#include "mojo/application/public/cpp/application_connection.h" #include "mojo/application/public/cpp/application_connection.h"
#include "mojo/application/public/cpp/application_impl.h" #include "mojo/application/public/cpp/application_impl.h"
#include "mojo/application/public/cpp/application_test_base.h" #include "mojo/application/public/cpp/application_test_base.h"
#include "mojo/common/data_pipe_utils.h" #include "mojo/common/data_pipe_utils.h"
#include "mojo/services/network/net_address_type_converters.h" #include "mojo/services/network/net_address_type_converters.h"
#include "mojo/services/network/public/cpp/web_socket_read_queue.h"
#include "mojo/services/network/public/cpp/web_socket_write_queue.h"
#include "mojo/services/network/public/interfaces/http_connection.mojom.h"
#include "mojo/services/network/public/interfaces/http_message.mojom.h"
#include "mojo/services/network/public/interfaces/http_server.mojom.h" #include "mojo/services/network/public/interfaces/http_server.mojom.h"
#include "mojo/services/network/public/interfaces/net_address.mojom.h"
#include "mojo/services/network/public/interfaces/network_service.mojom.h" #include "mojo/services/network/public/interfaces/network_service.mojom.h"
#include "mojo/services/network/public/interfaces/web_socket.mojom.h"
#include "net/base/io_buffer.h" #include "net/base/io_buffer.h"
#include "net/base/net_errors.h" #include "net/base/net_errors.h"
#include "net/base/test_completion_callback.h" #include "net/base/test_completion_callback.h"
...@@ -261,6 +269,141 @@ class TestHttpClient { ...@@ -261,6 +269,141 @@ class TestHttpClient {
DISALLOW_COPY_AND_ASSIGN(TestHttpClient); DISALLOW_COPY_AND_ASSIGN(TestHttpClient);
}; };
class WebSocketClientImpl : public WebSocketClient {
public:
explicit WebSocketClientImpl()
: binding_(this, &client_ptr_),
wait_for_message_count_(0),
run_loop_(nullptr) {}
~WebSocketClientImpl() override {}
// Establishes a connection from the client side.
void Connect(WebSocketPtr web_socket, const std::string& url) {
web_socket_ = web_socket.Pass();
DataPipe data_pipe;
send_stream_ = data_pipe.producer_handle.Pass();
write_send_stream_.reset(new WebSocketWriteQueue(send_stream_.get()));
web_socket_->Connect(url, Array<String>(0), "http://example.com",
data_pipe.consumer_handle.Pass(), client_ptr_.Pass());
}
// Establishes a connection from the server side.
void AcceptConnectRequest(
const HttpConnectionDelegate::OnReceivedWebSocketRequestCallback&
callback) {
InterfaceRequest<WebSocket> web_socket_request = GetProxy(&web_socket_);
DataPipe data_pipe;
send_stream_ = data_pipe.producer_handle.Pass();
write_send_stream_.reset(new WebSocketWriteQueue(send_stream_.get()));
callback.Run(web_socket_request.Pass(), data_pipe.consumer_handle.Pass(),
client_ptr_.Pass());
}
void WaitForConnectCompletion() {
DCHECK(!run_loop_);
if (receive_stream_.is_valid())
return;
base::RunLoop run_loop;
run_loop_ = &run_loop;
run_loop.Run();
run_loop_ = nullptr;
}
void Send(const std::string& message) {
DCHECK(!message.empty());
uint32_t size = static_cast<uint32_t>(message.size());
write_send_stream_->Write(
&message[0], size,
base::Bind(&WebSocketClientImpl::OnFinishedWritingSendStream,
base::Unretained(this), size));
}
void WaitForMessage(size_t count) {
DCHECK(!run_loop_);
if (received_messages_.size() >= count)
return;
wait_for_message_count_ = count;
base::RunLoop run_loop;
run_loop_ = &run_loop;
run_loop.Run();
run_loop_ = nullptr;
}
std::vector<std::string>& received_messages() { return received_messages_; }
private:
// WebSocketClient implementation.
void DidConnect(const String& selected_subprotocol,
const String& extensions,
ScopedDataPipeConsumerHandle receive_stream) override {
receive_stream_ = receive_stream.Pass();
read_receive_stream_.reset(new WebSocketReadQueue(receive_stream_.get()));
web_socket_->FlowControl(2048);
if (run_loop_)
run_loop_->Quit();
}
void DidReceiveData(bool fin,
WebSocket::MessageType type,
uint32_t num_bytes) override {
DCHECK(num_bytes > 0);
read_receive_stream_->Read(
num_bytes,
base::Bind(&WebSocketClientImpl::OnFinishedReadingReceiveStream,
base::Unretained(this), num_bytes));
}
void DidReceiveFlowControl(int64_t quota) override {}
void DidFail(const String& message) override {}
void DidClose(bool was_clean, uint16_t code, const String& reason) override {}
void OnFinishedWritingSendStream(uint32_t num_bytes, const char* buffer) {
EXPECT_TRUE(buffer);
web_socket_->Send(true, WebSocket::MESSAGE_TYPE_TEXT, num_bytes);
}
void OnFinishedReadingReceiveStream(uint32_t num_bytes, const char* data) {
EXPECT_TRUE(data);
received_messages_.push_back(std::string(data, num_bytes));
if (run_loop_ && received_messages_.size() >= wait_for_message_count_) {
wait_for_message_count_ = 0;
run_loop_->Quit();
}
}
WebSocketClientPtr client_ptr_;
Binding<WebSocketClient> binding_;
WebSocketPtr web_socket_;
ScopedDataPipeProducerHandle send_stream_;
scoped_ptr<WebSocketWriteQueue> write_send_stream_;
ScopedDataPipeConsumerHandle receive_stream_;
scoped_ptr<WebSocketReadQueue> read_receive_stream_;
std::vector<std::string> received_messages_;
size_t wait_for_message_count_;
// Pointing to a stack-allocated RunLoop instance.
base::RunLoop* run_loop_;
DISALLOW_COPY_AND_ASSIGN(WebSocketClientImpl);
};
class HttpConnectionDelegateImpl : public HttpConnectionDelegate { class HttpConnectionDelegateImpl : public HttpConnectionDelegate {
public: public:
struct PendingRequest { struct PendingRequest {
...@@ -272,8 +415,8 @@ class HttpConnectionDelegateImpl : public HttpConnectionDelegate { ...@@ -272,8 +415,8 @@ class HttpConnectionDelegateImpl : public HttpConnectionDelegate {
InterfaceRequest<HttpConnectionDelegate> request) InterfaceRequest<HttpConnectionDelegate> request)
: connection_(connection.Pass()), : connection_(connection.Pass()),
binding_(this, request.Pass()), binding_(this, request.Pass()),
run_loop_(nullptr), wait_for_request_count_(0),
wait_for_request_count_(0) {} run_loop_(nullptr) {}
~HttpConnectionDelegateImpl() override {} ~HttpConnectionDelegateImpl() override {}
// HttpConnectionDelegate implementation: // HttpConnectionDelegate implementation:
...@@ -292,7 +435,12 @@ class HttpConnectionDelegateImpl : public HttpConnectionDelegate { ...@@ -292,7 +435,12 @@ class HttpConnectionDelegateImpl : public HttpConnectionDelegate {
void OnReceivedWebSocketRequest( void OnReceivedWebSocketRequest(
HttpRequestPtr request, HttpRequestPtr request,
const OnReceivedWebSocketRequestCallback& callback) override { const OnReceivedWebSocketRequestCallback& callback) override {
NOTREACHED(); web_socket_.reset(new WebSocketClientImpl());
web_socket_->AcceptConnectRequest(callback);
if (run_loop_)
run_loop_->Quit();
} }
void SendResponse(HttpResponsePtr response) { void SendResponse(HttpResponsePtr response) {
...@@ -305,6 +453,9 @@ class HttpConnectionDelegateImpl : public HttpConnectionDelegate { ...@@ -305,6 +453,9 @@ class HttpConnectionDelegateImpl : public HttpConnectionDelegate {
void WaitForRequest(size_t count) { void WaitForRequest(size_t count) {
DCHECK(!run_loop_); DCHECK(!run_loop_);
if (pending_requests_.size() >= count)
return;
wait_for_request_count_ = count; wait_for_request_count_ = count;
base::RunLoop run_loop; base::RunLoop run_loop;
run_loop_ = &run_loop; run_loop_ = &run_loop;
...@@ -312,17 +463,33 @@ class HttpConnectionDelegateImpl : public HttpConnectionDelegate { ...@@ -312,17 +463,33 @@ class HttpConnectionDelegateImpl : public HttpConnectionDelegate {
run_loop_ = nullptr; run_loop_ = nullptr;
} }
void WaitForWebSocketRequest() {
DCHECK(!run_loop_);
if (web_socket_)
return;
base::RunLoop run_loop;
run_loop_ = &run_loop;
run_loop.Run();
run_loop_ = nullptr;
}
std::vector<linked_ptr<PendingRequest>>& pending_requests() { std::vector<linked_ptr<PendingRequest>>& pending_requests() {
return pending_requests_; return pending_requests_;
} }
WebSocketClientImpl* web_socket() { return web_socket_.get(); }
private: private:
HttpConnectionPtr connection_; HttpConnectionPtr connection_;
Binding<HttpConnectionDelegate> binding_; Binding<HttpConnectionDelegate> binding_;
std::vector<linked_ptr<PendingRequest>> pending_requests_; std::vector<linked_ptr<PendingRequest>> pending_requests_;
size_t wait_for_request_count_;
scoped_ptr<WebSocketClientImpl> web_socket_;
// Pointing to a stack-allocated RunLoop instance. // Pointing to a stack-allocated RunLoop instance.
base::RunLoop* run_loop_; base::RunLoop* run_loop_;
size_t wait_for_request_count_;
DISALLOW_COPY_AND_ASSIGN(HttpConnectionDelegateImpl); DISALLOW_COPY_AND_ASSIGN(HttpConnectionDelegateImpl);
}; };
...@@ -331,8 +498,8 @@ class HttpServerDelegateImpl : public HttpServerDelegate { ...@@ -331,8 +498,8 @@ class HttpServerDelegateImpl : public HttpServerDelegate {
public: public:
explicit HttpServerDelegateImpl(HttpServerDelegatePtr* delegate_ptr) explicit HttpServerDelegateImpl(HttpServerDelegatePtr* delegate_ptr)
: binding_(this, delegate_ptr), : binding_(this, delegate_ptr),
run_loop_(nullptr), wait_for_connection_count_(0),
wait_for_connection_count_(0) {} run_loop_(nullptr) {}
~HttpServerDelegateImpl() override {} ~HttpServerDelegateImpl() override {}
// HttpServerDelegate implementation. // HttpServerDelegate implementation.
...@@ -349,6 +516,9 @@ class HttpServerDelegateImpl : public HttpServerDelegate { ...@@ -349,6 +516,9 @@ class HttpServerDelegateImpl : public HttpServerDelegate {
void WaitForConnection(size_t count) { void WaitForConnection(size_t count) {
DCHECK(!run_loop_); DCHECK(!run_loop_);
if (connections_.size() >= count)
return;
wait_for_connection_count_ = count; wait_for_connection_count_ = count;
base::RunLoop run_loop; base::RunLoop run_loop;
run_loop_ = &run_loop; run_loop_ = &run_loop;
...@@ -363,9 +533,9 @@ class HttpServerDelegateImpl : public HttpServerDelegate { ...@@ -363,9 +533,9 @@ class HttpServerDelegateImpl : public HttpServerDelegate {
private: private:
Binding<HttpServerDelegate> binding_; Binding<HttpServerDelegate> binding_;
std::vector<linked_ptr<HttpConnectionDelegateImpl>> connections_; std::vector<linked_ptr<HttpConnectionDelegateImpl>> connections_;
size_t wait_for_connection_count_;
// Pointing to a stack-allocated RunLoop instance. // Pointing to a stack-allocated RunLoop instance.
base::RunLoop* run_loop_; base::RunLoop* run_loop_;
size_t wait_for_connection_count_;
DISALLOW_COPY_AND_ASSIGN(HttpServerDelegateImpl); DISALLOW_COPY_AND_ASSIGN(HttpServerDelegateImpl);
}; };
...@@ -480,4 +650,42 @@ TEST_F(HttpServerAppTest, HttpRequestResponseWithBody) { ...@@ -480,4 +650,42 @@ TEST_F(HttpServerAppTest, HttpRequestResponseWithBody) {
CheckResponse(response_data, response_message); CheckResponse(response_data, response_message);
} }
TEST_F(HttpServerAppTest, WebSocket) {
NetAddressPtr bound_to;
HttpServerDelegatePtr server_delegate_ptr;
HttpServerDelegateImpl server_delegate_impl(&server_delegate_ptr);
CreateHttpServer(server_delegate_ptr.Pass(), &bound_to);
WebSocketPtr web_socket_ptr;
network_service_->CreateWebSocket(GetProxy(&web_socket_ptr));
WebSocketClientImpl socket_0;
socket_0.Connect(
web_socket_ptr.Pass(),
base::StringPrintf("ws://127.0.0.1:%d/hello", bound_to->ipv4->port));
server_delegate_impl.WaitForConnection(1);
HttpConnectionDelegateImpl& connection =
*server_delegate_impl.connections()[0];
connection.WaitForWebSocketRequest();
WebSocketClientImpl& socket_1 = *connection.web_socket();
socket_1.WaitForConnectCompletion();
socket_0.WaitForConnectCompletion();
socket_0.Send("Hello");
socket_0.Send("world!");
socket_1.WaitForMessage(2);
EXPECT_EQ("Hello", socket_1.received_messages()[0]);
EXPECT_EQ("world!", socket_1.received_messages()[1]);
socket_1.Send("How do");
socket_1.Send("you do?");
socket_0.WaitForMessage(2);
EXPECT_EQ("How do", socket_0.received_messages()[0]);
EXPECT_EQ("you do?", socket_0.received_messages()[1]);
}
} // namespace mojo } // namespace mojo
...@@ -28,6 +28,13 @@ interface HttpConnectionDelegate { ...@@ -28,6 +28,13 @@ interface HttpConnectionDelegate {
// WebSocket should be written to the producer end of the |send_stream|. // WebSocket should be written to the producer end of the |send_stream|.
// |web_socket| will be already connected. There is no need to call Connect() // |web_socket| will be already connected. There is no need to call Connect()
// on it. But |client| will still receive a DidConnect() notification. // on it. But |client| will still receive a DidConnect() notification.
//
// NOTE: WebSocket server support is not fully implemented. For now the
// following are not supported:
// - negotiating subprotocol or extension;
// - fragmented or non-text messages;
// - failure or close notification;
// - flow control.
OnReceivedWebSocketRequest(HttpRequest request) OnReceivedWebSocketRequest(HttpRequest request)
=> (WebSocket&? web_socket, => (WebSocket&? web_socket,
handle<data_pipe_consumer>? send_stream, handle<data_pipe_consumer>? send_stream,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment