Commit 385df099 authored by damienv's avatar damienv Committed by Commit bot

Fix Flush in DemuxerStreamAdapter.

When the DemuxerStream has a pending read, this one
must be completed before invoking the flush callback.

BUG=408189

Review URL: https://codereview.chromium.org/565973003

Cr-Commit-Position: refs/heads/master@{#294669}
parent c121d0e5
...@@ -67,9 +67,10 @@ DemuxerStreamAdapter::DemuxerStreamAdapter( ...@@ -67,9 +67,10 @@ DemuxerStreamAdapter::DemuxerStreamAdapter(
media_task_runner_(new DummyMediaTaskRunner(task_runner)), media_task_runner_(new DummyMediaTaskRunner(task_runner)),
demuxer_stream_(demuxer_stream), demuxer_stream_(demuxer_stream),
is_pending_read_(false), is_pending_read_(false),
weak_factory_(new base::WeakPtrFactory<DemuxerStreamAdapter>(this)) { is_pending_demuxer_read_(false),
weak_factory_(this),
weak_this_(weak_factory_.GetWeakPtr()) {
ResetMediaTaskRunner(); ResetMediaTaskRunner();
weak_this_ = weak_factory_->GetWeakPtr();
thread_checker_.DetachFromThread(); thread_checker_.DetachFromThread();
} }
...@@ -82,10 +83,15 @@ DemuxerStreamAdapter::~DemuxerStreamAdapter() { ...@@ -82,10 +83,15 @@ DemuxerStreamAdapter::~DemuxerStreamAdapter() {
void DemuxerStreamAdapter::Read(const ReadCB& read_cb) { void DemuxerStreamAdapter::Read(const ReadCB& read_cb) {
DCHECK(thread_checker_.CalledOnValidThread()); DCHECK(thread_checker_.CalledOnValidThread());
DCHECK(flush_cb_.is_null());
// Support only one read at a time. // Support only one read at a time.
DCHECK(!is_pending_read_); DCHECK(!is_pending_read_);
is_pending_read_ = true; is_pending_read_ = true;
ReadInternal(read_cb);
}
void DemuxerStreamAdapter::ReadInternal(const ReadCB& read_cb) {
bool may_run_in_future = media_task_runner_->PostMediaTask( bool may_run_in_future = media_task_runner_->PostMediaTask(
FROM_HERE, FROM_HERE,
base::Bind(&DemuxerStreamAdapter::RequestBuffer, weak_this_, read_cb), base::Bind(&DemuxerStreamAdapter::RequestBuffer, weak_this_, read_cb),
...@@ -97,21 +103,31 @@ void DemuxerStreamAdapter::Flush(const base::Closure& flush_cb) { ...@@ -97,21 +103,31 @@ void DemuxerStreamAdapter::Flush(const base::Closure& flush_cb) {
DCHECK(thread_checker_.CalledOnValidThread()); DCHECK(thread_checker_.CalledOnValidThread());
CMALOG(kLogControl) << __FUNCTION__; CMALOG(kLogControl) << __FUNCTION__;
// Invalidate all the weak pointers so that we don't receive anymore buffers // Flush cancels any pending read.
// from |demuxer_stream_| that are associated with the current media
// timeline.
is_pending_read_ = false; is_pending_read_ = false;
weak_factory_->InvalidateWeakPtrs();
weak_factory_.reset(new base::WeakPtrFactory<DemuxerStreamAdapter>(this));
weak_this_ = weak_factory_->GetWeakPtr();
// Reset the decoder configurations. // Reset the decoder configurations.
audio_config_ = ::media::AudioDecoderConfig(); audio_config_ = ::media::AudioDecoderConfig();
video_config_ = ::media::VideoDecoderConfig(); video_config_ = ::media::VideoDecoderConfig();
// Create a new media task runner for the upcoming media playback. // Create a new media task runner for the upcoming media timeline.
ResetMediaTaskRunner(); ResetMediaTaskRunner();
DCHECK(flush_cb_.is_null());
if (is_pending_demuxer_read_) {
// If there is a pending demuxer read, the implicit contract
// is that the pending read must be completed before invoking the
// flush callback.
flush_cb_ = flush_cb;
return;
}
// At this point, there is no more pending demuxer read,
// so all the previous tasks associated with the current timeline
// can be cancelled.
weak_factory_.InvalidateWeakPtrs();
weak_this_ = weak_factory_.GetWeakPtr();
CMALOG(kLogControl) << "Flush done"; CMALOG(kLogControl) << "Flush done";
flush_cb.Run(); flush_cb.Run();
} }
...@@ -128,6 +144,7 @@ void DemuxerStreamAdapter::ResetMediaTaskRunner() { ...@@ -128,6 +144,7 @@ void DemuxerStreamAdapter::ResetMediaTaskRunner() {
void DemuxerStreamAdapter::RequestBuffer(const ReadCB& read_cb) { void DemuxerStreamAdapter::RequestBuffer(const ReadCB& read_cb) {
DCHECK(thread_checker_.CalledOnValidThread()); DCHECK(thread_checker_.CalledOnValidThread());
is_pending_demuxer_read_ = true;
demuxer_stream_->Read(::media::BindToCurrentLoop( demuxer_stream_->Read(::media::BindToCurrentLoop(
base::Bind(&DemuxerStreamAdapter::OnNewBuffer, weak_this_, read_cb))); base::Bind(&DemuxerStreamAdapter::OnNewBuffer, weak_this_, read_cb)));
} }
...@@ -138,7 +155,14 @@ void DemuxerStreamAdapter::OnNewBuffer( ...@@ -138,7 +155,14 @@ void DemuxerStreamAdapter::OnNewBuffer(
const scoped_refptr< ::media::DecoderBuffer>& input) { const scoped_refptr< ::media::DecoderBuffer>& input) {
DCHECK(thread_checker_.CalledOnValidThread()); DCHECK(thread_checker_.CalledOnValidThread());
is_pending_read_ = false; is_pending_demuxer_read_ = false;
// Just discard the buffer in the flush stage.
if (!flush_cb_.is_null()) {
CMALOG(kLogControl) << "Flush done";
base::ResetAndReturn(&flush_cb_).Run();
return;
}
if (status == ::media::DemuxerStream::kAborted) { if (status == ::media::DemuxerStream::kAborted) {
DCHECK(input.get() == NULL); DCHECK(input.get() == NULL);
...@@ -153,7 +177,7 @@ void DemuxerStreamAdapter::OnNewBuffer( ...@@ -153,7 +177,7 @@ void DemuxerStreamAdapter::OnNewBuffer(
audio_config_ = demuxer_stream_->audio_decoder_config(); audio_config_ = demuxer_stream_->audio_decoder_config();
// Got a new config, but we still need to get a frame. // Got a new config, but we still need to get a frame.
Read(read_cb); ReadInternal(read_cb);
return; return;
} }
...@@ -167,6 +191,7 @@ void DemuxerStreamAdapter::OnNewBuffer( ...@@ -167,6 +191,7 @@ void DemuxerStreamAdapter::OnNewBuffer(
} }
// Provides the buffer as well as possibly valid audio and video configs. // Provides the buffer as well as possibly valid audio and video configs.
is_pending_read_ = false;
scoped_refptr<DecoderBufferBase> buffer(new DecoderBufferAdapter(input)); scoped_refptr<DecoderBufferBase> buffer(new DecoderBufferAdapter(input));
read_cb.Run(buffer, audio_config_, video_config_); read_cb.Run(buffer, audio_config_, video_config_);
......
...@@ -45,6 +45,7 @@ class DemuxerStreamAdapter : public CodedFrameProvider { ...@@ -45,6 +45,7 @@ class DemuxerStreamAdapter : public CodedFrameProvider {
private: private:
void ResetMediaTaskRunner(); void ResetMediaTaskRunner();
void ReadInternal(const ReadCB& read_cb);
void RequestBuffer(const ReadCB& read_cb); void RequestBuffer(const ReadCB& read_cb);
// Callback invoked from the demuxer stream to signal a buffer is ready. // Callback invoked from the demuxer stream to signal a buffer is ready.
...@@ -66,14 +67,21 @@ class DemuxerStreamAdapter : public CodedFrameProvider { ...@@ -66,14 +67,21 @@ class DemuxerStreamAdapter : public CodedFrameProvider {
// Frames are provided by |demuxer_stream_|. // Frames are provided by |demuxer_stream_|.
::media::DemuxerStream* const demuxer_stream_; ::media::DemuxerStream* const demuxer_stream_;
// Indicate if there is a pending read on the demuxer. // Indicate if there is a pending read.
bool is_pending_read_; bool is_pending_read_;
// Indicate if |demuxer_stream_| has a pending read.
bool is_pending_demuxer_read_;
// In case of a pending flush operation, this is the callback
// that is invoked when flush is completed.
base::Closure flush_cb_;
// Audio/video configuration that applies to the next frame. // Audio/video configuration that applies to the next frame.
::media::AudioDecoderConfig audio_config_; ::media::AudioDecoderConfig audio_config_;
::media::VideoDecoderConfig video_config_; ::media::VideoDecoderConfig video_config_;
scoped_ptr<base::WeakPtrFactory<DemuxerStreamAdapter> > weak_factory_; base::WeakPtrFactory<DemuxerStreamAdapter> weak_factory_;
base::WeakPtr<DemuxerStreamAdapter> weak_this_; base::WeakPtr<DemuxerStreamAdapter> weak_this_;
DISALLOW_COPY_AND_ASSIGN(DemuxerStreamAdapter); DISALLOW_COPY_AND_ASSIGN(DemuxerStreamAdapter);
......
...@@ -51,6 +51,10 @@ class DummyDemuxerStream : public ::media::DemuxerStream { ...@@ -51,6 +51,10 @@ class DummyDemuxerStream : public ::media::DemuxerStream {
virtual bool SupportsConfigChanges() OVERRIDE; virtual bool SupportsConfigChanges() OVERRIDE;
virtual ::media::VideoRotation video_rotation() OVERRIDE; virtual ::media::VideoRotation video_rotation() OVERRIDE;
bool has_pending_read() const {
return has_pending_read_;
}
private: private:
void DoRead(const ReadCB& read_cb); void DoRead(const ReadCB& read_cb);
...@@ -62,6 +66,8 @@ class DummyDemuxerStream : public ::media::DemuxerStream { ...@@ -62,6 +66,8 @@ class DummyDemuxerStream : public ::media::DemuxerStream {
// Number of frames sent so far. // Number of frames sent so far.
int frame_count_; int frame_count_;
bool has_pending_read_;
DISALLOW_COPY_AND_ASSIGN(DummyDemuxerStream); DISALLOW_COPY_AND_ASSIGN(DummyDemuxerStream);
}; };
...@@ -71,8 +77,9 @@ DummyDemuxerStream::DummyDemuxerStream( ...@@ -71,8 +77,9 @@ DummyDemuxerStream::DummyDemuxerStream(
const std::list<int>& config_idx) const std::list<int>& config_idx)
: cycle_count_(cycle_count), : cycle_count_(cycle_count),
delayed_frame_count_(delayed_frame_count), delayed_frame_count_(delayed_frame_count),
config_idx_(config_idx),
frame_count_(0), frame_count_(0),
config_idx_(config_idx) { has_pending_read_(false) {
DCHECK_LE(delayed_frame_count, cycle_count); DCHECK_LE(delayed_frame_count, cycle_count);
} }
...@@ -80,8 +87,10 @@ DummyDemuxerStream::~DummyDemuxerStream() { ...@@ -80,8 +87,10 @@ DummyDemuxerStream::~DummyDemuxerStream() {
} }
void DummyDemuxerStream::Read(const ReadCB& read_cb) { void DummyDemuxerStream::Read(const ReadCB& read_cb) {
has_pending_read_ = true;
if (!config_idx_.empty() && config_idx_.front() == frame_count_) { if (!config_idx_.empty() && config_idx_.front() == frame_count_) {
config_idx_.pop_front(); config_idx_.pop_front();
has_pending_read_ = false;
read_cb.Run(kConfigChanged, read_cb.Run(kConfigChanged,
scoped_refptr< ::media::DecoderBuffer>()); scoped_refptr< ::media::DecoderBuffer>());
return; return;
...@@ -131,6 +140,7 @@ bool DummyDemuxerStream::SupportsConfigChanges() { ...@@ -131,6 +140,7 @@ bool DummyDemuxerStream::SupportsConfigChanges() {
} }
void DummyDemuxerStream::DoRead(const ReadCB& read_cb) { void DummyDemuxerStream::DoRead(const ReadCB& read_cb) {
has_pending_read_ = false;
scoped_refptr< ::media::DecoderBuffer> buffer( scoped_refptr< ::media::DecoderBuffer> buffer(
new ::media::DecoderBuffer(16)); new ::media::DecoderBuffer(16));
buffer->set_timestamp(frame_count_ * base::TimeDelta::FromMilliseconds(40)); buffer->set_timestamp(frame_count_ * base::TimeDelta::FromMilliseconds(40));
...@@ -160,6 +170,7 @@ class DemuxerStreamAdapterTest : public testing::Test { ...@@ -160,6 +170,7 @@ class DemuxerStreamAdapterTest : public testing::Test {
// Number of demuxer read before issuing an early flush. // Number of demuxer read before issuing an early flush.
int early_flush_idx_; int early_flush_idx_;
bool use_post_task_for_flush_;
// Number of expected read frames. // Number of expected read frames.
int total_expected_frames_; int total_expected_frames_;
...@@ -170,12 +181,15 @@ class DemuxerStreamAdapterTest : public testing::Test { ...@@ -170,12 +181,15 @@ class DemuxerStreamAdapterTest : public testing::Test {
// List of expected frame indices with decoder config changes. // List of expected frame indices with decoder config changes.
std::list<int> config_idx_; std::list<int> config_idx_;
scoped_ptr<DummyDemuxerStream> demuxer_stream_;
scoped_ptr<CodedFrameProvider> coded_frame_provider_; scoped_ptr<CodedFrameProvider> coded_frame_provider_;
DISALLOW_COPY_AND_ASSIGN(DemuxerStreamAdapterTest); DISALLOW_COPY_AND_ASSIGN(DemuxerStreamAdapterTest);
}; };
DemuxerStreamAdapterTest::DemuxerStreamAdapterTest() { DemuxerStreamAdapterTest::DemuxerStreamAdapterTest()
: use_post_task_for_flush_(false) {
} }
DemuxerStreamAdapterTest::~DemuxerStreamAdapterTest() { DemuxerStreamAdapterTest::~DemuxerStreamAdapterTest() {
...@@ -241,15 +255,25 @@ void DemuxerStreamAdapterTest::OnNewFrame( ...@@ -241,15 +255,25 @@ void DemuxerStreamAdapterTest::OnNewFrame(
ASSERT_LE(frame_received_count_, early_flush_idx_); ASSERT_LE(frame_received_count_, early_flush_idx_);
if (frame_received_count_ == early_flush_idx_) { if (frame_received_count_ == early_flush_idx_) {
coded_frame_provider_->Flush( base::Closure flush_cb =
base::Bind(&DemuxerStreamAdapterTest::OnFlushCompleted, base::Bind(&DemuxerStreamAdapterTest::OnFlushCompleted,
base::Unretained(this))); base::Unretained(this));
if (use_post_task_for_flush_) {
base::MessageLoop::current()->PostTask(
FROM_HERE,
base::Bind(&CodedFrameProvider::Flush,
base::Unretained(coded_frame_provider_.get()),
flush_cb));
} else {
coded_frame_provider_->Flush(flush_cb);
}
return; return;
} }
} }
void DemuxerStreamAdapterTest::OnFlushCompleted() { void DemuxerStreamAdapterTest::OnFlushCompleted() {
ASSERT_EQ(frame_received_count_, total_expected_frames_); ASSERT_EQ(frame_received_count_, total_expected_frames_);
ASSERT_FALSE(demuxer_stream_->has_pending_read());
base::MessageLoop::current()->QuitWhenIdle(); base::MessageLoop::current()->QuitWhenIdle();
} }
...@@ -262,12 +286,12 @@ TEST_F(DemuxerStreamAdapterTest, NoDelay) { ...@@ -262,12 +286,12 @@ TEST_F(DemuxerStreamAdapterTest, NoDelay) {
int cycle_count = 1; int cycle_count = 1;
int delayed_frame_count = 0; int delayed_frame_count = 0;
scoped_ptr<DummyDemuxerStream> demuxer_stream( demuxer_stream_.reset(
new DummyDemuxerStream( new DummyDemuxerStream(
cycle_count, delayed_frame_count, config_idx_)); cycle_count, delayed_frame_count, config_idx_));
scoped_ptr<base::MessageLoop> message_loop(new base::MessageLoop()); scoped_ptr<base::MessageLoop> message_loop(new base::MessageLoop());
Initialize(demuxer_stream.get()); Initialize(demuxer_stream_.get());
message_loop->PostTask( message_loop->PostTask(
FROM_HERE, FROM_HERE,
base::Bind(&DemuxerStreamAdapterTest::Start, base::Unretained(this))); base::Bind(&DemuxerStreamAdapterTest::Start, base::Unretained(this)));
...@@ -283,12 +307,12 @@ TEST_F(DemuxerStreamAdapterTest, AllDelayed) { ...@@ -283,12 +307,12 @@ TEST_F(DemuxerStreamAdapterTest, AllDelayed) {
int cycle_count = 1; int cycle_count = 1;
int delayed_frame_count = 1; int delayed_frame_count = 1;
scoped_ptr<DummyDemuxerStream> demuxer_stream( demuxer_stream_.reset(
new DummyDemuxerStream( new DummyDemuxerStream(
cycle_count, delayed_frame_count, config_idx_)); cycle_count, delayed_frame_count, config_idx_));
scoped_ptr<base::MessageLoop> message_loop(new base::MessageLoop()); scoped_ptr<base::MessageLoop> message_loop(new base::MessageLoop());
Initialize(demuxer_stream.get()); Initialize(demuxer_stream_.get());
message_loop->PostTask( message_loop->PostTask(
FROM_HERE, FROM_HERE,
base::Bind(&DemuxerStreamAdapterTest::Start, base::Unretained(this))); base::Bind(&DemuxerStreamAdapterTest::Start, base::Unretained(this)));
...@@ -298,18 +322,19 @@ TEST_F(DemuxerStreamAdapterTest, AllDelayed) { ...@@ -298,18 +322,19 @@ TEST_F(DemuxerStreamAdapterTest, AllDelayed) {
TEST_F(DemuxerStreamAdapterTest, AllDelayedEarlyFlush) { TEST_F(DemuxerStreamAdapterTest, AllDelayedEarlyFlush) {
total_frames_ = 10; total_frames_ = 10;
early_flush_idx_ = 5; early_flush_idx_ = 5;
use_post_task_for_flush_ = true;
total_expected_frames_ = 5; total_expected_frames_ = 5;
config_idx_.push_back(0); config_idx_.push_back(0);
config_idx_.push_back(5); config_idx_.push_back(3);
int cycle_count = 1; int cycle_count = 1;
int delayed_frame_count = 1; int delayed_frame_count = 1;
scoped_ptr<DummyDemuxerStream> demuxer_stream( demuxer_stream_.reset(
new DummyDemuxerStream( new DummyDemuxerStream(
cycle_count, delayed_frame_count, config_idx_)); cycle_count, delayed_frame_count, config_idx_));
scoped_ptr<base::MessageLoop> message_loop(new base::MessageLoop()); scoped_ptr<base::MessageLoop> message_loop(new base::MessageLoop());
Initialize(demuxer_stream.get()); Initialize(demuxer_stream_.get());
message_loop->PostTask( message_loop->PostTask(
FROM_HERE, FROM_HERE,
base::Bind(&DemuxerStreamAdapterTest::Start, base::Unretained(this))); base::Bind(&DemuxerStreamAdapterTest::Start, base::Unretained(this)));
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment