Commit a11ff40f authored by skyostil's avatar skyostil Committed by Commit bot

Run task queue manager work in batches

This patch lets the task queue manager run more than one posted task
per invocation. This helps reduce the overhead of yielding to and from
the main message loop and can speed up cases where tasks are posted
very frequently. One example is indexeddb, where some operations such as
index building can result in 2500 posted tasks/s (Nexus 7).

This patch also adds accounting for the desired run time of the next
pending delayed task. This information is used to break out of a work
batch if a delayed task should be run instead. Doing this avoids adding
extra delay to delayed tasks.

A potential downside of this change is that it can penalize work that
runs on the message loop without going through the task queue manager.
Based on performance tests[1], almost all tasks on the renderer main
thread are already getting executed by the task queue manager, so I
believe this change shouldn't cause a regression.

Note that this version of the patch still uses a batch size of 1 while
we investigate some mac test failures triggered by larger batch sizes.

[1] https://docs.google.com/a/chromium.org/spreadsheets/d/1IJZpBabW1pr4fb2T8BlkleHcOvHYrjvmCx_dLesxfMA/edit#gid=1492760051

BUG=444764,451593,453898

Review URL: https://codereview.chromium.org/845543004

Cr-Commit-Position: refs/heads/master@{#314364}
parent ad46c41e
......@@ -45,6 +45,8 @@ RendererSchedulerImpl::RendererSchedulerImpl(
CONTROL_TASK_QUEUE, RendererTaskQueueSelector::CONTROL_PRIORITY);
renderer_task_queue_selector_->DisableQueue(IDLE_TASK_QUEUE);
task_queue_manager_->SetAutoPump(IDLE_TASK_QUEUE, false);
// TODO(skyostil): Increase this to 4 (crbug.com/444764).
task_queue_manager_->SetWorkBatchSize(1);
for (size_t i = 0; i < TASK_QUEUE_COUNT; i++) {
task_queue_manager_->SetQueueName(
......@@ -255,8 +257,15 @@ void RendererSchedulerImpl::EndIdlePeriod() {
renderer_task_queue_selector_->DisableQueue(IDLE_TASK_QUEUE);
}
void RendererSchedulerImpl::SetTimeSourceForTesting(
scoped_refptr<cc::TestNowSource> time_source) {
main_thread_checker_.CalledOnValidThread();
time_source_ = time_source;
task_queue_manager_->SetTimeSourceForTesting(time_source);
}
base::TimeTicks RendererSchedulerImpl::Now() const {
return gfx::FrameTime::Now();
return UNLIKELY(time_source_) ? time_source_->Now() : base::TimeTicks::Now();
}
RendererSchedulerImpl::PollableNeedsUpdateFlag::PollableNeedsUpdateFlag(
......
......@@ -8,6 +8,7 @@
#include "base/atomicops.h"
#include "base/synchronization/lock.h"
#include "base/threading/thread_checker.h"
#include "cc/test/test_now_source.h"
#include "content/renderer/scheduler/cancelable_closure_holder.h"
#include "content/renderer/scheduler/renderer_scheduler.h"
#include "content/renderer/scheduler/single_thread_idle_task_runner.h"
......@@ -42,9 +43,7 @@ class CONTENT_EXPORT RendererSchedulerImpl : public RendererScheduler {
bool ShouldYieldForHighPriorityWork() override;
void Shutdown() override;
protected:
// Virtual for testing.
virtual base::TimeTicks Now() const;
void SetTimeSourceForTesting(scoped_refptr<cc::TestNowSource> time_source);
private:
friend class RendererSchedulerImplTest;
......@@ -116,6 +115,8 @@ class CONTENT_EXPORT RendererSchedulerImpl : public RendererScheduler {
void StartIdlePeriod();
void EndIdlePeriod();
base::TimeTicks Now() const;
base::ThreadChecker main_thread_checker_;
scoped_ptr<RendererTaskQueueSelector> renderer_task_queue_selector_;
scoped_ptr<TaskQueueManager> task_queue_manager_;
......@@ -139,6 +140,8 @@ class CONTENT_EXPORT RendererSchedulerImpl : public RendererScheduler {
base::TimeTicks last_input_time_;
PollableNeedsUpdateFlag policy_may_need_update_;
scoped_refptr<cc::TestNowSource> time_source_;
base::WeakPtr<RendererSchedulerImpl> weak_renderer_scheduler_ptr_;
base::WeakPtrFactory<RendererSchedulerImpl> weak_factory_;
......
......@@ -12,31 +12,18 @@
namespace content {
class RendererSchedulerImplForTest : public RendererSchedulerImpl {
public:
RendererSchedulerImplForTest(
scoped_refptr<cc::OrderedSimpleTaskRunner> task_runner,
scoped_refptr<cc::TestNowSource> clock)
: RendererSchedulerImpl(task_runner), clock_(clock) {}
~RendererSchedulerImplForTest() override {}
protected:
base::TimeTicks Now() const override { return clock_->Now(); }
private:
scoped_refptr<cc::TestNowSource> clock_;
};
class RendererSchedulerImplTest : public testing::Test {
public:
RendererSchedulerImplTest()
: clock_(cc::TestNowSource::Create(5000)),
mock_task_runner_(new cc::OrderedSimpleTaskRunner(clock_, false)),
scheduler_(new RendererSchedulerImplForTest(mock_task_runner_, clock_)),
scheduler_(new RendererSchedulerImpl(mock_task_runner_)),
default_task_runner_(scheduler_->DefaultTaskRunner()),
compositor_task_runner_(scheduler_->CompositorTaskRunner()),
loading_task_runner_(scheduler_->LoadingTaskRunner()),
idle_task_runner_(scheduler_->IdleTaskRunner()) {}
idle_task_runner_(scheduler_->IdleTaskRunner()) {
scheduler_->SetTimeSourceForTesting(clock_);
}
~RendererSchedulerImplTest() override {}
void RunUntilIdle() { mock_task_runner_->RunUntilIdle(); }
......
......@@ -4,11 +4,18 @@
#include "content/renderer/scheduler/task_queue_manager.h"
#include <queue>
#include "base/bind.h"
#include "base/trace_event/trace_event.h"
#include "base/trace_event/trace_event_argument.h"
#include "cc/test/test_now_source.h"
#include "content/renderer/scheduler/task_queue_selector.h"
namespace {
const int64_t kMaxTimeTicks = std::numeric_limits<int64>::max();
}
namespace content {
namespace internal {
......@@ -40,7 +47,7 @@ class TaskQueue : public base::SingleThreadTaskRunner {
void SetAutoPump(bool auto_pump);
void PumpQueue();
bool UpdateWorkQueue();
bool UpdateWorkQueue(base::TimeTicks* next_pending_delayed_task);
base::PendingTask TakeTaskFromWorkQueue();
void WillDeleteTaskQueueManager();
......@@ -74,6 +81,9 @@ class TaskQueue : public base::SingleThreadTaskRunner {
base::TaskQueue incoming_queue_;
bool auto_pump_;
const char* name_;
std::priority_queue<base::TimeTicks,
std::vector<base::TimeTicks>,
std::greater<base::TimeTicks>> delayed_task_run_times_;
base::TaskQueue work_queue_;
......@@ -113,6 +123,8 @@ bool TaskQueue::PostDelayedTaskImpl(const tracked_objects::Location& from_here,
task_queue_manager_->DidQueueTask(&pending_task);
if (delay > base::TimeDelta()) {
pending_task.delayed_run_time = task_queue_manager_->Now() + delay;
delayed_task_run_times_.push(pending_task.delayed_run_time);
return task_queue_manager_->PostDelayedTask(
from_here, Bind(&TaskQueue::EnqueueTask, this, pending_task), delay);
}
......@@ -130,12 +142,16 @@ bool TaskQueue::IsQueueEmpty() const {
}
}
bool TaskQueue::UpdateWorkQueue() {
bool TaskQueue::UpdateWorkQueue(base::TimeTicks* next_pending_delayed_task) {
if (!work_queue_.empty())
return true;
{
base::AutoLock lock(lock_);
if (!delayed_task_run_times_.empty()) {
*next_pending_delayed_task =
std::min(*next_pending_delayed_task, delayed_task_run_times_.top());
}
if (!auto_pump_ || incoming_queue_.empty())
return false;
work_queue_.Swap(&incoming_queue_);
......@@ -170,6 +186,17 @@ void TaskQueue::EnqueueTaskLocked(const base::PendingTask& pending_task) {
if (auto_pump_ && incoming_queue_.empty())
task_queue_manager_->MaybePostDoWorkOnMainRunner();
incoming_queue_.push(pending_task);
if (!pending_task.delayed_run_time.is_null()) {
// Update the time of the next pending delayed task.
while (!delayed_task_run_times_.empty() &&
delayed_task_run_times_.top() <= pending_task.delayed_run_time) {
delayed_task_run_times_.pop();
}
// Clear the delayed run time because we've already applied the delay
// before getting here.
incoming_queue_.back().delayed_run_time = base::TimeTicks();
}
}
void TaskQueue::SetAutoPump(bool auto_pump) {
......@@ -245,6 +272,8 @@ TaskQueueManager::TaskQueueManager(
: main_task_runner_(main_task_runner),
selector_(selector),
pending_dowork_count_(0),
work_batch_size_(1),
time_source_(nullptr),
weak_factory_(this) {
DCHECK(main_task_runner->RunsTasksOnCurrentThread());
TRACE_EVENT_OBJECT_CREATED_WITH_ID(
......@@ -299,14 +328,21 @@ void TaskQueueManager::PumpQueue(size_t queue_index) {
queue->PumpQueue();
}
bool TaskQueueManager::UpdateWorkQueues() {
bool TaskQueueManager::UpdateWorkQueues(
base::TimeTicks* next_pending_delayed_task) {
// TODO(skyostil): This is not efficient when the number of queues grows very
// large due to the number of locks taken. Consider optimizing when we get
// there.
main_thread_checker_.CalledOnValidThread();
bool has_work = false;
for (auto& queue : queues_)
has_work |= queue->UpdateWorkQueue();
for (auto& queue : queues_) {
has_work |= queue->UpdateWorkQueue(next_pending_delayed_task);
if (!queue->work_queue().empty()) {
// Currently we should not be getting tasks with delayed run times in any
// of the work queues.
DCHECK(queue->work_queue().front().delayed_run_time.is_null());
}
}
return has_work;
}
......@@ -332,14 +368,26 @@ void TaskQueueManager::DoWork(bool posted_from_main_thread) {
DCHECK_GE(pending_dowork_count_, 0);
}
main_thread_checker_.CalledOnValidThread();
if (!UpdateWorkQueues())
return;
size_t queue_index;
if (!SelectWorkQueueToService(&queue_index))
return;
MaybePostDoWorkOnMainRunner();
ProcessTaskFromWorkQueue(queue_index);
base::TimeTicks next_pending_delayed_task(
base::TimeTicks::FromInternalValue(kMaxTimeTicks));
for (int i = 0; i < work_batch_size_; i++) {
if (!UpdateWorkQueues(&next_pending_delayed_task))
return;
// Interrupt the work batch if we should run the next delayed task.
if (i > 0 && next_pending_delayed_task.ToInternalValue() != kMaxTimeTicks &&
Now() >= next_pending_delayed_task)
return;
size_t queue_index;
if (!SelectWorkQueueToService(&queue_index))
return;
// Note that this function won't post another call to DoWork if one is
// already pending, so it is safe to call it in a loop.
MaybePostDoWorkOnMainRunner();
ProcessTaskFromWorkQueue(queue_index);
}
}
bool TaskQueueManager::SelectWorkQueueToService(size_t* out_queue_index) {
......@@ -388,6 +436,22 @@ void TaskQueueManager::SetQueueName(size_t queue_index, const char* name) {
queue->set_name(name);
}
void TaskQueueManager::SetWorkBatchSize(int work_batch_size) {
main_thread_checker_.CalledOnValidThread();
DCHECK_GE(work_batch_size, 1);
work_batch_size_ = work_batch_size;
}
void TaskQueueManager::SetTimeSourceForTesting(
scoped_refptr<cc::TestNowSource> time_source) {
main_thread_checker_.CalledOnValidThread();
time_source_ = time_source;
}
base::TimeTicks TaskQueueManager::Now() const {
return UNLIKELY(time_source_) ? time_source_->Now() : base::TimeTicks::Now();
}
scoped_refptr<base::debug::ConvertableToTraceFormat>
TaskQueueManager::AsValueWithSelectorResult(bool should_run,
size_t selected_queue) const {
......
......@@ -22,6 +22,10 @@ class TracedValue;
}
}
namespace cc {
class TestNowSource;
}
namespace content {
namespace internal {
class TaskQueue;
......@@ -80,6 +84,14 @@ class CONTENT_EXPORT TaskQueueManager {
// to a static string.
void SetQueueName(size_t queue_index, const char* name);
// Set the number of tasks executed in a single invocation of the task queue
// manager. Increasing the batch size can reduce the overhead of yielding
// back to the main message loop -- at the cost of potentially delaying other
// tasks posted to the main loop. The batch size is 1 by default.
void SetWorkBatchSize(int work_batch_size);
void SetTimeSourceForTesting(scoped_refptr<cc::TestNowSource> time_source);
private:
friend class internal::TaskQueue;
......@@ -97,7 +109,9 @@ class CONTENT_EXPORT TaskQueueManager {
// Reloads any empty work queues which have automatic pumping enabled.
// Returns true if any work queue has tasks after doing this.
bool UpdateWorkQueues();
// |next_pending_delayed_task| should be the time of the next known delayed
// task. It is updated if any task is found which should run earlier.
bool UpdateWorkQueues(base::TimeTicks* next_pending_delayed_task);
// Chooses the next work queue to service. Returns true if |out_queue_index|
// indicates the queue from which the next task should be run, false to
......@@ -118,6 +132,8 @@ class CONTENT_EXPORT TaskQueueManager {
base::TimeDelta delay);
internal::TaskQueue* Queue(size_t queue_index) const;
base::TimeTicks Now() const;
scoped_refptr<base::debug::ConvertableToTraceFormat>
AsValueWithSelectorResult(bool should_run, size_t selected_queue) const;
......@@ -135,6 +151,10 @@ class CONTENT_EXPORT TaskQueueManager {
// where re-entrant problems happen.
int pending_dowork_count_;
int work_batch_size_;
scoped_refptr<cc::TestNowSource> time_source_;
base::WeakPtrFactory<TaskQueueManager> weak_factory_;
DISALLOW_COPY_AND_ASSIGN(TaskQueueManager);
......
......@@ -6,6 +6,7 @@
#include "base/test/test_simple_task_runner.h"
#include "base/threading/thread.h"
#include "cc/test/test_now_source.h"
#include "content/renderer/scheduler/task_queue_selector.h"
#include "testing/gmock/include/gmock/gmock.h"
......@@ -256,6 +257,31 @@ TEST_F(TaskQueueManagerTest, DelayedTaskPosting) {
EXPECT_THAT(run_order, ElementsAre(1));
}
TEST_F(TaskQueueManagerTest, DelayedTaskDoesNotStayDelayed) {
Initialize(1u);
std::vector<int> run_order;
scoped_refptr<base::SingleThreadTaskRunner> runner =
manager_->TaskRunnerForQueue(0);
selector_->AppendQueueToService(0);
base::TimeDelta delay(base::TimeDelta::FromMilliseconds(10));
runner->PostDelayedTask(FROM_HERE, base::Bind(&TestTask, 1, &run_order),
delay);
test_task_runner_->RunPendingTasks();
// Reload the work queue so we see the next pending task. It should no longer
// be marked as delayed.
manager_->PumpQueue(0);
EXPECT_TRUE(selector_->work_queues()[0]->front().delayed_run_time.is_null());
// Let the task run normally.
selector_->AppendQueueToService(0);
test_task_runner_->RunUntilIdle();
EXPECT_THAT(run_order, ElementsAre(1));
}
TEST_F(TaskQueueManagerTest, ManualPumping) {
Initialize(1u);
manager_->SetAutoPump(0, false);
......@@ -480,5 +506,81 @@ TEST_F(TaskQueueManagerTest, PostFromNestedRunloop) {
EXPECT_THAT(run_order, ElementsAre(0, 2, 1));
}
TEST_F(TaskQueueManagerTest, WorkBatching) {
Initialize(1u);
manager_->SetWorkBatchSize(2);
std::vector<int> run_order;
scoped_refptr<base::SingleThreadTaskRunner> runner =
manager_->TaskRunnerForQueue(0);
selector_->AppendQueueToService(0);
selector_->AppendQueueToService(0);
selector_->AppendQueueToService(0);
selector_->AppendQueueToService(0);
runner->PostTask(FROM_HERE, base::Bind(&TestTask, 1, &run_order));
runner->PostTask(FROM_HERE, base::Bind(&TestTask, 2, &run_order));
runner->PostTask(FROM_HERE, base::Bind(&TestTask, 3, &run_order));
runner->PostTask(FROM_HERE, base::Bind(&TestTask, 4, &run_order));
// Running one task in the host message loop should cause two posted tasks to
// get executed.
EXPECT_EQ(test_task_runner_->GetPendingTasks().size(), 1u);
test_task_runner_->RunPendingTasks();
EXPECT_THAT(run_order, ElementsAre(1, 2));
// The second task runs the remaining two posted tasks.
EXPECT_EQ(test_task_runner_->GetPendingTasks().size(), 1u);
test_task_runner_->RunPendingTasks();
EXPECT_THAT(run_order, ElementsAre(1, 2, 3, 4));
}
void AdvanceNowTestTask(int value,
std::vector<int>* out_result,
scoped_refptr<cc::TestNowSource> time_source,
base::TimeDelta delta) {
TestTask(value, out_result);
time_source->AdvanceNow(delta);
}
TEST_F(TaskQueueManagerTest, InterruptWorkBatchForDelayedTask) {
scoped_refptr<cc::TestNowSource> clock(cc::TestNowSource::Create());
Initialize(1u);
manager_->SetWorkBatchSize(2);
manager_->SetTimeSourceForTesting(clock);
std::vector<int> run_order;
scoped_refptr<base::SingleThreadTaskRunner> runner =
manager_->TaskRunnerForQueue(0);
selector_->AppendQueueToService(0);
selector_->AppendQueueToService(0);
selector_->AppendQueueToService(0);
base::TimeDelta delta(base::TimeDelta::FromMilliseconds(10));
runner->PostTask(
FROM_HERE, base::Bind(&AdvanceNowTestTask, 2, &run_order, clock, delta));
runner->PostTask(
FROM_HERE, base::Bind(&AdvanceNowTestTask, 3, &run_order, clock, delta));
base::TimeDelta delay(base::TimeDelta::FromMilliseconds(5));
runner->PostDelayedTask(FROM_HERE, base::Bind(&TestTask, 1, &run_order),
delay);
// At this point we have two posted tasks: one for DoWork and one of the
// delayed task. Only the first non-delayed task should get executed because
// the work batch is interrupted by the pending delayed task.
EXPECT_EQ(test_task_runner_->GetPendingTasks().size(), 2u);
test_task_runner_->RunPendingTasks();
EXPECT_THAT(run_order, ElementsAre(2));
// Running all remaining tasks should execute both pending tasks.
test_task_runner_->RunUntilIdle();
EXPECT_THAT(run_order, ElementsAre(2, 3, 1));
}
} // namespace
} // namespace content
......@@ -89,6 +89,9 @@ src:*/third_party/pdfium/core/src/fpdfapi/fpdf_render/fpdf_render_text.cpp
# obj/third_party/libwebm/libwebm.a(obj/third_party/libwebm/source/libwebm.mkvmuxer.o)(.data.rel..L__unnamed_2+0x18): error: undefined reference to 'typeinfo for mkvparser::IMkvReader'
src:*/third_party/libwebm/source/mkvmuxer.cpp
# obj/content/libcontent_renderer.a(obj/content/renderer/scheduler/content_renderer.renderer_scheduler_impl.o)(.data.rel..L__unnamed_399+0x18): error: undefined reference to ' typeinfo for cc::TestNowSource'
type:*TestNowSource*
#############################################################################
# UBSan seems to be emit false positives when virtual base classes are
# involved, see e.g. chromium:448102
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment