Commit dd90480c authored by Ken MacKay's avatar Ken MacKay Committed by Chromium LUCI CQ

[Chromecast] Use custom thread for mixer

This saves ~60 microseconds per posted task (~90% of posted task
overhead).

Bug: internal b/167285412
Change-Id: I1b644c0d5a68799e0852768078841831b6d437b3
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2599583
Commit-Queue: Kenneth MacKay <kmackay@chromium.org>
Reviewed-by: default avatarYuchen Liu <yucliu@chromium.org>
Cr-Commit-Position: refs/heads/master@{#839019}
parent c8fb7466
...@@ -62,7 +62,7 @@ class AudioOutputRedirector::RedirectionConnection ...@@ -62,7 +62,7 @@ class AudioOutputRedirector::RedirectionConnection
public: public:
explicit RedirectionConnection( explicit RedirectionConnection(
std::unique_ptr<mixer_service::MixerSocket> socket, std::unique_ptr<mixer_service::MixerSocket> socket,
scoped_refptr<base::SequencedTaskRunner> mixer_task_runner, scoped_refptr<base::TaskRunner> mixer_task_runner,
base::WeakPtr<AudioOutputRedirector> redirector) base::WeakPtr<AudioOutputRedirector> redirector)
: socket_(std::move(socket)), : socket_(std::move(socket)),
mixer_task_runner_(std::move(mixer_task_runner)), mixer_task_runner_(std::move(mixer_task_runner)),
...@@ -135,7 +135,7 @@ class AudioOutputRedirector::RedirectionConnection ...@@ -135,7 +135,7 @@ class AudioOutputRedirector::RedirectionConnection
} }
const std::unique_ptr<mixer_service::MixerSocket> socket_; const std::unique_ptr<mixer_service::MixerSocket> socket_;
const scoped_refptr<base::SequencedTaskRunner> mixer_task_runner_; const scoped_refptr<base::TaskRunner> mixer_task_runner_;
const base::WeakPtr<AudioOutputRedirector> redirector_; const base::WeakPtr<AudioOutputRedirector> redirector_;
bool error_ = false; bool error_ = false;
......
...@@ -19,6 +19,9 @@ ...@@ -19,6 +19,9 @@
#include "base/message_loop/message_pump_type.h" #include "base/message_loop/message_pump_type.h"
#include "base/numerics/ranges.h" #include "base/numerics/ranges.h"
#include "base/single_thread_task_runner.h" #include "base/single_thread_task_runner.h"
#include "base/synchronization/condition_variable.h"
#include "base/synchronization/lock.h"
#include "base/threading/platform_thread.h"
#include "base/threading/thread_task_runner_handle.h" #include "base/threading/thread_task_runner_handle.h"
#include "build/build_config.h" #include "build/build_config.h"
#include "chromecast/base/chromecast_switches.h" #include "chromecast/base/chromecast_switches.h"
...@@ -48,12 +51,6 @@ ...@@ -48,12 +51,6 @@
base::Unretained(this), ##__VA_ARGS__)); \ base::Unretained(this), ##__VA_ARGS__)); \
} while (0) } while (0)
#define MAKE_SURE_MIXER_THREAD(method, ...) \
if (!mixer_task_runner_->RunsTasksInCurrentSequence()) { \
RUN_ON_MIXER_THREAD(method, ##__VA_ARGS__); \
return; \
}
namespace chromecast { namespace chromecast {
namespace media { namespace media {
...@@ -154,25 +151,102 @@ class StreamMixer::ExternalMediaVolumeChangeRequestObserver ...@@ -154,25 +151,102 @@ class StreamMixer::ExternalMediaVolumeChangeRequestObserver
StreamMixer* const mixer_; StreamMixer* const mixer_;
}; };
class StreamMixer::MixerThread : public base::PlatformThread::Delegate,
public base::TaskRunner {
public:
MixerThread() : cond_(&lock_) {
tasks_.reserve(64);
swapped_tasks_.reserve(64);
CHECK(base::PlatformThread::CreateWithPriority(
256 * 1024, this, &thread_, base::ThreadPriority::REALTIME_AUDIO));
}
void Stop() {
{
base::AutoLock lock(lock_);
if (stopped_) {
return;
}
stopped_ = true;
}
cond_.Signal();
base::PlatformThread::Join(thread_);
}
// base::TaskRunner implementation:
bool PostDelayedTask(const base::Location& from_here,
base::OnceClosure task,
base::TimeDelta delay) override {
// Delay is ignored.
DCHECK_EQ(delay, base::TimeDelta());
{
base::AutoLock lock(lock_);
if (stopped_) {
return false;
}
tasks_.push_back(std::move(task));
}
cond_.Signal();
return true;
}
private:
friend class RefCountedThreadSafe<MixerThread>;
~MixerThread() override { Stop(); }
// base::PlatformThread::Delegate implementation:
void ThreadMain() override {
base::PlatformThread::SetName("CMA mixer");
UseHighPriority();
base::AutoLock lock(lock_);
for (;;) {
swapped_tasks_.swap(tasks_);
if (swapped_tasks_.empty()) {
if (stopped_) {
return;
}
cond_.Wait();
} else {
base::AutoUnlock unlock(lock_);
for (auto& task : swapped_tasks_) {
std::move(task).Run();
}
swapped_tasks_.clear();
}
}
}
base::Lock lock_;
base::ConditionVariable cond_;
bool stopped_ GUARDED_BY(lock_) = false;
std::vector<base::OnceClosure> tasks_ GUARDED_BY(lock_);
// Only used on the mixer thread.
std::vector<base::OnceClosure> swapped_tasks_;
base::PlatformThreadHandle thread_;
};
StreamMixer::StreamMixer( StreamMixer::StreamMixer(
scoped_refptr<base::SequencedTaskRunner> io_task_runner) scoped_refptr<base::SequencedTaskRunner> io_task_runner)
: StreamMixer(nullptr, : StreamMixer(nullptr,
std::make_unique<base::Thread>("CMA mixer"),
nullptr, nullptr,
"", "",
std::move(io_task_runner)) {} std::move(io_task_runner)) {}
StreamMixer::StreamMixer( StreamMixer::StreamMixer(
std::unique_ptr<MixerOutputStream> output, std::unique_ptr<MixerOutputStream> output,
std::unique_ptr<base::Thread> mixer_thread, scoped_refptr<base::SequencedTaskRunner> mixer_task_runner,
scoped_refptr<base::SingleThreadTaskRunner> mixer_task_runner,
const std::string& pipeline_json, const std::string& pipeline_json,
scoped_refptr<base::SequencedTaskRunner> io_task_runner) scoped_refptr<base::SequencedTaskRunner> io_task_runner)
: output_(std::move(output)), : output_(std::move(output)),
post_processing_pipeline_factory_( post_processing_pipeline_factory_(
std::make_unique<PostProcessingPipelineFactoryImpl>()), std::make_unique<PostProcessingPipelineFactoryImpl>()),
mixer_thread_(std::move(mixer_thread)), mixer_task_runner_(mixer_task_runner),
mixer_task_runner_(std::move(mixer_task_runner)),
io_task_runner_(std::move(io_task_runner)), io_task_runner_(std::move(io_task_runner)),
enable_dynamic_channel_count_( enable_dynamic_channel_count_(
GetSwitchValueBoolean(switches::kMixerEnableDynamicChannelCount, GetSwitchValueBoolean(switches::kMixerEnableDynamicChannelCount,
...@@ -192,23 +266,21 @@ StreamMixer::StreamMixer( ...@@ -192,23 +266,21 @@ StreamMixer::StreamMixer(
ExternalAudioPipelineShlib::IsSupported()), ExternalAudioPipelineShlib::IsSupported()),
weak_factory_(this) { weak_factory_(this) {
LOG(INFO) << __func__; LOG(INFO) << __func__;
DETACH_FROM_SEQUENCE(mixer_sequence_checker_);
logging::InitializeAudioLog(); logging::InitializeAudioLog();
volume_info_[AudioContentType::kOther].volume = 1.0f; volume_info_[AudioContentType::kOther].volume = 1.0f;
volume_info_[AudioContentType::kOther].limit = 1.0f; volume_info_[AudioContentType::kOther].limit = 1.0f;
volume_info_[AudioContentType::kOther].muted = false; volume_info_[AudioContentType::kOther].muted = false;
if (mixer_thread_) { if (mixer_task_runner_) {
base::Thread::Options options; // Test mode.
options.priority = base::ThreadPriority::REALTIME_AUDIO; if (!io_task_runner_) {
#if defined(OS_FUCHSIA) io_task_runner_ = mixer_task_runner;
// MixerOutputStreamFuchsia uses FIDL, which works only on IO threads. }
options.message_pump_type = base::MessagePumpType::IO; } else {
#endif mixer_thread_ = base::MakeRefCounted<MixerThread>();
options.stack_size = 512 * 1024; mixer_task_runner_ = mixer_thread_;
mixer_thread_->StartWithOptions(options);
mixer_task_runner_ = mixer_thread_->task_runner();
mixer_task_runner_->PostTask(FROM_HERE, base::BindOnce(&UseHighPriority));
if (!io_task_runner_) { if (!io_task_runner_) {
io_task_runner_ = AudioIoThread::Get()->task_runner(); io_task_runner_ = AudioIoThread::Get()->task_runner();
...@@ -220,8 +292,6 @@ StreamMixer::StreamMixer( ...@@ -220,8 +292,6 @@ StreamMixer::StreamMixer(
base::BindRepeating(&StreamMixer::OnHealthCheckFailed, base::BindRepeating(&StreamMixer::OnHealthCheckFailed,
base::Unretained(this))); base::Unretained(this)));
LOG(INFO) << "Mixer health checker started"; LOG(INFO) << "Mixer health checker started";
} else if (!io_task_runner_) {
io_task_runner_ = mixer_task_runner_;
} }
if (fixed_output_sample_rate_ != MixerOutputStream::kInvalidSampleRate) { if (fixed_output_sample_rate_ != MixerOutputStream::kInvalidSampleRate) {
...@@ -261,7 +331,7 @@ void StreamMixer::ResetPostProcessors(CastMediaShlib::ResultCallback callback) { ...@@ -261,7 +331,7 @@ void StreamMixer::ResetPostProcessors(CastMediaShlib::ResultCallback callback) {
void StreamMixer::ResetPostProcessorsOnThread( void StreamMixer::ResetPostProcessorsOnThread(
CastMediaShlib::ResultCallback callback, CastMediaShlib::ResultCallback callback,
const std::string& override_config) { const std::string& override_config) {
DCHECK(mixer_task_runner_->BelongsToCurrentThread()); DCHECK_CALLED_ON_VALID_SEQUENCE(mixer_sequence_checker_);
// Detach inputs. // Detach inputs.
for (const auto& input : inputs_) { for (const auto& input : inputs_) {
...@@ -350,7 +420,7 @@ void StreamMixer::CreatePostProcessors(CastMediaShlib::ResultCallback callback, ...@@ -350,7 +420,7 @@ void StreamMixer::CreatePostProcessors(CastMediaShlib::ResultCallback callback,
void StreamMixer::ResetPostProcessorsForTest( void StreamMixer::ResetPostProcessorsForTest(
std::unique_ptr<PostProcessingPipelineFactory> pipeline_factory, std::unique_ptr<PostProcessingPipelineFactory> pipeline_factory,
const std::string& pipeline_json) { const std::string& pipeline_json) {
DCHECK(mixer_task_runner_->BelongsToCurrentThread()); DCHECK_CALLED_ON_VALID_SEQUENCE(mixer_sequence_checker_);
LOG(INFO) << __FUNCTION__ << " disregard previous PostProcessor messages."; LOG(INFO) << __FUNCTION__ << " disregard previous PostProcessor messages.";
mixer_pipeline_.reset(); mixer_pipeline_.reset();
post_processing_pipeline_factory_ = std::move(pipeline_factory); post_processing_pipeline_factory_ = std::move(pipeline_factory);
...@@ -358,12 +428,12 @@ void StreamMixer::ResetPostProcessorsForTest( ...@@ -358,12 +428,12 @@ void StreamMixer::ResetPostProcessorsForTest(
} }
void StreamMixer::SetNumOutputChannelsForTest(int num_output_channels) { void StreamMixer::SetNumOutputChannelsForTest(int num_output_channels) {
DCHECK(mixer_task_runner_->BelongsToCurrentThread()); DCHECK_CALLED_ON_VALID_SEQUENCE(mixer_sequence_checker_);
fixed_num_output_channels_ = num_output_channels; fixed_num_output_channels_ = num_output_channels;
} }
void StreamMixer::EnableDynamicChannelCountForTest(bool enable) { void StreamMixer::EnableDynamicChannelCountForTest(bool enable) {
DCHECK(mixer_task_runner_->BelongsToCurrentThread()); DCHECK_CALLED_ON_VALID_SEQUENCE(mixer_sequence_checker_);
enable_dynamic_channel_count_ = enable; enable_dynamic_channel_count_ = enable;
} }
...@@ -390,7 +460,7 @@ StreamMixer::~StreamMixer() { ...@@ -390,7 +460,7 @@ StreamMixer::~StreamMixer() {
} }
void StreamMixer::FinalizeOnMixerThread() { void StreamMixer::FinalizeOnMixerThread() {
DCHECK(mixer_task_runner_->BelongsToCurrentThread()); DCHECK_CALLED_ON_VALID_SEQUENCE(mixer_sequence_checker_);
Stop(LoopbackInterruptReason::kOutputStopped); Stop(LoopbackInterruptReason::kOutputStopped);
inputs_.clear(); inputs_.clear();
...@@ -414,7 +484,7 @@ void StreamMixer::SetNumOutputChannelsOnThread(int num_channels) { ...@@ -414,7 +484,7 @@ void StreamMixer::SetNumOutputChannelsOnThread(int num_channels) {
void StreamMixer::Start() { void StreamMixer::Start() {
AUDIO_LOG(INFO) << __func__ << " with " << inputs_.size() << " active inputs"; AUDIO_LOG(INFO) << __func__ << " with " << inputs_.size() << " active inputs";
DCHECK(mixer_task_runner_->BelongsToCurrentThread()); DCHECK_CALLED_ON_VALID_SEQUENCE(mixer_sequence_checker_);
DCHECK(state_ == kStateStopped); DCHECK(state_ == kStateStopped);
// Detach inputs. // Detach inputs.
...@@ -539,7 +609,7 @@ void StreamMixer::Start() { ...@@ -539,7 +609,7 @@ void StreamMixer::Start() {
void StreamMixer::Stop(LoopbackInterruptReason reason) { void StreamMixer::Stop(LoopbackInterruptReason reason) {
AUDIO_LOG(INFO) << __func__; AUDIO_LOG(INFO) << __func__;
DCHECK(mixer_task_runner_->BelongsToCurrentThread()); DCHECK_CALLED_ON_VALID_SEQUENCE(mixer_sequence_checker_);
weak_factory_.InvalidateWeakPtrs(); weak_factory_.InvalidateWeakPtrs();
loopback_handler_->SendInterrupt(reason); loopback_handler_->SendInterrupt(reason);
...@@ -554,7 +624,7 @@ void StreamMixer::Stop(LoopbackInterruptReason reason) { ...@@ -554,7 +624,7 @@ void StreamMixer::Stop(LoopbackInterruptReason reason) {
void StreamMixer::CheckChangeOutputParams(int num_input_channels, void StreamMixer::CheckChangeOutputParams(int num_input_channels,
int input_samples_per_second) { int input_samples_per_second) {
DCHECK(mixer_task_runner_->BelongsToCurrentThread()); DCHECK_CALLED_ON_VALID_SEQUENCE(mixer_sequence_checker_);
if (state_ != kStateRunning) { if (state_ != kStateRunning) {
return; return;
} }
...@@ -597,7 +667,7 @@ void StreamMixer::SignalError(MixerInput::Source::MixerError error) { ...@@ -597,7 +667,7 @@ void StreamMixer::SignalError(MixerInput::Source::MixerError error) {
} }
inputs_.clear(); inputs_.clear();
SetCloseTimeout(); SetCloseTimeout();
UpdateStreamCounts(); UpdateStreamCountsOnThread();
} }
int StreamMixer::GetEffectiveChannelCount(MixerInput::Source* input_source) { int StreamMixer::GetEffectiveChannelCount(MixerInput::Source* input_source) {
...@@ -615,7 +685,11 @@ int StreamMixer::GetEffectiveChannelCount(MixerInput::Source* input_source) { ...@@ -615,7 +685,11 @@ int StreamMixer::GetEffectiveChannelCount(MixerInput::Source* input_source) {
} }
void StreamMixer::AddInput(MixerInput::Source* input_source) { void StreamMixer::AddInput(MixerInput::Source* input_source) {
MAKE_SURE_MIXER_THREAD(AddInput, input_source); RUN_ON_MIXER_THREAD(AddInputOnThread, input_source);
}
void StreamMixer::AddInputOnThread(MixerInput::Source* input_source) {
DCHECK_CALLED_ON_VALID_SEQUENCE(mixer_sequence_checker_);
DCHECK(input_source); DCHECK(input_source);
// If the new input is a primary one (or there were no inputs previously), we // If the new input is a primary one (or there were no inputs previously), we
...@@ -668,7 +742,7 @@ void StreamMixer::AddInput(MixerInput::Source* input_source) { ...@@ -668,7 +742,7 @@ void StreamMixer::AddInput(MixerInput::Source* input_source) {
inputs_[input_source] = std::move(input); inputs_[input_source] = std::move(input);
UpdatePlayoutChannel(); UpdatePlayoutChannel();
UpdateStreamCounts(); UpdateStreamCountsOnThread();
} }
void StreamMixer::RemoveInput(MixerInput::Source* input_source) { void StreamMixer::RemoveInput(MixerInput::Source* input_source) {
...@@ -677,7 +751,7 @@ void StreamMixer::RemoveInput(MixerInput::Source* input_source) { ...@@ -677,7 +751,7 @@ void StreamMixer::RemoveInput(MixerInput::Source* input_source) {
} }
void StreamMixer::RemoveInputOnThread(MixerInput::Source* input_source) { void StreamMixer::RemoveInputOnThread(MixerInput::Source* input_source) {
DCHECK(mixer_task_runner_->BelongsToCurrentThread()); DCHECK_CALLED_ON_VALID_SEQUENCE(mixer_sequence_checker_);
DCHECK(input_source); DCHECK(input_source);
AUDIO_LOG(INFO) << "Remove input " << input_source; AUDIO_LOG(INFO) << "Remove input " << input_source;
...@@ -692,7 +766,7 @@ void StreamMixer::RemoveInputOnThread(MixerInput::Source* input_source) { ...@@ -692,7 +766,7 @@ void StreamMixer::RemoveInputOnThread(MixerInput::Source* input_source) {
ignored_inputs_.erase(input_source); ignored_inputs_.erase(input_source);
UpdatePlayoutChannel(); UpdatePlayoutChannel();
UpdateStreamCounts(); UpdateStreamCountsOnThread();
if (inputs_.empty()) { if (inputs_.empty()) {
SetCloseTimeout(); SetCloseTimeout();
...@@ -706,7 +780,7 @@ void StreamMixer::SetCloseTimeout() { ...@@ -706,7 +780,7 @@ void StreamMixer::SetCloseTimeout() {
} }
void StreamMixer::UpdatePlayoutChannel() { void StreamMixer::UpdatePlayoutChannel() {
DCHECK(mixer_task_runner_->BelongsToCurrentThread()); DCHECK_CALLED_ON_VALID_SEQUENCE(mixer_sequence_checker_);
int playout_channel; int playout_channel;
if (inputs_.empty()) { if (inputs_.empty()) {
...@@ -732,8 +806,11 @@ void StreamMixer::UpdatePlayoutChannel() { ...@@ -732,8 +806,11 @@ void StreamMixer::UpdatePlayoutChannel() {
} }
void StreamMixer::UpdateStreamCounts() { void StreamMixer::UpdateStreamCounts() {
MAKE_SURE_MIXER_THREAD(UpdateStreamCounts); RUN_ON_MIXER_THREAD(UpdateStreamCountsOnThread);
}
void StreamMixer::UpdateStreamCountsOnThread() {
DCHECK_CALLED_ON_VALID_SEQUENCE(mixer_sequence_checker_);
int primary = 0; int primary = 0;
int sfx = 0; int sfx = 0;
for (const auto& it : inputs_) { for (const auto& it : inputs_) {
...@@ -755,7 +832,7 @@ void StreamMixer::UpdateStreamCounts() { ...@@ -755,7 +832,7 @@ void StreamMixer::UpdateStreamCounts() {
MediaPipelineBackend::AudioDecoder::RenderingDelay MediaPipelineBackend::AudioDecoder::RenderingDelay
StreamMixer::GetTotalRenderingDelay(FilterGroup* filter_group) { StreamMixer::GetTotalRenderingDelay(FilterGroup* filter_group) {
DCHECK(mixer_task_runner_->BelongsToCurrentThread()); DCHECK_CALLED_ON_VALID_SEQUENCE(mixer_sequence_checker_);
if (!output_) { if (!output_) {
return MediaPipelineBackend::AudioDecoder::RenderingDelay(); return MediaPipelineBackend::AudioDecoder::RenderingDelay();
} }
...@@ -768,7 +845,7 @@ StreamMixer::GetTotalRenderingDelay(FilterGroup* filter_group) { ...@@ -768,7 +845,7 @@ StreamMixer::GetTotalRenderingDelay(FilterGroup* filter_group) {
} }
void StreamMixer::PlaybackLoop() { void StreamMixer::PlaybackLoop() {
DCHECK(mixer_task_runner_->BelongsToCurrentThread()); DCHECK_CALLED_ON_VALID_SEQUENCE(mixer_sequence_checker_);
if (inputs_.empty() && base::TimeTicks::Now() >= close_timestamp_ && if (inputs_.empty() && base::TimeTicks::Now() >= close_timestamp_ &&
!mixer_pipeline_->IsRinging()) { !mixer_pipeline_->IsRinging()) {
AUDIO_LOG(INFO) << "Close timeout"; AUDIO_LOG(INFO) << "Close timeout";
...@@ -777,7 +854,7 @@ void StreamMixer::PlaybackLoop() { ...@@ -777,7 +854,7 @@ void StreamMixer::PlaybackLoop() {
} }
WriteOneBuffer(); WriteOneBuffer();
UpdateStreamCounts(); UpdateStreamCountsOnThread();
mixer_task_runner_->PostTask(FROM_HERE, playback_loop_task_); mixer_task_runner_->PostTask(FROM_HERE, playback_loop_task_);
} }
...@@ -810,7 +887,7 @@ void StreamMixer::WriteOneBuffer() { ...@@ -810,7 +887,7 @@ void StreamMixer::WriteOneBuffer() {
} }
void StreamMixer::WriteMixedPcm(int frames, int64_t expected_playback_time) { void StreamMixer::WriteMixedPcm(int frames, int64_t expected_playback_time) {
DCHECK(mixer_task_runner_->BelongsToCurrentThread()); DCHECK_CALLED_ON_VALID_SEQUENCE(mixer_sequence_checker_);
int loopback_channel_count = loopback_channel_mixer_->output_channel_count(); int loopback_channel_count = loopback_channel_mixer_->output_channel_count();
float* loopback_data = loopback_channel_mixer_->Transform( float* loopback_data = loopback_channel_mixer_->Transform(
...@@ -849,7 +926,12 @@ void StreamMixer::WriteMixedPcm(int frames, int64_t expected_playback_time) { ...@@ -849,7 +926,12 @@ void StreamMixer::WriteMixedPcm(int frames, int64_t expected_playback_time) {
void StreamMixer::AddAudioOutputRedirector( void StreamMixer::AddAudioOutputRedirector(
std::unique_ptr<AudioOutputRedirector> redirector) { std::unique_ptr<AudioOutputRedirector> redirector) {
MAKE_SURE_MIXER_THREAD(AddAudioOutputRedirector, std::move(redirector)); RUN_ON_MIXER_THREAD(AddAudioOutputRedirectorOnThread, std::move(redirector));
}
void StreamMixer::AddAudioOutputRedirectorOnThread(
std::unique_ptr<AudioOutputRedirector> redirector) {
DCHECK_CALLED_ON_VALID_SEQUENCE(mixer_sequence_checker_);
AUDIO_LOG(INFO) << __func__; AUDIO_LOG(INFO) << __func__;
DCHECK(redirector); DCHECK(redirector);
...@@ -873,13 +955,17 @@ void StreamMixer::RemoveAudioOutputRedirector( ...@@ -873,13 +955,17 @@ void StreamMixer::RemoveAudioOutputRedirector(
void StreamMixer::RemoveAudioOutputRedirectorOnThread( void StreamMixer::RemoveAudioOutputRedirectorOnThread(
AudioOutputRedirector* redirector) { AudioOutputRedirector* redirector) {
DCHECK(mixer_task_runner_->BelongsToCurrentThread()); DCHECK_CALLED_ON_VALID_SEQUENCE(mixer_sequence_checker_);
AUDIO_LOG(INFO) << __func__; AUDIO_LOG(INFO) << __func__;
audio_output_redirectors_.erase(redirector); audio_output_redirectors_.erase(redirector);
} }
void StreamMixer::SetVolume(AudioContentType type, float level) { void StreamMixer::SetVolume(AudioContentType type, float level) {
MAKE_SURE_MIXER_THREAD(SetVolume, type, level); RUN_ON_MIXER_THREAD(SetVolumeOnThread, type, level);
}
void StreamMixer::SetVolumeOnThread(AudioContentType type, float level) {
DCHECK_CALLED_ON_VALID_SEQUENCE(mixer_sequence_checker_);
DCHECK(type != AudioContentType::kOther); DCHECK(type != AudioContentType::kOther);
volume_info_[type].volume = level; volume_info_[type].volume = level;
...@@ -892,11 +978,15 @@ void StreamMixer::SetVolume(AudioContentType type, float level) { ...@@ -892,11 +978,15 @@ void StreamMixer::SetVolume(AudioContentType type, float level) {
ExternalAudioPipelineShlib::SetExternalMediaVolume( ExternalAudioPipelineShlib::SetExternalMediaVolume(
std::min(level, volume_info_[type].limit)); std::min(level, volume_info_[type].limit));
} }
UpdateStreamCounts(); UpdateStreamCountsOnThread();
} }
void StreamMixer::SetMuted(AudioContentType type, bool muted) { void StreamMixer::SetMuted(AudioContentType type, bool muted) {
MAKE_SURE_MIXER_THREAD(SetMuted, type, muted); RUN_ON_MIXER_THREAD(SetMutedOnThread, type, muted);
}
void StreamMixer::SetMutedOnThread(AudioContentType type, bool muted) {
DCHECK_CALLED_ON_VALID_SEQUENCE(mixer_sequence_checker_);
DCHECK(type != AudioContentType::kOther); DCHECK(type != AudioContentType::kOther);
volume_info_[type].muted = muted; volume_info_[type].muted = muted;
...@@ -908,11 +998,15 @@ void StreamMixer::SetMuted(AudioContentType type, bool muted) { ...@@ -908,11 +998,15 @@ void StreamMixer::SetMuted(AudioContentType type, bool muted) {
if (external_audio_pipeline_supported_ && type == AudioContentType::kMedia) { if (external_audio_pipeline_supported_ && type == AudioContentType::kMedia) {
ExternalAudioPipelineShlib::SetExternalMediaMuted(muted); ExternalAudioPipelineShlib::SetExternalMediaMuted(muted);
} }
UpdateStreamCounts(); UpdateStreamCountsOnThread();
} }
void StreamMixer::SetOutputLimit(AudioContentType type, float limit) { void StreamMixer::SetOutputLimit(AudioContentType type, float limit) {
MAKE_SURE_MIXER_THREAD(SetOutputLimit, type, limit); RUN_ON_MIXER_THREAD(SetOutputLimitOnThread, type, limit);
}
void StreamMixer::SetOutputLimitOnThread(AudioContentType type, float limit) {
DCHECK_CALLED_ON_VALID_SEQUENCE(mixer_sequence_checker_);
DCHECK(type != AudioContentType::kOther); DCHECK(type != AudioContentType::kOther);
AUDIO_LOG(INFO) << "Set volume limit for " << type << " to " << limit; AUDIO_LOG(INFO) << "Set volume limit for " << type << " to " << limit;
...@@ -935,24 +1029,32 @@ void StreamMixer::SetOutputLimit(AudioContentType type, float limit) { ...@@ -935,24 +1029,32 @@ void StreamMixer::SetOutputLimit(AudioContentType type, float limit) {
ExternalAudioPipelineShlib::SetExternalMediaVolume( ExternalAudioPipelineShlib::SetExternalMediaVolume(
std::min(volume_info_[type].volume, limit)); std::min(volume_info_[type].volume, limit));
} }
UpdateStreamCounts(); UpdateStreamCountsOnThread();
} }
void StreamMixer::SetVolumeMultiplier(MixerInput::Source* source, void StreamMixer::SetVolumeMultiplier(MixerInput::Source* source,
float multiplier) { float multiplier) {
MAKE_SURE_MIXER_THREAD(SetVolumeMultiplier, source, multiplier); RUN_ON_MIXER_THREAD(SetVolumeMultiplierOnThread, source, multiplier);
}
void StreamMixer::SetVolumeMultiplierOnThread(MixerInput::Source* source,
float multiplier) {
DCHECK_CALLED_ON_VALID_SEQUENCE(mixer_sequence_checker_);
auto it = inputs_.find(source); auto it = inputs_.find(source);
if (it != inputs_.end()) { if (it != inputs_.end()) {
it->second->SetVolumeMultiplier(multiplier); it->second->SetVolumeMultiplier(multiplier);
} }
UpdateStreamCounts(); UpdateStreamCountsOnThread();
} }
void StreamMixer::SetPostProcessorConfig(std::string name, std::string config) { void StreamMixer::SetPostProcessorConfig(std::string name, std::string config) {
MAKE_SURE_MIXER_THREAD(SetPostProcessorConfig, std::move(name), RUN_ON_MIXER_THREAD(SetPostProcessorConfigOnThread, std::move(name),
std::move(config)); std::move(config));
}
void StreamMixer::SetPostProcessorConfigOnThread(std::string name,
std::string config) {
DCHECK_CALLED_ON_VALID_SEQUENCE(mixer_sequence_checker_);
mixer_pipeline_->SetPostProcessorConfig(name, config); mixer_pipeline_->SetPostProcessorConfig(name, config);
} }
......
...@@ -18,10 +18,10 @@ ...@@ -18,10 +18,10 @@
#include "base/macros.h" #include "base/macros.h"
#include "base/memory/ref_counted.h" #include "base/memory/ref_counted.h"
#include "base/memory/weak_ptr.h" #include "base/memory/weak_ptr.h"
#include "base/sequence_checker.h"
#include "base/sequenced_task_runner.h" #include "base/sequenced_task_runner.h"
#include "base/single_thread_task_runner.h" #include "base/task_runner.h"
#include "base/threading/sequence_bound.h" #include "base/threading/sequence_bound.h"
#include "base/threading/thread.h"
#include "base/time/time.h" #include "base/time/time.h"
#include "chromecast/media/cma/backend/mixer/mixer_input.h" #include "chromecast/media/cma/backend/mixer/mixer_input.h"
#include "chromecast/media/cma/backend/mixer/mixer_pipeline.h" #include "chromecast/media/cma/backend/mixer/mixer_pipeline.h"
...@@ -73,7 +73,7 @@ class StreamMixer { ...@@ -73,7 +73,7 @@ class StreamMixer {
int num_output_channels() const { return num_output_channels_; } int num_output_channels() const { return num_output_channels_; }
scoped_refptr<base::SingleThreadTaskRunner> task_runner() const { scoped_refptr<base::TaskRunner> task_runner() const {
return mixer_task_runner_; return mixer_task_runner_;
} }
...@@ -120,8 +120,7 @@ class StreamMixer { ...@@ -120,8 +120,7 @@ class StreamMixer {
// Test-only methods. // Test-only methods.
StreamMixer( StreamMixer(
std::unique_ptr<MixerOutputStream> output, std::unique_ptr<MixerOutputStream> output,
std::unique_ptr<base::Thread> mixer_thread, scoped_refptr<base::SequencedTaskRunner> mixer_task_runner,
scoped_refptr<base::SingleThreadTaskRunner> mixer_task_runner,
const std::string& pipeline_json, const std::string& pipeline_json,
scoped_refptr<base::SequencedTaskRunner> io_task_runner = nullptr); scoped_refptr<base::SequencedTaskRunner> io_task_runner = nullptr);
void ResetPostProcessorsForTest( void ResetPostProcessorsForTest(
...@@ -140,6 +139,8 @@ class StreamMixer { ...@@ -140,6 +139,8 @@ class StreamMixer {
}; };
class ExternalMediaVolumeChangeRequestObserver; class ExternalMediaVolumeChangeRequestObserver;
class MixerThread;
enum State { enum State {
kStateStopped, kStateStopped,
kStateRunning, kStateRunning,
...@@ -165,6 +166,7 @@ class StreamMixer { ...@@ -165,6 +166,7 @@ class StreamMixer {
int input_samples_per_second); int input_samples_per_second);
void SignalError(MixerInput::Source::MixerError error); void SignalError(MixerInput::Source::MixerError error);
int GetEffectiveChannelCount(MixerInput::Source* input_source); int GetEffectiveChannelCount(MixerInput::Source* input_source);
void AddInputOnThread(MixerInput::Source* input_source);
void RemoveInputOnThread(MixerInput::Source* input_source); void RemoveInputOnThread(MixerInput::Source* input_source);
void SetCloseTimeout(); void SetCloseTimeout();
void UpdatePlayoutChannel(); void UpdatePlayoutChannel();
...@@ -173,10 +175,21 @@ class StreamMixer { ...@@ -173,10 +175,21 @@ class StreamMixer {
void WriteOneBuffer(); void WriteOneBuffer();
void WriteMixedPcm(int frames, int64_t expected_playback_time); void WriteMixedPcm(int frames, int64_t expected_playback_time);
void UpdateStreamCountsOnThread();
void SetVolumeOnThread(AudioContentType type, float level);
void SetMutedOnThread(AudioContentType type, bool muted);
void SetOutputLimitOnThread(AudioContentType type, float limit);
void SetVolumeMultiplierOnThread(MixerInput::Source* source,
float multiplier);
void SetPostProcessorConfigOnThread(std::string name, std::string config);
void AddAudioOutputRedirectorOnThread(
std::unique_ptr<AudioOutputRedirector> redirector);
void RemoveAudioOutputRedirectorOnThread(AudioOutputRedirector* redirector); void RemoveAudioOutputRedirectorOnThread(AudioOutputRedirector* redirector);
int GetSampleRateForDeviceId(const std::string& device); int GetSampleRateForDeviceId(const std::string& device);
void OnHealthCheckFailed();
MediaPipelineBackend::AudioDecoder::RenderingDelay GetTotalRenderingDelay( MediaPipelineBackend::AudioDecoder::RenderingDelay GetTotalRenderingDelay(
FilterGroup* filter_group); FilterGroup* filter_group);
...@@ -184,16 +197,15 @@ class StreamMixer { ...@@ -184,16 +197,15 @@ class StreamMixer {
std::unique_ptr<PostProcessingPipelineFactory> std::unique_ptr<PostProcessingPipelineFactory>
post_processing_pipeline_factory_; post_processing_pipeline_factory_;
std::unique_ptr<MixerPipeline> mixer_pipeline_; std::unique_ptr<MixerPipeline> mixer_pipeline_;
std::unique_ptr<base::Thread> mixer_thread_; SEQUENCE_CHECKER(mixer_sequence_checker_);
scoped_refptr<base::SingleThreadTaskRunner> mixer_task_runner_; scoped_refptr<MixerThread> mixer_thread_;
scoped_refptr<base::TaskRunner> mixer_task_runner_;
scoped_refptr<base::SequencedTaskRunner> io_task_runner_; scoped_refptr<base::SequencedTaskRunner> io_task_runner_;
std::unique_ptr<ThreadHealthChecker> health_checker_; std::unique_ptr<ThreadHealthChecker> health_checker_;
std::unique_ptr<InterleavedChannelMixer> loopback_channel_mixer_; std::unique_ptr<InterleavedChannelMixer> loopback_channel_mixer_;
std::unique_ptr<InterleavedChannelMixer> output_channel_mixer_; std::unique_ptr<InterleavedChannelMixer> output_channel_mixer_;
void OnHealthCheckFailed();
bool enable_dynamic_channel_count_; bool enable_dynamic_channel_count_;
const int low_sample_rate_cutoff_; const int low_sample_rate_cutoff_;
int fixed_num_output_channels_; int fixed_num_output_channels_;
......
...@@ -83,7 +83,7 @@ class ExternalAudioPipelineTest : public ::testing::Test { ...@@ -83,7 +83,7 @@ class ExternalAudioPipelineTest : public ::testing::Test {
external_audio_pipeline_support_->SetSupported(); external_audio_pipeline_support_->SetSupported();
mixer_ = std::make_unique<StreamMixer>( mixer_ = std::make_unique<StreamMixer>(
nullptr, nullptr, base::ThreadTaskRunnerHandle::Get(), "{}"); nullptr, base::ThreadTaskRunnerHandle::Get(), "{}");
} }
void TearDown() override { void TearDown() override {
......
...@@ -379,7 +379,7 @@ class StreamMixerTest : public testing::Test { ...@@ -379,7 +379,7 @@ class StreamMixerTest : public testing::Test {
auto output = std::make_unique<NiceMock<MockMixerOutput>>(); auto output = std::make_unique<NiceMock<MockMixerOutput>>();
mock_output_ = output.get(); mock_output_ = output.get();
mixer_ = std::make_unique<StreamMixer>( mixer_ = std::make_unique<StreamMixer>(
std::move(output), nullptr, base::ThreadTaskRunnerHandle::Get(), "{}"); std::move(output), base::ThreadTaskRunnerHandle::Get(), "{}");
mixer_->SetVolume(AudioContentType::kMedia, 1.0f); mixer_->SetVolume(AudioContentType::kMedia, 1.0f);
mixer_->SetVolume(AudioContentType::kAlarm, 1.0f); mixer_->SetVolume(AudioContentType::kAlarm, 1.0f);
std::string test_pipeline_json = base::StringPrintf( std::string test_pipeline_json = base::StringPrintf(
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment