Commit 1018f446 authored by Ken MacKay's avatar Ken MacKay Committed by Chromium LUCI CQ

[Chromecast] Limit send queue size for mixer sockets

Never queue audio data, and limit to only one copy of other messages.

In particular, if a loopback receiver stopped receiving messages, the
mixer's send queue (for the audio data) could grow without bound,
leading to OOM.

Merge-With: eureka-internal/492543

Bug: internal b/165017599
Test: on device

Change-Id: Ib66c9f00fd4f451e8bba66e6586f9023cd1bc668
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2555726Reviewed-by: default avatarYuchen Liu <yucliu@chromium.org>
Commit-Queue: Kenneth MacKay <kmackay@chromium.org>
Cr-Commit-Position: refs/heads/master@{#832118}
parent 26194a11
......@@ -35,7 +35,9 @@ void ControlConnection::SetVolume(AudioContentType type,
auto* volume = message.mutable_set_device_volume();
volume->set_content_type(ConvertContentType(type));
volume->set_volume_multiplier(volume_multiplier);
socket_->SendProto(message);
if (!socket_->SendProto(0, message)) {
OnSendFailed();
}
}
}
......@@ -50,7 +52,9 @@ void ControlConnection::SetMuted(AudioContentType type, bool muted) {
auto* mute_message = message.mutable_set_device_muted();
mute_message->set_content_type(ConvertContentType(type));
mute_message->set_muted(muted);
socket_->SendProto(message);
if (!socket_->SendProto(0, message)) {
OnSendFailed();
}
}
}
......@@ -66,7 +70,9 @@ void ControlConnection::SetVolumeLimit(AudioContentType type,
auto* limit = message.mutable_set_volume_limit();
limit->set_content_type(ConvertContentType(type));
limit->set_max_volume_multiplier(max_volume_multiplier);
socket_->SendProto(message);
if (!socket_->SendProto(0, message)) {
OnSendFailed();
}
}
}
......@@ -76,22 +82,33 @@ void ControlConnection::ListPostprocessors(
if (!socket_) {
return;
}
Generic proto;
proto.mutable_list_postprocessors();
socket_->SendProto(proto);
Generic message;
message.mutable_list_postprocessors();
if (!socket_->SendProto(0, message)) {
OnSendFailed();
}
}
void ControlConnection::ConfigurePostprocessor(std::string postprocessor_name,
std::string config) {
SendPostprocessorMessage(postprocessor_name, config);
postprocessor_config_.insert_or_assign(std::move(postprocessor_name),
std::move(config));
postprocessor_config_.insert_or_assign(postprocessor_name, config);
if (!SendPostprocessorMessageInternal(std::move(postprocessor_name),
std::move(config))) {
OnSendFailed();
}
}
void ControlConnection::SendPostprocessorMessage(std::string postprocessor_name,
std::string message) {
SendPostprocessorMessageInternal(std::move(postprocessor_name),
std::move(message));
}
bool ControlConnection::SendPostprocessorMessageInternal(
std::string postprocessor_name,
std::string message) {
if (!socket_) {
return;
return true;
}
// Erase any ? and subsequent substring from the name.
......@@ -104,7 +121,7 @@ void ControlConnection::SendPostprocessorMessage(std::string postprocessor_name,
auto* content = proto.mutable_configure_postprocessor();
content->set_name(std::move(postprocessor_name));
content->set_config(std::move(message));
socket_->SendProto(proto);
return socket_->SendProto(0, proto);
}
void ControlConnection::ReloadPostprocessors() {
......@@ -113,7 +130,7 @@ void ControlConnection::ReloadPostprocessors() {
}
Generic message;
message.mutable_reload_postprocessors();
socket_->SendProto(message);
socket_->SendProto(0, message);
}
void ControlConnection::SetStreamCountCallback(StreamCountCallback callback) {
......@@ -122,7 +139,9 @@ void ControlConnection::SetStreamCountCallback(StreamCountCallback callback) {
Generic message;
message.mutable_request_stream_count()->set_subscribe(
!stream_count_callback_.is_null());
socket_->SendProto(message);
if (!socket_->SendProto(0, message)) {
OnSendFailed();
}
}
}
......@@ -131,7 +150,9 @@ void ControlConnection::SetNumOutputChannels(int num_channels) {
if (socket_) {
Generic message;
message.mutable_set_num_output_channels()->set_channels(num_channels);
socket_->SendProto(message);
if (!socket_->SendProto(0, message)) {
OnSendFailed();
}
}
}
......@@ -144,7 +165,9 @@ void ControlConnection::OnConnected(std::unique_ptr<MixerSocket> socket) {
auto* limit = message.mutable_set_volume_limit();
limit->set_content_type(ConvertContentType(item.first));
limit->set_max_volume_multiplier(item.second);
socket_->SendProto(message);
if (!socket_->SendProto(0, message)) {
return OnSendFailed();
}
}
for (const auto& item : muted_) {
......@@ -152,7 +175,9 @@ void ControlConnection::OnConnected(std::unique_ptr<MixerSocket> socket) {
auto* muted = message.mutable_set_device_muted();
muted->set_content_type(ConvertContentType(item.first));
muted->set_muted(item.second);
socket_->SendProto(message);
if (!socket_->SendProto(0, message)) {
return OnSendFailed();
}
}
for (const auto& item : volume_) {
......@@ -160,30 +185,40 @@ void ControlConnection::OnConnected(std::unique_ptr<MixerSocket> socket) {
auto* volume = message.mutable_set_device_volume();
volume->set_content_type(ConvertContentType(item.first));
volume->set_volume_multiplier(item.second);
socket_->SendProto(message);
if (!socket_->SendProto(0, message)) {
return OnSendFailed();
}
}
if (stream_count_callback_) {
Generic message;
message.mutable_request_stream_count()->set_subscribe(true);
socket_->SendProto(message);
if (!socket_->SendProto(0, message)) {
return OnSendFailed();
}
}
if (num_output_channels_) {
Generic message;
message.mutable_set_num_output_channels()->set_channels(
num_output_channels_);
socket_->SendProto(message);
if (!socket_->SendProto(0, message)) {
return OnSendFailed();
}
}
for (const auto& item : postprocessor_config_) {
SendPostprocessorMessage(item.first, item.second);
if (!SendPostprocessorMessageInternal(item.first, item.second)) {
return OnSendFailed();
}
}
if (!list_postprocessors_callbacks_.empty()) {
Generic message;
message.mutable_list_postprocessors();
socket_->SendProto(message);
if (!socket_->SendProto(0, message)) {
return OnSendFailed();
}
}
if (connect_callback_) {
......@@ -191,6 +226,11 @@ void ControlConnection::OnConnected(std::unique_ptr<MixerSocket> socket) {
}
}
void ControlConnection::OnSendFailed() {
LOG(WARNING) << "Failed to send a control message";
OnConnectionError();
}
void ControlConnection::OnConnectionError() {
socket_.reset();
MixerConnection::Connect();
......
......@@ -86,6 +86,10 @@ class ControlConnection : public MixerConnection, public MixerSocket::Delegate {
void SetNumOutputChannels(int num_channels);
private:
bool SendPostprocessorMessageInternal(std::string postprocessor_name,
std::string message);
void OnSendFailed();
// MixerConnection implementation:
void OnConnected(std::unique_ptr<MixerSocket> socket) override;
void OnConnectionError() override;
......
......@@ -18,6 +18,14 @@ namespace chromecast {
namespace media {
namespace mixer_service {
namespace {
enum MessageTypes : int {
kRequest = 1,
};
}
LoopbackConnection::LoopbackConnection(Delegate* delegate)
: LoopbackConnection(delegate, nullptr) {}
......@@ -48,7 +56,7 @@ void LoopbackConnection::OnConnected(std::unique_ptr<MixerSocket> socket) {
Generic message;
message.mutable_loopback_request();
socket_->SendProto(message);
socket_->SendProto(kRequest, message);
}
void LoopbackConnection::OnConnectionError() {
......
......@@ -12,10 +12,11 @@ namespace media {
// StreamInterruption::InterruptionReason in mixer_service.proto.
enum class LoopbackInterruptReason {
kUnknown = 0,
kDisconnected = 1, // Disconnected from mixer.
kUnderrun = 2, // Mixer output underrun.
kConfigChange = 3, // Mixer output config changed.
kOutputStopped = 4, // Mixer stopped playing out audio.
kDisconnected = 1, // Disconnected from mixer.
kUnderrun = 2, // Mixer output underrun.
kConfigChange = 3, // Mixer output config changed.
kOutputStopped = 4, // Mixer stopped playing out audio.
kSocketOverflow = 5, // Mixer couldn't send over the socket.
};
} // namespace media
......
......@@ -145,6 +145,7 @@ message StreamInterruption {
INTERRUPTED_UNDERRUN = 2; // Mixer output underrun.
INTERRUPTED_CONFIG_CHANGE = 3; // Mixer output config changed.
INTERRUPTED_OUTPUT_STOPPED = 4; // Mixer stopped playing out audio.
INTERRUPTED_SOCKET_OVERFLOW = 5; // Mixer couldn't send over the socket.
}
optional InterruptionReason reason = 1;
......
......@@ -144,28 +144,30 @@ void MixerSocket::PrepareAudioBuffer(net::IOBuffer* audio_buffer,
memset(ptr, 0, sizeof(int32_t));
}
void MixerSocket::SendAudioBuffer(scoped_refptr<net::IOBuffer> audio_buffer,
bool MixerSocket::SendAudioBuffer(scoped_refptr<net::IOBuffer> audio_buffer,
int filled_bytes,
int64_t timestamp) {
PrepareAudioBuffer(audio_buffer.get(), filled_bytes, timestamp);
SendPreparedAudioBuffer(std::move(audio_buffer));
return SendPreparedAudioBuffer(std::move(audio_buffer));
}
void MixerSocket::SendPreparedAudioBuffer(
bool MixerSocket::SendPreparedAudioBuffer(
scoped_refptr<net::IOBuffer> audio_buffer) {
uint16_t payload_size;
base::ReadBigEndian(audio_buffer->data(), &payload_size);
DCHECK_GE(payload_size, kAudioHeaderSize);
SendBuffer(std::move(audio_buffer), sizeof(uint16_t) + payload_size);
return SendBuffer(0, std::move(audio_buffer),
sizeof(uint16_t) + payload_size);
}
void MixerSocket::SendProto(const google::protobuf::MessageLite& message) {
int16_t type = static_cast<int16_t>(MessageType::kMetadata);
bool MixerSocket::SendProto(int type,
const google::protobuf::MessageLite& message) {
int16_t packet_type = static_cast<int16_t>(MessageType::kMetadata);
size_t message_size = message.ByteSizeLong();
int32_t padding_bytes = (4 - (message_size % 4)) % 4;
int total_size =
sizeof(type) + sizeof(padding_bytes) + message_size + padding_bytes;
int total_size = sizeof(packet_type) + sizeof(padding_bytes) + message_size +
padding_bytes;
scoped_refptr<net::IOBuffer> buffer;
char* ptr = (socket_ ? static_cast<char*>(socket_->PrepareSend(total_size))
......@@ -184,8 +186,8 @@ void MixerSocket::SendProto(const google::protobuf::MessageLite& message) {
ptr += sizeof(uint16_t);
}
base::WriteBigEndian(ptr, type);
ptr += sizeof(type);
base::WriteBigEndian(ptr, packet_type);
ptr += sizeof(packet_type);
base::WriteBigEndian(ptr, padding_bytes);
ptr += sizeof(padding_bytes);
message.SerializeToArray(ptr, message_size);
......@@ -194,36 +196,46 @@ void MixerSocket::SendProto(const google::protobuf::MessageLite& message) {
if (!buffer) {
socket_->Send();
return;
return true;
}
SendBuffer(std::move(buffer), sizeof(uint16_t) + total_size);
return SendBuffer(type, std::move(buffer), sizeof(uint16_t) + total_size);
}
void MixerSocket::SendBuffer(scoped_refptr<net::IOBuffer> buffer,
bool MixerSocket::SendBuffer(int type,
scoped_refptr<net::IOBuffer> buffer,
size_t buffer_size) {
if (counterpart_task_runner_) {
counterpart_task_runner_->PostTask(
FROM_HERE,
base::BindOnce(base::IgnoreResult(&MixerSocket::OnMessageBuffer),
local_counterpart_, std::move(buffer), buffer_size));
return;
return true;
}
return SendBufferToSocket(type, std::move(buffer), buffer_size);
}
bool MixerSocket::SendBufferToSocket(int type,
scoped_refptr<net::IOBuffer> buffer,
size_t buffer_size) {
DCHECK(socket_);
if (!socket_->SendBuffer(buffer, buffer_size)) {
write_queue_.push(std::move(buffer));
if (type == 0) {
return false;
}
pending_writes_.insert_or_assign(type, std::move(buffer));
}
return true;
}
void MixerSocket::OnSendUnblocked() {
DCHECK(socket_);
while (!write_queue_.empty()) {
base::flat_map<int, scoped_refptr<net::IOBuffer>> pending;
pending_writes_.swap(pending);
for (auto& m : pending) {
uint16_t message_size;
base::ReadBigEndian(write_queue_.front()->data(), &message_size);
if (!socket_->SendBuffer(write_queue_.front().get(),
sizeof(uint16_t) + message_size)) {
return;
}
write_queue_.pop();
base::ReadBigEndian(m.second->data(), &message_size);
SendBufferToSocket(m.first, std::move(m.second),
sizeof(uint16_t) + message_size);
}
}
......@@ -245,18 +257,18 @@ void MixerSocket::OnEndOfStream() {
}
bool MixerSocket::OnMessage(char* data, size_t size) {
int16_t type;
if (size < sizeof(type)) {
int16_t packet_type;
if (size < sizeof(packet_type)) {
LOG(ERROR) << "Invalid message size " << size << " from " << this;
delegate_->OnConnectionError();
return false;
}
memcpy(&type, data, sizeof(type));
data += sizeof(type);
size -= sizeof(type);
memcpy(&packet_type, data, sizeof(packet_type));
data += sizeof(packet_type);
size -= sizeof(packet_type);
switch (static_cast<MessageType>(type)) {
switch (static_cast<MessageType>(packet_type)) {
case MessageType::kMetadata:
return ParseMetadata(data, size);
case MessageType::kAudio:
......
......@@ -7,8 +7,8 @@
#include <cstdint>
#include <memory>
#include <queue>
#include "base/containers/flat_map.h"
#include "base/macros.h"
#include "base/memory/scoped_refptr.h"
#include "base/memory/weak_ptr.h"
......@@ -105,24 +105,39 @@ class MixerSocket : public SmallMessageSocket::Delegate {
int filled_bytes,
int64_t timestamp);
// Prepares |audio_buffer| and then sends it across the connection.
void SendAudioBuffer(scoped_refptr<net::IOBuffer> audio_buffer,
// Prepares |audio_buffer| and then sends it across the connection. Returns
// |false| if the audio could not be sent.
bool SendAudioBuffer(scoped_refptr<net::IOBuffer> audio_buffer,
int filled_bytes,
int64_t timestamp);
// Sends |audio_buffer| across the connection. |audio_buffer| should have
// previously been prepared using PrepareAudioBuffer().
void SendPreparedAudioBuffer(scoped_refptr<net::IOBuffer> audio_buffer);
// Sends an arbitrary protobuf across the connection.
void SendProto(const google::protobuf::MessageLite& message);
// previously been prepared using PrepareAudioBuffer(). Returns |false| if the
// audio could not be sent.
bool SendPreparedAudioBuffer(scoped_refptr<net::IOBuffer> audio_buffer);
// Sends an arbitrary protobuf across the connection. |type| indicates the
// type of message; if the write cannot complete immediately, one message of
// each type will be stored for later sending; if a newer message is sent with
// the same type, then the previous message is overwritten. When writes become
// available again, the stored messages are written in order of |type| (lowest
// type first). Note that |type| is completely determined by the caller, and
// you can reuse the same type value for different messages as long as they
// are on different socket instances. A type of 0 means to never store the
// message. Returns |false| if the message was not sent or stored.
bool SendProto(int type, const google::protobuf::MessageLite& message);
// Resumes receiving messages. Delegate calls may be called synchronously
// from within this method.
void ReceiveMoreMessages();
private:
void SendBuffer(scoped_refptr<net::IOBuffer> buffer, size_t buffer_size);
bool SendBuffer(int type,
scoped_refptr<net::IOBuffer> buffer,
size_t buffer_size);
bool SendBufferToSocket(int type,
scoped_refptr<net::IOBuffer> buffer,
size_t buffer_size);
// SmallMessageSocket::Delegate implementation:
void OnSendUnblocked() override;
......@@ -142,7 +157,7 @@ class MixerSocket : public SmallMessageSocket::Delegate {
const std::unique_ptr<SmallMessageSocket> socket_;
scoped_refptr<IOBufferPool> buffer_pool_;
std::queue<scoped_refptr<net::IOBuffer>> write_queue_;
base::flat_map<int, scoped_refptr<net::IOBuffer>> pending_writes_;
base::WeakPtr<MixerSocket> local_counterpart_;
scoped_refptr<base::SequencedTaskRunner> counterpart_task_runner_;
......
......@@ -31,6 +31,16 @@ int GetFillSizeFrames(const OutputStreamParams& params) {
return params.sample_rate() / 100;
}
enum MessageTypes : int {
kInitial = 1,
kStartTimestamp,
kPlaybackRate,
kAudioClockRate,
kStreamVolume,
kPauseResume,
kEndOfStream,
};
} // namespace
OutputStreamConnection::OutputStreamConnection(Delegate* delegate,
......@@ -76,7 +86,12 @@ void OutputStreamConnection::SendAudioBuffer(
}
if (filled_frames == 0) {
// Send explicit end-of-stream message.
sent_eos_ = true;
Generic message;
message.mutable_eos_played_out();
socket_->SendProto(kEndOfStream, message);
return;
}
socket_->SendAudioBuffer(std::move(audio_buffer), filled_frames * frame_size_,
pts);
......@@ -87,7 +102,7 @@ void OutputStreamConnection::SetVolumeMultiplier(float multiplier) {
if (socket_) {
Generic message;
message.mutable_set_stream_volume()->set_volume(multiplier);
socket_->SendProto(message);
socket_->SendProto(kStreamVolume, message);
}
}
......@@ -100,7 +115,7 @@ void OutputStreamConnection::SetStartTimestamp(int64_t start_timestamp,
Generic message;
message.mutable_set_start_timestamp()->set_start_timestamp(start_timestamp);
message.mutable_set_start_timestamp()->set_start_pts(pts);
socket_->SendProto(message);
socket_->SendProto(kStartTimestamp, message);
}
}
......@@ -109,7 +124,7 @@ void OutputStreamConnection::SetPlaybackRate(float playback_rate) {
if (socket_) {
Generic message;
message.mutable_set_playback_rate()->set_playback_rate(playback_rate);
socket_->SendProto(message);
socket_->SendProto(kPlaybackRate, message);
}
}
......@@ -118,7 +133,7 @@ void OutputStreamConnection::SetAudioClockRate(double rate) {
if (socket_) {
Generic message;
message.mutable_set_audio_clock_rate()->set_rate(rate);
socket_->SendProto(message);
socket_->SendProto(kAudioClockRate, message);
}
}
......@@ -127,7 +142,7 @@ void OutputStreamConnection::Pause() {
if (socket_) {
Generic message;
message.mutable_set_paused()->set_paused(true);
socket_->SendProto(message);
socket_->SendProto(kPauseResume, message);
}
}
......@@ -136,7 +151,7 @@ void OutputStreamConnection::Resume() {
if (socket_) {
Generic message;
message.mutable_set_paused()->set_paused(false);
socket_->SendProto(message);
socket_->SendProto(kPauseResume, message);
}
}
......@@ -163,7 +178,7 @@ void OutputStreamConnection::OnConnected(std::unique_ptr<MixerSocket> socket) {
if (paused_) {
message.mutable_set_paused()->set_paused(true);
}
socket_->SendProto(message);
socket_->SendProto(kInitial, message);
delegate_->FillNextBuffer(
audio_buffer_->data() + MixerSocket::kAudioMessageHeaderSize,
fill_size_frames_, std::numeric_limits<int64_t>::min());
......
......@@ -21,7 +21,14 @@ namespace media {
namespace mixer_service {
namespace {
constexpr base::TimeDelta kInactivityTimeout = base::TimeDelta::FromSeconds(5);
enum MessageTypes : int {
kPushResult = 1,
kEndOfStream,
};
} // namespace
class ReceiverCma::UnusedSocket : public MixerSocket::Delegate {
......@@ -136,7 +143,7 @@ class ReceiverCma::Stream : public MixerSocket::Delegate,
message.set_next_playback_timestamp(next_playout_timestamp);
mixer_service::Generic generic;
*(generic.mutable_push_result()) = message;
socket_->SendProto(generic);
socket_->SendProto(kPushResult, generic);
last_send_time_ = base::TimeTicks::Now();
}
......@@ -148,7 +155,7 @@ class ReceiverCma::Stream : public MixerSocket::Delegate,
mixer_service::EosPlayedOut message;
mixer_service::Generic generic;
*generic.mutable_eos_played_out() = message;
socket_->SendProto(generic);
socket_->SendProto(kEndOfStream, generic);
last_send_time_ = base::TimeTicks::Now();
cma_audio_.reset();
......
......@@ -30,6 +30,11 @@ void FillPatterns(
}
}
enum MessageTypes : int {
kRedirectionRequest = 1,
kStreamMatchPatterns,
};
} // namespace
RedirectedAudioConnection::RedirectedAudioConnection(const Config& config,
......@@ -46,7 +51,7 @@ void RedirectedAudioConnection::SetStreamMatchPatterns(
if (socket_) {
Generic message;
FillPatterns(stream_match_patterns_, &message);
socket_->SendProto(message);
socket_->SendProto(kStreamMatchPatterns, message);
}
}
......@@ -75,7 +80,7 @@ void RedirectedAudioConnection::OnConnected(
if (!stream_match_patterns_.empty()) {
FillPatterns(stream_match_patterns_, &message);
}
socket_->SendProto(message);
socket_->SendProto(kRedirectionRequest, message);
}
void RedirectedAudioConnection::OnConnectionError() {
......
......@@ -51,6 +51,10 @@ constexpr int kAudioMessageHeaderSize =
return DecoderConfigAdapter::ToMediaChannelLayout(layout);
}
enum MessageTypes : int {
kStreamConfig = 1,
};
} // namespace
class AudioOutputRedirector::RedirectionConnection
......@@ -82,7 +86,7 @@ class AudioOutputRedirector::RedirectionConnection
config->set_sample_rate(sample_rate);
config->set_num_channels(num_channels);
config->set_data_size(data_size);
socket_->SendProto(message);
socket_->SendProto(kStreamConfig, message);
sent_stream_config_ = true;
}
......
......@@ -51,6 +51,14 @@ constexpr int kAudioMessageHeaderSize =
constexpr int kRateShifterOutputFrames = 4096;
enum MessageTypes : int {
kReadyForPlayback = 1,
kPushResult,
kEndOfStream,
kUnderrun,
kError,
};
int64_t SamplesToMicroseconds(double samples, int sample_rate) {
return std::round(samples * 1000000 / sample_rate);
}
......@@ -270,6 +278,10 @@ bool MixerInputConnection::HandleMetadata(
if (message.has_set_paused()) {
SetPaused(message.set_paused().paused());
}
if (message.has_eos_played_out()) {
// Explicit EOS.
HandleAudioData(nullptr, 0, INT64_MIN);
}
return true;
}
......@@ -567,7 +579,7 @@ void MixerInputConnection::WritePcm(scoped_refptr<net::IOBuffer> data) {
mixer_service::Generic message;
message.mutable_push_result()->set_next_playback_timestamp(
next_playback_timestamp);
socket_->SendProto(message);
socket_->SendProto(kPushResult, message);
}
}
......@@ -1022,14 +1034,14 @@ void MixerInputConnection::PostPcmCompletion() {
base::AutoLock lock(lock_);
push_result->set_next_playback_timestamp(next_playback_timestamp_);
}
socket_->SendProto(message);
socket_->SendProto(kPushResult, message);
}
void MixerInputConnection::PostEos() {
DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
mixer_service::Generic message;
message.mutable_eos_played_out();
socket_->SendProto(message);
socket_->SendProto(kEndOfStream, message);
}
void MixerInputConnection::PostAudioReadyForPlayback() {
......@@ -1047,7 +1059,7 @@ void MixerInputConnection::PostAudioReadyForPlayback() {
ready_for_playback->set_delay_microseconds(
mixer_rendering_delay_.delay_microseconds);
}
socket_->SendProto(message);
socket_->SendProto(kReadyForPlayback, message);
audio_ready_for_playback_fired_ = true;
}
......@@ -1056,7 +1068,7 @@ void MixerInputConnection::PostStreamUnderrun() {
mixer_service::Generic message;
message.mutable_mixer_underrun()->set_type(
mixer_service::MixerUnderrun::INPUT_UNDERRUN);
socket_->SendProto(message);
socket_->SendProto(kUnderrun, message);
}
void MixerInputConnection::PostOutputUnderrun() {
......@@ -1064,7 +1076,7 @@ void MixerInputConnection::PostOutputUnderrun() {
mixer_service::Generic message;
message.mutable_mixer_underrun()->set_type(
mixer_service::MixerUnderrun::OUTPUT_UNDERRUN);
socket_->SendProto(message);
socket_->SendProto(kUnderrun, message);
}
void MixerInputConnection::OnAudioPlaybackError(MixerError error) {
......@@ -1094,7 +1106,7 @@ void MixerInputConnection::PostError(MixerError error) {
DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
mixer_service::Generic message;
message.mutable_error()->set_type(mixer_service::Error::INVALID_STREAM_ERROR);
socket_->SendProto(message);
socket_->SendProto(kError, message);
OnConnectionError();
}
......
......@@ -15,6 +15,15 @@
namespace chromecast {
namespace media {
namespace {
enum MessageTypes : int {
kStreamConfig = 1,
kInterrupt,
};
} // namespace
MixerLoopbackConnection::MixerLoopbackConnection(
std::unique_ptr<mixer_service::MixerSocket> socket)
: socket_(std::move(socket)) {
......@@ -25,6 +34,11 @@ MixerLoopbackConnection::MixerLoopbackConnection(
MixerLoopbackConnection::~MixerLoopbackConnection() = default;
void MixerLoopbackConnection::SetErrorCallback(base::OnceClosure callback) {
if (pending_error_) {
pending_error_ = false;
std::move(callback).Run();
return;
}
error_callback_ = std::move(callback);
}
......@@ -38,7 +52,7 @@ void MixerLoopbackConnection::SetStreamConfig(SampleFormat sample_format,
config->set_sample_rate(sample_rate);
config->set_num_channels(num_channels);
config->set_data_size(data_size);
socket_->SendProto(message);
socket_->SendProto(kStreamConfig, message);
sent_stream_config_ = true;
}
......@@ -48,7 +62,10 @@ void MixerLoopbackConnection::SendAudio(
int data_size_bytes,
int64_t timestamp) {
DCHECK(sent_stream_config_);
socket_->SendAudioBuffer(std::move(audio_buffer), data_size_bytes, timestamp);
if (!socket_->SendAudioBuffer(std::move(audio_buffer), data_size_bytes,
timestamp)) {
SendInterrupt(LoopbackInterruptReason::kSocketOverflow);
}
}
void MixerLoopbackConnection::SendInterrupt(LoopbackInterruptReason reason) {
......@@ -58,7 +75,7 @@ void MixerLoopbackConnection::SendInterrupt(LoopbackInterruptReason reason) {
interrupt->set_reason(
static_cast<mixer_service::StreamInterruption::InterruptionReason>(
reason));
socket_->SendProto(message);
socket_->SendProto(kInterrupt, message);
}
bool MixerLoopbackConnection::HandleMetadata(
......@@ -75,7 +92,9 @@ bool MixerLoopbackConnection::HandleAudioData(char* data,
void MixerLoopbackConnection::OnConnectionError() {
if (error_callback_) {
std::move(error_callback_).Run();
return;
}
pending_error_ = true;
}
} // namespace media
......
......@@ -55,6 +55,7 @@ class MixerLoopbackConnection : public mixer_service::MixerSocket::Delegate {
base::OnceClosure error_callback_;
bool pending_error_ = false;
bool sent_stream_config_ = false;
DISALLOW_COPY_AND_ASSIGN(MixerLoopbackConnection);
......
......@@ -21,6 +21,15 @@
namespace chromecast {
namespace media {
namespace {
enum MessageTypes : int {
kStreamCounts = 1,
kPostProcessorList,
};
} // namespace
class MixerServiceReceiver::ControlConnection
: public mixer_service::MixerSocket::Delegate {
public:
......@@ -45,7 +54,7 @@ class MixerServiceReceiver::ControlConnection
auto* counts = message.mutable_stream_count();
counts->set_primary(receiver_->primary_stream_count_);
counts->set_sfx(receiver_->sfx_stream_count_);
socket_->SendProto(message);
socket_->SendProto(kStreamCounts, message);
}
private:
......@@ -109,7 +118,7 @@ class MixerServiceReceiver::ControlConnection
for (const auto& library_pair : PostProcessorRegistry::Get()->Libraries()) {
postprocessor_list->add_postprocessors(library_pair.first);
}
socket_->SendProto(message);
socket_->SendProto(kPostProcessorList, message);
}
void OnConnectionError() override {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment