Commit ae9797c4 authored by Zach Trudo's avatar Zach Trudo Committed by Commit Bot

Add SharedQueue

SharedQueue wraps a std::queue with RefCountedThreadSafe to allow it to
be shared across threads. It utilizes a SequencedTaskRunner in order to
ensure that Pop and Push commands are done in order of arrival.

Bug: chromium:1078512
Change-Id: I57ec543fc3bf641ee16cd2e36fbe8f24ac082102
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2340709
Commit-Queue: Zach Trudo <zatrudo@google.com>
Reviewed-by: default avatarLeonid Baraz <lbaraz@chromium.org>
Cr-Commit-Position: refs/heads/master@{#795720}
parent 9eaee75f
...@@ -1178,6 +1178,7 @@ static_library("browser") { ...@@ -1178,6 +1178,7 @@ static_library("browser") {
"policy/messaging_layer/upload/dm_server_upload_service.h", "policy/messaging_layer/upload/dm_server_upload_service.h",
"policy/messaging_layer/util/backoff_settings.cc", "policy/messaging_layer/util/backoff_settings.cc",
"policy/messaging_layer/util/backoff_settings.h", "policy/messaging_layer/util/backoff_settings.h",
"policy/messaging_layer/util/shared_queue.h",
"policy/messaging_layer/util/status.cc", "policy/messaging_layer/util/status.cc",
"policy/messaging_layer/util/status.h", "policy/messaging_layer/util/status.h",
"policy/messaging_layer/util/status_macros.h", "policy/messaging_layer/util/status_macros.h",
......
// Copyright 2020 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef CHROME_BROWSER_POLICY_MESSAGING_LAYER_UTIL_SHARED_QUEUE_H_
#define CHROME_BROWSER_POLICY_MESSAGING_LAYER_UTIL_SHARED_QUEUE_H_
#include <utility>
#include "base/containers/queue.h"
#include "base/memory/ref_counted.h"
#include "base/sequenced_task_runner.h"
#include "base/task/task_traits.h"
#include "base/task/thread_pool.h"
#include "chrome/browser/policy/messaging_layer/util/status.h"
#include "chrome/browser/policy/messaging_layer/util/statusor.h"
namespace reporting {
// SharedQueue wraps a |base::queue| and ensures access happens on a
// SequencedTaskRunner.
template <typename QueueType>
class SharedQueue : public base::RefCountedThreadSafe<SharedQueue<QueueType>> {
public:
static scoped_refptr<SharedQueue<QueueType>> Create() {
scoped_refptr<base::SequencedTaskRunner> sequenced_task_runner{
base::ThreadPool::CreateSequencedTaskRunner({})};
return base::WrapRefCounted(
new SharedQueue<QueueType>(sequenced_task_runner));
}
// Push will schedule a push of |item| onto the queue and call
// |push_complete_cb| once complete.
void Push(QueueType item, base::OnceCallback<void()> push_complete_cb) {
sequenced_task_runner_->PostTask(
FROM_HERE, base::BindOnce(&SharedQueue::OnPush, this, std::move(item),
std::move(push_complete_cb)));
}
// Pop will schedule a pop off the queue and call |get_pop_cb| once complete.
// If the queue is empty, |get_pop_cb| will be called with
// error::OUT_OF_RANGE.
void Pop(base::OnceCallback<void(StatusOr<QueueType>)> get_pop_cb) {
sequenced_task_runner_->PostTask(
FROM_HERE,
base::BindOnce(&SharedQueue::OnPop, this, std::move(get_pop_cb)));
}
protected:
virtual ~SharedQueue() = default;
private:
friend class base::RefCountedThreadSafe<SharedQueue<QueueType>>;
explicit SharedQueue(
scoped_refptr<base::SequencedTaskRunner> sequenced_task_runner)
: sequenced_task_runner_(sequenced_task_runner) {}
void OnPush(QueueType item, base::OnceCallback<void()> push_complete_cb) {
queue_.push(std::move(item));
std::move(push_complete_cb).Run();
}
void OnPop(base::OnceCallback<void(StatusOr<QueueType>)> cb) {
if (queue_.empty()) {
std::move(cb).Run(Status(error::OUT_OF_RANGE, "Queue is empty"));
return;
}
QueueType item = std::move(queue_.front());
queue_.pop();
std::move(cb).Run(std::move(item));
}
// Used to monitor if the callback is in use or not.
base::queue<QueueType> queue_;
scoped_refptr<base::SequencedTaskRunner> sequenced_task_runner_;
};
} // namespace reporting
#endif // CHROME_BROWSER_POLICY_MESSAGING_LAYER_UTIL_SHARED_QUEUE_H_
// Copyright 2020 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "chrome/browser/policy/messaging_layer/util/shared_queue.h"
#include "base/bind_helpers.h"
#include "base/sequenced_task_runner.h"
#include "base/synchronization/waitable_event.h"
#include "base/task/task_traits.h"
#include "base/task/thread_pool.h"
#include "base/test/task_environment.h"
#include "chrome/browser/policy/messaging_layer/util/status.h"
#include "chrome/browser/policy/messaging_layer/util/statusor.h"
#include "testing/gtest/include/gtest/gtest.h"
namespace reporting {
namespace {
class QueueTester {
public:
explicit QueueTester(scoped_refptr<SharedQueue<int>> queue)
: queue_(queue),
sequenced_task_runner_(base::ThreadPool::CreateSequencedTaskRunner({})),
completed_(base::WaitableEvent::ResetPolicy::MANUAL,
base::WaitableEvent::InitialState::NOT_SIGNALED),
pop_result_(Status(error::FAILED_PRECONDITION, "Pop hasn't run yet")) {}
~QueueTester() = default;
void Push(int value) { queue_->Push(value, base::DoNothing()); }
void Pop() {
sequenced_task_runner_->PostTask(
FROM_HERE,
base::BindOnce(&QueueTester::PopInternal, base::Unretained(this)));
}
void PushPop(int value) {
queue_->Push(value, base::BindOnce(&QueueTester::OnPushPopComplete,
base::Unretained(this)));
}
void Wait() {
completed_.Wait();
completed_.Reset();
}
StatusOr<int> pop_result() { return pop_result_; }
private:
void OnPushPopComplete() { Pop(); }
void PopInternal() {
queue_->Pop(
base::BindOnce(&QueueTester::OnPopComplete, base::Unretained(this)));
}
void OnPopComplete(StatusOr<int> pop_result) {
pop_result_ = pop_result;
completed_.Signal();
}
scoped_refptr<SharedQueue<int>> queue_;
scoped_refptr<base::SequencedTaskRunner> sequenced_task_runner_;
base::WaitableEvent completed_;
StatusOr<int> pop_result_;
};
TEST(SharedQueueTest, SuccessfulPushPop) {
base::test::TaskEnvironment task_envrionment{
base::test::TaskEnvironment::TimeSource::MOCK_TIME};
const int kExpectedValue = 1234;
auto queue = SharedQueue<int>::Create();
QueueTester queue_tester(queue);
queue_tester.PushPop(kExpectedValue);
queue_tester.Wait();
auto pop_result = queue_tester.pop_result();
ASSERT_OK(pop_result);
EXPECT_EQ(pop_result.ValueOrDie(), kExpectedValue);
}
TEST(SharedQueueTest, PushOrderMaintained) {
base::test::TaskEnvironment task_envrionment{
base::test::TaskEnvironment::TimeSource::MOCK_TIME};
std::vector<int> kExpectedValues = {1, 1, 2, 3, 5, 8, 13, 21};
auto queue = SharedQueue<int>::Create();
QueueTester queue_tester(queue);
for (auto value : kExpectedValues) {
queue_tester.Push(value);
}
for (auto value : kExpectedValues) {
queue_tester.Pop();
queue_tester.Wait();
auto pop_result = queue_tester.pop_result();
ASSERT_OK(pop_result);
EXPECT_EQ(pop_result.ValueOrDie(), value);
}
}
} // namespace
} // namespace reporting
...@@ -3392,6 +3392,7 @@ test("unit_tests") { ...@@ -3392,6 +3392,7 @@ test("unit_tests") {
"../browser/policy/messaging_layer/storage/test_storage_module.cc", "../browser/policy/messaging_layer/storage/test_storage_module.cc",
"../browser/policy/messaging_layer/storage/test_storage_module.h", "../browser/policy/messaging_layer/storage/test_storage_module.h",
"../browser/policy/messaging_layer/upload/dm_server_upload_service_unittest.cc", "../browser/policy/messaging_layer/upload/dm_server_upload_service_unittest.cc",
"../browser/policy/messaging_layer/util/shared_queue_unittest.cc",
"../browser/policy/messaging_layer/util/status_macros_unittest.cc", "../browser/policy/messaging_layer/util/status_macros_unittest.cc",
"../browser/policy/messaging_layer/util/status_unittest.cc", "../browser/policy/messaging_layer/util/status_unittest.cc",
"../browser/policy/messaging_layer/util/statusor_unittest.cc", "../browser/policy/messaging_layer/util/statusor_unittest.cc",
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment