Commit 06654e11 authored by Ken Rockot's avatar Ken Rockot Committed by Commit Bot

[mojo] Better sync behavior for SharedRemote

This CL changes SharedRemote so that its sync calls now block only the
calling sequence rather than additionally blocking the underlying
Remote's bound sequence.

Fixed: 1016022
Change-Id: I2483c130d882de24686c0bb1396a894171a733fd
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/1871249
Commit-Queue: Ken Rockot <rockot@google.com>
Reviewed-by: default avatarDarin Fisher <darin@chromium.org>
Cr-Commit-Position: refs/heads/master@{#708028}
parent 3078b029
...@@ -164,6 +164,10 @@ class COMPONENT_EXPORT(MOJO_CPP_BINDINGS) InterfaceEndpointClient ...@@ -164,6 +164,10 @@ class COMPONENT_EXPORT(MOJO_CPP_BINDINGS) InterfaceEndpointClient
const char* interface_name() const { return interface_name_; } const char* interface_name() const { return interface_name_; }
void force_outgoing_messages_async(bool force) {
force_outgoing_messages_async_ = force;
}
#if DCHECK_IS_ON() #if DCHECK_IS_ON()
void SetNextCallLocation(const base::Location& location) { void SetNextCallLocation(const base::Location& location) {
next_call_location_ = location; next_call_location_ = location;
...@@ -275,6 +279,17 @@ class COMPONENT_EXPORT(MOJO_CPP_BINDINGS) InterfaceEndpointClient ...@@ -275,6 +279,17 @@ class COMPONENT_EXPORT(MOJO_CPP_BINDINGS) InterfaceEndpointClient
base::Location next_call_location_; base::Location next_call_location_;
#endif #endif
// If set to |true|, the endpoint ignores the sync flag when sending messages.
// This means that all messages are sent as if they were async, and all
// incoming replies are treated as if they replied to an async message. It is
// NOT appropriate to call generated sync method signatures (i.e. mojom
// interface methods with output arguments) on such endpoints.
//
// This exists only to facilitate APIs forwarding opaque sync messages through
// the endpoint from some other sequence which blocks on the reply, such as
// with sync calls on a SharedRemote.
bool force_outgoing_messages_async_ = false;
SEQUENCE_CHECKER(sequence_checker_); SEQUENCE_CHECKER(sequence_checker_);
base::WeakPtrFactory<InterfaceEndpointClient> weak_ptr_factory_{this}; base::WeakPtrFactory<InterfaceEndpointClient> weak_ptr_factory_{this};
......
...@@ -76,6 +76,11 @@ class COMPONENT_EXPORT(MOJO_CPP_BINDINGS) AssociatedInterfacePtrStateBase { ...@@ -76,6 +76,11 @@ class COMPONENT_EXPORT(MOJO_CPP_BINDINGS) AssociatedInterfacePtrStateBase {
endpoint_client_->AcceptWithResponder(&message, std::move(responder)); endpoint_client_->AcceptWithResponder(&message, std::move(responder));
} }
void force_outgoing_messages_async(bool force) {
DCHECK(endpoint_client_);
endpoint_client_->force_outgoing_messages_async(force);
}
protected: protected:
void Swap(AssociatedInterfacePtrStateBase* other); void Swap(AssociatedInterfacePtrStateBase* other);
void Bind(ScopedInterfaceEndpointHandle handle, void Bind(ScopedInterfaceEndpointHandle handle,
......
...@@ -319,7 +319,7 @@ bool InterfaceEndpointClient::SendMessageWithResponder( ...@@ -319,7 +319,7 @@ bool InterfaceEndpointClient::SendMessageWithResponder(
if (!is_control_message && idle_handler_) if (!is_control_message && idle_handler_)
++num_unacked_messages_; ++num_unacked_messages_;
if (!is_sync) { if (!is_sync || force_outgoing_messages_async_) {
async_responders_[request_id] = std::move(responder); async_responders_[request_id] = std::move(responder);
return true; return true;
} }
...@@ -531,7 +531,8 @@ bool InterfaceEndpointClient::HandleValidatedMessage(Message* message) { ...@@ -531,7 +531,8 @@ bool InterfaceEndpointClient::HandleValidatedMessage(Message* message) {
} else if (message->has_flag(Message::kFlagIsResponse)) { } else if (message->has_flag(Message::kFlagIsResponse)) {
uint64_t request_id = message->request_id(); uint64_t request_id = message->request_id();
if (message->has_flag(Message::kFlagIsSync)) { if (message->has_flag(Message::kFlagIsSync) &&
!force_outgoing_messages_async_) {
auto it = sync_responses_.find(request_id); auto it = sync_responses_.find(request_id);
if (it == sync_responses_.end()) if (it == sync_responses_.end())
return false; return false;
......
...@@ -61,6 +61,11 @@ class COMPONENT_EXPORT(MOJO_CPP_BINDINGS) InterfacePtrStateBase { ...@@ -61,6 +61,11 @@ class COMPONENT_EXPORT(MOJO_CPP_BINDINGS) InterfacePtrStateBase {
return endpoint_client_ && endpoint_client_->has_pending_responders(); return endpoint_client_ && endpoint_client_->has_pending_responders();
} }
void force_outgoing_messages_async(bool force) {
DCHECK(endpoint_client_);
endpoint_client_->force_outgoing_messages_async(force);
}
#if DCHECK_IS_ON() #if DCHECK_IS_ON()
void SetNextCallLocation(const base::Location& location) { void SetNextCallLocation(const base::Location& location) {
endpoint_client_->SetNextCallLocation(location); endpoint_client_->SetNextCallLocation(location);
......
...@@ -112,6 +112,11 @@ class SharedRemoteBase ...@@ -112,6 +112,11 @@ class SharedRemoteBase
void Bind(PendingType remote) { void Bind(PendingType remote) {
DCHECK(task_runner_->RunsTasksInCurrentSequence()); DCHECK(task_runner_->RunsTasksInCurrentSequence());
remote_.Bind(std::move(remote)); remote_.Bind(std::move(remote));
// The ThreadSafeForwarder will always block the calling thread on a
// reply, so there's no need for the endpoint to employ its own sync
// waiting logic.
remote_.internal_state()->force_outgoing_messages_async(true);
} }
void Accept(Message message) { void Accept(Message message) {
...@@ -161,14 +166,16 @@ class SharedRemoteBase ...@@ -161,14 +166,16 @@ class SharedRemoteBase
// thread, but has some additional overhead and latency in message transmission // thread, but has some additional overhead and latency in message transmission
// as a trade-off. // as a trade-off.
// //
// Async calls are posted to the sequence that the underlying Remote is bound // Async calls are posted to the bound sequence (the sequence that the
// to, and responses are posted back to the calling sequence. Sync calls are // underlying Remote is bound to, i.e. |bind_task_runner| below), and responses
// dispatched directly if the call is made on the sequence that the wrapped // are posted back to the calling sequence. Sync calls are dispatched directly
// Remote is bound to, or posted otherwise. It's important to be aware that // if the call is made on the bound sequence, or posted otherwise.
// sync calls block both the calling sequence and the bound Remote's sequence. //
// That means that you cannot make sync calls through a SharedRemote if the // This means that in general, when making calls from sequences other than the
// underlying Remote is bound to a sequence that cannot block, like the IPC // bound sequence, a hop is first made *to* the bound sequence; and when
// thread. // receiving replies, a hop is made *from* the bound the sequence.
//
// Note that sync calls only block the calling sequence.
template <typename Interface> template <typename Interface>
class SharedRemote { class SharedRemote {
public: public:
......
...@@ -140,6 +140,7 @@ mojom("test_mojom") { ...@@ -140,6 +140,7 @@ mojom("test_mojom") {
"enum_headers_unittest.test-mojom", "enum_headers_unittest.test-mojom",
"idle_tracking_unittest.test-mojom", "idle_tracking_unittest.test-mojom",
"receiver_unittest.test-mojom", "receiver_unittest.test-mojom",
"remote_unittest.test-mojom",
"service_factory_unittest.test-mojom", "service_factory_unittest.test-mojom",
"struct_headers_unittest.test-mojom", "struct_headers_unittest.test-mojom",
] ]
......
...@@ -34,6 +34,9 @@ class BindingsTestBase ...@@ -34,6 +34,9 @@ class BindingsTestBase
// Helper which other test fixtures can use. // Helper which other test fixtures can use.
static void SetupSerializationBehavior(BindingsTestSerializationMode mode); static void SetupSerializationBehavior(BindingsTestSerializationMode mode);
protected:
base::test::TaskEnvironment* task_environment() { return &task_environment_; }
private: private:
base::test::TaskEnvironment task_environment_; base::test::TaskEnvironment task_environment_;
}; };
......
...@@ -22,8 +22,10 @@ ...@@ -22,8 +22,10 @@
#include "mojo/public/cpp/bindings/receiver.h" #include "mojo/public/cpp/bindings/receiver.h"
#include "mojo/public/cpp/bindings/remote.h" #include "mojo/public/cpp/bindings/remote.h"
#include "mojo/public/cpp/bindings/remote_set.h" #include "mojo/public/cpp/bindings/remote_set.h"
#include "mojo/public/cpp/bindings/self_owned_receiver.h"
#include "mojo/public/cpp/bindings/shared_remote.h" #include "mojo/public/cpp/bindings/shared_remote.h"
#include "mojo/public/cpp/bindings/tests/bindings_test_base.h" #include "mojo/public/cpp/bindings/tests/bindings_test_base.h"
#include "mojo/public/cpp/bindings/tests/remote_unittest.test-mojom.h"
#include "mojo/public/cpp/bindings/unique_receiver_set.h" #include "mojo/public/cpp/bindings/unique_receiver_set.h"
#include "mojo/public/cpp/system/wait.h" #include "mojo/public/cpp/system/wait.h"
#include "mojo/public/interfaces/bindings/tests/math_calculator.mojom.h" #include "mojo/public/interfaces/bindings/tests/math_calculator.mojom.h"
...@@ -34,6 +36,7 @@ ...@@ -34,6 +36,7 @@
namespace mojo { namespace mojo {
namespace test { namespace test {
namespace remote_unittest {
namespace { namespace {
class MathCalculatorImpl : public math::Calculator { class MathCalculatorImpl : public math::Calculator {
...@@ -934,6 +937,60 @@ TEST_P(RemoteTest, SharedRemoteWithTaskRunner) { ...@@ -934,6 +937,60 @@ TEST_P(RemoteTest, SharedRemoteWithTaskRunner) {
shared_remote.reset(); shared_remote.reset();
} }
constexpr int32_t kMagicNumber = 42;
class SharedRemoteSyncTestImpl : public mojom::SharedRemoteSyncTest {
public:
SharedRemoteSyncTestImpl() = default;
~SharedRemoteSyncTestImpl() override = default;
// mojom::SharedRemoteSyncTest implementation:
void Fetch(FetchCallback callback) override {
// Post an async task to our current task runner to respond to this message.
// Because the Remote and Receiver are bound to the same sequence, this will
// only run if the Remote doesn't block the sequence on the sync call made
// by the test below.
base::SequencedTaskRunnerHandle::Get()->PostTask(
FROM_HERE, base::BindOnce(std::move(callback), kMagicNumber));
}
};
TEST_P(RemoteTest, SharedRemoteSyncOnlyBlocksCallingSequence) {
// Verifies that a sync call on a SharedRemote only blocks the calling
// sequence, not the sequence to which the underlying Remote is bound.
// See https://crbug.com/1016022.
const scoped_refptr<base::SequencedTaskRunner> bound_task_runner =
base::CreateSequencedTaskRunner(
{base::ThreadPool(), base::WithBaseSyncPrimitives()});
PendingRemote<mojom::SharedRemoteSyncTest> pending_remote;
auto receiver = pending_remote.InitWithNewPipeAndPassReceiver();
SharedRemote<mojom::SharedRemoteSyncTest> remote(std::move(pending_remote),
bound_task_runner);
bound_task_runner->PostTask(
FROM_HERE,
base::BindOnce(
[](mojo::PendingReceiver<mojom::SharedRemoteSyncTest> receiver) {
mojo::MakeSelfOwnedReceiver(
std::make_unique<SharedRemoteSyncTestImpl>(),
std::move(receiver));
},
std::move(receiver)));
int32_t value = 0;
remote->Fetch(&value);
EXPECT_EQ(kMagicNumber, value);
remote.reset();
// Resetting |remote| above will ultimately post a task to |bound_task_runner|
// to signal a connection error and trigger the self-owned Receiver's
// destruction. This ensures that the task will run, avoiding leaks.
task_environment()->RunUntilIdle();
}
TEST_P(RemoteTest, RemoteSet) { TEST_P(RemoteTest, RemoteSet) {
std::vector<base::Optional<MathCalculatorImpl>> impls(3); std::vector<base::Optional<MathCalculatorImpl>> impls(3);
...@@ -1023,5 +1080,6 @@ TEST_P(RemoteTest, RemoteSet) { ...@@ -1023,5 +1080,6 @@ TEST_P(RemoteTest, RemoteSet) {
INSTANTIATE_MOJO_BINDINGS_TEST_SUITE_P(RemoteTest); INSTANTIATE_MOJO_BINDINGS_TEST_SUITE_P(RemoteTest);
} // namespace } // namespace
} // namespace remote_unittest
} // namespace test } // namespace test
} // namespace mojo } // namespace mojo
// Copyright 2019 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
module mojo.test.remote_unittest.mojom;
interface SharedRemoteSyncTest {
[Sync] Fetch() => (int32 value);
};
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment