Commit 7fc25505 authored by Ken MacKay's avatar Ken MacKay Committed by Commit Bot

[Chromecast] Fix underrun issues

Add a hack (since priority inheritance doesn't work with PostTask) to
avoid priority inversion on the mixer thread, to reduce underruns. Add
logging for per-stream underrun. Add some special-case buffering for
communications streams to avoid issues with voice calling.

Bug: internal b/74603750
Change-Id: I82be8f03558cccac6bf04967ef48a31828f29cda
Reviewed-on: https://chromium-review.googlesource.com/977037
Commit-Queue: Kenneth MacKay <kmackay@chromium.org>
Reviewed-by: default avatarStephen Lanham <slan@chromium.org>
Cr-Commit-Position: refs/heads/master@{#545597}
parent 71be3f13
......@@ -77,7 +77,7 @@ constexpr int64_t kNoTimestamp = std::numeric_limits<int64_t>::min();
constexpr char kOutputDeviceDefaultName[] = "default";
constexpr bool kPcmRecoverIsSilent = false;
constexpr int kDefaultOutputBufferSizeFrames = 512;
constexpr int kDefaultOutputBufferSizeFrames = 1024;
// A list of supported sample rates.
// TODO(jyw): move this up into chromecast/public for 1) documentation and
......
......@@ -17,13 +17,15 @@
#include "base/threading/thread_task_runner_handle.h"
#include "chromecast/media/cma/backend/stream_mixer.h"
#include "chromecast/media/cma/base/decoder_buffer_base.h"
#include "media/audio/audio_device_description.h"
#include "media/base/audio_bus.h"
#include "media/base/audio_timestamp_helper.h"
#define POST_TASK_TO_CALLER_THREAD(task, ...) \
caller_task_runner_->PostTask( \
FROM_HERE, \
base::BindOnce(&BufferingMixerSource::task, weak_this_, ##__VA_ARGS__));
#define POST_TASK_TO_CALLER_THREAD(task, ...) \
shim_task_runner_->PostTask( \
FROM_HERE, base::BindOnce(&PostTaskShim, caller_task_runner_, \
base::BindOnce(&BufferingMixerSource::task, \
weak_this_, ##__VA_ARGS__)));
namespace chromecast {
namespace media {
......@@ -34,6 +36,16 @@ const int kNumOutputChannels = 2;
const int64_t kInputQueueMs = 90;
const int kFadeTimeMs = 5;
// Special queue size and start threshold for "communications" streams to avoid
// issues with voice calling.
const int64_t kCommsInputQueueMs = 200;
const int64_t kCommsStartThresholdMs = 150;
void PostTaskShim(scoped_refptr<base::SingleThreadTaskRunner> task_runner,
base::OnceClosure task) {
task_runner->PostTask(FROM_HERE, std::move(task));
}
std::string AudioContentTypeToString(media::AudioContentType type) {
switch (type) {
case media::AudioContentType::kAlarm:
......@@ -55,6 +67,20 @@ int64_t SamplesToMicroseconds(int64_t samples, int sample_rate) {
.InMicroseconds();
}
int MaxQueuedFrames(const std::string& device_id, int sample_rate) {
if (device_id == ::media::AudioDeviceDescription::kCommunicationsDeviceId) {
return MsToSamples(kCommsInputQueueMs, sample_rate);
}
return MsToSamples(kInputQueueMs, sample_rate);
}
int StartThreshold(const std::string& device_id, int sample_rate) {
if (device_id == ::media::AudioDeviceDescription::kCommunicationsDeviceId) {
return MsToSamples(kCommsStartThresholdMs, sample_rate);
}
return 0;
}
} // namespace
BufferingMixerSource::LockedMembers::Members::Members(
......@@ -70,7 +96,8 @@ BufferingMixerSource::LockedMembers::Members::Members(
fader_(source,
num_channels,
MsToSamples(kFadeTimeMs, input_samples_per_second)),
zero_fader_frames_(false) {}
zero_fader_frames_(false),
started_(false) {}
BufferingMixerSource::LockedMembers::Members::~Members() = default;
......@@ -124,13 +151,17 @@ BufferingMixerSource::BufferingMixerSource(Delegate* delegate,
playout_channel_(playout_channel),
mixer_(StreamMixer::Get()),
caller_task_runner_(base::ThreadTaskRunnerHandle::Get()),
max_queued_frames_(MsToSamples(kInputQueueMs, input_samples_per_second)),
shim_task_runner_(mixer_->shim_task_runner()),
max_queued_frames_(MaxQueuedFrames(device_id, input_samples_per_second)),
start_threshold_frames_(
StartThreshold(device_id, input_samples_per_second)),
locked_members_(this, input_samples_per_second, num_channels_),
weak_factory_(this) {
LOG(INFO) << "Create " << device_id_ << " (" << this
<< "), content type = " << AudioContentTypeToString(content_type_);
DCHECK(delegate_);
DCHECK(mixer_);
DCHECK_LE(start_threshold_frames_, max_queued_frames_);
weak_this_ = weak_factory_.GetWeakPtr();
mixer_->AddInput(this);
......@@ -254,10 +285,22 @@ int BufferingMixerSource::FillAudioPlaybackFrames(
// In normal playback, don't pass data to the fader if we can't satisfy the
// full request. This will allow us to buffer up more data so we can fully
// fade in.
locked->zero_fader_frames_ =
(locked->state_ == State::kNormalPlayback &&
locked->queued_frames_ <
locked->fader_.FramesNeededFromSource(num_frames));
if (locked->state_ == State::kNormalPlayback &&
(locked->queued_frames_ <
locked->fader_.FramesNeededFromSource(num_frames) ||
(!locked->started_ &&
locked->queued_frames_ < start_threshold_frames_))) {
if (locked->started_) {
LOG(INFO) << "Stream underrun for " << device_id_ << " (" << this
<< ")";
}
locked->zero_fader_frames_ = true;
locked->started_ = false;
} else {
locked->zero_fader_frames_ = false;
locked->started_ = true;
}
filled = locked->fader_.FillFrames(num_frames, buffer);
locked->mixer_rendering_delay_ = rendering_delay;
......@@ -291,6 +334,7 @@ int BufferingMixerSource::FillAudioPlaybackFrames(
if (signal_eos) {
POST_TASK_TO_CALLER_THREAD(PostEos);
}
if (remove_self) {
mixer_->RemoveInput(this);
}
......
......@@ -120,6 +120,7 @@ class BufferingMixerSource : public MixerInput::Source,
int current_buffer_offset_;
AudioFader fader_;
bool zero_fader_frames_;
bool started_;
private:
DISALLOW_COPY_AND_ASSIGN(Members);
......@@ -200,7 +201,10 @@ class BufferingMixerSource : public MixerInput::Source,
const int playout_channel_;
StreamMixer* const mixer_;
const scoped_refptr<base::SingleThreadTaskRunner> caller_task_runner_;
const scoped_refptr<base::SingleThreadTaskRunner> shim_task_runner_;
const int max_queued_frames_;
// Minimum number of frames buffered before starting to fill data.
const int start_threshold_frames_;
LockedMembers locked_members_;
......
......@@ -53,6 +53,20 @@ class PostProcessingPipelineFactory;
// input sources, then the output sample rate is updated to match the input
// sample rate of the new source.
// * Otherwise, the output sample rate remains unchanged.
//
// We use a "shim thread" to avoid priority inversion due to PostTask to
// low-priority threads. Suppose we want to PostTask to some low-priority
// thread, and the internal PostTask mutex is locked by that thread (and the
// low-priority thread isn't running right now). If the mixer thread does the
// PostTask, then it will block until the low-priority thread gets to run again
// and unlocks the mutex. Instead, we can PostTask to the shim thread (which is
// high-priority, and won't be holding the mutex for significant periods) and so
// avoid blocking waiting for the low-priority thread. The shim thread will
// maybe block when it does the PostTask to the low-priority thread, but we
// don't really care about that.
// All loopback audio handling is done on the shim thread, and any tasks posted
// from/to potentially low-priority threads should be done through the shim
// thread.
class StreamMixer {
public:
// Returns the mixer instance for this process. Caller must not delete the
......@@ -105,6 +119,10 @@ class StreamMixer {
void ValidatePostProcessorsForTest();
int num_output_channels() const { return num_output_channels_; }
scoped_refptr<base::SingleThreadTaskRunner> shim_task_runner() const {
return shim_task_runner_;
}
private:
enum State {
kStateStopped,
......@@ -127,6 +145,7 @@ class StreamMixer {
void Stop();
void CheckChangeOutputRate(int input_samples_per_second);
void SignalError(MixerInput::Source::MixerError error);
void AddInputOnThread(MixerInput::Source* input_source);
void RemoveInputOnThread(MixerInput::Source* input_source);
void SetCloseTimeout();
void UpdatePlayoutChannel();
......@@ -137,12 +156,35 @@ class StreamMixer {
void WriteOneBuffer();
void WriteMixedPcm(int frames, int64_t expected_playback_time);
void SetVolumeOnThread(AudioContentType type, float level);
void SetMutedOnThread(AudioContentType type, bool muted);
void SetOutputLimitOnThread(AudioContentType type, float limit);
void SetVolumeMultiplierOnThread(MixerInput::Source* source,
float multiplier);
void SetPostProcessorConfigOnThread(const std::string& name,
const std::string& config);
void AddLoopbackAudioObserverOnShimThread(
CastMediaShlib::LoopbackAudioObserver* observer);
void RemoveLoopbackAudioObserverOnShimThread(
CastMediaShlib::LoopbackAudioObserver* observer);
void SendLoopbackData(int64_t expected_playback_time,
int sample_rate,
int frames,
int channels,
std::unique_ptr<float[]> data);
void LoopbackInterrupted();
std::unique_ptr<MixerOutputStream> output_;
std::unique_ptr<PostProcessingPipelineFactory>
post_processing_pipeline_factory_;
std::unique_ptr<base::Thread> mixer_thread_;
scoped_refptr<base::SingleThreadTaskRunner> mixer_task_runner_;
// Special thread to avoid underruns due to priority inversion.
std::unique_ptr<base::Thread> shim_thread_;
scoped_refptr<base::SingleThreadTaskRunner> shim_task_runner_;
int num_output_channels_;
const int low_sample_rate_cutoff_;
const int fixed_sample_rate_;
......
......@@ -455,6 +455,15 @@ class StreamMixerTest : public testing::Test {
static_cast<size_t>(kNumPostProcessors));
}
void WaitForMixer() {
base::RunLoop run_loop1;
message_loop_->task_runner()->PostTask(FROM_HERE, run_loop1.QuitClosure());
run_loop1.Run();
base::RunLoop run_loop2;
message_loop_->task_runner()->PostTask(FROM_HERE, run_loop2.QuitClosure());
run_loop2.Run();
}
void PlaybackOnce() {
// Run one playback iteration.
EXPECT_CALL(*mock_output_, Write(_,
......@@ -465,13 +474,7 @@ class StreamMixerTest : public testing::Test {
base::RunLoop run_loop;
message_loop_->task_runner()->PostTask(FROM_HERE, run_loop.QuitClosure());
run_loop.Run();
}
void WaitForRemoval() {
// Need to wait for the removal task (it is always posted).
base::RunLoop run_loop;
message_loop_->task_runner()->PostTask(FROM_HERE, run_loop.QuitClosure());
run_loop.Run();
testing::Mock::VerifyAndClearExpectations(mock_output_);
}
protected:
......@@ -490,6 +493,7 @@ TEST_F(StreamMixerTest, AddSingleInput) {
EXPECT_CALL(*mock_output_, Start(kTestSamplesPerSecond, _)).Times(1);
EXPECT_CALL(*mock_output_, Stop()).Times(0);
mixer_->AddInput(&input);
WaitForMixer();
mixer_.reset();
}
......@@ -504,6 +508,7 @@ TEST_F(StreamMixerTest, AddMultipleInputs) {
EXPECT_CALL(*mock_output_, Stop()).Times(0);
mixer_->AddInput(&input1);
mixer_->AddInput(&input2);
WaitForMixer();
mixer_.reset();
}
......@@ -524,12 +529,14 @@ TEST_F(StreamMixerTest, RemoveInput) {
mixer_->AddInput(inputs[i].get());
}
WaitForMixer();
for (size_t i = 0; i < inputs.size(); ++i) {
EXPECT_CALL(*inputs[i], FinalizeAudioPlayback()).Times(1);
mixer_->RemoveInput(inputs[i].get());
}
WaitForRemoval();
WaitForMixer();
}
TEST_F(StreamMixerTest, WriteFrames) {
......@@ -547,6 +554,8 @@ TEST_F(StreamMixerTest, WriteFrames) {
mixer_->AddInput(inputs[i].get());
}
WaitForMixer();
for (size_t i = 0; i < inputs.size(); ++i) {
EXPECT_CALL(*inputs[i], FillAudioPlaybackFrames(_, _, _)).Times(1);
}
......@@ -563,6 +572,7 @@ TEST_F(StreamMixerTest, OneStreamMixesProperly) {
EXPECT_CALL(*mock_output_, Start(kTestSamplesPerSecond, _)).Times(1);
EXPECT_CALL(*mock_output_, Stop()).Times(0);
mixer_->AddInput(&input);
WaitForMixer();
mock_output_->ClearData();
// Populate the stream with data.
......@@ -589,17 +599,17 @@ TEST_F(StreamMixerTest, OneStreamIsScaledDownProperly) {
EXPECT_CALL(input, InitializeAudioPlayback(_, _)).Times(1);
EXPECT_CALL(*mock_output_, Start(kTestSamplesPerSecond, _)).Times(1);
EXPECT_CALL(*mock_output_, Stop()).Times(0);
input.set_multiplier(0.75f);
mixer_->AddInput(&input);
mock_output_->ClearData();
mixer_->SetVolumeMultiplier(&input, input.multiplier());
WaitForMixer();
// Populate the stream with data.
const int kNumFrames = 32;
ASSERT_EQ(sizeof(kTestData[0]), kNumChannels * kNumFrames * kBytesPerSample);
input.SetData(GetTestData(0));
// Set the volume multiplier.
input.set_multiplier(0.75f);
mixer_->SetVolumeMultiplier(&input, input.multiplier());
mock_output_->ClearData();
EXPECT_CALL(input, FillAudioPlaybackFrames(_, _, _)).Times(1);
PlaybackOnce();
......@@ -630,6 +640,7 @@ TEST_F(StreamMixerTest, TwoUnscaledStreamsMixProperly) {
EXPECT_CALL(*inputs[i], InitializeAudioPlayback(_, _)).Times(1);
mixer_->AddInput(inputs[i].get());
}
WaitForMixer();
mock_output_->ClearData();
// Populate the streams with data.
......@@ -669,6 +680,7 @@ TEST_F(StreamMixerTest, TwoUnscaledStreamsWithDifferentIdsMixProperly) {
EXPECT_CALL(*inputs[i], InitializeAudioPlayback(_, _)).Times(1);
mixer_->AddInput(inputs[i].get());
}
WaitForMixer();
mock_output_->ClearData();
// Populate the streams with data.
......@@ -706,6 +718,7 @@ TEST_F(StreamMixerTest, TwoUnscaledStreamsMixProperlyWithEdgeCases) {
EXPECT_CALL(*inputs[i], InitializeAudioPlayback(_, _)).Times(1);
mixer_->AddInput(inputs[i].get());
}
WaitForMixer();
mock_output_->ClearData();
// Create edge case data for the inputs. By mixing these two short streams,
......@@ -792,6 +805,7 @@ TEST_F(StreamMixerTest, PostProcessorDelayListedDeviceId) {
EXPECT_CALL(*inputs[i], InitializeAudioPlayback(_, _)).Times(1);
mixer_->AddInput(inputs[i].get());
}
WaitForMixer();
auto* post_processors = &pp_factory_->instances;
EXPECT_POSTPROCESSOR_CALL_PROCESSFRAMES(post_processors, "default", 1, _,
......@@ -832,6 +846,7 @@ TEST_F(StreamMixerTest, PostProcessorDelayUnlistedDevice) {
EXPECT_POSTPROCESSOR_CALL_PROCESSFRAMES(post_processors, "assistant-tts", 1,
_, _);
mixer_->AddInput(&input);
WaitForMixer();
// Delay should be based on default processor.
int64_t delay = FramesToDelayUs(
......@@ -880,6 +895,7 @@ TEST_F(StreamMixerTest, PostProcessorRingingWithoutInput) {
MockPostProcessorFactory* factory_ptr = factory.get();
mixer_->ResetPostProcessorsForTest(std::move(factory), test_pipeline_json);
mixer_->AddInput(&input);
WaitForMixer();
// "mix" + "linearize" should be automatic
CHECK_EQ(factory_ptr->instances.size(), 4u);
......@@ -976,11 +992,13 @@ TEST_F(StreamMixerTest, PicksPlayoutChannel) {
// Requests: all = 0 ch0 = 0 ch1 = 1.
EXPECT_CALL_ALL_POSTPROCESSORS(factory_ptr, UpdatePlayoutChannel(1));
mixer_->AddInput(&input3);
WaitForMixer();
VerifyAndClearPostProcessors(factory_ptr);
// Requests: all = 0 ch0 = 0 ch1 = 2.
EXPECT_CALL_ALL_POSTPROCESSORS(factory_ptr, UpdatePlayoutChannel(1));
mixer_->AddInput(&input4);
WaitForMixer();
VerifyAndClearPostProcessors(factory_ptr);
// Requests: all = 1 ch0 = 0 ch1 = 2.
......@@ -988,42 +1006,45 @@ TEST_F(StreamMixerTest, PicksPlayoutChannel) {
EXPECT_CALL_ALL_POSTPROCESSORS(factory_ptr,
UpdatePlayoutChannel(kChannelAll));
mixer_->AddInput(&input1);
WaitForMixer();
VerifyAndClearPostProcessors(factory_ptr);
// Requests: all = 1 ch0 = 0 ch1 = 1.
EXPECT_CALL_ALL_POSTPROCESSORS(factory_ptr,
UpdatePlayoutChannel(kChannelAll));
mixer_->RemoveInput(&input3);
WaitForRemoval();
WaitForMixer();
VerifyAndClearPostProcessors(factory_ptr);
// Requests: all = 0 ch0 = 0 ch1 = 1.
EXPECT_CALL_ALL_POSTPROCESSORS(factory_ptr, UpdatePlayoutChannel(1));
mixer_->RemoveInput(&input1);
WaitForRemoval();
WaitForMixer();
VerifyAndClearPostProcessors(factory_ptr);
// Requests: all = 0 ch0 = 0 ch1 = 0.
EXPECT_CALL_ALL_POSTPROCESSORS(factory_ptr,
UpdatePlayoutChannel(kChannelAll));
mixer_->RemoveInput(&input4);
WaitForRemoval();
WaitForMixer();
VerifyAndClearPostProcessors(factory_ptr);
// Requests: all = 0 ch0 = 1 ch1 = 0
EXPECT_CALL_ALL_POSTPROCESSORS(factory_ptr, UpdatePlayoutChannel(0));
mixer_->AddInput(&input2);
WaitForMixer();
VerifyAndClearPostProcessors(factory_ptr);
// Requests: all = 1 ch0 = 1 ch1 = 0
EXPECT_CALL_ALL_POSTPROCESSORS(factory_ptr,
UpdatePlayoutChannel(kChannelAll));
mixer_->AddInput(&input1);
WaitForMixer();
VerifyAndClearPostProcessors(factory_ptr);
mixer_->RemoveInput(&input1);
mixer_->RemoveInput(&input2);
WaitForRemoval();
WaitForMixer();
}
TEST_F(StreamMixerTest, SetPostProcessorConfig) {
......@@ -1035,6 +1056,7 @@ TEST_F(StreamMixerTest, SetPostProcessorConfig) {
}
mixer_->SetPostProcessorConfig(name, config);
WaitForMixer();
}
TEST_F(StreamMixerTest, ObserverGets2ChannelsByDefault) {
......@@ -1042,15 +1064,18 @@ TEST_F(StreamMixerTest, ObserverGets2ChannelsByDefault) {
testing::StrictMock<MockLoopbackAudioObserver> observer;
mixer_->AddInput(&input);
mixer_->AddLoopbackAudioObserver(&observer);
WaitForMixer();
EXPECT_CALL(observer,
OnLoopbackAudio(_, kSampleFormatF32, kTestSamplesPerSecond,
kNumChannels, _, _));
kNumChannels, _, _))
.Times(testing::AtLeast(1));
PlaybackOnce();
EXPECT_CALL(observer, OnRemoved());
mixer_->RemoveLoopbackAudioObserver(&observer);
WaitForMixer();
mixer_.reset();
}
......@@ -1063,15 +1088,18 @@ TEST_F(StreamMixerTest, ObserverGets1ChannelIfNumOutputChannelsIs1) {
testing::StrictMock<MockLoopbackAudioObserver> observer;
mixer_->AddInput(&input);
mixer_->AddLoopbackAudioObserver(&observer);
WaitForMixer();
EXPECT_CALL(observer,
OnLoopbackAudio(_, kSampleFormatF32, kTestSamplesPerSecond,
kNumOutputChannels, _, _));
kNumOutputChannels, _, _))
.Times(testing::AtLeast(1));
PlaybackOnce();
EXPECT_CALL(observer, OnRemoved());
mixer_->RemoveLoopbackAudioObserver(&observer);
WaitForMixer();
mixer_.reset();
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment