Commit 36985c8e authored by Josh Karlin's avatar Josh Karlin Committed by Commit Bot

Prioritize replies as well as tasks

What:
PrioritizedTaskRunner currently runs its tasks (from RunTaskAndReply) in prioritized order but not the replies. This CL adds a priority queue for replies to run in order as well.

Why:
At least for the cache, the reply task is equally important to the posted task and should run in priority order.


Bug: 856674
Change-Id: Icea37adb96891473292e0c38805c42e84f9d4f5f
Reviewed-on: https://chromium-review.googlesource.com/1114921Reviewed-by: default avatarMaks Orlovich <morlovich@chromium.org>
Commit-Queue: Josh Karlin <jkarlin@chromium.org>
Cr-Commit-Position: refs/heads/master@{#570781}
parent 92746870
......@@ -41,41 +41,52 @@ void PrioritizedTaskRunner::PostTaskAndReply(const base::Location& from_here,
Job job(from_here, std::move(task), std::move(reply), priority,
task_count_++);
{
base::AutoLock lock(job_heap_lock_);
job_heap_.push_back(std::move(job));
std::push_heap(job_heap_.begin(), job_heap_.end(), JobComparer());
base::AutoLock lock(task_job_heap_lock_);
task_job_heap_.push_back(std::move(job));
std::push_heap(task_job_heap_.begin(), task_job_heap_.end(), JobComparer());
}
Job* out_job = new Job();
task_runner_->PostTaskAndReply(
from_here,
base::BindOnce(&PrioritizedTaskRunner::RunPostTaskAndReply, this,
out_job),
base::BindOnce(&PrioritizedTaskRunner::RunReply, this,
base::Owned(out_job)));
}
PrioritizedTaskRunner::Job PrioritizedTaskRunner::PopJob() {
DCHECK(task_runner_->RunsTasksInCurrentSequence());
base::AutoLock lock(job_heap_lock_);
std::pop_heap(job_heap_.begin(), job_heap_.end(), JobComparer());
Job result = std::move(job_heap_.back());
job_heap_.pop_back();
return result;
base::BindOnce(&PrioritizedTaskRunner::RunPostTaskAndReply, this),
base::BindOnce(&PrioritizedTaskRunner::RunReply, this));
}
PrioritizedTaskRunner::~PrioritizedTaskRunner() = default;
void PrioritizedTaskRunner::RunPostTaskAndReply(Job* out_job) {
void PrioritizedTaskRunner::RunPostTaskAndReply() {
DCHECK(task_runner_->RunsTasksInCurrentSequence());
*out_job = PopJob();
std::move(out_job->task).Run();
// Find the next job to run.
Job job;
{
base::AutoLock lock(task_job_heap_lock_);
std::pop_heap(task_job_heap_.begin(), task_job_heap_.end(), JobComparer());
job = std::move(task_job_heap_.back());
task_job_heap_.pop_back();
}
std::move(job.task).Run();
// Add the job to the reply priority queue.
base::AutoLock reply_lock(reply_job_heap_lock_);
reply_job_heap_.push_back(std::move(job));
std::push_heap(reply_job_heap_.begin(), reply_job_heap_.end(), JobComparer());
}
void PrioritizedTaskRunner::RunReply(Job* job) {
std::move(job->reply).Run();
void PrioritizedTaskRunner::RunReply() {
// Find the next job to run.
Job job;
{
base::AutoLock lock(reply_job_heap_lock_);
std::pop_heap(reply_job_heap_.begin(), reply_job_heap_.end(),
JobComparer());
job = std::move(reply_job_heap_.back());
reply_job_heap_.pop_back();
}
// Run the job.
std::move(job.reply).Run();
}
} // namespace net
......@@ -38,17 +38,22 @@ void ReplyAdapter(base::OnceCallback<void(ReplyArgType)> callback,
}
} // namespace internal
// PrioritizedTaskRunner allows for prioritization of posted tasks within a
// single TaskRunner. Be careful, as it is possible to starve a task.
// PrioritizedTaskRunner allows for prioritization of posted tasks and their
// replies. It provides up to 2^32 priority levels. All tasks posted via the
// PrioritizedTaskRunner will run in priority order. All replies from
// PostTaskAndReply will also run in priority order. Be careful, as it is
// possible to starve a task.
class NET_EXPORT_PRIVATE PrioritizedTaskRunner
: public base::RefCountedThreadSafe<PrioritizedTaskRunner> {
public:
enum class ReplyRunnerType { kStandard, kPrioritized };
PrioritizedTaskRunner(scoped_refptr<base::TaskRunner> task_runner);
// Similar to TaskRunner::PostTaskAndReply, except that the task runs at
// |priority|. Priority 0 is the highest priority and will run before other
// priority values. Multiple tasks with the same |priority| value are run in
// order of posting.
// order of posting. The replies are also run in prioritized order on the
// calling taskrunner.
void PostTaskAndReply(const base::Location& from_here,
base::OnceClosure task,
base::OnceClosure reply,
......@@ -106,20 +111,18 @@ class NET_EXPORT_PRIVATE PrioritizedTaskRunner
}
};
// Pops the next task from the heap.
Job PopJob();
void RunPostTaskAndReply(Job* out_job);
void RunReply(Job* job);
void RunPostTaskAndReply();
void RunReply();
~PrioritizedTaskRunner();
// TODO(jkarlin): Replace the heap with a std::priority_queue once it
// TODO(jkarlin): Replace the heaps with std::priority_queue once it
// supports move-only types.
// Accessed on both task_runner_ and the reply task runner.
std::vector<Job> job_heap_;
base::Lock job_heap_lock_;
std::vector<Job> task_job_heap_;
base::Lock task_job_heap_lock_;
std::vector<Job> reply_job_heap_;
base::Lock reply_job_heap_lock_;
// Accessed on the reply task runner.
scoped_refptr<base::TaskRunner> task_runner_;
......
......@@ -59,6 +59,25 @@ class PrioritizedTaskRunnerTest : public testing::Test {
return out;
}
// Adds a task to the task runner and waits for it to execute.
void ProcessTaskRunner(base::TaskRunner* task_runner) {
// Use a waitable event instead of a run loop as we need to be careful not
// to run any tasks on this task runner while waiting.
base::WaitableEvent waitable_event;
task_runner->PostTask(FROM_HERE,
base::BindOnce(
[](base::WaitableEvent* waitable_event) {
waitable_event->Signal();
},
&waitable_event));
base::ScopedAllowBaseSyncPrimitivesForTesting sync;
waitable_event.Wait();
}
// Adds a task to the |task_runner|, forcing it to wait for a conditional.
// Call ReleaseTaskRunner to continue.
void BlockTaskRunner(base::TaskRunner* task_runner) {
waitable_event_.Reset();
......@@ -70,6 +89,8 @@ class PrioritizedTaskRunnerTest : public testing::Test {
base::BindOnce(wait_function, &waitable_event_));
}
// Signals the task runner's conditional so that it can continue after calling
// BlockTaskRunner.
void ReleaseTaskRunner() { waitable_event_.Signal(); }
protected:
......@@ -167,6 +188,54 @@ TEST_F(PrioritizedTaskRunnerTest, PostTaskAndReplyTestPriority) {
ReplyOrder());
}
// Ensure that replies are run in priority order.
TEST_F(PrioritizedTaskRunnerTest, PostTaskAndReplyTestReplyPriority) {
auto task_runner =
base::CreateSequencedTaskRunnerWithTraits(base::TaskTraits());
auto prioritized_task_runner =
base::MakeRefCounted<PrioritizedTaskRunner>(task_runner);
// Add a couple of tasks to run right away, but don't run their replies yet.
BlockTaskRunner(task_runner.get());
prioritized_task_runner->PostTaskAndReply(
FROM_HERE,
base::BindOnce(&PrioritizedTaskRunnerTest::PushName,
base::Unretained(this), "Task2"),
base::BindOnce(&PrioritizedTaskRunnerTest::PushName,
base::Unretained(this), "Reply2"),
2);
prioritized_task_runner->PostTaskAndReply(
FROM_HERE,
base::BindOnce(&PrioritizedTaskRunnerTest::PushName,
base::Unretained(this), "Task1"),
base::BindOnce(&PrioritizedTaskRunnerTest::PushName,
base::Unretained(this), "Reply1"),
1);
ReleaseTaskRunner();
// Run the current tasks (but not their replies).
ProcessTaskRunner(task_runner.get());
// Now post task 0 (highest priority) and run it. None of the replies have
// been processed yet, so its reply should skip to the head of the queue.
prioritized_task_runner->PostTaskAndReply(
FROM_HERE,
base::BindOnce(&PrioritizedTaskRunnerTest::PushName,
base::Unretained(this), "Task0"),
base::BindOnce(&PrioritizedTaskRunnerTest::PushName,
base::Unretained(this), "Reply0"),
0);
ProcessTaskRunner(task_runner.get());
// Run the replies.
scoped_task_environment_.RunUntilIdle();
EXPECT_EQ((std::vector<std::string>{"Task1", "Task2", "Task0"}), TaskOrder());
EXPECT_EQ((std::vector<std::string>{"Reply0", "Reply1", "Reply2"}),
ReplyOrder());
}
TEST_F(PrioritizedTaskRunnerTest, PriorityOverflow) {
auto task_runner =
base::CreateSequencedTaskRunnerWithTraits(base::TaskTraits());
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment