Commit 2c4150a2 authored by Xiaohan Wang's avatar Xiaohan Wang Committed by Commit Bot

media: Reduce latency in MojoDecoderBufferConverter

Today we always write and read from the DataPipe asynchronously. This
might introduce unnecessary latency since the DataPipe might already be
readable/writable when we schedule the read/write.

This also indirectly contributes to out-of-order dispatch of Decode()
and Reset() calls in some MojoDecoderBufferConverter clients, e.g.
MojoAudioDecoderService, MojoVideoDecoderService, MojoDecryptorService.
See BUG for more details. Unit tests are added to cover this issue.

Note that this CL does not completely fix the BUG. A follow up CL will
come next, with more tests added.

BUG=792281
TEST=More tests added.

Change-Id: Ia6dc25f8621cca2a041b6299b104de2074c06a02
Reviewed-on: https://chromium-review.googlesource.com/818329
Commit-Queue: Xiaohan Wang <xhwang@chromium.org>
Reviewed-by: default avatarDan Sanders <sandersd@chromium.org>
Reviewed-by: default avatarXiangjun Zhang <xjz@chromium.org>
Cr-Commit-Position: refs/heads/master@{#523246}
parent 19ce2ec0
...@@ -130,6 +130,9 @@ class MEDIA_EXPORT Decryptor { ...@@ -130,6 +130,9 @@ class MEDIA_EXPORT Decryptor {
// end-of-stream DecoderBuffer until no frame/buffer can be produced. // end-of-stream DecoderBuffer until no frame/buffer can be produced.
// These methods can only be called after the corresponding decoder has // These methods can only be called after the corresponding decoder has
// been successfully initialized. // been successfully initialized.
// DecryptAndDecodeAudio() should not be called until any previous
// AudioDecodeCB has completed. Thus, only one AudioDecodeCB may be pending at
// any time. Same for DecryptAndDecodeVideo();
virtual void DecryptAndDecodeAudio( virtual void DecryptAndDecodeAudio(
const scoped_refptr<DecoderBuffer>& encrypted, const scoped_refptr<DecoderBuffer>& encrypted,
const AudioDecodeCB& audio_decode_cb) = 0; const AudioDecodeCB& audio_decode_cb) = 0;
......
...@@ -142,13 +142,23 @@ class MojoAudioDecoderTest : public ::testing::Test { ...@@ -142,13 +142,23 @@ class MojoAudioDecoderTest : public ::testing::Test {
void Initialize() { InitializeAndExpect(true); } void Initialize() { InitializeAndExpect(true); }
void Decode() {
scoped_refptr<DecoderBuffer> buffer(new DecoderBuffer(100));
mojo_audio_decoder_->Decode(
buffer,
base::Bind(&MojoAudioDecoderTest::OnDecoded, base::Unretained(this)));
}
void Reset() { void Reset() {
mojo_audio_decoder_->Reset(
base::Bind(&MojoAudioDecoderTest::OnReset, base::Unretained(this)));
}
void ResetAndWaitUntilFinish() {
DVLOG(1) << __func__; DVLOG(1) << __func__;
EXPECT_CALL(*this, OnReset()) EXPECT_CALL(*this, OnReset())
.WillOnce(InvokeWithoutArgs(this, &MojoAudioDecoderTest::QuitLoop)); .WillOnce(InvokeWithoutArgs(this, &MojoAudioDecoderTest::QuitLoop));
Reset();
mojo_audio_decoder_->Reset(
base::Bind(&MojoAudioDecoderTest::OnReset, base::Unretained(this)));
RunLoop(); RunLoop();
} }
...@@ -184,11 +194,15 @@ class MojoAudioDecoderTest : public ::testing::Test { ...@@ -184,11 +194,15 @@ class MojoAudioDecoderTest : public ::testing::Test {
Decode(); Decode();
} }
void Decode() { void DecodeAndReset() {
scoped_refptr<DecoderBuffer> buffer(new DecoderBuffer(100)); InSequence s; // Make sure all callbacks are fired in order.
mojo_audio_decoder_->Decode( EXPECT_CALL(*this, OnOutput(_)).Times(kOutputPerDecode);
buffer, EXPECT_CALL(*this, OnDecoded(DecodeStatus::OK));
base::Bind(&MojoAudioDecoderTest::OnDecoded, base::Unretained(this))); EXPECT_CALL(*this, OnReset())
.WillOnce(InvokeWithoutArgs(this, &MojoAudioDecoderTest::QuitLoop));
Decode();
Reset();
RunLoop();
} }
base::MessageLoop message_loop_; base::MessageLoop message_loop_;
...@@ -227,7 +241,7 @@ TEST_F(MojoAudioDecoderTest, Initialize_Success) { ...@@ -227,7 +241,7 @@ TEST_F(MojoAudioDecoderTest, Initialize_Success) {
TEST_F(MojoAudioDecoderTest, Reinitialize_Success) { TEST_F(MojoAudioDecoderTest, Reinitialize_Success) {
Initialize(); Initialize();
DecodeMultipleTimes(10); DecodeMultipleTimes(10);
Reset(); ResetAndWaitUntilFinish();
// Reinitialize MojoAudioDecoder. // Reinitialize MojoAudioDecoder.
Initialize(); Initialize();
...@@ -243,6 +257,12 @@ TEST_F(MojoAudioDecoderTest, Decode_MultipleTimes) { ...@@ -243,6 +257,12 @@ TEST_F(MojoAudioDecoderTest, Decode_MultipleTimes) {
DecodeMultipleTimes(100); DecodeMultipleTimes(100);
} }
TEST_F(MojoAudioDecoderTest, Reset_DuringDecode) {
Initialize();
DecodeAndReset();
}
// TODO(xhwang): Add more tests. // TODO(xhwang): Add more tests.
} // namespace media } // namespace media
...@@ -13,6 +13,8 @@ ...@@ -13,6 +13,8 @@
#include "base/test/test_message_loop.h" #include "base/test/test_message_loop.h"
#include "media/base/decryptor.h" #include "media/base/decryptor.h"
#include "media/base/mock_filters.h" #include "media/base/mock_filters.h"
#include "media/base/test_helpers.h"
#include "media/base/timestamp_constants.h"
#include "media/base/video_frame.h" #include "media/base/video_frame.h"
#include "media/mojo/clients/mojo_decryptor.h" #include "media/mojo/clients/mojo_decryptor.h"
#include "media/mojo/common/mojo_shared_buffer_video_frame.h" #include "media/mojo/common/mojo_shared_buffer_video_frame.h"
...@@ -58,8 +60,9 @@ class MojoDecryptorTest : public ::testing::Test { ...@@ -58,8 +60,9 @@ class MojoDecryptorTest : public ::testing::Test {
mojo_decryptor_service_.reset(); mojo_decryptor_service_.reset();
} }
void ReturnSimpleVideoFrame(const scoped_refptr<DecoderBuffer>& encrypted, void ReturnSharedBufferVideoFrame(
const Decryptor::VideoDecodeCB& video_decode_cb) { const scoped_refptr<DecoderBuffer>& encrypted,
const Decryptor::VideoDecodeCB& video_decode_cb) {
// We don't care about the encrypted data, just create a simple VideoFrame. // We don't care about the encrypted data, just create a simple VideoFrame.
scoped_refptr<VideoFrame> frame( scoped_refptr<VideoFrame> frame(
MojoSharedBufferVideoFrame::CreateDefaultI420( MojoSharedBufferVideoFrame::CreateDefaultI420(
...@@ -73,12 +76,28 @@ class MojoDecryptorTest : public ::testing::Test { ...@@ -73,12 +76,28 @@ class MojoDecryptorTest : public ::testing::Test {
video_decode_cb.Run(Decryptor::kSuccess, std::move(frame)); video_decode_cb.Run(Decryptor::kSuccess, std::move(frame));
} }
void ReturnAudioFrames(const scoped_refptr<DecoderBuffer>& encrypted,
const Decryptor::AudioDecodeCB& audio_decode_cb) {
const ChannelLayout kChannelLayout = CHANNEL_LAYOUT_4_0;
const int kSampleRate = 48000;
const base::TimeDelta start_time = base::TimeDelta::FromSecondsD(1000.0);
auto audio_buffer = MakeAudioBuffer<float>(
kSampleFormatPlanarF32, kChannelLayout,
ChannelLayoutToChannelCount(kChannelLayout), kSampleRate, 0.0f, 1.0f,
kSampleRate / 10, start_time);
Decryptor::AudioFrames audio_frames = {audio_buffer};
audio_decode_cb.Run(Decryptor::kSuccess, audio_frames);
}
void ReturnEOSVideoFrame(const scoped_refptr<DecoderBuffer>& encrypted, void ReturnEOSVideoFrame(const scoped_refptr<DecoderBuffer>& encrypted,
const Decryptor::VideoDecodeCB& video_decode_cb) { const Decryptor::VideoDecodeCB& video_decode_cb) {
// Simply create and return an End-Of-Stream VideoFrame. // Simply create and return an End-Of-Stream VideoFrame.
video_decode_cb.Run(Decryptor::kSuccess, VideoFrame::CreateEOSFrame()); video_decode_cb.Run(Decryptor::kSuccess, VideoFrame::CreateEOSFrame());
} }
MOCK_METHOD2(AudioDecoded,
void(Decryptor::Status status,
const Decryptor::AudioFrames& frames));
MOCK_METHOD2(VideoDecoded, MOCK_METHOD2(VideoDecoded,
void(Decryptor::Status status, void(Decryptor::Status status,
const scoped_refptr<VideoFrame>& frame)); const scoped_refptr<VideoFrame>& frame));
...@@ -102,6 +121,48 @@ class MojoDecryptorTest : public ::testing::Test { ...@@ -102,6 +121,48 @@ class MojoDecryptorTest : public ::testing::Test {
DISALLOW_COPY_AND_ASSIGN(MojoDecryptorTest); DISALLOW_COPY_AND_ASSIGN(MojoDecryptorTest);
}; };
// DecryptAndDecodeAudio() and ResetDecoder(kAudio) immediately.
TEST_F(MojoDecryptorTest, ResetDuringDecryptAndDecodeAudio) {
{
// Make sure calls are made in order.
InSequence seq;
EXPECT_CALL(*decryptor_, DecryptAndDecodeAudio(_, _))
.WillOnce(Invoke(this, &MojoDecryptorTest::ReturnAudioFrames));
EXPECT_CALL(*decryptor_, ResetDecoder(Decryptor::kAudio));
// The returned status could be success or aborted.
EXPECT_CALL(*this, AudioDecoded(_, _));
}
scoped_refptr<DecoderBuffer> buffer(new DecoderBuffer(100));
mojo_decryptor_->DecryptAndDecodeAudio(
std::move(buffer),
base::Bind(&MojoDecryptorTest::AudioDecoded, base::Unretained(this)));
mojo_decryptor_->ResetDecoder(Decryptor::kAudio);
base::RunLoop().RunUntilIdle();
}
// DecryptAndDecodeVideo() and ResetDecoder(kVideo) immediately.
TEST_F(MojoDecryptorTest, ResetDuringDecryptAndDecodeVideo) {
{
// Make sure calls are made in order.
InSequence seq;
EXPECT_CALL(*decryptor_, DecryptAndDecodeVideo(_, _))
.WillOnce(
Invoke(this, &MojoDecryptorTest::ReturnSharedBufferVideoFrame));
EXPECT_CALL(*decryptor_, ResetDecoder(Decryptor::kVideo));
// The returned status could be success or aborted.
EXPECT_CALL(*this, VideoDecoded(_, _));
EXPECT_CALL(*this, OnFrameDestroyed());
}
scoped_refptr<DecoderBuffer> buffer(new DecoderBuffer(100));
mojo_decryptor_->DecryptAndDecodeVideo(
std::move(buffer),
base::Bind(&MojoDecryptorTest::VideoDecoded, base::Unretained(this)));
mojo_decryptor_->ResetDecoder(Decryptor::kVideo);
base::RunLoop().RunUntilIdle();
}
TEST_F(MojoDecryptorTest, VideoDecodeFreesBuffer) { TEST_F(MojoDecryptorTest, VideoDecodeFreesBuffer) {
// Call DecryptAndDecodeVideo(). Once the callback VideoDecoded() completes, // Call DecryptAndDecodeVideo(). Once the callback VideoDecoded() completes,
// the frame will be destroyed, and the buffer will be released. // the frame will be destroyed, and the buffer will be released.
...@@ -111,7 +172,7 @@ TEST_F(MojoDecryptorTest, VideoDecodeFreesBuffer) { ...@@ -111,7 +172,7 @@ TEST_F(MojoDecryptorTest, VideoDecodeFreesBuffer) {
EXPECT_CALL(*this, OnFrameDestroyed()); EXPECT_CALL(*this, OnFrameDestroyed());
} }
EXPECT_CALL(*decryptor_, DecryptAndDecodeVideo(_, _)) EXPECT_CALL(*decryptor_, DecryptAndDecodeVideo(_, _))
.WillOnce(Invoke(this, &MojoDecryptorTest::ReturnSimpleVideoFrame)); .WillOnce(Invoke(this, &MojoDecryptorTest::ReturnSharedBufferVideoFrame));
scoped_refptr<DecoderBuffer> buffer(new DecoderBuffer(100)); scoped_refptr<DecoderBuffer> buffer(new DecoderBuffer(100));
mojo_decryptor_->DecryptAndDecodeVideo( mojo_decryptor_->DecryptAndDecodeVideo(
...@@ -127,7 +188,8 @@ TEST_F(MojoDecryptorTest, VideoDecodeFreesMultipleBuffers) { ...@@ -127,7 +188,8 @@ TEST_F(MojoDecryptorTest, VideoDecodeFreesMultipleBuffers) {
.Times(TIMES); .Times(TIMES);
EXPECT_CALL(*this, OnFrameDestroyed()).Times(TIMES); EXPECT_CALL(*this, OnFrameDestroyed()).Times(TIMES);
EXPECT_CALL(*decryptor_, DecryptAndDecodeVideo(_, _)) EXPECT_CALL(*decryptor_, DecryptAndDecodeVideo(_, _))
.WillRepeatedly(Invoke(this, &MojoDecryptorTest::ReturnSimpleVideoFrame)); .WillRepeatedly(
Invoke(this, &MojoDecryptorTest::ReturnSharedBufferVideoFrame));
for (int i = 0; i < TIMES; ++i) { for (int i = 0; i < TIMES; ++i) {
scoped_refptr<DecoderBuffer> buffer(new DecoderBuffer(100)); scoped_refptr<DecoderBuffer> buffer(new DecoderBuffer(100));
...@@ -148,7 +210,8 @@ TEST_F(MojoDecryptorTest, VideoDecodeHoldThenFreeBuffers) { ...@@ -148,7 +210,8 @@ TEST_F(MojoDecryptorTest, VideoDecodeHoldThenFreeBuffers) {
.WillOnce(SaveArg<1>(&saved_frame1)) .WillOnce(SaveArg<1>(&saved_frame1))
.WillOnce(SaveArg<1>(&saved_frame2)); .WillOnce(SaveArg<1>(&saved_frame2));
EXPECT_CALL(*decryptor_, DecryptAndDecodeVideo(_, _)) EXPECT_CALL(*decryptor_, DecryptAndDecodeVideo(_, _))
.WillRepeatedly(Invoke(this, &MojoDecryptorTest::ReturnSimpleVideoFrame)); .WillRepeatedly(
Invoke(this, &MojoDecryptorTest::ReturnSharedBufferVideoFrame));
for (int i = 0; i < 2; ++i) { for (int i = 0; i < 2; ++i) {
scoped_refptr<DecoderBuffer> buffer(new DecoderBuffer(100)); scoped_refptr<DecoderBuffer> buffer(new DecoderBuffer(100));
......
...@@ -80,47 +80,50 @@ MojoDecoderBufferReader::MojoDecoderBufferReader( ...@@ -80,47 +80,50 @@ MojoDecoderBufferReader::MojoDecoderBufferReader(
MojoDecoderBufferReader::~MojoDecoderBufferReader() { MojoDecoderBufferReader::~MojoDecoderBufferReader() {
DVLOG(1) << __func__; DVLOG(1) << __func__;
CancelAllPendingReadCBs();
} }
void MojoDecoderBufferReader::CancelReadCB(ReadCB read_cb) { void MojoDecoderBufferReader::CancelReadCB(ReadCB read_cb) {
DVLOG(1) << "Failed to read DecoderBuffer becuase the pipe is already closed"; DVLOG(1) << "Failed to read DecoderBuffer because the pipe is already closed";
std::move(read_cb).Run(nullptr); std::move(read_cb).Run(nullptr);
} }
void MojoDecoderBufferReader::CancelAllPendingReadCBs() {
while (!pending_read_cbs_.empty()) {
ReadCB read_cb = std::move(pending_read_cbs_.front());
pending_read_cbs_.pop_front();
// TODO(sandersd): Make sure there are no possible re-entrancy issues
// here. Perhaps these should be posted, or merged into a single error
// callback?
CancelReadCB(std::move(read_cb));
}
}
void MojoDecoderBufferReader::CompleteCurrentRead() { void MojoDecoderBufferReader::CompleteCurrentRead() {
DVLOG(4) << __func__; DVLOG(4) << __func__;
bytes_read_ = 0;
ReadCB read_cb = std::move(pending_read_cbs_.front()); ReadCB read_cb = std::move(pending_read_cbs_.front());
pending_read_cbs_.pop_front(); pending_read_cbs_.pop_front();
scoped_refptr<DecoderBuffer> buffer = std::move(pending_buffers_.front()); scoped_refptr<DecoderBuffer> buffer = std::move(pending_buffers_.front());
pending_buffers_.pop_front(); pending_buffers_.pop_front();
DCHECK(buffer->end_of_stream() || buffer->data_size() == bytes_read_);
bytes_read_ = 0;
std::move(read_cb).Run(std::move(buffer)); std::move(read_cb).Run(std::move(buffer));
} }
void MojoDecoderBufferReader::ScheduleNextRead() { void MojoDecoderBufferReader::ScheduleNextRead() {
DVLOG(4) << __func__; DVLOG(4) << __func__;
DCHECK(!armed_);
DCHECK(!pending_buffers_.empty());
// Do nothing if a read is already scheduled. armed_ = true;
if (armed_) pipe_watcher_.ArmOrNotify();
return;
// Immediately complete empty reads.
// A non-EOS buffer can have zero size. See http://crbug.com/663438
while (!pending_buffers_.empty() &&
(pending_buffers_.front()->end_of_stream() ||
pending_buffers_.front()->data_size() == 0)) {
// TODO(sandersd): Make sure there are no possible re-entrancy issues here.
// Perhaps read callbacks should be posted?
CompleteCurrentRead();
}
// Request a callback to issue the DataPipe read.
if (!pending_buffers_.empty()) {
armed_ = true;
pipe_watcher_.ArmOrNotify();
}
} }
// TODO(xhwang): Move this up to match declaration order.
void MojoDecoderBufferReader::ReadDecoderBuffer( void MojoDecoderBufferReader::ReadDecoderBuffer(
mojom::DecoderBufferPtr mojo_buffer, mojom::DecoderBufferPtr mojo_buffer,
ReadCB read_cb) { ReadCB read_cb) {
...@@ -140,7 +143,13 @@ void MojoDecoderBufferReader::ReadDecoderBuffer( ...@@ -140,7 +143,13 @@ void MojoDecoderBufferReader::ReadDecoderBuffer(
// are zero-sized. // are zero-sized.
pending_read_cbs_.push_back(std::move(read_cb)); pending_read_cbs_.push_back(std::move(read_cb));
pending_buffers_.push_back(std::move(media_buffer)); pending_buffers_.push_back(std::move(media_buffer));
ScheduleNextRead();
// Do nothing if a read is already scheduled.
if (armed_)
return;
// To reduce latency, always process pending reads immediately.
ProcessPendingReads();
} }
void MojoDecoderBufferReader::OnPipeReadable( void MojoDecoderBufferReader::OnPipeReadable(
...@@ -148,6 +157,10 @@ void MojoDecoderBufferReader::OnPipeReadable( ...@@ -148,6 +157,10 @@ void MojoDecoderBufferReader::OnPipeReadable(
const mojo::HandleSignalsState& state) { const mojo::HandleSignalsState& state) {
DVLOG(4) << __func__ << "(" << result << ", " << state.readable() << ")"; DVLOG(4) << __func__ << "(" << result << ", " << state.readable() << ")";
// |MOJO_RESULT_CANCELLED| may be dispatched even while the SimpleWatcher
// is disarmed, and no further notifications will be dispatched after that.
DCHECK(armed_ || result == MOJO_RESULT_CANCELLED);
armed_ = false; armed_ = false;
if (result != MOJO_RESULT_OK) { if (result != MOJO_RESULT_OK) {
...@@ -156,36 +169,59 @@ void MojoDecoderBufferReader::OnPipeReadable( ...@@ -156,36 +169,59 @@ void MojoDecoderBufferReader::OnPipeReadable(
} }
DCHECK(state.readable()); DCHECK(state.readable());
ReadDecoderBufferData(); ProcessPendingReads();
} }
void MojoDecoderBufferReader::ReadDecoderBufferData() { void MojoDecoderBufferReader::ProcessPendingReads() {
DVLOG(4) << __func__; DVLOG(4) << __func__;
DCHECK(!armed_);
DCHECK(!pending_buffers_.empty()); DCHECK(!pending_buffers_.empty());
DecoderBuffer* buffer = pending_buffers_.front().get();
uint32_t buffer_size = base::checked_cast<uint32_t>(buffer->data_size());
DCHECK_GT(buffer_size, 0u);
uint32_t num_bytes = buffer_size - bytes_read_; while (!pending_buffers_.empty()) {
DCHECK_GT(num_bytes, 0u); DecoderBuffer* buffer = pending_buffers_.front().get();
MojoResult result = uint32_t buffer_size = 0u;
consumer_handle_->ReadData(buffer->writable_data() + bytes_read_, if (!pending_buffers_.front()->end_of_stream())
&num_bytes, MOJO_WRITE_DATA_FLAG_NONE); buffer_size = base::checked_cast<uint32_t>(buffer->data_size());
if (IsPipeReadWriteError(result)) { // Immediately complete empty reads.
OnPipeError(result); // A non-EOS buffer can have zero size. See http://crbug.com/663438
} else { if (buffer_size == 0) {
if (result == MOJO_RESULT_OK) {
DCHECK_GT(num_bytes, 0u);
bytes_read_ += num_bytes;
// TODO(sandersd): Make sure there are no possible re-entrancy issues // TODO(sandersd): Make sure there are no possible re-entrancy issues
// here. // here. Perhaps read callbacks should be posted?
if (bytes_read_ == buffer_size) CompleteCurrentRead();
CompleteCurrentRead(); continue;
} }
ScheduleNextRead();
// We may be starting to read a new buffer (|bytes_read_| == 0), or
// recovering from a previous partial read (|bytes_read_| > 0).
DCHECK_GT(buffer_size, bytes_read_);
uint32_t num_bytes = buffer_size - bytes_read_;
MojoResult result =
consumer_handle_->ReadData(buffer->writable_data() + bytes_read_,
&num_bytes, MOJO_WRITE_DATA_FLAG_NONE);
if (IsPipeReadWriteError(result)) {
OnPipeError(result);
return;
}
if (result == MOJO_RESULT_SHOULD_WAIT) {
ScheduleNextRead();
return;
}
DCHECK_EQ(result, MOJO_RESULT_OK);
DCHECK_GT(num_bytes, 0u);
bytes_read_ += num_bytes;
// TODO(sandersd): Make sure there are no possible re-entrancy issues
// here.
if (bytes_read_ == buffer_size)
CompleteCurrentRead();
// Since we can still read, try to read more.
} }
} }
...@@ -201,14 +237,7 @@ void MojoDecoderBufferReader::OnPipeError(MojoResult result) { ...@@ -201,14 +237,7 @@ void MojoDecoderBufferReader::OnPipeError(MojoResult result) {
<< ", num_bytes(read)=" << bytes_read_; << ", num_bytes(read)=" << bytes_read_;
bytes_read_ = 0; bytes_read_ = 0;
pending_buffers_.clear(); pending_buffers_.clear();
while (!pending_read_cbs_.empty()) { CancelAllPendingReadCBs();
ReadCB read_cb = std::move(pending_read_cbs_.front());
pending_read_cbs_.pop_front();
// TODO(sandersd): Make sure there are no possible re-entrancy issues
// here. Perhaps these should be posted, or merged into a single error
// callback?
CancelReadCB(std::move(read_cb));
}
} }
} }
...@@ -251,16 +280,11 @@ MojoDecoderBufferWriter::~MojoDecoderBufferWriter() { ...@@ -251,16 +280,11 @@ MojoDecoderBufferWriter::~MojoDecoderBufferWriter() {
void MojoDecoderBufferWriter::ScheduleNextWrite() { void MojoDecoderBufferWriter::ScheduleNextWrite() {
DVLOG(4) << __func__; DVLOG(4) << __func__;
DCHECK(!armed_);
DCHECK(!pending_buffers_.empty());
// Do nothing if a write is already scheduled. armed_ = true;
if (armed_) pipe_watcher_.ArmOrNotify();
return;
// Request a callback to issue the DataPipe write.
if (!pending_buffers_.empty()) {
armed_ = true;
pipe_watcher_.ArmOrNotify();
}
} }
mojom::DecoderBufferPtr MojoDecoderBufferWriter::WriteDecoderBuffer( mojom::DecoderBufferPtr MojoDecoderBufferWriter::WriteDecoderBuffer(
...@@ -271,7 +295,7 @@ mojom::DecoderBufferPtr MojoDecoderBufferWriter::WriteDecoderBuffer( ...@@ -271,7 +295,7 @@ mojom::DecoderBufferPtr MojoDecoderBufferWriter::WriteDecoderBuffer(
if (!producer_handle_.is_valid()) { if (!producer_handle_.is_valid()) {
DVLOG(1) DVLOG(1)
<< __func__ << __func__
<< ": Failed to write DecoderBuffer becuase the pipe is already closed"; << ": Failed to write DecoderBuffer because the pipe is already closed";
return nullptr; return nullptr;
} }
...@@ -284,7 +308,12 @@ mojom::DecoderBufferPtr MojoDecoderBufferWriter::WriteDecoderBuffer( ...@@ -284,7 +308,12 @@ mojom::DecoderBufferPtr MojoDecoderBufferWriter::WriteDecoderBuffer(
// Queue writing the buffer's data into our DataPipe. // Queue writing the buffer's data into our DataPipe.
pending_buffers_.push_back(media_buffer); pending_buffers_.push_back(media_buffer);
ScheduleNextWrite();
// Do nothing if a write is already scheduled. Otherwise, to reduce latency,
// always try to write data to the pipe first.
if (!armed_)
ProcessPendingWrites();
return mojo_buffer; return mojo_buffer;
} }
...@@ -293,6 +322,10 @@ void MojoDecoderBufferWriter::OnPipeWritable( ...@@ -293,6 +322,10 @@ void MojoDecoderBufferWriter::OnPipeWritable(
const mojo::HandleSignalsState& state) { const mojo::HandleSignalsState& state) {
DVLOG(4) << __func__ << "(" << result << ", " << state.writable() << ")"; DVLOG(4) << __func__ << "(" << result << ", " << state.writable() << ")";
// |MOJO_RESULT_CANCELLED| may be dispatched even while the SimpleWatcher
// is disarmed, and no further notifications will be dispatched after that.
DCHECK(armed_ || result == MOJO_RESULT_CANCELLED);
armed_ = false; armed_ = false;
if (result != MOJO_RESULT_OK) { if (result != MOJO_RESULT_OK) {
...@@ -301,35 +334,47 @@ void MojoDecoderBufferWriter::OnPipeWritable( ...@@ -301,35 +334,47 @@ void MojoDecoderBufferWriter::OnPipeWritable(
} }
DCHECK(state.writable()); DCHECK(state.writable());
WriteDecoderBufferData(); ProcessPendingWrites();
} }
void MojoDecoderBufferWriter::WriteDecoderBufferData() { void MojoDecoderBufferWriter::ProcessPendingWrites() {
DVLOG(4) << __func__; DVLOG(4) << __func__;
DCHECK(!armed_);
DCHECK(!pending_buffers_.empty()); DCHECK(!pending_buffers_.empty());
DecoderBuffer* buffer = pending_buffers_.front().get();
uint32_t buffer_size = base::checked_cast<uint32_t>(buffer->data_size());
DCHECK_GT(buffer_size, 0u);
uint32_t num_bytes = buffer_size - bytes_written_; while (!pending_buffers_.empty()) {
DCHECK_GT(num_bytes, 0u); DecoderBuffer* buffer = pending_buffers_.front().get();
MojoResult result = producer_handle_->WriteData( uint32_t buffer_size = base::checked_cast<uint32_t>(buffer->data_size());
buffer->data() + bytes_written_, &num_bytes, MOJO_WRITE_DATA_FLAG_NONE); DCHECK_GT(buffer_size, 0u) << "Unexpected EOS or empty buffer";
if (IsPipeReadWriteError(result)) { // We may be starting to write a new buffer (|bytes_written_| == 0), or
OnPipeError(result); // recovering from a previous partial write (|bytes_written_| > 0).
} else { uint32_t num_bytes = buffer_size - bytes_written_;
if (result == MOJO_RESULT_OK) { DCHECK_GT(num_bytes, 0u);
DCHECK_GT(num_bytes, 0u);
bytes_written_ += num_bytes; MojoResult result = producer_handle_->WriteData(
if (bytes_written_ == buffer_size) { buffer->data() + bytes_written_, &num_bytes, MOJO_WRITE_DATA_FLAG_NONE);
pending_buffers_.pop_front();
bytes_written_ = 0; if (IsPipeReadWriteError(result)) {
} OnPipeError(result);
return;
}
if (result == MOJO_RESULT_SHOULD_WAIT) {
ScheduleNextWrite();
return;
} }
ScheduleNextWrite();
DCHECK_EQ(MOJO_RESULT_OK, result);
DCHECK_GT(num_bytes, 0u);
bytes_written_ += num_bytes;
if (bytes_written_ == buffer_size) {
pending_buffers_.pop_front();
bytes_written_ = 0;
}
// Since we can still write, try to write more.
} }
} }
......
...@@ -51,10 +51,11 @@ class MojoDecoderBufferReader { ...@@ -51,10 +51,11 @@ class MojoDecoderBufferReader {
private: private:
void CancelReadCB(ReadCB read_cb); void CancelReadCB(ReadCB read_cb);
void CancelAllPendingReadCBs();
void CompleteCurrentRead(); void CompleteCurrentRead();
void ScheduleNextRead(); void ScheduleNextRead();
void OnPipeReadable(MojoResult result, const mojo::HandleSignalsState& state); void OnPipeReadable(MojoResult result, const mojo::HandleSignalsState& state);
void ReadDecoderBufferData(); void ProcessPendingReads();
void OnPipeError(MojoResult result); void OnPipeError(MojoResult result);
// Read side of the DataPipe for receiving DecoderBuffer data. // Read side of the DataPipe for receiving DecoderBuffer data.
...@@ -109,7 +110,7 @@ class MojoDecoderBufferWriter { ...@@ -109,7 +110,7 @@ class MojoDecoderBufferWriter {
private: private:
void ScheduleNextWrite(); void ScheduleNextWrite();
void OnPipeWritable(MojoResult result, const mojo::HandleSignalsState& state); void OnPipeWritable(MojoResult result, const mojo::HandleSignalsState& state);
void WriteDecoderBufferData(); void ProcessPendingWrites();
void OnPipeError(MojoResult result); void OnPipeError(MojoResult result);
// Write side of the DataPipe for sending DecoderBuffer data. // Write side of the DataPipe for sending DecoderBuffer data.
......
...@@ -35,6 +35,7 @@ ...@@ -35,6 +35,7 @@
using ::testing::_; using ::testing::_;
using ::testing::Invoke; using ::testing::Invoke;
using ::testing::InSequence;
using ::testing::Mock; using ::testing::Mock;
using ::testing::Return; using ::testing::Return;
using ::testing::SaveArg; using ::testing::SaveArg;
...@@ -44,6 +45,8 @@ namespace media { ...@@ -44,6 +45,8 @@ namespace media {
namespace { namespace {
const int kMaxDecodeRequests = 4;
// A mock VideoDecoder with helpful default functionality. // A mock VideoDecoder with helpful default functionality.
// TODO(sandersd): Determine how best to merge this with MockVideoDecoder // TODO(sandersd): Determine how best to merge this with MockVideoDecoder
// declared in mock_filters.h. // declared in mock_filters.h.
...@@ -56,13 +59,15 @@ class MockVideoDecoder : public VideoDecoder { ...@@ -56,13 +59,15 @@ class MockVideoDecoder : public VideoDecoder {
EXPECT_CALL(*this, NeedsBitstreamConversion()) EXPECT_CALL(*this, NeedsBitstreamConversion())
.WillRepeatedly(Return(false)); .WillRepeatedly(Return(false));
EXPECT_CALL(*this, CanReadWithoutStalling()).WillRepeatedly(Return(true)); EXPECT_CALL(*this, CanReadWithoutStalling()).WillRepeatedly(Return(true));
EXPECT_CALL(*this, GetMaxDecodeRequests()).WillRepeatedly(Return(1)); EXPECT_CALL(*this, GetMaxDecodeRequests())
.WillRepeatedly(Return(kMaxDecodeRequests));
// For regular methods, only configure a default action. // For regular methods, only configure a default action.
ON_CALL(*this, DoInitialize(_)).WillByDefault(RunCallback<0>(true)); ON_CALL(*this, DoInitialize(_)).WillByDefault(RunCallback<0>(true));
ON_CALL(*this, Decode(_, _)) ON_CALL(*this, Decode(_, _))
.WillByDefault(Invoke(this, &MockVideoDecoder::DoDecode)); .WillByDefault(Invoke(this, &MockVideoDecoder::DoDecode));
ON_CALL(*this, Reset(_)).WillByDefault(RunCallback<0>()); ON_CALL(*this, Reset(_))
.WillByDefault(Invoke(this, &MockVideoDecoder::DoReset));
} }
// Re-declare as public. // Re-declare as public.
...@@ -111,6 +116,11 @@ class MockVideoDecoder : public VideoDecoder { ...@@ -111,6 +116,11 @@ class MockVideoDecoder : public VideoDecoder {
FROM_HERE, base::Bind(decode_cb, DecodeStatus::OK)); FROM_HERE, base::Bind(decode_cb, DecodeStatus::OK));
} }
void DoReset(const base::Closure& reset_cb) {
// |reset_cb| must not be called from the same stack.
base::ThreadTaskRunnerHandle::Get()->PostTask(FROM_HERE, reset_cb);
}
private: private:
// Destructing a std::unique_ptr<VideoDecoder>(this) is a no-op. // Destructing a std::unique_ptr<VideoDecoder>(this) is a no-op.
// TODO(sandersd): After this, any method call is an error. Implement checks // TODO(sandersd): After this, any method call is an error. Implement checks
...@@ -266,7 +276,7 @@ TEST_F(MojoVideoDecoderIntegrationTest, Initialize) { ...@@ -266,7 +276,7 @@ TEST_F(MojoVideoDecoderIntegrationTest, Initialize) {
EXPECT_EQ(client_->GetDisplayName(), "MojoVideoDecoder"); EXPECT_EQ(client_->GetDisplayName(), "MojoVideoDecoder");
EXPECT_EQ(client_->NeedsBitstreamConversion(), false); EXPECT_EQ(client_->NeedsBitstreamConversion(), false);
EXPECT_EQ(client_->CanReadWithoutStalling(), true); EXPECT_EQ(client_->CanReadWithoutStalling(), true);
EXPECT_EQ(client_->GetMaxDecodeRequests(), 1); EXPECT_EQ(client_->GetMaxDecodeRequests(), kMaxDecodeRequests);
} }
TEST_F(MojoVideoDecoderIntegrationTest, Decode) { TEST_F(MojoVideoDecoderIntegrationTest, Decode) {
...@@ -312,4 +322,31 @@ TEST_F(MojoVideoDecoderIntegrationTest, ReleaseAfterShutdown) { ...@@ -312,4 +322,31 @@ TEST_F(MojoVideoDecoderIntegrationTest, ReleaseAfterShutdown) {
RunUntilIdle(); RunUntilIdle();
} }
TEST_F(MojoVideoDecoderIntegrationTest, ResetDuringDecode) {
ASSERT_TRUE(Initialize());
VideoFrame::ReleaseMailboxCB release_cb = VideoFrame::ReleaseMailboxCB();
StrictMock<base::MockCallback<VideoDecoder::DecodeCB>> decode_cb;
StrictMock<base::MockCallback<base::Closure>> reset_cb;
EXPECT_CALL(*decoder_, GetReleaseMailboxCB())
.WillRepeatedly(Return(release_cb));
EXPECT_CALL(output_cb_, Run(_)).Times(kMaxDecodeRequests);
EXPECT_CALL(*decoder_, Decode(_, _)).Times(kMaxDecodeRequests);
EXPECT_CALL(*decoder_, Reset(_)).Times(1);
InSequence s; // Make sure all callbacks are fired in order.
EXPECT_CALL(decode_cb, Run(_)).Times(kMaxDecodeRequests);
EXPECT_CALL(reset_cb, Run());
int64_t timestamp_ms = 0;
for (int j = 0; j < kMaxDecodeRequests; ++j) {
client_->Decode(CreateKeyframe(timestamp_ms++), decode_cb.Get());
}
client_->Reset(reset_cb.Get());
RunUntilIdle();
}
} // namespace media } // namespace media
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment