Commit 5495795f authored by Leonid Baraz's avatar Leonid Baraz Committed by Commit Bot

Refactor Storage to handle EncryptedRecord.

Since EncrypedRecord represents not only data but also sequencing
information, rely on it everywhere, including the unittests.
Extend test to verify sequencing too.

Bug: b:153364303
Bug: 1078512
Change-Id: I04dfe6bb9c1bbaac92f34363b26e149011dc9502
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2380498
Commit-Queue: Leonid Baraz <lbaraz@chromium.org>
Reviewed-by: default avatarZach Trudo <zatrudo@google.com>
Cr-Commit-Position: refs/heads/master@{#802734}
parent d2e70f93
...@@ -30,6 +30,7 @@ ...@@ -30,6 +30,7 @@
#include "chrome/browser/browser_process.h" #include "chrome/browser/browser_process.h"
#include "chrome/browser/browser_process_platform_part.h" #include "chrome/browser/browser_process_platform_part.h"
#include "chrome/browser/net/system_network_context_manager.h" #include "chrome/browser/net/system_network_context_manager.h"
#include "components/policy/proto/record.pb.h"
#include "services/network/public/cpp/shared_url_loader_factory.h" #include "services/network/public/cpp/shared_url_loader_factory.h"
#if defined(OS_CHROMEOS) #if defined(OS_CHROMEOS)
...@@ -122,63 +123,42 @@ StatusOr<std::unique_ptr<Uploader>> Uploader::Create( ...@@ -122,63 +123,42 @@ StatusOr<std::unique_ptr<Uploader>> Uploader::Create(
return uploader; return uploader;
} }
void Uploader::ProcessBlob(Priority priority, void Uploader::ProcessRecord(StatusOr<EncryptedRecord> data,
StatusOr<base::span<const uint8_t>> data, base::OnceCallback<void(bool)> processed_cb) {
base::OnceCallback<void(bool)> processed_cb) {
if (completed_ || !data.ok()) { if (completed_ || !data.ok()) {
std::move(processed_cb).Run(false); std::move(processed_cb).Run(false);
return; return;
} }
class ProcessBlobContext : public TaskRunnerContext<bool> { class ProcessRecordContext : public TaskRunnerContext<bool> {
public: public:
ProcessBlobContext( ProcessRecordContext(
base::span<const uint8_t> data, EncryptedRecord record,
std::vector<EncryptedRecord>* records, std::vector<EncryptedRecord>* records,
base::OnceCallback<void(bool)> processed_callback, base::OnceCallback<void(bool)> processed_callback,
scoped_refptr<base::SequencedTaskRunner> sequenced_task_runner) scoped_refptr<base::SequencedTaskRunner> sequenced_task_runner)
: TaskRunnerContext<bool>(std::move(processed_callback), : TaskRunnerContext<bool>(std::move(processed_callback),
sequenced_task_runner), sequenced_task_runner),
records_(records), records_(records),
data_(data.begin(), data.end()) {} record_(std::move(record)) {}
private: private:
~ProcessBlobContext() override = default; ~ProcessRecordContext() override = default;
void OnStart() override { void OnStart() override {
if (data_.empty()) { records_->emplace_back(std::move(record_));
Complete(true); Response(true);
return;
}
ProcessBlob();
}
void ProcessBlob() {
EncryptedRecord record;
if (!record.ParseFromArray(data_.data(), data_.size())) {
Complete(false);
return;
}
records_->push_back(record);
Complete(true);
}
void Complete(bool success) {
if (!success) {
LOG(ERROR) << "Unable to process blob";
}
Response(success);
} }
std::vector<EncryptedRecord>* const records_; std::vector<EncryptedRecord>* const records_;
const std::vector<uint8_t> data_; const EncryptedRecord record_;
}; };
Start<ProcessBlobContext>(data.ValueOrDie(), encrypted_records_.get(), Start<ProcessRecordContext>(data.ValueOrDie(), encrypted_records_.get(),
std::move(processed_cb), sequenced_task_runner_); std::move(processed_cb), sequenced_task_runner_);
} }
void Uploader::Completed(Priority priority, Status final_status) { void Uploader::Completed(Status final_status) {
if (!final_status.ok()) { if (!final_status.ok()) {
// No work to do - something went wrong with storage and it no longer wants // No work to do - something went wrong with storage and it no longer wants
// to upload the records. Let the records die with |this|. // to upload the records. Let the records die with |this|.
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
#include "chrome/browser/policy/messaging_layer/util/shared_queue.h" #include "chrome/browser/policy/messaging_layer/util/shared_queue.h"
#include "chrome/browser/policy/messaging_layer/util/statusor.h" #include "chrome/browser/policy/messaging_layer/util/statusor.h"
#include "chrome/browser/policy/messaging_layer/util/task_runner_context.h" #include "chrome/browser/policy/messaging_layer/util/task_runner_context.h"
#include "components/policy/proto/record.pb.h"
namespace reporting { namespace reporting {
...@@ -50,13 +51,10 @@ class ReportingClient { ...@@ -50,13 +51,10 @@ class ReportingClient {
Uploader(const Uploader& other) = delete; Uploader(const Uploader& other) = delete;
Uploader& operator=(const Uploader& other) = delete; Uploader& operator=(const Uploader& other) = delete;
// TODO(chromium:1078512) Priority is unused, remove it. void ProcessRecord(StatusOr<EncryptedRecord> data,
void ProcessBlob(Priority priority, base::OnceCallback<void(bool)> processed_cb) override;
StatusOr<base::span<const uint8_t>> data,
base::OnceCallback<void(bool)> processed_cb) override;
// TODO(chromium:1078512) Priority is unused, remove it. void Completed(Status final_status) override;
void Completed(Priority priority, Status final_status) override;
private: private:
explicit Uploader(UploadCallback upload_callback_); explicit Uploader(UploadCallback upload_callback_);
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
#include "chrome/browser/policy/messaging_layer/storage/storage_queue.h" #include "chrome/browser/policy/messaging_layer/storage/storage_queue.h"
#include "chrome/browser/policy/messaging_layer/util/status_macros.h" #include "chrome/browser/policy/messaging_layer/util/status_macros.h"
#include "chrome/browser/policy/messaging_layer/util/task_runner_context.h" #include "chrome/browser/policy/messaging_layer/util/task_runner_context.h"
#include "components/policy/proto/record.pb.h"
namespace reporting { namespace reporting {
...@@ -123,12 +124,21 @@ class Storage::QueueUploaderInterface : public StorageQueue::UploaderInterface { ...@@ -123,12 +124,21 @@ class Storage::QueueUploaderInterface : public StorageQueue::UploaderInterface {
std::move(uploader)); std::move(uploader));
} }
void ProcessBlob(StatusOr<base::span<const uint8_t>> data, void ProcessRecord(StatusOr<EncryptedRecord> encrypted_record,
base::OnceCallback<void(bool)> processed_cb) override { base::OnceCallback<void(bool)> processed_cb) override {
storage_interface_->ProcessBlob(priority_, data, std::move(processed_cb)); if (encrypted_record.ok()) {
// Update sequencing information: add Priority and Generation ID.
SequencingInformation* const sequencing_info =
encrypted_record.ValueOrDie().mutable_sequencing_information();
sequencing_info->set_priority(priority_);
// sequencing_info->set_generation_id(...); Not supported yet.
}
storage_interface_->ProcessRecord(std::move(encrypted_record),
std::move(processed_cb));
} }
void Completed(Status final_status) override { void Completed(Status final_status) override {
storage_interface_->Completed(priority_, final_status); storage_interface_->Completed(final_status);
} }
private: private:
...@@ -230,13 +240,13 @@ Storage::Storage(const Options& options, StartUploadCb start_upload_cb) ...@@ -230,13 +240,13 @@ Storage::Storage(const Options& options, StartUploadCb start_upload_cb)
Storage::~Storage() = default; Storage::~Storage() = default;
void Storage::Write(Priority priority, void Storage::Write(Priority priority,
base::span<const uint8_t> data, EncryptedRecord record,
base::OnceCallback<void(Status)> completion_cb) { base::OnceCallback<void(Status)> completion_cb) {
// Note: queues_ never change after initialization is finished, so there is no // Note: queues_ never change after initialization is finished, so there is no
// need to protect or serialize access to it. // need to protect or serialize access to it.
ASSIGN_OR_ONCE_CALLBACK_AND_RETURN(scoped_refptr<StorageQueue> queue, ASSIGN_OR_ONCE_CALLBACK_AND_RETURN(scoped_refptr<StorageQueue> queue,
completion_cb, GetQueue(priority)); completion_cb, GetQueue(priority));
queue->Write(data, std::move(completion_cb)); queue->Write(std::move(record), std::move(completion_cb));
} }
void Storage::Confirm(Priority priority, void Storage::Confirm(Priority priority,
......
...@@ -11,7 +11,6 @@ ...@@ -11,7 +11,6 @@
#include "base/callback.h" #include "base/callback.h"
#include "base/containers/flat_map.h" #include "base/containers/flat_map.h"
#include "base/containers/span.h"
#include "base/files/file_path.h" #include "base/files/file_path.h"
#include "base/memory/ref_counted.h" #include "base/memory/ref_counted.h"
#include "base/memory/scoped_refptr.h" #include "base/memory/scoped_refptr.h"
...@@ -19,6 +18,7 @@ ...@@ -19,6 +18,7 @@
#include "chrome/browser/policy/messaging_layer/storage/storage_queue.h" #include "chrome/browser/policy/messaging_layer/storage/storage_queue.h"
#include "chrome/browser/policy/messaging_layer/util/status.h" #include "chrome/browser/policy/messaging_layer/util/status.h"
#include "chrome/browser/policy/messaging_layer/util/statusor.h" #include "chrome/browser/policy/messaging_layer/util/statusor.h"
#include "components/policy/proto/record.pb.h"
#include "components/policy/proto/record_constants.pb.h" #include "components/policy/proto/record_constants.pb.h"
namespace reporting { namespace reporting {
...@@ -38,17 +38,16 @@ class Storage : public base::RefCountedThreadSafe<Storage> { ...@@ -38,17 +38,16 @@ class Storage : public base::RefCountedThreadSafe<Storage> {
public: public:
virtual ~UploaderInterface() = default; virtual ~UploaderInterface() = default;
// Asynchronously processes every record (e.g. serializes and adds to the // Unserializes every record and hands ownership over for processing (e.g.
// network message). Expects |processed_cb| to be called after the record // to add to the network message). Expects |processed_cb| to be called after
// has been processed, with true if next record needs to be delivered and // the record or error status has been processed, with true if next record
// false if the Uploader should stop. // needs to be delivered and false if the Uploader should stop.
virtual void ProcessBlob(Priority priority, virtual void ProcessRecord(StatusOr<EncryptedRecord> record,
StatusOr<base::span<const uint8_t>> data, base::OnceCallback<void(bool)> processed_cb) = 0;
base::OnceCallback<void(bool)> processed_cb) = 0;
// Finalizes the upload (e.g. sends the message to the server and gets // Finalizes the upload (e.g. sends the message to the server and gets
// response). // response).
virtual void Completed(Priority priority, Status final_status) = 0; virtual void Completed(Status final_status) = 0;
}; };
// Callback type for UploadInterface provider for specified queue. // Callback type for UploadInterface provider for specified queue.
...@@ -82,11 +81,12 @@ class Storage : public base::RefCountedThreadSafe<Storage> { ...@@ -82,11 +81,12 @@ class Storage : public base::RefCountedThreadSafe<Storage> {
StartUploadCb start_upload_cb, StartUploadCb start_upload_cb,
base::OnceCallback<void(StatusOr<scoped_refptr<Storage>>)> completion_cb); base::OnceCallback<void(StatusOr<scoped_refptr<Storage>>)> completion_cb);
// Writes data blob into the Storage (the last file of it) according to the // Serializes EncryptedRecord (taking ownership of it) and writes the
// resulting blob into the Storage (the last file of it) according to the
// priority with the next sequencing number assigned. If file is going to // priority with the next sequencing number assigned. If file is going to
// become too large, it is closed and new file is created. // become too large, it is closed and new file is created.
void Write(Priority priority, void Write(Priority priority,
base::span<const uint8_t> data, EncryptedRecord record,
base::OnceCallback<void(Status)> completion_cb); base::OnceCallback<void(Status)> completion_cb);
// Confirms acceptance of the records according to the priority up to // Confirms acceptance of the records according to the priority up to
...@@ -123,7 +123,7 @@ class Storage : public base::RefCountedThreadSafe<Storage> { ...@@ -123,7 +123,7 @@ class Storage : public base::RefCountedThreadSafe<Storage> {
// Returns OK or error status, if anything failed to initialize. // Returns OK or error status, if anything failed to initialize.
Status Init(); Status Init();
// Helper function that selects queue by priority. Returns error // Helper method that selects queue by priority. Returns error
// if priority does not match any queue. // if priority does not match any queue.
// Note: queues_ never change after initialization is finished, so there is no // Note: queues_ never change after initialization is finished, so there is no
// need to protect or serialize access to it. // need to protect or serialize access to it.
......
...@@ -25,11 +25,7 @@ StorageModule::~StorageModule() = default; ...@@ -25,11 +25,7 @@ StorageModule::~StorageModule() = default;
void StorageModule::AddRecord(EncryptedRecord record, void StorageModule::AddRecord(EncryptedRecord record,
Priority priority, Priority priority,
base::OnceCallback<void(Status)> callback) { base::OnceCallback<void(Status)> callback) {
size_t record_size = record.ByteSizeLong(); storage_->Write(priority, std::move(record), std::move(callback));
auto data = std::make_unique<uint8_t[]>(record_size);
record.SerializeToArray(data.get(), record_size);
storage_->Write(priority, base::make_span(data.get(), record_size),
std::move(callback));
} }
void StorageModule::ReportSuccess( void StorageModule::ReportSuccess(
......
...@@ -29,8 +29,9 @@ class StorageModule : public base::RefCountedThreadSafe<StorageModule> { ...@@ -29,8 +29,9 @@ class StorageModule : public base::RefCountedThreadSafe<StorageModule> {
StorageModule(const StorageModule& other) = delete; StorageModule(const StorageModule& other) = delete;
StorageModule& operator=(const StorageModule& other) = delete; StorageModule& operator=(const StorageModule& other) = delete;
// AddRecord will add |record| to the |StorageModule| according to the // AddRecord will add |record| (taking ownership) to the |StorageModule|
// provided |priority|. On completion, |callback| will be called. // according to the provided |priority|. On completion, |callback| will be
// called.
virtual void AddRecord(reporting::EncryptedRecord record, virtual void AddRecord(reporting::EncryptedRecord record,
reporting::Priority priority, reporting::Priority priority,
base::OnceCallback<void(Status)> callback); base::OnceCallback<void(Status)> callback);
......
...@@ -11,7 +11,6 @@ ...@@ -11,7 +11,6 @@
#include <vector> #include <vector>
#include "base/callback.h" #include "base/callback.h"
#include "base/containers/span.h"
#include "base/files/file.h" #include "base/files/file.h"
#include "base/files/file_path.h" #include "base/files/file_path.h"
#include "base/memory/ref_counted.h" #include "base/memory/ref_counted.h"
...@@ -24,6 +23,7 @@ ...@@ -24,6 +23,7 @@
#include "base/timer/timer.h" #include "base/timer/timer.h"
#include "chrome/browser/policy/messaging_layer/util/status.h" #include "chrome/browser/policy/messaging_layer/util/status.h"
#include "chrome/browser/policy/messaging_layer/util/statusor.h" #include "chrome/browser/policy/messaging_layer/util/statusor.h"
#include "components/policy/proto/record.pb.h"
namespace reporting { namespace reporting {
...@@ -104,12 +104,12 @@ class StorageQueue : public base::RefCountedThreadSafe<StorageQueue> { ...@@ -104,12 +104,12 @@ class StorageQueue : public base::RefCountedThreadSafe<StorageQueue> {
public: public:
virtual ~UploaderInterface() = default; virtual ~UploaderInterface() = default;
// Asynchronously processes every record (e.g. serializes and adds to the // Unserializes every record and hands ownership over for processing (e.g.
// network message). Expects |processed_cb| to be called after the record // to add to the network message). Expects |processed_cb| to be called after
// or error status has been processed, with true if next record needs to be // the record or error status has been processed, with true if next record
// delivered and false if the Uploader should stop. // needs to be delivered and false if the Uploader should stop.
virtual void ProcessBlob(StatusOr<base::span<const uint8_t>> data, virtual void ProcessRecord(StatusOr<EncryptedRecord> record,
base::OnceCallback<void(bool)> processed_cb) = 0; base::OnceCallback<void(bool)> processed_cb) = 0;
// Finalizes the upload (e.g. sends the message to server and gets // Finalizes the upload (e.g. sends the message to server and gets
// response). Called always, regardless of whether there were errors. // response). Called always, regardless of whether there were errors.
...@@ -131,13 +131,14 @@ class StorageQueue : public base::RefCountedThreadSafe<StorageQueue> { ...@@ -131,13 +131,14 @@ class StorageQueue : public base::RefCountedThreadSafe<StorageQueue> {
base::OnceCallback<void(StatusOr<scoped_refptr<StorageQueue>>)> base::OnceCallback<void(StatusOr<scoped_refptr<StorageQueue>>)>
completion_cb); completion_cb);
// Writes data blob into the StorageQueue (the last file of it) with the next // Serializes EncryptedRecord (taking ownership of it) and writes the
// sequencing number assigned. The write is a non-blocking operation - // resulting blob into the StorageQueue (the last file of it) with the next
// caller can "fire and forget" it (|completion_cb| allows to verify that // sequencing number assigned. The write is a non-blocking operation - caller
// record has been successfully enqueued). If file is going to become too // can "fire and forget" it (|completion_cb| allows to verify that record has
// large, it is closed and new file is created. // been successfully enqueued). If file is going to become too large, it is
// closed and new file is created.
// Helper methods: AssignLastFile, WriteHeaderAndBlock. // Helper methods: AssignLastFile, WriteHeaderAndBlock.
void Write(base::span<const uint8_t> data, void Write(EncryptedRecord record,
base::OnceCallback<void(Status)> completion_cb); base::OnceCallback<void(Status)> completion_cb);
// Confirms acceptance of the records up to |seq_number| (inclusively). // Confirms acceptance of the records up to |seq_number| (inclusively).
...@@ -152,9 +153,9 @@ class StorageQueue : public base::RefCountedThreadSafe<StorageQueue> { ...@@ -152,9 +153,9 @@ class StorageQueue : public base::RefCountedThreadSafe<StorageQueue> {
// a queue with an infinite or very large upload period. Multiple |Flush| // a queue with an infinite or very large upload period. Multiple |Flush|
// calls can safely run in parallel. // calls can safely run in parallel.
// Starts by calling |start_upload_cb_| that instantiates |UploaderInterface // Starts by calling |start_upload_cb_| that instantiates |UploaderInterface
// uploader|. Then repeatedly reads data blob(s) one by one from the // uploader|. Then repeatedly reads EncryptedRecord(s) one by one from the
// StorageQueue starting from |first_seq_number_|, handing each one over to // StorageQueue starting from |first_seq_number_|, handing each one over to
// |uploader|->ProcessBlob (keeping ownership of the buffer) and resuming // |uploader|->ProcessRecord (keeping ownership of the buffer) and resuming
// after result callback returns 'true'. Only files that have been closed are // after result callback returns 'true'. Only files that have been closed are
// included in reading; |Upload| makes sure to close the last writeable file // included in reading; |Upload| makes sure to close the last writeable file
// and create a new one before starting to send records to the |uploader|. If // and create a new one before starting to send records to the |uploader|. If
...@@ -163,8 +164,8 @@ class StorageQueue : public base::RefCountedThreadSafe<StorageQueue> { ...@@ -163,8 +164,8 @@ class StorageQueue : public base::RefCountedThreadSafe<StorageQueue> {
// |processed_cb| callback - in that case |Upload| will behave as if the end // |processed_cb| callback - in that case |Upload| will behave as if the end
// of data has been reached. While one or more |Upload|s are active, files can // of data has been reached. While one or more |Upload|s are active, files can
// be added to the StorageQueue but cannot be deleted. If processing of the // be added to the StorageQueue but cannot be deleted. If processing of the
// blob takes significant time, |uploader| implementation should be offset to // record takes significant time, |uploader| implementation should be offset
// another thread to avoid locking StorageQueue. // to another thread to avoid locking StorageQueue.
// Helper methods: SwitchLastFileIfNotEmpty, CollectFilesForUpload. // Helper methods: SwitchLastFileIfNotEmpty, CollectFilesForUpload.
void Flush(); void Flush();
...@@ -194,12 +195,12 @@ class StorageQueue : public base::RefCountedThreadSafe<StorageQueue> { ...@@ -194,12 +195,12 @@ class StorageQueue : public base::RefCountedThreadSafe<StorageQueue> {
Status Delete(); Status Delete();
// Attempts to read |size| bytes from position |pos| and returns // Attempts to read |size| bytes from position |pos| and returns
// span of data that were actually read (no more than |size|). // reference to the data that were actually read (no more than |size|).
// End of file is indicated by empty span. // End of file is indicated by empty data.
StatusOr<base::span<const uint8_t>> Read(uint32_t pos, uint32_t size); StatusOr<base::StringPiece> Read(uint32_t pos, uint32_t size);
// Appends data to the file. // Appends data to the file.
StatusOr<uint32_t> Append(base::span<const uint8_t> data); StatusOr<uint32_t> Append(base::StringPiece data);
bool is_opened() const { return handle_.get() != nullptr; } bool is_opened() const { return handle_.get() != nullptr; }
bool is_readonly() const { bool is_readonly() const {
...@@ -232,7 +233,7 @@ class StorageQueue : public base::RefCountedThreadSafe<StorageQueue> { ...@@ -232,7 +233,7 @@ class StorageQueue : public base::RefCountedThreadSafe<StorageQueue> {
size_t data_start_ = 0; size_t data_start_ = 0;
size_t data_end_ = 0; size_t data_end_ = 0;
uint64_t file_position_ = 0; uint64_t file_position_ = 0;
std::unique_ptr<uint8_t[]> buffer_; std::unique_ptr<char[]> buffer_;
}; };
// Private constructor, to be called by Create factory method only. // Private constructor, to be called by Create factory method only.
...@@ -242,8 +243,8 @@ class StorageQueue : public base::RefCountedThreadSafe<StorageQueue> { ...@@ -242,8 +243,8 @@ class StorageQueue : public base::RefCountedThreadSafe<StorageQueue> {
// and determines the sequencing information of the last record. // and determines the sequencing information of the last record.
// Must be called once and only once after construction. // Must be called once and only once after construction.
// Returns OK or error status, if anything failed to initialize. // Returns OK or error status, if anything failed to initialize.
// Called once, during initialization. Helper methods: EnumerateDataFiles, // Called once, during initialization.
// ScanLastFile. // Helper methods: EnumerateDataFiles, ScanLastFile.
Status Init(); Status Init();
// Helper method for Init(): enumerates all data files in the directory. // Helper method for Init(): enumerates all data files in the directory.
...@@ -268,7 +269,7 @@ class StorageQueue : public base::RefCountedThreadSafe<StorageQueue> { ...@@ -268,7 +269,7 @@ class StorageQueue : public base::RefCountedThreadSafe<StorageQueue> {
// Helper method for Write(): composes record header and writes it to the // Helper method for Write(): composes record header and writes it to the
// file, followed by data. // file, followed by data.
Status WriteHeaderAndBlock(base::span<const uint8_t> data, Status WriteHeaderAndBlock(base::StringPiece data,
scoped_refptr<SingleFile> file); scoped_refptr<SingleFile> file);
// Helper method for Upload: if the last file is not empty (has at least one // Helper method for Upload: if the last file is not empty (has at least one
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment