Commit 274ebeda authored by Fabrice de Gans-Riberi's avatar Fabrice de Gans-Riberi Committed by Commit Bot

[Fuchsia] Implement timeout for Cast Streaming

* Kill the Session after 10s of no activity in the audio or video
  stream.
* Kill the Session after 5s if no offer message is received.
* Fix reporting of messages from unknown namespaces in the Cast
  Streaming MessagePort.

Bug: 1087528, 1087528
Change-Id: I69b44bea3b142e7cf12c32106a279ef37710c56b
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2451525Reviewed-by: default avatarKevin Marshall <kmarshall@chromium.org>
Commit-Queue: Fabrice de Gans-Riberi <fdegans@chromium.org>
Cr-Commit-Position: refs/heads/master@{#815876}
parent 14e8b73a
...@@ -267,7 +267,7 @@ void CastMessagePortImpl::PostMessage( ...@@ -267,7 +267,7 @@ void CastMessagePortImpl::PostMessage(
client_->OnMessage(sender_id, message_namespace, str_message); client_->OnMessage(sender_id, message_namespace, str_message);
} else if (message_namespace == kInjectNamespace) { } else if (message_namespace == kInjectNamespace) {
SendInjectResponse(sender_id, str_message); SendInjectResponse(sender_id, str_message);
} else if (message_namespace == kSystemNamespace) { } else if (message_namespace != kSystemNamespace) {
// System messages are ignored, log messages from unknown namespaces. // System messages are ignored, log messages from unknown namespaces.
DVLOG(2) << "Unknown message from " << sender_id DVLOG(2) << "Unknown message from " << sender_id
<< ", namespace=" << message_namespace << ", namespace=" << message_namespace
......
...@@ -8,6 +8,7 @@ ...@@ -8,6 +8,7 @@
#include "base/bind.h" #include "base/bind.h"
#include "base/notreached.h" #include "base/notreached.h"
#include "base/timer/timer.h"
#include "components/openscreen_platform/network_context.h" #include "components/openscreen_platform/network_context.h"
#include "components/openscreen_platform/network_util.h" #include "components/openscreen_platform/network_util.h"
#include "components/openscreen_platform/task_runner.h" #include "components/openscreen_platform/task_runner.h"
...@@ -26,6 +27,9 @@ namespace { ...@@ -26,6 +27,9 @@ namespace {
constexpr char kVideoCodecH264[] = "h264"; constexpr char kVideoCodecH264[] = "h264";
constexpr char kVideoCodecVp8[] = "vp8"; constexpr char kVideoCodecVp8[] = "vp8";
// Timeout to end the Session when no offer message is sent.
constexpr base::TimeDelta kInitTimeout = base::TimeDelta::FromSeconds(5);
} // namespace } // namespace
namespace cast_streaming { namespace cast_streaming {
...@@ -48,19 +52,24 @@ class CastStreamingSession::Internal ...@@ -48,19 +52,24 @@ class CastStreamingSession::Internal
: task_runner_(task_runner), : task_runner_(task_runner),
environment_(&openscreen::Clock::now, &task_runner_), environment_(&openscreen::Clock::now, &task_runner_),
cast_message_port_impl_(std::move(message_port_request)), cast_message_port_impl_(std::move(message_port_request)),
client_(client) {
DCHECK(task_runner);
DCHECK(client_);
// TODO(crbug.com/1087520): Add streaming session Constraints and // TODO(crbug.com/1087520): Add streaming session Constraints and
// DisplayDescription. // DisplayDescription.
receiver_session_(this, receiver_session_ = std::make_unique<openscreen::cast::ReceiverSession>(
&environment_, this, &environment_, &cast_message_port_impl_,
&cast_message_port_impl_,
openscreen::cast::ReceiverSession::Preferences( openscreen::cast::ReceiverSession::Preferences(
{openscreen::cast::VideoCodec::kH264, {openscreen::cast::VideoCodec::kH264,
openscreen::cast::VideoCodec::kVp8}, openscreen::cast::VideoCodec::kVp8},
{openscreen::cast::AudioCodec::kAac, {openscreen::cast::AudioCodec::kAac,
openscreen::cast::AudioCodec::kOpus})), openscreen::cast::AudioCodec::kOpus}));
client_(client) {
DCHECK(task_runner); init_timeout_timer_.Start(
DCHECK(client_); FROM_HERE, kInitTimeout,
base::BindOnce(&CastStreamingSession::Internal::OnInitializationTimeout,
base::Unretained(this)));
} }
~Internal() final = default; ~Internal() final = default;
...@@ -69,14 +78,22 @@ class CastStreamingSession::Internal ...@@ -69,14 +78,22 @@ class CastStreamingSession::Internal
Internal& operator=(const Internal&) = delete; Internal& operator=(const Internal&) = delete;
private: private:
void OnInitializationTimeout() {
DVLOG(1) << __func__;
DCHECK(!is_initialized_);
client_->OnInitializationFailure();
is_initialized_ = true;
}
// openscreen::cast::ReceiverSession::Client implementation. // openscreen::cast::ReceiverSession::Client implementation.
void OnNegotiated( void OnNegotiated(
const openscreen::cast::ReceiverSession* session, const openscreen::cast::ReceiverSession* session,
openscreen::cast::ReceiverSession::ConfiguredReceivers receivers) final { openscreen::cast::ReceiverSession::ConfiguredReceivers receivers) final {
DVLOG(1) << __func__; DVLOG(1) << __func__;
DCHECK_EQ(session, &receiver_session_); DCHECK_EQ(session, receiver_session_.get());
init_timeout_timer_.Stop();
if (initialized_called_) { if (is_initialized_) {
// TODO(crbug.com/1116185): Handle multiple offer messages properly. // TODO(crbug.com/1116185): Handle multiple offer messages properly.
return; return;
} }
...@@ -99,11 +116,15 @@ class CastStreamingSession::Internal ...@@ -99,11 +116,15 @@ class CastStreamingSession::Internal
} }
// Initialize the audio consumer. // Initialize the audio consumer.
// We can use unretained pointers here because StreamConsumer is owned by
// this object and |client_| is guaranteed to outlive this object.
audio_consumer_ = std::make_unique<StreamConsumer>( audio_consumer_ = std::make_unique<StreamConsumer>(
receivers.audio->receiver, std::move(data_pipe_producer), receivers.audio->receiver, std::move(data_pipe_producer),
base::BindRepeating( base::BindRepeating(
&CastStreamingSession::Client::OnAudioBufferReceived, &CastStreamingSession::Client::OnAudioBufferReceived,
base::Unretained(client_))); base::Unretained(client_)),
base::BindOnce(&CastStreamingSession::Internal::OnDataTimeout,
base::Unretained(this)));
// Gather data for the audio decoder config. // Gather data for the audio decoder config.
media::ChannelLayout channel_layout = media::ChannelLayout channel_layout =
...@@ -143,11 +164,15 @@ class CastStreamingSession::Internal ...@@ -143,11 +164,15 @@ class CastStreamingSession::Internal
} }
// Initialize the video consumer. // Initialize the video consumer.
// We can use unretained pointers here because StreamConsumer is owned by
// this object and |client_| is guaranteed to outlive this object.
video_consumer_ = std::make_unique<StreamConsumer>( video_consumer_ = std::make_unique<StreamConsumer>(
receivers.video->receiver, std::move(data_pipe_producer), receivers.video->receiver, std::move(data_pipe_producer),
base::BindRepeating( base::BindRepeating(
&CastStreamingSession::Client::OnVideoBufferReceived, &CastStreamingSession::Client::OnVideoBufferReceived,
base::Unretained(client_))); base::Unretained(client_)),
base::BindOnce(&CastStreamingSession::Internal::OnDataTimeout,
base::Unretained(this)));
// Gather data for the video decoder config. // Gather data for the video decoder config.
const std::string& video_codec = const std::string& video_codec =
...@@ -193,14 +218,15 @@ class CastStreamingSession::Internal ...@@ -193,14 +218,15 @@ class CastStreamingSession::Internal
client_->OnInitializationSuccess(std::move(audio_stream_info), client_->OnInitializationSuccess(std::move(audio_stream_info),
std::move(video_stream_info)); std::move(video_stream_info));
} }
initialized_called_ = true; is_initialized_ = true;
} }
// TODO(https://crbug.com/1116185): Handle |reason| and reset streams on a // TODO(https://crbug.com/1116185): Handle |reason| and reset streams on a
// new offer message. // new offer message.
void OnReceiversDestroying(const openscreen::cast::ReceiverSession* session, void OnReceiversDestroying(const openscreen::cast::ReceiverSession* session,
ReceiversDestroyingReason reason) final { ReceiversDestroyingReason reason) final {
DCHECK_EQ(session, &receiver_session_); // This can be called when |receiver_session_| is being destroyed, so we
// do not sanity-check |session| here.
DVLOG(1) << __func__; DVLOG(1) << __func__;
audio_consumer_.reset(); audio_consumer_.reset();
video_consumer_.reset(); video_consumer_.reset();
...@@ -209,20 +235,26 @@ class CastStreamingSession::Internal ...@@ -209,20 +235,26 @@ class CastStreamingSession::Internal
void OnError(const openscreen::cast::ReceiverSession* session, void OnError(const openscreen::cast::ReceiverSession* session,
openscreen::Error error) final { openscreen::Error error) final {
DCHECK_EQ(session, &receiver_session_); DCHECK_EQ(session, receiver_session_.get());
LOG(ERROR) << error; LOG(ERROR) << error;
if (!initialized_called_) { if (!is_initialized_) {
client_->OnInitializationFailure(); client_->OnInitializationFailure();
initialized_called_ = true; is_initialized_ = true;
}
} }
void OnDataTimeout() {
DVLOG(1) << __func__;
receiver_session_.reset();
} }
openscreen_platform::TaskRunner task_runner_; openscreen_platform::TaskRunner task_runner_;
openscreen::cast::Environment environment_; openscreen::cast::Environment environment_;
CastMessagePortImpl cast_message_port_impl_; CastMessagePortImpl cast_message_port_impl_;
openscreen::cast::ReceiverSession receiver_session_; std::unique_ptr<openscreen::cast::ReceiverSession> receiver_session_;
base::OneShotTimer init_timeout_timer_;
bool initialized_called_ = false; bool is_initialized_ = false;
CastStreamingSession::Client* const client_; CastStreamingSession::Client* const client_;
std::unique_ptr<openscreen::cast::Receiver::Consumer> audio_consumer_; std::unique_ptr<openscreen::cast::Receiver::Consumer> audio_consumer_;
std::unique_ptr<openscreen::cast::Receiver::Consumer> video_consumer_; std::unique_ptr<openscreen::cast::Receiver::Consumer> video_consumer_;
......
...@@ -10,9 +10,17 @@ ...@@ -10,9 +10,17 @@
namespace cast_streaming { namespace cast_streaming {
namespace {
// Timeout to stop the Session when no data is received.
constexpr base::TimeDelta kNoDataTimeout = base::TimeDelta::FromSeconds(10);
} // namespace
StreamConsumer::StreamConsumer(openscreen::cast::Receiver* receiver, StreamConsumer::StreamConsumer(openscreen::cast::Receiver* receiver,
mojo::ScopedDataPipeProducerHandle data_pipe, mojo::ScopedDataPipeProducerHandle data_pipe,
FrameReceivedCB frame_received_cb) FrameReceivedCB frame_received_cb,
base::OnceClosure on_timeout)
: receiver_(receiver), : receiver_(receiver),
data_pipe_(std::move(data_pipe)), data_pipe_(std::move(data_pipe)),
frame_received_cb_(std::move(frame_received_cb)), frame_received_cb_(std::move(frame_received_cb)),
...@@ -27,7 +35,10 @@ StreamConsumer::StreamConsumer(openscreen::cast::Receiver* receiver, ...@@ -27,7 +35,10 @@ StreamConsumer::StreamConsumer(openscreen::cast::Receiver* receiver,
base::Unretained(this))); base::Unretained(this)));
if (result != MOJO_RESULT_OK) { if (result != MOJO_RESULT_OK) {
CloseDataPipeOnError(); CloseDataPipeOnError();
return;
} }
data_timeout_timer_.Start(FROM_HERE, kNoDataTimeout, std::move(on_timeout));
} }
StreamConsumer::~StreamConsumer() { StreamConsumer::~StreamConsumer() {
...@@ -39,6 +50,7 @@ void StreamConsumer::CloseDataPipeOnError() { ...@@ -39,6 +50,7 @@ void StreamConsumer::CloseDataPipeOnError() {
receiver_->SetConsumer(nullptr); receiver_->SetConsumer(nullptr);
pipe_watcher_.Cancel(); pipe_watcher_.Cancel();
data_pipe_.reset(); data_pipe_.reset();
data_timeout_timer_.Stop();
} }
void StreamConsumer::OnPipeWritable(MojoResult result) { void StreamConsumer::OnPipeWritable(MojoResult result) {
...@@ -72,6 +84,7 @@ void StreamConsumer::OnPipeWritable(MojoResult result) { ...@@ -72,6 +84,7 @@ void StreamConsumer::OnPipeWritable(MojoResult result) {
void StreamConsumer::OnFramesReady(int next_frame_buffer_size) { void StreamConsumer::OnFramesReady(int next_frame_buffer_size) {
DCHECK(data_pipe_); DCHECK(data_pipe_);
data_timeout_timer_.Reset();
if (pending_buffer_remaining_bytes_ != 0) { if (pending_buffer_remaining_bytes_ != 0) {
// There already is a pending frame. Ignore this one for now. // There already is a pending frame. Ignore this one for now.
......
...@@ -8,6 +8,7 @@ ...@@ -8,6 +8,7 @@
#include <fuchsia/media/cpp/fidl.h> #include <fuchsia/media/cpp/fidl.h>
#include "base/callback.h" #include "base/callback.h"
#include "base/timer/timer.h"
#include "media/mojo/mojom/media_types.mojom.h" #include "media/mojo/mojom/media_types.mojom.h"
#include "mojo/public/cpp/system/data_pipe.h" #include "mojo/public/cpp/system/data_pipe.h"
#include "mojo/public/cpp/system/simple_watcher.h" #include "mojo/public/cpp/system/simple_watcher.h"
...@@ -33,9 +34,11 @@ class StreamConsumer : public openscreen::cast::Receiver::Consumer { ...@@ -33,9 +34,11 @@ class StreamConsumer : public openscreen::cast::Receiver::Consumer {
// |receiver| sends frames to this object. It must outlive this object. // |receiver| sends frames to this object. It must outlive this object.
// |frame_received_cb| is called on every new frame, after a new frame has // |frame_received_cb| is called on every new frame, after a new frame has
// been written to |data_pipe|. On error, |data_pipe| will be closed. // been written to |data_pipe|. On error, |data_pipe| will be closed.
// If no data is received for 10 seconds, |on_timeout| will be closed.
StreamConsumer(openscreen::cast::Receiver* receiver, StreamConsumer(openscreen::cast::Receiver* receiver,
mojo::ScopedDataPipeProducerHandle data_pipe, mojo::ScopedDataPipeProducerHandle data_pipe,
FrameReceivedCB frame_received_cb); FrameReceivedCB frame_received_cb,
base::OnceClosure on_timeout);
~StreamConsumer() final; ~StreamConsumer() final;
StreamConsumer(const StreamConsumer&) = delete; StreamConsumer(const StreamConsumer&) = delete;
...@@ -70,6 +73,9 @@ class StreamConsumer : public openscreen::cast::Receiver::Consumer { ...@@ -70,6 +73,9 @@ class StreamConsumer : public openscreen::cast::Receiver::Consumer {
// Remaining bytes to write from |pending_buffer_| to |data_pipe_|. // Remaining bytes to write from |pending_buffer_| to |data_pipe_|.
size_t pending_buffer_remaining_bytes_ = 0; size_t pending_buffer_remaining_bytes_ = 0;
// Timer to trigger connection closure if no data is received for 10 seconds.
base::OneShotTimer data_timeout_timer_;
}; };
} // namespace cast_streaming } // namespace cast_streaming
......
...@@ -89,6 +89,13 @@ void CastStreamingSessionClient::OnReceiverSessionEnded() { ...@@ -89,6 +89,13 @@ void CastStreamingSessionClient::OnReceiverSessionEnded() {
// Tear down the Mojo connection. // Tear down the Mojo connection.
cast_streaming_receiver_.reset(); cast_streaming_receiver_.reset();
// Tear down all remaining Mojo objects if needed. This is necessary if the
// Cast Streaming Session ending was initiated by the receiver component.
if (audio_remote_)
audio_remote_.reset();
if (video_remote_)
video_remote_.reset();
} }
void CastStreamingSessionClient::OnMojoDisconnect() { void CastStreamingSessionClient::OnMojoDisconnect() {
......
...@@ -63,7 +63,7 @@ class CastStreamingDemuxerStream : public media::DemuxerStream, ...@@ -63,7 +63,7 @@ class CastStreamingDemuxerStream : public media::DemuxerStream,
return; return;
if (current_buffer_->end_of_stream()) { if (current_buffer_->end_of_stream()) {
std::move(pending_read_cb_).Run(Status::kAborted, nullptr); std::move(pending_read_cb_).Run(Status::kError, nullptr);
return; return;
} }
...@@ -198,7 +198,6 @@ CastStreamingDemuxer::CastStreamingDemuxer( ...@@ -198,7 +198,6 @@ CastStreamingDemuxer::CastStreamingDemuxer(
CastStreamingDemuxer::~CastStreamingDemuxer() { CastStreamingDemuxer::~CastStreamingDemuxer() {
DVLOG(1) << __func__; DVLOG(1) << __func__;
DCHECK(media_task_runner_->BelongsToCurrentThread());
if (was_initialization_successful_) { if (was_initialization_successful_) {
original_task_runner_->PostTask( original_task_runner_->PostTask(
......
...@@ -17,7 +17,7 @@ found in the LICENSE file. ...@@ -17,7 +17,7 @@ found in the LICENSE file.
</head> </head>
<body> <body>
<video src="data:cast_streaming_receiver" autoplay> <video src="data:cast_streaming_receiver">
<script> <script>
// The Cast Streaming session must stop when the stream is no longer visible. crbug.com/1111886 // The Cast Streaming session must stop when the stream is no longer visible. crbug.com/1111886
...@@ -27,10 +27,10 @@ found in the LICENSE file. ...@@ -27,10 +27,10 @@ found in the LICENSE file.
} }
}); });
// TODO(crbug.com/1087528): This should not be necessary. Figure out why
// autoplay is not enough here.
var video = document.querySelector('video'); var video = document.querySelector('video');
video.play(); video.addEventListener('ended', window.close);
video.addEventListener('error', window.close);
video.play().catch(window.close);
</script> </script>
</body> </body>
</html> </html>
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment