Commit 5e23e598 authored by Ken Rockot's avatar Ken Rockot Committed by Commit Bot

[mojo] Fix sync calls on SharedRemote

This fixes some edge cases which can cause SharedRemote to either block
unnecessarily on its bound sequence or fail to block as intended on its
bound sequence, depending on how it's constructed and called into.

We want to repurpose the underlying Remote's sync waiting behavior when
the SharedRemote makes a sync call from the bound sequence, but when a
call is made from any other sequence, the SharedRemote must implement
its own waiting mechanism.

This change ensures that SharedRemote configures its underlying Remote
to disable sync waiting by default, and it's selectively re-enabled only
around any (potentially nested) sync calls made from the bound sequence.

Fixed: 1102921
Change-Id: I603e48e38d859c43d7b732a91445314714ea0591
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2286051
Commit-Queue: Ken Rockot <rockot@google.com>
Reviewed-by: default avatarErik Chen <erikchen@chromium.org>
Cr-Commit-Position: refs/heads/master@{#786180}
parent 31ac9fdc
...@@ -261,7 +261,7 @@ ChannelMojo::CreateThreadSafeChannel() { ...@@ -261,7 +261,7 @@ ChannelMojo::CreateThreadSafeChannel() {
base::BindRepeating( base::BindRepeating(
&ChannelMojo::ForwardMessageWithResponderFromThreadSafePtr, &ChannelMojo::ForwardMessageWithResponderFromThreadSafePtr,
weak_ptr_), weak_ptr_),
*bootstrap_->GetAssociatedGroup()); base::DoNothing(), *bootstrap_->GetAssociatedGroup());
} }
void ChannelMojo::OnPeerPidReceived(int32_t peer_pid) { void ChannelMojo::OnPeerPidReceived(int32_t peer_pid) {
......
...@@ -66,6 +66,11 @@ class SharedRemoteBase ...@@ -66,6 +66,11 @@ class SharedRemoteBase
: RemoteWrapper(base::SequencedTaskRunnerHandle::Get()) { : RemoteWrapper(base::SequencedTaskRunnerHandle::Get()) {
remote_ = std::move(remote); remote_ = std::move(remote);
associated_group_ = *remote_.internal_state()->associated_group(); associated_group_ = *remote_.internal_state()->associated_group();
// By default we force all messages to behave as if async within the
// Remote, as SharedRemote implements its own waiting mechanism to block
// only the calling thread when making sync calls.
remote_.internal_state()->force_outgoing_messages_async(true);
} }
explicit RemoteWrapper(scoped_refptr<base::SequencedTaskRunner> task_runner) explicit RemoteWrapper(scoped_refptr<base::SequencedTaskRunner> task_runner)
...@@ -88,6 +93,7 @@ class SharedRemoteBase ...@@ -88,6 +93,7 @@ class SharedRemoteBase
return std::make_unique<ThreadSafeForwarder<InterfaceType>>( return std::make_unique<ThreadSafeForwarder<InterfaceType>>(
task_runner_, base::BindRepeating(&RemoteWrapper::Accept, this), task_runner_, base::BindRepeating(&RemoteWrapper::Accept, this),
base::BindRepeating(&RemoteWrapper::AcceptWithResponder, this), base::BindRepeating(&RemoteWrapper::AcceptWithResponder, this),
base::BindRepeating(&RemoteWrapper::ForceAsyncSend, this),
associated_group_); associated_group_);
} }
...@@ -127,9 +133,9 @@ class SharedRemoteBase ...@@ -127,9 +133,9 @@ class SharedRemoteBase
DCHECK(task_runner_->RunsTasksInCurrentSequence()); DCHECK(task_runner_->RunsTasksInCurrentSequence());
remote_.Bind(std::move(remote)); remote_.Bind(std::move(remote));
// The ThreadSafeForwarder will always block the calling thread on a // By default we force all messages to behave as if async within the
// reply, so there's no need for the endpoint to employ its own sync // Remote, as SharedRemote implements its own waiting mechanism to block
// waiting logic. // only the calling thread when making sync calls.
remote_.internal_state()->force_outgoing_messages_async(true); remote_.internal_state()->force_outgoing_messages_async(true);
} }
...@@ -143,6 +149,10 @@ class SharedRemoteBase ...@@ -143,6 +149,10 @@ class SharedRemoteBase
std::move(message), std::move(responder)); std::move(message), std::move(responder));
} }
void ForceAsyncSend(bool force) {
remote_.internal_state()->force_outgoing_messages_async(force);
}
void DeleteOnCorrectThread() const { void DeleteOnCorrectThread() const {
if (!task_runner_->RunsTasksInCurrentSequence()) { if (!task_runner_->RunsTasksInCurrentSequence()) {
// NOTE: This is only called when there are no more references to // NOTE: This is only called when there are no more references to
...@@ -220,19 +230,30 @@ class SharedRemoteBase ...@@ -220,19 +230,30 @@ class SharedRemoteBase
template <typename Interface> template <typename Interface>
class SharedRemote { class SharedRemote {
public: public:
// Constructs an unbound SharedRemote. This object cannot issue Interface
// method calls and does not schedule any tasks. A default-constructed
// SharedRemote may be replaced with a bound one via copy- or move-assignment.
SharedRemote() = default; SharedRemote() = default;
explicit SharedRemote(PendingRemote<Interface> pending_remote)
: remote_(pending_remote.is_valid() // Constructs a SharedRemote bound to `pending_remote` on the calling
? SharedRemoteBase<Remote<Interface>>::Create( // sequence. See `Bind()` below for more details.
std::move(pending_remote)) explicit SharedRemote(PendingRemote<Interface> pending_remote) {
: nullptr) {} Bind(std::move(pending_remote), nullptr);
}
// Constructs a SharedRemote bound to `pending_remote` on the sequence given
// by `bind_task_runner`. See `Bind()` below for more details.
SharedRemote(PendingRemote<Interface> pending_remote, SharedRemote(PendingRemote<Interface> pending_remote,
scoped_refptr<base::SequencedTaskRunner> bind_task_runner) scoped_refptr<base::SequencedTaskRunner> bind_task_runner) {
: remote_(pending_remote.is_valid() Bind(std::move(pending_remote), std::move(bind_task_runner));
? SharedRemoteBase<Remote<Interface>>::Create( }
std::move(pending_remote),
std::move(bind_task_runner)) // SharedRemote supports both copy and move construction and assignment. These
: nullptr) {} // are explicitly defaulted here for clarity.
SharedRemote(const SharedRemote&) = default;
SharedRemote(SharedRemote&&) = default;
SharedRemote& operator=(const SharedRemote&) = default;
SharedRemote& operator=(SharedRemote&&) = default;
bool is_bound() const { return remote_ != nullptr; } bool is_bound() const { return remote_ != nullptr; }
explicit operator bool() const { return is_bound(); } explicit operator bool() const { return is_bound(); }
...@@ -253,6 +274,34 @@ class SharedRemote { ...@@ -253,6 +274,34 @@ class SharedRemote {
// underlying endpoint. // underlying endpoint.
void reset() { remote_.reset(); } void reset() { remote_.reset(); }
// Binds this SharedRemote to `pending_remote` on the sequence given by
// `bind_task_runner`, or the calling sequence if `bind_task_runner` is null.
// Once bound, the SharedRemote may be used to send messages on the underlying
// Remote. Messages always bounce through `bind_task_runner` before sending,
// unless the caller is issuing a [Sync] call from `bind_task_runner` already;
// in which case this behaves exactly like a regular Remote for that call.
//
// Any reply received by the SharedRemote is dispatched to whatever
// SequencedTaskRunner was current when the corresponding request was made.
//
// A bound SharedRemote may be copied any number of times, to any number of
// threads. Each copy sends messages through the same underlying Remote, after
// bouncing through the same `bind_task_runner`.
//
// If this SharedRemote was already bound, it will be effectively unbound by
// this call and re-bound to `pending_remote`. Any prior copies made are NOT
// affected and will retain their reference to the original Remote.
void Bind(PendingRemote<Interface> pending_remote,
scoped_refptr<base::SequencedTaskRunner> bind_task_runner) {
if (bind_task_runner && pending_remote) {
remote_ = SharedRemoteBase<Remote<Interface>>::Create(
std::move(pending_remote), std::move(bind_task_runner));
} else if (pending_remote) {
remote_ = SharedRemoteBase<Remote<Interface>>::Create(
std::move(pending_remote));
}
}
private: private:
scoped_refptr<SharedRemoteBase<Remote<Interface>>> remote_; scoped_refptr<SharedRemoteBase<Remote<Interface>>> remote_;
}; };
......
...@@ -1003,14 +1003,13 @@ TEST_P(RemoteTest, SharedRemoteSyncOnlyBlocksCallingSequence) { ...@@ -1003,14 +1003,13 @@ TEST_P(RemoteTest, SharedRemoteSyncOnlyBlocksCallingSequence) {
SharedRemote<mojom::SharedRemoteSyncTest> remote(std::move(pending_remote), SharedRemote<mojom::SharedRemoteSyncTest> remote(std::move(pending_remote),
bound_task_runner); bound_task_runner);
bound_task_runner->PostTask( bound_task_runner->PostTask(
FROM_HERE, FROM_HERE, base::BindOnce(
base::BindOnce( [](PendingReceiver<mojom::SharedRemoteSyncTest> receiver) {
[](mojo::PendingReceiver<mojom::SharedRemoteSyncTest> receiver) { MakeSelfOwnedReceiver(
mojo::MakeSelfOwnedReceiver( std::make_unique<SharedRemoteSyncTestImpl>(),
std::make_unique<SharedRemoteSyncTestImpl>(), std::move(receiver));
std::move(receiver)); },
}, std::move(receiver)));
std::move(receiver)));
int32_t value = 0; int32_t value = 0;
remote->Fetch(&value); remote->Fetch(&value);
...@@ -1024,6 +1023,70 @@ TEST_P(RemoteTest, SharedRemoteSyncOnlyBlocksCallingSequence) { ...@@ -1024,6 +1023,70 @@ TEST_P(RemoteTest, SharedRemoteSyncOnlyBlocksCallingSequence) {
task_environment()->RunUntilIdle(); task_environment()->RunUntilIdle();
} }
TEST_P(RemoteTest, SharedRemoteSyncCallsFromOffBoundConstructionSequence) {
// Regression test for https://crbug.com/1102921. Verifies that when
// bound to its construction sequence, a SharedRemote doesn't try blocking
// that sequence when a sync call is made from another sequence.
const scoped_refptr<base::SequencedTaskRunner> background_task_runner =
base::ThreadPool::CreateSequencedTaskRunner(
{base::WithBaseSyncPrimitives()});
// Ensure waiting on the main thread is not allowed so that blocking attempts
// will break the test.
base::DisallowBaseSyncPrimitives();
PendingRemote<mojom::SharedRemoteSyncTest> pending_remote;
SharedRemoteSyncTestImpl impl;
Receiver<mojom::SharedRemoteSyncTest> receiver(
&impl, pending_remote.InitWithNewPipeAndPassReceiver());
int32_t value = 0;
base::RunLoop loop;
base::OnceClosure quit = loop.QuitClosure();
SharedRemote<mojom::SharedRemoteSyncTest> remote(std::move(pending_remote));
background_task_runner->PostTask(
FROM_HERE, base::BindLambdaForTesting([remote, &value, &quit] {
EXPECT_TRUE(remote->Fetch(&value));
EXPECT_EQ(kMagicNumber, value);
std::move(quit).Run();
}));
loop.Run();
// TaskEnvironment teardown wants to block the main thread.
base::internal::ResetThreadRestrictionsForTesting();
}
TEST_P(RemoteTest, SharedRemoteSyncCallsFromBoundNonConstructionSequence) {
// Regression test for https://crbug.com/1102921. Verifies that when
// bound to some sequence other than that which constructed it, a SharedRemote
// properly blocks when making sync calls from the bound sequence.
const scoped_refptr<base::SequencedTaskRunner> background_task_runner =
base::ThreadPool::CreateSequencedTaskRunner(
{base::WithBaseSyncPrimitives()});
PendingRemote<mojom::SharedRemoteSyncTest> pending_remote;
SharedRemoteSyncTestImpl impl;
Receiver<mojom::SharedRemoteSyncTest> receiver(
&impl, pending_remote.InitWithNewPipeAndPassReceiver());
int32_t value = 0;
base::RunLoop loop;
base::OnceClosure quit = loop.QuitClosure();
SharedRemote<mojom::SharedRemoteSyncTest> remote(
std::move(pending_remote), std::move(background_task_runner));
background_task_runner->PostTask(
FROM_HERE, base::BindLambdaForTesting([remote, &value, &quit] {
EXPECT_TRUE(remote->Fetch(&value));
EXPECT_EQ(kMagicNumber, value);
std::move(quit).Run();
}));
loop.Run();
}
TEST_P(RemoteTest, RemoteSet) { TEST_P(RemoteTest, RemoteSet) {
std::vector<base::Optional<MathCalculatorImpl>> impls(3); std::vector<base::Optional<MathCalculatorImpl>> impls(3);
......
...@@ -17,10 +17,12 @@ ThreadSafeForwarderBase::ThreadSafeForwarderBase( ...@@ -17,10 +17,12 @@ ThreadSafeForwarderBase::ThreadSafeForwarderBase(
scoped_refptr<base::SequencedTaskRunner> task_runner, scoped_refptr<base::SequencedTaskRunner> task_runner,
ForwardMessageCallback forward, ForwardMessageCallback forward,
ForwardMessageWithResponderCallback forward_with_responder, ForwardMessageWithResponderCallback forward_with_responder,
ForceAsyncSendCallback force_async_send,
const AssociatedGroup& associated_group) const AssociatedGroup& associated_group)
: task_runner_(std::move(task_runner)), : task_runner_(std::move(task_runner)),
forward_(std::move(forward)), forward_(std::move(forward)),
forward_with_responder_(std::move(forward_with_responder)), forward_with_responder_(std::move(forward_with_responder)),
force_async_send_(std::move(force_async_send)),
associated_group_(associated_group), associated_group_(associated_group),
sync_calls_(new InProgressSyncCalls()) {} sync_calls_(new InProgressSyncCalls()) {}
...@@ -79,9 +81,21 @@ bool ThreadSafeForwarderBase::AcceptWithResponder( ...@@ -79,9 +81,21 @@ bool ThreadSafeForwarderBase::AcceptWithResponder(
SyncCallRestrictions::AssertSyncCallAllowed(); SyncCallRestrictions::AssertSyncCallAllowed();
// If the InterfacePtr is bound to this sequence, dispatch it directly. // If the Remote is bound to this sequence, send the message immediately and
// let Remote use its own internal sync waiting mechanism.
if (task_runner_->RunsTasksInCurrentSequence()) { if (task_runner_->RunsTasksInCurrentSequence()) {
const base::WeakPtr<ThreadSafeForwarderBase> weak_self =
weak_ptr_factory_.GetWeakPtr();
++sync_call_nesting_level_;
if (sync_call_nesting_level_ == 1)
force_async_send_.Run(false);
forward_with_responder_.Run(std::move(*message), std::move(responder)); forward_with_responder_.Run(std::move(*message), std::move(responder));
if (weak_self) {
// NOTE: |this| may be deleted within the callback run above.
--sync_call_nesting_level_;
if (!sync_call_nesting_level_)
force_async_send_.Run(true);
}
return true; return true;
} }
......
...@@ -13,6 +13,7 @@ ...@@ -13,6 +13,7 @@
#include "base/macros.h" #include "base/macros.h"
#include "base/memory/ref_counted.h" #include "base/memory/ref_counted.h"
#include "base/memory/scoped_refptr.h" #include "base/memory/scoped_refptr.h"
#include "base/memory/weak_ptr.h"
#include "base/sequenced_task_runner.h" #include "base/sequenced_task_runner.h"
#include "base/synchronization/lock.h" #include "base/synchronization/lock.h"
#include "base/synchronization/waitable_event.h" #include "base/synchronization/waitable_event.h"
...@@ -21,17 +22,48 @@ ...@@ -21,17 +22,48 @@
namespace mojo { namespace mojo {
// This class defines out-of-line logic common to the behavior of
// ThreadSafeForwarder<Interface>, which is in turn used to support the
// implementation of SharedRemote<Interface> and
// (deprecated) ThreadSafeInterfacePtr<Interface>.
//
// This object is sequence-affine and it provides an opaque interface to an
// underlying weakly-referenced interface proxy (e.g. a Remote) which may be
// bound on a different sequence and referenced weakly by any number of other
// ThreadSafeForwarders. The opaque interface is provide via a set of callbacks
// bound internally by e.g. SharedRemote or ThreadSafeInterfacePtr.
class COMPONENT_EXPORT(MOJO_CPP_BINDINGS) ThreadSafeForwarderBase class COMPONENT_EXPORT(MOJO_CPP_BINDINGS) ThreadSafeForwarderBase
: public MessageReceiverWithResponder { : public MessageReceiverWithResponder {
public: public:
// A callback used to send a message on the underlying interface proxy. Used
// only for messages with no reply.
using ForwardMessageCallback = base::RepeatingCallback<void(Message)>; using ForwardMessageCallback = base::RepeatingCallback<void(Message)>;
// A callback used to send a message on the underlying interface proxy. Used
// only for messages with no reply.
using ForwardMessageWithResponderCallback = using ForwardMessageWithResponderCallback =
base::RepeatingCallback<void(Message, std::unique_ptr<MessageReceiver>)>; base::RepeatingCallback<void(Message, std::unique_ptr<MessageReceiver>)>;
// A callback used to reconfigure the underlying proxy by changing whether or
// not it can perform its own blocking waits for [Sync] message replies. When
// `force` is false, the proxy behaves normally and will block the calling
// thread when used to issue a sync message; when `force` is true however,
// [Sync] is effectively ignored when sending messages and the reply is
// received asynchronously. ThreadSafeForwarderBase uses this to disable
// normal synchronous behavior and implement its own sync waiting from the
// caller's thread rather than the proxy's bound thread.
using ForceAsyncSendCallback = base::RepeatingCallback<void(bool force)>;
// Constructs a new ThreadSafeForwarderBase which forwards requests to a proxy
// bound on `task_runner`. Forwarding is done opaquely via the callbacks given
// in `forward` (to send one-off messages), `forward_with_responder` (to send
// messages expecting replies), and `force_async_send` to control sync IPC
// behavior within the underlying proxy.
ThreadSafeForwarderBase( ThreadSafeForwarderBase(
scoped_refptr<base::SequencedTaskRunner> task_runner, scoped_refptr<base::SequencedTaskRunner> task_runner,
ForwardMessageCallback forward, ForwardMessageCallback forward,
ForwardMessageWithResponderCallback forward_with_responder, ForwardMessageWithResponderCallback forward_with_responder,
ForceAsyncSendCallback force_async_send,
const AssociatedGroup& associated_group); const AssociatedGroup& associated_group);
~ThreadSafeForwarderBase() override; ~ThreadSafeForwarderBase() override;
...@@ -107,8 +139,11 @@ class COMPONENT_EXPORT(MOJO_CPP_BINDINGS) ThreadSafeForwarderBase ...@@ -107,8 +139,11 @@ class COMPONENT_EXPORT(MOJO_CPP_BINDINGS) ThreadSafeForwarderBase
const scoped_refptr<base::SequencedTaskRunner> task_runner_; const scoped_refptr<base::SequencedTaskRunner> task_runner_;
const ForwardMessageCallback forward_; const ForwardMessageCallback forward_;
const ForwardMessageWithResponderCallback forward_with_responder_; const ForwardMessageWithResponderCallback forward_with_responder_;
const ForceAsyncSendCallback force_async_send_;
AssociatedGroup associated_group_; AssociatedGroup associated_group_;
scoped_refptr<InProgressSyncCalls> sync_calls_; scoped_refptr<InProgressSyncCalls> sync_calls_;
int sync_call_nesting_level_ = 0;
base::WeakPtrFactory<ThreadSafeForwarderBase> weak_ptr_factory_{this};
DISALLOW_COPY_AND_ASSIGN(ThreadSafeForwarderBase); DISALLOW_COPY_AND_ASSIGN(ThreadSafeForwarderBase);
}; };
......
...@@ -9,6 +9,7 @@ ...@@ -9,6 +9,7 @@
#include <utility> #include <utility>
#include "base/bind.h" #include "base/bind.h"
#include "base/bind_helpers.h"
#include "base/macros.h" #include "base/macros.h"
#include "base/memory/ref_counted.h" #include "base/memory/ref_counted.h"
#include "base/task_runner.h" #include "base/task_runner.h"
...@@ -56,10 +57,12 @@ class ThreadSafeForwarder : public ThreadSafeForwarderBase { ...@@ -56,10 +57,12 @@ class ThreadSafeForwarder : public ThreadSafeForwarderBase {
scoped_refptr<base::SequencedTaskRunner> task_runner, scoped_refptr<base::SequencedTaskRunner> task_runner,
ForwardMessageCallback forward, ForwardMessageCallback forward,
ForwardMessageWithResponderCallback forward_with_responder, ForwardMessageWithResponderCallback forward_with_responder,
ForceAsyncSendCallback force_async_send,
const AssociatedGroup& associated_group) const AssociatedGroup& associated_group)
: ThreadSafeForwarderBase(std::move(task_runner), : ThreadSafeForwarderBase(std::move(task_runner),
std::move(forward), std::move(forward),
std::move(forward_with_responder), std::move(forward_with_responder),
std::move(force_async_send),
associated_group), associated_group),
proxy_(this) {} proxy_(this) {}
...@@ -156,7 +159,7 @@ class ThreadSafeInterfacePtrBase ...@@ -156,7 +159,7 @@ class ThreadSafeInterfacePtrBase
return std::make_unique<ThreadSafeForwarder<InterfaceType>>( return std::make_unique<ThreadSafeForwarder<InterfaceType>>(
task_runner_, base::BindRepeating(&PtrWrapper::Accept, this), task_runner_, base::BindRepeating(&PtrWrapper::Accept, this),
base::BindRepeating(&PtrWrapper::AcceptWithResponder, this), base::BindRepeating(&PtrWrapper::AcceptWithResponder, this),
associated_group_); base::DoNothing(), associated_group_);
} }
private: private:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment