Commit 28a596db authored by wutao's avatar wutao Committed by Commit Bot

Assistant: Decodes streaming data - client side

This patch implements the decoding client side for streaming data.

Bug: b/113131759, b/80315134
Test: manual.
Change-Id: I2ae58ef3d3c368ba02891db14b0a39f62155f8a9
Reviewed-on: https://chromium-review.googlesource.com/1195145Reviewed-by: default avatarSam McNally <sammc@chromium.org>
Reviewed-by: default avatarXiaohui Chen <xiaohuic@chromium.org>
Commit-Queue: Tao Wu <wutao@chromium.org>
Cr-Commit-Position: refs/heads/master@{#587915}
parent d9ca8389
...@@ -49,8 +49,12 @@ source_set("lib") { ...@@ -49,8 +49,12 @@ source_set("lib") {
"assistant_settings_manager_impl.h", "assistant_settings_manager_impl.h",
"platform/audio_input_provider_impl.cc", "platform/audio_input_provider_impl.cc",
"platform/audio_input_provider_impl.h", "platform/audio_input_provider_impl.h",
"platform/audio_media_data_source.cc",
"platform/audio_media_data_source.h",
"platform/audio_output_provider_impl.cc", "platform/audio_output_provider_impl.cc",
"platform/audio_output_provider_impl.h", "platform/audio_output_provider_impl.h",
"platform/audio_stream_handler.cc",
"platform/audio_stream_handler.h",
"platform/file_provider_impl.cc", "platform/file_provider_impl.cc",
"platform/file_provider_impl.h", "platform/file_provider_impl.h",
"platform/network_provider_impl.cc", "platform/network_provider_impl.cc",
......
...@@ -33,8 +33,8 @@ AssistantAudioDecoder::AssistantAudioDecoder( ...@@ -33,8 +33,8 @@ AssistantAudioDecoder::AssistantAudioDecoder(
: service_ref_(std::move(service_ref)), : service_ref_(std::move(service_ref)),
client_(std::move(client)), client_(std::move(client)),
task_runner_(base::ThreadTaskRunnerHandle::Get()), task_runner_(base::ThreadTaskRunnerHandle::Get()),
media_thread_(std::make_unique<base::Thread>("media_thread")), data_source_(std::make_unique<IPCDataSource>(std::move(data_source))),
data_source_(std::make_unique<IPCDataSource>(std::move(data_source))) { media_thread_(std::make_unique<base::Thread>("media_thread")) {
CHECK(media_thread_->Start()); CHECK(media_thread_->Start());
} }
...@@ -78,7 +78,7 @@ void AssistantAudioDecoder::OpenDecoderOnMediaThread( ...@@ -78,7 +78,7 @@ void AssistantAudioDecoder::OpenDecoderOnMediaThread(
void AssistantAudioDecoder::DecodeOnMediaThread() { void AssistantAudioDecoder::DecodeOnMediaThread() {
std::vector<std::unique_ptr<media::AudioBus>> decoded_audio_packets; std::vector<std::unique_ptr<media::AudioBus>> decoded_audio_packets;
// Experimental number of decoded packets before sending to |client_|. // Experimental number of decoded packets before sending to |client_|.
constexpr int kPacketsToRead = 128; constexpr int kPacketsToRead = 16;
decoder_->Read(&decoded_audio_packets, kPacketsToRead); decoder_->Read(&decoded_audio_packets, kPacketsToRead);
task_runner_->PostTask( task_runner_->PostTask(
......
...@@ -8,7 +8,6 @@ ...@@ -8,7 +8,6 @@
#include <memory> #include <memory>
#include "base/macros.h" #include "base/macros.h"
#include "base/synchronization/lock.h"
#include "chromeos/services/assistant/public/mojom/assistant_audio_decoder.mojom.h" #include "chromeos/services/assistant/public/mojom/assistant_audio_decoder.mojom.h"
namespace media { namespace media {
......
...@@ -72,7 +72,7 @@ void IPCDataSource::ReadDone(uint8_t* destination, ...@@ -72,7 +72,7 @@ void IPCDataSource::ReadDone(uint8_t* destination,
int requested_size, int requested_size,
const std::vector<uint8_t>& data) { const std::vector<uint8_t>& data) {
DCHECK_CALLED_ON_VALID_THREAD(utility_thread_checker_); DCHECK_CALLED_ON_VALID_THREAD(utility_thread_checker_);
if (data.size() > requested_size) { if (static_cast<int>(data.size()) > requested_size) {
mojo::ReportBadMessage("IPCDataSource::ReadDone: Unexpected data size."); mojo::ReportBadMessage("IPCDataSource::ReadDone: Unexpected data size.");
callback.Run(0); callback.Run(0);
return; return;
......
...@@ -7,7 +7,8 @@ ...@@ -7,7 +7,8 @@
"ash": [ "system_ui" ], "ash": [ "system_ui" ],
"identity": [ "identity_manager" ], "identity": [ "identity_manager" ],
"device": [ "device:battery_monitor" ], "device": [ "device:battery_monitor" ],
"audio": [ "stream_factory" ] "audio": [ "stream_factory" ],
"assistant_audio_decoder": [ "assistant:audio_decoder" ]
}, },
"provides": { "provides": {
"assistant": [ "assistant": [
......
// Copyright 2018 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "chromeos/services/assistant/platform/audio_media_data_source.h"
#include <algorithm>
#include "base/time/time.h"
namespace chromeos {
namespace assistant {
namespace {
// The maximum number of bytes to decode on each iteration.
// 512 was chosen to make sure decoding does not block for long.
constexpr int kMaxBytesToDecode = 512;
} // namespace
AudioMediaDataSource::AudioMediaDataSource(
mojom::AssistantMediaDataSourcePtr* interface_ptr,
scoped_refptr<base::SequencedTaskRunner> task_runner)
: binding_(this, mojo::MakeRequest(interface_ptr)),
task_runner_(task_runner),
weak_factory_(this) {}
AudioMediaDataSource::~AudioMediaDataSource() = default;
void AudioMediaDataSource::Read(
int32_t size,
mojom::AssistantMediaDataSource::ReadCallback callback) {
if (!delegate_) {
task_runner_->PostTask(
FROM_HERE,
base::BindOnce(&AudioMediaDataSource::OnFillBuffer,
weak_factory_.GetWeakPtr(), std::move(callback), 0));
return;
}
size = std::min(size, kMaxBytesToDecode);
source_buffer_.resize(size);
delegate_->FillBuffer(
source_buffer_.data(), source_buffer_.size(),
// TODO(wutao): This should be a future time that these buffers would be
// played.
base::TimeTicks::Now().since_origin().InMicroseconds(), [
task_runner = task_runner_, weak_ptr = weak_factory_.GetWeakPtr(),
repeating_callback =
base::AdaptCallbackForRepeating(std::move(callback))
](int bytes_available) {
task_runner->PostTask(
FROM_HERE,
base::BindOnce(&AudioMediaDataSource::OnFillBuffer, weak_ptr,
std::move(repeating_callback), bytes_available));
});
}
void AudioMediaDataSource::OnFillBuffer(
mojom::AssistantMediaDataSource::ReadCallback callback,
int bytes_filled) {
source_buffer_.resize(bytes_filled);
std::move(callback).Run(source_buffer_);
}
} // namespace assistant
} // namespace chromeos
// Copyright 2018 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef CHROMEOS_SERVICES_ASSISTANT_PLATFORM_AUDIO_MEDIA_DATA_SOURCE_H_
#define CHROMEOS_SERVICES_ASSISTANT_PLATFORM_AUDIO_MEDIA_DATA_SOURCE_H_
#include <vector>
#include "base/macros.h"
#include "base/single_thread_task_runner.h"
#include "chromeos/services/assistant/public/mojom/assistant_audio_decoder.mojom.h"
#include "libassistant/shared/public/platform_audio_output.h"
#include "mojo/public/cpp/bindings/binding.h"
namespace chromeos {
namespace assistant {
// Class to provide media data source for audio stream decoder.
// Internally it will read media data from |delegate_|.
class AudioMediaDataSource : public mojom::AssistantMediaDataSource {
public:
AudioMediaDataSource(mojom::AssistantMediaDataSourcePtr* interface_ptr,
scoped_refptr<base::SequencedTaskRunner> task_runner);
~AudioMediaDataSource() override;
// mojom::MediaDataSource implementation.
// Called by utility process. Must be called after |set_delegate()|.
void Read(int32_t size,
mojom::AssistantMediaDataSource::ReadCallback callback) override;
// Called by AudioStreamHandler on main thread.
void set_delegate(assistant_client::AudioOutput::Delegate* delegate) {
delegate_ = delegate;
}
private:
// Called on main thread.
void OnFillBuffer(mojom::AssistantMediaDataSource::ReadCallback callback,
int bytes_filled);
mojo::Binding<mojom::AssistantMediaDataSource> binding_;
scoped_refptr<base::SequencedTaskRunner> task_runner_;
assistant_client::AudioOutput::Delegate* delegate_ = nullptr;
std::vector<uint8_t> source_buffer_;
base::WeakPtrFactory<AudioMediaDataSource> weak_factory_;
DISALLOW_COPY_AND_ASSIGN(AudioMediaDataSource);
};
} // namespace assistant
} // namespace chromeos
#endif // CHROMEOS_SERVICES_ASSISTANT_PLATFORM_AUDIO_MEDIA_DATA_SOURCE_H_
...@@ -5,8 +5,12 @@ ...@@ -5,8 +5,12 @@
#include "chromeos/services/assistant/platform/audio_output_provider_impl.h" #include "chromeos/services/assistant/platform/audio_output_provider_impl.h"
#include "ash/public/interfaces/constants.mojom.h" #include "ash/public/interfaces/constants.mojom.h"
#include "chromeos/services/assistant/platform/audio_stream_handler.h"
#include "chromeos/services/assistant/public/mojom/assistant_audio_decoder.mojom.h"
#include "chromeos/services/assistant/public/mojom/constants.mojom.h"
#include "libassistant/shared/public/platform_audio_buffer.h" #include "libassistant/shared/public/platform_audio_buffer.h"
#include "media/audio/audio_device_description.h" #include "media/audio/audio_device_description.h"
#include "media/base/limits.h"
#include "services/service_manager/public/cpp/connector.h" #include "services/service_manager/public/cpp/connector.h"
namespace chromeos { namespace chromeos {
...@@ -36,21 +40,6 @@ int32_t GetBytesPerFrame(const assistant_client::OutputStreamFormat& format) { ...@@ -36,21 +40,6 @@ int32_t GetBytesPerFrame(const assistant_client::OutputStreamFormat& format) {
int32_t GetBufferSizeInBytesFromBufferFormat( int32_t GetBufferSizeInBytesFromBufferFormat(
const assistant_client::OutputStreamFormat& format) { const assistant_client::OutputStreamFormat& format) {
int32_t frame_size_in_bytes = 0;
switch (format.encoding) {
case assistant_client::OutputStreamEncoding::STREAM_PCM_S16:
frame_size_in_bytes = 2;
break;
case assistant_client::OutputStreamEncoding::STREAM_PCM_S32:
case assistant_client::OutputStreamEncoding::STREAM_PCM_F32:
frame_size_in_bytes = 4;
break;
default:
NOTREACHED();
break;
}
return GetBytesPerFrame(format) * format.pcm_sample_rate / return GetBytesPerFrame(format) * format.pcm_sample_rate /
kNumberOfBuffersPerSec; kNumberOfBuffersPerSec;
} }
...@@ -78,12 +67,19 @@ void FillAudioFifoWithDataOfBufferFormat( ...@@ -78,12 +67,19 @@ void FillAudioFifoWithDataOfBufferFormat(
fifo->Push(data.data(), frames, bytes_per_sample); fifo->Push(data.data(), frames, bytes_per_sample);
} }
bool IsEncodedFormat(const assistant_client::OutputStreamFormat& format) {
return format.encoding ==
assistant_client::OutputStreamEncoding::STREAM_MP3 ||
format.encoding ==
assistant_client::OutputStreamEncoding::STREAM_OPUS_IN_OGG;
}
class AudioOutputImpl : public assistant_client::AudioOutput { class AudioOutputImpl : public assistant_client::AudioOutput {
public: public:
AudioOutputImpl( AudioOutputImpl(
service_manager::Connector* connector, service_manager::Connector* connector,
scoped_refptr<base::SingleThreadTaskRunner> task_runner, scoped_refptr<base::SequencedTaskRunner> task_runner,
scoped_refptr<base::SingleThreadTaskRunner> background_task_runner, scoped_refptr<base::SequencedTaskRunner> background_task_runner,
assistant_client::OutputStreamType type, assistant_client::OutputStreamType type,
assistant_client::OutputStreamFormat format) assistant_client::OutputStreamFormat format)
: connector_(connector), : connector_(connector),
...@@ -91,6 +87,8 @@ class AudioOutputImpl : public assistant_client::AudioOutput { ...@@ -91,6 +87,8 @@ class AudioOutputImpl : public assistant_client::AudioOutput {
background_thread_task_runner_(background_task_runner), background_thread_task_runner_(background_task_runner),
stream_type_(type), stream_type_(type),
format_(format), format_(format),
audio_stream_handler_(
std::make_unique<AudioStreamHandler>(connector_, task_runner)),
device_owner_( device_owner_(
std::make_unique<AudioDeviceOwner>(task_runner, std::make_unique<AudioDeviceOwner>(task_runner,
background_task_runner)) {} background_task_runner)) {}
...@@ -101,37 +99,59 @@ class AudioOutputImpl : public assistant_client::AudioOutput { ...@@ -101,37 +99,59 @@ class AudioOutputImpl : public assistant_client::AudioOutput {
FROM_HERE, FROM_HERE,
base::BindOnce( base::BindOnce(
[](std::unique_ptr<AudioDeviceOwner> device_owner, [](std::unique_ptr<AudioDeviceOwner> device_owner,
scoped_refptr<base::SingleThreadTaskRunner> background_runner) { scoped_refptr<base::SequencedTaskRunner> background_runner) {
// Ensures |device_owner| is destructed on the correct thread. // Ensures |device_owner| is destructed on the correct thread.
background_runner->DeleteSoon(FROM_HERE, device_owner.release()); background_runner->DeleteSoon(FROM_HERE, device_owner.release());
}, },
std::move(device_owner_), background_thread_task_runner_)); std::move(device_owner_), background_thread_task_runner_));
main_thread_task_runner_->DeleteSoon(FROM_HERE,
audio_stream_handler_.release());
} }
// assistant_client::AudioOutput overrides: // assistant_client::AudioOutput overrides:
assistant_client::OutputStreamType GetType() override { return stream_type_; } assistant_client::OutputStreamType GetType() override { return stream_type_; }
void Start(assistant_client::AudioOutput::Delegate* delegate) override { void Start(assistant_client::AudioOutput::Delegate* delegate) override {
main_thread_task_runner_->PostTask( if (IsEncodedFormat(format_)) {
FROM_HERE, base::BindOnce(&AudioDeviceOwner::StartOnMainThread, main_thread_task_runner_->PostTask(
base::Unretained(device_owner_.get()), FROM_HERE,
delegate, connector_, format_)); base::BindOnce(
&AudioStreamHandler::StartAudioDecoder,
base::Unretained(audio_stream_handler_.get()), delegate,
base::BindOnce(&AudioDeviceOwner::StartOnMainThread,
base::Unretained(device_owner_.get()),
audio_stream_handler_.get(), connector_)));
} else {
main_thread_task_runner_->PostTask(
FROM_HERE, base::BindOnce(&AudioDeviceOwner::StartOnMainThread,
base::Unretained(device_owner_.get()),
delegate, connector_, format_));
}
} }
void Stop() override { void Stop() override {
background_thread_task_runner_->PostTask( if (IsEncodedFormat(format_)) {
FROM_HERE, base::BindOnce(&AudioDeviceOwner::StopOnBackgroundThread, main_thread_task_runner_->PostTask(
base::Unretained(device_owner_.get()))); FROM_HERE,
base::BindOnce(&AudioStreamHandler::OnStopped,
base::Unretained(audio_stream_handler_.get())));
} else {
background_thread_task_runner_->PostTask(
FROM_HERE, base::BindOnce(&AudioDeviceOwner::StopOnBackgroundThread,
base::Unretained(device_owner_.get())));
}
} }
private: private:
service_manager::Connector* connector_; service_manager::Connector* connector_;
scoped_refptr<base::SingleThreadTaskRunner> main_thread_task_runner_; scoped_refptr<base::SequencedTaskRunner> main_thread_task_runner_;
scoped_refptr<base::SingleThreadTaskRunner> background_thread_task_runner_; scoped_refptr<base::SequencedTaskRunner> background_thread_task_runner_;
const assistant_client::OutputStreamType stream_type_; const assistant_client::OutputStreamType stream_type_;
assistant_client::OutputStreamFormat format_; assistant_client::OutputStreamFormat format_;
std::unique_ptr<AudioStreamHandler> audio_stream_handler_;
std::unique_ptr<AudioDeviceOwner> device_owner_; std::unique_ptr<AudioDeviceOwner> device_owner_;
DISALLOW_COPY_AND_ASSIGN(AudioOutputImpl); DISALLOW_COPY_AND_ASSIGN(AudioOutputImpl);
...@@ -187,7 +207,7 @@ void VolumeControlImpl::OnMuteStateChanged(bool mute) { ...@@ -187,7 +207,7 @@ void VolumeControlImpl::OnMuteStateChanged(bool mute) {
AudioOutputProviderImpl::AudioOutputProviderImpl( AudioOutputProviderImpl::AudioOutputProviderImpl(
service_manager::Connector* connector, service_manager::Connector* connector,
scoped_refptr<base::SingleThreadTaskRunner> background_task_runner) scoped_refptr<base::SequencedTaskRunner> background_task_runner)
: volume_control_impl_(connector), : volume_control_impl_(connector),
connector_(connector), connector_(connector),
main_thread_task_runner_(base::ThreadTaskRunnerHandle::Get()), main_thread_task_runner_(base::ThreadTaskRunnerHandle::Get()),
...@@ -200,20 +220,18 @@ assistant_client::AudioOutput* AudioOutputProviderImpl::CreateAudioOutput( ...@@ -200,20 +220,18 @@ assistant_client::AudioOutput* AudioOutputProviderImpl::CreateAudioOutput(
const assistant_client::OutputStreamFormat& stream_format) { const assistant_client::OutputStreamFormat& stream_format) {
// Owned by one arbitrary thread inside libassistant. It will be destroyed // Owned by one arbitrary thread inside libassistant. It will be destroyed
// once assistant_client::AudioOutput::Delegate::OnStopped() is called. // once assistant_client::AudioOutput::Delegate::OnStopped() is called.
// TODO(muyuanli): Handle encoded stream: OutputStreamEncoding::STREAM_MP3 /
// OGG.
return new AudioOutputImpl(connector_, main_thread_task_runner_, return new AudioOutputImpl(connector_, main_thread_task_runner_,
background_task_runner_, type, stream_format); background_task_runner_, type, stream_format);
} }
std::vector<assistant_client::OutputStreamEncoding> std::vector<assistant_client::OutputStreamEncoding>
AudioOutputProviderImpl::GetSupportedStreamEncodings() { AudioOutputProviderImpl::GetSupportedStreamEncodings() {
// TODO(muyuanli): implement after media decoder is ready.
return std::vector<assistant_client::OutputStreamEncoding>{ return std::vector<assistant_client::OutputStreamEncoding>{
assistant_client::OutputStreamEncoding::STREAM_PCM_S16, assistant_client::OutputStreamEncoding::STREAM_PCM_S16,
assistant_client::OutputStreamEncoding::STREAM_PCM_S32, assistant_client::OutputStreamEncoding::STREAM_PCM_S32,
assistant_client::OutputStreamEncoding::STREAM_PCM_F32, assistant_client::OutputStreamEncoding::STREAM_PCM_F32,
assistant_client::OutputStreamEncoding::STREAM_MP3, assistant_client::OutputStreamEncoding::STREAM_MP3,
assistant_client::OutputStreamEncoding::STREAM_OPUS_IN_OGG,
}; };
} }
...@@ -255,17 +273,19 @@ void AudioDeviceOwner::StartOnMainThread( ...@@ -255,17 +273,19 @@ void AudioDeviceOwner::StartOnMainThread(
delegate_ = delegate; delegate_ = delegate;
format_ = format; format_ = format;
// TODO(wutao): Remove this after supporting mp3 encoding. // TODO(wutao): There is a bug LibAssistant sends wrong format. Do not run
if (format_.encoding == assistant_client::OutputStreamEncoding::STREAM_MP3) { // in this case.
if (format_.pcm_num_channels >
static_cast<int>(media::limits::kMaxChannels)) {
delegate_->OnEndOfStream(); delegate_->OnEndOfStream();
return; return;
} }
audio_param_ = GetAudioParametersFromBufferFormat(format_); audio_param_ = GetAudioParametersFromBufferFormat(format_);
// |audio_fifo_| contains 3x the number of frames to render. // |audio_fifo_| contains 8x the number of frames to render.
audio_fifo_ = std::make_unique<media::AudioBlockFifo>( audio_fifo_ = std::make_unique<media::AudioBlockFifo>(
format.pcm_num_channels, audio_param_.frames_per_buffer(), 3); format.pcm_num_channels, audio_param_.frames_per_buffer(), 8);
audio_data_.resize(GetBufferSizeInBytesFromBufferFormat(format_)); audio_data_.resize(GetBufferSizeInBytesFromBufferFormat(format_));
{ {
...@@ -286,9 +306,9 @@ void AudioDeviceOwner::StartOnMainThread( ...@@ -286,9 +306,9 @@ void AudioDeviceOwner::StartOnMainThread(
void AudioDeviceOwner::StopOnBackgroundThread() { void AudioDeviceOwner::StopOnBackgroundThread() {
DCHECK(background_task_runner_->RunsTasksInCurrentSequence()); DCHECK(background_task_runner_->RunsTasksInCurrentSequence());
output_device_.reset(); output_device_.reset();
delegate_->OnStopped(); delegate_->OnStopped();
delegate_ = nullptr;
} }
void AudioDeviceOwner::StartDeviceOnBackgroundThread( void AudioDeviceOwner::StartDeviceOnBackgroundThread(
...@@ -307,7 +327,8 @@ int AudioDeviceOwner::Render(base::TimeDelta delay, ...@@ -307,7 +327,8 @@ int AudioDeviceOwner::Render(base::TimeDelta delay,
base::AutoLock lock(lock_); base::AutoLock lock(lock_);
if (!is_filling_ && audio_fifo_->GetAvailableFrames() <= 0) { if (!is_filling_ && audio_fifo_->GetAvailableFrames() <= 0) {
delegate_->OnEndOfStream(); if (delegate_)
delegate_->OnEndOfStream();
return 0; return 0;
} }
if (audio_fifo_->GetAvailableFrames() <= 0) { if (audio_fifo_->GetAvailableFrames() <= 0) {
...@@ -339,7 +360,8 @@ int AudioDeviceOwner::Render(base::TimeDelta delay, ...@@ -339,7 +360,8 @@ int AudioDeviceOwner::Render(base::TimeDelta delay,
void AudioDeviceOwner::OnRenderError() { void AudioDeviceOwner::OnRenderError() {
DVLOG(1) << "OnRenderError()"; DVLOG(1) << "OnRenderError()";
delegate_->OnError(assistant_client::AudioOutput::Error::FATAL_ERROR); if (delegate_)
delegate_->OnError(assistant_client::AudioOutput::Error::FATAL_ERROR);
} }
void AudioDeviceOwner::ScheduleFillLocked(const base::TimeTicks& time) { void AudioDeviceOwner::ScheduleFillLocked(const base::TimeTicks& time) {
...@@ -350,6 +372,10 @@ void AudioDeviceOwner::ScheduleFillLocked(const base::TimeTicks& time) { ...@@ -350,6 +372,10 @@ void AudioDeviceOwner::ScheduleFillLocked(const base::TimeTicks& time) {
// FillBuffer will not be called after delegate_->OnEndOfStream, after which // FillBuffer will not be called after delegate_->OnEndOfStream, after which
// AudioDeviceOwner will be destroyed. Thus |this| is valid for capture // AudioDeviceOwner will be destroyed. Thus |this| is valid for capture
// here. // here.
if (!delegate_)
return;
delegate_->FillBuffer( delegate_->FillBuffer(
audio_data_.data(), audio_data_.data(),
std::min(static_cast<int>(audio_data_.size()), std::min(static_cast<int>(audio_data_.size()),
......
...@@ -56,7 +56,7 @@ class AudioOutputProviderImpl : public assistant_client::AudioOutputProvider { ...@@ -56,7 +56,7 @@ class AudioOutputProviderImpl : public assistant_client::AudioOutputProvider {
public: public:
explicit AudioOutputProviderImpl( explicit AudioOutputProviderImpl(
service_manager::Connector* connector, service_manager::Connector* connector,
scoped_refptr<base::SingleThreadTaskRunner> background_task_runner); scoped_refptr<base::SequencedTaskRunner> background_task_runner);
~AudioOutputProviderImpl() override; ~AudioOutputProviderImpl() override;
// assistant_client::AudioOutputProvider overrides: // assistant_client::AudioOutputProvider overrides:
...@@ -79,8 +79,8 @@ class AudioOutputProviderImpl : public assistant_client::AudioOutputProvider { ...@@ -79,8 +79,8 @@ class AudioOutputProviderImpl : public assistant_client::AudioOutputProvider {
private: private:
VolumeControlImpl volume_control_impl_; VolumeControlImpl volume_control_impl_;
service_manager::Connector* connector_; service_manager::Connector* connector_;
scoped_refptr<base::SingleThreadTaskRunner> main_thread_task_runner_; scoped_refptr<base::SequencedTaskRunner> main_thread_task_runner_;
scoped_refptr<base::SingleThreadTaskRunner> background_task_runner_; scoped_refptr<base::SequencedTaskRunner> background_task_runner_;
DISALLOW_COPY_AND_ASSIGN(AudioOutputProviderImpl); DISALLOW_COPY_AND_ASSIGN(AudioOutputProviderImpl);
}; };
......
// Copyright 2018 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "chromeos/services/assistant/platform/audio_stream_handler.h"
#include "ash/public/interfaces/constants.mojom.h"
#include "chromeos/services/assistant/platform/audio_media_data_source.h"
#include "chromeos/services/assistant/public/mojom/constants.mojom.h"
#include "services/service_manager/public/cpp/connector.h"
namespace chromeos {
namespace assistant {
AudioStreamHandler::AudioStreamHandler(
service_manager::Connector* connector,
scoped_refptr<base::SequencedTaskRunner> task_runner)
: connector_(connector),
task_runner_(task_runner),
client_binding_(this),
weak_factory_(this) {}
AudioStreamHandler::~AudioStreamHandler() = default;
void AudioStreamHandler::StartAudioDecoder(
assistant_client::AudioOutput::Delegate* delegate,
InitCB on_inited) {
mojom::AssistantAudioDecoderClientPtr client;
client_binding_.Bind(mojo::MakeRequest(&client));
connector_->BindInterface(mojom::kAudioDecoderServiceName,
mojo::MakeRequest(&audio_decoder_factory_ptr_));
mojom::AssistantMediaDataSourcePtr data_source;
media_data_source_ =
std::make_unique<AudioMediaDataSource>(&data_source, task_runner_);
audio_decoder_factory_ptr_->CreateAssistantAudioDecoder(
mojo::MakeRequest(&audio_decoder_), std::move(client),
std::move(data_source));
delegate_ = delegate;
media_data_source_->set_delegate(delegate_);
start_device_owner_on_main_thread_ = std::move(on_inited);
audio_decoder_->OpenDecoder(base::BindOnce(
&AudioStreamHandler::OnDecoderInitialized, weak_factory_.GetWeakPtr()));
}
void AudioStreamHandler::OnDecoderInitialized(bool success,
int bytes_per_sample,
int samples_per_second,
int channels) {
task_runner_->PostTask(
FROM_HERE,
base::BindOnce(&AudioStreamHandler::OnDecoderInitializedOnThread,
weak_factory_.GetWeakPtr(), success, bytes_per_sample,
samples_per_second, channels));
}
void AudioStreamHandler::OnNewBuffers(
const std::vector<std::vector<uint8_t>>& buffers) {
if (buffers.size() == 0)
no_more_data_ = true;
for (const auto& buffer : buffers)
decoded_data_.emplace_back(buffer);
is_decoding_ = false;
FillDecodedBuffer(buffer_to_copy_, size_to_copy_);
}
// TODO(wutao): Needs to pass |playback_timestamp| to LibAssistant.
void AudioStreamHandler::FillBuffer(
void* buffer,
int buffer_size,
int64_t playback_timestamp,
assistant_client::Callback1<int> on_filled) {
DCHECK(!on_filled_);
on_filled_ = std::move(on_filled);
buffer_to_copy_ = buffer;
size_to_copy_ = buffer_size;
FillDecodedBuffer(buffer, buffer_size);
}
void AudioStreamHandler::OnEndOfStream() {
if (delegate_)
delegate_->OnEndOfStream();
}
void AudioStreamHandler::OnError(assistant_client::AudioOutput::Error error) {
if (delegate_)
delegate_->OnError(error);
}
void AudioStreamHandler::OnStopped() {
delegate_->OnStopped();
media_data_source_->set_delegate(nullptr);
delegate_ = nullptr;
}
void AudioStreamHandler::OnDecoderInitializedOnThread(bool success,
int bytes_per_sample,
int samples_per_second,
int channels) {
if (!success) {
OnError(assistant_client::AudioOutput::Error::FATAL_ERROR);
std::move(start_device_owner_on_main_thread_);
return;
}
DCHECK(bytes_per_sample == 2 || bytes_per_sample == 4);
const assistant_client::OutputStreamFormat format = {
bytes_per_sample == 2
? assistant_client::OutputStreamEncoding::STREAM_PCM_S16
: assistant_client::OutputStreamEncoding::STREAM_PCM_S32,
/*pcm_sample_rate=*/samples_per_second,
/*pcm_num_channels=*/channels};
if (!device_owner_started_) {
DCHECK(start_device_owner_on_main_thread_);
DCHECK(!on_filled_);
std::move(start_device_owner_on_main_thread_).Run(format);
device_owner_started_ = true;
}
}
void AudioStreamHandler::FillDecodedBuffer(void* buffer, int buffer_size) {
if (on_filled_ && (decoded_data_.size() > 0 || no_more_data_)) {
int size_copied = 0;
// Fill buffer with data not more than requested.
while (!decoded_data_.empty() && size_copied < buffer_size) {
std::vector<uint8_t>& data = decoded_data_.front();
int audio_buffer_size = static_cast<int>(data.size());
if (size_copied + audio_buffer_size > buffer_size)
audio_buffer_size = buffer_size - size_copied;
memcpy(reinterpret_cast<uint8_t*>(buffer) + size_copied, data.data(),
audio_buffer_size);
size_copied += audio_buffer_size;
if (audio_buffer_size < static_cast<int>(data.size()))
data.erase(data.begin(), data.begin() + audio_buffer_size);
else
decoded_data_.pop_front();
}
task_runner_->PostTask(
FROM_HERE, base::BindOnce(&AudioStreamHandler::OnFillBufferOnThread,
weak_factory_.GetWeakPtr(),
std::move(on_filled_), size_copied));
}
if (decoded_data_.empty() && !no_more_data_) {
task_runner_->PostTask(FROM_HERE,
base::BindOnce(&AudioStreamHandler::DecodeOnThread,
weak_factory_.GetWeakPtr()));
}
}
void AudioStreamHandler::OnFillBufferOnThread(
assistant_client::Callback1<int> on_filled,
int num_bytes) {
std::move(on_filled)(num_bytes);
}
void AudioStreamHandler::DecodeOnThread() {
if (is_decoding_)
return;
is_decoding_ = true;
audio_decoder_->Decode();
}
} // namespace assistant
} // namespace chromeos
// Copyright 2018 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef CHROMEOS_SERVICES_ASSISTANT_PLATFORM_AUDIO_STREAM_HANDLER_H_
#define CHROMEOS_SERVICES_ASSISTANT_PLATFORM_AUDIO_STREAM_HANDLER_H_
#include "base/macros.h"
#include "base/single_thread_task_runner.h"
#include "base/synchronization/lock.h"
#include "chromeos/services/assistant/public/mojom/assistant_audio_decoder.mojom.h"
#include "libassistant/shared/public/platform_audio_output.h"
#include "mojo/public/cpp/bindings/associated_binding.h"
#include "mojo/public/cpp/bindings/binding.h"
namespace service_manager {
class Connector;
} // namespace service_manager
namespace chromeos {
namespace assistant {
class AudioMediaDataSource;
class AudioStreamHandler : public mojom::AssistantAudioDecoderClient,
public assistant_client::AudioOutput::Delegate {
public:
using InitCB =
base::OnceCallback<void(const assistant_client::OutputStreamFormat&)>;
AudioStreamHandler(service_manager::Connector* connector,
scoped_refptr<base::SequencedTaskRunner> task_runner);
~AudioStreamHandler() override;
// Called on main thread.
void StartAudioDecoder(assistant_client::AudioOutput::Delegate* delegate,
InitCB start_device_owner_on_main_thread);
// mojom::AssistantAudioDecoderClient overrides:
// Called by |audio_decoder_| on utility thread.
void OnNewBuffers(const std::vector<std::vector<uint8_t>>& buffers) override;
// assistant_client::AudioOutput::Delegate overrides:
// Called by AudioDeviceOwner on main thread.
void FillBuffer(void* buffer,
int buffer_size,
int64_t playback_timestamp,
assistant_client::Callback1<int> on_decoded) override;
void OnEndOfStream() override;
void OnError(assistant_client::AudioOutput::Error error) override;
void OnStopped() override;
private:
// Calls AudioDeviceOwner to start on main thread.
void OnDecoderInitialized(bool success,
int bytes_per_sample,
int samples_per_second,
int channels);
void OnDecoderInitializedOnThread(bool success,
int bytes_per_sample,
int samples_per_second,
int channels);
// Called by |FillBuffer()| to fill available data. If no available data, it
// will call |DecodeOnThread()| to get more data.
void FillDecodedBuffer(void* buffer, int buffer_size);
// Fills buffer to AudioDeviceOwner on main thread.
void OnFillBufferOnThread(assistant_client::Callback1<int> on_decoded,
int num_bytes);
// Calls |audio_decoder_| to decode on main thread.
void DecodeOnThread();
service_manager::Connector* connector_;
scoped_refptr<base::SequencedTaskRunner> task_runner_;
assistant_client::AudioOutput::Delegate* delegate_;
mojom::AssistantAudioDecoderFactoryPtr audio_decoder_factory_ptr_;
mojo::Binding<mojom::AssistantAudioDecoderClient> client_binding_;
std::unique_ptr<AudioMediaDataSource> media_data_source_;
mojom::AssistantAudioDecoderPtr audio_decoder_;
// True when there is more decoded data.
bool no_more_data_ = false;
// True if |Decode()| called and not all decoded buffers are received, e.g.
// |buffers_to_receive_| != 0.
bool is_decoding_ = false;
// Temporary storage of |buffer| passed by |FillBuffer|.
void* buffer_to_copy_ = nullptr;
// Temporary storage of |buffer_size| passed by |FillBuffer|.
int size_to_copy_ = 0;
// Temporary storage of |on_filled| passed by |FillBuffer|.
assistant_client::Callback1<int> on_filled_;
// True after |start_device_owner_on_main_thread_| is called.
bool device_owner_started_ = false;
InitCB start_device_owner_on_main_thread_;
base::circular_deque<std::vector<uint8_t>> decoded_data_;
base::WeakPtrFactory<AudioStreamHandler> weak_factory_;
DISALLOW_COPY_AND_ASSIGN(AudioStreamHandler);
};
} // namespace assistant
} // namespace chromeos
#endif // CHROMEOS_SERVICES_ASSISTANT_PLATFORM_AUDIO_STREAM_HANDLER_H_
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment