Commit 6410ee3d authored by Alex Clarke's avatar Alex Clarke Committed by Commit Bot

Second try at using LazilyDeallocatedDeque instead of circular_deque in TaskQueue

Previous patch: https://chromium-review.googlesource.com/c/chromium/src/+/1080792

Our usage pattern is unfortunate for existing queues such as
base::circular_deque.  We tend to fill up an empty queue and then drain all
those tasks until it's empty.  This means the queue yo-yos in size which
confuses the memory reclamation schemes of most queues.

As an optimisation we introduce a deque specialised for TaskQueueManager's
usage patterns. For performance (memory allocation isn't free) we don't
automatically reclaim memory when the queue becomes empty.  Instead we rely
on the surrounding code periodically calling MaybeShrinkQueue, ideally when
the queue is empty.

We keep track of the maximum recent queue size and rate limit
how often MaybeShrinkQueue actually shrinks the buffer to avoid unnecessary
churn.

This yields a nice win on our microbenchmark:

Patch: us/run for 10000 delayed tasks with N queues
1 queue            4 queues           8 queues           32 queues
33448.166666666664 33215.75496688742   33484.34           34018.37414965987
33972.18243243243  33846.91891891892   34489.737931034484 34727.90277777778
33367.90666666667  33167.54304635762   33392.96           33906.89864864865
33392.13333333333  33107.17763157895   33340.18           33718.73825503356
37921.01515151515  39379.06299212598   38851.27906976744  39366.03125
38171.564885496184 37401.72388059701   37640.32330827068  37800.51127819549
34691.2275862069   34359.61643835616   34993.468531468534 35366.795774647886
35981.20863309353  35089.18881118881   38530.230769230766 39280.3515625
39262.8671875      36411.384057971016  33576.10067114094  33939.69594594595
37913.59848484849  38324.076335877864  38061.59848484849  39921.00793650794
Average 35812.1871 35430.24471         35636.02188        36204.63076

ToT: us/run for 10000 delayed tasks with N queues
1 queue            4 queues           8 queues           32 queues
40459.540322580644 40536.04838709677  38994.573643410855 38696.2
39422.149606299216 39299.5            37888.18939393939  37874.74436090225
38419.70229007633  38025.742424242424 37844.41353383459  38020.469696969696
35052.72027972028  38147.80303030303  35504.89361702128  34138.02721088436
37096.77777777778  34942.541666666664 37003.529411764706 37579.60447761194
38818.67441860465  38233.068702290075 37978.628787878784 37867.57142857143
38455.49618320611  37903.05303030303  38106.143939393936 38129.5
40609.33064516129  37721.75187969925  34656.441379310345 34294.33561643836
35273.704225352114 34646.324137931035 34335.643835616436 34311.82876712329
35821.41428571429  35362.035211267605 37522.27611940299  35429.281690140844
Average 37942.951  37481.78685        36983.47337        36634.15632

Percentage improvement
5.61570422	5.473437399	3.643388159	1.172472933

NB the reason the improvement goes down with the number of queues is because
we're saving malloc overhead in the queue, but a constant number of tasks are
posted across N queues.  This means the more queues we have in this test, the
less loaded the queues are individually.

Change-Id: I975d7f864dc55715fb9f949ef65321da93e4cef4
Reviewed-on: https://chromium-review.googlesource.com/1169043Reviewed-by: default avatarSami Kyöstilä <skyostil@chromium.org>
Commit-Queue: Alex Clarke <alexclarke@chromium.org>
Cr-Commit-Position: refs/heads/master@{#582586}
parent c63621c5
...@@ -341,6 +341,10 @@ void TaskQueueImpl::ReloadEmptyImmediateQueue(TaskDeque* queue) { ...@@ -341,6 +341,10 @@ void TaskQueueImpl::ReloadEmptyImmediateQueue(TaskDeque* queue) {
AutoLock immediate_incoming_queue_lock(immediate_incoming_queue_lock_); AutoLock immediate_incoming_queue_lock(immediate_incoming_queue_lock_);
queue->swap(immediate_incoming_queue()); queue->swap(immediate_incoming_queue());
// Since |immediate_incoming_queue| is empty, now is a good time to consider
// reducing it's capacity if we're wasting memory.
immediate_incoming_queue().MaybeShrinkQueue();
// Activate delayed fence if necessary. This is ideologically similar to // Activate delayed fence if necessary. This is ideologically similar to
// ActivateDelayedFenceIfNeeded, but due to immediate tasks being posted // ActivateDelayedFenceIfNeeded, but due to immediate tasks being posted
// from any thread we can't generate an enqueue order for the fence there, // from any thread we can't generate an enqueue order for the fence there,
...@@ -518,6 +522,13 @@ void TaskQueueImpl::AsValueInto(TimeTicks now, ...@@ -518,6 +522,13 @@ void TaskQueueImpl::AsValueInto(TimeTicks now,
state->SetInteger("delayed_work_queue_size", state->SetInteger("delayed_work_queue_size",
main_thread_only().delayed_work_queue->Size()); main_thread_only().delayed_work_queue->Size());
state->SetInteger("immediate_incoming_queue_capacity",
immediate_incoming_queue().capacity());
state->SetInteger("immediate_work_queue_capacity",
immediate_work_queue()->Capacity());
state->SetInteger("delayed_work_queue_capacity",
delayed_work_queue()->Capacity());
if (!main_thread_only().delayed_incoming_queue.empty()) { if (!main_thread_only().delayed_incoming_queue.empty()) {
TimeDelta delay_to_next_task = TimeDelta delay_to_next_task =
(main_thread_only().delayed_incoming_queue.top().delayed_run_time - (main_thread_only().delayed_incoming_queue.top().delayed_run_time -
...@@ -879,6 +890,9 @@ void TaskQueueImpl::SweepCanceledDelayedTasks(TimeTicks now) { ...@@ -879,6 +890,9 @@ void TaskQueueImpl::SweepCanceledDelayedTasks(TimeTicks now) {
main_thread_only().delayed_incoming_queue = std::move(remaining_tasks); main_thread_only().delayed_incoming_queue = std::move(remaining_tasks);
// Also consider shrinking the work queue if it's wasting memory.
main_thread_only().delayed_work_queue->MaybeShrinkQueue();
LazyNow lazy_now(now); LazyNow lazy_now(now);
UpdateDelayedWakeUp(&lazy_now); UpdateDelayedWakeUp(&lazy_now);
} }
......
...@@ -12,7 +12,6 @@ ...@@ -12,7 +12,6 @@
#include <set> #include <set>
#include "base/callback.h" #include "base/callback.h"
#include "base/containers/circular_deque.h"
#include "base/macros.h" #include "base/macros.h"
#include "base/memory/weak_ptr.h" #include "base/memory/weak_ptr.h"
#include "base/message_loop/message_loop.h" #include "base/message_loop/message_loop.h"
...@@ -393,7 +392,7 @@ class BASE_EXPORT TaskQueueImpl { ...@@ -393,7 +392,7 @@ class BASE_EXPORT TaskQueueImpl {
// empty. // empty.
void PushOntoImmediateIncomingQueueLocked(Task task); void PushOntoImmediateIncomingQueueLocked(Task task);
using TaskDeque = circular_deque<Task>; using TaskDeque = LazilyDeallocatedDeque<Task>;
// Extracts all the tasks from the immediate incoming queue and swaps it with // Extracts all the tasks from the immediate incoming queue and swaps it with
// |queue| which must be empty. // |queue| which must be empty.
......
...@@ -67,7 +67,7 @@ void WorkQueue::Push(TaskQueueImpl::Task task) { ...@@ -67,7 +67,7 @@ void WorkQueue::Push(TaskQueueImpl::Task task) {
#endif #endif
// Make sure the |enqueue_order()| is monotonically increasing. // Make sure the |enqueue_order()| is monotonically increasing.
DCHECK(was_empty || tasks_.rbegin()->enqueue_order() < task.enqueue_order()); DCHECK(was_empty || tasks_.back().enqueue_order() < task.enqueue_order());
// Amoritized O(1). // Amoritized O(1).
tasks_.push_back(std::move(task)); tasks_.push_back(std::move(task));
...@@ -133,9 +133,16 @@ TaskQueueImpl::Task WorkQueue::TakeTaskFromWorkQueue() { ...@@ -133,9 +133,16 @@ TaskQueueImpl::Task WorkQueue::TakeTaskFromWorkQueue() {
TaskQueueImpl::Task pending_task = std::move(tasks_.front()); TaskQueueImpl::Task pending_task = std::move(tasks_.front());
tasks_.pop_front(); tasks_.pop_front();
// NB immediate tasks have a different pipeline to delayed ones. // NB immediate tasks have a different pipeline to delayed ones.
if (queue_type_ == QueueType::kImmediate && tasks_.empty()) { if (tasks_.empty()) {
// Short-circuit the queue reload so that OnPopQueue does the right thing. // NB delayed tasks are inserted via Push, no don't need to reload those.
task_queue_->ReloadEmptyImmediateQueue(&tasks_); if (queue_type_ == QueueType::kImmediate) {
// Short-circuit the queue reload so that OnPopQueue does the right
// thing.
task_queue_->ReloadEmptyImmediateQueue(&tasks_);
}
// Since the queue is empty, now is a good time to consider reducing it's
// capacity if we're wasting memory.
tasks_.MaybeShrinkQueue();
} }
// OnPopQueue calls GetFrontTaskEnqueueOrder which checks BlockedByFence() so // OnPopQueue calls GetFrontTaskEnqueueOrder which checks BlockedByFence() so
...@@ -154,10 +161,16 @@ bool WorkQueue::RemoveAllCanceledTasksFromFront() { ...@@ -154,10 +161,16 @@ bool WorkQueue::RemoveAllCanceledTasksFromFront() {
task_removed = true; task_removed = true;
} }
if (task_removed) { if (task_removed) {
// NB immediate tasks have a different pipeline to delayed ones. if (tasks_.empty()) {
if (queue_type_ == QueueType::kImmediate && tasks_.empty()) { // NB delayed tasks are inserted via Push, no don't need to reload those.
// Short-circuit the queue reload so that OnPopQueue does the right thing. if (queue_type_ == QueueType::kImmediate) {
task_queue_->ReloadEmptyImmediateQueue(&tasks_); // Short-circuit the queue reload so that OnPopQueue does the right
// thing.
task_queue_->ReloadEmptyImmediateQueue(&tasks_);
}
// Since the queue is empty, now is a good time to consider reducing it's
// capacity if we're wasting memory.
tasks_.MaybeShrinkQueue();
} }
work_queue_sets_->OnPopQueue(this); work_queue_sets_->OnPopQueue(this);
task_queue_->TraceQueueSize(); task_queue_->TraceQueueSize();
...@@ -231,6 +244,10 @@ void WorkQueue::PopTaskForTesting() { ...@@ -231,6 +244,10 @@ void WorkQueue::PopTaskForTesting() {
tasks_.pop_front(); tasks_.pop_front();
} }
void WorkQueue::MaybeShrinkQueue() {
tasks_.MaybeShrinkQueue();
}
} // namespace internal } // namespace internal
} // namespace sequence_manager } // namespace sequence_manager
} // namespace base } // namespace base
...@@ -77,6 +77,8 @@ class BASE_EXPORT WorkQueue { ...@@ -77,6 +77,8 @@ class BASE_EXPORT WorkQueue {
size_t Size() const { return tasks_.size(); } size_t Size() const { return tasks_.size(); }
size_t Capacity() const { return tasks_.capacity(); }
// Pulls a task off the |tasks_| and informs the WorkQueueSets. If the // Pulls a task off the |tasks_| and informs the WorkQueueSets. If the
// task removed had an enqueue order >= the current fence then WorkQueue // task removed had an enqueue order >= the current fence then WorkQueue
// pretends to be empty as far as the WorkQueueSets is concerned. // pretends to be empty as far as the WorkQueueSets is concerned.
...@@ -130,6 +132,9 @@ class BASE_EXPORT WorkQueue { ...@@ -130,6 +132,9 @@ class BASE_EXPORT WorkQueue {
// Test support function. This should not be used in production code. // Test support function. This should not be used in production code.
void PopTaskForTesting(); void PopTaskForTesting();
// Shrinks |tasks_| if it's wasting memory.
void MaybeShrinkQueue();
private: private:
bool InsertFenceImpl(EnqueueOrder fence); bool InsertFenceImpl(EnqueueOrder fence);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment