Commit dd6b0b15 authored by Leonid Baraz, committed by Commit Bot

Handle generation and digests.

Whenever a record is written, the generation id and the record's digest are
stored in a metadata file, and then all previous metadata file(s) are
asynchronously deleted.
When reopening the StorageQueue, the metadata file matching the last
sequencing number is used to restore the generation id and digest for future
writes.
The code does not handle errors yet - a missing or corrupt metadata file, an
incomplete data file, etc. In the future we will add code to reset the
generation id and eliminate the last digest.

MockUploadClient in tests makes sure generation_id is the same for all
records. It also verifies each record's own digest; the last record digest is
not verified yet, because the tests expect some records to be duplicated.

Bug: b:153364303
Bug: b:153659559
Change-Id: I80d8e7f03097bcc4a510f21e0a546876b3104960
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2388716
Commit-Queue: Leonid Baraz <lbaraz@chromium.org>
Reviewed-by: Zach Trudo <zatrudo@google.com>
Cr-Commit-Position: refs/heads/master@{#804486}
parent 28dc1f25
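For reference, a reconstruction of the metadata file layout implied by WriteMetadata() and RestoreMetadata() in the diff below (a sketch, not text from the CL):

// Layout of META.<seq_number> (40 bytes total, host byte order):
//   bytes 0..7  - generation id (uint64_t, appended via reinterpret_cast)
//   bytes 8..39 - SHA-256 digest of the last written record
//                 (crypto::kSHA256Length == 32 bytes)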
@@ -101,10 +101,4 @@ Record ReportQueue::AugmentRecord(base::StringPiece record_data) {
return record;
}
StatusOr<std::string> ReportQueue::GetLastRecordDigest() {
// TODO(b/153659559) Getting the actual last record digest will come later.
// For now we just set to a string.
return "LastRecordDigest";
}
} // namespace reporting
@@ -79,7 +79,6 @@ class ReportQueue {
void SendRecordToStorage(base::StringPiece record, EnqueueCallback callback);
reporting::Record AugmentRecord(base::StringPiece record_data);
StatusOr<std::string> GetLastRecordDigest();
std::unique_ptr<ReportQueueConfiguration> config_;
scoped_refptr<StorageModule> storage_;
......
@@ -22,6 +22,7 @@
#include "base/memory/ptr_util.h"
#include "base/memory/weak_ptr.h"
#include "base/optional.h"
#include "base/rand_util.h"
#include "base/sequence_checker.h"
#include "base/strings/strcat.h"
#include "base/strings/string_number_conversions.h"
@@ -37,12 +38,16 @@
#include "chrome/browser/policy/messaging_layer/util/task_runner_context.h"
#include "components/policy/proto/record.pb.h"
#include "crypto/random.h"
#include "crypto/sha2.h"
#include "third_party/protobuf/src/google/protobuf/io/zero_copy_stream_impl_lite.h"
namespace reporting {
namespace {
// Metadata file name prefix.
const base::FilePath::CharType METADATA_NAME[] = FILE_PATH_LITERAL("META");
// The size in bytes that all files and records are rounded to (for privacy:
// make it harder to distinguish between kinds of records).
constexpr size_t FRAME_SIZE = 16u;
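The rounding itself happens elsewhere in this file; a minimal sketch of what rounding to FRAME_SIZE means (the helper name is hypothetical, not from this CL):

// Hypothetical helper: rounds |size| up to the nearest multiple of
// FRAME_SIZE, so stored sizes reveal less about the kind of record.
size_t RoundUpToFrameSize(size_t size) {
  return (size + FRAME_SIZE - 1) / FRAME_SIZE * FRAME_SIZE;
}
// Example: RoundUpToFrameSize(20) == 32, RoundUpToFrameSize(16) == 16.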
@@ -152,8 +157,22 @@ Status StorageQueue::Init() {
{"Fileset directory '", options_.directory().MaybeAsASCII(),
"' does not exist, error=", base::File::ErrorToString(error)}));
}
// Enumerate data files and scan the last one to determine what sequence
// numbers we have (first and last).
RETURN_IF_ERROR(EnumerateDataFiles());
RETURN_IF_ERROR(ScanLastFile());
generation_id_ = base::RandUint64();  // Reset it in case metadata is unavailable.
if (next_seq_number_ > 0) {
// Enumerate metadata files to determine which sequence numbers have a
// last record digest. They might cover sequence numbers beyond what the
// data files have, because metadata is written ahead of the data, but they
// must cover the last data, because metadata is only removed once data is
// written. So we pick the metadata matching the last sequencing number and
// load both the digest and the generation id from there. If there is no
// match, we bail out for now; later on we will instead start a new
// generation from the next sequencing number (with no digest!).
RETURN_IF_ERROR(RestoreMetadata());
}
// Initiate periodic uploading, if needed.
if (!options_.upload_period().is_zero()) {
upload_timer_.Start(FROM_HERE, options_.upload_period(), this,
@@ -162,6 +181,26 @@ Status StorageQueue::Init() {
return Status::StatusOK();
}
void StorageQueue::UpdateRecordDigest(WrappedRecord* wrapped_record) {
DCHECK_CALLED_ON_VALID_SEQUENCE(storage_queue_sequence_checker_);
// Attach last record digest, if present.
if (last_record_digest_.has_value()) {
*wrapped_record->mutable_last_record_digest() = last_record_digest_.value();
}
// Calculate new record digest.
{
std::string serialized_record;
wrapped_record->record().SerializeToString(&serialized_record);
*wrapped_record->mutable_record_digest() =
crypto::SHA256HashString(serialized_record);
DCHECK_EQ(wrapped_record->record_digest().size(), crypto::kSHA256Length);
}
// Remember this digest as the last one, to be attached to the next record.
last_record_digest_ = wrapped_record->record_digest();
}
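UpdateRecordDigest() forms a hash chain: every record carries its own SHA-256 digest plus the digest of the previous record. A consumer could verify the chain as in the following sketch (the function name and shape are illustrative, not from this CL):

#include <string>

#include "components/policy/proto/record.pb.h"
#include "crypto/sha2.h"

// Illustrative check that |current| correctly chains after |previous|,
// mirroring the logic of UpdateRecordDigest() above.
bool VerifyChain(const reporting::WrappedRecord& previous,
                 const reporting::WrappedRecord& current) {
  // Each record must match its own digest.
  std::string serialized_record;
  current.record().SerializeToString(&serialized_record);
  if (crypto::SHA256HashString(serialized_record) != current.record_digest())
    return false;
  // Each record must reference the digest of the record before it.
  return current.last_record_digest() == previous.record_digest();
}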
Status StorageQueue::EnumerateDataFiles() {
DCHECK_CALLED_ON_VALID_SEQUENCE(storage_queue_sequence_checker_);
// We need to set first_seq_number_ to 0 if this is the initialization
@@ -180,25 +219,25 @@ Status StorageQueue::EnumerateDataFiles() {
base::StrCat({"File has no extension: '",
full_name.MaybeAsASCII(), "'"}));
}
uint64_t seq_number = 0;
bool success = base::StringToUint64(
dir_enum.GetInfo().GetName().Extension().substr(1), &seq_number);
uint64_t file_seq_number = 0;
bool success = base::StringToUint64(extension.substr(1), &file_seq_number);
if (!success) {
return Status(error::INTERNAL,
base::StrCat({"File extension does not parse: '",
full_name.MaybeAsASCII(), "'"}));
}
if (!files_
.emplace(seq_number, base::MakeRefCounted<SingleFile>(
full_name, dir_enum.GetInfo().GetSize()))
.emplace(file_seq_number,
base::MakeRefCounted<SingleFile>(
full_name, dir_enum.GetInfo().GetSize()))
.second) {
return Status(error::ALREADY_EXISTS,
base::StrCat({"Sequencing duplicated: '",
full_name.MaybeAsASCII(), "'"}));
}
if (!first_seq_number.has_value() ||
first_seq_number.value() > seq_number) {
first_seq_number = seq_number;
first_seq_number.value() > file_seq_number) {
first_seq_number = file_seq_number;
}
}
// first_seq_number.has_value() is true only if we found some files.
@@ -387,6 +426,143 @@ Status StorageQueue::WriteHeaderAndBlock(
return Status::StatusOK();
}
Status StorageQueue::WriteMetadata() {
DCHECK_CALLED_ON_VALID_SEQUENCE(storage_queue_sequence_checker_);
// Synchronously write the metafile.
auto meta_file = base::MakeRefCounted<SingleFile>(
options_.directory()
.Append(METADATA_NAME)
.AddExtensionASCII(base::NumberToString(next_seq_number_)),
/*size=*/0);
RETURN_IF_ERROR(meta_file->Open(/*read_only=*/false));
// Write generation id.
auto append_result = meta_file->Append(base::StringPiece(
reinterpret_cast<const char*>(&generation_id_), sizeof(generation_id_)));
if (!append_result.ok()) {
return Status(
error::RESOURCE_EXHAUSTED,
base::StrCat({"Cannot write metafile=", meta_file->name(),
" status=", append_result.status().ToString()}));
}
// Write last record digest.
DCHECK(last_record_digest_.has_value()); // Must be set by now.
append_result = meta_file->Append(last_record_digest_.value());
if (!append_result.ok()) {
return Status(
error::RESOURCE_EXHAUSTED,
base::StrCat({"Cannot write metafile=", meta_file->name(),
" status=", append_result.status().ToString()}));
}
if (append_result.ValueOrDie() != last_record_digest_.value().size()) {
return Status(error::DATA_LOSS, base::StrCat({"Failure writing metafile=",
meta_file->name()}));
}
meta_file->Close();
// Asynchronously delete all earlier metafiles. Do not wait for this to
// happen.
base::ThreadPool::PostTask(
FROM_HERE, {base::TaskPriority::BEST_EFFORT, base::MayBlock()},
base::BindOnce(&StorageQueue::DeleteOutdatedMetadata, this,
next_seq_number_));
return Status::StatusOK();
}
Status StorageQueue::RestoreMetadata() {
// Enumerate all meta-files into a map seq_number->file_path.
std::map<uint64_t, base::FilePath> meta_files_paths;
base::FileEnumerator dir_enum(
options_.directory(),
/*recursive=*/false, base::FileEnumerator::FILES,
base::StrCat({METADATA_NAME, FILE_PATH_LITERAL(".*")}));
base::FilePath full_name;
while (full_name = dir_enum.Next(), !full_name.empty()) {
const auto extension = dir_enum.GetInfo().GetName().Extension();
if (extension.empty()) {
continue;
}
uint64_t seq_number = 0;
bool success = base::StringToUint64(
dir_enum.GetInfo().GetName().Extension().substr(1), &seq_number);
if (!success) {
continue;
}
meta_files_paths.emplace(seq_number, full_name); // Ignore the result.
}
// See whether we have a match for next_seq_number_ - 1.
DCHECK_GT(next_seq_number_, 0u);
auto it = meta_files_paths.find(next_seq_number_ - 1);
if (it == meta_files_paths.end()) {
// For now we fail in this case. Later on we will provide a generation
// switch.
return Status(error::DATA_LOSS,
base::StrCat({"Cannot recover last record digest at ",
base::NumberToString(next_seq_number_ - 1)}));
}
// Match found. Load the metadata.
auto meta_file = base::MakeRefCounted<SingleFile>(
options_.directory()
.Append(METADATA_NAME)
.AddExtensionASCII(base::NumberToString(next_seq_number_ - 1)),
/*size=*/0);
RETURN_IF_ERROR(meta_file->Open(/*read_only=*/true));
// Read generation id.
auto read_result = meta_file->Read(/*pos=*/0, sizeof(generation_id_));
if (!read_result.ok() ||
read_result.ValueOrDie().size() != sizeof(generation_id_)) {
return Status(error::DATA_LOSS,
base::StrCat({"Cannot read metafile=", meta_file->name(),
" status=", read_result.status().ToString()}));
}
generation_id_ =
*reinterpret_cast<const uint64_t*>(read_result.ValueOrDie().data());
// Read last record digest.
read_result =
meta_file->Read(/*pos=*/sizeof(generation_id_), crypto::kSHA256Length);
if (!read_result.ok() ||
read_result.ValueOrDie().size() != crypto::kSHA256Length) {
return Status(error::DATA_LOSS,
base::StrCat({"Cannot read metafile=", meta_file->name(),
" status=", read_result.status().ToString()}));
}
last_record_digest_ = std::string(read_result.ValueOrDie());
// Delete other metadata files.
for (const auto& file_path : meta_files_paths) {
if (file_path.first == next_seq_number_ - 1) {
continue; // Skip the file we just used.
}
base::DeleteFile(file_path.second); // Ignore any errors.
}
return Status::StatusOK();
}
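A design note: the generation id is stored and restored through reinterpret_cast, so metadata files are in host byte order and not portable across endianness. If portability were needed, a fixed byte order could be used instead; a fragment assuming base/big_endian.h is acceptable here (not what this CL does):

#include "base/big_endian.h"

// Hypothetical portable encoding of the generation id before Append():
char buf[sizeof(uint64_t)];
base::WriteBigEndian(buf, generation_id_);
// ...and the matching decode after Read():
uint64_t restored_generation_id = 0;
base::ReadBigEndian(buf, &restored_generation_id);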
void StorageQueue::DeleteOutdatedMetadata(uint64_t seq_number_to_keep) {
std::vector<base::FilePath> files_to_delete;
base::FileEnumerator dir_enum(
options_.directory(),
/*recursive=*/false, base::FileEnumerator::FILES,
base::StrCat({METADATA_NAME, FILE_PATH_LITERAL(".*")}));
base::FilePath full_name;
while (full_name = dir_enum.Next(), !full_name.empty()) {
const auto extension = dir_enum.GetInfo().GetName().Extension();
if (extension.empty()) {
continue;
}
uint64_t seq_number = 0;
bool success = base::StringToUint64(
dir_enum.GetInfo().GetName().Extension().substr(1), &seq_number);
if (!success) {
continue;
}
if (seq_number >= seq_number_to_keep) {
continue;
}
files_to_delete.emplace_back(full_name);
}
for (const auto& file_path : files_to_delete) {
base::DeleteFile(file_path); // Ignore any errors.
}
}
class StorageQueue::ReadContext : public TaskRunnerContext<Status> {
public:
ReadContext(std::unique_ptr<UploaderInterface> uploader,
@@ -457,7 +633,8 @@ class StorageQueue::ReadContext : public TaskRunnerContext<Status> {
Response(blob.status());
return;
}
CallCurrentRecord(seq_number_, blob.ValueOrDie());
CallCurrentRecord(storage_queue->generation_id_, seq_number_,
blob.ValueOrDie());
}
void OnCompletion() override {
@@ -476,7 +653,9 @@ class StorageQueue::ReadContext : public TaskRunnerContext<Status> {
// place processing of the record on any thread(s). Once it returns, it will
// schedule NextRecord to execute on the sequential thread runner of this
// StorageQueue.
void CallCurrentRecord(uint64_t seq_number, base::StringPiece blob) {
void CallCurrentRecord(uint64_t generation_id,
uint64_t seq_number,
base::StringPiece blob) {
google::protobuf::io::ArrayInputStream stream( // Zero-copy stream.
blob.data(), blob.size());
EncryptedRecord encrypted_record;
@@ -498,10 +677,11 @@ class StorageQueue::ReadContext : public TaskRunnerContext<Status> {
base::Unretained(this)));
return;
}
// Fill in sequeincing information.
// Priority and Generation ID are attached by the Storage layer.
// Fill in sequencing information.
// Priority is attached by the Storage layer.
SequencingInformation* const sequencing_info =
encrypted_record.mutable_sequencing_information();
sequencing_info->set_generation_id(generation_id);
sequencing_info->set_sequencing_id(seq_number);
uploader_->ProcessRecord(std::move(encrypted_record),
base::BindOnce(&ReadContext::ScheduleNextRecord,
@@ -543,7 +723,8 @@ class StorageQueue::ReadContext : public TaskRunnerContext<Status> {
Response(blob.status());
return;
}
CallCurrentRecord(seq_number_, blob.ValueOrDie());
CallCurrentRecord(storage_queue->generation_id_, seq_number_,
blob.ValueOrDie());
}
StatusOr<base::StringPiece> EnsureBlob(uint64_t seq_number) {
@@ -666,11 +847,13 @@ class StorageQueue::WriteContext : public TaskRunnerContext<Status> {
// Wrap the record.
WrappedRecord wrapped_record;
*wrapped_record.mutable_record() = std::move(record_);
// Later: add digests to the wrapped record.
// Calculate and attach record digest.
storage_queue_->UpdateRecordDigest(&wrapped_record);
// Serialize and encrypt wrapped record on a thread pool.
base::ThreadPool::PostTask(
FROM_HERE,
FROM_HERE, {base::TaskPriority::BEST_EFFORT},
base::BindOnce(&WriteContext::SerializeAndEncryptWrappedRecord,
base::Unretained(this), std::move(wrapped_record)));
}
@@ -737,8 +920,17 @@ class StorageQueue::WriteContext : public TaskRunnerContext<Status> {
}
scoped_refptr<SingleFile> last_file = assign_result.ValueOrDie();
// Prepare and initiate writing generation info to a new file.
// Pick up when both WriteMetadata and WriteHeaderAndBlock
// have finished.
Status write_result = storage_queue_->WriteMetadata();
if (!write_result.ok()) {
Response(write_result);
return;
}
// Write header and block.
Status write_result =
write_result =
storage_queue_->WriteHeaderAndBlock(buffer, std::move(last_file));
if (!write_result.ok()) {
Response(write_result);
......
@@ -139,7 +139,8 @@ class StorageQueue : public base::RefCountedThreadSafe<StorageQueue> {
// caller can "fire and forget" it (|completion_cb| allows the caller to
// verify that the record has been successfully enqueued). If the file is
// going to become too large, it is closed and a new file is created.
// Helper methods: AssignLastFile, WriteHeaderAndBlock.
// Helper methods: AssignLastFile, WriteHeaderAndBlock, OpenNewWriteableFile,
// WriteMetadata, DeleteOutdatedMetadata.
void Write(Record record, base::OnceCallback<void(Status)> completion_cb);
// Confirms acceptance of the records up to |seq_number| (inclusively).
@@ -247,9 +248,14 @@ class StorageQueue : public base::RefCountedThreadSafe<StorageQueue> {
// Must be called once and only once after construction.
// Returns OK or error status, if anything failed to initialize.
// Called once, during initialization.
// Helper methods: EnumerateDataFiles, ScanLastFile.
// Helper methods: EnumerateDataFiles, ScanLastFile, RestoreMetadata.
Status Init();
// Attaches last record digest to the given record (does not exist at a
// generation start). Calculates the given record digest and stores it
// as the last one for the next record.
void UpdateRecordDigest(WrappedRecord* wrapped_record);
// Helper method for Init(): enumerates all data files in the directory.
// Valid file names are <prefix>.<seq_number>, any other names are ignored.
Status EnumerateDataFiles();
@@ -270,6 +276,23 @@ class StorageQueue : public base::RefCountedThreadSafe<StorageQueue> {
// writeable file, adding it to |files_|.
StatusOr<scoped_refptr<SingleFile>> OpenNewWriteableFile();
// Helper method for Write(): stores a file with metadata to match the
// incoming new record. Synchronously composes and writes the metadata into
// a file named after the next sequencing number, so that the Write
// operation can then complete. After that it asynchronously deletes all
// other metadata files with a lower sequencing number (multiple Writes can
// see the same files and attempt to delete them, and that is not an
// error).
Status WriteMetadata();
// Helper method for Init(): locates the file with metadata that matches the
// last sequencing number and loads the metadata from it.
Status RestoreMetadata();
// Helper method for Write(): deletes meta files up to, but not including
// |seq_number_to_keep|. Any errors are ignored.
void DeleteOutdatedMetadata(uint64_t seq_number_to_keep);
// Helper method for Write(): composes record header and writes it to the
// file, followed by data.
Status WriteHeaderAndBlock(base::StringPiece data,
@@ -296,6 +319,16 @@ class StorageQueue : public base::RefCountedThreadSafe<StorageQueue> {
// Immutable options, stored at the time of creation.
const Options options_;
// Current generation id, unique per device and queue.
// Set up once during initialization by reading it from the 'META.NNNN' file
// matching the last seq number, or generated anew as a random number if no
// such file is found (or the files do not match the id).
uint64_t generation_id_ = 0;
// Digest of the last written record (loaded at queue initialization; absent
// if a new generation has just started and no records were stored yet).
base::Optional<std::string> last_record_digest_;
// Next sequencing number to store (not assigned yet).
uint64_t next_seq_number_ = 0;
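The commit message defers error handling (missing or corrupt metadata) to a later change. A hypothetical sketch of that planned reset path, using only the members declared above (the method does not exist in this CL):

// Hypothetical recovery helper: starts a new generation when metadata
// cannot be restored, dropping the digest chain.
void StorageQueue::ResetGeneration() {
  generation_id_ = base::RandUint64();  // New random generation id.
  last_record_digest_.reset();          // No digest at generation start.
}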
......
@@ -9,12 +9,16 @@
#include <vector>
#include "base/files/scoped_temp_dir.h"
#include "base/optional.h"
#include "base/strings/strcat.h"
#include "base/strings/string_number_conversions.h"
#include "base/synchronization/waitable_event.h"
#include "base/test/task_environment.h"
#include "chrome/browser/policy/messaging_layer/encryption/test_encryption_module.h"
#include "chrome/browser/policy/messaging_layer/util/status.h"
#include "chrome/browser/policy/messaging_layer/util/statusor.h"
#include "components/policy/proto/record.pb.h"
#include "crypto/sha2.h"
#include "testing/gmock/include/gmock/gmock.h"
#include "testing/gtest/include/gtest/gtest.h"
@@ -84,6 +88,44 @@ class MockUploadClient : public StorageQueue::UploaderInterface {
WrappedRecord wrapped_record;
ASSERT_TRUE(wrapped_record.ParseFromString(
encrypted_record.ValueOrDie().encrypted_wrapped_record()));
// Verify generation match.
if (generation_id_.has_value() &&
generation_id_.value() != encrypted_record.ValueOrDie()
.sequencing_information()
.generation_id()) {
std::move(processed_cb)
.Run(UploadRecordFailure(Status(
error::DATA_LOSS,
base::StrCat({"Generation id mismatch, expected=",
base::NumberToString(generation_id_.value()),
" actual=",
base::NumberToString(encrypted_record.ValueOrDie()
.sequencing_information()
.generation_id())}))));
return;
}
if (!generation_id_.has_value()) {
generation_id_ = encrypted_record.ValueOrDie()
.sequencing_information()
.generation_id();
}
// Verify the record's own digest.
// The last record digest is not verified yet, since duplicate records are
// accepted in this test.
{
std::string serialized_record;
wrapped_record.record().SerializeToString(&serialized_record);
const auto record_digest = crypto::SHA256HashString(serialized_record);
DCHECK_EQ(record_digest.size(), crypto::kSHA256Length);
if (record_digest != wrapped_record.record_digest()) {
std::move(processed_cb)
.Run(UploadRecordFailure(
Status(error::DATA_LOSS, "Record digest mismatch")));
return;
}
}
std::move(processed_cb)
.Run(UploadRecord(encrypted_record.ValueOrDie()
.sequencing_information()
@@ -133,6 +175,7 @@ class MockUploadClient : public StorageQueue::UploaderInterface {
};
private:
base::Optional<uint64_t> generation_id_;
Sequence test_upload_sequence_;
};
......
@@ -9,6 +9,9 @@
#include <vector>
#include "base/files/scoped_temp_dir.h"
#include "base/optional.h"
#include "base/strings/strcat.h"
#include "base/strings/string_number_conversions.h"
#include "base/synchronization/waitable_event.h"
#include "base/test/task_environment.h"
#include "chrome/browser/policy/messaging_layer/encryption/test_encryption_module.h"
@@ -16,7 +19,7 @@
#include "chrome/browser/policy/messaging_layer/util/statusor.h"
#include "components/policy/proto/record.pb.h"
#include "components/policy/proto/record_constants.pb.h"
#include "crypto/sha2.h"
#include "testing/gmock/include/gmock/gmock.h"
#include "testing/gtest/include/gtest/gtest.h"
@@ -87,6 +90,44 @@ class MockUploadClient : public Storage::UploaderInterface {
WrappedRecord wrapped_record;
ASSERT_TRUE(wrapped_record.ParseFromString(
encrypted_record.ValueOrDie().encrypted_wrapped_record()));
// Verify generation match.
if (generation_id_.has_value() &&
generation_id_.value() != encrypted_record.ValueOrDie()
.sequencing_information()
.generation_id()) {
std::move(processed_cb)
.Run(UploadRecordFailure(Status(
error::DATA_LOSS,
base::StrCat({"Generation id mismatch, expected=",
base::NumberToString(generation_id_.value()),
" actual=",
base::NumberToString(encrypted_record.ValueOrDie()
.sequencing_information()
.generation_id())}))));
return;
}
if (!generation_id_.has_value()) {
generation_id_ = encrypted_record.ValueOrDie()
.sequencing_information()
.generation_id();
}
// Verify the record's own digest.
// The last record digest is not verified yet, since duplicate records are
// accepted in this test.
{
std::string serialized_record;
wrapped_record.record().SerializeToString(&serialized_record);
const auto record_digest = crypto::SHA256HashString(serialized_record);
DCHECK_EQ(record_digest.size(), crypto::kSHA256Length);
if (record_digest != wrapped_record.record_digest()) {
std::move(processed_cb)
.Run(UploadRecordFailure(
Status(error::DATA_LOSS, "Record digest mismatch")));
return;
}
}
std::move(processed_cb)
.Run(UploadRecord(
encrypted_record.ValueOrDie().sequencing_information().priority(),
@@ -163,6 +204,7 @@ class MockUploadClient : public Storage::UploaderInterface {
};
private:
base::Optional<uint64_t> generation_id_;
Sequence test_upload_sequence_;
};
......
@@ -75,13 +75,11 @@ message SequencingInformation {
// (what to do with that is a decision that the caller needs to make).
optional uint64 sequencing_id = 1;
// Generation ID (required)
// UUID of the last boot that did not find the public key cached - can
// happen after powerwash.
// Generation ID (required). Unique per device and priority. Generated anew
// when previous record digest is not found at startup (e.g. after powerwash).
optional uint64 generation_id = 2;
// Priority (required)
// Generation IDs are per Priority.
// Priority (required).
optional Priority priority = 3;
}
......