Commit 1058b7c6 authored by sergeyu@chromium.org's avatar sergeyu@chromium.org

Update ProtocolPerfTest to run host and client on different threads

Previously the host and client were running on the main thread, so they
both would compete for it and that skews the perf numbers, particularly
with NSS SSL sockets because they use a separate thread.

BUG=394067
R=rmsousa@chromium.org

Review URL: https://codereview.chromium.org/414443009

git-svn-id: svn://svn.chromium.org/chrome/trunk/src@285738 0039d316-1c4b-4281-b951-d872f2087c98
parent 078aed43
...@@ -19,13 +19,15 @@ namespace remoting { ...@@ -19,13 +19,15 @@ namespace remoting {
// static // static
void FakeSignalStrategy::Connect(FakeSignalStrategy* peer1, void FakeSignalStrategy::Connect(FakeSignalStrategy* peer1,
FakeSignalStrategy* peer2) { FakeSignalStrategy* peer2) {
peer1->peer_ = peer2; DCHECK(peer1->main_thread_->BelongsToCurrentThread());
peer2->peer_ = peer1; DCHECK(peer2->main_thread_->BelongsToCurrentThread());
peer1->ConnectTo(peer2);
peer2->ConnectTo(peer1);
} }
FakeSignalStrategy::FakeSignalStrategy(const std::string& jid) FakeSignalStrategy::FakeSignalStrategy(const std::string& jid)
: jid_(jid), : main_thread_(base::ThreadTaskRunnerHandle::Get()),
peer_(NULL), jid_(jid),
last_id_(0), last_id_(0),
weak_factory_(this) { weak_factory_(this) {
...@@ -38,6 +40,22 @@ FakeSignalStrategy::~FakeSignalStrategy() { ...@@ -38,6 +40,22 @@ FakeSignalStrategy::~FakeSignalStrategy() {
} }
} }
void FakeSignalStrategy::ConnectTo(FakeSignalStrategy* peer) {
PeerCallback peer_callback =
base::Bind(&FakeSignalStrategy::DeliverMessageOnThread,
main_thread_,
weak_factory_.GetWeakPtr());
if (peer->main_thread_->BelongsToCurrentThread()) {
peer->SetPeerCallback(peer_callback);
} else {
peer->main_thread_->PostTask(
FROM_HERE,
base::Bind(&FakeSignalStrategy::SetPeerCallback,
base::Unretained(peer),
peer_callback));
}
}
void FakeSignalStrategy::Connect() { void FakeSignalStrategy::Connect() {
DCHECK(CalledOnValidThread()); DCHECK(CalledOnValidThread());
FOR_EACH_OBSERVER(Listener, listeners_, FOR_EACH_OBSERVER(Listener, listeners_,
...@@ -78,8 +96,8 @@ bool FakeSignalStrategy::SendStanza(scoped_ptr<buzz::XmlElement> stanza) { ...@@ -78,8 +96,8 @@ bool FakeSignalStrategy::SendStanza(scoped_ptr<buzz::XmlElement> stanza) {
stanza->SetAttr(buzz::QN_FROM, jid_); stanza->SetAttr(buzz::QN_FROM, jid_);
if (peer_) { if (!peer_callback_.is_null()) {
peer_->OnIncomingMessage(stanza.Pass()); peer_callback_.Run(stanza.Pass());
return true; return true;
} else { } else {
return false; return false;
...@@ -91,35 +109,41 @@ std::string FakeSignalStrategy::GetNextId() { ...@@ -91,35 +109,41 @@ std::string FakeSignalStrategy::GetNextId() {
return base::IntToString(last_id_); return base::IntToString(last_id_);
} }
// static
void FakeSignalStrategy::DeliverMessageOnThread(
scoped_refptr<base::SingleThreadTaskRunner> thread,
base::WeakPtr<FakeSignalStrategy> target,
scoped_ptr<buzz::XmlElement> stanza) {
thread->PostTask(FROM_HERE,
base::Bind(&FakeSignalStrategy::OnIncomingMessage,
target, base::Passed(&stanza)));
}
void FakeSignalStrategy::OnIncomingMessage( void FakeSignalStrategy::OnIncomingMessage(
scoped_ptr<buzz::XmlElement> stanza) { scoped_ptr<buzz::XmlElement> stanza) {
pending_messages_.push(stanza.get()); DCHECK(CalledOnValidThread());
buzz::XmlElement* stanza_ptr = stanza.get();
received_messages_.push_back(stanza.release()); received_messages_.push_back(stanza.release());
base::ThreadTaskRunnerHandle::Get()->PostTask(
FROM_HERE, base::Bind(&FakeSignalStrategy::DeliverIncomingMessages,
weak_factory_.GetWeakPtr()));
}
void FakeSignalStrategy::DeliverIncomingMessages() { const std::string& to_field = stanza_ptr->Attr(buzz::QN_TO);
while (!pending_messages_.empty()) {
buzz::XmlElement* stanza = pending_messages_.front();
const std::string& to_field = stanza->Attr(buzz::QN_TO);
if (to_field != jid_) { if (to_field != jid_) {
LOG(WARNING) << "Dropping stanza that is addressed to " << to_field LOG(WARNING) << "Dropping stanza that is addressed to " << to_field
<< ". Local jid: " << jid_ << ". Local jid: " << jid_
<< ". Message content: " << stanza->Str(); << ". Message content: " << stanza_ptr->Str();
return; return;
} }
ObserverListBase<Listener>::Iterator it(listeners_); ObserverListBase<Listener>::Iterator it(listeners_);
Listener* listener; Listener* listener;
while ((listener = it.GetNext()) != NULL) { while ((listener = it.GetNext()) != NULL) {
if (listener->OnSignalStrategyIncomingStanza(stanza)) if (listener->OnSignalStrategyIncomingStanza(stanza_ptr))
break; break;
} }
}
pending_messages_.pop(); void FakeSignalStrategy::SetPeerCallback(const PeerCallback& peer_callback) {
} peer_callback_ = peer_callback;
} }
} // namespace remoting } // namespace remoting
...@@ -15,11 +15,17 @@ ...@@ -15,11 +15,17 @@
#include "remoting/signaling/iq_sender.h" #include "remoting/signaling/iq_sender.h"
#include "remoting/signaling/signal_strategy.h" #include "remoting/signaling/signal_strategy.h"
namespace base {
class SingleThreadTaskRunner;
} // namespace base
namespace remoting { namespace remoting {
class FakeSignalStrategy : public SignalStrategy, class FakeSignalStrategy : public SignalStrategy,
public base::NonThreadSafe { public base::NonThreadSafe {
public: public:
// Calls ConenctTo() to connect |peer1| and |peer2|. Both |peer1| and |peer2|
// must belong to the current thread.
static void Connect(FakeSignalStrategy* peer1, FakeSignalStrategy* peer2); static void Connect(FakeSignalStrategy* peer1, FakeSignalStrategy* peer2);
FakeSignalStrategy(const std::string& jid); FakeSignalStrategy(const std::string& jid);
...@@ -29,6 +35,9 @@ class FakeSignalStrategy : public SignalStrategy, ...@@ -29,6 +35,9 @@ class FakeSignalStrategy : public SignalStrategy,
return received_messages_; return received_messages_;
} }
// Connects current FakeSignalStrategy to receive messages from |peer|.
void ConnectTo(FakeSignalStrategy* peer);
// SignalStrategy interface. // SignalStrategy interface.
virtual void Connect() OVERRIDE; virtual void Connect() OVERRIDE;
virtual void Disconnect() OVERRIDE; virtual void Disconnect() OVERRIDE;
...@@ -41,13 +50,22 @@ class FakeSignalStrategy : public SignalStrategy, ...@@ -41,13 +50,22 @@ class FakeSignalStrategy : public SignalStrategy,
virtual std::string GetNextId() OVERRIDE; virtual std::string GetNextId() OVERRIDE;
private: private:
typedef base::Callback<void(scoped_ptr<buzz::XmlElement> message)>
PeerCallback;
static void DeliverMessageOnThread(
scoped_refptr<base::SingleThreadTaskRunner> thread,
base::WeakPtr<FakeSignalStrategy> target,
scoped_ptr<buzz::XmlElement> stanza);
// Called by the |peer_|. Takes ownership of |stanza|. // Called by the |peer_|. Takes ownership of |stanza|.
void OnIncomingMessage(scoped_ptr<buzz::XmlElement> stanza); void OnIncomingMessage(scoped_ptr<buzz::XmlElement> stanza);
void SetPeerCallback(const PeerCallback& peer_callback);
void DeliverIncomingMessages(); scoped_refptr<base::SingleThreadTaskRunner> main_thread_;
std::string jid_; std::string jid_;
FakeSignalStrategy* peer_; PeerCallback peer_callback_;
ObserverList<Listener, true> listeners_; ObserverList<Listener, true> listeners_;
int last_id_; int last_id_;
...@@ -55,9 +73,6 @@ class FakeSignalStrategy : public SignalStrategy, ...@@ -55,9 +73,6 @@ class FakeSignalStrategy : public SignalStrategy,
// All received messages, includes thouse still in |pending_messages_|. // All received messages, includes thouse still in |pending_messages_|.
std::list<buzz::XmlElement*> received_messages_; std::list<buzz::XmlElement*> received_messages_;
// Queue of messages that have yet to be delivered to observers.
std::queue<buzz::XmlElement*> pending_messages_;
base::WeakPtrFactory<FakeSignalStrategy> weak_factory_; base::WeakPtrFactory<FakeSignalStrategy> weak_factory_;
DISALLOW_COPY_AND_ASSIGN(FakeSignalStrategy); DISALLOW_COPY_AND_ASSIGN(FakeSignalStrategy);
......
...@@ -9,6 +9,7 @@ ...@@ -9,6 +9,7 @@
#include "base/single_thread_task_runner.h" #include "base/single_thread_task_runner.h"
#include "base/synchronization/waitable_event.h" #include "base/synchronization/waitable_event.h"
#include "base/thread_task_runner_handle.h" #include "base/thread_task_runner_handle.h"
#include "jingle/glue/thread_wrapper.h"
#include "net/base/test_data_directory.h" #include "net/base/test_data_directory.h"
#include "net/url_request/url_request_context_getter.h" #include "net/url_request/url_request_context_getter.h"
#include "remoting/base/rsa_key_pair.h" #include "remoting/base/rsa_key_pair.h"
...@@ -44,13 +45,19 @@ class ProtocolPerfTest : public testing::Test, ...@@ -44,13 +45,19 @@ class ProtocolPerfTest : public testing::Test,
public HostStatusObserver { public HostStatusObserver {
public: public:
ProtocolPerfTest() ProtocolPerfTest()
: capture_thread_("capture"), : host_thread_("host"),
capture_thread_("capture"),
encode_thread_("encode") { encode_thread_("encode") {
VideoScheduler::EnableTimestampsForTests(); VideoScheduler::EnableTimestampsForTests();
host_thread_.StartWithOptions(
base::Thread::Options(base::MessageLoop::TYPE_IO, 0));
capture_thread_.Start(); capture_thread_.Start();
encode_thread_.Start(); encode_thread_.Start();
} }
virtual ~ProtocolPerfTest() { virtual ~ProtocolPerfTest() {
host_thread_.message_loop_proxy()->DeleteSoon(FROM_HERE, host_.release());
host_thread_.message_loop_proxy()->DeleteSoon(FROM_HERE,
host_signaling_.release());
message_loop_.RunUntilIdle(); message_loop_.RunUntilIdle();
} }
...@@ -100,9 +107,10 @@ class ProtocolPerfTest : public testing::Test, ...@@ -100,9 +107,10 @@ class ProtocolPerfTest : public testing::Test,
// HostStatusObserver interface. // HostStatusObserver interface.
virtual void OnClientConnected(const std::string& jid) OVERRIDE { virtual void OnClientConnected(const std::string& jid) OVERRIDE {
host_connected_ = true; message_loop_.PostTask(
if (client_connected_) FROM_HERE,
connecting_loop_->Quit(); base::Bind(&ProtocolPerfTest::OnHostConnectedMainThread,
base::Unretained(this)));
} }
protected: protected:
...@@ -116,6 +124,12 @@ class ProtocolPerfTest : public testing::Test, ...@@ -116,6 +124,12 @@ class ProtocolPerfTest : public testing::Test,
ASSERT_TRUE(client_connected_ && host_connected_); ASSERT_TRUE(client_connected_ && host_connected_);
} }
void OnHostConnectedMainThread() {
host_connected_ = true;
if (client_connected_)
connecting_loop_->Quit();
}
void ReceiveFrame(base::TimeDelta* latency) { void ReceiveFrame(base::TimeDelta* latency) {
waiting_frames_loop_.reset(new base::RunLoop()); waiting_frames_loop_.reset(new base::RunLoop());
on_frame_task_ = waiting_frames_loop_->QuitClosure(); on_frame_task_ = waiting_frames_loop_->QuitClosure();
...@@ -143,21 +157,38 @@ class ProtocolPerfTest : public testing::Test, ...@@ -143,21 +157,38 @@ class ProtocolPerfTest : public testing::Test,
} }
} }
// Creates test host and client and starts connection between them. Caller
// should call WaitConnected() to wait until connection is established. The
// host is started on |host_thread_| while the client works on the main
// thread.
void StartHostAndClient(protocol::ChannelConfig::Codec video_codec) { void StartHostAndClient(protocol::ChannelConfig::Codec video_codec) {
host_signaling_.reset(new FakeSignalStrategy(kHostJid));
client_signaling_.reset(new FakeSignalStrategy(kClientJid)); client_signaling_.reset(new FakeSignalStrategy(kClientJid));
FakeSignalStrategy::Connect(host_signaling_.get(), client_signaling_.get());
protocol::NetworkSettings network_settings( jingle_glue::JingleThreadWrapper::EnsureForCurrentMessageLoop();
protocol::NetworkSettings::NAT_TRAVERSAL_OUTGOING);
scoped_ptr<protocol::CandidateSessionConfig> protocol_config = protocol_config_ = protocol::CandidateSessionConfig::CreateDefault();
protocol::CandidateSessionConfig::CreateDefault(); protocol_config_->DisableAudioChannel();
protocol_config->DisableAudioChannel(); protocol_config_->mutable_video_configs()->clear();
protocol_config->mutable_video_configs()->clear(); protocol_config_->mutable_video_configs()->push_back(
protocol_config->mutable_video_configs()->push_back(protocol::ChannelConfig( protocol::ChannelConfig(
protocol::ChannelConfig::TRANSPORT_STREAM, 2, video_codec)); protocol::ChannelConfig::TRANSPORT_STREAM, 2, video_codec));
host_thread_.message_loop_proxy()->PostTask(
FROM_HERE,
base::Bind(&ProtocolPerfTest::StartHost, base::Unretained(this)));
}
void StartHost() {
DCHECK(host_thread_.message_loop_proxy()->BelongsToCurrentThread());
jingle_glue::JingleThreadWrapper::EnsureForCurrentMessageLoop();
host_signaling_.reset(new FakeSignalStrategy(kHostJid));
host_signaling_->ConnectTo(client_signaling_.get());
protocol::NetworkSettings network_settings(
protocol::NetworkSettings::NAT_TRAVERSAL_OUTGOING);
// TODO(sergeyu): Replace with a fake port allocator. // TODO(sergeyu): Replace with a fake port allocator.
scoped_ptr<cricket::HttpPortAllocatorBase> host_port_allocator = scoped_ptr<cricket::HttpPortAllocatorBase> host_port_allocator =
protocol::ChromiumPortAllocator::Create(NULL, network_settings) protocol::ChromiumPortAllocator::Create(NULL, network_settings)
...@@ -177,12 +208,12 @@ class ProtocolPerfTest : public testing::Test, ...@@ -177,12 +208,12 @@ class ProtocolPerfTest : public testing::Test,
host_.reset(new ChromotingHost(host_signaling_.get(), host_.reset(new ChromotingHost(host_signaling_.get(),
&desktop_environment_factory_, &desktop_environment_factory_,
session_manager.Pass(), session_manager.Pass(),
message_loop_.message_loop_proxy(), host_thread_.message_loop_proxy(),
message_loop_.message_loop_proxy(), host_thread_.message_loop_proxy(),
capture_thread_.message_loop_proxy(), capture_thread_.message_loop_proxy(),
encode_thread_.message_loop_proxy(), encode_thread_.message_loop_proxy(),
message_loop_.message_loop_proxy(), host_thread_.message_loop_proxy(),
message_loop_.message_loop_proxy())); host_thread_.message_loop_proxy()));
base::FilePath certs_dir(net::GetTestCertsDirectory()); base::FilePath certs_dir(net::GetTestCertsDirectory());
...@@ -208,9 +239,20 @@ class ProtocolPerfTest : public testing::Test, ...@@ -208,9 +239,20 @@ class ProtocolPerfTest : public testing::Test,
host_->SetAuthenticatorFactory(auth_factory.Pass()); host_->SetAuthenticatorFactory(auth_factory.Pass());
host_->AddStatusObserver(this); host_->AddStatusObserver(this);
host_->set_protocol_config(protocol_config->Clone()); host_->set_protocol_config(protocol_config_->Clone());
host_->Start(kHostOwner); host_->Start(kHostOwner);
message_loop_.PostTask(FROM_HERE,
base::Bind(&ProtocolPerfTest::StartClientAfterHost,
base::Unretained(this)));
}
void StartClientAfterHost() {
client_signaling_->ConnectTo(host_signaling_.get());
protocol::NetworkSettings network_settings(
protocol::NetworkSettings::NAT_TRAVERSAL_OUTGOING);
// Initialize client. // Initialize client.
client_context_.reset( client_context_.reset(
new ClientContext(base::ThreadTaskRunnerHandle::Get())); new ClientContext(base::ThreadTaskRunnerHandle::Get()));
...@@ -238,7 +280,7 @@ class ProtocolPerfTest : public testing::Test, ...@@ -238,7 +280,7 @@ class ProtocolPerfTest : public testing::Test,
auth_methods)); auth_methods));
client_.reset(new ChromotingClient( client_.reset(new ChromotingClient(
client_context_.get(), this, this, scoped_ptr<AudioPlayer>())); client_context_.get(), this, this, scoped_ptr<AudioPlayer>()));
client_->SetProtocolConfigForTests(protocol_config->Clone()); client_->SetProtocolConfigForTests(protocol_config_->Clone());
client_->Start( client_->Start(
client_signaling_.get(), client_authenticator.Pass(), client_signaling_.get(), client_authenticator.Pass(),
client_transport_factory.Pass(), kHostJid, std::string()); client_transport_factory.Pass(), kHostJid, std::string());
...@@ -252,9 +294,12 @@ class ProtocolPerfTest : public testing::Test, ...@@ -252,9 +294,12 @@ class ProtocolPerfTest : public testing::Test,
base::MessageLoopForIO message_loop_; base::MessageLoopForIO message_loop_;
FakeDesktopEnvironmentFactory desktop_environment_factory_; base::Thread host_thread_;
base::Thread capture_thread_; base::Thread capture_thread_;
base::Thread encode_thread_; base::Thread encode_thread_;
FakeDesktopEnvironmentFactory desktop_environment_factory_;
scoped_ptr<protocol::CandidateSessionConfig> protocol_config_;
scoped_ptr<FakeSignalStrategy> host_signaling_; scoped_ptr<FakeSignalStrategy> host_signaling_;
scoped_ptr<FakeSignalStrategy> client_signaling_; scoped_ptr<FakeSignalStrategy> client_signaling_;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment