Commit a46cc751 authored by damienv's avatar damienv Committed by Commit bot

Introduce the concept of media task runner.

A media task runner can receive tasks whose scheduling
depends on a media timestamp.
The balanced media task runner factory generates some media task
runners whose task schedulings are loosely synchronized between
each other. A direct application is to query audio and video frames
in a balanced way, which minimizes the memory used for muxed streams.

BUG=408189

Review URL: https://codereview.chromium.org/534893002

Cr-Commit-Position: refs/heads/master@{#293368}
parent 2a5280a3
// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "chromecast/media/cma/base/balanced_media_task_runner_factory.h"
#include <map>
#include "base/bind.h"
#include "base/callback_helpers.h"
#include "base/logging.h"
#include "base/single_thread_task_runner.h"
#include "chromecast/media/cma/base/media_task_runner.h"
#include "media/base/buffers.h"
namespace chromecast {
namespace media {
// MediaTaskRunnerWithNotification -
// Media task runner which also behaves as a media task runner observer.
class MediaTaskRunnerWithNotification : public MediaTaskRunner {
public:
// Wraps a MediaTaskRunner so that a third party can:
// - be notified when a PostMediaTask is performed on this media task runner.
// |new_task_cb| is invoked in that case.
// - monitor the lifetime of the media task runner, i.e. check when the media
// task runner is not needed anymore.
// |shutdown_cb| is invoked in that case.
MediaTaskRunnerWithNotification(
const scoped_refptr<MediaTaskRunner>& media_task_runner,
const base::Closure& new_task_cb,
const base::Closure& shutdown_cb);
// MediaTaskRunner implementation.
virtual bool PostMediaTask(
const tracked_objects::Location& from_here,
const base::Closure& task,
base::TimeDelta timestamp) OVERRIDE;
private:
virtual ~MediaTaskRunnerWithNotification();
scoped_refptr<MediaTaskRunner> const media_task_runner_;
const base::Closure new_task_cb_;
const base::Closure shutdown_cb_;
DISALLOW_COPY_AND_ASSIGN(MediaTaskRunnerWithNotification);
};
MediaTaskRunnerWithNotification::MediaTaskRunnerWithNotification(
const scoped_refptr<MediaTaskRunner>& media_task_runner,
const base::Closure& new_task_cb,
const base::Closure& shutdown_cb)
: media_task_runner_(media_task_runner),
new_task_cb_(new_task_cb),
shutdown_cb_(shutdown_cb) {
}
MediaTaskRunnerWithNotification::~MediaTaskRunnerWithNotification() {
shutdown_cb_.Run();
}
bool MediaTaskRunnerWithNotification::PostMediaTask(
const tracked_objects::Location& from_here,
const base::Closure& task,
base::TimeDelta timestamp) {
bool may_run_in_future =
media_task_runner_->PostMediaTask(from_here, task, timestamp);
if (may_run_in_future)
new_task_cb_.Run();
return may_run_in_future;
}
// BalancedMediaTaskRunner -
// Run media tasks whose timestamp is less or equal to a max timestamp.
//
// Restrictions of BalancedMediaTaskRunner:
// - Can have at most one task in the queue.
// - Tasks should be given by increasing timestamps.
class BalancedMediaTaskRunner
: public MediaTaskRunner {
public:
explicit BalancedMediaTaskRunner(
const scoped_refptr<base::SingleThreadTaskRunner>& task_runner);
// Schedule tasks whose timestamp is less than or equal to |max_timestamp|.
void ScheduleWork(base::TimeDelta max_timestamp);
// Return the timestamp of the last media task.
// Return ::media::kNoTimestamp() if no media task has been posted.
base::TimeDelta GetMediaTimestamp() const;
// MediaTaskRunner implementation.
virtual bool PostMediaTask(
const tracked_objects::Location& from_here,
const base::Closure& task,
base::TimeDelta timestamp) OVERRIDE;
private:
virtual ~BalancedMediaTaskRunner();
scoped_refptr<base::SingleThreadTaskRunner> const task_runner_;
// Protects the following variables.
mutable base::Lock lock_;
// Possible pending media task.
tracked_objects::Location from_here_;
base::Closure pending_task_;
// Timestamp of the last posted task.
// Is initialized to ::media::kNoTimestamp().
base::TimeDelta last_timestamp_;
DISALLOW_COPY_AND_ASSIGN(BalancedMediaTaskRunner);
};
BalancedMediaTaskRunner::BalancedMediaTaskRunner(
const scoped_refptr<base::SingleThreadTaskRunner>& task_runner)
: task_runner_(task_runner),
last_timestamp_(::media::kNoTimestamp()) {
}
BalancedMediaTaskRunner::~BalancedMediaTaskRunner() {
}
void BalancedMediaTaskRunner::ScheduleWork(base::TimeDelta max_media_time) {
base::Closure task;
{
base::AutoLock auto_lock(lock_);
if (pending_task_.is_null())
return;
if (last_timestamp_ != ::media::kNoTimestamp() &&
last_timestamp_ >= max_media_time) {
return;
}
task = base::ResetAndReturn(&pending_task_);
}
task_runner_->PostTask(from_here_, task);
}
base::TimeDelta BalancedMediaTaskRunner::GetMediaTimestamp() const {
base::AutoLock auto_lock(lock_);
return last_timestamp_;
}
bool BalancedMediaTaskRunner::PostMediaTask(
const tracked_objects::Location& from_here,
const base::Closure& task,
base::TimeDelta timestamp) {
DCHECK(!task.is_null());
// Pass through for a task with no timestamp.
if (timestamp == ::media::kNoTimestamp()) {
return task_runner_->PostTask(from_here, task);
}
base::AutoLock auto_lock(lock_);
// Timestamps must be in order.
// Any task that does not meet that condition is simply discarded.
if (last_timestamp_ != ::media::kNoTimestamp() &&
timestamp < last_timestamp_) {
return false;
}
// Only support one pending task at a time.
DCHECK(pending_task_.is_null());
from_here_ = from_here;
pending_task_ = task;
last_timestamp_ = timestamp;
return true;
}
BalancedMediaTaskRunnerFactory::BalancedMediaTaskRunnerFactory(
base::TimeDelta max_delta)
: max_delta_(max_delta) {
}
BalancedMediaTaskRunnerFactory::~BalancedMediaTaskRunnerFactory() {
}
scoped_refptr<MediaTaskRunner>
BalancedMediaTaskRunnerFactory::CreateMediaTaskRunner(
const scoped_refptr<base::SingleThreadTaskRunner>& task_runner) {
scoped_refptr<BalancedMediaTaskRunner> media_task_runner(
new BalancedMediaTaskRunner(task_runner));
scoped_refptr<MediaTaskRunnerWithNotification> media_task_runner_wrapper(
new MediaTaskRunnerWithNotification(
media_task_runner,
base::Bind(&BalancedMediaTaskRunnerFactory::OnNewTask, this),
base::Bind(
&BalancedMediaTaskRunnerFactory::UnregisterMediaTaskRunner,
this, media_task_runner)));
base::AutoLock auto_lock(lock_);
// Note that |media_task_runner| is inserted here and
// not |media_task_runner_wrapper|. Otherwise, we would always have one
// ref on |media_task_runner_wrapper| and would never get the release
// notification.
// When |media_task_runner_wrapper| is going away,
// BalancedMediaTaskRunnerFactory will receive a notification and will in
// turn remove |media_task_runner|.
task_runners_.insert(media_task_runner);
return media_task_runner_wrapper;
}
void BalancedMediaTaskRunnerFactory::OnNewTask() {
typedef
std::multimap<base::TimeDelta, scoped_refptr<BalancedMediaTaskRunner> >
TaskRunnerMap;
TaskRunnerMap runnable_task_runner;
base::AutoLock auto_lock(lock_);
// Get the minimum timestamp among all streams.
for (MediaTaskRunnerSet::const_iterator it = task_runners_.begin();
it != task_runners_.end(); ++it) {
base::TimeDelta timestamp((*it)->GetMediaTimestamp());
if (timestamp == ::media::kNoTimestamp())
continue;
runnable_task_runner.insert(
std::pair<base::TimeDelta, scoped_refptr<BalancedMediaTaskRunner> >(
timestamp, *it));
}
// If there is no media task, just returns.
if (runnable_task_runner.empty())
return;
// Run tasks which meet the balancing criteria.
base::TimeDelta min_timestamp(runnable_task_runner.begin()->first);
base::TimeDelta max_timestamp = min_timestamp + max_delta_;
for (TaskRunnerMap::iterator it = runnable_task_runner.begin();
it != runnable_task_runner.end(); ++it) {
(*it).second->ScheduleWork(max_timestamp);
}
}
void BalancedMediaTaskRunnerFactory::UnregisterMediaTaskRunner(
const scoped_refptr<BalancedMediaTaskRunner>& media_task_runner) {
base::AutoLock auto_lock(lock_);
task_runners_.erase(media_task_runner);
}
} // namespace media
} // namespace chromecast
// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef CHROMECAST_MEDIA_CMA_BASE_BALANCED_TASK_RUNNER_FACTORY_H_
#define CHROMECAST_MEDIA_CMA_BASE_BALANCED_TASK_RUNNER_FACTORY_H_
#include <set>
#include "base/macros.h"
#include "base/memory/ref_counted.h"
#include "base/synchronization/lock.h"
#include "base/time/time.h"
namespace base {
class SingleThreadTaskRunner;
}
namespace chromecast {
namespace media {
class BalancedMediaTaskRunner;
class MediaTaskRunner;
// BalancedMediaTaskRunnerFactory -
// Create media tasks runners that are loosely synchronized between each other.
// For two tasks T1 and T2 with timestamps ts1 and ts2, the scheduler ensures
// T2 is not scheduled before T1 if ts2 > ts1 + |max_delta|.
class BalancedMediaTaskRunnerFactory
: public base::RefCountedThreadSafe<BalancedMediaTaskRunnerFactory> {
public:
explicit BalancedMediaTaskRunnerFactory(base::TimeDelta max_delta);
// Creates a media task runner using |task_runner| as the underlying
// regular task runner.
// Restriction on the returned media task runner:
// - can only schedule only one media task at a time.
// - timestamps of tasks posted on that task runner must be increasing.
scoped_refptr<MediaTaskRunner> CreateMediaTaskRunner(
const scoped_refptr<base::SingleThreadTaskRunner>& task_runner);
private:
typedef std::set<scoped_refptr<BalancedMediaTaskRunner> > MediaTaskRunnerSet;
friend class base::RefCountedThreadSafe<BalancedMediaTaskRunnerFactory>;
virtual ~BalancedMediaTaskRunnerFactory();
// Invoked when one of the registered media task runners received a new media
// task.
void OnNewTask();
// Unregister a media task runner.
void UnregisterMediaTaskRunner(
const scoped_refptr<BalancedMediaTaskRunner>& media_task_runner);
// Maximum timestamp deviation between tasks from the registered task runners.
const base::TimeDelta max_delta_;
// Task runners created by the factory that have not been unregistered yet.
base::Lock lock_;
MediaTaskRunnerSet task_runners_;
DISALLOW_COPY_AND_ASSIGN(BalancedMediaTaskRunnerFactory);
};
} // namespace media
} // namespace chromecast
#endif // CHROMECAST_MEDIA_CMA_BASE_BALANCED_TASK_RUNNER_FACTORY_H_
// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include <list>
#include <vector>
#include "base/basictypes.h"
#include "base/bind.h"
#include "base/memory/ref_counted.h"
#include "base/memory/scoped_ptr.h"
#include "base/threading/thread.h"
#include "base/time/time.h"
#include "chromecast/media/cma/base/balanced_media_task_runner_factory.h"
#include "chromecast/media/cma/base/media_task_runner.h"
#include "testing/gtest/include/gtest/gtest.h"
namespace chromecast {
namespace media {
namespace {
struct MediaTaskRunnerTestContext {
MediaTaskRunnerTestContext();
~MediaTaskRunnerTestContext();
scoped_refptr<MediaTaskRunner> media_task_runner;
bool is_pending_task;
std::vector<base::TimeDelta> task_timestamp_list;
size_t task_index;
base::TimeDelta max_timestamp;
};
MediaTaskRunnerTestContext::MediaTaskRunnerTestContext() {
}
MediaTaskRunnerTestContext::~MediaTaskRunnerTestContext() {
}
} // namespace
class BalancedMediaTaskRunnerTest : public testing::Test {
public:
BalancedMediaTaskRunnerTest();
virtual ~BalancedMediaTaskRunnerTest();
void SetupTest(base::TimeDelta max_delta,
const std::vector<std::vector<int> >& timestamps_in_ms,
const std::vector<size_t>& pattern,
const std::vector<int>& expected_task_timestamps_ms);
void ProcessAllTasks();
protected:
// Expected task order based on their timestamps.
std::list<base::TimeDelta> expected_task_timestamps_;
private:
void ScheduleTask();
void Task(size_t task_runner_id, base::TimeDelta timestamp);
void OnTestTimeout();
scoped_refptr<BalancedMediaTaskRunnerFactory> media_task_runner_factory_;
// Schedule first a task on media task runner #scheduling_pattern[0]
// then a task on media task runner #scheduling_pattern[1] and so on.
// Wrap around when reaching the end of the pattern.
std::vector<size_t> scheduling_pattern_;
size_t pattern_index_;
// For each media task runner, keep a track of which task has already been
// scheduled.
std::vector<MediaTaskRunnerTestContext> contexts_;
DISALLOW_COPY_AND_ASSIGN(BalancedMediaTaskRunnerTest);
};
BalancedMediaTaskRunnerTest::BalancedMediaTaskRunnerTest() {
}
BalancedMediaTaskRunnerTest::~BalancedMediaTaskRunnerTest() {
}
void BalancedMediaTaskRunnerTest::SetupTest(
base::TimeDelta max_delta,
const std::vector<std::vector<int> >& timestamps_in_ms,
const std::vector<size_t>& pattern,
const std::vector<int>& expected_task_timestamps_ms) {
media_task_runner_factory_ = new BalancedMediaTaskRunnerFactory(max_delta);
scheduling_pattern_ = pattern;
pattern_index_ = 0;
// Setup each task runner.
size_t n = timestamps_in_ms.size();
contexts_.resize(n);
for (size_t k = 0; k < n; k++) {
contexts_[k].media_task_runner =
media_task_runner_factory_->CreateMediaTaskRunner(
base::MessageLoopProxy::current());
contexts_[k].is_pending_task = false;
contexts_[k].task_index = 0;
contexts_[k].task_timestamp_list.resize(
timestamps_in_ms[k].size());
for (size_t i = 0; i < timestamps_in_ms[k].size(); i++) {
contexts_[k].task_timestamp_list[i] =
base::TimeDelta::FromMilliseconds(timestamps_in_ms[k][i]);
}
}
// Expected task order (for tasks that are actually run).
for (size_t k = 0; k < expected_task_timestamps_ms.size(); k++) {
expected_task_timestamps_.push_back(
base::TimeDelta::FromMilliseconds(expected_task_timestamps_ms[k]));
}
}
void BalancedMediaTaskRunnerTest::ProcessAllTasks() {
base::MessageLoop::current()->PostDelayedTask(
FROM_HERE,
base::Bind(&BalancedMediaTaskRunnerTest::OnTestTimeout,
base::Unretained(this)),
base::TimeDelta::FromSeconds(5));
ScheduleTask();
}
void BalancedMediaTaskRunnerTest::ScheduleTask() {
bool has_task = false;
for (size_t k = 0; k < contexts_.size(); k++) {
if (contexts_[k].task_index < contexts_[k].task_timestamp_list.size())
has_task = true;
}
if (!has_task) {
base::MessageLoop::current()->QuitWhenIdle();
return;
}
size_t next_pattern_index =
(pattern_index_ + 1) % scheduling_pattern_.size();
size_t task_runner_id = scheduling_pattern_[pattern_index_];
MediaTaskRunnerTestContext& context = contexts_[task_runner_id];
// Check whether all tasks have been scheduled for that task runner
// or if there is already one pending task.
if (context.task_index >= context.task_timestamp_list.size() ||
context.is_pending_task) {
pattern_index_ = next_pattern_index;
base::MessageLoopProxy::current()->PostTask(
FROM_HERE,
base::Bind(&BalancedMediaTaskRunnerTest::ScheduleTask,
base::Unretained(this)));
return;
}
bool expected_may_run = false;
if (context.task_timestamp_list[context.task_index] >=
context.max_timestamp) {
expected_may_run = true;
context.max_timestamp = context.task_timestamp_list[context.task_index];
}
bool may_run = context.media_task_runner->PostMediaTask(
FROM_HERE,
base::Bind(&BalancedMediaTaskRunnerTest::Task,
base::Unretained(this),
task_runner_id,
context.task_timestamp_list[context.task_index]),
context.task_timestamp_list[context.task_index]);
EXPECT_EQ(may_run, expected_may_run);
if (may_run)
context.is_pending_task = true;
context.task_index++;
pattern_index_ = next_pattern_index;
base::MessageLoopProxy::current()->PostTask(
FROM_HERE,
base::Bind(&BalancedMediaTaskRunnerTest::ScheduleTask,
base::Unretained(this)));
}
void BalancedMediaTaskRunnerTest::Task(
size_t task_runner_id, base::TimeDelta timestamp) {
ASSERT_FALSE(expected_task_timestamps_.empty());
EXPECT_EQ(timestamp, expected_task_timestamps_.front());
expected_task_timestamps_.pop_front();
contexts_[task_runner_id].is_pending_task = false;
}
void BalancedMediaTaskRunnerTest::OnTestTimeout() {
ADD_FAILURE() << "Test timed out";
if (base::MessageLoop::current())
base::MessageLoop::current()->QuitWhenIdle();
}
TEST_F(BalancedMediaTaskRunnerTest, OneTaskRunner) {
scoped_ptr<base::MessageLoop> message_loop(new base::MessageLoop());
// Timestamps of tasks for the single task runner.
int timestamps0_ms[] = {0, 10, 20, 30, 40, 30, 50, 60, 20, 30, 70};
std::vector<std::vector<int> > timestamps_ms(1);
timestamps_ms[0] = std::vector<int>(
timestamps0_ms, timestamps0_ms + arraysize(timestamps0_ms));
// Scheduling pattern.
std::vector<size_t> scheduling_pattern(1);
scheduling_pattern[0] = 0;
// Expected results.
int expected_timestamps[] = {0, 10, 20, 30, 40, 50, 60, 70};
std::vector<int> expected_timestamps_ms(std::vector<int>(
expected_timestamps,
expected_timestamps + arraysize(expected_timestamps)));
SetupTest(base::TimeDelta::FromMilliseconds(30),
timestamps_ms,
scheduling_pattern,
expected_timestamps_ms);
ProcessAllTasks();
message_loop->Run();
EXPECT_TRUE(expected_task_timestamps_.empty());
}
TEST_F(BalancedMediaTaskRunnerTest, TwoTaskRunnerUnbalanced) {
scoped_ptr<base::MessageLoop> message_loop(new base::MessageLoop());
// Timestamps of tasks for the 2 task runners.
int timestamps0_ms[] = {0, 10, 20, 30, 40, 30, 50, 60, 20, 30, 70};
int timestamps1_ms[] = {5, 15, 25, 35, 45, 35, 55, 65, 25, 35, 75};
std::vector<std::vector<int> > timestamps_ms(2);
timestamps_ms[0] = std::vector<int>(
timestamps0_ms, timestamps0_ms + arraysize(timestamps0_ms));
timestamps_ms[1] = std::vector<int>(
timestamps1_ms, timestamps1_ms + arraysize(timestamps1_ms));
// Scheduling pattern.
size_t pattern[] = {1, 0, 0, 0, 0};
std::vector<size_t> scheduling_pattern = std::vector<size_t>(
pattern, pattern + arraysize(pattern));
// Expected results.
int expected_timestamps[] = {
5, 0, 10, 20, 30, 15, 40, 25, 50, 35, 60, 45, 70, 55, 65, 75 };
std::vector<int> expected_timestamps_ms(std::vector<int>(
expected_timestamps,
expected_timestamps + arraysize(expected_timestamps)));
SetupTest(base::TimeDelta::FromMilliseconds(30),
timestamps_ms,
scheduling_pattern,
expected_timestamps_ms);
ProcessAllTasks();
message_loop->Run();
EXPECT_TRUE(expected_task_timestamps_.empty());
}
} // namespace media
} // namespace chromecast
// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "chromecast/media/cma/base/media_task_runner.h"
namespace chromecast {
namespace media {
MediaTaskRunner::MediaTaskRunner() {
}
MediaTaskRunner::~MediaTaskRunner() {
}
} // namespace media
} // namespace chromecast
// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef CHROMECAST_MEDIA_CMA_BASE_MEDIA_TASK_RUNNER_H_
#define CHROMECAST_MEDIA_CMA_BASE_MEDIA_TASK_RUNNER_H_
#include "base/callback.h"
#include "base/location.h"
#include "base/macros.h"
#include "base/memory/ref_counted.h"
#include "base/time/time.h"
namespace chromecast {
namespace media {
class MediaTaskRunner
: public base::RefCountedThreadSafe<MediaTaskRunner> {
public:
MediaTaskRunner();
// Post a task with the given media |timestamp|. If |timestamp| is equal to
// |kNoTimestamp()|, the task is scheduled right away.
// How the media timestamp is used to schedule the task is an implementation
// detail of derived classes.
// Returns true if the task may be run at some point in the future, and false
// if the task definitely will not be run.
virtual bool PostMediaTask(
const tracked_objects::Location& from_here,
const base::Closure& task,
base::TimeDelta timestamp) = 0;
protected:
virtual ~MediaTaskRunner();
friend class base::RefCountedThreadSafe<MediaTaskRunner>;
private:
DISALLOW_COPY_AND_ASSIGN(MediaTaskRunner);
};
} // namespace media
} // namespace chromecast
#endif // CHROMECAST_MEDIA_CMA_BASE_MEDIA_TASK_RUNNER_H_
...@@ -15,6 +15,8 @@ ...@@ -15,6 +15,8 @@
'../..', '../..',
], ],
'sources': [ 'sources': [
'cma/base/balanced_media_task_runner_factory.cc',
'cma/base/balanced_media_task_runner_factory.h',
'cma/base/buffering_controller.cc', 'cma/base/buffering_controller.cc',
'cma/base/buffering_controller.h', 'cma/base/buffering_controller.h',
'cma/base/buffering_state.cc', 'cma/base/buffering_state.cc',
...@@ -24,6 +26,8 @@ ...@@ -24,6 +26,8 @@
'cma/base/decoder_buffer_adapter.h', 'cma/base/decoder_buffer_adapter.h',
'cma/base/decoder_buffer_base.cc', 'cma/base/decoder_buffer_base.cc',
'cma/base/decoder_buffer_base.h', 'cma/base/decoder_buffer_base.h',
'cma/base/media_task_runner.cc',
'cma/base/media_task_runner.h',
], ],
}, },
{ {
...@@ -46,6 +50,7 @@ ...@@ -46,6 +50,7 @@
'../../testing/gtest.gyp:gtest_main', '../../testing/gtest.gyp:gtest_main',
], ],
'sources': [ 'sources': [
'cma/base/balanced_media_task_runner_unittest.cc',
'cma/base/buffering_controller_unittest.cc', 'cma/base/buffering_controller_unittest.cc',
'cma/base/run_all_unittests.cc', 'cma/base/run_all_unittests.cc',
], ],
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment