Commit 46cde1de authored by Ken Rockot's avatar Ken Rockot Committed by Commit Bot

[mojo] Trim ThreadSafeForwarder template

Almost of all of the code in this class is shared acoss all instances of
the tempate. This introduces a ThreadSafeForwarderBase class and moves
the logic into it.

No functional changes.

Bug: 1016022
Change-Id: I515c472f8a6fdc97fdd25d90db596d23a89ce543
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/1871195Reviewed-by: default avatarOksana Zhuravlova <oksamyt@chromium.org>
Commit-Queue: Ken Rockot <rockot@google.com>
Cr-Commit-Position: refs/heads/master@{#708012}
parent ab8f999c
...@@ -202,6 +202,8 @@ component("bindings") { ...@@ -202,6 +202,8 @@ component("bindings") {
"sync_event_watcher.h", "sync_event_watcher.h",
"sync_handle_registry.h", "sync_handle_registry.h",
"sync_handle_watcher.h", "sync_handle_watcher.h",
"thread_safe_forwarder_base.cc",
"thread_safe_forwarder_base.h",
"thread_safe_interface_ptr.h", "thread_safe_interface_ptr.h",
"unique_associated_receiver_set.h", "unique_associated_receiver_set.h",
"unique_ptr_impl_ref_traits.h", "unique_ptr_impl_ref_traits.h",
......
// Copyright 2019 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "mojo/public/cpp/bindings/thread_safe_forwarder_base.h"
#include <utility>
#include "base/logging.h"
#include "base/stl_util.h"
#include "mojo/public/cpp/bindings/sync_call_restrictions.h"
#include "mojo/public/cpp/bindings/sync_event_watcher.h"
namespace mojo {
ThreadSafeForwarderBase::ThreadSafeForwarderBase(
scoped_refptr<base::SequencedTaskRunner> task_runner,
const ForwardMessageCallback& forward,
const ForwardMessageWithResponderCallback& forward_with_responder,
const AssociatedGroup& associated_group)
: task_runner_(std::move(task_runner)),
forward_(forward),
forward_with_responder_(forward_with_responder),
associated_group_(associated_group),
sync_calls_(new InProgressSyncCalls()) {}
ThreadSafeForwarderBase::~ThreadSafeForwarderBase() {
// If there are ongoing sync calls signal their completion now.
base::AutoLock l(sync_calls_->lock);
for (auto* pending_response : sync_calls_->pending_responses)
pending_response->event.Signal();
}
bool ThreadSafeForwarderBase::PrefersSerializedMessages() {
// NOTE: This means SharedRemote etc will ignore lazy serialization hints and
// will always eagerly serialize messages.
return true;
}
bool ThreadSafeForwarderBase::Accept(Message* message) {
if (!message->associated_endpoint_handles()->empty()) {
// If this DCHECK fails, it is likely because:
// - This is a non-associated interface pointer setup using
// PtrWrapper::BindOnTaskRunner(
// InterfacePtrInfo<InterfaceType> ptr_info);
// Please see the TODO in that method.
// - This is an associated interface which hasn't been associated with a
// message pipe. In other words, the corresponding
// AssociatedInterfaceRequest hasn't been sent.
DCHECK(associated_group_.GetController());
message->SerializeAssociatedEndpointHandles(
associated_group_.GetController());
}
task_runner_->PostTask(FROM_HERE,
base::BindOnce(forward_, base::Passed(message)));
return true;
}
bool ThreadSafeForwarderBase::AcceptWithResponder(
Message* message,
std::unique_ptr<MessageReceiver> responder) {
if (!message->associated_endpoint_handles()->empty()) {
// Please see comment for the DCHECK in the previous method.
DCHECK(associated_group_.GetController());
message->SerializeAssociatedEndpointHandles(
associated_group_.GetController());
}
// Async messages are always posted (even if |task_runner_| runs tasks on
// this sequence) to guarantee that two async calls can't be reordered.
if (!message->has_flag(Message::kFlagIsSync)) {
auto reply_forwarder =
std::make_unique<ForwardToCallingThread>(std::move(responder));
task_runner_->PostTask(
FROM_HERE,
base::BindOnce(forward_with_responder_, base::Passed(message),
std::move(reply_forwarder)));
return true;
}
SyncCallRestrictions::AssertSyncCallAllowed();
// If the InterfacePtr is bound to this sequence, dispatch it directly.
if (task_runner_->RunsTasksInCurrentSequence()) {
forward_with_responder_.Run(std::move(*message), std::move(responder));
return true;
}
// If the Remote is bound on another sequence, post the call.
auto response = base::MakeRefCounted<SyncResponseInfo>();
auto response_signaler = std::make_unique<SyncResponseSignaler>(response);
task_runner_->PostTask(
FROM_HERE, base::BindOnce(forward_with_responder_, base::Passed(message),
std::move(response_signaler)));
// Save the pending SyncResponseInfo so that if the sync call deletes
// |this|, we can signal the completion of the call to return from
// SyncWatch().
auto sync_calls = sync_calls_;
{
base::AutoLock l(sync_calls->lock);
sync_calls->pending_responses.push_back(response.get());
}
auto assign_true = [](bool* b) { *b = true; };
bool event_signaled = false;
SyncEventWatcher watcher(&response->event,
base::Bind(assign_true, &event_signaled));
const bool* stop_flags[] = {&event_signaled};
watcher.SyncWatch(stop_flags, 1);
{
base::AutoLock l(sync_calls->lock);
base::Erase(sync_calls->pending_responses, response.get());
}
if (response->received)
ignore_result(responder->Accept(&response->message));
return true;
}
ThreadSafeForwarderBase::SyncResponseInfo::SyncResponseInfo() = default;
ThreadSafeForwarderBase::SyncResponseInfo::~SyncResponseInfo() = default;
ThreadSafeForwarderBase::SyncResponseSignaler::SyncResponseSignaler(
scoped_refptr<SyncResponseInfo> response)
: response_(response) {}
ThreadSafeForwarderBase::SyncResponseSignaler::~SyncResponseSignaler() {
// If Accept() was not called we must still notify the waiter that the
// sync call is finished.
if (response_)
response_->event.Signal();
}
bool ThreadSafeForwarderBase::SyncResponseSignaler::Accept(Message* message) {
response_->message = std::move(*message);
response_->received = true;
response_->event.Signal();
response_ = nullptr;
return true;
}
ThreadSafeForwarderBase::InProgressSyncCalls::InProgressSyncCalls() = default;
ThreadSafeForwarderBase::InProgressSyncCalls::~InProgressSyncCalls() = default;
ThreadSafeForwarderBase::ForwardToCallingThread::ForwardToCallingThread(
std::unique_ptr<MessageReceiver> responder)
: responder_(std::move(responder)),
caller_task_runner_(base::SequencedTaskRunnerHandle::Get()) {}
ThreadSafeForwarderBase::ForwardToCallingThread::~ForwardToCallingThread() {
caller_task_runner_->DeleteSoon(FROM_HERE, std::move(responder_));
}
bool ThreadSafeForwarderBase::ForwardToCallingThread::Accept(Message* message) {
// The current instance will be deleted when this method returns, so we
// have to relinquish the responder's ownership so it does not get
// deleted.
caller_task_runner_->PostTask(
FROM_HERE,
base::BindOnce(&ForwardToCallingThread::CallAcceptAndDeleteResponder,
std::move(responder_), std::move(*message)));
return true;
}
// static
void ThreadSafeForwarderBase::ForwardToCallingThread::
CallAcceptAndDeleteResponder(std::unique_ptr<MessageReceiver> responder,
Message message) {
ignore_result(responder->Accept(&message));
}
} // namespace mojo
// Copyright 2019 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef MOJO_PUBLIC_CPP_BINDINGS_THREAD_SAFE_FORWARDER_BASE_H_
#define MOJO_PUBLIC_CPP_BINDINGS_THREAD_SAFE_FORWARDER_BASE_H_
#include <memory>
#include <vector>
#include "base/callback.h"
#include "base/component_export.h"
#include "base/macros.h"
#include "base/memory/ref_counted.h"
#include "base/memory/scoped_refptr.h"
#include "base/sequenced_task_runner.h"
#include "base/synchronization/lock.h"
#include "base/synchronization/waitable_event.h"
#include "mojo/public/cpp/bindings/associated_group.h"
#include "mojo/public/cpp/bindings/message.h"
namespace mojo {
class COMPONENT_EXPORT(MOJO_CPP_BINDINGS) ThreadSafeForwarderBase
: public MessageReceiverWithResponder {
public:
using ForwardMessageCallback = base::Callback<void(Message)>;
using ForwardMessageWithResponderCallback =
base::Callback<void(Message, std::unique_ptr<MessageReceiver>)>;
ThreadSafeForwarderBase(
scoped_refptr<base::SequencedTaskRunner> task_runner,
const ForwardMessageCallback& forward,
const ForwardMessageWithResponderCallback& forward_with_responder,
const AssociatedGroup& associated_group);
~ThreadSafeForwarderBase() override;
// MessageReceiverWithResponder implementation:
bool PrefersSerializedMessages() override;
bool Accept(Message* message) override;
bool AcceptWithResponder(Message* message,
std::unique_ptr<MessageReceiver> responder) override;
private:
// Data that we need to share between the sequences involved in a sync call.
struct SyncResponseInfo
: public base::RefCountedThreadSafe<SyncResponseInfo> {
SyncResponseInfo();
Message message;
bool received = false;
base::WaitableEvent event{base::WaitableEvent::ResetPolicy::MANUAL,
base::WaitableEvent::InitialState::NOT_SIGNALED};
private:
friend class base::RefCountedThreadSafe<SyncResponseInfo>;
~SyncResponseInfo();
};
// A MessageReceiver that signals |response| when it either accepts the
// response message, or is destructed.
class SyncResponseSignaler : public MessageReceiver {
public:
explicit SyncResponseSignaler(scoped_refptr<SyncResponseInfo> response);
~SyncResponseSignaler() override;
bool Accept(Message* message) override;
private:
scoped_refptr<SyncResponseInfo> response_;
};
// A record of the pending sync responses for canceling pending sync calls
// when the owning ThreadSafeForwarder is destructed.
struct InProgressSyncCalls
: public base::RefCountedThreadSafe<InProgressSyncCalls> {
InProgressSyncCalls();
// |lock| protects access to |pending_responses|.
base::Lock lock;
std::vector<SyncResponseInfo*> pending_responses;
private:
friend class base::RefCountedThreadSafe<InProgressSyncCalls>;
~InProgressSyncCalls();
};
class ForwardToCallingThread : public MessageReceiver {
public:
explicit ForwardToCallingThread(std::unique_ptr<MessageReceiver> responder);
~ForwardToCallingThread() override;
private:
bool Accept(Message* message) override;
static void CallAcceptAndDeleteResponder(
std::unique_ptr<MessageReceiver> responder,
Message message);
std::unique_ptr<MessageReceiver> responder_;
scoped_refptr<base::SequencedTaskRunner> caller_task_runner_;
};
const scoped_refptr<base::SequencedTaskRunner> task_runner_;
const ForwardMessageCallback forward_;
const ForwardMessageWithResponderCallback forward_with_responder_;
AssociatedGroup associated_group_;
scoped_refptr<InProgressSyncCalls> sync_calls_;
DISALLOW_COPY_AND_ASSIGN(ThreadSafeForwarderBase);
};
} // namespace mojo
#endif // MOJO_PUBLIC_CPP_BINDINGS_THREAD_SAFE_FORWARDER_BASE_H_
...@@ -6,13 +6,11 @@ ...@@ -6,13 +6,11 @@
#define MOJO_PUBLIC_CPP_BINDINGS_THREAD_SAFE_INTERFACE_PTR_H_ #define MOJO_PUBLIC_CPP_BINDINGS_THREAD_SAFE_INTERFACE_PTR_H_
#include <memory> #include <memory>
#include <utility>
#include "base/bind.h" #include "base/bind.h"
#include "base/macros.h" #include "base/macros.h"
#include "base/memory/ptr_util.h"
#include "base/memory/ref_counted.h" #include "base/memory/ref_counted.h"
#include "base/stl_util.h"
#include "base/synchronization/waitable_event.h"
#include "base/task_runner.h" #include "base/task_runner.h"
#include "base/threading/sequenced_task_runner_handle.h" #include "base/threading/sequenced_task_runner_handle.h"
#include "mojo/public/cpp/bindings/associated_group.h" #include "mojo/public/cpp/bindings/associated_group.h"
...@@ -21,7 +19,7 @@ ...@@ -21,7 +19,7 @@
#include "mojo/public/cpp/bindings/interface_ptr.h" #include "mojo/public/cpp/bindings/interface_ptr.h"
#include "mojo/public/cpp/bindings/message.h" #include "mojo/public/cpp/bindings/message.h"
#include "mojo/public/cpp/bindings/sync_call_restrictions.h" #include "mojo/public/cpp/bindings/sync_call_restrictions.h"
#include "mojo/public/cpp/bindings/sync_event_watcher.h" #include "mojo/public/cpp/bindings/thread_safe_forwarder_base.h"
// ThreadSafeInterfacePtr wraps a non-thread-safe InterfacePtr and proxies // ThreadSafeInterfacePtr wraps a non-thread-safe InterfacePtr and proxies
// messages to it. Async calls are posted to the sequence that the InteracePtr // messages to it. Async calls are posted to the sequence that the InteracePtr
...@@ -41,12 +39,9 @@ namespace mojo { ...@@ -41,12 +39,9 @@ namespace mojo {
// type may be useful if you need/want to manually manage the lifetime of the // type may be useful if you need/want to manually manage the lifetime of the
// underlying proxy object which will be used to ultimately send messages. // underlying proxy object which will be used to ultimately send messages.
template <typename Interface> template <typename Interface>
class ThreadSafeForwarder : public MessageReceiverWithResponder { class ThreadSafeForwarder : public ThreadSafeForwarderBase {
public: public:
using ProxyType = typename Interface::Proxy_; using ProxyType = typename Interface::Proxy_;
using ForwardMessageCallback = base::Callback<void(Message)>;
using ForwardMessageWithResponderCallback =
base::Callback<void(Message, std::unique_ptr<MessageReceiver>)>;
// Constructs a ThreadSafeForwarder through which Messages are forwarded to // Constructs a ThreadSafeForwarder through which Messages are forwarded to
// |forward| or |forward_with_responder| by posting to |task_runner|. // |forward| or |forward_with_responder| by posting to |task_runner|.
...@@ -55,205 +50,22 @@ class ThreadSafeForwarder : public MessageReceiverWithResponder { ...@@ -55,205 +50,22 @@ class ThreadSafeForwarder : public MessageReceiverWithResponder {
// if any, back to the sequence which called the corresponding interface // if any, back to the sequence which called the corresponding interface
// method. // method.
ThreadSafeForwarder( ThreadSafeForwarder(
const scoped_refptr<base::SequencedTaskRunner>& task_runner, scoped_refptr<base::SequencedTaskRunner> task_runner,
const ForwardMessageCallback& forward, const ForwardMessageCallback& forward,
const ForwardMessageWithResponderCallback& forward_with_responder, const ForwardMessageWithResponderCallback& forward_with_responder,
const AssociatedGroup& associated_group) const AssociatedGroup& associated_group)
: proxy_(this), : ThreadSafeForwarderBase(std::move(task_runner),
task_runner_(task_runner), forward,
forward_(forward), forward_with_responder,
forward_with_responder_(forward_with_responder), associated_group),
associated_group_(associated_group), proxy_(this) {}
sync_calls_(new InProgressSyncCalls()) {}
~ThreadSafeForwarder() override = default;
~ThreadSafeForwarder() override {
// If there are ongoing sync calls signal their completion now.
base::AutoLock l(sync_calls_->lock);
for (const auto& pending_response : sync_calls_->pending_responses)
pending_response->event.Signal();
}
ProxyType& proxy() { return proxy_; } ProxyType& proxy() { return proxy_; }
private: private:
// MessageReceiverWithResponder implementation:
bool PrefersSerializedMessages() override {
// TSIP is primarily used because it emulates legacy IPC threading behavior.
// In practice this means it's only for cross-process messaging and we can
// just always assume messages should be serialized.
return true;
}
bool Accept(Message* message) override {
if (!message->associated_endpoint_handles()->empty()) {
// If this DCHECK fails, it is likely because:
// - This is a non-associated interface pointer setup using
// PtrWrapper::BindOnTaskRunner(
// InterfacePtrInfo<InterfaceType> ptr_info);
// Please see the TODO in that method.
// - This is an associated interface which hasn't been associated with a
// message pipe. In other words, the corresponding
// AssociatedInterfaceRequest hasn't been sent.
DCHECK(associated_group_.GetController());
message->SerializeAssociatedEndpointHandles(
associated_group_.GetController());
}
task_runner_->PostTask(FROM_HERE,
base::BindOnce(forward_, base::Passed(message)));
return true;
}
bool AcceptWithResponder(
Message* message,
std::unique_ptr<MessageReceiver> responder) override {
if (!message->associated_endpoint_handles()->empty()) {
// Please see comment for the DCHECK in the previous method.
DCHECK(associated_group_.GetController());
message->SerializeAssociatedEndpointHandles(
associated_group_.GetController());
}
// Async messages are always posted (even if |task_runner_| runs tasks on
// this sequence) to guarantee that two async calls can't be reordered.
if (!message->has_flag(Message::kFlagIsSync)) {
auto reply_forwarder =
std::make_unique<ForwardToCallingThread>(std::move(responder));
task_runner_->PostTask(
FROM_HERE,
base::BindOnce(forward_with_responder_, base::Passed(message),
std::move(reply_forwarder)));
return true;
}
SyncCallRestrictions::AssertSyncCallAllowed();
// If the InterfacePtr is bound to this sequence, dispatch it directly.
if (task_runner_->RunsTasksInCurrentSequence()) {
forward_with_responder_.Run(std::move(*message), std::move(responder));
return true;
}
// If the InterfacePtr is bound on another sequence, post the call.
// TODO(yzshen, watk): We block both this sequence and the InterfacePtr
// sequence. Ideally only this sequence would block.
auto response = base::MakeRefCounted<SyncResponseInfo>();
auto response_signaler = std::make_unique<SyncResponseSignaler>(response);
task_runner_->PostTask(
FROM_HERE,
base::BindOnce(forward_with_responder_, base::Passed(message),
std::move(response_signaler)));
// Save the pending SyncResponseInfo so that if the sync call deletes
// |this|, we can signal the completion of the call to return from
// SyncWatch().
auto sync_calls = sync_calls_;
{
base::AutoLock l(sync_calls->lock);
sync_calls->pending_responses.push_back(response.get());
}
auto assign_true = [](bool* b) { *b = true; };
bool event_signaled = false;
SyncEventWatcher watcher(&response->event,
base::Bind(assign_true, &event_signaled));
const bool* stop_flags[] = {&event_signaled};
watcher.SyncWatch(stop_flags, 1);
{
base::AutoLock l(sync_calls->lock);
base::Erase(sync_calls->pending_responses, response.get());
}
if (response->received)
ignore_result(responder->Accept(&response->message));
return true;
}
// Data that we need to share between the sequences involved in a sync call.
struct SyncResponseInfo
: public base::RefCountedThreadSafe<SyncResponseInfo> {
Message message;
bool received = false;
base::WaitableEvent event{base::WaitableEvent::ResetPolicy::MANUAL,
base::WaitableEvent::InitialState::NOT_SIGNALED};
private:
friend class base::RefCountedThreadSafe<SyncResponseInfo>;
};
// A MessageReceiver that signals |response| when it either accepts the
// response message, or is destructed.
class SyncResponseSignaler : public MessageReceiver {
public:
explicit SyncResponseSignaler(scoped_refptr<SyncResponseInfo> response)
: response_(response) {}
~SyncResponseSignaler() override {
// If Accept() was not called we must still notify the waiter that the
// sync call is finished.
if (response_)
response_->event.Signal();
}
bool Accept(Message* message) override {
response_->message = std::move(*message);
response_->received = true;
response_->event.Signal();
response_ = nullptr;
return true;
}
private:
scoped_refptr<SyncResponseInfo> response_;
};
// A record of the pending sync responses for canceling pending sync calls
// when the owning ThreadSafeForwarder is destructed.
struct InProgressSyncCalls
: public base::RefCountedThreadSafe<InProgressSyncCalls> {
// |lock| protects access to |pending_responses|.
base::Lock lock;
std::vector<SyncResponseInfo*> pending_responses;
};
class ForwardToCallingThread : public MessageReceiver {
public:
explicit ForwardToCallingThread(std::unique_ptr<MessageReceiver> responder)
: responder_(std::move(responder)),
caller_task_runner_(base::SequencedTaskRunnerHandle::Get()) {}
~ForwardToCallingThread() override {
caller_task_runner_->DeleteSoon(FROM_HERE, std::move(responder_));
}
private:
bool Accept(Message* message) override {
// The current instance will be deleted when this method returns, so we
// have to relinquish the responder's ownership so it does not get
// deleted.
caller_task_runner_->PostTask(
FROM_HERE,
base::BindOnce(&ForwardToCallingThread::CallAcceptAndDeleteResponder,
std::move(responder_), std::move(*message)));
return true;
}
static void CallAcceptAndDeleteResponder(
std::unique_ptr<MessageReceiver> responder,
Message message) {
ignore_result(responder->Accept(&message));
}
std::unique_ptr<MessageReceiver> responder_;
scoped_refptr<base::SequencedTaskRunner> caller_task_runner_;
};
ProxyType proxy_; ProxyType proxy_;
const scoped_refptr<base::SequencedTaskRunner> task_runner_;
const ForwardMessageCallback forward_;
const ForwardMessageWithResponderCallback forward_with_responder_;
AssociatedGroup associated_group_;
scoped_refptr<InProgressSyncCalls> sync_calls_;
DISALLOW_COPY_AND_ASSIGN(ThreadSafeForwarder); DISALLOW_COPY_AND_ASSIGN(ThreadSafeForwarder);
}; };
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment