Commit 4f6cf809 authored by wutao's avatar wutao Committed by Commit Bot

assistant: Close decoder on stop

Decoder could read media data when the data source is deleted. This cl
fixes this by allowing decoder to clean up defore deletion.

Bug: b/118403995
Test: manual.
Change-Id: I1c7336e417f8a8b5d57880d49e126fbc8873a6d4
Reviewed-on: https://chromium-review.googlesource.com/c/1304066
Commit-Queue: Tao Wu <wutao@chromium.org>
Reviewed-by: default avatarXiaohui Chen <xiaohuic@chromium.org>
Reviewed-by: default avatarSam McNally <sammc@chromium.org>
Cr-Commit-Position: refs/heads/master@{#604094}
parent c9ef934d
......@@ -34,8 +34,11 @@ AssistantAudioDecoder::AssistantAudioDecoder(
client_(std::move(client)),
task_runner_(base::ThreadTaskRunnerHandle::Get()),
data_source_(std::make_unique<IPCDataSource>(std::move(data_source))),
media_thread_(std::make_unique<base::Thread>("media_thread")) {
media_thread_(std::make_unique<base::Thread>("media_thread")),
weak_factory_(this) {
CHECK(media_thread_->Start());
client_.set_connection_error_handler(base::BindOnce(
&AssistantAudioDecoder::OnConnectionError, base::Unretained(this)));
}
AssistantAudioDecoder::~AssistantAudioDecoder() = default;
......@@ -47,57 +50,83 @@ void AssistantAudioDecoder::Decode() {
}
void AssistantAudioDecoder::OpenDecoder(OpenDecoderCallback callback) {
DCHECK(!open_callback_);
open_callback_ = std::move(callback);
media_thread_->task_runner()->PostTask(
FROM_HERE,
base::BindOnce(&AssistantAudioDecoder::OpenDecoderOnMediaThread,
base::Unretained(this), std::move(callback)));
base::Unretained(this)));
}
void AssistantAudioDecoder::OpenDecoderOnMediaThread(
OpenDecoderCallback callback) {
void AssistantAudioDecoder::CloseDecoder(CloseDecoderCallback callback) {
DCHECK(!close_callback_);
close_callback_ = std::move(callback);
media_thread_->task_runner()->PostTask(
FROM_HERE,
base::BindOnce(&AssistantAudioDecoder::CloseDecoderOnMediaThread,
base::Unretained(this)));
}
void AssistantAudioDecoder::OpenDecoderOnMediaThread() {
bool read_ok = true;
protocol_ = std::make_unique<media::BlockingUrlProtocol>(
data_source_.get(), base::BindRepeating(&OnError, &read_ok));
decoder_ = std::make_unique<media::AudioFileReader>(protocol_.get());
if (!decoder_->Open() || !read_ok) {
task_runner_->PostTask(
FROM_HERE,
base::BindOnce(&AssistantAudioDecoder::OnDecoderErrorOnThread,
base::Unretained(this), std::move(callback)));
if (closed_ || !decoder_->Open() || !read_ok) {
CloseDecoderOnMediaThread();
return;
}
task_runner_->PostTask(
FROM_HERE,
base::BindOnce(&AssistantAudioDecoder::OnDecoderInitializedOnThread,
base::Unretained(this), std::move(callback),
decoder_->sample_rate(), decoder_->channels()));
weak_factory_.GetWeakPtr(), decoder_->sample_rate(),
decoder_->channels()));
}
void AssistantAudioDecoder::DecodeOnMediaThread() {
std::vector<std::unique_ptr<media::AudioBus>> decoded_audio_packets;
// Experimental number of decoded packets before sending to |client_|.
constexpr int kPacketsToRead = 16;
decoder_->Read(&decoded_audio_packets, kPacketsToRead);
DCHECK(decoder_);
// The client expects to be called |OnNewBuffers()| so that to return
// AudioDeviceOwner's |FillBuffer()| call. If |closed_| is true, still return
// empty |decoded_audio_packets| to indicate no more data available.
if (!closed_)
decoder_->Read(&decoded_audio_packets, kPacketsToRead);
task_runner_->PostTask(
FROM_HERE,
base::BindOnce(&AssistantAudioDecoder::OnBufferDecodedOnThread,
base::Unretained(this), std::move(decoded_audio_packets)));
FROM_HERE, base::BindOnce(&AssistantAudioDecoder::OnBufferDecodedOnThread,
weak_factory_.GetWeakPtr(),
std::move(decoded_audio_packets)));
}
void AssistantAudioDecoder::CloseDecoderOnMediaThread() {
// |decoder_| may not be initialized.
if (decoder_)
decoder_->Close();
closed_ = true;
task_runner_->PostTask(
FROM_HERE, base::BindOnce(&AssistantAudioDecoder::RunCallbacksAsClosed,
weak_factory_.GetWeakPtr()));
}
void AssistantAudioDecoder::OnDecoderInitializedOnThread(
OpenDecoderCallback callback,
int sample_rate,
int channels) {
std::move(callback).Run(/*success=*/true, kBytesPerSample, sample_rate,
channels);
DCHECK(open_callback_);
std::move(open_callback_)
.Run(/*success=*/true, kBytesPerSample, sample_rate, channels);
}
void AssistantAudioDecoder::OnBufferDecodedOnThread(
const std::vector<std::unique_ptr<media::AudioBus>>&
decoded_audio_packets) {
if (!client_)
return;
std::vector<std::vector<uint8_t>> buffers;
for (const auto& audio_bus : decoded_audio_packets) {
const int bytes_to_alloc =
......@@ -110,12 +139,25 @@ void AssistantAudioDecoder::OnBufferDecodedOnThread(
client_->OnNewBuffers(buffers);
}
void AssistantAudioDecoder::OnDecoderErrorOnThread(
OpenDecoderCallback callback) {
std::move(callback).Run(/*success=*/false,
/*bytes_per_sample=*/0,
/*samples_per_second=*/0,
/*channels=*/0);
void AssistantAudioDecoder::OnConnectionError() {
client_ = nullptr;
media_thread_->task_runner()->PostTask(
FROM_HERE,
base::BindOnce(&AssistantAudioDecoder::CloseDecoderOnMediaThread,
base::Unretained(this)));
}
void AssistantAudioDecoder::RunCallbacksAsClosed() {
if (open_callback_) {
std::move(open_callback_)
.Run(/*success=*/false,
/*bytes_per_sample=*/0,
/*samples_per_second=*/0,
/*channels=*/0);
}
if (close_callback_)
std::move(close_callback_).Run();
}
} // namespace assistant
......
......@@ -8,6 +8,7 @@
#include <memory>
#include "base/macros.h"
#include "base/memory/weak_ptr.h"
#include "chromeos/services/assistant/public/mojom/assistant_audio_decoder.mojom.h"
namespace media {
......@@ -34,30 +35,38 @@ class AssistantAudioDecoder : public mojom::AssistantAudioDecoder {
// Called by |client_| on main thread.
void OpenDecoder(OpenDecoderCallback callback) override;
void Decode() override;
void CloseDecoder(CloseDecoderCallback callback) override;
private:
// Calls |decoder_| to decode on media thread.
void OpenDecoderOnMediaThread(OpenDecoderCallback callback);
void OpenDecoderOnMediaThread();
void DecodeOnMediaThread();
void CloseDecoderOnMediaThread();
// Calls |client_| methods on main thread.
void OnDecoderInitializedOnThread(OpenDecoderCallback callback,
int sample_rate,
int channels);
void OnDecoderErrorOnThread(OpenDecoderCallback callback);
void OnDecoderInitializedOnThread(int sample_rate, int channels);
void OnBufferDecodedOnThread(
const std::vector<std::unique_ptr<media::AudioBus>>&
decoded_audio_buffers);
void OnConnectionError();
void RunCallbacksAsClosed();
const std::unique_ptr<service_manager::ServiceContextRef> service_ref_;
mojom::AssistantAudioDecoderClientPtr client_;
scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
scoped_refptr<base::SequencedTaskRunner> task_runner_;
OpenDecoderCallback open_callback_;
CloseDecoderCallback close_callback_;
bool closed_ = false;
std::unique_ptr<media::DataSource> data_source_;
std::unique_ptr<media::BlockingUrlProtocol> protocol_;
std::unique_ptr<media::AudioFileReader> decoder_;
std::unique_ptr<base::Thread> media_thread_;
base::WeakPtrFactory<AssistantAudioDecoder> weak_factory_;
DISALLOW_COPY_AND_ASSIGN(AssistantAudioDecoder);
};
......
......@@ -7,14 +7,11 @@
#include <stdint.h>
#include "base/macros.h"
#include "base/threading/thread_checker.h"
#include "chromeos/services/assistant/public/mojom/assistant_audio_decoder.mojom.h"
#include "media/base/data_source.h"
namespace base {
class TaskRunner;
}
namespace chromeos {
namespace assistant {
......@@ -49,12 +46,14 @@ class IPCDataSource : public media::DataSource {
mojom::AssistantMediaDataSourcePtr media_data_source_;
scoped_refptr<base::TaskRunner> utility_task_runner_;
scoped_refptr<base::SequencedTaskRunner> utility_task_runner_;
THREAD_CHECKER(utility_thread_checker_);
// Enforces that the DataSource methods are called on one other thread only.
THREAD_CHECKER(data_source_thread_checker_);
DISALLOW_COPY_AND_ASSIGN(IPCDataSource);
};
} // namespace assistant
......
......@@ -26,6 +26,7 @@ class AudioMediaDataSource : public mojom::AssistantMediaDataSource {
// mojom::MediaDataSource implementation.
// Called by utility process. Must be called after |set_delegate()|.
// The caller must wait for callback to finish before issuing the next read.
void Read(uint32_t size,
mojom::AssistantMediaDataSource::ReadCallback callback) override;
......
......@@ -84,15 +84,17 @@ class AudioOutputImpl : public assistant_client::AudioOutput {
service_manager::Connector* connector,
scoped_refptr<base::SequencedTaskRunner> task_runner,
scoped_refptr<base::SequencedTaskRunner> background_task_runner,
mojom::AssistantAudioDecoderFactory* audio_decoder_factory,
assistant_client::OutputStreamType type,
assistant_client::OutputStreamFormat format)
: connector_(connector),
main_thread_task_runner_(task_runner),
background_thread_task_runner_(background_task_runner),
audio_decoder_factory_(audio_decoder_factory),
stream_type_(type),
format_(format),
audio_stream_handler_(
std::make_unique<AudioStreamHandler>(connector_, task_runner)),
std::make_unique<AudioStreamHandler>(task_runner)),
device_owner_(
std::make_unique<AudioDeviceOwner>(task_runner,
background_task_runner)) {}
......@@ -121,7 +123,8 @@ class AudioOutputImpl : public assistant_client::AudioOutput {
FROM_HERE,
base::BindOnce(
&AudioStreamHandler::StartAudioDecoder,
base::Unretained(audio_stream_handler_.get()), delegate,
base::Unretained(audio_stream_handler_.get()),
audio_decoder_factory_, delegate,
base::BindOnce(&AudioDeviceOwner::StartOnMainThread,
base::Unretained(device_owner_.get()),
audio_stream_handler_.get(), connector_)));
......@@ -151,6 +154,7 @@ class AudioOutputImpl : public assistant_client::AudioOutput {
service_manager::Connector* connector_;
scoped_refptr<base::SequencedTaskRunner> main_thread_task_runner_;
scoped_refptr<base::SequencedTaskRunner> background_thread_task_runner_;
mojom::AssistantAudioDecoderFactory* audio_decoder_factory_;
const assistant_client::OutputStreamType stream_type_;
assistant_client::OutputStreamFormat format_;
......@@ -234,7 +238,11 @@ AudioOutputProviderImpl::AudioOutputProviderImpl(
: volume_control_impl_(connector),
connector_(connector),
main_thread_task_runner_(base::ThreadTaskRunnerHandle::Get()),
background_task_runner_(background_task_runner) {}
background_task_runner_(background_task_runner) {
connector_->BindInterface(mojom::kAudioDecoderServiceName,
mojo::MakeRequest(&audio_decoder_factory_ptr_));
audio_decoder_factory_ = audio_decoder_factory_ptr_.get();
}
AudioOutputProviderImpl::~AudioOutputProviderImpl() = default;
......@@ -244,7 +252,8 @@ assistant_client::AudioOutput* AudioOutputProviderImpl::CreateAudioOutput(
// Owned by one arbitrary thread inside libassistant. It will be destroyed
// once assistant_client::AudioOutput::Delegate::OnStopped() is called.
return new AudioOutputImpl(connector_, main_thread_task_runner_,
background_task_runner_, type, stream_format);
background_task_runner_, audio_decoder_factory_,
type, stream_format);
}
std::vector<assistant_client::OutputStreamEncoding>
......
......@@ -11,6 +11,7 @@
#include "ash/public/interfaces/assistant_volume_control.mojom.h"
#include "base/macros.h"
#include "base/single_thread_task_runner.h"
#include "chromeos/services/assistant/public/mojom/assistant_audio_decoder.mojom.h"
#include "libassistant/shared/public/platform_audio_output.h"
#include "media/base/audio_block_fifo.h"
#include "media/base/audio_parameters.h"
......@@ -90,6 +91,8 @@ class AudioOutputProviderImpl : public assistant_client::AudioOutputProvider {
service_manager::Connector* connector_;
scoped_refptr<base::SequencedTaskRunner> main_thread_task_runner_;
scoped_refptr<base::SequencedTaskRunner> background_task_runner_;
mojom::AssistantAudioDecoderFactoryPtr audio_decoder_factory_ptr_;
mojom::AssistantAudioDecoderFactory* audio_decoder_factory_;
DISALLOW_COPY_AND_ASSIGN(AudioOutputProviderImpl);
};
......
......@@ -7,34 +7,28 @@
#include "ash/public/interfaces/constants.mojom.h"
#include "chromeos/services/assistant/platform/audio_media_data_source.h"
#include "chromeos/services/assistant/public/mojom/constants.mojom.h"
#include "services/service_manager/public/cpp/connector.h"
namespace chromeos {
namespace assistant {
AudioStreamHandler::AudioStreamHandler(
service_manager::Connector* connector,
scoped_refptr<base::SequencedTaskRunner> task_runner)
: connector_(connector),
task_runner_(task_runner),
client_binding_(this),
weak_factory_(this) {}
: task_runner_(task_runner), client_binding_(this), weak_factory_(this) {}
AudioStreamHandler::~AudioStreamHandler() = default;
void AudioStreamHandler::StartAudioDecoder(
mojom::AssistantAudioDecoderFactory* audio_decoder_factory,
assistant_client::AudioOutput::Delegate* delegate,
InitCB on_inited) {
mojom::AssistantAudioDecoderClientPtr client;
client_binding_.Bind(mojo::MakeRequest(&client));
connector_->BindInterface(mojom::kAudioDecoderServiceName,
mojo::MakeRequest(&audio_decoder_factory_ptr_));
mojom::AssistantMediaDataSourcePtr data_source;
media_data_source_ =
std::make_unique<AudioMediaDataSource>(&data_source, task_runner_);
audio_decoder_factory_ptr_->CreateAssistantAudioDecoder(
audio_decoder_factory->CreateAssistantAudioDecoder(
mojo::MakeRequest(&audio_decoder_), std::move(client),
std::move(data_source));
......@@ -45,17 +39,6 @@ void AudioStreamHandler::StartAudioDecoder(
&AudioStreamHandler::OnDecoderInitialized, weak_factory_.GetWeakPtr()));
}
void AudioStreamHandler::OnDecoderInitialized(bool success,
uint32_t bytes_per_sample,
uint32_t samples_per_second,
uint32_t channels) {
task_runner_->PostTask(
FROM_HERE,
base::BindOnce(&AudioStreamHandler::OnDecoderInitializedOnThread,
weak_factory_.GetWeakPtr(), success, bytes_per_sample,
samples_per_second, channels));
}
void AudioStreamHandler::OnNewBuffers(
const std::vector<std::vector<uint8_t>>& buffers) {
if (buffers.size() == 0)
......@@ -94,9 +77,26 @@ void AudioStreamHandler::OnError(assistant_client::AudioOutput::Error error) {
}
void AudioStreamHandler::OnStopped() {
delegate_->OnStopped();
stopped_ = true;
// Do not provide more source data.
media_data_source_->set_delegate(nullptr);
delegate_ = nullptr;
// Call |delegate_->OnStopped()| will delete |this|. Call |CloseDecoder| to
// clean up first.
audio_decoder_->CloseDecoder(base::BindOnce(&AudioStreamHandler::StopDelegate,
weak_factory_.GetWeakPtr()));
}
void AudioStreamHandler::OnDecoderInitialized(bool success,
uint32_t bytes_per_sample,
uint32_t samples_per_second,
uint32_t channels) {
task_runner_->PostTask(
FROM_HERE,
base::BindOnce(&AudioStreamHandler::OnDecoderInitializedOnThread,
weak_factory_.GetWeakPtr(), success, bytes_per_sample,
samples_per_second, channels));
}
void AudioStreamHandler::OnDecoderInitializedOnThread(
......@@ -105,7 +105,12 @@ void AudioStreamHandler::OnDecoderInitializedOnThread(
uint32_t samples_per_second,
uint32_t channels) {
if (!success) {
OnError(assistant_client::AudioOutput::Error::FATAL_ERROR);
// In the case that both |OpenDecoder()| and |CloseDecoder()| were called,
// there is no need to call |OnError()|, since we are going to call
// |OnStopped()| soon.
if (!stopped_)
OnError(assistant_client::AudioOutput::Error::FATAL_ERROR);
std::move(start_device_owner_on_main_thread_);
return;
}
......@@ -123,6 +128,11 @@ void AudioStreamHandler::OnDecoderInitializedOnThread(
}
}
void AudioStreamHandler::StopDelegate() {
delegate_->OnStopped();
delegate_ = nullptr;
}
void AudioStreamHandler::FillDecodedBuffer(void* buffer, int buffer_size) {
if (on_filled_ && (decoded_data_.size() > 0 || no_more_data_)) {
int size_copied = 0;
......
......@@ -13,10 +13,6 @@
#include "mojo/public/cpp/bindings/associated_binding.h"
#include "mojo/public/cpp/bindings/binding.h"
namespace service_manager {
class Connector;
} // namespace service_manager
namespace chromeos {
namespace assistant {
......@@ -28,13 +24,15 @@ class AudioStreamHandler : public mojom::AssistantAudioDecoderClient,
using InitCB =
base::OnceCallback<void(const assistant_client::OutputStreamFormat&)>;
AudioStreamHandler(service_manager::Connector* connector,
scoped_refptr<base::SequencedTaskRunner> task_runner);
explicit AudioStreamHandler(
scoped_refptr<base::SequencedTaskRunner> task_runner);
~AudioStreamHandler() override;
// Called on main thread.
void StartAudioDecoder(assistant_client::AudioOutput::Delegate* delegate,
InitCB start_device_owner_on_main_thread);
void StartAudioDecoder(
mojom::AssistantAudioDecoderFactory* audio_decoder_factory,
assistant_client::AudioOutput::Delegate* delegate,
InitCB start_device_owner_on_main_thread);
// mojom::AssistantAudioDecoderClient overrides:
// Called by |audio_decoder_| on utility thread.
......@@ -60,6 +58,7 @@ class AudioStreamHandler : public mojom::AssistantAudioDecoderClient,
uint32_t bytes_per_sample,
uint32_t samples_per_second,
uint32_t channels);
void StopDelegate();
// Called by |FillBuffer()| to fill available data. If no available data, it
// will call |DecodeOnThread()| to get more data.
......@@ -72,11 +71,9 @@ class AudioStreamHandler : public mojom::AssistantAudioDecoderClient,
// Calls |audio_decoder_| to decode on main thread.
void DecodeOnThread();
service_manager::Connector* connector_;
scoped_refptr<base::SequencedTaskRunner> task_runner_;
assistant_client::AudioOutput::Delegate* delegate_;
mojom::AssistantAudioDecoderFactoryPtr audio_decoder_factory_ptr_;
mojo::Binding<mojom::AssistantAudioDecoderClient> client_binding_;
std::unique_ptr<AudioMediaDataSource> media_data_source_;
mojom::AssistantAudioDecoderPtr audio_decoder_;
......@@ -86,6 +83,8 @@ class AudioStreamHandler : public mojom::AssistantAudioDecoderClient,
// True if |Decode()| called and not all decoded buffers are received, e.g.
// |buffers_to_receive_| != 0.
bool is_decoding_ = false;
// True after |OnStopped()| called.
bool stopped_ = false;
// Temporary storage of |buffer| passed by |FillBuffer|.
void* buffer_to_copy_ = nullptr;
......
......@@ -24,6 +24,9 @@ interface AssistantAudioDecoder {
// Reads the audio data and decodes.
Decode();
// Close decoder to clean up.
CloseDecoder() => ();
};
// Interface for assistant audio decoder service to call into client.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment