Commit 4e572c18 authored by kelvinp's avatar kelvinp Committed by Commit bot

Enables delegating signal strategy for It2Me Host.

Motivation:
Allow the It2me host to delegates signaling messages sending/receiving
via native messaging.

Summary of changes:
- Add a new flag useSignalingProxy on the connect message for
  com.google.chrome.remote_assistance.
- Move delegating_signal_strategy from the plugin directory to the
  signaling directory so that it can be shared by the client and the
  host
- Change the threading behavior of delegating_signal_strategy so that
  the delegate and the listener can be run on two different threads.

Review-Url: https://codereview.chromium.org/2384063008
Cr-Commit-Position: refs/heads/master@{#423692}
parent ac1c9846
...@@ -11,8 +11,6 @@ executable("remoting_client_plugin_newlib") { ...@@ -11,8 +11,6 @@ executable("remoting_client_plugin_newlib") {
sources = [ sources = [
"chromoting_instance.cc", "chromoting_instance.cc",
"chromoting_instance.h", "chromoting_instance.h",
"delegating_signal_strategy.cc",
"delegating_signal_strategy.h",
"pepper_address_resolver.cc", "pepper_address_resolver.cc",
"pepper_address_resolver.h", "pepper_address_resolver.h",
"pepper_audio_player.cc", "pepper_audio_player.cc",
...@@ -53,6 +51,7 @@ executable("remoting_client_plugin_newlib") { ...@@ -53,6 +51,7 @@ executable("remoting_client_plugin_newlib") {
"//remoting/client", "//remoting/client",
"//remoting/codec", "//remoting/codec",
"//remoting/protocol", "//remoting/protocol",
"//remoting/signaling",
"//third_party/libyuv", "//third_party/libyuv",
"//third_party/webrtc/modules/desktop_capture:primitives", "//third_party/webrtc/modules/desktop_capture:primitives",
"//ui/events:dom_keycode_converter", "//ui/events:dom_keycode_converter",
......
...@@ -43,7 +43,6 @@ ...@@ -43,7 +43,6 @@
#include "remoting/client/normalizing_input_filter_cros.h" #include "remoting/client/normalizing_input_filter_cros.h"
#include "remoting/client/normalizing_input_filter_mac.h" #include "remoting/client/normalizing_input_filter_mac.h"
#include "remoting/client/normalizing_input_filter_win.h" #include "remoting/client/normalizing_input_filter_win.h"
#include "remoting/client/plugin/delegating_signal_strategy.h"
#include "remoting/client/plugin/pepper_audio_player.h" #include "remoting/client/plugin/pepper_audio_player.h"
#include "remoting/client/plugin/pepper_main_thread_task_runner.h" #include "remoting/client/plugin/pepper_main_thread_task_runner.h"
#include "remoting/client/plugin/pepper_mouse_locker.h" #include "remoting/client/plugin/pepper_mouse_locker.h"
...@@ -56,6 +55,7 @@ ...@@ -56,6 +55,7 @@
#include "remoting/protocol/connection_to_host.h" #include "remoting/protocol/connection_to_host.h"
#include "remoting/protocol/host_stub.h" #include "remoting/protocol/host_stub.h"
#include "remoting/protocol/transport_context.h" #include "remoting/protocol/transport_context.h"
#include "remoting/signaling/delegating_signal_strategy.h"
#include "third_party/webrtc/base/helpers.h" #include "third_party/webrtc/base/helpers.h"
#include "third_party/webrtc/modules/desktop_capture/desktop_region.h" #include "third_party/webrtc/modules/desktop_capture/desktop_region.h"
#include "url/gurl.h" #include "url/gurl.h"
...@@ -671,8 +671,9 @@ void ChromotingInstance::HandleConnect(const base::DictionaryValue& data) { ...@@ -671,8 +671,9 @@ void ChromotingInstance::HandleConnect(const base::DictionaryValue& data) {
// Setup the signal strategy. // Setup the signal strategy.
signal_strategy_.reset(new DelegatingSignalStrategy( signal_strategy_.reset(new DelegatingSignalStrategy(
local_jid, base::Bind(&ChromotingInstance::SendOutgoingIq, local_jid, plugin_task_runner_,
weak_factory_.GetWeakPtr()))); base::Bind(&ChromotingInstance::SendOutgoingIq,
weak_factory_.GetWeakPtr())));
// Create TransportContext. // Create TransportContext.
scoped_refptr<protocol::TransportContext> transport_context( scoped_refptr<protocol::TransportContext> transport_context(
......
...@@ -47,6 +47,7 @@ source_set("common") { ...@@ -47,6 +47,7 @@ source_set("common") {
"//remoting/host", "//remoting/host",
"//remoting/protocol", "//remoting/protocol",
"//remoting/resources", "//remoting/resources",
"//remoting/signaling",
] ]
if (is_desktop_linux) { if (is_desktop_linux) {
......
...@@ -15,7 +15,6 @@ ...@@ -15,7 +15,6 @@
#include "base/strings/string_util.h" #include "base/strings/string_util.h"
#include "base/threading/platform_thread.h" #include "base/threading/platform_thread.h"
#include "components/policy/policy_constants.h" #include "components/policy/policy_constants.h"
#include "net/socket/client_socket_factory.h"
#include "net/url_request/url_request_context_getter.h" #include "net/url_request/url_request_context_getter.h"
#include "remoting/base/auto_thread.h" #include "remoting/base/auto_thread.h"
#include "remoting/base/chromium_url_request.h" #include "remoting/base/chromium_url_request.h"
...@@ -61,11 +60,13 @@ It2MeHost::It2MeHost( ...@@ -61,11 +60,13 @@ It2MeHost::It2MeHost(
std::unique_ptr<PolicyWatcher> policy_watcher, std::unique_ptr<PolicyWatcher> policy_watcher,
std::unique_ptr<It2MeConfirmationDialog> confirmation_dialog, std::unique_ptr<It2MeConfirmationDialog> confirmation_dialog,
base::WeakPtr<It2MeHost::Observer> observer, base::WeakPtr<It2MeHost::Observer> observer,
const XmppSignalStrategy::XmppServerConfig& xmpp_server_config, std::unique_ptr<SignalStrategy> signal_strategy,
const std::string& username,
const std::string& directory_bot_jid) const std::string& directory_bot_jid)
: host_context_(std::move(host_context)), : host_context_(std::move(host_context)),
observer_(observer), observer_(observer),
xmpp_server_config_(xmpp_server_config), signal_strategy_(std::move(signal_strategy)),
username_(username),
directory_bot_jid_(directory_bot_jid), directory_bot_jid_(directory_bot_jid),
policy_watcher_(std::move(policy_watcher)), policy_watcher_(std::move(policy_watcher)),
confirmation_dialog_(std::move(confirmation_dialog)) { confirmation_dialog_(std::move(confirmation_dialog)) {
...@@ -173,8 +174,7 @@ void It2MeHost::FinishConnect() { ...@@ -173,8 +174,7 @@ void It2MeHost::FinishConnect() {
// Check the host domain policy. // Check the host domain policy.
if (!required_host_domain_.empty() && if (!required_host_domain_.empty() &&
!base::EndsWith(xmpp_server_config_.username, !base::EndsWith(username_, std::string("@") + required_host_domain_,
std::string("@") + required_host_domain_,
base::CompareCase::INSENSITIVE_ASCII)) { base::CompareCase::INSENSITIVE_ASCII)) {
SetState(kInvalidDomainError, ""); SetState(kInvalidDomainError, "");
return; return;
...@@ -184,19 +184,13 @@ void It2MeHost::FinishConnect() { ...@@ -184,19 +184,13 @@ void It2MeHost::FinishConnect() {
// TODO(wez): Move this to the worker thread. // TODO(wez): Move this to the worker thread.
host_key_pair_ = RsaKeyPair::Generate(); host_key_pair_ = RsaKeyPair::Generate();
// Create XMPP connection.
std::unique_ptr<SignalStrategy> signal_strategy(new XmppSignalStrategy(
net::ClientSocketFactory::GetDefaultFactory(),
host_context_->url_request_context_getter(), xmpp_server_config_));
// Request registration of the host for support. // Request registration of the host for support.
std::unique_ptr<RegisterSupportHostRequest> register_request( std::unique_ptr<RegisterSupportHostRequest> register_request(
new RegisterSupportHostRequest( new RegisterSupportHostRequest(
signal_strategy.get(), host_key_pair_, directory_bot_jid_, signal_strategy_.get(), host_key_pair_, directory_bot_jid_,
base::Bind(&It2MeHost::OnReceivedSupportID, base::Unretained(this)))); base::Bind(&It2MeHost::OnReceivedSupportID, base::Unretained(this))));
// Beyond this point nothing can fail, so save the config and request. // Beyond this point nothing can fail, so save the config and request.
signal_strategy_ = std::move(signal_strategy);
register_request_ = std::move(register_request); register_request_ = std::move(register_request);
// If NAT traversal is off then limit port range to allow firewall pin-holing. // If NAT traversal is off then limit port range to allow firewall pin-holing.
...@@ -249,7 +243,7 @@ void It2MeHost::FinishConnect() { ...@@ -249,7 +243,7 @@ void It2MeHost::FinishConnect() {
// Connect signaling and start the host. // Connect signaling and start the host.
signal_strategy_->Connect(); signal_strategy_->Connect();
host_->Start(xmpp_server_config_.username); host_->Start(username_);
SetState(kRequestedAccessCode, ""); SetState(kRequestedAccessCode, "");
return; return;
...@@ -551,7 +545,8 @@ scoped_refptr<It2MeHost> It2MeHostFactory::CreateIt2MeHost( ...@@ -551,7 +545,8 @@ scoped_refptr<It2MeHost> It2MeHostFactory::CreateIt2MeHost(
std::unique_ptr<ChromotingHostContext> context, std::unique_ptr<ChromotingHostContext> context,
policy::PolicyService* policy_service, policy::PolicyService* policy_service,
base::WeakPtr<It2MeHost::Observer> observer, base::WeakPtr<It2MeHost::Observer> observer,
const XmppSignalStrategy::XmppServerConfig& xmpp_server_config, std::unique_ptr<SignalStrategy> signal_strategy,
const std::string& username,
const std::string& directory_bot_jid) { const std::string& directory_bot_jid) {
DCHECK(context->ui_task_runner()->BelongsToCurrentThread()); DCHECK(context->ui_task_runner()->BelongsToCurrentThread());
...@@ -559,7 +554,7 @@ scoped_refptr<It2MeHost> It2MeHostFactory::CreateIt2MeHost( ...@@ -559,7 +554,7 @@ scoped_refptr<It2MeHost> It2MeHostFactory::CreateIt2MeHost(
PolicyWatcher::Create(policy_service, context->file_task_runner()); PolicyWatcher::Create(policy_service, context->file_task_runner());
return new It2MeHost(std::move(context), std::move(policy_watcher), return new It2MeHost(std::move(context), std::move(policy_watcher),
It2MeConfirmationDialog::Create(), observer, It2MeConfirmationDialog::Create(), observer,
xmpp_server_config, directory_bot_jid); std::move(signal_strategy), username, directory_bot_jid);
} }
} // namespace remoting } // namespace remoting
...@@ -67,7 +67,8 @@ class It2MeHost : public base::RefCountedThreadSafe<It2MeHost>, ...@@ -67,7 +67,8 @@ class It2MeHost : public base::RefCountedThreadSafe<It2MeHost>,
std::unique_ptr<PolicyWatcher> policy_watcher, std::unique_ptr<PolicyWatcher> policy_watcher,
std::unique_ptr<It2MeConfirmationDialog> confirmation_dialog, std::unique_ptr<It2MeConfirmationDialog> confirmation_dialog,
base::WeakPtr<It2MeHost::Observer> observer, base::WeakPtr<It2MeHost::Observer> observer,
const XmppSignalStrategy::XmppServerConfig& xmpp_server_config, std::unique_ptr<SignalStrategy> signal_strategy,
const std::string& username,
const std::string& directory_bot_jid); const std::string& directory_bot_jid);
// Methods called by the script object, from the plugin thread. // Methods called by the script object, from the plugin thread.
...@@ -155,13 +156,13 @@ class It2MeHost : public base::RefCountedThreadSafe<It2MeHost>, ...@@ -155,13 +156,13 @@ class It2MeHost : public base::RefCountedThreadSafe<It2MeHost>,
// Caller supplied fields. // Caller supplied fields.
std::unique_ptr<ChromotingHostContext> host_context_; std::unique_ptr<ChromotingHostContext> host_context_;
base::WeakPtr<It2MeHost::Observer> observer_; base::WeakPtr<It2MeHost::Observer> observer_;
XmppSignalStrategy::XmppServerConfig xmpp_server_config_; std::unique_ptr<SignalStrategy> signal_strategy_;
std::string username_;
std::string directory_bot_jid_; std::string directory_bot_jid_;
It2MeHostState state_ = kDisconnected; It2MeHostState state_ = kDisconnected;
scoped_refptr<RsaKeyPair> host_key_pair_; scoped_refptr<RsaKeyPair> host_key_pair_;
std::unique_ptr<SignalStrategy> signal_strategy_;
std::unique_ptr<RegisterSupportHostRequest> register_request_; std::unique_ptr<RegisterSupportHostRequest> register_request_;
std::unique_ptr<HostStatusLogger> host_status_logger_; std::unique_ptr<HostStatusLogger> host_status_logger_;
std::unique_ptr<DesktopEnvironmentFactory> desktop_environment_factory_; std::unique_ptr<DesktopEnvironmentFactory> desktop_environment_factory_;
...@@ -215,7 +216,8 @@ class It2MeHostFactory { ...@@ -215,7 +216,8 @@ class It2MeHostFactory {
std::unique_ptr<ChromotingHostContext> context, std::unique_ptr<ChromotingHostContext> context,
policy::PolicyService* policy_service, policy::PolicyService* policy_service,
base::WeakPtr<It2MeHost::Observer> observer, base::WeakPtr<It2MeHost::Observer> observer,
const XmppSignalStrategy::XmppServerConfig& xmpp_server_config, std::unique_ptr<SignalStrategy> signal_strategy,
const std::string& username,
const std::string& directory_bot_jid); const std::string& directory_bot_jid);
private: private:
......
...@@ -22,6 +22,7 @@ ...@@ -22,6 +22,7 @@
#include "remoting/host/chromoting_host_context.h" #include "remoting/host/chromoting_host_context.h"
#include "remoting/host/it2me/it2me_confirmation_dialog.h" #include "remoting/host/it2me/it2me_confirmation_dialog.h"
#include "remoting/host/policy_watcher.h" #include "remoting/host/policy_watcher.h"
#include "remoting/signaling/fake_signal_strategy.h"
#include "testing/gtest/include/gtest/gtest.h" #include "testing/gtest/include/gtest/gtest.h"
namespace remoting { namespace remoting {
...@@ -110,9 +111,6 @@ class It2MeHostTest : public testing::Test { ...@@ -110,9 +111,6 @@ class It2MeHostTest : public testing::Test {
scoped_refptr<It2MeHost> it2me_host_; scoped_refptr<It2MeHost> it2me_host_;
std::string directory_bot_jid_;
XmppSignalStrategy::XmppServerConfig xmpp_server_config_;
DISALLOW_COPY_AND_ASSIGN(It2MeHostTest); DISALLOW_COPY_AND_ASSIGN(It2MeHostTest);
}; };
...@@ -127,11 +125,12 @@ void It2MeHostTest::SetUp() { ...@@ -127,11 +125,12 @@ void It2MeHostTest::SetUp() {
ui_task_runner_ = host_context->ui_task_runner(); ui_task_runner_ = host_context->ui_task_runner();
dialog_ = new FakeIt2MeConfirmationDialog(); dialog_ = new FakeIt2MeConfirmationDialog();
it2me_host_ = it2me_host_ =
new It2MeHost(std::move(host_context), /*policy_watcher=*/nullptr, new It2MeHost(std::move(host_context),
base::WrapUnique(dialog_), /*observer=*/nullptr, /*policy_watcher=*/nullptr, base::WrapUnique(dialog_),
xmpp_server_config_, directory_bot_jid_); /*observer=*/nullptr,
base::WrapUnique(new FakeSignalStrategy("fake_local_jid")),
"fake_user_name", "fake_bot_jid");
} }
void It2MeHostTest::TearDown() { void It2MeHostTest::TearDown() {
......
...@@ -22,6 +22,7 @@ ...@@ -22,6 +22,7 @@
#include "build/build_config.h" #include "build/build_config.h"
#include "components/policy/policy_constants.h" #include "components/policy/policy_constants.h"
#include "net/base/url_util.h" #include "net/base/url_util.h"
#include "net/socket/client_socket_factory.h"
#include "net/url_request/url_request_context_getter.h" #include "net/url_request/url_request_context_getter.h"
#include "remoting/base/auto_thread_task_runner.h" #include "remoting/base/auto_thread_task_runner.h"
#include "remoting/host/chromoting_host_context.h" #include "remoting/host/chromoting_host_context.h"
...@@ -29,6 +30,7 @@ ...@@ -29,6 +30,7 @@
#include "remoting/host/policy_watcher.h" #include "remoting/host/policy_watcher.h"
#include "remoting/host/service_urls.h" #include "remoting/host/service_urls.h"
#include "remoting/protocol/name_value_map.h" #include "remoting/protocol/name_value_map.h"
#include "remoting/signaling/delegating_signal_strategy.h"
#if defined(OS_WIN) #if defined(OS_WIN)
#include "base/command_line.h" #include "base/command_line.h"
...@@ -92,16 +94,6 @@ It2MeNativeMessagingHost::It2MeNativeMessagingHost( ...@@ -92,16 +94,6 @@ It2MeNativeMessagingHost::It2MeNativeMessagingHost(
weak_factory_(this) { weak_factory_(this) {
weak_ptr_ = weak_factory_.GetWeakPtr(); weak_ptr_ = weak_factory_.GetWeakPtr();
const ServiceUrls* service_urls = ServiceUrls::GetInstance();
const bool xmpp_server_valid =
net::ParseHostAndPort(service_urls->xmpp_server_address(),
&xmpp_server_config_.host,
&xmpp_server_config_.port);
DCHECK(xmpp_server_valid);
xmpp_server_config_.use_tls = service_urls->xmpp_server_use_tls();
directory_bot_jid_ = service_urls->directory_bot_jid();
// The policy watcher runs on the |file_task_runner| but we want to run the // The policy watcher runs on the |file_task_runner| but we want to run the
// update code on |task_runner| so we use a shim to post the callback to the // update code on |task_runner| so we use a shim to post the callback to the
// preferred task runner. // preferred task runner.
...@@ -155,6 +147,8 @@ void It2MeNativeMessagingHost::OnMessage(const std::string& message) { ...@@ -155,6 +147,8 @@ void It2MeNativeMessagingHost::OnMessage(const std::string& message) {
ProcessConnect(std::move(message_dict), std::move(response)); ProcessConnect(std::move(message_dict), std::move(response));
} else if (type == "disconnect") { } else if (type == "disconnect") {
ProcessDisconnect(std::move(message_dict), std::move(response)); ProcessDisconnect(std::move(message_dict), std::move(response));
} else if (type == "incomingIq") {
ProcessIncomingIq(std::move(message_dict), std::move(response));
} else { } else {
SendErrorAndExit(std::move(response), "Unsupported request type: " + type); SendErrorAndExit(std::move(response), "Unsupported request type: " + type);
} }
...@@ -226,56 +220,92 @@ void It2MeNativeMessagingHost::ProcessConnect( ...@@ -226,56 +220,92 @@ void It2MeNativeMessagingHost::ProcessConnect(
return; return;
} }
XmppSignalStrategy::XmppServerConfig xmpp_config = xmpp_server_config_; std::string username;
if (!message->GetString("userName", &username)) {
if (!message->GetString("userName", &xmpp_config.username)) {
SendErrorAndExit(std::move(response), "'userName' not found in request."); SendErrorAndExit(std::move(response), "'userName' not found in request.");
return; return;
} }
std::string auth_service_with_token; bool use_signaling_proxy = false;
if (!message->GetString("authServiceWithToken", &auth_service_with_token)) { message->GetBoolean("useSignalingProxy", &use_signaling_proxy);
SendErrorAndExit(std::move(response),
"'authServiceWithToken' not found in request.");
return;
}
// For backward compatibility the webapp still passes OAuth service as part of const ServiceUrls* service_urls = ServiceUrls::GetInstance();
// the authServiceWithToken field. But auth service part is always expected to std::unique_ptr<SignalStrategy> signal_strategy;
// be set to oauth2.
const char kOAuth2ServicePrefix[] = "oauth2:"; if (!use_signaling_proxy) {
if (!base::StartsWith(auth_service_with_token, kOAuth2ServicePrefix, XmppSignalStrategy::XmppServerConfig xmpp_config;
base::CompareCase::SENSITIVE)) { xmpp_config.username = username;
SendErrorAndExit(std::move(response), "Invalid 'authServiceWithToken': " +
auth_service_with_token); const bool xmpp_server_valid =
return; net::ParseHostAndPort(service_urls->xmpp_server_address(),
} &xmpp_config.host, &xmpp_config.port);
DCHECK(xmpp_server_valid);
xmpp_config.use_tls = service_urls->xmpp_server_use_tls();
std::string auth_service_with_token;
if (!message->GetString("authServiceWithToken", &auth_service_with_token)) {
SendErrorAndExit(std::move(response),
"'authServiceWithToken' not found in request.");
return;
}
// For backward compatibility the webapp still passes OAuth service as part
// of the authServiceWithToken field. But auth service part is always
// expected to be set to oauth2.
const char kOAuth2ServicePrefix[] = "oauth2:";
if (!base::StartsWith(auth_service_with_token, kOAuth2ServicePrefix,
base::CompareCase::SENSITIVE)) {
SendErrorAndExit(std::move(response), "Invalid 'authServiceWithToken': " +
auth_service_with_token);
return;
}
xmpp_config.auth_token = xmpp_config.auth_token =
auth_service_with_token.substr(strlen(kOAuth2ServicePrefix)); auth_service_with_token.substr(strlen(kOAuth2ServicePrefix));
#if !defined(NDEBUG) #if !defined(NDEBUG)
std::string address; std::string address;
if (!message->GetString("xmppServerAddress", &address)) { if (!message->GetString("xmppServerAddress", &address)) {
SendErrorAndExit(std::move(response), SendErrorAndExit(std::move(response),
"'xmppServerAddress' not found in request."); "'xmppServerAddress' not found in request.");
return; return;
} }
if (!net::ParseHostAndPort(address, &xmpp_config.host, if (!net::ParseHostAndPort(address, &xmpp_config.host, &xmpp_config.port)) {
&xmpp_config.port)) { SendErrorAndExit(std::move(response),
SendErrorAndExit(std::move(response), "Invalid 'xmppServerAddress': " + address);
"Invalid 'xmppServerAddress': " + address); return;
return; }
}
if (!message->GetBoolean("xmppServerUseTls", &xmpp_config.use_tls)) { if (!message->GetBoolean("xmppServerUseTls", &xmpp_config.use_tls)) {
SendErrorAndExit(std::move(response), SendErrorAndExit(std::move(response),
"'xmppServerUseTls' not found in request."); "'xmppServerUseTls' not found in request.");
return; return;
}
#endif // !defined(NDEBUG)
signal_strategy.reset(new XmppSignalStrategy(
net::ClientSocketFactory::GetDefaultFactory(),
host_context_->url_request_context_getter(), xmpp_config));
} else {
std::string local_jid;
if (!message->GetString("localJid", &local_jid)) {
SendErrorAndExit(std::move(response), "'localJid' not found in request.");
return;
}
delegating_signal_strategy_ = new DelegatingSignalStrategy(
local_jid, host_context_->network_task_runner(),
base::Bind(&It2MeNativeMessagingHost::SendOutgoingIq,
weak_factory_.GetWeakPtr()));
signal_strategy.reset(delegating_signal_strategy_);
} }
if (!message->GetString("directoryBotJid", &directory_bot_jid_)) { std::string directory_bot_jid = service_urls->directory_bot_jid();
#if !defined(NDEBUG)
if (!message->GetString("directoryBotJid", &directory_bot_jid)) {
SendErrorAndExit(std::move(response), SendErrorAndExit(std::move(response),
"'directoryBotJid' not found in request."); "'directoryBotJid' not found in request.");
return; return;
...@@ -283,9 +313,9 @@ void It2MeNativeMessagingHost::ProcessConnect( ...@@ -283,9 +313,9 @@ void It2MeNativeMessagingHost::ProcessConnect(
#endif // !defined(NDEBUG) #endif // !defined(NDEBUG)
// Create the It2Me host and start connecting. // Create the It2Me host and start connecting.
it2me_host_ = it2me_host_ = factory_->CreateIt2MeHost(
factory_->CreateIt2MeHost(host_context_->Copy(), policy_service_, host_context_->Copy(), policy_service_, weak_ptr_,
weak_ptr_, xmpp_config, directory_bot_jid_); std::move(signal_strategy), username, directory_bot_jid);
it2me_host_->Connect(); it2me_host_->Connect();
SendMessageToClient(std::move(response)); SendMessageToClient(std::move(response));
...@@ -317,6 +347,27 @@ void It2MeNativeMessagingHost::ProcessDisconnect( ...@@ -317,6 +347,27 @@ void It2MeNativeMessagingHost::ProcessDisconnect(
SendMessageToClient(std::move(response)); SendMessageToClient(std::move(response));
} }
void It2MeNativeMessagingHost::ProcessIncomingIq(
std::unique_ptr<base::DictionaryValue> message,
std::unique_ptr<base::DictionaryValue> response) {
std::string iq;
if (!message->GetString("iq", &iq)) {
LOG(ERROR) << "Invalid incomingIq() data.";
return;
}
if (delegating_signal_strategy_)
delegating_signal_strategy_->OnIncomingMessage(iq);
SendMessageToClient(std::move(response));
};
void It2MeNativeMessagingHost::SendOutgoingIq(const std::string& iq) {
std::unique_ptr<base::DictionaryValue> message(new base::DictionaryValue());
message->SetString("iq", iq);
message->SetString("type", "sendOutgoingIq");
SendMessageToClient(std::move(message));
}
void It2MeNativeMessagingHost::SendErrorAndExit( void It2MeNativeMessagingHost::SendErrorAndExit(
std::unique_ptr<base::DictionaryValue> response, std::unique_ptr<base::DictionaryValue> response,
const std::string& description) const { const std::string& description) const {
......
...@@ -28,6 +28,7 @@ class SingleThreadTaskRunner; ...@@ -28,6 +28,7 @@ class SingleThreadTaskRunner;
namespace remoting { namespace remoting {
class ChromotingHostContext; class ChromotingHostContext;
class DelegatingSignalStrategy;
class ElevatedNativeMessagingHost; class ElevatedNativeMessagingHost;
class PolicyWatcher; class PolicyWatcher;
...@@ -67,10 +68,15 @@ class It2MeNativeMessagingHost : public It2MeHost::Observer, ...@@ -67,10 +68,15 @@ class It2MeNativeMessagingHost : public It2MeHost::Observer,
std::unique_ptr<base::DictionaryValue> response); std::unique_ptr<base::DictionaryValue> response);
void ProcessDisconnect(std::unique_ptr<base::DictionaryValue> message, void ProcessDisconnect(std::unique_ptr<base::DictionaryValue> message,
std::unique_ptr<base::DictionaryValue> response); std::unique_ptr<base::DictionaryValue> response);
void ProcessIncomingIq(std::unique_ptr<base::DictionaryValue> message,
std::unique_ptr<base::DictionaryValue> response);
void SendErrorAndExit(std::unique_ptr<base::DictionaryValue> response, void SendErrorAndExit(std::unique_ptr<base::DictionaryValue> response,
const std::string& description) const; const std::string& description) const;
void SendMessageToClient(std::unique_ptr<base::Value> message) const; void SendMessageToClient(std::unique_ptr<base::Value> message) const;
// Callback for DelegatingSignalStrategy.
void SendOutgoingIq(const std::string& iq);
// Called when initial policies are read. // Called when initial policies are read.
void OnPolicyUpdate(std::unique_ptr<base::DictionaryValue> policies); void OnPolicyUpdate(std::unique_ptr<base::DictionaryValue> policies);
...@@ -88,6 +94,7 @@ class It2MeNativeMessagingHost : public It2MeHost::Observer, ...@@ -88,6 +94,7 @@ class It2MeNativeMessagingHost : public It2MeHost::Observer,
#endif // defined(OS_WIN) #endif // defined(OS_WIN)
Client* client_ = nullptr; Client* client_ = nullptr;
DelegatingSignalStrategy* delegating_signal_strategy_ = nullptr;
std::unique_ptr<ChromotingHostContext> host_context_; std::unique_ptr<ChromotingHostContext> host_context_;
std::unique_ptr<It2MeHostFactory> factory_; std::unique_ptr<It2MeHostFactory> factory_;
scoped_refptr<It2MeHost> it2me_host_; scoped_refptr<It2MeHost> it2me_host_;
...@@ -104,12 +111,6 @@ class It2MeNativeMessagingHost : public It2MeHost::Observer, ...@@ -104,12 +111,6 @@ class It2MeNativeMessagingHost : public It2MeHost::Observer,
base::TimeDelta access_code_lifetime_; base::TimeDelta access_code_lifetime_;
std::string client_username_; std::string client_username_;
// IT2Me Talk server configuration used by |it2me_host_| to connect.
XmppSignalStrategy::XmppServerConfig xmpp_server_config_;
// Chromoting Bot JID used by |it2me_host_| to register the host.
std::string directory_bot_jid_;
// Indicates whether or not a policy has ever been read. This is to ensure // Indicates whether or not a policy has ever been read. This is to ensure
// that on startup, we do not accidentally start a connection before we have // that on startup, we do not accidentally start a connection before we have
// queried our policy restrictions. // queried our policy restrictions.
......
...@@ -127,13 +127,15 @@ class MockIt2MeHost : public It2MeHost { ...@@ -127,13 +127,15 @@ class MockIt2MeHost : public It2MeHost {
MockIt2MeHost(std::unique_ptr<ChromotingHostContext> context, MockIt2MeHost(std::unique_ptr<ChromotingHostContext> context,
std::unique_ptr<PolicyWatcher> policy_watcher, std::unique_ptr<PolicyWatcher> policy_watcher,
base::WeakPtr<It2MeHost::Observer> observer, base::WeakPtr<It2MeHost::Observer> observer,
const XmppSignalStrategy::XmppServerConfig& xmpp_server_config, std::unique_ptr<SignalStrategy> signal_strategy,
const std::string& username,
const std::string& directory_bot_jid) const std::string& directory_bot_jid)
: It2MeHost(std::move(context), : It2MeHost(std::move(context),
std::move(policy_watcher), std::move(policy_watcher),
/*confirmation_dialog_factory=*/nullptr, /*confirmation_dialog_factory=*/nullptr,
observer, observer,
xmpp_server_config, std::move(signal_strategy),
username,
directory_bot_jid) {} directory_bot_jid) {}
// It2MeHost overrides // It2MeHost overrides
...@@ -207,7 +209,8 @@ class MockIt2MeHostFactory : public It2MeHostFactory { ...@@ -207,7 +209,8 @@ class MockIt2MeHostFactory : public It2MeHostFactory {
std::unique_ptr<ChromotingHostContext> context, std::unique_ptr<ChromotingHostContext> context,
policy::PolicyService* policy_service, policy::PolicyService* policy_service,
base::WeakPtr<It2MeHost::Observer> observer, base::WeakPtr<It2MeHost::Observer> observer,
const XmppSignalStrategy::XmppServerConfig& xmpp_server_config, std::unique_ptr<SignalStrategy> signal_strategy,
const std::string& username,
const std::string& directory_bot_jid) override; const std::string& directory_bot_jid) override;
private: private:
...@@ -222,11 +225,13 @@ scoped_refptr<It2MeHost> MockIt2MeHostFactory::CreateIt2MeHost( ...@@ -222,11 +225,13 @@ scoped_refptr<It2MeHost> MockIt2MeHostFactory::CreateIt2MeHost(
std::unique_ptr<ChromotingHostContext> context, std::unique_ptr<ChromotingHostContext> context,
policy::PolicyService* policy_service, policy::PolicyService* policy_service,
base::WeakPtr<It2MeHost::Observer> observer, base::WeakPtr<It2MeHost::Observer> observer,
const XmppSignalStrategy::XmppServerConfig& xmpp_server_config, std::unique_ptr<SignalStrategy> signal_strategy,
const std::string& username,
const std::string& directory_bot_jid) { const std::string& directory_bot_jid) {
return new MockIt2MeHost(std::move(context), return new MockIt2MeHost(std::move(context),
/*policy_watcher=*/nullptr, observer, /*policy_watcher=*/nullptr, observer,
xmpp_server_config, directory_bot_jid); std::move(signal_strategy), username,
directory_bot_jid);
} }
} // namespace } // namespace
......
...@@ -4,6 +4,8 @@ ...@@ -4,6 +4,8 @@
static_library("signaling") { static_library("signaling") {
sources = [ sources = [
"delegating_signal_strategy.cc",
"delegating_signal_strategy.h",
"iq_sender.cc", "iq_sender.cc",
"iq_sender.h", "iq_sender.h",
"jid_util.cc", "jid_util.cc",
......
...@@ -2,25 +2,36 @@ ...@@ -2,25 +2,36 @@
// Use of this source code is governed by a BSD-style license that can be // Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. // found in the LICENSE file.
#include "remoting/client/plugin/delegating_signal_strategy.h" #include "remoting/signaling/delegating_signal_strategy.h"
#include "base/bind.h"
#include "base/rand_util.h" #include "base/rand_util.h"
#include "base/strings/string_number_conversions.h" #include "base/strings/string_number_conversions.h"
#include "third_party/webrtc/libjingle/xmllite/xmlelement.h" #include "third_party/webrtc/libjingle/xmllite/xmlelement.h"
#include "third_party/webrtc/libjingle/xmpp/constants.h"
namespace remoting { namespace remoting {
DelegatingSignalStrategy::DelegatingSignalStrategy( DelegatingSignalStrategy::DelegatingSignalStrategy(
std::string local_jid, std::string local_jid,
scoped_refptr<base::SingleThreadTaskRunner> client_task_runner,
const SendIqCallback& send_iq_callback) const SendIqCallback& send_iq_callback)
: local_jid_(local_jid), : local_jid_(local_jid),
send_iq_callback_(send_iq_callback) { delegate_task_runner_(base::ThreadTaskRunnerHandle::Get()),
} client_task_runner_(client_task_runner),
send_iq_callback_(send_iq_callback),
weak_factory_(this) {}
DelegatingSignalStrategy::~DelegatingSignalStrategy() { DelegatingSignalStrategy::~DelegatingSignalStrategy() {}
}
void DelegatingSignalStrategy::OnIncomingMessage(const std::string& message) { void DelegatingSignalStrategy::OnIncomingMessage(const std::string& message) {
if (!client_task_runner_->BelongsToCurrentThread()) {
client_task_runner_->PostTask(
FROM_HERE, base::Bind(&DelegatingSignalStrategy::OnIncomingMessage,
weak_factory_.GetWeakPtr(), message));
return;
}
std::unique_ptr<buzz::XmlElement> stanza(buzz::XmlElement::ForStr(message)); std::unique_ptr<buzz::XmlElement> stanza(buzz::XmlElement::ForStr(message));
if (!stanza.get()) { if (!stanza.get()) {
LOG(WARNING) << "Malformed XMPP stanza received: " << message; LOG(WARNING) << "Malformed XMPP stanza received: " << message;
...@@ -36,34 +47,46 @@ void DelegatingSignalStrategy::OnIncomingMessage(const std::string& message) { ...@@ -36,34 +47,46 @@ void DelegatingSignalStrategy::OnIncomingMessage(const std::string& message) {
} }
void DelegatingSignalStrategy::Connect() { void DelegatingSignalStrategy::Connect() {
DCHECK(client_task_runner_->BelongsToCurrentThread());
FOR_EACH_OBSERVER(Listener, listeners_,
OnSignalStrategyStateChange(CONNECTED));
} }
void DelegatingSignalStrategy::Disconnect() { void DelegatingSignalStrategy::Disconnect() {
DCHECK(client_task_runner_->BelongsToCurrentThread());
} }
SignalStrategy::State DelegatingSignalStrategy::GetState() const { SignalStrategy::State DelegatingSignalStrategy::GetState() const {
DCHECK(client_task_runner_->BelongsToCurrentThread());
return CONNECTED; return CONNECTED;
} }
SignalStrategy::Error DelegatingSignalStrategy::GetError() const { SignalStrategy::Error DelegatingSignalStrategy::GetError() const {
DCHECK(client_task_runner_->BelongsToCurrentThread());
return OK; return OK;
} }
std::string DelegatingSignalStrategy::GetLocalJid() const { std::string DelegatingSignalStrategy::GetLocalJid() const {
DCHECK(client_task_runner_->BelongsToCurrentThread());
return local_jid_; return local_jid_;
} }
void DelegatingSignalStrategy::AddListener(Listener* listener) { void DelegatingSignalStrategy::AddListener(Listener* listener) {
DCHECK(client_task_runner_->BelongsToCurrentThread());
listeners_.AddObserver(listener); listeners_.AddObserver(listener);
} }
void DelegatingSignalStrategy::RemoveListener(Listener* listener) { void DelegatingSignalStrategy::RemoveListener(Listener* listener) {
DCHECK(client_task_runner_->BelongsToCurrentThread());
listeners_.RemoveObserver(listener); listeners_.RemoveObserver(listener);
} }
bool DelegatingSignalStrategy::SendStanza( bool DelegatingSignalStrategy::SendStanza(
std::unique_ptr<buzz::XmlElement> stanza) { std::unique_ptr<buzz::XmlElement> stanza) {
send_iq_callback_.Run(stanza->Str()); DCHECK(client_task_runner_->BelongsToCurrentThread());
stanza->SetAttr(buzz::QN_FROM, GetLocalJid());
delegate_task_runner_->PostTask(FROM_HERE,
base::Bind(send_iq_callback_, stanza->Str()));
return true; return true;
} }
......
...@@ -7,7 +7,8 @@ ...@@ -7,7 +7,8 @@
#include "base/callback.h" #include "base/callback.h"
#include "base/macros.h" #include "base/macros.h"
#include "base/observer_list.h" #include "base/memory/weak_ptr.h"
#include "base/observer_list_threadsafe.h"
#include "remoting/signaling/signal_strategy.h" #include "remoting/signaling/signal_strategy.h"
namespace base { namespace base {
...@@ -16,12 +17,26 @@ class SingleThreadTaskRunner; ...@@ -16,12 +17,26 @@ class SingleThreadTaskRunner;
namespace remoting { namespace remoting {
// A signaling strategy class that delegates IQ sending and receiving.
//
// Notes on thread safety:
// 1. This object can be created on any thread.
// 2. OnIncomingMessage() must be called on the same thread on which this object
// is created.
// 3. |send_iq_callback| will always be called on the thread that it is created.
// Note that |send_iq_callback| may be called after this object is destroyed.
// 4. The caller should invoke all methods on the SignalStrategy interface on
// the |client_task_runner|.
// 5. All listeners will be called on |client_task_runner| as well.
// 6. The destructor should always be called on the |client_task_runner|.
class DelegatingSignalStrategy : public SignalStrategy { class DelegatingSignalStrategy : public SignalStrategy {
public: public:
typedef base::Callback<void(const std::string&)> SendIqCallback; typedef base::Callback<void(const std::string&)> SendIqCallback;
DelegatingSignalStrategy(std::string local_jid, DelegatingSignalStrategy(
const SendIqCallback& send_iq_callback); std::string local_jid,
scoped_refptr<base::SingleThreadTaskRunner> client_task_runner,
const SendIqCallback& send_iq_callback);
~DelegatingSignalStrategy() override; ~DelegatingSignalStrategy() override;
void OnIncomingMessage(const std::string& message); void OnIncomingMessage(const std::string& message);
...@@ -39,10 +54,14 @@ class DelegatingSignalStrategy : public SignalStrategy { ...@@ -39,10 +54,14 @@ class DelegatingSignalStrategy : public SignalStrategy {
private: private:
std::string local_jid_; std::string local_jid_;
SendIqCallback send_iq_callback_; scoped_refptr<base::SingleThreadTaskRunner> delegate_task_runner_;
scoped_refptr<base::SingleThreadTaskRunner> client_task_runner_;
SendIqCallback send_iq_callback_;
base::ObserverList<Listener> listeners_; base::ObserverList<Listener> listeners_;
base::WeakPtrFactory<DelegatingSignalStrategy> weak_factory_;
DISALLOW_COPY_AND_ASSIGN(DelegatingSignalStrategy); DISALLOW_COPY_AND_ASSIGN(DelegatingSignalStrategy);
}; };
......
...@@ -32,6 +32,7 @@ FakeSignalStrategy::FakeSignalStrategy(const std::string& jid) ...@@ -32,6 +32,7 @@ FakeSignalStrategy::FakeSignalStrategy(const std::string& jid)
jid_(jid), jid_(jid),
last_id_(0), last_id_(0),
weak_factory_(this) { weak_factory_(this) {
DetachFromThread();
} }
FakeSignalStrategy::~FakeSignalStrategy() { FakeSignalStrategy::~FakeSignalStrategy() {
......
...@@ -141,7 +141,7 @@ class XmppSignalStrategy::Core : public XmppLoginHandler::Delegate { ...@@ -141,7 +141,7 @@ class XmppSignalStrategy::Core : public XmppLoginHandler::Delegate {
base::ObserverList<Listener, true> listeners_; base::ObserverList<Listener, true> listeners_;
base::Timer keep_alive_timer_; base::RepeatingTimer keep_alive_timer_;
base::ThreadChecker thread_checker_; base::ThreadChecker thread_checker_;
...@@ -154,16 +154,12 @@ XmppSignalStrategy::Core::Core( ...@@ -154,16 +154,12 @@ XmppSignalStrategy::Core::Core(
const XmppSignalStrategy::XmppServerConfig& xmpp_server_config) const XmppSignalStrategy::XmppServerConfig& xmpp_server_config)
: socket_factory_(socket_factory), : socket_factory_(socket_factory),
request_context_getter_(request_context_getter), request_context_getter_(request_context_getter),
xmpp_server_config_(xmpp_server_config), xmpp_server_config_(xmpp_server_config) {
keep_alive_timer_(
FROM_HERE,
base::TimeDelta::FromSeconds(kKeepAliveIntervalSeconds),
base::Bind(&Core::SendKeepAlive, base::Unretained(this)),
true) {
#if defined(NDEBUG) #if defined(NDEBUG)
// Non-secure connections are allowed only for debugging. // Non-secure connections are allowed only for debugging.
CHECK(xmpp_server_config_.use_tls); CHECK(xmpp_server_config_.use_tls);
#endif #endif
thread_checker_.DetachFromThread();
} }
XmppSignalStrategy::Core::~Core() { XmppSignalStrategy::Core::~Core() {
...@@ -187,6 +183,11 @@ void XmppSignalStrategy::Core::Connect() { ...@@ -187,6 +183,11 @@ void XmppSignalStrategy::Core::Connect() {
int result = socket_->Connect(base::Bind( int result = socket_->Connect(base::Bind(
&Core::OnSocketConnected, base::Unretained(this))); &Core::OnSocketConnected, base::Unretained(this)));
keep_alive_timer_.Start(
FROM_HERE, base::TimeDelta::FromSeconds(kKeepAliveIntervalSeconds),
base::Bind(&Core::SendKeepAlive, base::Unretained(this)));
if (result != net::ERR_IO_PENDING) if (result != net::ERR_IO_PENDING)
OnSocketConnected(result); OnSocketConnected(result);
} }
......
...@@ -21,6 +21,8 @@ class URLRequestContextGetter; ...@@ -21,6 +21,8 @@ class URLRequestContextGetter;
namespace remoting { namespace remoting {
// XmppSignalStrategy implements SignalStrategy using direct XMPP connection. // XmppSignalStrategy implements SignalStrategy using direct XMPP connection.
// This class can be created on a different thread from the one it is used (when
// Connect() is called).
class XmppSignalStrategy : public SignalStrategy { class XmppSignalStrategy : public SignalStrategy {
public: public:
// XMPP Server configuration for XmppSignalStrategy. // XMPP Server configuration for XmppSignalStrategy.
...@@ -61,6 +63,8 @@ class XmppSignalStrategy : public SignalStrategy { ...@@ -61,6 +63,8 @@ class XmppSignalStrategy : public SignalStrategy {
const std::string& auth_token); const std::string& auth_token);
private: private:
// This ensures that even if a Listener deletes the current instance during
// OnSignalStrategyIncomingStanza(), we can delete |core_| asynchronously.
class Core; class Core;
std::unique_ptr<Core> core_; std::unique_ptr<Core> core_;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment