Commit ca86c9e8 authored by jam, committed by Commit bot

Add some perf tests to compare Mojo overhead.

Specifically, this compares making mojo interface calls vs just sending raw IPC::Messages or callbacks.
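For context, the three paths being timed look roughly like this (illustrative
sketch; the names paraphrase the test code added below):

  // Raw IPC::Message over an IPC::Channel / ChannelProxy:
  Message* msg = new Message(0, 2, Message::PRIORITY_NORMAL);
  msg->WriteString("hello");
  sender->Send(msg);

  // Mojo interface call on the new ipc_test.mojom Reflector:
  reflector->Ping("hello", base::Bind(&OnPong));

  // Plain cross-thread callback, no serialization at all:
  task_runner->PostTask(FROM_HERE, base::Bind(&OnPong, std::string("hello")));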

Review-Url: https://codereview.chromium.org/2608403002
Cr-Commit-Position: refs/heads/master@{#442016}
parent a37268b7
......@@ -206,12 +206,14 @@ if (!is_ios) {
deps = [
":ipc",
":test_interfaces",
":test_support",
"//base",
"//base:i18n",
"//base/test:test_support",
"//mojo/edk/system",
"//mojo/edk/test:test_support",
"//mojo/edk/test:test_support_impl",
"//testing/gtest",
]
}
......@@ -219,8 +221,6 @@ if (!is_ios) {
static_library("test_support") {
testonly = true
sources = [
"ipc_perftest_support.cc",
"ipc_perftest_support.h",
"ipc_security_test_util.cc",
"ipc_security_test_util.h",
"ipc_test_base.cc",
......
......@@ -8,36 +8,408 @@
#include "base/memory/ptr_util.h"
#include "base/process/process_metrics.h"
#include "base/run_loop.h"
#include "base/strings/stringprintf.h"
#include "base/test/perf_time_logger.h"
#include "base/test/test_io_thread.h"
#include "base/threading/thread_task_runner_handle.h"
#include "build/build_config.h"
#include "ipc/ipc_channel_mojo.h"
#include "ipc/ipc_perftest_support.h"
#include "ipc/ipc_test.mojom.h"
#include "ipc/ipc_test_base.h"
#include "mojo/edk/embedder/embedder.h"
#include "mojo/edk/embedder/platform_channel_pair.h"
#include "mojo/edk/test/mojo_test_base.h"
#include "mojo/edk/test/multiprocess_test_helper.h"
#include "mojo/edk/test/scoped_ipc_support.h"
#include "mojo/public/cpp/bindings/binding.h"
#include "mojo/public/cpp/system/message_pipe.h"
namespace IPC {
namespace {
// This class simply collects stats about abstract "events" (each of which has a
// start time and an end time).
class EventTimeTracker {
public:
explicit EventTimeTracker(const char* name)
: name_(name),
count_(0) {
}
void AddEvent(const base::TimeTicks& start, const base::TimeTicks& end) {
DCHECK(end >= start);
count_++;
base::TimeDelta duration = end - start;
total_duration_ += duration;
max_duration_ = std::max(max_duration_, duration);
}
void ShowResults() const {
VLOG(1) << name_ << " count: " << count_;
VLOG(1) << name_ << " total duration: "
<< total_duration_.InMillisecondsF() << " ms";
VLOG(1) << name_ << " average duration: "
<< (total_duration_.InMillisecondsF() / static_cast<double>(count_))
<< " ms";
VLOG(1) << name_ << " maximum duration: "
<< max_duration_.InMillisecondsF() << " ms";
}
void Reset() {
count_ = 0;
total_duration_ = base::TimeDelta();
max_duration_ = base::TimeDelta();
}
private:
const std::string name_;
uint64_t count_;
base::TimeDelta total_duration_;
base::TimeDelta max_duration_;
DISALLOW_COPY_AND_ASSIGN(EventTimeTracker);
};
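// Illustrative usage sketch (not from this CL): feed the tracker start/end
// pairs and dump the aggregates when done, e.g.
//
//   EventTimeTracker tracker("Example events");
//   base::TimeTicks start = base::TimeTicks::Now();
//   DoSomeWork();  // hypothetical workload
//   tracker.AddEvent(start, base::TimeTicks::Now());
//   tracker.ShowResults();  // VLOG(1)s count, total, average and max.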
class PerformanceChannelListener : public Listener {
public:
explicit PerformanceChannelListener(const std::string& label)
: label_(label),
sender_(NULL),
msg_count_(0),
msg_size_(0),
count_down_(0),
latency_tracker_("Server messages") {
VLOG(1) << "Server listener up";
}
~PerformanceChannelListener() override {
VLOG(1) << "Server listener down";
}
void Init(Sender* sender) {
DCHECK(!sender_);
sender_ = sender;
}
// Call this before running the message loop.
void SetTestParams(int msg_count, size_t msg_size) {
DCHECK_EQ(0, count_down_);
msg_count_ = msg_count;
msg_size_ = msg_size;
count_down_ = msg_count_;
payload_ = std::string(msg_size_, 'a');
}
bool OnMessageReceived(const Message& message) override {
CHECK(sender_);
base::PickleIterator iter(message);
int64_t time_internal;
EXPECT_TRUE(iter.ReadInt64(&time_internal));
int msgid;
EXPECT_TRUE(iter.ReadInt(&msgid));
std::string reflected_payload;
EXPECT_TRUE(iter.ReadString(&reflected_payload));
// Include message deserialization in latency.
base::TimeTicks now = base::TimeTicks::Now();
if (reflected_payload == "hello") {
// Start timing on hello.
latency_tracker_.Reset();
DCHECK(!perf_logger_.get());
std::string test_name =
base::StringPrintf("IPC_%s_Perf_%dx_%u",
label_.c_str(),
msg_count_,
static_cast<unsigned>(msg_size_));
perf_logger_.reset(new base::PerfTimeLogger(test_name.c_str()));
} else {
DCHECK_EQ(payload_.size(), reflected_payload.size());
latency_tracker_.AddEvent(
base::TimeTicks::FromInternalValue(time_internal), now);
CHECK(count_down_ > 0);
count_down_--;
if (count_down_ == 0) {
perf_logger_.reset(); // Stop the perf timer now.
latency_tracker_.ShowResults();
base::MessageLoop::current()->QuitWhenIdle();
return true;
}
}
Message* msg = new Message(0, 2, Message::PRIORITY_NORMAL);
msg->WriteInt64(base::TimeTicks::Now().ToInternalValue());
msg->WriteInt(count_down_);
msg->WriteString(payload_);
sender_->Send(msg);
return true;
}
private:
std::string label_;
Sender* sender_;
int msg_count_;
size_t msg_size_;
int count_down_;
std::string payload_;
EventTimeTracker latency_tracker_;
std::unique_ptr<base::PerfTimeLogger> perf_logger_;
};
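// Every ping-pong message exchanged by the listeners above and below uses the
// same ad-hoc pickle layout, in order:
//   int64  send time (base::TimeTicks::ToInternalValue())
//   int    message id / remaining count
//   string payload ("hello", "quit", or msg_size_ bytes of 'a')
// Latency is measured from that embedded send time to the receiver's read of
// the pickle, so serialization and deserialization are part of the interval.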
// This channel listener just replies to all messages with the exact same
// message. It assumes each message has one string parameter. When the string
// "quit" is sent, it will exit.
class ChannelReflectorListener : public Listener {
public:
ChannelReflectorListener()
: channel_(NULL),
latency_tracker_("Client messages") {
VLOG(1) << "Client listener up";
}
~ChannelReflectorListener() override {
VLOG(1) << "Client listener down";
latency_tracker_.ShowResults();
}
void Init(Channel* channel) {
DCHECK(!channel_);
channel_ = channel;
}
bool OnMessageReceived(const Message& message) override {
CHECK(channel_);
base::PickleIterator iter(message);
int64_t time_internal;
EXPECT_TRUE(iter.ReadInt64(&time_internal));
int msgid;
EXPECT_TRUE(iter.ReadInt(&msgid));
base::StringPiece payload;
EXPECT_TRUE(iter.ReadStringPiece(&payload));
// Include message deserialization in latency.
base::TimeTicks now = base::TimeTicks::Now();
if (payload == "hello") {
latency_tracker_.Reset();
} else if (payload == "quit") {
latency_tracker_.ShowResults();
base::MessageLoop::current()->QuitWhenIdle();
return true;
} else {
// Don't track hello and quit messages.
latency_tracker_.AddEvent(
base::TimeTicks::FromInternalValue(time_internal), now);
}
Message* msg = new Message(0, 2, Message::PRIORITY_NORMAL);
msg->WriteInt64(base::TimeTicks::Now().ToInternalValue());
msg->WriteInt(msgid);
msg->WriteString(payload);
channel_->Send(msg);
return true;
}
private:
Channel* channel_;
EventTimeTracker latency_tracker_;
};
// This class locks the current thread to a particular CPU core. This is
// important because otherwise the different threads and processes of these
// tests end up on different CPU cores which means that all of the cores are
// lightly loaded so the OS (Windows and Linux) fails to ramp up the CPU
// frequency, leading to unpredictable and often poor performance.
class LockThreadAffinity {
public:
explicit LockThreadAffinity(int cpu_number) : affinity_set_ok_(false) {
#if defined(OS_WIN)
const DWORD_PTR thread_mask = static_cast<DWORD_PTR>(1) << cpu_number;
old_affinity_ = SetThreadAffinityMask(GetCurrentThread(), thread_mask);
affinity_set_ok_ = old_affinity_ != 0;
#elif defined(OS_LINUX)
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(cpu_number, &cpuset);
auto get_result = sched_getaffinity(0, sizeof(old_cpuset_), &old_cpuset_);
DCHECK_EQ(0, get_result);
auto set_result = sched_setaffinity(0, sizeof(cpuset), &cpuset);
// Check for get_result failure, even though it should always succeed.
affinity_set_ok_ = (set_result == 0) && (get_result == 0);
#endif
if (!affinity_set_ok_)
LOG(WARNING) << "Failed to set thread affinity to CPU " << cpu_number;
}
~LockThreadAffinity() {
if (!affinity_set_ok_)
return;
#if defined(OS_WIN)
auto set_result = SetThreadAffinityMask(GetCurrentThread(), old_affinity_);
DCHECK_NE(0u, set_result);
#elif defined(OS_LINUX)
auto set_result = sched_setaffinity(0, sizeof(old_cpuset_), &old_cpuset_);
DCHECK_EQ(0, set_result);
#endif
}
private:
bool affinity_set_ok_;
#if defined(OS_WIN)
DWORD_PTR old_affinity_;
#elif defined(OS_LINUX)
cpu_set_t old_cpuset_;
#endif
DISALLOW_COPY_AND_ASSIGN(LockThreadAffinity);
};
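// Illustrative usage sketch (not from this CL): lock the thread for the
// duration of a timed scope; the previous affinity is restored on destruction.
//
//   {
//     LockThreadAffinity thread_locker(kSharedCore);
//     RunTimedLoop();  // hypothetical workload
//   }  // Previous affinity mask / cpu_set is restored here.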
class PingPongTestParams {
public:
PingPongTestParams(size_t size, int count)
: message_size_(size), message_count_(count) {
}
size_t message_size() const { return message_size_; }
int message_count() const { return message_count_; }
private:
size_t message_size_;
int message_count_;
};
std::vector<PingPongTestParams> GetDefaultTestParams() {
// Test several sizes. We use 12^N for message size, and limit the message
// count to keep the test duration reasonable.
#ifdef NDEBUG
const int kMultiplier = 100;
#else
// Debug builds on Windows run these tests orders of magnitude more slowly.
const int kMultiplier = 1;
#endif
std::vector<PingPongTestParams> list;
list.push_back(PingPongTestParams(12, 500 * kMultiplier));
list.push_back(PingPongTestParams(144, 500 * kMultiplier));
list.push_back(PingPongTestParams(1728, 500 * kMultiplier));
list.push_back(PingPongTestParams(20736, 120 * kMultiplier));
list.push_back(PingPongTestParams(248832, 10 * kMultiplier));
return list;
}
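// The payload sizes above are 12^1 through 12^5 bytes (12, 144, 1728, 20736,
// 248832); the message count is scaled down for the two largest payloads so
// each configuration finishes in a reasonable time.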
// Avoid core 0 due to conflicts with Intel's Power Gadget.
// Setting thread affinity will fail harmlessly on single/dual core machines.
const int kSharedCore = 2;
class MojoChannelPerfTest : public IPCChannelMojoTestBase {
public:
MojoChannelPerfTest() = default;
~MojoChannelPerfTest() override = default;
void RunTestChannelPingPong() {
Init("MojoPerfTestClient");
// Set up IPC channel and start client.
PerformanceChannelListener listener("Channel");
CreateChannel(&listener);
listener.Init(channel());
ASSERT_TRUE(ConnectChannel());
LockThreadAffinity thread_locker(kSharedCore);
std::vector<PingPongTestParams> params = GetDefaultTestParams();
for (size_t i = 0; i < params.size(); i++) {
listener.SetTestParams(params[i].message_count(),
params[i].message_size());
// This initial message will kick-start the ping-pong of messages.
Message* message =
new Message(0, 2, Message::PRIORITY_NORMAL);
message->WriteInt64(base::TimeTicks::Now().ToInternalValue());
message->WriteInt(-1);
message->WriteString("hello");
sender()->Send(message);
// Run message loop.
base::RunLoop().Run();
}
// Send quit message.
Message* message = new Message(0, 2, Message::PRIORITY_NORMAL);
message->WriteInt64(base::TimeTicks::Now().ToInternalValue());
message->WriteInt(-1);
message->WriteString("quit");
sender()->Send(message);
EXPECT_TRUE(WaitForClientShutdown());
DestroyChannel();
}
void RunTestChannelProxyPingPong() {
io_thread_.reset(new base::TestIOThread(base::TestIOThread::kAutoStart));
Init("MojoPerfTestClient");
// Set up IPC channel and start client.
PerformanceChannelListener listener("ChannelProxy");
auto channel_proxy = IPC::ChannelProxy::Create(
TakeHandle().release(), IPC::Channel::MODE_SERVER, &listener,
io_thread_->task_runner());
listener.Init(channel_proxy.get());
LockThreadAffinity thread_locker(kSharedCore);
std::vector<PingPongTestParams> params = GetDefaultTestParams();
for (size_t i = 0; i < params.size(); i++) {
listener.SetTestParams(params[i].message_count(),
params[i].message_size());
// This initial message will kick-start the ping-pong of messages.
Message* message = new Message(0, 2, Message::PRIORITY_NORMAL);
message->WriteInt64(base::TimeTicks::Now().ToInternalValue());
message->WriteInt(-1);
message->WriteString("hello");
channel_proxy->Send(message);
// Run message loop.
base::RunLoop().Run();
}
// Send quit message.
Message* message = new Message(0, 2, Message::PRIORITY_NORMAL);
message->WriteInt64(base::TimeTicks::Now().ToInternalValue());
message->WriteInt(-1);
message->WriteString("quit");
channel_proxy->Send(message);
EXPECT_TRUE(WaitForClientShutdown());
channel_proxy.reset();
io_thread_.reset();
}
scoped_refptr<base::TaskRunner> io_task_runner() {
if (io_thread_)
return io_thread_->task_runner();
return base::ThreadTaskRunnerHandle::Get();
}
private:
std::unique_ptr<base::TestIOThread> io_thread_;
};
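// RunTestChannelPingPong() drives the ChannelMojo directly on the main thread,
// while RunTestChannelProxyPingPong() routes the same messages through an
// IPC::ChannelProxy backed by a dedicated TestIOThread, so comparing the two
// gives a rough idea of what the proxy's extra thread hop costs.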
TEST_F(MojoChannelPerfTest, ChannelPingPong) {
RunTestChannelPingPong();
base::RunLoop run_loop;
run_loop.RunUntilIdle();
}
TEST_F(MojoChannelPerfTest, ChannelProxyPingPong) {
RunTestChannelProxyPingPong();
base::RunLoop run_loop;
run_loop.RunUntilIdle();
......@@ -57,36 +429,34 @@ TEST_F(MojoChannelPerfTest, DISABLED_MaxChannelCount) {
}
}
class MojoPerfTestClient {
public:
MojoPerfTestClient()
: listener_(new ChannelReflectorListener()) {
mojo::edk::test::MultiprocessTestHelper::ChildSetup();
}
~MojoPerfTestClient() = default;
int Run(MojoHandle handle) {
handle_ = mojo::MakeScopedHandle(mojo::MessagePipeHandle(handle));
LockThreadAffinity thread_locker(kSharedCore);
std::unique_ptr<Channel> channel = ChannelMojo::Create(
std::move(handle_), Channel::MODE_CLIENT, listener_.get());
listener_->Init(channel.get());
CHECK(channel->Connect());
base::RunLoop().Run();
return 0;
}
private:
base::MessageLoopForIO main_message_loop_;
std::unique_ptr<ChannelReflectorListener> listener_;
std::unique_ptr<Channel> channel_;
mojo::ScopedMessagePipeHandle handle_;
};
MULTIPROCESS_TEST_MAIN(MojoPerfTestClientTestChildMain) {
MojoPerfTestClient client;
int rv = mojo::edk::test::MultiprocessTestHelper::RunClientMain(
......@@ -98,5 +468,208 @@ MULTIPROCESS_TEST_MAIN(MojoPerfTestClientTestChildMain) {
return rv;
}
class ReflectorImpl : public IPC::mojom::Reflector {
public:
explicit ReflectorImpl(mojo::ScopedMessagePipeHandle handle)
: binding_(this, std::move(handle)) {}
~ReflectorImpl() override {
ignore_result(binding_.Unbind().PassMessagePipe().release());
}
private:
// IPC::mojom::Reflector:
void Ping(const std::string& value, const PingCallback& callback) override {
callback.Run(value);
}
void Quit() override {
base::MessageLoop::current()->QuitWhenIdle();
}
mojo::Binding<IPC::mojom::Reflector> binding_;
};
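// Illustrative client-side sketch (not from this CL): the server code below
// binds a ReflectorPtr to one end of a mojo::ScopedMessagePipeHandle and then
// issues asynchronous calls against it, roughly:
//
//   IPC::mojom::ReflectorPtr reflector;
//   reflector.Bind(IPC::mojom::ReflectorPtrInfo(std::move(pipe), 0u));
//   reflector->Ping("hello", base::Bind(&OnPongReply));  // OnPongReply is
//                                                        // hypothetical.
//   reflector->Quit();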
class MojoInterfacePerfTest : public mojo::edk::test::MojoTestBase {
public:
MojoInterfacePerfTest() : message_count_(0), count_down_(0) {}
protected:
void RunPingPongServer(MojoHandle mp, const std::string& label) {
base::MessageLoop main_message_loop;
label_ = label;
mojo::MessagePipeHandle mp_handle(mp);
mojo::ScopedMessagePipeHandle scoped_mp(mp_handle);
ping_receiver_.Bind(IPC::mojom::ReflectorPtrInfo(
std::move(scoped_mp), 0u));
LockThreadAffinity thread_locker(kSharedCore);
std::vector<PingPongTestParams> params = GetDefaultTestParams();
for (size_t i = 0; i < params.size(); i++) {
ping_receiver_->Ping(
"hello",
base::Bind(&MojoInterfacePerfTest::OnPong, base::Unretained(this)));
message_count_ = count_down_ = params[i].message_count();
payload_ = std::string(params[i].message_size(), 'a');
base::RunLoop().Run();
}
ping_receiver_->Quit();
ignore_result(ping_receiver_.PassInterface().PassHandle().release());
}
void OnPong(const std::string& value) {
if (value == "hello") {
DCHECK(!perf_logger_.get());
std::string test_name =
base::StringPrintf("IPC_%s_Perf_%dx_%zu",
label_.c_str(),
message_count_,
payload_.size());
perf_logger_.reset(new base::PerfTimeLogger(test_name.c_str()));
} else {
DCHECK_EQ(payload_.size(), value.size());
CHECK(count_down_ > 0);
count_down_--;
if (count_down_ == 0) {
perf_logger_.reset();
base::MessageLoop::current()->QuitWhenIdle();
return;
}
}
ping_receiver_->Ping(
payload_,
base::Bind(&MojoInterfacePerfTest::OnPong, base::Unretained(this)));
}
static int RunPingPongClient(MojoHandle mp) {
mojo::MessagePipeHandle mp_handle(mp);
mojo::ScopedMessagePipeHandle scoped_mp(mp_handle);
// In single process mode, this is running in a task and by default other
// tasks (in particular, the binding) won't run. To keep the single process
// and multi-process code paths the same, enable nestable tasks.
base::MessageLoop::ScopedNestableTaskAllower nest_loop(
base::MessageLoop::current());
LockThreadAffinity thread_locker(kSharedCore);
ReflectorImpl impl(std::move(scoped_mp));
base::RunLoop().Run();
return 0;
}
private:
int message_count_;
int count_down_;
std::string label_;
std::string payload_;
IPC::mojom::ReflectorPtr ping_receiver_;
std::unique_ptr<base::PerfTimeLogger> perf_logger_;
DISALLOW_COPY_AND_ASSIGN(MojoInterfacePerfTest);
};
DEFINE_TEST_CLIENT_WITH_PIPE(PingPongClient, MojoInterfacePerfTest, h) {
base::MessageLoop main_message_loop;
return RunPingPongClient(h);
}
// Similar to MojoChannelPerfTest above, but uses a Mojo interface instead of
// raw IPC::Messages.
TEST_F(MojoInterfacePerfTest, MultiprocessPingPong) {
RUN_CHILD_ON_PIPE(PingPongClient, h)
RunPingPongServer(h, "MultiProcess");
END_CHILD()
}
// A single process version of the above test.
TEST_F(MojoInterfacePerfTest, SingleProcessPingPong) {
MojoHandle server_handle, client_handle;
CreateMessagePipe(&server_handle, &client_handle);
base::Thread client_thread("PingPongClient");
client_thread.Start();
client_thread.task_runner()->PostTask(
FROM_HERE,
base::Bind(base::IgnoreResult(&RunPingPongClient), client_handle));
RunPingPongServer(server_handle, "SingleProcess");
}
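// The single-process variant still exchanges messages over a real Mojo message
// pipe; it only moves the "client" onto a second thread in the same process,
// which removes the cross-process hop from the measurement.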
class CallbackPerfTest : public testing::Test {
public:
CallbackPerfTest()
: client_thread_("PingPongClient"), message_count_(0), count_down_(0) {}
protected:
void RunPingPongServer() {
client_thread_.Start();
LockThreadAffinity thread_locker(kSharedCore);
std::vector<PingPongTestParams> params = GetDefaultTestParams();
for (size_t i = 0; i < params.size(); i++) {
std::string hello("hello");
client_thread_.task_runner()->PostTask(
FROM_HERE,
base::Bind(&CallbackPerfTest::Ping, base::Unretained(this), hello));
message_count_ = count_down_ = params[i].message_count();
payload_ = std::string(params[i].message_size(), 'a');
base::RunLoop().Run();
}
}
void Ping(const std::string& value) {
main_message_loop.task_runner()->PostTask(
FROM_HERE,
base::Bind(&CallbackPerfTest::OnPong, base::Unretained(this),
value));
}
void OnPong(const std::string& value) {
if (value == "hello") {
DCHECK(!perf_logger_.get());
std::string test_name =
base::StringPrintf("Callback_Perf_%dx_%zu",
message_count_,
payload_.size());
perf_logger_.reset(new base::PerfTimeLogger(test_name.c_str()));
} else {
DCHECK_EQ(payload_.size(), value.size());
CHECK(count_down_ > 0);
count_down_--;
if (count_down_ == 0) {
perf_logger_.reset();
base::MessageLoop::current()->QuitWhenIdle();
return;
}
}
client_thread_.task_runner()->PostTask(
FROM_HERE,
base::Bind(&CallbackPerfTest::Ping, base::Unretained(this), payload_));
}
private:
base::Thread client_thread_;
base::MessageLoop main_message_loop;
int message_count_;
int count_down_;
std::string payload_;
std::unique_ptr<base::PerfTimeLogger> perf_logger_;
DISALLOW_COPY_AND_ASSIGN(CallbackPerfTest);
};
// Sends the same data as above using PostTask instead of IPCs for comparison.
TEST_F(CallbackPerfTest, PingPong) {
RunPingPongServer();
}
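// CallbackPerfTest bounces the payload between two threads with plain
// PostTask/base::Bind and no message pipe or serialization, so its numbers
// serve as a rough lower bound for the IPC and Mojo variants above.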
} // namespace
} // namespace IPC
// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "ipc/ipc_perftest_support.h"
#include <stddef.h>
#include <stdint.h>
#include <algorithm>
#include <memory>
#include <string>
#include "base/logging.h"
#include "base/macros.h"
#include "base/memory/ptr_util.h"
#include "base/pickle.h"
#include "base/run_loop.h"
#include "base/strings/stringprintf.h"
#include "base/test/perf_time_logger.h"
#include "base/test/test_io_thread.h"
#include "base/threading/thread.h"
#include "base/threading/thread_task_runner_handle.h"
#include "base/time/time.h"
#include "build/build_config.h"
#include "ipc/ipc_channel.h"
#include "ipc/ipc_channel_proxy.h"
#include "ipc/ipc_descriptors.h"
#include "ipc/ipc_message_utils.h"
#include "ipc/ipc_sender.h"
#include "mojo/edk/test/scoped_ipc_support.h"
namespace IPC {
namespace test {
// Avoid core 0 due to conflicts with Intel's Power Gadget.
// Setting thread affinity will fail harmlessly on single/dual core machines.
const int kSharedCore = 2;
// This class simply collects stats about abstract "events" (each of which has a
// start time and an end time).
class EventTimeTracker {
public:
explicit EventTimeTracker(const char* name)
: name_(name),
count_(0) {
}
void AddEvent(const base::TimeTicks& start, const base::TimeTicks& end) {
DCHECK(end >= start);
count_++;
base::TimeDelta duration = end - start;
total_duration_ += duration;
max_duration_ = std::max(max_duration_, duration);
}
void ShowResults() const {
VLOG(1) << name_ << " count: " << count_;
VLOG(1) << name_ << " total duration: "
<< total_duration_.InMillisecondsF() << " ms";
VLOG(1) << name_ << " average duration: "
<< (total_duration_.InMillisecondsF() / static_cast<double>(count_))
<< " ms";
VLOG(1) << name_ << " maximum duration: "
<< max_duration_.InMillisecondsF() << " ms";
}
void Reset() {
count_ = 0;
total_duration_ = base::TimeDelta();
max_duration_ = base::TimeDelta();
}
private:
const std::string name_;
uint64_t count_;
base::TimeDelta total_duration_;
base::TimeDelta max_duration_;
DISALLOW_COPY_AND_ASSIGN(EventTimeTracker);
};
// This channel listener just replies to all messages with the exact same
// message. It assumes each message has one string parameter. When the string
// "quit" is sent, it will exit.
class ChannelReflectorListener : public Listener {
public:
ChannelReflectorListener()
: channel_(NULL),
latency_tracker_("Client messages") {
VLOG(1) << "Client listener up";
}
~ChannelReflectorListener() override {
VLOG(1) << "Client listener down";
latency_tracker_.ShowResults();
}
void Init(Channel* channel) {
DCHECK(!channel_);
channel_ = channel;
}
bool OnMessageReceived(const Message& message) override {
CHECK(channel_);
base::PickleIterator iter(message);
int64_t time_internal;
EXPECT_TRUE(iter.ReadInt64(&time_internal));
int msgid;
EXPECT_TRUE(iter.ReadInt(&msgid));
base::StringPiece payload;
EXPECT_TRUE(iter.ReadStringPiece(&payload));
// Include message deserialization in latency.
base::TimeTicks now = base::TimeTicks::Now();
if (payload == "hello") {
latency_tracker_.Reset();
} else if (payload == "quit") {
latency_tracker_.ShowResults();
base::MessageLoop::current()->QuitWhenIdle();
return true;
} else {
// Don't track hello and quit messages.
latency_tracker_.AddEvent(
base::TimeTicks::FromInternalValue(time_internal), now);
}
Message* msg = new Message(0, 2, Message::PRIORITY_NORMAL);
msg->WriteInt64(base::TimeTicks::Now().ToInternalValue());
msg->WriteInt(msgid);
msg->WriteString(payload);
channel_->Send(msg);
return true;
}
private:
Channel* channel_;
EventTimeTracker latency_tracker_;
};
class PerformanceChannelListener : public Listener {
public:
explicit PerformanceChannelListener(const std::string& label)
: label_(label),
sender_(NULL),
msg_count_(0),
msg_size_(0),
count_down_(0),
latency_tracker_("Server messages") {
VLOG(1) << "Server listener up";
}
~PerformanceChannelListener() override {
VLOG(1) << "Server listener down";
}
void Init(Sender* sender) {
DCHECK(!sender_);
sender_ = sender;
}
// Call this before running the message loop.
void SetTestParams(int msg_count, size_t msg_size) {
DCHECK_EQ(0, count_down_);
msg_count_ = msg_count;
msg_size_ = msg_size;
count_down_ = msg_count_;
payload_ = std::string(msg_size_, 'a');
}
bool OnMessageReceived(const Message& message) override {
CHECK(sender_);
base::PickleIterator iter(message);
int64_t time_internal;
EXPECT_TRUE(iter.ReadInt64(&time_internal));
int msgid;
EXPECT_TRUE(iter.ReadInt(&msgid));
std::string reflected_payload;
EXPECT_TRUE(iter.ReadString(&reflected_payload));
// Include message deserialization in latency.
base::TimeTicks now = base::TimeTicks::Now();
if (reflected_payload == "hello") {
// Start timing on hello.
latency_tracker_.Reset();
DCHECK(!perf_logger_.get());
std::string test_name =
base::StringPrintf("IPC_%s_Perf_%dx_%u",
label_.c_str(),
msg_count_,
static_cast<unsigned>(msg_size_));
perf_logger_.reset(new base::PerfTimeLogger(test_name.c_str()));
} else {
DCHECK_EQ(payload_.size(), reflected_payload.size());
latency_tracker_.AddEvent(
base::TimeTicks::FromInternalValue(time_internal), now);
CHECK(count_down_ > 0);
count_down_--;
if (count_down_ == 0) {
perf_logger_.reset(); // Stop the perf timer now.
latency_tracker_.ShowResults();
base::MessageLoop::current()->QuitWhenIdle();
return true;
}
}
Message* msg = new Message(0, 2, Message::PRIORITY_NORMAL);
msg->WriteInt64(base::TimeTicks::Now().ToInternalValue());
msg->WriteInt(count_down_);
msg->WriteString(payload_);
sender_->Send(msg);
return true;
}
private:
std::string label_;
Sender* sender_;
int msg_count_;
size_t msg_size_;
int count_down_;
std::string payload_;
EventTimeTracker latency_tracker_;
std::unique_ptr<base::PerfTimeLogger> perf_logger_;
};
IPCChannelPerfTestBase::IPCChannelPerfTestBase() = default;
IPCChannelPerfTestBase::~IPCChannelPerfTestBase() = default;
std::vector<PingPongTestParams>
IPCChannelPerfTestBase::GetDefaultTestParams() {
// Test several sizes. We use 12^N for message size, and limit the message
// count to keep the test duration reasonable.
#ifdef NDEBUG
const int kMultiplier = 100;
#else
// Debug builds on Windows run these tests orders of magnitude more slowly.
const int kMultiplier = 1;
#endif
std::vector<PingPongTestParams> list;
list.push_back(PingPongTestParams(12, 500 * kMultiplier));
list.push_back(PingPongTestParams(144, 500 * kMultiplier));
list.push_back(PingPongTestParams(1728, 500 * kMultiplier));
list.push_back(PingPongTestParams(20736, 120 * kMultiplier));
list.push_back(PingPongTestParams(248832, 10 * kMultiplier));
return list;
}
void IPCChannelPerfTestBase::RunTestChannelPingPong(
const std::vector<PingPongTestParams>& params) {
auto message_loop = base::MakeUnique<base::MessageLoopForIO>();
mojo::edk::test::ScopedIPCSupport ipc_support(message_loop->task_runner());
InitWithCustomMessageLoop("MojoPerfTestClient", std::move(message_loop));
// Set up IPC channel and start client.
PerformanceChannelListener listener("Channel");
CreateChannel(&listener);
listener.Init(channel());
ASSERT_TRUE(ConnectChannel());
LockThreadAffinity thread_locker(kSharedCore);
for (size_t i = 0; i < params.size(); i++) {
listener.SetTestParams(params[i].message_count(),
params[i].message_size());
// This initial message will kick-start the ping-pong of messages.
Message* message =
new Message(0, 2, Message::PRIORITY_NORMAL);
message->WriteInt64(base::TimeTicks::Now().ToInternalValue());
message->WriteInt(-1);
message->WriteString("hello");
sender()->Send(message);
// Run message loop.
base::RunLoop().Run();
}
// Send quit message.
Message* message = new Message(0, 2, Message::PRIORITY_NORMAL);
message->WriteInt64(base::TimeTicks::Now().ToInternalValue());
message->WriteInt(-1);
message->WriteString("quit");
sender()->Send(message);
EXPECT_TRUE(WaitForClientShutdown());
DestroyChannel();
}
void IPCChannelPerfTestBase::RunTestChannelProxyPingPong(
const std::vector<PingPongTestParams>& params) {
io_thread_.reset(new base::TestIOThread(base::TestIOThread::kAutoStart));
{
auto message_loop = base::MakeUnique<base::MessageLoopForIO>();
mojo::edk::test::ScopedIPCSupport ipc_support(io_thread_->task_runner());
InitWithCustomMessageLoop("MojoPerfTestClient", std::move(message_loop));
// Set up IPC channel and start client.
PerformanceChannelListener listener("ChannelProxy");
auto channel_proxy = IPC::ChannelProxy::Create(
TakeHandle().release(), IPC::Channel::MODE_SERVER, &listener,
io_thread_->task_runner());
listener.Init(channel_proxy.get());
LockThreadAffinity thread_locker(kSharedCore);
for (size_t i = 0; i < params.size(); i++) {
listener.SetTestParams(params[i].message_count(),
params[i].message_size());
// This initial message will kick-start the ping-pong of messages.
Message* message = new Message(0, 2, Message::PRIORITY_NORMAL);
message->WriteInt64(base::TimeTicks::Now().ToInternalValue());
message->WriteInt(-1);
message->WriteString("hello");
channel_proxy->Send(message);
// Run message loop.
base::RunLoop().Run();
}
// Send quit message.
Message* message = new Message(0, 2, Message::PRIORITY_NORMAL);
message->WriteInt64(base::TimeTicks::Now().ToInternalValue());
message->WriteInt(-1);
message->WriteString("quit");
channel_proxy->Send(message);
EXPECT_TRUE(WaitForClientShutdown());
channel_proxy.reset();
}
io_thread_.reset();
}
PingPongTestClient::PingPongTestClient()
: listener_(new ChannelReflectorListener()) {
}
PingPongTestClient::~PingPongTestClient() {
}
int PingPongTestClient::RunMain() {
LockThreadAffinity thread_locker(kSharedCore);
std::unique_ptr<Channel> channel = CreateChannel(listener_.get());
listener_->Init(channel.get());
CHECK(channel->Connect());
base::RunLoop().Run();
return 0;
}
scoped_refptr<base::TaskRunner> PingPongTestClient::task_runner() {
return main_message_loop_.task_runner();
}
LockThreadAffinity::LockThreadAffinity(int cpu_number)
: affinity_set_ok_(false) {
#if defined(OS_WIN)
const DWORD_PTR thread_mask = static_cast<DWORD_PTR>(1) << cpu_number;
old_affinity_ = SetThreadAffinityMask(GetCurrentThread(), thread_mask);
affinity_set_ok_ = old_affinity_ != 0;
#elif defined(OS_LINUX)
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(cpu_number, &cpuset);
auto get_result = sched_getaffinity(0, sizeof(old_cpuset_), &old_cpuset_);
DCHECK_EQ(0, get_result);
auto set_result = sched_setaffinity(0, sizeof(cpuset), &cpuset);
// Check for get_result failure, even though it should always succeed.
affinity_set_ok_ = (set_result == 0) && (get_result == 0);
#endif
if (!affinity_set_ok_)
LOG(WARNING) << "Failed to set thread affinity to CPU " << cpu_number;
}
LockThreadAffinity::~LockThreadAffinity() {
if (!affinity_set_ok_)
return;
#if defined(OS_WIN)
auto set_result = SetThreadAffinityMask(GetCurrentThread(), old_affinity_);
DCHECK_NE(0u, set_result);
#elif defined(OS_LINUX)
auto set_result = sched_setaffinity(0, sizeof(old_cpuset_), &old_cpuset_);
DCHECK_EQ(0, set_result);
#endif
}
} // namespace test
} // namespace IPC
// Copyright (c) 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef IPC_IPC_PERFTEST_SUPPORT_H_
#define IPC_IPC_PERFTEST_SUPPORT_H_
#include <stddef.h>
#include <memory>
#include <vector>
#include "base/macros.h"
#include "base/test/test_io_thread.h"
#include "base/threading/thread_task_runner_handle.h"
#include "build/build_config.h"
#include "ipc/ipc_test_base.h"
namespace IPC {
namespace test {
class ChannelReflectorListener;
class PingPongTestParams {
public:
PingPongTestParams(size_t size, int count)
: message_size_(size), message_count_(count) {
}
size_t message_size() const { return message_size_; }
int message_count() const { return message_count_; }
private:
size_t message_size_;
int message_count_;
};
class IPCChannelPerfTestBase : public IPCChannelMojoTestBase {
public:
IPCChannelPerfTestBase();
~IPCChannelPerfTestBase() override;
static std::vector<PingPongTestParams> GetDefaultTestParams();
void RunTestChannelPingPong(
const std::vector<PingPongTestParams>& params_list);
void RunTestChannelProxyPingPong(
const std::vector<PingPongTestParams>& params_list);
scoped_refptr<base::TaskRunner> io_task_runner() {
if (io_thread_)
return io_thread_->task_runner();
return base::ThreadTaskRunnerHandle::Get();
}
private:
std::unique_ptr<base::TestIOThread> io_thread_;
};
class PingPongTestClient {
public:
PingPongTestClient();
virtual ~PingPongTestClient();
virtual std::unique_ptr<Channel> CreateChannel(Listener* listener) = 0;
int RunMain();
scoped_refptr<base::TaskRunner> task_runner();
private:
base::MessageLoopForIO main_message_loop_;
std::unique_ptr<ChannelReflectorListener> listener_;
std::unique_ptr<Channel> channel_;
};
// This class locks the current thread to a particular CPU core. This is
// important because otherwise the different threads and processes of these
// tests end up on different CPU cores which means that all of the cores are
// lightly loaded so the OS (Windows and Linux) fails to ramp up the CPU
// frequency, leading to unpredictable and often poor performance.
class LockThreadAffinity {
public:
explicit LockThreadAffinity(int cpu_number);
~LockThreadAffinity();
private:
bool affinity_set_ok_;
#if defined(OS_WIN)
DWORD_PTR old_affinity_;
#elif defined(OS_LINUX)
cpu_set_t old_cpuset_;
#endif
DISALLOW_COPY_AND_ASSIGN(LockThreadAffinity);
};
}
}
#endif // IPC_IPC_PERFTEST_SUPPORT_H_
......@@ -28,3 +28,8 @@ interface PingReceiver {
interface IndirectTestDriver {
GetPingReceiver(associated PingReceiver& request);
};
interface Reflector {
Ping(string value) => (string value);
Quit();
};
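// For reference: the C++ the bindings generator emits for this interface is
// what ReflectorImpl in ipc_mojo_perftest.cc implements, i.e.
// Ping(const std::string& value, const PingCallback& callback) and Quit().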
......@@ -6,12 +6,18 @@
#include "base/command_line.h"
#include "base/test/perf_test_suite.h"
#include "base/test/test_io_thread.h"
#include "mojo/edk/embedder/embedder.h"
#include "mojo/edk/test/scoped_ipc_support.h"
#include "mojo/edk/test/test_support_impl.h"
int main(int argc, char** argv) {
base::PerfTestSuite test(argc, argv);
mojo::edk::Init();
base::TestIOThread test_io_thread(base::TestIOThread::kAutoStart);
mojo::edk::test::ScopedIPCSupport ipc_support(test_io_thread.task_runner());
mojo::test::TestSupport::Init(new mojo::edk::test::TestSupportImpl());
return test.Run();
}
......@@ -21,10 +21,7 @@ int main(int argc, char** argv) {
mojo::edk::Init();
base::TestIOThread test_io_thread(base::TestIOThread::kAutoStart);
mojo::edk::test::ScopedIPCSupport ipc_support(test_io_thread.task_runner());
mojo::test::TestSupport::Init(new mojo::edk::test::TestSupportImpl());
return test.Run();
......