Commit 1e79e750 authored by Yuwei Huang's avatar Yuwei Huang Committed by Commit Bot

[remoting][FTL] Fix MuxingSignalStrategy and add unittest

* Fix a bug in MuxingSignalStrategy such that it keeps waiting for both
  strategies to connect instead of giving up and using whatever is
  available. This essentially rewrites the state transition logic.
* Add unittest for MuxingSignalStrategy.

Bug: 966632
Change-Id: Ic94b1ef47b5109acd5d603f02cb8659353c02a82
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/1628130
Commit-Queue: Joe Downing <joedow@chromium.org>
Reviewed-by: default avatarJoe Downing <joedow@chromium.org>
Cr-Commit-Position: refs/heads/master@{#662874}
parent cc262936
......@@ -84,6 +84,11 @@ bool RemotingRegisterSupportHostRequest::OnSignalStrategyIncomingStanza(
}
void RemotingRegisterSupportHostRequest::RegisterHost() {
if (state_ != State::NOT_STARTED) {
return;
}
state_ = State::REGISTERING;
apis::v1::RegisterSupportHostRequest request;
request.set_public_key(key_pair_->GetPublicKey());
if (signal_strategy_->ftl_signal_strategy()->GetState() ==
......@@ -113,9 +118,11 @@ void RemotingRegisterSupportHostRequest::OnRegisterHostResult(
const grpc::Status& status,
const apis::v1::RegisterSupportHostResponse& response) {
if (!status.ok()) {
state_ = State::NOT_STARTED;
RunCallback({}, {}, MapError(status.error_code()));
return;
}
state_ = State::REGISTERED;
base::TimeDelta lifetime =
base::TimeDelta::FromSeconds(response.support_id_lifetime_seconds());
RunCallback(response.support_id(), lifetime, protocol::ErrorCode::OK);
......
......@@ -37,6 +37,15 @@ class RemotingRegisterSupportHostRequest final
private:
using RemoteSupportService = apis::v1::RemoteSupportService;
// MuxingSignalStrategy might notify a CONNECTED state for more than once, so
// this is necessary to prevent trying to register twice when a strategy is
// connected after the timeout.
enum class State {
NOT_STARTED,
REGISTERING,
REGISTERED,
};
// SignalStrategy::Listener interface.
void OnSignalStrategyStateChange(SignalStrategy::State state) override;
bool OnSignalStrategyIncomingStanza(
......@@ -57,6 +66,8 @@ class RemotingRegisterSupportHostRequest final
std::unique_ptr<OAuthTokenGetter> token_getter_;
GrpcAuthenticatedExecutor grpc_executor_;
State state_ = State::NOT_STARTED;
std::unique_ptr<RemoteSupportService::Stub> remote_support_;
DISALLOW_COPY_AND_ASSIGN(RemotingRegisterSupportHostRequest);
......
......@@ -129,6 +129,7 @@ source_set("unit_tests") {
"jid_util_unittest.cc",
"log_to_server_unittest.cc",
"message_tracker_unittest.cc",
"muxing_signal_strategy_unittest.cc",
"push_notification_subscriber_unittest.cc",
"server_log_entry_unittest.cc",
"server_log_entry_unittest.h",
......
......@@ -40,6 +40,20 @@ FakeSignalStrategy::~FakeSignalStrategy() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
}
void FakeSignalStrategy::SetState(State state) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
if (state == state_) {
return;
}
state_ = state;
for (auto& observer : listeners_)
observer.OnSignalStrategyStateChange(state_);
}
void FakeSignalStrategy::SetPeerCallback(const PeerCallback& peer_callback) {
peer_callback_ = peer_callback;
}
void FakeSignalStrategy::ConnectTo(FakeSignalStrategy* peer) {
PeerCallback peer_callback =
base::Bind(&FakeSignalStrategy::DeliverMessageOnThread,
......@@ -64,18 +78,34 @@ void FakeSignalStrategy::SimulateMessageReordering() {
simulate_reorder_ = true;
}
void FakeSignalStrategy::OnIncomingMessage(
std::unique_ptr<jingle_xmpp::XmlElement> stanza) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
if (!simulate_reorder_) {
NotifyListeners(std::move(stanza));
return;
}
// Simulate IQ messages re-ordering by swapping the delivery order of
// next pair of messages.
if (pending_stanza_) {
NotifyListeners(std::move(stanza));
NotifyListeners(std::move(pending_stanza_));
pending_stanza_.reset();
} else {
pending_stanza_ = std::move(stanza);
}
}
void FakeSignalStrategy::Connect() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
state_ = CONNECTED;
for (auto& observer : listeners_)
observer.OnSignalStrategyStateChange(CONNECTED);
SetState(CONNECTED);
}
void FakeSignalStrategy::Disconnect() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
state_ = DISCONNECTED;
for (auto& observer : listeners_)
observer.OnSignalStrategyStateChange(DISCONNECTED);
SetState(DISCONNECTED);
}
SignalStrategy::State FakeSignalStrategy::GetState() const {
......@@ -134,26 +164,6 @@ void FakeSignalStrategy::DeliverMessageOnThread(
std::move(stanza)));
}
void FakeSignalStrategy::OnIncomingMessage(
std::unique_ptr<jingle_xmpp::XmlElement> stanza) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
if (!simulate_reorder_) {
NotifyListeners(std::move(stanza));
return;
}
// Simulate IQ messages re-ordering by swapping the delivery order of
// next pair of messages.
if (pending_stanza_) {
NotifyListeners(std::move(stanza));
NotifyListeners(std::move(pending_stanza_));
pending_stanza_.reset();
} else {
pending_stanza_ = std::move(stanza);
}
}
void FakeSignalStrategy::NotifyListeners(
std::unique_ptr<jingle_xmpp::XmlElement> stanza) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
......@@ -177,8 +187,4 @@ void FakeSignalStrategy::NotifyListeners(
}
}
void FakeSignalStrategy::SetPeerCallback(const PeerCallback& peer_callback) {
peer_callback_ = peer_callback;
}
} // namespace remoting
......@@ -25,6 +25,9 @@ namespace remoting {
class FakeSignalStrategy : public SignalStrategy {
public:
using PeerCallback = base::RepeatingCallback<void(
std::unique_ptr<jingle_xmpp::XmlElement> message)>;
// Calls ConenctTo() to connect |peer1| and |peer2|. Both |peer1| and |peer2|
// must belong to the current thread.
static void Connect(FakeSignalStrategy* peer1, FakeSignalStrategy* peer2);
......@@ -40,7 +43,8 @@ class FakeSignalStrategy : public SignalStrategy {
send_delay_ = delay;
}
void SetState(State state) const;
void SetState(State state);
void SetPeerCallback(const PeerCallback& peer_callback);
// Connects current FakeSignalStrategy to receive messages from |peer|.
void ConnectTo(FakeSignalStrategy* peer);
......@@ -51,6 +55,9 @@ class FakeSignalStrategy : public SignalStrategy {
// next pair of messages.
void SimulateMessageReordering();
// Called by the |peer_|. Takes ownership of |stanza|.
void OnIncomingMessage(std::unique_ptr<jingle_xmpp::XmlElement> stanza);
// SignalStrategy interface.
void Connect() override;
void Disconnect() override;
......@@ -63,18 +70,12 @@ class FakeSignalStrategy : public SignalStrategy {
std::string GetNextId() override;
private:
typedef base::Callback<void(std::unique_ptr<jingle_xmpp::XmlElement> message)>
PeerCallback;
static void DeliverMessageOnThread(
scoped_refptr<base::SingleThreadTaskRunner> thread,
base::WeakPtr<FakeSignalStrategy> target,
std::unique_ptr<jingle_xmpp::XmlElement> stanza);
// Called by the |peer_|. Takes ownership of |stanza|.
void OnIncomingMessage(std::unique_ptr<jingle_xmpp::XmlElement> stanza);
void NotifyListeners(std::unique_ptr<jingle_xmpp::XmlElement> stanza);
void SetPeerCallback(const PeerCallback& peer_callback);
scoped_refptr<base::SingleThreadTaskRunner> main_thread_;
......
......@@ -17,9 +17,7 @@
#include "base/strings/string_number_conversions.h"
#include "base/threading/sequenced_task_runner_handle.h"
#include "base/timer/timer.h"
#include "remoting/signaling/ftl_signal_strategy.h"
#include "remoting/signaling/signaling_address.h"
#include "remoting/signaling/xmpp_signal_strategy.h"
#include "third_party/libjingle_xmpp/xmllite/xmlelement.h"
namespace remoting {
......@@ -33,8 +31,8 @@ constexpr base::TimeDelta kWaitForAllStrategiesConnectedTimeout =
class MuxingSignalStrategy::Core final : public SignalStrategy::Listener {
public:
Core(std::unique_ptr<FtlSignalStrategy> ftl_signal_strategy,
std::unique_ptr<XmppSignalStrategy> xmpp_signal_strategy);
Core(std::unique_ptr<SignalStrategy> ftl_signal_strategy,
std::unique_ptr<SignalStrategy> xmpp_signal_strategy);
~Core() override;
void Invalidate();
......@@ -46,18 +44,24 @@ class MuxingSignalStrategy::Core final : public SignalStrategy::Listener {
void RemoveListener(SignalStrategy::Listener* listener);
bool SendStanza(std::unique_ptr<jingle_xmpp::XmlElement> stanza);
FtlSignalStrategy* ftl_signal_strategy() {
return ftl_signal_strategy_.get();
}
SignalStrategy* ftl_signal_strategy() { return ftl_signal_strategy_.get(); }
XmppSignalStrategy* xmpp_signal_strategy() {
return xmpp_signal_strategy_.get();
}
SignalStrategy* xmpp_signal_strategy() { return xmpp_signal_strategy_.get(); }
private:
enum class MuxingState {
ALL_DISCONNECTED,
SOME_CONNECTING,
ONLY_ONE_CONNECTED_BEFORE_TIMEOUT,
ALL_CONNECTED,
ONLY_ONE_CONNECTED_AFTER_TIMEOUT,
};
SignalStrategy* GetSignalStrategyForStanza(
const jingle_xmpp::XmlElement* stanza);
void UpdateTimerState();
// Returns true if the state is updated.
bool UpdateState();
void OnWaitForAllStrategiesConnectedTimeout();
......@@ -72,11 +76,11 @@ class MuxingSignalStrategy::Core final : public SignalStrategy::Listener {
base::ObserverList<SignalStrategy::Listener> listeners_;
std::unique_ptr<FtlSignalStrategy> ftl_signal_strategy_;
std::unique_ptr<XmppSignalStrategy> xmpp_signal_strategy_;
std::unique_ptr<SignalStrategy> ftl_signal_strategy_;
std::unique_ptr<SignalStrategy> xmpp_signal_strategy_;
SignalingAddress current_local_address_;
State previous_state_;
MuxingState state_ = MuxingState::ALL_DISCONNECTED;
base::OneShotTimer wait_for_all_strategies_connected_timeout_timer_;
......@@ -87,18 +91,19 @@ class MuxingSignalStrategy::Core final : public SignalStrategy::Listener {
};
MuxingSignalStrategy::Core::Core(
std::unique_ptr<FtlSignalStrategy> ftl_signal_strategy,
std::unique_ptr<XmppSignalStrategy> xmpp_signal_strategy)
std::unique_ptr<SignalStrategy> ftl_signal_strategy,
std::unique_ptr<SignalStrategy> xmpp_signal_strategy)
: weak_factory_(this) {
ftl_signal_strategy_ = std::move(ftl_signal_strategy);
xmpp_signal_strategy_ = std::move(xmpp_signal_strategy);
DCHECK(ftl_signal_strategy_);
DCHECK(xmpp_signal_strategy_);
DCHECK_EQ(State::DISCONNECTED, ftl_signal_strategy_->GetState());
DCHECK_EQ(State::DISCONNECTED, xmpp_signal_strategy_->GetState());
ftl_signal_strategy_->AddListener(this);
xmpp_signal_strategy_->AddListener(this);
UpdateTimerState();
previous_state_ = GetState();
UpdateState();
}
MuxingSignalStrategy::Core::~Core() {
......@@ -123,16 +128,19 @@ void MuxingSignalStrategy::Core::Connect() {
SignalStrategy::State MuxingSignalStrategy::Core::GetState() const {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
if (IsEveryStrategyDisconnected()) {
return State::DISCONNECTED;
}
if (IsAnyStrategyConnected() &&
!wait_for_all_strategies_connected_timeout_timer_.IsRunning()) {
return State::CONNECTED;
switch (state_) {
case MuxingState::ALL_DISCONNECTED:
return State::DISCONNECTED;
case MuxingState::SOME_CONNECTING:
case MuxingState::ONLY_ONE_CONNECTED_BEFORE_TIMEOUT:
return State::CONNECTING;
case MuxingState::ONLY_ONE_CONNECTED_AFTER_TIMEOUT:
case MuxingState::ALL_CONNECTED:
return State::CONNECTED;
default:
NOTREACHED();
return State::DISCONNECTED;
}
return State::CONNECTING;
}
const SignalingAddress& MuxingSignalStrategy::Core::GetLocalAddress() const {
......@@ -175,19 +183,56 @@ SignalStrategy* MuxingSignalStrategy::Core::GetSignalStrategyForStanza(
return nullptr;
}
if (receiver.channel() == SignalingAddress::Channel::FTL) {
DCHECK(ftl_signal_strategy_->GetLocalAddress().empty() ||
ftl_signal_strategy_->GetLocalAddress().channel() ==
SignalingAddress::Channel::FTL)
<< "|ftl_signal_strategy_|'s local address channel is not FTL. "
<< "You might have flipped the signal strategies. "
<< "Local address: " << ftl_signal_strategy_->GetLocalAddress().jid();
return ftl_signal_strategy_.get();
} else {
DCHECK(xmpp_signal_strategy_->GetLocalAddress().empty() ||
xmpp_signal_strategy_->GetLocalAddress().channel() !=
SignalingAddress::Channel::FTL)
<< "|xmpp_signal_strategy_|'s local address channel is FTL. "
<< "You might have flipped the signal strategies. "
<< "Local address: " << xmpp_signal_strategy_->GetLocalAddress().jid();
}
return xmpp_signal_strategy_.get();
}
void MuxingSignalStrategy::Core::UpdateTimerState() {
if (IsEveryStrategyConnected() || IsEveryStrategyDisconnected()) {
bool MuxingSignalStrategy::Core::UpdateState() {
MuxingState new_state = state_;
if (IsEveryStrategyConnected()) {
wait_for_all_strategies_connected_timeout_timer_.AbandonAndStop();
new_state = MuxingState::ALL_CONNECTED;
} else if (IsEveryStrategyDisconnected()) {
wait_for_all_strategies_connected_timeout_timer_.AbandonAndStop();
new_state = MuxingState::ALL_DISCONNECTED;
} else if (IsAnyStrategyConnected()) {
wait_for_all_strategies_connected_timeout_timer_.Start(
FROM_HERE, kWaitForAllStrategiesConnectedTimeout, this,
&MuxingSignalStrategy::Core::OnWaitForAllStrategiesConnectedTimeout);
if (state_ == MuxingState::ALL_CONNECTED // One connection is dropped
|| (state_ == MuxingState::ONLY_ONE_CONNECTED_BEFORE_TIMEOUT &&
!wait_for_all_strategies_connected_timeout_timer_.IsRunning())) {
new_state = MuxingState::ONLY_ONE_CONNECTED_AFTER_TIMEOUT;
} else if (state_ != MuxingState::ONLY_ONE_CONNECTED_AFTER_TIMEOUT) {
new_state = MuxingState::ONLY_ONE_CONNECTED_BEFORE_TIMEOUT;
if (!wait_for_all_strategies_connected_timeout_timer_.IsRunning()) {
wait_for_all_strategies_connected_timeout_timer_.Start(
FROM_HERE, kWaitForAllStrategiesConnectedTimeout, this,
&MuxingSignalStrategy::Core::
OnWaitForAllStrategiesConnectedTimeout);
}
}
// Otherwise we are not changing the state unless all strategies are
// connected or all strategies are disconnected.
} else {
new_state = MuxingState::SOME_CONNECTING;
}
if (state_ == new_state) {
return false;
}
state_ = new_state;
return true;
}
void MuxingSignalStrategy::Core::OnWaitForAllStrategiesConnectedTimeout() {
......@@ -201,13 +246,11 @@ void MuxingSignalStrategy::Core::OnWaitForAllStrategiesConnectedTimeout() {
void MuxingSignalStrategy::Core::OnSignalStrategyStateChange(
SignalStrategy::State unused) {
UpdateTimerState();
State new_state = GetState();
if (previous_state_ != new_state) {
bool is_state_changed = UpdateState();
if (is_state_changed) {
for (auto& listener : listeners_) {
listener.OnSignalStrategyStateChange(new_state);
listener.OnSignalStrategyStateChange(GetState());
}
previous_state_ = new_state;
}
}
......@@ -247,8 +290,8 @@ bool MuxingSignalStrategy::Core::IsEveryStrategyDisconnected() const {
}
MuxingSignalStrategy::MuxingSignalStrategy(
std::unique_ptr<FtlSignalStrategy> ftl_signal_strategy,
std::unique_ptr<XmppSignalStrategy> xmpp_signal_strategy)
std::unique_ptr<SignalStrategy> ftl_signal_strategy,
std::unique_ptr<SignalStrategy> xmpp_signal_strategy)
: ftl_signal_strategy_(std::move(ftl_signal_strategy)),
xmpp_signal_strategy_(std::move(xmpp_signal_strategy)) {}
......@@ -291,11 +334,11 @@ std::string MuxingSignalStrategy::GetNextId() {
return base::NumberToString(base::RandUint64());
}
FtlSignalStrategy* MuxingSignalStrategy::ftl_signal_strategy() {
SignalStrategy* MuxingSignalStrategy::ftl_signal_strategy() {
return GetCore()->ftl_signal_strategy();
}
XmppSignalStrategy* MuxingSignalStrategy::xmpp_signal_strategy() {
SignalStrategy* MuxingSignalStrategy::xmpp_signal_strategy() {
return GetCore()->xmpp_signal_strategy();
}
......
......@@ -12,9 +12,6 @@
namespace remoting {
class FtlSignalStrategy;
class XmppSignalStrategy;
// WARNING: This class is designed to be used exclusively by
// JingleSessionManager on the host during the XMPP->FTL signaling migration
// process. It doesn't support anything other than sending and receiving
......@@ -28,9 +25,8 @@ class XmppSignalStrategy;
// on another thread.
class MuxingSignalStrategy final : public SignalStrategy {
public:
MuxingSignalStrategy(
std::unique_ptr<FtlSignalStrategy> ftl_signal_strategy,
std::unique_ptr<XmppSignalStrategy> xmpp_signal_strategy);
MuxingSignalStrategy(std::unique_ptr<SignalStrategy> ftl_signal_strategy,
std::unique_ptr<SignalStrategy> xmpp_signal_strategy);
~MuxingSignalStrategy() override;
// SignalStrategy implementations.
......@@ -38,12 +34,18 @@ class MuxingSignalStrategy final : public SignalStrategy {
// This will connect both |ftl_signal_strategy_| and |xmpp_signal_strategy_|.
void Connect() override;
// Returns:
// * DISCONNECTED if both of the signal strategies are disconnected
// * CONNECTED if both of the signal strategies are connected, or only one of
// the strategy is connected while a timeout has been elapsed (the other
// strategy can be either disconnected or connecting)
// * CONNECTING in other cases
// The state is a mapping of the MuxingState (defined in
// MuxingSignalStrategy::Core):
//
// ALL_DISCONNECTED -> DISCONNECTED
// SOME_CONNECTING, ONLY_ONE_CONNECTED_BEFORE_TIMEOUT -> CONNECTING
// ALL_CONNECTED, ONLY_ONE_CONNECTED_AFTER_TIMEOUT -> CONNECTED
//
// Note that MuxingSignalStrategy will notify listeners whenever the muxing
// state is changed, which means listeners may get notified for
// CONNECTING->CONNECTING and CONNECTED->CONNECTED transitions. This is to
// allow heartbeat sender to send new heartbeat when a strategy is connected
// or disconnected after the timeout.
State GetState() const override;
// GetLocalAddress() can only be called inside
......@@ -56,8 +58,8 @@ class MuxingSignalStrategy final : public SignalStrategy {
bool SendStanza(std::unique_ptr<jingle_xmpp::XmlElement> stanza) override;
std::string GetNextId() override;
FtlSignalStrategy* ftl_signal_strategy();
XmppSignalStrategy* xmpp_signal_strategy();
SignalStrategy* ftl_signal_strategy();
SignalStrategy* xmpp_signal_strategy();
private:
class Core;
......@@ -77,8 +79,8 @@ class MuxingSignalStrategy final : public SignalStrategy {
Core* GetCoreImpl();
// These will be moved to |core_| once the core is created.
std::unique_ptr<FtlSignalStrategy> ftl_signal_strategy_;
std::unique_ptr<XmppSignalStrategy> xmpp_signal_strategy_;
std::unique_ptr<SignalStrategy> ftl_signal_strategy_;
std::unique_ptr<SignalStrategy> xmpp_signal_strategy_;
std::unique_ptr<Core> core_;
DISALLOW_COPY_AND_ASSIGN(MuxingSignalStrategy);
......
This diff is collapsed.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment