Commit e2c48d9b authored by Fabrice de Gans-Riberi's avatar Fabrice de Gans-Riberi Committed by Commit Bot

[fuchsia] Transition Cast Streaming to use Cast MessagePorts

Bug: 1147941
Change-Id: Id88d06e4be726d745ced8a0944b0cfbf9241d42a
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2532861
Commit-Queue: Fabrice de Gans-Riberi <fdegans@chromium.org>
Reviewed-by: default avatarKevin Marshall <kmarshall@chromium.org>
Cr-Commit-Position: refs/heads/master@{#826908}
parent c3aed755
......@@ -9,7 +9,6 @@ source_set("cast_streaming") {
"//base",
"//components/openscreen_platform",
"//components/openscreen_platform:openscreen_platform_network_service",
"//fuchsia/base",
"//media",
"//media/mojo/common",
"//media/mojo/mojom",
......@@ -20,6 +19,7 @@ source_set("cast_streaming") {
"//third_party/openscreen/src/platform:api",
"//third_party/openscreen/src/util",
]
public_deps = [ "//components/cast/message_port" ]
visibility = [ "//fuchsia/engine/*" ]
public = [
"public/cast_streaming.h",
......
include_rules = [
"+components/cast/message_port",
"+components/openscreen_platform",
"+media/base",
"+media/mojo",
......
......@@ -4,12 +4,10 @@
#include "fuchsia/cast_streaming/cast_message_port_impl.h"
#include "base/fuchsia/fuchsia_logging.h"
#include "base/json/json_reader.h"
#include "base/json/json_writer.h"
#include "base/logging.h"
#include "base/values.h"
#include "fuchsia/base/mem_buffer_util.h"
#include "third_party/openscreen/src/platform/base/error.h"
namespace cast_streaming {
......@@ -50,26 +48,13 @@ const char kInitialConnectMessage[] = R"(
}
)";
// Upper limit for pending FIDL messages. Messages are going to be pending as
// long as the other end of the MessagePort does not acknowledge the latest
// message. This is to prevent the queue from being overrun in case the other
// end of the FIDL MessagePort is misbehaving.
// This should cover the largest burst of messages from the Open Screen
// implementation.
constexpr size_t kMaxPendingFidlMessages = 10;
// Extracts |buffer| data into |sender_id|, |message_namespace| and |message|.
// Returns true on success.
bool ParseMessageBuffer(const fuchsia::mem::Buffer& buffer,
bool ParseMessageBuffer(base::StringPiece buffer,
std::string* sender_id,
std::string* message_namespace,
std::string* message) {
std::string string_buffer;
if (!cr_fuchsia::StringFromMemBuffer(buffer, &string_buffer))
return false;
base::Optional<base::Value> converted_value =
base::JSONReader::Read(string_buffer);
base::Optional<base::Value> converted_value = base::JSONReader::Read(buffer);
if (!converted_value)
return false;
......@@ -93,9 +78,9 @@ bool ParseMessageBuffer(const fuchsia::mem::Buffer& buffer,
return true;
}
// Creates a WebMessage out of the |sender_id|, |message_namespace| and
// Creates a message string out of the |sender_id|, |message_namespace| and
// |message|.
fuchsia::web::WebMessage CreateWebMessage(const std::string& sender_id,
std::string CreateStringMessage(const std::string& sender_id,
const std::string& message_namespace,
const std::string& message) {
base::Value value(base::Value::Type::DICTIONARY);
......@@ -105,31 +90,16 @@ fuchsia::web::WebMessage CreateWebMessage(const std::string& sender_id,
std::string json_message;
CHECK(base::JSONWriter::Write(value, &json_message));
fuchsia::mem::Buffer buffer;
buffer.size = json_message.size();
zx_status_t status = zx::vmo::create(json_message.size(), 0, &buffer.vmo);
ZX_DCHECK(status == ZX_OK, status);
status = buffer.vmo.write(json_message.data(), 0, json_message.size());
ZX_DCHECK(status == ZX_OK, status);
fuchsia::web::WebMessage web_message;
web_message.set_data(std::move(buffer));
return web_message;
return json_message;
}
} // namespace
CastMessagePortImpl::CastMessagePortImpl(
fidl::InterfaceRequest<fuchsia::web::MessagePort> message_port_request)
: message_port_binding_(this, std::move(message_port_request)) {
std::unique_ptr<cast_api_bindings::MessagePort> message_port)
: message_port_(std::move(message_port)) {
DVLOG(1) << __func__;
DCHECK(message_port_binding_.is_bound());
message_port_binding_.set_error_handler([this](zx_status_t status) {
ZX_LOG(ERROR, status) << "MessagePort disconnected.";
MaybeCloseWithEpitaph(ZX_ERR_BAD_STATE);
});
message_port_->SetReceiver(this);
// Initialize the connection with the Cast Streaming Sender.
PostMessage(kValueSystemSenderId, kSystemNamespace, kInitialConnectMessage);
......@@ -137,24 +107,13 @@ CastMessagePortImpl::CastMessagePortImpl(
CastMessagePortImpl::~CastMessagePortImpl() = default;
void CastMessagePortImpl::MaybeSendMessageToFidl() {
DVLOG(3) << __func__;
if (!receive_message_callback_ || pending_fidl_messages_.empty())
return;
receive_message_callback_(std::move(pending_fidl_messages_.front()));
receive_message_callback_ = nullptr;
pending_fidl_messages_.pop_front();
}
void CastMessagePortImpl::MaybeCloseWithEpitaph(zx_status_t epitaph) {
if (message_port_binding_.is_bound())
message_port_binding_.Close(epitaph);
void CastMessagePortImpl::MaybeClose() {
if (message_port_)
message_port_.reset();
if (client_) {
client_->OnError(
openscreen::Error(openscreen::Error::Code::kCastV2CastSocketError));
}
pending_fidl_messages_.clear();
}
void CastMessagePortImpl::SetClient(
......@@ -164,12 +123,12 @@ void CastMessagePortImpl::SetClient(
DCHECK_NE(!client_, !client);
client_ = client;
if (!client_)
MaybeCloseWithEpitaph(ZX_OK);
MaybeClose();
}
void CastMessagePortImpl::ResetClient() {
client_ = nullptr;
MaybeCloseWithEpitaph(ZX_OK);
MaybeClose();
}
void CastMessagePortImpl::SendInjectResponse(const std::string& sender_id,
......@@ -222,42 +181,41 @@ void CastMessagePortImpl::PostMessage(const std::string& sender_id,
const std::string& message_namespace,
const std::string& message) {
DVLOG(3) << __func__;
if (!message_port_binding_.is_bound())
return;
if (pending_fidl_messages_.size() > kMaxPendingFidlMessages) {
LOG(ERROR) << "Too many buffered Open Screen messages.";
MaybeCloseWithEpitaph(ZX_ERR_BAD_STATE);
if (!message_port_)
return;
}
DVLOG(3) << "Received Open Screen message. SenderId: " << sender_id
<< ". Namespace: " << message_namespace << ". Message: " << message;
pending_fidl_messages_.push_back(
CreateWebMessage(sender_id, message_namespace, message));
MaybeSendMessageToFidl();
message_port_->PostMessage(
CreateStringMessage(sender_id, message_namespace, message));
}
void CastMessagePortImpl::PostMessage(
fuchsia::web::WebMessage message,
fuchsia::web::MessagePort::PostMessageCallback callback) {
bool CastMessagePortImpl::OnMessage(
base::StringPiece message,
std::vector<std::unique_ptr<cast_api_bindings::MessagePort>> ports) {
DVLOG(3) << __func__;
// If |client_| was cleared, the binding should have been closed.
// If |client_| was cleared, |message_port_| should have been reset.
DCHECK(client_);
if (!ports.empty()) {
// We should never receive any ports for Cast Streaming.
LOG(ERROR) << "Received ports on Cast Streaming MessagePort.";
MaybeClose();
return false;
}
std::string sender_id;
std::string message_namespace;
std::string str_message;
if (!ParseMessageBuffer(message.data(), &sender_id, &message_namespace,
if (!ParseMessageBuffer(message, &sender_id, &message_namespace,
&str_message)) {
LOG(ERROR) << "Received bad message.";
client_->OnError(
openscreen::Error(openscreen::Error::Code::kCastV2InvalidMessage));
return;
return false;
}
DVLOG(3) << "Received FIDL message. SenderId: " << sender_id
DVLOG(3) << "Received Cast message. SenderId: " << sender_id
<< ". Namespace: " << message_namespace
<< ". Message: " << str_message;
......@@ -274,22 +232,12 @@ void CastMessagePortImpl::PostMessage(
<< ", message=" << str_message;
}
// Acknowledge the message and unblock the receipt of another.
fuchsia::web::MessagePort_PostMessage_Result result;
result.set_response(fuchsia::web::MessagePort_PostMessage_Response());
callback(std::move(result));
return true;
}
void CastMessagePortImpl::ReceiveMessage(
fuchsia::web::MessagePort::ReceiveMessageCallback callback) {
void CastMessagePortImpl::OnPipeError() {
DVLOG(3) << __func__;
if (receive_message_callback_) {
MaybeCloseWithEpitaph(ZX_ERR_BAD_STATE);
return;
}
receive_message_callback_ = std::move(callback);
MaybeSendMessageToFidl();
MaybeClose();
}
} // namespace cast_streaming
......@@ -5,21 +5,18 @@
#ifndef FUCHSIA_CAST_STREAMING_CAST_MESSAGE_PORT_IMPL_H_
#define FUCHSIA_CAST_STREAMING_CAST_MESSAGE_PORT_IMPL_H_
#include <fuchsia/web/cpp/fidl.h>
#include <lib/fidl/cpp/binding.h>
#include "base/containers/circular_deque.h"
#include "components/cast/message_port/message_port.h"
#include "third_party/openscreen/src/cast/common/public/message_port.h"
namespace cast_streaming {
// Wrapper for a fuchsia.web.MessagePort that provides an Open Screen
// MessagePort implementation.
// Wrapper for a cast MessagePort that provides an Open Screen MessagePort
// implementation.
class CastMessagePortImpl : public openscreen::cast::MessagePort,
public fuchsia::web::MessagePort {
public cast_api_bindings::MessagePort::Receiver {
public:
explicit CastMessagePortImpl(
fidl::InterfaceRequest<fuchsia::web::MessagePort> message_port_request);
std::unique_ptr<cast_api_bindings::MessagePort> message_port);
~CastMessagePortImpl() final;
CastMessagePortImpl(const CastMessagePortImpl&) = delete;
......@@ -33,34 +30,23 @@ class CastMessagePortImpl : public openscreen::cast::MessagePort,
const std::string& message) final;
private:
// Sends one message in |pending_fidl_messages_| if
// |receive_message_callback_| is set.
void MaybeSendMessageToFidl();
// Closes the fuchsia.web.MessagePort connection and cleans up internal state.
// * Closes |message_port_binding_| with |epitaph| if it is still open.
// * Signals an error to |client_| if |client_| is set.
// * Empties |pending_fidl_messages_|.
void MaybeCloseWithEpitaph(zx_status_t epitaph);
// Resets |message_port_| if it is open and signals an error to |client_| if
// |client_| is set.
void MaybeClose();
// Returns a "not supported" error message to the sender for messages from
// the inject namespace.
void SendInjectResponse(const std::string& sender_id,
const std::string& message);
// fuchsia::web::MessagePort implementation.
void PostMessage(fuchsia::web::WebMessage message,
PostMessageCallback callback) final;
void ReceiveMessage(ReceiveMessageCallback callback) final;
// cast_api_bindings::MessagePort::Receiver implementation.
bool OnMessage(
base::StringPiece message,
std::vector<std::unique_ptr<cast_api_bindings::MessagePort>> ports) final;
void OnPipeError() final;
Client* client_ = nullptr;
// Holds WebMessages waiting to be sent over FIDL.
base::circular_deque<fuchsia::web::WebMessage> pending_fidl_messages_;
ReceiveMessageCallback receive_message_callback_;
fidl::Binding<fuchsia::web::MessagePort> message_port_binding_;
std::unique_ptr<cast_api_bindings::MessagePort> message_port_;
};
} // namespace cast_streaming
......
......@@ -4,8 +4,6 @@
#include "fuchsia/cast_streaming/public/cast_streaming_session.h"
#include <lib/zx/time.h>
#include "base/bind.h"
#include "base/notreached.h"
#include "base/timer/timer.h"
......@@ -109,13 +107,12 @@ void CastStreamingSession::SetNetworkContextGetter(
class CastStreamingSession::Internal
: public openscreen::cast::ReceiverSession::Client {
public:
Internal(
CastStreamingSession::Client* client,
fidl::InterfaceRequest<fuchsia::web::MessagePort> message_port_request,
Internal(CastStreamingSession::Client* client,
std::unique_ptr<cast_api_bindings::MessagePort> message_port,
scoped_refptr<base::SequencedTaskRunner> task_runner)
: task_runner_(task_runner),
environment_(&openscreen::Clock::now, &task_runner_),
cast_message_port_impl_(std::move(message_port_request)),
cast_message_port_impl_(std::move(message_port)),
client_(client) {
DCHECK(task_runner);
DCHECK(client_);
......@@ -336,12 +333,12 @@ CastStreamingSession::~CastStreamingSession() = default;
void CastStreamingSession::Start(
Client* client,
fidl::InterfaceRequest<fuchsia::web::MessagePort> message_port_request,
std::unique_ptr<cast_api_bindings::MessagePort> message_port,
scoped_refptr<base::SequencedTaskRunner> task_runner) {
DCHECK(client);
DCHECK(!internal_);
internal_ = std::make_unique<Internal>(
client, std::move(message_port_request), task_runner);
internal_ =
std::make_unique<Internal>(client, std::move(message_port), task_runner);
}
void CastStreamingSession::Stop() {
......
......@@ -5,13 +5,12 @@
#ifndef FUCHSIA_CAST_STREAMING_PUBLIC_CAST_STREAMING_SESSION_H_
#define FUCHSIA_CAST_STREAMING_PUBLIC_CAST_STREAMING_SESSION_H_
#include <fuchsia/web/cpp/fidl.h>
#include <memory>
#include "base/callback.h"
#include "base/optional.h"
#include "base/sequenced_task_runner.h"
#include "components/cast/message_port/message_port.h"
#include "media/base/audio_decoder_config.h"
#include "media/base/video_decoder_config.h"
#include "media/mojo/mojom/media_types.mojom.h"
......@@ -93,9 +92,8 @@ class CastStreamingSession {
// * On failure, OnSessionEnded() will be called.
// * When a new offer is sent by the Cast Streaming Sender,
// OnSessionReinitialization() will be called.
void Start(
Client* client,
fidl::InterfaceRequest<fuchsia::web::MessagePort> message_port_request,
void Start(Client* client,
std::unique_ptr<cast_api_bindings::MessagePort> message_port,
scoped_refptr<base::SequencedTaskRunner> task_runner);
// Stops the Cast Streaming Session. This can only be called once during the
......
......@@ -5,8 +5,6 @@
#ifndef FUCHSIA_CAST_STREAMING_STREAM_CONSUMER_H_
#define FUCHSIA_CAST_STREAMING_STREAM_CONSUMER_H_
#include <fuchsia/media/cpp/fidl.h>
#include "base/callback.h"
#include "base/timer/timer.h"
#include "media/mojo/mojom/media_types.mojom.h"
......
include_rules = [
"+cc/base/switches.h",
"+components/cast/message_port",
"+components/version_info",
"+components/viz/common",
"+components/media_control",
......
......@@ -5,6 +5,7 @@
#include "fuchsia/engine/browser/cast_streaming_session_client.h"
#include "base/threading/sequenced_task_runner_handle.h"
#include "components/cast/message_port/message_port_fuchsia.h"
#include "media/base/audio_decoder_config.h"
#include "media/base/video_decoder_config.h"
#include "media/mojo/mojom/media_types.mojom.h"
......@@ -33,7 +34,9 @@ void CastStreamingSessionClient::StartMojoConnection(
void CastStreamingSessionClient::OnReceiverEnabled() {
DVLOG(1) << __func__;
DCHECK(message_port_request_);
cast_streaming_session_.Start(this, std::move(message_port_request_),
cast_streaming_session_.Start(this,
cast_api_bindings::MessagePortFuchsia::Create(
std::move(message_port_request_)),
base::SequencedTaskRunnerHandle::Get());
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment