Commit 60d33a56 authored by Leonid Baraz's avatar Leonid Baraz Committed by Commit Bot

Add shutting down process to the Storage and StorageQueue.

Making that will guarantee, that queue is destructed on the appropriate
queue only and in order with other asynchronous operations (after them
all). When the device is shutting down, it won't allow any new asyn
operations to start.
This should resolve crbug/1098359.

Bug: 1098359
Change-Id: I12fd860778bc8dd7efb1f51f6d878c2cd1dd3256
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2268382Reviewed-by: default avatarZach Trudo <zatrudo@google.com>
Commit-Queue: Leonid Baraz <lbaraz@chromium.org>
Cr-Commit-Position: refs/heads/master@{#782828}
parent 02f0bd94
...@@ -211,7 +211,67 @@ void Storage::Create( ...@@ -211,7 +211,67 @@ void Storage::Create(
Storage::Storage(const Options& options, StartUploadCb start_upload_cb) Storage::Storage(const Options& options, StartUploadCb start_upload_cb)
: options_(options), start_upload_cb_(std::move(start_upload_cb)) {} : options_(options), start_upload_cb_(std::move(start_upload_cb)) {}
Storage::~Storage() = default; Storage::~Storage() {
DCHECK(is_shutting_down_) << "Storage not shut down properly";
for (const auto& q : queues_) {
DCHECK_EQ(q.second.get(), nullptr)
<< "Queue has not been shutdown properly, priority=" << q.first;
}
}
// static
void Storage::ShutDown(scoped_refptr<Storage>* storage,
base::OnceCallback<void(Status)> done_cb) {
// Shuts down all queues of the Storage object.
class StorageShutDownContext : public TaskRunnerContext<Status> {
public:
StorageShutDownContext(scoped_refptr<Storage>* storage,
base::OnceCallback<void(Status)> callback)
: TaskRunnerContext<Status>(
std::move(callback),
base::ThreadPool::CreateSequencedTaskRunner(
{base::TaskPriority::BEST_EFFORT, base::MayBlock()})),
storage_(storage),
count_((*storage_)->queues_.size()) {}
private:
// Context can only be deleted by calling Response method.
~StorageShutDownContext() override { DCHECK_EQ(count_, 0); }
void OnStart() override {
CheckOnValidSequence();
(*storage_)->is_shutting_down_ = true;
for (auto& queue : (*storage_)->queues_) {
StorageQueue::ShutDown(
&queue.second,
base::BindOnce(&StorageShutDownContext::ScheduleQueueClosed, this));
}
}
void ScheduleQueueClosed() {
Schedule(&StorageShutDownContext::QueueClosed, this);
}
void QueueClosed() {
CheckOnValidSequence();
DCHECK_GT(count_, 0);
if (--count_ > 0) {
return;
}
storage_->reset();
Response(Status::StatusOK());
}
const std::vector<std::pair<Priority, StorageQueue::Options>>
queues_options_;
scoped_refptr<Storage>* const storage_;
int32_t count_;
Status final_status_;
};
// Asynchronously shut down.
Start<StorageShutDownContext>(storage, std::move(done_cb));
}
void Storage::Write(Priority priority, void Storage::Write(Priority priority,
base::span<const uint8_t> data, base::span<const uint8_t> data,
......
...@@ -95,6 +95,14 @@ class Storage : public base::RefCountedThreadSafe<Storage> { ...@@ -95,6 +95,14 @@ class Storage : public base::RefCountedThreadSafe<Storage> {
uint64_t seq_number, uint64_t seq_number,
base::OnceCallback<void(Status)> completion_cb); base::OnceCallback<void(Status)> completion_cb);
// Shuts the Storage down with each queue on its sequenced task runner, and
// returns by calling |done_cb| closure. Guarantees that all previously
// scheduled operations are complete before the Storage is shut down and calls
// |done_cb|, assuming no queue is referenced elsewhere. Status passed to
// done_cb currently has no meaning and is expected to be OK.
static void ShutDown(scoped_refptr<Storage>* storage,
base::OnceCallback<void(Status)> done_cb);
Storage(const Storage& other) = delete; Storage(const Storage& other) = delete;
Storage& operator=(const Storage& other) = delete; Storage& operator=(const Storage& other) = delete;
...@@ -121,6 +129,10 @@ class Storage : public base::RefCountedThreadSafe<Storage> { ...@@ -121,6 +129,10 @@ class Storage : public base::RefCountedThreadSafe<Storage> {
// Map priority->StorageQueue. // Map priority->StorageQueue.
base::flat_map<Priority, scoped_refptr<StorageQueue>> queues_; base::flat_map<Priority, scoped_refptr<StorageQueue>> queues_;
// Flag indicating that the Storage is shutting down, and no new asynchronous
// operations can be started.
bool is_shutting_down_ = false;
// Upload provider callback. // Upload provider callback.
const StartUploadCb start_upload_cb_; const StartUploadCb start_upload_cb_;
}; };
......
...@@ -121,9 +121,9 @@ StorageQueue::StorageQueue(const Options& options, ...@@ -121,9 +121,9 @@ StorageQueue::StorageQueue(const Options& options,
} }
StorageQueue::~StorageQueue() { StorageQueue::~StorageQueue() {
// TODO(b/153364303): Should be DCHECK_CALLED_ON_VALID_SEQUENCE(storage_queue_sequence_checker_);
// DCHECK_CALLED_ON_VALID_SEQUENCE(storage_queue_sequence_checker_); DCHECK(is_shutting_down_) << "StorageQueue not shut down properly";
DCHECK_EQ(active_read_operations_, 0);
// Stop upload timer. // Stop upload timer.
upload_timer_.AbandonAndStop(); upload_timer_.AbandonAndStop();
// CLose all opened files. // CLose all opened files.
...@@ -365,6 +365,10 @@ class StorageQueue::ReadContext : public TaskRunnerContext<Status> { ...@@ -365,6 +365,10 @@ class StorageQueue::ReadContext : public TaskRunnerContext<Status> {
void OnStart() override { void OnStart() override {
DCHECK_CALLED_ON_VALID_SEQUENCE(read_sequence_checker_); DCHECK_CALLED_ON_VALID_SEQUENCE(read_sequence_checker_);
if (storage_queue_->is_shutting_down_) {
Response(Status(error::UNAVAILABLE, "StorageQueue shutting down"));
return;
}
seq_number_ = storage_queue_->first_seq_number_; seq_number_ = storage_queue_->first_seq_number_;
// If the last file is not empty (has at least one record), // If the last file is not empty (has at least one record),
// close it and create the new one, so that its records are // close it and create the new one, so that its records are
...@@ -562,6 +566,11 @@ class StorageQueue::WriteContext : public TaskRunnerContext<Status> { ...@@ -562,6 +566,11 @@ class StorageQueue::WriteContext : public TaskRunnerContext<Status> {
void OnStart() override { void OnStart() override {
DCHECK_CALLED_ON_VALID_SEQUENCE(write_sequence_checker_); DCHECK_CALLED_ON_VALID_SEQUENCE(write_sequence_checker_);
if (storage_queue_->is_shutting_down_) {
Response(Status(error::UNAVAILABLE, "StorageQueue shutting down"));
return;
}
// Prepare uploader, if need to run it after Write. // Prepare uploader, if need to run it after Write.
if (storage_queue_->options_.upload_period().is_zero()) { if (storage_queue_->options_.upload_period().is_zero()) {
StatusOr<std::unique_ptr<UploaderInterface>> uploader = StatusOr<std::unique_ptr<UploaderInterface>> uploader =
...@@ -679,6 +688,10 @@ class StorageQueue::ConfirmContext : public TaskRunnerContext<Status> { ...@@ -679,6 +688,10 @@ class StorageQueue::ConfirmContext : public TaskRunnerContext<Status> {
void OnStart() override { void OnStart() override {
DCHECK_CALLED_ON_VALID_SEQUENCE(confirm_sequence_checker_); DCHECK_CALLED_ON_VALID_SEQUENCE(confirm_sequence_checker_);
if (storage_queue_->is_shutting_down_) {
Response(Status(error::UNAVAILABLE, "StorageQueue shutting down"));
return;
}
Response(storage_queue_->RemoveUnusedFiles(seq_number_)); Response(storage_queue_->RemoveUnusedFiles(seq_number_));
} }
...@@ -695,6 +708,20 @@ void StorageQueue::Confirm(uint64_t seq_number, ...@@ -695,6 +708,20 @@ void StorageQueue::Confirm(uint64_t seq_number,
Start<ConfirmContext>(seq_number, std::move(completion_cb), this); Start<ConfirmContext>(seq_number, std::move(completion_cb), this);
} }
// static
void StorageQueue::ShutDown(scoped_refptr<StorageQueue>* queue,
base::OnceClosure done_cb) {
(*queue)->sequenced_task_runner_->PostTask(
FROM_HERE,
base::BindOnce(
[](scoped_refptr<StorageQueue>* queue, base::OnceClosure done_cb) {
(*queue)->is_shutting_down_ = true;
queue->reset();
std::move(done_cb).Run();
},
base::Unretained(queue), std::move(done_cb)));
}
Status StorageQueue::RemoveUnusedFiles(uint64_t seq_number) { Status StorageQueue::RemoveUnusedFiles(uint64_t seq_number) {
DCHECK_CALLED_ON_VALID_SEQUENCE(storage_queue_sequence_checker_); DCHECK_CALLED_ON_VALID_SEQUENCE(storage_queue_sequence_checker_);
if (first_seq_number_ <= seq_number) { if (first_seq_number_ <= seq_number) {
......
...@@ -145,6 +145,13 @@ class StorageQueue : public base::RefCountedThreadSafe<StorageQueue> { ...@@ -145,6 +145,13 @@ class StorageQueue : public base::RefCountedThreadSafe<StorageQueue> {
void Confirm(uint64_t seq_number, void Confirm(uint64_t seq_number,
base::OnceCallback<void(Status)> completion_cb); base::OnceCallback<void(Status)> completion_cb);
// Shuts the StorageQueue down on its sequenced task runner, and returns by
// calling |done_cb| closure. If the reference is the last one, ShutDown
// guarantees that all previously scheduled operations are complete before the
// queue is shut down.
static void ShutDown(scoped_refptr<StorageQueue>* queue,
base::OnceClosure done_cb);
StorageQueue(const StorageQueue& other) = delete; StorageQueue(const StorageQueue& other) = delete;
StorageQueue& operator=(const StorageQueue& other) = delete; StorageQueue& operator=(const StorageQueue& other) = delete;
...@@ -299,6 +306,10 @@ class StorageQueue : public base::RefCountedThreadSafe<StorageQueue> { ...@@ -299,6 +306,10 @@ class StorageQueue : public base::RefCountedThreadSafe<StorageQueue> {
// destructor. // destructor.
int32_t active_read_operations_ = 0; int32_t active_read_operations_ = 0;
// Flag indicating that the queue is shutting down, and no new asynchronous
// operations can be started.
bool is_shutting_down_ = false;
// Upload timer (active only if options_.upload_period() is not 0). // Upload timer (active only if options_.upload_period() is not 0).
base::RepeatingTimer upload_timer_; base::RepeatingTimer upload_timer_;
......
...@@ -132,6 +132,12 @@ class StorageQueueTest : public ::testing::TestWithParam<size_t> { ...@@ -132,6 +132,12 @@ class StorageQueueTest : public ::testing::TestWithParam<size_t> {
protected: protected:
void SetUp() override { ASSERT_TRUE(location_.CreateUniqueTempDir()); } void SetUp() override { ASSERT_TRUE(location_.CreateUniqueTempDir()); }
void TearDown() override {
if (storage_queue_) {
ShutDownStorageQueue();
}
}
void CreateStorageQueueOrDie(const StorageQueue::Options& options) { void CreateStorageQueueOrDie(const StorageQueue::Options& options) {
ASSERT_FALSE(storage_queue_) << "StorageQueue already assigned"; ASSERT_FALSE(storage_queue_) << "StorageQueue already assigned";
TestEvent<StatusOr<scoped_refptr<StorageQueue>>> e; TestEvent<StatusOr<scoped_refptr<StorageQueue>>> e;
...@@ -146,6 +152,19 @@ class StorageQueueTest : public ::testing::TestWithParam<size_t> { ...@@ -146,6 +152,19 @@ class StorageQueueTest : public ::testing::TestWithParam<size_t> {
storage_queue_ = std::move(storage_queue_result.ValueOrDie()); storage_queue_ = std::move(storage_queue_result.ValueOrDie());
} }
void ShutDownStorageQueue() {
base::WaitableEvent completed(
base::WaitableEvent::ResetPolicy::MANUAL,
base::WaitableEvent::InitialState::NOT_SIGNALED);
StorageQueue::ShutDown(
&storage_queue_,
base::BindOnce(
[](base::WaitableEvent* completed) { completed->Signal(); },
base::Unretained(&completed)));
completed.Wait();
ASSERT_EQ(storage_queue_.get(), nullptr);
}
StorageQueue::Options BuildStorageQueueOptionsImmediate() const { StorageQueue::Options BuildStorageQueueOptionsImmediate() const {
return StorageQueue::Options() return StorageQueue::Options()
.set_directory( .set_directory(
...@@ -205,7 +224,8 @@ TEST_P(StorageQueueTest, WriteIntoNewStorageQueueAndReopen) { ...@@ -205,7 +224,8 @@ TEST_P(StorageQueueTest, WriteIntoNewStorageQueueAndReopen) {
WriteStringOrDie(blobs[1]); WriteStringOrDie(blobs[1]);
WriteStringOrDie(blobs[2]); WriteStringOrDie(blobs[2]);
storage_queue_.reset(); ShutDownStorageQueue();
ASSERT_EQ(storage_queue_.get(), nullptr);
CreateStorageQueueOrDie(BuildStorageQueueOptionsPeriodic()); CreateStorageQueueOrDie(BuildStorageQueueOptionsPeriodic());
} }
......
...@@ -162,6 +162,12 @@ class StorageTest : public ::testing::Test { ...@@ -162,6 +162,12 @@ class StorageTest : public ::testing::Test {
protected: protected:
void SetUp() override { ASSERT_TRUE(location_.CreateUniqueTempDir()); } void SetUp() override { ASSERT_TRUE(location_.CreateUniqueTempDir()); }
void TearDown() override {
if (storage_) {
ShutDownStorage();
}
}
void CreateStorageTestOrDie(const Storage::Options& options) { void CreateStorageTestOrDie(const Storage::Options& options) {
ASSERT_FALSE(storage_) << "StorageTest already assigned"; ASSERT_FALSE(storage_) << "StorageTest already assigned";
TestEvent<StatusOr<scoped_refptr<Storage>>> e; TestEvent<StatusOr<scoped_refptr<Storage>>> e;
...@@ -175,6 +181,21 @@ class StorageTest : public ::testing::Test { ...@@ -175,6 +181,21 @@ class StorageTest : public ::testing::Test {
storage_ = std::move(storage_result.ValueOrDie()); storage_ = std::move(storage_result.ValueOrDie());
} }
void ShutDownStorage() {
base::WaitableEvent completed(
base::WaitableEvent::ResetPolicy::MANUAL,
base::WaitableEvent::InitialState::NOT_SIGNALED);
Storage::ShutDown(&storage_,
base::BindOnce(
[](base::WaitableEvent* completed, Status status) {
ASSERT_OK(status);
completed->Signal();
},
base::Unretained(&completed)));
completed.Wait();
ASSERT_EQ(storage_.get(), nullptr);
}
Storage::Options BuildStorageOptions() const { Storage::Options BuildStorageOptions() const {
return Storage::Options().set_directory( return Storage::Options().set_directory(
base::FilePath(location_.GetPath())); base::FilePath(location_.GetPath()));
...@@ -227,7 +248,8 @@ TEST_F(StorageTest, WriteIntoNewStorageAndReopen) { ...@@ -227,7 +248,8 @@ TEST_F(StorageTest, WriteIntoNewStorageAndReopen) {
WriteStringOrDie(FAST_BATCH, blobs[1]); WriteStringOrDie(FAST_BATCH, blobs[1]);
WriteStringOrDie(FAST_BATCH, blobs[2]); WriteStringOrDie(FAST_BATCH, blobs[2]);
storage_.reset(); ShutDownStorage();
ASSERT_EQ(storage_.get(), nullptr);
CreateStorageTestOrDie(BuildStorageOptions()); CreateStorageTestOrDie(BuildStorageOptions());
} }
...@@ -441,8 +463,7 @@ TEST_F(StorageTest, WriteAndRepeatedlyImmediateUploadWithConfirmations) { ...@@ -441,8 +463,7 @@ TEST_F(StorageTest, WriteAndRepeatedlyImmediateUploadWithConfirmations) {
WriteStringOrDie(IMMEDIATE, more_blobs[2]); WriteStringOrDie(IMMEDIATE, more_blobs[2]);
} }
// TODO(crbug.com/1098359): Flaky. TEST_F(StorageTest, WriteAndRepeatedlyUploadMultipleQueues) {
TEST_F(StorageTest, DISABLED_WriteAndRepeatedlyUploadMultipleQueues) {
CreateStorageTestOrDie(BuildStorageOptions()); CreateStorageTestOrDie(BuildStorageOptions());
// Upload is initiated asynchronously, so it may happen after the next // Upload is initiated asynchronously, so it may happen after the next
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment