Commit f9f6c1e9 authored by fdoray's avatar fdoray Committed by Commit bot

Add a new file descriptor watch API.

This API can be used from any task from which
SequencedTaskRunnerHandle::Get() returns a SequencedTaskRunner (i.e. in
particular, it can be used from SINGLE_THREADED and SEQUENCED tasks
running in base/task_scheduler). As opposed to the existing API which
is only available from single-threaded tasks running in a
MessageLoopForIO.

Threads that want to support this API must instantiate a
FileDescriptorWatcher. The constructor of FileDescriptorWatcher takes
as argument a MessageLoopForIO to use to watch file descriptors for
which callbacks are registered on the thread on which it is invoked. The
MessageLoopForIO does *not* have to run on that thread. When the
MessageLoopForIO detects that watched file descriptors are readable
and/or writable without blocking, it posts a task to run the callback on
the sequence on which it was registered.

Design doc:
https://docs.google.com/document/d/1F5CjON2JNtCtdEug3LOL8-avj188k_xOgqzdv288Q_s/edit?usp=sharing

Discussion on chromium-dev:
https://groups.google.com/a/chromium.org/d/msg/chromium-dev/voOAab4mV9A/S9xXdpPkBgAJ

BUG=645114

Review-Url: https://codereview.chromium.org/2332923004
Cr-Commit-Position: refs/heads/master@{#418853}
parent ea2402df
......@@ -316,6 +316,8 @@ component("base") {
"files/dir_reader_linux.h",
"files/dir_reader_posix.h",
"files/file.cc",
"files/file_descriptor_watcher_posix.cc",
"files/file_descriptor_watcher_posix.h",
"files/file_enumerator.cc",
"files/file_enumerator.h",
"files/file_enumerator_posix.cc",
......@@ -1126,6 +1128,8 @@ component("base") {
"debug/crash_logging.h",
"debug/stack_trace.cc",
"debug/stack_trace_posix.cc",
"files/file_descriptor_watcher_posix.cc",
"files/file_descriptor_watcher_posix.h",
"files/file_enumerator_posix.cc",
"files/file_proxy.cc",
"files/file_util_proxy.cc",
......@@ -1784,6 +1788,7 @@ test("base_unittests") {
"feature_list_unittest.cc",
"file_version_info_win_unittest.cc",
"files/dir_reader_posix_unittest.cc",
"files/file_descriptor_watcher_posix_unittest.cc",
"files/file_locking_unittest.cc",
"files/file_path_unittest.cc",
"files/file_path_watcher_unittest.cc",
......
// Copyright 2016 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "base/files/file_descriptor_watcher_posix.h"
#include "base/bind.h"
#include "base/lazy_instance.h"
#include "base/logging.h"
#include "base/memory/ptr_util.h"
#include "base/sequenced_task_runner.h"
#include "base/single_thread_task_runner.h"
#include "base/threading/sequenced_task_runner_handle.h"
#include "base/threading/thread_checker.h"
#include "base/threading/thread_local.h"
namespace base {
namespace {
// MessageLoopForIO used to watch file descriptors for which callbacks are
// registered from a given thread.
LazyInstance<ThreadLocalPointer<MessageLoopForIO>>::Leaky
tls_message_loop_for_io = LAZY_INSTANCE_INITIALIZER;
} // namespace
FileDescriptorWatcher::Controller::~Controller() {
DCHECK(sequence_checker_.CalledOnValidSequence());
message_loop_for_io_task_runner_->DeleteSoon(FROM_HERE, watcher_.release());
// Since WeakPtrs are invalidated by the destructor, RunCallback() won't be
// invoked after this returns.
}
class FileDescriptorWatcher::Controller::Watcher
: public MessageLoopForIO::Watcher,
public MessageLoop::DestructionObserver {
public:
Watcher(WeakPtr<Controller> controller, MessageLoopForIO::Mode mode, int fd);
~Watcher() override;
void StartWatching();
private:
friend class FileDescriptorWatcher;
// MessageLoopForIO::Watcher:
void OnFileCanReadWithoutBlocking(int fd) override;
void OnFileCanWriteWithoutBlocking(int fd) override;
// MessageLoop::DestructionObserver:
void WillDestroyCurrentMessageLoop() override;
// Used to instruct the MessageLoopForIO to stop watching the file descriptor.
MessageLoopForIO::FileDescriptorWatcher file_descriptor_watcher_;
// Runs tasks on the sequence on which this was instantiated (i.e. the
// sequence on which the callback must run).
const scoped_refptr<SequencedTaskRunner> callback_task_runner_ =
SequencedTaskRunnerHandle::Get();
// The Controller that created this Watcher.
WeakPtr<Controller> controller_;
// Whether this Watcher is notified when |fd_| becomes readable or writable
// without blocking.
const MessageLoopForIO::Mode mode_;
// The watched file descriptor.
const int fd_;
// Except for the constructor, every method of this class must run on the same
// MessageLoopForIO thread.
ThreadChecker thread_checker_;
// Whether this Watcher was registered as a DestructionObserver on the
// MessageLoopForIO thread.
bool registered_as_destruction_observer_ = false;
DISALLOW_COPY_AND_ASSIGN(Watcher);
};
FileDescriptorWatcher::Controller::Watcher::Watcher(
WeakPtr<Controller> controller,
MessageLoopForIO::Mode mode,
int fd)
: controller_(controller), mode_(mode), fd_(fd) {
DCHECK(callback_task_runner_);
thread_checker_.DetachFromThread();
}
FileDescriptorWatcher::Controller::Watcher::~Watcher() {
DCHECK(thread_checker_.CalledOnValidThread());
MessageLoopForIO::current()->RemoveDestructionObserver(this);
}
void FileDescriptorWatcher::Controller::Watcher::StartWatching() {
DCHECK(thread_checker_.CalledOnValidThread());
MessageLoopForIO::current()->WatchFileDescriptor(
fd_, false, mode_, &file_descriptor_watcher_, this);
if (!registered_as_destruction_observer_) {
MessageLoopForIO::current()->AddDestructionObserver(this);
registered_as_destruction_observer_ = true;
}
}
void FileDescriptorWatcher::Controller::Watcher::OnFileCanReadWithoutBlocking(
int fd) {
DCHECK_EQ(fd_, fd);
DCHECK_EQ(MessageLoopForIO::WATCH_READ, mode_);
DCHECK(thread_checker_.CalledOnValidThread());
// Run the callback on the sequence on which the watch was initiated.
callback_task_runner_->PostTask(FROM_HERE,
Bind(&Controller::RunCallback, controller_));
}
void FileDescriptorWatcher::Controller::Watcher::OnFileCanWriteWithoutBlocking(
int fd) {
DCHECK_EQ(fd_, fd);
DCHECK_EQ(MessageLoopForIO::WATCH_WRITE, mode_);
DCHECK(thread_checker_.CalledOnValidThread());
// Run the callback on the sequence on which the watch was initiated.
callback_task_runner_->PostTask(FROM_HERE,
Bind(&Controller::RunCallback, controller_));
}
void FileDescriptorWatcher::Controller::Watcher::
WillDestroyCurrentMessageLoop() {
DCHECK(thread_checker_.CalledOnValidThread());
// A Watcher is owned by a Controller. When the Controller is deleted, it
// transfers ownership of the Watcher to a delete task posted to the
// MessageLoopForIO. If the MessageLoopForIO is deleted before the delete task
// runs, the following line takes care of deleting the Watcher.
delete this;
}
FileDescriptorWatcher::Controller::Controller(MessageLoopForIO::Mode mode,
int fd,
const Closure& callback)
: callback_(callback),
message_loop_for_io_task_runner_(
tls_message_loop_for_io.Get().Get()->task_runner()),
weak_factory_(this) {
DCHECK(!callback_.is_null());
DCHECK(message_loop_for_io_task_runner_);
watcher_ = MakeUnique<Watcher>(weak_factory_.GetWeakPtr(), mode, fd);
StartWatching();
}
void FileDescriptorWatcher::Controller::StartWatching() {
DCHECK(sequence_checker_.CalledOnValidSequence());
// It is safe to use Unretained() below because |watcher_| can only be deleted
// by a delete task posted to |message_loop_for_io_task_runner_| by this
// Controller's destructor. Since this delete task hasn't been posted yet, it
// can't run before the task posted below.
message_loop_for_io_task_runner_->PostTask(
FROM_HERE, Bind(&Watcher::StartWatching, Unretained(watcher_.get())));
}
void FileDescriptorWatcher::Controller::RunCallback() {
DCHECK(sequence_checker_.CalledOnValidSequence());
WeakPtr<Controller> weak_this = weak_factory_.GetWeakPtr();
callback_.Run();
// If |this| wasn't deleted, re-enable the watch.
if (weak_this)
StartWatching();
}
FileDescriptorWatcher::FileDescriptorWatcher(
MessageLoopForIO* message_loop_for_io) {
DCHECK(message_loop_for_io);
DCHECK(!tls_message_loop_for_io.Get().Get());
tls_message_loop_for_io.Get().Set(message_loop_for_io);
}
FileDescriptorWatcher::~FileDescriptorWatcher() {
tls_message_loop_for_io.Get().Set(nullptr);
}
std::unique_ptr<FileDescriptorWatcher::Controller>
FileDescriptorWatcher::WatchReadable(int fd, const Closure& callback) {
return WrapUnique(new Controller(MessageLoopForIO::WATCH_READ, fd, callback));
}
std::unique_ptr<FileDescriptorWatcher::Controller>
FileDescriptorWatcher::WatchWritable(int fd, const Closure& callback) {
return WrapUnique(
new Controller(MessageLoopForIO::WATCH_WRITE, fd, callback));
}
} // namespace base
// Copyright 2016 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef BASE_FILES_FILE_DESCRIPTOR_WATCHER_POSIX_H_
#define BASE_FILES_FILE_DESCRIPTOR_WATCHER_POSIX_H_
#include <memory>
#include "base/base_export.h"
#include "base/callback.h"
#include "base/macros.h"
#include "base/memory/ref_counted.h"
#include "base/memory/weak_ptr.h"
#include "base/message_loop/message_loop.h"
#include "base/sequence_checker.h"
namespace base {
class SingleThreadTaskRunner;
// The FileDescriptorWatcher API allows callbacks to be invoked when file
// descriptors are readable or writable without blocking.
class BASE_EXPORT FileDescriptorWatcher {
public:
// Instantiated and returned by WatchReadable() or WatchWritable(). The
// constructor registers a callback to be invoked when a file descriptor is
// readable or writable without blocking and the destructor unregisters it.
class Controller {
public:
// Unregisters the callback registered by the constructor.
~Controller();
private:
friend class FileDescriptorWatcher;
class Watcher;
// Registers |callback| to be invoked when |fd| is readable or writable
// without blocking (depending on |mode|).
Controller(MessageLoopForIO::Mode mode, int fd, const Closure& callback);
// Starts watching the file descriptor.
void StartWatching();
// Runs |callback_|.
void RunCallback();
// The callback to run when the watched file descriptor is readable or
// writable without blocking.
Closure callback_;
// TaskRunner associated with the MessageLoopForIO that watches the file
// descriptor.
const scoped_refptr<SingleThreadTaskRunner>
message_loop_for_io_task_runner_;
// Notified by the MessageLoopForIO associated with
// |message_loop_for_io_task_runner_| when the watched file descriptor is
// readable or writable without blocking. Posts a task to run RunCallback()
// on the sequence on which the Controller was instantiated. When the
// Controller is deleted, ownership of |watcher_| is transfered to a delete
// task posted to the MessageLoopForIO. This ensures that |watcher_| isn't
// deleted while it is being used by the MessageLoopForIO.
std::unique_ptr<Watcher> watcher_;
// Validates that the Controller is used on the sequence on which it was
// instantiated.
SequenceChecker sequence_checker_;
WeakPtrFactory<Controller> weak_factory_;
DISALLOW_COPY_AND_ASSIGN(Controller);
};
// Registers |message_loop_for_io| to watch file descriptors for which
// callbacks are registered from the current thread via WatchReadable() or
// WatchWritable(). |message_loop_for_io| may run on another thread. The
// constructed FileDescriptorWatcher must not outlive |message_loop_for_io|.
FileDescriptorWatcher(MessageLoopForIO* message_loop_for_io);
~FileDescriptorWatcher();
// Registers |callback| to be invoked on the current sequence when |fd| is
// readable or writable without blocking. |callback| is unregistered when the
// returned Controller is deleted (deletion must happen on the current
// sequence). To call these methods, a FileDescriptorWatcher must have been
// instantiated on the current thread and SequencedTaskRunnerHandle::IsSet()
// must return true.
static std::unique_ptr<Controller> WatchReadable(int fd,
const Closure& callback);
static std::unique_ptr<Controller> WatchWritable(int fd,
const Closure& callback);
private:
DISALLOW_COPY_AND_ASSIGN(FileDescriptorWatcher);
};
} // namespace base
#endif // BASE_FILES_FILE_DESCRIPTOR_WATCHER_POSIX_H_
// Copyright 2016 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "base/files/file_descriptor_watcher_posix.h"
#include <unistd.h>
#include <memory>
#include "base/bind.h"
#include "base/files/file_util.h"
#include "base/macros.h"
#include "base/memory/ptr_util.h"
#include "base/message_loop/message_loop.h"
#include "base/posix/eintr_wrapper.h"
#include "base/run_loop.h"
#include "base/test/test_timeouts.h"
#include "base/threading/platform_thread.h"
#include "base/threading/thread.h"
#include "base/threading/thread_checker_impl.h"
#include "build/build_config.h"
#include "testing/gmock/include/gmock/gmock.h"
#include "testing/gtest/include/gtest/gtest.h"
namespace base {
namespace {
class Mock {
public:
Mock() = default;
MOCK_METHOD0(ReadableCallback, void());
MOCK_METHOD0(WritableCallback, void());
private:
DISALLOW_COPY_AND_ASSIGN(Mock);
};
enum class FileDescriptorWatcherTestType {
MESSAGE_LOOP_FOR_IO_ON_MAIN_THREAD,
MESSAGE_LOOP_FOR_IO_ON_OTHER_THREAD,
};
class FileDescriptorWatcherTest
: public testing::TestWithParam<FileDescriptorWatcherTestType> {
public:
FileDescriptorWatcherTest()
: message_loop_(GetParam() == FileDescriptorWatcherTestType::
MESSAGE_LOOP_FOR_IO_ON_MAIN_THREAD
? new MessageLoopForIO
: new MessageLoop),
other_thread_("FileDescriptorWatcherTest_OtherThread") {}
~FileDescriptorWatcherTest() override = default;
void SetUp() override {
ASSERT_EQ(0, pipe(pipe_fds_));
MessageLoop* message_loop_for_io;
if (GetParam() ==
FileDescriptorWatcherTestType::MESSAGE_LOOP_FOR_IO_ON_OTHER_THREAD) {
Thread::Options options;
options.message_loop_type = MessageLoop::TYPE_IO;
ASSERT_TRUE(other_thread_.StartWithOptions(options));
message_loop_for_io = other_thread_.message_loop();
} else {
message_loop_for_io = message_loop_.get();
}
ASSERT_TRUE(message_loop_for_io->IsType(MessageLoop::TYPE_IO));
file_descriptor_watcher_ = MakeUnique<FileDescriptorWatcher>(
static_cast<MessageLoopForIO*>(message_loop_for_io));
}
void TearDown() override {
if (GetParam() ==
FileDescriptorWatcherTestType::MESSAGE_LOOP_FOR_IO_ON_MAIN_THREAD &&
message_loop_) {
// Allow the delete task posted by the Controller's destructor to run.
base::RunLoop().RunUntilIdle();
}
EXPECT_EQ(0, IGNORE_EINTR(close(pipe_fds_[0])));
EXPECT_EQ(0, IGNORE_EINTR(close(pipe_fds_[1])));
}
protected:
int read_file_descriptor() const { return pipe_fds_[0]; }
int write_file_descriptor() const { return pipe_fds_[1]; }
// Waits for a short delay and run pending tasks.
void WaitAndRunPendingTasks() {
PlatformThread::Sleep(TestTimeouts::tiny_timeout());
RunLoop().RunUntilIdle();
}
// Registers ReadableCallback() to be called on |mock_| when
// read_file_descriptor() is readable without blocking.
std::unique_ptr<FileDescriptorWatcher::Controller> WatchReadable() {
std::unique_ptr<FileDescriptorWatcher::Controller> controller =
FileDescriptorWatcher::WatchReadable(
read_file_descriptor(),
Bind(&Mock::ReadableCallback, Unretained(&mock_)));
EXPECT_TRUE(controller);
// Unless read_file_descriptor() was readable before the callback was
// registered, this shouldn't do anything.
WaitAndRunPendingTasks();
return controller;
}
// Registers WritableCallback() to be called on |mock_| when
// write_file_descriptor() is writable without blocking.
std::unique_ptr<FileDescriptorWatcher::Controller> WatchWritable() {
std::unique_ptr<FileDescriptorWatcher::Controller> controller =
FileDescriptorWatcher::WatchWritable(
read_file_descriptor(),
Bind(&Mock::WritableCallback, Unretained(&mock_)));
EXPECT_TRUE(controller);
return controller;
}
void WriteByte() {
constexpr char kByte = '!';
ASSERT_TRUE(
WriteFileDescriptor(write_file_descriptor(), &kByte, sizeof(kByte)));
}
void ReadByte() {
// This is always called as part of the WatchReadable() callback, which
// should run on the main thread.
EXPECT_TRUE(thread_checker_.CalledOnValidThread());
char buffer;
ASSERT_TRUE(ReadFromFD(read_file_descriptor(), &buffer, sizeof(buffer)));
}
// Mock on wich callbacks are invoked.
testing::StrictMock<Mock> mock_;
// MessageLoop bound to the main thread.
std::unique_ptr<MessageLoop> message_loop_;
// Thread running a MessageLoopForIO. Used when the test type is
// MESSAGE_LOOP_FOR_IO_ON_OTHER_THREAD.
Thread other_thread_;
private:
// Determines which MessageLoopForIO is used to watch file descriptors for
// which callbacks are registered on the main thread.
std::unique_ptr<FileDescriptorWatcher> file_descriptor_watcher_;
// Watched file descriptors.
int pipe_fds_[2];
// Used to verify that callbacks run on the thread on which they are
// registered.
ThreadCheckerImpl thread_checker_;
DISALLOW_COPY_AND_ASSIGN(FileDescriptorWatcherTest);
};
} // namespace
TEST_P(FileDescriptorWatcherTest, WatchWritable) {
auto controller = WatchWritable();
// On Mac and iOS, the write end of a newly created pipe is writable without
// blocking.
#if defined(OS_MACOSX)
RunLoop run_loop;
EXPECT_CALL(mock_, WritableCallback())
.WillOnce(testing::Invoke(&run_loop, &RunLoop::Quit));
run_loop.Run();
#endif // defined(OS_MACOSX)
}
TEST_P(FileDescriptorWatcherTest, WatchReadableOneByte) {
auto controller = WatchReadable();
// Write 1 byte to the pipe, making it readable without blocking. Expect one
// call to ReadableCallback() which will read 1 byte from the pipe.
WriteByte();
RunLoop run_loop;
EXPECT_CALL(mock_, ReadableCallback())
.WillOnce(testing::Invoke([this, &run_loop]() {
ReadByte();
run_loop.Quit();
}));
run_loop.Run();
testing::Mock::VerifyAndClear(&mock_);
// No more call to ReadableCallback() is expected.
WaitAndRunPendingTasks();
}
TEST_P(FileDescriptorWatcherTest, WatchReadableTwoBytes) {
auto controller = WatchReadable();
// Write 2 bytes to the pipe. Expect two calls to ReadableCallback() which
// will each read 1 byte from the pipe.
WriteByte();
WriteByte();
RunLoop run_loop;
EXPECT_CALL(mock_, ReadableCallback())
.WillOnce(testing::Invoke([this]() { ReadByte(); }))
.WillOnce(testing::Invoke([this, &run_loop]() {
ReadByte();
run_loop.Quit();
}));
run_loop.Run();
testing::Mock::VerifyAndClear(&mock_);
// No more call to ReadableCallback() is expected.
WaitAndRunPendingTasks();
}
TEST_P(FileDescriptorWatcherTest, WatchReadableByteWrittenFromCallback) {
auto controller = WatchReadable();
// Write 1 byte to the pipe. Expect one call to ReadableCallback() from which
// 1 byte is read and 1 byte is written to the pipe. Then, expect another call
// to ReadableCallback() from which the remaining byte is read from the pipe.
WriteByte();
RunLoop run_loop;
EXPECT_CALL(mock_, ReadableCallback())
.WillOnce(testing::Invoke([this]() {
ReadByte();
WriteByte();
}))
.WillOnce(testing::Invoke([this, &run_loop]() {
ReadByte();
run_loop.Quit();
}));
run_loop.Run();
testing::Mock::VerifyAndClear(&mock_);
// No more call to ReadableCallback() is expected.
WaitAndRunPendingTasks();
}
TEST_P(FileDescriptorWatcherTest, DeleteControllerFromCallback) {
auto controller = WatchReadable();
// Write 1 byte to the pipe. Expect one call to ReadableCallback() from which
// |controller| is deleted.
WriteByte();
RunLoop run_loop;
EXPECT_CALL(mock_, ReadableCallback())
.WillOnce(testing::Invoke([&run_loop, &controller]() {
controller = nullptr;
run_loop.Quit();
}));
run_loop.Run();
testing::Mock::VerifyAndClear(&mock_);
// Since |controller| has been deleted, no call to ReadableCallback() is
// expected even though the pipe is still readable without blocking.
WaitAndRunPendingTasks();
}
TEST_P(FileDescriptorWatcherTest,
DeleteControllerBeforeFileDescriptorReadable) {
auto controller = WatchReadable();
// Cancel the watch.
controller = nullptr;
// Write 1 byte to the pipe to make it readable without blocking.
WriteByte();
// No call to ReadableCallback() is expected.
WaitAndRunPendingTasks();
}
TEST_P(FileDescriptorWatcherTest, DeleteControllerAfterFileDescriptorReadable) {
auto controller = WatchReadable();
// Write 1 byte to the pipe to make it readable without blocking.
WriteByte();
// Cancel the watch.
controller = nullptr;
// No call to ReadableCallback() is expected.
WaitAndRunPendingTasks();
}
TEST_P(FileDescriptorWatcherTest, DeleteControllerAfterDeleteMessageLoopForIO) {
auto controller = WatchReadable();
// Delete the MessageLoopForIO.
if (GetParam() ==
FileDescriptorWatcherTestType::MESSAGE_LOOP_FOR_IO_ON_MAIN_THREAD) {
message_loop_ = nullptr;
} else {
other_thread_.Stop();
}
// Deleting |controller| shouldn't crash even though that causes a task to be
// posted to the MessageLoopForIO thread.
controller = nullptr;
}
INSTANTIATE_TEST_CASE_P(
MessageLoopForIOOnMainThread,
FileDescriptorWatcherTest,
::testing::Values(
FileDescriptorWatcherTestType::MESSAGE_LOOP_FOR_IO_ON_MAIN_THREAD));
INSTANTIATE_TEST_CASE_P(
MessageLoopForIOOnOtherThread,
FileDescriptorWatcherTest,
::testing::Values(
FileDescriptorWatcherTestType::MESSAGE_LOOP_FOR_IO_ON_OTHER_THREAD));
} // namespace base
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment