Commit 58914197 authored by Nick Diego Yamane's avatar Nick Diego Yamane Committed by Commit Bot

Implement file descriptor watching API in MessagePumpGlib

The main user of GLib MessagePump implementation is Aura/X11, while Ozone-based
ports use mainly libevent version, which exposes WatchFileDescriptor API that
is used by higher level components through MessageLoop[Current]For{UI,IO}
interfaces. Currently, there's no such API in MessagePumpGlib.

The motivation here is to make MessageLoopCurrent API usage transparent,
regardless which message pump is used in UI thread. This benefits X11 migration
to Ozone on Linux desktop, which (just like Aura/X11) must integrate with Gtk,
so GLib-based loop in browser's main thread is a natural path.

So, this CL implements the aforementioned API in GLib MessageLoop following the
same design, assumptions and overall structure as its original libevent-based
version.

As a follow-up, https://crrev.com/c/1730972 refactors code using
MessagePumpLibevent::{FdWatchController,FdWatcher} directly, replacing them by
MessageLoopCurrent*ForUI counterparts as well as refactoring
X11EventSourceLibevent (which Implementation has been used as reference for
this CL) in the same direction, so in the end it has no libevent specific code
and the build-time glib/libevent switch (use_glib gn var) is possible.

Bug: 988094, 789065
Test: base_unittests (MessagePumpGLibFdWatchTest.*)
Change-Id: I843ab924d18c08b6384e0716bc36c4f5f8482fa3
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/1731031
Commit-Queue: Nick Yamane <nickdiego@igalia.com>
Reviewed-by: default avatarkylechar <kylechar@chromium.org>
Cr-Commit-Position: refs/heads/master@{#685245}
parent 331d5842
......@@ -211,10 +211,10 @@ class BASE_EXPORT MessageLoopCurrentForUI : public MessageLoopCurrent {
MessageLoopCurrentForUI* operator->() { return this; }
#if defined(USE_OZONE) && !defined(OS_FUCHSIA) && !defined(OS_WIN)
// Please see MessagePumpLibevent for definition.
static_assert(std::is_same<MessagePumpForUI, MessagePumpLibevent>::value,
"MessageLoopCurrentForUI::WatchFileDescriptor is not supported "
"when MessagePumpForUI is not a MessagePumpLibevent.");
static_assert(
std::is_base_of<WatchableIOMessagePumpPosix, MessagePumpForUI>::value,
"MessageLoopCurrentForUI::WatchFileDescriptor is supported only"
"by MessagePumpLibevent and MessagePumpGlib implementations.");
bool WatchFileDescriptor(int fd,
bool persistent,
MessagePumpForUI::Mode mode,
......
......@@ -131,9 +131,9 @@ struct ThreadInfo {
// Used for accesing |thread_info|.
static LazyInstance<Lock>::Leaky thread_info_lock = LAZY_INSTANCE_INITIALIZER;
// If non-NULL it means a MessagePumpGlib exists and has been Run. This is
// If non-null it means a MessagePumpGlib exists and has been Run. This is
// destroyed when the MessagePump is destroyed.
ThreadInfo* thread_info = NULL;
ThreadInfo* thread_info = nullptr;
void CheckThread(MessagePumpGlib* pump) {
AutoLock auto_lock(thread_info_lock.Get());
......@@ -151,12 +151,38 @@ void PumpDestroyed(MessagePumpGlib* pump) {
AutoLock auto_lock(thread_info_lock.Get());
if (thread_info && thread_info->pump == pump) {
delete thread_info;
thread_info = NULL;
thread_info = nullptr;
}
}
#endif
struct FdWatchSource : public GSource {
MessagePumpGlib* pump;
MessagePumpGlib::FdWatchController* controller;
};
gboolean FdWatchSourcePrepare(GSource* source, gint* timeout_ms) {
*timeout_ms = -1;
return FALSE;
}
gboolean FdWatchSourceCheck(GSource* gsource) {
auto* source = static_cast<FdWatchSource*>(gsource);
return source->pump->HandleFdWatchCheck(source->controller) ? TRUE : FALSE;
}
gboolean FdWatchSourceDispatch(GSource* gsource,
GSourceFunc unused_func,
gpointer unused_data) {
auto* source = static_cast<FdWatchSource*>(gsource);
source->pump->HandleFdWatchDispatch(source->controller);
return TRUE;
}
GSourceFuncs g_fd_watch_source_funcs = {
FdWatchSourcePrepare, FdWatchSourceCheck, FdWatchSourceDispatch, nullptr};
} // namespace
struct MessagePumpGlib::RunState {
......@@ -209,6 +235,116 @@ MessagePumpGlib::~MessagePumpGlib() {
close(wakeup_pipe_write_);
}
MessagePumpGlib::FdWatchController::FdWatchController(const Location& location)
: FdWatchControllerInterface(location) {}
MessagePumpGlib::FdWatchController::~FdWatchController() {
if (IsInitialized()) {
CHECK(StopWatchingFileDescriptor());
}
if (was_destroyed_) {
DCHECK(!*was_destroyed_);
*was_destroyed_ = true;
}
}
bool MessagePumpGlib::FdWatchController::StopWatchingFileDescriptor() {
if (!IsInitialized())
return false;
g_source_destroy(source_);
g_source_unref(source_);
source_ = nullptr;
watcher_ = nullptr;
return true;
}
bool MessagePumpGlib::FdWatchController::IsInitialized() const {
return !!source_;
}
bool MessagePumpGlib::FdWatchController::InitOrUpdate(int fd,
int mode,
FdWatcher* watcher) {
gushort event_flags = 0;
if (mode & WATCH_READ) {
event_flags |= G_IO_IN;
}
if (mode & WATCH_WRITE) {
event_flags |= G_IO_OUT;
}
if (!IsInitialized()) {
poll_fd_ = std::make_unique<GPollFD>();
poll_fd_->fd = fd;
} else {
if (poll_fd_->fd != fd)
return false;
// Combine old/new event masks.
event_flags |= poll_fd_->events;
// Destroy previous source
bool stopped = StopWatchingFileDescriptor();
DCHECK(stopped);
}
poll_fd_->events = event_flags;
poll_fd_->revents = 0;
source_ = g_source_new(&g_fd_watch_source_funcs, sizeof(FdWatchSource));
DCHECK(source_);
g_source_add_poll(source_, poll_fd_.get());
g_source_set_can_recurse(source_, FALSE);
g_source_set_callback(source_, nullptr, nullptr, nullptr);
watcher_ = watcher;
return true;
}
bool MessagePumpGlib::FdWatchController::Attach(MessagePumpGlib* pump) {
DCHECK(pump);
if (!IsInitialized()) {
return false;
}
auto* source = static_cast<FdWatchSource*>(source_);
source->controller = this;
source->pump = pump;
g_source_attach(source_, pump->context_);
return true;
}
void MessagePumpGlib::FdWatchController::NotifyCanRead() {
if (!watcher_)
return;
DCHECK(poll_fd_);
watcher_->OnFileCanReadWithoutBlocking(poll_fd_->fd);
}
void MessagePumpGlib::FdWatchController::NotifyCanWrite() {
if (!watcher_)
return;
DCHECK(poll_fd_);
watcher_->OnFileCanWriteWithoutBlocking(poll_fd_->fd);
}
bool MessagePumpGlib::WatchFileDescriptor(int fd,
bool persistent,
int mode,
FdWatchController* controller,
FdWatcher* watcher) {
DCHECK_GE(fd, 0);
DCHECK(controller);
DCHECK(watcher);
DCHECK(mode == WATCH_READ || mode == WATCH_WRITE || mode == WATCH_READ_WRITE);
// WatchFileDescriptor should be called on the pump thread. It is not
// threadsafe, so the watcher may never be registered.
DCHECK_CALLED_ON_VALID_THREAD(watch_fd_caller_checker_);
if (!controller->InitOrUpdate(fd, mode, watcher)) {
DPLOG(ERROR) << "FdWatchController init failed (fd=" << fd << ")";
return false;
}
return controller->Attach(this);
}
// Return the timeout we want passed to poll.
int MessagePumpGlib::HandlePrepare() {
// |state_| may be null during tests.
......@@ -327,6 +463,33 @@ void MessagePumpGlib::ScheduleDelayedWork(const TimeTicks& delayed_work_time) {
ScheduleWork();
}
bool MessagePumpGlib::HandleFdWatchCheck(FdWatchController* controller) {
DCHECK(controller);
gushort flags = controller->poll_fd_->revents;
return (flags & G_IO_IN) || (flags & G_IO_OUT);
}
void MessagePumpGlib::HandleFdWatchDispatch(FdWatchController* controller) {
DCHECK(controller);
DCHECK(controller->poll_fd_);
gushort flags = controller->poll_fd_->revents;
if ((flags & G_IO_IN) && (flags & G_IO_OUT)) {
// Both callbacks will be called. It is necessary to check that
// |controller| is not destroyed.
bool controller_was_destroyed = false;
controller->was_destroyed_ = &controller_was_destroyed;
controller->NotifyCanWrite();
if (!controller_was_destroyed)
controller->NotifyCanRead();
if (!controller_was_destroyed)
controller->was_destroyed_ = nullptr;
} else if (flags & G_IO_IN) {
controller->NotifyCanRead();
} else if (flags & G_IO_OUT) {
controller->NotifyCanWrite();
}
}
bool MessagePumpGlib::ShouldQuit() const {
CHECK(state_);
return state_->should_quit;
......
......@@ -10,7 +10,9 @@
#include "base/base_export.h"
#include "base/macros.h"
#include "base/message_loop/message_pump.h"
#include "base/message_loop/watchable_io_message_pump_posix.h"
#include "base/observer_list.h"
#include "base/threading/thread_checker.h"
#include "base/time/time.h"
typedef struct _GMainContext GMainContext;
......@@ -21,17 +23,70 @@ namespace base {
// This class implements a base MessagePump needed for TYPE_UI MessageLoops on
// platforms using GLib.
class BASE_EXPORT MessagePumpGlib : public MessagePump {
class BASE_EXPORT MessagePumpGlib : public MessagePump,
public WatchableIOMessagePumpPosix {
public:
class FdWatchController : public FdWatchControllerInterface {
public:
explicit FdWatchController(const Location& from_here);
~FdWatchController() override;
// FdWatchControllerInterface:
bool StopWatchingFileDescriptor() override;
private:
friend class MessagePumpGlib;
friend class MessagePumpGLibFdWatchTest;
// FdWatchController instances can be reused (unless fd changes), so we
// need to keep track of initialization status and taking it into account
// when setting up a fd watching. Please refer to
// WatchableIOMessagePumpPosix docs for more details. This is called by
// WatchFileDescriptor() and sets up a GSource for the input parameters.
// The source is not attached here, so the events will not be fired until
// Attach() is called.
bool InitOrUpdate(int fd, int mode, FdWatcher* watcher);
// Returns the current initialization status.
bool IsInitialized() const;
// Tries to attach the internal GSource instance to the |pump|'s
// GMainContext, so IO events start to be dispatched. Returns false if
// |this| is not correctly initialized, otherwise returns true.
bool Attach(MessagePumpGlib* pump);
// Forward read and write events to |watcher_|. It is a no-op if watcher_
// is null, which can happen when controller is suddenly stopped through
// StopWatchingFileDescriptor().
void NotifyCanRead();
void NotifyCanWrite();
FdWatcher* watcher_ = nullptr;
GSource* source_ = nullptr;
std::unique_ptr<GPollFD> poll_fd_;
// If this pointer is non-null, the pointee is set to true in the
// destructor.
bool* was_destroyed_ = nullptr;
DISALLOW_COPY_AND_ASSIGN(FdWatchController);
};
MessagePumpGlib();
~MessagePumpGlib() override;
// Internal methods used for processing the pump callbacks. They are
// public for simplicity but should not be used directly. HandlePrepare
// is called during the prepare step of glib, and returns a timeout that
// will be passed to the poll. HandleCheck is called after the poll
// has completed, and returns whether or not HandleDispatch should be called.
// HandleDispatch is called if HandleCheck returned true.
// Part of WatchableIOMessagePumpPosix interface.
// Please refer to WatchableIOMessagePumpPosix docs for more details.
bool WatchFileDescriptor(int fd,
bool persistent,
int mode,
FdWatchController* controller,
FdWatcher* delegate);
// Internal methods used for processing the pump callbacks. They are public
// for simplicity but should not be used directly. HandlePrepare is called
// during the prepare step of glib, and returns a timeout that will be passed
// to the poll. HandleCheck is called after the poll has completed, and
// returns whether or not HandleDispatch should be called. HandleDispatch is
// called if HandleCheck returned true.
int HandlePrepare();
bool HandleCheck();
void HandleDispatch();
......@@ -42,6 +97,12 @@ class BASE_EXPORT MessagePumpGlib : public MessagePump {
void ScheduleWork() override;
void ScheduleDelayedWork(const TimeTicks& delayed_work_time) override;
// Internal methods used for processing the FdWatchSource callbacks. As for
// main pump callbacks, they are public for simplicity but should not be used
// directly.
bool HandleFdWatchCheck(FdWatchController* controller);
void HandleFdWatchDispatch(FdWatchController* controller);
private:
bool ShouldQuit() const;
......@@ -69,6 +130,8 @@ class BASE_EXPORT MessagePumpGlib : public MessagePump {
// Use a unique_ptr to avoid needing the definition of GPollFD in the header.
std::unique_ptr<GPollFD> wakeup_gpollfd_;
THREAD_CHECKER(watch_fd_caller_checker_);
DISALLOW_COPY_AND_ASSIGN(MessagePumpGlib);
};
......
......@@ -13,13 +13,17 @@
#include "base/bind.h"
#include "base/bind_helpers.h"
#include "base/callback.h"
#include "base/files/file_util.h"
#include "base/macros.h"
#include "base/memory/ref_counted.h"
#include "base/message_loop/message_loop.h"
#include "base/message_loop/message_loop_current.h"
#include "base/message_loop/message_pump_type.h"
#include "base/posix/eintr_wrapper.h"
#include "base/run_loop.h"
#include "base/single_thread_task_runner.h"
#include "base/synchronization/waitable_event.h"
#include "base/synchronization/waitable_event_watcher.h"
#include "base/threading/thread.h"
#include "base/threading/thread_task_runner_handle.h"
#include "testing/gtest/include/gtest/gtest.h"
......@@ -544,4 +548,220 @@ TEST_F(MessagePumpGLibTest, TestGtkLoop) {
run_loop.Run();
}
// Tests for WatchFileDescriptor API
class MessagePumpGLibFdWatchTest : public testing::Test {
protected:
MessagePumpGLibFdWatchTest()
: io_thread_("MessagePumpGLibFdWatchTestIOThread") {}
~MessagePumpGLibFdWatchTest() override = default;
void SetUp() override {
Thread::Options options(MessagePumpType::IO, 0);
ASSERT_TRUE(io_thread_.StartWithOptions(options));
int ret = pipe(pipefds_);
ASSERT_EQ(0, ret);
}
void TearDown() override {
if (IGNORE_EINTR(close(pipefds_[0])) < 0)
PLOG(ERROR) << "close";
if (IGNORE_EINTR(close(pipefds_[1])) < 0)
PLOG(ERROR) << "close";
}
void WaitUntilIoThreadStarted() {
ASSERT_TRUE(io_thread_.WaitUntilThreadStarted());
}
scoped_refptr<SingleThreadTaskRunner> io_runner() const {
return io_thread_.task_runner();
}
void SimulateEvent(MessagePumpGlib* pump,
MessagePumpGlib::FdWatchController* controller) {
controller->poll_fd_->revents = G_IO_IN | G_IO_OUT;
pump->HandleFdWatchDispatch(controller);
}
int pipefds_[2];
private:
Thread io_thread_;
};
namespace {
class BaseWatcher : public MessagePumpGlib::FdWatcher {
public:
explicit BaseWatcher(MessagePumpGlib::FdWatchController* controller)
: controller_(controller) {
DCHECK(controller_);
}
~BaseWatcher() override = default;
// base:MessagePumpGlib::FdWatcher interface
void OnFileCanReadWithoutBlocking(int /* fd */) override { NOTREACHED(); }
void OnFileCanWriteWithoutBlocking(int /* fd */) override { NOTREACHED(); }
protected:
MessagePumpGlib::FdWatchController* controller_;
};
class DeleteWatcher : public BaseWatcher {
public:
explicit DeleteWatcher(
std::unique_ptr<MessagePumpGlib::FdWatchController> controller)
: BaseWatcher(controller.get()),
owned_controller_(std::move(controller)) {}
~DeleteWatcher() override { DCHECK(!controller_); }
void OnFileCanWriteWithoutBlocking(int /* fd */) override {
DCHECK(owned_controller_);
owned_controller_.reset();
controller_ = nullptr;
}
private:
std::unique_ptr<MessagePumpGlib::FdWatchController> owned_controller_;
};
class StopWatcher : public BaseWatcher {
public:
explicit StopWatcher(MessagePumpGlib::FdWatchController* controller)
: BaseWatcher(controller) {}
~StopWatcher() override = default;
void OnFileCanWriteWithoutBlocking(int /* fd */) override {
controller_->StopWatchingFileDescriptor();
}
};
void QuitMessageLoopAndStart(OnceClosure quit_closure) {
std::move(quit_closure).Run();
RunLoop runloop(RunLoop::Type::kNestableTasksAllowed);
ThreadTaskRunnerHandle::Get()->PostTask(FROM_HERE, runloop.QuitClosure());
runloop.Run();
}
class NestedPumpWatcher : public MessagePumpGlib::FdWatcher {
public:
NestedPumpWatcher() = default;
~NestedPumpWatcher() override = default;
void OnFileCanReadWithoutBlocking(int /* fd */) override {
RunLoop runloop;
ThreadTaskRunnerHandle::Get()->PostTask(
FROM_HERE, BindOnce(&QuitMessageLoopAndStart, runloop.QuitClosure()));
runloop.Run();
}
void OnFileCanWriteWithoutBlocking(int /* fd */) override {}
};
class QuitWatcher : public BaseWatcher {
public:
QuitWatcher(MessagePumpGlib::FdWatchController* controller,
base::OnceClosure quit_closure)
: BaseWatcher(controller), quit_closure_(std::move(quit_closure)) {}
void OnFileCanReadWithoutBlocking(int /* fd */) override {
if (quit_closure_)
std::move(quit_closure_).Run();
}
private:
base::OnceClosure quit_closure_;
};
void WriteFDWrapper(const int fd,
const char* buf,
int size,
WaitableEvent* event) {
ASSERT_TRUE(WriteFileDescriptor(fd, buf, size));
}
} // namespace
// Tests that MessagePumpGlib::FdWatcher::OnFileCanReadWithoutBlocking is not
// called for a READ_WRITE event, when the controller is destroyed in
// OnFileCanWriteWithoutBlocking callback.
TEST_F(MessagePumpGLibFdWatchTest, DeleteWatcher) {
auto pump = std::make_unique<MessagePumpGlib>();
auto controller_ptr =
std::make_unique<MessagePumpGlib::FdWatchController>(FROM_HERE);
auto* controller = controller_ptr.get();
DeleteWatcher watcher(std::move(controller_ptr));
pump->WatchFileDescriptor(pipefds_[1], false,
MessagePumpGlib::WATCH_READ_WRITE, controller,
&watcher);
SimulateEvent(pump.get(), controller);
}
// Tests that MessagePumpGlib::FdWatcher::OnFileCanReadWithoutBlocking is not
// called for a READ_WRITE event, when the watcher calls
// StopWatchingFileDescriptor in OnFileCanWriteWithoutBlocking callback.
TEST_F(MessagePumpGLibFdWatchTest, StopWatcher) {
std::unique_ptr<MessagePumpGlib> pump(new MessagePumpGlib);
MessagePumpGlib::FdWatchController controller(FROM_HERE);
StopWatcher watcher(&controller);
pump->WatchFileDescriptor(pipefds_[1], false,
MessagePumpGlib::WATCH_READ_WRITE, &controller,
&watcher);
SimulateEvent(pump.get(), &controller);
}
// Tests that FdWatcher works properly with nested loops.
TEST_F(MessagePumpGLibFdWatchTest, NestedPumpWatcher) {
MessageLoop loop(MessagePumpType::UI);
std::unique_ptr<MessagePumpGlib> pump(new MessagePumpGlib);
MessagePumpGlib::FdWatchController controller(FROM_HERE);
NestedPumpWatcher watcher;
pump->WatchFileDescriptor(pipefds_[1], false, MessagePumpGlib::WATCH_READ,
&controller, &watcher);
SimulateEvent(pump.get(), &controller);
}
// Tests that MessagePumpGlib quits immediately when it is quit from
// libevent's event_base_loop().
TEST_F(MessagePumpGLibFdWatchTest, QuitWatcher) {
auto pump_ptr = std::make_unique<MessagePumpGlib>();
MessagePumpGlib* pump = pump_ptr.get();
MessageLoop loop(std::move(pump_ptr));
RunLoop run_loop;
MessagePumpGlib::FdWatchController controller(FROM_HERE);
QuitWatcher delegate(&controller, run_loop.QuitClosure());
WaitableEvent event;
auto watcher = std::make_unique<WaitableEventWatcher>();
pump->WatchFileDescriptor(pipefds_[0], false, MessagePumpGlib::WATCH_READ,
&controller, &delegate);
// Make the IO thread wait for |event| before writing to pipefds[1].
const char buf = 0;
WaitableEventWatcher::EventCallback write_fd_task =
BindOnce(&WriteFDWrapper, pipefds_[1], &buf, 1);
io_runner()->PostTask(
FROM_HERE, BindOnce(IgnoreResult(&WaitableEventWatcher::StartWatching),
Unretained(watcher.get()), &event,
std::move(write_fd_task), io_runner()));
// Queue |event| to signal on |MessageLoopCurrentForUI::Get()|.
ThreadTaskRunnerHandle::Get()->PostTask(
FROM_HERE, BindOnce(&WaitableEvent::Signal, Unretained(&event)));
// Now run the MessageLoop.
run_loop.Run();
// StartWatching can move |watcher| to IO thread. Release on IO thread.
io_runner()->PostTask(FROM_HERE, BindOnce(&WaitableEventWatcher::StopWatching,
Owned(std::move(watcher))));
}
} // namespace base
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment