Commit 66f866c4 authored by Sergey Ulanov's avatar Sergey Ulanov Committed by Commit Bot

[Fuchsia] Rewrite MessagePumpFuchsia based on AsyncDispatcher

Now MessagePumpFuchsia runs AsyncDispatcher to handle IO events. This will
allow to use async lib and libraries that depend on it (such as FIDL) on
UI and IO threads.

Bug: 831384
Cq-Include-Trybots: luci.chromium.try:android_optional_gpu_tests_rel;luci.chromium.try:linux_optional_gpu_tests_rel;luci.chromium.try:mac_optional_gpu_tests_rel;luci.chromium.try:win_optional_gpu_tests_rel
Change-Id: I981d18f9e07d25a5cd4f6006714af91557c0c795
Reviewed-on: https://chromium-review.googlesource.com/985648
Commit-Queue: Sergey Ulanov <sergeyu@chromium.org>
Reviewed-by: default avatarWez <wez@chromium.org>
Cr-Commit-Position: refs/heads/master@{#551399}
parent 2dbd34cb
......@@ -4,23 +4,39 @@
#include "base/message_loop/message_pump_fuchsia.h"
#include <fdio/private.h>
#include <zircon/status.h>
#include <zircon/syscalls.h>
#include "base/auto_reset.h"
#include "base/fuchsia/fuchsia_logging.h"
#include "base/logging.h"
namespace base {
MessagePumpFuchsia::ZxHandleWatchController::ZxHandleWatchController(
const Location& from_here)
: created_from_location_(from_here) {}
: async_wait_t({}), created_from_location_(from_here) {}
MessagePumpFuchsia::ZxHandleWatchController::~ZxHandleWatchController() {
if (!StopWatchingZxHandle())
NOTREACHED();
}
bool MessagePumpFuchsia::ZxHandleWatchController::WaitBegin() {
DCHECK(!handler);
async_wait_t::handler = &HandleSignal;
zx_status_t status = async_begin_wait(&weak_pump_->async_dispatcher_, this);
if (status != ZX_OK) {
ZX_DLOG(ERROR, status) << "async_begin_wait() failed";
async_wait_t::handler = nullptr;
return false;
}
return true;
}
bool MessagePumpFuchsia::ZxHandleWatchController::StopWatchingZxHandle() {
if (was_stopped_) {
DCHECK(!*was_stopped_);
......@@ -33,23 +49,64 @@ bool MessagePumpFuchsia::ZxHandleWatchController::StopWatchingZxHandle() {
was_stopped_ = nullptr;
}
if (!has_begun_)
return true;
has_begun_ = false;
// If the pump is gone then there is nothing to cancel.
if (!weak_pump_)
return true;
int result = zx_port_cancel(weak_pump_->port_.get(), handle_, wait_key());
DLOG_IF(ERROR, result != ZX_OK)
<< "zx_port_cancel(handle=" << handle_
<< ") failed: " << zx_status_get_string(result);
// |handler| is set when waiting for a signal.
if (!handler)
return true;
async_wait_t::handler = nullptr;
zx_status_t result = async_cancel_wait(&weak_pump_->async_dispatcher_, this);
ZX_DLOG_IF(ERROR, result != ZX_OK, result) << "async_cancel_wait failed";
return result == ZX_OK;
}
// static
void MessagePumpFuchsia::ZxHandleWatchController::HandleSignal(
async_t* async,
async_wait_t* wait,
zx_status_t status,
const zx_packet_signal_t* signal) {
if (status != ZX_OK) {
ZX_LOG(WARNING, status) << "async wait failed";
return;
}
ZxHandleWatchController* controller =
static_cast<ZxHandleWatchController*>(wait);
DCHECK_EQ(controller->handler, &HandleSignal);
controller->handler = nullptr;
// |signal| can include other spurious things, in particular, that an fd
// is writable, when we only asked to know when it was readable. In that
// case, we don't want to call both the CanWrite and CanRead callback,
// when the caller asked for only, for example, readable callbacks. So,
// mask with the events that we actually wanted to know about.
zx_signals_t signals = signal->trigger & signal->observed;
DCHECK_NE(0u, signals);
// In the case of a persistent Watch, the Watch may be stopped and
// potentially deleted by the caller within the callback, in which case
// |controller| should not be accessed again, and we mustn't continue the
// watch. We check for this with a bool on the stack, which the Watch
// receives a pointer to.
bool was_stopped = false;
controller->was_stopped_ = &was_stopped;
controller->watcher_->OnZxHandleSignalled(wait->object, signals);
if (was_stopped)
return;
controller->was_stopped_ = nullptr;
if (controller->persistent_)
controller->WaitBegin();
}
void MessagePumpFuchsia::FdWatchController::OnZxHandleSignalled(
zx_handle_t handle,
zx_signals_t signals) {
......@@ -79,6 +136,19 @@ MessagePumpFuchsia::FdWatchController::~FdWatchController() {
NOTREACHED();
}
bool MessagePumpFuchsia::FdWatchController::WaitBegin() {
// Refresh the |handle_| and |desired_signals_| from the mxio for the fd.
// Some types of fdio map read/write events to different signals depending on
// their current state, so we must do this every time we begin to wait.
__fdio_wait_begin(io_, desired_events_, &object, &trigger);
if (async_wait_t::object == ZX_HANDLE_INVALID) {
DLOG(ERROR) << "fdio_wait_begin failed";
return false;
}
return MessagePumpFuchsia::ZxHandleWatchController::WaitBegin();
}
bool MessagePumpFuchsia::FdWatchController::StopWatchingFileDescriptor() {
bool success = StopWatchingZxHandle();
if (io_) {
......@@ -88,11 +158,9 @@ bool MessagePumpFuchsia::FdWatchController::StopWatchingFileDescriptor() {
return success;
}
MessagePumpFuchsia::MessagePumpFuchsia() : weak_factory_(this) {
CHECK_EQ(ZX_OK, zx_port_create(0, port_.receive()));
}
MessagePumpFuchsia::MessagePumpFuchsia() : weak_factory_(this) {}
MessagePumpFuchsia::~MessagePumpFuchsia() {}
MessagePumpFuchsia::~MessagePumpFuchsia() = default;
bool MessagePumpFuchsia::WatchFileDescriptor(int fd,
bool persistent,
......@@ -138,19 +206,6 @@ bool MessagePumpFuchsia::WatchFileDescriptor(int fd,
controller);
}
bool MessagePumpFuchsia::FdWatchController::WaitBegin() {
// Refresh the |handle_| and |desired_signals_| from the mxio for the fd.
// Some types of mxio map read/write events to different signals depending on
// their current state, so we must do this every time we begin to wait.
__fdio_wait_begin(io_, desired_events_, &handle_, &desired_signals_);
if (handle_ == ZX_HANDLE_INVALID) {
DLOG(ERROR) << "fdio_wait_begin failed";
return false;
}
return MessagePumpFuchsia::ZxHandleWatchController::WaitBegin();
}
bool MessagePumpFuchsia::WatchZxHandle(zx_handle_t handle,
bool persistent,
zx_signals_t signals,
......@@ -160,15 +215,15 @@ bool MessagePumpFuchsia::WatchZxHandle(zx_handle_t handle,
DCHECK(controller);
DCHECK(delegate);
DCHECK(handle == ZX_HANDLE_INVALID ||
controller->handle_ == ZX_HANDLE_INVALID ||
handle == controller->handle_);
controller->async_wait_t::object == ZX_HANDLE_INVALID ||
handle == controller->async_wait_t::object);
if (!controller->StopWatchingZxHandle())
NOTREACHED();
controller->handle_ = handle;
controller->async_wait_t::object = handle;
controller->persistent_ = persistent;
controller->desired_signals_ = signals;
controller->async_wait_t::trigger = signals;
controller->watcher_ = delegate;
controller->weak_pump_ = weak_factory_.GetWeakPtr();
......@@ -176,85 +231,22 @@ bool MessagePumpFuchsia::WatchZxHandle(zx_handle_t handle,
return controller->WaitBegin();
}
bool MessagePumpFuchsia::ZxHandleWatchController::WaitBegin() {
DCHECK(!has_begun_);
zx_status_t status =
zx_object_wait_async(handle_, weak_pump_->port_.get(), wait_key(),
desired_signals_, ZX_WAIT_ASYNC_ONCE);
if (status != ZX_OK) {
DLOG(ERROR) << "zx_object_wait_async failed: "
<< zx_status_get_string(status)
<< " (port=" << weak_pump_->port_.get() << ")";
return false;
}
has_begun_ = true;
return true;
}
uint32_t MessagePumpFuchsia::ZxHandleWatchController::WaitEnd(
zx_signals_t signals) {
DCHECK(has_begun_);
has_begun_ = false;
// |signals| can include other spurious things, in particular, that an fd
// is writable, when we only asked to know when it was readable. In that
// case, we don't want to call both the CanWrite and CanRead callback,
// when the caller asked for only, for example, readable callbacks. So,
// mask with the events that we actually wanted to know about.
signals &= desired_signals_;
return signals;
}
bool MessagePumpFuchsia::HandleEvents(zx_time_t deadline) {
zx_port_packet_t packet;
const zx_status_t wait_status =
zx_port_wait(port_.get(), deadline, &packet, 0);
if (wait_status == ZX_ERR_TIMED_OUT)
return false;
if (wait_status != ZX_OK) {
NOTREACHED() << "unexpected wait status: "
<< zx_status_get_string(wait_status);
return false;
}
zx_status_t status = async_dispatcher_.DispatchOrWaitUntil(deadline);
switch (status) {
// Return true if some tasks or events were dispatched or if the dispatcher
// was stopped by ScheduleWork().
case ZX_OK:
case ZX_ERR_CANCELED:
return true;
case ZX_ERR_TIMED_OUT:
return false;
if (packet.type == ZX_PKT_TYPE_SIGNAL_ONE) {
// A watched fd caused the wakeup via zx_object_wait_async().
DCHECK_EQ(ZX_OK, packet.status);
ZxHandleWatchController* controller =
reinterpret_cast<ZxHandleWatchController*>(
static_cast<uintptr_t>(packet.key));
DCHECK_NE(0u, packet.signal.trigger & packet.signal.observed);
zx_signals_t signals = controller->WaitEnd(packet.signal.observed);
// In the case of a persistent Watch, the Watch may be stopped and
// potentially deleted by the caller within the callback, in which case
// |controller| should not be accessed again, and we mustn't continue the
// watch. We check for this with a bool on the stack, which the Watch
// receives a pointer to.
bool controller_was_stopped = false;
controller->was_stopped_ = &controller_was_stopped;
controller->watcher_->OnZxHandleSignalled(controller->handle_, signals);
if (!controller_was_stopped) {
controller->was_stopped_ = nullptr;
if (controller->persistent_)
controller->WaitBegin();
}
} else {
// Wakeup caused by ScheduleWork().
DCHECK_EQ(ZX_PKT_TYPE_USER, packet.type);
default:
ZX_DLOG(DCHECK, status) << "unexpected wait status";
return false;
}
return true;
}
void MessagePumpFuchsia::Run(Delegate* delegate) {
......@@ -295,13 +287,9 @@ void MessagePumpFuchsia::Quit() {
}
void MessagePumpFuchsia::ScheduleWork() {
// Since this can be called on any thread, we need to ensure that our Run loop
// wakes up.
zx_port_packet_t packet = {};
packet.type = ZX_PKT_TYPE_USER;
zx_status_t status = zx_port_queue(port_.get(), &packet, 0);
DLOG_IF(ERROR, status != ZX_OK)
<< "zx_port_queue failed: " << status << " (port=" << port_.get() << ")";
// Stop AsyncDispatcher to let MessagePumpFuchsia::Run() handle message loop
// tasks.
async_dispatcher_.Stop();
}
void MessagePumpFuchsia::ScheduleDelayedWork(
......
......@@ -5,17 +5,18 @@
#ifndef BASE_MESSAGE_LOOP_MESSAGE_PUMP_FUCHSIA_H_
#define BASE_MESSAGE_LOOP_MESSAGE_PUMP_FUCHSIA_H_
#include <fdio/io.h>
#include <lib/async/wait.h>
#include "base/base_export.h"
#include "base/fuchsia/scoped_zx_handle.h"
#include "base/fuchsia/async_dispatcher.h"
#include "base/location.h"
#include "base/macros.h"
#include "base/memory/weak_ptr.h"
#include "base/message_loop/message_pump.h"
#include "base/message_loop/watchable_io_message_pump_posix.h"
#include <fdio/io.h>
#include <fdio/private.h>
#include <zircon/syscalls/port.h>
typedef struct fdio fdio_t;
namespace base {
......@@ -33,7 +34,7 @@ class BASE_EXPORT MessagePumpFuchsia : public MessagePump,
};
// Manages an active watch on an zx_handle_t.
class ZxHandleWatchController {
class ZxHandleWatchController : public async_wait_t {
public:
explicit ZxHandleWatchController(const Location& from_here);
// Deleting the Controller implicitly calls StopWatchingZxHandle.
......@@ -46,6 +47,17 @@ class BASE_EXPORT MessagePumpFuchsia : public MessagePump,
const Location& created_from_location() { return created_from_location_; }
protected:
friend class MessagePumpFuchsia;
virtual bool WaitBegin();
static void HandleSignal(async_t* async,
async_wait_t* wait,
zx_status_t status,
const zx_packet_signal_t* signal);
const Location created_from_location_;
// This bool is used by the pump when invoking the ZxHandleWatcher callback,
// and by the FdHandleWatchController when invoking read & write callbacks,
// to cope with the possibility of the caller deleting the *Watcher within
......@@ -54,28 +66,8 @@ class BASE_EXPORT MessagePumpFuchsia : public MessagePump,
// to check the value on the stack to short-cut any post-callback work.
bool* was_stopped_ = nullptr;
protected:
friend class MessagePumpFuchsia;
// Start watching the handle.
virtual bool WaitBegin();
// Called by MessagePumpFuchsia when the handle is signalled. Accepts the
// set of signals that fired, and returns the intersection with those the
// caller is interested in.
zx_signals_t WaitEnd(zx_signals_t observed);
// Returns the key to use to uniquely identify this object's wait operation.
uint64_t wait_key() const {
return static_cast<uint64_t>(reinterpret_cast<uintptr_t>(this));
}
const Location created_from_location_;
// Set directly from the inputs to WatchFileDescriptor.
ZxHandleWatcher* watcher_ = nullptr;
zx_handle_t handle_ = ZX_HANDLE_INVALID;
zx_signals_t desired_signals_ = 0;
// Used to safely access resources owned by the associated message pump.
WeakPtr<MessagePumpFuchsia> weak_pump_;
......@@ -84,10 +76,6 @@ class BASE_EXPORT MessagePumpFuchsia : public MessagePump,
// after triggering.
bool persistent_ = false;
// Used to determine whether an asynchronous wait operation is active on
// this controller.
bool has_begun_ = false;
DISALLOW_COPY_AND_ASSIGN(ZxHandleWatchController);
};
......@@ -148,14 +136,14 @@ class BASE_EXPORT MessagePumpFuchsia : public MessagePump,
void ScheduleDelayedWork(const TimeTicks& delayed_work_time) override;
private:
// Handles IO events from the |port_|. Returns true if any events were
// received.
// Handles IO events by running |async_dispatcher_|. Returns true if any
// events were received or if ScheduleWork() was called.
bool HandleEvents(zx_time_t deadline);
// This flag is set to false when Run should return.
bool keep_running_ = true;
ScopedZxHandle port_;
AsyncDispatcher async_dispatcher_;
// The time at which we should call DoDelayedWork.
TimeTicks delayed_work_time_;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment