Commit 457f1988 authored by qinmin's avatar qinmin Committed by Commit bot

Move the logic to determine how much data can be written to another function

The logic to calculate how much data to write is getting bigger,
move to a separate function.
This CL also adds a check if the target location is already written.
If so, it will request the current stream to terminate.
Unit test will pass once https://codereview.chromium.org/2737033002/ lands

BUG=644352

Review-Url: https://codereview.chromium.org/2744793003
Cr-Commit-Position: refs/heads/master@{#457839}
parent e13fdde8
......@@ -156,6 +156,35 @@ DownloadInterruptReason DownloadFileImpl::WriteDataToFile(int64_t offset,
return file_.WriteDataToFile(offset, data, data_len);
}
bool DownloadFileImpl::CalculateBytesToWrite(SourceStream* source_stream,
size_t bytes_available_to_write,
size_t* bytes_to_write) {
// If a new slice finds that its target position has already been written,
// terminate the stream.
if (source_stream->bytes_written() == 0) {
for (const auto& received_slice : received_slices_) {
if (received_slice.offset <= source_stream->offset() &&
received_slice.offset + received_slice.received_bytes >
source_stream->offset()) {
*bytes_to_write = 0;
return true;
}
}
}
if (source_stream->length() != DownloadSaveInfo::kLengthFullContent &&
source_stream->bytes_written() +
static_cast<int64_t>(bytes_available_to_write) >=
source_stream->length()) {
// Write a partial buffer as the incoming data exceeds the length limit.
*bytes_to_write = source_stream->length() - source_stream->bytes_written();
return true;
}
*bytes_to_write = bytes_available_to_write;
return false;
}
void DownloadFileImpl::RenameAndUniquify(
const base::FilePath& full_path,
const RenameCompletionCallback& callback) {
......@@ -292,6 +321,7 @@ void DownloadFileImpl::StreamActive(SourceStream* source_stream) {
size_t incoming_data_size = 0;
size_t total_incoming_data_size = 0;
size_t num_buffers = 0;
size_t bytes_to_write = 0;
bool should_terminate = false;
ByteStreamReader::StreamState state(ByteStreamReader::STREAM_EMPTY);
DownloadInterruptReason reason = DOWNLOAD_INTERRUPT_REASON_NONE;
......@@ -310,34 +340,28 @@ void DownloadFileImpl::StreamActive(SourceStream* source_stream) {
{
++num_buffers;
base::TimeTicks write_start(base::TimeTicks::Now());
// Stop the stream if it writes more bytes than expected.
if (source_stream->length() != DownloadSaveInfo::kLengthFullContent &&
source_stream->bytes_written() +
static_cast<int64_t>(incoming_data_size) >=
source_stream->length()) {
should_terminate = true;
incoming_data_size =
source_stream->length() - source_stream->bytes_written();
}
should_terminate = CalculateBytesToWrite(
source_stream, incoming_data_size, &bytes_to_write);
DCHECK_GE(incoming_data_size, bytes_to_write);
reason = WriteDataToFile(
source_stream->offset() + source_stream->bytes_written(),
incoming_data.get()->data(), incoming_data_size);
incoming_data.get()->data(), bytes_to_write);
disk_writes_time_ += (base::TimeTicks::Now() - write_start);
bytes_seen_ += incoming_data_size;
total_incoming_data_size += incoming_data_size;
bytes_seen_ += bytes_to_write;
total_incoming_data_size += bytes_to_write;
if (reason == DOWNLOAD_INTERRUPT_REASON_NONE) {
int64_t prev_bytes_written = source_stream->bytes_written();
source_stream->OnWriteBytesToDisk(incoming_data_size);
source_stream->OnWriteBytesToDisk(bytes_to_write);
if (!is_sparse_file_)
break;
// If the write operation creates a new slice, add it to the
// |received_slices_| and update all the entries in
// |source_streams_|.
if (incoming_data_size > 0 && prev_bytes_written == 0) {
AddNewSlice(source_stream->offset(), incoming_data_size);
if (bytes_to_write > 0 && prev_bytes_written == 0) {
AddNewSlice(source_stream->offset(), bytes_to_write);
} else {
received_slices_[source_stream->index()].received_bytes +=
incoming_data_size;
bytes_to_write;
}
}
}
......
......@@ -183,6 +183,16 @@ class CONTENT_EXPORT DownloadFileImpl : public DownloadFile {
// Called before the data is written to disk.
void WillWriteToDisk(size_t data_len);
// For a given SourceStream object and the bytes available to write, determine
// the actual number of bytes it can write to the disk. For parallel
// downloading, if the first disk IO writes to a location that is already
// written by another stream, the current stream should stop writing. Returns
// true if the stream can write no more data and should be finished, returns
// false otherwise.
bool CalculateBytesToWrite(SourceStream* source_stream,
size_t bytes_available_to_write,
size_t* bytes_to_write);
// Called when there's some activity on the byte stream that needs to be
// handled.
void StreamActive(SourceStream* source_stream);
......
......@@ -397,8 +397,10 @@ class DownloadFileTest : public testing::Test {
// Prepare two byte streams to write to the same file sink. If
// |first_stream_completes_early| is true, the first stream will complete
// before the second stream starts.
// before the second stream starts. If |first_stream_write_all_data| is true,
// the first stream will write all the data before the 2nd stream starts.
void PrepareMultipleStreams(bool first_stream_completes_early,
bool first_stream_write_all_data,
int64_t second_stream_length) {
// Create a sparse file.
ASSERT_TRUE(CreateDownloadFile(0, true, true));
......@@ -408,6 +410,7 @@ class DownloadFileTest : public testing::Test {
const char* stream_0_data[] = {kTestData1, kTestData2};
const char* stream_1_data[] = {kTestData4, kTestData5};
const char* all_data[] = {kTestData1, kTestData2, kTestData4, kTestData5};
size_t stream_1_offset = strlen(kTestData1) + strlen(kTestData2);
// Register second SourceStream entry for the second stream.
......@@ -424,8 +427,15 @@ class DownloadFileTest : public testing::Test {
::testing::Sequence s0;
::testing::Sequence s1;
SetupDataAppend(stream_1_data, 2, input_stream_1_, s1, stream_1_offset);
SetupDataAppend(stream_0_data, 2, input_stream_, s0, 0);
if (first_stream_write_all_data) {
SetupDataAppend(all_data, 4, input_stream_, s0, 0);
// The 2nd stream will abort after the first read
SetupDataAppend(stream_1_data, 1, input_stream_1_, s1, stream_1_offset);
} else {
SetupDataAppend(stream_0_data, 2, input_stream_, s0, 0);
SetupDataAppend(stream_1_data, 2, input_stream_1_, s1, stream_1_offset);
}
// If the first stream doesn't finish before the second stream starts
// writing, its length will be cut short by the second stream. So
// STREAM_COMPLETE will never get called.
......@@ -442,7 +452,7 @@ class DownloadFileTest : public testing::Test {
// The stream may terminate in the middle and less Read calls are expected.
// 3. GetStatus: Only called if the stream is completed and last Read call
// returns STREAM_COMPLETE.
if (second_stream_length == 0)
if (second_stream_length == 0 && !first_stream_write_all_data)
SetupFinishStream(DOWNLOAD_INTERRUPT_REASON_NONE, input_stream_1_, s1);
else
EXPECT_CALL(*input_stream_1_, RegisterCallback(_)).RetiresOnSaturation();
......@@ -912,7 +922,7 @@ TEST_F(DownloadFileTest, StreamNonEmptyError) {
//
// Activate both streams at the same time.
TEST_F(DownloadFileTest, MutipleStreamsWrite) {
PrepareMultipleStreams(false, 0);
PrepareMultipleStreams(false, false, 0);
EXPECT_CALL(*(observer_.get()), MockDestinationCompleted(_, _));
int64_t stream_0_length =
......@@ -936,7 +946,7 @@ TEST_F(DownloadFileTest, MutipleStreamsWrite) {
// Activate and deplete one stream, later add the second stream.
TEST_F(DownloadFileTest, MutipleStreamsOneStreamFirst) {
PrepareMultipleStreams(true, 0);
PrepareMultipleStreams(true, false, 0);
int64_t stream_0_length =
static_cast<int64_t>(strlen(kTestData1) + strlen(kTestData2));
......@@ -979,7 +989,7 @@ TEST_F(DownloadFileTest, MutipleStreamsLimitedLength) {
int64_t stream_0_length =
static_cast<int64_t>(strlen(kTestData1) + strlen(kTestData2));
int64_t stream_1_length = static_cast<int64_t>(strlen(kTestData4)) - 1;
PrepareMultipleStreams(false, stream_1_length);
PrepareMultipleStreams(false, false, stream_1_length);
download_file_->AddByteStream(
std::unique_ptr<MockByteStreamReader>(input_stream_1_), stream_0_length);
......@@ -1008,4 +1018,33 @@ TEST_F(DownloadFileTest, MutipleStreamsLimitedLength) {
DestroyDownloadFile(0, false);
}
// Two streams write to one sink, the first stream writes the whole file before
// the seconds stream was able to start
TEST_F(DownloadFileTest, MutipleStreamsFirstStreamWriteAllData) {
PrepareMultipleStreams(true, true, 0);
int64_t stream_0_length =
static_cast<int64_t>(strlen(kTestData1) + strlen(kTestData2) +
strlen(kTestData4) + strlen(kTestData5));
int64_t stream_1_length =
static_cast<int64_t>(strlen(kTestData4) + strlen(kTestData5));
sink_callback_.Run();
base::RunLoop().RunUntilIdle();
EXPECT_CALL(*(observer_.get()), MockDestinationCompleted(_, _));
download_file_->AddByteStream(
std::unique_ptr<MockByteStreamReader>(input_stream_1_),
stream_0_length - stream_1_length);
base::RunLoop().RunUntilIdle();
SourceStreamTestData stream_data_0(0, stream_0_length, true);
SourceStreamTestData stream_data_1(stream_0_length - stream_1_length, 0,
true);
VerifySourceStreamsStates(stream_data_0);
VerifySourceStreamsStates(stream_data_1);
EXPECT_EQ(stream_0_length, TotalBytesReceived());
DestroyDownloadFile(0);
}
} // namespace content
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment