Commit 0af32427 authored by Leonid Baraz's avatar Leonid Baraz Committed by Commit Bot

Add Flush API to Storage

Bug: b:161266545
Change-Id: I57f47b6c8acff90de983ca4c553b6f4fdacfba0d
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2296636
Commit-Queue: Leonid Baraz <lbaraz@chromium.org>
Reviewed-by: default avatarZach Trudo <zatrudo@google.com>
Cr-Commit-Position: refs/heads/master@{#788768}
parent a9bbd8a5
...@@ -56,6 +56,13 @@ const uint64_t background_queue_total = 64 * 1024LL * 1024LL; ...@@ -56,6 +56,13 @@ const uint64_t background_queue_total = 64 * 1024LL * 1024LL;
constexpr base::TimeDelta background_upload_period = constexpr base::TimeDelta background_upload_period =
base::TimeDelta::FromMinutes(1); base::TimeDelta::FromMinutes(1);
constexpr base::FilePath::CharType manual_queue_subdir[] =
FILE_PATH_LITERAL("Manual");
constexpr base::FilePath::CharType manual_queue_prefix[] =
FILE_PATH_LITERAL("P_Manual");
const uint64_t manual_queue_total = 64 * 1024LL * 1024LL;
constexpr base::TimeDelta manual_upload_period = base::TimeDelta::Max();
// Returns vector of <priority, queue_options> for all expected queues in // Returns vector of <priority, queue_options> for all expected queues in
// Storage. Queues are all located under the given root directory. // Storage. Queues are all located under the given root directory.
std::vector<std::pair<Priority, StorageQueue::Options>> ExpectedQueues( std::vector<std::pair<Priority, StorageQueue::Options>> ExpectedQueues(
...@@ -87,6 +94,13 @@ std::vector<std::pair<Priority, StorageQueue::Options>> ExpectedQueues( ...@@ -87,6 +94,13 @@ std::vector<std::pair<Priority, StorageQueue::Options>> ExpectedQueues(
.set_file_prefix(background_queue_prefix) .set_file_prefix(background_queue_prefix)
.set_total_size(background_queue_total) .set_total_size(background_queue_total)
.set_upload_period(background_upload_period)), .set_upload_period(background_upload_period)),
std::make_pair(
MANUAL_BATCH,
StorageQueue::Options()
.set_directory(root_directory.Append(manual_queue_subdir))
.set_file_prefix(manual_queue_prefix)
.set_total_size(manual_queue_total)
.set_upload_period(manual_upload_period)),
}; };
} }
...@@ -220,15 +234,9 @@ void Storage::Write(Priority priority, ...@@ -220,15 +234,9 @@ void Storage::Write(Priority priority,
base::OnceCallback<void(Status)> completion_cb) { base::OnceCallback<void(Status)> completion_cb) {
// Note: queues_ never change after initialization is finished, so there is no // Note: queues_ never change after initialization is finished, so there is no
// need to protect or serialize access to it. // need to protect or serialize access to it.
auto it = queues_.find(priority); ASSIGN_OR_ONCE_CALLBACK_AND_RETURN(scoped_refptr<StorageQueue> queue,
if (it == queues_.end()) { completion_cb, GetQueue(priority));
std::move(completion_cb) queue->Write(data, std::move(completion_cb));
.Run(Status(error::NOT_FOUND,
base::StrCat({"Undefined priority=",
base::NumberToString(priority)})));
return;
}
it->second->Write(data, std::move(completion_cb));
} }
void Storage::Confirm(Priority priority, void Storage::Confirm(Priority priority,
...@@ -236,15 +244,27 @@ void Storage::Confirm(Priority priority, ...@@ -236,15 +244,27 @@ void Storage::Confirm(Priority priority,
base::OnceCallback<void(Status)> completion_cb) { base::OnceCallback<void(Status)> completion_cb) {
// Note: queues_ never change after initialization is finished, so there is no // Note: queues_ never change after initialization is finished, so there is no
// need to protect or serialize access to it. // need to protect or serialize access to it.
ASSIGN_OR_ONCE_CALLBACK_AND_RETURN(scoped_refptr<StorageQueue> queue,
completion_cb, GetQueue(priority));
queue->Confirm(seq_number, std::move(completion_cb));
}
Status Storage::Flush(Priority priority) {
// Note: queues_ never change after initialization is finished, so there is no
// need to protect or serialize access to it.
ASSIGN_OR_RETURN(scoped_refptr<StorageQueue> queue, GetQueue(priority));
queue->Flush();
return Status::StatusOK();
}
StatusOr<scoped_refptr<StorageQueue>> Storage::GetQueue(Priority priority) {
auto it = queues_.find(priority); auto it = queues_.find(priority);
if (it == queues_.end()) { if (it == queues_.end()) {
std::move(completion_cb) return Status(
.Run(Status(error::NOT_FOUND, error::NOT_FOUND,
base::StrCat({"Undefined priority=", base::StrCat({"Undefined priority=", base::NumberToString(priority)}));
base::NumberToString(priority)})));
return;
} }
it->second->Confirm(seq_number, std::move(completion_cb)); return it->second;
} }
} // namespace reporting } // namespace reporting
...@@ -96,6 +96,12 @@ class Storage : public base::RefCountedThreadSafe<Storage> { ...@@ -96,6 +96,12 @@ class Storage : public base::RefCountedThreadSafe<Storage> {
uint64_t seq_number, uint64_t seq_number,
base::OnceCallback<void(Status)> completion_cb); base::OnceCallback<void(Status)> completion_cb);
// Initiates upload of collected records according to the priority.
// Called usually for a queue with an infinite or very large upload period.
// Multiple |Flush| calls can safely run in parallel.
// Returns error if cannot start upload.
Status Flush(Priority priority);
Storage(const Storage& other) = delete; Storage(const Storage& other) = delete;
Storage& operator=(const Storage& other) = delete; Storage& operator=(const Storage& other) = delete;
...@@ -117,6 +123,12 @@ class Storage : public base::RefCountedThreadSafe<Storage> { ...@@ -117,6 +123,12 @@ class Storage : public base::RefCountedThreadSafe<Storage> {
// Returns OK or error status, if anything failed to initialize. // Returns OK or error status, if anything failed to initialize.
Status Init(); Status Init();
// Helper function that selects queue by priority. Returns error
// if priority does not match any queue.
// Note: queues_ never change after initialization is finished, so there is no
// need to protect or serialize access to it.
StatusOr<scoped_refptr<StorageQueue>> GetQueue(Priority priority);
const Options options_; const Options options_;
// Map priority->StorageQueue. // Map priority->StorageQueue.
......
...@@ -150,7 +150,7 @@ Status StorageQueue::Init() { ...@@ -150,7 +150,7 @@ Status StorageQueue::Init() {
// Initiate periodic uploading, if needed. // Initiate periodic uploading, if needed.
if (!options_.upload_period().is_zero()) { if (!options_.upload_period().is_zero()) {
upload_timer_.Start(FROM_HERE, options_.upload_period(), this, upload_timer_.Start(FROM_HERE, options_.upload_period(), this,
&StorageQueue::PeriodicUpload); &StorageQueue::Flush);
} }
return Status::StatusOK(); return Status::StatusOK();
} }
...@@ -564,18 +564,6 @@ class StorageQueue::ReadContext : public TaskRunnerContext<Status> { ...@@ -564,18 +564,6 @@ class StorageQueue::ReadContext : public TaskRunnerContext<Status> {
SEQUENCE_CHECKER(read_sequence_checker_); SEQUENCE_CHECKER(read_sequence_checker_);
}; };
void StorageQueue::PeriodicUpload() {
// Note: new uploader created every time PeriodicUpload is called.
StatusOr<std::unique_ptr<UploaderInterface>> uploader =
start_upload_cb_.Run();
if (!uploader.ok()) {
LOG(ERROR) << "Failed to provide the Uploader, status="
<< uploader.status();
return;
}
Start<ReadContext>(std::move(uploader.ValueOrDie()), this);
}
class StorageQueue::WriteContext : public TaskRunnerContext<Status> { class StorageQueue::WriteContext : public TaskRunnerContext<Status> {
public: public:
WriteContext(base::span<const uint8_t> data, WriteContext(base::span<const uint8_t> data,
...@@ -780,6 +768,18 @@ Status StorageQueue::RemoveUnusedFiles(uint64_t seq_number) { ...@@ -780,6 +768,18 @@ Status StorageQueue::RemoveUnusedFiles(uint64_t seq_number) {
return Status::StatusOK(); return Status::StatusOK();
} }
void StorageQueue::Flush() {
// Note: new uploader created every time Flush is called.
StatusOr<std::unique_ptr<UploaderInterface>> uploader =
start_upload_cb_.Run();
if (!uploader.ok()) {
LOG(ERROR) << "Failed to provide the Uploader, status="
<< uploader.status();
return;
}
Start<ReadContext>(std::move(uploader.ValueOrDie()), this);
}
// //
// SingleFile implementation // SingleFile implementation
// //
......
...@@ -89,6 +89,8 @@ class StorageQueue : public base::RefCountedThreadSafe<StorageQueue> { ...@@ -89,6 +89,8 @@ class StorageQueue : public base::RefCountedThreadSafe<StorageQueue> {
// Time period the data is uploaded with. // Time period the data is uploaded with.
// If 0, uploaded immediately after a new record is stored // If 0, uploaded immediately after a new record is stored
// (this setting is intended for the immediate priority). // (this setting is intended for the immediate priority).
// Can be set to infinity - in that case Flush() is expected to be
// called from time to time.
base::TimeDelta upload_period_; base::TimeDelta upload_period_;
}; };
...@@ -145,6 +147,27 @@ class StorageQueue : public base::RefCountedThreadSafe<StorageQueue> { ...@@ -145,6 +147,27 @@ class StorageQueue : public base::RefCountedThreadSafe<StorageQueue> {
void Confirm(uint64_t seq_number, void Confirm(uint64_t seq_number,
base::OnceCallback<void(Status)> completion_cb); base::OnceCallback<void(Status)> completion_cb);
// Initiates upload of collected records. Called periodically by timer, based
// on upload_period of the queue, and can also be called explicitly - for
// a queue with an infinite or very large upload period. Multiple |Flush|
// calls can safely run in parallel.
// Starts by calling |start_upload_cb_| that instantiates |UploaderInterface
// uploader|. Then repeatedly reads data blob(s) one by one from the
// StorageQueue starting from |first_seq_number_|, handing each one over to
// |uploader|->ProcessBlob (keeping ownership of the buffer) and resuming
// after result callback returns 'true'. Only files that have been closed are
// included in reading; |Upload| makes sure to close the last writeable file
// and create a new one before starting to send records to the |uploader|. If
// the monotonic order of sequencing is broken, INTERNAL error Status is
// reported. |Upload| can be stopped after any record by returning 'false' to
// |processed_cb| callback - in that case |Upload| will behave as if the end
// of data has been reached. While one or more |Upload|s are active, files can
// be added to the StorageQueue but cannot be deleted. If processing of the
// blob takes significant time, |uploader| implementation should be offset to
// another thread to avoid locking StorageQueue.
// Helper methods: SwitchLastFileIfNotEmpty, CollectFilesForUpload.
void Flush();
StorageQueue(const StorageQueue& other) = delete; StorageQueue(const StorageQueue& other) = delete;
StorageQueue& operator=(const StorageQueue& other) = delete; StorageQueue& operator=(const StorageQueue& other) = delete;
...@@ -223,25 +246,6 @@ class StorageQueue : public base::RefCountedThreadSafe<StorageQueue> { ...@@ -223,25 +246,6 @@ class StorageQueue : public base::RefCountedThreadSafe<StorageQueue> {
// ScanLastFile. // ScanLastFile.
Status Init(); Status Init();
// Periodically uploads previously stored but not confirmed records.
// Starts by calling |start_upload_cb_| that instantiates |UploaderInterface
// uploader|. Then repeatedly reads data blob(s) one by one from the
// StorageQueue starting from |first_seq_number_|, handing each one over to
// |uploader|->ProcessBlob (keeping ownership of the buffer) and resuming
// after result callback returns 'true'. Only files that have been closed are
// included in reading; |Upload| makes sure to close the last writeable file
// and create a new one before starting to send records to the |uploader|. If
// the monotonic order of sequencing is broken, INTERNAL error Status is
// reported. |Upload| can be stopped after any record by returning 'false' to
// |processed_cb| callback - in that case |Upload| will behave as if the end
// of data has been reached. While one or more |Upload|s are active, files can
// be added to the StorageQueue but cannot be deleted. If processing of the
// blob takes significant time, |uploader| implementation should be offset to
// another thread to avoid locking StorageQueue.
// Called by timer. Helper methods: SwitchLastFileIfNotEmpty,
// CollectFilesForUpload.
void PeriodicUpload();
// Helper method for Init(): enumerates all data files in the directory. // Helper method for Init(): enumerates all data files in the directory.
// Valid file names are <prefix>.<seq_number>, any other names are ignored. // Valid file names are <prefix>.<seq_number>, any other names are ignored.
Status EnumerateDataFiles(); Status EnumerateDataFiles();
......
...@@ -159,6 +159,10 @@ class StorageQueueTest : public ::testing::TestWithParam<size_t> { ...@@ -159,6 +159,10 @@ class StorageQueueTest : public ::testing::TestWithParam<size_t> {
return BuildStorageQueueOptionsImmediate().set_upload_period(upload_period); return BuildStorageQueueOptionsImmediate().set_upload_period(upload_period);
} }
StorageQueue::Options BuildStorageQueueOptionsOnlyManual() const {
return BuildStorageQueueOptionsPeriodic(base::TimeDelta::Max());
}
StatusOr<std::unique_ptr<StorageQueue::UploaderInterface>> StatusOr<std::unique_ptr<StorageQueue::UploaderInterface>>
BuildMockUploader() { BuildMockUploader() {
auto uploader = std::make_unique<MockUploadClient>(); auto uploader = std::make_unique<MockUploadClient>();
...@@ -229,6 +233,25 @@ TEST_P(StorageQueueTest, WriteIntoNewStorageQueueAndUpload) { ...@@ -229,6 +233,25 @@ TEST_P(StorageQueueTest, WriteIntoNewStorageQueueAndUpload) {
task_environment_.FastForwardBy(base::TimeDelta::FromSeconds(1)); task_environment_.FastForwardBy(base::TimeDelta::FromSeconds(1));
} }
TEST_P(StorageQueueTest, WriteIntoNewStorageQueueAndFlush) {
CreateStorageQueueOrDie(BuildStorageQueueOptionsOnlyManual());
WriteStringOrDie(blobs[0]);
WriteStringOrDie(blobs[1]);
WriteStringOrDie(blobs[2]);
// Set uploader expectations.
EXPECT_CALL(set_mock_uploader_expectations_, Call(NotNull()))
.WillOnce(Invoke([](MockUploadClient* mock_upload_client) {
MockUploadClient::SetUp(mock_upload_client)
.Required(blobs[0])
.Required(blobs[1])
.Required(blobs[2]);
}));
// Flush manually.
storage_queue_->Flush();
}
TEST_P(StorageQueueTest, WriteAndRepeatedlyUploadWithConfirmations) { TEST_P(StorageQueueTest, WriteAndRepeatedlyUploadWithConfirmations) {
CreateStorageQueueOrDie(BuildStorageQueueOptionsPeriodic()); CreateStorageQueueOrDie(BuildStorageQueueOptionsPeriodic());
......
...@@ -252,6 +252,27 @@ TEST_F(StorageTest, WriteIntoNewStorageAndUpload) { ...@@ -252,6 +252,27 @@ TEST_F(StorageTest, WriteIntoNewStorageAndUpload) {
task_environment_.FastForwardBy(base::TimeDelta::FromSeconds(1)); task_environment_.FastForwardBy(base::TimeDelta::FromSeconds(1));
} }
TEST_F(StorageTest, WriteIntoNewStorageAndFlush) {
CreateStorageTestOrDie(BuildStorageOptions());
WriteStringOrDie(MANUAL_BATCH, blobs[0]);
WriteStringOrDie(MANUAL_BATCH, blobs[1]);
WriteStringOrDie(MANUAL_BATCH, blobs[2]);
// Set uploader expectations.
EXPECT_CALL(set_mock_uploader_expectations_,
Call(Eq(MANUAL_BATCH), NotNull()))
.WillOnce(
Invoke([](Priority priority, MockUploadClient* mock_upload_client) {
MockUploadClient::SetUp(priority, mock_upload_client)
.Required(blobs[0])
.Required(blobs[1])
.Required(blobs[2]);
}));
// Trigger upload.
EXPECT_OK(storage_->Flush(MANUAL_BATCH));
}
TEST_F(StorageTest, WriteAndRepeatedlyUploadWithConfirmations) { TEST_F(StorageTest, WriteAndRepeatedlyUploadWithConfirmations) {
CreateStorageTestOrDie(BuildStorageOptions()); CreateStorageTestOrDie(BuildStorageOptions());
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment