Commit 3ed7a262 authored by Leonid Baraz's avatar Leonid Baraz Committed by Commit Bot

Fix reopening storage queue.

Bug: b:153364303
Change-Id: Idb13d9993a6301cda0ca7514a7c0fc457c7e90a2
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2364060Reviewed-by: default avatarZach Trudo <zatrudo@google.com>
Commit-Queue: Leonid Baraz <lbaraz@chromium.org>
Cr-Commit-Position: refs/heads/master@{#799849}
parent 210beb90
...@@ -205,7 +205,7 @@ Status StorageQueue::ScanLastFile() { ...@@ -205,7 +205,7 @@ Status StorageQueue::ScanLastFile() {
// and reopening it. If the file remains open for too long, it will auto-close // and reopening it. If the file remains open for too long, it will auto-close
// by timer. // by timer.
scoped_refptr<SingleFile> last_file = files_.rbegin()->second.get(); scoped_refptr<SingleFile> last_file = files_.rbegin()->second.get();
auto open_status = last_file->Open(/*read_only=*/true); auto open_status = last_file->Open(/*read_only=*/false);
if (!open_status.ok()) { if (!open_status.ok()) {
LOG(ERROR) << "Error opening file " << last_file->name() LOG(ERROR) << "Error opening file " << last_file->name()
<< ", status=" << open_status; << ", status=" << open_status;
...@@ -328,17 +328,24 @@ Status StorageQueue::WriteHeaderAndBlock( ...@@ -328,17 +328,24 @@ Status StorageQueue::WriteHeaderAndBlock(
// Write to the last file, update sequencing number. // Write to the last file, update sequencing number.
auto open_status = file->Open(/*read_only=*/false); auto open_status = file->Open(/*read_only=*/false);
if (!open_status.ok()) { if (!open_status.ok()) {
return Status(error::ALREADY_EXISTS, "Cannot open file"); return Status(error::ALREADY_EXISTS,
base::StrCat({"Cannot open file=", file->name(),
" status=", open_status.ToString()}));
} }
auto write_status = file->Append(base::make_span( auto write_status = file->Append(base::make_span(
reinterpret_cast<const uint8_t*>(&header), sizeof(header))); reinterpret_cast<const uint8_t*>(&header), sizeof(header)));
if (!write_status.ok()) { if (!write_status.ok()) {
return Status(error::RESOURCE_EXHAUSTED, "Cannot write file"); return Status(error::RESOURCE_EXHAUSTED,
base::StrCat({"Cannot write file=", file->name(),
" status=", write_status.status().ToString()}));
} }
if (data.size() > 0) { if (data.size() > 0) {
write_status = file->Append(data); write_status = file->Append(data);
if (!write_status.ok()) { if (!write_status.ok()) {
return Status(error::RESOURCE_EXHAUSTED, "Cannot write file"); return Status(
error::RESOURCE_EXHAUSTED,
base::StrCat({"Cannot write file=", file->name(),
" status=", write_status.status().ToString()}));
} }
// Pad to the whole frame, if necessary. // Pad to the whole frame, if necessary.
const size_t pad_size = const size_t pad_size =
...@@ -350,7 +357,10 @@ Status StorageQueue::WriteHeaderAndBlock( ...@@ -350,7 +357,10 @@ Status StorageQueue::WriteHeaderAndBlock(
crypto::RandBytes(pad_span); crypto::RandBytes(pad_span);
write_status = file->Append(pad_span); write_status = file->Append(pad_span);
if (!write_status.ok()) { if (!write_status.ok()) {
return Status(error::RESOURCE_EXHAUSTED, "Cannot pad file"); return Status(
error::RESOURCE_EXHAUSTED,
base::StrCat({"Cannot pad file=", file->name(),
" status=", write_status.status().ToString()}));
} }
} }
} }
...@@ -799,9 +809,9 @@ Status StorageQueue::SingleFile::Open(bool read_only) { ...@@ -799,9 +809,9 @@ Status StorageQueue::SingleFile::Open(bool read_only) {
return Status::StatusOK(); return Status::StatusOK();
} }
handle_ = std::make_unique<base::File>( handle_ = std::make_unique<base::File>(
filename_, read_only filename_, read_only ? (base::File::FLAG_OPEN | base::File::FLAG_READ)
? (base::File::FLAG_OPEN | base::File::FLAG_READ) : (base::File::FLAG_OPEN_ALWAYS |
: (base::File::FLAG_CREATE | base::File::FLAG_APPEND)); base::File::FLAG_APPEND | base::File::FLAG_READ));
if (!handle_ || !handle_->IsValid()) { if (!handle_ || !handle_->IsValid()) {
return Status(error::DATA_LOSS, return Status(error::DATA_LOSS,
base::StrCat({"Cannot open file=", name(), " for ", base::StrCat({"Cannot open file=", name(), " for ",
...@@ -842,7 +852,6 @@ Status StorageQueue::SingleFile::Delete() { ...@@ -842,7 +852,6 @@ Status StorageQueue::SingleFile::Delete() {
StatusOr<base::span<const uint8_t>> StorageQueue::SingleFile::Read( StatusOr<base::span<const uint8_t>> StorageQueue::SingleFile::Read(
uint32_t pos, uint32_t pos,
uint32_t size) { uint32_t size) {
DCHECK(is_readonly());
if (!handle_) { if (!handle_) {
return Status(error::UNAVAILABLE, base::StrCat({"File not open ", name()})); return Status(error::UNAVAILABLE, base::StrCat({"File not open ", name()}));
} }
...@@ -917,8 +926,8 @@ StatusOr<uint32_t> StorageQueue::SingleFile::Append( ...@@ -917,8 +926,8 @@ StatusOr<uint32_t> StorageQueue::SingleFile::Append(
} }
size_t actual_size = 0; size_t actual_size = 0;
while (data.size() > 0) { while (data.size() > 0) {
const int32_t result = handle_->WriteAtCurrentPos( const int32_t result = handle_->Write(
reinterpret_cast<const char*>(data.data()), data.size()); size_, reinterpret_cast<const char*>(data.data()), data.size());
if (result < 0) { if (result < 0) {
return Status( return Status(
error::DATA_LOSS, error::DATA_LOSS,
......
...@@ -214,6 +214,21 @@ TEST_P(StorageQueueTest, WriteIntoNewStorageQueueAndReopen) { ...@@ -214,6 +214,21 @@ TEST_P(StorageQueueTest, WriteIntoNewStorageQueueAndReopen) {
CreateStorageQueueOrDie(BuildStorageQueueOptionsPeriodic()); CreateStorageQueueOrDie(BuildStorageQueueOptionsPeriodic());
} }
TEST_P(StorageQueueTest, WriteIntoNewStorageQueueReopenAndWriteMore) {
EXPECT_CALL(set_mock_uploader_expectations_, Call(NotNull())).Times(0);
CreateStorageQueueOrDie(BuildStorageQueueOptionsPeriodic());
WriteStringOrDie(blobs[0]);
WriteStringOrDie(blobs[1]);
WriteStringOrDie(blobs[2]);
storage_queue_.reset();
CreateStorageQueueOrDie(BuildStorageQueueOptionsPeriodic());
WriteStringOrDie(more_blobs[0]);
WriteStringOrDie(more_blobs[1]);
WriteStringOrDie(more_blobs[2]);
}
TEST_P(StorageQueueTest, WriteIntoNewStorageQueueAndUpload) { TEST_P(StorageQueueTest, WriteIntoNewStorageQueueAndUpload) {
CreateStorageQueueOrDie(BuildStorageQueueOptionsPeriodic()); CreateStorageQueueOrDie(BuildStorageQueueOptionsPeriodic());
WriteStringOrDie(blobs[0]); WriteStringOrDie(blobs[0]);
...@@ -233,6 +248,35 @@ TEST_P(StorageQueueTest, WriteIntoNewStorageQueueAndUpload) { ...@@ -233,6 +248,35 @@ TEST_P(StorageQueueTest, WriteIntoNewStorageQueueAndUpload) {
task_environment_.FastForwardBy(base::TimeDelta::FromSeconds(1)); task_environment_.FastForwardBy(base::TimeDelta::FromSeconds(1));
} }
TEST_P(StorageQueueTest, WriteIntoNewStorageQueueReopenWriteMoreAndUpload) {
CreateStorageQueueOrDie(BuildStorageQueueOptionsPeriodic());
WriteStringOrDie(blobs[0]);
WriteStringOrDie(blobs[1]);
WriteStringOrDie(blobs[2]);
storage_queue_.reset();
CreateStorageQueueOrDie(BuildStorageQueueOptionsPeriodic());
WriteStringOrDie(more_blobs[0]);
WriteStringOrDie(more_blobs[1]);
WriteStringOrDie(more_blobs[2]);
// Set uploader expectations.
EXPECT_CALL(set_mock_uploader_expectations_, Call(NotNull()))
.WillOnce(Invoke([](MockUploadClient* mock_upload_client) {
MockUploadClient::SetUp(mock_upload_client)
.Required(blobs[0])
.Required(blobs[1])
.Required(blobs[2])
.Required(more_blobs[0])
.Required(more_blobs[1])
.Required(more_blobs[2]);
}));
// Trigger upload.
task_environment_.FastForwardBy(base::TimeDelta::FromSeconds(1));
}
TEST_P(StorageQueueTest, WriteIntoNewStorageQueueAndFlush) { TEST_P(StorageQueueTest, WriteIntoNewStorageQueueAndFlush) {
CreateStorageQueueOrDie(BuildStorageQueueOptionsOnlyManual()); CreateStorageQueueOrDie(BuildStorageQueueOptionsOnlyManual());
WriteStringOrDie(blobs[0]); WriteStringOrDie(blobs[0]);
...@@ -252,6 +296,35 @@ TEST_P(StorageQueueTest, WriteIntoNewStorageQueueAndFlush) { ...@@ -252,6 +296,35 @@ TEST_P(StorageQueueTest, WriteIntoNewStorageQueueAndFlush) {
storage_queue_->Flush(); storage_queue_->Flush();
} }
TEST_P(StorageQueueTest, WriteIntoNewStorageQueueReopenWriteMoreAndFlush) {
CreateStorageQueueOrDie(BuildStorageQueueOptionsOnlyManual());
WriteStringOrDie(blobs[0]);
WriteStringOrDie(blobs[1]);
WriteStringOrDie(blobs[2]);
storage_queue_.reset();
CreateStorageQueueOrDie(BuildStorageQueueOptionsOnlyManual());
WriteStringOrDie(more_blobs[0]);
WriteStringOrDie(more_blobs[1]);
WriteStringOrDie(more_blobs[2]);
// Set uploader expectations.
EXPECT_CALL(set_mock_uploader_expectations_, Call(NotNull()))
.WillOnce(Invoke([](MockUploadClient* mock_upload_client) {
MockUploadClient::SetUp(mock_upload_client)
.Required(blobs[0])
.Required(blobs[1])
.Required(blobs[2])
.Required(more_blobs[0])
.Required(more_blobs[1])
.Required(more_blobs[2]);
}));
// Flush manually.
storage_queue_->Flush();
}
TEST_P(StorageQueueTest, ValidateVariousRecordSizes) { TEST_P(StorageQueueTest, ValidateVariousRecordSizes) {
std::vector<std::string> blobs; std::vector<std::string> blobs;
for (size_t i = 16; i < 16 + 16; ++i) { for (size_t i = 16; i < 16 + 16; ++i) {
......
...@@ -232,6 +232,21 @@ TEST_F(StorageTest, WriteIntoNewStorageAndReopen) { ...@@ -232,6 +232,21 @@ TEST_F(StorageTest, WriteIntoNewStorageAndReopen) {
CreateStorageTestOrDie(BuildStorageOptions()); CreateStorageTestOrDie(BuildStorageOptions());
} }
TEST_F(StorageTest, WriteIntoNewStorageReopenAndWriteMore) {
EXPECT_CALL(set_mock_uploader_expectations_, Call(_, NotNull())).Times(0);
CreateStorageTestOrDie(BuildStorageOptions());
WriteStringOrDie(FAST_BATCH, blobs[0]);
WriteStringOrDie(FAST_BATCH, blobs[1]);
WriteStringOrDie(FAST_BATCH, blobs[2]);
storage_.reset();
CreateStorageTestOrDie(BuildStorageOptions());
WriteStringOrDie(FAST_BATCH, more_blobs[0]);
WriteStringOrDie(FAST_BATCH, more_blobs[1]);
WriteStringOrDie(FAST_BATCH, more_blobs[2]);
}
TEST_F(StorageTest, WriteIntoNewStorageAndUpload) { TEST_F(StorageTest, WriteIntoNewStorageAndUpload) {
CreateStorageTestOrDie(BuildStorageOptions()); CreateStorageTestOrDie(BuildStorageOptions());
WriteStringOrDie(FAST_BATCH, blobs[0]); WriteStringOrDie(FAST_BATCH, blobs[0]);
...@@ -252,6 +267,36 @@ TEST_F(StorageTest, WriteIntoNewStorageAndUpload) { ...@@ -252,6 +267,36 @@ TEST_F(StorageTest, WriteIntoNewStorageAndUpload) {
task_environment_.FastForwardBy(base::TimeDelta::FromSeconds(1)); task_environment_.FastForwardBy(base::TimeDelta::FromSeconds(1));
} }
TEST_F(StorageTest, WriteIntoNewStorageReopenWriteMoreAndUpload) {
CreateStorageTestOrDie(BuildStorageOptions());
WriteStringOrDie(FAST_BATCH, blobs[0]);
WriteStringOrDie(FAST_BATCH, blobs[1]);
WriteStringOrDie(FAST_BATCH, blobs[2]);
storage_.reset();
CreateStorageTestOrDie(BuildStorageOptions());
WriteStringOrDie(FAST_BATCH, more_blobs[0]);
WriteStringOrDie(FAST_BATCH, more_blobs[1]);
WriteStringOrDie(FAST_BATCH, more_blobs[2]);
// Set uploader expectations.
EXPECT_CALL(set_mock_uploader_expectations_, Call(Eq(FAST_BATCH), NotNull()))
.WillOnce(
Invoke([](Priority priority, MockUploadClient* mock_upload_client) {
MockUploadClient::SetUp(priority, mock_upload_client)
.Required(blobs[0])
.Required(blobs[1])
.Required(blobs[2])
.Required(more_blobs[0])
.Required(more_blobs[1])
.Required(more_blobs[2]);
}));
// Trigger upload.
task_environment_.FastForwardBy(base::TimeDelta::FromSeconds(1));
}
TEST_F(StorageTest, WriteIntoNewStorageAndFlush) { TEST_F(StorageTest, WriteIntoNewStorageAndFlush) {
CreateStorageTestOrDie(BuildStorageOptions()); CreateStorageTestOrDie(BuildStorageOptions());
WriteStringOrDie(MANUAL_BATCH, blobs[0]); WriteStringOrDie(MANUAL_BATCH, blobs[0]);
...@@ -273,6 +318,37 @@ TEST_F(StorageTest, WriteIntoNewStorageAndFlush) { ...@@ -273,6 +318,37 @@ TEST_F(StorageTest, WriteIntoNewStorageAndFlush) {
EXPECT_OK(storage_->Flush(MANUAL_BATCH)); EXPECT_OK(storage_->Flush(MANUAL_BATCH));
} }
TEST_F(StorageTest, WriteIntoNewStorageReopenWriteMoreAndFlush) {
CreateStorageTestOrDie(BuildStorageOptions());
WriteStringOrDie(MANUAL_BATCH, blobs[0]);
WriteStringOrDie(MANUAL_BATCH, blobs[1]);
WriteStringOrDie(MANUAL_BATCH, blobs[2]);
storage_.reset();
CreateStorageTestOrDie(BuildStorageOptions());
WriteStringOrDie(MANUAL_BATCH, more_blobs[0]);
WriteStringOrDie(MANUAL_BATCH, more_blobs[1]);
WriteStringOrDie(MANUAL_BATCH, more_blobs[2]);
// Set uploader expectations.
EXPECT_CALL(set_mock_uploader_expectations_,
Call(Eq(MANUAL_BATCH), NotNull()))
.WillOnce(
Invoke([](Priority priority, MockUploadClient* mock_upload_client) {
MockUploadClient::SetUp(priority, mock_upload_client)
.Required(blobs[0])
.Required(blobs[1])
.Required(blobs[2])
.Required(more_blobs[0])
.Required(more_blobs[1])
.Required(more_blobs[2]);
}));
// Trigger upload.
EXPECT_OK(storage_->Flush(MANUAL_BATCH));
}
TEST_F(StorageTest, WriteAndRepeatedlyUploadWithConfirmations) { TEST_F(StorageTest, WriteAndRepeatedlyUploadWithConfirmations) {
CreateStorageTestOrDie(BuildStorageOptions()); CreateStorageTestOrDie(BuildStorageOptions());
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment