Commit 5773863a authored by Tommi's avatar Tommi Committed by Commit Bot

Make WebRTC task queues run on chromium's SequencedTaskRunner.

This changes the previous 1:1 TQ:thread relationship and associates
WebRTC task queues with Chromium's thread pool.

The effect of this is that PeerConnection objects, will start
to share threads for "rtc_event_log", "EncoderQueue" and
"call_worker_queue".

Constructing an instance of PeerConnection will be much less
costly and likely not cost any extra thread creation at all,
which is why we currently have a limit on the number of
instances that we allow creating.

Change-Id: Ifd218c5e14e2ce634fd428606e9713a815bd8205
Bug: 808801
Reviewed-on: https://chromium-review.googlesource.com/890738
Commit-Queue: Tommi <tommi@chromium.org>
Reviewed-by: default avatarGuido Urdaneta <guidou@chromium.org>
Reviewed-by: default avatarHans Wennborg <hans@chromium.org>
Cr-Commit-Position: refs/heads/master@{#542246}
parent 1a49710c
include_rules = [ include_rules = [
'+base', '+base',
'+build',
'+net/base', '+net/base',
'+third_party/webrtc', '+third_party/webrtc',
'+third_party/webrtc_overrides', '+third_party/webrtc_overrides',
......
...@@ -13,104 +13,191 @@ ...@@ -13,104 +13,191 @@
#include "base/bind.h" #include "base/bind.h"
#include "base/lazy_instance.h" #include "base/lazy_instance.h"
#include "base/memory/ref_counted.h" #include "base/memory/ref_counted.h"
#include "base/threading/thread.h" #include "base/synchronization/waitable_event.h"
#include "base/task_scheduler/post_task.h"
#include "base/threading/thread_local.h" #include "base/threading/thread_local.h"
#include "build/build_config.h"
#include "third_party/webrtc/rtc_base/refcount.h" #include "third_party/webrtc/rtc_base/refcount.h"
#include "third_party/webrtc/rtc_base/refcountedobject.h" #include "third_party/webrtc/rtc_base/refcountedobject.h"
// Intentionally outside of the "namespace rtc { ... }" block, because using base::WaitableEvent;
// here, scoped_refptr should *not* be resolved as rtc::scoped_refptr.
namespace rtc {
namespace { namespace {
void RunTask(std::unique_ptr<rtc::QueuedTask> task) { // A lazily created thread local storage for quick access to a TaskQueue.
if (!task->Run()) base::LazyInstance<base::ThreadLocalPointer<TaskQueue>>::Leaky lazy_tls_ptr =
task.release(); LAZY_INSTANCE_INITIALIZER;
base::TaskTraits TaskQueuePriority2Traits(TaskQueue::Priority priority) {
// The content/renderer/media/webrtc/rtc_video_encoder.* code
// employs a PostTask/Wait pattern that uses TQ in a way that makes it
// blocking and synchronous, which is why we allow WithBaseSyncPrimitives()
// for OS_ANDROID.
switch (priority) {
case TaskQueue::Priority::HIGH:
#if defined(OS_ANDROID)
return {base::WithBaseSyncPrimitives(), base::TaskPriority::HIGHEST};
#else
return {base::TaskPriority::HIGHEST};
#endif
break;
case TaskQueue::Priority::LOW:
return {base::MayBlock(), base::TaskPriority::BACKGROUND};
case TaskQueue::Priority::NORMAL:
default:
#if defined(OS_ANDROID)
return {base::WithBaseSyncPrimitives()};
#else
return {};
#endif
}
}
} // namespace
bool TaskQueue::IsCurrent() const {
return Current() == this;
} }
class PostAndReplyTask : public rtc::QueuedTask { class TaskQueue::Impl : public RefCountInterface {
public: public:
PostAndReplyTask( Impl(const char* queue_name,
std::unique_ptr<rtc::QueuedTask> task, TaskQueue* queue,
std::unique_ptr<rtc::QueuedTask> reply, const base::TaskTraits& traits);
const scoped_refptr<base::SingleThreadTaskRunner>& reply_task_runner) ~Impl() override;
: task_(std::move(task)),
reply_(std::move(reply)),
reply_task_runner_(reply_task_runner) {}
~PostAndReplyTask() override {} // To maintain functional compatibility with WebRTC's TaskQueue, we flush
// and deactivate the task queue here, synchronously.
// This has some drawbacks and will likely change in the future, but for now
// is necessary.
void Stop();
private: void PostTask(std::unique_ptr<QueuedTask> task);
bool Run() override { void PostDelayedTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds);
if (!task_->Run()) void PostTaskAndReply(std::unique_ptr<QueuedTask> task,
task_.release(); std::unique_ptr<QueuedTask> reply,
TaskQueue* reply_queue);
void PostTaskAndReply(std::unique_ptr<QueuedTask> task,
std::unique_ptr<QueuedTask> reply);
reply_task_runner_->PostTask(FROM_HERE, private:
base::Bind(&RunTask, base::Passed(&reply_))); void RunTask(std::unique_ptr<QueuedTask> task);
return true; void Deactivate(WaitableEvent* event);
}
class PostAndReplyTask : public QueuedTask {
public:
PostAndReplyTask(std::unique_ptr<QueuedTask> task,
::scoped_refptr<TaskQueue::Impl> target_queue,
std::unique_ptr<QueuedTask> reply,
::scoped_refptr<TaskQueue::Impl> reply_queue)
: task_(std::move(task)),
target_queue_(std::move(target_queue)),
reply_(std::move(reply)),
reply_queue_(std::move(reply_queue)) {}
~PostAndReplyTask() override {}
private:
bool Run() override {
if (task_) {
target_queue_->RunTask(std::move(task_));
std::unique_ptr<QueuedTask> t = std::unique_ptr<QueuedTask>(this);
reply_queue_->PostTask(std::move(t));
return false; // Don't delete, ownership lies with reply_queue_.
}
reply_queue_->RunTask(std::move(reply_));
return true; // OK to delete.
}
std::unique_ptr<QueuedTask> task_;
::scoped_refptr<TaskQueue::Impl> target_queue_;
std::unique_ptr<QueuedTask> reply_;
::scoped_refptr<TaskQueue::Impl> reply_queue_;
};
std::unique_ptr<rtc::QueuedTask> task_; TaskQueue* const queue_;
std::unique_ptr<rtc::QueuedTask> reply_; const ::scoped_refptr<base::SequencedTaskRunner> task_runner_;
scoped_refptr<base::SingleThreadTaskRunner> reply_task_runner_; bool is_active_ = true; // Checked and set on |task_runner_|.
}; };
// A lazily created thread local storage for quick access to a TaskQueue. // TaskQueue::Impl.
base::LazyInstance<base::ThreadLocalPointer<rtc::TaskQueue>>::Leaky
lazy_tls_ptr = LAZY_INSTANCE_INITIALIZER;
} // namespace TaskQueue::Impl::Impl(const char* queue_name,
TaskQueue* queue,
const base::TaskTraits& traits)
: queue_(queue),
task_runner_(base::CreateSequencedTaskRunnerWithTraits(traits)) {
DCHECK(task_runner_);
}
namespace rtc { TaskQueue::Impl::~Impl() {}
bool TaskQueue::IsCurrent() const { void TaskQueue::Impl::Stop() {
return Current() == this; WaitableEvent event(WaitableEvent::ResetPolicy::MANUAL,
WaitableEvent::InitialState::NOT_SIGNALED);
task_runner_->PostTask(
FROM_HERE, base::BindOnce(&TaskQueue::Impl::Deactivate, this, &event));
event.Wait();
} }
class TaskQueue::Impl : public RefCountInterface, public base::Thread { void TaskQueue::Impl::PostTask(std::unique_ptr<QueuedTask> task) {
public: task_runner_->PostTask(FROM_HERE, base::BindOnce(&TaskQueue::Impl::RunTask,
Impl(const char* queue_name, TaskQueue* queue); this, base::Passed(&task)));
~Impl() override; }
private: void TaskQueue::Impl::PostDelayedTask(std::unique_ptr<QueuedTask> task,
virtual void Init() override; uint32_t milliseconds) {
task_runner_->PostDelayedTask(
FROM_HERE,
base::BindOnce(&TaskQueue::Impl::RunTask, this, base::Passed(&task)),
base::TimeDelta::FromMilliseconds(milliseconds));
}
TaskQueue* const queue_; void TaskQueue::Impl::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
}; std::unique_ptr<QueuedTask> reply,
TaskQueue* reply_queue) {
std::unique_ptr<QueuedTask> t =
std::unique_ptr<QueuedTask>(new PostAndReplyTask(
std::move(task), this, std::move(reply), reply_queue->impl_.get()));
PostTask(std::move(t));
}
TaskQueue::Impl::Impl(const char* queue_name, TaskQueue* queue) void TaskQueue::Impl::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
: base::Thread(queue_name), queue_(queue) {} std::unique_ptr<QueuedTask> reply) {
PostTaskAndReply(std::move(task), std::move(reply), queue_);
}
void TaskQueue::Impl::Init() { void TaskQueue::Impl::RunTask(std::unique_ptr<QueuedTask> task) {
if (!is_active_)
return;
auto* prev = lazy_tls_ptr.Pointer()->Get();
lazy_tls_ptr.Pointer()->Set(queue_); lazy_tls_ptr.Pointer()->Set(queue_);
if (!task->Run())
task.release();
lazy_tls_ptr.Pointer()->Set(prev);
} }
TaskQueue::Impl::~Impl() { void TaskQueue::Impl::Deactivate(WaitableEvent* event) {
DCHECK(!Thread::IsRunning()); is_active_ = false;
event->Signal();
} }
// TaskQueue.
TaskQueue::TaskQueue(const char* queue_name, TaskQueue::TaskQueue(const char* queue_name,
Priority priority /*= Priority::NORMAL*/) Priority priority /*= Priority::NORMAL*/)
: impl_(new RefCountedObject<Impl>(queue_name, this)) { : impl_(new RefCountedObject<Impl>(queue_name,
this,
TaskQueuePriority2Traits(priority))) {
DCHECK(queue_name); DCHECK(queue_name);
base::Thread::Options options;
switch (priority) {
case Priority::HIGH:
options.priority = base::ThreadPriority::REALTIME_AUDIO;
break;
case Priority::LOW:
options.priority = base::ThreadPriority::BACKGROUND;
break;
case Priority::NORMAL:
default:
options.priority = base::ThreadPriority::NORMAL;
break;
}
CHECK(impl_->StartWithOptions(options));
} }
TaskQueue::~TaskQueue() { TaskQueue::~TaskQueue() {
DCHECK(!IsCurrent()); DCHECK(!IsCurrent());
impl_->Stop();
} }
// static // static
...@@ -119,29 +206,23 @@ TaskQueue* TaskQueue::Current() { ...@@ -119,29 +206,23 @@ TaskQueue* TaskQueue::Current() {
} }
void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) { void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) {
impl_->task_runner()->PostTask(FROM_HERE, impl_->PostTask(std::move(task));
base::Bind(&RunTask, base::Passed(&task)));
} }
void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task, void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task,
uint32_t milliseconds) { uint32_t milliseconds) {
impl_->task_runner()->PostDelayedTask( impl_->PostDelayedTask(std::move(task), milliseconds);
FROM_HERE, base::Bind(&RunTask, base::Passed(&task)),
base::TimeDelta::FromMilliseconds(milliseconds));
} }
void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
std::unique_ptr<QueuedTask> reply, std::unique_ptr<QueuedTask> reply,
TaskQueue* reply_queue) { TaskQueue* reply_queue) {
PostTask(std::unique_ptr<QueuedTask>(new PostAndReplyTask( impl_->PostTaskAndReply(std::move(task), std::move(reply), reply_queue);
std::move(task), std::move(reply), reply_queue->impl_->task_runner())));
} }
void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
std::unique_ptr<QueuedTask> reply) { std::unique_ptr<QueuedTask> reply) {
impl_->task_runner()->PostTaskAndReply( impl_->PostTaskAndReply(std::move(task), std::move(reply));
FROM_HERE, base::Bind(&RunTask, base::Passed(&task)),
base::Bind(&RunTask, base::Passed(&reply)));
} }
} // namespace rtc } // namespace rtc
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment