Commit 3926c70a authored by Leonid Baraz

Fix order distortions.

Fixes a serious bug in StorageQueue found with the stress test.
When records are enqueued quickly (e.g. from multiple threads), the
following can happen: the sequencing ids remain strictly monotonic, but
the last record digest may refer to the wrong record:
(1) Record1 picks up the last digest of the previous record.
(2) Record2 picks up the digest of Record1.
(3) Record2 gets seq_id=1 and is written to the file.
(4) Record1 gets seq_id=2 and is written to the file.
Upon upload, the server receives Record2[seq=1] first and Record1[seq=2]
second, but the last record digest carried by Record1 does not match the
digest of Record2 - it matches the record before Record1.
Also adds a stress test.

Bug: b:177439641
Change-Id: I5b7e425ea6aa0f30b96248dcaebb22562c085613
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2641569
Reviewed-by: Zach Trudo <zatrudo@google.com>
Commit-Queue: Leonid Baraz <lbaraz@chromium.org>
Cr-Commit-Position: refs/heads/master@{#846352}
parent ff88cb88
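
The essence of the fix, as a minimal self-contained sketch (an editor's illustration, not the Chromium code; OrderedWriter, PendingWrite, Enqueue and OnReady are hypothetical names): the last-record digest is chained at enqueue time on a single sequence, and a pending write may only commit, and only then receive its sequencing id, once it reaches the head of the queue, so digest chaining and sequencing ids can never disagree.

#include <cstdint>
#include <functional>
#include <list>
#include <optional>
#include <string>
#include <utility>

// One pending write: its own digest, the digest it chains to, and a callback
// that performs the actual write once a sequencing id is assigned.
struct PendingWrite {
  std::string digest;
  std::optional<std::string> last_digest;  // Chained at enqueue time.
  bool ready = false;                      // Set when async encryption is done.
  std::function<void(int64_t /*sequencing_id*/)> commit;
};

class OrderedWriter {
 public:
  using Handle = std::list<PendingWrite>::iterator;

  // Runs in enqueue order: the digest chain is fixed here, before any
  // sequencing id exists.
  Handle Enqueue(PendingWrite write) {
    write.last_digest = last_digest_;
    last_digest_ = write.digest;
    return queue_.insert(queue_.end(), std::move(write));
  }

  // Called when a write's asynchronous encryption completes, possibly out of
  // order. Only the head of the queue may commit, so sequencing ids are
  // assigned in exactly the order in which the digests were chained.
  void OnReady(Handle handle) {
    handle->ready = true;
    while (!queue_.empty() && queue_.front().ready) {
      queue_.front().commit(next_sequencing_id_++);
      queue_.pop_front();
    }
  }

 private:
  std::list<PendingWrite> queue_;
  std::optional<std::string> last_digest_;
  int64_t next_sequencing_id_ = 0;
};

In the CL itself this role is played by |write_contexts_queue_| together with WriteContext::ResumeWriteRecord(), shown in the diff below.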
@@ -7,6 +7,7 @@
 #include <algorithm>
 #include <cstring>
 #include <iterator>
+#include <list>
 #include <map>
 #include <memory>
 #include <string>
@@ -131,6 +132,7 @@ StorageQueue::StorageQueue(const QueueOptions& options,
       sequenced_task_runner_(base::ThreadPool::CreateSequencedTaskRunner(
           {base::TaskPriority::BEST_EFFORT, base::MayBlock()})) {
   DETACH_FROM_SEQUENCE(storage_queue_sequence_checker_);
+  DCHECK(write_contexts_queue_.empty());
 }
 
 StorageQueue::~StorageQueue() {
@@ -139,6 +141,8 @@ StorageQueue::~StorageQueue() {
   // Stop upload timer.
   upload_timer_.AbandonAndStop();
+  // Make sure no pending writes are present.
+  DCHECK(write_contexts_queue_.empty());
   // Close all opened files.
   files_.clear();
 }
@@ -1013,7 +1017,8 @@ class StorageQueue::WriteContext : public TaskRunnerContext<Status> {
       : TaskRunnerContext<Status>(std::move(write_callback),
                                   storage_queue->sequenced_task_runner_),
         storage_queue_(storage_queue),
-        record_(std::move(record)) {
+        record_(std::move(record)),
+        in_contexts_queue_(storage_queue->write_contexts_queue_.end()) {
     DCHECK(storage_queue.get());
     DETACH_FROM_SEQUENCE(write_sequence_checker_);
   }
@@ -1021,6 +1026,23 @@ class StorageQueue::WriteContext : public TaskRunnerContext<Status> {
  private:
   // Context can only be deleted by calling Response method.
   ~WriteContext() override {
+    DCHECK_CALLED_ON_VALID_SEQUENCE(write_sequence_checker_);
+
+    // If still in the queue, remove it (something went wrong).
+    if (in_contexts_queue_ != storage_queue_->write_contexts_queue_.end()) {
+      DCHECK_EQ(storage_queue_->write_contexts_queue_.front(), this);
+      storage_queue_->write_contexts_queue_.erase(in_contexts_queue_);
+    }
+
+    // If there is a context at the front of the queue and its buffer is
+    // filled in, schedule the respective |Write| to happen now.
+    if (!storage_queue_->write_contexts_queue_.empty() &&
+        !storage_queue_->write_contexts_queue_.front()->buffer_.empty()) {
+      storage_queue_->write_contexts_queue_.front()->Schedule(
+          &WriteContext::ResumeWriteRecord,
+          base::Unretained(storage_queue_->write_contexts_queue_.front()));
+    }
+
     // If no uploader is needed, we are done.
     if (!uploader_) {
       return;
@@ -1054,6 +1076,10 @@ class StorageQueue::WriteContext : public TaskRunnerContext<Status> {
     // Calculate and attach record digest.
     storage_queue_->UpdateRecordDigest(&wrapped_record);
 
+    // Add context to the end of the queue.
+    in_contexts_queue_ = storage_queue_->write_contexts_queue_.insert(
+        storage_queue_->write_contexts_queue_.end(), this);
+
     // Serialize and encrypt wrapped record on a thread pool.
     base::ThreadPool::PostTask(
         FROM_HERE, {base::TaskPriority::BEST_EFFORT},
@@ -1116,11 +1142,30 @@ class StorageQueue::WriteContext : public TaskRunnerContext<Status> {
     encrypted_record_result.ValueOrDie().Clear();
 
     // Write into storage on sequential task runner.
-    Schedule(&WriteContext::WriteRecord, base::Unretained(this), buffer);
+    Schedule(&WriteContext::WriteRecord, base::Unretained(this),
+             std::move(buffer));
   }
 
-  void WriteRecord(base::StringPiece buffer) {
+  void WriteRecord(std::string buffer) {
     DCHECK_CALLED_ON_VALID_SEQUENCE(write_sequence_checker_);
+    buffer_.swap(buffer);
+
+    ResumeWriteRecord();
+  }
+
+  void ResumeWriteRecord() {
+    DCHECK_CALLED_ON_VALID_SEQUENCE(write_sequence_checker_);
+
+    // If we are not at the head of the queue, delay the write and expect to
+    // be reactivated later.
+    DCHECK(in_contexts_queue_ != storage_queue_->write_contexts_queue_.end());
+    if (storage_queue_->write_contexts_queue_.front() != this) {
+      return;
+    }
+
+    // We are at the head of the queue, remove ourselves.
+    storage_queue_->write_contexts_queue_.pop_front();
+    in_contexts_queue_ = storage_queue_->write_contexts_queue_.end();
+
     // Prepare uploader, if we need to run it after Write.
     if (storage_queue_->options_.upload_period().is_zero()) {
@@ -1134,8 +1179,9 @@ class StorageQueue::WriteContext : public TaskRunnerContext<Status> {
       }
     }
 
+    DCHECK(!buffer_.empty());
     StatusOr<scoped_refptr<SingleFile>> assign_result =
-        storage_queue_->AssignLastFile(buffer.size());
+        storage_queue_->AssignLastFile(buffer_.size());
     if (!assign_result.ok()) {
       Response(assign_result.status());
       return;
@@ -1151,7 +1197,7 @@ class StorageQueue::WriteContext : public TaskRunnerContext<Status> {
 
     // Write header and block.
     write_result =
-        storage_queue_->WriteHeaderAndBlock(buffer, std::move(last_file));
+        storage_queue_->WriteHeaderAndBlock(buffer_, std::move(last_file));
     if (!write_result.ok()) {
       Response(write_result);
       return;
@@ -1164,6 +1210,15 @@ class StorageQueue::WriteContext : public TaskRunnerContext<Status> {
   Record record_;
 
+  // Position in the |storage_queue_|->|write_contexts_queue_|.
+  // We use it in order to detect whether the context is in the queue
+  // and to remove it from the queue when the time comes.
+  std::list<WriteContext*>::iterator in_contexts_queue_;
+
+  // Write buffer. When filled in (after encryption), |WriteRecord| can be
+  // executed. Empty until encryption is done.
+  std::string buffer_;
+
   // Upload provider (if any).
   std::unique_ptr<UploaderInterface> uploader_;
...
@@ -5,6 +5,7 @@
 #ifndef CHROME_BROWSER_POLICY_MESSAGING_LAYER_STORAGE_STORAGE_QUEUE_H_
 #define CHROME_BROWSER_POLICY_MESSAGING_LAYER_STORAGE_STORAGE_QUEUE_H_
 
+#include <list>
 #include <map>
 #include <memory>
 #include <string>
@@ -304,6 +305,13 @@ class StorageQueue : public base::RefCountedThreadSafe<StorageQueue> {
   // if the new generation has just started, and no records were stored yet).
   base::Optional<std::string> last_record_digest_;
 
+  // Queue of the write context instances in the order of creation, sequencing
+  // ids and record digests. A context is always removed from this queue before
+  // being destructed. We use std::list rather than std::queue, because
+  // if a write fails, it needs to be removed from the queue regardless of
+  // whether it is at the head, tail or middle.
+  std::list<WriteContext*> write_contexts_queue_;
+
   // Next sequencing id to store (not assigned yet).
   int64_t next_sequencing_id_ = 0;
...
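
As an aside on the |write_contexts_queue_| comment above: std::list keeps iterators to the remaining elements valid across insertions and erasures, which is what lets a failed write drop out of any position. A tiny stand-alone illustration follows (an editor's sketch; WriteCtx and the variable names are hypothetical):

#include <cassert>
#include <list>

struct WriteCtx {};  // Stand-in for StorageQueue::WriteContext.

int main() {
  std::list<WriteCtx*> queue;
  WriteCtx a, b, c;
  queue.push_back(&a);
  // Keep the iterator at insertion time, as |in_contexts_queue_| does.
  std::list<WriteCtx*>::iterator it_b = queue.insert(queue.end(), &b);
  queue.push_back(&c);
  // A write that failed mid-queue is removed without disturbing the head or
  // the tail; iterators held by the other contexts remain valid.
  queue.erase(it_b);
  assert(queue.front() == &a && queue.back() == &c);
  return 0;
}

This mirrors how each WriteContext records |in_contexts_queue_| when it is inserted and erases itself from that position later.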
// Copyright 2020 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "chrome/browser/policy/messaging_layer/storage/storage_queue.h"
#include <cstdint>
#include <initializer_list>
#include <utility>
#include <vector>
#include "base/containers/flat_map.h"
#include "base/files/file_path.h"
#include "base/files/file_util.h"
#include "base/files/scoped_temp_dir.h"
#include "base/optional.h"
#include "base/strings/strcat.h"
#include "base/strings/string_number_conversions.h"
#include "base/synchronization/waitable_event.h"
#include "base/test/task_environment.h"
#include "chrome/browser/policy/messaging_layer/encryption/test_encryption_module.h"
#include "chrome/browser/policy/messaging_layer/storage/resources/resource_interface.h"
#include "chrome/browser/policy/messaging_layer/storage/storage_configuration.h"
#include "chrome/browser/policy/messaging_layer/util/status.h"
#include "chrome/browser/policy/messaging_layer/util/statusor.h"
#include "components/policy/proto/record.pb.h"
#include "crypto/sha2.h"
#include "testing/gmock/include/gmock/gmock.h"
#include "testing/gtest/include/gtest/gtest.h"
using ::testing::_;
using ::testing::Between;
using ::testing::Eq;
using ::testing::Invoke;
using ::testing::NotNull;
using ::testing::Return;
using ::testing::Sequence;
using ::testing::StrEq;
using ::testing::WithArg;
namespace reporting {
namespace {
constexpr size_t kTotalQueueStarts = 4;
constexpr size_t kTotalWritesPerStart = 256;
constexpr char kDataPrefix[] = "Rec";
// Usage (in tests only):
//
// TestEvent<ResType> e;
// ... Do some async work passing e.cb() as a completion callback of
// base::OnceCallback<void(ResType res)> type, which may also perform some
// other action specified by the caller.
// ... = e.result(); // Will wait for e.cb() to be called and return the
// collected result.
//
template <typename ResType>
class TestEvent {
public:
TestEvent() : run_loop_(std::make_unique<base::RunLoop>()) {}
~TestEvent() { EXPECT_FALSE(run_loop_->running()) << "Not responded"; }
TestEvent(const TestEvent& other) = delete;
TestEvent& operator=(const TestEvent& other) = delete;
ResType result() {
run_loop_->Run();
return std::forward<ResType>(result_);
}
// Completion callback to hand over to the processing method.
base::OnceCallback<void(ResType res)> cb() {
return base::BindOnce(
[](base::RunLoop* run_loop, ResType* result, ResType res) {
*result = std::forward<ResType>(res);
run_loop->Quit();
},
base::Unretained(run_loop_.get()), base::Unretained(&result_));
}
private:
std::unique_ptr<base::RunLoop> run_loop_;
ResType result_;
};
class TestUploadClient : public StorageQueue::UploaderInterface {
public:
// Mapping of <sequencing id, generation id> to matching record digest.
// Whenever a record is uploaded and includes last record digest, this map
// should have that digest already recorded. Only the first record in a
// generation is uploaded without last record digest.
using LastRecordDigestMap = base::flat_map<
std::pair<int64_t /*sequencing id*/, int64_t /*generation id*/>,
base::Optional<std::string /*digest*/>>;
explicit TestUploadClient(LastRecordDigestMap* last_record_digest_map)
: last_record_digest_map_(last_record_digest_map) {}
void ProcessRecord(EncryptedRecord encrypted_record,
base::OnceCallback<void(bool)> processed_cb) override {
WrappedRecord wrapped_record;
ASSERT_TRUE(wrapped_record.ParseFromString(
encrypted_record.encrypted_wrapped_record()));
// Verify generation match.
const auto& sequencing_information =
encrypted_record.sequencing_information();
if (!generation_id_.has_value()) {
generation_id_ = sequencing_information.generation_id();
} else {
ASSERT_THAT(generation_id_.value(),
Eq(sequencing_information.generation_id()));
}
// Verify digest and its match.
// Last record digest is not verified yet, since duplicate records are
// accepted in this test.
{
std::string serialized_record;
wrapped_record.record().SerializeToString(&serialized_record);
const auto record_digest = crypto::SHA256HashString(serialized_record);
DCHECK_EQ(record_digest.size(), crypto::kSHA256Length);
ASSERT_THAT(record_digest, Eq(wrapped_record.record_digest()));
// Store record digest for the next record in sequence to verify.
last_record_digest_map_->emplace(
std::make_pair(sequencing_information.sequencing_id(),
sequencing_information.generation_id()),
record_digest);
// If last record digest is present, match it and validate.
if (wrapped_record.has_last_record_digest()) {
auto it = last_record_digest_map_->find(
std::make_pair(sequencing_information.sequencing_id() - 1,
sequencing_information.generation_id()));
if (it != last_record_digest_map_->end() && it->second.has_value()) {
ASSERT_THAT(it->second.value(),
Eq(wrapped_record.last_record_digest()));
}
}
}
std::move(processed_cb).Run(true);
}
void ProcessGap(SequencingInformation sequencing_information,
uint64_t count,
base::OnceCallback<void(bool)> processed_cb) override {
ASSERT_TRUE(false) << "There should be no gaps";
}
void Completed(bool need_encryption_key, Status status) override {
ASSERT_FALSE(need_encryption_key);
ASSERT_OK(status);
}
private:
base::Optional<int64_t> generation_id_;
LastRecordDigestMap* const last_record_digest_map_;
Sequence test_upload_sequence_;
};
class StorageQueueStressTest : public ::testing::TestWithParam<size_t> {
public:
void SetUp() override {
ASSERT_TRUE(location_.CreateUniqueTempDir());
options_.set_directory(base::FilePath(location_.GetPath()))
.set_single_file_size(GetParam());
}
void TearDown() override {
ResetTestStorageQueue();
// Make sure all memory is deallocated.
ASSERT_THAT(GetMemoryResource()->GetUsed(), Eq(0u));
}
void CreateTestStorageQueueOrDie(const QueueOptions& options) {
ASSERT_FALSE(storage_queue_) << "StorageQueue already assigned";
test_encryption_module_ =
base::MakeRefCounted<test::TestEncryptionModule>();
TestEvent<StatusOr<scoped_refptr<StorageQueue>>> e;
StorageQueue::Create(
options,
base::BindRepeating(&StorageQueueStressTest::BuildTestUploader,
base::Unretained(this)),
test_encryption_module_, e.cb());
StatusOr<scoped_refptr<StorageQueue>> storage_queue_result = e.result();
ASSERT_OK(storage_queue_result) << "Failed to create StorageQueue, error="
<< storage_queue_result.status();
storage_queue_ = std::move(storage_queue_result.ValueOrDie());
}
void ResetTestStorageQueue() {
task_environment_.RunUntilIdle();
storage_queue_.reset();
}
QueueOptions BuildStorageQueueOptionsImmediate() const {
return QueueOptions(options_)
.set_subdirectory(FILE_PATH_LITERAL("D1"))
.set_file_prefix(FILE_PATH_LITERAL("F0001"));
}
QueueOptions BuildStorageQueueOptionsPeriodic(
base::TimeDelta upload_period = base::TimeDelta::FromSeconds(1)) const {
return BuildStorageQueueOptionsImmediate().set_upload_period(upload_period);
}
QueueOptions BuildStorageQueueOptionsOnlyManual() const {
return BuildStorageQueueOptionsPeriodic(base::TimeDelta::Max());
}
StatusOr<std::unique_ptr<StorageQueue::UploaderInterface>>
BuildTestUploader() {
return std::make_unique<TestUploadClient>(&last_record_digest_map_);
}
void WriteStringAsync(base::StringPiece data,
base::OnceCallback<void(Status)> cb) {
EXPECT_TRUE(storage_queue_) << "StorageQueue not created yet";
Record record;
record.mutable_data()->assign(data.data(), data.size());
record.set_destination(UPLOAD_EVENTS);
record.set_dm_token("DM TOKEN");
storage_queue_->Write(std::move(record), std::move(cb));
}
base::ScopedTempDir location_;
StorageOptions options_;
scoped_refptr<test::TestEncryptionModule> test_encryption_module_;
scoped_refptr<StorageQueue> storage_queue_;
// Test-wide global mapping of <sequencing id, generation id> to record
// digest. Serves all TestUploadClients created by the test fixture.
TestUploadClient::LastRecordDigestMap last_record_digest_map_;
base::test::TaskEnvironment task_environment_{
base::test::TaskEnvironment::TimeSource::MOCK_TIME};
};
class TestCallbackWaiter {
public:
TestCallbackWaiter() : runner_(base::ThreadTaskRunnerHandle::Get()) {}
TestCallbackWaiter(const TestCallbackWaiter& other) = delete;
TestCallbackWaiter& operator=(const TestCallbackWaiter& other) = delete;
void Attach() {
const size_t old_counter = counter_.fetch_add(1);
DCHECK_GT(old_counter, 0u) << "Cannot attach when already being released";
}
void Signal() {
const size_t old_counter = counter_.fetch_sub(1);
DCHECK_GT(old_counter, 0u) << "Already being released";
if (old_counter > 1u) {
// There are more owners.
return;
}
// Dropping the last owner.
run_loop_.Quit();
}
void Wait() {
Signal(); // Rid of the constructor's ownership.
run_loop_.Run();
}
private:
std::atomic<size_t> counter_{1}; // Owned by constructor.
const scoped_refptr<base::SingleThreadTaskRunner> runner_;
base::RunLoop run_loop_;
};
TEST_P(StorageQueueStressTest,
WriteIntoNewStorageQueueReopenWriteMoreAndUpload) {
for (size_t iStart = 0; iStart < kTotalQueueStarts; ++iStart) {
TestCallbackWaiter write_waiter;
base::RepeatingCallback<void(Status)> cb = base::BindRepeating(
[](TestCallbackWaiter* waiter, Status status) {
EXPECT_OK(status);
waiter->Signal();
},
&write_waiter);
SCOPED_TRACE(base::StrCat({"Create ", base::NumberToString(iStart)}));
CreateTestStorageQueueOrDie(BuildStorageQueueOptionsOnlyManual());
// Write into the queue in random order (simultaneously).
SCOPED_TRACE(base::StrCat({"Write ", base::NumberToString(iStart)}));
const std::string rec_prefix =
base::StrCat({kDataPrefix, base::NumberToString(iStart), "_"});
for (size_t iRec = 0; iRec < kTotalWritesPerStart; ++iRec) {
write_waiter.Attach();
base::ThreadPool::PostTask(
FROM_HERE, {base::TaskPriority::BEST_EFFORT},
base::BindOnce(
[](base::StringPiece rec_prefix, size_t iRec,
StorageQueueStressTest* test,
base::RepeatingCallback<void(Status)> cb) {
test->WriteStringAsync(
base::StrCat({rec_prefix, base::NumberToString(iRec)}), cb);
},
rec_prefix, iRec, this, cb));
}
write_waiter.Wait();
SCOPED_TRACE(base::StrCat({"Upload ", base::NumberToString(iStart)}));
storage_queue_->Flush();
SCOPED_TRACE(base::StrCat({"Reset ", base::NumberToString(iStart)}));
ResetTestStorageQueue();
EXPECT_THAT(last_record_digest_map_.size(),
Eq((iStart + 1) * kTotalWritesPerStart));
SCOPED_TRACE(base::StrCat({"Done ", base::NumberToString(iStart)}));
}
}
INSTANTIATE_TEST_SUITE_P(
VaryingFileSize,
StorageQueueStressTest,
testing::Values(1 * 1024LL, 2 * 1024LL, 3 * 1024LL, 4 * 1024LL));
} // namespace
} // namespace reporting
@@ -3644,6 +3644,7 @@ test("unit_tests") {
       "../browser/policy/messaging_layer/public/report_queue_configuration_unittest.cc",
       "../browser/policy/messaging_layer/public/report_queue_unittest.cc",
       "../browser/policy/messaging_layer/storage/resources/resource_interface_unittest.cc",
+      "../browser/policy/messaging_layer/storage/storage_queue_stress_test.cc",
       "../browser/policy/messaging_layer/storage/storage_queue_unittest.cc",
       "../browser/policy/messaging_layer/storage/storage_unittest.cc",
       "../browser/policy/messaging_layer/storage/test_storage_module.cc",
...