Commit 17f533a1 authored by Alexander Timin's avatar Alexander Timin Committed by Commit Bot

Quick Cleanup for TaskQueueManager

BUG=783309

Change-Id: Ibb2fe849a2ef7d1bec48fbacea14d12cea783bd7
Reviewed-on: https://chromium-review.googlesource.com/763875
Commit-Queue: Alexander Timin <altimin@chromium.org>
Reviewed-by: default avatarAlexander Timin <altimin@chromium.org>
Cr-Commit-Position: refs/heads/master@{#525370}
parent baf650e8
......@@ -21,15 +21,26 @@
#include "platform/scheduler/base/work_queue.h"
#include "platform/scheduler/base/work_queue_sets.h"
static const double kLongTaskTraceEventThreshold = 0.05;
namespace blink {
namespace scheduler {
namespace {
const double kLongTaskTraceEventThreshold = 0.05;
double MonotonicTimeInSeconds(base::TimeTicks time_ticks) {
return (time_ticks - base::TimeTicks()).InSecondsF();
}
void SweepCanceledDelayedTasksInQueue(
internal::TaskQueueImpl* queue,
std::map<TimeDomain*, base::TimeTicks>* time_domain_now) {
TimeDomain* time_domain = queue->GetTimeDomain();
if (time_domain_now->find(time_domain) == time_domain_now->end())
time_domain_now->insert(std::make_pair(time_domain, time_domain->Now()));
queue->SweepCanceledDelayedTasks(time_domain_now->at(time_domain));
}
} // namespace
TaskQueueManager::TaskQueueManager(
......@@ -37,11 +48,6 @@ TaskQueueManager::TaskQueueManager(
: real_time_domain_(new RealTimeDomain()),
graceful_shutdown_helper_(new internal::GracefulQueueShutdownHelper()),
controller_(std::move(controller)),
task_was_run_on_quiescence_monitored_queue_(false),
work_batch_size_(1),
task_count_(0),
currently_executing_task_queue_(nullptr),
observer_(nullptr),
weak_factory_(this) {
// TODO(altimin): Create a sequence checker here.
DCHECK(controller_->RunsTasksInCurrentSequence());
......@@ -88,11 +94,6 @@ std::unique_ptr<TaskQueueManager> TaskQueueManager::TakeOverCurrentThread() {
base::DefaultTickClock::GetInstance())));
}
TaskQueueManager::AnyThread::AnyThread()
: do_work_running_count(0),
immediate_do_work_posted_count(0),
is_nested(false) {}
void TaskQueueManager::RegisterTimeDomain(TimeDomain* time_domain) {
time_domains_.insert(time_domain);
time_domain->OnRegisterWithTaskQueueManager(this);
......@@ -104,7 +105,7 @@ void TaskQueueManager::UnregisterTimeDomain(TimeDomain* time_domain) {
std::unique_ptr<internal::TaskQueueImpl> TaskQueueManager::CreateTaskQueueImpl(
const TaskQueue::Spec& spec) {
DCHECK(main_thread_checker_.CalledOnValidThread());
DCHECK_CALLED_ON_VALID_THREAD(main_thread_checker_);
TimeDomain* time_domain =
spec.time_domain ? spec.time_domain : real_time_domain_.get();
DCHECK(time_domains_.find(time_domain) != time_domains_.end());
......@@ -116,7 +117,7 @@ std::unique_ptr<internal::TaskQueueImpl> TaskQueueManager::CreateTaskQueueImpl(
}
void TaskQueueManager::SetObserver(Observer* observer) {
DCHECK(main_thread_checker_.CalledOnValidThread());
DCHECK_CALLED_ON_VALID_THREAD(main_thread_checker_);
observer_ = observer;
}
......@@ -124,7 +125,7 @@ void TaskQueueManager::UnregisterTaskQueueImpl(
std::unique_ptr<internal::TaskQueueImpl> task_queue) {
TRACE_EVENT1("renderer.scheduler", "TaskQueueManager::UnregisterTaskQueue",
"queue_name", task_queue->GetName());
DCHECK(main_thread_checker_.CalledOnValidThread());
DCHECK_CALLED_ON_VALID_THREAD(main_thread_checker_);
selector_.RemoveQueue(task_queue.get());
......@@ -222,7 +223,7 @@ void TaskQueueManager::MaybeScheduleDelayedWork(
TimeDomain* requesting_time_domain,
base::TimeTicks now,
base::TimeTicks run_time) {
DCHECK(main_thread_checker_.CalledOnValidThread());
DCHECK_CALLED_ON_VALID_THREAD(main_thread_checker_);
// Make sure we don't cancel another TimeDomain's wake-up.
DCHECK(!next_delayed_do_work_ ||
next_delayed_do_work_.time_domain() == requesting_time_domain);
......@@ -254,7 +255,7 @@ void TaskQueueManager::MaybeScheduleDelayedWork(
void TaskQueueManager::CancelDelayedWork(TimeDomain* requesting_time_domain,
base::TimeTicks run_time) {
DCHECK(main_thread_checker_.CalledOnValidThread());
DCHECK_CALLED_ON_VALID_THREAD(main_thread_checker_);
if (next_delayed_do_work_.run_time() != run_time)
return;
......@@ -264,7 +265,7 @@ void TaskQueueManager::CancelDelayedWork(TimeDomain* requesting_time_domain,
}
void TaskQueueManager::DoWork(WorkType work_type) {
DCHECK(main_thread_checker_.CalledOnValidThread());
DCHECK_CALLED_ON_VALID_THREAD(main_thread_checker_);
TRACE_EVENT1("renderer.scheduler", "TaskQueueManager::DoWork", "delayed",
work_type == WorkType::kDelayed);
......@@ -369,7 +370,7 @@ void TaskQueueManager::PostDoWorkContinuationLocked(
base::Optional<NextTaskDelay> next_delay,
LazyNow* lazy_now,
MoveableAutoLock lock) {
DCHECK(main_thread_checker_.CalledOnValidThread());
DCHECK_CALLED_ON_VALID_THREAD(main_thread_checker_);
{
MoveableAutoLock auto_lock(std::move(lock));
......@@ -388,7 +389,7 @@ void TaskQueueManager::PostDoWorkContinuationLocked(
if (any_thread().immediate_do_work_posted_count > 0)
return;
if (next_delay->Delay() <= base::TimeDelta()) {
if (next_delay->delay() <= base::TimeDelta()) {
// If a delayed DoWork is pending then we don't need to post a
// continuation because it should run immediately.
if (next_delayed_do_work_ &&
......@@ -401,23 +402,23 @@ void TaskQueueManager::PostDoWorkContinuationLocked(
}
// We avoid holding |any_thread_lock_| while posting the task.
if (next_delay->Delay() <= base::TimeDelta()) {
if (next_delay->delay() <= base::TimeDelta()) {
controller_->ScheduleWork();
} else {
base::TimeTicks run_time = lazy_now->Now() + next_delay->Delay();
base::TimeTicks run_time = lazy_now->Now() + next_delay->delay();
if (next_delayed_do_work_.run_time() == run_time)
return;
next_delayed_do_work_ =
NextDelayedDoWork(run_time, next_delay->time_domain());
controller_->ScheduleDelayedWork(next_delay->Delay());
controller_->ScheduleDelayedWork(next_delay->delay());
}
}
base::Optional<TaskQueueManager::NextTaskDelay>
TaskQueueManager::ComputeDelayTillNextTaskLocked(LazyNow* lazy_now) {
DCHECK(main_thread_checker_.CalledOnValidThread());
DCHECK_CALLED_ON_VALID_THREAD(main_thread_checker_);
// Unfortunately because |any_thread_lock_| is held it's not safe to call
// ReloadEmptyWorkQueues here (possible lock order inversion), however this
......@@ -471,7 +472,7 @@ TaskQueueManager::ProcessTaskResult TaskQueueManager::ProcessTaskFromWorkQueue(
bool is_nested,
LazyNow time_before_task,
base::TimeTicks* time_after_task) {
DCHECK(main_thread_checker_.CalledOnValidThread());
DCHECK_CALLED_ON_VALID_THREAD(main_thread_checker_);
base::WeakPtr<TaskQueueManager> protect = GetWeakPtr();
internal::TaskQueueImpl::Task pending_task =
work_queue->TakeTaskFromWorkQueue();
......@@ -625,32 +626,32 @@ void TaskQueueManager::NotifyDidProcessTaskObservers(
}
void TaskQueueManager::SetWorkBatchSize(int work_batch_size) {
DCHECK(main_thread_checker_.CalledOnValidThread());
DCHECK_CALLED_ON_VALID_THREAD(main_thread_checker_);
DCHECK_GE(work_batch_size, 1);
work_batch_size_ = work_batch_size;
}
void TaskQueueManager::AddTaskObserver(
base::MessageLoop::TaskObserver* task_observer) {
DCHECK(main_thread_checker_.CalledOnValidThread());
DCHECK_CALLED_ON_VALID_THREAD(main_thread_checker_);
task_observers_.AddObserver(task_observer);
}
void TaskQueueManager::RemoveTaskObserver(
base::MessageLoop::TaskObserver* task_observer) {
DCHECK(main_thread_checker_.CalledOnValidThread());
DCHECK_CALLED_ON_VALID_THREAD(main_thread_checker_);
task_observers_.RemoveObserver(task_observer);
}
void TaskQueueManager::AddTaskTimeObserver(
TaskTimeObserver* task_time_observer) {
DCHECK(main_thread_checker_.CalledOnValidThread());
DCHECK_CALLED_ON_VALID_THREAD(main_thread_checker_);
task_time_observers_.AddObserver(task_time_observer);
}
void TaskQueueManager::RemoveTaskTimeObserver(
TaskTimeObserver* task_time_observer) {
DCHECK(main_thread_checker_.CalledOnValidThread());
DCHECK_CALLED_ON_VALID_THREAD(main_thread_checker_);
task_time_observers_.RemoveObserver(task_time_observer);
}
......@@ -679,7 +680,7 @@ std::unique_ptr<base::trace_event::ConvertableToTraceFormat>
TaskQueueManager::AsValueWithSelectorResult(
bool should_run,
internal::WorkQueue* selected_work_queue) const {
DCHECK(main_thread_checker_.CalledOnValidThread());
DCHECK_CALLED_ON_VALID_THREAD(main_thread_checker_);
std::unique_ptr<base::trace_event::TracedValue> state(
new base::trace_event::TracedValue());
base::TimeTicks now = real_time_domain()->CreateLazyNow().Now();
......@@ -726,7 +727,7 @@ TaskQueueManager::AsValueWithSelectorResult(
}
void TaskQueueManager::OnTaskQueueEnabled(internal::TaskQueueImpl* queue) {
DCHECK(main_thread_checker_.CalledOnValidThread());
DCHECK_CALLED_ON_VALID_THREAD(main_thread_checker_);
DCHECK(queue->IsQueueEnabled());
// Only schedule DoWork if there's something to do.
if (queue->HasTaskToRunImmediately() && !queue->BlockedByFence())
......@@ -735,7 +736,7 @@ void TaskQueueManager::OnTaskQueueEnabled(internal::TaskQueueImpl* queue) {
void TaskQueueManager::OnTriedToSelectBlockedWorkQueue(
internal::WorkQueue* work_queue) {
DCHECK(main_thread_checker_.CalledOnValidThread());
DCHECK_CALLED_ON_VALID_THREAD(main_thread_checker_);
DCHECK(!work_queue->Empty());
if (observer_)
observer_->OnTriedToExecuteBlockedTask();
......@@ -745,19 +746,6 @@ bool TaskQueueManager::HasImmediateWorkForTesting() const {
return !selector_.EnabledWorkQueuesEmpty();
}
namespace {
void SweepCanceledDelayedTasksInQueue(
internal::TaskQueueImpl* queue,
std::map<TimeDomain*, base::TimeTicks>* time_domain_now) {
TimeDomain* time_domain = queue->GetTimeDomain();
if (time_domain_now->find(time_domain) == time_domain_now->end())
time_domain_now->insert(std::make_pair(time_domain, time_domain->Now()));
queue->SweepCanceledDelayedTasks(time_domain_now->at(time_domain));
}
} // namespace
void TaskQueueManager::SweepCanceledDelayedTasks() {
std::map<TimeDomain*, base::TimeTicks> time_domain_now;
for (const auto& queue : active_queues_)
......
......@@ -33,14 +33,16 @@ class ConvertableToTraceFormat;
namespace blink {
namespace scheduler {
namespace task_queue_manager_unittest {
class TaskQueueManagerTest;
}
namespace internal {
class TaskQueueImpl;
class ThreadController;
} // namespace internal
namespace task_queue_manager_unittest {
class TaskQueueManagerTest;
} // namespace task_queue_manager_unittest
class LazyNow;
class RealTimeDomain;
class TaskQueue;
......@@ -53,7 +55,7 @@ class TimeDomain;
//
// 1. Incoming task queue. Tasks that are posted get immediately appended here.
// When a task is appended into an empty incoming queue, the task manager
// work function (DoWork) is scheduled to run on the main task runner.
// work function (DoWork()) is scheduled to run on the main task runner.
//
// 2. Work queue. If a work queue is empty when DoWork() is entered, tasks from
// the incoming task queue (if any) are moved here. The work queues are
......@@ -66,6 +68,18 @@ class PLATFORM_EXPORT TaskQueueManager
public internal::TaskQueueSelector::Observer,
public base::RunLoop::NestingObserver {
public:
// Observer class that is called back on the main thread.
class PLATFORM_EXPORT Observer {
public:
virtual ~Observer() {}
virtual void OnTriedToExecuteBlockedTask() = 0;
virtual void OnBeginNestedRunLoop() = 0;
virtual void OnExitNestedRunLoop() = 0;
};
~TaskQueueManager() override;
// Assume direct control over current thread and create a TaskQueueManager.
......@@ -86,7 +100,7 @@ class PLATFORM_EXPORT TaskQueueManager
// Requests that a task to process work is posted on the main task runner.
// These tasks are de-duplicated in two buckets: main-thread and all other
// threads. This distinction is done to reduce the overehead from locks, we
// threads. This distinction is done to reduce the overhead from locks, we
// assume the main-thread path will be hot.
void MaybeScheduleImmediateWork(const base::Location& from_here);
......@@ -131,20 +145,9 @@ class PLATFORM_EXPORT TaskQueueManager
return task_queue;
}
class PLATFORM_EXPORT Observer {
public:
virtual ~Observer() {}
virtual void OnTriedToExecuteBlockedTask() = 0;
virtual void OnBeginNestedRunLoop() = 0;
virtual void OnExitNestedRunLoop() = 0;
};
// Called once to set the Observer. This function is called on the main
// thread. If |observer| is null, then no callbacks will occur.
// Note |observer| is expected to outlive the SchedulerHelper.
// Note: |observer| is expected to outlive the SchedulerHelper.
void SetObserver(Observer* observer);
// Time domains must be registered for the task queues to get updated.
......@@ -158,7 +161,7 @@ class PLATFORM_EXPORT TaskQueueManager
// Returns the currently executing TaskQueue if any. Must be called on the
// thread this class was created on.
internal::TaskQueueImpl* currently_executing_task_queue() const {
DCHECK(main_thread_checker_.CalledOnValidThread());
DCHECK_CALLED_ON_VALID_THREAD(main_thread_checker_);
return currently_executing_task_queue_;
}
......@@ -214,7 +217,7 @@ class PLATFORM_EXPORT TaskQueueManager
DCHECK(time_domain);
}
base::TimeDelta Delay() const { return delay_; }
base::TimeDelta delay() const { return delay_; }
TimeDomain* time_domain() const { return time_domain_; }
bool operator>(const NextTaskDelay& other) const {
......@@ -266,12 +269,35 @@ class PLATFORM_EXPORT TaskQueueManager
TimeDomain* time_domain_;
};
// TaskQueueSelector::Observer implementation:
enum class ProcessTaskResult {
kDeferred,
kExecuted,
kTaskQueueManagerDeleted,
};
using IncomingImmediateWorkMap =
std::unordered_map<internal::TaskQueueImpl*, internal::EnqueueOrder>;
struct AnyThread {
AnyThread() = default;
// Task queues with newly available work on the incoming queue.
IncomingImmediateWorkMap has_incoming_immediate_work;
int do_work_running_count = 0;
int immediate_do_work_posted_count = 0;
// Whether or not the message loop is currently nested.
bool is_nested = false;
};
// TODO(alexclarke): Add a MainThreadOnly struct too.
// TaskQueueSelector::Observer:
void OnTaskQueueEnabled(internal::TaskQueueImpl* queue) override;
void OnTriedToSelectBlockedWorkQueue(
internal::WorkQueue* work_queue) override;
// base::RunLoop::NestingObserver implementation:
// base::RunLoop::NestingObserver:
void OnBeginNestedRunLoop() override;
// Called by the task queue to register a new pending task.
......@@ -280,7 +306,7 @@ class PLATFORM_EXPORT TaskQueueManager
// Use the selector to choose a pending task and run it.
void DoWork(WorkType work_type);
// Post a DoWork continuation if |next_delay| is not empty.
// Post a DoWork() continuation if |next_delay| is not empty.
void PostDoWorkContinuationLocked(base::Optional<NextTaskDelay> next_delay,
LazyNow* lazy_now,
MoveableAutoLock lock);
......@@ -294,15 +320,9 @@ class PLATFORM_EXPORT TaskQueueManager
// avoid running any tasks.
bool SelectWorkQueueToService(internal::WorkQueue** out_work_queue);
enum class ProcessTaskResult {
kDeferred,
kExecuted,
kTaskQueueManagerDeleted,
};
// Runs a single nestable task from the |queue|. On exit, |out_task| will
// contain the task which was executed. Non-nestable task are reposted on the
// run loop. The queue must not be empty. On exit |time_after_task| may get
// run loop. The queue must not be empty. On exit |time_after_task| may get
// set (not guaranteed), sampling |real_time_domain()->Now()| immediately
// after running the task.
ProcessTaskResult ProcessTaskFromWorkQueue(internal::WorkQueue* work_queue,
......@@ -345,9 +365,6 @@ class PLATFORM_EXPORT TaskQueueManager
internal::EnqueueOrder enqueue_order,
bool queue_is_blocked);
using IncomingImmediateWorkMap =
std::unordered_map<internal::TaskQueueImpl*, internal::EnqueueOrder>;
// Calls |ReloadImmediateWorkQueueIfEmpty| on all queues in
// |queues_to_reload|.
void ReloadEmptyWorkQueues(
......@@ -386,24 +403,11 @@ class PLATFORM_EXPORT TaskQueueManager
internal::EnqueueOrderGenerator enqueue_order_generator_;
base::debug::TaskAnnotator task_annotator_;
base::ThreadChecker main_thread_checker_;
THREAD_CHECKER(main_thread_checker_);
std::unique_ptr<internal::ThreadController> controller_;
internal::TaskQueueSelector selector_;
bool task_was_run_on_quiescence_monitored_queue_;
struct AnyThread {
AnyThread();
// Task queues with newly available work on the incoming queue.
IncomingImmediateWorkMap has_incoming_immediate_work;
int do_work_running_count;
int immediate_do_work_posted_count;
bool is_nested; // Whether or not the message loop is currently nested.
};
// TODO(alexclarke): Add a MainThreadOnly struct too.
bool task_was_run_on_quiescence_monitored_queue_ = false;
mutable base::Lock any_thread_lock_;
AnyThread any_thread_;
......@@ -419,16 +423,17 @@ class PLATFORM_EXPORT TaskQueueManager
NextDelayedDoWork next_delayed_do_work_;
int work_batch_size_;
size_t task_count_;
int work_batch_size_ = 1;
size_t task_count_ = 0;
base::ObserverList<base::MessageLoop::TaskObserver> task_observers_;
base::ObserverList<TaskTimeObserver> task_time_observers_;
internal::TaskQueueImpl* currently_executing_task_queue_; // NOT OWNED
// NOT OWNED
internal::TaskQueueImpl* currently_executing_task_queue_ = nullptr;
Observer* observer_; // NOT OWNED
Observer* observer_ = nullptr; // NOT OWNED
base::WeakPtrFactory<TaskQueueManager> weak_factory_;
DISALLOW_COPY_AND_ASSIGN(TaskQueueManager);
......
......@@ -2619,23 +2619,23 @@ TEST_F(TaskQueueManagerTest, ComputeDelayTillNextTask) {
base::TimeDelta::FromSeconds(10));
EXPECT_EQ(base::TimeDelta::FromSeconds(10),
ComputeDelayTillNextTask(&lazy_now)->Delay());
ComputeDelayTillNextTask(&lazy_now)->delay());
runners_[1]->PostDelayedTask(FROM_HERE, base::Bind(&NopTask),
base::TimeDelta::FromSeconds(15));
EXPECT_EQ(base::TimeDelta::FromSeconds(10),
ComputeDelayTillNextTask(&lazy_now)->Delay());
ComputeDelayTillNextTask(&lazy_now)->delay());
runners_[1]->PostDelayedTask(FROM_HERE, base::Bind(&NopTask),
base::TimeDelta::FromSeconds(5));
EXPECT_EQ(base::TimeDelta::FromSeconds(5),
ComputeDelayTillNextTask(&lazy_now)->Delay());
ComputeDelayTillNextTask(&lazy_now)->delay());
runners_[0]->PostTask(FROM_HERE, base::Bind(&NopTask));
EXPECT_EQ(base::TimeDelta(), ComputeDelayTillNextTask(&lazy_now)->Delay());
EXPECT_EQ(base::TimeDelta(), ComputeDelayTillNextTask(&lazy_now)->delay());
}
TEST_F(TaskQueueManagerTest, ComputeDelayTillNextTask_Disabled) {
......@@ -2668,7 +2668,7 @@ TEST_F(TaskQueueManagerTest, ComputeDelayTillNextTask_FenceUnblocking) {
runners_[0]->InsertFence(TaskQueue::InsertFencePosition::kNow);
LazyNow lazy_now(&now_src_);
EXPECT_EQ(base::TimeDelta(), ComputeDelayTillNextTask(&lazy_now)->Delay());
EXPECT_EQ(base::TimeDelta(), ComputeDelayTillNextTask(&lazy_now)->delay());
}
TEST_F(TaskQueueManagerTest, ComputeDelayTillNextTask_DelayedTaskReady) {
......@@ -2680,7 +2680,7 @@ TEST_F(TaskQueueManagerTest, ComputeDelayTillNextTask_DelayedTaskReady) {
now_src_.Advance(base::TimeDelta::FromSeconds(10));
LazyNow lazy_now(&now_src_);
EXPECT_EQ(base::TimeDelta(), ComputeDelayTillNextTask(&lazy_now)->Delay());
EXPECT_EQ(base::TimeDelta(), ComputeDelayTillNextTask(&lazy_now)->delay());
}
TEST_F(TaskQueueManagerTest, PostDoWorkContinuation_NoMoreWork) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment