Commit 8d3376a0 authored by Yutaka Hirano's avatar Yutaka Hirano Committed by Commit Bot

Merge WebSocketHandleImpl into WebSocketChannelImpl

In the past we wanted to separate IPC stuff from other parts and that's
why WebSocketHandle[Impl] was created, but with mojo we don't need the
separation.

Bug: 977912
Change-Id: I35dd04c17ef4df8e0122ac0490696c255b5c1192
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/1773010
Commit-Queue: Yutaka Hirano <yhirano@chromium.org>
Reviewed-by: default avatarAdam Rice <ricea@chromium.org>
Reviewed-by: default avatarYoichi Osato <yoichio@chromium.org>
Cr-Commit-Position: refs/heads/master@{#692150}
parent f34bc8a5
......@@ -21,10 +21,6 @@ blink_modules_sources("websockets") {
"websocket_channel_impl.h",
"websocket_common.cc",
"websocket_common.h",
"websocket_handle.h",
"websocket_handle_client.h",
"websocket_handle_impl.cc",
"websocket_handle_impl.h",
"websocket_message_chunk_accumulator.cc",
"websocket_message_chunk_accumulator.h",
"websocket_stream.cc",
......
include_rules = [
"+mojo/public/cpp/bindings",
"+mojo/public/cpp/system",
"-third_party/blink/renderer/modules",
"+third_party/blink/renderer/modules/event_modules.h",
"+third_party/blink/renderer/modules/event_target_modules.h",
......
......@@ -58,7 +58,6 @@
#include "third_party/blink/renderer/core/workers/worker_global_scope.h"
#include "third_party/blink/renderer/modules/websockets/inspector_websocket_events.h"
#include "third_party/blink/renderer/modules/websockets/websocket_channel_client.h"
#include "third_party/blink/renderer/modules/websockets/websocket_handle_impl.h"
#include "third_party/blink/renderer/platform/loader/fetch/resource_fetcher.h"
#include "third_party/blink/renderer/platform/loader/fetch/unique_identifier.h"
#include "third_party/blink/renderer/platform/network/network_log.h"
......@@ -166,9 +165,7 @@ WebSocketChannelImpl* WebSocketChannelImpl::CreateForTesting(
std::unique_ptr<SourceLocation> location,
std::unique_ptr<WebSocketHandshakeThrottle> handshake_throttle) {
auto* channel = MakeGarbageCollected<WebSocketChannelImpl>(
execution_context, client, std::move(location),
std::make_unique<WebSocketHandleImpl>(
execution_context->GetTaskRunner(TaskType::kNetworking)));
execution_context, client, std::move(location));
channel->handshake_throttle_ = std::move(handshake_throttle);
return channel;
}
......@@ -179,9 +176,7 @@ WebSocketChannelImpl* WebSocketChannelImpl::Create(
WebSocketChannelClient* client,
std::unique_ptr<SourceLocation> location) {
auto* channel = MakeGarbageCollected<WebSocketChannelImpl>(
execution_context, client, std::move(location),
std::make_unique<WebSocketHandleImpl>(
execution_context->GetTaskRunner(TaskType::kNetworking)));
execution_context, client, std::move(location));
channel->handshake_throttle_ =
channel->GetBaseFetchContext()->CreateWebSocketHandshakeThrottle();
return channel;
......@@ -190,31 +185,32 @@ WebSocketChannelImpl* WebSocketChannelImpl::Create(
WebSocketChannelImpl::WebSocketChannelImpl(
ExecutionContext* execution_context,
WebSocketChannelClient* client,
std::unique_ptr<SourceLocation> location,
std::unique_ptr<WebSocketHandle> handle)
: handle_(std::move(handle)),
client_(client),
std::unique_ptr<SourceLocation> location)
: client_(client),
identifier_(CreateUniqueIdentifier()),
message_chunks_(execution_context->GetTaskRunner(TaskType::kNetworking)),
execution_context_(execution_context),
location_at_construction_(std::move(location)),
client_receiver_(this),
readable_watcher_(
FROM_HERE,
mojo::SimpleWatcher::ArmingPolicy::MANUAL,
execution_context->GetTaskRunner(TaskType::kNetworking)),
file_reading_task_runner_(
execution_context->GetTaskRunner(TaskType::kFileReading)) {
if (auto* scope = DynamicTo<WorkerGlobalScope>(*execution_context_))
scope->EnsureFetcher();
}
WebSocketChannelImpl::~WebSocketChannelImpl() {
DCHECK(!blob_loader_);
}
WebSocketChannelImpl::~WebSocketChannelImpl() = default;
bool WebSocketChannelImpl::Connect(const KURL& url, const String& protocol) {
NETWORK_DVLOG(1) << this << " Connect()";
if (!handle_)
return false;
if (GetBaseFetchContext()->ShouldBlockWebSocketByMixedContentCheck(url))
if (GetBaseFetchContext()->ShouldBlockWebSocketByMixedContentCheck(url)) {
has_initiated_opening_handshake_ = false;
return false;
}
if (auto* scheduler = execution_context_->GetScheduler()) {
feature_handle_for_scheduler_ = scheduler->RegisterFeature(
......@@ -264,9 +260,16 @@ bool WebSocketChannelImpl::Connect(const KURL& url, const String& protocol) {
// a mojo connection error.
ignore_result(connector.BindNewPipeAndPassReceiver());
}
handle_->Connect(std::move(connector), url, protocols,
GetBaseFetchContext()->GetSiteForCookies(),
execution_context_->UserAgent(), this);
connector->Connect(
url, protocols, GetBaseFetchContext()->GetSiteForCookies(),
execution_context_->UserAgent(),
handshake_client_receiver_.BindNewPipeAndPassRemote(
execution_context_->GetTaskRunner(TaskType::kWebSocket)));
handshake_client_receiver_.set_disconnect_with_reason_handler(
WTF::Bind(&WebSocketChannelImpl::OnConnectionError,
WrapWeakPersistent(this), FROM_HERE));
has_initiated_opening_handshake_ = true;
if (handshake_throttle_) {
// The use of WrapWeakPersistent is safe and motivated by the fact that if
......@@ -296,7 +299,8 @@ WebSocketChannel::SendResult WebSocketChannelImpl::Send(
WebSocketOpCode::kOpCodeText, true,
message.c_str(), message.length());
if (messages_.empty() &&
MaybeSendSynchronously(WebSocketHandle::kMessageTypeText, message)) {
MaybeSendSynchronously(network::mojom::blink::WebSocketMessageType::TEXT,
message)) {
return SendResult::SENT_SYNCHRONOUSLY;
}
......@@ -341,7 +345,7 @@ WebSocketChannel::SendResult WebSocketChannelImpl::Send(
static_cast<const char*>(buffer.Data()) + byte_offset, byte_length);
if (messages_.empty() &&
MaybeSendSynchronously(
WebSocketHandle::kMessageTypeBinary,
network::mojom::blink::WebSocketMessageType::BINARY,
base::make_span(static_cast<const char*>(buffer.Data()) + byte_offset,
byte_length))) {
return SendResult::SENT_SYNCHRONOUSLY;
......@@ -362,8 +366,8 @@ WebSocketChannel::SendResult WebSocketChannelImpl::Send(
}
void WebSocketChannelImpl::Close(int code, const String& reason) {
DCHECK_EQ(GetState(), State::kOpen);
NETWORK_DVLOG(1) << this << " Close(" << code << ", " << reason << ")";
DCHECK(handle_);
uint16_t code_to_send = static_cast<uint16_t>(
code == kCloseEventCodeNotSpecified ? kCloseEventCodeNoStatusRcvd : code);
messages_.push_back(MakeGarbageCollected<Message>(code_to_send, reason));
......@@ -406,12 +410,9 @@ void WebSocketChannelImpl::Disconnect() {
"data", InspectorWebSocketEvent::Data(execution_context_, identifier_));
probe::DidCloseWebSocket(execution_context_, identifier_);
}
feature_handle_for_scheduler_.reset();
AbortAsyncOperations();
handshake_throttle_.reset();
handle_.reset();
client_ = nullptr;
identifier_ = 0;
Dispose();
}
void WebSocketChannelImpl::ApplyBackpressure() {
......@@ -420,7 +421,137 @@ void WebSocketChannelImpl::ApplyBackpressure() {
void WebSocketChannelImpl::RemoveBackpressure() {
backpressure_ = false;
handle_->ConsumePendingDataFrames();
ConsumePendingDataFrames();
}
void WebSocketChannelImpl::OnOpeningHandshakeStarted(
network::mojom::blink::WebSocketHandshakeRequestPtr request) {
DCHECK_EQ(GetState(), State::kConnecting);
NETWORK_DVLOG(1) << this << " OnOpeningHandshakeStarted("
<< request->url.GetString() << ")";
TRACE_EVENT_INSTANT1(
"devtools.timeline", "WebSocketSendHandshakeRequest",
TRACE_EVENT_SCOPE_THREAD, "data",
InspectorWebSocketEvent::Data(execution_context_, identifier_));
probe::WillSendWebSocketHandshakeRequest(execution_context_, identifier_,
request.get());
handshake_request_ = std::move(request);
}
void WebSocketChannelImpl::OnResponseReceived(
network::mojom::blink::WebSocketHandshakeResponsePtr response) {
DCHECK_EQ(GetState(), State::kConnecting);
NETWORK_DVLOG(1) << this << " OnResponseReceived("
<< response->url.GetString() << ")";
TRACE_EVENT_INSTANT1(
"devtools.timeline", "WebSocketReceiveHandshakeResponse",
TRACE_EVENT_SCOPE_THREAD, "data",
InspectorWebSocketEvent::Data(execution_context_, identifier_));
probe::DidReceiveWebSocketHandshakeResponse(execution_context_, identifier_,
handshake_request_.get(),
response.get());
handshake_request_ = nullptr;
}
void WebSocketChannelImpl::OnConnectionEstablished(
mojo::PendingRemote<network::mojom::blink::WebSocket> websocket,
mojo::PendingReceiver<network::mojom::blink::WebSocketClient>
client_receiver,
const String& protocol,
const String& extensions,
mojo::ScopedDataPipeConsumerHandle readable) {
DCHECK_EQ(GetState(), State::kConnecting);
NETWORK_DVLOG(1) << this << " OnConnectionEstablished(" << protocol << ", "
<< extensions << ")";
// From now on, we will detect mojo errors via |client_receiver_|.
handshake_client_receiver_.reset();
client_receiver_.Bind(
std::move(client_receiver),
execution_context_->GetTaskRunner(TaskType::kNetworking));
client_receiver_.set_disconnect_with_reason_handler(
WTF::Bind(&WebSocketChannelImpl::OnConnectionError,
WrapWeakPersistent(this), FROM_HERE));
DCHECK(!websocket_);
websocket_.Bind(std::move(websocket),
execution_context_->GetTaskRunner(TaskType::kNetworking));
readable_ = std::move(readable);
const MojoResult mojo_result = readable_watcher_.Watch(
readable_.get(), MOJO_HANDLE_SIGNAL_READABLE,
MOJO_WATCH_CONDITION_SATISFIED,
WTF::BindRepeating(&WebSocketChannelImpl::OnReadable,
WrapWeakPersistent(this)));
DCHECK_EQ(mojo_result, MOJO_RESULT_OK);
if (!throttle_passed_) {
connect_info_ = std::make_unique<ConnectInfo>(protocol, extensions);
return;
}
DCHECK_EQ(GetState(), State::kOpen);
websocket_->StartReceiving();
handshake_throttle_.reset();
client_->DidConnect(protocol, extensions);
}
void WebSocketChannelImpl::OnDataFrame(
bool fin,
network::mojom::blink::WebSocketMessageType type,
uint64_t data_length) {
DCHECK_EQ(GetState(), State::kOpen);
NETWORK_DVLOG(1) << this << " OnDataFrame(" << fin << ", " << type << ", "
<< "(data_length = " << data_length << "))";
pending_data_frames_.push_back(
DataFrame(fin, type, static_cast<uint32_t>(data_length)));
ConsumePendingDataFrames();
}
void WebSocketChannelImpl::AddSendFlowControlQuota(int64_t quota) {
// TODO(yhirano): This should be DCHECK_EQ(GetState(), State::kOpen).
DCHECK(GetState() == State::kOpen || GetState() == State::kConnecting);
NETWORK_DVLOG(1) << this << " AddSendFlowControlQuota(" << quota << ")";
sending_quota_ += quota;
ProcessSendQueue();
}
void WebSocketChannelImpl::OnDropChannel(bool was_clean,
uint16_t code,
const String& reason) {
// TODO(yhirano): This should be DCHECK_EQ(GetState(), State::kOpen).
DCHECK(GetState() == State::kOpen || GetState() == State::kConnecting);
NETWORK_DVLOG(1) << this << " OnDropChannel(" << was_clean << ", " << code
<< ", " << reason << ")";
if (identifier_) {
TRACE_EVENT_INSTANT1(
"devtools.timeline", "WebSocketDestroy", TRACE_EVENT_SCOPE_THREAD,
"data", InspectorWebSocketEvent::Data(execution_context_, identifier_));
probe::DidCloseWebSocket(execution_context_, identifier_);
identifier_ = 0;
}
HandleDidClose(was_clean, code, reason);
}
void WebSocketChannelImpl::OnClosingHandshake() {
DCHECK_EQ(GetState(), State::kOpen);
NETWORK_DVLOG(1) << this << " OnClosingHandshake()";
client_->DidStartClosingHandshake();
}
ExecutionContext* WebSocketChannelImpl::GetExecutionContext() {
return execution_context_;
}
void WebSocketChannelImpl::Trace(blink::Visitor* visitor) {
visitor->Trace(blob_loader_);
visitor->Trace(messages_);
visitor->Trace(client_);
visitor->Trace(execution_context_);
WebSocketChannel::Trace(visitor);
}
WebSocketChannelImpl::Message::Message(const std::string& text,
......@@ -442,14 +573,28 @@ WebSocketChannelImpl::Message::Message(DOMArrayBuffer* array_buffer,
WebSocketChannelImpl::Message::Message(uint16_t code, const String& reason)
: type(kMessageTypeClose), code(code), reason(reason) {}
WebSocketChannelImpl::State WebSocketChannelImpl::GetState() const {
if (!has_initiated_opening_handshake_) {
return State::kConnecting;
}
if (client_receiver_.is_bound() && throttle_passed_) {
return State::kOpen;
}
if (handshake_client_receiver_.is_bound() || client_receiver_.is_bound()) {
return State::kConnecting;
}
return State::kDisconnected;
}
void WebSocketChannelImpl::SendInternal(
WebSocketHandle::MessageType message_type,
network::mojom::blink::WebSocketMessageType message_type,
const char* data,
wtf_size_t total_size,
uint64_t* consumed_buffered_amount) {
WebSocketHandle::MessageType frame_type =
sent_size_of_top_message_ ? WebSocketHandle::kMessageTypeContinuation
: message_type;
network::mojom::blink::WebSocketMessageType frame_type =
sent_size_of_top_message_
? network::mojom::blink::WebSocketMessageType::CONTINUATION
: message_type;
DCHECK_GE(total_size, sent_size_of_top_message_);
// The first cast is safe since the result of min() never exceeds
// the range of wtf_size_t. The second cast is necessary to compile
......@@ -476,23 +621,27 @@ void WebSocketChannelImpl::SendInternal(
}
void WebSocketChannelImpl::SendAndAdjustQuota(
bool final,
WebSocketHandle::MessageType frame_type,
bool fin,
network::mojom::blink::WebSocketMessageType type,
base::span<const char> data,
uint64_t* consumed_buffered_amount) {
const auto size = data.size();
// TODO(darin): Avoid this copy.
Vector<uint8_t> data_to_pass;
// This cast is always valid because the data size is limited by
// sending_quota_, which is controlled by the browser process and in practice
// is always much smaller than 4GB.
// TODO(ricea): Change the type of sending_quota_ to wtf_size_t.
handle_->Send(final, frame_type, data.data(), static_cast<wtf_size_t>(size));
sending_quota_ -= size;
*consumed_buffered_amount += size;
data_to_pass.ReserveInitialCapacity(static_cast<wtf_size_t>(data.size()));
data_to_pass.Append(data.data(), static_cast<wtf_size_t>(data.size()));
websocket_->SendFrame(fin, type, data_to_pass);
sending_quota_ -= data.size();
*consumed_buffered_amount += data.size();
}
bool WebSocketChannelImpl::MaybeSendSynchronously(
WebSocketHandle::MessageType frame_type,
network::mojom::blink::WebSocketMessageType frame_type,
base::span<const char> data) {
DCHECK(messages_.empty());
if (data.size() > sending_quota_)
......@@ -507,7 +656,8 @@ bool WebSocketChannelImpl::MaybeSendSynchronously(
}
void WebSocketChannelImpl::ProcessSendQueue() {
DCHECK(handle_);
// TODO(yhirano): This should be DCHECK_EQ(GetState(), State::kOpen).
DCHECK(GetState() == State::kOpen || GetState() == State::kConnecting);
uint64_t consumed_buffered_amount = 0;
while (!messages_.IsEmpty() && !blob_loader_) {
Message* message = messages_.front().Get();
......@@ -516,7 +666,8 @@ void WebSocketChannelImpl::ProcessSendQueue() {
break;
switch (message->type) {
case kMessageTypeText:
SendInternal(WebSocketHandle::kMessageTypeText, message->text.data(),
SendInternal(network::mojom::blink::WebSocketMessageType::TEXT,
message->text.data(),
static_cast<wtf_size_t>(message->text.length()),
&consumed_buffered_amount);
break;
......@@ -529,7 +680,7 @@ void WebSocketChannelImpl::ProcessSendQueue() {
break;
case kMessageTypeArrayBuffer:
CHECK(message->array_buffer);
SendInternal(WebSocketHandle::kMessageTypeBinary,
SendInternal(network::mojom::blink::WebSocketMessageType::BINARY,
static_cast<const char*>(message->array_buffer->Data()),
message->array_buffer->ByteLength(),
&consumed_buffered_amount);
......@@ -539,7 +690,9 @@ void WebSocketChannelImpl::ProcessSendQueue() {
DCHECK_EQ(messages_.size(), 1u);
DCHECK_EQ(sent_size_of_top_message_, 0u);
handshake_throttle_.reset();
handle_->Close(message->code, message->reason);
websocket_->StartClosingHandshake(
message->code,
message->reason.IsNull() ? g_empty_string : message->reason);
messages_.pop_front();
break;
}
......@@ -559,121 +712,172 @@ void WebSocketChannelImpl::AbortAsyncOperations() {
void WebSocketChannelImpl::HandleDidClose(bool was_clean,
uint16_t code,
const String& reason) {
handshake_throttle_.reset();
handle_.reset();
AbortAsyncOperations();
if (!client_) {
return;
}
WebSocketChannelClient* client = client_;
client_ = nullptr;
DCHECK_NE(GetState(), State::kDisconnected);
WebSocketChannelClient::ClosingHandshakeCompletionStatus status =
was_clean ? WebSocketChannelClient::kClosingHandshakeComplete
: WebSocketChannelClient::kClosingHandshakeIncomplete;
client->DidClose(status, code, reason);
client_->DidClose(status, code, reason);
AbortAsyncOperations();
Dispose();
}
void WebSocketChannelImpl::DidConnect(WebSocketHandle* handle,
const String& selected_protocol,
const String& extensions) {
NETWORK_DVLOG(1) << this << " DidConnect(" << handle << ", "
<< String(selected_protocol) << ", " << String(extensions)
<< "), throttle_passed_?" << throttle_passed_;
DCHECK(handle_);
DCHECK_EQ(handle, handle_.get());
DCHECK(client_);
void WebSocketChannelImpl::OnCompletion(
const base::Optional<WebString>& console_message) {
DCHECK(!throttle_passed_);
DCHECK(handshake_throttle_);
handshake_throttle_ = nullptr;
if (!throttle_passed_) {
connect_info_ =
std::make_unique<ConnectInfo>(selected_protocol, extensions);
if (GetState() == State::kDisconnected) {
return;
}
DCHECK_EQ(GetState(), State::kConnecting);
if (console_message) {
FailAsError(*console_message);
return;
}
handle_->StartReceiving();
throttle_passed_ = true;
if (connect_info_) {
websocket_->StartReceiving();
client_->DidConnect(std::move(connect_info_->selected_protocol),
std::move(connect_info_->extensions));
connect_info_.reset();
DCHECK_EQ(GetState(), State::kOpen);
}
}
handshake_throttle_.reset();
void WebSocketChannelImpl::DidFinishLoadingBlob(DOMArrayBuffer* buffer) {
DCHECK_EQ(GetState(), State::kOpen);
client_->DidConnect(selected_protocol, extensions);
blob_loader_.Clear();
// The loaded blob is always placed on |messages_[0]|.
DCHECK_GT(messages_.size(), 0u);
DCHECK_EQ(messages_.front()->type, kMessageTypeBlob);
// We replace it with the loaded blob.
messages_.front() =
MakeGarbageCollected<Message>(buffer, base::OnceClosure());
ProcessSendQueue();
}
void WebSocketChannelImpl::DidStartOpeningHandshake(
WebSocketHandle* handle,
network::mojom::blink::WebSocketHandshakeRequestPtr request) {
NETWORK_DVLOG(1) << this << " DidStartOpeningHandshake(" << handle << ")";
DCHECK(handle_);
DCHECK_EQ(handle, handle_.get());
void WebSocketChannelImpl::DidFailLoadingBlob(FileErrorCode error_code) {
DCHECK_EQ(GetState(), State::kOpen);
TRACE_EVENT_INSTANT1(
"devtools.timeline", "WebSocketSendHandshakeRequest",
TRACE_EVENT_SCOPE_THREAD, "data",
InspectorWebSocketEvent::Data(execution_context_, identifier_));
probe::WillSendWebSocketHandshakeRequest(execution_context_, identifier_,
request.get());
handshake_request_ = std::move(request);
blob_loader_.Clear();
if (error_code == FileErrorCode::kAbortErr) {
// The error is caused by cancel().
return;
}
// FIXME: Generate human-friendly reason message.
FailAsError("Failed to load Blob: error code = " +
String::Number(static_cast<unsigned>(error_code)));
}
void WebSocketChannelImpl::DidFinishOpeningHandshake(
WebSocketHandle* handle,
network::mojom::blink::WebSocketHandshakeResponsePtr response) {
NETWORK_DVLOG(1) << this << " DidFinishOpeningHandshake(" << handle << ")";
DCHECK(handle_);
DCHECK_EQ(handle, handle_.get());
void WebSocketChannelImpl::TearDownFailedConnection() {
if (GetState() == State::kDisconnected) {
return;
}
client_->DidError();
if (GetState() == State::kDisconnected) {
return;
}
HandleDidClose(false, kCloseEventCodeAbnormalClosure, String());
}
TRACE_EVENT_INSTANT1(
"devtools.timeline", "WebSocketReceiveHandshakeResponse",
TRACE_EVENT_SCOPE_THREAD, "data",
InspectorWebSocketEvent::Data(execution_context_, identifier_));
probe::DidReceiveWebSocketHandshakeResponse(execution_context_, identifier_,
handshake_request_.get(),
response.get());
handshake_request_ = nullptr;
bool WebSocketChannelImpl::ShouldDisallowConnection(const KURL& url) {
SubresourceFilter* subresource_filter =
GetBaseFetchContext()->GetSubresourceFilter();
if (!subresource_filter)
return false;
return !subresource_filter->AllowWebSocketConnection(url);
}
void WebSocketChannelImpl::DidFail(WebSocketHandle* handle,
const String& message) {
NETWORK_DVLOG(1) << this << " DidFail(" << handle << ", " << String(message)
<< ")";
BaseFetchContext* WebSocketChannelImpl::GetBaseFetchContext() const {
ResourceFetcher* resource_fetcher = execution_context_->Fetcher();
return static_cast<BaseFetchContext*>(&resource_fetcher->Context());
}
feature_handle_for_scheduler_.reset();
void WebSocketChannelImpl::OnReadable(MojoResult result,
const mojo::HandleSignalsState& state) {
DCHECK_EQ(GetState(), State::kOpen);
NETWORK_DVLOG(2) << this << " OnReadable mojo_result=" << result;
if (result != MOJO_RESULT_OK) {
// We don't detect mojo errors on data pipe. Mojo connection errors will
// be detected via |client_receiver_|.
return;
}
ConsumePendingDataFrames();
}
void WebSocketChannelImpl::ConsumePendingDataFrames() {
DCHECK_EQ(GetState(), State::kOpen);
while (!pending_data_frames_.empty() && !backpressure_ &&
GetState() == State::kOpen) {
DataFrame& data_frame = pending_data_frames_.front();
NETWORK_DVLOG(2) << " ConsumePendingDataFrame frame=(" << data_frame.fin
<< ", " << data_frame.type
<< ", (data_length = " << data_frame.data_length << "))";
if (data_frame.data_length == 0) {
ConsumeDataFrame(data_frame.fin, data_frame.type, nullptr, 0);
pending_data_frames_.pop_front();
continue;
}
DCHECK(handle_);
DCHECK_EQ(handle, handle_.get());
const void* buffer;
uint32_t readable_size;
const MojoResult begin_result = readable_->BeginReadData(
&buffer, &readable_size, MOJO_READ_DATA_FLAG_NONE);
if (begin_result == MOJO_RESULT_SHOULD_WAIT) {
readable_watcher_.ArmOrNotify();
return;
}
if (begin_result == MOJO_RESULT_FAILED_PRECONDITION) {
// |client_receiver_| will catch the connection error.
return;
}
DCHECK_EQ(begin_result, MOJO_RESULT_OK);
if (readable_size >= data_frame.data_length) {
ConsumeDataFrame(data_frame.fin, data_frame.type,
static_cast<const char*>(buffer),
data_frame.data_length);
const MojoResult end_result =
readable_->EndReadData(data_frame.data_length);
DCHECK_EQ(end_result, MOJO_RESULT_OK);
pending_data_frames_.pop_front();
continue;
}
// This function is called when the browser is required to fail the
// WebSocketConnection. Hence we fail this channel by calling
// |this->failAsError| function.
FailAsError(message);
DCHECK_LT(readable_size, data_frame.data_length);
ConsumeDataFrame(false, data_frame.type, static_cast<const char*>(buffer),
readable_size);
const MojoResult end_result = readable_->EndReadData(readable_size);
DCHECK_EQ(end_result, MOJO_RESULT_OK);
data_frame.type = network::mojom::blink::WebSocketMessageType::CONTINUATION;
data_frame.data_length -= readable_size;
}
}
void WebSocketChannelImpl::DidReceiveData(WebSocketHandle* handle,
bool fin,
WebSocketHandle::MessageType type,
const char* data,
size_t size) {
NETWORK_DVLOG(1) << this << " DidReceiveData(" << handle << ", " << fin
<< ", " << type << ", (" << static_cast<const void*>(data)
<< ", " << size << "))";
void WebSocketChannelImpl::ConsumeDataFrame(
bool fin,
network::mojom::blink::WebSocketMessageType type,
const char* data,
size_t size) {
DCHECK_EQ(GetState(), State::kOpen);
DCHECK(!backpressure_);
DCHECK(handle_);
DCHECK_EQ(handle, handle_.get());
DCHECK(client_);
// Non-final frames cannot be empty.
DCHECK(fin || size);
DCHECK(fin || size > 0);
switch (type) {
case WebSocketHandle::kMessageTypeText:
case network::mojom::blink::WebSocketMessageType::CONTINUATION:
break;
case network::mojom::blink::WebSocketMessageType::TEXT:
DCHECK_EQ(message_chunks_.GetSize(), 0u);
receiving_message_type_is_text_ = true;
break;
case WebSocketHandle::kMessageTypeBinary:
case network::mojom::blink::WebSocketMessageType::BINARY:
DCHECK_EQ(message_chunks_.GetSize(), 0u);
receiving_message_type_is_text_ = false;
break;
case WebSocketHandle::kMessageTypeContinuation:
break;
}
const size_t message_size_so_far = message_chunks_.GetSize();
......@@ -725,132 +929,34 @@ void WebSocketChannelImpl::DidReceiveData(WebSocketHandle* handle,
message_chunks_.Clear();
}
void WebSocketChannelImpl::DidClose(WebSocketHandle* handle,
bool was_clean,
uint16_t code,
const String& reason) {
NETWORK_DVLOG(1) << this << " DidClose(" << handle << ", " << was_clean
<< ", " << code << ", " << String(reason) << ")";
feature_handle_for_scheduler_.reset();
DCHECK(handle_);
DCHECK_EQ(handle, handle_.get());
handle_.reset();
if (identifier_) {
TRACE_EVENT_INSTANT1(
"devtools.timeline", "WebSocketDestroy", TRACE_EVENT_SCOPE_THREAD,
"data", InspectorWebSocketEvent::Data(execution_context_, identifier_));
probe::DidCloseWebSocket(execution_context_, identifier_);
identifier_ = 0;
}
HandleDidClose(was_clean, code, reason);
}
void WebSocketChannelImpl::AddSendFlowControlQuota(WebSocketHandle* handle,
int64_t quota) {
NETWORK_DVLOG(1) << this << " AddSendFlowControlQuota(" << handle << ", "
<< quota << ")";
DCHECK(handle_);
DCHECK_EQ(handle, handle_.get());
DCHECK_GE(quota, 0);
sending_quota_ += quota;
ProcessSendQueue();
}
void WebSocketChannelImpl::DidStartClosingHandshake(WebSocketHandle* handle) {
NETWORK_DVLOG(1) << this << " DidStartClosingHandshake(" << handle << ")";
DCHECK(handle_);
DCHECK_EQ(handle, handle_.get());
if (client_)
client_->DidStartClosingHandshake();
}
void WebSocketChannelImpl::OnCompletion(
const base::Optional<WebString>& console_message) {
DCHECK(!throttle_passed_);
DCHECK(handshake_throttle_);
handshake_throttle_ = nullptr;
if (console_message) {
FailAsError(*console_message);
return;
void WebSocketChannelImpl::OnConnectionError(const base::Location& set_from,
uint32_t custom_reason,
const std::string& description) {
DCHECK_NE(GetState(), State::kDisconnected);
NETWORK_DVLOG(1) << " OnConnectionError("
<< " reason: " << custom_reason
<< ", description:" << description
<< "), set_from:" << set_from.ToString();
String message = "Unknown reason";
if (custom_reason == network::mojom::blink::WebSocket::kInternalFailure) {
message = String::FromUTF8(description.c_str(), description.size());
}
throttle_passed_ = true;
if (connect_info_) {
handle_->StartReceiving();
client_->DidConnect(std::move(connect_info_->selected_protocol),
std::move(connect_info_->extensions));
connect_info_.reset();
}
}
void WebSocketChannelImpl::DidFinishLoadingBlob(DOMArrayBuffer* buffer) {
blob_loader_.Clear();
DCHECK(handle_);
// The loaded blob is always placed on |messages_[0]|.
DCHECK_GT(messages_.size(), 0u);
DCHECK_EQ(messages_.front()->type, kMessageTypeBlob);
// We replace it with the loaded blob.
messages_.front() =
MakeGarbageCollected<Message>(buffer, base::OnceClosure());
ProcessSendQueue();
}
void WebSocketChannelImpl::DidFailLoadingBlob(FileErrorCode error_code) {
blob_loader_.Clear();
if (error_code == FileErrorCode::kAbortErr) {
// The error is caused by cancel().
return;
}
// FIXME: Generate human-friendly reason message.
FailAsError("Failed to load Blob: error code = " +
String::Number(static_cast<unsigned>(error_code)));
// This function is called when the implementation in the network service is
// required to fail the WebSocket connection. Hence we fail this channel by
// calling FailAsError function.
FailAsError(message);
}
void WebSocketChannelImpl::TearDownFailedConnection() {
// |handle_| and |client_| can be null here.
void WebSocketChannelImpl::Dispose() {
has_initiated_opening_handshake_ = true;
feature_handle_for_scheduler_.reset();
handshake_throttle_.reset();
if (client_)
client_->DidError();
HandleDidClose(false, kCloseEventCodeAbnormalClosure, String());
}
bool WebSocketChannelImpl::ShouldDisallowConnection(const KURL& url) {
DCHECK(handle_);
SubresourceFilter* subresource_filter =
GetBaseFetchContext()->GetSubresourceFilter();
if (!subresource_filter)
return false;
return !subresource_filter->AllowWebSocketConnection(url);
}
BaseFetchContext* WebSocketChannelImpl::GetBaseFetchContext() const {
ResourceFetcher* resource_fetcher = execution_context_->Fetcher();
return static_cast<BaseFetchContext*>(&resource_fetcher->Context());
}
ExecutionContext* WebSocketChannelImpl::GetExecutionContext() {
return execution_context_;
}
void WebSocketChannelImpl::Trace(blink::Visitor* visitor) {
visitor->Trace(blob_loader_);
visitor->Trace(messages_);
visitor->Trace(client_);
visitor->Trace(execution_context_);
WebSocketChannel::Trace(visitor);
websocket_.reset();
readable_watcher_.Cancel();
handshake_client_receiver_.reset();
client_receiver_.reset();
identifier_ = 0;
}
std::ostream& operator<<(std::ostream& ostream,
......
......@@ -36,12 +36,17 @@
#include <utility>
#include "base/containers/span.h"
#include "base/memory/scoped_refptr.h"
#include "mojo/public/cpp/bindings/pending_receiver.h"
#include "mojo/public/cpp/bindings/pending_remote.h"
#include "mojo/public/cpp/bindings/receiver.h"
#include "mojo/public/cpp/bindings/remote.h"
#include "mojo/public/cpp/system/simple_watcher.h"
#include "services/network/public/mojom/websocket.mojom-blink.h"
#include "third_party/blink/public/mojom/websockets/websocket_connector.mojom-blink.h"
#include "third_party/blink/renderer/bindings/core/v8/source_location.h"
#include "third_party/blink/renderer/core/fileapi/blob.h"
#include "third_party/blink/renderer/modules/modules_export.h"
#include "third_party/blink/renderer/modules/websockets/websocket_channel.h"
#include "third_party/blink/renderer/modules/websockets/websocket_handle.h"
#include "third_party/blink/renderer/modules/websockets/websocket_message_chunk_accumulator.h"
#include "third_party/blink/renderer/platform/heap/handle.h"
#include "third_party/blink/renderer/platform/scheduler/public/frame_scheduler.h"
......@@ -61,7 +66,12 @@ class WebSocketHandshakeThrottle;
// This is an implementation of WebSocketChannel. This is created on the main
// thread for Document, or on the worker thread for WorkerGlobalScope. All
// functions must be called on the execution context's thread.
class MODULES_EXPORT WebSocketChannelImpl final : public WebSocketChannel {
class MODULES_EXPORT WebSocketChannelImpl final
: public WebSocketChannel,
public network::mojom::blink::WebSocketHandshakeClient,
public network::mojom::blink::WebSocketClient {
USING_PRE_FINALIZER(WebSocketChannelImpl, Dispose);
public:
// You can specify the source file and the line number information
// explicitly by passing the last parameter.
......@@ -78,8 +88,7 @@ class MODULES_EXPORT WebSocketChannelImpl final : public WebSocketChannel {
WebSocketChannelImpl(ExecutionContext*,
WebSocketChannelClient*,
std::unique_ptr<SourceLocation>,
std::unique_ptr<WebSocketHandle>);
std::unique_ptr<SourceLocation>);
~WebSocketChannelImpl() override;
// WebSocketChannel functions.
......@@ -101,59 +110,45 @@ class MODULES_EXPORT WebSocketChannelImpl final : public WebSocketChannel {
void ApplyBackpressure() override;
void RemoveBackpressure() override;
ExecutionContext* GetExecutionContext();
// network::mojom::blink::WebSocketHandshakeClient methods:
void OnOpeningHandshakeStarted(
network::mojom::blink::WebSocketHandshakeRequestPtr) override;
void OnResponseReceived(
network::mojom::blink::WebSocketHandshakeResponsePtr) override;
void OnConnectionEstablished(
mojo::PendingRemote<network::mojom::blink::WebSocket> websocket,
mojo::PendingReceiver<network::mojom::blink::WebSocketClient>
client_receiver,
const String& selected_protocol,
const String& extensions,
mojo::ScopedDataPipeConsumerHandle readable) override;
// network::mojom::blink::WebSocketClient methods:
void OnDataFrame(bool fin,
network::mojom::blink::WebSocketMessageType,
uint64_t data_length) override;
void AddSendFlowControlQuota(int64_t quota) override;
void OnDropChannel(bool was_clean,
uint16_t code,
const String& reason) override;
void OnClosingHandshake() override;
// Called when the handle is opened.
void DidConnect(WebSocketHandle* handle,
const String& selected_protocol,
const String& extensions);
// Called when the browser starts the opening handshake.
// This notification can be omitted when the inspector is not active.
void DidStartOpeningHandshake(
WebSocketHandle*,
network::mojom::blink::WebSocketHandshakeRequestPtr);
// Called when the browser finishes the opening handshake.
// This notification precedes didConnect.
// This notification can be omitted when the inspector is not active.
void DidFinishOpeningHandshake(
WebSocketHandle*,
network::mojom::blink::WebSocketHandshakeResponsePtr);
// Called when the browser is required to fail the connection.
// |message| can be displayed in the inspector, but should not be passed
// to scripts.
// This message also implies that channel is closed with
// (wasClean = false, code = 1006, reason = "") and
// |handle| becomes unavailable.
void DidFail(WebSocketHandle*, const String& message);
// Called when data are received.
void DidReceiveData(WebSocketHandle*,
bool fin,
WebSocketHandle::MessageType,
const char* data,
size_t);
bool HasBackPressureToReceiveData() { return backpressure_; }
// Called when the handle is closed.
// |handle| becomes unavailable once this notification arrives.
void DidClose(WebSocketHandle* handle,
bool was_clean,
uint16_t code,
const String& reason);
void AddSendFlowControlQuota(WebSocketHandle*, int64_t quota);
// Called when the browser receives a Close frame from the remote
// server. Not called when the renderer initiates the closing handshake.
void DidStartClosingHandshake(WebSocketHandle*);
bool IsHandleAlive() const { return handle_.get(); }
ExecutionContext* GetExecutionContext();
void Trace(blink::Visitor*) override;
private:
struct DataFrame final {
DataFrame(bool fin,
network::mojom::blink::WebSocketMessageType type,
uint32_t data_length)
: fin(fin), type(type), data_length(data_length) {}
bool fin;
network::mojom::blink::WebSocketMessageType type;
uint32_t data_length;
};
friend class WebSocketChannelImplHandshakeThrottleTest;
FRIEND_TEST_ALL_PREFIXES(WebSocketChannelImplHandshakeThrottleTest,
ThrottleSucceedsFirst);
......@@ -180,15 +175,31 @@ class MODULES_EXPORT WebSocketChannelImpl final : public WebSocketChannel {
Vector<char> data;
};
void SendInternal(WebSocketHandle::MessageType,
// The state is defined to see the conceptual state more clearly than checking
// various members (for DCHECKs for example). This is only used internally.
enum class State {
// The channel is running an opening handshake. This is the initial state.
// It becomes |kOpen| when the connection is established. It becomes
// |kDisconnected| when detecting an error.
kConnecting,
// The channel is ready to send / receive messages. It becomes
// |kDisconnected| when the connection is closed or when an error happens.
kOpen,
// The channel is not ready for communication. The channel stays in this
// state forever.
kDisconnected,
};
State GetState() const;
void SendInternal(network::mojom::blink::WebSocketMessageType,
const char* data,
wtf_size_t total_size,
uint64_t* consumed_buffered_amount);
void SendAndAdjustQuota(bool final,
WebSocketHandle::MessageType,
network::mojom::blink::WebSocketMessageType,
base::span<const char>,
uint64_t* consumed_buffered_amount);
bool MaybeSendSynchronously(WebSocketHandle::MessageType,
bool MaybeSendSynchronously(network::mojom::blink::WebSocketMessageType,
base::span<const char>);
void ProcessSendQueue();
void FailAsError(const String& reason) {
......@@ -210,13 +221,19 @@ class MODULES_EXPORT WebSocketChannelImpl final : public WebSocketChannel {
BaseFetchContext* GetBaseFetchContext() const;
// |handle_| is a handle of the connection.
// |handle_| == nullptr means this channel is closed.
std::unique_ptr<WebSocketHandle> handle_;
// |client_| can be deleted while this channel is alive, but this class
// expects that disconnect() is called before the deletion.
Member<WebSocketChannelClient> client_;
// Called when |readable_| becomes readable.
void OnReadable(MojoResult result, const mojo::HandleSignalsState& state);
void ConsumePendingDataFrames();
void ConsumeDataFrame(bool fin,
network::mojom::blink::WebSocketMessageType type,
const char* data,
size_t data_size);
void OnConnectionError(const base::Location& set_from,
uint32_t custom_reason,
const std::string& description);
void Dispose();
const Member<WebSocketChannelClient> client_;
KURL url_;
uint64_t identifier_;
Member<BlobLoader> blob_loader_;
......@@ -227,6 +244,7 @@ class MODULES_EXPORT WebSocketChannelImpl final : public WebSocketChannel {
bool backpressure_ = false;
bool receiving_message_type_is_text_ = false;
bool throttle_passed_ = false;
bool has_initiated_opening_handshake_ = false;
uint64_t sending_quota_ = 0;
wtf_size_t sent_size_of_top_message_ = 0;
FrameScheduler::SchedulingAffectingFeatureHandle
......@@ -239,6 +257,15 @@ class MODULES_EXPORT WebSocketChannelImpl final : public WebSocketChannel {
// throttle response when DidConnect is called.
std::unique_ptr<ConnectInfo> connect_info_;
mojo::Remote<network::mojom::blink::WebSocket> websocket_;
mojo::Receiver<network::mojom::blink::WebSocketHandshakeClient>
handshake_client_receiver_{this};
mojo::Receiver<network::mojom::blink::WebSocketClient> client_receiver_;
mojo::ScopedDataPipeConsumerHandle readable_;
mojo::SimpleWatcher readable_watcher_;
WTF::Deque<DataFrame> pending_data_frames_;
const scoped_refptr<base::SingleThreadTaskRunner> file_reading_task_runner_;
};
......
......@@ -1354,9 +1354,9 @@ TEST_F(WebSocketChannelImplHandshakeThrottleTest, FailDuringThrottle) {
{
InSequence s;
EXPECT_CALL(*raw_handshake_throttle_, ThrottleHandshake(_, _));
EXPECT_CALL(*raw_handshake_throttle_, Destructor());
EXPECT_CALL(*ChannelClient(), DidError());
EXPECT_CALL(*ChannelClient(), DidClose(_, _, _));
EXPECT_CALL(*raw_handshake_throttle_, Destructor());
EXPECT_CALL(checkpoint, Call(1));
}
......@@ -1375,9 +1375,9 @@ TEST_F(WebSocketChannelImplHandshakeThrottleTest,
{
InSequence s;
EXPECT_CALL(*raw_handshake_throttle_, ThrottleHandshake(_, _));
EXPECT_CALL(*raw_handshake_throttle_, Destructor());
EXPECT_CALL(*ChannelClient(), DidError());
EXPECT_CALL(*ChannelClient(), DidClose(_, _, _));
EXPECT_CALL(*raw_handshake_throttle_, Destructor());
EXPECT_CALL(checkpoint, Call(1));
}
......@@ -1485,9 +1485,9 @@ TEST_F(WebSocketChannelImplHandshakeThrottleTest, ConnectFailBeforeThrottle) {
{
InSequence s;
EXPECT_CALL(*raw_handshake_throttle_, ThrottleHandshake(_, _));
EXPECT_CALL(*raw_handshake_throttle_, Destructor());
EXPECT_CALL(*ChannelClient(), DidError());
EXPECT_CALL(*ChannelClient(), DidClose(_, _, _));
EXPECT_CALL(*raw_handshake_throttle_, Destructor());
}
ASSERT_TRUE(Channel()->Connect(url(), ""));
......
/*
* Copyright (C) 2013 Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef THIRD_PARTY_BLINK_RENDERER_MODULES_WEBSOCKETS_WEBSOCKET_HANDLE_H_
#define THIRD_PARTY_BLINK_RENDERER_MODULES_WEBSOCKETS_WEBSOCKET_HANDLE_H_
#include <stdint.h>
#include "mojo/public/cpp/bindings/remote.h"
#include "services/network/public/mojom/websocket.mojom-blink.h"
#include "third_party/blink/public/mojom/websockets/websocket_connector.mojom-blink.h"
#include "third_party/blink/renderer/platform/wtf/forward.h"
#include "third_party/blink/renderer/platform/wtf/vector.h"
#include "third_party/blink/renderer/platform/wtf/wtf_size_t.h"
namespace blink {
class KURL;
class WebSocketChannelImpl;
// WebSocketHandle is an interface class designed to be a handle of WebSocket
// connection. WebSocketHandle will be used together with
// WebSocketChannelImpl.
//
// Once a WebSocketHandle is deleted there will be no notification to the
// corresponding WebSocketChannelImpl. Once a WebSocketChannelImpl receives
// didClose, any method of the corresponding WebSocketHandle can't be called.
class WebSocketHandle {
public:
enum MessageType {
kMessageTypeContinuation,
kMessageTypeText,
kMessageTypeBinary,
};
virtual ~WebSocketHandle() = default;
virtual void Connect(mojo::Remote<mojom::blink::WebSocketConnector>,
const KURL&,
const Vector<String>& protocols,
const KURL& site_for_cookies,
const String& user_agent_override,
WebSocketChannelImpl*) = 0;
virtual void Send(bool fin, MessageType, const char* data, wtf_size_t) = 0;
virtual void StartReceiving() = 0;
virtual void ConsumePendingDataFrames() = 0;
virtual void Close(uint16_t code, const String& reason) = 0;
};
} // namespace blink
#endif // THIRD_PARTY_BLINK_RENDERER_MODULES_WEBSOCKETS_WEBSOCKET_HANDLE_H_
// Copyright 2013 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "third_party/blink/renderer/modules/websockets/websocket_handle_impl.h"
#include "base/location.h"
#include "base/single_thread_task_runner.h"
#include "mojo/public/cpp/bindings/remote.h"
#include "third_party/blink/public/platform/platform.h"
#include "third_party/blink/renderer/modules/websockets/websocket_channel_impl.h"
#include "third_party/blink/renderer/platform/network/http_names.h"
#include "third_party/blink/renderer/platform/network/network_log.h"
#include "third_party/blink/renderer/platform/scheduler/public/thread_scheduler.h"
#include "third_party/blink/renderer/platform/weborigin/kurl.h"
#include "third_party/blink/renderer/platform/wtf/functional.h"
#include "third_party/blink/renderer/platform/wtf/text/wtf_string.h"
namespace blink {
namespace {
const uint16_t kAbnormalShutdownOpCode = 1006;
} // namespace
WebSocketHandleImpl::WebSocketHandleImpl(
scoped_refptr<base::SingleThreadTaskRunner> task_runner)
: task_runner_(std::move(task_runner)),
channel_(nullptr),
readable_watcher_(FROM_HERE,
mojo::SimpleWatcher::ArmingPolicy::MANUAL,
task_runner_) {
NETWORK_DVLOG(1) << this << " created";
}
WebSocketHandleImpl::~WebSocketHandleImpl() {
NETWORK_DVLOG(1) << this << " deleted";
if (websocket_)
websocket_->StartClosingHandshake(kAbnormalShutdownOpCode, g_empty_string);
}
void WebSocketHandleImpl::Connect(
mojo::Remote<mojom::blink::WebSocketConnector> connector,
const KURL& url,
const Vector<String>& protocols,
const KURL& site_for_cookies,
const String& user_agent_override,
WebSocketChannelImpl* channel) {
NETWORK_DVLOG(1) << this << " connect(" << url.GetString() << ")";
DCHECK(!channel_);
DCHECK(channel);
channel_ = channel;
connector->Connect(
url, protocols, site_for_cookies, user_agent_override,
handshake_client_receiver_.BindNewPipeAndPassRemote(task_runner_));
handshake_client_receiver_.set_disconnect_with_reason_handler(
WTF::Bind(&WebSocketHandleImpl::OnConnectionError, WTF::Unretained(this),
FROM_HERE));
}
void WebSocketHandleImpl::Send(bool fin,
WebSocketHandle::MessageType type,
const char* data,
wtf_size_t size) {
DCHECK(websocket_);
network::mojom::blink::WebSocketMessageType type_to_pass;
switch (type) {
case WebSocketHandle::kMessageTypeContinuation:
type_to_pass = network::mojom::blink::WebSocketMessageType::CONTINUATION;
break;
case WebSocketHandle::kMessageTypeText:
type_to_pass = network::mojom::blink::WebSocketMessageType::TEXT;
break;
case WebSocketHandle::kMessageTypeBinary:
type_to_pass = network::mojom::blink::WebSocketMessageType::BINARY;
break;
default:
NOTREACHED();
return;
}
NETWORK_DVLOG(1) << this << " send(" << fin << ", " << type_to_pass << ", "
<< "(data size = " << size << "))";
// TODO(darin): Avoid this copy.
Vector<uint8_t> data_to_pass(size);
std::copy(data, data + size, data_to_pass.begin());
websocket_->SendFrame(fin, type_to_pass, data_to_pass);
}
void WebSocketHandleImpl::StartReceiving() {
websocket_->StartReceiving();
}
void WebSocketHandleImpl::Close(uint16_t code, const String& reason) {
DCHECK(websocket_);
NETWORK_DVLOG(1) << this << " close(" << code << ", " << reason << ")";
websocket_->StartClosingHandshake(code,
reason.IsNull() ? g_empty_string : reason);
}
void WebSocketHandleImpl::Disconnect() {
websocket_.reset();
channel_ = nullptr;
}
void WebSocketHandleImpl::OnConnectionError(const base::Location& set_from,
uint32_t custom_reason,
const std::string& description) {
NETWORK_DVLOG(1) << " OnConnectionError("
<< " reason: " << custom_reason
<< ", description:" << description
<< "), set_from:" << set_from.ToString();
String message = "Unknown reason";
if (custom_reason == network::mojom::blink::WebSocket::kInternalFailure) {
message = String::FromUTF8(description.c_str(), description.size());
}
WebSocketChannelImpl* channel = channel_;
Disconnect();
if (!channel)
return;
channel->DidFail(this, message);
// |this| can be deleted here.
}
void WebSocketHandleImpl::OnOpeningHandshakeStarted(
network::mojom::blink::WebSocketHandshakeRequestPtr request) {
NETWORK_DVLOG(1) << this << " OnOpeningHandshakeStarted("
<< request->url.GetString() << ")";
channel_->DidStartOpeningHandshake(this, std::move(request));
}
void WebSocketHandleImpl::OnResponseReceived(
network::mojom::blink::WebSocketHandshakeResponsePtr response) {
NETWORK_DVLOG(1) << this << " OnResponseReceived("
<< response->url.GetString() << ")";
channel_->DidFinishOpeningHandshake(this, std::move(response));
}
void WebSocketHandleImpl::OnConnectionEstablished(
mojo::PendingRemote<network::mojom::blink::WebSocket> websocket,
mojo::PendingReceiver<network::mojom::blink::WebSocketClient>
client_receiver,
const String& protocol,
const String& extensions,
mojo::ScopedDataPipeConsumerHandle readable) {
NETWORK_DVLOG(1) << this << " OnConnectionEstablished(" << protocol << ", "
<< extensions << ")";
if (!channel_)
return;
// From now on, we will detect mojo errors via |client_receiver_|.
handshake_client_receiver_.reset();
client_receiver_.Bind(std::move(client_receiver), task_runner_);
client_receiver_.set_disconnect_with_reason_handler(
WTF::Bind(&WebSocketHandleImpl::OnConnectionError, WTF::Unretained(this),
FROM_HERE));
DCHECK(!websocket_);
websocket_.Bind(std::move(websocket));
readable_ = std::move(readable);
const MojoResult mojo_result = readable_watcher_.Watch(
readable_.get(), MOJO_HANDLE_SIGNAL_READABLE,
MOJO_WATCH_CONDITION_SATISFIED,
WTF::BindRepeating(&WebSocketHandleImpl::OnReadable,
WTF::Unretained(this)));
DCHECK_EQ(mojo_result, MOJO_RESULT_OK);
channel_->DidConnect(this, protocol, extensions);
// |this| can be deleted here.
}
void WebSocketHandleImpl::OnReadable(MojoResult result,
const mojo::HandleSignalsState& state) {
NETWORK_DVLOG(2) << this << " OnReadble mojo_result=" << result;
if (result != MOJO_RESULT_OK) {
if (channel_) {
channel_->DidFail(this, "Unknown reason");
} else {
Disconnect();
}
return;
}
ConsumePendingDataFrames();
}
void WebSocketHandleImpl::OnDataFrame(
bool fin,
network::mojom::blink::WebSocketMessageType type,
uint64_t data_length) {
NETWORK_DVLOG(1) << this << " OnDataFrame(" << fin << ", " << type << ", "
<< "(data_length = " << data_length << "))";
pending_data_frames_.push_back(
DataFrame(fin, type, static_cast<uint32_t>(data_length)));
ConsumePendingDataFrames();
}
void WebSocketHandleImpl::ConsumePendingDataFrames() {
DCHECK(channel_);
if (channel_->HasBackPressureToReceiveData())
return;
while (!pending_data_frames_.empty() &&
!channel_->HasBackPressureToReceiveData()) {
DataFrame& data_frame = pending_data_frames_.front();
NETWORK_DVLOG(2) << " ConsumePendingDataFrame frame=(" << data_frame.fin
<< ", " << data_frame.type
<< ", (data_length = " << data_frame.data_length << "))";
if (data_frame.data_length == 0) {
if (!ConsumeDataFrame(data_frame.fin, data_frame.type, nullptr, 0)) {
// |this| is deleted.
return;
}
pending_data_frames_.pop_front();
continue;
}
const void* buffer;
uint32_t readable_size;
const MojoResult begin_result = readable_->BeginReadData(
&buffer, &readable_size, MOJO_READ_DATA_FLAG_NONE);
if (begin_result == MOJO_RESULT_SHOULD_WAIT) {
readable_watcher_.ArmOrNotify();
return;
}
if (begin_result == MOJO_RESULT_FAILED_PRECONDITION) {
// |client_receiver_| will catch the connection error.
return;
}
DCHECK_EQ(begin_result, MOJO_RESULT_OK);
if (readable_size >= data_frame.data_length) {
if (!ConsumeDataFrame(data_frame.fin, data_frame.type, buffer,
data_frame.data_length)) {
// |this| is deleted.
return;
}
const MojoResult end_result =
readable_->EndReadData(data_frame.data_length);
DCHECK_EQ(end_result, MOJO_RESULT_OK);
pending_data_frames_.pop_front();
continue;
}
DCHECK_LT(readable_size, data_frame.data_length);
if (!ConsumeDataFrame(false, data_frame.type, buffer, readable_size)) {
// |this| is deleted.
return;
}
const MojoResult end_result = readable_->EndReadData(readable_size);
DCHECK_EQ(end_result, MOJO_RESULT_OK);
data_frame.type = network::mojom::blink::WebSocketMessageType::CONTINUATION;
data_frame.data_length -= readable_size;
}
}
bool WebSocketHandleImpl::ConsumeDataFrame(
bool fin,
network::mojom::blink::WebSocketMessageType type,
const void* data,
size_t data_size) {
WebSocketHandle::MessageType type_to_pass =
WebSocketHandle::kMessageTypeContinuation;
switch (type) {
case network::mojom::blink::WebSocketMessageType::CONTINUATION:
type_to_pass = WebSocketHandle::kMessageTypeContinuation;
break;
case network::mojom::blink::WebSocketMessageType::TEXT:
type_to_pass = WebSocketHandle::kMessageTypeText;
break;
case network::mojom::blink::WebSocketMessageType::BINARY:
type_to_pass = WebSocketHandle::kMessageTypeBinary;
break;
}
const char* data_to_pass = reinterpret_cast<const char*>(data);
WebSocketChannelImpl* channel = channel_.Get();
channel_->DidReceiveData(this, fin, type_to_pass, data_to_pass, data_size);
// DidReceiveData can delete |this|.
return channel->IsHandleAlive();
}
void WebSocketHandleImpl::AddSendFlowControlQuota(int64_t quota) {
NETWORK_DVLOG(1) << this << " AddSendFlowControlQuota(" << quota << ")";
if (!channel_)
return;
channel_->AddSendFlowControlQuota(this, quota);
// |this| can be deleted here.
}
void WebSocketHandleImpl::OnDropChannel(bool was_clean,
uint16_t code,
const String& reason) {
NETWORK_DVLOG(1) << this << " OnDropChannel(" << was_clean << ", " << code
<< ", " << reason << ")";
WebSocketChannelImpl* channel = channel_;
Disconnect();
if (!channel)
return;
channel->DidClose(this, was_clean, code, reason);
// |this| can be deleted here.
}
void WebSocketHandleImpl::OnClosingHandshake() {
NETWORK_DVLOG(1) << this << " OnClosingHandshake()";
if (!channel_)
return;
channel_->DidStartClosingHandshake(this);
// |this| can be deleted here.
}
} // namespace blink
/*
* Copyright (C) 2013 Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef THIRD_PARTY_BLINK_RENDERER_MODULES_WEBSOCKETS_WEBSOCKET_HANDLE_IMPL_H_
#define THIRD_PARTY_BLINK_RENDERER_MODULES_WEBSOCKETS_WEBSOCKET_HANDLE_IMPL_H_
#include "mojo/public/cpp/bindings/receiver.h"
#include "mojo/public/cpp/bindings/remote.h"
#include "services/network/public/mojom/websocket.mojom-blink.h"
#include "third_party/blink/public/mojom/websockets/websocket_connector.mojom-blink.h"
#include "third_party/blink/renderer/modules/websockets/websocket_handle.h"
#include "third_party/blink/renderer/platform/heap/persistent.h"
#include "third_party/blink/renderer/platform/wtf/deque.h"
#include "third_party/blink/renderer/platform/wtf/wtf_size_t.h"
namespace base {
class Location;
class SingleThreadTaskRunner;
} // namespace base
namespace blink {
class WebSocketHandleImpl
: public WebSocketHandle,
public network::mojom::blink::WebSocketHandshakeClient,
public network::mojom::blink::WebSocketClient {
public:
explicit WebSocketHandleImpl(scoped_refptr<base::SingleThreadTaskRunner>);
~WebSocketHandleImpl() override;
void Connect(mojo::Remote<mojom::blink::WebSocketConnector>,
const KURL&,
const Vector<String>& protocols,
const KURL& site_for_cookies,
const String& user_agent_override,
WebSocketChannelImpl*) override;
void Send(bool fin, MessageType, const char* data, wtf_size_t) override;
void StartReceiving() override;
void ConsumePendingDataFrames() override;
void Close(uint16_t code, const String& reason) override;
private:
struct DataFrame final {
DataFrame(bool fin,
network::mojom::blink::WebSocketMessageType type,
uint32_t data_length)
: fin(fin), type(type), data_length(data_length) {}
bool fin;
network::mojom::blink::WebSocketMessageType type;
uint32_t data_length;
};
void Disconnect();
void OnConnectionError(const base::Location& set_from,
uint32_t custom_reason,
const std::string& description);
// network::mojom::blink::WebSocketHandshakeClient methods:
void OnOpeningHandshakeStarted(
network::mojom::blink::WebSocketHandshakeRequestPtr) override;
void OnResponseReceived(
network::mojom::blink::WebSocketHandshakeResponsePtr) override;
void OnConnectionEstablished(
mojo::PendingRemote<network::mojom::blink::WebSocket> websocket,
mojo::PendingReceiver<network::mojom::blink::WebSocketClient>
client_receiver,
const String& selected_protocol,
const String& extensions,
mojo::ScopedDataPipeConsumerHandle readable) override;
// network::mojom::blink::WebSocketClient methods:
void OnDataFrame(bool fin,
network::mojom::blink::WebSocketMessageType,
uint64_t data_length) override;
void AddSendFlowControlQuota(int64_t quota) override;
void OnDropChannel(bool was_clean,
uint16_t code,
const String& reason) override;
void OnClosingHandshake() override;
// Datapipe functions to receive.
void OnReadable(MojoResult result, const mojo::HandleSignalsState& state);
// Returns false if |this| is deleted.
bool ConsumeDataFrame(bool fin,
network::mojom::blink::WebSocketMessageType type,
const void* data,
size_t data_size);
scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
WeakPersistent<WebSocketChannelImpl> channel_;
mojo::Remote<network::mojom::blink::WebSocket> websocket_;
mojo::Receiver<network::mojom::blink::WebSocketHandshakeClient>
handshake_client_receiver_{this};
mojo::Receiver<network::mojom::blink::WebSocketClient> client_receiver_{this};
// Datapipe fields to receive.
mojo::ScopedDataPipeConsumerHandle readable_;
mojo::SimpleWatcher readable_watcher_;
WTF::Deque<DataFrame> pending_data_frames_;
};
} // namespace blink
#endif // THIRD_PARTY_BLINK_RENDERER_MODULES_WEBSOCKETS_WEBSOCKET_HANDLE_IMPL_H_
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment