Commit 1cbf8303 authored by David Roger's avatar David Roger Committed by Commit Bot

Revert "Add shutting down process to the Storage and StorageQueue."

This reverts commit 60d33a56.

Reason for revert: unittests are flaky. See https://crbug.com/1099629

Original change's description:
> Add shutting down process to the Storage and StorageQueue.
> 
> Making that will guarantee, that queue is destructed on the appropriate
> queue only and in order with other asynchronous operations (after them
> all). When the device is shutting down, it won't allow any new asyn
> operations to start.
> This should resolve crbug/1098359.
> 
> Bug: 1098359
> Change-Id: I12fd860778bc8dd7efb1f51f6d878c2cd1dd3256
> Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2268382
> Reviewed-by: Zach Trudo <zatrudo@google.com>
> Commit-Queue: Leonid Baraz <lbaraz@chromium.org>
> Cr-Commit-Position: refs/heads/master@{#782828}

TBR=zatrudo@google.com,lbaraz@chromium.org

Change-Id: I63eeb790e5bbd164ed4e7b4884ef5ffe7ba457c4
No-Presubmit: true
No-Tree-Checks: true
No-Try: true
Bug: 1098359, 1099629
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2270143Reviewed-by: default avatarDavid Roger <droger@chromium.org>
Commit-Queue: David Roger <droger@chromium.org>
Cr-Commit-Position: refs/heads/master@{#782917}
parent ef4c2d2d
...@@ -211,67 +211,7 @@ void Storage::Create( ...@@ -211,67 +211,7 @@ void Storage::Create(
Storage::Storage(const Options& options, StartUploadCb start_upload_cb) Storage::Storage(const Options& options, StartUploadCb start_upload_cb)
: options_(options), start_upload_cb_(std::move(start_upload_cb)) {} : options_(options), start_upload_cb_(std::move(start_upload_cb)) {}
Storage::~Storage() { Storage::~Storage() = default;
DCHECK(is_shutting_down_) << "Storage not shut down properly";
for (const auto& q : queues_) {
DCHECK_EQ(q.second.get(), nullptr)
<< "Queue has not been shutdown properly, priority=" << q.first;
}
}
// static
void Storage::ShutDown(scoped_refptr<Storage>* storage,
base::OnceCallback<void(Status)> done_cb) {
// Shuts down all queues of the Storage object.
class StorageShutDownContext : public TaskRunnerContext<Status> {
public:
StorageShutDownContext(scoped_refptr<Storage>* storage,
base::OnceCallback<void(Status)> callback)
: TaskRunnerContext<Status>(
std::move(callback),
base::ThreadPool::CreateSequencedTaskRunner(
{base::TaskPriority::BEST_EFFORT, base::MayBlock()})),
storage_(storage),
count_((*storage_)->queues_.size()) {}
private:
// Context can only be deleted by calling Response method.
~StorageShutDownContext() override { DCHECK_EQ(count_, 0); }
void OnStart() override {
CheckOnValidSequence();
(*storage_)->is_shutting_down_ = true;
for (auto& queue : (*storage_)->queues_) {
StorageQueue::ShutDown(
&queue.second,
base::BindOnce(&StorageShutDownContext::ScheduleQueueClosed, this));
}
}
void ScheduleQueueClosed() {
Schedule(&StorageShutDownContext::QueueClosed, this);
}
void QueueClosed() {
CheckOnValidSequence();
DCHECK_GT(count_, 0);
if (--count_ > 0) {
return;
}
storage_->reset();
Response(Status::StatusOK());
}
const std::vector<std::pair<Priority, StorageQueue::Options>>
queues_options_;
scoped_refptr<Storage>* const storage_;
int32_t count_;
Status final_status_;
};
// Asynchronously shut down.
Start<StorageShutDownContext>(storage, std::move(done_cb));
}
void Storage::Write(Priority priority, void Storage::Write(Priority priority,
base::span<const uint8_t> data, base::span<const uint8_t> data,
......
...@@ -95,14 +95,6 @@ class Storage : public base::RefCountedThreadSafe<Storage> { ...@@ -95,14 +95,6 @@ class Storage : public base::RefCountedThreadSafe<Storage> {
uint64_t seq_number, uint64_t seq_number,
base::OnceCallback<void(Status)> completion_cb); base::OnceCallback<void(Status)> completion_cb);
// Shuts the Storage down with each queue on its sequenced task runner, and
// returns by calling |done_cb| closure. Guarantees that all previously
// scheduled operations are complete before the Storage is shut down and calls
// |done_cb|, assuming no queue is referenced elsewhere. Status passed to
// done_cb currently has no meaning and is expected to be OK.
static void ShutDown(scoped_refptr<Storage>* storage,
base::OnceCallback<void(Status)> done_cb);
Storage(const Storage& other) = delete; Storage(const Storage& other) = delete;
Storage& operator=(const Storage& other) = delete; Storage& operator=(const Storage& other) = delete;
...@@ -129,10 +121,6 @@ class Storage : public base::RefCountedThreadSafe<Storage> { ...@@ -129,10 +121,6 @@ class Storage : public base::RefCountedThreadSafe<Storage> {
// Map priority->StorageQueue. // Map priority->StorageQueue.
base::flat_map<Priority, scoped_refptr<StorageQueue>> queues_; base::flat_map<Priority, scoped_refptr<StorageQueue>> queues_;
// Flag indicating that the Storage is shutting down, and no new asynchronous
// operations can be started.
bool is_shutting_down_ = false;
// Upload provider callback. // Upload provider callback.
const StartUploadCb start_upload_cb_; const StartUploadCb start_upload_cb_;
}; };
......
...@@ -121,9 +121,9 @@ StorageQueue::StorageQueue(const Options& options, ...@@ -121,9 +121,9 @@ StorageQueue::StorageQueue(const Options& options,
} }
StorageQueue::~StorageQueue() { StorageQueue::~StorageQueue() {
DCHECK_CALLED_ON_VALID_SEQUENCE(storage_queue_sequence_checker_); // TODO(b/153364303): Should be
DCHECK(is_shutting_down_) << "StorageQueue not shut down properly"; // DCHECK_CALLED_ON_VALID_SEQUENCE(storage_queue_sequence_checker_);
DCHECK_EQ(active_read_operations_, 0);
// Stop upload timer. // Stop upload timer.
upload_timer_.AbandonAndStop(); upload_timer_.AbandonAndStop();
// CLose all opened files. // CLose all opened files.
...@@ -365,10 +365,6 @@ class StorageQueue::ReadContext : public TaskRunnerContext<Status> { ...@@ -365,10 +365,6 @@ class StorageQueue::ReadContext : public TaskRunnerContext<Status> {
void OnStart() override { void OnStart() override {
DCHECK_CALLED_ON_VALID_SEQUENCE(read_sequence_checker_); DCHECK_CALLED_ON_VALID_SEQUENCE(read_sequence_checker_);
if (storage_queue_->is_shutting_down_) {
Response(Status(error::UNAVAILABLE, "StorageQueue shutting down"));
return;
}
seq_number_ = storage_queue_->first_seq_number_; seq_number_ = storage_queue_->first_seq_number_;
// If the last file is not empty (has at least one record), // If the last file is not empty (has at least one record),
// close it and create the new one, so that its records are // close it and create the new one, so that its records are
...@@ -566,11 +562,6 @@ class StorageQueue::WriteContext : public TaskRunnerContext<Status> { ...@@ -566,11 +562,6 @@ class StorageQueue::WriteContext : public TaskRunnerContext<Status> {
void OnStart() override { void OnStart() override {
DCHECK_CALLED_ON_VALID_SEQUENCE(write_sequence_checker_); DCHECK_CALLED_ON_VALID_SEQUENCE(write_sequence_checker_);
if (storage_queue_->is_shutting_down_) {
Response(Status(error::UNAVAILABLE, "StorageQueue shutting down"));
return;
}
// Prepare uploader, if need to run it after Write. // Prepare uploader, if need to run it after Write.
if (storage_queue_->options_.upload_period().is_zero()) { if (storage_queue_->options_.upload_period().is_zero()) {
StatusOr<std::unique_ptr<UploaderInterface>> uploader = StatusOr<std::unique_ptr<UploaderInterface>> uploader =
...@@ -688,10 +679,6 @@ class StorageQueue::ConfirmContext : public TaskRunnerContext<Status> { ...@@ -688,10 +679,6 @@ class StorageQueue::ConfirmContext : public TaskRunnerContext<Status> {
void OnStart() override { void OnStart() override {
DCHECK_CALLED_ON_VALID_SEQUENCE(confirm_sequence_checker_); DCHECK_CALLED_ON_VALID_SEQUENCE(confirm_sequence_checker_);
if (storage_queue_->is_shutting_down_) {
Response(Status(error::UNAVAILABLE, "StorageQueue shutting down"));
return;
}
Response(storage_queue_->RemoveUnusedFiles(seq_number_)); Response(storage_queue_->RemoveUnusedFiles(seq_number_));
} }
...@@ -708,20 +695,6 @@ void StorageQueue::Confirm(uint64_t seq_number, ...@@ -708,20 +695,6 @@ void StorageQueue::Confirm(uint64_t seq_number,
Start<ConfirmContext>(seq_number, std::move(completion_cb), this); Start<ConfirmContext>(seq_number, std::move(completion_cb), this);
} }
// static
void StorageQueue::ShutDown(scoped_refptr<StorageQueue>* queue,
base::OnceClosure done_cb) {
(*queue)->sequenced_task_runner_->PostTask(
FROM_HERE,
base::BindOnce(
[](scoped_refptr<StorageQueue>* queue, base::OnceClosure done_cb) {
(*queue)->is_shutting_down_ = true;
queue->reset();
std::move(done_cb).Run();
},
base::Unretained(queue), std::move(done_cb)));
}
Status StorageQueue::RemoveUnusedFiles(uint64_t seq_number) { Status StorageQueue::RemoveUnusedFiles(uint64_t seq_number) {
DCHECK_CALLED_ON_VALID_SEQUENCE(storage_queue_sequence_checker_); DCHECK_CALLED_ON_VALID_SEQUENCE(storage_queue_sequence_checker_);
if (first_seq_number_ <= seq_number) { if (first_seq_number_ <= seq_number) {
......
...@@ -145,13 +145,6 @@ class StorageQueue : public base::RefCountedThreadSafe<StorageQueue> { ...@@ -145,13 +145,6 @@ class StorageQueue : public base::RefCountedThreadSafe<StorageQueue> {
void Confirm(uint64_t seq_number, void Confirm(uint64_t seq_number,
base::OnceCallback<void(Status)> completion_cb); base::OnceCallback<void(Status)> completion_cb);
// Shuts the StorageQueue down on its sequenced task runner, and returns by
// calling |done_cb| closure. If the reference is the last one, ShutDown
// guarantees that all previously scheduled operations are complete before the
// queue is shut down.
static void ShutDown(scoped_refptr<StorageQueue>* queue,
base::OnceClosure done_cb);
StorageQueue(const StorageQueue& other) = delete; StorageQueue(const StorageQueue& other) = delete;
StorageQueue& operator=(const StorageQueue& other) = delete; StorageQueue& operator=(const StorageQueue& other) = delete;
...@@ -306,10 +299,6 @@ class StorageQueue : public base::RefCountedThreadSafe<StorageQueue> { ...@@ -306,10 +299,6 @@ class StorageQueue : public base::RefCountedThreadSafe<StorageQueue> {
// destructor. // destructor.
int32_t active_read_operations_ = 0; int32_t active_read_operations_ = 0;
// Flag indicating that the queue is shutting down, and no new asynchronous
// operations can be started.
bool is_shutting_down_ = false;
// Upload timer (active only if options_.upload_period() is not 0). // Upload timer (active only if options_.upload_period() is not 0).
base::RepeatingTimer upload_timer_; base::RepeatingTimer upload_timer_;
......
...@@ -132,12 +132,6 @@ class StorageQueueTest : public ::testing::TestWithParam<size_t> { ...@@ -132,12 +132,6 @@ class StorageQueueTest : public ::testing::TestWithParam<size_t> {
protected: protected:
void SetUp() override { ASSERT_TRUE(location_.CreateUniqueTempDir()); } void SetUp() override { ASSERT_TRUE(location_.CreateUniqueTempDir()); }
void TearDown() override {
if (storage_queue_) {
ShutDownStorageQueue();
}
}
void CreateStorageQueueOrDie(const StorageQueue::Options& options) { void CreateStorageQueueOrDie(const StorageQueue::Options& options) {
ASSERT_FALSE(storage_queue_) << "StorageQueue already assigned"; ASSERT_FALSE(storage_queue_) << "StorageQueue already assigned";
TestEvent<StatusOr<scoped_refptr<StorageQueue>>> e; TestEvent<StatusOr<scoped_refptr<StorageQueue>>> e;
...@@ -152,19 +146,6 @@ class StorageQueueTest : public ::testing::TestWithParam<size_t> { ...@@ -152,19 +146,6 @@ class StorageQueueTest : public ::testing::TestWithParam<size_t> {
storage_queue_ = std::move(storage_queue_result.ValueOrDie()); storage_queue_ = std::move(storage_queue_result.ValueOrDie());
} }
void ShutDownStorageQueue() {
base::WaitableEvent completed(
base::WaitableEvent::ResetPolicy::MANUAL,
base::WaitableEvent::InitialState::NOT_SIGNALED);
StorageQueue::ShutDown(
&storage_queue_,
base::BindOnce(
[](base::WaitableEvent* completed) { completed->Signal(); },
base::Unretained(&completed)));
completed.Wait();
ASSERT_EQ(storage_queue_.get(), nullptr);
}
StorageQueue::Options BuildStorageQueueOptionsImmediate() const { StorageQueue::Options BuildStorageQueueOptionsImmediate() const {
return StorageQueue::Options() return StorageQueue::Options()
.set_directory( .set_directory(
...@@ -224,8 +205,7 @@ TEST_P(StorageQueueTest, WriteIntoNewStorageQueueAndReopen) { ...@@ -224,8 +205,7 @@ TEST_P(StorageQueueTest, WriteIntoNewStorageQueueAndReopen) {
WriteStringOrDie(blobs[1]); WriteStringOrDie(blobs[1]);
WriteStringOrDie(blobs[2]); WriteStringOrDie(blobs[2]);
ShutDownStorageQueue(); storage_queue_.reset();
ASSERT_EQ(storage_queue_.get(), nullptr);
CreateStorageQueueOrDie(BuildStorageQueueOptionsPeriodic()); CreateStorageQueueOrDie(BuildStorageQueueOptionsPeriodic());
} }
......
...@@ -162,12 +162,6 @@ class StorageTest : public ::testing::Test { ...@@ -162,12 +162,6 @@ class StorageTest : public ::testing::Test {
protected: protected:
void SetUp() override { ASSERT_TRUE(location_.CreateUniqueTempDir()); } void SetUp() override { ASSERT_TRUE(location_.CreateUniqueTempDir()); }
void TearDown() override {
if (storage_) {
ShutDownStorage();
}
}
void CreateStorageTestOrDie(const Storage::Options& options) { void CreateStorageTestOrDie(const Storage::Options& options) {
ASSERT_FALSE(storage_) << "StorageTest already assigned"; ASSERT_FALSE(storage_) << "StorageTest already assigned";
TestEvent<StatusOr<scoped_refptr<Storage>>> e; TestEvent<StatusOr<scoped_refptr<Storage>>> e;
...@@ -181,21 +175,6 @@ class StorageTest : public ::testing::Test { ...@@ -181,21 +175,6 @@ class StorageTest : public ::testing::Test {
storage_ = std::move(storage_result.ValueOrDie()); storage_ = std::move(storage_result.ValueOrDie());
} }
void ShutDownStorage() {
base::WaitableEvent completed(
base::WaitableEvent::ResetPolicy::MANUAL,
base::WaitableEvent::InitialState::NOT_SIGNALED);
Storage::ShutDown(&storage_,
base::BindOnce(
[](base::WaitableEvent* completed, Status status) {
ASSERT_OK(status);
completed->Signal();
},
base::Unretained(&completed)));
completed.Wait();
ASSERT_EQ(storage_.get(), nullptr);
}
Storage::Options BuildStorageOptions() const { Storage::Options BuildStorageOptions() const {
return Storage::Options().set_directory( return Storage::Options().set_directory(
base::FilePath(location_.GetPath())); base::FilePath(location_.GetPath()));
...@@ -248,8 +227,7 @@ TEST_F(StorageTest, WriteIntoNewStorageAndReopen) { ...@@ -248,8 +227,7 @@ TEST_F(StorageTest, WriteIntoNewStorageAndReopen) {
WriteStringOrDie(FAST_BATCH, blobs[1]); WriteStringOrDie(FAST_BATCH, blobs[1]);
WriteStringOrDie(FAST_BATCH, blobs[2]); WriteStringOrDie(FAST_BATCH, blobs[2]);
ShutDownStorage(); storage_.reset();
ASSERT_EQ(storage_.get(), nullptr);
CreateStorageTestOrDie(BuildStorageOptions()); CreateStorageTestOrDie(BuildStorageOptions());
} }
...@@ -463,7 +441,8 @@ TEST_F(StorageTest, WriteAndRepeatedlyImmediateUploadWithConfirmations) { ...@@ -463,7 +441,8 @@ TEST_F(StorageTest, WriteAndRepeatedlyImmediateUploadWithConfirmations) {
WriteStringOrDie(IMMEDIATE, more_blobs[2]); WriteStringOrDie(IMMEDIATE, more_blobs[2]);
} }
TEST_F(StorageTest, WriteAndRepeatedlyUploadMultipleQueues) { // TODO(crbug.com/1098359): Flaky.
TEST_F(StorageTest, DISABLED_WriteAndRepeatedlyUploadMultipleQueues) {
CreateStorageTestOrDie(BuildStorageOptions()); CreateStorageTestOrDie(BuildStorageOptions());
// Upload is initiated asynchronously, so it may happen after the next // Upload is initiated asynchronously, so it may happen after the next
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment