Commit 526613bf authored by Leonid Baraz, committed by Commit Bot

Handle missing files on StorageQueue opening.

If metadata cannot be restored during StorageQueue::Init, the queue now
logs the error, resets its state, drops the old files and starts a new
generation from scratch instead of failing initialization. Uploader
failure callbacks now carry the sequencing id of the failed record, and
new tests cover reopening with missing metadata or data files.

Bug: b:169248924
Change-Id: I8ca3f363d7b616d06707b6dd3639ddc29a334351
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2481004
Reviewed-by: Zach Trudo <zatrudo@google.com>
Commit-Queue: Leonid Baraz <lbaraz@chromium.org>
Cr-Commit-Position: refs/heads/master@{#818548}
parent a8c00288
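For orientation, here is a condensed sketch of the recovery path this change adds to StorageQueue::Init(). It paraphrases the first hunk below (member names are taken from that hunk) and is not itself part of the CL:

  // Sketch only: mirrors the Init() change shown in the first hunk.
  const Status status = RestoreMetadata(&used_files_set);
  if (!status.ok()) {
    LOG(ERROR) << "Failed to restore metadata, status=" << status;
    // Reset sequencing state to what it was at the beginning of Init().
    next_seq_number_ = 0;
    first_seq_number_ = 0;
    first_unconfirmed_seq_number_ = base::nullopt;
    last_record_digest_ = base::nullopt;
    // Forget every discovered file so that DeleteUnusedFiles() below removes
    // them all and the queue starts a new generation from scratch.
    files_.clear();
    used_files_set.clear();
  }
  // Delete all files except used ones (after a failure above: all of them).
  DeleteUnusedFiles(used_files_set);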
@@ -171,9 +171,33 @@ Status StorageQueue::Init() {
   // data, but must have metadata for the last data, because metadata is only
   // removed once data is written. So we are picking the metadata matching the
   // last sequencing number and load both digest and generation id from there.
-  // If there is no match, we bail out for now; later on we will instead
-  // start a new generation from the next sequencing number (with no digest!)
-  RETURN_IF_ERROR(RestoreMetadata(&used_files_set));
+  const Status status = RestoreMetadata(&used_files_set);
+  // If there is no match, clear up everything we've found before and start
+  // a new generation from scratch.
+  // In the future we could possibly consider preserving the previous
+  // generation data, but will need to resolve multiple issues:
+  // 1) we would need to send the old generation before starting to send
+  //    the new one, which could trigger a loss of data in the new generation.
+  // 2) we could end up with 3 or more generations, if the loss of metadata
+  //    repeats. Which of them should be sent first (which one is expected
+  //    by the server)?
+  // 3) different generations might include the same sequencing ids;
+  //    how do we resolve file naming then? Should we add generation id
+  //    to the file name too?
+  // Because of all this, for now we just drop the old generation data
+  // and start the new one from scratch.
+  if (!status.ok()) {
+    LOG(ERROR) << "Failed to restore metadata, status=" << status;
+    // Reset all parameters as they were at the beginning of Init().
+    // Some of them might have been changed earlier.
+    next_seq_number_ = 0;
+    first_seq_number_ = 0;
+    first_unconfirmed_seq_number_ = base::nullopt;
+    last_record_digest_ = base::nullopt;
+    // Delete all files.
+    files_.clear();
+    used_files_set.clear();
+  }
   }
   // Delete all files except used ones.
   DeleteUnusedFiles(used_files_set);
@@ -532,7 +556,7 @@ Status StorageQueue::RestoreMetadata(
                         base::StrCat({"Cannot read metafile=", meta_file->name(),
                                       " status=", read_result.status().ToString()}));
   }
-  generation_id_ =
+  const uint64_t generation_id =
       *reinterpret_cast<const uint64_t*>(read_result.ValueOrDie().data());
   // Read last record digest.
   read_result =
@@ -543,6 +567,8 @@ Status StorageQueue::RestoreMetadata(
                         base::StrCat({"Cannot read metafile=", meta_file->name(),
                                       " status=", read_result.status().ToString()}));
   }
+  // Everything read successfully, set the queue up.
+  generation_id_ = generation_id;
   last_record_digest_ = std::string(read_result.ValueOrDie());
   // Store used metadata file.
   used_files_set->emplace(meta_file_path);
...
@@ -185,6 +185,9 @@ class StorageQueue : public base::RefCountedThreadSafe<StorageQueue> {
   // Test only: makes specified records fail on reading.
   void TestInjectBlockReadErrors(std::initializer_list<uint64_t> seq_numbers);
 
+  // Access queue options.
+  const Options& options() const { return options_; }
+
   StorageQueue(const StorageQueue& other) = delete;
   StorageQueue& operator=(const StorageQueue& other) = delete;
...
@@ -9,6 +9,8 @@
 #include <utility>
 #include <vector>
 
+#include "base/files/file_path.h"
+#include "base/files/file_util.h"
 #include "base/files/scoped_temp_dir.h"
 #include "base/optional.h"
 #include "base/strings/strcat.h"
@@ -36,6 +38,9 @@ using ::testing::WithArg;
 namespace reporting {
 namespace {
 
+// Metadata file name prefix.
+const base::FilePath::CharType METADATA_NAME[] = FILE_PATH_LITERAL("META");
+
 // Usage (in tests only):
 //
 //   TestEvent<ResType> e;
@@ -98,11 +103,13 @@ class MockUploadClient : public StorageQueue::UploaderInterface {
     if (generation_id_.has_value() &&
         generation_id_.value() != sequencing_information.generation_id()) {
       std::move(processed_cb)
-          .Run(UploadRecordFailure(Status(
-              error::DATA_LOSS,
-              base::StrCat({"Generation id mismatch, expected=",
-                            base::NumberToString(generation_id_.value()),
-                            " actual=",
-                            base::NumberToString(
-                                sequencing_information.generation_id())}))));
+          .Run(UploadRecordFailure(
+              sequencing_information.sequencing_id(),
+              Status(
+                  error::DATA_LOSS,
+                  base::StrCat(
+                      {"Generation id mismatch, expected=",
+                       base::NumberToString(generation_id_.value()), " actual=",
+                       base::NumberToString(
+                           sequencing_information.generation_id())}))));
       return;
@@ -122,9 +129,16 @@ class MockUploadClient : public StorageQueue::UploaderInterface {
     if (record_digest != wrapped_record.record_digest()) {
       std::move(processed_cb)
           .Run(UploadRecordFailure(
+              sequencing_information.sequencing_id(),
              Status(error::DATA_LOSS, "Record digest mismatch")));
       return;
     }
+    // Store record digest for the next record in sequence to verify.
+    last_record_digest_map_->emplace(
+        std::make_pair(sequencing_information.sequencing_id(),
+                       sequencing_information.generation_id()),
+        record_digest);
+    // If last record digest is present, match it and validate.
     if (wrapped_record.has_last_record_digest()) {
       auto it = last_record_digest_map_->find(
           std::make_pair(sequencing_information.sequencing_id() - 1,
@@ -134,14 +148,11 @@ class MockUploadClient : public StorageQueue::UploaderInterface {
                    it->second.value() != wrapped_record.last_record_digest())) {
         std::move(processed_cb)
             .Run(UploadRecordFailure(
+                sequencing_information.sequencing_id(),
                 Status(error::DATA_LOSS, "Last record digest mismatch")));
         return;
       }
     }
-    last_record_digest_map_->emplace(
-        std::make_pair(sequencing_information.sequencing_id(),
-                       sequencing_information.generation_id()),
-        record_digest);
   }
   std::move(processed_cb)
@@ -156,11 +167,13 @@ class MockUploadClient : public StorageQueue::UploaderInterface {
     if (generation_id_.has_value() &&
         generation_id_.value() != sequencing_information.generation_id()) {
       std::move(processed_cb)
-          .Run(UploadRecordFailure(Status(
-              error::DATA_LOSS,
-              base::StrCat({"Generation id mismatch, expected=",
-                            base::NumberToString(generation_id_.value()),
-                            " actual=",
-                            base::NumberToString(
-                                sequencing_information.generation_id())}))));
+          .Run(UploadRecordFailure(
+              sequencing_information.sequencing_id(),
+              Status(
+                  error::DATA_LOSS,
+                  base::StrCat(
+                      {"Generation id mismatch, expected=",
+                       base::NumberToString(generation_id_.value()), " actual=",
+                       base::NumberToString(
+                           sequencing_information.generation_id())}))));
       return;
@@ -181,7 +194,7 @@ class MockUploadClient : public StorageQueue::UploaderInterface {
   void Completed(Status status) override { UploadComplete(status); }
 
   MOCK_METHOD(bool, UploadRecord, (uint64_t, base::StringPiece), (const));
-  MOCK_METHOD(bool, UploadRecordFailure, (Status), (const));
+  MOCK_METHOD(bool, UploadRecordFailure, (uint64_t, Status), (const));
   MOCK_METHOD(bool, UploadGap, (uint64_t, uint64_t), (const));
   MOCK_METHOD(void, UploadComplete, (Status), (const));
@@ -191,9 +204,6 @@ class MockUploadClient : public StorageQueue::UploaderInterface {
   public:
    explicit SetUp(MockUploadClient* client) : client_(client) {}
    ~SetUp() {
-      EXPECT_CALL(*client_, UploadRecordFailure(_))
-          .Times(0)
-          .InSequence(client_->test_upload_sequence_);
      EXPECT_CALL(*client_, UploadComplete(Eq(Status::StatusOK())))
          .Times(1)
          .InSequence(client_->test_upload_sequence_);
@@ -231,6 +241,13 @@ class MockUploadClient : public StorageQueue::UploaderInterface {
      return *this;
    }
 
+    SetUp& Failure(uint64_t sequence_number, Status error) {
+      EXPECT_CALL(*client_, UploadRecordFailure(Eq(sequence_number), Eq(error)))
+          .InSequence(client_->test_upload_sequence_)
+          .WillOnce(Return(true));
+      return *this;
+    }
+
    private:
     MockUploadClient* const client_;
   };
@@ -430,6 +447,110 @@ TEST_P(StorageQueueTest, WriteIntoNewStorageQueueReopenWriteMoreAndUpload) {
   task_environment_.FastForwardBy(base::TimeDelta::FromSeconds(1));
 }
 
+TEST_P(StorageQueueTest,
+       WriteIntoNewStorageQueueReopenWithMissingMetadataWriteMoreAndUpload) {
+  CreateStorageQueueOrDie(BuildStorageQueueOptionsPeriodic());
+  WriteStringOrDie(data[0]);
+  WriteStringOrDie(data[1]);
+  WriteStringOrDie(data[2]);
+
+  // Save copy of options.
+  const StorageQueue::Options options = storage_queue_->options();
+
+  storage_queue_.reset();
+
+  // Delete all metadata files.
+  base::FileEnumerator dir_enum(
+      options.directory(),
+      /*recursive=*/false, base::FileEnumerator::FILES,
+      base::StrCat({METADATA_NAME, FILE_PATH_LITERAL(".*")}));
+  base::FilePath full_name;
+  while (full_name = dir_enum.Next(), !full_name.empty()) {
+    base::DeleteFile(full_name);
+  }
+
+  // Reopen, starting a new generation.
+  CreateStorageQueueOrDie(BuildStorageQueueOptionsPeriodic());
+  WriteStringOrDie(more_data[0]);
+  WriteStringOrDie(more_data[1]);
+  WriteStringOrDie(more_data[2]);
+
+  // Set uploader expectations. Previous data is all lost.
+  EXPECT_CALL(set_mock_uploader_expectations_, Call(NotNull()))
+      .WillOnce(Invoke([](MockUploadClient* mock_upload_client) {
+        MockUploadClient::SetUp(mock_upload_client)
+            .Required(0, more_data[0])
+            .Required(1, more_data[1])
+            .Required(2, more_data[2]);
+      }));
+
+  // Trigger upload.
+  task_environment_.FastForwardBy(base::TimeDelta::FromSeconds(1));
+}
+
+TEST_P(StorageQueueTest,
+       WriteIntoNewStorageQueueReopenWithMissingDataWriteMoreAndUpload) {
+  CreateStorageQueueOrDie(BuildStorageQueueOptionsPeriodic());
+  WriteStringOrDie(data[0]);
+  WriteStringOrDie(data[1]);
+  WriteStringOrDie(data[2]);
+
+  // Save copy of options.
+  const StorageQueue::Options options = storage_queue_->options();
+
+  storage_queue_.reset();
+
+  // Reopen with the same generation and sequencing information.
+  CreateStorageQueueOrDie(BuildStorageQueueOptionsPeriodic());
+
+  // Delete the first data file.
+  base::FilePath full_name = options.directory().Append(
+      base::StrCat({options.file_prefix(), FILE_PATH_LITERAL(".0")}));
+  base::DeleteFile(full_name);
+
+  // Write more data.
+  WriteStringOrDie(more_data[0]);
+  WriteStringOrDie(more_data[1]);
+  WriteStringOrDie(more_data[2]);
+
+  // Set uploader expectations. Previous data is all lost.
+  // The expected results depend on the test configuration.
+  switch (options.single_file_size()) {
+    case 1:  // single record in file - deletion killed the first record
+      EXPECT_CALL(set_mock_uploader_expectations_, Call(NotNull()))
+          .WillOnce(Invoke([](MockUploadClient* mock_upload_client) {
+            MockUploadClient::SetUp(mock_upload_client)
+                .PossibleGap(0, 1)
+                .Required(1, data[1])
+                .Required(2, data[2])
+                .Required(3, more_data[0])
+                .Required(4, more_data[1])
+                .Required(5, more_data[2]);
+          }));
+      break;
+    case 256:  // two records in file - deletion killed the first two records.
+      EXPECT_CALL(set_mock_uploader_expectations_, Call(NotNull()))
+          .WillOnce(Invoke([](MockUploadClient* mock_upload_client) {
+            MockUploadClient::SetUp(mock_upload_client)
+                .PossibleGap(0, 2)
+                .Failure(
+                    2, Status(error::DATA_LOSS, "Last record digest mismatch"))
+                .Required(3, more_data[0])
+                .Required(4, more_data[1])
+                .Required(5, more_data[2]);
+          }));
+      break;
+    default:  // Unlimited file size - deletion above killed all the data.
+      EXPECT_CALL(set_mock_uploader_expectations_, Call(NotNull()))
+          .WillOnce(Invoke([](MockUploadClient* mock_upload_client) {
+            MockUploadClient::SetUp(mock_upload_client).PossibleGap(0, 1);
+          }));
+  }
+
+  // Trigger upload.
+  task_environment_.FastForwardBy(base::TimeDelta::FromSeconds(1));
+}
+
 TEST_P(StorageQueueTest, WriteIntoNewStorageQueueAndFlush) {
   CreateStorageQueueOrDie(BuildStorageQueueOptionsOnlyManual());
   WriteStringOrDie(data[0]);
@@ -849,8 +970,8 @@ TEST_P(StorageQueueTest, WriteEncryptFailure) {
 INSTANTIATE_TEST_SUITE_P(VaryingFileSize,
                          StorageQueueTest,
                          testing::Values(128 * 1024LL * 1024LL,
-                                         64 /* two records in file */,
-                                         32 /* single record in file */));
+                                         256 /* two records in file */,
+                                         1 /* single record in file */));
 
 // TODO(b/157943006): Additional tests:
 // 1) Options object with a bad path.
...