Commit 9044e32c authored by acolwell@chromium.org's avatar acolwell@chromium.org

Fix ChunkDemuxer seek and init callback dispatch.

This patch ensures that seek and init callbacks are always
dispatched from the message loop that initiated the Seek()
or Initialize() operation. This prevents reentrancy problems
and allows the code to jump through less locking hoops.

TESTS=All existing unittests and LayoutTests still pass.

Review URL: https://chromiumcodereview.appspot.com/17261029

git-svn-id: svn://svn.chromium.org/chrome/trunk/src@207767 0039d316-1c4b-4281-b951-d872f2087c98
parent 970749f3
......@@ -13,6 +13,7 @@
#include "base/location.h"
#include "base/message_loop/message_loop_proxy.h"
#include "media/base/audio_decoder_config.h"
#include "media/base/bind_to_loop.h"
#include "media/base/stream_parser_buffer.h"
#include "media/base/video_decoder_config.h"
#include "media/filters/stream_parser_factory.h"
......@@ -207,7 +208,7 @@ class ChunkDemuxerStream : public DemuxerStream {
void StartWaitingForSeek();
void Seek(TimeDelta time);
void CancelPendingSeek();
bool IsSeekPending() const;
bool IsSeekWaitingForData() const;
// Add buffers to this stream. Buffers are stored in SourceBufferStreams,
// which handle ordering and overlap resolution.
......@@ -215,10 +216,10 @@ class ChunkDemuxerStream : public DemuxerStream {
bool Append(const StreamParser::BufferQueue& buffers);
// Signal to the stream that duration has changed to |duration|.
void OnSetDuration(base::TimeDelta duration);
void OnSetDuration(TimeDelta duration);
// Returns the range of buffered data in this stream, capped at |duration|.
Ranges<TimeDelta> GetBufferedRanges(base::TimeDelta duration) const;
Ranges<TimeDelta> GetBufferedRanges(TimeDelta duration) const;
// Signal to the stream that buffers handed in through subsequent calls to
// Append() belong to a media segment that starts at |start_timestamp|.
......@@ -334,7 +335,7 @@ void ChunkDemuxerStream::CancelPendingSeek() {
it->Run(kAborted, NULL);
}
bool ChunkDemuxerStream::IsSeekPending() const {
bool ChunkDemuxerStream::IsSeekWaitingForData() const {
base::AutoLock auto_lock(lock_);
return stream_->IsSeekPending();
}
......@@ -365,13 +366,13 @@ bool ChunkDemuxerStream::Append(const StreamParser::BufferQueue& buffers) {
return true;
}
void ChunkDemuxerStream::OnSetDuration(base::TimeDelta duration) {
void ChunkDemuxerStream::OnSetDuration(TimeDelta duration) {
base::AutoLock auto_lock(lock_);
stream_->OnSetDuration(duration);
}
Ranges<TimeDelta> ChunkDemuxerStream::GetBufferedRanges(
base::TimeDelta duration) const {
TimeDelta duration) const {
base::AutoLock auto_lock(lock_);
Ranges<TimeDelta> range = stream_->GetBufferedTime();
......@@ -578,16 +579,15 @@ void ChunkDemuxer::Initialize(DemuxerHost* host, const PipelineStatusCB& cb) {
base::AutoLock auto_lock(lock_);
init_cb_ = BindToCurrentLoop(cb);
if (state_ == SHUTDOWN) {
base::MessageLoopProxy::current()->PostTask(FROM_HERE, base::Bind(
cb, DEMUXER_ERROR_COULD_NOT_OPEN));
base::ResetAndReturn(&init_cb_).Run(DEMUXER_ERROR_COULD_NOT_OPEN);
return;
}
DCHECK_EQ(state_, WAITING_FOR_INIT);
host_ = host;
ChangeState_Locked(INITIALIZING);
init_cb_ = cb;
base::ResetAndReturn(&open_cb_).Run();
}
......@@ -604,27 +604,25 @@ void ChunkDemuxer::Seek(TimeDelta time, const PipelineStatusCB& cb) {
DCHECK(seek_cb_.is_null());
PipelineStatus status = PIPELINE_ERROR_INVALID_STATE;
{
base::AutoLock auto_lock(lock_);
if (state_ == INITIALIZED || state_ == ENDED) {
if (audio_)
audio_->Seek(time);
base::AutoLock auto_lock(lock_);
if (video_)
video_->Seek(time);
seek_cb_ = BindToCurrentLoop(cb);
if (state_ == INITIALIZED || state_ == ENDED) {
if (audio_)
audio_->Seek(time);
if (IsSeekPending_Locked()) {
DVLOG(1) << "Seek() : waiting for more data to arrive.";
seek_cb_ = cb;
return;
}
if (video_)
video_->Seek(time);
status = PIPELINE_OK;
if (IsSeekWaitingForData_Locked()) {
DVLOG(1) << "Seek() : waiting for more data to arrive.";
return;
}
status = PIPELINE_OK;
}
cb.Run(status);
base::ResetAndReturn(&seek_cb_).Run(status);
}
void ChunkDemuxer::OnAudioRendererDisabled() {
......@@ -665,21 +663,17 @@ void ChunkDemuxer::StartWaitingForSeek() {
}
void ChunkDemuxer::CancelPendingSeek() {
PipelineStatusCB cb;
{
base::AutoLock auto_lock(lock_);
if (IsSeekPending_Locked() && !seek_cb_.is_null()) {
std::swap(cb, seek_cb_);
}
if (audio_)
audio_->CancelPendingSeek();
base::AutoLock auto_lock(lock_);
DCHECK(seek_cb_.is_null() != IsSeekWaitingForData_Locked());
if (video_)
video_->CancelPendingSeek();
}
if (audio_)
audio_->CancelPendingSeek();
if (video_)
video_->CancelPendingSeek();
if (!cb.is_null())
cb.Run(PIPELINE_OK);
if (!seek_cb_.is_null())
base::ResetAndReturn(&seek_cb_).Run(PIPELINE_OK);
}
ChunkDemuxer::Status ChunkDemuxer::AddId(const std::string& id,
......@@ -815,12 +809,12 @@ void ChunkDemuxer::AppendData(const std::string& id,
Ranges<TimeDelta> ranges;
PipelineStatusCB cb;
{
base::AutoLock auto_lock(lock_);
// Capture if the SourceBuffer has a pending seek before we start parsing.
bool old_seek_pending = IsSeekPending_Locked();
// Capture if any of the SourceBuffers are waiting for data before we start
// parsing.
bool old_waiting_for_data = IsSeekWaitingForData_Locked();
if (state_ == ENDED) {
ChangeState_Locked(INITIALIZED);
......@@ -867,8 +861,9 @@ void ChunkDemuxer::AppendData(const std::string& id,
// Check to see if data was appended at the pending seek point. This
// indicates we have parsed enough data to complete the seek.
if (old_seek_pending && !IsSeekPending_Locked() && !seek_cb_.is_null()) {
std::swap(cb, seek_cb_);
if (old_waiting_for_data && !IsSeekWaitingForData_Locked() &&
!seek_cb_.is_null()) {
base::ResetAndReturn(&seek_cb_).Run(PIPELINE_OK);
}
ranges = GetBufferedRanges();
......@@ -876,9 +871,6 @@ void ChunkDemuxer::AppendData(const std::string& id,
for (size_t i = 0; i < ranges.size(); ++i)
host_->AddBufferedTimeRange(ranges.start(i), ranges.end(i));
if (!cb.is_null())
cb.Run(PIPELINE_OK);
}
void ChunkDemuxer::Abort(const std::string& id) {
......@@ -920,14 +912,13 @@ void ChunkDemuxer::SetDuration(double duration) {
// Compute & bounds check the TimeDelta representation of duration.
// This can be different if the value of |duration| doesn't fit the range or
// precision of base::TimeDelta.
base::TimeDelta min_duration = base::TimeDelta::FromInternalValue(1);
base::TimeDelta max_duration =
base::TimeDelta::FromInternalValue(kint64max - 1);
// precision of TimeDelta.
TimeDelta min_duration = TimeDelta::FromInternalValue(1);
TimeDelta max_duration = TimeDelta::FromInternalValue(kint64max - 1);
double min_duration_in_seconds = min_duration.InSecondsF();
double max_duration_in_seconds = max_duration.InSecondsF();
base::TimeDelta duration_td;
TimeDelta duration_td;
if (duration == std::numeric_limits<double>::infinity()) {
duration_td = media::kInfiniteDuration();
} else if (duration < min_duration_in_seconds) {
......@@ -935,11 +926,11 @@ void ChunkDemuxer::SetDuration(double duration) {
} else if (duration > max_duration_in_seconds) {
duration_td = max_duration;
} else {
duration_td = base::TimeDelta::FromMicroseconds(
duration_td = TimeDelta::FromMicroseconds(
duration * base::Time::kMicrosecondsPerSecond);
}
DCHECK(duration_td > base::TimeDelta());
DCHECK(duration_td > TimeDelta());
user_specified_duration_ = duration;
duration_ = duration_td;
......@@ -962,66 +953,57 @@ bool ChunkDemuxer::SetTimestampOffset(const std::string& id, TimeDelta offset) {
void ChunkDemuxer::EndOfStream(PipelineStatus status) {
DVLOG(1) << "EndOfStream(" << status << ")";
PipelineStatusCB cb;
{
base::AutoLock auto_lock(lock_);
DCHECK_NE(state_, WAITING_FOR_INIT);
DCHECK_NE(state_, ENDED);
base::AutoLock auto_lock(lock_);
DCHECK_NE(state_, WAITING_FOR_INIT);
DCHECK_NE(state_, ENDED);
if (state_ == SHUTDOWN || state_ == PARSE_ERROR)
return;
if (state_ == SHUTDOWN || state_ == PARSE_ERROR)
return;
if (state_ == INITIALIZING) {
ReportError_Locked(DEMUXER_ERROR_COULD_NOT_OPEN);
return;
}
if (state_ == INITIALIZING) {
ReportError_Locked(DEMUXER_ERROR_COULD_NOT_OPEN);
return;
}
bool old_seek_pending = IsSeekPending_Locked();
if (audio_)
audio_->EndOfStream();
bool old_waiting_for_data = IsSeekWaitingForData_Locked();
if (audio_)
audio_->EndOfStream();
if (video_)
video_->EndOfStream();
if (video_)
video_->EndOfStream();
// Give a chance to resume the pending seek process.
if (status != PIPELINE_OK) {
ReportError_Locked(status);
return;
}
// Give a chance to resume the pending seek process.
if (status != PIPELINE_OK) {
ReportError_Locked(status);
return;
}
ChangeState_Locked(ENDED);
DecreaseDurationIfNecessary();
ChangeState_Locked(ENDED);
DecreaseDurationIfNecessary();
if (old_seek_pending && !IsSeekPending_Locked() && !seek_cb_.is_null())
std::swap(cb, seek_cb_);
if (old_waiting_for_data && !IsSeekWaitingForData_Locked() &&
!seek_cb_.is_null()) {
base::ResetAndReturn(&seek_cb_).Run(PIPELINE_OK);
}
if (!cb.is_null())
cb.Run(PIPELINE_OK);
}
void ChunkDemuxer::Shutdown() {
DVLOG(1) << "Shutdown()";
PipelineStatusCB cb;
{
base::AutoLock auto_lock(lock_);
if (state_ == SHUTDOWN)
return;
base::AutoLock auto_lock(lock_);
std::swap(cb, seek_cb_);
if (state_ == SHUTDOWN)
return;
if (audio_)
audio_->Shutdown();
if (audio_)
audio_->Shutdown();
if (video_)
video_->Shutdown();
if (video_)
video_->Shutdown();
ChangeState_Locked(SHUTDOWN);
}
ChangeState_Locked(SHUTDOWN);
if (!cb.is_null())
cb.Run(PIPELINE_ERROR_ABORT);
if(!seek_cb_.is_null())
base::ResetAndReturn(&seek_cb_).Run(PIPELINE_ERROR_ABORT);
}
void ChunkDemuxer::ChangeState_Locked(State new_state) {
......@@ -1063,7 +1045,6 @@ void ChunkDemuxer::ReportError_Locked(PipelineStatus error) {
}
if (!cb.is_null()) {
base::AutoUnlock auto_unlock(lock_);
cb.Run(error);
return;
}
......@@ -1072,17 +1053,17 @@ void ChunkDemuxer::ReportError_Locked(PipelineStatus error) {
host_->OnDemuxerError(error);
}
bool ChunkDemuxer::IsSeekPending_Locked() const {
bool ChunkDemuxer::IsSeekWaitingForData_Locked() const {
lock_.AssertAcquired();
bool seek_pending = false;
bool waiting_for_data = false;
if (audio_)
seek_pending = audio_->IsSeekPending();
waiting_for_data = audio_->IsSeekWaitingForData();
if (!seek_pending && video_)
seek_pending = video_->IsSeekPending();
if (!waiting_for_data && video_)
waiting_for_data = video_->IsSeekWaitingForData();
return seek_pending;
return waiting_for_data;
}
void ChunkDemuxer::OnSourceInitDone(bool success, TimeDelta duration) {
......@@ -1095,7 +1076,7 @@ void ChunkDemuxer::OnSourceInitDone(bool success, TimeDelta duration) {
return;
}
if (duration != base::TimeDelta() && duration_ == kNoTimestamp())
if (duration != TimeDelta() && duration_ == kNoTimestamp())
UpdateDuration(duration);
// Wait until all streams have initialized.
......@@ -1216,8 +1197,8 @@ bool ChunkDemuxer::OnTextBuffers(
for (StreamParser::BufferQueue::const_iterator itr = buffers.begin();
itr != buffers.end(); ++itr) {
const StreamParserBuffer* const buffer = itr->get();
const base::TimeDelta start = buffer->GetTimestamp();
const base::TimeDelta end = start + buffer->GetDuration();
const TimeDelta start = buffer->GetTimestamp();
const TimeDelta end = start + buffer->GetDuration();
std::string id, settings, content;
......@@ -1261,7 +1242,7 @@ bool ChunkDemuxer::IsValidId(const std::string& source_id) const {
return source_state_map_.count(source_id) > 0u;
}
void ChunkDemuxer::UpdateDuration(base::TimeDelta new_duration) {
void ChunkDemuxer::UpdateDuration(TimeDelta new_duration) {
DCHECK(duration_ != new_duration);
user_specified_duration_ = -1;
duration_ = new_duration;
......@@ -1278,7 +1259,7 @@ void ChunkDemuxer::IncreaseDurationIfNecessary(
Ranges<TimeDelta> ranges = stream->GetBufferedRanges(kInfiniteDuration());
DCHECK_GT(ranges.size(), 0u);
base::TimeDelta last_timestamp_buffered = ranges.end(ranges.size() - 1);
TimeDelta last_timestamp_buffered = ranges.end(ranges.size() - 1);
if (last_timestamp_buffered > duration_)
UpdateDuration(last_timestamp_buffered);
}
......@@ -1288,7 +1269,7 @@ void ChunkDemuxer::DecreaseDurationIfNecessary() {
if (ranges.size() == 0u)
return;
base::TimeDelta last_timestamp_buffered = ranges.end(ranges.size() - 1);
TimeDelta last_timestamp_buffered = ranges.end(ranges.size() - 1);
if (last_timestamp_buffered < duration_)
UpdateDuration(last_timestamp_buffered);
}
......
......@@ -124,7 +124,7 @@ class MEDIA_EXPORT ChunkDemuxer : public Demuxer {
void ReportError_Locked(PipelineStatus error);
// Returns true if any stream has seeked to a time without buffered data.
bool IsSeekPending_Locked() const;
bool IsSeekWaitingForData_Locked() const;
// Returns true if all streams can successfully call EndOfStream,
// false if any can not.
......
......@@ -460,8 +460,10 @@ class ChunkDemuxerTest : public testing::Test {
}
void ShutdownDemuxer() {
if (demuxer_)
if (demuxer_) {
demuxer_->Shutdown();
message_loop_.RunUntilIdle();
}
}
void AddSimpleBlock(ClusterBuilder* cb, int track_num, int64 timecode) {
......@@ -893,6 +895,8 @@ TEST_F(ChunkDemuxerTest, TestAppendDataAfterSeek) {
AppendData(cluster->data(), cluster->size());
message_loop_.RunUntilIdle();
Checkpoint(2);
}
......@@ -1733,11 +1737,15 @@ TEST_F(ChunkDemuxerTest, TestEndOfStreamAfterPastEosSeek) {
demuxer_->StartWaitingForSeek();
demuxer_->Seek(base::TimeDelta::FromMilliseconds(110),
base::Bind(OnSeekDone_OKExpected, &seek_cb_was_called));
message_loop_.RunUntilIdle();
EXPECT_FALSE(seek_cb_was_called);
EXPECT_CALL(host_, SetDuration(
base::TimeDelta::FromMilliseconds(120)));
demuxer_->EndOfStream(PIPELINE_OK);
message_loop_.RunUntilIdle();
EXPECT_TRUE(seek_cb_was_called);
ShutdownDemuxer();
......@@ -1767,11 +1775,15 @@ TEST_F(ChunkDemuxerTest, TestEndOfStreamDuringPendingSeek) {
demuxer_->StartWaitingForSeek();
demuxer_->Seek(base::TimeDelta::FromMilliseconds(160),
base::Bind(OnSeekDone_OKExpected, &seek_cb_was_called));
message_loop_.RunUntilIdle();
EXPECT_FALSE(seek_cb_was_called);
EXPECT_CALL(host_, SetDuration(
base::TimeDelta::FromMilliseconds(300)));
demuxer_->EndOfStream(PIPELINE_OK);
message_loop_.RunUntilIdle();
EXPECT_FALSE(seek_cb_was_called);
scoped_ptr<Cluster> cluster_a3(
......@@ -1780,6 +1792,9 @@ TEST_F(ChunkDemuxerTest, TestEndOfStreamDuringPendingSeek) {
GenerateSingleStreamCluster(140, 180, kVideoTrackNum, 5));
AppendData(cluster_a3->data(), cluster_a3->size());
AppendData(cluster_v3->data(), cluster_v3->size());
message_loop_.RunUntilIdle();
EXPECT_TRUE(seek_cb_was_called);
ShutdownDemuxer();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment