Commit 8c2cfd61 authored by rtenneti@chromium.org's avatar rtenneti@chromium.org

Fix end_to_end_test performance regression caused by using mutexes

instead of notifications in server_thread. Testing only.

Merge internal change: 59322530

Reversed pause/resume changes of server_thread in the following CL:
https://codereview.chromium.org/127503002/

R=rch@chromium.org

Review URL: https://codereview.chromium.org/132073002

git-svn-id: svn://svn.chromium.org/chrome/trunk/src@243948 0039d316-1c4b-4281-b951-d872f2087c98
parent a7e0f34e
...@@ -135,10 +135,10 @@ class QuicEndToEndTest : public PlatformTest { ...@@ -135,10 +135,10 @@ class QuicEndToEndTest : public PlatformTest {
server_thread_.reset(new ServerThread(server_address_, server_config_, server_thread_.reset(new ServerThread(server_address_, server_config_,
QuicSupportedVersions(), QuicSupportedVersions(),
strike_register_no_startup_period_)); strike_register_no_startup_period_));
server_thread_->Start(); server_thread_->Initialize();
server_thread_->WaitForServerStartup();
server_address_ = IPEndPoint(server_address_.address(), server_address_ = IPEndPoint(server_address_.address(),
server_thread_->GetPort()); server_thread_->GetPort());
server_thread_->Start();
server_started_ = true; server_started_ = true;
} }
......
...@@ -218,19 +218,15 @@ class EndToEndTest : public ::testing::TestWithParam<TestParams> { ...@@ -218,19 +218,15 @@ class EndToEndTest : public ::testing::TestWithParam<TestParams> {
server_thread_.reset(new ServerThread(server_address_, server_config_, server_thread_.reset(new ServerThread(server_address_, server_config_,
server_supported_versions_, server_supported_versions_,
strike_register_no_startup_period_)); strike_register_no_startup_period_));
server_thread_->Start(); server_thread_->Initialize();
server_thread_->WaitForServerStartup();
server_address_ = IPEndPoint(server_address_.address(), server_address_ = IPEndPoint(server_address_.address(),
server_thread_->GetPort()); server_thread_->GetPort());
QuicDispatcher* dispatcher = QuicDispatcher* dispatcher =
QuicServerPeer::GetDispatcher(server_thread_->server()); QuicServerPeer::GetDispatcher(server_thread_->server());
QuicDispatcherPeer::UseWriter(dispatcher, server_writer_);
server_writer_->SetConnectionHelper( server_writer_->SetConnectionHelper(
QuicDispatcherPeer::GetHelper(dispatcher)); QuicDispatcherPeer::GetHelper(dispatcher));
// TODO(rtenneti): Enable server_thread's Pause/Resume. server_thread_->Start();
// server_thread_->Pause();
QuicDispatcherPeer::UseWriter(dispatcher, server_writer_);
// TODO(rtenneti): Enable server_thread's Pause/Resume.
// server_thread_->Resume();
server_started_ = true; server_started_ = true;
} }
...@@ -728,9 +724,7 @@ TEST_P(EndToEndTest, InitialRTT) { ...@@ -728,9 +724,7 @@ TEST_P(EndToEndTest, InitialRTT) {
client_->client()->WaitForCryptoHandshakeConfirmed(); client_->client()->WaitForCryptoHandshakeConfirmed();
server_thread_->WaitForCryptoHandshakeConfirmed(); server_thread_->WaitForCryptoHandshakeConfirmed();
// Pause the server so we can access the server's internals without races. server_thread_->Pause();
// TODO(rtenneti): Enable server_thread's Pause/Resume.
// server_thread_->Pause();
QuicDispatcher* dispatcher = QuicDispatcher* dispatcher =
QuicServerPeer::GetDispatcher(server_thread_->server()); QuicServerPeer::GetDispatcher(server_thread_->server());
ASSERT_EQ(1u, dispatcher->session_map().size()); ASSERT_EQ(1u, dispatcher->session_map().size());
...@@ -751,8 +745,7 @@ TEST_P(EndToEndTest, InitialRTT) { ...@@ -751,8 +745,7 @@ TEST_P(EndToEndTest, InitialRTT) {
EXPECT_FALSE(client_sent_packet_manager.SmoothedRtt().IsInfinite()); EXPECT_FALSE(client_sent_packet_manager.SmoothedRtt().IsInfinite());
EXPECT_GE(static_cast<int64>(kMaxInitialRoundTripTimeUs), EXPECT_GE(static_cast<int64>(kMaxInitialRoundTripTimeUs),
server_sent_packet_manager.SmoothedRtt().ToMicroseconds()); server_sent_packet_manager.SmoothedRtt().ToMicroseconds());
// TODO(rtenneti): Enable server_thread's Pause/Resume. server_thread_->Resume();
// server_thread_->Resume();
} }
TEST_P(EndToEndTest, ResetConnection) { TEST_P(EndToEndTest, ResetConnection) {
......
...@@ -15,33 +15,48 @@ ServerThread::ServerThread(IPEndPoint address, ...@@ -15,33 +15,48 @@ ServerThread::ServerThread(IPEndPoint address,
const QuicVersionVector& supported_versions, const QuicVersionVector& supported_versions,
bool strike_register_no_startup_period) bool strike_register_no_startup_period)
: SimpleThread("server_thread"), : SimpleThread("server_thread"),
listening_(true, false),
confirmed_(true, false), confirmed_(true, false),
pause_(true, false),
paused_(true, false),
resume_(true, false),
quit_(true, false), quit_(true, false),
server_(config, supported_versions), server_(config, supported_versions),
address_(address), address_(address),
port_(0) { port_(0),
initialized_(false) {
if (strike_register_no_startup_period) { if (strike_register_no_startup_period) {
server_.SetStrikeRegisterNoStartupPeriod(); server_.SetStrikeRegisterNoStartupPeriod();
} }
} }
ServerThread::~ServerThread() { ServerThread::~ServerThread() {}
}
void ServerThread::Initialize() {
if (initialized_) {
return;
}
void ServerThread::Run() {
server_.Listen(address_); server_.Listen(address_);
port_lock_.Acquire(); port_lock_.Acquire();
port_ = server_.port(); port_ = server_.port();
port_lock_.Release(); port_lock_.Release();
listening_.Signal(); initialized_ = true;
}
void ServerThread::Run() {
if (!initialized_) {
Initialize();
}
while (!quit_.IsSignaled()) { while (!quit_.IsSignaled()) {
event_loop_mu_.Acquire(); if (pause_.IsSignaled() && !resume_.IsSignaled()) {
paused_.Signal();
resume_.Wait();
}
server_.WaitForEvents(); server_.WaitForEvents();
MaybeNotifyOfHandshakeConfirmation(); MaybeNotifyOfHandshakeConfirmation();
event_loop_mu_.Release();
} }
server_.Shutdown(); server_.Shutdown();
...@@ -51,11 +66,7 @@ int ServerThread::GetPort() { ...@@ -51,11 +66,7 @@ int ServerThread::GetPort() {
port_lock_.Acquire(); port_lock_.Acquire();
int rc = port_; int rc = port_;
port_lock_.Release(); port_lock_.Release();
return rc; return rc;
}
void ServerThread::WaitForServerStartup() {
listening_.Wait();
} }
void ServerThread::WaitForCryptoHandshakeConfirmed() { void ServerThread::WaitForCryptoHandshakeConfirmed() {
...@@ -63,15 +74,21 @@ void ServerThread::WaitForCryptoHandshakeConfirmed() { ...@@ -63,15 +74,21 @@ void ServerThread::WaitForCryptoHandshakeConfirmed() {
} }
void ServerThread::Pause() { void ServerThread::Pause() {
event_loop_mu_.Acquire(); DCHECK(!pause_.IsSignaled());
pause_.Signal();
paused_.Wait();
} }
void ServerThread::Resume() { void ServerThread::Resume() {
event_loop_mu_.AssertAcquired(); // Checks the calling thread only! DCHECK(!resume_.IsSignaled());
event_loop_mu_.Release(); DCHECK(pause_.IsSignaled());
resume_.Signal();
} }
void ServerThread::Quit() { void ServerThread::Quit() {
if (pause_.IsSignaled() && !resume_.IsSignaled()) {
resume_.Signal();
}
quit_.Signal(); quit_.Signal();
} }
......
...@@ -24,19 +24,22 @@ class ServerThread : public base::SimpleThread { ...@@ -24,19 +24,22 @@ class ServerThread : public base::SimpleThread {
virtual ~ServerThread(); virtual ~ServerThread();
// SimpleThread implementation. // Prepares the server, but does not start accepting connections. Useful for
virtual void Run() OVERRIDE; // injecting mocks.
void Initialize();
// Waits until the server has started and is listening for requests. // Runs the event loop. Will initialize if necessary.
void WaitForServerStartup(); virtual void Run() OVERRIDE;
// Waits for the handshake to be confirmed for the first session created. // Waits for the handshake to be confirmed for the first session created.
void WaitForCryptoHandshakeConfirmed(); void WaitForCryptoHandshakeConfirmed();
// Pauses execution of the server until Resume() is called. // Pauses execution of the server until Resume() is called. May only be
// called once.
void Pause(); void Pause();
// Resumes execution of the server after Pause() has been called. // Resumes execution of the server after Pause() has been called. May only
// be called once.
void Resume(); void Resume();
// Stops the server from executing and shuts it down, destroying all // Stops the server from executing and shuts it down, destroying all
...@@ -54,10 +57,11 @@ class ServerThread : public base::SimpleThread { ...@@ -54,10 +57,11 @@ class ServerThread : public base::SimpleThread {
private: private:
void MaybeNotifyOfHandshakeConfirmation(); void MaybeNotifyOfHandshakeConfirmation();
base::Lock event_loop_mu_; // Held when the server is processing events.
base::WaitableEvent listening_; // Notified when the server is listening.
base::WaitableEvent confirmed_; // Notified when the first handshake is base::WaitableEvent confirmed_; // Notified when the first handshake is
// confirmed. // confirmed.
base::WaitableEvent pause_; // Notified when the server should pause.
base::WaitableEvent paused_; // Notitied when the server has paused
base::WaitableEvent resume_; // Notified when the server should resume.
base::WaitableEvent quit_; // Notified when the server should quit. base::WaitableEvent quit_; // Notified when the server should quit.
tools::QuicServer server_; tools::QuicServer server_;
...@@ -65,6 +69,8 @@ class ServerThread : public base::SimpleThread { ...@@ -65,6 +69,8 @@ class ServerThread : public base::SimpleThread {
base::Lock port_lock_; base::Lock port_lock_;
int port_; int port_;
bool initialized_;
DISALLOW_COPY_AND_ASSIGN(ServerThread); DISALLOW_COPY_AND_ASSIGN(ServerThread);
}; };
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment