Commit f22f5b8c authored by James Vecore's avatar James Vecore Committed by Commit Bot

[Nearby] Handle incoming messages on separate sequence

Bug: 1135840
Change-Id: I653e6af063a36f2695d21c0d9cf8cb9ce697ac6d
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2506361
Commit-Queue: James Vecore <vecore@google.com>
Reviewed-by: default avatarRyan Hansberry <hansberry@chromium.org>
Cr-Commit-Position: refs/heads/master@{#822468}
parent 2760b389
......@@ -10,6 +10,7 @@
#include "chrome/services/sharing/webrtc/p2p_port_allocator.h"
#include "jingle/glue/thread_wrapper.h"
#include "mojo/public/cpp/bindings/receiver.h"
#include "mojo/public/cpp/bindings/self_owned_receiver.h"
#include "third_party/nearby/src/cpp/platform/public/future.h"
#include "third_party/webrtc/api/jsep.h"
#include "third_party/webrtc/api/peer_connection_interface.h"
......@@ -74,23 +75,47 @@ class ProxyAsyncResolverFactory final : public webrtc::AsyncResolverFactory {
sharing::IpcPacketSocketFactory* socket_factory_;
};
// This object only exists to forward incoming mojo messages. It will be created
// as a SelfOwnedReceiver on a separate sequence and will be cleaned up when the
// connection goes down. This is necessary to keep it pumping messages while the
// the main WebRtc thread is blocked on a future.
class IncomingMessageListener
: public sharing::mojom::IncomingMessagesListener {
public:
explicit IncomingMessageListener(
api::WebRtcSignalingMessenger::OnSignalingMessageCallback
signaling_message_callback)
: signaling_message_callback_(std::move(signaling_message_callback)) {
DCHECK(signaling_message_callback_);
}
~IncomingMessageListener() override = default;
// mojom::IncomingMessagesListener:
void OnMessage(const std::string& message) override {
signaling_message_callback_(ByteArray(message));
}
private:
api::WebRtcSignalingMessenger::OnSignalingMessageCallback
signaling_message_callback_;
};
// Used as a messenger in sending and receiving WebRTC messages between devices.
// The messages sent and received are considered untrusted since they
// originate in an untrusted sandboxed process on device.
class WebRtcSignalingMessengerImpl
: public api::WebRtcSignalingMessenger,
public sharing::mojom::IncomingMessagesListener {
class WebRtcSignalingMessengerImpl : public api::WebRtcSignalingMessenger {
public:
using OnSignalingMessageCallback =
api::WebRtcSignalingMessenger::OnSignalingMessageCallback;
WebRtcSignalingMessengerImpl(
const std::string& self_id,
const mojo::SharedRemote<sharing::mojom::WebRtcSignalingMessenger>&
messenger)
: self_id_(self_id), messenger_(messenger) {}
: self_id_(self_id),
messenger_(messenger),
task_runner_(
base::ThreadPool::CreateSequencedTaskRunner({base::MayBlock()})) {}
~WebRtcSignalingMessengerImpl() override = default;
~WebRtcSignalingMessengerImpl() override { StopReceivingMessages(); }
WebRtcSignalingMessengerImpl(const WebRtcSignalingMessengerImpl& other) =
delete;
......@@ -109,49 +134,59 @@ class WebRtcSignalingMessengerImpl
return success;
}
void BindIncomingReceiver(
mojo::PendingReceiver<sharing::mojom::IncomingMessagesListener>
pending_receiver,
api::WebRtcSignalingMessenger::OnSignalingMessageCallback callback) {
auto receiver = mojo::MakeSelfOwnedReceiver(
std::make_unique<IncomingMessageListener>(std::move(callback)),
std::move(pending_receiver), task_runner_);
receiver->set_connection_error_handler(base::BindOnce(
[](mojo::SharedRemote<sharing::mojom::WebRtcSignalingMessenger>
messenger) { messenger->StopReceivingMessages(); },
messenger_));
}
// api::WebRtcSignalingMessenger:
bool StartReceivingMessages(OnSignalingMessageCallback callback) override {
signaling_message_callback_ = std::move(callback);
incoming_messages_receiver_.reset();
bool success = false;
if (!messenger_->StartReceivingMessages(
self_id_, incoming_messages_receiver_.BindNewPipeAndPassRemote(),
&success) ||
mojo::PendingRemote<sharing::mojom::IncomingMessagesListener>
pending_remote;
mojo::PendingReceiver<sharing::mojom::IncomingMessagesListener>
pending_receiver = pending_remote.InitWithNewPipeAndPassReceiver();
if (!messenger_->StartReceivingMessages(self_id_, std::move(pending_remote),
&success) ||
!success) {
incoming_messages_receiver_.reset();
signaling_message_callback_ = nullptr;
receiving_messages_ = false;
return false;
}
incoming_messages_receiver_.set_disconnect_handler(
base::BindOnce(&WebRtcSignalingMessengerImpl::StopReceivingMessages,
base::Unretained(this)));
return success;
// Do the pending_receiver Bind call on the task runner itself so it can
// receive messages while the WebRtc thread is waiting. Any incoming
// messages will be queued until the Bind happens.
task_runner_->PostTask(
FROM_HERE,
base::BindOnce(&WebRtcSignalingMessengerImpl::BindIncomingReceiver,
base::Unretained(this), std::move(pending_receiver),
std::move(callback)));
receiving_messages_ = true;
return true;
}
// api::WebRtcSignalingMessenger:
void StopReceivingMessages() override {
incoming_messages_receiver_.reset();
signaling_message_callback_ = nullptr;
messenger_->StopReceivingMessages();
if (receiving_messages_) {
receiving_messages_ = false;
messenger_->StopReceivingMessages();
}
}
private:
// mojom::IncomingMessagesListener:
void OnMessage(const std::string& message) override {
if (signaling_message_callback_)
signaling_message_callback_(ByteArray(message));
}
bool receiving_messages_ = false;
std::string self_id_;
mojo::SharedRemote<sharing::mojom::WebRtcSignalingMessenger> messenger_;
mojo::Receiver<sharing::mojom::IncomingMessagesListener>
incoming_messages_receiver_{this};
OnSignalingMessageCallback signaling_message_callback_;
base::WeakPtrFactory<WebRtcSignalingMessengerImpl> weak_ptr_factory_{this};
scoped_refptr<base::SequencedTaskRunner> task_runner_;
};
} // namespace
......
......@@ -66,7 +66,7 @@ class WebRtcMediumTest : public ::testing::Test {
}
private:
base::test::SingleThreadTaskEnvironment task_environment_;
base::test::TaskEnvironment task_environment_;
testing::NiceMock<sharing::MockWebRtcDependencies> mojo_impl_;
mojo::SharedRemote<network::mojom::P2PSocketManager> socket_manager_;
......@@ -178,6 +178,12 @@ TEST_F(WebRtcMediumTest, GetMessenger_StartAndStopReceivingMessages) {
remote.Bind(std::move(listener));
remote->OnMessage(std::string(message));
}));
EXPECT_CALL(GetMockWebRtcDependencies(), StopReceivingMessages())
.WillRepeatedly(testing::Invoke([&]() {
if (remote.is_bound()) {
remote.reset();
}
}));
// TODO(https://crbug.com/1142001): Test with non-trivial |location_hint|.
std::unique_ptr<api::WebRtcSignalingMessenger> messenger =
......@@ -194,10 +200,9 @@ TEST_F(WebRtcMediumTest, GetMessenger_StartAndStopReceivingMessages) {
EXPECT_TRUE(remote.is_connected());
messenger->StopReceivingMessages();
// Run mojo disconnect handlers.
base::RunLoop().RunUntilIdle();
EXPECT_FALSE(remote.is_connected());
EXPECT_FALSE(remote.is_bound());
}
TEST_F(WebRtcMediumTest, GetMessengerAndStartReceivingMessagesTwice) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment