Commit 307321d4 authored by Keita Suzuki's avatar Keita Suzuki Committed by Commit Bot

[WebSocket] Add support to consume Datapipe in network service

This commit adds support to network service to consume websocket frame
data from mojo datapipe produced from blink. To achieve this, this
commit adds a new message SendMessage() in mojo WebSocket interface.
It also adds SimpleWatcher to watch the consumer and be notified when
needed, and add callback function OnReadable() to support this.

This commit also adds ReadAndSendFrameFromDataPipe() to actually
consume data from the datapipe.

The producer side of the datapipe and the caller of SendMessage() are
not implemented yet for security review. WIP CLs are uploaded here:
https://chromium-review.googlesource.com/c/chromium/src/+/2082869
https://chromium-review.googlesource.com/c/chromium/src/+/2083777

Design Doc:
https://docs.google.com/document/d/1YWj1z9r8wxemGdod6S2tkchudhp6PvNaH3qSO0oucfY/

Bug: 1056030
Change-Id: I580e3776bf0f1a27e8d83e5c92087f5542b19cb6
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2071189
Commit-Queue: Keita Suzuki <suzukikeita@google.com>
Reviewed-by: default avatarKinuko Yasuda <kinuko@chromium.org>
Reviewed-by: default avatarKaran Bhatia <karandeepb@chromium.org>
Reviewed-by: default avatarYutaka Hirano <yhirano@chromium.org>
Reviewed-by: default avatarAdam Rice <ricea@chromium.org>
Reviewed-by: default avatarYoichi Osato <yoichio@chromium.org>
Cr-Commit-Position: refs/heads/master@{#751557}
parent 629f0149
...@@ -166,7 +166,8 @@ void WebRequestProxyingWebSocket::OnConnectionEstablished( ...@@ -166,7 +166,8 @@ void WebRequestProxyingWebSocket::OnConnectionEstablished(
mojo::PendingRemote<network::mojom::WebSocket> websocket, mojo::PendingRemote<network::mojom::WebSocket> websocket,
mojo::PendingReceiver<network::mojom::WebSocketClient> client_receiver, mojo::PendingReceiver<network::mojom::WebSocketClient> client_receiver,
network::mojom::WebSocketHandshakeResponsePtr response, network::mojom::WebSocketHandshakeResponsePtr response,
mojo::ScopedDataPipeConsumerHandle readable) { mojo::ScopedDataPipeConsumerHandle readable,
mojo::ScopedDataPipeProducerHandle writable) {
DCHECK(forwarding_handshake_client_); DCHECK(forwarding_handshake_client_);
DCHECK(!is_done_); DCHECK(!is_done_);
is_done_ = true; is_done_ = true;
...@@ -174,6 +175,7 @@ void WebRequestProxyingWebSocket::OnConnectionEstablished( ...@@ -174,6 +175,7 @@ void WebRequestProxyingWebSocket::OnConnectionEstablished(
client_receiver_ = std::move(client_receiver); client_receiver_ = std::move(client_receiver);
handshake_response_ = std::move(response); handshake_response_ = std::move(response);
readable_ = std::move(readable); readable_ = std::move(readable);
writable_ = std::move(writable);
response_->remote_endpoint = handshake_response_->remote_endpoint; response_->remote_endpoint = handshake_response_->remote_endpoint;
...@@ -203,7 +205,8 @@ void WebRequestProxyingWebSocket::ContinueToCompleted() { ...@@ -203,7 +205,8 @@ void WebRequestProxyingWebSocket::ContinueToCompleted() {
browser_context_, &info_, net::ERR_WS_UPGRADE); browser_context_, &info_, net::ERR_WS_UPGRADE);
forwarding_handshake_client_->OnConnectionEstablished( forwarding_handshake_client_->OnConnectionEstablished(
std::move(websocket_), std::move(client_receiver_), std::move(websocket_), std::move(client_receiver_),
std::move(handshake_response_), std::move(readable_)); std::move(handshake_response_), std::move(readable_),
std::move(writable_));
// Deletes |this|. // Deletes |this|.
proxies_->RemoveProxy(this); proxies_->RemoveProxy(this);
......
...@@ -64,7 +64,8 @@ class WebRequestProxyingWebSocket ...@@ -64,7 +64,8 @@ class WebRequestProxyingWebSocket
mojo::PendingRemote<network::mojom::WebSocket> websocket, mojo::PendingRemote<network::mojom::WebSocket> websocket,
mojo::PendingReceiver<network::mojom::WebSocketClient> client_receiver, mojo::PendingReceiver<network::mojom::WebSocketClient> client_receiver,
network::mojom::WebSocketHandshakeResponsePtr response, network::mojom::WebSocketHandshakeResponsePtr response,
mojo::ScopedDataPipeConsumerHandle readable) override; mojo::ScopedDataPipeConsumerHandle readable,
mojo::ScopedDataPipeProducerHandle writable) override;
// network::mojom::AuthenticationHandler method: // network::mojom::AuthenticationHandler method:
void OnAuthRequired(const net::AuthChallengeInfo& auth_info, void OnAuthRequired(const net::AuthChallengeInfo& auth_info,
...@@ -147,6 +148,7 @@ class WebRequestProxyingWebSocket ...@@ -147,6 +148,7 @@ class WebRequestProxyingWebSocket
mojo::PendingReceiver<network::mojom::WebSocketClient> client_receiver_; mojo::PendingReceiver<network::mojom::WebSocketClient> client_receiver_;
network::mojom::WebSocketHandshakeResponsePtr handshake_response_ = nullptr; network::mojom::WebSocketHandshakeResponsePtr handshake_response_ = nullptr;
mojo::ScopedDataPipeConsumerHandle readable_; mojo::ScopedDataPipeConsumerHandle readable_;
mojo::ScopedDataPipeProducerHandle writable_;
WebRequestInfo info_; WebRequestInfo info_;
......
...@@ -54,6 +54,8 @@ interface AuthenticationHandler { ...@@ -54,6 +54,8 @@ interface AuthenticationHandler {
IPEndPoint remote_endpoint) => (AuthCredentials? credentials); IPEndPoint remote_endpoint) => (AuthCredentials? credentials);
}; };
// This interface is for client-side WebSocket handshake. Used to initialize
// the WebSocket Connection.
interface WebSocketHandshakeClient { interface WebSocketHandshakeClient {
// Notify the renderer that the browser has started an opening handshake. // Notify the renderer that the browser has started an opening handshake.
OnOpeningHandshakeStarted(WebSocketHandshakeRequest request); OnOpeningHandshakeStarted(WebSocketHandshakeRequest request);
...@@ -61,13 +63,20 @@ interface WebSocketHandshakeClient { ...@@ -61,13 +63,20 @@ interface WebSocketHandshakeClient {
// Called when the connection is established. // Called when the connection is established.
// |response| may contain cookie-related headers when the client has // |response| may contain cookie-related headers when the client has
// an access to raw cookie information. // an access to raw cookie information.
// |readable| is readable datapipe to receive data from browser. // |readable| is readable datapipe to receive data from network service.
// |writable| is writable datapipe used to transfer the actual content of the
// message(data) to the network service. The network services later sends out
// the actual message by framing each message from the meta-info given from
// the renderer side with |SendMessage()|.
OnConnectionEstablished(pending_remote<WebSocket> socket, OnConnectionEstablished(pending_remote<WebSocket> socket,
pending_receiver<WebSocketClient> client_receiver, pending_receiver<WebSocketClient> client_receiver,
WebSocketHandshakeResponse response, WebSocketHandshakeResponse response,
handle<data_pipe_consumer> readable); handle<data_pipe_consumer> readable,
handle<data_pipe_producer> writable);
}; };
// The interface for the client side of WebSocket. Implemented by renderer
// processes to receive messages from the network service.
interface WebSocketClient { interface WebSocketClient {
// Receive a non-control frame from the remote server. // Receive a non-control frame from the remote server.
// - |fin| indicates that this frame is the last in the current message. // - |fin| indicates that this frame is the last in the current message.
...@@ -112,6 +121,8 @@ interface WebSocketClient { ...@@ -112,6 +121,8 @@ interface WebSocketClient {
OnClosingHandshake(); OnClosingHandshake();
}; };
// The interface for the server side of WebSocket. Implemented by the network
// service. Used to send out data to the network service.
interface WebSocket { interface WebSocket {
// The client side may observe the following disconnection reason from the // The client side may observe the following disconnection reason from the
// service side: // service side:
...@@ -131,6 +142,17 @@ interface WebSocket { ...@@ -131,6 +142,17 @@ interface WebSocket {
WebSocketMessageType type, WebSocketMessageType type,
mojo_base.mojom.ReadOnlyBuffer data); mojo_base.mojom.ReadOnlyBuffer data);
// Sends a message via mojo datapipe to the remote server.
// - |type| is the type of the message. It must be set to either
// WebSocketMessageType.TEXT or WebSocketMessageType.BINARY.
// - |data_length| is the actual length of message. The message is written to
// the datapipe named |writable| in the
// WebSocketHandshakeClient.OnConnectionEstablished message.
//
// If |type| is WebSocketMessageType.TEXT, then the message must be
// valid UTF-8.
SendMessage(WebSocketMessageType type, uint64 data_length);
// Let browser to start receiving WebSocket data frames from network stream. // Let browser to start receiving WebSocket data frames from network stream.
// TODO(yoichio): Remove this by move Connect() after checking throttle at // TODO(yoichio): Remove this by move Connect() after checking throttle at
// WebSocketChannelImpl::Connect so that OnAddChannelResponse is // WebSocketChannelImpl::Connect so that OnAddChannelResponse is
......
...@@ -211,6 +211,20 @@ void WebSocket::WebSocketEventHandler::OnAddChannelResponse( ...@@ -211,6 +211,20 @@ void WebSocket::WebSocketEventHandler::OnAddChannelResponse(
base::BindRepeating(&WebSocket::OnWritable, base::Unretained(impl_))); base::BindRepeating(&WebSocket::OnWritable, base::Unretained(impl_)));
DCHECK_EQ(mojo_result, MOJO_RESULT_OK); DCHECK_EQ(mojo_result, MOJO_RESULT_OK);
mojo::ScopedDataPipeProducerHandle writable;
const MojoResult write_pipe_result =
mojo::CreateDataPipe(&data_pipe_options, &writable, &impl_->readable_);
if (write_pipe_result != MOJO_RESULT_OK) {
DVLOG(1) << "mojo::CreateDataPipe error:" << result;
impl_->Reset();
return;
}
const MojoResult mojo_readable_result = impl_->readable_watcher_.Watch(
impl_->readable_.get(), MOJO_HANDLE_SIGNAL_READABLE,
MOJO_WATCH_CONDITION_SATISFIED,
base::BindRepeating(&WebSocket::OnReadable, base::Unretained(impl_)));
DCHECK_EQ(mojo_readable_result, MOJO_RESULT_OK);
mojom::WebSocketHandshakeResponsePtr mojo_response = mojom::WebSocketHandshakeResponsePtr mojo_response =
ToMojo(std::move(response), !!impl_->has_raw_headers_access_); ToMojo(std::move(response), !!impl_->has_raw_headers_access_);
mojo_response->selected_protocol = selected_protocol; mojo_response->selected_protocol = selected_protocol;
...@@ -218,7 +232,7 @@ void WebSocket::WebSocketEventHandler::OnAddChannelResponse( ...@@ -218,7 +232,7 @@ void WebSocket::WebSocketEventHandler::OnAddChannelResponse(
impl_->handshake_client_->OnConnectionEstablished( impl_->handshake_client_->OnConnectionEstablished(
impl_->receiver_.BindNewPipeAndPassRemote(), impl_->receiver_.BindNewPipeAndPassRemote(),
impl_->client_.BindNewPipeAndPassReceiver(), std::move(mojo_response), impl_->client_.BindNewPipeAndPassReceiver(), std::move(mojo_response),
std::move(readable)); std::move(readable), std::move(writable));
impl_->receiver_.set_disconnect_handler(base::BindOnce( impl_->receiver_.set_disconnect_handler(base::BindOnce(
&WebSocket::OnConnectionError, base::Unretained(impl_), FROM_HERE)); &WebSocket::OnConnectionError, base::Unretained(impl_), FROM_HERE));
impl_->handshake_client_.reset(); impl_->handshake_client_.reset();
...@@ -385,6 +399,9 @@ WebSocket::WebSocket( ...@@ -385,6 +399,9 @@ WebSocket::WebSocket(
site_for_cookies_(site_for_cookies), site_for_cookies_(site_for_cookies),
has_raw_headers_access_(has_raw_headers_access), has_raw_headers_access_(has_raw_headers_access),
writable_watcher_(FROM_HERE, writable_watcher_(FROM_HERE,
mojo::SimpleWatcher::ArmingPolicy::MANUAL,
base::ThreadTaskRunnerHandle::Get()),
readable_watcher_(FROM_HERE,
mojo::SimpleWatcher::ArmingPolicy::MANUAL, mojo::SimpleWatcher::ArmingPolicy::MANUAL,
base::ThreadTaskRunnerHandle::Get()) { base::ThreadTaskRunnerHandle::Get()) {
DCHECK(handshake_client_); DCHECK(handshake_client_);
...@@ -449,6 +466,26 @@ void WebSocket::SendFrame(bool fin, ...@@ -449,6 +466,26 @@ void WebSocket::SendFrame(bool fin,
data.size()); data.size());
} }
void WebSocket::SendMessage(mojom::WebSocketMessageType type,
uint64_t data_length) {
DVLOG(3) << "WebSocket::SendMessage @" << reinterpret_cast<void*>(this)
<< " type=" << type << " data is " << data_length << " bytes";
DCHECK(channel_) << "WebSocket::SendMessage is called but there is "
"no active channel.";
DCHECK(handshake_succeeded_);
// This is guaranteed by mojo.
if (type == mojom::WebSocketMessageType::CONTINUATION) {
Reset();
return;
}
DCHECK(IsKnownEnumValue(type));
pending_send_data_frames_.push(DataFrame(type, data_length));
ReadAndSendFromDataPipe();
}
void WebSocket::StartReceiving() { void WebSocket::StartReceiving() {
DCHECK(pending_data_frames_.empty()); DCHECK(pending_data_frames_.empty());
ignore_result(channel_->ReadFrames()); ignore_result(channel_->ReadFrames());
...@@ -627,6 +664,70 @@ void WebSocket::SendDataFrame(base::span<const char>* payload) { ...@@ -627,6 +664,70 @@ void WebSocket::SendDataFrame(base::span<const char>* payload) {
return; return;
} }
void WebSocket::OnReadable(MojoResult result,
const mojo::HandleSignalsState& state) {
if (result != MOJO_RESULT_OK) {
DVLOG(1) << "WebSocket::OnWritable mojo error=" << result;
Reset();
return;
}
wait_for_readable_ = false;
ReadAndSendFromDataPipe();
}
void WebSocket::ReadAndSendFromDataPipe() {
if (wait_for_readable_) {
return;
}
while (!pending_send_data_frames_.empty()) {
DataFrame& data_frame = pending_send_data_frames_.front();
DVLOG(2) << " ConsumePendingDataFrame frame=(" << data_frame.type
<< ", (data_length = " << data_frame.data_length << "))";
if (data_frame.data_length == 0) {
auto data_to_pass = base::MakeRefCounted<net::IOBuffer>(0);
channel_->SendFrame(true, MessageTypeToOpCode(data_frame.type),
std::move(data_to_pass), 0);
pending_send_data_frames_.pop();
continue;
}
const void* buffer;
uint32_t readable_size;
const MojoResult begin_result = readable_->BeginReadData(
&buffer, &readable_size, MOJO_READ_DATA_FLAG_NONE);
if (begin_result == MOJO_RESULT_SHOULD_WAIT) {
wait_for_readable_ = true;
readable_watcher_.ArmOrNotify();
return;
}
if (begin_result == MOJO_RESULT_FAILED_PRECONDITION) {
return;
}
DCHECK_EQ(begin_result, MOJO_RESULT_OK);
const size_t size_to_send =
std::min(static_cast<uint64_t>(readable_size), data_frame.data_length);
auto data_to_pass = base::MakeRefCounted<net::IOBuffer>(size_to_send);
const bool is_final = (size_to_send == data_frame.data_length);
memcpy(data_to_pass->data(), buffer, size_to_send);
channel_->SendFrame(is_final, MessageTypeToOpCode(data_frame.type),
std::move(data_to_pass), size_to_send);
const MojoResult end_result = readable_->EndReadData(size_to_send);
DCHECK_EQ(end_result, MOJO_RESULT_OK);
if (size_to_send == data_frame.data_length) {
pending_send_data_frames_.pop();
continue;
}
DCHECK_GT(data_frame.data_length, size_to_send);
data_frame.type = mojom::WebSocketMessageType::CONTINUATION;
data_frame.data_length -= size_to_send;
}
return;
}
void WebSocket::OnSSLCertificateErrorResponse( void WebSocket::OnSSLCertificateErrorResponse(
std::unique_ptr<net::WebSocketEventInterface::SSLErrorCallbacks> callbacks, std::unique_ptr<net::WebSocketEventInterface::SSLErrorCallbacks> callbacks,
const net::SSLInfo& ssl_info, const net::SSLInfo& ssl_info,
......
...@@ -72,6 +72,8 @@ class COMPONENT_EXPORT(NETWORK_SERVICE) WebSocket : public mojom::WebSocket { ...@@ -72,6 +72,8 @@ class COMPONENT_EXPORT(NETWORK_SERVICE) WebSocket : public mojom::WebSocket {
void SendFrame(bool fin, void SendFrame(bool fin,
mojom::WebSocketMessageType type, mojom::WebSocketMessageType type,
base::span<const uint8_t> data) override; base::span<const uint8_t> data) override;
void SendMessage(mojom::WebSocketMessageType type,
uint64_t data_length) override;
void StartReceiving() override; void StartReceiving() override;
void StartClosingHandshake(uint16_t code, const std::string& reason) override; void StartClosingHandshake(uint16_t code, const std::string& reason) override;
...@@ -116,6 +118,13 @@ class COMPONENT_EXPORT(NETWORK_SERVICE) WebSocket : public mojom::WebSocket { ...@@ -116,6 +118,13 @@ class COMPONENT_EXPORT(NETWORK_SERVICE) WebSocket : public mojom::WebSocket {
DISALLOW_COPY_AND_ASSIGN(UnownedPointer); DISALLOW_COPY_AND_ASSIGN(UnownedPointer);
}; };
struct DataFrame final {
DataFrame(mojom::WebSocketMessageType type, uint64_t data_length)
: type(type), data_length(data_length) {}
mojom::WebSocketMessageType type;
uint64_t data_length;
};
void OnConnectionError(const base::Location& set_from); void OnConnectionError(const base::Location& set_from);
void AddChannel(const GURL& socket_url, void AddChannel(const GURL& socket_url,
const std::vector<std::string>& requested_protocols, const std::vector<std::string>& requested_protocols,
...@@ -150,6 +159,10 @@ class COMPONENT_EXPORT(NETWORK_SERVICE) WebSocket : public mojom::WebSocket { ...@@ -150,6 +159,10 @@ class COMPONENT_EXPORT(NETWORK_SERVICE) WebSocket : public mojom::WebSocket {
void SendPendingDataFrames(); void SendPendingDataFrames();
void SendDataFrame(base::span<const char>* data_span); void SendDataFrame(base::span<const char>* data_span);
// Datapipe functions to send.
void OnReadable(MojoResult result, const mojo::HandleSignalsState& state);
void ReadAndSendFromDataPipe();
// |factory_| owns |this|. // |factory_| owns |this|.
WebSocketFactory* const factory_; WebSocketFactory* const factory_;
mojo::Receiver<mojom::WebSocket> receiver_{this}; mojo::Receiver<mojom::WebSocket> receiver_{this};
...@@ -189,6 +202,12 @@ class COMPONENT_EXPORT(NETWORK_SERVICE) WebSocket : public mojom::WebSocket { ...@@ -189,6 +202,12 @@ class COMPONENT_EXPORT(NETWORK_SERVICE) WebSocket : public mojom::WebSocket {
base::queue<base::span<const char>> pending_data_frames_; base::queue<base::span<const char>> pending_data_frames_;
bool wait_for_writable_ = false; bool wait_for_writable_ = false;
// Datapipe fields to send.
mojo::ScopedDataPipeConsumerHandle readable_;
mojo::SimpleWatcher readable_watcher_;
base::queue<DataFrame> pending_send_data_frames_;
bool wait_for_readable_ = false;
base::WeakPtrFactory<WebSocket> weak_ptr_factory_{this}; base::WeakPtrFactory<WebSocket> weak_ptr_factory_{this};
DISALLOW_COPY_AND_ASSIGN(WebSocket); DISALLOW_COPY_AND_ASSIGN(WebSocket);
......
...@@ -456,7 +456,8 @@ void WebSocketChannelImpl::OnConnectionEstablished( ...@@ -456,7 +456,8 @@ void WebSocketChannelImpl::OnConnectionEstablished(
mojo::PendingReceiver<network::mojom::blink::WebSocketClient> mojo::PendingReceiver<network::mojom::blink::WebSocketClient>
client_receiver, client_receiver,
network::mojom::blink::WebSocketHandshakeResponsePtr response, network::mojom::blink::WebSocketHandshakeResponsePtr response,
mojo::ScopedDataPipeConsumerHandle readable) { mojo::ScopedDataPipeConsumerHandle readable,
mojo::ScopedDataPipeProducerHandle writable) {
DCHECK_EQ(GetState(), State::kConnecting); DCHECK_EQ(GetState(), State::kConnecting);
const String& protocol = response->selected_protocol; const String& protocol = response->selected_protocol;
const String& extensions = response->extensions; const String& extensions = response->extensions;
...@@ -484,6 +485,8 @@ void WebSocketChannelImpl::OnConnectionEstablished( ...@@ -484,6 +485,8 @@ void WebSocketChannelImpl::OnConnectionEstablished(
websocket_.Bind(std::move(websocket), websocket_.Bind(std::move(websocket),
execution_context_->GetTaskRunner(TaskType::kNetworking)); execution_context_->GetTaskRunner(TaskType::kNetworking));
readable_ = std::move(readable); readable_ = std::move(readable);
// TODO(suzukikeita): Implement upload via |writable_| instead of SendFrame.
writable_ = std::move(writable);
const MojoResult mojo_result = readable_watcher_.Watch( const MojoResult mojo_result = readable_watcher_.Watch(
readable_.get(), MOJO_HANDLE_SIGNAL_READABLE, readable_.get(), MOJO_HANDLE_SIGNAL_READABLE,
MOJO_WATCH_CONDITION_SATISFIED, MOJO_WATCH_CONDITION_SATISFIED,
......
...@@ -119,7 +119,8 @@ class MODULES_EXPORT WebSocketChannelImpl final ...@@ -119,7 +119,8 @@ class MODULES_EXPORT WebSocketChannelImpl final
mojo::PendingReceiver<network::mojom::blink::WebSocketClient> mojo::PendingReceiver<network::mojom::blink::WebSocketClient>
client_receiver, client_receiver,
network::mojom::blink::WebSocketHandshakeResponsePtr, network::mojom::blink::WebSocketHandshakeResponsePtr,
mojo::ScopedDataPipeConsumerHandle readable) override; mojo::ScopedDataPipeConsumerHandle readable,
mojo::ScopedDataPipeProducerHandle writable) override;
// network::mojom::blink::WebSocketClient methods: // network::mojom::blink::WebSocketClient methods:
void OnDataFrame(bool fin, void OnDataFrame(bool fin,
...@@ -265,6 +266,8 @@ class MODULES_EXPORT WebSocketChannelImpl final ...@@ -265,6 +266,8 @@ class MODULES_EXPORT WebSocketChannelImpl final
mojo::SimpleWatcher readable_watcher_; mojo::SimpleWatcher readable_watcher_;
WTF::Deque<DataFrame> pending_data_frames_; WTF::Deque<DataFrame> pending_data_frames_;
mojo::ScopedDataPipeProducerHandle writable_;
const scoped_refptr<base::SingleThreadTaskRunner> file_reading_task_runner_; const scoped_refptr<base::SingleThreadTaskRunner> file_reading_task_runner_;
}; };
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment