Commit bf7db0f9 authored by Chris Letnick's avatar Chris Letnick Committed by Commit Bot

Make ExternalConnector immediately usable.

Mojo is designed to be asynchronous and for the user to not have to
wait for and check connection states.

Merge-With: eureka-internal/370252
Merge-With: eureka-internal/370569
Merge-With: eureka-internal/371453
Merge-With: nest-camera-internal/54520

Bug: internal 150158878
Test: Verified on device.
Change-Id: Ibb4fd27f7bfbffa0b9c9c695c6593f5bdbd599c4
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2086689
Commit-Queue: Chris Letnick <cletnick@google.com>
Reviewed-by: default avatarKenneth MacKay <kmackay@chromium.org>
Auto-Submit: Chris Letnick <cletnick@google.com>
Cr-Commit-Position: refs/heads/master@{#747131}
parent 8f553df1
......@@ -10,7 +10,7 @@
#include <utility>
#include <vector>
#include "base/callback.h"
#include "base/callback_list.h"
#include "chromecast/external_mojo/public/mojom/connector.mojom.h"
#include "mojo/public/cpp/bindings/pending_receiver.h"
#include "mojo/public/cpp/bindings/pending_remote.h"
......@@ -32,13 +32,19 @@ class ExternalConnector {
const std::string& broker_path,
base::OnceCallback<void(std::unique_ptr<ExternalConnector>)> callback);
static std::unique_ptr<ExternalConnector> Create(
const std::string& broker_path);
virtual ~ExternalConnector() = default;
// Sets the callback that will be called if this class loses its connection to
// the Mojo broker. Note that once the connection is lost, this instance
// becomes nonfunctional (all public methods are no-ops); a new connection
// must be made instead.
virtual void SetConnectionErrorCallback(base::OnceClosure callback) = 0;
// Adds a callback that will be called if this class loses its connection to
// the Mojo broker. The calling class must retain the returned Subscription
// until it intends to unregister.
// By the time |callback| is executed, a new attempt at connecting will be
// started, and this object is valid. Note that some prior messages may be
// lost.
virtual std::unique_ptr<base::CallbackList<void()>::Subscription>
AddConnectionErrorCallback(base::RepeatingClosure callback) = 0;
// Registers a service that other Mojo processes/services can bind to. Others
// can call BindInterface(|service_name|, interface_name) to bind to this
......
......@@ -27,54 +27,36 @@ void ExternalConnector::Connect(
const std::string& broker_path,
base::OnceCallback<void(std::unique_ptr<ExternalConnector>)> callback) {
DCHECK(callback);
std::move(callback).Run(Create(broker_path));
}
mojo::PlatformChannelEndpoint endpoint =
mojo::NamedPlatformChannel::ConnectToServer(broker_path);
if (!endpoint.is_valid()) {
base::SequencedTaskRunnerHandle::Get()->PostDelayedTask(
FROM_HERE,
base::BindOnce(&ExternalConnector::Connect, broker_path,
std::move(callback)),
kConnectRetryDelay);
return;
}
auto invitation = mojo::IncomingInvitation::Accept(std::move(endpoint));
auto pipe = invitation.ExtractMessagePipe(0);
if (!pipe) {
LOG(ERROR) << "Invalid message pipe";
base::SequencedTaskRunnerHandle::Get()->PostDelayedTask(
FROM_HERE,
base::BindOnce(&ExternalConnector::Connect, broker_path,
std::move(callback)),
kConnectRetryDelay);
return;
}
mojo::Remote<external_mojo::mojom::ExternalConnector> connector(
mojo::PendingRemote<external_mojo::mojom::ExternalConnector>(
std::move(pipe), 0));
std::move(callback).Run(
std::make_unique<ExternalConnectorImpl>(std::move(connector)));
// static
std::unique_ptr<ExternalConnector> ExternalConnector::Create(
const std::string& broker_path) {
return std::make_unique<ExternalConnectorImpl>(broker_path);
}
ExternalConnectorImpl::ExternalConnectorImpl(
mojo::Remote<external_mojo::mojom::ExternalConnector> connector)
: connector_(std::move(connector)) {
connector_.set_disconnect_handler(base::BindOnce(
&ExternalConnectorImpl::OnMojoDisconnect, base::Unretained(this)));
ExternalConnectorImpl::ExternalConnectorImpl(const std::string& broker_path)
: broker_path_(broker_path) {
InitializeBrokerConnection();
}
ExternalConnectorImpl::ExternalConnectorImpl(
mojo::PendingRemote<external_mojo::mojom::ExternalConnector> unbound_state)
: unbound_state_(std::move(unbound_state)) {
const std::string& broker_path,
mojo::PendingRemote<external_mojo::mojom::ExternalConnector>
connector_pending_remote)
: broker_path_(broker_path),
connector_pending_remote_from_clone_(
std::move(connector_pending_remote)) {
DETACH_FROM_SEQUENCE(sequence_checker_);
}
ExternalConnectorImpl::~ExternalConnectorImpl() = default;
void ExternalConnectorImpl::SetConnectionErrorCallback(
base::OnceClosure callback) {
connection_error_callback_ = std::move(callback);
std::unique_ptr<base::CallbackList<void()>::Subscription>
ExternalConnectorImpl::AddConnectionErrorCallback(
base::RepeatingClosure callback) {
return error_callbacks_.Add(std::move(callback));
}
void ExternalConnectorImpl::RegisterService(const std::string& service_name,
......@@ -134,7 +116,8 @@ std::unique_ptr<ExternalConnector> ExternalConnectorImpl::Clone() {
if (BindConnectorIfNecessary()) {
connector_->Clone(std::move(receiver));
}
return std::make_unique<ExternalConnectorImpl>(std::move(connector_remote));
return std::make_unique<ExternalConnectorImpl>(broker_path_,
std::move(connector_remote));
}
void ExternalConnectorImpl::SendChromiumConnectorRequest(
......@@ -147,9 +130,10 @@ void ExternalConnectorImpl::SendChromiumConnectorRequest(
void ExternalConnectorImpl::OnMojoDisconnect() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
connector_.reset();
if (connection_error_callback_) {
std::move(connection_error_callback_).Run();
}
connector_pending_remote_from_clone_.reset();
connector_pending_receiver_for_broker_.reset();
InitializeBrokerConnection();
error_callbacks_.Notify();
}
bool ExternalConnectorImpl::BindConnectorIfNecessary() {
......@@ -160,17 +144,56 @@ bool ExternalConnectorImpl::BindConnectorIfNecessary() {
return true;
}
if (!unbound_state_.is_valid()) {
// OnMojoDisconnect was already called, but |this| was not destroyed.
return false;
}
DCHECK(connector_pending_remote_from_clone_.is_valid());
connector_.Bind(std::move(unbound_state_));
connector_.Bind(std::move(connector_pending_remote_from_clone_));
connector_.set_disconnect_handler(base::BindOnce(
&ExternalConnectorImpl::OnMojoDisconnect, base::Unretained(this)));
return true;
}
void ExternalConnectorImpl::InitializeBrokerConnection() {
// Setup the connector_ to be valid and bound.
// Once |connector_pending_receiver_for_broker_| is bound in the mojo broker,
// messages will be delivered.
mojo::PendingRemote<external_mojo::mojom::ExternalConnector> connector_remote;
connector_pending_receiver_for_broker_ =
connector_remote.InitWithNewPipeAndPassReceiver();
connector_.Bind(std::move(connector_remote));
connector_.set_disconnect_handler(base::BindOnce(
&ExternalConnectorImpl::OnMojoDisconnect, base::Unretained(this)));
AttemptBrokerConnection();
}
void ExternalConnectorImpl::AttemptBrokerConnection() {
mojo::PlatformChannelEndpoint endpoint =
mojo::NamedPlatformChannel::ConnectToServer(broker_path_);
if (!endpoint.is_valid()) {
base::SequencedTaskRunnerHandle::Get()->PostDelayedTask(
FROM_HERE,
base::BindOnce(&ExternalConnectorImpl::AttemptBrokerConnection,
weak_factory_.GetWeakPtr()),
kConnectRetryDelay);
return;
}
auto invitation = mojo::IncomingInvitation::Accept(std::move(endpoint));
auto remote_pipe = invitation.ExtractMessagePipe(0);
if (!remote_pipe) {
LOG(ERROR) << "Invalid message pipe";
base::SequencedTaskRunnerHandle::Get()->PostDelayedTask(
FROM_HERE,
base::BindOnce(&ExternalConnectorImpl::AttemptBrokerConnection,
weak_factory_.GetWeakPtr()),
kConnectRetryDelay);
return;
}
mojo::FuseMessagePipes(connector_pending_receiver_for_broker_.PassPipe(),
std::move(remote_pipe));
}
} // namespace external_service_support
} // namespace chromecast
......@@ -22,15 +22,16 @@ namespace external_service_support {
class ExternalConnectorImpl : public ExternalConnector {
public:
explicit ExternalConnectorImpl(const std::string& broker_path);
explicit ExternalConnectorImpl(
mojo::Remote<external_mojo::mojom::ExternalConnector> connector);
explicit ExternalConnectorImpl(
const std::string& broker_path,
mojo::PendingRemote<external_mojo::mojom::ExternalConnector>
unbound_state);
connector_pending_remote);
~ExternalConnectorImpl() override;
// ExternalConnector implementation:
void SetConnectionErrorCallback(base::OnceClosure callback) override;
std::unique_ptr<base::CallbackList<void()>::Subscription>
AddConnectionErrorCallback(base::RepeatingClosure callback) override;
void RegisterService(const std::string& service_name,
ExternalService* service) override;
void RegisterService(
......@@ -56,10 +57,24 @@ class ExternalConnectorImpl : public ExternalConnector {
mojo::ScopedMessagePipeHandle interface_pipe);
void OnMojoDisconnect();
bool BindConnectorIfNecessary();
void InitializeBrokerConnection();
void AttemptBrokerConnection();
std::string broker_path_;
mojo::Remote<external_mojo::mojom::ExternalConnector> connector_;
mojo::PendingRemote<external_mojo::mojom::ExternalConnector> unbound_state_;
base::OnceClosure connection_error_callback_;
base::CallbackList<void()> error_callbacks_;
// If connecting to a broker, |connector_pending_receiver_for_broker_| is used
// to keep |connector_| bound while waiting for the broker.
mojo::PendingReceiver<external_mojo::mojom::ExternalConnector>
connector_pending_receiver_for_broker_;
// If cloned, |connector_pending_remote_from_clone_| is stored until an IO
// operation is performed to ensure it happens on the correct sequence.
mojo::PendingRemote<external_mojo::mojom::ExternalConnector>
connector_pending_remote_from_clone_;
SEQUENCE_CHECKER(sequence_checker_);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment