Commit a7566620 authored by Leonid Baraz's avatar Leonid Baraz Committed by Commit Bot

Define TaskRunnerContext for series of sequential tasks.

See also https://docs.google.com/document/d/1yixj-sTbcpZjZvWjytERfJ4ZnEyuntC67qgp9oyBeIU

Change-Id: Ib6b6e59b2013c004d1e5e43c05c1cc97b489e66c
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2191121Reviewed-by: default avatarPavol Marko <pmarko@chromium.org>
Reviewed-by: default avatarSergey Poromov <poromov@chromium.org>
Commit-Queue: Leonid Baraz <lbaraz@chromium.org>
Cr-Commit-Position: refs/heads/master@{#775132}
parent abf20aba
......@@ -1273,6 +1273,7 @@ static_library("browser") {
"policy/messaging_layer/util/status_macros.h",
"policy/messaging_layer/util/statusor.cc",
"policy/messaging_layer/util/statusor.h",
"policy/messaging_layer/util/task_runner_context.h",
"policy/network_prediction_policy_handler.cc",
"policy/network_prediction_policy_handler.h",
"policy/profile_policy_connector.cc",
......
// Copyright 2020 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef CHROME_BROWSER_POLICY_MESSAGING_LAYER_UTIL_TASK_RUNNER_CONTEXT_H_
#define CHROME_BROWSER_POLICY_MESSAGING_LAYER_UTIL_TASK_RUNNER_CONTEXT_H_
#include <utility>
#include "base/bind.h"
#include "base/callback.h"
#include "base/memory/ref_counted.h"
#include "base/memory/scoped_refptr.h"
#include "base/sequence_checker.h"
#include "base/sequenced_task_runner.h"
#include "base/time/time.h"
namespace reporting {
// This class defines refcounted context for multiple actions executed on
// a sequenced task runner with the ability to make asynchronous calls to
// other threads and resuming sequenced execution by calling |Schedule| or
// |ScheduleAfter|. Multiple actions can be scheduled at once; they will be
// executed on the same sequenced task runner. Ends execution when one of the
// actions calls |Response| (any previoudly scheduled action will still be
// executed after that, but it does not make much sense: it cannot call
// |Response| for the second time).
//
// Derived from RefCountedThreadSafe, because adding and releasing a reference
// may take place on different threads.
//
// Code snippet:
//
// Declaration:
// class SeriesOfActionsContext : public TaskRunnerContext<...> {
// public:
// SeriesOfActionsContext(
// ...,
// base::OnceCallback<void(...)> callback,
// scoped_refptr<base::SequencedTaskRunner> task_runner)
// : TaskRunnerContext<...>(std::move(callback),
// std::move(task_runner)) {}
//
// protected:
// // Context can only be deleted by calling Response method.
// ~SeriesOfActionsContext() override = default;
//
// private:
// void Action1(...) {
// ...
// if (...) {
// Response(...);
// return;
// }
// Schedule(&SeriesOfActionsContext::Action2, this, ...);
// ...
// ScheduleAfter(delay, &SeriesOfActionsContext::Action3, this, ...);
// }
//
// void OnStart() override { Action1(...); }
// };
//
// Usage:
// base::MakeRefCounted<SeriesOfActionsContext>(
// ...,
// returning_callback,
// base::SequencedTaskRunnerHandle::Get())->Start();
//
template <typename ResponseType>
class TaskRunnerContext
: public base::RefCountedThreadSafe<TaskRunnerContext<ResponseType>> {
public:
TaskRunnerContext(base::OnceCallback<void(ResponseType)> callback,
scoped_refptr<base::SequencedTaskRunner> task_runner)
: callback_(std::move(callback)), task_runner_(std::move(task_runner)) {
// Constructor can be called from any thread.
DETACH_FROM_SEQUENCE(sequence_checker_);
}
TaskRunnerContext(const TaskRunnerContext& other) = delete;
TaskRunnerContext& operator=(const TaskRunnerContext& other) = delete;
// Starts execution (can be called from any thread to schedule the first
// action in the sequence).
void Start() {
// Hold to ourselves until Response() is called.
base::RefCountedThreadSafe<TaskRunnerContext<ResponseType>>::AddRef();
// Place actual start on the sequential task runner.
Schedule(&TaskRunnerContext<ResponseType>::OnStartWrap, this);
}
// Schedules next execution (can be called from any thread).
template <class Function, class... Args>
void Schedule(Function&& proc, Args&&... args) {
task_runner_->PostTask(FROM_HERE,
base::BindOnce(std::forward<Function>(proc),
std::forward<Args>(args)...));
}
// Schedules next execution with delay (can be called from any thread).
template <class Function, class... Args>
void ScheduleAfter(base::TimeDelta delay, Function&& proc, Args&&... args) {
task_runner_->PostDelayedTask(FROM_HERE,
base::BindOnce(std::forward<Function>(proc),
std::forward<Args>(args)...),
delay);
}
// Responds to the caller once completed the work sequence
// (can only be called by action scheduled to the sequenced task runner).
void Response(ResponseType result) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
OnCompletion();
// Respond to the caller.
DCHECK(!callback_.is_null()) << "Already responded";
std::move(callback_).Run(result);
// Release reference taken by Start().
base::RefCountedThreadSafe<TaskRunnerContext<ResponseType>>::Release();
}
// Helper method checks that the caller runs on valid sequence.
// Can be used by any scheduled action.
// No need to call it by OnStart, OnCompletion and destructor.
// For non-debug builds it is a no-op.
void CheckOnValidSequence() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
}
protected:
// Context can only be deleted by calling Response method.
virtual ~TaskRunnerContext() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
DCHECK(callback_.is_null()) << "Released without responding to the caller";
}
private:
friend class base::RefCountedThreadSafe<TaskRunnerContext<ResponseType>>;
// Hook for execution start. Should be overridden to do non-trivial work.
virtual void OnStart() { Response(ResponseType()); }
// Finalization action before responding and deleting the context.
// May be overridden, if necessary.
virtual void OnCompletion() {}
// Wrapper for OnStart to mandate sequence checker.
void OnStartWrap() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
OnStart();
}
// User callback to deliver result.
base::OnceCallback<void(ResponseType)> callback_;
// Sequential task runner (guarantees that each action is executed
// sequentially in order of submission).
scoped_refptr<base::SequencedTaskRunner> task_runner_;
SEQUENCE_CHECKER(sequence_checker_);
};
} // namespace reporting
#endif // CHROME_BROWSER_POLICY_MESSAGING_LAYER_UTIL_TASK_RUNNER_CONTEXT_H_
// Copyright 2020 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "chrome/browser/policy/messaging_layer/util/task_runner_context.h"
#include <functional>
#include <vector>
#include "base/bind.h"
#include "base/callback.h"
#include "base/memory/ref_counted.h"
#include "base/memory/scoped_refptr.h"
#include "base/sequenced_task_runner.h"
#include "base/synchronization/waitable_event.h"
#include "base/test/task_environment.h"
#include "base/threading/sequenced_task_runner_handle.h"
#include "chrome/browser/policy/messaging_layer/util/status.h"
#include "chrome/browser/policy/messaging_layer/util/statusor.h"
#include "testing/gmock/include/gmock/gmock.h"
#include "testing/gtest/include/gtest/gtest.h"
namespace reporting {
namespace {
class TaskRunner : public ::testing::Test {
protected:
base::test::TaskEnvironment task_environment_;
};
// This is the simplest test - runs one action only on a sequenced task runner.
TEST_F(TaskRunner, SingleAction) {
class SingleActionContext : public TaskRunnerContext<bool> {
public:
SingleActionContext(base::OnceCallback<void(bool)> callback,
scoped_refptr<base::SequencedTaskRunner> task_runner)
: TaskRunnerContext<bool>(std::move(callback), std::move(task_runner)) {
}
protected:
// Context can only be deleted by calling Response method.
~SingleActionContext() override = default;
private:
void OnStart() override { Response(true); }
};
bool result = false;
// Created context reference is self-destruct upon completion of 'Start',
// but the context itself lives through until all tasks are done.
base::RunLoop run_loop;
base::MakeRefCounted<SingleActionContext>(
base::BindOnce(
[](base::RunLoop* run_loop, bool* var, bool val) {
*var = val;
run_loop->Quit();
},
&run_loop, &result),
base::SequencedTaskRunnerHandle::Get())
->Start();
run_loop.Run();
EXPECT_TRUE(result);
}
// This test runs a series of action on a sequenced task runner.
TEST_F(TaskRunner, SeriesOfActions) {
class SeriesOfActionsContext : public TaskRunnerContext<uint32_t> {
public:
SeriesOfActionsContext(uint32_t init_value,
base::OnceCallback<void(uint32_t)> callback,
scoped_refptr<base::SequencedTaskRunner> task_runner)
: TaskRunnerContext<uint32_t>(std::move(callback),
std::move(task_runner)),
init_value_(init_value) {}
protected:
// Context can only be deleted by calling Response method.
~SeriesOfActionsContext() override = default;
private:
void Halve(uint32_t value, uint32_t log) {
CheckOnValidSequence();
if (value <= 1) {
Response(log);
return;
}
Schedule(&SeriesOfActionsContext::Halve, this, value / 2, log + 1);
}
void OnStart() override { Halve(init_value_, 0); }
const uint32_t init_value_;
};
uint32_t result = 0;
base::RunLoop run_loop;
base::MakeRefCounted<SeriesOfActionsContext>(
128,
base::BindOnce(
[](base::RunLoop* run_loop, uint32_t* var, uint32_t val) {
*var = val;
run_loop->Quit();
},
&run_loop, &result),
base::SequencedTaskRunnerHandle::Get())
->Start();
run_loop.Run();
EXPECT_EQ(result, 7u);
}
// This test runs the same series of actions injecting delays.
TEST_F(TaskRunner, SeriesOfDelays) {
class SeriesOfDelaysContext : public TaskRunnerContext<uint32_t> {
public:
SeriesOfDelaysContext(uint32_t init_value,
base::OnceCallback<void(uint32_t)> callback,
scoped_refptr<base::SequencedTaskRunner> task_runner)
: TaskRunnerContext<uint32_t>(std::move(callback),
std::move(task_runner)),
init_value_(init_value),
delay_(base::TimeDelta::FromSecondsD(0.1)) {}
protected:
// Context can only be deleted by calling Response method.
~SeriesOfDelaysContext() override = default;
private:
void Halve(uint32_t value, uint32_t log) {
CheckOnValidSequence();
if (value <= 1) {
Response(log);
return;
}
delay_ += base::TimeDelta::FromSecondsD(0.1);
ScheduleAfter(delay_, &SeriesOfDelaysContext::Halve, this, value / 2,
log + 1);
}
void OnStart() override { Halve(init_value_, 0); }
const uint32_t init_value_;
base::TimeDelta delay_;
};
// Run on another thread, so that we can wait on the quit event here
// and avoid RunLoopIdle (which would exit on the first delay).
uint32_t result = 0;
base::RunLoop run_loop;
base::MakeRefCounted<SeriesOfDelaysContext>(
128,
base::BindOnce(
[](base::RunLoop* run_loop, uint32_t* var, uint32_t val) {
*var = val;
run_loop->Quit();
},
&run_loop, &result),
base::SequencedTaskRunnerHandle::Get())
->Start();
run_loop.Run();
EXPECT_EQ(result, 7u);
}
// This test runs the same series of actions offsetting them to a random threads
// and then taking control back to the sequenced task runner.
TEST_F(TaskRunner, SeriesOfAsyncs) {
class SeriesOfAsyncsContext : public TaskRunnerContext<uint32_t> {
public:
SeriesOfAsyncsContext(uint32_t init_value,
base::OnceCallback<void(uint32_t)> callback,
scoped_refptr<base::SequencedTaskRunner> task_runner)
: TaskRunnerContext<uint32_t>(std::move(callback),
std::move(task_runner)),
init_value_(init_value),
delay_(base::TimeDelta::FromSecondsD(0.1)) {}
protected:
// Context can only be deleted by calling Response method.
~SeriesOfAsyncsContext() override = default;
private:
void Halve(uint32_t value, uint32_t log) {
CheckOnValidSequence();
if (value <= 1) {
Response(log);
return;
}
// Perform a calculation on a generic thread pool with delay,
// then get back to the sequence by calling Schedule from there.
delay_ += base::TimeDelta::FromSecondsD(0.1);
base::ThreadPool::PostDelayedTask(
FROM_HERE,
base::BindOnce(
[](uint32_t value, uint32_t log,
scoped_refptr<SeriesOfAsyncsContext> context) {
// Action executed asyncrhonously.
value /= 2;
++log;
// Getting back to the sequence.
context->Schedule(&SeriesOfAsyncsContext::Halve, context.get(),
value, log);
},
value, log, base::WrapRefCounted<SeriesOfAsyncsContext>(this)),
delay_);
}
void OnStart() override { Halve(init_value_, 0); }
const uint32_t init_value_;
base::TimeDelta delay_;
};
// Run on another thread, so that we can wait on the quit event here
// and avoid RunLoopIdle (which would exit on the first delay).
uint32_t result = 0;
base::RunLoop run_loop;
base::MakeRefCounted<SeriesOfAsyncsContext>(
128,
base::BindOnce(
[](base::RunLoop* run_loop, uint32_t* var, uint32_t val) {
*var = val;
run_loop->Quit();
},
&run_loop, &result),
base::SequencedTaskRunnerHandle::Get())
->Start();
run_loop.Run();
EXPECT_EQ(result, 7u);
}
// This test calculates Fibonacci as a tree of recurrent actions on a sequenced
// task runner. Note that 2 actions are scheduled in parallel.
TEST_F(TaskRunner, TreeOfActions) {
// Helper class accepts multiple 'AddIncoming' calls to add numbers,
// and invokes 'callback' when last reference to it is dropped.
class Summator : public base::RefCounted<Summator> {
public:
explicit Summator(base::OnceCallback<void(uint32_t)> callback)
: callback_(std::move(callback)) {}
void AddIncoming(uint32_t incoming) { result_ += incoming; }
protected:
virtual ~Summator() {
DCHECK(!callback_.is_null())
<< "Released without responding to the caller";
std::move(callback_).Run(result_);
}
private:
friend class base::RefCounted<Summator>;
uint32_t result_ = 0;
base::OnceCallback<void(uint32_t)> callback_;
};
// Context class for Fibonacci asynchronous recursion tree.
class TreeOfActionsContext : public TaskRunnerContext<uint32_t> {
public:
TreeOfActionsContext(uint32_t init_value,
base::OnceCallback<void(uint32_t)> callback,
scoped_refptr<base::SequencedTaskRunner> task_runner)
: TaskRunnerContext<uint32_t>(std::move(callback),
std::move(task_runner)),
init_value_(init_value) {}
protected:
// Context can only be deleted by calling Response method.
~TreeOfActionsContext() override = default;
private:
void FibonacciSplit(uint32_t value, scoped_refptr<Summator> join) {
CheckOnValidSequence();
if (value < 2u) {
join->AddIncoming(value); // Fib(0) == 1, Fib(1) == 1
return; // No more actions to schedule.
}
// Schedule two asynchronous recursive calls.
// 'join' above will self-destruct once both callbacks complete
// and drop references to it. Each callback spawns additional
// callbacks, and when they complete, adds the results to its
// own 'Summator' instance.
for (const uint32_t subval : {value - 1, value - 2}) {
Schedule(&TreeOfActionsContext::FibonacciSplit, this, subval,
base::MakeRefCounted<Summator>(
base::BindOnce(&Summator::AddIncoming, join)));
}
}
void OnStart() override {
FibonacciSplit(init_value_, base::MakeRefCounted<Summator>(base::BindOnce(
&TreeOfActionsContext::Response, this)));
}
const uint32_t init_value_;
};
const std::vector<uint32_t> expected_fibo_results(
{0, 1, 1, 2, 3, 5, 8, 13, 21, 34,
55, 89, 144, 233, 377, 610, 987, 1597, 2584, 4181});
std::vector<uint32_t> actual_fibo_results(expected_fibo_results.size());
base::RunLoop run_loop;
size_t count = expected_fibo_results.size();
// Start all calculations (they will intermix on the same sequential runner).
for (uint32_t n = 0; n < expected_fibo_results.size(); ++n) {
uint32_t* const result = &actual_fibo_results[n];
*result = 0;
base::SequencedTaskRunnerHandle::Get()->PostTask(
FROM_HERE, base::BindOnce(
[](size_t* count, base::RunLoop* run_loop, uint32_t n,
uint32_t* result) {
base::MakeRefCounted<TreeOfActionsContext>(
n,
base::BindOnce(
[](size_t* count, base::RunLoop* run_loop,
uint32_t* var, uint32_t val) {
*var = val;
if (!--*count) {
run_loop->Quit();
}
},
count, run_loop, result),
base::SequencedTaskRunnerHandle::Get())
->Start();
},
&count, &run_loop, n, result));
}
// Wait for it all to finish and compare the results.
run_loop.Run();
EXPECT_THAT(actual_fibo_results, ::testing::Eq(expected_fibo_results));
}
// This test runs a series of actions returning non-primitive object as a result
// (Status).
TEST_F(TaskRunner, ActionsWithStatus) {
class ActionsWithStatusContext : public TaskRunnerContext<Status> {
public:
ActionsWithStatusContext(
const std::vector<Status>& vector,
base::OnceCallback<void(Status)> callback,
scoped_refptr<base::SequencedTaskRunner> task_runner)
: TaskRunnerContext<Status>(std::move(callback),
std::move(task_runner)),
vector_(vector) {}
protected:
// Context can only be deleted by calling Response method.
~ActionsWithStatusContext() override = default;
private:
void Pick(size_t index) {
CheckOnValidSequence();
if (index < vector_.size()) {
if (vector_[index].ok()) {
Schedule(&ActionsWithStatusContext::Pick, this, index + 1);
return;
}
Response(vector_[index]);
return;
}
Response(Status(error::OUT_OF_RANGE, "All statuses are OK"));
}
void OnStart() override { Pick(0); }
const std::vector<Status> vector_;
};
Status result(error::UNKNOWN, "Not yet set");
base::RunLoop run_loop;
base::MakeRefCounted<ActionsWithStatusContext>(
std::vector<Status>({Status::StatusOK(), Status::StatusOK(),
Status::StatusOK(),
Status(error::CANCELLED, "Cancelled"),
Status::StatusOK(), Status::StatusOK()}),
base::BindOnce(
[](base::RunLoop* run_loop, Status* result, Status res) {
*result = res;
run_loop->Quit();
},
&run_loop, &result),
base::SequencedTaskRunnerHandle::Get())
->Start();
run_loop.Run();
EXPECT_EQ(result, Status(error::CANCELLED, "Cancelled"));
}
// This test runs a series of actions returning non-primitive object as a result
// (StatusOr<scoped_ptr<...>>).
TEST_F(TaskRunner, ActionsWithStatusOrPtr) {
class RefCountedValue : public base::RefCounted<RefCountedValue> {
public:
explicit RefCountedValue(int value) : value_(value) {}
int value() const { return value_; }
private:
friend class base::RefCounted<RefCountedValue>;
~RefCountedValue() = default;
const int value_;
};
using StatusOrPtr = StatusOr<scoped_refptr<RefCountedValue>>;
class ActionsWithStatusOrContext : public TaskRunnerContext<StatusOrPtr> {
public:
ActionsWithStatusOrContext(
const std::vector<StatusOrPtr>& vector,
base::OnceCallback<void(StatusOrPtr)> callback,
scoped_refptr<base::SequencedTaskRunner> task_runner)
: TaskRunnerContext<StatusOrPtr>(std::move(callback),
std::move(task_runner)),
vector_(vector) {}
protected:
// Context can only be deleted by calling Response method.
~ActionsWithStatusOrContext() override = default;
private:
void Pick(size_t index) {
CheckOnValidSequence();
if (index < vector_.size()) {
if (!vector_[index].ok()) {
Schedule(&ActionsWithStatusOrContext::Pick, this, index + 1);
return;
}
Response(vector_[index]);
return;
}
Response(Status(error::OUT_OF_RANGE, "All statuses are OK"));
}
void OnStart() override { Pick(0); }
const std::vector<StatusOrPtr> vector_;
};
const int kI = 0;
StatusOrPtr result;
base::RunLoop run_loop;
base::MakeRefCounted<ActionsWithStatusOrContext>(
std::vector<StatusOrPtr>({Status(error::CANCELLED, "Cancelled"),
Status(error::CANCELLED, "Cancelled"),
Status(error::CANCELLED, "Cancelled"),
Status(error::CANCELLED, "Cancelled"),
Status(error::CANCELLED, "Cancelled"),
base::MakeRefCounted<RefCountedValue>(kI)}),
base::BindOnce(
[](base::RunLoop* run_loop, StatusOrPtr* result, StatusOrPtr res) {
*result = std::move(res);
run_loop->Quit();
},
&run_loop, &result),
base::SequencedTaskRunnerHandle::Get())
->Start();
run_loop.Run();
EXPECT_TRUE(result.ok()) << result.status();
EXPECT_EQ(result.ValueOrDie()->value(), kI);
}
} // namespace
} // namespace reporting
......@@ -3310,6 +3310,7 @@ test("unit_tests") {
"../browser/policy/messaging_layer/util/status_macros_unittest.cc",
"../browser/policy/messaging_layer/util/status_unittest.cc",
"../browser/policy/messaging_layer/util/statusor_unittest.cc",
"../browser/policy/messaging_layer/util/task_runner_context_unittest.cc",
"../browser/policy/profile_policy_connector_unittest.cc",
"../browser/policy/webusb_allow_devices_for_urls_policy_handler_unittest.cc",
"../browser/predictors/autocomplete_action_predictor_table_unittest.cc",
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment