Commit bfe06737 authored by Min Qin's avatar Min Qin Committed by Commit Bot

Fix an issue that StreamActive() can be called by after all data is consumed

In DownloadFileImpl::RegisterAndActivateStream(), it registerers
a callback when data become available in the stream handle. Then it
starts reading the stream handle in a while loop. And the following
scenario could happen afterwards:
1. The SimpleWatcher object in StreamHandleInputStream thinks data
is available, and posted a callback to run DownloadFileImpl::StreamActive().
2. Before the callback gets to run, the while loop consumes all
the data in the data pipe. This hits the stream_handle_->stream.reset()
statement in StreamHandleInputStream::Read().
3. The callback is executed, and stream_handle_->stream->ReadData()
is called. And this causes chrome to crash.

This CL fixes the issue by registering the callback only when
data from the stream handle is not ready. By doing this, the callback
won't be scheduled when DownloadFileImpl is inside the StreamActive()
loop.

BUG=1009839

Change-Id: I9d90686ffea56fb61a9c29aa2a569618fcf6fbd3
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/1832735Reviewed-by: default avatarXing Liu <xingliu@chromium.org>
Commit-Queue: Min Qin <qinmin@chromium.org>
Cr-Commit-Position: refs/heads/master@{#703076}
parent 55fd8528
......@@ -566,9 +566,8 @@ void DownloadFileImpl::Resume() {
for (auto& stream : source_streams_) {
SourceStream* source_stream = stream.second.get();
if (!source_stream->is_finished()) {
ActivateStream(source_stream);
}
if (!source_stream->is_finished())
StreamActive(source_stream, MOJO_RESULT_OK);
}
}
......@@ -591,7 +590,6 @@ void DownloadFileImpl::StreamActive(SourceStream* source_stream,
DownloadInterruptReason reason = DOWNLOAD_INTERRUPT_REASON_NONE;
base::TimeDelta delta(
base::TimeDelta::FromMilliseconds(kMaxTimeBlockingFileThreadMs));
// Take care of any file local activity required.
do {
state = source_stream->Read(&incoming_data, &incoming_data_size);
......@@ -649,6 +647,10 @@ void DownloadFileImpl::StreamActive(SourceStream* source_stream,
FROM_HERE, base::BindOnce(&DownloadFileImpl::StreamActive,
weak_factory_.GetWeakPtr(), source_stream,
MOJO_RESULT_OK));
} else if (state == InputStream::EMPTY && !should_terminate) {
source_stream->RegisterDataReadyCallback(
base::Bind(&DownloadFileImpl::StreamActive, weak_factory_.GetWeakPtr(),
source_stream));
}
if (state == InputStream::COMPLETE)
......@@ -731,13 +733,6 @@ void DownloadFileImpl::RegisterAndActivateStream(SourceStream* source_stream) {
received_slice.offset, received_slice.received_bytes);
}
num_active_streams_++;
ActivateStream(source_stream);
}
void DownloadFileImpl::ActivateStream(SourceStream* source_stream) {
source_stream->RegisterDataReadyCallback(
base::Bind(&DownloadFileImpl::StreamActive, weak_factory_.GetWeakPtr(),
source_stream));
StreamActive(source_stream, MOJO_RESULT_OK);
}
......
......@@ -581,6 +581,9 @@ TEST_P(DownloadFileTestWithRename, RenameFileFinal) {
EXPECT_FALSE(base::PathExists(initial_path));
EXPECT_TRUE(base::PathExists(path_1));
EXPECT_CALL(*input_stream_, RegisterDataReadyCallback(_))
.Times(1)
.RetiresOnSaturation();
// Download the data.
const char* chunks1[] = {kTestData1, kTestData2};
AppendDataToFile(chunks1, 2);
......@@ -596,6 +599,9 @@ TEST_P(DownloadFileTestWithRename, RenameFileFinal) {
EXPECT_FALSE(base::PathExists(path_1));
EXPECT_TRUE(base::PathExists(path_2));
EXPECT_CALL(*input_stream_, RegisterDataReadyCallback(_))
.Times(1)
.RetiresOnSaturation();
const char* chunks2[] = {kTestData3};
AppendDataToFile(chunks2, 1);
......@@ -833,6 +839,9 @@ TEST_F(DownloadFileTest, StreamEmptySuccess) {
// Test that calling the sink_callback_ on an empty stream shouldn't
// do anything.
EXPECT_CALL(*input_stream_, RegisterDataReadyCallback(_))
.Times(1)
.RetiresOnSaturation();
AppendDataToFile(nullptr, 0);
// Finish the download this way and make sure we see it on the observer.
......@@ -984,8 +993,6 @@ TEST_F(DownloadFileTest, MultipleStreamsWrite) {
PrepareStream(&additional_streams_[0], stream_0_length, true, true,
kTestData7, 2);
EXPECT_CALL(*additional_streams_[0], RegisterDataReadyCallback(_))
.RetiresOnSaturation();
EXPECT_CALL(*(observer_.get()), MockDestinationCompleted(_, _));
// Activate the streams.
......@@ -1005,7 +1012,7 @@ TEST_F(DownloadFileTest, MultipleStreamsWrite) {
}
// 3 streams write to one sink, the second stream has a limited length.
TEST_F(DownloadFileTest, MutipleStreamsLimitedLength) {
TEST_F(DownloadFileTest, MultipleStreamsLimitedLength) {
int64_t stream_0_length = GetBuffersLength(kTestData6, 2);
// The second stream has a limited length and should be partially written
......@@ -1025,17 +1032,10 @@ TEST_F(DownloadFileTest, MutipleStreamsLimitedLength) {
PrepareStream(&additional_streams_[1], stream_0_length + stream_1_length,
true, true, kTestData6, 2);
EXPECT_CALL(*additional_streams_[0], RegisterDataReadyCallback(_))
.Times(1)
.RetiresOnSaturation();
EXPECT_CALL(*additional_streams_[0], ClearDataReadyCallback())
.Times(1)
.RetiresOnSaturation();
EXPECT_CALL(*additional_streams_[1], RegisterDataReadyCallback(_))
.RetiresOnSaturation();
EXPECT_CALL(*(observer_.get()), MockDestinationCompleted(_, _));
// Activate all the streams.
......@@ -1111,13 +1111,13 @@ TEST_F(DownloadFileTest, SecondStreamStartingOffsetAlreadyWritten) {
.InSequence(seq)
.WillOnce(Return(InputStream::EMPTY))
.RetiresOnSaturation();
EXPECT_CALL(*input_stream_, RegisterDataReadyCallback(_))
.Times(1)
.RetiresOnSaturation();
sink_callback_.Run(MOJO_RESULT_OK);
base::RunLoop().RunUntilIdle();
additional_streams_[0] = new StrictMock<MockInputStream>();
EXPECT_CALL(*additional_streams_[0], RegisterDataReadyCallback(_))
.WillRepeatedly(Invoke(this, &DownloadFileTest::RegisterCallback))
.RetiresOnSaturation();
EXPECT_CALL(*additional_streams_[0], ClearDataReadyCallback())
.WillRepeatedly(Invoke(this, &DownloadFileTest::ClearCallback))
.RetiresOnSaturation();
......@@ -1152,6 +1152,9 @@ TEST_F(DownloadFileTest, SecondStreamReadsOffsetWrittenByFirst) {
.InSequence(seq)
.WillOnce(Return(InputStream::EMPTY))
.RetiresOnSaturation();
EXPECT_CALL(*input_stream_, RegisterDataReadyCallback(_))
.Times(2)
.RetiresOnSaturation();
sink_callback_.Run(MOJO_RESULT_OK);
base::RunLoop().RunUntilIdle();
......
......@@ -45,6 +45,8 @@ void StreamHandleInputStream::RegisterDataReadyCallback(
const mojo::SimpleWatcher::ReadyCallback& callback) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
if (handle_watcher_) {
if (handle_watcher_->IsWatching())
ClearDataReadyCallback();
handle_watcher_->Watch(stream_handle_->stream.get(),
MOJO_HANDLE_SIGNAL_READABLE, callback);
}
......
......@@ -277,9 +277,6 @@ class COMPONENTS_DOWNLOAD_EXPORT DownloadFileImpl : public DownloadFile {
// Register callback and start to read data from the stream.
void RegisterAndActivateStream(SourceStream* source_stream);
// Helper method to activate a stream and start reading from it.
void ActivateStream(SourceStream* source_stream);
// Called when a stream completes.
void OnStreamCompleted(SourceStream* source_stream);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment