Commit 0abcd858 authored by Sergey Ulanov's avatar Sergey Ulanov Committed by Commit Bot

[Fuchsia] Add thread checks in classes used in FuchsiaCdm

Added thread_checker in StreamProcessorHelper, SysmemBufferPool and
SysmemBufferWriterQueue. Also added sequence_checker in
Fuchsia*StreamDecryptor.

Change-Id: I467e53f3512bac5f105d18f6fa5d67fc01fa3d6f
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/1838625
Commit-Queue: Sergey Ulanov <sergeyu@chromium.org>
Reviewed-by: default avatarYuchen Liu <yucliu@chromium.org>
Reviewed-by: default avatarDavid Dorwin <ddorwin@chromium.org>
Cr-Commit-Position: refs/heads/master@{#704339}
parent 9642dadd
...@@ -92,14 +92,20 @@ FuchsiaStreamDecryptorBase::FuchsiaStreamDecryptorBase( ...@@ -92,14 +92,20 @@ FuchsiaStreamDecryptorBase::FuchsiaStreamDecryptorBase(
fuchsia::media::StreamProcessorPtr processor) fuchsia::media::StreamProcessorPtr processor)
: processor_(std::move(processor), this) {} : processor_(std::move(processor), this) {}
FuchsiaStreamDecryptorBase::~FuchsiaStreamDecryptorBase() = default; FuchsiaStreamDecryptorBase::~FuchsiaStreamDecryptorBase() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
}
void FuchsiaStreamDecryptorBase::DecryptInternal( void FuchsiaStreamDecryptorBase::DecryptInternal(
scoped_refptr<DecoderBuffer> encrypted) { scoped_refptr<DecoderBuffer> encrypted) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
input_writer_queue_.EnqueueBuffer(std::move(encrypted)); input_writer_queue_.EnqueueBuffer(std::move(encrypted));
} }
void FuchsiaStreamDecryptorBase::ResetStream() { void FuchsiaStreamDecryptorBase::ResetStream() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
// Close current stream and drop all the cached decoder buffers. // Close current stream and drop all the cached decoder buffers.
// Keep input and output buffers to avoid buffer re-allocation. // Keep input and output buffers to avoid buffer re-allocation.
processor_.Reset(); processor_.Reset();
...@@ -109,6 +115,8 @@ void FuchsiaStreamDecryptorBase::ResetStream() { ...@@ -109,6 +115,8 @@ void FuchsiaStreamDecryptorBase::ResetStream() {
// StreamProcessorHelper::Client implementation: // StreamProcessorHelper::Client implementation:
void FuchsiaStreamDecryptorBase::AllocateInputBuffers( void FuchsiaStreamDecryptorBase::AllocateInputBuffers(
const fuchsia::media::StreamBufferConstraints& stream_constraints) { const fuchsia::media::StreamBufferConstraints& stream_constraints) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
base::Optional<fuchsia::sysmem::BufferCollectionConstraints> base::Optional<fuchsia::sysmem::BufferCollectionConstraints>
buffer_constraints = buffer_constraints =
SysmemBufferWriter::GetRecommendedConstraints(stream_constraints); SysmemBufferWriter::GetRecommendedConstraints(stream_constraints);
...@@ -128,10 +136,14 @@ void FuchsiaStreamDecryptorBase::AllocateInputBuffers( ...@@ -128,10 +136,14 @@ void FuchsiaStreamDecryptorBase::AllocateInputBuffers(
} }
void FuchsiaStreamDecryptorBase::OnOutputFormat( void FuchsiaStreamDecryptorBase::OnOutputFormat(
fuchsia::media::StreamOutputFormat format) {} fuchsia::media::StreamOutputFormat format) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
}
void FuchsiaStreamDecryptorBase::OnInputBufferPoolCreated( void FuchsiaStreamDecryptorBase::OnInputBufferPoolCreated(
std::unique_ptr<SysmemBufferPool> pool) { std::unique_ptr<SysmemBufferPool> pool) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
if (!pool) { if (!pool) {
DLOG(ERROR) << "Fail to allocate input buffer."; DLOG(ERROR) << "Fail to allocate input buffer.";
OnError(); OnError();
...@@ -150,6 +162,8 @@ void FuchsiaStreamDecryptorBase::OnInputBufferPoolCreated( ...@@ -150,6 +162,8 @@ void FuchsiaStreamDecryptorBase::OnInputBufferPoolCreated(
void FuchsiaStreamDecryptorBase::OnWriterCreated( void FuchsiaStreamDecryptorBase::OnWriterCreated(
std::unique_ptr<SysmemBufferWriter> writer) { std::unique_ptr<SysmemBufferWriter> writer) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
if (!writer) { if (!writer) {
OnError(); OnError();
return; return;
...@@ -166,6 +180,8 @@ void FuchsiaStreamDecryptorBase::OnWriterCreated( ...@@ -166,6 +180,8 @@ void FuchsiaStreamDecryptorBase::OnWriterCreated(
void FuchsiaStreamDecryptorBase::SendInputPacket( void FuchsiaStreamDecryptorBase::SendInputPacket(
const DecoderBuffer* buffer, const DecoderBuffer* buffer,
StreamProcessorHelper::IoPacket packet) { StreamProcessorHelper::IoPacket packet) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
if (!packet.unit_end()) { if (!packet.unit_end()) {
// The encrypted data size is too big. Decryptor should consider // The encrypted data size is too big. Decryptor should consider
// splitting the buffer and update the IV and subsample entries. // splitting the buffer and update the IV and subsample entries.
...@@ -181,6 +197,8 @@ void FuchsiaStreamDecryptorBase::SendInputPacket( ...@@ -181,6 +197,8 @@ void FuchsiaStreamDecryptorBase::SendInputPacket(
} }
void FuchsiaStreamDecryptorBase::ProcessEndOfStream() { void FuchsiaStreamDecryptorBase::ProcessEndOfStream() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
processor_.ProcessEos(); processor_.ProcessEos();
} }
...@@ -208,13 +226,17 @@ FuchsiaClearStreamDecryptor::~FuchsiaClearStreamDecryptor() = default; ...@@ -208,13 +226,17 @@ FuchsiaClearStreamDecryptor::~FuchsiaClearStreamDecryptor() = default;
void FuchsiaClearStreamDecryptor::Decrypt( void FuchsiaClearStreamDecryptor::Decrypt(
scoped_refptr<DecoderBuffer> encrypted, scoped_refptr<DecoderBuffer> encrypted,
Decryptor::DecryptCB decrypt_cb) { Decryptor::DecryptCB decrypt_cb) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
DCHECK(!decrypt_cb_); DCHECK(!decrypt_cb_);
decrypt_cb_ = std::move(decrypt_cb); decrypt_cb_ = std::move(decrypt_cb);
current_status_ = Decryptor::kSuccess; current_status_ = Decryptor::kSuccess;
DecryptInternal(std::move(encrypted)); DecryptInternal(std::move(encrypted));
} }
void FuchsiaClearStreamDecryptor::CancelDecrypt() { void FuchsiaClearStreamDecryptor::CancelDecrypt() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
ResetStream(); ResetStream();
// Fire |decrypt_cb_| immediately as required by Decryptor::CancelDecrypt. // Fire |decrypt_cb_| immediately as required by Decryptor::CancelDecrypt.
...@@ -224,6 +246,8 @@ void FuchsiaClearStreamDecryptor::CancelDecrypt() { ...@@ -224,6 +246,8 @@ void FuchsiaClearStreamDecryptor::CancelDecrypt() {
void FuchsiaClearStreamDecryptor::AllocateOutputBuffers( void FuchsiaClearStreamDecryptor::AllocateOutputBuffers(
const fuchsia::media::StreamBufferConstraints& stream_constraints) { const fuchsia::media::StreamBufferConstraints& stream_constraints) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
if (!stream_constraints.has_packet_count_for_client_max() || if (!stream_constraints.has_packet_count_for_client_max() ||
!stream_constraints.has_packet_count_for_client_min()) { !stream_constraints.has_packet_count_for_client_min()) {
DLOG(ERROR) << "StreamBufferConstraints doesn't contain required fields."; DLOG(ERROR) << "StreamBufferConstraints doesn't contain required fields.";
...@@ -247,12 +271,15 @@ void FuchsiaClearStreamDecryptor::AllocateOutputBuffers( ...@@ -247,12 +271,15 @@ void FuchsiaClearStreamDecryptor::AllocateOutputBuffers(
} }
void FuchsiaClearStreamDecryptor::OnProcessEos() { void FuchsiaClearStreamDecryptor::OnProcessEos() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
// Decryptor never pushes EOS frame. // Decryptor never pushes EOS frame.
NOTREACHED(); NOTREACHED();
} }
void FuchsiaClearStreamDecryptor::OnOutputPacket( void FuchsiaClearStreamDecryptor::OnOutputPacket(
StreamProcessorHelper::IoPacket packet) { StreamProcessorHelper::IoPacket packet) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
DCHECK(decrypt_cb_); DCHECK(decrypt_cb_);
DCHECK(output_reader_); DCHECK(output_reader_);
...@@ -312,6 +339,8 @@ void FuchsiaClearStreamDecryptor::OnOutputPacket( ...@@ -312,6 +339,8 @@ void FuchsiaClearStreamDecryptor::OnOutputPacket(
} }
void FuchsiaClearStreamDecryptor::OnNoKey() { void FuchsiaClearStreamDecryptor::OnNoKey() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
// Reset the queue. The client is expected to call Decrypt() with the same // Reset the queue. The client is expected to call Decrypt() with the same
// buffer again when it gets kNoKey. // buffer again when it gets kNoKey.
input_writer_queue_.ResetQueue(); input_writer_queue_.ResetQueue();
...@@ -321,6 +350,8 @@ void FuchsiaClearStreamDecryptor::OnNoKey() { ...@@ -321,6 +350,8 @@ void FuchsiaClearStreamDecryptor::OnNoKey() {
} }
void FuchsiaClearStreamDecryptor::OnError() { void FuchsiaClearStreamDecryptor::OnError() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
ResetStream(); ResetStream();
if (decrypt_cb_) if (decrypt_cb_)
std::move(decrypt_cb_).Run(Decryptor::kError, nullptr); std::move(decrypt_cb_).Run(Decryptor::kError, nullptr);
...@@ -330,6 +361,8 @@ void FuchsiaClearStreamDecryptor::OnOutputBufferPoolCreated( ...@@ -330,6 +361,8 @@ void FuchsiaClearStreamDecryptor::OnOutputBufferPoolCreated(
size_t num_buffers_for_client, size_t num_buffers_for_client,
size_t num_buffers_for_server, size_t num_buffers_for_server,
std::unique_ptr<SysmemBufferPool> pool) { std::unique_ptr<SysmemBufferPool> pool) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
if (!pool) { if (!pool) {
LOG(ERROR) << "Fail to allocate output buffer."; LOG(ERROR) << "Fail to allocate output buffer.";
OnError(); OnError();
...@@ -351,6 +384,8 @@ void FuchsiaClearStreamDecryptor::OnOutputBufferPoolCreated( ...@@ -351,6 +384,8 @@ void FuchsiaClearStreamDecryptor::OnOutputBufferPoolCreated(
void FuchsiaClearStreamDecryptor::OnOutputBufferPoolReaderCreated( void FuchsiaClearStreamDecryptor::OnOutputBufferPoolReaderCreated(
std::unique_ptr<SysmemBufferReader> reader) { std::unique_ptr<SysmemBufferReader> reader) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
if (!reader) { if (!reader) {
LOG(ERROR) << "Fail to enable output buffer reader."; LOG(ERROR) << "Fail to enable output buffer reader.";
OnError(); OnError();
...@@ -372,6 +407,7 @@ void FuchsiaSecureStreamDecryptor::SetOutputBufferCollectionToken( ...@@ -372,6 +407,7 @@ void FuchsiaSecureStreamDecryptor::SetOutputBufferCollectionToken(
fuchsia::sysmem::BufferCollectionTokenPtr token, fuchsia::sysmem::BufferCollectionTokenPtr token,
size_t num_buffers_for_decryptor, size_t num_buffers_for_decryptor,
size_t num_buffers_for_codec) { size_t num_buffers_for_codec) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
DCHECK(!complete_buffer_allocation_callback_); DCHECK(!complete_buffer_allocation_callback_);
complete_buffer_allocation_callback_ = complete_buffer_allocation_callback_ =
base::BindOnce(&StreamProcessorHelper::CompleteOutputBuffersAllocation, base::BindOnce(&StreamProcessorHelper::CompleteOutputBuffersAllocation,
...@@ -385,16 +421,22 @@ void FuchsiaSecureStreamDecryptor::SetOutputBufferCollectionToken( ...@@ -385,16 +421,22 @@ void FuchsiaSecureStreamDecryptor::SetOutputBufferCollectionToken(
void FuchsiaSecureStreamDecryptor::Decrypt( void FuchsiaSecureStreamDecryptor::Decrypt(
scoped_refptr<DecoderBuffer> encrypted) { scoped_refptr<DecoderBuffer> encrypted) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
DecryptInternal(std::move(encrypted)); DecryptInternal(std::move(encrypted));
} }
void FuchsiaSecureStreamDecryptor::Reset() { void FuchsiaSecureStreamDecryptor::Reset() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
ResetStream(); ResetStream();
waiting_for_key_ = false; waiting_for_key_ = false;
} }
void FuchsiaSecureStreamDecryptor::AllocateOutputBuffers( void FuchsiaSecureStreamDecryptor::AllocateOutputBuffers(
const fuchsia::media::StreamBufferConstraints& stream_constraints) { const fuchsia::media::StreamBufferConstraints& stream_constraints) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
if (complete_buffer_allocation_callback_) { if (complete_buffer_allocation_callback_) {
std::move(complete_buffer_allocation_callback_).Run(); std::move(complete_buffer_allocation_callback_).Run();
} else { } else {
...@@ -403,20 +445,28 @@ void FuchsiaSecureStreamDecryptor::AllocateOutputBuffers( ...@@ -403,20 +445,28 @@ void FuchsiaSecureStreamDecryptor::AllocateOutputBuffers(
} }
void FuchsiaSecureStreamDecryptor::OnProcessEos() { void FuchsiaSecureStreamDecryptor::OnProcessEos() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
client_->OnDecryptorEndOfStreamPacket(); client_->OnDecryptorEndOfStreamPacket();
} }
void FuchsiaSecureStreamDecryptor::OnOutputPacket( void FuchsiaSecureStreamDecryptor::OnOutputPacket(
StreamProcessorHelper::IoPacket packet) { StreamProcessorHelper::IoPacket packet) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
client_->OnDecryptorOutputPacket(std::move(packet)); client_->OnDecryptorOutputPacket(std::move(packet));
} }
base::RepeatingClosure FuchsiaSecureStreamDecryptor::GetOnNewKeyClosure() { base::RepeatingClosure FuchsiaSecureStreamDecryptor::GetOnNewKeyClosure() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
return BindToCurrentLoop(base::BindRepeating( return BindToCurrentLoop(base::BindRepeating(
&FuchsiaSecureStreamDecryptor::OnNewKey, weak_factory_.GetWeakPtr())); &FuchsiaSecureStreamDecryptor::OnNewKey, weak_factory_.GetWeakPtr()));
} }
void FuchsiaSecureStreamDecryptor::OnError() { void FuchsiaSecureStreamDecryptor::OnError() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
ResetStream(); ResetStream();
// No need to reset other fields since OnError() is called for non-recoverable // No need to reset other fields since OnError() is called for non-recoverable
...@@ -426,6 +476,7 @@ void FuchsiaSecureStreamDecryptor::OnError() { ...@@ -426,6 +476,7 @@ void FuchsiaSecureStreamDecryptor::OnError() {
} }
void FuchsiaSecureStreamDecryptor::OnNoKey() { void FuchsiaSecureStreamDecryptor::OnNoKey() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
DCHECK(!waiting_for_key_); DCHECK(!waiting_for_key_);
// Reset stream position, but keep all pending buffers. They will be // Reset stream position, but keep all pending buffers. They will be
...@@ -443,6 +494,8 @@ void FuchsiaSecureStreamDecryptor::OnNoKey() { ...@@ -443,6 +494,8 @@ void FuchsiaSecureStreamDecryptor::OnNoKey() {
} }
void FuchsiaSecureStreamDecryptor::OnNewKey() { void FuchsiaSecureStreamDecryptor::OnNewKey() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
if (!waiting_for_key_) { if (!waiting_for_key_) {
retry_on_no_key_ = true; retry_on_no_key_ = true;
return; return;
......
...@@ -9,6 +9,7 @@ ...@@ -9,6 +9,7 @@
#include <memory> #include <memory>
#include "base/sequence_checker.h"
#include "media/base/decryptor.h" #include "media/base/decryptor.h"
#include "media/fuchsia/common/stream_processor_helper.h" #include "media/fuchsia/common/stream_processor_helper.h"
#include "media/fuchsia/common/sysmem_buffer_pool.h" #include "media/fuchsia/common/sysmem_buffer_pool.h"
...@@ -39,6 +40,8 @@ class FuchsiaStreamDecryptorBase : public StreamProcessorHelper::Client { ...@@ -39,6 +40,8 @@ class FuchsiaStreamDecryptorBase : public StreamProcessorHelper::Client {
SysmemBufferWriterQueue input_writer_queue_; SysmemBufferWriterQueue input_writer_queue_;
SEQUENCE_CHECKER(sequence_checker_);
private: private:
void OnInputBufferPoolCreated(std::unique_ptr<SysmemBufferPool> pool); void OnInputBufferPoolCreated(std::unique_ptr<SysmemBufferPool> pool);
void OnWriterCreated(std::unique_ptr<SysmemBufferWriter> writer); void OnWriterCreated(std::unique_ptr<SysmemBufferWriter> writer);
......
...@@ -84,9 +84,12 @@ StreamProcessorHelper::StreamProcessorHelper( ...@@ -84,9 +84,12 @@ StreamProcessorHelper::StreamProcessorHelper(
processor_->EnableOnStreamFailed(); processor_->EnableOnStreamFailed();
} }
StreamProcessorHelper::~StreamProcessorHelper() = default; StreamProcessorHelper::~StreamProcessorHelper() {
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
}
void StreamProcessorHelper::Process(IoPacket input) { void StreamProcessorHelper::Process(IoPacket input) {
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
DCHECK(processor_); DCHECK(processor_);
fuchsia::media::Packet packet; fuchsia::media::Packet packet;
...@@ -113,6 +116,7 @@ void StreamProcessorHelper::Process(IoPacket input) { ...@@ -113,6 +116,7 @@ void StreamProcessorHelper::Process(IoPacket input) {
} }
void StreamProcessorHelper::ProcessEos() { void StreamProcessorHelper::ProcessEos() {
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
DCHECK(processor_); DCHECK(processor_);
active_stream_ = true; active_stream_ = true;
...@@ -121,6 +125,8 @@ void StreamProcessorHelper::ProcessEos() { ...@@ -121,6 +125,8 @@ void StreamProcessorHelper::ProcessEos() {
} }
void StreamProcessorHelper::Reset() { void StreamProcessorHelper::Reset() {
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
if (!active_stream_) { if (!active_stream_) {
// Nothing to do if we don't have an active stream. // Nothing to do if we don't have an active stream.
return; return;
...@@ -138,6 +144,8 @@ void StreamProcessorHelper::Reset() { ...@@ -138,6 +144,8 @@ void StreamProcessorHelper::Reset() {
void StreamProcessorHelper::OnStreamFailed(uint64_t stream_lifetime_ordinal, void StreamProcessorHelper::OnStreamFailed(uint64_t stream_lifetime_ordinal,
fuchsia::media::StreamError error) { fuchsia::media::StreamError error) {
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
if (stream_lifetime_ordinal_ != stream_lifetime_ordinal) { if (stream_lifetime_ordinal_ != stream_lifetime_ordinal) {
return; return;
} }
...@@ -155,6 +163,8 @@ void StreamProcessorHelper::OnStreamFailed(uint64_t stream_lifetime_ordinal, ...@@ -155,6 +163,8 @@ void StreamProcessorHelper::OnStreamFailed(uint64_t stream_lifetime_ordinal,
void StreamProcessorHelper::OnInputConstraints( void StreamProcessorHelper::OnInputConstraints(
fuchsia::media::StreamBufferConstraints constraints) { fuchsia::media::StreamBufferConstraints constraints) {
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
// Buffer lifetime ordinal is an odd number incremented by 2 for each buffer // Buffer lifetime ordinal is an odd number incremented by 2 for each buffer
// generation as required by StreamProcessor. // generation as required by StreamProcessor.
input_buffer_lifetime_ordinal_ += 2; input_buffer_lifetime_ordinal_ += 2;
...@@ -178,6 +188,8 @@ void StreamProcessorHelper::OnInputConstraints( ...@@ -178,6 +188,8 @@ void StreamProcessorHelper::OnInputConstraints(
void StreamProcessorHelper::OnFreeInputPacket( void StreamProcessorHelper::OnFreeInputPacket(
fuchsia::media::PacketHeader free_input_packet) { fuchsia::media::PacketHeader free_input_packet) {
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
if (!free_input_packet.has_buffer_lifetime_ordinal() || if (!free_input_packet.has_buffer_lifetime_ordinal() ||
!free_input_packet.has_packet_index()) { !free_input_packet.has_packet_index()) {
DLOG(ERROR) << "Received OnFreeInputPacket() with missing required fields."; DLOG(ERROR) << "Received OnFreeInputPacket() with missing required fields.";
...@@ -206,6 +218,8 @@ void StreamProcessorHelper::OnFreeInputPacket( ...@@ -206,6 +218,8 @@ void StreamProcessorHelper::OnFreeInputPacket(
void StreamProcessorHelper::OnOutputConstraints( void StreamProcessorHelper::OnOutputConstraints(
fuchsia::media::StreamOutputConstraints output_constraints) { fuchsia::media::StreamOutputConstraints output_constraints) {
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
if (!output_constraints.has_stream_lifetime_ordinal()) { if (!output_constraints.has_stream_lifetime_ordinal()) {
DLOG(ERROR) DLOG(ERROR)
<< "Received OnOutputConstraints() with missing required fields."; << "Received OnOutputConstraints() with missing required fields.";
...@@ -242,6 +256,8 @@ void StreamProcessorHelper::OnOutputConstraints( ...@@ -242,6 +256,8 @@ void StreamProcessorHelper::OnOutputConstraints(
void StreamProcessorHelper::OnOutputFormat( void StreamProcessorHelper::OnOutputFormat(
fuchsia::media::StreamOutputFormat output_format) { fuchsia::media::StreamOutputFormat output_format) {
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
if (!output_format.has_stream_lifetime_ordinal() || if (!output_format.has_stream_lifetime_ordinal() ||
!output_format.has_format_details()) { !output_format.has_format_details()) {
DLOG(ERROR) << "Received OnOutputFormat() with missing required fields."; DLOG(ERROR) << "Received OnOutputFormat() with missing required fields.";
...@@ -259,6 +275,8 @@ void StreamProcessorHelper::OnOutputFormat( ...@@ -259,6 +275,8 @@ void StreamProcessorHelper::OnOutputFormat(
void StreamProcessorHelper::OnOutputPacket(fuchsia::media::Packet output_packet, void StreamProcessorHelper::OnOutputPacket(fuchsia::media::Packet output_packet,
bool error_detected_before, bool error_detected_before,
bool error_detected_during) { bool error_detected_during) {
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
if (!output_packet.has_header() || if (!output_packet.has_header() ||
!output_packet.header().has_buffer_lifetime_ordinal() || !output_packet.header().has_buffer_lifetime_ordinal() ||
!output_packet.header().has_packet_index() || !output_packet.header().has_packet_index() ||
...@@ -299,6 +317,8 @@ void StreamProcessorHelper::OnOutputPacket(fuchsia::media::Packet output_packet, ...@@ -299,6 +317,8 @@ void StreamProcessorHelper::OnOutputPacket(fuchsia::media::Packet output_packet,
void StreamProcessorHelper::OnOutputEndOfStream( void StreamProcessorHelper::OnOutputEndOfStream(
uint64_t stream_lifetime_ordinal, uint64_t stream_lifetime_ordinal,
bool error_detected_before) { bool error_detected_before) {
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
if (stream_lifetime_ordinal != stream_lifetime_ordinal_) { if (stream_lifetime_ordinal != stream_lifetime_ordinal_) {
return; return;
} }
...@@ -316,6 +336,7 @@ void StreamProcessorHelper::OnError() { ...@@ -316,6 +336,7 @@ void StreamProcessorHelper::OnError() {
void StreamProcessorHelper::CompleteInputBuffersAllocation( void StreamProcessorHelper::CompleteInputBuffersAllocation(
fuchsia::sysmem::BufferCollectionTokenPtr sysmem_token) { fuchsia::sysmem::BufferCollectionTokenPtr sysmem_token) {
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
DCHECK(!input_buffer_constraints_.IsEmpty()); DCHECK(!input_buffer_constraints_.IsEmpty());
fuchsia::media::StreamBufferPartialSettings settings; fuchsia::media::StreamBufferPartialSettings settings;
settings.set_buffer_lifetime_ordinal(input_buffer_lifetime_ordinal_); settings.set_buffer_lifetime_ordinal(input_buffer_lifetime_ordinal_);
...@@ -334,6 +355,7 @@ void StreamProcessorHelper::CompleteOutputBuffersAllocation( ...@@ -334,6 +355,7 @@ void StreamProcessorHelper::CompleteOutputBuffersAllocation(
size_t num_buffers_for_client, size_t num_buffers_for_client,
size_t num_buffers_for_server, size_t num_buffers_for_server,
fuchsia::sysmem::BufferCollectionTokenPtr collection_token) { fuchsia::sysmem::BufferCollectionTokenPtr collection_token) {
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
DCHECK(!output_buffer_constraints_.IsEmpty()); DCHECK(!output_buffer_constraints_.IsEmpty());
DCHECK_LE(num_buffers_for_client, DCHECK_LE(num_buffers_for_client,
output_buffer_constraints_.packet_count_for_client_max()); output_buffer_constraints_.packet_count_for_client_max());
...@@ -354,6 +376,8 @@ void StreamProcessorHelper::CompleteOutputBuffersAllocation( ...@@ -354,6 +376,8 @@ void StreamProcessorHelper::CompleteOutputBuffersAllocation(
void StreamProcessorHelper::OnRecycleOutputBuffer( void StreamProcessorHelper::OnRecycleOutputBuffer(
uint64_t buffer_lifetime_ordinal, uint64_t buffer_lifetime_ordinal,
uint32_t packet_index) { uint32_t packet_index) {
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
if (!processor_) if (!processor_)
return; return;
......
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
#include "base/macros.h" #include "base/macros.h"
#include "base/memory/weak_ptr.h" #include "base/memory/weak_ptr.h"
#include "base/optional.h" #include "base/optional.h"
#include "base/threading/thread_checker.h"
#include "base/time/time.h" #include "base/time/time.h"
namespace media { namespace media {
...@@ -169,6 +170,9 @@ class StreamProcessorHelper { ...@@ -169,6 +170,9 @@ class StreamProcessorHelper {
fuchsia::media::StreamProcessorPtr processor_; fuchsia::media::StreamProcessorPtr processor_;
Client* const client_; Client* const client_;
// FIDL interfaces are thread-affine (see crbug.com/1012875).
THREAD_CHECKER(thread_checker_);
base::WeakPtr<StreamProcessorHelper> weak_this_; base::WeakPtr<StreamProcessorHelper> weak_this_;
base::WeakPtrFactory<StreamProcessorHelper> weak_factory_; base::WeakPtrFactory<StreamProcessorHelper> weak_factory_;
......
...@@ -31,11 +31,14 @@ SysmemBufferPool::Creator::Creator( ...@@ -31,11 +31,14 @@ SysmemBufferPool::Creator::Creator(
}); });
} }
SysmemBufferPool::Creator::~Creator() = default; SysmemBufferPool::Creator::~Creator() {
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
}
void SysmemBufferPool::Creator::Create( void SysmemBufferPool::Creator::Create(
fuchsia::sysmem::BufferCollectionConstraints constraints, fuchsia::sysmem::BufferCollectionConstraints constraints,
CreateCB create_cb) { CreateCB create_cb) {
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
DCHECK(!create_cb_); DCHECK(!create_cb_);
create_cb_ = std::move(create_cb); create_cb_ = std::move(create_cb);
// BufferCollection needs to be synchronized to ensure that all token // BufferCollection needs to be synchronized to ensure that all token
...@@ -64,11 +67,13 @@ SysmemBufferPool::SysmemBufferPool( ...@@ -64,11 +67,13 @@ SysmemBufferPool::SysmemBufferPool(
} }
SysmemBufferPool::~SysmemBufferPool() { SysmemBufferPool::~SysmemBufferPool() {
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
if (collection_) if (collection_)
collection_->Close(); collection_->Close();
} }
fuchsia::sysmem::BufferCollectionTokenPtr SysmemBufferPool::TakeToken() { fuchsia::sysmem::BufferCollectionTokenPtr SysmemBufferPool::TakeToken() {
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
DCHECK(!shared_tokens_.empty()); DCHECK(!shared_tokens_.empty());
auto token = std::move(shared_tokens_.back()); auto token = std::move(shared_tokens_.back());
shared_tokens_.pop_back(); shared_tokens_.pop_back();
...@@ -76,6 +81,7 @@ fuchsia::sysmem::BufferCollectionTokenPtr SysmemBufferPool::TakeToken() { ...@@ -76,6 +81,7 @@ fuchsia::sysmem::BufferCollectionTokenPtr SysmemBufferPool::TakeToken() {
} }
void SysmemBufferPool::CreateReader(CreateReaderCB create_cb) { void SysmemBufferPool::CreateReader(CreateReaderCB create_cb) {
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
DCHECK(!create_reader_cb_); DCHECK(!create_reader_cb_);
create_reader_cb_ = std::move(create_cb); create_reader_cb_ = std::move(create_cb);
collection_->WaitForBuffersAllocated( collection_->WaitForBuffersAllocated(
...@@ -83,6 +89,7 @@ void SysmemBufferPool::CreateReader(CreateReaderCB create_cb) { ...@@ -83,6 +89,7 @@ void SysmemBufferPool::CreateReader(CreateReaderCB create_cb) {
} }
void SysmemBufferPool::CreateWriter(CreateWriterCB create_cb) { void SysmemBufferPool::CreateWriter(CreateWriterCB create_cb) {
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
DCHECK(!create_writer_cb_); DCHECK(!create_writer_cb_);
create_writer_cb_ = std::move(create_cb); create_writer_cb_ = std::move(create_cb);
collection_->WaitForBuffersAllocated( collection_->WaitForBuffersAllocated(
...@@ -92,6 +99,8 @@ void SysmemBufferPool::CreateWriter(CreateWriterCB create_cb) { ...@@ -92,6 +99,8 @@ void SysmemBufferPool::CreateWriter(CreateWriterCB create_cb) {
void SysmemBufferPool::OnBuffersAllocated( void SysmemBufferPool::OnBuffersAllocated(
zx_status_t status, zx_status_t status,
fuchsia::sysmem::BufferCollectionInfo_2 buffer_collection_info) { fuchsia::sysmem::BufferCollectionInfo_2 buffer_collection_info) {
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
if (status != ZX_OK) { if (status != ZX_OK) {
ZX_LOG(ERROR, status) << "Fail to allocate sysmem buffers."; ZX_LOG(ERROR, status) << "Fail to allocate sysmem buffers.";
OnError(); OnError();
...@@ -108,6 +117,7 @@ void SysmemBufferPool::OnBuffersAllocated( ...@@ -108,6 +117,7 @@ void SysmemBufferPool::OnBuffersAllocated(
} }
void SysmemBufferPool::OnError() { void SysmemBufferPool::OnError() {
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
collection_.Unbind(); collection_.Unbind();
if (create_reader_cb_) if (create_reader_cb_)
std::move(create_reader_cb_).Run(nullptr); std::move(create_reader_cb_).Run(nullptr);
...@@ -132,6 +142,8 @@ BufferAllocator::~BufferAllocator() = default; ...@@ -132,6 +142,8 @@ BufferAllocator::~BufferAllocator() = default;
std::unique_ptr<SysmemBufferPool::Creator> std::unique_ptr<SysmemBufferPool::Creator>
BufferAllocator::MakeBufferPoolCreator(size_t num_of_tokens) { BufferAllocator::MakeBufferPoolCreator(size_t num_of_tokens) {
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
// Create a new sysmem buffer collection token for the allocated buffers. // Create a new sysmem buffer collection token for the allocated buffers.
fuchsia::sysmem::BufferCollectionTokenPtr collection_token; fuchsia::sysmem::BufferCollectionTokenPtr collection_token;
allocator_->AllocateSharedCollection(collection_token.NewRequest()); allocator_->AllocateSharedCollection(collection_token.NewRequest());
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
#include "base/callback.h" #include "base/callback.h"
#include "base/containers/span.h" #include "base/containers/span.h"
#include "base/macros.h" #include "base/macros.h"
#include "base/threading/thread_checker.h"
namespace media { namespace media {
...@@ -50,6 +51,8 @@ class SysmemBufferPool { ...@@ -50,6 +51,8 @@ class SysmemBufferPool {
std::vector<fuchsia::sysmem::BufferCollectionTokenPtr> shared_tokens_; std::vector<fuchsia::sysmem::BufferCollectionTokenPtr> shared_tokens_;
CreateCB create_cb_; CreateCB create_cb_;
THREAD_CHECKER(thread_checker_);
DISALLOW_COPY_AND_ASSIGN(Creator); DISALLOW_COPY_AND_ASSIGN(Creator);
}; };
...@@ -83,6 +86,9 @@ class SysmemBufferPool { ...@@ -83,6 +86,9 @@ class SysmemBufferPool {
CreateReaderCB create_reader_cb_; CreateReaderCB create_reader_cb_;
CreateWriterCB create_writer_cb_; CreateWriterCB create_writer_cb_;
// FIDL interfaces are thread-affine (see crbug.com/1012875).
THREAD_CHECKER(thread_checker_);
DISALLOW_COPY_AND_ASSIGN(SysmemBufferPool); DISALLOW_COPY_AND_ASSIGN(SysmemBufferPool);
}; };
...@@ -102,6 +108,8 @@ class BufferAllocator { ...@@ -102,6 +108,8 @@ class BufferAllocator {
private: private:
fuchsia::sysmem::AllocatorPtr allocator_; fuchsia::sysmem::AllocatorPtr allocator_;
THREAD_CHECKER(thread_checker_);
DISALLOW_COPY_AND_ASSIGN(BufferAllocator); DISALLOW_COPY_AND_ASSIGN(BufferAllocator);
}; };
......
...@@ -47,6 +47,7 @@ SysmemBufferWriterQueue::~SysmemBufferWriterQueue() = default; ...@@ -47,6 +47,7 @@ SysmemBufferWriterQueue::~SysmemBufferWriterQueue() = default;
void SysmemBufferWriterQueue::EnqueueBuffer( void SysmemBufferWriterQueue::EnqueueBuffer(
scoped_refptr<DecoderBuffer> buffer) { scoped_refptr<DecoderBuffer> buffer) {
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
pending_buffers_.push_back(PendingBuffer(buffer)); pending_buffers_.push_back(PendingBuffer(buffer));
PumpPackets(); PumpPackets();
} }
...@@ -54,6 +55,7 @@ void SysmemBufferWriterQueue::EnqueueBuffer( ...@@ -54,6 +55,7 @@ void SysmemBufferWriterQueue::EnqueueBuffer(
void SysmemBufferWriterQueue::Start(std::unique_ptr<SysmemBufferWriter> writer, void SysmemBufferWriterQueue::Start(std::unique_ptr<SysmemBufferWriter> writer,
SendPacketCB send_packet_cb, SendPacketCB send_packet_cb,
EndOfStreamCB end_of_stream_cb) { EndOfStreamCB end_of_stream_cb) {
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
DCHECK(!writer_); DCHECK(!writer_);
writer_ = std::move(writer); writer_ = std::move(writer);
...@@ -64,6 +66,7 @@ void SysmemBufferWriterQueue::Start(std::unique_ptr<SysmemBufferWriter> writer, ...@@ -64,6 +66,7 @@ void SysmemBufferWriterQueue::Start(std::unique_ptr<SysmemBufferWriter> writer,
} }
void SysmemBufferWriterQueue::PumpPackets() { void SysmemBufferWriterQueue::PumpPackets() {
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
auto weak_this = weak_factory_.GetWeakPtr(); auto weak_this = weak_factory_.GetWeakPtr();
while (writer_ && !is_paused_ && while (writer_ && !is_paused_ &&
...@@ -112,18 +115,21 @@ void SysmemBufferWriterQueue::PumpPackets() { ...@@ -112,18 +115,21 @@ void SysmemBufferWriterQueue::PumpPackets() {
} }
void SysmemBufferWriterQueue::ResetQueue() { void SysmemBufferWriterQueue::ResetQueue() {
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
pending_buffers_.clear(); pending_buffers_.clear();
input_queue_position_ = 0; input_queue_position_ = 0;
is_paused_ = false; is_paused_ = false;
} }
void SysmemBufferWriterQueue::ResetBuffers() { void SysmemBufferWriterQueue::ResetBuffers() {
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
writer_.reset(); writer_.reset();
send_packet_cb_ = SendPacketCB(); send_packet_cb_ = SendPacketCB();
end_of_stream_cb_ = EndOfStreamCB(); end_of_stream_cb_ = EndOfStreamCB();
} }
void SysmemBufferWriterQueue::ResetPositionAndPause() { void SysmemBufferWriterQueue::ResetPositionAndPause() {
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
for (auto& buffer : pending_buffers_) { for (auto& buffer : pending_buffers_) {
buffer.buffer_pos = 0; buffer.buffer_pos = 0;
buffer.is_complete = false; buffer.is_complete = false;
...@@ -133,12 +139,14 @@ void SysmemBufferWriterQueue::ResetPositionAndPause() { ...@@ -133,12 +139,14 @@ void SysmemBufferWriterQueue::ResetPositionAndPause() {
} }
void SysmemBufferWriterQueue::Unpause() { void SysmemBufferWriterQueue::Unpause() {
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
DCHECK(is_paused_); DCHECK(is_paused_);
is_paused_ = false; is_paused_ = false;
PumpPackets(); PumpPackets();
} }
void SysmemBufferWriterQueue::ReleaseBuffer(size_t buffer_index) { void SysmemBufferWriterQueue::ReleaseBuffer(size_t buffer_index) {
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
DCHECK(writer_); DCHECK(writer_);
// Mark the input buffer as complete. // Mark the input buffer as complete.
...@@ -164,6 +172,7 @@ void SysmemBufferWriterQueue::ReleaseBuffer(size_t buffer_index) { ...@@ -164,6 +172,7 @@ void SysmemBufferWriterQueue::ReleaseBuffer(size_t buffer_index) {
} }
size_t SysmemBufferWriterQueue::num_buffers() const { size_t SysmemBufferWriterQueue::num_buffers() const {
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
return writer_ ? writer_->num_buffers() : 0; return writer_ ? writer_->num_buffers() : 0;
} }
......
...@@ -13,6 +13,7 @@ ...@@ -13,6 +13,7 @@
#include "base/containers/span.h" #include "base/containers/span.h"
#include "base/memory/weak_ptr.h" #include "base/memory/weak_ptr.h"
#include "base/optional.h" #include "base/optional.h"
#include "base/threading/thread_checker.h"
#include "media/fuchsia/common/stream_processor_helper.h" #include "media/fuchsia/common/stream_processor_helper.h"
#include "media/fuchsia/common/sysmem_buffer_writer.h" #include "media/fuchsia/common/sysmem_buffer_writer.h"
...@@ -94,6 +95,9 @@ class SysmemBufferWriterQueue { ...@@ -94,6 +95,9 @@ class SysmemBufferWriterQueue {
SendPacketCB send_packet_cb_; SendPacketCB send_packet_cb_;
EndOfStreamCB end_of_stream_cb_; EndOfStreamCB end_of_stream_cb_;
// FIDL interfaces are thread-affine (see crbug.com/1012875).
THREAD_CHECKER(thread_checker_);
base::WeakPtrFactory<SysmemBufferWriterQueue> weak_factory_{this}; base::WeakPtrFactory<SysmemBufferWriterQueue> weak_factory_{this};
DISALLOW_COPY_AND_ASSIGN(SysmemBufferWriterQueue); DISALLOW_COPY_AND_ASSIGN(SysmemBufferWriterQueue);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment