Mojo: Make parallel sync/async embedder channel creation/destruction APIs.

(I still have to write tests and convert some uses. I also want to add
more logic to the async destruction, so that the |Channel| knows that
it's about to be torn down.)

R=yzshen@chromium.org

Review URL: https://codereview.chromium.org/466563002

Cr-Commit-Position: refs/heads/master@{#289015}
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@289015 0039d316-1c4b-4281-b951-d872f2087c98
parent 3557fd48
...@@ -24,17 +24,19 @@ namespace embedder { ...@@ -24,17 +24,19 @@ namespace embedder {
// outside world. But we need to define it before our (internal-only) functions // outside world. But we need to define it before our (internal-only) functions
// that use it. // that use it.
struct ChannelInfo { struct ChannelInfo {
explicit ChannelInfo(scoped_refptr<system::Channel> channel) ChannelInfo() {}
: channel(channel) {}
~ChannelInfo() {} ~ChannelInfo() {}
scoped_refptr<system::Channel> channel; scoped_refptr<system::Channel> channel;
// May be null, in which case |DestroyChannelOnIOThread()| must be used (from
// the IO thread), instead of |DestroyChannel()|.
scoped_refptr<base::TaskRunner> io_thread_task_runner;
}; };
namespace { namespace {
// Helper for |CreateChannelOnIOThread()|. (Note: May return null for some // Helper for |CreateChannel...()|. (Note: May return null for some failures.)
// failures.)
scoped_refptr<system::Channel> MakeChannel( scoped_refptr<system::Channel> MakeChannel(
ScopedPlatformHandle platform_handle, ScopedPlatformHandle platform_handle,
scoped_refptr<system::MessagePipe> message_pipe) { scoped_refptr<system::MessagePipe> message_pipe) {
...@@ -73,13 +75,13 @@ scoped_refptr<system::Channel> MakeChannel( ...@@ -73,13 +75,13 @@ scoped_refptr<system::Channel> MakeChannel(
return channel; return channel;
} }
void CreateChannelOnIOThread( void CreateChannelHelper(
ScopedPlatformHandle platform_handle, ScopedPlatformHandle platform_handle,
scoped_ptr<ChannelInfo> channel_info,
scoped_refptr<system::MessagePipe> message_pipe, scoped_refptr<system::MessagePipe> message_pipe,
DidCreateChannelCallback callback, DidCreateChannelCallback callback,
scoped_refptr<base::TaskRunner> callback_thread_task_runner) { scoped_refptr<base::TaskRunner> callback_thread_task_runner) {
scoped_ptr<ChannelInfo> channel_info( channel_info->channel = MakeChannel(platform_handle.Pass(), message_pipe);
new ChannelInfo(MakeChannel(platform_handle.Pass(), message_pipe)));
// Hand the channel back to the embedder. // Hand the channel back to the embedder.
if (callback_thread_task_runner) { if (callback_thread_task_runner) {
...@@ -96,6 +98,29 @@ void Init() { ...@@ -96,6 +98,29 @@ void Init() {
system::entrypoints::SetCore(new system::Core()); system::entrypoints::SetCore(new system::Core());
} }
// TODO(vtl): Write tests for this.
ScopedMessagePipeHandle CreateChannelOnIOThread(
ScopedPlatformHandle platform_handle,
ChannelInfo** channel_info) {
DCHECK(platform_handle.is_valid());
DCHECK(channel_info);
std::pair<scoped_refptr<system::MessagePipeDispatcher>,
scoped_refptr<system::MessagePipe> > remote_message_pipe =
system::MessagePipeDispatcher::CreateRemoteMessagePipe();
system::Core* core = system::entrypoints::GetCore();
DCHECK(core);
ScopedMessagePipeHandle rv(
MessagePipeHandle(core->AddDispatcher(remote_message_pipe.first)));
*channel_info = new ChannelInfo();
(*channel_info)->channel =
MakeChannel(platform_handle.Pass(), remote_message_pipe.second);
return rv.Pass();
}
ScopedMessagePipeHandle CreateChannel( ScopedMessagePipeHandle CreateChannel(
ScopedPlatformHandle platform_handle, ScopedPlatformHandle platform_handle,
scoped_refptr<base::TaskRunner> io_thread_task_runner, scoped_refptr<base::TaskRunner> io_thread_task_runner,
...@@ -111,15 +136,24 @@ ScopedMessagePipeHandle CreateChannel( ...@@ -111,15 +136,24 @@ ScopedMessagePipeHandle CreateChannel(
DCHECK(core); DCHECK(core);
ScopedMessagePipeHandle rv( ScopedMessagePipeHandle rv(
MessagePipeHandle(core->AddDispatcher(remote_message_pipe.first))); MessagePipeHandle(core->AddDispatcher(remote_message_pipe.first)));
// TODO(vtl): Do we properly handle the failure case here?
scoped_ptr<ChannelInfo> channel_info(new ChannelInfo());
channel_info->io_thread_task_runner = io_thread_task_runner;
if (rv.is_valid()) { if (rv.is_valid()) {
io_thread_task_runner->PostTask(FROM_HERE, io_thread_task_runner->PostTask(FROM_HERE,
base::Bind(&CreateChannelOnIOThread, base::Bind(&CreateChannelHelper,
base::Passed(&platform_handle), base::Passed(&platform_handle),
base::Passed(&channel_info),
remote_message_pipe.second, remote_message_pipe.second,
callback, callback,
callback_thread_task_runner)); callback_thread_task_runner));
} else {
(callback_thread_task_runner ? callback_thread_task_runner
: io_thread_task_runner)
->PostTask(FROM_HERE, base::Bind(callback, channel_info.release()));
} }
return rv.Pass(); return rv.Pass();
} }
...@@ -134,6 +168,15 @@ void DestroyChannelOnIOThread(ChannelInfo* channel_info) { ...@@ -134,6 +168,15 @@ void DestroyChannelOnIOThread(ChannelInfo* channel_info) {
delete channel_info; delete channel_info;
} }
// TODO(vtl): Write tests for this.
void DestroyChannel(ChannelInfo* channel_info) {
DCHECK(channel_info);
DCHECK(channel_info->io_thread_task_runner);
channel_info->io_thread_task_runner->PostTask(
FROM_HERE, base::Bind(&DestroyChannelOnIOThread, channel_info));
}
MojoResult CreatePlatformHandleWrapper( MojoResult CreatePlatformHandleWrapper(
ScopedPlatformHandle platform_handle, ScopedPlatformHandle platform_handle,
MojoHandle* platform_handle_wrapper_handle) { MojoHandle* platform_handle_wrapper_handle) {
......
...@@ -18,44 +18,84 @@ namespace embedder { ...@@ -18,44 +18,84 @@ namespace embedder {
// Must be called first to initialize the (global, singleton) system. // Must be called first to initialize the (global, singleton) system.
MOJO_SYSTEM_IMPL_EXPORT void Init(); MOJO_SYSTEM_IMPL_EXPORT void Init();
// Creates a new "channel", returning a handle to the bootstrap message pipe on // A "channel" is a connection on top of an OS "pipe", on top of which Mojo
// that channel. |platform_handle| should be an OS-dependent handle to one side // message pipes (etc.) can be multiplexed. It must "live" on some I/O thread.
// of a suitable bidirectional OS "pipe" (e.g., a file descriptor to a socket on
// POSIX, a handle to a named pipe on Windows); this "pipe" should be connected
// and ready for operation (e.g., to be written to or read from).
// |io_thread_task_runner| should be a |TaskRunner| for the thread on which the
// "channel" will run (read data and demultiplex).
// //
// On completion, it will run |callback| with a pointer to a |ChannelInfo| // There are two "channel" creation/destruction APIs: the synchronous
// (which is meant to be opaque to the embedder). If // |CreateChannelOnIOThread()|/|DestroyChannelOnIOThread()|, which must be
// |callback_thread_task_runner| is non-null, it the callback will be posted to // called from the I/O thread, and the asynchronous
// that task runner. Otherwise, it will be run on the I/O thread directly. // |CreateChannel()|/|DestroyChannel()|, which may be called from any thread.
// //
// Returns an invalid |MOJO_HANDLE_INVALID| on error. Note that this will happen // Both creation functions have a |platform_handle| argument, which should be an
// only if, e.g., the handle table is full (operation of the channel begins // OS-dependent handle to one side of a suitable bidirectional OS "pipe" (e.g.,
// asynchronously and if, e.g., the other end of the "pipe" is closed, this will // a file descriptor to a socket on POSIX, a handle to a named pipe on Windows);
// report an error to the returned handle in the usual way). // this "pipe" should be connected and ready for operation (e.g., to be written
// to or read from).
// //
// Notes: The handle returned is ready for use immediately, with messages // Both (synchronously) return a handle to the bootstrap message pipe on the
// written to it queued. E.g., it would be perfectly valid for a message to be // channel that was (or is to be) created, or |MOJO_HANDLE_INVALID| on error
// immediately written to the returned handle and the handle closed, all before // (but note that this will happen only if, e.g., the handle table is full).
// the channel has begun operation on the IO thread. In this case, the channel // This message pipe may be used immediately, but since channel operation
// is expected to connect as usual, send the queued message, and report that the // actually begins asynchronously, other errors may still occur (e.g., if the
// handle was closed to the other side. (This message may well contain another // other end of the "pipe" is closed) and be reported in the usual way to the
// handle, so there may well still be message pipes "on" this channel.) // returned handle.
//
// (E.g., a message written immediately to the returned handle will be queued
// and the handle immediately closed, before the channel begins operation. In
// this case, the channel should connect as usual, send the queued message, and
// report that the handle was closed to the other side. The message sent may
// have other handles, so there may still be message pipes "on" this channel.)
//
// Both also produce a |ChannelInfo*| (a pointer to an opaque object) -- the
// first synchronously and second asynchronously.
//
// The destruction functions are similarly synchronous and asynchronous,
// respectively, and take the |ChannelInfo*| produced by the creation function.
// (Note: One may call |DestroyChannelOnIOThread()| with the result of
// |CreateChannel()|, but not |DestroyChannel()| with the result of
// |CreateChannelOnIOThread()|.)
// //
// TODO(vtl): Figure out channel teardown. // TODO(vtl): Figure out channel teardown.
struct ChannelInfo; struct ChannelInfo;
// Creates a channel; must only be called from the I/O thread. |platform_handle|
// should be a handle to a connected OS "pipe". Eventually (even on failure),
// the "out" value |*channel_info| should be passed to
// |DestroyChannelOnIOThread()| to tear down the channel. Returns a handle to
// the bootstrap message pipe.
MOJO_SYSTEM_IMPL_EXPORT ScopedMessagePipeHandle
CreateChannelOnIOThread(ScopedPlatformHandle platform_handle,
ChannelInfo** channel_info);
typedef base::Callback<void(ChannelInfo*)> DidCreateChannelCallback; typedef base::Callback<void(ChannelInfo*)> DidCreateChannelCallback;
// Creates a channel asynchronously; may be called from any thread.
// |platform_handle| should be a handle to a connected OS "pipe".
// |io_thread_task_runner| should be the |TaskRunner| for the I/O thread.
// |callback| should be the callback to call with the |ChannelInfo*|, which
// should eventually be passed to |DestroyChannel()| (or
// |DestroyChannelOnIOThread()|) to tear down the channel; the callback will be
// called using |callback_thread_task_runner| if that is non-null, or otherwise
// it will be called using |io_thread_task_runner|. Returns a handle to the
// bootstrap message pipe.
MOJO_SYSTEM_IMPL_EXPORT ScopedMessagePipeHandle MOJO_SYSTEM_IMPL_EXPORT ScopedMessagePipeHandle
CreateChannel(ScopedPlatformHandle platform_handle, CreateChannel(ScopedPlatformHandle platform_handle,
scoped_refptr<base::TaskRunner> io_thread_task_runner, scoped_refptr<base::TaskRunner> io_thread_task_runner,
DidCreateChannelCallback callback, DidCreateChannelCallback callback,
scoped_refptr<base::TaskRunner> callback_thread_task_runner); scoped_refptr<base::TaskRunner> callback_thread_task_runner);
// Destroys a channel that was created using either |CreateChannelOnIOThread()|
// or |CreateChannel()|; must only be called from the I/O thread. |channel_info|
// should be the "out" value from |CreateChannelOnIOThread()| or the value
// provided to the callback to |CreateChannel()|.
MOJO_SYSTEM_IMPL_EXPORT void DestroyChannelOnIOThread( MOJO_SYSTEM_IMPL_EXPORT void DestroyChannelOnIOThread(
ChannelInfo* channel_info); ChannelInfo* channel_info);
// Destroys a channel (asynchronously) that was created using |CreateChannel()|
// (note: NOT |CreateChannelOnIOThread()|); may be called from any thread.
// |channel_info| should be the value provided to the callback to
// |CreateChannel()|.
MOJO_SYSTEM_IMPL_EXPORT void DestroyChannel(ChannelInfo* channel_info);
// Creates a |MojoHandle| that wraps the given |PlatformHandle| (taking // Creates a |MojoHandle| that wraps the given |PlatformHandle| (taking
// ownership of it). This |MojoHandle| can then, e.g., be passed through message // ownership of it). This |MojoHandle| can then, e.g., be passed through message
// pipes. Note: This takes ownership (and thus closes) |platform_handle| even on // pipes. Note: This takes ownership (and thus closes) |platform_handle| even on
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment