Commit a67ec70f authored by Dan Sanders's avatar Dan Sanders Committed by Commit Bot

[media] Support concurrent buffers in MojoDecoderBufferConverter.

This adds a queue to the write and read sides of
MojoDecoderBufferConverter, allowing for multiple concurrent buffers to
be processed.

Writes are no longer guaranteed to fail as soon as the pipe is closed,
since no write is attempted immediately. Since there is no error
reporting mechanism on the write side, the only guarantee is that
*eventually* writes will fail. Since MojoDecoderBufferConverter is not
a primary communication channel, this isn't likely to cause
complications.

Bug: 751926
Change-Id: I11c985d8aba3acc8fba6453f5bcf22555334b5b0
Reviewed-on: https://chromium-review.googlesource.com/679735
Commit-Queue: Dan Sanders <sandersd@chromium.org>
Reviewed-by: default avatarKen Rockot <rockot@chromium.org>
Reviewed-by: default avatarXiaohan Wang <xhwang@chromium.org>
Cr-Commit-Position: refs/heads/master@{#505420}
parent 84e6c2d6
......@@ -123,12 +123,9 @@ TEST_F(MojoDecryptorTest, VideoDecodeFreesBuffer) {
TEST_F(MojoDecryptorTest, VideoDecodeFreesMultipleBuffers) {
// Call DecryptAndDecodeVideo() multiple times.
const int TIMES = 5;
{
InSequence seq;
EXPECT_CALL(*this, VideoDecoded(Decryptor::Status::kSuccess, NotNull()))
.Times(TIMES);
EXPECT_CALL(*this, OnFrameDestroyed()).Times(TIMES);
}
EXPECT_CALL(*this, VideoDecoded(Decryptor::Status::kSuccess, NotNull()))
.Times(TIMES);
EXPECT_CALL(*this, OnFrameDestroyed()).Times(TIMES);
EXPECT_CALL(*decryptor_, DecryptAndDecodeVideo(_, _))
.WillRepeatedly(Invoke(this, &MojoDecryptorTest::ReturnSimpleVideoFrame));
......
......@@ -7,6 +7,7 @@
#include <memory>
#include "base/containers/circular_deque.h"
#include "base/macros.h"
#include "base/memory/ref_counted.h"
#include "media/base/demuxer_stream.h"
......@@ -18,8 +19,8 @@ namespace media {
class DecoderBuffer;
// A helper class that converts mojom::DecoderBuffer to media::DecoderBuffer.
// The data part of the DecoderBuffer is read from a DataPipe.
// Combines mojom::DecoderBuffers with data read from a DataPipe to produce
// media::DecoderBuffers (counterpart of MojoDecoderBufferWriter).
class MojoDecoderBufferReader {
public:
using ReadCB = base::OnceCallback<void(scoped_refptr<DecoderBuffer>)>;
......@@ -35,30 +36,56 @@ class MojoDecoderBufferReader {
~MojoDecoderBufferReader();
// Converts |buffer| into a DecoderBuffer (read data from DataPipe if needed).
// |read_cb| is called with the result DecoderBuffer.
// Reports a null DecoderBuffer in case of an error.
// Enqueues conversion of and reading data for a mojom::DecoderBuffer. Once
// the data has been read, |read_cb| will be called with the converted
// media::DecoderBuffer.
//
// |read_cb| will be called in the same order as ReadDecoderBuffer(). This
// order must match the order that the data was written into the DataPipe!
// Callbacks may run on the original stack, on a Mojo stack, or on a future
// ReadDecoderBuffer() stack.
//
// If reading fails (for example, if the DataPipe is closed), |read_cb| will
// be called with nullptr.
void ReadDecoderBuffer(mojom::DecoderBufferPtr buffer, ReadCB read_cb);
private:
void OnPipeError(MojoResult result);
void OnPipeReadable(MojoResult result);
void CancelReadCB(ReadCB read_cb);
void CompleteCurrentRead();
void ScheduleNextRead();
void OnPipeReadable(MojoResult result, const mojo::HandleSignalsState& state);
void ReadDecoderBufferData();
void OnPipeError(MojoResult result);
// For reading the data section of a DecoderBuffer.
// Read side of the DataPipe for receiving DecoderBuffer data.
mojo::ScopedDataPipeConsumerHandle consumer_handle_;
// Provides notification about |consumer_handle_| readiness.
mojo::SimpleWatcher pipe_watcher_;
bool armed_;
// Buffers waiting to be read in sequence.
base::circular_deque<scoped_refptr<DecoderBuffer>> pending_buffers_;
// Only valid during pending read.
ReadCB read_cb_;
scoped_refptr<DecoderBuffer> media_buffer_;
// Callbacks for pending buffers.
base::circular_deque<ReadCB> pending_read_cbs_;
// Number of bytes already read into the current buffer.
uint32_t bytes_read_;
DISALLOW_COPY_AND_ASSIGN(MojoDecoderBufferReader);
};
// A helper class that converts media::DecoderBuffer to mojom::DecoderBuffer.
// The data part of the DecoderBuffer is written into a DataPipe.
// Converts media::DecoderBuffers to mojom::DecoderBuffers, writing the data
// part to a DataPipe (counterpart of MojoDecoderBufferReader).
//
// If necessary, writes to the DataPipe will be chunked to fit.
// MojoDecoderBufferWriter maintains an internal queue of buffers to enable
// this asynchronous process.
//
// On DataPipe closure, future calls to WriteDecoderBuffer() will return
// nullptr. There is no mechanism to determine which past writes were
// successful prior to the closure.
class MojoDecoderBufferWriter {
public:
// Creates a MojoDecoderBufferWriter of |type| and set the |consumer_handle|.
......@@ -72,23 +99,30 @@ class MojoDecoderBufferWriter {
~MojoDecoderBufferWriter();
// Converts a DecoderBuffer into mojo DecoderBuffer.
// DecoderBuffer data is asynchronously written into DataPipe if needed.
// Returns null if conversion failed or if the data pipe is already closed.
// Converts a media::DecoderBuffer to a mojom::DecoderBuffer and enqueues the
// data to be written to the DataPipe.
//
// Returns nullptr if the DataPipe is already closed.
mojom::DecoderBufferPtr WriteDecoderBuffer(
const scoped_refptr<DecoderBuffer>& media_buffer);
private:
void ScheduleNextWrite();
void OnPipeWritable(MojoResult result, const mojo::HandleSignalsState& state);
void WriteDecoderBufferData();
void OnPipeError(MojoResult result);
void OnPipeWritable(MojoResult result);
MojoResult WriteDecoderBufferData();
// For writing the data section of DecoderBuffer into DataPipe.
// Write side of the DataPipe for sending DecoderBuffer data.
mojo::ScopedDataPipeProducerHandle producer_handle_;
// Provides notifications about |producer_handle_| readiness.
mojo::SimpleWatcher pipe_watcher_;
bool armed_;
// Buffers waiting to be written in sequence.
base::circular_deque<scoped_refptr<DecoderBuffer>> pending_buffers_;
// Only valid when data is being written to the pipe.
scoped_refptr<DecoderBuffer> media_buffer_;
// Number of bytes already written from the current buffer.
uint32_t bytes_written_;
DISALLOW_COPY_AND_ASSIGN(MojoDecoderBufferWriter);
......
......@@ -153,23 +153,6 @@ TEST(MojoDecoderBufferConverterTest, Chunked) {
converter.ConvertAndVerify(buffer);
}
// This test verifies that MojoDecoderBufferWriter returns NULL if data pipe
// is already closed.
TEST(MojoDecoderBufferConverterTest, ReaderSidePipeError) {
base::MessageLoop message_loop;
const uint8_t kData[] = "Hello, world";
const size_t kDataSize = arraysize(kData);
scoped_refptr<DecoderBuffer> media_buffer =
DecoderBuffer::CopyFrom(kData, kDataSize);
MojoDecoderBufferConverter converter;
// Before sending the buffer, close the handle on reader side.
converter.reader.reset();
mojom::DecoderBufferPtr mojo_buffer =
converter.writer->WriteDecoderBuffer(media_buffer);
EXPECT_TRUE(mojo_buffer.is_null());
}
// This test verifies that MojoDecoderBufferReader::ReadCB is called with a
// NULL DecoderBuffer if data pipe is closed during transmission.
TEST(MojoDecoderBufferConverterTest, WriterSidePipeError) {
......@@ -198,4 +181,52 @@ TEST(MojoDecoderBufferConverterTest, WriterSidePipeError) {
run_loop.Run();
}
// This test verifies that MojoDecoderBuffer supports concurrent writes and
// reads.
TEST(MojoDecoderBufferConverterTest, ConcurrentDecoderBuffers) {
base::MessageLoop message_loop;
base::RunLoop run_loop;
// Prevent all of the buffers from fitting at once to exercise the chunking
// logic.
MojoDecoderBufferConverter converter(4);
// Three buffers: normal, EOS, normal.
const uint8_t kData[] = "Hello, world";
const size_t kDataSize = arraysize(kData);
scoped_refptr<DecoderBuffer> media_buffer1 =
DecoderBuffer::CopyFrom(kData, kDataSize);
scoped_refptr<DecoderBuffer> media_buffer2(DecoderBuffer::CreateEOSBuffer());
scoped_refptr<DecoderBuffer> media_buffer3 =
DecoderBuffer::CopyFrom(kData, kDataSize);
// Expect the read callbacks to be issued in the same order.
::testing::InSequence scoper;
base::MockCallback<MojoDecoderBufferReader::ReadCB> mock_cb1;
base::MockCallback<MojoDecoderBufferReader::ReadCB> mock_cb2;
base::MockCallback<MojoDecoderBufferReader::ReadCB> mock_cb3;
EXPECT_CALL(mock_cb1, Run(MatchesDecoderBuffer(media_buffer1)));
EXPECT_CALL(mock_cb2, Run(MatchesDecoderBuffer(media_buffer2)));
EXPECT_CALL(mock_cb3, Run(MatchesDecoderBuffer(media_buffer3)))
.WillOnce(testing::InvokeWithoutArgs(&run_loop, &base::RunLoop::Quit));
// Write all of the buffers at once.
mojom::DecoderBufferPtr mojo_buffer1 =
converter.writer->WriteDecoderBuffer(media_buffer1);
mojom::DecoderBufferPtr mojo_buffer2 =
converter.writer->WriteDecoderBuffer(media_buffer2);
mojom::DecoderBufferPtr mojo_buffer3 =
converter.writer->WriteDecoderBuffer(media_buffer3);
// Read all of the buffers at once.
// Technically could be satisfied by ReadDecoderBuffer() blocking, but that's
// actually a valid implementation. (Quitting the |run_loop| won't work
// properly with that setup though.)
converter.reader->ReadDecoderBuffer(std::move(mojo_buffer1), mock_cb1.Get());
converter.reader->ReadDecoderBuffer(std::move(mojo_buffer2), mock_cb2.Get());
converter.reader->ReadDecoderBuffer(std::move(mojo_buffer3), mock_cb3.Get());
run_loop.Run();
}
} // namespace media
......@@ -159,8 +159,8 @@ class MOJO_CPP_SYSTEM_EXPORT SimpleWatcher {
// state of the handle is placed in |*ready_state| if |ready_state| is
// non-null.
//
// If the watcher is successfully armed, this returns |MOJO_RESULT_OK| and
// |ready_result| and |ready_state| are ignored.
// If the watcher is successfully armed (or was already armed), this returns
// |MOJO_RESULT_OK| and |ready_result| and |ready_state| are ignored.
MojoResult Arm(MojoResult* ready_result = nullptr,
HandleSignalsState* ready_state = nullptr);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment