Commit 992eab60 authored by qsr's avatar qsr Committed by Commit bot

mojo: Add async waiter for python bindings

BUG=415486
R=sdefresne@chromium.org,cmasone@chromium.org

Review URL: https://codereview.chromium.org/573253002

Cr-Commit-Position: refs/heads/master@{#295480}
parent a02b3e5b
......@@ -4,7 +4,13 @@
# distutils: language = c++
from libc.stdint cimport int64_t
from libc.stdint cimport int64_t, intptr_t, uint32_t, uint64_t
cdef extern from "mojo/public/c/system/core.h" nogil:
ctypedef uint32_t MojoHandle
ctypedef uint64_t MojoDeadline
ctypedef uint32_t MojoHandleSignals
cdef extern from "mojo/public/cpp/bindings/callback.h" nogil:
......@@ -12,9 +18,20 @@ cdef extern from "mojo/public/cpp/bindings/callback.h" nogil:
CClosure()
cdef extern from "mojo/public/c/environment/async_waiter.h" nogil:
ctypedef intptr_t MojoAsyncWaitID
cdef extern from "mojo/public/python/src/python_system_helper.h" \
namespace "mojo" nogil:
namespace "mojo::python" nogil:
cdef CClosure BuildClosure(object)
cdef cppclass PythonAsyncWaiter "mojo::python::PythonAsyncWaiter":
PythonAsyncWaiter()
MojoAsyncWaitID AsyncWait(MojoHandle,
MojoHandleSignals,
MojoDeadline,
object)
void CancelWait(MojoAsyncWaitID)
cdef extern from "mojo/public/cpp/utility/run_loop.h" nogil:
......@@ -23,7 +40,8 @@ cdef extern from "mojo/public/cpp/utility/run_loop.h" nogil:
void Run() except *
void RunUntilIdle() except *
void Quit()
void PostDelayedTask(CClosure& task, int64_t delay)
void PostDelayedTask(CClosure&, int64_t)
cdef extern from "mojo/public/cpp/environment/environment.h" nogil:
cdef cppclass CEnvironment "mojo::Environment":
......
......@@ -324,6 +324,19 @@ cdef class Handle(object):
result = c_core.MojoWait(handle, csignals, cdeadline)
return result
def AsyncWait(self, signals, deadline, callback):
cdef c_core.MojoHandle handle = self._mojo_handle
cdef c_core.MojoHandleSignals csignals = signals
cdef c_core.MojoDeadline cdeadline = deadline
cdef c_environment.MojoAsyncWaitID wait_id = _ASYNC_WAITER.AsyncWait(
handle,
csignals,
cdeadline,
callback)
def cancel():
_ASYNC_WAITER.CancelWait(wait_id)
return cancel
def WriteMessage(self,
buffer=None,
handles=None,
......@@ -725,3 +738,4 @@ cdef class RunLoop(object):
cdef c_environment.CEnvironment* _ENVIRONMENT = new c_environment.CEnvironment()
cdef c_environment.PythonAsyncWaiter* _ASYNC_WAITER = new c_environment.PythonAsyncWaiter()
......@@ -6,6 +6,7 @@
#include "Python.h"
#include "mojo/public/cpp/environment/environment.h"
#include "mojo/public/cpp/environment/logging.h"
#include "mojo/public/cpp/system/macros.h"
#include "mojo/public/cpp/utility/run_loop.h"
......@@ -14,13 +15,9 @@ namespace {
class ScopedGIL {
public:
ScopedGIL() {
state_ = PyGILState_Ensure();
}
ScopedGIL() { state_ = PyGILState_Ensure(); }
~ScopedGIL() {
PyGILState_Release(state_);
}
~ScopedGIL() { PyGILState_Release(state_); }
private:
PyGILState_STATE state_;
......@@ -31,7 +28,7 @@ class ScopedGIL {
class PythonClosure : public mojo::Closure::Runnable {
public:
PythonClosure(PyObject* callable) : callable_(callable) {
MOJO_CHECK(callable);
MOJO_DCHECK(callable);
Py_XINCREF(callable);
}
......@@ -64,9 +61,69 @@ class PythonClosure : public mojo::Closure::Runnable {
MOJO_DISALLOW_COPY_AND_ASSIGN(PythonClosure);
};
void AsyncCallbackForwarder(void* closure, MojoResult result) {
mojo::Callback<void(MojoResult)>* callback =
static_cast<mojo::Callback<void(MojoResult)>*>(closure);
// callback will be deleted when it is run.
callback->Run(result);
}
} // namespace
namespace mojo {
namespace python {
class PythonAsyncWaiter::AsyncWaiterRunnable
: public mojo::Callback<void(MojoResult)>::Runnable {
public:
AsyncWaiterRunnable(PyObject* callable, CallbackMap* callbacks)
: wait_id_(0), callable_(callable), callbacks_(callbacks) {
MOJO_DCHECK(callable);
MOJO_DCHECK(callbacks_);
Py_XINCREF(callable);
}
virtual ~AsyncWaiterRunnable() {
ScopedGIL acquire_gil;
Py_DECREF(callable_);
}
void set_wait_id(int wait_id) { wait_id_ = wait_id; }
virtual void Run(MojoResult mojo_result) const MOJO_OVERRIDE {
MOJO_DCHECK(wait_id_);
// Remove to reference to this object from PythonAsyncWaiter and ensure this
// object will be destroyed when this method exits.
MOJO_DCHECK(callbacks_->find(wait_id_) != callbacks_->end());
internal::SharedPtr<mojo::Callback<void(MojoResult)> > self =
(*callbacks_)[wait_id_];
callbacks_->erase(wait_id_);
ScopedGIL acquire_gil;
PyObject* args_tuple = Py_BuildValue("(i)", mojo_result);
if (!args_tuple) {
mojo::RunLoop::current()->Quit();
return;
}
PyObject* result = PyObject_CallObject(callable_, args_tuple);
Py_DECREF(args_tuple);
if (result) {
Py_DECREF(result);
} else {
mojo::RunLoop::current()->Quit();
return;
}
}
private:
MojoAsyncWaitID wait_id_;
PyObject* callable_;
CallbackMap* callbacks_;
MOJO_DISALLOW_COPY_AND_ASSIGN(AsyncWaiterRunnable);
};
Closure BuildClosure(PyObject* callable) {
if (!PyCallable_Check(callable))
......@@ -76,4 +133,39 @@ Closure BuildClosure(PyObject* callable) {
static_cast<mojo::Closure::Runnable*>(new PythonClosure(callable)));
}
PythonAsyncWaiter::PythonAsyncWaiter() {
async_waiter_ = Environment::GetDefaultAsyncWaiter();
}
PythonAsyncWaiter::~PythonAsyncWaiter() {
for (CallbackMap::const_iterator it = callbacks_.begin();
it != callbacks_.end();
++it) {
async_waiter_->CancelWait(it->first);
}
}
MojoAsyncWaitID PythonAsyncWaiter::AsyncWait(MojoHandle handle,
MojoHandleSignals signals,
MojoDeadline deadline,
PyObject* callable) {
AsyncWaiterRunnable* runner = new AsyncWaiterRunnable(callable, &callbacks_);
internal::SharedPtr<mojo::Callback<void(MojoResult)> > callback(
new mojo::Callback<void(MojoResult)>(
static_cast<mojo::Callback<void(MojoResult)>::Runnable*>(runner)));
MojoAsyncWaitID wait_id = async_waiter_->AsyncWait(
handle, signals, deadline, &AsyncCallbackForwarder, callback.get());
callbacks_[wait_id] = callback;
runner->set_wait_id(wait_id);
return wait_id;
}
void PythonAsyncWaiter::CancelWait(MojoAsyncWaitID wait_id) {
if (callbacks_.find(wait_id) != callbacks_.end()) {
async_waiter_->CancelWait(wait_id);
callbacks_.erase(wait_id);
}
}
} // namespace python
} // namespace mojo
......@@ -5,18 +5,46 @@
#ifndef MOJO_PUBLIC_PYTHON_SRC_PYTHON_SYSTEM_HELPER_H_
#define MOJO_PUBLIC_PYTHON_SRC_PYTHON_SYSTEM_HELPER_H_
#include "Python.h"
// Python must be the first include, as it defines preprocessor variable without
// checking if they already exist.
#include <Python.h>
#include <map>
#include "mojo/public/c/environment/async_waiter.h"
#include "mojo/public/c/system/types.h"
#include "mojo/public/cpp/bindings/callback.h"
#include "mojo/public/cpp/bindings/lib/shared_ptr.h"
namespace mojo {
namespace python {
// Create a mojo::Closure from a callable python object.
mojo::Closure BuildClosure(PyObject* callable);
// Initalize mojo::RunLoop
void InitRunLoop();
class PythonAsyncWaiter {
public:
PythonAsyncWaiter();
~PythonAsyncWaiter();
MojoAsyncWaitID AsyncWait(MojoHandle handle,
MojoHandleSignals signals,
MojoDeadline deadline,
PyObject* callable);
void CancelWait(MojoAsyncWaitID wait_id);
private:
class AsyncWaiterRunnable;
typedef std::map<MojoAsyncWaitID,
internal::SharedPtr<mojo::Callback<void(MojoResult)> > >
CallbackMap;
CallbackMap callbacks_;
const MojoAsyncWaiter* async_waiter_;
};
} // namespace python
} // namespace mojo
#endif // MOJO_PUBLIC_PYTHON_SRC_PYTHON_SYSTEM_HELPER_H_
# Copyright 2014 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import unittest
# pylint: disable=F0401
import mojo.embedder
from mojo import system
class AsyncWaitTest(unittest.TestCase):
def setUp(self):
mojo.embedder.Init()
self.loop = system.RunLoop()
self.array = []
self.handles = system.MessagePipe()
self.cancel = self.handles.handle0.AsyncWait(system.HANDLE_SIGNAL_READABLE,
system.DEADLINE_INDEFINITE,
self.OnResult)
self.loop.PostDelayedTask(self.WriteToHandle, 100)
def tearDown(self):
self.handles = None
self.array = None
self.loop = None
def OnResult(self, value):
self.array.append(value)
def WriteToHandle(self):
self.handles.handle1.WriteMessage()
def testAsyncWait(self):
self.loop.RunUntilIdle()
self.assertEquals(len(self.array), 1)
self.assertEquals(system.RESULT_OK, self.array[0])
self.cancel()
def testAsyncWaitCancel(self):
self.loop.PostDelayedTask(self.cancel, 50)
self.loop.RunUntilIdle()
self.assertEquals(len(self.array), 0)
self.cancel()
def testAsyncWaitImmediateCancel(self):
self.cancel()
self.loop.RunUntilIdle()
self.assertEquals(len(self.array), 0)
self.cancel()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment