Commit 9b681dcf authored by John Abd-El-Malek's avatar John Abd-El-Malek Committed by Commit Bot

Cleanup ipc_perftests.

-add an IO thread in child thread for the IPC case: this was missing and distorting results
-remove MojoChannelPerfTest.ChannelPingPong: this isn't representative of how IPC is used in chrome
-use idiomatic IPC macros: this doesn't change performance but is easier to read

Change-Id: I18fef0d766b2261ed8f6085fbc01d0215da6e1b4
Reviewed-on: https://chromium-review.googlesource.com/571327
Commit-Queue: John Abd-El-Malek <jam@chromium.org>
Reviewed-by: default avatarTom Sepez <tsepez@chromium.org>
Reviewed-by: default avatarKen Rockot <rockot@chromium.org>
Cr-Commit-Position: refs/heads/master@{#487277}
parent 037c2473
......@@ -24,49 +24,18 @@
#include "mojo/public/cpp/bindings/binding.h"
#include "mojo/public/cpp/system/message_pipe.h"
namespace IPC {
namespace {
// This class simply collects stats about abstract "events" (each of which has a
// start time and an end time).
class EventTimeTracker {
public:
explicit EventTimeTracker(const char* name) : name_(name), count_(0) {}
void AddEvent(const base::TimeTicks& start, const base::TimeTicks& end) {
DCHECK(end >= start);
count_++;
base::TimeDelta duration = end - start;
total_duration_ += duration;
max_duration_ = std::max(max_duration_, duration);
}
void ShowResults() const {
VLOG(1) << name_ << " count: " << count_;
VLOG(1) << name_ << " total duration: " << total_duration_.InMillisecondsF()
<< " ms";
VLOG(1) << name_ << " average duration: "
<< (total_duration_.InMillisecondsF() / static_cast<double>(count_))
<< " ms";
VLOG(1) << name_ << " maximum duration: " << max_duration_.InMillisecondsF()
<< " ms";
}
void Reset() {
count_ = 0;
total_duration_ = base::TimeDelta();
max_duration_ = base::TimeDelta();
}
#define IPC_MESSAGE_IMPL
#include "ipc/ipc_message_macros.h"
private:
const std::string name_;
#define IPC_MESSAGE_START TestMsgStart
uint64_t count_;
base::TimeDelta total_duration_;
base::TimeDelta max_duration_;
IPC_MESSAGE_CONTROL0(TestMsg_Hello)
IPC_MESSAGE_CONTROL0(TestMsg_Quit)
IPC_MESSAGE_CONTROL1(TestMsg_Ping, std::string)
IPC_SYNC_MESSAGE_CONTROL1_1(TestMsg_SyncPing, std::string, std::string)
DISALLOW_COPY_AND_ASSIGN(EventTimeTracker);
};
namespace IPC {
namespace {
class PerformanceChannelListener : public Listener {
public:
......@@ -75,8 +44,8 @@ class PerformanceChannelListener : public Listener {
sender_(NULL),
msg_count_(0),
msg_size_(0),
count_down_(0),
latency_tracker_("Server messages") {
sync_(false),
count_down_(0) {
VLOG(1) << "Server listener up";
}
......@@ -88,10 +57,11 @@ class PerformanceChannelListener : public Listener {
}
// Call this before running the message loop.
void SetTestParams(int msg_count, size_t msg_size) {
void SetTestParams(int msg_count, size_t msg_size, bool sync) {
DCHECK_EQ(0, count_down_);
msg_count_ = msg_count;
msg_size_ = msg_size;
sync_ = sync;
count_down_ = msg_count_;
payload_ = std::string(msg_size_, 'a');
}
......@@ -99,58 +69,59 @@ class PerformanceChannelListener : public Listener {
bool OnMessageReceived(const Message& message) override {
CHECK(sender_);
base::PickleIterator iter(message);
int64_t time_internal;
EXPECT_TRUE(iter.ReadInt64(&time_internal));
int msgid;
EXPECT_TRUE(iter.ReadInt(&msgid));
std::string reflected_payload;
EXPECT_TRUE(iter.ReadString(&reflected_payload));
// Include message deserialization in latency.
base::TimeTicks now = base::TimeTicks::Now();
bool handled = true;
IPC_BEGIN_MESSAGE_MAP(PerformanceChannelListener, message)
IPC_MESSAGE_HANDLER(TestMsg_Hello, OnHello)
IPC_MESSAGE_HANDLER(TestMsg_Ping, OnPing)
IPC_MESSAGE_UNHANDLED(handled = false)
IPC_END_MESSAGE_MAP()
return handled;
}
if (reflected_payload == "hello") {
void OnHello() {
// Start timing on hello.
latency_tracker_.Reset();
DCHECK(!perf_logger_.get());
std::string test_name =
base::StringPrintf("IPC_%s_Perf_%dx_%u", label_.c_str(), msg_count_,
static_cast<unsigned>(msg_size_));
perf_logger_.reset(new base::PerfTimeLogger(test_name.c_str()));
if (sync_) {
for (int i = 0; i < count_down_; ++i) {
std::string response;
sender_->Send(new TestMsg_SyncPing(payload_, &response));
}
base::MessageLoop::current()->QuitWhenIdle();
} else {
DCHECK_EQ(payload_.size(), reflected_payload.size());
SendPong();
}
}
latency_tracker_.AddEvent(
base::TimeTicks::FromInternalValue(time_internal), now);
void OnPing(const std::string& payload) {
// Include message deserialization in latency.
DCHECK_EQ(payload_.size(), payload.size());
CHECK(count_down_ > 0);
count_down_--;
if (count_down_ == 0) {
perf_logger_.reset(); // Stop the perf timer now.
latency_tracker_.ShowResults();
base::MessageLoop::current()->QuitWhenIdle();
return true;
}
return;
}
Message* msg = new Message(0, 2, Message::PRIORITY_NORMAL);
msg->WriteInt64(base::TimeTicks::Now().ToInternalValue());
msg->WriteInt(count_down_);
msg->WriteString(payload_);
sender_->Send(msg);
return true;
SendPong();
}
void SendPong() { sender_->Send(new TestMsg_Ping(payload_)); }
private:
std::string label_;
Sender* sender_;
int msg_count_;
size_t msg_size_;
bool sync_;
int count_down_;
std::string payload_;
EventTimeTracker latency_tracker_;
std::unique_ptr<base::PerfTimeLogger> perf_logger_;
};
......@@ -159,58 +130,46 @@ class PerformanceChannelListener : public Listener {
// "quit" is sent, it will exit.
class ChannelReflectorListener : public Listener {
public:
ChannelReflectorListener()
: channel_(NULL), latency_tracker_("Client messages") {
ChannelReflectorListener() : channel_(NULL) {
VLOG(1) << "Client listener up";
}
~ChannelReflectorListener() override {
VLOG(1) << "Client listener down";
latency_tracker_.ShowResults();
}
~ChannelReflectorListener() override { VLOG(1) << "Client listener down"; }
void Init(Channel* channel) {
void Init(Sender* channel) {
DCHECK(!channel_);
channel_ = channel;
}
bool OnMessageReceived(const Message& message) override {
CHECK(channel_);
bool handled = true;
IPC_BEGIN_MESSAGE_MAP(ChannelReflectorListener, message)
IPC_MESSAGE_HANDLER(TestMsg_Hello, OnHello)
IPC_MESSAGE_HANDLER(TestMsg_Ping, OnPing)
IPC_MESSAGE_HANDLER(TestMsg_SyncPing, OnSyncPing)
IPC_MESSAGE_HANDLER(TestMsg_Quit, OnQuit)
IPC_MESSAGE_UNHANDLED(handled = false)
IPC_END_MESSAGE_MAP()
return handled;
}
base::PickleIterator iter(message);
int64_t time_internal;
EXPECT_TRUE(iter.ReadInt64(&time_internal));
int msgid;
EXPECT_TRUE(iter.ReadInt(&msgid));
base::StringPiece payload;
EXPECT_TRUE(iter.ReadStringPiece(&payload));
// Include message deserialization in latency.
base::TimeTicks now = base::TimeTicks::Now();
void OnHello() { channel_->Send(new TestMsg_Hello); }
if (payload == "hello") {
latency_tracker_.Reset();
} else if (payload == "quit") {
latency_tracker_.ShowResults();
base::MessageLoop::current()->QuitWhenIdle();
return true;
} else {
// Don't track hello and quit messages.
latency_tracker_.AddEvent(
base::TimeTicks::FromInternalValue(time_internal), now);
void OnPing(const std::string& payload) {
channel_->Send(new TestMsg_Ping(payload));
}
Message* msg = new Message(0, 2, Message::PRIORITY_NORMAL);
msg->WriteInt64(base::TimeTicks::Now().ToInternalValue());
msg->WriteInt(msgid);
msg->WriteString(payload);
channel_->Send(msg);
return true;
void OnSyncPing(const std::string& payload, std::string* response) {
*response = payload;
}
void OnQuit() { base::MessageLoop::current()->QuitWhenIdle(); }
void Send(IPC::Message* message) { channel_->Send(message); }
private:
Channel* channel_;
EventTimeTracker latency_tracker_;
Sender* channel_;
};
// This class locks the current thread to a particular CPU core. This is
......@@ -302,44 +261,41 @@ class MojoChannelPerfTest : public IPCChannelMojoTestBase {
MojoChannelPerfTest() = default;
~MojoChannelPerfTest() override = default;
void RunTestChannelPingPong() {
void RunTestChannelProxyPingPong() {
io_thread_.reset(new base::TestIOThread(base::TestIOThread::kAutoStart));
Init("MojoPerfTestClient");
// Set up IPC channel and start client.
PerformanceChannelListener listener("Channel");
CreateChannel(&listener);
listener.Init(channel());
ASSERT_TRUE(ConnectChannel());
PerformanceChannelListener listener("ChannelProxy");
auto channel_proxy = IPC::ChannelProxy::Create(
TakeHandle().release(), IPC::Channel::MODE_SERVER, &listener,
io_thread_->task_runner());
listener.Init(channel_proxy.get());
LockThreadAffinity thread_locker(kSharedCore);
std::vector<PingPongTestParams> params = GetDefaultTestParams();
for (size_t i = 0; i < params.size(); i++) {
listener.SetTestParams(params[i].message_count(),
params[i].message_size());
params[i].message_size(), false);
// This initial message will kick-start the ping-pong of messages.
Message* message = new Message(0, 2, Message::PRIORITY_NORMAL);
message->WriteInt64(base::TimeTicks::Now().ToInternalValue());
message->WriteInt(-1);
message->WriteString("hello");
sender()->Send(message);
channel_proxy->Send(new TestMsg_Hello);
// Run message loop.
base::RunLoop().Run();
}
// Send quit message.
Message* message = new Message(0, 2, Message::PRIORITY_NORMAL);
message->WriteInt64(base::TimeTicks::Now().ToInternalValue());
message->WriteInt(-1);
message->WriteString("quit");
sender()->Send(message);
channel_proxy->Send(new TestMsg_Quit);
EXPECT_TRUE(WaitForClientShutdown());
DestroyChannel();
channel_proxy.reset();
io_thread_.reset();
}
void RunTestChannelProxyPingPong() {
void RunTestChannelProxySyncPing() {
io_thread_.reset(new base::TestIOThread(base::TestIOThread::kAutoStart));
Init("MojoPerfTestClient");
......@@ -355,25 +311,17 @@ class MojoChannelPerfTest : public IPCChannelMojoTestBase {
std::vector<PingPongTestParams> params = GetDefaultTestParams();
for (size_t i = 0; i < params.size(); i++) {
listener.SetTestParams(params[i].message_count(),
params[i].message_size());
params[i].message_size(), true);
// This initial message will kick-start the ping-pong of messages.
Message* message = new Message(0, 2, Message::PRIORITY_NORMAL);
message->WriteInt64(base::TimeTicks::Now().ToInternalValue());
message->WriteInt(-1);
message->WriteString("hello");
channel_proxy->Send(message);
channel_proxy->Send(new TestMsg_Hello);
// Run message loop.
base::RunLoop().Run();
}
// Send quit message.
Message* message = new Message(0, 2, Message::PRIORITY_NORMAL);
message->WriteInt64(base::TimeTicks::Now().ToInternalValue());
message->WriteInt(-1);
message->WriteString("quit");
channel_proxy->Send(message);
channel_proxy->Send(new TestMsg_Quit);
EXPECT_TRUE(WaitForClientShutdown());
channel_proxy.reset();
......@@ -391,13 +339,6 @@ class MojoChannelPerfTest : public IPCChannelMojoTestBase {
std::unique_ptr<base::TestIOThread> io_thread_;
};
TEST_F(MojoChannelPerfTest, ChannelPingPong) {
RunTestChannelPingPong();
base::RunLoop run_loop;
run_loop.RunUntilIdle();
}
TEST_F(MojoChannelPerfTest, ChannelProxyPingPong) {
RunTestChannelProxyPingPong();
......@@ -405,18 +346,11 @@ TEST_F(MojoChannelPerfTest, ChannelProxyPingPong) {
run_loop.RunUntilIdle();
}
// Test to see how many channels we can create.
TEST_F(MojoChannelPerfTest, DISABLED_MaxChannelCount) {
#if defined(OS_POSIX)
LOG(INFO) << "base::GetMaxFds " << base::GetMaxFds();
base::SetFdLimit(20000);
#endif
TEST_F(MojoChannelPerfTest, ChannelProxySyncPing) {
RunTestChannelProxySyncPing();
std::vector<mojo::edk::PlatformChannelPair*> channels;
for (size_t i = 0; i < 10000; ++i) {
LOG(INFO) << "channels size: " << channels.size();
channels.push_back(new mojo::edk::PlatformChannelPair());
}
base::RunLoop run_loop;
run_loop.RunUntilIdle();
}
class MojoPerfTestClient {
......@@ -430,17 +364,19 @@ class MojoPerfTestClient {
int Run(MojoHandle handle) {
handle_ = mojo::MakeScopedHandle(mojo::MessagePipeHandle(handle));
LockThreadAffinity thread_locker(kSharedCore);
std::unique_ptr<Channel> channel = ChannelMojo::Create(
std::move(handle_), Channel::MODE_CLIENT, listener_.get());
base::TestIOThread io_thread(base::TestIOThread::kAutoStart);
std::unique_ptr<ChannelProxy> channel =
IPC::ChannelProxy::Create(handle_.release(), Channel::MODE_CLIENT,
listener_.get(), io_thread.task_runner());
listener_->Init(channel.get());
CHECK(channel->Connect());
base::RunLoop().Run();
return 0;
}
private:
base::MessageLoopForIO main_message_loop_;
base::MessageLoop main_message_loop_;
std::unique_ptr<ChannelReflectorListener> listener_;
std::unique_ptr<Channel> channel_;
mojo::ScopedMessagePipeHandle handle_;
......@@ -472,6 +408,10 @@ class ReflectorImpl : public IPC::mojom::Reflector {
std::move(callback).Run(value);
}
void SyncPing(const std::string& value, PingCallback callback) override {
std::move(callback).Run(value);
}
void Quit() override { base::MessageLoop::current()->QuitWhenIdle(); }
mojo::Binding<IPC::mojom::Reflector> binding_;
......@@ -524,9 +464,17 @@ class MojoInterfacePerfTest : public mojo::edk::test::MojoTestBase {
}
}
if (sync_) {
for (int i = 0; i < count_down_; ++i) {
std::string response;
ping_receiver_->SyncPing(payload_, &response);
}
base::MessageLoop::current()->QuitWhenIdle();
} else {
ping_receiver_->Ping(payload_, base::Bind(&MojoInterfacePerfTest::OnPong,
base::Unretained(this)));
}
}
static int RunPingPongClient(MojoHandle mp) {
mojo::MessagePipeHandle mp_handle(mp);
......@@ -544,6 +492,8 @@ class MojoInterfacePerfTest : public mojo::edk::test::MojoTestBase {
return 0;
}
bool sync_ = false;
private:
int message_count_;
int count_down_;
......@@ -594,6 +544,14 @@ TEST_F(MojoInterfacePerfTest, MultiprocessPingPong) {
});
}
TEST_F(MojoInterfacePerfTest, MultiprocessSyncPing) {
sync_ = true;
RunTestClient("PingPongClient", [&](MojoHandle h) {
base::MessageLoop main_message_loop;
RunPingPongServer(h, "MultiprocessSync");
});
}
// A single process version of the above test.
TEST_P(MojoInProcessInterfacePerfTest, MultiThreadPingPong) {
MojoHandle server_handle, client_handle;
......
......@@ -31,6 +31,8 @@ interface IndirectTestDriver {
interface Reflector {
Ping(string value) => (string value);
[Sync]
SyncPing(string value) => (string response);
Quit();
};
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment