Commit a7d24332 authored by Kehuang Li's avatar Kehuang Li Committed by Commit Bot

[Chromecast] Add capture service receiver

This is the receiver side of Cast audio capture service that will
observing platform audio input hoocked in assistant service. More info
can be found in the design go/comms-virtual-mic.

tests on desktop.

Bug: internal: 133880006
Merge-With: eureka-internal/274195
Test: Make OTT call on Home after patching all CLs. Run and pass unit
Change-Id: I662c77f4fae819ced791861ca5b762cdf08e8f82
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/1655085
Commit-Queue: Kehuang Li <kehuangli@chromium.org>
Reviewed-by: default avatarDavid Benjamin <davidben@chromium.org>
Reviewed-by: default avatarYuchen Liu <yucliu@chromium.org>
Reviewed-by: default avatarKenneth MacKay <kmackay@chromium.org>
Cr-Commit-Position: refs/heads/master@{#675845}
parent be3cccbb
...@@ -118,6 +118,9 @@ declare_args() { ...@@ -118,6 +118,9 @@ declare_args() {
# Recording happens at this sample rate. Must be 16000, 48000 or 96000 Hz. # Recording happens at this sample rate. Must be 16000, 48000 or 96000 Hz.
audio_input_sample_rate = 16000 audio_input_sample_rate = 16000
# Whether use unix sockets in Cast input/output stream.
use_unix_sockets = is_linux
} }
declare_args() { declare_args() {
......
...@@ -63,6 +63,7 @@ test("cast_media_unittests") { ...@@ -63,6 +63,7 @@ test("cast_media_unittests") {
"//chromecast/base", "//chromecast/base",
"//chromecast/base/metrics:test_support", "//chromecast/base/metrics:test_support",
"//chromecast/common/mojom", "//chromecast/common/mojom",
"//chromecast/media/audio/capture_service:unittests",
"//chromecast/media/cma:test_support", "//chromecast/media/cma:test_support",
"//chromecast/media/cma:unittests", "//chromecast/media/cma:unittests",
"//chromecast/public", "//chromecast/public",
......
...@@ -4,6 +4,7 @@ ...@@ -4,6 +4,7 @@
import("//build/buildflag_header.gni") import("//build/buildflag_header.gni")
import("//chromecast/chromecast.gni") import("//chromecast/chromecast.gni")
import("//media/media_options.gni") import("//media/media_options.gni")
import("//testing/test.gni")
declare_args() { declare_args() {
# Defines bounds for the output buffer size (in frames) # Defines bounds for the output buffer size (in frames)
...@@ -27,6 +28,7 @@ cast_source_set("audio") { ...@@ -27,6 +28,7 @@ cast_source_set("audio") {
"//base", "//base",
"//chromecast/base", "//chromecast/base",
"//chromecast/common/mojom", "//chromecast/common/mojom",
"//chromecast/media/audio/capture_service:receiver",
"//chromecast/media/audio/mixer_service:connection", "//chromecast/media/audio/mixer_service:connection",
"//chromecast/media/audio/mixer_service:proto", "//chromecast/media/audio/mixer_service:proto",
"//chromecast/media/base", "//chromecast/media/base",
...@@ -101,3 +103,19 @@ buildflag_header("audio_buildflags") { ...@@ -101,3 +103,19 @@ buildflag_header("audio_buildflags") {
flags += [ "AUDIO_INPUT_SAMPLE_RATE=$audio_input_sample_rate" ] flags += [ "AUDIO_INPUT_SAMPLE_RATE=$audio_input_sample_rate" ]
} }
} }
cast_source_set("test_support") {
testonly = true
sources = [
"mock_audio_input_callback.h",
]
public_deps = [
"//testing/gmock",
]
deps = [
"//media",
]
}
# Copyright 2019 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import("//build/buildflag_header.gni")
import("//chromecast/chromecast.gni")
import("//testing/libfuzzer/fuzzer_test.gni")
import("//testing/test.gni")
buildflag_header("buildflags") {
header = "capture_service_buildflags.h"
flags = [ "USE_UNIX_SOCKETS=$use_unix_sockets" ]
}
cast_source_set("common") {
sources = [
"constants.h",
]
deps = [
":buildflags",
]
}
cast_source_set("receiver") {
sources = [
"capture_service_receiver.cc",
"capture_service_receiver.h",
"message_parsing_util.cc",
"message_parsing_util.h",
]
public_deps = [
":common",
]
deps = [
":buildflags",
"//base",
"//chromecast/base",
"//chromecast/net:small_message_socket",
"//media",
"//net",
]
}
cast_source_set("unittests") {
testonly = true
sources = [
"capture_service_receiver_unittest.cc",
"message_parsing_util_unittest.cc",
]
deps = [
":receiver",
"//base",
"//base/test:test_support",
"//chromecast/media/audio:test_support",
"//chromecast/net:test_support",
"//net",
"//testing/gtest",
]
}
fuzzer_test("message_parsing_fuzzer") {
sources = [
"message_parsing_fuzzer.cc",
]
deps = [
":receiver",
"//base",
]
}
include_rules = [
"+chromecast/net",
"+net",
]
// Copyright 2019 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "chromecast/media/audio/capture_service/capture_service_receiver.h"
#include <memory>
#include <string>
#include <utility>
#include "base/bind.h"
#include "base/callback.h"
#include "base/location.h"
#include "base/logging.h"
#include "base/threading/sequenced_task_runner_handle.h"
#include "base/time/time.h"
#include "base/timer/timer.h"
#include "chromecast/media/audio/capture_service/capture_service_buildflags.h"
#include "chromecast/media/audio/capture_service/constants.h"
#include "chromecast/media/audio/capture_service/message_parsing_util.h"
#include "chromecast/net/small_message_socket.h"
#include "media/base/limits.h"
#include "net/base/io_buffer.h"
#include "net/socket/stream_socket.h"
#if BUILDFLAG(USE_UNIX_SOCKETS)
#include "net/socket/unix_domain_client_socket_posix.h"
#else
#include "net/base/address_list.h"
#include "net/base/ip_address.h"
#include "net/base/ip_endpoint.h"
#include "net/log/net_log_source.h"
#include "net/socket/tcp_client_socket.h"
#endif // BUILDFLAG(USE_UNIX_SOCKETS)
// Helper macro to post tasks to the io thread. It is safe to use unretained
// |this|, since |this| owns the thread.
#define ENSURE_ON_IO_THREAD(method, ...) \
if (!task_runner_->RunsTasksInCurrentSequence()) { \
task_runner_->PostTask( \
FROM_HERE, base::BindOnce(&CaptureServiceReceiver::method, \
base::Unretained(this), ##__VA_ARGS__)); \
return; \
}
namespace chromecast {
namespace media {
namespace {
constexpr base::TimeDelta kConnectTimeout = base::TimeDelta::FromSeconds(1);
constexpr base::TimeDelta kInactivityTimeout = base::TimeDelta::FromSeconds(5);
} // namespace
class CaptureServiceReceiver::Socket : public SmallMessageSocket {
public:
Socket(std::unique_ptr<net::StreamSocket> socket, int channels);
~Socket() override;
void Start(::media::AudioInputStream::AudioInputCallback* input_callback);
private:
// SmallMessageSocket implementation:
void OnError(int error) override;
void OnEndOfStream() override;
bool OnMessage(char* data, int size) override;
void OnInactivityTimeout();
bool HandleAudio(std::unique_ptr<::media::AudioBus> audio, int64_t timestamp);
void ReportErrorAndStop();
// Number of audio capture channels that audio manager defines.
const int channels_;
::media::AudioInputStream::AudioInputCallback* input_callback_;
base::OneShotTimer inactivity_timer_;
DISALLOW_COPY_AND_ASSIGN(Socket);
};
CaptureServiceReceiver::Socket::Socket(
std::unique_ptr<net::StreamSocket> socket,
int channels)
: SmallMessageSocket(std::move(socket)),
channels_(channels),
input_callback_(nullptr) {
DCHECK_GT(channels_, 0);
DCHECK_LE(channels_, ::media::limits::kMaxChannels);
}
CaptureServiceReceiver::Socket::~Socket() = default;
void CaptureServiceReceiver::Socket::Start(
::media::AudioInputStream::AudioInputCallback* input_callback) {
input_callback_ = input_callback;
inactivity_timer_.Start(FROM_HERE, kInactivityTimeout, this,
&CaptureServiceReceiver::Socket::OnInactivityTimeout);
ReceiveMessages();
}
void CaptureServiceReceiver::Socket::ReportErrorAndStop() {
inactivity_timer_.Stop();
if (input_callback_) {
input_callback_->OnError();
}
input_callback_ = nullptr;
}
void CaptureServiceReceiver::Socket::OnInactivityTimeout() {
LOG(ERROR) << "Timed out " << this << " due to inactivity";
ReportErrorAndStop();
}
void CaptureServiceReceiver::Socket::OnError(int error) {
LOG(INFO) << "Socket error from " << this << ": " << error;
ReportErrorAndStop();
}
void CaptureServiceReceiver::Socket::OnEndOfStream() {
LOG(ERROR) << "Got EOS " << this;
ReportErrorAndStop();
}
bool CaptureServiceReceiver::Socket::OnMessage(char* data, int size) {
// TODO(https://crbug.com/982014): Remove the DCHECK once using unsigned
// |size|.
DCHECK_GT(size, 0);
int64_t timestamp = 0;
auto audio = capture_service::ReadDataToAudioBus(data, size, &timestamp);
if (!audio.has_value()) {
ReportErrorAndStop();
return false;
}
if (input_callback_) {
inactivity_timer_.Reset();
}
return HandleAudio(std::move(audio.value()), timestamp);
}
bool CaptureServiceReceiver::Socket::HandleAudio(
std::unique_ptr<::media::AudioBus> audio,
int64_t timestamp) {
if (!input_callback_) {
LOG(INFO) << "Received audio before stream metadata; ignoring";
return false;
}
DCHECK(audio);
DCHECK_EQ(channels_, audio->channels());
input_callback_->OnData(
audio.get(),
base::TimeTicks() + base::TimeDelta::FromMicroseconds(timestamp),
/* volume =*/1.0);
return true;
}
CaptureServiceReceiver::CaptureServiceReceiver(
const ::media::AudioParameters& audio_params)
: audio_params_(audio_params), io_thread_(__func__) {
base::Thread::Options options;
options.message_loop_type = base::MessageLoop::TYPE_IO;
// TODO(b/137106361): Tweak the thread priority once the thread priority for
// speech processing gets fixed.
options.priority = base::ThreadPriority::DISPLAY;
CHECK(io_thread_.StartWithOptions(options));
task_runner_ = io_thread_.task_runner();
DCHECK(task_runner_);
}
CaptureServiceReceiver::~CaptureServiceReceiver() {
Stop();
io_thread_.Stop();
}
void CaptureServiceReceiver::Start(
::media::AudioInputStream::AudioInputCallback* input_callback) {
ENSURE_ON_IO_THREAD(Start, input_callback);
#if BUILDFLAG(USE_UNIX_SOCKETS)
std::string path = capture_service::kDefaultUnixDomainSocketPath;
std::unique_ptr<net::StreamSocket> connecting_socket =
std::make_unique<net::UnixDomainClientSocket>(
path, true /* use_abstract_namespace */);
#else // BUILDFLAG(USE_UNIX_SOCKETS)
int port = capture_service::kDefaultTcpPort;
net::IPEndPoint endpoint(net::IPAddress::IPv4Localhost(), port);
std::unique_ptr<net::StreamSocket> connecting_socket =
std::make_unique<net::TCPClientSocket>(
net::AddressList(endpoint), nullptr, nullptr, net::NetLogSource());
#endif // BUILDFLAG(USE_UNIX_SOCKETS)
StartWithSocket(input_callback, std::move(connecting_socket));
}
void CaptureServiceReceiver::StartWithSocket(
::media::AudioInputStream::AudioInputCallback* input_callback,
std::unique_ptr<net::StreamSocket> connecting_socket) {
ENSURE_ON_IO_THREAD(StartWithSocket, input_callback,
std::move(connecting_socket));
DCHECK(!connecting_socket_);
DCHECK(!socket_);
connecting_socket_ = std::move(connecting_socket);
int result = connecting_socket_->Connect(
base::BindOnce(&CaptureServiceReceiver::OnConnected,
base::Unretained(this), input_callback));
if (result != net::ERR_IO_PENDING) {
task_runner_->PostTask(
FROM_HERE,
base::BindOnce(&CaptureServiceReceiver::OnConnected,
base::Unretained(this), input_callback, result));
return;
}
task_runner_->PostDelayedTask(
FROM_HERE,
base::BindOnce(&CaptureServiceReceiver::OnConnectTimeout,
base::Unretained(this), input_callback),
kConnectTimeout);
}
void CaptureServiceReceiver::OnConnected(
::media::AudioInputStream::AudioInputCallback* input_callback,
int result) {
DCHECK(task_runner_->RunsTasksInCurrentSequence());
DCHECK_NE(result, net::ERR_IO_PENDING);
if (!connecting_socket_) {
return;
}
if (result == net::OK) {
socket_ = std::make_unique<Socket>(std::move(connecting_socket_),
audio_params_.channels());
socket_->Start(input_callback);
} else {
LOG(INFO) << "Connecting failed: " << net::ErrorToString(result);
input_callback->OnError();
connecting_socket_.reset();
}
if (!connected_cb_.is_null()) {
std::move(connected_cb_).Run();
}
}
void CaptureServiceReceiver::OnConnectTimeout(
::media::AudioInputStream::AudioInputCallback* input_callback) {
DCHECK(task_runner_->RunsTasksInCurrentSequence());
if (!connecting_socket_) {
return;
}
LOG(ERROR) << __func__;
input_callback->OnError();
connecting_socket_.reset();
}
void CaptureServiceReceiver::Stop() {
ENSURE_ON_IO_THREAD(Stop);
connecting_socket_.reset();
socket_.reset();
}
void CaptureServiceReceiver::SetConnectClosureForTest(
base::OnceClosure connected_cb) {
connected_cb_ = std::move(connected_cb);
}
void CaptureServiceReceiver::SetTaskRunnerForTest(
scoped_refptr<base::SequencedTaskRunner> task_runner) {
task_runner_ = std::move(task_runner);
}
} // namespace media
} // namespace chromecast
// Copyright 2019 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef CHROMECAST_MEDIA_AUDIO_CAPTURE_SERVICE_CAPTURE_SERVICE_RECEIVER_H_
#define CHROMECAST_MEDIA_AUDIO_CAPTURE_SERVICE_CAPTURE_SERVICE_RECEIVER_H_
#include <memory>
#include "base/callback_forward.h"
#include "base/macros.h"
#include "base/memory/scoped_refptr.h"
#include "base/threading/thread.h"
#include "media/audio/audio_io.h"
#include "media/base/audio_parameters.h"
namespace base {
class SequencedTaskRunner;
} // namespace base
namespace net {
class StreamSocket;
} // namespace net
namespace chromecast {
namespace media {
class CaptureServiceReceiver {
public:
explicit CaptureServiceReceiver(const ::media::AudioParameters& audio_params);
~CaptureServiceReceiver();
void Start(::media::AudioInputStream::AudioInputCallback* input_callback);
void Stop();
// Unit test can call this method rather than the public one to inject mocked
// stream socket.
void StartWithSocket(
::media::AudioInputStream::AudioInputCallback* input_callback,
std::unique_ptr<net::StreamSocket> connecting_socket);
// Unit test can wait for the closure as a async way to run to idle. E.g.,
//
// base::RunLoop run_loop;
// SetConnectClosureForTest(run_loop.QuitClosure());
// StartWithSocket(...);
// run_loop.Run();
//
void SetConnectClosureForTest(base::OnceClosure connected_cb);
// Unit test can set test task runner so as to run test in sync. Must be
// called after construction and before any other methods. Do not mix with the
// use of SetConnectClosureForTest().
void SetTaskRunnerForTest(
scoped_refptr<base::SequencedTaskRunner> task_runner);
private:
class Socket;
void OnConnected(
::media::AudioInputStream::AudioInputCallback* input_callback,
int result);
void OnConnectTimeout(
::media::AudioInputStream::AudioInputCallback* input_callback);
const ::media::AudioParameters audio_params_;
// Socket requires IO thread, and low latency input stream requires high
// thread priority. Therefore, a private thread instead of the IO thread from
// browser thread pool is necessary so as to make sure input stream won't be
// blocked nor block other tasks on the same thread.
base::Thread io_thread_;
scoped_refptr<base::SequencedTaskRunner> task_runner_;
std::unique_ptr<net::StreamSocket> connecting_socket_;
std::unique_ptr<Socket> socket_;
base::OnceClosure connected_cb_;
DISALLOW_COPY_AND_ASSIGN(CaptureServiceReceiver);
};
} // namespace media
} // namespace chromecast
#endif // CHROMECAST_MEDIA_AUDIO_CAPTURE_SERVICE_CAPTURE_SERVICE_RECEIVER_H_
// Copyright 2019 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "chromecast/media/audio/capture_service/capture_service_receiver.h"
#include <memory>
#include "base/big_endian.h"
#include "base/run_loop.h"
#include "base/test/scoped_task_environment.h"
#include "chromecast/media/audio/mock_audio_input_callback.h"
#include "chromecast/net/mock_stream_socket.h"
#include "net/base/io_buffer.h"
#include "testing/gtest/include/gtest/gtest.h"
using ::testing::_;
using ::testing::Invoke;
using ::testing::Return;
namespace chromecast {
namespace media {
namespace capture_service {
namespace {
class MockStreamSocket : public chromecast::MockStreamSocket {
public:
MockStreamSocket() = default;
~MockStreamSocket() override = default;
};
class CaptureServiceReceiverTest : public ::testing::Test {
public:
CaptureServiceReceiverTest()
: receiver_(::media::AudioParameters(
::media::AudioParameters::AUDIO_PCM_LINEAR,
::media::ChannelLayout::CHANNEL_LAYOUT_MONO,
16000,
160)) {}
~CaptureServiceReceiverTest() override = default;
protected:
base::test::ScopedTaskEnvironment scoped_task_environment_;
chromecast::MockAudioInputCallback audio_;
CaptureServiceReceiver receiver_;
};
TEST_F(CaptureServiceReceiverTest, StartStop) {
auto socket1 = std::make_unique<MockStreamSocket>();
auto socket2 = std::make_unique<MockStreamSocket>();
EXPECT_CALL(*socket1, Connect(_)).WillOnce(Return(net::OK));
EXPECT_CALL(*socket1, Read(_, _, _)).WillOnce(Return(net::ERR_IO_PENDING));
EXPECT_CALL(*socket2, Connect(_)).WillOnce(Return(net::OK));
// Sync.
base::RunLoop run_loop;
receiver_.SetConnectClosureForTest(run_loop.QuitClosure());
receiver_.StartWithSocket(&audio_, std::move(socket1));
run_loop.Run();
receiver_.Stop();
// Async.
receiver_.StartWithSocket(&audio_, std::move(socket2));
receiver_.Stop();
}
TEST_F(CaptureServiceReceiverTest, ConnectFailed) {
auto socket = std::make_unique<MockStreamSocket>();
EXPECT_CALL(*socket, Connect(_)).WillOnce(Return(net::ERR_FAILED));
EXPECT_CALL(audio_, OnError());
base::RunLoop run_loop;
receiver_.SetConnectClosureForTest(run_loop.QuitClosure());
receiver_.StartWithSocket(&audio_, std::move(socket));
run_loop.Run();
}
// TODO(https://crbug.com/946657): Add unit tests for timeout once supporting of
// MOCK_TIME for threads other than the main thread is available. Also, update
// the use of task runner in the following tests.
TEST_F(CaptureServiceReceiverTest, ReceiveValidMessage) {
auto socket = std::make_unique<MockStreamSocket>();
EXPECT_CALL(*socket, Connect(_)).WillOnce(Return(net::OK));
EXPECT_CALL(*socket, Read(_, _, _))
.WillOnce(Invoke([](net::IOBuffer* buf, int,
net::CompletionOnceCallback) {
std::vector<char> header(16);
base::BigEndianWriter data_writer(header.data(), header.size());
data_writer.WriteU16(334); // 160 frames + header - data[0], in bytes.
data_writer.WriteU16(1); // Mono channel.
data_writer.WriteU16(0); // Interleaved int16.
data_writer.WriteU16(0); // Padding zero.
data_writer.WriteU64(0); // Timestamp.
std::copy(header.data(), header.data() + header.size(), buf->data());
// No need to fill audio frames.
return 336; // 160 frames + header, in bytes.
}))
.WillOnce(Return(net::ERR_IO_PENDING));
EXPECT_CALL(audio_, OnData(_, _, 1.0 /* volume */));
receiver_.SetTaskRunnerForTest(
scoped_task_environment_.GetMainThreadTaskRunner());
receiver_.StartWithSocket(&audio_, std::move(socket));
scoped_task_environment_.RunUntilIdle();
}
TEST_F(CaptureServiceReceiverTest, ReceiveInvalidMessage) {
auto socket = std::make_unique<MockStreamSocket>();
EXPECT_CALL(*socket, Connect(_)).WillOnce(Return(net::OK));
EXPECT_CALL(*socket, Read(_, _, _))
.WillOnce(Invoke([](net::IOBuffer* buf, int,
net::CompletionOnceCallback) {
std::vector<char> header(16, 0);
base::BigEndianWriter data_writer(header.data(), header.size());
data_writer.WriteU16(14); // header - data[0], in bytes.
std::copy(header.data(), header.data() + header.size(), buf->data());
return 16;
}));
EXPECT_CALL(audio_, OnError());
receiver_.SetTaskRunnerForTest(
scoped_task_environment_.GetMainThreadTaskRunner());
receiver_.StartWithSocket(&audio_, std::move(socket));
scoped_task_environment_.RunUntilIdle();
}
TEST_F(CaptureServiceReceiverTest, ReceiveError) {
auto socket = std::make_unique<MockStreamSocket>();
EXPECT_CALL(*socket, Connect(_)).WillOnce(Return(net::OK));
EXPECT_CALL(*socket, Read(_, _, _))
.WillOnce(Return(net::ERR_CONNECTION_RESET));
EXPECT_CALL(audio_, OnError());
receiver_.SetTaskRunnerForTest(
scoped_task_environment_.GetMainThreadTaskRunner());
receiver_.StartWithSocket(&audio_, std::move(socket));
scoped_task_environment_.RunUntilIdle();
}
TEST_F(CaptureServiceReceiverTest, ReceiveEosMessage) {
auto socket = std::make_unique<MockStreamSocket>();
EXPECT_CALL(*socket, Connect(_)).WillOnce(Return(net::OK));
EXPECT_CALL(*socket, Read(_, _, _)).WillOnce(Return(0));
EXPECT_CALL(audio_, OnError());
receiver_.SetTaskRunnerForTest(
scoped_task_environment_.GetMainThreadTaskRunner());
receiver_.StartWithSocket(&audio_, std::move(socket));
scoped_task_environment_.RunUntilIdle();
}
} // namespace
} // namespace capture_service
} // namespace media
} // namespace chromecast
// Copyright 2019 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef CHROMECAST_MEDIA_AUDIO_CAPTURE_SERVICE_CONSTANTS_H_
#define CHROMECAST_MEDIA_AUDIO_CAPTURE_SERVICE_CONSTANTS_H_
#include "chromecast/media/audio/capture_service/capture_service_buildflags.h"
namespace chromecast {
namespace media {
namespace capture_service {
#if BUILDFLAG(USE_UNIX_SOCKETS)
constexpr char kDefaultUnixDomainSocketPath[] = "/tmp/capture-service";
#else
constexpr int kDefaultTcpPort = 12855;
#endif
enum SampleFormat {
INTERLEAVED_INT16 = 0,
INTERLEAVED_INT32 = 1,
INTERLEAVED_FLOAT = 2,
PLANAR_INT16 = 3,
PLANAR_INT32 = 4,
PLANAR_FLOAT = 5,
};
} // namespace capture_service
} // namespace media
} // namespace chromecast
#endif // CHROMECAST_MEDIA_AUDIO_CAPTURE_SERVICE_CONSTANTS_H_
// Copyright 2019 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include <stddef.h>
#include <stdint.h>
#include "base/logging.h"
#include "chromecast/media/audio/capture_service/message_parsing_util.h"
struct Environment {
Environment() { logging::SetMinLogLevel(logging::LOG_FATAL); }
};
extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
static Environment env;
int64_t timestamp;
auto audio = chromecast::media::capture_service::ReadDataToAudioBus(
reinterpret_cast<const char*>(data), size, &timestamp);
return 0;
}
// Copyright 2019 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "chromecast/media/audio/capture_service/message_parsing_util.h"
#include "base/big_endian.h"
#include "base/logging.h"
#include "base/numerics/checked_math.h"
#include "chromecast/media/audio/capture_service/constants.h"
#include "media/base/limits.h"
namespace chromecast {
namespace media {
namespace capture_service {
namespace {
// Check if audio data is properly aligned and has valid frame size. Return the
// number of frames if they are all good, otherwise return 0 to indicate
// failure.
template <typename T>
int CheckAudioData(int channels, const char* data, size_t data_size) {
if (reinterpret_cast<const uintptr_t>(data) % sizeof(T) != 0u) {
LOG(ERROR) << "Misaligned audio data";
return 0;
}
size_t frame_size = 0;
int frames = 0;
if (!base::CheckMul(channels, sizeof(T)).AssignIfValid(&frame_size) ||
!base::CheckDiv(data_size, frame_size)
.Cast<int>()
.AssignIfValid(&frames)) {
LOG(ERROR) << "Numeric overflow: " << data_size << " / (" << channels
<< " * " << sizeof(T) << ").";
return 0;
}
if (!base::CheckMul(channels, frames).IsValid<int>()) {
LOG(ERROR) << "Numeric overflow: " << channels << " * " << frames << ".";
return 0;
}
if (data_size % frame_size != 0) {
LOG(ERROR) << "Audio data size (" << data_size
<< ") is not an integer number of frames (" << frame_size
<< ").";
return 0;
}
return frames;
}
template <typename Traits>
base::Optional<std::unique_ptr<::media::AudioBus>>
ConvertInterleavedData(int channels, const char* data, size_t data_size) {
const int frames =
CheckAudioData<typename Traits::ValueType>(channels, data, data_size);
if (frames <= 0) {
return base::nullopt;
}
auto audio = ::media::AudioBus::Create(channels, frames);
audio->FromInterleaved<Traits>(
reinterpret_cast<const typename Traits::ValueType*>(data), frames);
return audio;
}
template <typename Traits>
base::Optional<std::unique_ptr<::media::AudioBus>>
ConvertPlanarData(int channels, const char* data, size_t data_size) {
const int frames =
CheckAudioData<typename Traits::ValueType>(channels, data, data_size);
if (frames <= 0) {
return base::nullopt;
}
auto audio = ::media::AudioBus::Create(channels, frames);
const typename Traits::ValueType* base_data =
reinterpret_cast<const typename Traits::ValueType*>(data);
for (int c = 0; c < channels; ++c) {
const typename Traits::ValueType* source = base_data + c * frames;
float* dest = audio->channel(c);
for (int f = 0; f < frames; ++f) {
dest[f] = Traits::ToFloat(source[f]);
}
}
return audio;
}
base::Optional<std::unique_ptr<::media::AudioBus>>
ConvertPlanarFloat(int channels, const char* data, size_t data_size) {
const int frames = CheckAudioData<float>(channels, data, data_size);
if (frames <= 0) {
return base::nullopt;
}
auto audio = ::media::AudioBus::Create(channels, frames);
const float* base_data = reinterpret_cast<const float*>(data);
for (int c = 0; c < channels; ++c) {
const float* source = base_data + c * frames;
std::copy(source, source + frames, audio->channel(c));
}
return audio;
}
base::Optional<std::unique_ptr<::media::AudioBus>>
ConvertData(int channels, int format, const char* data, size_t data_size) {
switch (format) {
case capture_service::SampleFormat::INTERLEAVED_INT16:
return ConvertInterleavedData<::media::SignedInt16SampleTypeTraits>(
channels, data, data_size);
case capture_service::SampleFormat::INTERLEAVED_INT32:
return ConvertInterleavedData<::media::SignedInt32SampleTypeTraits>(
channels, data, data_size);
case capture_service::SampleFormat::INTERLEAVED_FLOAT:
return ConvertInterleavedData<::media::Float32SampleTypeTraits>(
channels, data, data_size);
case capture_service::SampleFormat::PLANAR_INT16:
return ConvertPlanarData<::media::SignedInt16SampleTypeTraits>(
channels, data, data_size);
case capture_service::SampleFormat::PLANAR_INT32:
return ConvertPlanarData<::media::SignedInt32SampleTypeTraits>(
channels, data, data_size);
case capture_service::SampleFormat::PLANAR_FLOAT:
return ConvertPlanarFloat(channels, data, data_size);
}
LOG(ERROR) << "Unknown sample format " << format;
return base::nullopt;
}
} // namespace
base::Optional<std::unique_ptr<::media::AudioBus>>
ReadDataToAudioBus(const char* data, size_t size, int64_t* timestamp) {
// Padding bits are used make sure memory for |data| is well aligned for any
// sample formats, i.e., the total bits of the header is a multiple of
// 32-bits.
uint16_t channels, format, padding;
uint64_t timestamp_us;
base::BigEndianReader data_reader(data, size);
if (!data_reader.ReadU16(&channels) || !data_reader.ReadU16(&format) ||
!data_reader.ReadU16(&padding) || padding != 0 ||
!data_reader.ReadU64(&timestamp_us)) {
LOG(ERROR) << "Invalid message header.";
return base::nullopt;
}
if (channels > ::media::limits::kMaxChannels) {
LOG(ERROR) << "Invalid number of channels: " << channels;
return base::nullopt;
}
if (!base::CheckedNumeric<uint64_t>(timestamp_us)
.Cast<int64_t>()
.AssignIfValid(timestamp)) {
LOG(ERROR) << "Invalid timestamp: " << timestamp_us;
}
return ConvertData(channels, format, data_reader.ptr(),
data_reader.remaining());
}
} // namespace capture_service
} // namespace media
} // namespace chromecast
// Copyright 2019 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef CHROMECAST_MEDIA_AUDIO_CAPTURE_SERVICE_MESSAGE_PARSING_UTIL_H_
#define CHROMECAST_MEDIA_AUDIO_CAPTURE_SERVICE_MESSAGE_PARSING_UTIL_H_
#include <cstdint>
#include <memory>
#include "base/optional.h"
#include "media/base/audio_bus.h"
namespace chromecast {
namespace media {
namespace capture_service {
// Parse the header of the message and copy audio data to AudioBus. Nullopt will
// be returned if parsing fails. If an invalid timestamp is parsed, it won't be
// assigned to |timestamp| nor stop the function, however, an error message will
// be printed.
// The header of the message consists of <uint16_t channels><uint16_t format>
// <uint16_t padding><uint64_t timestamp_us>, where the unsigned |timestamp_us|
// will be converted to signed |timestamp| if valid.
base::Optional<std::unique_ptr<::media::AudioBus>>
ReadDataToAudioBus(const char* data, size_t size, int64_t* timestamp);
} // namespace capture_service
} // namespace media
} // namespace chromecast
#endif // CHROMECAST_MEDIA_AUDIO_CAPTURE_SERVICE_MESSAGE_PARSING_UTIL_H_
// Copyright 2019 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "chromecast/media/audio/capture_service/message_parsing_util.h"
#include <vector>
#include "base/big_endian.h"
#include "chromecast/media/audio/capture_service/constants.h"
#include "testing/gtest/include/gtest/gtest.h"
namespace chromecast {
namespace media {
namespace capture_service {
namespace {
using chromecast::media::capture_service::SampleFormat;
constexpr size_t kTotalHeaderBytes = 16;
constexpr size_t kFrames = 10;
constexpr size_t kChannels = 2;
void FillHeader(char* data, size_t size, uint16_t format) {
base::BigEndianWriter writer(data, size);
uint16_t message_size = size - sizeof(uint16_t);
writer.WriteU16(message_size);
// Header.
writer.WriteU16(static_cast<uint16_t>(kChannels));
writer.WriteU16(format);
writer.WriteU16(uint16_t(0)); // Padding.
writer.WriteU64(uint64_t(0)); // Timestamp.
}
TEST(ReadDataToAudioBusTest, ValidInterleavedInt16) {
size_t data_size = kTotalHeaderBytes / sizeof(int16_t) + kFrames * kChannels;
std::vector<int16_t> data(data_size, std::numeric_limits<int16_t>::max());
FillHeader(reinterpret_cast<char*>(data.data()),
data.size() * sizeof(int16_t),
static_cast<uint16_t>(SampleFormat::INTERLEAVED_INT16));
// Fill the second channel with min().
for (size_t i = kTotalHeaderBytes / sizeof(int16_t) + 1; i < data_size;
i += 2) {
data[i] = std::numeric_limits<int16_t>::min();
}
int64_t timestamp = -1;
auto audio = ReadDataToAudioBus(
reinterpret_cast<char*>(data.data()) + sizeof(uint16_t),
data_size * sizeof(int16_t) - sizeof(uint16_t), &timestamp);
EXPECT_TRUE(audio.has_value());
::media::AudioBus* audio_ptr = audio->get();
EXPECT_EQ(audio_ptr->channels(), static_cast<int>(kChannels));
EXPECT_EQ(static_cast<size_t>(audio_ptr->frames()), kFrames);
EXPECT_EQ(timestamp, 0);
for (size_t f = 0; f < kFrames; f++) {
EXPECT_FLOAT_EQ(audio_ptr->channel(0)[f], 1.0f);
EXPECT_FLOAT_EQ(audio_ptr->channel(1)[f], -1.0f);
}
}
TEST(ReadDataToAudioBusTest, ValidInterleavedInt32) {
size_t data_size = kTotalHeaderBytes / sizeof(int32_t) + kFrames * kChannels;
std::vector<int32_t> data(data_size, std::numeric_limits<int32_t>::min());
FillHeader(reinterpret_cast<char*>(data.data()),
data.size() * sizeof(int32_t),
static_cast<uint16_t>(SampleFormat::INTERLEAVED_INT32));
// Fill the second channel with max().
for (size_t i = kTotalHeaderBytes / sizeof(int32_t) + 1; i < data_size;
i += 2) {
data[i] = std::numeric_limits<int32_t>::max();
}
int64_t timestamp = -1;
auto audio = ReadDataToAudioBus(
reinterpret_cast<char*>(data.data()) + sizeof(uint16_t),
data_size * sizeof(int32_t) - sizeof(uint16_t), &timestamp);
EXPECT_TRUE(audio.has_value());
::media::AudioBus* audio_ptr = audio->get();
EXPECT_EQ(audio_ptr->channels(), static_cast<int>(kChannels));
EXPECT_EQ(static_cast<size_t>(audio_ptr->frames()), kFrames);
EXPECT_EQ(timestamp, 0);
for (size_t f = 0; f < kFrames; f++) {
EXPECT_FLOAT_EQ(audio_ptr->channel(0)[f], -1.0f);
EXPECT_FLOAT_EQ(audio_ptr->channel(1)[f], 1.0f);
}
}
TEST(ReadDataToAudioBusTest, ValidPlanarFloat) {
size_t data_size = kTotalHeaderBytes / sizeof(float) + kFrames * kChannels;
std::vector<float> data(data_size, .0f);
FillHeader(reinterpret_cast<char*>(data.data()), data.size() * sizeof(float),
static_cast<uint16_t>(SampleFormat::PLANAR_FLOAT));
// Fill the last k frames, i.e., the second channel, with 0.5f.
for (size_t i = data_size - kFrames; i < data_size; i++) {
data[i] = .5f;
}
int64_t timestamp = -1;
auto audio = ReadDataToAudioBus(
reinterpret_cast<char*>(data.data()) + sizeof(uint16_t),
data_size * sizeof(float) - sizeof(uint16_t), &timestamp);
EXPECT_TRUE(audio.has_value());
::media::AudioBus* audio_ptr = audio->get();
EXPECT_EQ(audio_ptr->channels(), static_cast<int>(kChannels));
EXPECT_EQ(static_cast<size_t>(audio_ptr->frames()), kFrames);
EXPECT_EQ(timestamp, 0);
for (size_t f = 0; f < kFrames; f++) {
EXPECT_FLOAT_EQ(audio_ptr->channel(0)[f], .0f);
EXPECT_FLOAT_EQ(audio_ptr->channel(1)[f], .5f);
}
}
TEST(ReadDataToAudioBusTest, InvalidFormat) {
size_t data_size = kTotalHeaderBytes / sizeof(float) + kFrames * kChannels;
std::vector<float> data(data_size, 1.0f);
FillHeader(reinterpret_cast<char*>(data.data()), data.size() * sizeof(float),
static_cast<uint16_t>(SampleFormat::PLANAR_FLOAT) + 1);
int64_t timestamp = -1;
auto audio = ReadDataToAudioBus(
reinterpret_cast<char*>(data.data()) + sizeof(uint16_t),
data_size * sizeof(float) - sizeof(uint16_t), &timestamp);
EXPECT_FALSE(audio.has_value());
}
TEST(ReadDataToAudioBusTest, EmptyMessageData) {
size_t data_size = kTotalHeaderBytes / sizeof(float);
std::vector<float> data(data_size, 1.0f);
FillHeader(reinterpret_cast<char*>(data.data()), data.size() * sizeof(float),
static_cast<uint16_t>(SampleFormat::PLANAR_FLOAT));
int64_t timestamp = -1;
auto audio = ReadDataToAudioBus(
reinterpret_cast<char*>(data.data()) + sizeof(uint16_t),
data_size * sizeof(float) - sizeof(uint16_t), &timestamp);
EXPECT_FALSE(audio.has_value());
}
TEST(ReadDataToAudioBusTest, InvalidDataLength) {
size_t data_size =
kTotalHeaderBytes / sizeof(float) + kFrames * kChannels + 1;
std::vector<float> data(data_size, 1.0f);
FillHeader(reinterpret_cast<char*>(data.data()), data.size() * sizeof(float),
static_cast<uint16_t>(SampleFormat::PLANAR_FLOAT));
int64_t timestamp = -1;
auto audio = ReadDataToAudioBus(
reinterpret_cast<char*>(data.data()) + sizeof(uint16_t),
data_size * sizeof(float) - sizeof(uint16_t), &timestamp);
EXPECT_FALSE(audio.has_value());
}
TEST(ReadDataToAudioBusTest, NotAlignedData) {
size_t data_size =
kTotalHeaderBytes / sizeof(float) + kFrames * kChannels + 1;
std::vector<float> data(data_size, 1.0f);
FillHeader(reinterpret_cast<char*>(data.data()) + 1,
data.size() * sizeof(float) - 1,
static_cast<uint16_t>(SampleFormat::PLANAR_FLOAT));
int64_t timestamp = -1;
auto audio = ReadDataToAudioBus(
reinterpret_cast<char*>(data.data()) + 1 + sizeof(uint16_t),
data_size * sizeof(float) - 1 - sizeof(uint16_t), &timestamp);
EXPECT_FALSE(audio.has_value());
}
} // namespace
} // namespace capture_service
} // namespace media
} // namespace chromecast
...@@ -13,7 +13,6 @@ proto_library("proto") { ...@@ -13,7 +13,6 @@ proto_library("proto") {
] ]
} }
use_unix_sockets = is_linux
buildflag_header("buildflags") { buildflag_header("buildflags") {
header = "mixer_service_buildflags.h" header = "mixer_service_buildflags.h"
......
// Copyright 2019 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef CHROMECAST_MEDIA_AUDIO_MOCK_AUDIO_INPUT_CALLBACK_H_
#define CHROMECAST_MEDIA_AUDIO_MOCK_AUDIO_INPUT_CALLBACK_H_
#include "media/audio/audio_io.h"
#include "testing/gmock/include/gmock/gmock.h"
namespace chromecast {
class MockAudioInputCallback
: public ::media::AudioInputStream::AudioInputCallback {
public:
MockAudioInputCallback();
~MockAudioInputCallback() override;
MOCK_METHOD3(OnData, void(const ::media::AudioBus*, base::TimeTicks, double));
MOCK_METHOD0(OnError, void());
};
inline MockAudioInputCallback::MockAudioInputCallback() = default;
inline MockAudioInputCallback::~MockAudioInputCallback() = default;
} // namespace chromecast
#endif // CHROMECAST_MEDIA_AUDIO_MOCK_AUDIO_INPUT_CALLBACK_H_
...@@ -78,11 +78,10 @@ bool SmallMessageSocket::SendBuffer(net::IOBuffer* data, int size) { ...@@ -78,11 +78,10 @@ bool SmallMessageSocket::SendBuffer(net::IOBuffer* data, int size) {
void SmallMessageSocket::Send() { void SmallMessageSocket::Send() {
for (int i = 0; i < kMaxIOLoop; ++i) { for (int i = 0; i < kMaxIOLoop; ++i) {
DCHECK(write_buffer_); DCHECK(write_buffer_);
// TODO(kmackay): Use base::BindOnce() once it is supported.
int result = int result =
socket_->Write(write_buffer_.get(), write_buffer_->BytesRemaining(), socket_->Write(write_buffer_.get(), write_buffer_->BytesRemaining(),
base::BindRepeating(&SmallMessageSocket::OnWriteComplete, base::BindOnce(&SmallMessageSocket::OnWriteComplete,
base::Unretained(this)), base::Unretained(this)),
MISSING_TRAFFIC_ANNOTATION); MISSING_TRAFFIC_ANNOTATION);
if (!HandleWriteResult(result)) { if (!HandleWriteResult(result)) {
return; return;
...@@ -153,11 +152,10 @@ void SmallMessageSocket::Read() { ...@@ -153,11 +152,10 @@ void SmallMessageSocket::Read() {
// This improves average packet receive delay as compared to always posting a // This improves average packet receive delay as compared to always posting a
// new task for each call to Read(). // new task for each call to Read().
for (int i = 0; i < kMaxIOLoop; ++i) { for (int i = 0; i < kMaxIOLoop; ++i) {
// TODO(kmackay): Use base::BindOnce() once it is supported.
int read_result = int read_result =
socket_->Read(read_buffer_.get(), read_buffer_->RemainingCapacity(), socket_->Read(read_buffer_.get(), read_buffer_->RemainingCapacity(),
base::BindRepeating(&SmallMessageSocket::OnReadComplete, base::BindOnce(&SmallMessageSocket::OnReadComplete,
base::Unretained(this))); base::Unretained(this)));
if (!HandleReadResult(read_result)) { if (!HandleReadResult(read_result)) {
return; return;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment