Commit da1adc70 authored by Sergey Ulanov's avatar Sergey Ulanov Committed by Commit Bot

[Fuchsia] Add AsyncDispatcher.

Added an AsyncDispatcher class that implements the async dispatcher interface
for Fuchsia. It will be used in MessagePumpFuchsia, which will allow running
FIDL on Chromium threads.

The implementation is largely based on the default async loop. That code
has been translated to C++, with all of the features that we don't need in
Chromium removed.

Bug: 831384
Change-Id: I76459d364e1a4864dbe6f76f582b9fb87c79e027
Reviewed-on: https://chromium-review.googlesource.com/1003073
Commit-Queue: Sergey Ulanov <sergeyu@chromium.org>
Reviewed-by: default avatarDaniel Cheng <dcheng@chromium.org>
Reviewed-by: default avatarWez <wez@chromium.org>
Cr-Commit-Position: refs/heads/master@{#550416}
parent 54d6cb59
......@@ -1318,6 +1318,8 @@ jumbo_component("base") {
"base_paths_fuchsia.h",
"debug/stack_trace_fuchsia.cc",
"files/file_path_watcher_fuchsia.cc",
"fuchsia/async_dispatcher.cc",
"fuchsia/async_dispatcher.h",
"fuchsia/default_job.cc",
"fuchsia/default_job.h",
"fuchsia/fuchsia_logging.cc",
......@@ -1345,6 +1347,9 @@ jumbo_component("base") {
]
libs = [ "launchpad" ]
public_deps += [ "//third_party/fuchsia-sdk:async" ]
deps += [ "//third_party/fuchsia-sdk:async" ]
}
# NaCl.
......@@ -2511,6 +2516,9 @@ test("base_unittests") {
"files/file_locking_unittest.cc",
"posix/unix_domain_socket_unittest.cc",
]
sources += [ "fuchsia/async_dispatcher_unittest.cc" ]
deps += [ "//third_party/fuchsia-sdk:async" ]
}
if (is_android) {
......
// Copyright 2018 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "base/fuchsia/async_dispatcher.h"
#include <lib/async/task.h>
#include <lib/async/wait.h>
#include <zircon/syscalls.h>
#include "base/fuchsia/fuchsia_logging.h"
namespace base {
namespace {

// Reinterprets |ptr| as an integer, for use as a zx_port packet key. Keys for
// |timer_| and |stop_event_| are derived from the member addresses, and keys
// for waits from the async_wait_t address, so they can never collide.
template <typename T>
uintptr_t key_from_ptr(T* ptr) {
  return reinterpret_cast<uintptr_t>(ptr);
}  // Note: removed stray ';' after the function definition.

}  // namespace
// Bookkeeping for one pending wait. Instances are constructed with
// placement-new inside the |state| field of the client's async_wait_t (see
// BeginWait()), so the type must fit within async_state_t (enforced by a
// static_assert in BeginWait()) and adds no data members beyond LinkNode.
class AsyncDispatcher::WaitState : public LinkNode<WaitState> {
 public:
  // Links this wait into |async_dispatcher|'s pending-wait list so that
  // ~AsyncDispatcher() can cancel any waits still outstanding.
  explicit WaitState(AsyncDispatcher* async_dispatcher) {
    async_dispatcher->wait_list_.Append(this);
  }
  // Unlinks from the pending-wait list.
  ~WaitState() { RemoveFromList(); }

  // Recovers a pointer to the enclosing async_wait_t from |this|.
  async_wait_t* wait() {
    // WaitState objects are allocated in-place in the |state| field of an
    // enclosing async_wait_t, so async_wait_t address can be calculated by
    // subtracting state offset in async_wait_t from |this|.
    static_assert(std::is_standard_layout<async_wait_t>(),
                  "async_wait_t is expected to have standard layout.");
    return reinterpret_cast<async_wait_t*>(reinterpret_cast<uint8_t*>(this) -
                                           offsetof(async_wait_t, state));
  }

 private:
  DISALLOW_COPY_AND_ASSIGN(WaitState);
};
// Bookkeeping for one posted task. Instances are constructed with
// placement-new inside the |state| field of the client's async_task_t (see
// PostTask()), so the type must fit within async_state_t (enforced by a
// static_assert in PostTask()) and adds no data members beyond LinkNode.
class AsyncDispatcher::TaskState : public LinkNode<TaskState> {
 public:
  // Inserts this task into the deadline-sorted list, after |previous_task|.
  explicit TaskState(LinkNode<TaskState>* previous_task) {
    InsertAfter(previous_task);
  }
  // Unlinks from the task list; LinkNode::RemoveFromList() nulls out both
  // link pointers, which callers rely on as the "not queued" marker.
  ~TaskState() { RemoveFromList(); }

  // Recovers a pointer to the enclosing async_task_t from |this|.
  async_task_t* task() {
    // TaskState objects are allocated in-place in the |state| field of an
    // enclosing async_task_t, so async_task_t address can be calculated by
    // subtracting state offset in async_task_t from |this|.
    static_assert(std::is_standard_layout<async_task_t>(),
                  "async_task_t is expected to have standard layout.");
    return reinterpret_cast<async_task_t*>(reinterpret_cast<uint8_t*>(this) -
                                           offsetof(async_task_t, state));
  }

 private:
  DISALLOW_COPY_AND_ASSIGN(TaskState);
};
// Creates the dispatch port plus the auxiliary timer and stop-event handles,
// registers repeating async waits for both on the port, installs the
// async_ops_t function table, and makes this the thread's default dispatcher.
AsyncDispatcher::AsyncDispatcher() {
  // All dispatcher events (timer expiry, Stop() signals, completed waits) are
  // delivered as packets on this port; see DispatchOrWaitUntil().
  zx_status_t status = zx_port_create(0u, port_.receive());
  ZX_DCHECK(status == ZX_OK, status);

  // |timer_| is armed by RestartTimerLocked() for the earliest task deadline.
  status = zx_timer_create(0u, ZX_CLOCK_MONOTONIC, timer_.receive());
  ZX_DCHECK(status == ZX_OK, status);
  status =
      zx_object_wait_async(timer_.get(), port_.get(), key_from_ptr(&timer_),
                           ZX_TIMER_SIGNALED, ZX_WAIT_ASYNC_REPEATING);
  ZX_DCHECK(status == ZX_OK, status);

  // |stop_event_| is signaled by Stop(), possibly from another thread.
  status = zx_event_create(0, stop_event_.receive());
  ZX_DCHECK(status == ZX_OK, status);
  status = zx_object_wait_async(stop_event_.get(), port_.get(),
                                key_from_ptr(&stop_event_), ZX_EVENT_SIGNALED,
                                ZX_WAIT_ASYNC_REPEATING);
  ZX_DCHECK(status == ZX_OK, status);

  // Function table consulted by the C async library. Declared static so that
  // the pointer stored in the async_t base outlives any dispatcher instance.
  static const async_ops_t async_ops_t_impl = {
      NowOp,        BeginWaitOp,   CancelWaitOp, PostTaskOp,
      CancelTaskOp, QueuePacketOp, SetGuestBellTrapOp,
  };
  ops = &async_ops_t_impl;

  // Register as the thread-default dispatcher, used by async_get_default().
  DCHECK(!async_get_default());
  async_set_default(this);
}
// Cancels all outstanding waits and tasks, invoking each handler with
// ZX_ERR_CANCELED, then unregisters this dispatcher as the thread default.
AsyncDispatcher::~AsyncDispatcher() {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  DCHECK_EQ(async_get_default(), this);

  // Some waits and tasks may be canceled while the dispatcher is being
  // destroyed, so pop-from-head until none remain.
  while (!wait_list_.empty()) {
    WaitState* state = wait_list_.head()->value();
    async_wait_t* wait = state->wait();
    // Destroy the state (unlinking the wait) before running the handler,
    // since the handler may free or reuse |wait|.
    state->~WaitState();
    wait->handler(this, wait, ZX_ERR_CANCELED, nullptr);
  }

  while (!task_list_.empty()) {
    TaskState* state = task_list_.head()->value();
    async_task_t* task = state->task();
    state->~TaskState();
    task->handler(this, task, ZX_ERR_CANCELED);
  }

  async_set_default(nullptr);
}
// Waits on |port_| for the next packet and dispatches it. Returns ZX_OK after
// dispatching expired tasks or one completed wait, ZX_ERR_TIMED_OUT if
// |deadline| passed with no packet, or ZX_ERR_CANCELED if Stop() was called.
zx_status_t AsyncDispatcher::DispatchOrWaitUntil(zx_time_t deadline) {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);

  zx_port_packet_t packet = {};
  zx_status_t status = zx_port_wait(port_.get(), deadline, &packet, 0);
  // Propagates ZX_ERR_TIMED_OUT (and any other wait failure) to the caller.
  if (status != ZX_OK)
    return status;

  if (packet.type == ZX_PKT_TYPE_SIGNAL_ONE ||
      packet.type == ZX_PKT_TYPE_SIGNAL_REP) {
    if (packet.key == key_from_ptr(&timer_)) {
      // |timer_| has expired.
      DCHECK(packet.signal.observed & ZX_TIMER_SIGNALED);
      DispatchTasks();
      return ZX_OK;
    } else if (packet.key == key_from_ptr(&stop_event_)) {
      // Stop() was called. Clear the signal so future dispatch calls can
      // block again.
      DCHECK(packet.signal.observed & ZX_EVENT_SIGNALED);
      status = zx_object_signal(stop_event_.get(), ZX_EVENT_SIGNALED, 0);
      ZX_DCHECK(status == ZX_OK, status);
      return ZX_ERR_CANCELED;
    } else {
      // Any other key is the address of an async_wait_t registered by
      // BeginWait() with ZX_WAIT_ASYNC_ONCE.
      DCHECK_EQ(packet.type, ZX_PKT_TYPE_SIGNAL_ONE);
      async_wait_t* wait = reinterpret_cast<async_wait_t*>(packet.key);

      // Clean the state before invoking the handler: it may destroy the wait.
      WaitState* state = reinterpret_cast<WaitState*>(&wait->state);
      state->~WaitState();
      wait->handler(this, wait, packet.status, &packet.signal);
      return ZX_OK;
    }
  }

  // Only signal packets are ever queued on |port_|.
  NOTREACHED();
  return ZX_ERR_INTERNAL;
}
void AsyncDispatcher::Stop() {
// Can be called on any thread.
zx_status_t status =
zx_object_signal(stop_event_.get(), 0, ZX_EVENT_SIGNALED);
ZX_DCHECK(status == ZX_OK, status);
}
// async_ops_t "now" hook: returns the current monotonic time.
zx_time_t AsyncDispatcher::NowOp(async_t* async) {
  DCHECK(async);
  return zx_clock_get(ZX_CLOCK_MONOTONIC);
}

// The *Op() trampolines below are the entries of the async_ops_t table set up
// in the constructor. The |async| the C library passes back is the
// AsyncDispatcher itself (it derives from async_t), so each trampoline
// downcasts and forwards to the corresponding member function.

zx_status_t AsyncDispatcher::BeginWaitOp(async_t* async, async_wait_t* wait) {
  return static_cast<AsyncDispatcher*>(async)->BeginWait(wait);
}

zx_status_t AsyncDispatcher::CancelWaitOp(async_t* async, async_wait_t* wait) {
  return static_cast<AsyncDispatcher*>(async)->CancelWait(wait);
}

zx_status_t AsyncDispatcher::PostTaskOp(async_t* async, async_task_t* task) {
  return static_cast<AsyncDispatcher*>(async)->PostTask(task);
}

zx_status_t AsyncDispatcher::CancelTaskOp(async_t* async, async_task_t* task) {
  return static_cast<AsyncDispatcher*>(async)->CancelTask(task);
}

// User-packet queueing is not implemented by this dispatcher.
zx_status_t AsyncDispatcher::QueuePacketOp(async_t* async,
                                           async_receiver_t* receiver,
                                           const zx_packet_user_t* data) {
  return ZX_ERR_NOT_SUPPORTED;
}

// Guest bell traps are not implemented by this dispatcher.
zx_status_t AsyncDispatcher::SetGuestBellTrapOp(async_t* async,
                                                async_guest_bell_trap_t* trap,
                                                zx_handle_t guest,
                                                zx_vaddr_t addr,
                                                size_t length) {
  return ZX_ERR_NOT_SUPPORTED;
}
// async_ops_t "begin_wait": starts observing |wait->object| for
// |wait->trigger| signals. Bookkeeping is constructed in-place inside
// |wait->state|; on failure it is torn down so the wait is left unregistered.
zx_status_t AsyncDispatcher::BeginWait(async_wait_t* wait) {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);

  static_assert(sizeof(AsyncDispatcher::WaitState) <= sizeof(async_state_t),
                "WaitState is too big");
  WaitState* state = new (&wait->state) WaitState(this);

  // The wait's own address doubles as the port packet key, letting
  // DispatchOrWaitUntil() map the packet back to the async_wait_t.
  zx_status_t status = zx_object_wait_async(wait->object, port_.get(),
                                            reinterpret_cast<uintptr_t>(wait),
                                            wait->trigger, ZX_WAIT_ASYNC_ONCE);

  if (status != ZX_OK)
    state->~WaitState();

  return status;
}
// async_ops_t "cancel_wait": cancels a wait started by BeginWait(). Returns
// the zx_port_cancel() status; on ZX_OK the WaitState constructed inside
// |wait->state| is destroyed, unlinking the wait from |wait_list_|. If the
// completion packet was already dequeued, zx_port_cancel() reports an error
// and the state is left for the pending handler invocation.
zx_status_t AsyncDispatcher::CancelWait(async_wait_t* wait) {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);

  // The packet key is the wait's address, as registered by BeginWait(). Use a
  // named cast for consistency with the rest of the file (was a C-style
  // cast).
  zx_status_t status = zx_port_cancel(port_.get(), wait->object,
                                      reinterpret_cast<uintptr_t>(wait));
  if (status == ZX_OK) {
    WaitState* state = reinterpret_cast<WaitState*>(&(wait->state));
    state->~WaitState();
  }

  return status;
}
// async_ops_t "post_task": queues |task| to run once its deadline is reached.
// Thread-safe. The TaskState bookkeeping is constructed in-place inside
// |task->state|.
zx_status_t AsyncDispatcher::PostTask(async_task_t* task) {
  // Can be called on any thread.
  AutoLock lock(lock_);

  // Find correct position for the new task in |task_list_| to keep the list
  // sorted by deadline. Walk backward from the tail (the latest deadline) so
  // that ties are inserted after existing tasks with the same deadline. This
  // implementation has O(N) complexity, but it's acceptable - async tasks are
  // not expected to be used frequently.
  // BUG FIX: this previously started from head() while stepping previous(),
  // which only ever compared against the head and inserted later tasks out of
  // order; start from tail() so the whole list is scanned.
  // TODO(sergeyu): Consider using a more efficient data structure if tasks
  // performance becomes important.
  LinkNode<TaskState>* node;
  for (node = task_list_.tail(); node != task_list_.end();
       node = node->previous()) {
    if (task->deadline >= node->value()->task()->deadline)
      break;
  }

  static_assert(sizeof(AsyncDispatcher::TaskState) <= sizeof(async_state_t),
                "TaskState is too big");
  // Will insert new task after |node|. If the loop ran off the front of the
  // list, |node| is the end() sentinel and the task becomes the new head.
  new (&task->state) TaskState(node);

  if (reinterpret_cast<TaskState*>(&task->state) == task_list_.head()) {
    // Task inserted at head. Earliest deadline changed.
    RestartTimerLocked();
  }

  return ZX_OK;
}
// async_ops_t "cancel_task": removes |task| from |task_list_| if it is still
// queued. Returns ZX_ERR_NOT_FOUND if the task is not currently queued (its
// |state| field is all-zero in that case).
zx_status_t AsyncDispatcher::CancelTask(async_task_t* task) {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  AutoLock lock(lock_);

  // A queued task's state holds a live TaskState whose list pointers are
  // non-null; a never-posted or already-dispatched task has zeroed state
  // (see the DCHECK in DispatchTasks()).
  if (!task->state.reserved[0])
    return ZX_ERR_NOT_FOUND;

  TaskState* state = reinterpret_cast<TaskState*>(&task->state);
  state->~TaskState();
  return ZX_OK;
}
// Runs every queued task whose deadline is at or before the current time,
// then re-arms |timer_| for the next pending deadline (if any). Invoked from
// DispatchOrWaitUntil() when |timer_| fires.
void AsyncDispatcher::DispatchTasks() {
  // Snapshot now value to set implicit bound for the tasks that will run before
  // DispatchTasks() returns. This also helps to avoid calling zx_clock_get()
  // more than necessary.
  zx_time_t now = zx_clock_get(ZX_CLOCK_MONOTONIC);

  while (true) {
    async_task_t* task;
    {
      // Hold |lock_| only while manipulating |task_list_|; the handler below
      // runs unlocked so that it may freely post or cancel tasks.
      AutoLock lock(lock_);
      if (task_list_.empty())
        break;
      TaskState* task_state = task_list_.head()->value();
      task = task_state->task();
      // The list is deadline-sorted, so the first unexpired task ends the
      // loop; arm the timer for it before returning.
      if (task->deadline > now) {
        RestartTimerLocked();
        break;
      }

      task_state->~TaskState();

      // ~TaskState() is expected to reset the state to 0. The destructor
      // removes the task from the |task_list_| and LinkNode::RemoveFromList()
      // sets both its fields to nullptr, which is equivalent to resetting the
      // state to 0.
      DCHECK_EQ(task->state.reserved[0], 0u);
    }

    // The handler is responsible for freeing the |task| or it may reuse it.
    task->handler(this, task, ZX_OK);
  }
}
void AsyncDispatcher::RestartTimerLocked() {
lock_.AssertAcquired();
if (task_list_.empty())
return;
zx_time_t deadline = task_list_.head()->value()->task()->deadline;
zx_status_t status = zx_timer_set(timer_.get(), deadline, 0);
ZX_DCHECK(status == ZX_OK, status);
}
} // namespace base
// Copyright 2018 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef BASE_FUCHSIA_ASYNC_DISPATCHER_H_
#define BASE_FUCHSIA_ASYNC_DISPATCHER_H_
#include <lib/async/default.h>
#include <lib/async/dispatcher.h>
#include "base/containers/linked_list.h"
#include "base/fuchsia/scoped_zx_handle.h"
#include "base/macros.h"
#include "base/synchronization/lock.h"
#include "base/threading/thread_checker.h"
namespace base {
// Implementation of the dispatcher interface for Fuchsia's C async library
// (async_t). It's necessary to run Fuchsia's library (e.g. FIDL) on chromium
// threads. Except where noted, methods must be called on the thread that
// created the dispatcher.
class AsyncDispatcher : public async_t {
 public:
  // Registers itself as the thread's default async dispatcher.
  AsyncDispatcher();
  // Cancels all pending waits and tasks (handlers get ZX_ERR_CANCELED) and
  // clears the thread's default dispatcher.
  ~AsyncDispatcher();

  // Dispatches pending events (expired tasks or one completed wait) or waits
  // until |deadline|. Returns |ZX_OK| if some handlers were executed,
  // |ZX_ERR_TIMED_OUT| - the deadline expired, |ZX_ERR_CANCELED| - Stop() was
  // called.
  zx_status_t DispatchOrWaitUntil(zx_time_t deadline);

  // If DispatchOrWaitUntil() is being executed then it will return as soon as
  // possible (e.g. finishing running the current task), otherwise the
  // following DispatchOrWaitUntil() call will quit immediately instead of
  // waiting until the deadline expires. Can be called on any thread.
  void Stop();

 private:
  class WaitState;
  class TaskState;

  // Static trampolines stored in the async_ops_t table; each forwards to the
  // member function of the same name (QueuePacketOp and SetGuestBellTrapOp
  // return ZX_ERR_NOT_SUPPORTED).
  static zx_time_t NowOp(async_t* async);
  static zx_status_t BeginWaitOp(async_t* async, async_wait_t* wait);
  static zx_status_t CancelWaitOp(async_t* async, async_wait_t* wait);
  static zx_status_t PostTaskOp(async_t* async, async_task_t* task);
  static zx_status_t CancelTaskOp(async_t* async, async_task_t* task);
  static zx_status_t QueuePacketOp(async_t* async,
                                   async_receiver_t* receiver,
                                   const zx_packet_user_t* data);
  static zx_status_t SetGuestBellTrapOp(async_t* async,
                                        async_guest_bell_trap_t* trap,
                                        zx_handle_t guest,
                                        zx_vaddr_t addr,
                                        size_t length);

  // async_ops_t implementation. Called by corresponding *Op() methods above.
  zx_status_t BeginWait(async_wait_t* wait);
  zx_status_t CancelWait(async_wait_t* wait);
  zx_status_t PostTask(async_task_t* task);
  zx_status_t CancelTask(async_task_t* task);

  // Runs tasks in |task_list_| that have deadline in the past.
  void DispatchTasks();

  // Arms |timer_| for the earliest task deadline. Must be called while
  // |lock_| is held.
  void RestartTimerLocked();

  THREAD_CHECKER(thread_checker_);

  // Port on which all dispatcher events (timer, stop event, waits) arrive.
  ScopedZxHandle port_;
  // Fires when the earliest posted task's deadline is reached.
  ScopedZxHandle timer_;
  // Signaled by Stop() to interrupt DispatchOrWaitUntil().
  ScopedZxHandle stop_event_;

  // Waits started via BeginWait() and not yet completed or canceled.
  LinkedList<WaitState> wait_list_;

  // |lock_| must be held when accessing |task_list_|.
  base::Lock lock_;
  // Pending tasks, kept sorted by deadline.
  LinkedList<TaskState> task_list_;

  DISALLOW_COPY_AND_ASSIGN(AsyncDispatcher);
};
} // namespace base
#endif // BASE_FUCHSIA_ASYNC_DISPATCHER_H_
\ No newline at end of file
// Copyright 2018 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "base/fuchsia/async_dispatcher.h"
#include <lib/async/default.h>
#include <lib/async/task.h>
#include <lib/async/wait.h>
#include "base/callback.h"
#include "base/fuchsia/scoped_zx_handle.h"
#include "base/test/test_timeouts.h"
#include "base/time/time.h"
#include "testing/gtest/include/gtest/gtest.h"
namespace base {
namespace {
// Test async task whose handler counts invocations, records the last status,
// runs an optional closure, and re-posts itself until |repeats| calls have
// been made. Default deadline of 0 makes it runnable immediately.
struct TestTask : public async_task_t {
  // NOTE: dropped the redundant `explicit` - it has no effect on a
  // zero-argument constructor.
  TestTask() {
    state = ASYNC_STATE_INIT;
    handler = &TaskProc;
    deadline = 0;
  }

  // Shared handler; defined below.
  static void TaskProc(async_t* async, async_task_t* task, zx_status_t status);

  int num_calls = 0;      // Times TaskProc has run for this task.
  int repeats = 1;        // Total desired invocations; re-posts until reached.
  OnceClosure on_call;    // Optional closure run on each invocation.
  zx_status_t last_status = ZX_OK;  // Status passed to the latest invocation.
};
// static
// Handler shared by all TestTask instances: verifies the dispatcher is the
// thread default and the status is OK or CANCELED, updates the task's
// counters, runs the optional |on_call| closure, and re-posts the task until
// |repeats| invocations have occurred.
void TestTask::TaskProc(async_t* async,
                        async_task_t* task,
                        zx_status_t status) {
  EXPECT_EQ(async, async_get_default());
  EXPECT_TRUE(status == ZX_OK || status == ZX_ERR_CANCELED)
      << "status: " << status;

  auto* test_task = static_cast<TestTask*>(task);
  test_task->num_calls++;
  test_task->last_status = status;
  if (!test_task->on_call.is_null())
    std::move(test_task->on_call).Run();

  if (test_task->num_calls < test_task->repeats)
    async_post_task(async, task);
}  // NOTE: removed stray ';' that followed the function body.
// Test async wait whose handler counts invocations, records the last status
// and runs an optional closure.
struct TestWait : public async_wait_t {
  // Configures the wait to observe |signals| on |handle| (not owned).
  TestWait(zx_handle_t handle,
           zx_signals_t signals) {
    state = ASYNC_STATE_INIT;
    handler = &HandleProc;
    object = handle;
    trigger = signals;
  }

  // Shared handler; defined below.
  static void HandleProc(async_t* async,
                         async_wait_t* wait,
                         zx_status_t status,
                         const zx_packet_signal_t* signal);

  int num_calls = 0;      // Times HandleProc has run for this wait.
  OnceClosure on_call;    // Optional closure run on each invocation.
  zx_status_t last_status = ZX_OK;  // Status passed to the latest invocation.
};
// static
// Handler shared by all TestWait instances: verifies the dispatcher is the
// thread default and the status is OK or CANCELED, then updates the wait's
// counters and runs the optional |on_call| closure.
void TestWait::HandleProc(async_t* async,
                          async_wait_t* wait,
                          zx_status_t status,
                          const zx_packet_signal_t* signal) {
  EXPECT_EQ(async, async_get_default());
  EXPECT_TRUE(status == ZX_OK || status == ZX_ERR_CANCELED)
      << "status: " << status;

  auto* test_wait = static_cast<TestWait*>(wait);

  test_wait->num_calls++;
  test_wait->last_status = status;

  if (!test_wait->on_call.is_null())
    std::move(test_wait->on_call).Run();
}
} // namespace
// Fixture that creates an AsyncDispatcher (which registers itself as the
// thread's default async_t) and a datagram socket pair used by the wait
// tests.
class AsyncDispatcherTest : public testing::Test {
 public:
  AsyncDispatcherTest() {
    dispatcher_ = std::make_unique<AsyncDispatcher>();
    async_ = async_get_default();
    EXPECT_TRUE(async_);

    EXPECT_EQ(zx_socket_create(ZX_SOCKET_DATAGRAM, socket1_.receive(),
                               socket2_.receive()),
              ZX_OK);
  }

  ~AsyncDispatcherTest() override = default;

  // Polls the dispatcher (deadline 0 means no blocking) until it reports
  // ZX_ERR_TIMED_OUT, i.e. no more work is immediately runnable.
  void RunUntilIdle() {
    while (true) {
      zx_status_t status = dispatcher_->DispatchOrWaitUntil(0);
      if (status != ZX_OK) {
        EXPECT_EQ(status, ZX_ERR_TIMED_OUT);
        break;
      }
    }
  }

 protected:
  std::unique_ptr<AsyncDispatcher> dispatcher_;
  // The default dispatcher as seen through the C async library. Not owned;
  // aliases |dispatcher_|.
  async_t* async_ = nullptr;

  base::ScopedZxHandle socket1_;
  base::ScopedZxHandle socket2_;
};
// A task posted with deadline 0 runs exactly once on the next dispatch.
TEST_F(AsyncDispatcherTest, PostTask) {
  TestTask task;
  ASSERT_EQ(async_post_task(async_, &task), ZX_OK);

  dispatcher_->DispatchOrWaitUntil(0);
  EXPECT_EQ(task.num_calls, 1);
  EXPECT_EQ(task.last_status, ZX_OK);
}

// A task that re-posts itself runs the requested number of times before the
// dispatcher goes idle.
TEST_F(AsyncDispatcherTest, TaskRepeat) {
  TestTask task;
  task.repeats = 2;
  ASSERT_EQ(async_post_task(async_, &task), ZX_OK);

  RunUntilIdle();
  EXPECT_EQ(task.num_calls, 2);
  EXPECT_EQ(task.last_status, ZX_OK);
}

// A task with a future deadline does not run before that deadline passes.
TEST_F(AsyncDispatcherTest, DelayedTask) {
  TestTask task;
  constexpr auto kDelay = TimeDelta::FromMilliseconds(5);
  TimeTicks started = TimeTicks::Now();
  task.deadline = zx_deadline_after(kDelay.InNanoseconds());
  ASSERT_EQ(async_post_task(async_, &task), ZX_OK);
  zx_status_t status = dispatcher_->DispatchOrWaitUntil(zx_deadline_after(
      (kDelay + TestTimeouts::tiny_timeout()).InNanoseconds()));
  EXPECT_EQ(status, ZX_OK);
  EXPECT_GE(TimeTicks::Now() - started, kDelay);
}

// A canceled task is never run.
TEST_F(AsyncDispatcherTest, CancelTask) {
  TestTask task;
  ASSERT_EQ(async_post_task(async_, &task), ZX_OK);
  ASSERT_EQ(async_cancel_task(async_, &task), ZX_OK);

  RunUntilIdle();
  EXPECT_EQ(task.num_calls, 0);
}

// Destroying the dispatcher runs pending task handlers with ZX_ERR_CANCELED.
TEST_F(AsyncDispatcherTest, TaskObserveShutdown) {
  TestTask task;
  ASSERT_EQ(async_post_task(async_, &task), ZX_OK);
  dispatcher_.reset();

  EXPECT_EQ(task.num_calls, 1);
  EXPECT_EQ(task.last_status, ZX_ERR_CANCELED);
}
// A wait's handler runs only after the watched signal is raised on the
// socket.
TEST_F(AsyncDispatcherTest, Wait) {
  TestWait wait(socket1_.get(), ZX_SOCKET_READABLE);
  EXPECT_EQ(async_begin_wait(async_, &wait), ZX_OK);

  // Handler shouldn't be called because the event wasn't signaled.
  RunUntilIdle();
  EXPECT_EQ(wait.num_calls, 0);

  char byte = 0;
  EXPECT_EQ(zx_socket_write(socket2_.get(), /*options=*/0, &byte, sizeof(byte),
                            /*actual=*/nullptr),
            ZX_OK);

  zx_status_t status = dispatcher_->DispatchOrWaitUntil(
      zx_deadline_after(TestTimeouts::tiny_timeout().InNanoseconds()));
  EXPECT_EQ(status, ZX_OK);
  EXPECT_EQ(wait.num_calls, 1);
  EXPECT_EQ(wait.last_status, ZX_OK);
}

// A canceled wait's handler never runs, even if the signal was raised before
// cancellation.
TEST_F(AsyncDispatcherTest, CancelWait) {
  TestWait wait(socket1_.get(), ZX_SOCKET_READABLE);
  EXPECT_EQ(async_begin_wait(async_, &wait), ZX_OK);

  char byte = 0;
  EXPECT_EQ(zx_socket_write(socket2_.get(), /*options=*/0, &byte, sizeof(byte),
                            /*actual=*/nullptr),
            ZX_OK);

  EXPECT_EQ(async_cancel_wait(async_, &wait), ZX_OK);

  RunUntilIdle();
  EXPECT_EQ(wait.num_calls, 0);
}

// Destroying the dispatcher runs pending wait handlers with ZX_ERR_CANCELED.
TEST_F(AsyncDispatcherTest, WaitShutdown) {
  TestWait wait(socket1_.get(), ZX_SOCKET_READABLE);
  EXPECT_EQ(async_begin_wait(async_, &wait), ZX_OK);
  RunUntilIdle();
  dispatcher_.reset();
  EXPECT_EQ(wait.num_calls, 1);
  EXPECT_EQ(wait.last_status, ZX_ERR_CANCELED);
}
} // namespace base
# Copyright 2018 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
# These targets wrap prebuilt pieces of the Fuchsia SDK, so they are only
# meaningful when targeting Fuchsia.
assert(is_fuchsia)

# Adds the SDK's per-architecture prebuilt library directory to the link
# search path.
config("sdk_lib_dirs_config") {
  lib_dirs = [ "sdk/arch/${target_cpu}/lib" ]
}

# Compile/link settings propagated to users of the ":async" target below.
config("async_pkg_config") {
  visibility = [ ":async" ]
  configs = [ ":sdk_lib_dirs_config" ]
  include_dirs = [
    "sdk/pkg/async/include",
    "sdk/pkg/async-default/include",
  ]
  # Prebuilt SDK library (found via sdk_lib_dirs_config above).
  libs = [ "async.default" ]
}

# The SDK's async dispatcher helpers, built from source.
static_library("async") {
  public_configs = [ ":async_pkg_config" ]
  sources = [
    "sdk/pkg/async/ops.c",
  ]
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment