Commit 88dca968 authored by Leonid Baraz's avatar Leonid Baraz Committed by Commit Bot

Refactor ReportingQueue.

Remove immediate Status returning, make fully asynchronous.

Bug: None
Change-Id: I6d8619e5a90641b645f6109a6a6e7db42e16aced
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2453489
Auto-Submit: Leonid Baraz <lbaraz@chromium.org>
Commit-Queue: Leonid Baraz <lbaraz@chromium.org>
Commit-Queue: Zach Trudo <zatrudo@google.com>
Reviewed-by: default avatarZach Trudo <zatrudo@google.com>
Cr-Commit-Position: refs/heads/master@{#814324}
parent d681aaee
......@@ -23,32 +23,32 @@ class MockReportQueue : public ReportQueue {
MockReportQueue();
~MockReportQueue() override;
Status Enqueue(base::StringPiece record,
EnqueueCallback callback) const override {
return StringPieceEnqueue_(record, std::move(callback));
void Enqueue(base::StringPiece record,
EnqueueCallback callback) const override {
StringPieceEnqueue_(record, std::move(callback));
}
Status Enqueue(const base::Value& record,
EnqueueCallback callback) const override {
return ValueEnqueue_(record, std::move(callback));
void Enqueue(const base::Value& record,
EnqueueCallback callback) const override {
ValueEnqueue_(record, std::move(callback));
}
Status Enqueue(google::protobuf::MessageLite* record,
EnqueueCallback callback) const override {
return MessageLiteEnqueue_(record, std::move(callback));
void Enqueue(google::protobuf::MessageLite* record,
EnqueueCallback callback) const override {
MessageLiteEnqueue_(record, std::move(callback));
}
MOCK_METHOD(Status,
MOCK_METHOD(void,
StringPieceEnqueue_,
(base::StringPiece record, EnqueueCallback callback),
(const));
MOCK_METHOD(Status,
MOCK_METHOD(void,
ValueEnqueue_,
(const base::Value& record, EnqueueCallback callback),
(const));
MOCK_METHOD(Status,
MOCK_METHOD(void,
MessageLiteEnqueue_,
(google::protobuf::MessageLite * record,
EnqueueCallback callback),
......
......@@ -25,14 +25,45 @@ namespace reporting {
// It ensures that all ReportQueues are created with the same storage settings.
//
// Example Usage:
// Status SendMessage(google::protobuf::ImportantMessage important_message,
// base::OnceCallback<void(Status)> callback) {
// ASSIGN_OR_RETURN(std::unique_ptr<ReportQueueConfiguration> config,
// ReportQueueConfiguration::Create(...));
// ASSIGN_OR_RETURN(std::unique_ptr<ReportQueue> report_queue,
// ReportingClient::CreateReportQueue(config));
// return report_queue->Enqueue(important_message, callback);
// void SendMessage(google::protobuf::ImportantMessage important_message,
// reporting::ReportQueue::EnqueueCallback done_cb) {
// // Create configuration.
// auto config_result = reporting::ReportQueueConfiguration::Create(...);
// // Bail out if configuration failed to create.
// if (!config_result.ok()) {
// std::move(done_cb).Run(config_result.status());
// return;
// }
// // Asynchronously create ReportingQueue.
// base::ThreadPool::PostTask(
// FROM_HERE,
// base::BindOnce(
// [](google::protobuf::ImportantMessage important_message,
// reporting::ReportQueue::EnqueueCallback done_cb,
// std::unique_ptr<reporting::ReportQueueConfiguration> config) {
// // Asynchronously create ReportingQueue.
// reporting::ReportingClient::CreateReportQueue(
// std::move(config),
// base::BindOnce(
// [](base::StringPiece data,
// reporting::ReportQueue::EnqueueCallback done_cb,
// reporting::StatusOr<std::unique_ptr<
// reporting::ReportQueue>> report_queue_result) {
// // Bail out if queue failed to create.
// if (!report_queue_result.ok()) {
// std::move(done_cb).Run(report_queue_result.status());
// return;
// }
// // Queue created successfully, enqueue the message.
// report_queue_result.ValueOrDie()->Enqueue(
// important_message, std::move(done_cb));
// },
// important_message, std::move(done_cb)));
// },
// important_message, std::move(done_cb),
// std::move(config_result.ValueOrDie())))
// }
class ReportingClient {
public:
struct Configuration {
......
......@@ -34,23 +34,43 @@ using policy::DMToken;
using reporting::Destination;
using reporting::Priority;
class TestCallbackWaiter {
// Usage (in tests only):
//
// TestEvent<ResType> e;
// ... Do some async work passing e.cb() as a completion callback of
// base::OnceCallback<void(ResType* res)> type which also may perform some
// other action specified by |done| callback provided by the caller.
// ... = e.result(); // Will wait for e.cb() to be called and return the
// collected result.
//
template <typename ResType>
class TestEvent {
public:
TestCallbackWaiter() : run_loop_(std::make_unique<base::RunLoop>()) {}
virtual void Signal() { run_loop_->Quit(); }
TestEvent() : run_loop_(std::make_unique<base::RunLoop>()) {}
~TestEvent() = default;
TestEvent(const TestEvent& other) = delete;
TestEvent& operator=(const TestEvent& other) = delete;
ResType result() {
run_loop_->Run();
return std::forward<ResType>(result_);
}
void Wait() { run_loop_->Run(); }
void Reset() {
run_loop_.reset();
run_loop_ = std::make_unique<base::RunLoop>();
// Completion callback to hand over to the processing method.
base::OnceCallback<void(ResType res)> cb() {
return base::BindOnce(
[](base::RunLoop* run_loop, ResType* result, ResType res) {
*result = std::forward<ResType>(res);
run_loop->Quit();
},
base::Unretained(run_loop_.get()), base::Unretained(&result_));
}
protected:
private:
std::unique_ptr<base::RunLoop> run_loop_;
ResType result_;
};
class ReportingClientTest : public testing::Test {
class ReportClientTest : public testing::Test {
public:
void SetUp() override {
#ifdef OS_CHROMEOS
......@@ -99,67 +119,37 @@ class ReportingClientTest : public testing::Test {
};
// Tests that a ReportQueue can be created using the ReportingClient.
TEST_F(ReportingClientTest, CreatesReportQueue) {
TEST_F(ReportClientTest, CreatesReportQueue) {
auto config_result = ReportQueueConfiguration::Create(
dm_token_, destination_, priority_, policy_checker_callback_);
ASSERT_OK(config_result);
TestCallbackWaiter waiter;
StatusOr<std::unique_ptr<ReportQueue>> result;
auto create_report_queue_cb = base::BindOnce(
[](TestCallbackWaiter* waiter,
StatusOr<std::unique_ptr<ReportQueue>>* result,
StatusOr<std::unique_ptr<ReportQueue>> create_result) {
*result = std::move(create_result);
waiter->Signal();
},
&waiter, &result);
TestEvent<StatusOr<std::unique_ptr<ReportQueue>>> a;
ReportingClient::CreateReportQueue(std::move(config_result.ValueOrDie()),
std::move(create_report_queue_cb));
waiter.Wait();
waiter.Reset();
ASSERT_OK(result);
a.cb());
ASSERT_OK(a.result());
}
// Ensures that created ReportQueues are actually different.
TEST_F(ReportingClientTest, CreatesTwoDifferentReportQueues) {
TEST_F(ReportClientTest, CreatesTwoDifferentReportQueues) {
auto config_result = ReportQueueConfiguration::Create(
dm_token_, destination_, priority_, policy_checker_callback_);
EXPECT_TRUE(config_result.ok());
TestCallbackWaiter waiter;
StatusOr<std::unique_ptr<ReportQueue>> result;
auto create_report_queue_cb = base::BindOnce(
[](TestCallbackWaiter* waiter,
StatusOr<std::unique_ptr<ReportQueue>>* result,
StatusOr<std::unique_ptr<ReportQueue>> create_result) {
*result = std::move(create_result);
waiter->Signal();
},
&waiter, &result);
TestEvent<StatusOr<std::unique_ptr<ReportQueue>>> a1;
ReportingClient::CreateReportQueue(std::move(config_result.ValueOrDie()),
std::move(create_report_queue_cb));
waiter.Wait();
waiter.Reset();
a1.cb());
auto result = a1.result();
ASSERT_OK(result);
auto report_queue_1 = std::move(result.ValueOrDie());
TestEvent<StatusOr<std::unique_ptr<ReportQueue>>> a2;
config_result = ReportQueueConfiguration::Create(
dm_token_, destination_, priority_, policy_checker_callback_);
create_report_queue_cb = base::BindOnce(
[](TestCallbackWaiter* waiter,
StatusOr<std::unique_ptr<ReportQueue>>* result,
StatusOr<std::unique_ptr<ReportQueue>> create_result) {
*result = std::move(create_result);
waiter->Signal();
},
&waiter, &result);
ReportingClient::CreateReportQueue(std::move(config_result.ValueOrDie()),
std::move(create_report_queue_cb));
waiter.Wait();
a2.cb());
result = a2.result();
ASSERT_OK(result);
auto report_queue_2 = std::move(result.ValueOrDie());
EXPECT_NE(report_queue_1.get(), report_queue_2.get());
......
......@@ -50,42 +50,47 @@ ReportQueue::ReportQueue(std::unique_ptr<ReportQueueConfiguration> config,
DETACH_FROM_SEQUENCE(sequence_checker_);
}
Status ReportQueue::Enqueue(base::StringPiece record,
EnqueueCallback callback) const {
return AddRecord(record, std::move(callback));
void ReportQueue::Enqueue(base::StringPiece record,
EnqueueCallback callback) const {
AddRecord(record, std::move(callback));
}
Status ReportQueue::Enqueue(const base::Value& record,
EnqueueCallback callback) const {
void ReportQueue::Enqueue(const base::Value& record,
EnqueueCallback callback) const {
std::string json_record;
if (!base::JSONWriter::Write(record, &json_record)) {
return Status(error::INVALID_ARGUMENT,
"Provided record was not convertable to a std::string");
std::move(callback).Run(
Status(error::INVALID_ARGUMENT,
"Provided record was not convertable to a std::string"));
return;
}
return AddRecord(json_record, std::move(callback));
AddRecord(json_record, std::move(callback));
}
Status ReportQueue::Enqueue(google::protobuf::MessageLite* record,
EnqueueCallback callback) const {
void ReportQueue::Enqueue(google::protobuf::MessageLite* record,
EnqueueCallback callback) const {
std::string protobuf_record;
if (!record->SerializeToString(&protobuf_record)) {
return Status(error::INVALID_ARGUMENT,
"Unabled to serialize record to string. Most likely due to "
"unset required fields.");
std::move(callback).Run(
Status(error::INVALID_ARGUMENT,
"Unabled to serialize record to string. Most likely due to "
"unset required fields."));
return;
}
return AddRecord(protobuf_record, std::move(callback));
}
Status ReportQueue::AddRecord(base::StringPiece record,
EnqueueCallback callback) const {
RETURN_IF_ERROR(config_->CheckPolicy());
if (!sequenced_task_runner_->PostTask(
FROM_HERE, base::BindOnce(&ReportQueue::SendRecordToStorage,
base::Unretained(this), std::string(record),
std::move(callback)))) {
return Status(error::INTERNAL, "Failed to post the record for processing.");
void ReportQueue::AddRecord(base::StringPiece record,
EnqueueCallback callback) const {
const Status status = config_->CheckPolicy();
if (!status.ok()) {
std::move(callback).Run(status);
return;
}
return Status::StatusOK();
sequenced_task_runner_->PostTask(
FROM_HERE,
base::BindOnce(&ReportQueue::SendRecordToStorage, base::Unretained(this),
std::string(record), std::move(callback)));
}
void ReportQueue::SendRecordToStorage(base::StringPiece record_data,
......
......@@ -60,25 +60,25 @@ class ReportQueue {
// UPLOAD_EVENTS : UploadEventsRequest
//
// |record| will be sent as a string with no conversion.
virtual Status Enqueue(base::StringPiece record,
EnqueueCallback callback) const;
virtual void Enqueue(base::StringPiece record,
EnqueueCallback callback) const;
// |record| will be converted to a JSON string with base::JsonWriter::Write.
virtual Status Enqueue(const base::Value& record,
EnqueueCallback callback) const;
virtual void Enqueue(const base::Value& record,
EnqueueCallback callback) const;
// |record| will be converted to a string with SerializeToString(). The
// handler is responsible for converting the record back to a proto with a
// ParseFromString() call.
virtual Status Enqueue(google::protobuf::MessageLite* record,
EnqueueCallback callback) const;
virtual void Enqueue(google::protobuf::MessageLite* record,
EnqueueCallback callback) const;
protected:
ReportQueue(std::unique_ptr<ReportQueueConfiguration> config,
scoped_refptr<StorageModule> storage);
private:
Status AddRecord(base::StringPiece record, EnqueueCallback callback) const;
void AddRecord(base::StringPiece record, EnqueueCallback callback) const;
void SendRecordToStorage(base::StringPiece record,
EnqueueCallback callback) const;
......
......@@ -44,52 +44,35 @@ namespace {
//
// TestEvent<ResType> e;
// ... Do some async work passing e.cb() as a completion callback of
// base::OnceCallback<void(ResType* res)> type which also may perform
// some other action specified by |done| callback provided by the caller.
// base::OnceCallback<void(ResType* res)> type which also may perform some
// other action specified by |done| callback provided by the caller.
// ... = e.result(); // Will wait for e.cb() to be called and return the
// // collected result.
//
// Or, when the callback is not expected to be invoked:
//
// TestEvent<ResType> e(/*expected_to_complete=*/false);
// ... Start work passing e.cb() as a completion callback,
// which will not happen.
// collected result.
//
template <typename ResType>
class TestEvent {
public:
explicit TestEvent(bool expected_to_complete = true)
: expected_to_complete_(expected_to_complete),
completed_(base::WaitableEvent::ResetPolicy::MANUAL,
base::WaitableEvent::InitialState::NOT_SIGNALED) {}
~TestEvent() {
if (expected_to_complete_) {
EXPECT_TRUE(completed_.IsSignaled()) << "Not responded";
} else {
EXPECT_FALSE(completed_.IsSignaled()) << "Responded";
}
}
TestEvent() : run_loop_(std::make_unique<base::RunLoop>()) {}
~TestEvent() = default;
TestEvent(const TestEvent& other) = delete;
TestEvent& operator=(const TestEvent& other) = delete;
ResType result() {
completed_.Wait();
run_loop_->Run();
return std::forward<ResType>(result_);
}
// Completion callback to hand over to the processing method.
base::OnceCallback<void(ResType res)> cb() {
DCHECK(!completed_.IsSignaled());
return base::BindOnce(
[](base::WaitableEvent* completed, ResType* result, ResType res) {
[](base::RunLoop* run_loop, ResType* result, ResType res) {
*result = std::forward<ResType>(res);
completed->Signal();
run_loop->Quit();
},
base::Unretained(&completed_), base::Unretained(&result_));
base::Unretained(run_loop_.get()), base::Unretained(&result_));
}
private:
bool expected_to_complete_;
base::WaitableEvent completed_;
std::unique_ptr<base::RunLoop> run_loop_;
ResType result_;
};
......@@ -152,12 +135,9 @@ class ReportQueueTest : public testing::Test {
TEST_F(ReportQueueTest, SuccessfulStringRecord) {
constexpr char kTestString[] = "El-Chupacabra";
TestEvent<Status> a;
Status status = report_queue_->Enqueue(kTestString, a.cb());
ASSERT_OK(status);
report_queue_->Enqueue(kTestString, a.cb());
EXPECT_OK(a.result());
EXPECT_EQ(test_storage_module()->priority(), priority_);
EXPECT_EQ(test_storage_module()->record().data(), kTestString);
}
......@@ -169,8 +149,7 @@ TEST_F(ReportQueueTest, SuccessfulBaseValueRecord) {
base::Value test_dict(base::Value::Type::DICTIONARY);
test_dict.SetStringKey(kTestKey, kTestValue);
TestEvent<Status> a;
Status status = report_queue_->Enqueue(test_dict, a.cb());
ASSERT_OK(status);
report_queue_->Enqueue(test_dict, a.cb());
EXPECT_OK(a.result());
EXPECT_EQ(test_storage_module()->priority(), priority_);
......@@ -187,8 +166,7 @@ TEST_F(ReportQueueTest, SuccessfulProtoRecord) {
reporting::test::TestMessage test_message;
test_message.set_test("TEST_MESSAGE");
TestEvent<Status> a;
Status status = report_queue_->Enqueue(&test_message, a.cb());
ASSERT_OK(status);
report_queue_->Enqueue(&test_message, a.cb());
EXPECT_OK(a.result());
EXPECT_EQ(test_storage_module()->priority(), priority_);
......@@ -212,9 +190,8 @@ TEST_F(ReportQueueTest, CallSuccessCallbackFailure) {
reporting::test::TestMessage test_message;
test_message.set_test("TEST_MESSAGE");
TestEvent<Status> a;
Status status = report_queue_->Enqueue(&test_message, a.cb());
ASSERT_OK(status);
auto result = a.result();
report_queue_->Enqueue(&test_message, a.cb());
const auto result = a.result();
EXPECT_FALSE(result.ok());
EXPECT_EQ(result.error_code(), error::UNKNOWN);
}
......@@ -223,10 +200,11 @@ TEST_F(ReportQueueTest, EnqueueStringFailsOnPolicy) {
EXPECT_CALL(*this, MockedPolicyCheck)
.WillOnce(Return(Status(error::UNAUTHENTICATED, "Failing for tests")));
constexpr char kTestString[] = "El-Chupacabra";
TestEvent<Status> a(/*expected_to_complete=*/false);
Status status = report_queue_->Enqueue(kTestString, a.cb());
EXPECT_FALSE(status.ok());
EXPECT_EQ(status.error_code(), error::UNAUTHENTICATED);
TestEvent<Status> a;
report_queue_->Enqueue(kTestString, a.cb());
const auto result = a.result();
EXPECT_FALSE(result.ok());
EXPECT_EQ(result.error_code(), error::UNAUTHENTICATED);
}
TEST_F(ReportQueueTest, EnqueueProtoFailsOnPolicy) {
......@@ -234,10 +212,11 @@ TEST_F(ReportQueueTest, EnqueueProtoFailsOnPolicy) {
.WillOnce(Return(Status(error::UNAUTHENTICATED, "Failing for tests")));
reporting::test::TestMessage test_message;
test_message.set_test("TEST_MESSAGE");
TestEvent<Status> a(/*expected_to_complete=*/false);
Status status = report_queue_->Enqueue(&test_message, a.cb());
EXPECT_FALSE(status.ok());
EXPECT_EQ(status.error_code(), error::UNAUTHENTICATED);
TestEvent<Status> a;
report_queue_->Enqueue(&test_message, a.cb());
const auto result = a.result();
EXPECT_FALSE(result.ok());
EXPECT_EQ(result.error_code(), error::UNAUTHENTICATED);
}
TEST_F(ReportQueueTest, EnqueueValueFailsOnPolicy) {
......@@ -247,10 +226,11 @@ TEST_F(ReportQueueTest, EnqueueValueFailsOnPolicy) {
constexpr char kTestValue[] = "TEST_VALUE";
base::Value test_dict(base::Value::Type::DICTIONARY);
test_dict.SetStringKey(kTestKey, kTestValue);
TestEvent<Status> a(/*expected_to_complete=*/false);
Status status = report_queue_->Enqueue(test_dict, a.cb());
EXPECT_FALSE(status.ok());
EXPECT_EQ(status.error_code(), error::UNAUTHENTICATED);
TestEvent<Status> a;
report_queue_->Enqueue(test_dict, a.cb());
const auto result = a.result();
EXPECT_FALSE(result.ok());
EXPECT_EQ(result.error_code(), error::UNAUTHENTICATED);
}
} // namespace
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment