Commit 2e146d74 authored by jamesr's avatar jamesr Committed by Commit bot

Microbenchmark for the cost of posting tasks to different pump types

This adds microbenchmarks to measure the cost of posting a task to
a message loop running different pump types. This measures the wall and
(where supported) thread time that elapses from different numbers of
threads posting to one target. This is not designed to measure the time
taken by the target thread or fairness of posting threads.

Each test is set up with one target thread and 1-4 posting threads. The
posting threads each run one task that posts batches of tasks to the
target thread until they have been working for at least 5 seconds of
wall time. The tasks on the target thread simply increment a counter.
The test runner starts each posting thread, posts a start task to each
then joins them and then joins the target thread and aggregates stats.

BUG=412137

Review URL: https://codereview.chromium.org/551183002

Cr-Commit-Position: refs/heads/master@{#297264}
parent af3e9e0b
...@@ -35,5 +35,17 @@ class JavaHandlerThread { ...@@ -35,5 +35,17 @@ class JavaHandlerThread {
}); });
} }
@CalledByNative
private void stop(final long nativeThread, final long nativeEvent) {
new Handler(mThread.getLooper()).post(new Runnable() {
@Override
public void run() {
nativeStopThread(nativeThread, nativeEvent);
}
});
mThread.quitSafely();
}
private native void nativeInitializeThread(long nativeJavaHandlerThread, long nativeEvent); private native void nativeInitializeThread(long nativeJavaHandlerThread, long nativeEvent);
private native void nativeStopThread(long nativeJavaHandlerThread, long nativeEvent);
} }
...@@ -44,6 +44,15 @@ void JavaHandlerThread::Start() { ...@@ -44,6 +44,15 @@ void JavaHandlerThread::Start() {
} }
void JavaHandlerThread::Stop() { void JavaHandlerThread::Stop() {
JNIEnv* env = base::android::AttachCurrentThread();
base::WaitableEvent shutdown_event(false, false);
Java_JavaHandlerThread_stop(env,
java_thread_.obj(),
reinterpret_cast<intptr_t>(this),
reinterpret_cast<intptr_t>(&shutdown_event));
// Wait for thread to shut down before returning.
base::ThreadRestrictions::ScopedAllowWait wait_allowed;
shutdown_event.Wait();
} }
void JavaHandlerThread::InitializeThread(JNIEnv* env, jobject obj, void JavaHandlerThread::InitializeThread(JNIEnv* env, jobject obj,
...@@ -54,6 +63,11 @@ void JavaHandlerThread::InitializeThread(JNIEnv* env, jobject obj, ...@@ -54,6 +63,11 @@ void JavaHandlerThread::InitializeThread(JNIEnv* env, jobject obj,
reinterpret_cast<base::WaitableEvent*>(event)->Signal(); reinterpret_cast<base::WaitableEvent*>(event)->Signal();
} }
void JavaHandlerThread::StopThread(JNIEnv* env, jobject obj, jlong event) {
static_cast<MessageLoopForUI*>(message_loop_.get())->Quit();
reinterpret_cast<base::WaitableEvent*>(event)->Signal();
}
// static // static
bool JavaHandlerThread::RegisterBindings(JNIEnv* env) { bool JavaHandlerThread::RegisterBindings(JNIEnv* env) {
return RegisterNativesImpl(env); return RegisterNativesImpl(env);
......
...@@ -34,6 +34,7 @@ class BASE_EXPORT JavaHandlerThread { ...@@ -34,6 +34,7 @@ class BASE_EXPORT JavaHandlerThread {
// Called from java on the newly created thread. // Called from java on the newly created thread.
// Start() will not return before this methods has finished. // Start() will not return before this methods has finished.
void InitializeThread(JNIEnv* env, jobject obj, jlong event); void InitializeThread(JNIEnv* env, jobject obj, jlong event);
void StopThread(JNIEnv* env, jobject obj, jlong event);
static bool RegisterBindings(JNIEnv* env); static bool RegisterBindings(JNIEnv* env);
......
...@@ -816,6 +816,7 @@ ...@@ -816,6 +816,7 @@
], ],
'sources': [ 'sources': [
'threading/thread_perftest.cc', 'threading/thread_perftest.cc',
'message_loop/message_pump_perftest.cc',
'test/run_all_unittests.cc', 'test/run_all_unittests.cc',
'../testing/perf/perf_test.cc' '../testing/perf/perf_test.cc'
], ],
......
...@@ -391,12 +391,22 @@ class BASE_EXPORT MessageLoop : public MessagePump::Delegate { ...@@ -391,12 +391,22 @@ class BASE_EXPORT MessageLoop : public MessagePump::Delegate {
// Returns true if the message loop is "idle". Provided for testing. // Returns true if the message loop is "idle". Provided for testing.
bool IsIdleForTesting(); bool IsIdleForTesting();
// Wakes up the message pump. Can be called on any thread. The caller is
// responsible for synchronizing ScheduleWork() calls.
void ScheduleWork(bool was_empty);
// Returns the TaskAnnotator which is used to add debug information to posted
// tasks.
debug::TaskAnnotator* task_annotator() { return &task_annotator_; }
// Runs the specified PendingTask.
void RunTask(const PendingTask& pending_task);
//---------------------------------------------------------------------------- //----------------------------------------------------------------------------
protected: protected:
scoped_ptr<MessagePump> pump_; scoped_ptr<MessagePump> pump_;
private: private:
friend class internal::IncomingTaskQueue;
friend class RunLoop; friend class RunLoop;
// Configures various members for the two constructors. // Configures various members for the two constructors.
...@@ -408,9 +418,6 @@ class BASE_EXPORT MessageLoop : public MessagePump::Delegate { ...@@ -408,9 +418,6 @@ class BASE_EXPORT MessageLoop : public MessagePump::Delegate {
// Called to process any delayed non-nestable tasks. // Called to process any delayed non-nestable tasks.
bool ProcessNextDelayedNonNestableTask(); bool ProcessNextDelayedNonNestableTask();
// Runs the specified PendingTask.
void RunTask(const PendingTask& pending_task);
// Calls RunTask or queues the pending_task on the deferred task list if it // Calls RunTask or queues the pending_task on the deferred task list if it
// cannot be run right now. Returns true if the task was run. // cannot be run right now. Returns true if the task was run.
bool DeferOrRunPendingTask(const PendingTask& pending_task); bool DeferOrRunPendingTask(const PendingTask& pending_task);
...@@ -423,18 +430,10 @@ class BASE_EXPORT MessageLoop : public MessagePump::Delegate { ...@@ -423,18 +430,10 @@ class BASE_EXPORT MessageLoop : public MessagePump::Delegate {
// true if some work was done. // true if some work was done.
bool DeletePendingTasks(); bool DeletePendingTasks();
// Returns the TaskAnnotator which is used to add debug information to posted
// tasks.
debug::TaskAnnotator* task_annotator() { return &task_annotator_; }
// Loads tasks from the incoming queue to |work_queue_| if the latter is // Loads tasks from the incoming queue to |work_queue_| if the latter is
// empty. // empty.
void ReloadWorkQueue(); void ReloadWorkQueue();
// Wakes up the message pump. Can be called on any thread. The caller is
// responsible for synchronizing ScheduleWork() calls.
void ScheduleWork(bool was_empty);
// Start recording histogram info about events and action IF it was enabled // Start recording histogram info about events and action IF it was enabled
// and IF the statistics recorder can accept a registration of our histogram. // and IF the statistics recorder can accept a registration of our histogram.
void StartHistogrammer(); void StartHistogrammer();
......
// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "base/bind.h"
#include "base/format_macros.h"
#include "base/memory/scoped_vector.h"
#include "base/strings/stringprintf.h"
#include "base/synchronization/condition_variable.h"
#include "base/synchronization/lock.h"
#include "base/synchronization/waitable_event.h"
#include "base/threading/thread.h"
#include "base/time/time.h"
#include "build/build_config.h"
#include "testing/gtest/include/gtest/gtest.h"
#include "testing/perf/perf_test.h"
#if defined(OS_ANDROID)
#include "base/android/java_handler_thread.h"
#endif
namespace base {
namespace {
class ScheduleWorkTest : public testing::Test {
public:
ScheduleWorkTest() : counter_(0) {}
void Increment(uint64_t amount) { counter_ += amount; }
void Schedule(int index) {
base::TimeTicks start = base::TimeTicks::HighResNow();
base::TimeTicks thread_start;
if (TimeTicks::IsThreadNowSupported())
thread_start = base::TimeTicks::ThreadNow();
base::TimeDelta minimum = base::TimeDelta::Max();
base::TimeDelta maximum = base::TimeDelta();
base::TimeTicks now, lastnow = start;
uint64_t schedule_calls = 0u;
do {
for (size_t i = 0; i < kBatchSize; ++i) {
target_message_loop()->ScheduleWork(true);
schedule_calls++;
}
now = base::TimeTicks::HighResNow();
base::TimeDelta laptime = now - lastnow;
lastnow = now;
minimum = std::min(minimum, laptime);
maximum = std::max(maximum, laptime);
} while (now - start < base::TimeDelta::FromSeconds(kTargetTimeSec));
scheduling_times_[index] = now - start;
if (TimeTicks::IsThreadNowSupported())
scheduling_thread_times_[index] =
base::TimeTicks::ThreadNow() - thread_start;
min_batch_times_[index] = minimum;
max_batch_times_[index] = maximum;
target_message_loop()->PostTask(FROM_HERE,
base::Bind(&ScheduleWorkTest::Increment,
base::Unretained(this),
schedule_calls));
}
void ScheduleWork(MessageLoop::Type target_type, int num_scheduling_threads) {
#if defined(OS_ANDROID)
if (target_type == MessageLoop::TYPE_JAVA) {
java_thread_.reset(new android::JavaHandlerThread("target"));
java_thread_->Start();
} else
#endif
{
target_.reset(new Thread("target"));
target_->StartWithOptions(Thread::Options(target_type, 0u));
}
ScopedVector<Thread> scheduling_threads;
scheduling_times_.reset(new base::TimeDelta[num_scheduling_threads]);
scheduling_thread_times_.reset(new base::TimeDelta[num_scheduling_threads]);
min_batch_times_.reset(new base::TimeDelta[num_scheduling_threads]);
max_batch_times_.reset(new base::TimeDelta[num_scheduling_threads]);
for (int i = 0; i < num_scheduling_threads; ++i) {
scheduling_threads.push_back(new Thread("posting thread"));
scheduling_threads[i]->Start();
}
for (int i = 0; i < num_scheduling_threads; ++i) {
scheduling_threads[i]->message_loop()->PostTask(
FROM_HERE,
base::Bind(&ScheduleWorkTest::Schedule, base::Unretained(this), i));
}
for (int i = 0; i < num_scheduling_threads; ++i) {
scheduling_threads[i]->Stop();
}
#if defined(OS_ANDROID)
if (target_type == MessageLoop::TYPE_JAVA) {
java_thread_->Stop();
java_thread_.reset();
} else
#endif
{
target_->Stop();
target_.reset();
}
base::TimeDelta total_time;
base::TimeDelta total_thread_time;
base::TimeDelta min_batch_time = base::TimeDelta::Max();
base::TimeDelta max_batch_time = base::TimeDelta();
for (int i = 0; i < num_scheduling_threads; ++i) {
total_time += scheduling_times_[i];
total_thread_time += scheduling_thread_times_[i];
min_batch_time = std::min(min_batch_time, min_batch_times_[i]);
max_batch_time = std::max(max_batch_time, max_batch_times_[i]);
}
std::string trace = StringPrintf(
"%d_threads_scheduling_to_%s_pump",
num_scheduling_threads,
target_type == MessageLoop::TYPE_IO
? "io"
: (target_type == MessageLoop::TYPE_UI ? "ui" : "default"));
perf_test::PrintResult(
"task",
"",
trace,
total_time.InMicroseconds() / static_cast<double>(counter_),
"us/task",
true);
perf_test::PrintResult(
"task",
"_min_batch_time",
trace,
min_batch_time.InMicroseconds() / static_cast<double>(kBatchSize),
"us/task",
false);
perf_test::PrintResult(
"task",
"_max_batch_time",
trace,
max_batch_time.InMicroseconds() / static_cast<double>(kBatchSize),
"us/task",
false);
if (TimeTicks::IsThreadNowSupported()) {
perf_test::PrintResult(
"task",
"_thread_time",
trace,
total_thread_time.InMicroseconds() / static_cast<double>(counter_),
"us/task",
true);
}
}
MessageLoop* target_message_loop() {
#if defined(OS_ANDROID)
if (java_thread_)
return java_thread_->message_loop();
#endif
return target_->message_loop();
}
private:
scoped_ptr<Thread> target_;
#if defined(OS_ANDROID)
scoped_ptr<android::JavaHandlerThread> java_thread_;
#endif
scoped_ptr<base::TimeDelta[]> scheduling_times_;
scoped_ptr<base::TimeDelta[]> scheduling_thread_times_;
scoped_ptr<base::TimeDelta[]> min_batch_times_;
scoped_ptr<base::TimeDelta[]> max_batch_times_;
uint64_t counter_;
static const size_t kTargetTimeSec = 5;
static const size_t kBatchSize = 1000;
};
TEST_F(ScheduleWorkTest, ThreadTimeToIOFromOneThread) {
ScheduleWork(MessageLoop::TYPE_IO, 1);
}
TEST_F(ScheduleWorkTest, ThreadTimeToIOFromTwoThreads) {
ScheduleWork(MessageLoop::TYPE_IO, 2);
}
TEST_F(ScheduleWorkTest, ThreadTimeToIOFromFourThreads) {
ScheduleWork(MessageLoop::TYPE_IO, 4);
}
TEST_F(ScheduleWorkTest, ThreadTimeToUIFromOneThread) {
ScheduleWork(MessageLoop::TYPE_UI, 1);
}
TEST_F(ScheduleWorkTest, ThreadTimeToUIFromTwoThreads) {
ScheduleWork(MessageLoop::TYPE_UI, 2);
}
TEST_F(ScheduleWorkTest, ThreadTimeToUIFromFourThreads) {
ScheduleWork(MessageLoop::TYPE_UI, 4);
}
TEST_F(ScheduleWorkTest, ThreadTimeToDefaultFromOneThread) {
ScheduleWork(MessageLoop::TYPE_DEFAULT, 1);
}
TEST_F(ScheduleWorkTest, ThreadTimeToDefaultFromTwoThreads) {
ScheduleWork(MessageLoop::TYPE_DEFAULT, 2);
}
TEST_F(ScheduleWorkTest, ThreadTimeToDefaultFromFourThreads) {
ScheduleWork(MessageLoop::TYPE_DEFAULT, 4);
}
#if defined(OS_ANDROID)
TEST_F(ScheduleWorkTest, ThreadTimeToJavaFromOneThread) {
ScheduleWork(MessageLoop::TYPE_JAVA, 1);
}
TEST_F(ScheduleWorkTest, ThreadTimeToJavaFromTwoThreads) {
ScheduleWork(MessageLoop::TYPE_JAVA, 2);
}
TEST_F(ScheduleWorkTest, ThreadTimeToJavaFromFourThreads) {
ScheduleWork(MessageLoop::TYPE_JAVA, 4);
}
#endif
static void DoNothing() {
}
class FakeMessagePump : public MessagePump {
public:
FakeMessagePump() {}
virtual ~FakeMessagePump() {}
virtual void Run(Delegate* delegate) OVERRIDE {}
virtual void Quit() OVERRIDE {}
virtual void ScheduleWork() OVERRIDE {}
virtual void ScheduleDelayedWork(
const TimeTicks& delayed_work_time) OVERRIDE {}
};
class PostTaskTest : public testing::Test {
public:
void Run(int batch_size, int tasks_per_reload) {
base::TimeTicks start = base::TimeTicks::HighResNow();
base::TimeTicks now;
MessageLoop loop(scoped_ptr<MessagePump>(new FakeMessagePump));
scoped_refptr<internal::IncomingTaskQueue> queue(
new internal::IncomingTaskQueue(&loop));
uint32_t num_posted = 0;
do {
for (int i = 0; i < batch_size; ++i) {
for (int j = 0; j < tasks_per_reload; ++j) {
queue->AddToIncomingQueue(
FROM_HERE, base::Bind(&DoNothing), base::TimeDelta(), false);
num_posted++;
}
TaskQueue loop_local_queue;
queue->ReloadWorkQueue(&loop_local_queue);
while (!loop_local_queue.empty()) {
PendingTask t = loop_local_queue.front();
loop_local_queue.pop();
loop.RunTask(t);
}
}
now = base::TimeTicks::HighResNow();
} while (now - start < base::TimeDelta::FromSeconds(5));
std::string trace = StringPrintf("%d_tasks_per_reload", tasks_per_reload);
perf_test::PrintResult(
"task",
"",
trace,
(now - start).InMicroseconds() / static_cast<double>(num_posted),
"us/task",
true);
}
};
TEST_F(PostTaskTest, OneTaskPerReload) {
Run(10000, 1);
}
TEST_F(PostTaskTest, TenTasksPerReload) {
Run(10000, 10);
}
TEST_F(PostTaskTest, OneHundredTasksPerReload) {
Run(1000, 100);
}
} // namespace
} // namespace base
...@@ -6,6 +6,7 @@ ...@@ -6,6 +6,7 @@
#include "base/bind.h" #include "base/bind.h"
#include "base/command_line.h" #include "base/command_line.h"
#include "base/memory/scoped_vector.h" #include "base/memory/scoped_vector.h"
#include "base/strings/stringprintf.h"
#include "base/synchronization/condition_variable.h" #include "base/synchronization/condition_variable.h"
#include "base/synchronization/lock.h" #include "base/synchronization/lock.h"
#include "base/synchronization/waitable_event.h" #include "base/synchronization/waitable_event.h"
...@@ -259,7 +260,6 @@ typedef EventPerfTest<ConditionVariableEvent> ConditionVariablePerfTest; ...@@ -259,7 +260,6 @@ typedef EventPerfTest<ConditionVariableEvent> ConditionVariablePerfTest;
TEST_F(ConditionVariablePerfTest, EventPingPong) { TEST_F(ConditionVariablePerfTest, EventPingPong) {
RunPingPongTest("4_ConditionVariable_Threads", 4); RunPingPongTest("4_ConditionVariable_Threads", 4);
} }
#if defined(OS_POSIX) #if defined(OS_POSIX)
// Absolutely 100% minimal posix waitable event. If there is a better/faster // Absolutely 100% minimal posix waitable event. If there is a better/faster
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment