Commit a3fff4cd authored by scherkus@chromium.org's avatar scherkus@chromium.org

Replace RunInSeries() and RunInParallel() with SerialRunner helper class.

The biggest improvement here is that early termination of the callback series is accomplished by deleting the object.

BUG=138583

Review URL: https://chromiumcodereview.appspot.com/10830146

git-svn-id: svn://svn.chromium.org/chrome/trunk/src@149951 0039d316-1c4b-4281-b951-d872f2087c98
parent 5ef46522
// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "media/base/callback_util.h"
#include "base/bind.h"
#include "base/synchronization/lock.h"
#include "base/memory/ref_counted.h"
#include "base/message_loop.h"
#include "base/message_loop_proxy.h"
namespace media {
// Executes the given closure if and only if the closure returned by
// GetClosure() has been executed exactly |count| times.
//
// |done_cb| will be executed on the same thread that created the CountingCB.
class CountingCB : public base::RefCountedThreadSafe<CountingCB> {
public:
CountingCB(int count, const base::Closure& done_cb)
: message_loop_(base::MessageLoopProxy::current()),
count_(count),
done_cb_(done_cb) {
}
// Returns a closure bound to this object.
base::Closure GetClosure() {
return base::Bind(&CountingCB::OnCallback, this);
}
protected:
friend class base::RefCountedThreadSafe<CountingCB>;
virtual ~CountingCB() {}
private:
void OnCallback() {
{
base::AutoLock l(lock_);
count_--;
DCHECK_GE(count_, 0) << "CountingCB executed too many times";
if (count_ != 0)
return;
}
if (!message_loop_->BelongsToCurrentThread()) {
message_loop_->PostTask(FROM_HERE, done_cb_);
return;
}
done_cb_.Run();
}
scoped_refptr<base::MessageLoopProxy> message_loop_;
base::Lock lock_;
int count_;
base::Closure done_cb_;
DISALLOW_COPY_AND_ASSIGN(CountingCB);
};
static void OnSeriesCallback(
scoped_refptr<base::MessageLoopProxy> message_loop,
scoped_ptr<std::queue<ClosureFunc> > closures,
const base::Closure& done_cb) {
if (!message_loop->BelongsToCurrentThread()) {
message_loop->PostTask(FROM_HERE, base::Bind(
&OnSeriesCallback, message_loop, base::Passed(&closures), done_cb));
return;
}
if (closures->empty()) {
done_cb.Run();
return;
}
ClosureFunc cb = closures->front();
closures->pop();
cb.Run(base::Bind(
&OnSeriesCallback, message_loop, base::Passed(&closures), done_cb));
}
void RunInSeries(scoped_ptr<std::queue<ClosureFunc> > closures,
const base::Closure& done_cb) {
OnSeriesCallback(base::MessageLoopProxy::current(),
closures.Pass(), done_cb);
}
static void OnStatusCallback(
scoped_refptr<base::MessageLoopProxy> message_loop,
scoped_ptr<std::queue<PipelineStatusCBFunc> > status_cbs,
const PipelineStatusCB& done_cb,
PipelineStatus last_status) {
if (!message_loop->BelongsToCurrentThread()) {
message_loop->PostTask(FROM_HERE, base::Bind(
&OnStatusCallback, message_loop, base::Passed(&status_cbs), done_cb,
last_status));
return;
}
if (status_cbs->empty() || last_status != PIPELINE_OK) {
done_cb.Run(last_status);
return;
}
PipelineStatusCBFunc status_cb = status_cbs->front();
status_cbs->pop();
status_cb.Run(base::Bind(
&OnStatusCallback, message_loop, base::Passed(&status_cbs), done_cb));
}
void RunInSeriesWithStatus(
scoped_ptr<std::queue<PipelineStatusCBFunc> > status_cbs,
const PipelineStatusCB& done_cb) {
OnStatusCallback(base::MessageLoopProxy::current(),
status_cbs.Pass(), done_cb, PIPELINE_OK);
}
void RunInParallel(scoped_ptr<std::queue<ClosureFunc> > closures,
const base::Closure& done_cb) {
if (closures->empty()) {
done_cb.Run();
return;
}
scoped_refptr<CountingCB> counting_cb =
new CountingCB(closures->size(), done_cb);
while (!closures->empty()) {
closures->front().Run(counting_cb->GetClosure());
closures->pop();
}
}
} // namespace media
// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef MEDIA_BASE_CALLBACK_UTIL_H_
#define MEDIA_BASE_CALLBACK_UTIL_H_
#include <queue>
#include "base/callback.h"
#include "base/memory/scoped_ptr.h"
#include "media/base/pipeline_status.h"
namespace media {
typedef base::Callback<void(const base::Closure&)> ClosureFunc;
typedef base::Callback<void(const PipelineStatusCB&)> PipelineStatusCBFunc;
// Executes the closures in FIFO order, executing |done_cb| when the last
// closure has completed running.
//
// All closures (including |done_cb|) are executed on same thread as the
// calling thread.
void RunInSeries(scoped_ptr<std::queue<ClosureFunc> > closures,
const base::Closure& done_cb);
// Executes the closures in FIFO order, executing |done_cb| when the last
// closure has completed running, reporting the final status code.
//
// Closures will stop being executed if a previous closure in the series
// returned an error status and |done_cb| will be executed prematurely.
//
// All closures (including |done_cb|) are executed on same thread as the
// calling thread.
void RunInSeriesWithStatus(
scoped_ptr<std::queue<PipelineStatusCBFunc> > status_cbs,
const PipelineStatusCB& done_cb);
// Executes the closures in parallel, executing |done_cb| when all closures have
// completed running.
//
// No attempt is made to parallelize execution of the closures. In other words,
// this method will run all closures in FIFO order if said closures execute
// synchronously on the same call stack.
//
// All closures (including |done_cb|) are executed on same thread as the
// calling thread.
void RunInParallel(scoped_ptr<std::queue<ClosureFunc> > closures,
const base::Closure& done_cb);
} // namespace media
#endif // MEDIA_BASE_CALLBACK_UTIL_H_
...@@ -17,7 +17,6 @@ ...@@ -17,7 +17,6 @@
#include "base/synchronization/condition_variable.h" #include "base/synchronization/condition_variable.h"
#include "media/base/audio_decoder.h" #include "media/base/audio_decoder.h"
#include "media/base/audio_renderer.h" #include "media/base/audio_renderer.h"
#include "media/base/callback_util.h"
#include "media/base/clock.h" #include "media/base/clock.h"
#include "media/base/filter_collection.h" #include "media/base/filter_collection.h"
#include "media/base/media_log.h" #include "media/base/media_log.h"
...@@ -451,59 +450,63 @@ TimeDelta Pipeline::TimeForByteOffset_Locked(int64 byte_offset) const { ...@@ -451,59 +450,63 @@ TimeDelta Pipeline::TimeForByteOffset_Locked(int64 byte_offset) const {
return time_offset; return time_offset;
} }
void Pipeline::DoPause(const base::Closure& done_cb) { void Pipeline::DoPause(const PipelineStatusCB& done_cb) {
DCHECK(message_loop_->BelongsToCurrentThread()); DCHECK(message_loop_->BelongsToCurrentThread());
scoped_ptr<std::queue<ClosureFunc> > closures(new std::queue<ClosureFunc>); DCHECK(!pending_callbacks_.get());
SerialRunner::Queue bound_fns;
if (audio_renderer_) if (audio_renderer_)
closures->push(base::Bind(&AudioRenderer::Pause, audio_renderer_)); bound_fns.Push(base::Bind(&AudioRenderer::Pause, audio_renderer_));
if (video_renderer_) if (video_renderer_)
closures->push(base::Bind(&VideoRenderer::Pause, video_renderer_)); bound_fns.Push(base::Bind(&VideoRenderer::Pause, video_renderer_));
RunInSeries(closures.Pass(), done_cb); pending_callbacks_ = SerialRunner::Run(bound_fns, done_cb);
} }
void Pipeline::DoFlush(const base::Closure& done_cb) { void Pipeline::DoFlush(const PipelineStatusCB& done_cb) {
DCHECK(message_loop_->BelongsToCurrentThread()); DCHECK(message_loop_->BelongsToCurrentThread());
scoped_ptr<std::queue<ClosureFunc> > closures(new std::queue<ClosureFunc>); DCHECK(!pending_callbacks_.get());
SerialRunner::Queue bound_fns;
if (audio_renderer_) if (audio_renderer_)
closures->push(base::Bind(&AudioRenderer::Flush, audio_renderer_)); bound_fns.Push(base::Bind(&AudioRenderer::Flush, audio_renderer_));
if (video_renderer_) if (video_renderer_)
closures->push(base::Bind(&VideoRenderer::Flush, video_renderer_)); bound_fns.Push(base::Bind(&VideoRenderer::Flush, video_renderer_));
RunInParallel(closures.Pass(), done_cb); pending_callbacks_ = SerialRunner::Run(bound_fns, done_cb);
} }
void Pipeline::DoPlay(const base::Closure& done_cb) { void Pipeline::DoPlay(const PipelineStatusCB& done_cb) {
DCHECK(message_loop_->BelongsToCurrentThread()); DCHECK(message_loop_->BelongsToCurrentThread());
scoped_ptr<std::queue<ClosureFunc> > closures(new std::queue<ClosureFunc>); DCHECK(!pending_callbacks_.get());
SerialRunner::Queue bound_fns;
if (audio_renderer_) if (audio_renderer_)
closures->push(base::Bind(&AudioRenderer::Play, audio_renderer_)); bound_fns.Push(base::Bind(&AudioRenderer::Play, audio_renderer_));
if (video_renderer_) if (video_renderer_)
closures->push(base::Bind(&VideoRenderer::Play, video_renderer_)); bound_fns.Push(base::Bind(&VideoRenderer::Play, video_renderer_));
RunInSeries(closures.Pass(), done_cb); pending_callbacks_ = SerialRunner::Run(bound_fns, done_cb);
} }
void Pipeline::DoStop(const base::Closure& done_cb) { void Pipeline::DoStop(const PipelineStatusCB& done_cb) {
DCHECK(message_loop_->BelongsToCurrentThread()); DCHECK(message_loop_->BelongsToCurrentThread());
scoped_ptr<std::queue<ClosureFunc> > closures(new std::queue<ClosureFunc>); DCHECK(!pending_callbacks_.get());
SerialRunner::Queue bound_fns;
if (demuxer_) if (demuxer_)
closures->push(base::Bind(&Demuxer::Stop, demuxer_)); bound_fns.Push(base::Bind(&Demuxer::Stop, demuxer_));
if (audio_renderer_) if (audio_renderer_)
closures->push(base::Bind(&AudioRenderer::Stop, audio_renderer_)); bound_fns.Push(base::Bind(&AudioRenderer::Stop, audio_renderer_));
if (video_renderer_) if (video_renderer_)
closures->push(base::Bind(&VideoRenderer::Stop, video_renderer_)); bound_fns.Push(base::Bind(&VideoRenderer::Stop, video_renderer_));
RunInSeries(closures.Pass(), done_cb); pending_callbacks_ = SerialRunner::Run(bound_fns, done_cb);
} }
void Pipeline::AddBufferedByteRange(int64 start, int64 end) { void Pipeline::AddBufferedByteRange(int64 start, int64 end) {
...@@ -544,12 +547,6 @@ void Pipeline::OnFilterInitialize(PipelineStatus status) { ...@@ -544,12 +547,6 @@ void Pipeline::OnFilterInitialize(PipelineStatus status) {
&Pipeline::InitializeTask, this, status)); &Pipeline::InitializeTask, this, status));
} }
// Called from any thread.
void Pipeline::OnFilterStateTransition() {
message_loop_->PostTask(FROM_HERE, base::Bind(
&Pipeline::FilterStateTransitionTask, this));
}
// Called from any thread. // Called from any thread.
// This method makes the PipelineStatusCB behave like a Closure. It // This method makes the PipelineStatusCB behave like a Closure. It
// makes it look like a host()->SetError() call followed by a call to // makes it look like a host()->SetError() call followed by a call to
...@@ -557,13 +554,15 @@ void Pipeline::OnFilterStateTransition() { ...@@ -557,13 +554,15 @@ void Pipeline::OnFilterStateTransition() {
// //
// TODO: Revisit this code when SetError() is removed from FilterHost and // TODO: Revisit this code when SetError() is removed from FilterHost and
// all the Closures are converted to PipelineStatusCB. // all the Closures are converted to PipelineStatusCB.
void Pipeline::OnFilterStateTransitionWithStatus(PipelineStatus status) { void Pipeline::OnFilterStateTransition(PipelineStatus status) {
if (status != PIPELINE_OK) if (status != PIPELINE_OK)
SetError(status); SetError(status);
OnFilterStateTransition(); message_loop_->PostTask(FROM_HERE, base::Bind(
&Pipeline::FilterStateTransitionTask, this));
} }
void Pipeline::OnTeardownStateTransition() { void Pipeline::OnTeardownStateTransition(PipelineStatus status) {
// Ignore any errors during teardown.
message_loop_->PostTask(FROM_HERE, base::Bind( message_loop_->PostTask(FROM_HERE, base::Bind(
&Pipeline::TeardownStateTransitionTask, this)); &Pipeline::TeardownStateTransitionTask, this));
} }
...@@ -694,7 +693,7 @@ void Pipeline::InitializeTask(PipelineStatus last_stage_status) { ...@@ -694,7 +693,7 @@ void Pipeline::InitializeTask(PipelineStatus last_stage_status) {
SetState(kSeeking); SetState(kSeeking);
seek_timestamp_ = demuxer_->GetStartTime(); seek_timestamp_ = demuxer_->GetStartTime();
DoSeek(seek_timestamp_, true, DoSeek(seek_timestamp_, true,
base::Bind(&Pipeline::OnFilterStateTransitionWithStatus, this)); base::Bind(&Pipeline::OnFilterStateTransition, this));
} }
} }
...@@ -889,6 +888,9 @@ void Pipeline::AudioDisabledTask() { ...@@ -889,6 +888,9 @@ void Pipeline::AudioDisabledTask() {
void Pipeline::FilterStateTransitionTask() { void Pipeline::FilterStateTransitionTask() {
DCHECK(message_loop_->BelongsToCurrentThread()); DCHECK(message_loop_->BelongsToCurrentThread());
DCHECK(pending_callbacks_.get())
<< "Filter state transitions must be completed via pending_callbacks_";
pending_callbacks_.reset();
// No reason transitioning if we've errored or have stopped. // No reason transitioning if we've errored or have stopped.
if (IsPipelineStopped()) { if (IsPipelineStopped()) {
...@@ -923,7 +925,7 @@ void Pipeline::FilterStateTransitionTask() { ...@@ -923,7 +925,7 @@ void Pipeline::FilterStateTransitionTask() {
DoFlush(base::Bind(&Pipeline::OnFilterStateTransition, this)); DoFlush(base::Bind(&Pipeline::OnFilterStateTransition, this));
} else if (state_ == kSeeking) { } else if (state_ == kSeeking) {
DoSeek(seek_timestamp_, false, DoSeek(seek_timestamp_, false,
base::Bind(&Pipeline::OnFilterStateTransitionWithStatus, this)); base::Bind(&Pipeline::OnFilterStateTransition, this));
} else if (state_ == kStarting) { } else if (state_ == kStarting) {
DoPlay(base::Bind(&Pipeline::OnFilterStateTransition, this)); DoPlay(base::Bind(&Pipeline::OnFilterStateTransition, this));
} else if (state_ == kStopping) { } else if (state_ == kStopping) {
...@@ -964,6 +966,10 @@ void Pipeline::FilterStateTransitionTask() { ...@@ -964,6 +966,10 @@ void Pipeline::FilterStateTransitionTask() {
void Pipeline::TeardownStateTransitionTask() { void Pipeline::TeardownStateTransitionTask() {
DCHECK(IsPipelineTearingDown()); DCHECK(IsPipelineTearingDown());
DCHECK(pending_callbacks_.get())
<< "Teardown state transitions must be completed via pending_callbacks_";
pending_callbacks_.reset();
switch (state_) { switch (state_) {
case kStopping: case kStopping:
SetState(error_caused_teardown_ ? kError : kStopped); SetState(error_caused_teardown_ ? kError : kStopped);
...@@ -1171,6 +1177,9 @@ void Pipeline::TearDownPipeline() { ...@@ -1171,6 +1177,9 @@ void Pipeline::TearDownPipeline() {
// Mark that we already start tearing down operation. // Mark that we already start tearing down operation.
tearing_down_ = true; tearing_down_ = true;
// Cancel any pending operation so we can proceed with teardown.
pending_callbacks_.reset();
switch (state_) { switch (state_) {
case kCreated: case kCreated:
case kError: case kError:
...@@ -1229,22 +1238,25 @@ void Pipeline::DoSeek(base::TimeDelta seek_timestamp, ...@@ -1229,22 +1238,25 @@ void Pipeline::DoSeek(base::TimeDelta seek_timestamp,
bool skip_demuxer_seek, bool skip_demuxer_seek,
const PipelineStatusCB& done_cb) { const PipelineStatusCB& done_cb) {
DCHECK(message_loop_->BelongsToCurrentThread()); DCHECK(message_loop_->BelongsToCurrentThread());
scoped_ptr<std::queue<PipelineStatusCBFunc> > status_cbs( DCHECK(!pending_callbacks_.get());
new std::queue<PipelineStatusCBFunc>()); SerialRunner::Queue bound_fns;
if (!skip_demuxer_seek) if (!skip_demuxer_seek) {
status_cbs->push(base::Bind(&Demuxer::Seek, demuxer_, seek_timestamp)); bound_fns.Push(base::Bind(
&Demuxer::Seek, demuxer_, seek_timestamp));
}
if (audio_renderer_) if (audio_renderer_) {
status_cbs->push(base::Bind( bound_fns.Push(base::Bind(
&AudioRenderer::Preroll, audio_renderer_, seek_timestamp)); &AudioRenderer::Preroll, audio_renderer_, seek_timestamp));
}
if (video_renderer_) if (video_renderer_) {
status_cbs->push(base::Bind( bound_fns.Push(base::Bind(
&VideoRenderer::Preroll, video_renderer_, seek_timestamp)); &VideoRenderer::Preroll, video_renderer_, seek_timestamp));
}
RunInSeriesWithStatus(status_cbs.Pass(), base::Bind( pending_callbacks_ = SerialRunner::Run(bound_fns, done_cb);
&Pipeline::ReportStatus, this, done_cb));
} }
void Pipeline::OnAudioUnderflow() { void Pipeline::OnAudioUnderflow() {
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
#include "media/base/media_export.h" #include "media/base/media_export.h"
#include "media/base/pipeline_status.h" #include "media/base/pipeline_status.h"
#include "media/base/ranges.h" #include "media/base/ranges.h"
#include "media/base/serial_runner.h"
#include "ui/gfx/size.h" #include "ui/gfx/size.h"
class MessageLoop; class MessageLoop;
...@@ -310,14 +311,12 @@ class MEDIA_EXPORT Pipeline ...@@ -310,14 +311,12 @@ class MEDIA_EXPORT Pipeline
// Callbacks executed by filters upon completing initialization. // Callbacks executed by filters upon completing initialization.
void OnFilterInitialize(PipelineStatus status); void OnFilterInitialize(PipelineStatus status);
// Callback executed by filters upon completing Play(), Pause(), or Stop(). // Callback executed by filters upon completing Play(), Pause(), Flush(),
void OnFilterStateTransition(); // Seek() or Stop().
void OnFilterStateTransition(PipelineStatus status);
// Callback executed by filters upon completing Seek().
void OnFilterStateTransitionWithStatus(PipelineStatus status);
// Callback executed by filters when completing teardown operations. // Callback executed by filters when completing teardown operations.
void OnTeardownStateTransition(); void OnTeardownStateTransition(PipelineStatus status);
// Callback executed by filters to update statistics. // Callback executed by filters to update statistics.
void OnUpdateStatistics(const PipelineStatistics& stats); void OnUpdateStatistics(const PipelineStatistics& stats);
...@@ -415,10 +414,10 @@ class MEDIA_EXPORT Pipeline ...@@ -415,10 +414,10 @@ class MEDIA_EXPORT Pipeline
// Initiates an asynchronous Pause/Seek/Play/Stop() call sequence executing // Initiates an asynchronous Pause/Seek/Play/Stop() call sequence executing
// |done_cb| when completed. // |done_cb| when completed.
void DoPause(const base::Closure& done_cb); void DoPause(const PipelineStatusCB& done_cb);
void DoFlush(const base::Closure& done_cb); void DoFlush(const PipelineStatusCB& done_cb);
void DoPlay(const base::Closure& done_cb); void DoPlay(const PipelineStatusCB& done_cb);
void DoStop(const base::Closure& done_cb); void DoStop(const PipelineStatusCB& done_cb);
// Initiates an asynchronous Seek() and preroll call sequence executing // Initiates an asynchronous Seek() and preroll call sequence executing
// |done_cb| with the final status when completed. If |skip_demuxer_seek| is // |done_cb| with the final status when completed. If |skip_demuxer_seek| is
...@@ -563,6 +562,8 @@ class MEDIA_EXPORT Pipeline ...@@ -563,6 +562,8 @@ class MEDIA_EXPORT Pipeline
// reaches "kStarted", at which point it is used & zeroed out. // reaches "kStarted", at which point it is used & zeroed out.
base::Time creation_time_; base::Time creation_time_;
scoped_ptr<SerialRunner> pending_callbacks_;
DISALLOW_COPY_AND_ASSIGN(Pipeline); DISALLOW_COPY_AND_ASSIGN(Pipeline);
}; };
......
// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "media/base/serial_runner.h"
#include "base/bind.h"
#include "base/callback_helpers.h"
#include "base/message_loop.h"
#include "base/message_loop_proxy.h"
namespace media {
// Converts a bound function accepting a Closure into a bound function
// accepting a PipelineStatusCB. Since closures have no way of reporting a
// status |status_cb| is executed with PIPELINE_OK.
static void RunBoundClosure(
const SerialRunner::BoundClosure& bound_closure,
const PipelineStatusCB& status_cb) {
bound_closure.Run(base::Bind(status_cb, PIPELINE_OK));
}
// Runs |status_cb| with |last_status| on |message_loop|, trampolining if
// necessary.
static void RunOnMessageLoop(
const scoped_refptr<base::MessageLoopProxy>& message_loop,
const PipelineStatusCB& status_cb,
PipelineStatus last_status) {
if (!message_loop->BelongsToCurrentThread()) {
message_loop->PostTask(FROM_HERE, base::Bind(
&RunOnMessageLoop, message_loop, status_cb, last_status));
return;
}
status_cb.Run(last_status);
}
SerialRunner::Queue::Queue() {}
SerialRunner::Queue::~Queue() {}
void SerialRunner::Queue::Push(
const BoundClosure& bound_closure) {
bound_fns_.push(base::Bind(&RunBoundClosure, bound_closure));
}
void SerialRunner::Queue::Push(
const BoundPipelineStatusCB& bound_status_cb) {
bound_fns_.push(bound_status_cb);
}
SerialRunner::BoundPipelineStatusCB SerialRunner::Queue::Pop() {
BoundPipelineStatusCB bound_fn = bound_fns_.front();
bound_fns_.pop();
return bound_fn;
}
bool SerialRunner::Queue::empty() {
return bound_fns_.empty();
}
SerialRunner::SerialRunner(
const Queue& bound_fns, const PipelineStatusCB& done_cb)
: weak_this_(this),
message_loop_(base::MessageLoopProxy::current()),
bound_fns_(bound_fns),
done_cb_(done_cb) {
message_loop_->PostTask(FROM_HERE, base::Bind(
&SerialRunner::RunNextInSeries, weak_this_.GetWeakPtr(),
PIPELINE_OK));
}
SerialRunner::~SerialRunner() {}
scoped_ptr<SerialRunner> SerialRunner::Run(
const Queue& bound_fns, const PipelineStatusCB& done_cb) {
scoped_ptr<SerialRunner> callback_series(
new SerialRunner(bound_fns, done_cb));
return callback_series.Pass();
}
void SerialRunner::RunNextInSeries(PipelineStatus last_status) {
DCHECK(message_loop_->BelongsToCurrentThread());
DCHECK(!done_cb_.is_null());
if (bound_fns_.empty() || last_status != PIPELINE_OK) {
base::ResetAndReturn(&done_cb_).Run(last_status);
return;
}
BoundPipelineStatusCB bound_fn = bound_fns_.Pop();
bound_fn.Run(base::Bind(&RunOnMessageLoop, message_loop_, base::Bind(
&SerialRunner::RunNextInSeries, weak_this_.GetWeakPtr())));
}
} // namespace media
// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef MEDIA_BASE_SERIAL_RUNNER_H_
#define MEDIA_BASE_SERIAL_RUNNER_H_
#include <queue>
#include "base/callback.h"
#include "base/memory/ref_counted.h"
#include "base/memory/scoped_ptr.h"
#include "base/memory/weak_ptr.h"
#include "media/base/pipeline_status.h"
namespace base {
class MessageLoopProxy;
}
namespace media {
// Runs a series of bound functions accepting Closures or PipelineStatusCB.
// SerialRunner doesn't use regular Closure/PipelineStatusCBs as it late binds
// the completion callback as the series progresses.
class SerialRunner {
public:
typedef base::Callback<void(const base::Closure&)> BoundClosure;
typedef base::Callback<void(const PipelineStatusCB&)> BoundPipelineStatusCB;
// Serial queue of bound functions to run.
class Queue {
public:
Queue();
~Queue();
void Push(const BoundClosure& bound_fn);
void Push(const BoundPipelineStatusCB& bound_fn);
private:
friend class SerialRunner;
BoundPipelineStatusCB Pop();
bool empty();
std::queue<BoundPipelineStatusCB> bound_fns_;
};
// Executes the bound functions in series, executing |done_cb| when finished.
//
// All bound functions are executed on the thread that Run() is called on,
// including |done_cb|.
//
// Deleting the object will prevent execution of any unstarted bound
// functions, including |done_cb|.
static scoped_ptr<SerialRunner> Run(
const Queue& bound_fns, const PipelineStatusCB& done_cb);
private:
friend class scoped_ptr<SerialRunner>;
SerialRunner(const Queue& bound_fns, const PipelineStatusCB& done_cb);
~SerialRunner();
void RunNextInSeries(PipelineStatus last_status);
base::WeakPtrFactory<SerialRunner> weak_this_;
scoped_refptr<base::MessageLoopProxy> message_loop_;
Queue bound_fns_;
PipelineStatusCB done_cb_;
DISALLOW_COPY_AND_ASSIGN(SerialRunner);
};
} // namespace media
#endif // MEDIA_BASE_SERIAL_RUNNER_H_
...@@ -155,8 +155,6 @@ ...@@ -155,8 +155,6 @@
'base/buffers.h', 'base/buffers.h',
'base/byte_queue.cc', 'base/byte_queue.cc',
'base/byte_queue.h', 'base/byte_queue.h',
'base/callback_util.cc',
'base/callback_util.h',
'base/channel_layout.cc', 'base/channel_layout.cc',
'base/channel_layout.h', 'base/channel_layout.h',
'base/clock.cc', 'base/clock.cc',
...@@ -201,6 +199,8 @@ ...@@ -201,6 +199,8 @@
'base/ranges.h', 'base/ranges.h',
'base/seekable_buffer.cc', 'base/seekable_buffer.cc',
'base/seekable_buffer.h', 'base/seekable_buffer.h',
'base/serial_runner.cc',
'base/serial_runner.h',
'base/sinc_resampler.cc', 'base/sinc_resampler.cc',
'base/sinc_resampler.h', 'base/sinc_resampler.h',
'base/stream_parser.cc', 'base/stream_parser.cc',
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment