Commit 9ad7a116 authored by Alexander Timin's avatar Alexander Timin Committed by Commit Bot

[scheduler] Make TaskQueueImpl own next_scheduled_wake_up information.

Refactor the scheme where TaskQueueImpl notifies TimeDomain about a
delayed wake up and TimeDomain notifies TaskQueueImpl about next wake-up for it.

Make TaskQueueImpl be the ground truth for delayed wake-ups for the
particular queue and task TimeDomain with aggregating wake-ups across
different queues.

R=alexclarke@chromium.org,skyostil@chromium.org

Change-Id: I7d56d35e9c7b5b2bb1240a6a659d0d9a1f00c473
Reviewed-on: https://chromium-review.googlesource.com/955847Reviewed-by: default avatarAlex Clarke <alexclarke@chromium.org>
Commit-Queue: Alexander Timin <altimin@chromium.org>
Cr-Commit-Position: refs/heads/master@{#543730}
parent 9de81aba
......@@ -9,9 +9,9 @@
namespace blink {
namespace scheduler {
base::TimeTicks LazyNow::Now() {
if (now_.is_null())
if (!now_)
now_ = tick_clock_->NowTicks();
return now_;
return now_.value();
}
} // namespace scheduler
......
......@@ -5,6 +5,7 @@
#ifndef THIRD_PARTY_WEBKIT_SOURCE_PLATFORM_SCHEDULER_BASE_LAZY_NOW_H_
#define THIRD_PARTY_WEBKIT_SOURCE_PLATFORM_SCHEDULER_BASE_LAZY_NOW_H_
#include "base/optional.h"
#include "base/time/time.h"
#include "platform/PlatformExport.h"
......@@ -20,7 +21,6 @@ namespace scheduler {
class PLATFORM_EXPORT LazyNow {
public:
explicit LazyNow(base::TimeTicks now) : tick_clock_(nullptr), now_(now) {
DCHECK(!now.is_null());
}
explicit LazyNow(base::TickClock* tick_clock) : tick_clock_(tick_clock) {}
......@@ -30,7 +30,7 @@ class PLATFORM_EXPORT LazyNow {
private:
base::TickClock* tick_clock_; // NOT OWNED
base::TimeTicks now_;
base::Optional<base::TimeTicks> now_;
};
} // namespace scheduler
......
......@@ -254,20 +254,11 @@ TaskQueueImpl::PostTaskResult TaskQueueImpl::PostDelayedTaskImpl(
void TaskQueueImpl::PushOntoDelayedIncomingQueueFromMainThread(
Task pending_task, base::TimeTicks now) {
DelayedWakeUp wake_up = pending_task.delayed_wake_up();
main_thread_only().task_queue_manager->DidQueueTask(pending_task);
main_thread_only().delayed_incoming_queue.push(std::move(pending_task));
// If |pending_task| is at the head of the queue, then make sure a wake-up
// is requested if the queue is enabled. Note we still want to schedule a
// wake-up even if blocked by a fence, because we'd break throttling logic
// otherwise.
DelayedWakeUp new_wake_up =
main_thread_only().delayed_incoming_queue.top().delayed_wake_up();
if (wake_up.time == new_wake_up.time &&
wake_up.sequence_num == new_wake_up.sequence_num) {
ScheduleDelayedWorkInTimeDomain(now);
}
LazyNow lazy_now = main_thread_only().time_domain->CreateLazyNow();
UpdateDelayedWakeUp(&lazy_now);
TraceQueueSize();
}
......@@ -420,16 +411,23 @@ bool TaskQueueImpl::HasTaskToRunImmediately() const {
return !immediate_incoming_queue().empty();
}
base::Optional<base::TimeTicks> TaskQueueImpl::GetNextScheduledWakeUp() {
base::Optional<TaskQueueImpl::DelayedWakeUp>
TaskQueueImpl::GetNextScheduledWakeUpImpl() {
// Note we don't scheduled a wake-up for disabled queues.
if (main_thread_only().delayed_incoming_queue.empty() || !IsQueueEnabled())
return base::nullopt;
return main_thread_only().delayed_incoming_queue.top().delayed_run_time;
return main_thread_only().delayed_incoming_queue.top().delayed_wake_up();
}
base::Optional<TaskQueueImpl::DelayedWakeUp>
TaskQueueImpl::WakeUpForDelayedWork(LazyNow* lazy_now) {
base::Optional<base::TimeTicks> TaskQueueImpl::GetNextScheduledWakeUp() {
base::Optional<DelayedWakeUp> wake_up = GetNextScheduledWakeUpImpl();
if (!wake_up)
return base::nullopt;
return wake_up->time;
}
void TaskQueueImpl::WakeUpForDelayedWork(LazyNow* lazy_now) {
// Enqueue all delayed tasks that should be running now, skipping any that
// have been canceled.
while (!main_thread_only().delayed_incoming_queue.empty()) {
......@@ -446,14 +444,18 @@ TaskQueueImpl::WakeUpForDelayedWork(LazyNow* lazy_now) {
main_thread_only().task_queue_manager->GetNextSequenceNumber());
main_thread_only().delayed_work_queue->Push(std::move(task));
main_thread_only().delayed_incoming_queue.pop();
}
// Make sure the next wake up is scheduled.
if (!main_thread_only().delayed_incoming_queue.empty()) {
return main_thread_only().delayed_incoming_queue.top().delayed_wake_up();
// Normally WakeUpForDelayedWork is called inside DoWork, but it also
// can be called elsewhere (e.g. tests and fast-path for posting
// delayed tasks). Ensure that there is a DoWork posting. No-op inside
// existing DoWork due to DoWork deduplication.
if (IsQueueEnabled() || !main_thread_only().current_fence) {
main_thread_only().task_queue_manager->MaybeScheduleImmediateWork(
FROM_HERE);
}
}
return base::nullopt;
UpdateDelayedWakeUp(lazy_now);
}
void TaskQueueImpl::TraceQueueSize() const {
......@@ -599,7 +601,13 @@ void TaskQueueImpl::SetTimeDomain(TimeDomain* time_domain) {
main_thread_only().time_domain = time_domain;
time_domain->RegisterQueue(this);
ScheduleDelayedWorkInTimeDomain(time_domain->Now());
LazyNow lazy_now = time_domain->CreateLazyNow();
// Clear scheduled wake up to ensure that new notifications are issued
// correctly.
// TODO(altimin): Remove this when we won't have to support changing time
// domains.
main_thread_only().scheduled_wake_up = base::nullopt;
UpdateDelayedWakeUp(&lazy_now);
}
TimeDomain* TaskQueueImpl::GetTimeDomain() const {
......@@ -836,6 +844,9 @@ void TaskQueueImpl::EnableOrDisableWithSelector(bool enable) {
if (!main_thread_only().task_queue_manager)
return;
LazyNow lazy_now = main_thread_only().time_domain->CreateLazyNow();
UpdateDelayedWakeUp(&lazy_now);
if (enable) {
if (HasPendingImmediateWork() &&
!main_thread_only().on_next_wake_up_changed_callback.is_null()) {
......@@ -844,16 +855,12 @@ void TaskQueueImpl::EnableOrDisableWithSelector(bool enable) {
base::TimeTicks());
}
ScheduleDelayedWorkInTimeDomain(main_thread_only().time_domain->Now());
// Note the selector calls TaskQueueManagerImpl::OnTaskQueueEnabled which
// posts a DoWork if needed.
// Note the selector calls TaskQueueManager::OnTaskQueueEnabled which posts
// a DoWork if needed.
main_thread_only()
.task_queue_manager->main_thread_only()
.selector.EnableQueue(this);
} else {
if (!main_thread_only().delayed_incoming_queue.empty())
main_thread_only().time_domain->CancelDelayedWork(this);
main_thread_only()
.task_queue_manager->main_thread_only()
.selector.DisableQueue(this);
......@@ -872,9 +879,6 @@ void TaskQueueImpl::SweepCanceledDelayedTasks(base::TimeTicks now) {
if (main_thread_only().delayed_incoming_queue.empty())
return;
base::TimeTicks first_task_runtime =
main_thread_only().delayed_incoming_queue.top().delayed_run_time;
// Remove canceled tasks.
std::priority_queue<Task> remaining_tasks;
while (!main_thread_only().delayed_incoming_queue.empty()) {
......@@ -887,13 +891,8 @@ void TaskQueueImpl::SweepCanceledDelayedTasks(base::TimeTicks now) {
main_thread_only().delayed_incoming_queue = std::move(remaining_tasks);
// Re-schedule delayed call to WakeUpForDelayedWork if needed.
if (main_thread_only().delayed_incoming_queue.empty()) {
main_thread_only().time_domain->CancelDelayedWork(this);
} else if (first_task_runtime !=
main_thread_only().delayed_incoming_queue.top().delayed_run_time) {
ScheduleDelayedWorkInTimeDomain(main_thread_only().time_domain->Now());
}
LazyNow lazy_now(now);
UpdateDelayedWakeUp(&lazy_now);
}
void TaskQueueImpl::PushImmediateIncomingTaskForTest(
......@@ -933,31 +932,31 @@ void TaskQueueImpl::SetOnNextWakeUpChangedCallback(
main_thread_only().on_next_wake_up_changed_callback = callback;
}
void TaskQueueImpl::ScheduleDelayedWorkInTimeDomain(base::TimeTicks now) {
if (!IsQueueEnabled())
return;
if (main_thread_only().delayed_incoming_queue.empty())
return;
main_thread_only().time_domain->ScheduleDelayedWork(
this, main_thread_only().delayed_incoming_queue.top().delayed_wake_up(),
now);
void TaskQueueImpl::UpdateDelayedWakeUp(LazyNow* lazy_now) {
return UpdateDelayedWakeUpImpl(lazy_now, GetNextScheduledWakeUpImpl());
}
void TaskQueueImpl::SetScheduledTimeDomainWakeUp(
base::Optional<base::TimeTicks> scheduled_time_domain_wake_up) {
main_thread_only().scheduled_time_domain_wake_up =
scheduled_time_domain_wake_up;
// If queue has immediate work an appropriate notification has already
// been issued.
if (!scheduled_time_domain_wake_up ||
main_thread_only().on_next_wake_up_changed_callback.is_null() ||
HasPendingImmediateWork())
void TaskQueueImpl::UpdateDelayedWakeUpImpl(
LazyNow* lazy_now,
base::Optional<TaskQueueImpl::DelayedWakeUp> wake_up) {
if (main_thread_only().scheduled_wake_up == wake_up)
return;
main_thread_only().scheduled_wake_up = wake_up;
if (wake_up &&
!main_thread_only().on_next_wake_up_changed_callback.is_null() &&
!HasPendingImmediateWork()) {
main_thread_only().on_next_wake_up_changed_callback.Run(wake_up->time);
}
main_thread_only().time_domain->ScheduleWakeUpForQueue(this, wake_up,
lazy_now);
}
main_thread_only().on_next_wake_up_changed_callback.Run(
scheduled_time_domain_wake_up.value());
void TaskQueueImpl::SetDelayedWakeUpForTesting(
base::Optional<TaskQueueImpl::DelayedWakeUp> wake_up) {
LazyNow lazy_now = main_thread_only().time_domain->CreateLazyNow();
UpdateDelayedWakeUpImpl(&lazy_now, wake_up);
}
bool TaskQueueImpl::HasPendingImmediateWork() {
......
......@@ -75,6 +75,14 @@ class PLATFORM_EXPORT TaskQueueImpl {
base::TimeTicks time;
int sequence_num;
bool operator!=(const DelayedWakeUp& other) const {
return time != other.time || other.sequence_num != sequence_num;
}
bool operator==(const DelayedWakeUp& other) const {
return !(*this != other);
}
bool operator<=(const DelayedWakeUp& other) const {
if (time == other.time) {
// Debug gcc builds can compare an element against itself.
......@@ -166,6 +174,7 @@ class PLATFORM_EXPORT TaskQueueImpl {
size_t GetNumberOfPendingTasks() const;
bool HasTaskToRunImmediately() const;
base::Optional<base::TimeTicks> GetNextScheduledWakeUp();
base::Optional<DelayedWakeUp> GetNextScheduledWakeUpImpl();
void SetQueuePriority(TaskQueue::QueuePriority priority);
TaskQueue::QueuePriority GetQueuePriority() const;
void AddTaskObserver(base::MessageLoop::TaskObserver* task_observer);
......@@ -200,14 +209,6 @@ class PLATFORM_EXPORT TaskQueueImpl {
void NotifyWillProcessTask(const base::PendingTask& pending_task);
void NotifyDidProcessTask(const base::PendingTask& pending_task);
// Called by TimeDomain when the wake-up for this queue has changed.
// There is only one wake-up, new wake-up cancels any previous wake-ups.
// If |scheduled_time_domain_wake_up| is base::nullopt then the wake-up
// has been cancelled.
// Must be called from the main thread.
void SetScheduledTimeDomainWakeUp(
base::Optional<base::TimeTicks> scheduled_time_domain_wake_up);
// Check for available tasks in immediate work queues.
// Used to check if we need to generate notifications about delayed work.
bool HasPendingImmediateWork();
......@@ -229,13 +230,9 @@ class PLATFORM_EXPORT TaskQueueImpl {
}
// Enqueues any delayed tasks which should be run now on the
// |delayed_work_queue|. Returns the subsequent wake-up that is required, if
// any. Must be called from the main thread.
base::Optional<DelayedWakeUp> WakeUpForDelayedWork(LazyNow* lazy_now);
base::Optional<base::TimeTicks> scheduled_time_domain_wake_up() const {
return main_thread_only().scheduled_time_domain_wake_up;
}
// |delayed_work_queue|.
// Must be called from the main thread.
void WakeUpForDelayedWork(LazyNow* lazy_now);
HeapHandle heap_handle() const { return main_thread_only().heap_handle; }
......@@ -296,6 +293,9 @@ class PLATFORM_EXPORT TaskQueueImpl {
// constructed due to not having TaskQueue.
void SetQueueEnabledForTest(bool enabled);
protected:
void SetDelayedWakeUpForTesting(base::Optional<DelayedWakeUp> wake_up);
private:
friend class WorkQueue;
friend class WorkQueueTest;
......@@ -340,9 +340,11 @@ class PLATFORM_EXPORT TaskQueueImpl {
base::trace_event::BlameContext* blame_context; // Not owned.
EnqueueOrder current_fence;
base::Optional<base::TimeTicks> delayed_fence;
base::Optional<base::TimeTicks> scheduled_time_domain_wake_up;
OnTaskStartedHandler on_task_started_handler;
OnTaskCompletedHandler on_task_completed_handler;
// Last reported wake up, used only in UpdateWakeUp to avoid
// excessive calls.
base::Optional<DelayedWakeUp> scheduled_wake_up;
// If false, queue will be disabled. Used only for tests.
bool is_enabled_for_test;
};
......@@ -392,7 +394,9 @@ class PLATFORM_EXPORT TaskQueueImpl {
void EnableOrDisableWithSelector(bool enable);
// Schedules delayed work on time domain and calls the observer.
void ScheduleDelayedWorkInTimeDomain(base::TimeTicks now);
void UpdateDelayedWakeUp(LazyNow* lazy_now);
void UpdateDelayedWakeUpImpl(LazyNow* lazy_now,
base::Optional<DelayedWakeUp> wake_up);
// Activate a delayed fence if a time has come.
void ActivateDelayedFenceIfNeeded(base::TimeTicks now);
......
......@@ -27,57 +27,49 @@ void TimeDomain::UnregisterQueue(internal::TaskQueueImpl* queue) {
DCHECK(main_thread_checker_.CalledOnValidThread());
DCHECK_EQ(queue->GetTimeDomain(), this);
CancelDelayedWork(queue);
LazyNow lazy_now = CreateLazyNow();
ScheduleWakeUpForQueue(queue, base::nullopt, &lazy_now);
}
void TimeDomain::ScheduleDelayedWork(
void TimeDomain::ScheduleWakeUpForQueue(
internal::TaskQueueImpl* queue,
internal::TaskQueueImpl::DelayedWakeUp wake_up,
base::TimeTicks now) {
base::Optional<internal::TaskQueueImpl::DelayedWakeUp> wake_up,
LazyNow* lazy_now) {
DCHECK(main_thread_checker_.CalledOnValidThread());
DCHECK_EQ(queue->GetTimeDomain(), this);
DCHECK(queue->IsQueueEnabled());
// We only want to store a single wake-up per queue, so we need to remove any
// previously registered wake up for |queue|.
if (queue->heap_handle().IsValid()) {
DCHECK(queue->scheduled_time_domain_wake_up());
// O(log n)
delayed_wake_up_queue_.ChangeKey(queue->heap_handle(), {wake_up, queue});
DCHECK(queue->IsQueueEnabled() || !wake_up);
base::Optional<base::TimeTicks> previous_wake_up;
if (!delayed_wake_up_queue_.empty())
previous_wake_up = delayed_wake_up_queue_.Min().wake_up.time;
if (wake_up) {
// Insert a new wake-up into the heap.
if (queue->heap_handle().IsValid()) {
// O(log n)
delayed_wake_up_queue_.ChangeKey(queue->heap_handle(),
{wake_up.value(), queue});
} else {
// O(log n)
delayed_wake_up_queue_.insert({wake_up.value(), queue});
}
} else {
// O(log n)
delayed_wake_up_queue_.insert({wake_up, queue});
// Remove a wake-up from heap if present.
if (queue->heap_handle().IsValid())
delayed_wake_up_queue_.erase(queue->heap_handle());
}
queue->SetScheduledTimeDomainWakeUp(wake_up.time);
// If |queue| is the first wake-up then request the wake-up.
if (delayed_wake_up_queue_.Min().queue == queue)
RequestWakeUpAt(now, wake_up.time);
}
base::Optional<base::TimeTicks> new_wake_up;
if (!delayed_wake_up_queue_.empty())
new_wake_up = delayed_wake_up_queue_.Min().wake_up.time;
void TimeDomain::CancelDelayedWork(internal::TaskQueueImpl* queue) {
DCHECK(main_thread_checker_.CalledOnValidThread());
DCHECK_EQ(queue->GetTimeDomain(), this);
// If no wake-up has been requested then bail out.
if (!queue->heap_handle().IsValid())
if (previous_wake_up == new_wake_up)
return;
DCHECK(queue->scheduled_time_domain_wake_up());
DCHECK(!delayed_wake_up_queue_.empty());
base::TimeTicks prev_first_wake_up =
delayed_wake_up_queue_.Min().wake_up.time;
// O(log n)
delayed_wake_up_queue_.erase(queue->heap_handle());
if (delayed_wake_up_queue_.empty()) {
CancelWakeUpAt(prev_first_wake_up);
} else if (prev_first_wake_up != delayed_wake_up_queue_.Min().wake_up.time) {
CancelWakeUpAt(prev_first_wake_up);
RequestWakeUpAt(Now(), delayed_wake_up_queue_.Min().wake_up.time);
}
if (previous_wake_up)
CancelWakeUpAt(previous_wake_up.value());
if (new_wake_up)
RequestWakeUpAt(lazy_now->Now(), new_wake_up.value());
}
void TimeDomain::WakeUpReadyDelayedQueues(LazyNow* lazy_now) {
......@@ -88,18 +80,7 @@ void TimeDomain::WakeUpReadyDelayedQueues(LazyNow* lazy_now) {
while (!delayed_wake_up_queue_.empty() &&
delayed_wake_up_queue_.Min().wake_up.time <= lazy_now->Now()) {
internal::TaskQueueImpl* queue = delayed_wake_up_queue_.Min().queue;
base::Optional<internal::TaskQueueImpl::DelayedWakeUp> next_wake_up =
queue->WakeUpForDelayedWork(lazy_now);
if (next_wake_up) {
// O(log n)
delayed_wake_up_queue_.ReplaceMin({*next_wake_up, queue});
queue->SetScheduledTimeDomainWakeUp(next_wake_up->time);
} else {
// O(log n)
delayed_wake_up_queue_.Pop();
DCHECK(!queue->scheduled_time_domain_wake_up());
}
queue->WakeUpForDelayedWork(lazy_now);
}
}
......
......@@ -69,16 +69,10 @@ class PLATFORM_EXPORT TimeDomain {
// the next task was posted to and it returns true. Returns false otherwise.
bool NextScheduledTaskQueue(internal::TaskQueueImpl** out_task_queue) const;
// Schedules a call to TaskQueueImpl::WakeUpForDelayedWork when this
// TimeDomain reaches |delayed_run_time|. This supersedes any previously
// registered wake-up for |queue|.
void ScheduleDelayedWork(internal::TaskQueueImpl* queue,
internal::TaskQueueImpl::DelayedWakeUp wake_up,
base::TimeTicks now);
// Cancels any scheduled calls to TaskQueueImpl::WakeUpForDelayedWork for
// |queue|.
void CancelDelayedWork(internal::TaskQueueImpl* queue);
void ScheduleWakeUpForQueue(
internal::TaskQueueImpl* queue,
base::Optional<internal::TaskQueueImpl::DelayedWakeUp> wake_up,
LazyNow* lazy_now);
// Registers the |queue|.
void RegisterQueue(internal::TaskQueueImpl* queue);
......@@ -132,9 +126,6 @@ class PLATFORM_EXPORT TimeDomain {
void ClearHeapHandle() {
DCHECK(queue->heap_handle().IsValid());
queue->set_heap_handle(HeapHandle());
DCHECK(queue->scheduled_time_domain_wake_up());
queue->SetScheduledTimeDomainWakeUp(base::nullopt);
}
};
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment