Commit 8a85f21a authored by Leonid Baraz's avatar Leonid Baraz Committed by Commit Bot

Boilerplate for processing Gaps.

Add ProcessGap entry point to Uploader, so that later on
StorageQueue will be able to call it whenever there are no records
in the queue that server expects to get, or if records are corrupt
(e.g., file was deleted or altered).

Currently StorageQueue enumerated records from the first it has and
calls ProcessRecord for every one. If something goes wrong, it bails out.

The plan is to change it like this in further CLs:
1) Separate last confirmed seq id from the first queue has.
2) If the last confirmed is lower, call ProcessGap for all those that are
   missing.
3) If anything is wrong, call ProcessGap for that one and switch to the
   next record regardless.

ProcessRecord will add EncryptedRecord with data to the request to send.
ProcessGap will add one or more Encrypted records with no data - that
seems to be the easiest approach.

Bug: b:169248924
Change-Id: I3cd600daf819a5ca8428c79788669be59f6d01dc
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2463766
Commit-Queue: Leonid Baraz <lbaraz@chromium.org>
Reviewed-by: default avatarZach Trudo <zatrudo@google.com>
Cr-Commit-Position: refs/heads/master@{#815881}
parent b6dbb3a8
...@@ -168,32 +168,23 @@ void ReportingClient::Uploader::ProcessRecord( ...@@ -168,32 +168,23 @@ void ReportingClient::Uploader::ProcessRecord(
return; return;
} }
class ProcessRecordContext : public TaskRunnerContext<bool> { sequenced_task_runner_->PostTask(
public: FROM_HERE,
ProcessRecordContext( base::BindOnce(
EncryptedRecord record, [](std::vector<EncryptedRecord>* records, EncryptedRecord record,
std::vector<EncryptedRecord>* records, base::OnceCallback<void(bool)> processed_cb) {
base::OnceCallback<void(bool)> processed_callback, records->emplace_back(std::move(record));
scoped_refptr<base::SequencedTaskRunner> sequenced_task_runner) std::move(processed_cb).Run(true);
: TaskRunnerContext<bool>(std::move(processed_callback), },
sequenced_task_runner), base::Unretained(encrypted_records_.get()),
records_(records), std::move(data.ValueOrDie()), std::move(processed_cb)));
record_(std::move(record)) {} }
private: void ReportingClient::Uploader::ProcessGap(
~ProcessRecordContext() override = default; SequencingInformation start,
uint64_t count,
void OnStart() override { base::OnceCallback<void(bool)> processed_cb) {
records_->emplace_back(std::move(record_)); LOG(FATAL) << "Gap not implemented yet";
Response(true);
}
std::vector<EncryptedRecord>* const records_;
const EncryptedRecord record_;
};
Start<ProcessRecordContext>(data.ValueOrDie(), encrypted_records_.get(),
std::move(processed_cb), sequenced_task_runner_);
} }
void ReportingClient::Uploader::Completed(Status final_status) { void ReportingClient::Uploader::Completed(Status final_status) {
......
...@@ -227,6 +227,9 @@ class ReportingClient { ...@@ -227,6 +227,9 @@ class ReportingClient {
void ProcessRecord(StatusOr<EncryptedRecord> data, void ProcessRecord(StatusOr<EncryptedRecord> data,
base::OnceCallback<void(bool)> processed_cb) override; base::OnceCallback<void(bool)> processed_cb) override;
void ProcessGap(SequencingInformation start,
uint64_t count,
base::OnceCallback<void(bool)> processed_cb) override;
void Completed(Status final_status) override; void Completed(Status final_status) override;
......
...@@ -138,6 +138,13 @@ class Storage::QueueUploaderInterface : public StorageQueue::UploaderInterface { ...@@ -138,6 +138,13 @@ class Storage::QueueUploaderInterface : public StorageQueue::UploaderInterface {
std::move(processed_cb)); std::move(processed_cb));
} }
void ProcessGap(SequencingInformation start,
uint64_t count,
base::OnceCallback<void(bool)> processed_cb) override {
storage_interface_->ProcessGap(std::move(start), count,
std::move(processed_cb));
}
void Completed(Status final_status) override { void Completed(Status final_status) override {
storage_interface_->Completed(final_status); storage_interface_->Completed(final_status);
} }
......
...@@ -28,28 +28,8 @@ namespace reporting { ...@@ -28,28 +28,8 @@ namespace reporting {
// according to the priority. // according to the priority.
class Storage : public base::RefCountedThreadSafe<Storage> { class Storage : public base::RefCountedThreadSafe<Storage> {
public: public:
// Interface for Upload, which must be implemented by an object returned by // Interface for Upload, forwarding to StorageQueue::UploaderInterface.
// |StartUpload| callback (see below). using UploaderInterface = StorageQueue::UploaderInterface;
// Every time Storage starts an upload (by timer or immediately after Write)
// it uses this interface to hand available records over to the actual
// uploader. Storage takes ownership of it and automatically discards after
// |Completed| returns. Similar to StorageQueue::UploaderInterface, but with
// added priority parameter.
class UploaderInterface {
public:
virtual ~UploaderInterface() = default;
// Unserializes every record and hands ownership over for processing (e.g.
// to add to the network message). Expects |processed_cb| to be called after
// the record or error status has been processed, with true if next record
// needs to be delivered and false if the Uploader should stop.
virtual void ProcessRecord(StatusOr<EncryptedRecord> record,
base::OnceCallback<void(bool)> processed_cb) = 0;
// Finalizes the upload (e.g. sends the message to the server and gets
// response).
virtual void Completed(Status final_status) = 0;
};
// Callback type for UploadInterface provider for specified queue. // Callback type for UploadInterface provider for specified queue.
using StartUploadCb = using StartUploadCb =
......
...@@ -656,6 +656,7 @@ class StorageQueue::ReadContext : public TaskRunnerContext<Status> { ...@@ -656,6 +656,7 @@ class StorageQueue::ReadContext : public TaskRunnerContext<Status> {
void CallCurrentRecord(uint64_t generation_id, void CallCurrentRecord(uint64_t generation_id,
uint64_t seq_number, uint64_t seq_number,
base::StringPiece blob) { base::StringPiece blob) {
DCHECK_CALLED_ON_VALID_SEQUENCE(read_sequence_checker_);
google::protobuf::io::ArrayInputStream stream( // Zero-copy stream. google::protobuf::io::ArrayInputStream stream( // Zero-copy stream.
blob.data(), blob.size()); blob.data(), blob.size());
EncryptedRecord encrypted_record; EncryptedRecord encrypted_record;
...@@ -1030,6 +1031,7 @@ void StorageQueue::Confirm(uint64_t seq_number, ...@@ -1030,6 +1031,7 @@ void StorageQueue::Confirm(uint64_t seq_number,
Status StorageQueue::RemoveConfirmedData(uint64_t seq_number) { Status StorageQueue::RemoveConfirmedData(uint64_t seq_number) {
DCHECK_CALLED_ON_VALID_SEQUENCE(storage_queue_sequence_checker_); DCHECK_CALLED_ON_VALID_SEQUENCE(storage_queue_sequence_checker_);
// Update first available number, if new one is higher.
if (first_seq_number_ <= seq_number) { if (first_seq_number_ <= seq_number) {
first_seq_number_ = seq_number + 1; first_seq_number_ = seq_number + 1;
} }
......
...@@ -113,6 +113,14 @@ class StorageQueue : public base::RefCountedThreadSafe<StorageQueue> { ...@@ -113,6 +113,14 @@ class StorageQueue : public base::RefCountedThreadSafe<StorageQueue> {
virtual void ProcessRecord(StatusOr<EncryptedRecord> record, virtual void ProcessRecord(StatusOr<EncryptedRecord> record,
base::OnceCallback<void(bool)> processed_cb) = 0; base::OnceCallback<void(bool)> processed_cb) = 0;
// Makes a note of a gap [start, start + count). Expects |processed_cb| to
// be called after the record or error status has been processed, with true
// if next record needs to be delivered and false if the Uploader should
// stop.
virtual void ProcessGap(SequencingInformation start,
uint64_t count,
base::OnceCallback<void(bool)> processed_cb) = 0;
// Finalizes the upload (e.g. sends the message to server and gets // Finalizes the upload (e.g. sends the message to server and gets
// response). Called always, regardless of whether there were errors. // response). Called always, regardless of whether there were errors.
virtual void Completed(Status final_status) = 0; virtual void Completed(Status final_status) = 0;
...@@ -161,14 +169,15 @@ class StorageQueue : public base::RefCountedThreadSafe<StorageQueue> { ...@@ -161,14 +169,15 @@ class StorageQueue : public base::RefCountedThreadSafe<StorageQueue> {
// |uploader|->ProcessRecord (keeping ownership of the buffer) and resuming // |uploader|->ProcessRecord (keeping ownership of the buffer) and resuming
// after result callback returns 'true'. Only files that have been closed are // after result callback returns 'true'. Only files that have been closed are
// included in reading; |Upload| makes sure to close the last writeable file // included in reading; |Upload| makes sure to close the last writeable file
// and create a new one before starting to send records to the |uploader|. If // and create a new one before starting to send records to the |uploader|.
// the monotonic order of sequencing is broken, INTERNAL error Status is // If some records are not available or corrupt, |uploader|->ProcessGap is
// reported. |Upload| can be stopped after any record by returning 'false' to // called. If the monotonic order of sequencing is broken, INTERNAL error
// |processed_cb| callback - in that case |Upload| will behave as if the end // Status is reported. |Upload| can be stopped after any record by returning
// of data has been reached. While one or more |Upload|s are active, files can // 'false' to |processed_cb| callback - in that case |Upload| will behave as
// be added to the StorageQueue but cannot be deleted. If processing of the // if the end of data has been reached. While one or more |Upload|s are
// record takes significant time, |uploader| implementation should be offset // active, files can be added to the StorageQueue but cannot be deleted. If
// to another thread to avoid locking StorageQueue. // processing of the record takes significant time, |uploader| implementation
// should be offset to another thread to avoid locking StorageQueue.
// Helper methods: SwitchLastFileIfNotEmpty, CollectFilesForUpload. // Helper methods: SwitchLastFileIfNotEmpty, CollectFilesForUpload.
void Flush(); void Flush();
......
...@@ -150,6 +150,12 @@ class MockUploadClient : public StorageQueue::UploaderInterface { ...@@ -150,6 +150,12 @@ class MockUploadClient : public StorageQueue::UploaderInterface {
wrapped_record.record().data())); wrapped_record.record().data()));
} }
void ProcessGap(SequencingInformation start,
uint64_t count,
base::OnceCallback<void(bool)> processed_cb) override {
LOG(FATAL) << "Gap not implemented yet";
}
void Completed(Status status) override { UploadComplete(status); } void Completed(Status status) override { UploadComplete(status); }
MOCK_METHOD(bool, UploadRecord, (uint64_t, base::StringPiece), (const)); MOCK_METHOD(bool, UploadRecord, (uint64_t, base::StringPiece), (const));
......
...@@ -157,6 +157,12 @@ class MockUploadClient : public Storage::UploaderInterface { ...@@ -157,6 +157,12 @@ class MockUploadClient : public Storage::UploaderInterface {
wrapped_record.record().data())); wrapped_record.record().data()));
} }
void ProcessGap(SequencingInformation start,
uint64_t count,
base::OnceCallback<void(bool)> processed_cb) override {
LOG(FATAL) << "Gap not implemented yet";
}
void Completed(Status status) override { UploadComplete(status); } void Completed(Status status) override { UploadComplete(status); }
MOCK_METHOD(bool, MOCK_METHOD(bool,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment