Commit f9875f72 authored by Zach Trudo's avatar Zach Trudo Committed by Chromium LUCI CQ

Add RecordHandlerImpl

RecordHandlerImpl replaces Destination specific handlers
with a handler that points directly to the new endpoint.

Bug: chromium:1078512
Change-Id: I8dd7ceca2556ad032f2e398cde1987fa4100b016
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2555200
Commit-Queue: Zach Trudo <zatrudo@google.com>
Reviewed-by: default avatarLeonid Baraz <lbaraz@chromium.org>
Cr-Commit-Position: refs/heads/master@{#833093}
parent f9ca0d7d
...@@ -1228,12 +1228,10 @@ static_library("browser") { ...@@ -1228,12 +1228,10 @@ static_library("browser") {
"policy/messaging_layer/storage/storage_module.h", "policy/messaging_layer/storage/storage_module.h",
"policy/messaging_layer/storage/storage_queue.cc", "policy/messaging_layer/storage/storage_queue.cc",
"policy/messaging_layer/storage/storage_queue.h", "policy/messaging_layer/storage/storage_queue.h",
"policy/messaging_layer/upload/app_install_report_handler.cc",
"policy/messaging_layer/upload/app_install_report_handler.h",
"policy/messaging_layer/upload/dm_server_upload_service.cc", "policy/messaging_layer/upload/dm_server_upload_service.cc",
"policy/messaging_layer/upload/dm_server_upload_service.h", "policy/messaging_layer/upload/dm_server_upload_service.h",
"policy/messaging_layer/upload/meet_device_telemetry_report_handler.cc", "policy/messaging_layer/upload/record_handler_impl.cc",
"policy/messaging_layer/upload/meet_device_telemetry_report_handler.h", "policy/messaging_layer/upload/record_handler_impl.h",
"policy/messaging_layer/upload/upload_client.cc", "policy/messaging_layer/upload/upload_client.cc",
"policy/messaging_layer/upload/upload_client.h", "policy/messaging_layer/upload/upload_client.h",
"policy/messaging_layer/util/backoff_settings.cc", "policy/messaging_layer/util/backoff_settings.cc",
......
// Copyright 2020 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "chrome/browser/policy/messaging_layer/upload/app_install_report_handler.h"
#include <utility>
#include "base/bind.h"
#include "base/callback.h"
#include "base/containers/queue.h"
#include "base/json/json_reader.h"
#include "base/optional.h"
#include "base/sequenced_task_runner.h"
#include "base/strings/strcat.h"
#include "base/task/post_task.h"
#include "base/task_runner.h"
#include "base/values.h"
#include "chrome/browser/policy/messaging_layer/upload/dm_server_upload_service.h"
#include "chrome/browser/policy/messaging_layer/util/status.h"
#include "chrome/browser/policy/messaging_layer/util/status_macros.h"
#include "chrome/browser/policy/messaging_layer/util/statusor.h"
#include "chrome/browser/policy/messaging_layer/util/task_runner_context.h"
#include "components/policy/core/common/cloud/cloud_policy_client.h"
#include "components/policy/proto/record.pb.h"
#include "components/policy/proto/record_constants.pb.h"
#include "content/public/browser/browser_task_traits.h"
#include "content/public/browser/browser_thread.h"
namespace reporting {
using AppInstallReportUploader =
AppInstallReportHandler::AppInstallReportUploader;
using UploaderLeaderTracker = AppInstallReportHandler::UploaderLeaderTracker;
UploaderLeaderTracker::UploaderLeaderTracker(policy::CloudPolicyClient* client)
: client_(client) {}
UploaderLeaderTracker::~UploaderLeaderTracker() = default;
// static
scoped_refptr<UploaderLeaderTracker> UploaderLeaderTracker::Create(
policy::CloudPolicyClient* client) {
return base::WrapRefCounted(new UploaderLeaderTracker(client));
}
StatusOr<std::unique_ptr<UploaderLeaderTracker::LeaderLock>>
UploaderLeaderTracker::RequestLeaderPromotion() {
if (has_promoted_app_install_event_uploader_) {
return Status(error::RESOURCE_EXHAUSTED,
"Only one leader is allowed at a time.");
}
has_promoted_app_install_event_uploader_ = true;
return std::make_unique<LeaderLock>(
base::BindOnce(&UploaderLeaderTracker::ReleaseLeader,
base::Unretained(this)),
client_);
}
void UploaderLeaderTracker::ReleaseLeader() {
has_promoted_app_install_event_uploader_ = false;
}
UploaderLeaderTracker::LeaderLock::LeaderLock(
UploaderLeaderTracker::ReleaseLeaderCallback release_cb,
policy::CloudPolicyClient* client)
: client_(std::move(client)),
release_leader_callback_(std::move(release_cb)) {}
UploaderLeaderTracker::LeaderLock::~LeaderLock() {
if (release_leader_callback_) {
std::move(release_leader_callback_).Run();
}
}
AppInstallReportUploader::AppInstallReportUploader(
base::Value report,
scoped_refptr<SharedQueue<base::Value>> report_queue,
scoped_refptr<UploaderLeaderTracker> leader_tracker,
ClientCallback client_cb,
scoped_refptr<base::SequencedTaskRunner> sequenced_task_runner)
: TaskRunnerContext<bool>(std::move(client_cb), sequenced_task_runner),
report_(std::move(report)),
report_queue_(report_queue),
leader_tracker_(leader_tracker) {}
AppInstallReportUploader::~AppInstallReportUploader() = default;
void AppInstallReportUploader::OnStart() {
report_queue_->Push(std::move(report_),
base::BindOnce(&AppInstallReportUploader::OnPushComplete,
base::Unretained(this)));
}
void AppInstallReportUploader::OnPushComplete() {
Schedule(&AppInstallReportUploader::RequestLeaderPromotion,
base::Unretained(this));
}
void AppInstallReportUploader::RequestLeaderPromotion() {
auto promo_result = leader_tracker_->RequestLeaderPromotion();
if (!promo_result.ok()) {
Complete();
return;
}
leader_lock_ = std::move(promo_result.ValueOrDie());
ScheduleNextPop();
}
void AppInstallReportUploader::ScheduleNextPop() {
report_queue_->Pop(base::BindOnce(&AppInstallReportUploader::OnPopResult,
base::Unretained(this)));
}
void AppInstallReportUploader::OnPopResult(StatusOr<base::Value> pop_result) {
if (!pop_result.ok()) {
// There are no more records to process - exit.
leader_lock_.reset();
Complete();
return;
}
Schedule(&AppInstallReportUploader::StartUpload, base::Unretained(this),
std::move(pop_result.ValueOrDie()));
}
void AppInstallReportUploader::StartUpload(base::Value record) {
ClientCallback cb = base::BindOnce(
&AppInstallReportUploader::OnUploadComplete, base::Unretained(this));
base::PostTask(FROM_HERE, {content::BrowserThread::UI},
base::BindOnce(
[](policy::CloudPolicyClient* client, base::Value record,
ClientCallback cb) {
client->UploadExtensionInstallReport(std::move(record),
std::move(cb));
},
leader_lock_->client(), std::move(record), std::move(cb)));
}
void AppInstallReportUploader::OnUploadComplete(bool success) {
if (!success) {
LOG(ERROR) << Status(error::DATA_LOSS, "Upload was unsuccessful");
}
Schedule(&AppInstallReportUploader::ScheduleNextPop, base::Unretained(this));
}
void AppInstallReportUploader::Complete() {
Schedule(&AppInstallReportUploader::Response, base::Unretained(this), true);
}
AppInstallReportHandler::AppInstallReportHandler(
policy::CloudPolicyClient* client)
: RecordHandler(client),
report_queue_(SharedQueue<base::Value>::Create()),
leader_tracker_(UploaderLeaderTracker::Create(client)),
sequenced_task_runner_(base::ThreadPool::CreateSequencedTaskRunner({})) {}
AppInstallReportHandler::~AppInstallReportHandler() = default;
Status AppInstallReportHandler::HandleRecord(Record record) {
RETURN_IF_ERROR(ValidateRecord(record));
ASSIGN_OR_RETURN(base::Value report, ConvertRecord(record));
ClientCallback client_cb = base::BindOnce([](bool finished_running) {
VLOG(1) << "Finished Running AppInstallReportUploader";
});
// Start an uploader in case any previous uploader has finished running before
// this record was posted.
Start<AppInstallReportUploader>(std::move(report), report_queue_,
leader_tracker_, std::move(client_cb),
sequenced_task_runner_);
return Status::StatusOK();
}
Status AppInstallReportHandler::ValidateRecord(const Record& record) const {
return ValidateDestination(record, Destination::UPLOAD_EVENTS);
}
Status AppInstallReportHandler::ValidateDestination(
const Record& record,
Destination expected_destination) const {
if (record.destination() != expected_destination) {
return Status(
error::INVALID_ARGUMENT,
base::StrCat({"Record destination mismatch, expected=",
Destination_Name(expected_destination), ", encountered=",
Destination_Name(record.destination())}));
}
return Status::StatusOK();
}
StatusOr<base::Value> AppInstallReportHandler::ConvertRecord(
const Record& record) const {
base::Optional<base::Value> report_result =
base::JSONReader::Read(record.data());
if (!report_result.has_value()) {
return Status(error::INVALID_ARGUMENT, "Unknown Report Format");
}
return std::move(report_result.value());
}
} // namespace reporting
// Copyright 2020 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef CHROME_BROWSER_POLICY_MESSAGING_LAYER_UPLOAD_APP_INSTALL_REPORT_HANDLER_H_
#define CHROME_BROWSER_POLICY_MESSAGING_LAYER_UPLOAD_APP_INSTALL_REPORT_HANDLER_H_
#include <string>
#include <utility>
#include "base/callback.h"
#include "base/memory/ref_counted.h"
#include "base/sequenced_task_runner.h"
#include "base/task/post_task.h"
#include "base/task_runner.h"
#include "base/values.h"
#include "chrome/browser/policy/messaging_layer/upload/dm_server_upload_service.h"
#include "chrome/browser/policy/messaging_layer/util/shared_queue.h"
#include "chrome/browser/policy/messaging_layer/util/status.h"
#include "chrome/browser/policy/messaging_layer/util/status_macros.h"
#include "chrome/browser/policy/messaging_layer/util/statusor.h"
#include "chrome/browser/policy/messaging_layer/util/task_runner_context.h"
#include "components/policy/core/common/cloud/cloud_policy_client.h"
#include "components/policy/proto/record.pb.h"
namespace reporting {
// |AppInstallReportHandler| handles |AppInstallReportRequests|, sending them to
// the server using |CloudPolicyClient|. Since |CloudPolicyClient| will cancel
// any in progress reports if a new report is added, |AppInstallReportHandler|
// ensures that only one report is ever processed at one time by forming a
// queue.
class AppInstallReportHandler : public DmServerUploadService::RecordHandler {
public:
// The client uses a boolean value for status, where true indicates success
// and false indicates failure.
using ClientCallback = base::OnceCallback<void(bool status)>;
// Tracking the leader needs to outlive |AppInstallReportHandler| so it needs
// to be wrapped in a scoped_refptr.
class UploaderLeaderTracker
: public base::RefCountedThreadSafe<UploaderLeaderTracker> {
public:
using ReleaseLeaderCallback = base::OnceCallback<void()>;
// Holds the lock on the leader, releases it upon destruction.
class LeaderLock {
public:
LeaderLock(ReleaseLeaderCallback release_cb,
policy::CloudPolicyClient* client);
virtual ~LeaderLock();
policy::CloudPolicyClient* client() { return client_; }
private:
policy::CloudPolicyClient* client_;
ReleaseLeaderCallback release_leader_callback_;
};
static scoped_refptr<UploaderLeaderTracker> Create(
policy::CloudPolicyClient* cloud_policy_client);
// If there is currently no leader
// (|has_promoted_app_install_event_uploader_| is false), then the StatusOr
// will contain a callback to release leadership. If there is currently a
// leader an error::RESOURCE_EXHAUSTED is returned (which should be the
// common case). This will be called on sequence from inside the
// |AppInstallReportUploader| and so needs no additional protection.
StatusOr<std::unique_ptr<LeaderLock>> RequestLeaderPromotion();
private:
friend class base::RefCountedThreadSafe<UploaderLeaderTracker>;
explicit UploaderLeaderTracker(policy::CloudPolicyClient* client);
virtual ~UploaderLeaderTracker();
// Once a AppInstallEventUploader leader drains the queue of reports, it
// will release its leadership and return, allowing a new
// AppInstallEventUploader to take leadership and upload events.
void ReleaseLeader();
// CloudPolicyClient allows calls to the reporting server.
policy::CloudPolicyClient* client_;
// Flag indicates whether a leader has been promoted.
bool has_promoted_app_install_event_uploader_{false};
};
using RequestLeaderPromotionCallback =
base::OnceCallback<StatusOr<UploaderLeaderTracker::LeaderLock>()>;
// AppInstallReportUploader handles enqueuing events on the |report_queue_|,
// and uploading those events with the |client_|.
class AppInstallReportUploader : public TaskRunnerContext<bool> {
public:
AppInstallReportUploader(
base::Value report,
scoped_refptr<SharedQueue<base::Value>> report_queue,
scoped_refptr<UploaderLeaderTracker> leader_tracker,
ClientCallback client_cb,
scoped_refptr<base::SequencedTaskRunner> sequenced_task_runner);
private:
~AppInstallReportUploader() override;
// AppInstallReportUploader follows the this sequence for handling upload:
// OnStart(): Pushes a report onto the |report_queue_|
// OnPushComplete: Called off sequence. Schedules RequestLeaderPromotion on
// sequence
// RequestLeaderPromotion: Called on sequence. requests promotion to leader
// if there isn't already one.
// OnLeaderPromotionResult: Called off sequence - two paths
// 1. A leader already exists - Call Complete() and then Response().
// 2. Promoted to leader - begin processing records in the queue.
// Schedules |ScheduleNextUpload| on sequence.
//
// ScheduleNextPop: Called on sequence. Calls report_queue_->Pop() with
// |StartUpload|.
// OnPopResult: Called off Sequence, two paths:
// 1. pop_result indicates there are no more records:
// Schedule ReleaseLeaderPromotion on sequence then Complete and
// Response.
// 2. pop_result holds a record: Schedule |UploadRecord|.
// UploadRecord: Called on sequence. Calls client_->UploadAppInstallReport
// with |UploadComplete| as the callback.
// UploadComplete: Called off sequence. Schedule |ScheduleNextPop| on
// sequence.
//
// During the ScheduleNextPop loop other requests can be enqueued from other
// threads while leadership is held. This allows one thread to be busy with
// the process of uploading, while other threads can push reports onto the
// queue and return. This is necessary because |CloudPolicyClient| only
// allows one upload at a time.
void OnStart() override;
void OnPushComplete();
void RequestLeaderPromotion();
void ScheduleNextPop();
void OnPopResult(StatusOr<base::Value> pop_result);
void StartUpload(base::Value record);
void OnUploadComplete(bool success);
void Complete();
base::Value report_;
scoped_refptr<SharedQueue<base::Value>> report_queue_;
scoped_refptr<UploaderLeaderTracker> leader_tracker_;
std::unique_ptr<UploaderLeaderTracker::LeaderLock> leader_lock_;
};
explicit AppInstallReportHandler(policy::CloudPolicyClient* client);
~AppInstallReportHandler() override;
// Base class RecordHandler method implementation.
Status HandleRecord(Record record) override;
protected:
// Helper method for |ValidateRecord|. Validates destination.
Status ValidateDestination(const Record& record,
Destination expected_destination) const;
private:
// Validate record (override for subclass).
virtual Status ValidateRecord(const Record& record) const;
// Convert record into base::Value for upload (override for subclass).
virtual StatusOr<base::Value> ConvertRecord(const Record& record) const;
scoped_refptr<SharedQueue<base::Value>> report_queue_;
scoped_refptr<UploaderLeaderTracker> leader_tracker_;
scoped_refptr<base::SequencedTaskRunner> sequenced_task_runner_;
};
} // namespace reporting
#endif // CHROME_BROWSER_POLICY_MESSAGING_LAYER_UPLOAD_APP_INSTALL_REPORT_HANDLER_H_
...@@ -11,7 +11,6 @@ ...@@ -11,7 +11,6 @@
#include "base/sequence_checker.h" #include "base/sequence_checker.h"
#include "base/task/post_task.h" #include "base/task/post_task.h"
#include "base/task_runner.h" #include "base/task_runner.h"
#include "chrome/browser/policy/messaging_layer/util/shared_vector.h"
#include "chrome/browser/policy/messaging_layer/util/status.h" #include "chrome/browser/policy/messaging_layer/util/status.h"
#include "chrome/browser/policy/messaging_layer/util/status_macros.h" #include "chrome/browser/policy/messaging_layer/util/status_macros.h"
#include "chrome/browser/policy/messaging_layer/util/statusor.h" #include "chrome/browser/policy/messaging_layer/util/statusor.h"
...@@ -47,16 +46,24 @@ class DmServerUploadService { ...@@ -47,16 +46,24 @@ class DmServerUploadService {
using CompletionCallback = base::OnceCallback<void(CompletionResponse)>; using CompletionCallback = base::OnceCallback<void(CompletionResponse)>;
// Since DmServer records need to be sorted prior to sending, we need handlers // Handles sending records to the server.
// for each type of record.
class RecordHandler { class RecordHandler {
public: public:
explicit RecordHandler(policy::CloudPolicyClient* client);
virtual ~RecordHandler() = default; virtual ~RecordHandler() = default;
virtual Status HandleRecord(Record record) = 0; // Will iterate over |records| and ensure they are in ascending sequence
// order, and within the same generation. Any out of order records will be
// discarded.
// Once the server has responded |upload_complete| is called with either the
// highest accepted SequencingInformation, or an error detailing the failure
// cause.
// Any errors will result in |upload_complete| being called with a Status.
virtual void HandleRecords(
std::unique_ptr<std::vector<EncryptedRecord>> records,
DmServerUploadService::CompletionCallback upload_complete) = 0;
protected: protected:
explicit RecordHandler(policy::CloudPolicyClient* client);
policy::CloudPolicyClient* GetClient() const { return client_; } policy::CloudPolicyClient* GetClient() const { return client_; }
private: private:
...@@ -70,26 +77,17 @@ class DmServerUploadService { ...@@ -70,26 +77,17 @@ class DmServerUploadService {
public: public:
DmServerUploader( DmServerUploader(
std::unique_ptr<std::vector<EncryptedRecord>> records, std::unique_ptr<std::vector<EncryptedRecord>> records,
scoped_refptr<SharedVector<std::unique_ptr<RecordHandler>>> handlers, RecordHandler* handler,
CompletionCallback completion_cb, CompletionCallback completion_cb,
scoped_refptr<base::SequencedTaskRunner> sequenced_task_runner); scoped_refptr<base::SequencedTaskRunner> sequenced_task_runner);
private: private:
struct RecordInfo {
Record record;
SequencingInformation sequencing_information;
};
~DmServerUploader() override; ~DmServerUploader() override;
// OnStart checks to ensure that our record set isn't empty, and requests // OnStart checks to ensure that our record set isn't empty, and requests
// handler size status from |handlers_|. // handler size status from |handlers_|.
void OnStart() override; void OnStart() override;
// The callback for handler size status. Will early exit if there are no
// available handlers. Otherwise schedules ProcessRecords.
void IsHandlerVectorEmptyCheck(bool handler_is_empty);
// ProcessRecords verifies that the records provided are parseable and sets // ProcessRecords verifies that the records provided are parseable and sets
// the |Record|s up for handling by the |RecordHandlers|s. On // the |Record|s up for handling by the |RecordHandlers|s. On
// completion, ProcessRecords |Schedule|s |HandleRecords|. // completion, ProcessRecords |Schedule|s |HandleRecords|.
...@@ -103,15 +101,13 @@ class DmServerUploadService { ...@@ -103,15 +101,13 @@ class DmServerUploadService {
// processed and calls Complete. // processed and calls Complete.
void OnRecordsHandled(); void OnRecordsHandled();
// Complete evaluates if any records were successfully uploaded. If no // Complete schedules |Response| with the provided |completion_response|.
// records were successfully uploaded and |status| is not ok - it calls void Complete(CompletionResponse completion_response);
// |Response| with the provided |status|. Otherwise it calls |Response| with
// the list of successful uploads (even if some were not successful).
void Complete(Status status);
// Helper function for determining if a Record is valid and adding it to // Helper function for determining if an EncryptedRecord is valid.
// |record_infos_|. Status IsRecordValid(const EncryptedRecord& encrypted_record,
Status IsRecordValid(const EncryptedRecord& encrypted_record); const uint64_t expected_generation_id,
const uint64_t expected_sequencing_id) const;
// Helper function for tracking the highest sequencing information per // Helper function for tracking the highest sequencing information per
// generation id. Schedules ProcessSuccessfulUploadAddition. // generation id. Schedules ProcessSuccessfulUploadAddition.
...@@ -125,13 +121,8 @@ class DmServerUploadService { ...@@ -125,13 +121,8 @@ class DmServerUploadService {
SequencingInformation sequencing_information); SequencingInformation sequencing_information);
std::unique_ptr<std::vector<EncryptedRecord>> encrypted_records_; std::unique_ptr<std::vector<EncryptedRecord>> encrypted_records_;
scoped_refptr<SharedVector<std::unique_ptr<RecordHandler>>> handlers_; RecordHandler* handler_;
// generation_id_ will be set to the generation of the first record in
// encrypted_records_.
uint64_t generation_id_;
std::vector<RecordInfo> record_infos_;
base::Optional<SequencingInformation> highest_successful_sequence_; base::Optional<SequencingInformation> highest_successful_sequence_;
SEQUENCE_CHECKER(sequence_checker_); SEQUENCE_CHECKER(sequence_checker_);
...@@ -159,11 +150,8 @@ class DmServerUploadService { ...@@ -159,11 +150,8 @@ class DmServerUploadService {
DmServerUploadService(std::unique_ptr<policy::CloudPolicyClient> client, DmServerUploadService(std::unique_ptr<policy::CloudPolicyClient> client,
ReportSuccessfulUploadCallback completion_cb); ReportSuccessfulUploadCallback completion_cb);
static void InitRecordHandlers( static void InitRecordHandler(
std::unique_ptr<DmServerUploadService> uploader, std::unique_ptr<DmServerUploadService> uploader,
#ifdef OS_CHROMEOS
Profile* primary_profile,
#endif // OS_CHROMEOS
base::OnceCallback<void(StatusOr<std::unique_ptr<DmServerUploadService>>)> base::OnceCallback<void(StatusOr<std::unique_ptr<DmServerUploadService>>)>
created_cb); created_cb);
...@@ -173,7 +161,7 @@ class DmServerUploadService { ...@@ -173,7 +161,7 @@ class DmServerUploadService {
std::unique_ptr<policy::CloudPolicyClient> client_; std::unique_ptr<policy::CloudPolicyClient> client_;
ReportSuccessfulUploadCallback upload_cb_; ReportSuccessfulUploadCallback upload_cb_;
scoped_refptr<SharedVector<std::unique_ptr<RecordHandler>>> record_handlers_; std::unique_ptr<RecordHandler> handler_;
scoped_refptr<base::SequencedTaskRunner> sequenced_task_runner_; scoped_refptr<base::SequencedTaskRunner> sequenced_task_runner_;
}; };
......
...@@ -8,6 +8,8 @@ ...@@ -8,6 +8,8 @@
#include <utility> #include <utility>
#include <vector> #include <vector>
#include "base/strings/strcat.h"
#include "base/strings/string_number_conversions.h"
#include "base/task_runner.h" #include "base/task_runner.h"
#include "base/test/task_environment.h" #include "base/test/task_environment.h"
#include "chrome/browser/policy/messaging_layer/util/shared_vector.h" #include "chrome/browser/policy/messaging_layer/util/shared_vector.h"
...@@ -21,8 +23,10 @@ ...@@ -21,8 +23,10 @@
namespace reporting { namespace reporting {
namespace { namespace {
using testing::_; using ::testing::_;
using testing::Return; using ::testing::Invoke;
using ::testing::Return;
using ::testing::WithArgs;
// Usage (in tests only): // Usage (in tests only):
// //
...@@ -119,34 +123,32 @@ class TestRecordHandler : public DmServerUploadService::RecordHandler { ...@@ -119,34 +123,32 @@ class TestRecordHandler : public DmServerUploadService::RecordHandler {
TestRecordHandler() : RecordHandler(/*client=*/nullptr) {} TestRecordHandler() : RecordHandler(/*client=*/nullptr) {}
~TestRecordHandler() override = default; ~TestRecordHandler() override = default;
MOCK_METHOD(Status, HandleRecord, (Record)); void HandleRecords(
std::unique_ptr<std::vector<EncryptedRecord>> records,
DmServerUploadService::CompletionCallback upload_complete) override {
HandleRecords_(records, upload_complete);
}
MOCK_METHOD(void,
HandleRecords_,
(std::unique_ptr<std::vector<EncryptedRecord>>&,
DmServerUploadService::CompletionCallback&));
}; };
class DmServerUploaderTest : public testing::Test { class DmServerUploaderTest : public testing::Test {
public: public:
DmServerUploaderTest() DmServerUploaderTest()
: sequenced_task_runner_(base::ThreadPool::CreateSequencedTaskRunner({})), : sequenced_task_runner_(base::ThreadPool::CreateSequencedTaskRunner({})),
handlers_(SharedVector<std::unique_ptr< handler_(std::make_unique<TestRecordHandler>()),
DmServerUploadService::RecordHandler>>::Create()) {} records_(std::make_unique<std::vector<EncryptedRecord>>()) {}
void SetUp() override {
std::unique_ptr<TestRecordHandler> handler_ptr(new TestRecordHandler());
handler_ = handler_ptr.get();
handlers_->PushBack(std::move(handler_ptr), base::DoNothing());
records_ = std::make_unique<std::vector<EncryptedRecord>>();
}
protected: protected:
content::BrowserTaskEnvironment task_envrionment_{ content::BrowserTaskEnvironment task_envrionment_{
base::test::TaskEnvironment::TimeSource::MOCK_TIME}; base::test::TaskEnvironment::TimeSource::MOCK_TIME};
TestRecordHandler* handler_;
scoped_refptr<base::SequencedTaskRunner> sequenced_task_runner_; scoped_refptr<base::SequencedTaskRunner> sequenced_task_runner_;
scoped_refptr<
SharedVector<std::unique_ptr<DmServerUploadService::RecordHandler>>>
handlers_;
std::unique_ptr<TestRecordHandler> handler_;
std::unique_ptr<std::vector<EncryptedRecord>> records_; std::unique_ptr<std::vector<EncryptedRecord>> records_;
const base::TimeDelta kMaxDelay_ = base::TimeDelta::FromSeconds(1); const base::TimeDelta kMaxDelay_ = base::TimeDelta::FromSeconds(1);
...@@ -156,55 +158,54 @@ TEST_F(DmServerUploaderTest, ProcessesRecord) { ...@@ -156,55 +158,54 @@ TEST_F(DmServerUploaderTest, ProcessesRecord) {
// Add an empty record. // Add an empty record.
records_->emplace_back(); records_->emplace_back();
EXPECT_CALL(*handler_, HandleRecord(_)).WillOnce(Return(Status::StatusOK())); EXPECT_CALL(*handler_, HandleRecords_(_, _))
.WillOnce(WithArgs<1>(
Invoke([](DmServerUploadService::CompletionCallback& callback) {
std::move(callback).Run(SequencingInformation());
})));
TestCallbackWaiter callback_waiter; TestCallbackWaiter callback_waiter;
DmServerUploadService::CompletionCallback cb = DmServerUploadService::CompletionCallback cb =
base::BindOnce(&TestCallbackWaiter::CompleteExpectSuccess, base::BindOnce(&TestCallbackWaiter::CompleteExpectSuccess,
base::Unretained(&callback_waiter)); base::Unretained(&callback_waiter));
Start<DmServerUploadService::DmServerUploader>( Start<DmServerUploadService::DmServerUploader>(std::move(records_),
std::move(records_), handlers_, std::move(cb), sequenced_task_runner_); handler_.get(), std::move(cb),
sequenced_task_runner_);
callback_waiter.Wait(); callback_waiter.Wait();
} }
TEST_F(DmServerUploaderTest, ProcessesRecords) { TEST_F(DmServerUploaderTest, ProcessesRecords) {
for (uint64_t i = 0; i < 10; i++) { uint64_t kNumberOfRecords = 10;
EncryptedRecord record; uint64_t kGenerationId = 1234;
auto* sequencing_info = record.mutable_sequencing_information();
sequencing_info->set_sequencing_id(i); for (uint64_t i = 0; i < kNumberOfRecords; i++) {
EncryptedRecord encrypted_record;
records_->push_back(record); encrypted_record.set_encrypted_wrapped_record(
base::StrCat({"Record Number ", base::NumberToString(i)}));
auto* sequencing_information =
encrypted_record.mutable_sequencing_information();
sequencing_information->set_generation_id(kGenerationId);
sequencing_information->set_sequencing_id(i);
sequencing_information->set_priority(Priority::IMMEDIATE);
records_->push_back(std::move(encrypted_record));
} }
EXPECT_CALL(*handler_, HandleRecord(_)) EXPECT_CALL(*handler_, HandleRecords_(_, _))
.Times(10) .WillOnce(WithArgs<1>(
.WillRepeatedly(Return(Status::StatusOK())); Invoke([](DmServerUploadService::CompletionCallback& callback) {
std::move(callback).Run(SequencingInformation());
})));
TestCallbackWaiter callback_waiter; TestCallbackWaiter callback_waiter;
DmServerUploadService::CompletionCallback cb = DmServerUploadService::CompletionCallback cb =
base::BindOnce(&TestCallbackWaiter::CompleteExpectSuccess, base::BindOnce(&TestCallbackWaiter::CompleteExpectSuccess,
base::Unretained(&callback_waiter)); base::Unretained(&callback_waiter));
Start<DmServerUploadService::DmServerUploader>( Start<DmServerUploadService::DmServerUploader>(std::move(records_),
std::move(records_), handlers_, std::move(cb), sequenced_task_runner_); handler_.get(), std::move(cb),
sequenced_task_runner_);
callback_waiter.Wait();
}
TEST_F(DmServerUploaderTest, DeniesBadWrappedRecord) {
EncryptedRecord record;
record.set_encrypted_wrapped_record("El Chupacabra");
records_->push_back(record);
TestCallbackWaiter callback_waiter;
DmServerUploadService::CompletionCallback cb =
base::BindOnce(&TestCallbackWaiter::CompleteExpectInvalidArgument,
base::Unretained(&callback_waiter));
Start<DmServerUploadService::DmServerUploader>(
std::move(records_), handlers_, std::move(cb), sequenced_task_runner_);
callback_waiter.Wait(); callback_waiter.Wait();
} }
...@@ -213,16 +214,21 @@ TEST_F(DmServerUploaderTest, ReportsFailureToProcess) { ...@@ -213,16 +214,21 @@ TEST_F(DmServerUploaderTest, ReportsFailureToProcess) {
// Add an empty record. // Add an empty record.
records_->emplace_back(); records_->emplace_back();
EXPECT_CALL(*handler_, HandleRecord(_)) EXPECT_CALL(*handler_, HandleRecords_(_, _))
.WillOnce(Return(Status(error::INVALID_ARGUMENT, "Fail for test"))); .WillOnce(WithArgs<1>(
Invoke([](DmServerUploadService::CompletionCallback& callback) {
std::move(callback).Run(
Status(error::FAILED_PRECONDITION, "Fail for test"));
})));
TestCallbackWaiter callback_waiter; TestCallbackWaiter callback_waiter;
DmServerUploadService::CompletionCallback cb = DmServerUploadService::CompletionCallback cb =
base::BindOnce(&TestCallbackWaiter::CompleteExpectFailedPrecondition, base::BindOnce(&TestCallbackWaiter::CompleteExpectFailedPrecondition,
base::Unretained(&callback_waiter)); base::Unretained(&callback_waiter));
Start<DmServerUploadService::DmServerUploader>( Start<DmServerUploadService::DmServerUploader>(std::move(records_),
std::move(records_), handlers_, std::move(cb), sequenced_task_runner_); handler_.get(), std::move(cb),
sequenced_task_runner_);
callback_waiter.Wait(); callback_waiter.Wait();
} }
...@@ -231,17 +237,21 @@ TEST_F(DmServerUploaderTest, ReportsFailureToUpload) { ...@@ -231,17 +237,21 @@ TEST_F(DmServerUploaderTest, ReportsFailureToUpload) {
// Add an empty record. // Add an empty record.
records_->emplace_back(); records_->emplace_back();
EXPECT_CALL(*handler_, HandleRecord(_)) EXPECT_CALL(*handler_, HandleRecords_(_, _))
.WillRepeatedly( .WillOnce(WithArgs<1>(
Return(Status(error::DEADLINE_EXCEEDED, "Fail for test"))); Invoke([](DmServerUploadService::CompletionCallback& callback) {
std::move(callback).Run(
Status(error::DEADLINE_EXCEEDED, "Fail for test"));
})));
TestCallbackWaiter callback_waiter; TestCallbackWaiter callback_waiter;
DmServerUploadService::CompletionCallback cb = DmServerUploadService::CompletionCallback cb =
base::BindOnce(&TestCallbackWaiter::CompleteExpectFailedPrecondition, base::BindOnce(&TestCallbackWaiter::CompleteExpectDeadlineExceeded,
base::Unretained(&callback_waiter)); base::Unretained(&callback_waiter));
Start<DmServerUploadService::DmServerUploader>( Start<DmServerUploadService::DmServerUploader>(std::move(records_),
std::move(records_), handlers_, std::move(cb), sequenced_task_runner_); handler_.get(), std::move(cb),
sequenced_task_runner_);
callback_waiter.Wait(); callback_waiter.Wait();
} }
...@@ -252,8 +262,9 @@ TEST_F(DmServerUploaderTest, FailWithZeroRecords) { ...@@ -252,8 +262,9 @@ TEST_F(DmServerUploaderTest, FailWithZeroRecords) {
base::BindOnce(&TestCallbackWaiter::CompleteExpectInvalidArgument, base::BindOnce(&TestCallbackWaiter::CompleteExpectInvalidArgument,
base::Unretained(&callback_waiter)); base::Unretained(&callback_waiter));
Start<DmServerUploadService::DmServerUploader>( Start<DmServerUploadService::DmServerUploader>(std::move(records_),
std::move(records_), handlers_, std::move(cb), sequenced_task_runner_); handler_.get(), std::move(cb),
sequenced_task_runner_);
callback_waiter.Wait(); callback_waiter.Wait();
} }
......
// Copyright 2020 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "chrome/browser/policy/messaging_layer/upload/meet_device_telemetry_report_handler.h"
#include <utility>
#include "base/bind.h"
#include "base/callback.h"
#include "base/containers/queue.h"
#include "base/json/json_reader.h"
#include "base/json/json_string_value_serializer.h"
#include "base/optional.h"
#include "base/sequenced_task_runner.h"
#include "base/strings/strcat.h"
#include "base/task/post_task.h"
#include "base/task_runner.h"
#include "base/values.h"
#include "chrome/browser/chromeos/file_manager/path_util.h"
#include "chrome/browser/policy/messaging_layer/upload/app_install_report_handler.h"
#include "chrome/browser/policy/messaging_layer/util/status.h"
#include "chrome/browser/policy/messaging_layer/util/status_macros.h"
#include "chrome/browser/policy/messaging_layer/util/statusor.h"
#include "chrome/browser/profiles/reporting_util.h"
#include "components/policy/core/common/cloud/cloud_policy_client.h"
#include "components/policy/core/common/cloud/realtime_reporting_job_configuration.h"
#include "components/policy/proto/record.pb.h"
#include "components/policy/proto/record_constants.pb.h"
namespace reporting {
namespace {
// Common Key names used when building the Event dictionary to pass to the
// Chrome Reporting API.
// When sent the event should look like this:
// {
// "time": "2017-01-15T01:30:15.01Z", // Timestamp in RFC3339 format
// "reportingRecordEvent": {
// "destination": 3,
// "dmToken": "abcdef1234",
// "timestampUs"": 123456,
// "data": "String of Data"
// }
// }
constexpr char kTime[] = "time";
constexpr char kReportingRecordEvent[] = "reportingRecordEvent";
// Common key names used when builing the ReportingRecordEvent dictionary to
// pass to Chrome Reporting API. This is the dictionary indicated by
// |kReportingRecordEvent|. This dictionary is a direct 1:1 relation to the
// reporting::Record proto.
constexpr char kDestination[] = "destination";
constexpr char kDmToken[] = "dmToken";
constexpr char kTimestampUs[] = "timestampUs";
constexpr char kData[] = "data";
// Takes a reporting::Record and converts it to the |kReportingRecordEvent|
// dictionary.
base::Value ConvertRecordProtoToValue(const Record& record) {
base::Value record_fields{base::Value::Type::DICTIONARY};
if (record.has_destination()) {
record_fields.SetIntKey(kDestination, record.destination());
}
if (!record.dm_token().empty()) {
record_fields.SetStringKey(kDmToken, record.dm_token());
}
if (record.has_timestamp_us()) {
// Do not convert into RFC3339 format - we need to keep microseconds.
// 64-bit ints aren't supported by JSON - must be stored as strings
std::ostringstream str;
str << record.timestamp_us();
record_fields.SetStringKey(kTimestampUs, str.str());
}
if (record.has_data()) { // No data indicates gap, empty data is still data.
record_fields.SetStringKey(kData, record.data());
}
return record_fields;
}
// Generates a RFC3999 time string.
std::string GetTimeString(const base::Time& timestamp) {
base::Time::Exploded time_exploded;
timestamp.UTCExplode(&time_exploded);
std::string time_str = base::StringPrintf(
"%d-%02d-%02dT%02d:%02d:%02d.%03dZ", time_exploded.year,
time_exploded.month, time_exploded.day_of_month, time_exploded.hour,
time_exploded.minute, time_exploded.second, time_exploded.millisecond);
return time_str;
}
// Creates a list of Event dictionaries from a Record. (Note: Currently takes
// one Record and makes a list with one Event).
base::Value ConvertRecordProtoToEventList(const Record& record) {
// Create the Event Dictionary
base::Value event{base::Value::Type::DICTIONARY};
// Set the |kReportingRecordEvent| key to the Record dictionary.
event.SetKey(kReportingRecordEvent, ConvertRecordProtoToValue(record));
// Build the timestamp and set the |kTime| to the RFC3999 version.
base::Time timestamp =
base::Time::UnixEpoch() +
base::TimeDelta::FromMicroseconds(record.timestamp_us());
event.SetStringKey(kTime, GetTimeString(timestamp));
// Build the list and append the event.
base::Value records_list{base::Value::Type::LIST};
records_list.Append(std::move(event));
return records_list;
}
} // namespace
MeetDeviceTelemetryReportHandler::MeetDeviceTelemetryReportHandler(
Profile* profile,
policy::CloudPolicyClient* client)
: AppInstallReportHandler(client), profile_(profile) {}
MeetDeviceTelemetryReportHandler::~MeetDeviceTelemetryReportHandler() = default;
Status MeetDeviceTelemetryReportHandler::ValidateRecord(
const Record& record) const {
RETURN_IF_ERROR(
ValidateDestination(record, Destination::MEET_DEVICE_TELEMETRY));
if (!record.has_data()) {
return Status(error::INVALID_ARGUMENT, "No 'data' in the Record");
}
return Status::StatusOK();
}
StatusOr<base::Value> MeetDeviceTelemetryReportHandler::ConvertRecord(
const Record& record) const {
base::Value context = reporting::GetContext(profile_);
base::Value event_list = ConvertRecordProtoToEventList(record);
return policy::RealtimeReportingJobConfiguration::BuildReport(
std::move(event_list), std::move(context));
}
} // namespace reporting
// Copyright 2020 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef CHROME_BROWSER_POLICY_MESSAGING_LAYER_UPLOAD_MEET_DEVICE_TELEMETRY_REPORT_HANDLER_H_
#define CHROME_BROWSER_POLICY_MESSAGING_LAYER_UPLOAD_MEET_DEVICE_TELEMETRY_REPORT_HANDLER_H_
#include <string>
#include <utility>
#include "base/callback.h"
#include "base/memory/ref_counted.h"
#include "base/sequenced_task_runner.h"
#include "base/task/post_task.h"
#include "base/task_runner.h"
#include "base/values.h"
#include "chrome/browser/policy/messaging_layer/upload/app_install_report_handler.h"
#include "chrome/browser/policy/messaging_layer/util/status.h"
#include "chrome/browser/profiles/profile.h"
#include "components/policy/core/common/cloud/cloud_policy_client.h"
#include "components/policy/proto/record.pb.h"
namespace reporting {
// |MeetDeviceTelemetryReportHandler| wraps |AppInstallReportHandler| to send
// MeetDeviceTelemetry data to DM server with MEET_DEVICE_TELEMETRY destination
// using |CloudPolicyClient|. Since |CloudPolicyClient| will cancel any in
// progress reports if a new report is added, |AppInstallReportHandler| ensures
// that only one report is ever processed at one time by forming a queue.
// Exists only on ChromeOS.
class MeetDeviceTelemetryReportHandler : public AppInstallReportHandler {
public:
// The client uses a boolean value for status, where true indicates success
// and false indicates failure.
using ClientCallback = AppInstallReportHandler::ClientCallback;
MeetDeviceTelemetryReportHandler(Profile* profile,
policy::CloudPolicyClient* client);
~MeetDeviceTelemetryReportHandler() override;
private:
Status ValidateRecord(const Record& record) const override;
StatusOr<base::Value> ConvertRecord(const Record& record) const override;
Profile* const profile_;
};
} // namespace reporting
#endif // CHROME_BROWSER_POLICY_MESSAGING_LAYER_UPLOAD_MEET_DEVICE_TELEMETRY_REPORT_HANDLER_H_
// Copyright 2020 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "chrome/browser/policy/messaging_layer/upload/meet_device_telemetry_report_handler.h"
#include "base/json/json_writer.h"
#include "base/optional.h"
#include "base/synchronization/waitable_event.h"
#include "base/task/post_task.h"
#include "base/task_runner.h"
#include "base/test/task_environment.h"
#include "base/values.h"
#include "chrome/browser/policy/messaging_layer/upload/dm_server_upload_service.h"
#include "chrome/browser/policy/messaging_layer/util/status.h"
#include "chrome/browser/policy/messaging_layer/util/status_macros.h"
#include "chrome/browser/policy/messaging_layer/util/statusor.h"
#include "chrome/browser/policy/messaging_layer/util/task_runner_context.h"
#include "components/policy/core/common/cloud/cloud_policy_client.h"
#include "components/policy/core/common/cloud/dm_token.h"
#include "components/policy/core/common/cloud/mock_cloud_policy_client.h"
#include "components/policy/proto/record.pb.h"
#include "components/policy/proto/record_constants.pb.h"
#include "components/user_manager/scoped_user_manager.h"
#include "content/public/test/browser_task_environment.h"
#include "testing/gtest/include/gtest/gtest.h"
using ::testing::_;
using ::testing::Invoke;
using ::testing::Return;
using ::testing::WithArgs;
namespace reporting {
namespace {
MATCHER_P(MatchValue, expected, "matches base::Value") {
base::Value* const events = arg.FindListKey("events");
if (!events) {
LOG(ERROR) << "Arg does not have 'events' or 'events' is not a list";
return false;
}
base::Value::ListView events_list = events->GetList();
if (events_list.size() != 1) {
LOG(ERROR) << "'events' is empty or has more than one element in the list";
return false;
}
const base::Value& event = *events_list.begin();
const auto* reporting_record_event = event.FindKey("reportingRecordEvent");
if (reporting_record_event == nullptr) {
LOG(ERROR) << "'reportingRecordEvent' is missing" << event;
return false;
}
const auto destination = reporting_record_event->FindIntKey("destination");
if (!destination.has_value() ||
destination.value() != Destination::MEET_DEVICE_TELEMETRY) {
LOG(ERROR) << "'destination' is wrong or missing";
return false;
}
const std::string* const data = reporting_record_event->FindStringKey("data");
if (!data) {
LOG(ERROR) << "'data' is missing";
return false;
}
DCHECK(expected);
std::string expected_string;
if (!base::JSONWriter::Write(*expected, &expected_string)) {
LOG(INFO) << "Unable to serialize the expected";
return false;
}
return *data == expected_string;
}
class TestCallbackWaiter {
public:
TestCallbackWaiter() : run_loop_(std::make_unique<base::RunLoop>()) {}
virtual void Signal() { run_loop_->Quit(); }
void Wait() { run_loop_->Run(); }
protected:
std::unique_ptr<base::RunLoop> run_loop_;
};
class MeetDeviceTelemetryReportHandlerTest : public testing::Test {
public:
MeetDeviceTelemetryReportHandlerTest() = default;
void SetUp() override {
// Set up client.
client_.SetDMToken(
policy::DMToken::CreateValidTokenForTesting("FAKE_DM_TOKEN").value());
}
protected:
content::BrowserTaskEnvironment task_envrionment_;
policy::MockCloudPolicyClient client_;
};
class TestRecord {
public:
explicit TestRecord(base::StringPiece key = "TEST_KEY",
base::StringPiece value = "TEST_VALUE") {
data_.SetKey(key, base::Value(value));
}
Record GetRecord() {
Record record;
std::string json_data;
base::JSONWriter::Write(data_, &json_data);
record.set_data(json_data);
record.set_destination(Destination::MEET_DEVICE_TELEMETRY);
return record;
}
const base::Value* data() const { return &data_; }
private:
base::Value data_{base::Value::Type::DICTIONARY};
};
TEST_F(MeetDeviceTelemetryReportHandlerTest, AcceptsValidRecord) {
TestCallbackWaiter waiter;
TestRecord test_record;
EXPECT_CALL(client_,
UploadExtensionInstallReport_(MatchValue(test_record.data()), _))
.WillOnce(WithArgs<1>(Invoke(
[&waiter](
MeetDeviceTelemetryReportHandler::ClientCallback& callback) {
std::move(callback).Run(true);
waiter.Signal();
})));
MeetDeviceTelemetryReportHandler handler(/*profile=*/nullptr, &client_);
Status handle_status = handler.HandleRecord(test_record.GetRecord());
EXPECT_OK(handle_status);
waiter.Wait();
}
TEST_F(MeetDeviceTelemetryReportHandlerTest, DeniesInvalidDestination) {
EXPECT_CALL(client_, UploadExtensionInstallReport_(_, _)).Times(0);
MeetDeviceTelemetryReportHandler handler(/*profile=*/nullptr, &client_);
Record test_record;
test_record.set_destination(Destination::UPLOAD_EVENTS);
Status handle_status = handler.HandleRecord(test_record);
EXPECT_FALSE(handle_status.ok());
EXPECT_EQ(handle_status.error_code(), error::INVALID_ARGUMENT);
}
TEST_F(MeetDeviceTelemetryReportHandlerTest, DeniesInvalidData) {
EXPECT_CALL(client_, UploadExtensionInstallReport_(_, _)).Times(0);
MeetDeviceTelemetryReportHandler handler(/*profile=*/nullptr, &client_);
Record test_record;
test_record.clear_data();
Status handle_status = handler.HandleRecord(test_record);
EXPECT_FALSE(handle_status.ok());
EXPECT_EQ(handle_status.error_code(), error::INVALID_ARGUMENT);
}
TEST_F(MeetDeviceTelemetryReportHandlerTest, ReportsUnsuccessfulCall) {
TestCallbackWaiter waiter;
TestRecord test_record;
EXPECT_CALL(client_,
UploadExtensionInstallReport_(MatchValue(test_record.data()), _))
.WillOnce(WithArgs<1>(Invoke(
[&waiter](
MeetDeviceTelemetryReportHandler::ClientCallback& callback) {
std::move(callback).Run(false);
waiter.Signal();
})));
MeetDeviceTelemetryReportHandler handler(/*profile=*/nullptr, &client_);
Status handle_status = handler.HandleRecord(test_record.GetRecord());
EXPECT_OK(handle_status);
waiter.Wait();
}
class TestCallbackWaiterWithCounter : public TestCallbackWaiter {
public:
explicit TestCallbackWaiterWithCounter(int counter_limit)
: counter_limit_(counter_limit) {}
void Signal() override {
DCHECK_GT(counter_limit_, 0);
if (--counter_limit_ == 0) {
run_loop_->Quit();
}
}
private:
std::atomic<int> counter_limit_;
};
TEST_F(MeetDeviceTelemetryReportHandlerTest, AcceptsMultipleValidRecords) {
const int kExpectedCallTimes = 10;
TestCallbackWaiterWithCounter waiter{kExpectedCallTimes};
TestRecord test_record;
EXPECT_CALL(client_,
UploadExtensionInstallReport_(MatchValue(test_record.data()), _))
.WillRepeatedly(WithArgs<1>(Invoke(
[&waiter](
MeetDeviceTelemetryReportHandler::ClientCallback& callback) {
std::move(callback).Run(true);
waiter.Signal();
})));
MeetDeviceTelemetryReportHandler handler(/*profile=*/nullptr, &client_);
for (int i = 0; i < kExpectedCallTimes; i++) {
Status handle_status = handler.HandleRecord(test_record.GetRecord());
EXPECT_OK(handle_status);
}
waiter.Wait();
}
} // namespace
} // namespace reporting
// Copyright 2020 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "chrome/browser/policy/messaging_layer/upload/record_handler_impl.h"
#include <utility>
#include "base/bind.h"
#include "base/callback.h"
#include "base/containers/queue.h"
#include "base/json/json_reader.h"
#include "base/optional.h"
#include "base/sequenced_task_runner.h"
#include "base/strings/strcat.h"
#include "base/task/post_task.h"
#include "base/task_runner.h"
#include "base/values.h"
#include "chrome/browser/policy/messaging_layer/upload/dm_server_upload_service.h"
#include "chrome/browser/policy/messaging_layer/util/status.h"
#include "chrome/browser/policy/messaging_layer/util/status_macros.h"
#include "chrome/browser/policy/messaging_layer/util/statusor.h"
#include "chrome/browser/policy/messaging_layer/util/task_runner_context.h"
#include "chrome/browser/profiles/profile_manager.h"
#include "chrome/browser/profiles/reporting_util.h"
#include "components/policy/core/common/cloud/cloud_policy_client.h"
#include "components/policy/proto/record.pb.h"
#include "components/policy/proto/record_constants.pb.h"
#include "content/public/browser/browser_task_traits.h"
#include "content/public/browser/browser_thread.h"
namespace reporting {
RecordHandlerImpl::ReportUploader::ReportUploader(
std::unique_ptr<std::vector<EncryptedRecord>> records,
policy::CloudPolicyClient* client,
DmServerUploadService::CompletionCallback client_cb,
scoped_refptr<base::SequencedTaskRunner> sequenced_task_runner)
: TaskRunnerContext<DmServerUploadService::CompletionResponse>(
std::move(client_cb),
sequenced_task_runner),
records_(std::move(records)),
client_(client) {}
RecordHandlerImpl::ReportUploader::~ReportUploader() = default;
void RecordHandlerImpl::ReportUploader::OnStart() {
if (client_ == nullptr) {
Status null_client = Status(error::INVALID_ARGUMENT, "Client was null");
LOG(ERROR) << null_client;
Complete(null_client);
return;
}
if (records_ == nullptr) {
Status null_records = Status(error::INVALID_ARGUMENT, "records_ was null");
LOG(ERROR) << null_records;
Complete(null_records);
return;
}
if (records_->empty()) {
Status empty_records =
Status(error::INVALID_ARGUMENT, "records_ was empty");
LOG(ERROR) << empty_records;
Complete(empty_records);
return;
}
// We'll be popping records off the back.
std::reverse(records_->begin(), records_->end());
StartUpload(records_->back());
}
void RecordHandlerImpl::ReportUploader::StartUpload(
const EncryptedRecord& encrypted_record) {
auto cb = base::BindOnce(&RecordHandlerImpl::ReportUploader::OnUploadComplete,
base::Unretained(this));
base::PostTask(
FROM_HERE, {content::BrowserThread::UI},
base::BindOnce(
[](policy::CloudPolicyClient* client, const EncryptedRecord& record,
base::OnceCallback<void(bool)> cb) {
client->UploadEncryptedReport(
record,
reporting::GetContext(ProfileManager::GetPrimaryUserProfile()),
std::move(cb));
},
client_, encrypted_record, std::move(cb)));
}
void RecordHandlerImpl::ReportUploader::OnUploadComplete(bool success) {
if (!success) {
Schedule(&RecordHandlerImpl::ReportUploader::HandleFailedUpload,
base::Unretained(this));
return;
}
Schedule(&RecordHandlerImpl::ReportUploader::HandleSuccessfulUpload,
base::Unretained(this));
}
void RecordHandlerImpl::ReportUploader::HandleFailedUpload() {
Status data_loss = Status(
error::DATA_LOSS,
base::StrCat({"Record failed uploaded: ",
records_->back().sequencing_information().DebugString()}));
LOG(ERROR) << data_loss;
if (highest_sequencing_information_.has_value()) {
Complete(std::move(highest_sequencing_information_.value()));
return;
}
Complete(data_loss);
}
void RecordHandlerImpl::ReportUploader::HandleSuccessfulUpload() {
highest_sequencing_information_ = records_->back().sequencing_information();
// Pop the last record that was processed.
records_->pop_back();
if (records_->empty()) {
Complete(highest_sequencing_information_.value());
return;
}
StartUpload(records_->back());
}
void RecordHandlerImpl::ReportUploader::Complete(
DmServerUploadService::CompletionResponse completion_result) {
Schedule(&RecordHandlerImpl::ReportUploader::Response, base::Unretained(this),
completion_result);
}
RecordHandlerImpl::RecordHandlerImpl(policy::CloudPolicyClient* client)
: RecordHandler(client),
sequenced_task_runner_(base::ThreadPool::CreateSequencedTaskRunner({})) {}
RecordHandlerImpl::~RecordHandlerImpl() = default;
void RecordHandlerImpl::HandleRecords(
std::unique_ptr<std::vector<EncryptedRecord>> records,
DmServerUploadService::CompletionCallback upload_complete_cb) {
Start<RecordHandlerImpl::ReportUploader>(std::move(records), GetClient(),
std::move(upload_complete_cb),
sequenced_task_runner_);
}
} // namespace reporting
// Copyright 2020 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef CHROME_BROWSER_POLICY_MESSAGING_LAYER_UPLOAD_RECORD_HANDLER_IMPL_H_
#define CHROME_BROWSER_POLICY_MESSAGING_LAYER_UPLOAD_RECORD_HANDLER_IMPL_H_
#include <string>
#include <utility>
#include "base/callback.h"
#include "base/memory/ref_counted.h"
#include "base/sequenced_task_runner.h"
#include "base/task/post_task.h"
#include "base/task_runner.h"
#include "base/values.h"
#include "chrome/browser/policy/messaging_layer/upload/dm_server_upload_service.h"
#include "chrome/browser/policy/messaging_layer/util/shared_queue.h"
#include "chrome/browser/policy/messaging_layer/util/status.h"
#include "chrome/browser/policy/messaging_layer/util/status_macros.h"
#include "chrome/browser/policy/messaging_layer/util/statusor.h"
#include "chrome/browser/policy/messaging_layer/util/task_runner_context.h"
#include "components/policy/core/common/cloud/cloud_policy_client.h"
#include "components/policy/proto/record.pb.h"
namespace reporting {
// |RecordHandlerImpl| handles |ReportRequests|, sending them to
// the server using |CloudPolicyClient|. Since |CloudPolicyClient| will cancel
// any in progress reports if a new report is added, |RecordHandlerImpl|
// ensures that only one report is ever processed at one time by forming a
// queue.
class RecordHandlerImpl : public DmServerUploadService::RecordHandler {
public:
// ReportUploader handles enqueuing events on the |report_queue_|,
// and uploading those events with the |client_|.
class ReportUploader
: public TaskRunnerContext<DmServerUploadService::CompletionResponse> {
public:
ReportUploader(
std::unique_ptr<std::vector<EncryptedRecord>> records,
policy::CloudPolicyClient* client,
DmServerUploadService::CompletionCallback upload_complete_cb,
scoped_refptr<base::SequencedTaskRunner> sequenced_task_runner);
private:
~ReportUploader() override;
void OnStart() override;
void StartUpload(const EncryptedRecord& encrypted_record);
void OnUploadComplete(bool success);
void HandleFailedUpload();
void HandleSuccessfulUpload();
void Complete(DmServerUploadService::CompletionResponse completion_result);
std::unique_ptr<std::vector<EncryptedRecord>> records_;
policy::CloudPolicyClient* client_;
// Set for the highest record being uploaded.
base::Optional<SequencingInformation> highest_sequencing_information_;
};
explicit RecordHandlerImpl(policy::CloudPolicyClient* client);
~RecordHandlerImpl() override;
// Base class RecordHandler method implementation.
void HandleRecords(
std::unique_ptr<std::vector<EncryptedRecord>> record,
DmServerUploadService::CompletionCallback upload_complete) override;
private:
scoped_refptr<base::SequencedTaskRunner> sequenced_task_runner_;
};
} // namespace reporting
#endif // CHROME_BROWSER_POLICY_MESSAGING_LAYER_UPLOAD_RECORD_HANDLER_IMPL_H_
...@@ -2,10 +2,12 @@ ...@@ -2,10 +2,12 @@
// Use of this source code is governed by a BSD-style license that can be // Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. // found in the LICENSE file.
#include "chrome/browser/policy/messaging_layer/upload/app_install_report_handler.h" #include "chrome/browser/policy/messaging_layer/upload/record_handler_impl.h"
#include "base/json/json_writer.h" #include "base/json/json_writer.h"
#include "base/optional.h" #include "base/optional.h"
#include "base/strings/strcat.h"
#include "base/strings/string_number_conversions.h"
#include "base/synchronization/waitable_event.h" #include "base/synchronization/waitable_event.h"
#include "base/task/post_task.h" #include "base/task/post_task.h"
#include "base/task_runner.h" #include "base/task_runner.h"
...@@ -32,38 +34,56 @@ using ::testing::WithArgs; ...@@ -32,38 +34,56 @@ using ::testing::WithArgs;
namespace reporting { namespace reporting {
namespace { namespace {
MATCHER_P(MatchValue, expected, "matches base::Value") { MATCHER_P(ValueEqualsProto,
std::string arg_string; expected,
if (!base::JSONWriter::Write(arg, &arg_string)) { "Compares StatusOr<MessageLite> to expected MessageLite") {
LOG(INFO) << "Unable to serialize the arg"; if (!arg.ok()) {
return false; return false;
} }
if (arg.ValueOrDie().GetTypeName() != expected.GetTypeName()) {
DCHECK(expected);
std::string expected_string;
if (!base::JSONWriter::Write(*expected, &expected_string)) {
LOG(INFO) << "Unable to serialize the expected";
return false; return false;
} }
return arg.ValueOrDie().SerializeAsString() == expected.SerializeAsString();
return arg_string == expected_string;
} }
class TestCallbackWaiter { class TestCallbackWaiter {
public: public:
TestCallbackWaiter() : run_loop_(std::make_unique<base::RunLoop>()) {} TestCallbackWaiter() = default;
virtual void Signal() { run_loop_->Quit(); } virtual void Signal() { run_loop_.Quit(); }
void Wait() { run_loop_->Run(); } void Wait() { run_loop_.Run(); }
protected: protected:
std::unique_ptr<base::RunLoop> run_loop_; base::RunLoop run_loop_;
};
class TestCallbackWaiterWithCounter : public TestCallbackWaiter {
public:
explicit TestCallbackWaiterWithCounter(int counter_limit)
: counter_limit_(counter_limit) {}
void Signal() override {
DCHECK_GT(counter_limit_, 0);
if (--counter_limit_ == 0) {
run_loop_.Quit();
}
}
private:
std::atomic<int> counter_limit_;
};
class TestCompletionResponder {
public:
MOCK_METHOD(void,
RecordsHandled,
(DmServerUploadService::CompletionResponse));
}; };
class AppInstallReportHandlerTest : public testing::Test { class RecordHandlerImplTest : public testing::Test {
public: public:
AppInstallReportHandlerTest() RecordHandlerImplTest()
: client_(std::make_unique<policy::MockCloudPolicyClient>()) {} : client_(std::make_unique<policy::MockCloudPolicyClient>()) {}
protected: protected:
...@@ -76,122 +96,98 @@ class AppInstallReportHandlerTest : public testing::Test { ...@@ -76,122 +96,98 @@ class AppInstallReportHandlerTest : public testing::Test {
std::unique_ptr<policy::MockCloudPolicyClient> client_; std::unique_ptr<policy::MockCloudPolicyClient> client_;
}; };
class TestRecord { std::unique_ptr<std::vector<EncryptedRecord>> RecordListBuilder(
public: uint64_t number_of_test_records,
explicit TestRecord(base::StringPiece key = "TEST_KEY", uint64_t generation_id) {
base::StringPiece value = "TEST_VALUE") { std::unique_ptr<std::vector<EncryptedRecord>> test_records =
data_.SetKey(key, base::Value(value)); std::make_unique<std::vector<EncryptedRecord>>();
}
for (uint64_t i = 0; i < number_of_test_records; i++) {
Record GetRecord() { EncryptedRecord encrypted_record;
Record record; encrypted_record.set_encrypted_wrapped_record(
std::string json_data; base::StrCat({"Record Number ", base::NumberToString(i)}));
base::JSONWriter::Write(data_, &json_data); auto* sequencing_information =
record.set_data(json_data); encrypted_record.mutable_sequencing_information();
record.set_destination(Destination::UPLOAD_EVENTS); sequencing_information->set_generation_id(generation_id);
return record; sequencing_information->set_sequencing_id(i);
sequencing_information->set_priority(Priority::IMMEDIATE);
test_records->push_back(std::move(encrypted_record));
} }
return test_records;
}
const base::Value* data() const { return &data_; } TEST_F(RecordHandlerImplTest, ForwardsRecordsToCloudPolicyClient) {
uint64_t kNumTestRecords = 10;
private: uint64_t kGenerationId = 1234;
base::Value data_{base::Value::Type::DICTIONARY}; auto test_records = RecordListBuilder(kNumTestRecords, kGenerationId);
};
TEST_F(AppInstallReportHandlerTest, AcceptsValidRecord) { TestCallbackWaiterWithCounter client_waiter{kNumTestRecords};
TestCallbackWaiter waiter; EXPECT_CALL(*client_, UploadEncryptedReport(_, _, _))
TestRecord test_record; .Times(kNumTestRecords)
EXPECT_CALL(*client_, .WillRepeatedly(WithArgs<2>(
UploadExtensionInstallReport_(MatchValue(test_record.data()), _)) Invoke([&client_waiter](base::OnceCallback<void(bool)> callback) {
.WillOnce(WithArgs<1>(
Invoke([&waiter](AppInstallReportHandler::ClientCallback& callback) {
std::move(callback).Run(true); std::move(callback).Run(true);
waiter.Signal(); client_waiter.Signal();
}))); })));
AppInstallReportHandler handler(client_.get()); RecordHandlerImpl handler(client_.get());
Status handle_status = handler.HandleRecord(test_record.GetRecord());
EXPECT_OK(handle_status);
waiter.Wait();
}
TEST_F(AppInstallReportHandlerTest, DeniesInvalidDestination) {
EXPECT_CALL(*client_, UploadExtensionInstallReport_(_, _)).Times(0);
AppInstallReportHandler handler(client_.get());
Record test_record; TestCallbackWaiter responder_waiter;
test_record.set_destination(Destination::MEET_DEVICE_TELEMETRY); TestCompletionResponder responder;
EXPECT_CALL(responder, RecordsHandled(ValueEqualsProto(
test_records->back().sequencing_information())))
.WillOnce(Invoke([&responder_waiter]() { responder_waiter.Signal(); }));
Status handle_status = handler.HandleRecord(test_record); auto responder_callback = base::BindOnce(
EXPECT_FALSE(handle_status.ok()); &TestCompletionResponder::RecordsHandled, base::Unretained(&responder));
EXPECT_EQ(handle_status.error_code(), error::INVALID_ARGUMENT);
}
TEST_F(AppInstallReportHandlerTest, DeniesInvalidData) { handler.HandleRecords(std::move(test_records), std::move(responder_callback));
EXPECT_CALL(*client_, UploadExtensionInstallReport_(_, _)).Times(0);
AppInstallReportHandler handler(client_.get());
Record test_record; client_waiter.Wait();
test_record.set_data("BAD_DATA"); responder_waiter.Wait();
Status handle_status = handler.HandleRecord(test_record);
EXPECT_FALSE(handle_status.ok());
EXPECT_EQ(handle_status.error_code(), error::INVALID_ARGUMENT);
} }
TEST_F(AppInstallReportHandlerTest, ReportsUnsuccessfulCall) { TEST_F(RecordHandlerImplTest, ReportsEarlyFailure) {
TestCallbackWaiter waiter; uint64_t kNumSuccessfulUploads = 5;
uint64_t kNumTestRecords = 10;
uint64_t kGenerationId = 1234;
auto test_records = RecordListBuilder(kNumTestRecords, kGenerationId);
TestRecord test_record; // Wait kNumSuccessfulUploads times + 1 for the failure.
EXPECT_CALL(*client_, TestCallbackWaiterWithCounter client_waiter{kNumSuccessfulUploads + 1};
UploadExtensionInstallReport_(MatchValue(test_record.data()), _))
.WillOnce(WithArgs<1>( ::testing::InSequence seq;
Invoke([&waiter](AppInstallReportHandler::ClientCallback& callback) { EXPECT_CALL(*client_, UploadEncryptedReport(_, _, _))
.Times(kNumSuccessfulUploads)
.WillRepeatedly(WithArgs<2>(
Invoke([&client_waiter](base::OnceCallback<void(bool)> callback) {
std::move(callback).Run(true);
client_waiter.Signal();
})));
EXPECT_CALL(*client_, UploadEncryptedReport(_, _, _))
.WillOnce(WithArgs<2>(
Invoke([&client_waiter](base::OnceCallback<void(bool)> callback) {
std::move(callback).Run(false); std::move(callback).Run(false);
waiter.Signal(); client_waiter.Signal();
}))); })));
AppInstallReportHandler handler(client_.get()); RecordHandlerImpl handler(client_.get());
Status handle_status = handler.HandleRecord(test_record.GetRecord());
EXPECT_OK(handle_status);
waiter.Wait();
}
class TestCallbackWaiterWithCounter : public TestCallbackWaiter {
public:
explicit TestCallbackWaiterWithCounter(int counter_limit)
: counter_limit_(counter_limit) {}
void Signal() override {
DCHECK_GT(counter_limit_, 0);
if (--counter_limit_ == 0) {
run_loop_->Quit();
}
}
private: TestCallbackWaiter responder_waiter;
std::atomic<int> counter_limit_; TestCompletionResponder responder;
}; EXPECT_CALL(
responder,
TEST_F(AppInstallReportHandlerTest, AcceptsMultipleValidRecords) { RecordsHandled(ValueEqualsProto(
const int kExpectedCallTimes = 10; (*test_records)[kNumSuccessfulUploads - 1].sequencing_information())))
TestCallbackWaiterWithCounter waiter{kExpectedCallTimes}; .WillOnce(Invoke([&responder_waiter]() { responder_waiter.Signal(); }));
TestRecord test_record; auto responder_callback = base::BindOnce(
EXPECT_CALL(*client_, &TestCompletionResponder::RecordsHandled, base::Unretained(&responder));
UploadExtensionInstallReport_(MatchValue(test_record.data()), _))
.WillRepeatedly(WithArgs<1>(
Invoke([&waiter](AppInstallReportHandler::ClientCallback& callback) {
std::move(callback).Run(true);
waiter.Signal();
})));
AppInstallReportHandler handler(client_.get()); handler.HandleRecords(std::move(test_records), std::move(responder_callback));
for (int i = 0; i < kExpectedCallTimes; i++) { client_waiter.Wait();
Status handle_status = handler.HandleRecord(test_record.GetRecord()); responder_waiter.Wait();
EXPECT_OK(handle_status);
}
waiter.Wait();
} }
} // namespace } // namespace
......
...@@ -10,7 +10,7 @@ ...@@ -10,7 +10,7 @@
#include "base/test/task_environment.h" #include "base/test/task_environment.h"
#include "base/test/test_mock_time_task_runner.h" #include "base/test/test_mock_time_task_runner.h"
#include "base/values.h" #include "base/values.h"
#include "chrome/browser/policy/messaging_layer/upload/app_install_report_handler.h" #include "chrome/browser/policy/messaging_layer/upload/record_handler_impl.h"
#include "components/account_id/account_id.h" #include "components/account_id/account_id.h"
#include "components/policy/core/common/cloud/dm_token.h" #include "components/policy/core/common/cloud/dm_token.h"
#include "components/policy/core/common/cloud/mock_cloud_policy_client.h" #include "components/policy/core/common/cloud/mock_cloud_policy_client.h"
...@@ -36,6 +36,15 @@ using testing::Invoke; ...@@ -36,6 +36,15 @@ using testing::Invoke;
using testing::InvokeArgument; using testing::InvokeArgument;
using testing::WithArgs; using testing::WithArgs;
MATCHER_P(EqualsProto,
message,
"Match a proto Message equal to the matcher's argument.") {
std::string expected_serialized, actual_serialized;
message.SerializeToString(&expected_serialized);
arg.SerializeToString(&actual_serialized);
return expected_serialized == actual_serialized;
}
// Usage (in tests only): // Usage (in tests only):
// //
// TestEvent<ResType> e; // TestEvent<ResType> e;
...@@ -78,6 +87,12 @@ class TestCallbackWaiter { ...@@ -78,6 +87,12 @@ class TestCallbackWaiter {
virtual void Signal() { run_loop_->Quit(); } virtual void Signal() { run_loop_->Quit(); }
void CompleteExpectSequencingInformation(SequencingInformation expected,
SequencingInformation info) {
EXPECT_THAT(info, EqualsProto(expected));
Signal();
}
void Wait() { run_loop_->Run(); } void Wait() { run_loop_->Run(); }
protected: protected:
...@@ -142,31 +157,10 @@ class UploadClientTest : public ::testing::Test { ...@@ -142,31 +157,10 @@ class UploadClientTest : public ::testing::Test {
#endif // OS_CHROMEOS #endif // OS_CHROMEOS
}; };
TEST_F(UploadClientTest, CreateUploadClient) { TEST_F(UploadClientTest, CreateUploadClientAndUploadRecords) {
const int kExpectedCallTimes = 10; const int kExpectedCallTimes = 10;
const uint64_t kGenerationId = 1234; const uint64_t kGenerationId = 1234;
TestCallbackWaiterWithCounter waiter(kExpectedCallTimes);
auto client = std::make_unique<MockCloudPolicyClient>();
client->SetDMToken(
policy::DMToken::CreateValidTokenForTesting("FAKE_DM_TOKEN").value());
EXPECT_CALL(*client, UploadExtensionInstallReport_(_, _))
.WillRepeatedly(WithArgs<1>(
Invoke([&waiter](AppInstallReportHandler::ClientCallback& callback) {
std::move(callback).Run(true);
base::ThreadPool::PostTask(
FROM_HERE, {base::TaskPriority::BEST_EFFORT},
base::BindOnce(&TestCallbackWaiterWithCounter::Signal,
base::Unretained(&waiter)));
})));
TestEvent<StatusOr<std::unique_ptr<UploadClient>>> e;
UploadClient::Create(std::move(client), base::DoNothing(), e.cb());
StatusOr<std::unique_ptr<UploadClient>> upload_client_result = e.result();
ASSERT_OK(upload_client_result) << upload_client_result.status();
base::Value data{base::Value::Type::DICTIONARY}; base::Value data{base::Value::Type::DICTIONARY};
data.SetKey("TEST_KEY", base::Value("TEST_VALUE")); data.SetKey("TEST_KEY", base::Value("TEST_VALUE"));
...@@ -180,7 +174,6 @@ TEST_F(UploadClientTest, CreateUploadClient) { ...@@ -180,7 +174,6 @@ TEST_F(UploadClientTest, CreateUploadClient) {
std::string serialized_record; std::string serialized_record;
wrapped_record.SerializeToString(&serialized_record); wrapped_record.SerializeToString(&serialized_record);
std::unique_ptr<std::vector<EncryptedRecord>> records = std::unique_ptr<std::vector<EncryptedRecord>> records =
std::make_unique<std::vector<EncryptedRecord>>(); std::make_unique<std::vector<EncryptedRecord>>();
for (int i = 0; i < kExpectedCallTimes; i++) { for (int i = 0; i < kExpectedCallTimes; i++) {
...@@ -195,11 +188,40 @@ TEST_F(UploadClientTest, CreateUploadClient) { ...@@ -195,11 +188,40 @@ TEST_F(UploadClientTest, CreateUploadClient) {
records->push_back(encrypted_record); records->push_back(encrypted_record);
} }
TestCallbackWaiterWithCounter waiter(kExpectedCallTimes);
auto client = std::make_unique<MockCloudPolicyClient>();
client->SetDMToken(
policy::DMToken::CreateValidTokenForTesting("FAKE_DM_TOKEN").value());
EXPECT_CALL(*client, UploadEncryptedReport(_, _, _))
.WillRepeatedly(WithArgs<2>(
Invoke([&waiter](base::OnceCallback<void(bool)> callback) {
std::move(callback).Run(true);
base::ThreadPool::PostTask(
FROM_HERE, {base::TaskPriority::BEST_EFFORT},
base::BindOnce(&TestCallbackWaiterWithCounter::Signal,
base::Unretained(&waiter)));
})));
TestCallbackWaiter completion_callback_waiter;
UploadClient::ReportSuccessfulUploadCallback completion_cb =
base::BindRepeating(
&TestCallbackWaiter::CompleteExpectSequencingInformation,
base::Unretained(&completion_callback_waiter),
records->back().sequencing_information());
TestEvent<StatusOr<std::unique_ptr<UploadClient>>> e;
UploadClient::Create(std::move(client), completion_cb, e.cb());
StatusOr<std::unique_ptr<UploadClient>> upload_client_result = e.result();
ASSERT_OK(upload_client_result) << upload_client_result.status();
auto upload_client = std::move(upload_client_result.ValueOrDie()); auto upload_client = std::move(upload_client_result.ValueOrDie());
auto enqueue_result = upload_client->EnqueueUpload(std::move(records)); auto enqueue_result = upload_client->EnqueueUpload(std::move(records));
EXPECT_TRUE(enqueue_result.ok()); EXPECT_TRUE(enqueue_result.ok());
waiter.Wait(); waiter.Wait();
completion_callback_waiter.Wait();
} }
} // namespace } // namespace
......
...@@ -3628,9 +3628,8 @@ test("unit_tests") { ...@@ -3628,9 +3628,8 @@ test("unit_tests") {
"../browser/policy/messaging_layer/storage/storage_unittest.cc", "../browser/policy/messaging_layer/storage/storage_unittest.cc",
"../browser/policy/messaging_layer/storage/test_storage_module.cc", "../browser/policy/messaging_layer/storage/test_storage_module.cc",
"../browser/policy/messaging_layer/storage/test_storage_module.h", "../browser/policy/messaging_layer/storage/test_storage_module.h",
"../browser/policy/messaging_layer/upload/app_install_report_handler_unittest.cc",
"../browser/policy/messaging_layer/upload/dm_server_upload_service_unittest.cc", "../browser/policy/messaging_layer/upload/dm_server_upload_service_unittest.cc",
"../browser/policy/messaging_layer/upload/meet_device_telemetry_report_handler_unittest.cc", "../browser/policy/messaging_layer/upload/record_handler_impl_unittest.cc",
"../browser/policy/messaging_layer/upload/upload_client_unittest.cc", "../browser/policy/messaging_layer/upload/upload_client_unittest.cc",
"../browser/policy/messaging_layer/util/shared_queue_unittest.cc", "../browser/policy/messaging_layer/util/shared_queue_unittest.cc",
"../browser/policy/messaging_layer/util/shared_vector_unittest.cc", "../browser/policy/messaging_layer/util/shared_vector_unittest.cc",
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment