Commit 959eaea4 authored by ananta's avatar ananta Committed by Commit bot

The ReadFile API on Windows invoked by the FileStream::Context class which is...

The ReadFile API on Windows invoked by the FileStream::Context class which is used by URLRequestFileJob at times completes synchronously.

The FileStream::Context class is used by the URLRequestFileJob class on the IO thread. The intention here is to open
the file for overlapped IO and thus complete the read and writes asynchonously.
Turns out that there are cases where the ReadFile API completes synchronously which ends up blocking the IO thread.
http://support.microsoft.com/kb/156932

Fix for this is to post the ReadFile call to a worker pool and inform the caller about failures etc on the calling thread.

Changes in this patch are as below:-
1. The FileStream::Context::Read function posts the ReadFile call to the Worker thread pool. The entry point for this task is the static
   function ReadAsync.
   This function posts results back to the originating thread via the function ReadAsyncResult

2. Fixed the FileStreamTest.WriteRead test to not read the data in the completion callback of the stream
   Write function. This causes a DCHECK to fire in the MessageLoopForIO class that we are entering a nested
   loop which is a no no for that message pump type.

   This DCHECK fires because of the change on Windows to execute the ReadFile in a Worker thread pool.
   In any case this change is safe for all platforms.
   The written data is validated by reading it back in the function ValidateWrittenData in the TestWriteReadCompletionCallback class.

   We call this explicitly in the FileStreamTest.WriteRead test.

3. Fix the SyncableFileOperationRunnerTest.CopyAndMove test failures.

This test internally initiates a ReadFile call on Windows via the FileStream::Context class for the Copy and Move
test. It invokes Copy and Move and then calls MessageLoop::RunUntilIdle. This call basically runs the message loop
until all pending tasks and events have been processed. This works on Windows by fluke because the ReadFile call
which completes asynchronously posts IO completion tasks to the IO thread (current thread) which then get pulled out
by the RunUntilIdle call.

There could be cases where the ReadFile call does not post IO completion events before RunUntilIdle returns and we could
see the same failures.

Reason we see this with this patch is because ReadFile is now completed by a worker pool thread which basically guarantees
the above scenario. Fix is to use base::RunLoop::Run() and Quit the loop when the DidFinish callback is received.

I changed all places in the syncable_file_operation_runner_unittest.cc file to use base::RunLoop::Run() or
base::RunLoop::RunUntilIdle() as the MessageLoop equivalents are deprecated.

BUG=423948

Review URL: https://codereview.chromium.org/887863002

Cr-Commit-Position: refs/heads/master@{#314388}
parent ae133cc3
......@@ -10,6 +10,7 @@
#include "base/location.h"
#include "base/memory/scoped_ptr.h"
#include "base/message_loop/message_loop.h"
#include "base/run_loop.h"
#include "base/thread_task_runner_handle.h"
#include "chrome/browser/sync_file_system/local/canned_syncable_file_system.h"
#include "chrome/browser/sync_file_system/local/local_file_change_tracker.h"
......@@ -131,6 +132,7 @@ class SyncableFileOperationRunnerTest : public testing::Test {
SCOPED_TRACE(testing::Message() << location.ToString());
EXPECT_EQ(expect, status);
++callback_count_;
base::MessageLoop::current()->Quit();
}
bool CreateTempFile(base::FilePath* path) {
......@@ -170,13 +172,13 @@ TEST_F(SyncableFileOperationRunnerTest, SimpleQueue) {
file_system_.operation_runner()->Truncate(
URL(kFile), 1,
ExpectStatus(FROM_HERE, File::FILE_OK));
base::MessageLoop::current()->RunUntilIdle();
base::RunLoop().RunUntilIdle();
EXPECT_EQ(0, callback_count_);
// Read operations are not blocked (and are executed before queued ones).
file_system_.operation_runner()->FileExists(
URL(kFile), ExpectStatus(FROM_HERE, File::FILE_ERROR_NOT_FOUND));
base::MessageLoop::current()->RunUntilIdle();
base::RunLoop().RunUntilIdle();
EXPECT_EQ(1, callback_count_);
// End syncing (to enable write).
......@@ -184,14 +186,14 @@ TEST_F(SyncableFileOperationRunnerTest, SimpleQueue) {
ASSERT_TRUE(sync_status()->IsWritable(URL(kFile)));
ResetCallbackStatus();
base::MessageLoop::current()->RunUntilIdle();
base::RunLoop().RunUntilIdle();
EXPECT_EQ(2, callback_count_);
// Now the file must have been created and updated.
ResetCallbackStatus();
file_system_.operation_runner()->FileExists(
URL(kFile), ExpectStatus(FROM_HERE, File::FILE_OK));
base::MessageLoop::current()->RunUntilIdle();
base::RunLoop().RunUntilIdle();
EXPECT_EQ(1, callback_count_);
}
......@@ -211,13 +213,13 @@ TEST_F(SyncableFileOperationRunnerTest, WriteToParentAndChild) {
file_system_.operation_runner()->Remove(
URL(kParent), true /* recursive */,
ExpectStatus(FROM_HERE, File::FILE_OK));
base::MessageLoop::current()->RunUntilIdle();
base::RunLoop().RunUntilIdle();
EXPECT_EQ(0, callback_count_);
// Read operations are not blocked (and are executed before queued ones).
file_system_.operation_runner()->DirectoryExists(
URL(kDir), ExpectStatus(FROM_HERE, File::FILE_OK));
base::MessageLoop::current()->RunUntilIdle();
base::RunLoop().Run();
EXPECT_EQ(1, callback_count_);
// Writes to unrelated files must succeed as well.
......@@ -225,7 +227,7 @@ TEST_F(SyncableFileOperationRunnerTest, WriteToParentAndChild) {
file_system_.operation_runner()->CreateDirectory(
URL(kOther), false /* exclusive */, false /* recursive */,
ExpectStatus(FROM_HERE, File::FILE_OK));
base::MessageLoop::current()->RunUntilIdle();
base::RunLoop().Run();
EXPECT_EQ(1, callback_count_);
// End syncing (to enable write).
......@@ -233,7 +235,7 @@ TEST_F(SyncableFileOperationRunnerTest, WriteToParentAndChild) {
ASSERT_TRUE(sync_status()->IsWritable(URL(kDir)));
ResetCallbackStatus();
base::MessageLoop::current()->RunUntilIdle();
base::RunLoop().Run();
EXPECT_EQ(2, callback_count_);
}
......@@ -259,7 +261,7 @@ TEST_F(SyncableFileOperationRunnerTest, CopyAndMove) {
URL("dest-move"),
storage::FileSystemOperation::OPTION_NONE,
ExpectStatus(FROM_HERE, File::FILE_OK));
base::MessageLoop::current()->RunUntilIdle();
base::RunLoop().Run();
EXPECT_EQ(1, callback_count_);
// Only "dest-copy1" should exist.
......@@ -279,13 +281,13 @@ TEST_F(SyncableFileOperationRunnerTest, CopyAndMove) {
storage::FileSystemOperation::OPTION_NONE,
storage::FileSystemOperationRunner::CopyProgressCallback(),
ExpectStatus(FROM_HERE, File::FILE_OK));
base::MessageLoop::current()->RunUntilIdle();
base::RunLoop().RunUntilIdle();
EXPECT_EQ(0, callback_count_);
// Finish syncing the "dest-copy2" directory to unlock Copy.
sync_status()->EndSyncing(URL("dest-copy2"));
ResetCallbackStatus();
base::MessageLoop::current()->RunUntilIdle();
base::RunLoop().Run();
EXPECT_EQ(1, callback_count_);
// Now we should have "dest-copy2".
......@@ -295,7 +297,7 @@ TEST_F(SyncableFileOperationRunnerTest, CopyAndMove) {
// Finish syncing the kParent to unlock Move.
sync_status()->EndSyncing(URL(kParent));
ResetCallbackStatus();
base::MessageLoop::current()->RunUntilIdle();
base::RunLoop().Run();
EXPECT_EQ(1, callback_count_);
// Now we should have "dest-move".
......@@ -314,7 +316,7 @@ TEST_F(SyncableFileOperationRunnerTest, Write) {
file_system_.operation_runner()->Write(
&url_request_context_,
URL(kFile), blob.GetBlobDataHandle(), 0, GetWriteCallback(FROM_HERE));
base::MessageLoop::current()->RunUntilIdle();
base::RunLoop().RunUntilIdle();
EXPECT_EQ(0, callback_count_);
sync_status()->EndSyncing(URL(kFile));
......@@ -404,7 +406,7 @@ TEST_F(SyncableFileOperationRunnerTest, Cancel) {
URL(kFile), 10, ExpectStatus(FROM_HERE, File::FILE_OK));
file_system_.operation_runner()->Cancel(
id, ExpectStatus(FROM_HERE, File::FILE_ERROR_INVALID_OPERATION));
base::MessageLoop::current()->RunUntilIdle();
base::RunLoop().Run();
EXPECT_EQ(2, callback_count_);
}
......
......@@ -77,6 +77,12 @@ void FileStream::Context::Orphan() {
orphaned_ = true;
#if defined(OS_WIN)
// Clean up weak pointers here to ensure that they are destroyed on the
// same thread where they were created.
weak_ptr_factory_.InvalidateWeakPtrs();
#endif
if (!async_in_progress_) {
CloseAndDelete();
} else if (file_.IsValid()) {
......
......@@ -28,6 +28,7 @@
#define NET_BASE_FILE_STREAM_CONTEXT_H_
#include "base/files/file.h"
#include "base/memory/weak_ptr.h"
#include "base/message_loop/message_loop.h"
#include "base/move.h"
#include "base/task_runner.h"
......@@ -159,6 +160,35 @@ class FileStream::Context {
virtual void OnIOCompleted(base::MessageLoopForIO::IOContext* context,
DWORD bytes_read,
DWORD error) override;
// The ReadFile call on Windows can execute synchonously at times.
// http://support.microsoft.com/kb/156932. This ends up blocking the calling
// thread which is undesirable. To avoid this we execute the ReadFile call
// on a worker thread.
// The |context| parameter is a weak pointer instance passed to the worker
// pool.
// The |file| parameter is the handle to the file being read.
// The |buf| parameter is the buffer where we want the ReadFile to read the
// data into.
// The |buf_len| parameter contains the number of bytes to be read.
// The |overlapped| parameter is a pointer to the OVERLAPPED structure being
// used.
// The |origin_thread_loop| is a MessageLoopProxy instance used to post tasks
// back to the originating thread.
static void ReadAsync(
const base::WeakPtr<FileStream::Context>& context,
HANDLE file,
scoped_refptr<net::IOBuffer> buf,
int buf_len,
OVERLAPPED* overlapped,
scoped_refptr<base::MessageLoopProxy> origin_thread_loop);
// This callback executes on the main calling thread. It informs the caller
// about the result of the ReadFile call.
// The |os_error| parameter contains the value of the last error returned by
// the ReadFile API.
void ReadAsyncResult(DWORD os_error);
#elif defined(OS_POSIX)
// ReadFileImpl() is a simple wrapper around read() that handles EINTR
// signals and calls RecordAndMapError() to map errno to net error codes.
......@@ -179,6 +209,8 @@ class FileStream::Context {
base::MessageLoopForIO::IOContext io_context_;
CompletionCallback callback_;
scoped_refptr<IOBuffer> in_flight_buf_;
// WeakPtrFactory for posting tasks back to |this|.
base::WeakPtrFactory<Context> weak_ptr_factory_;
#endif
DISALLOW_COPY_AND_ASSIGN(Context);
......
......@@ -8,9 +8,12 @@
#include "base/files/file_path.h"
#include "base/logging.h"
#include "base/message_loop/message_loop.h"
#include "base/message_loop/message_loop_proxy.h"
#include "base/metrics/histogram.h"
#include "base/profiler/scoped_tracker.h"
#include "base/task_runner.h"
#include "base/threading/worker_pool.h"
#include "net/base/io_buffer.h"
#include "net/base/net_errors.h"
......@@ -37,7 +40,8 @@ FileStream::Context::Context(const scoped_refptr<base::TaskRunner>& task_runner)
: io_context_(),
async_in_progress_(false),
orphaned_(false),
task_runner_(task_runner) {
task_runner_(task_runner),
weak_ptr_factory_(this) {
io_context_.handler = this;
memset(&io_context_.overlapped, 0, sizeof(io_context_.overlapped));
}
......@@ -48,7 +52,8 @@ FileStream::Context::Context(base::File file,
file_(file.Pass()),
async_in_progress_(false),
orphaned_(false),
task_runner_(task_runner) {
task_runner_(task_runner),
weak_ptr_factory_(this) {
io_context_.handler = this;
memset(&io_context_.overlapped, 0, sizeof(io_context_.overlapped));
if (file_.IsValid()) {
......@@ -69,20 +74,16 @@ int FileStream::Context::Read(IOBuffer* buf,
DCHECK(!async_in_progress_);
DWORD bytes_read;
if (!ReadFile(file_.GetPlatformFile(), buf->data(), buf_len,
&bytes_read, &io_context_.overlapped)) {
IOResult error = IOResult::FromOSError(GetLastError());
if (error.os_error == ERROR_HANDLE_EOF)
return 0; // Report EOF by returning 0 bytes read.
if (error.os_error == ERROR_IO_PENDING)
IOCompletionIsPending(callback, buf);
else
LOG(WARNING) << "ReadFile failed: " << error.os_error;
return static_cast<int>(error.result);
}
IOCompletionIsPending(callback, buf);
base::WorkerPool::PostTask(
FROM_HERE,
base::Bind(&FileStream::Context::ReadAsync,
weak_ptr_factory_.GetWeakPtr(), file_.GetPlatformFile(),
make_scoped_refptr(buf), buf_len, &io_context_.overlapped,
base::MessageLoop::current()->message_loop_proxy()),
false);
return ERR_IO_PENDING;
}
......@@ -165,4 +166,34 @@ void FileStream::Context::OnIOCompleted(
temp_callback.Run(result);
}
// static
void FileStream::Context::ReadAsync(
const base::WeakPtr<FileStream::Context>& context,
HANDLE file,
scoped_refptr<net::IOBuffer> buf,
int buf_len,
OVERLAPPED* overlapped,
scoped_refptr<base::MessageLoopProxy> origin_thread_loop) {
DWORD bytes_read = 0;
if (!ReadFile(file, buf->data(), buf_len, &bytes_read, overlapped)) {
origin_thread_loop->PostTask(
FROM_HERE, base::Bind(&FileStream::Context::ReadAsyncResult, context,
::GetLastError()));
}
}
void FileStream::Context::ReadAsyncResult(DWORD os_error) {
IOResult error = IOResult::FromOSError(os_error);
if (error.os_error == ERROR_HANDLE_EOF) {
// Report EOF by returning 0 bytes read.
OnIOCompleted(&io_context_, 0, error.os_error);
} else if (error.os_error != ERROR_IO_PENDING) {
// We don't need to inform the caller about ERROR_PENDING_IO as that was
// already done when the ReadFile call was queued to the worker pool.
if (error.os_error)
LOG(WARNING) << "ReadFile failed: " << error.os_error;
OnIOCompleted(&io_context_, 0, error.os_error);
}
}
} // namespace net
......@@ -546,6 +546,25 @@ class TestWriteReadCompletionCallback {
const CompletionCallback& callback() const { return callback_; }
void ValidateWrittenData() {
TestCompletionCallback callback;
int rv = 0;
for (;;) {
scoped_refptr<IOBufferWithSize> buf = new IOBufferWithSize(4);
rv = stream_->Read(buf.get(), buf->size(), callback.callback());
if (rv == ERR_IO_PENDING) {
base::MessageLoop::ScopedNestableTaskAllower allow(
base::MessageLoop::current());
rv = callback.WaitForResult();
}
EXPECT_LE(0, rv);
if (rv <= 0)
break;
*total_bytes_read_ += rv;
data_read_->append(buf->data(), rv);
}
}
private:
void OnComplete(int result) {
DCHECK_LT(0, result);
......@@ -577,22 +596,6 @@ class TestWriteReadCompletionCallback {
base::MessageLoop::current());
EXPECT_LE(0, callback64.WaitForResult());
}
TestCompletionCallback callback;
for (;;) {
scoped_refptr<IOBufferWithSize> buf = new IOBufferWithSize(4);
rv = stream_->Read(buf.get(), buf->size(), callback.callback());
if (rv == ERR_IO_PENDING) {
base::MessageLoop::ScopedNestableTaskAllower allow(
base::MessageLoop::current());
rv = callback.WaitForResult();
}
EXPECT_LE(0, rv);
if (rv <= 0)
break;
*total_bytes_read_ += rv;
data_read_->append(buf->data(), rv);
}
}
result_ = *total_bytes_written_;
......@@ -646,6 +649,8 @@ TEST_F(FileStreamTest, WriteRead) {
EXPECT_LT(0, rv);
EXPECT_EQ(kTestDataSize, total_bytes_written);
callback.ValidateWrittenData();
stream.reset();
EXPECT_TRUE(base::GetFileSize(temp_file_path(), &file_size));
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment