Commit af294a5a authored by Min Qin's avatar Min Qin Committed by Commit Bot

Allow each download stream to take some data for validation purpose

Currently for each download stream, the data read from the network
was immediately written to the disk.
This CL allows each SourceStream to read some data before the file
write offset for content validation purpose. And data will only be
written after all the initial validation data are consumed.
A new field called starting_file_write_offset_ is added to SourceStream
to identify this checkpoint for file validation.

BUG=965215

Change-Id: Ia46de1a588bc0c2fb4e6eb19e62ef794d706ad1d
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/1625461
Commit-Queue: Min Qin <qinmin@chromium.org>
Reviewed-by: default avatarXing Liu <xingliu@chromium.org>
Reviewed-by: default avatarScott Violet <sky@chromium.org>
Cr-Commit-Position: refs/heads/master@{#664030}
parent 918987af
......@@ -4,6 +4,7 @@
#include "components/download/public/common/download_file_impl.h"
#include <algorithm>
#include <string>
#include <utility>
......@@ -58,13 +59,24 @@ const int kUnknownContentLength = -1;
DownloadFileImpl::SourceStream::SourceStream(
int64_t offset,
int64_t length,
int64_t starting_file_write_offset,
std::unique_ptr<InputStream> stream)
: offset_(offset),
length_(length),
starting_file_write_offset_(starting_file_write_offset),
bytes_read_(0),
bytes_written_(0),
finished_(false),
index_(0u),
input_stream_(std::move(stream)) {}
input_stream_(std::move(stream)) {
CHECK_LE(offset_, starting_file_write_offset_);
CHECK_GE(offset_, 0);
DCHECK(length <= 0 || length >= starting_file_write_offset - offset)
<< "Not enough for content validation. offset = " << offset
<< ", length = " << length
<< " , starting_file_write_offset = " << starting_file_write_offset
<< ".";
}
DownloadFileImpl::SourceStream::~SourceStream() = default;
......@@ -72,19 +84,25 @@ void DownloadFileImpl::SourceStream::Initialize() {
input_stream_->Initialize();
}
void DownloadFileImpl::SourceStream::OnWriteBytesToDisk(int64_t bytes_write) {
bytes_written_ += bytes_write;
void DownloadFileImpl::SourceStream::OnBytesConsumed(int64_t bytes_read,
int64_t bytes_written) {
CHECK_GE(bytes_read, bytes_written);
bytes_read_ += bytes_read;
bytes_written_ += bytes_written;
}
void DownloadFileImpl::SourceStream::TruncateLengthWithWrittenDataBlock(
int64_t offset,
int64_t received_slice_offset,
int64_t bytes_written) {
DCHECK_GT(bytes_written, 0);
if (length_ == kNoBytesToWrite)
return;
if (offset <= offset_) {
if (offset + bytes_written > offset_) {
if (received_slice_offset <= starting_file_write_offset_) {
// If validation has completed, mark the stream as finished if the file
// write position already has data.
if (received_slice_offset + bytes_written > starting_file_write_offset_ &&
GetRemainingBytesToValidate() == 0) {
length_ = kNoBytesToWrite;
finished_ = true;
}
......@@ -92,8 +110,12 @@ void DownloadFileImpl::SourceStream::TruncateLengthWithWrittenDataBlock(
}
if (length_ == DownloadSaveInfo::kLengthFullContent ||
length_ > offset - offset_) {
length_ = offset - offset_;
(length_ > received_slice_offset - offset_ &&
length_ > starting_file_write_offset_ - offset_)) {
// Stream length should always include the validation data, unless the
// response is too short.
length_ =
std::max(received_slice_offset, starting_file_write_offset_) - offset_;
}
}
......@@ -123,6 +145,11 @@ InputStream::StreamState DownloadFileImpl::SourceStream::Read(
return input_stream_->Read(data, length);
}
size_t DownloadFileImpl::SourceStream::GetRemainingBytesToValidate() {
int64_t bytes_remaining = starting_file_write_offset_ - offset_ - bytes_read_;
return bytes_remaining < 0 ? 0 : bytes_remaining;
}
DownloadFileImpl::DownloadFileImpl(
std::unique_ptr<DownloadSaveInfo> save_info,
const base::FilePath& default_download_directory,
......@@ -149,7 +176,8 @@ DownloadFileImpl::DownloadFileImpl(
download_id);
source_streams_[save_info_->offset] = std::make_unique<SourceStream>(
save_info_->offset, save_info_->length, std::move(stream));
save_info_->offset, save_info_->length,
save_info_->GetStartingFileWriteOffset(), std::move(stream));
DETACH_FROM_SEQUENCE(sequence_checker_);
}
......@@ -184,7 +212,7 @@ void DownloadFileImpl::Initialize(
bytes_so_far += received_slice.received_bytes;
}
} else {
bytes_so_far = save_info_->offset;
bytes_so_far = save_info_->GetStartingFileWriteOffset();
}
int64_t bytes_wasted = 0;
DownloadInterruptReason reason = file_.Initialize(
......@@ -227,7 +255,7 @@ void DownloadFileImpl::AddInputStream(std::unique_ptr<InputStream> stream,
}
DCHECK(source_streams_.find(offset) == source_streams_.end());
source_streams_[offset] =
std::make_unique<SourceStream>(offset, length, std::move(stream));
std::make_unique<SourceStream>(offset, length, offset, std::move(stream));
OnSourceStreamAdded(source_streams_[offset].get());
}
......@@ -244,46 +272,73 @@ void DownloadFileImpl::OnSourceStreamAdded(SourceStream* source_stream) {
RegisterAndActivateStream(source_stream);
}
DownloadInterruptReason DownloadFileImpl::WriteDataToFile(int64_t offset,
const char* data,
size_t data_len) {
DownloadInterruptReason DownloadFileImpl::ValidateAndWriteDataToFile(
int64_t offset,
const char* data,
size_t bytes_to_validate,
size_t bytes_to_write) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
WillWriteToDisk(data_len);
return file_.WriteDataToFile(offset, data, data_len);
// Check if some of the data is for validation purpose.
if (bytes_to_validate > 0 &&
!file_.ValidateDataInFile(offset, data, bytes_to_validate)) {
return DOWNLOAD_INTERRUPT_REASON_FILE_HASH_MISMATCH;
}
// If there is no data to write, just return DOWNLOAD_INTERRUPT_REASON_NONE
// and read the next chunk.
if (bytes_to_write <= 0)
return DOWNLOAD_INTERRUPT_REASON_NONE;
// Write the remaining data to disk.
WillWriteToDisk(bytes_to_write);
return file_.WriteDataToFile(offset + bytes_to_validate,
data + bytes_to_validate, bytes_to_write);
}
bool DownloadFileImpl::CalculateBytesToWrite(SourceStream* source_stream,
size_t bytes_available_to_write,
size_t* bytes_to_validate,
size_t* bytes_to_write) {
*bytes_to_validate = 0;
if (source_stream->length() == kNoBytesToWrite) {
*bytes_to_write = 0;
return true;
}
// First calculate the number of bytes to validate.
*bytes_to_write = bytes_available_to_write;
size_t remaining_bytes_to_validate =
source_stream->GetRemainingBytesToValidate();
if (remaining_bytes_to_validate > 0) {
*bytes_to_validate =
std::min(remaining_bytes_to_validate, bytes_available_to_write);
*bytes_to_write -= *bytes_to_validate;
}
if (source_stream->length() != DownloadSaveInfo::kLengthFullContent &&
source_stream->bytes_read() +
static_cast<int64_t>(bytes_available_to_write) >
source_stream->length()) {
// Total bytes to consume is capped by the length of the stream.
int64_t bytes_to_consume =
source_stream->length() - source_stream->bytes_read();
// The validation data should always be streamed.
DCHECK_GE(bytes_to_consume, static_cast<int64_t>(*bytes_to_validate));
*bytes_to_write = bytes_to_consume - *bytes_to_validate;
return true;
}
// If a new slice finds that its target position has already been written,
// terminate the stream.
if (source_stream->bytes_written() == 0) {
// terminate the stream if there are no bytes to validate.
if (source_stream->bytes_written() == 0 && *bytes_to_write > 0) {
for (const auto& received_slice : received_slices_) {
if (received_slice.offset <= source_stream->offset() &&
if (received_slice.offset <=
source_stream->starting_file_write_offset() &&
received_slice.offset + received_slice.received_bytes >
source_stream->offset()) {
source_stream->starting_file_write_offset()) {
*bytes_to_write = 0;
return true;
}
}
}
if (source_stream->length() != DownloadSaveInfo::kLengthFullContent &&
source_stream->bytes_written() +
static_cast<int64_t>(bytes_available_to_write) >
source_stream->length()) {
// Write a partial buffer as the incoming data exceeds the length limit.
*bytes_to_write = source_stream->length() - source_stream->bytes_written();
return true;
}
*bytes_to_write = bytes_available_to_write;
return false;
}
......@@ -361,8 +416,9 @@ DownloadInterruptReason DownloadFileImpl::HandleStreamCompletionStatus(
DownloadInterruptReason reason = source_stream->GetCompletionStatus();
if (source_stream->length() == DownloadSaveInfo::kLengthFullContent &&
!received_slices_.empty() &&
(source_stream->offset() == received_slices_.back().offset +
received_slices_.back().received_bytes) &&
(source_stream->starting_file_write_offset() ==
received_slices_.back().offset +
received_slices_.back().received_bytes) &&
reason == DOWNLOAD_INTERRUPT_REASON_SERVER_NO_RANGE) {
// We are probably reaching the end of the stream, don't treat this
// as an error.
......@@ -508,6 +564,7 @@ void DownloadFileImpl::StreamActive(SourceStream* source_stream,
size_t incoming_data_size = 0;
size_t total_incoming_data_size = 0;
size_t num_buffers = 0;
size_t bytes_to_validate = 0;
size_t bytes_to_write = 0;
bool should_terminate = false;
InputStream::StreamState state(InputStream::EMPTY);
......@@ -524,24 +581,26 @@ void DownloadFileImpl::StreamActive(SourceStream* source_stream,
break;
case InputStream::HAS_DATA: {
++num_buffers;
should_terminate = CalculateBytesToWrite(
source_stream, incoming_data_size, &bytes_to_write);
should_terminate =
CalculateBytesToWrite(source_stream, incoming_data_size,
&bytes_to_validate, &bytes_to_write);
DCHECK_GE(incoming_data_size, bytes_to_write);
reason = WriteDataToFile(
source_stream->offset() + source_stream->bytes_written(),
incoming_data->data(), bytes_to_write);
reason = ValidateAndWriteDataToFile(
source_stream->offset() + source_stream->bytes_read(),
incoming_data->data(), bytes_to_validate, bytes_to_write);
bytes_seen_ += bytes_to_write;
total_incoming_data_size += bytes_to_write;
total_incoming_data_size += incoming_data_size;
if (reason == DOWNLOAD_INTERRUPT_REASON_NONE) {
int64_t prev_bytes_written = source_stream->bytes_written();
source_stream->OnWriteBytesToDisk(bytes_to_write);
source_stream->OnBytesConsumed(incoming_data_size, bytes_to_write);
if (!IsSparseFile())
break;
// If the write operation creates a new slice, add it to the
// |received_slices_| and update all the entries in
// |source_streams_|.
if (bytes_to_write > 0 && prev_bytes_written == 0) {
AddNewSlice(source_stream->offset(), bytes_to_write);
AddNewSlice(source_stream->starting_file_write_offset(),
bytes_to_write);
} else {
received_slices_[source_stream->index()].received_bytes +=
bytes_to_write;
......@@ -611,7 +670,7 @@ void DownloadFileImpl::NotifyObserver(SourceStream* source_stream,
}
SetPotentialFileLength(source_stream->offset() +
source_stream->bytes_written());
source_stream->bytes_read());
}
num_active_streams_--;
......@@ -698,10 +757,10 @@ void DownloadFileImpl::AddNewSlice(int64_t offset, int64_t length) {
// Update the index of exising SourceStreams.
for (auto& stream : source_streams_) {
SourceStream* source_stream = stream.second.get();
if (source_stream->offset() > offset) {
if (source_stream->starting_file_write_offset() > offset) {
if (slice_added && source_stream->bytes_written() > 0)
source_stream->set_index(source_stream->index() + 1);
} else if (source_stream->offset() == offset) {
} else if (source_stream->starting_file_write_offset() == offset) {
source_stream->set_index(index);
} else {
source_stream->TruncateLengthWithWrittenDataBlock(offset, length);
......@@ -736,26 +795,29 @@ void DownloadFileImpl::HandleStreamError(SourceStream* source_stream,
source_stream->set_finished(true);
num_active_streams_--;
// If previous stream has already written data at the starting offset of
// the error stream. The download can complete.
bool can_recover_from_error = (source_stream->length() == kNoBytesToWrite);
// See if the previous stream can download the full content.
// If the current stream has written some data, length of all preceding
// streams will be truncated.
if (IsSparseFile() && !can_recover_from_error) {
SourceStream* preceding_neighbor = FindPrecedingNeighbor(source_stream);
while (preceding_neighbor) {
if (CanRecoverFromError(source_stream, preceding_neighbor)) {
can_recover_from_error = true;
break;
}
bool can_recover_from_error = false;
if (reason != DOWNLOAD_INTERRUPT_REASON_FILE_HASH_MISMATCH) {
// If previous stream has already written data at the starting offset of
// the error stream. The download can complete.
can_recover_from_error = (source_stream->length() == kNoBytesToWrite);
// See if the previous stream can download the full content.
// If the current stream has written some data, length of all preceding
// streams will be truncated.
if (IsSparseFile() && !can_recover_from_error) {
SourceStream* preceding_neighbor = FindPrecedingNeighbor(source_stream);
while (preceding_neighbor) {
if (CanRecoverFromError(source_stream, preceding_neighbor)) {
can_recover_from_error = true;
break;
}
// If the neighbor cannot recover the error and it has already created
// a slice, just interrupt the download.
if (preceding_neighbor->bytes_written() > 0)
break;
preceding_neighbor = FindPrecedingNeighbor(preceding_neighbor);
// If the neighbor cannot recover the error and it has already created
// a slice, just interrupt the download.
if (preceding_neighbor->bytes_written() > 0)
break;
preceding_neighbor = FindPrecedingNeighbor(preceding_neighbor);
}
}
}
......@@ -784,8 +846,9 @@ DownloadFileImpl::SourceStream* DownloadFileImpl::FindPrecedingNeighbor(
int64_t max_preceding_offset = 0;
SourceStream* ret = nullptr;
for (auto& stream : source_streams_) {
int64_t offset = stream.second->offset();
if (offset < source_stream->offset() && offset >= max_preceding_offset) {
int64_t offset = stream.second->starting_file_write_offset();
if (offset < source_stream->starting_file_write_offset() &&
offset >= max_preceding_offset) {
ret = stream.second.get();
max_preceding_offset = offset;
}
......@@ -805,6 +868,9 @@ void DownloadFileImpl::DebugStates() const {
DVLOG(1) << "Total source stream count = " << source_streams_.size();
for (const auto& stream : source_streams_) {
DVLOG(1) << "Source stream, offset = " << stream.second->offset()
<< " , bytes_read = " << stream.second->bytes_read()
<< " , starting_file_write_offset = "
<< stream.second->starting_file_write_offset()
<< " , bytes_written = " << stream.second->bytes_written()
<< " , is_finished = " << stream.second->is_finished()
<< " , length = " << stream.second->length()
......
......@@ -192,15 +192,22 @@ class DownloadFileTest : public testing::Test {
closure.Run();
}
bool CreateDownloadFile(int offset, bool calculate_hash) {
return CreateDownloadFile(offset, 0, calculate_hash,
DownloadItem::ReceivedSlices());
bool CreateDownloadFile(bool calculate_hash) {
return CreateDownloadFile(0, calculate_hash, DownloadItem::ReceivedSlices(),
-1);
}
bool CreateDownloadFile(int offset,
int length,
bool CreateDownloadFile(int length,
bool calculate_hash,
const DownloadItem::ReceivedSlices& received_slices) {
return CreateDownloadFile(length, calculate_hash,
DownloadItem::ReceivedSlices(), -1);
}
bool CreateDownloadFile(int length,
bool calculate_hash,
const DownloadItem::ReceivedSlices& received_slices,
int file_offset) {
// There can be only one.
DCHECK(!download_file_);
......@@ -213,8 +220,23 @@ class DownloadFileTest : public testing::Test {
.RetiresOnSaturation();
std::unique_ptr<DownloadSaveInfo> save_info(new DownloadSaveInfo());
save_info->offset = offset;
// Fill the file by repeatedly copying |kTestData1| if |file_offset| is
// positive.
if (file_offset > 0) {
base::CreateTemporaryFileInDir(download_dir_.GetPath(),
&save_info->file_path);
int len = file_offset;
int data_len = strlen(kTestData1);
while (len > 0) {
int bytes_to_write = len > data_len ? data_len : len;
base::AppendToFile(save_info->file_path, kTestData1, bytes_to_write);
len -= bytes_to_write;
}
}
save_info->offset = 0;
save_info->length = length;
save_info->file_offset = file_offset;
download_file_.reset(new TestDownloadFileImpl(
std::move(save_info), download_dir_.GetPath(),
......@@ -462,6 +484,9 @@ class DownloadFileTest : public testing::Test {
int64_t bytes_;
int64_t bytes_per_sec_;
// Keep track of what data should be saved to the disk file.
std::string expected_data_;
private:
void SetRenameResult(const base::Closure& closure,
DownloadInterruptReason* reason_p,
......@@ -476,9 +501,6 @@ class DownloadFileTest : public testing::Test {
}
base::test::ScopedTaskEnvironment scoped_task_environment_;
// Keep track of what data should be saved to the disk file.
std::string expected_data_;
};
// DownloadFile::RenameAndAnnotate and DownloadFile::RenameAndUniquify have a
......@@ -531,7 +553,7 @@ const int DownloadFileTest::kDummyRequestId = 67;
// Rename the file before any data is downloaded, after some has, after it all
// has, and after it's closed.
TEST_P(DownloadFileTestWithRename, RenameFileFinal) {
ASSERT_TRUE(CreateDownloadFile(0, true));
ASSERT_TRUE(CreateDownloadFile(true));
base::FilePath initial_path(download_file_->FullPath());
EXPECT_TRUE(base::PathExists(initial_path));
base::FilePath path_1(initial_path.InsertBeforeExtensionASCII("_1"));
......@@ -601,7 +623,7 @@ TEST_P(DownloadFileTestWithRename, RenameFileFinal) {
// the above test because it only applies to RenameAndAnnotate().
// RenameAndUniquify() doesn't overwrite by design.
TEST_F(DownloadFileTest, RenameOverwrites) {
ASSERT_TRUE(CreateDownloadFile(0, true));
ASSERT_TRUE(CreateDownloadFile(true));
base::FilePath initial_path(download_file_->FullPath());
EXPECT_TRUE(base::PathExists(initial_path));
base::FilePath path_1(initial_path.InsertBeforeExtensionASCII("_1"));
......@@ -631,7 +653,7 @@ TEST_F(DownloadFileTest, RenameOverwrites) {
// DownloadFileTestWithRename test because this only applies to
// RenameAndUniquify().
TEST_F(DownloadFileTest, RenameUniquifies) {
ASSERT_TRUE(CreateDownloadFile(0, true));
ASSERT_TRUE(CreateDownloadFile(true));
base::FilePath initial_path(download_file_->FullPath());
EXPECT_TRUE(base::PathExists(initial_path));
base::FilePath path_1(initial_path.InsertBeforeExtensionASCII("_1"));
......@@ -654,7 +676,7 @@ TEST_F(DownloadFileTest, RenameUniquifies) {
// Test that RenameAndUniquify doesn't try to uniquify in the case where the
// target filename is the same as the current filename.
TEST_F(DownloadFileTest, RenameRecognizesSelfConflict) {
ASSERT_TRUE(CreateDownloadFile(0, true));
ASSERT_TRUE(CreateDownloadFile(true));
base::FilePath initial_path(download_file_->FullPath());
EXPECT_TRUE(base::PathExists(initial_path));
......@@ -671,7 +693,7 @@ TEST_F(DownloadFileTest, RenameRecognizesSelfConflict) {
// Test to make sure we get the proper error on failure.
TEST_P(DownloadFileTestWithRename, RenameError) {
ASSERT_TRUE(CreateDownloadFile(0, true));
ASSERT_TRUE(CreateDownloadFile(true));
base::FilePath initial_path(download_file_->FullPath());
// Create a subdirectory.
......@@ -726,7 +748,7 @@ void TestRenameCompletionCallback(const base::Closure& closure,
// base::MessageLoopCurrent::Get(). Each RunLoop processes that queue until it
// sees a QuitClosure() targeted at itself, at which point it stops processing.
TEST_P(DownloadFileTestWithRename, RenameWithErrorRetry) {
ASSERT_TRUE(CreateDownloadFile(0, true));
ASSERT_TRUE(CreateDownloadFile(true));
base::FilePath initial_path(download_file_->FullPath());
// Create a subdirectory.
......@@ -797,7 +819,7 @@ TEST_P(DownloadFileTestWithRename, RenameWithErrorRetry) {
// Various tests of the StreamActive method.
TEST_F(DownloadFileTest, StreamEmptySuccess) {
ASSERT_TRUE(CreateDownloadFile(0, true));
ASSERT_TRUE(CreateDownloadFile(true));
base::FilePath initial_path(download_file_->FullPath());
EXPECT_TRUE(base::PathExists(initial_path));
......@@ -813,7 +835,7 @@ TEST_F(DownloadFileTest, StreamEmptySuccess) {
}
TEST_F(DownloadFileTest, StreamEmptyError) {
ASSERT_TRUE(CreateDownloadFile(0, true));
ASSERT_TRUE(CreateDownloadFile(true));
base::FilePath initial_path(download_file_->FullPath());
EXPECT_TRUE(base::PathExists(initial_path));
......@@ -842,7 +864,7 @@ TEST_F(DownloadFileTest, StreamEmptyError) {
}
TEST_F(DownloadFileTest, StreamNonEmptySuccess) {
ASSERT_TRUE(CreateDownloadFile(0, true));
ASSERT_TRUE(CreateDownloadFile(true));
base::FilePath initial_path(download_file_->FullPath());
EXPECT_TRUE(base::PathExists(initial_path));
......@@ -858,7 +880,7 @@ TEST_F(DownloadFileTest, StreamNonEmptySuccess) {
}
TEST_F(DownloadFileTest, StreamNonEmptyError) {
ASSERT_TRUE(CreateDownloadFile(0, true));
ASSERT_TRUE(CreateDownloadFile(true));
base::FilePath initial_path(download_file_->FullPath());
EXPECT_TRUE(base::PathExists(initial_path));
......@@ -888,6 +910,58 @@ TEST_F(DownloadFileTest, StreamNonEmptyError) {
DestroyDownloadFile(0);
}
// Tests that if file content validation succeeds, all the remaining data will
// be writing to the file.
TEST_F(DownloadFileTest, FileContentValidationSuccess) {
int stream_length = strlen(kTestData1) * 2;
ASSERT_TRUE(CreateDownloadFile(
stream_length /* length */, true /* calculate_hash */,
DownloadItem::ReceivedSlices(), strlen(kTestData1) - 1));
base::FilePath initial_path(download_file_->FullPath());
EXPECT_TRUE(base::PathExists(initial_path));
const char* chunks1[] = {kTestData1, kTestData1};
::testing::Sequence s1;
SetupDataAppend(chunks1, 2 /* num_chunks */, input_stream_, s1);
SetupFinishStream(DOWNLOAD_INTERRUPT_REASON_NONE, input_stream_, s1);
EXPECT_CALL(*(observer_.get()), MockDestinationCompleted(_, _));
sink_callback_.Run(MOJO_RESULT_OK);
VerifyStreamAndSize();
base::RunLoop().RunUntilIdle();
DestroyDownloadFile(0);
}
// Tests that if file content validation fails, an error will occur and no data
// will be written.
TEST_F(DownloadFileTest, FileContentValidationFail) {
int file_length = strlen(kTestData2) - 1;
int stream_length = strlen(kTestData1) + strlen(kTestData2);
ASSERT_TRUE(CreateDownloadFile(stream_length /* length */,
true /* calculate_hash */,
DownloadItem::ReceivedSlices(), file_length));
base::FilePath initial_path(download_file_->FullPath());
EXPECT_TRUE(base::PathExists(initial_path));
std::string file_content = std::string(kTestData1, 0, file_length);
expected_data_ = file_content;
VerifyStreamAndSize();
const char* chunks1[] = {kTestData2, kTestData1};
::testing::Sequence s1;
// Only 1 chunk will be read, and it will generate an error after
// failing the validation.
SetupDataAppend(chunks1, 1 /* num_chunks */, input_stream_, s1);
EXPECT_CALL(*input_stream_, ClearDataReadyCallback());
EXPECT_CALL(*(observer_.get()),
MockDestinationError(DOWNLOAD_INTERRUPT_REASON_FILE_HASH_MISMATCH,
file_length, _));
sink_callback_.Run(MOJO_RESULT_OK);
base::RunLoop().RunUntilIdle();
expected_data_ = file_content;
DestroyDownloadFile(0);
}
// Tests for concurrent streams handling, used for parallel download.
//
// Activate both streams at the same time.
......@@ -895,7 +969,7 @@ TEST_F(DownloadFileTest, MultipleStreamsWrite) {
int64_t stream_0_length = GetBuffersLength(kTestData6, 2);
int64_t stream_1_length = GetBuffersLength(kTestData7, 2);
ASSERT_TRUE(CreateDownloadFile(0, stream_0_length, true,
ASSERT_TRUE(CreateDownloadFile(stream_0_length, true,
DownloadItem::ReceivedSlices()));
PrepareStream(&input_stream_, 0, false, true, kTestData6, 2);
......@@ -934,7 +1008,7 @@ TEST_F(DownloadFileTest, MutipleStreamsLimitedLength) {
// "Range:50-".
int64_t stream_2_length = GetBuffersLength(kTestData6, 2);
ASSERT_TRUE(CreateDownloadFile(0, stream_0_length, true,
ASSERT_TRUE(CreateDownloadFile(stream_0_length, true,
DownloadItem::ReceivedSlices()));
PrepareStream(&input_stream_, 0, false, true, kTestData6, 2);
......@@ -986,7 +1060,7 @@ TEST_F(DownloadFileTest, MutipleStreamsLimitedLength) {
TEST_F(DownloadFileTest, MultipleStreamsFirstStreamWriteAllData) {
int64_t stream_0_length = GetBuffersLength(kTestData8, 4);
ASSERT_TRUE(CreateDownloadFile(0, DownloadSaveInfo::kLengthFullContent, true,
ASSERT_TRUE(CreateDownloadFile(DownloadSaveInfo::kLengthFullContent, true,
DownloadItem::ReceivedSlices()));
PrepareStream(&input_stream_, 0, false, true, kTestData8, 4);
......@@ -1019,7 +1093,7 @@ TEST_F(DownloadFileTest, MultipleStreamsFirstStreamWriteAllData) {
TEST_F(DownloadFileTest, SecondStreamStartingOffsetAlreadyWritten) {
int64_t stream_0_length = GetBuffersLength(kTestData6, 2);
ASSERT_TRUE(CreateDownloadFile(0, stream_0_length, true,
ASSERT_TRUE(CreateDownloadFile(stream_0_length, true,
DownloadItem::ReceivedSlices()));
Sequence seq;
......
......@@ -102,7 +102,7 @@ bool CanRecoverFromError(
// the error stream.
if (error_stream->length() > 0) {
return error_stream->offset() + error_stream->length() <=
preceding_neighbor->offset() + preceding_neighbor->bytes_written();
preceding_neighbor->offset() + preceding_neighbor->bytes_read();
}
return false;
......
......@@ -43,7 +43,8 @@ class ParallelDownloadUtilsRecoverErrorTest
EXPECT_CALL(*input_stream_, GetCompletionStatus())
.WillRepeatedly(Return(DOWNLOAD_INTERRUPT_REASON_NONE));
return std::make_unique<DownloadFileImpl::SourceStream>(
offset, length, std::unique_ptr<MockInputStream>(input_stream_));
offset, length, offset,
std::unique_ptr<MockInputStream>(input_stream_));
}
protected:
......@@ -166,7 +167,7 @@ TEST_P(ParallelDownloadUtilsRecoverErrorTest,
EXPECT_FALSE(CanRecoverFromError(error_stream.get(), preceding_stream.get()));
// Even if it has written some data.
preceding_stream->OnWriteBytesToDisk(1000u);
preceding_stream->OnBytesConsumed(1000u, 1000u);
EXPECT_FALSE(CanRecoverFromError(error_stream.get(), preceding_stream.get()));
// Now capped the length of preceding stream with different values.
......@@ -177,14 +178,15 @@ TEST_P(ParallelDownloadUtilsRecoverErrorTest,
preceding_stream->set_finished(false);
EXPECT_FALSE(CanRecoverFromError(error_stream.get(), preceding_stream.get()));
preceding_stream->set_finished(true);
preceding_stream->OnWriteBytesToDisk(kErrorStreamOffset - preceding_offset);
int64_t bytes_consumed = kErrorStreamOffset - preceding_offset;
preceding_stream->OnBytesConsumed(bytes_consumed, bytes_consumed);
EXPECT_FALSE(CanRecoverFromError(error_stream.get(), preceding_stream.get()));
// Inject an error results in failure, even if data written exceeds the first
// byte of error stream.
EXPECT_CALL(*input_stream_, GetCompletionStatus())
.WillRepeatedly(Return(DOWNLOAD_INTERRUPT_REASON_FILE_NO_SPACE));
preceding_stream->OnWriteBytesToDisk(1000u);
preceding_stream->OnBytesConsumed(1000u, 1000u);
EXPECT_FALSE(CanRecoverFromError(error_stream.get(), preceding_stream.get()));
// Make preceding stream can reach the first byte of error stream.
......@@ -194,9 +196,9 @@ TEST_P(ParallelDownloadUtilsRecoverErrorTest,
preceding_stream->set_finished(false);
EXPECT_FALSE(CanRecoverFromError(error_stream.get(), preceding_stream.get()));
preceding_stream->set_finished(true);
preceding_stream->OnWriteBytesToDisk(kErrorStreamOffset - preceding_offset);
preceding_stream->OnBytesConsumed(bytes_consumed, bytes_consumed);
EXPECT_FALSE(CanRecoverFromError(error_stream.get(), preceding_stream.get()));
preceding_stream->OnWriteBytesToDisk(1);
preceding_stream->OnBytesConsumed(1, 1);
EXPECT_FALSE(CanRecoverFromError(error_stream.get(), preceding_stream.get()));
// Preceding stream that never download data won't recover the error stream.
......@@ -229,11 +231,13 @@ TEST_P(ParallelDownloadUtilsRecoverErrorTest,
// Since the preceding stream can never reach the starting offset, for an
// unfinished stream, we rely on length instead of bytes written.
EXPECT_FALSE(CanRecoverFromError(error_stream.get(), preceding_stream.get()));
preceding_stream->OnWriteBytesToDisk(kErrorStreamOffset - preceding_offset);
int64_t bytes_consumed = kErrorStreamOffset - preceding_offset;
preceding_stream->OnBytesConsumed(bytes_consumed, bytes_consumed);
EXPECT_FALSE(CanRecoverFromError(error_stream.get(), preceding_stream.get()));
preceding_stream->OnWriteBytesToDisk(kErrorStreamLength - 1);
preceding_stream->OnBytesConsumed(kErrorStreamLength - 1,
kErrorStreamLength - 1);
EXPECT_FALSE(CanRecoverFromError(error_stream.get(), preceding_stream.get()));
preceding_stream->OnWriteBytesToDisk(1);
preceding_stream->OnBytesConsumed(1, 1);
// Create preceding stream that can reach the upper bound of error stream.
// Since it's unfinished, it potentially can take over error stream's work
......@@ -248,11 +252,12 @@ TEST_P(ParallelDownloadUtilsRecoverErrorTest,
// Finished preceding stream only checks data written.
preceding_stream = CreateSourceStream(preceding_offset, 1);
preceding_stream->set_finished(true);
preceding_stream->OnWriteBytesToDisk(kErrorStreamOffset - preceding_offset);
preceding_stream->OnBytesConsumed(bytes_consumed, bytes_consumed);
EXPECT_FALSE(CanRecoverFromError(error_stream.get(), preceding_stream.get()));
preceding_stream->OnWriteBytesToDisk(kErrorStreamLength - 1);
preceding_stream->OnBytesConsumed(kErrorStreamLength - 1,
kErrorStreamLength - 1);
EXPECT_FALSE(CanRecoverFromError(error_stream.get(), preceding_stream.get()));
preceding_stream->OnWriteBytesToDisk(1);
preceding_stream->OnBytesConsumed(1, 1);
EXPECT_TRUE(CanRecoverFromError(error_stream.get(), preceding_stream.get()));
// Even if inject an error, since data written has cover the upper bound of
......
......@@ -99,17 +99,19 @@ class COMPONENTS_DOWNLOAD_EXPORT DownloadFileImpl : public DownloadFile {
public:
SourceStream(int64_t offset,
int64_t length,
int64_t starting_file_write_offset,
std::unique_ptr<InputStream> stream);
~SourceStream();
void Initialize();
// Called after successfully writing a buffer to disk.
void OnWriteBytesToDisk(int64_t bytes_write);
// Called after successfully reading and writing a buffer from stream.
void OnBytesConsumed(int64_t bytes_read, int64_t bytes_written);
// Given a data block that is already written, truncate the length of this
// object to avoid overwriting that block.
void TruncateLengthWithWrittenDataBlock(int64_t offset,
// object to avoid overwriting that block. Data used for validation purpose
// will not be truncated.
void TruncateLengthWithWrittenDataBlock(int64_t received_slice_offset,
int64_t bytes_written);
// Registers the callback that will be called when data is ready.
......@@ -136,8 +138,15 @@ class COMPONENTS_DOWNLOAD_EXPORT DownloadFileImpl : public DownloadFile {
InputStream::StreamState Read(scoped_refptr<net::IOBuffer>* data,
size_t* length);
// Returning the remaining bytes to validate.
size_t GetRemainingBytesToValidate();
int64_t offset() const { return offset_; }
int64_t length() const { return length_; }
int64_t starting_file_write_offset() const {
return starting_file_write_offset_;
}
int64_t bytes_read() const { return bytes_read_; }
int64_t bytes_written() const { return bytes_written_; }
bool is_finished() const { return finished_; }
void set_finished(bool finish) { finished_ = finish; }
......@@ -145,15 +154,24 @@ class COMPONENTS_DOWNLOAD_EXPORT DownloadFileImpl : public DownloadFile {
void set_index(size_t index) { index_ = index; }
private:
// Starting position for the stream to write to disk.
// Starting position of the stream, this is from the network response.
int64_t offset_;
// The maximum length to write to the disk. If set to 0, keep writing until
// the stream depletes.
int64_t length_;
// Number of bytes written to disk from the stream.
// Next write position is (|offset_| + |bytes_written_|).
// All the data received before this offset are used for validation purpose
// and will not be written to disk. This value should always be no less than
// |offset_|.
int64_t starting_file_write_offset_;
// Number of bytes read from the stream.
// Next read position is (|offset_| + |bytes_read_|).
int64_t bytes_read_;
// Number of bytes written to the disk. This does not include the bytes used
// for validation.
int64_t bytes_written_;
// If all the data read from the stream has been successfully written to
......@@ -172,11 +190,13 @@ class COMPONENTS_DOWNLOAD_EXPORT DownloadFileImpl : public DownloadFile {
protected:
// For test class overrides.
// Write data from the offset to the file.
// On OS level, it will seek to the |offset| and write from there.
virtual DownloadInterruptReason WriteDataToFile(int64_t offset,
const char* data,
size_t data_len);
// Validate the first |bytes_to_validate| bytes and write the next
// |bytes_to_write| bytes of data from the offset to the file.
virtual DownloadInterruptReason ValidateAndWriteDataToFile(
int64_t offset,
const char* data,
size_t bytes_to_validate,
size_t bytes_to_write);
virtual base::TimeDelta GetRetryDelayForFailedRename(int attempt_number);
......@@ -232,13 +252,14 @@ class COMPONENTS_DOWNLOAD_EXPORT DownloadFileImpl : public DownloadFile {
void WillWriteToDisk(size_t data_len);
// For a given SourceStream object and the bytes available to write, determine
// the actual number of bytes it can write to the disk. For parallel
// downloading, if the first disk IO writes to a location that is already
// written by another stream, the current stream should stop writing. Returns
// true if the stream can write no more data and should be finished, returns
// false otherwise.
// the number of bytes to validate and the number of bytes it can write to the
// disk. For parallel downloading, if the first disk IO writes to a location
// that is already written by another stream, the current stream should stop
// writing. Returns true if the stream can write no more data and should be
// finished, returns false otherwise.
bool CalculateBytesToWrite(SourceStream* source_stream,
size_t bytes_available_to_write,
size_t* bytes_to_validate,
size_t* bytes_to_write);
// Called when a new SourceStream object is added.
......
......@@ -15,4 +15,8 @@ DownloadSaveInfo::~DownloadSaveInfo() = default;
DownloadSaveInfo::DownloadSaveInfo(DownloadSaveInfo&& that) = default;
int64_t DownloadSaveInfo::GetStartingFileWriteOffset() {
return file_offset >= 0 ? file_offset : offset;
}
} // namespace download
......@@ -30,6 +30,8 @@ struct COMPONENTS_DOWNLOAD_EXPORT DownloadSaveInfo {
~DownloadSaveInfo();
DownloadSaveInfo(DownloadSaveInfo&& that);
int64_t GetStartingFileWriteOffset();
// If non-empty, contains the full target path of the download that has been
// determined prior to download initiation. This is considered to be a trusted
// path.
......@@ -42,9 +44,18 @@ struct COMPONENTS_DOWNLOAD_EXPORT DownloadSaveInfo {
// If valid, contains the source data stream for the file contents.
base::File file;
// The file offset at which to start the download.
// The offset sent to the server when requesting the download. During
// resumption, |offset| could be smaller than the downloaded content length.
// This is because download may request some data to validate whether the
// content has changed.
int64_t offset = 0;
// The file offset to start writing to disk. If this value is negative,
// download stream will be writing to the disk starting at |offset|.
// Otherwise, this value will be used. Data received before |file_offset| are
// used for validation purpose.
int64_t file_offset = -1;
// The number of the bytes to download from |offset|.
// Ask to retrieve segment of the download file when length is greater than 0.
// Request the rest of the file starting from |offset|, when length is
......
......@@ -52,9 +52,11 @@ class DownloadFileWithError : public download::DownloadFileImpl {
bool is_parallelizable) override;
// DownloadFile interface.
download::DownloadInterruptReason WriteDataToFile(int64_t offset,
const char* data,
size_t data_len) override;
download::DownloadInterruptReason ValidateAndWriteDataToFile(
int64_t offset,
const char* data,
size_t bytes_to_validate,
size_t bytes_to_write) override;
download::DownloadInterruptReason HandleStreamCompletionStatus(
SourceStream* source_stream) override;
......@@ -174,13 +176,15 @@ void DownloadFileWithError::Initialize(
received_slices, is_parallelizable);
}
download::DownloadInterruptReason DownloadFileWithError::WriteDataToFile(
int64_t offset,
const char* data,
size_t data_len) {
download::DownloadInterruptReason
DownloadFileWithError::ValidateAndWriteDataToFile(int64_t offset,
const char* data,
size_t bytes_to_validate,
size_t bytes_to_write) {
return ShouldReturnError(
TestFileErrorInjector::FILE_OPERATION_WRITE,
download::DownloadFileImpl::WriteDataToFile(offset, data, data_len));
download::DownloadFileImpl::ValidateAndWriteDataToFile(
offset, data, bytes_to_validate, bytes_to_write));
}
download::DownloadInterruptReason
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment