Commit dc311da5 authored by Ryan Keane's avatar Ryan Keane Committed by Chromium LUCI CQ

[Cast] Add PushBufferQueue FIFO

This CL defined the PushBufferQueue class, which sits in between the
CmaBackendProxy (which feeds in data when it gets a PushBuffer command)
and the CastAudioDecoderChannel (see
https://chromium-review.googlesource.com/c/chromium/src/+/2509046) which
pulls data out (via GetBufferData()).

Bug: b/167426091
Change-Id: I7e155f4d121bef31855c48d2c7f037bafd626a20
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2509338
Commit-Queue: Ryan Keane <rwkeane@google.com>
Reviewed-by: default avatarDaniel Nicoara <dnicoara@chromium.org>
Reviewed-by: default avatarYuchen Liu <yucliu@chromium.org>
Cr-Commit-Position: refs/heads/master@{#834551}
parent 15ab3759
......@@ -79,8 +79,17 @@ cast_source_set("unittests") {
# Building the required proto files here cause a linker issue on
# Android because both protobuf_full and protobuf_lite get included.
if (!is_android) {
sources += [ "backend/proxy/cma_backend_proxy_unittest.cc" ]
deps += [ "//chromecast/media/cma/backend/proxy" ]
sources += [
"backend/proxy/cma_backend_proxy_unittest.cc",
"backend/proxy/push_buffer_queue_unittest.cc",
]
deps += [
"//chromecast/media/cma/backend/proxy",
"//chromecast/media/cma/backend/proxy:cast_audio_decoder_service_proto",
# TODO(b/174874712): Remove the explicit gRPC dependency.
"//third_party/grpc:grpc++",
]
}
if (enable_video_with_mixed_audio) {
......
include_rules = [
"+third_party/grpc",
]
......@@ -41,6 +41,8 @@ cast_source_set("proxy") {
"multizone_audio_decoder_proxy.h",
"multizone_audio_decoder_proxy_impl.cc",
"multizone_audio_decoder_proxy_impl.h",
"push_buffer_queue.cc",
"push_buffer_queue.h",
]
deps = [
......@@ -49,5 +51,6 @@ cast_source_set("proxy") {
"//chromecast/base",
"//chromecast/media/api",
"//chromecast/public/media",
"//third_party/protobuf:protobuf_full",
]
}
// Copyright 2020 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "chromecast/media/cma/backend/proxy/push_buffer_queue.h"
#include <atomic>
#include "base/notreached.h"
#include "base/template_util.h"
#include "chromecast/media/api/decoder_buffer_base.h"
#include "third_party/protobuf/src/google/protobuf/util/delimited_message_util.h"
namespace chromecast {
namespace media {
namespace {
// The number of consecutive failed read attempts before the buffer is
// determined to be in an invalid state.
int kMaximumFailedReadAttempts = 10;
// The maximum size of a read/write window used by the underlying buffer. This
// is the maximum size of the array which will be cached for upcoming use.
// size_t type is used here to simplify comparison logic later on.
size_t kWindowSizeBytes = 32;
} // namespace
// static
constexpr size_t PushBufferQueue::kBufferSizeBytes;
PushBufferQueue::PushBufferQueue()
: producer_handler_(this),
consumer_handler_(this),
consumer_stream_(base::in_place_t(), &consumer_handler_),
protobuf_consumer_stream_(base::in_place_t(),
&consumer_stream_.value(),
1),
producer_stream_(base::in_place_t(), &producer_handler_) {
DETACH_FROM_SEQUENCE(producer_sequence_checker_);
DETACH_FROM_SEQUENCE(consumer_sequence_checker_);
}
PushBufferQueue::~PushBufferQueue() = default;
bool PushBufferQueue::PushBuffer(const PushBufferRequest& request) {
auto success = PushBufferImpl(request);
if (success) {
producer_handler_.ApplyNewBytesWritten();
}
return success;
}
bool PushBufferQueue::PushBufferImpl(const PushBufferRequest& request) {
DCHECK_CALLED_ON_VALID_SEQUENCE(producer_sequence_checker_);
bytes_written_during_current_write_ = 0;
// NOTE: This method is used instead of SerializeDelimitedToZeroCopyStream()
// due to bugs in the method's implementation. See b/173477672.
DCHECK(producer_stream_.has_value());
bool success = google::protobuf::util::SerializeDelimitedToOstream(
request, &producer_stream_.value());
if (success) {
producer_handler_.overflow();
} else {
// Now the stream is in a bad state, so recreate it. This should only occur
// when the entire |buffer_| is full at time of writing.
bytes_written_during_current_write_ = 0;
producer_handler_.overflow();
producer_stream_ = base::nullopt;
producer_stream_.emplace(&producer_handler_);
}
return success;
}
bool PushBufferQueue::HasBufferedData() const {
DCHECK_CALLED_ON_VALID_SEQUENCE(consumer_sequence_checker_);
return !is_in_invalid_state_ && GetAvailableyByteCount() != size_t{0};
}
base::Optional<PushBufferQueue::PushBufferRequest>
PushBufferQueue::GetBufferedData() {
auto result = GetBufferedDataImpl();
if (result.has_value()) {
consumer_handler_.ApplyNewBytesRead();
}
return result;
}
base::Optional<PushBufferQueue::PushBufferRequest>
PushBufferQueue::GetBufferedDataImpl() {
DCHECK_CALLED_ON_VALID_SEQUENCE(consumer_sequence_checker_);
DCHECK(HasBufferedData());
bytes_read_during_current_read_ = 0;
PushBufferRequest request;
DCHECK(protobuf_consumer_stream_.has_value());
bool succeeded = google::protobuf::util::ParseDelimitedFromZeroCopyStream(
&request, &protobuf_consumer_stream_.value(), nullptr /* clean_eof */);
// This case will only occur in one of the following cases:
// - Reading a PushBuffer at the same time it is being written.
// - An error occurs while reading from the stream (specifically, a PushBuffer
// was serialized incorrectly).
// The former case is not expected to occur, but is handled to be safe.
// The latter case is only expected if the buffer is written to when not
// enough space is available to handle the new write.
//
// TODO(rwkeane): Eliminate handling of the former case after validating this
// doesn't occur in practice.
if (!succeeded) {
consecuitive_read_failures_++;
if (+consecuitive_read_failures_ > kMaximumFailedReadAttempts) {
// This means that data was probably serialized incorrectly.
is_in_invalid_state_ = true;
}
// Reset the read pointers so that future reads re-read the old data.
bytes_read_during_current_read_ = 0;
consumer_handler_.ResetReadPointers();
// If |!succeeded|, the streams have ended up in an unexpected state and
// need to be recreated.
protobuf_consumer_stream_ = base::nullopt;
consumer_stream_ = base::nullopt;
consumer_stream_.emplace(&consumer_handler_);
protobuf_consumer_stream_.emplace(&consumer_stream_.value(), 1);
return base::nullopt;
}
consecuitive_read_failures_ = 0;
return request;
}
int PushBufferQueue::GetAvailableyByteCount() const {
const int total_bytes_read =
bytes_read_so_far_.load(std::memory_order_relaxed) +
consumer_handler_.GetReadOffset();
const int total_bytes_written =
bytes_written_so_far_.load(std::memory_order_relaxed);
return total_bytes_written - total_bytes_read;
}
PushBufferQueue::ProducerHandler::ProducerHandler(PushBufferQueue* queue)
: queue_(queue) {
DCHECK(queue_);
}
PushBufferQueue::ProducerHandler::~ProducerHandler() = default;
int PushBufferQueue::ProducerHandler::overflow(int ch) {
// Get the number of bytes read and written so far.
const size_t current_read_bytes =
queue_->bytes_read_so_far_.load(std::memory_order_acquire);
const int currently_written_bytes = UpdateBytesWritten();
DCHECK_GE(static_cast<size_t>(currently_written_bytes), current_read_bytes);
// Calculates the current size of the buffer.
const size_t bytes_currently_used =
currently_written_bytes - current_read_bytes;
DCHECK_LE(bytes_currently_used, kBufferSizeBytes);
// Calculates the number of bytes that should be included in the next write
// window, which is the least of:
// - |kWindowSizeBytes|
// - The number that can be written before wrapping around to the beginning of
// the underlying array,
// - The number of bytes available before the current read pointer.
const size_t current_write_index = currently_written_bytes % kBufferSizeBytes;
const size_t available_writable_bytes =
std::min(kBufferSizeBytes - current_write_index,
current_read_bytes + kBufferSizeBytes - currently_written_bytes);
const size_t new_window_size =
std::min(kWindowSizeBytes, available_writable_bytes);
// If there is no writable area, then return a special value per method
// contact.
if (new_window_size == 0) {
setp(epptr(), epptr());
return std::char_traits<char>::eof();
}
// Update the pointers that determine the writable area and write the given
// value |ch| if one was given.
setp(&queue_->buffer_[current_write_index],
&queue_->buffer_[current_write_index + new_window_size]);
const bool should_write_ch = (ch != std::char_traits<char>::eof());
if (should_write_ch) {
sputc(static_cast<char>(ch));
}
return 1; // This can be any value except std::char_traits<char>::eof().
}
void PushBufferQueue::ProducerHandler::ApplyNewBytesWritten() {
queue_->bytes_written_so_far_.fetch_add(
queue_->bytes_written_during_current_write_, std::memory_order_relaxed);
queue_->bytes_written_during_current_write_ = 0;
}
size_t PushBufferQueue::ProducerHandler::UpdateBytesWritten() {
const int change_in_write_count = pptr() - pbase();
DCHECK_GE(change_in_write_count, 0);
queue_->bytes_written_during_current_write_ += change_in_write_count;
return queue_->bytes_written_so_far_.load(std::memory_order_relaxed) +
queue_->bytes_written_during_current_write_;
}
PushBufferQueue::ConsumerHandler::ConsumerHandler(PushBufferQueue* queue)
: queue_(queue) {
DCHECK(queue_);
}
PushBufferQueue::ConsumerHandler::~ConsumerHandler() = default;
int PushBufferQueue::ConsumerHandler::underflow() {
// Get the written and read bytes.
const size_t currently_written_bytes =
queue_->bytes_written_so_far_.load(std::memory_order_acquire);
const size_t current_read_bytes = UpdateBytesRead();
DCHECK_GE(currently_written_bytes, current_read_bytes);
// Stop reading at either the end of the array or the current write index,
// whichever is sooner. While there may be more data wrapped around after the
// end of the array, that can be handled as part of the next underflow() call.
const size_t avail = currently_written_bytes - current_read_bytes;
const size_t begin = current_read_bytes % kBufferSizeBytes;
const size_t end = std::min(begin + avail, kBufferSizeBytes);
const size_t new_window_size = std::min(end - begin, kWindowSizeBytes);
// This means that there are no bytes left to read. Return a special value per
// method contract.
if (new_window_size == 0) {
return std::char_traits<char>::eof();
}
// Otherwise, there is still readable data. Update the readable window and
// return the current character per method contact. Because
// std::char_traits<char>::eof() is a special return code, cast to a uint to
// avoid all negative results (EOF is guaranteed to be negative by the stl).
DCHECK_LE(current_read_bytes + new_window_size, currently_written_bytes);
setg(&queue_->buffer_[begin], &queue_->buffer_[begin],
&queue_->buffer_[begin + new_window_size]);
return static_cast<uint8_t>(queue_->buffer_[begin]);
}
void PushBufferQueue::ConsumerHandler::ResetReadPointers() {
const size_t begin =
queue_->bytes_read_so_far_.load(std::memory_order_relaxed) %
kBufferSizeBytes;
setg(&queue_->buffer_[begin], &queue_->buffer_[begin],
&queue_->buffer_[begin]);
}
void PushBufferQueue::ConsumerHandler::ApplyNewBytesRead() {
queue_->bytes_read_so_far_.fetch_add(queue_->bytes_read_during_current_read_,
std::memory_order_relaxed);
queue_->bytes_read_during_current_read_ = 0;
}
size_t PushBufferQueue::ConsumerHandler::UpdateBytesRead() {
const int change_in_read_count = GetReadOffset();
DCHECK_GE(change_in_read_count, 0);
queue_->bytes_read_during_current_read_ += change_in_read_count;
return queue_->bytes_read_so_far_.load(std::memory_order_relaxed) +
queue_->bytes_read_during_current_read_;
}
int PushBufferQueue::ConsumerHandler::GetReadOffset() const {
return gptr() - eback();
}
} // namespace media
} // namespace chromecast
// Copyright 2020 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef CHROMECAST_MEDIA_CMA_BACKEND_PROXY_PUSH_BUFFER_QUEUE_H_
#define CHROMECAST_MEDIA_CMA_BACKEND_PROXY_PUSH_BUFFER_QUEUE_H_
#include <atomic>
#include <istream>
#include <ostream>
#include "base/optional.h"
#include "base/sequence_checker.h"
#include "chromecast/media/api/decoder_buffer_base.h"
#include "third_party/openscreen/src/cast/cast_core/api/runtime/cast_audio_decoder_service.pb.h"
#include "third_party/protobuf/src/google/protobuf/io/zero_copy_stream_impl.h"
namespace chromecast {
namespace media {
struct AudioConfig;
// This class is responsible for buffering both DecoderBuffer and AudioConfig
// data, which are pushed together over gRPC using the PushData() API call.
// Two sequences are expected to simultaneously access this object:
// - A PRODUCER sequence, which will push new data in.
// - A CONSUMER thread which will pull this data back out of the data structure.
//
// This is achieved through serializing this protobuf into bytes, then storing
// these bytes in a lockless FIFO.
class PushBufferQueue {
public:
using PushBufferRequest = cast::media::PushBufferRequest;
// The amount of space to allocate in the buffer.
static constexpr size_t kBufferSizeBytes = 0x01 << 12; // 4 kB.
PushBufferQueue();
~PushBufferQueue();
// Pushes the data stored in the associated type to the queue underlying this
// object. Returns true if the operation was successful, and false otherwise.
//
// May only be called by the PRODUCER.
bool PushBuffer(const PushBufferRequest& request);
// Returns true if there is data available for reading.
//
// May only be called by the CONSUMER.
bool HasBufferedData() const;
// Attempts to read the top PushBufferRequest on the queue, returning the
// instance on success and empty if not enough data is available yet.
//
// May only be called by the CONSUMER.
base::Optional<PushBufferRequest> GetBufferedData();
private:
// These classes exist for the following 2 reasons:
// 1) Readability. Separating of the Read and Write methods is simpler
// 2) Thread safety guarantees. The stl provides no guarantees of thread
// safety within a single instance of std::basic_streambuf<char>, even
// though there should be no overlap between the resources used by both.
// In an ideal world, this functionality could all live in the
// |PushBufferQueue| class.
//
// The approach used by basic_streambuf is to maintain a 'window' on the data
// from which it reads/writes. When the window runs out, underflow() or read
// or overflow() for write is called to get the next window.
// These allow this class to be used as a thread-safe circular read/write
// buffer by input and output streams, as required for use with protobuf
// serialization and deserialization utilities.
//
// Methods in |ProducerHandler| may only be called from the PRODUCER.
class ProducerHandler : public std::basic_streambuf<char> {
public:
explicit ProducerHandler(PushBufferQueue* queue);
~ProducerHandler() override;
// std::basic_streambuf<char> overrides:
int overflow(int ch = std::char_traits<char>::eof()) override;
// Stores the new value of |bytes_written_so_far_| following a successful
// write.
void ApplyNewBytesWritten();
private:
// Updates |bytes_written_during_current_write_| and returns the total
// number of bytes written including these new bytes.
size_t UpdateBytesWritten();
PushBufferQueue* const queue_;
};
// Methods in |ConsumerHandler| may only be called from the CONSUMER.
class ConsumerHandler : public std::basic_streambuf<char> {
public:
explicit ConsumerHandler(PushBufferQueue* queue);
~ConsumerHandler() override;
// std::basic_streambuf<char> overrides:
int underflow() override;
// Returns the number of bytes that have been read so far but not accounted
// for by |queue_->bytes_read_so_far_|.
int GetReadOffset() const;
// Resets the get area for this streambuf to start at the location pointed
// to by |bytes_read_so_far_| and configures the stream to call underflow()
// during its next read.
void ResetReadPointers();
// Stores the new value of |bytes_read_so_far_| following a successful read.
void ApplyNewBytesRead();
private:
// Updates |bytes_read_during_current_read_| and returns the total number of
// bytes read including these new bytes.
size_t UpdateBytesRead();
PushBufferQueue* const queue_;
};
// Friend declaration is needed to test some edge cases that can be hit when
// simultaneous reads and writes are ongoing.
friend class PushBufferQueueTests;
// Give access to helper types.
friend class ProducerHandler;
friend class ConsumerHandler;
// Gets the number of buffered bytes. May only be called from the CONSUMER.
int GetAvailableyByteCount() const;
// Helper methods to be used for test hooks.
bool PushBufferImpl(const PushBufferRequest& request);
base::Optional<PushBufferRequest> GetBufferedDataImpl();
// Buffer where serialized PushBufferRequest data is stored.
char buffer_[kBufferSizeBytes];
// Total number of bytes read or written by completed operations so far.
// Atomics are used both to ensure that read and write operations are atomic
// on all systems and to ensure that different values for these values aren't
// loaded from each CPU's physical cache. Size_t types are used intentionally
// to allow for wrap-around.
std::atomic_size_t bytes_read_so_far_{0};
std::atomic_size_t bytes_written_so_far_{0};
// The number of bytes read during the current GetBufferedData() call. This is
// necessary due to internal details of how an IstreamInputStream handles
// end-of-stream conditions. May only be accessed or modified by the
// CONSUMER.
int bytes_read_during_current_read_ = 0;
// The number of bytes written during the current PushBuffer call. This helps
// to prevent reads of PushBuffer instances currently being written. May only
// be accessed by the PRODUCER.
int bytes_written_during_current_write_ = 0;
// Tracks whether this buffer is in a valid state for further reads to occur.
// May only be used by the CONSUMER.
int consecuitive_read_failures_ = 0;
bool is_in_invalid_state_ = false;
// Helpers for keeping CONSUMER and PRODUCER sequences independent.
ProducerHandler producer_handler_;
ConsumerHandler consumer_handler_;
// Sequence checkers for thread safety validation:
SEQUENCE_CHECKER(producer_sequence_checker_);
SEQUENCE_CHECKER(consumer_sequence_checker_);
// Input streams backed by this instance. They must be optional so that they
// can be re-created following a failed read. These should only be used by the
// CONSUMER.
base::Optional<std::istream> consumer_stream_;
base::Optional<google::protobuf::io::IstreamInputStream>
protobuf_consumer_stream_;
// Output stream backed by this instance. This must be optional so it can be
// re-created following a failed write. It should only be used by the
// PRODUCER.
base::Optional<std::ostream> producer_stream_;
};
} // namespace media
} // namespace chromecast
#endif // CHROMECAST_MEDIA_CMA_BACKEND_PROXY_PUSH_BUFFER_QUEUE_H_
// Copyright 2020 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "chromecast/media/cma/backend/proxy/push_buffer_queue.h"
#include <atomic>
#include <sstream>
#include "base/bind.h"
#include "base/location.h"
#include "base/optional.h"
#include "base/threading/thread.h"
#include "testing/gmock/include/gmock/gmock.h"
#include "testing/gtest/include/gtest/gtest.h"
#include "third_party/openscreen/src/cast/cast_core/api/runtime/cast_audio_decoder_service.grpc.pb.h"
namespace chromecast {
namespace media {
class PushBufferQueueTests : public testing::Test {
public:
PushBufferQueueTests()
: first_audio_buffer_(CreateAudioBufferRequest(0, false, 1, 2, 255)),
second_audio_buffer_(CreateAudioBufferRequest(2, false, 6, 4, 2, 0)),
third_audio_buffer_(
CreateAudioBufferRequest(4, true, 0, 1, 1, 2, 3, 5, 8)),
fourth_audio_buffer_(
CreateAudioBufferRequest(42, true, 4, 8, 15, 16, 23, 42)),
fifth_audio_buffer_(CreateAudioBufferRequest(1, false)) {
std::vector<uint8_t> extra_data{0, 1, 7, 127, 255};
auto* config = new cast::media::AudioConfiguration;
config->set_codec(
cast::media::AudioConfiguration_AudioCodec_AUDIO_CODEC_MP3);
config->set_channel_layout(
cast::media::
AudioConfiguration_ChannelLayout_CHANNEL_LAYOUT_SURROUND_5_1);
config->set_sample_format(
cast::media::AudioConfiguration_SampleFormat_SAMPLE_FORMAT_PLANAR_S32);
config->set_bytes_per_channel(42);
config->set_channel_number(-1);
config->set_samples_per_second(112358);
config->set_extra_data(extra_data.data(), extra_data.size());
first_audio_config_.set_allocated_audio_config(config);
}
void ReadData(const std::string& name,
const PushBufferQueue::PushBufferRequest& target_buffer) {
ASSERT_TRUE(queue_.HasBufferedData()) << name;
base::Optional<PushBufferRequest> get = queue_.GetBufferedData();
ASSERT_TRUE(get.has_value()) << name;
CheckEqual("first", get.value(), target_buffer);
}
protected:
using PushBufferRequest = PushBufferQueue::PushBufferRequest;
using AudioDecoderBuffer = cast::media::AudioDecoderBuffer;
template <typename... TData>
static PushBufferRequest CreateAudioBufferRequest(int64_t pts_micros,
bool end_of_stream,
TData... data) {
return CreateAudioBufferRequest(pts_micros, end_of_stream,
std::vector<uint8_t>{uint8_t{data}...});
}
static PushBufferRequest CreateAudioBufferRequest(
int64_t pts_micros,
bool end_of_stream,
std::vector<uint8_t> data_vector) {
PushBufferRequest request;
auto* audio_buffer = new AudioDecoderBuffer;
audio_buffer->set_pts_micros(pts_micros);
audio_buffer->set_end_of_stream(end_of_stream);
audio_buffer->set_data(data_vector.data(), data_vector.size());
request.set_allocated_buffer(audio_buffer);
return request;
}
void CheckEqual(const std::string& name,
const PushBufferRequest& first,
const PushBufferRequest& second) {
std::string failure_str = "failed on " + std::move(name);
ASSERT_EQ(first.has_buffer(), second.has_buffer()) << failure_str;
ASSERT_EQ(first.has_audio_config(), second.has_audio_config())
<< failure_str;
if (first.has_buffer()) {
EXPECT_EQ(first.buffer().pts_micros(), second.buffer().pts_micros())
<< failure_str;
EXPECT_EQ(first.buffer().end_of_stream(), second.buffer().end_of_stream())
<< failure_str;
EXPECT_EQ(first.buffer().data(), second.buffer().data()) << failure_str;
}
if (first.has_audio_config()) {
EXPECT_EQ(first.audio_config().codec(), second.audio_config().codec())
<< failure_str;
EXPECT_EQ(first.audio_config().channel_layout(),
second.audio_config().channel_layout())
<< failure_str;
EXPECT_EQ(first.audio_config().sample_format(),
second.audio_config().sample_format())
<< failure_str;
EXPECT_EQ(first.audio_config().bytes_per_channel(),
second.audio_config().bytes_per_channel())
<< failure_str;
EXPECT_EQ(first.audio_config().channel_number(),
second.audio_config().channel_number())
<< failure_str;
EXPECT_EQ(first.audio_config().samples_per_second(),
second.audio_config().samples_per_second())
<< failure_str;
EXPECT_EQ(first.audio_config().extra_data(),
second.audio_config().extra_data())
<< failure_str;
}
}
void UpdateBufferWriteStreamPositions() {
queue_.producer_handler_.overflow();
queue_.producer_handler_.ApplyNewBytesWritten();
}
std::string GetIterationName(int iteration_id) {
std::stringstream ss;
ss << "iteration " << iteration_id;
return ss.str();
}
bool StartPushBuffer(const PushBufferRequest& request) {
return queue_.PushBufferImpl(request);
}
base::Optional<PushBufferQueue::PushBufferRequest> StartGetBufferedData() {
return queue_.GetBufferedDataImpl();
}
void FinishPushBuffer() { queue_.producer_handler_.ApplyNewBytesWritten(); }
void FinishGetBufferedData() { queue_.consumer_handler_.ApplyNewBytesRead(); }
std::istream* consumer_stream() { return &queue_.consumer_stream_.value(); }
std::ostream* producer_stream() { return &queue_.producer_stream_.value(); }
size_t available_bytes() { return queue_.GetAvailableyByteCount(); }
PushBufferQueue queue_;
// Some test data
PushBufferRequest first_audio_buffer_;
PushBufferRequest second_audio_buffer_;
PushBufferRequest third_audio_buffer_;
PushBufferRequest fourth_audio_buffer_;
PushBufferRequest fifth_audio_buffer_;
PushBufferRequest first_audio_config_;
};
TEST_F(PushBufferQueueTests, TestPushOrdering) {
EXPECT_FALSE(queue_.HasBufferedData());
queue_.PushBuffer(first_audio_buffer_);
queue_.PushBuffer(second_audio_buffer_);
queue_.PushBuffer(first_audio_config_);
queue_.PushBuffer(third_audio_buffer_);
ReadData("first", first_audio_buffer_);
ReadData("second", second_audio_buffer_);
queue_.PushBuffer(fourth_audio_buffer_);
ReadData("config", first_audio_config_);
ReadData("third", third_audio_buffer_);
ReadData("fourth", fourth_audio_buffer_);
EXPECT_FALSE(queue_.HasBufferedData());
queue_.PushBuffer(fifth_audio_buffer_);
ReadData("fifth", fifth_audio_buffer_);
EXPECT_FALSE(queue_.HasBufferedData());
}
TEST_F(PushBufferQueueTests, TestPushLargeBuffer) {
std::vector<uint8_t> data;
for (int i = 0; i < 256; i++) {
data.push_back(i);
}
auto buffer = CreateAudioBufferRequest(0, false, data);
queue_.PushBuffer(buffer);
ReadData("big buffer", buffer);
}
TEST_F(PushBufferQueueTests, TestWrapAround) {
auto first_buffer = CreateAudioBufferRequest(0, false, 0, 1);
queue_.PushBuffer(first_buffer);
for (size_t i = 1; i < PushBufferQueue::kBufferSizeBytes * 3; i++) {
const std::string name = GetIterationName(i);
const uint8_t previous_id = (i - 1) % 256;
const uint8_t current_id = i % 256;
const uint8_t next_id = (i + 1) % 256;
auto buffer = CreateAudioBufferRequest(
0, false, std::vector<uint8_t>{current_id, next_id});
auto old_buffer = CreateAudioBufferRequest(
0, false, std::vector<uint8_t>{previous_id, current_id});
// Make sure the length is 6. 7 is prime, so guaranteed to hit all possible
// positions in |buffer_| when an extra bit is used for the size.
std::string serialized_str;
buffer.SerializeToString(&serialized_str);
ASSERT_EQ(serialized_str.size(), size_t{6}) << name;
ASSERT_TRUE(queue_.HasBufferedData()) << name;
ASSERT_TRUE(queue_.PushBuffer(buffer)) << name;
ReadData(name, old_buffer);
}
}
TEST_F(PushBufferQueueTests, TestWriteEntireBuffer) {
for (size_t i = 0; i < (PushBufferQueue::kBufferSizeBytes >> 3); i++) {
auto buffer = CreateAudioBufferRequest(0, false, 0, 1, 2);
// Make sure the length is 8 after serialization (with the extra length
// bit).
std::string serialized_str;
buffer.SerializeToString(&serialized_str);
ASSERT_EQ(serialized_str.size(), size_t{7});
ASSERT_TRUE(queue_.PushBuffer(buffer)) << GetIterationName(i);
}
auto failing_buffer = CreateAudioBufferRequest(0, false);
EXPECT_FALSE(queue_.PushBuffer(failing_buffer));
for (size_t i = 0; i < (PushBufferQueue::kBufferSizeBytes >> 3); i++) {
auto buffer = CreateAudioBufferRequest(0, false, 0, 1, 2);
ReadData(GetIterationName(i), buffer);
}
// Make sure writing still works after the failed write above.
EXPECT_FALSE(queue_.HasBufferedData());
queue_.PushBuffer(first_audio_buffer_);
ReadData("first", first_audio_buffer_);
EXPECT_FALSE(queue_.HasBufferedData());
}
TEST_F(PushBufferQueueTests, TestReadingFromPartialWrite) {
std::string serialized_str;
first_audio_buffer_.SerializeToString(&serialized_str);
char size = static_cast<char>(serialized_str.size());
ASSERT_GT(size, 2);
*producer_stream() << size << serialized_str[0] << serialized_str[1];
UpdateBufferWriteStreamPositions();
ASSERT_TRUE(queue_.HasBufferedData());
base::Optional<PushBufferRequest> pulled_buffer = queue_.GetBufferedData();
EXPECT_FALSE(pulled_buffer.has_value());
EXPECT_TRUE(queue_.HasBufferedData());
for (size_t i = 2; i < serialized_str.size(); i++) {
*producer_stream() << serialized_str[i];
}
UpdateBufferWriteStreamPositions();
ASSERT_TRUE(queue_.HasBufferedData());
pulled_buffer = queue_.GetBufferedData();
ASSERT_TRUE(pulled_buffer.has_value());
CheckEqual("buffer", pulled_buffer.value(), first_audio_buffer_);
}
TEST_F(PushBufferQueueTests, InterleaveProduceAndConsume) {
EXPECT_FALSE(queue_.HasBufferedData());
EXPECT_TRUE(StartPushBuffer(first_audio_buffer_));
EXPECT_FALSE(queue_.HasBufferedData());
FinishPushBuffer();
ReadData("first", first_audio_buffer_);
ASSERT_TRUE(StartPushBuffer(second_audio_buffer_));
FinishGetBufferedData();
EXPECT_FALSE(queue_.HasBufferedData());
FinishPushBuffer();
ReadData("second", second_audio_buffer_);
EXPECT_FALSE(queue_.HasBufferedData());
}
TEST_F(PushBufferQueueTests, TestMultithreaded) {
queue_.PushBuffer(first_audio_buffer_);
queue_.PushBuffer(second_audio_buffer_);
base::Thread consumer_thread("Consumer Thread");
consumer_thread.StartAndWaitForTesting();
{
auto task_runner = consumer_thread.task_runner();
auto this_ptr = base::Unretained(this);
task_runner->PostTask(
FROM_HERE, base::BindOnce(&PushBufferQueueTests::ReadData, this_ptr,
"first", first_audio_buffer_));
task_runner->PostTask(
FROM_HERE, base::BindOnce(&PushBufferQueueTests::ReadData, this_ptr,
"second", second_audio_buffer_));
queue_.PushBuffer(third_audio_buffer_);
queue_.PushBuffer(fourth_audio_buffer_);
task_runner->PostTask(
FROM_HERE, base::BindOnce(&PushBufferQueueTests::ReadData, this_ptr,
"third", third_audio_buffer_));
task_runner->PostTask(
FROM_HERE, base::BindOnce(&PushBufferQueueTests::ReadData, this_ptr,
"fourth", fourth_audio_buffer_));
}
consumer_thread.FlushForTesting();
consumer_thread.Stop();
}
} // namespace media
} // namespace chromecast
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment