Commit dbb3bb6b authored by rockot's avatar rockot Committed by Commit bot

Mojo IPC threading fixes

Two independent but related fixes here:

1. IPCSupportInitializer could previously outlive its IO
   runner, resulting in Mojo not being shut down in time.
   The EDK posts tasks directly to a MessageLoopForIO
   pointer because it's assumed to remain valid until
   EDK shutdown. This CL limits IPCSupportInitializer's
   lifetime to that of the IO thread, regardless of
   ScopedIPCSupport lifetime.

2. In single-process mode, the child IO thread cannot be
   used to create the client ChannelMojo, because all channels
   in-process must be created on the EDK IO thread. This problem only
   surfaced in practice because #1 above changed some shutdown
   ordering and tickled a DCHECK in EDK shutdown.

   This CL has ChannelMojo hop to its IO runner (only if
   necessary) before creating underlying messaging pipes.

BUG=None
R=morrita@chromium.org

Review URL: https://codereview.chromium.org/1130413002

Cr-Commit-Position: refs/heads/master@{#329262}
parent bbdef2a0
......@@ -7,6 +7,7 @@
#include "base/bind.h"
#include "base/bind_helpers.h"
#include "base/lazy_instance.h"
#include "base/thread_task_runner_handle.h"
#include "ipc/ipc_listener.h"
#include "ipc/ipc_logging.h"
#include "ipc/ipc_message_attachment_set.h"
......@@ -74,7 +75,10 @@ class ClientChannelMojo : public ChannelMojo,
const mojo::Callback<void(int32_t)>& callback) override;
private:
void BindPipe(mojo::ScopedMessagePipeHandle handle);
mojo::Binding<ClientChannel> binding_;
base::WeakPtrFactory<ClientChannelMojo> weak_factory_;
DISALLOW_COPY_AND_ASSIGN(ClientChannelMojo);
};
......@@ -84,7 +88,8 @@ ClientChannelMojo::ClientChannelMojo(ChannelMojo::Delegate* delegate,
const ChannelHandle& handle,
Listener* listener)
: ChannelMojo(delegate, io_runner, handle, Channel::MODE_CLIENT, listener),
binding_(this) {
binding_(this),
weak_factory_(this) {
}
ClientChannelMojo::~ClientChannelMojo() {
......@@ -92,7 +97,8 @@ ClientChannelMojo::~ClientChannelMojo() {
void ClientChannelMojo::OnPipeAvailable(
mojo::embedder::ScopedPlatformHandle handle) {
binding_.Bind(CreateMessagingPipe(handle.Pass()));
CreateMessagingPipe(handle.Pass(), base::Bind(&ClientChannelMojo::BindPipe,
weak_factory_.GetWeakPtr()));
}
void ClientChannelMojo::OnConnectionError() {
......@@ -107,6 +113,10 @@ void ClientChannelMojo::Init(
callback.Run(GetSelfPID());
}
void ClientChannelMojo::BindPipe(mojo::ScopedMessagePipeHandle handle) {
binding_.Bind(handle.Pass());
}
//------------------------------------------------------------------------------
class ServerChannelMojo : public ChannelMojo, public mojo::ErrorHandler {
......@@ -125,11 +135,15 @@ class ServerChannelMojo : public ChannelMojo, public mojo::ErrorHandler {
void Close() override;
private:
void InitClientChannel(mojo::ScopedMessagePipeHandle peer_handle,
mojo::ScopedMessagePipeHandle handle);
// ClientChannelClient implementation
void ClientChannelWasInitialized(int32_t peer_pid);
mojo::InterfacePtr<ClientChannel> client_channel_;
mojo::ScopedMessagePipeHandle message_pipe_;
base::WeakPtrFactory<ServerChannelMojo> weak_factory_;
DISALLOW_COPY_AND_ASSIGN(ServerChannelMojo);
};
......@@ -138,7 +152,8 @@ ServerChannelMojo::ServerChannelMojo(ChannelMojo::Delegate* delegate,
scoped_refptr<base::TaskRunner> io_runner,
const ChannelHandle& handle,
Listener* listener)
: ChannelMojo(delegate, io_runner, handle, Channel::MODE_SERVER, listener) {
: ChannelMojo(delegate, io_runner, handle, Channel::MODE_SERVER, listener),
weak_factory_(this) {
}
ServerChannelMojo::~ServerChannelMojo() {
......@@ -155,14 +170,20 @@ void ServerChannelMojo::OnPipeAvailable(
listener()->OnChannelError();
return;
}
CreateMessagingPipe(
handle.Pass(),
base::Bind(&ServerChannelMojo::InitClientChannel,
weak_factory_.GetWeakPtr(), base::Passed(&peer)));
}
void ServerChannelMojo::InitClientChannel(
mojo::ScopedMessagePipeHandle peer_handle,
mojo::ScopedMessagePipeHandle handle) {
client_channel_.Bind(
mojo::InterfacePtrInfo<ClientChannel>(
CreateMessagingPipe(handle.Pass()), 0u));
mojo::InterfacePtrInfo<ClientChannel>(handle.Pass(), 0u));
client_channel_.set_error_handler(this);
client_channel_->Init(
peer.Pass(),
static_cast<int32_t>(GetSelfPID()),
peer_handle.Pass(), static_cast<int32_t>(GetSelfPID()),
base::Bind(&ServerChannelMojo::ClientChannelWasInitialized,
base::Unretained(this)));
}
......@@ -190,13 +211,26 @@ base::ScopedFD TakeOrDupFile(internal::PlatformFileAttachment* attachment) {
#endif
} // namespace
} // namespace
//------------------------------------------------------------------------------
ChannelMojo::ChannelInfoDeleter::ChannelInfoDeleter(
scoped_refptr<base::TaskRunner> io_runner)
: io_runner(io_runner) {
}
ChannelMojo::ChannelInfoDeleter::~ChannelInfoDeleter() {
}
void ChannelMojo::ChannelInfoDeleter::operator()(
mojo::embedder::ChannelInfo* ptr) const {
mojo::embedder::DestroyChannelOnIOThread(ptr);
if (base::ThreadTaskRunnerHandle::Get() == io_runner) {
mojo::embedder::DestroyChannelOnIOThread(ptr);
} else {
io_runner->PostTask(
FROM_HERE, base::Bind(&mojo::embedder::DestroyChannelOnIOThread, ptr));
}
}
//------------------------------------------------------------------------------
......@@ -254,6 +288,7 @@ ChannelMojo::ChannelMojo(ChannelMojo::Delegate* delegate,
listener_(listener),
peer_pid_(base::kNullProcessId),
io_runner_(io_runner),
channel_info_(nullptr, ChannelInfoDeleter(nullptr)),
weak_factory_(this) {
// Create MojoBootstrap after all members are set as it touches
// ChannelMojo from a different thread.
......@@ -280,14 +315,47 @@ void ChannelMojo::InitOnIOThread(ChannelMojo::Delegate* delegate) {
delegate_->OnChannelCreated(weak_factory_.GetWeakPtr());
}
mojo::ScopedMessagePipeHandle ChannelMojo::CreateMessagingPipe(
mojo::embedder::ScopedPlatformHandle handle) {
DCHECK(!channel_info_.get());
void ChannelMojo::CreateMessagingPipe(
mojo::embedder::ScopedPlatformHandle handle,
const CreateMessagingPipeCallback& callback) {
auto return_callback = base::Bind(&ChannelMojo::OnMessagingPipeCreated,
weak_factory_.GetWeakPtr(), callback);
if (base::ThreadTaskRunnerHandle::Get() == io_runner_) {
CreateMessagingPipeOnIOThread(
handle.Pass(), base::ThreadTaskRunnerHandle::Get(), return_callback);
} else {
io_runner_->PostTask(
FROM_HERE,
base::Bind(&ChannelMojo::CreateMessagingPipeOnIOThread,
base::Passed(&handle), base::ThreadTaskRunnerHandle::Get(),
return_callback));
}
}
// static
void ChannelMojo::CreateMessagingPipeOnIOThread(
mojo::embedder::ScopedPlatformHandle handle,
scoped_refptr<base::TaskRunner> callback_runner,
const CreateMessagingPipeOnIOThreadCallback& callback) {
mojo::embedder::ChannelInfo* channel_info;
mojo::ScopedMessagePipeHandle pipe =
mojo::embedder::CreateChannelOnIOThread(handle.Pass(), &channel_info);
channel_info_.reset(channel_info);
return pipe.Pass();
if (base::ThreadTaskRunnerHandle::Get() == callback_runner) {
callback.Run(pipe.Pass(), channel_info);
} else {
callback_runner->PostTask(
FROM_HERE, base::Bind(callback, base::Passed(&pipe), channel_info));
}
}
void ChannelMojo::OnMessagingPipeCreated(
const CreateMessagingPipeCallback& callback,
mojo::ScopedMessagePipeHandle handle,
mojo::embedder::ChannelInfo* channel_info) {
DCHECK(!channel_info_.get());
channel_info_ = scoped_ptr<mojo::embedder::ChannelInfo, ChannelInfoDeleter>(
channel_info, ChannelInfoDeleter(io_runner_));
callback.Run(handle.Pass());
}
bool ChannelMojo::Connect() {
......
......@@ -51,6 +51,12 @@ class IPC_MOJO_EXPORT ChannelMojo
public MojoBootstrap::Delegate,
public NON_EXPORTED_BASE(internal::MessagePipeReader::Delegate) {
public:
using CreateMessagingPipeCallback =
base::Callback<void(mojo::ScopedMessagePipeHandle)>;
using CreateMessagingPipeOnIOThreadCallback =
base::Callback<void(mojo::ScopedMessagePipeHandle,
mojo::embedder::ChannelInfo*)>;
class Delegate {
public:
virtual ~Delegate() {}
......@@ -125,8 +131,8 @@ class IPC_MOJO_EXPORT ChannelMojo
Mode mode,
Listener* listener);
mojo::ScopedMessagePipeHandle CreateMessagingPipe(
mojo::embedder::ScopedPlatformHandle handle);
void CreateMessagingPipe(mojo::embedder::ScopedPlatformHandle handle,
const CreateMessagingPipeCallback& callback);
void InitMessageReader(mojo::ScopedMessagePipeHandle pipe, int32_t peer_pid);
Listener* listener() const { return listener_; }
......@@ -134,7 +140,12 @@ class IPC_MOJO_EXPORT ChannelMojo
private:
struct ChannelInfoDeleter {
explicit ChannelInfoDeleter(scoped_refptr<base::TaskRunner> io_runner);
~ChannelInfoDeleter();
void operator()(mojo::embedder::ChannelInfo* ptr) const;
scoped_refptr<base::TaskRunner> io_runner;
};
// ChannelMojo needs to kill its MessagePipeReader in delayed manner
......@@ -144,6 +155,14 @@ class IPC_MOJO_EXPORT ChannelMojo
void InitOnIOThread(ChannelMojo::Delegate* delegate);
static void CreateMessagingPipeOnIOThread(
mojo::embedder::ScopedPlatformHandle handle,
scoped_refptr<base::TaskRunner> callback_runner,
const CreateMessagingPipeOnIOThreadCallback& callback);
void OnMessagingPipeCreated(const CreateMessagingPipeCallback& callback,
mojo::ScopedMessagePipeHandle handle,
mojo::embedder::ChannelInfo* channel_info);
scoped_ptr<MojoBootstrap> bootstrap_;
base::WeakPtr<Delegate> delegate_;
Mode mode_;
......
......@@ -7,10 +7,12 @@
#include "base/bind.h"
#include "base/lazy_instance.h"
#include "base/logging.h"
#include "base/memory/weak_ptr.h"
#include "base/message_loop/message_loop.h"
#include "base/synchronization/condition_variable.h"
#include "base/synchronization/lock.h"
#include "base/synchronization/waitable_event.h"
#include "base/thread_task_runner_handle.h"
#include "third_party/mojo/src/mojo/edk/embedder/embedder.h"
#include "third_party/mojo/src/mojo/edk/embedder/process_delegate.h"
......@@ -22,80 +24,155 @@ class IPCSupportInitializer : public mojo::embedder::ProcessDelegate {
public:
IPCSupportInitializer()
: init_count_(0),
shutting_down_(false) {
}
~IPCSupportInitializer() override {}
shutting_down_(false),
was_shut_down_(false),
observer_(nullptr),
weak_factory_(this) {}
void Init(scoped_refptr<base::TaskRunner> io_thread_task_runner) {
base::AutoLock locker(lock_);
DCHECK((init_count_ == 0 && !io_thread_task_runner_) ||
io_thread_task_runner_ == io_thread_task_runner);
if (shutting_down_) {
// If reinitialized before a pending shutdown task is executed, we
// effectively cancel the shutdown task.
DCHECK(init_count_ == 1);
shutting_down_ = false;
return;
}
~IPCSupportInitializer() override { DCHECK(!observer_); }
init_count_++;
if (init_count_ == 1) {
io_thread_task_runner_ = io_thread_task_runner;
mojo::embedder::InitIPCSupport(mojo::embedder::ProcessType::NONE,
io_thread_task_runner_,
this, io_thread_task_runner_,
mojo::embedder::ScopedPlatformHandle());
}
}
void Init(scoped_refptr<base::TaskRunner> io_thread_task_runner);
void ShutDown();
void ShutDown() {
base::AutoLock locker(lock_);
DCHECK(init_count_ > 0);
DCHECK(!shutting_down_);
// Forces the initializer to shut down even if scopers are still holding it.
void ForceShutdown();
if (init_count_ > 1) {
init_count_--;
return;
private:
// This watches for destruction of the MessageLoop that IPCSupportInitializer
// uses for IO, and guarantees that the initializer is shut down if it still
// exists when the loop is being destroyed.
class MessageLoopObserver : public base::MessageLoop::DestructionObserver {
public:
MessageLoopObserver(
scoped_refptr<base::TaskRunner> initializer_task_runner,
base::WeakPtr<IPCSupportInitializer> weak_initializer)
: initializer_task_runner_(initializer_task_runner),
weak_initializer_(weak_initializer) {}
~MessageLoopObserver() override {
base::MessageLoop::current()->RemoveDestructionObserver(this);
}
shutting_down_ = true;
if (base::MessageLoop::current() &&
base::MessageLoop::current()->task_runner() == io_thread_task_runner_) {
base::AutoUnlock unlocker_(lock_);
ShutDownOnIOThread();
} else {
io_thread_task_runner_->PostTask(
private:
// base::MessageLoop::DestructionObserver:
void WillDestroyCurrentMessageLoop() override {
initializer_task_runner_->PostTask(
FROM_HERE,
base::Bind(&IPCSupportInitializer::ShutDownOnIOThread,
base::Unretained(this)));
base::Bind(&IPCSupportInitializer::ForceShutdown, weak_initializer_));
}
}
private:
void ShutDownOnIOThread() {
base::AutoLock locker(lock_);
if (shutting_down_) {
DCHECK(init_count_ == 1);
mojo::embedder::ShutdownIPCSupportOnIOThread();
init_count_ = 0;
shutting_down_ = false;
io_thread_task_runner_ = nullptr;
}
}
scoped_refptr<base::TaskRunner> initializer_task_runner_;
base::WeakPtr<IPCSupportInitializer> weak_initializer_;
DISALLOW_COPY_AND_ASSIGN(MessageLoopObserver);
};
void ShutDownOnIOThread();
// mojo::embedder::ProcessDelegate:
void OnShutdownComplete() override {}
static void WatchMessageLoopOnIOThread(MessageLoopObserver* observer);
base::Lock lock_;
size_t init_count_;
bool shutting_down_;
// This is used to track whether shutdown has occurred yet, since we can be
// shut down by either the scoper or IO MessageLoop destruction.
bool was_shut_down_;
// The message loop destruction observer we have watching our IO loop. This
// is created on the initializer's own thread but is used and destroyed on the
// IO thread.
MessageLoopObserver* observer_;
scoped_refptr<base::TaskRunner> io_thread_task_runner_;
base::WeakPtrFactory<IPCSupportInitializer> weak_factory_;
DISALLOW_COPY_AND_ASSIGN(IPCSupportInitializer);
};
void IPCSupportInitializer::Init(
scoped_refptr<base::TaskRunner> io_thread_task_runner) {
base::AutoLock locker(lock_);
DCHECK((init_count_ == 0 && !io_thread_task_runner_) ||
io_thread_task_runner_ == io_thread_task_runner);
if (shutting_down_) {
// If reinitialized before a pending shutdown task is executed, we
// effectively cancel the shutdown task.
DCHECK(init_count_ == 1);
shutting_down_ = false;
return;
}
init_count_++;
if (init_count_ == 1) {
was_shut_down_ = false;
observer_ = new MessageLoopObserver(base::ThreadTaskRunnerHandle::Get(),
weak_factory_.GetWeakPtr());
io_thread_task_runner_ = io_thread_task_runner;
io_thread_task_runner_->PostTask(
FROM_HERE, base::Bind(&WatchMessageLoopOnIOThread, observer_));
mojo::embedder::InitIPCSupport(
mojo::embedder::ProcessType::NONE, io_thread_task_runner_, this,
io_thread_task_runner_, mojo::embedder::ScopedPlatformHandle());
}
}
void IPCSupportInitializer::ShutDown() {
{
base::AutoLock locker(lock_);
if (shutting_down_ || was_shut_down_)
return;
DCHECK(init_count_ > 0);
if (init_count_ > 1) {
init_count_--;
return;
}
}
ForceShutdown();
}
void IPCSupportInitializer::ForceShutdown() {
base::AutoLock locker(lock_);
if (shutting_down_ || was_shut_down_)
return;
shutting_down_ = true;
if (base::MessageLoop::current() &&
base::MessageLoop::current()->task_runner() == io_thread_task_runner_) {
base::AutoUnlock unlocker_(lock_);
ShutDownOnIOThread();
} else {
io_thread_task_runner_->PostTask(
FROM_HERE, base::Bind(&IPCSupportInitializer::ShutDownOnIOThread,
base::Unretained(this)));
}
}
void IPCSupportInitializer::ShutDownOnIOThread() {
base::AutoLock locker(lock_);
if (shutting_down_ && !was_shut_down_) {
mojo::embedder::ShutdownIPCSupportOnIOThread();
init_count_ = 0;
shutting_down_ = false;
io_thread_task_runner_ = nullptr;
was_shut_down_ = true;
if (observer_) {
delete observer_;
observer_ = nullptr;
}
}
}
// static
void IPCSupportInitializer::WatchMessageLoopOnIOThread(
MessageLoopObserver* observer) {
base::MessageLoop::current()->AddDestructionObserver(observer);
}
base::LazyInstance<IPCSupportInitializer>::Leaky ipc_support_initializer;
} // namespace
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment