Commit 4b795006 authored by Sigurdur Asgeirsson's avatar Sigurdur Asgeirsson Committed by Commit Bot

Add more instrumentation to MessageQuotaChecker.

Keeping track of the average write rate on the instrumented message pipe
will allow reasoning on whether there's a spurt of messages coming
through, or whether the unread quota protocol implementation is
stalling somehow.
Likewise the additional cumulative counts and the lifetime will shed
some light on the usage of the abused interfaces.

Bug: 1011441
Change-Id: I9e3d2b1fcfe2a264208108d2c7ff3cdc4dd11e22
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2013480
Commit-Queue: Sigurður Ásgeirsson <siggi@chromium.org>
Reviewed-by: default avatarKen Rockot <rockot@google.com>
Reviewed-by: default avatarChris Hamilton <chrisha@chromium.org>
Cr-Commit-Position: refs/heads/master@{#734528}
parent b77b38a6
...@@ -38,7 +38,9 @@ const base::FeatureParam<int> kMojoRecordUnreadMessageCountCrashThreshold = { ...@@ -38,7 +38,9 @@ const base::FeatureParam<int> kMojoRecordUnreadMessageCountCrashThreshold = {
NOINLINE void MaybeDumpWithoutCrashing( NOINLINE void MaybeDumpWithoutCrashing(
size_t total_quota_used, size_t total_quota_used,
base::Optional<size_t> message_pipe_quota_used) { base::Optional<size_t> message_pipe_quota_used,
int64_t seconds_since_construction,
double average_write_rate) {
static bool have_crashed = false; static bool have_crashed = false;
if (have_crashed) if (have_crashed)
return; return;
...@@ -54,18 +56,33 @@ NOINLINE void MaybeDumpWithoutCrashing( ...@@ -54,18 +56,33 @@ NOINLINE void MaybeDumpWithoutCrashing(
local_quota_used -= message_pipe_quota_used.value(); local_quota_used -= message_pipe_quota_used.value();
} }
// Normalize the write rate to writes/second.
double average_write_rate_per_second =
average_write_rate /
MessageQuotaChecker::DecayingRateAverage::kSecondsPerSamplingInterval;
base::debug::Alias(&total_quota_used); base::debug::Alias(&total_quota_used);
base::debug::Alias(&local_quota_used); base::debug::Alias(&local_quota_used);
base::debug::Alias(&had_message_pipe); base::debug::Alias(&had_message_pipe);
base::debug::Alias(&seconds_since_construction);
base::debug::Alias(&average_write_rate_per_second);
// This is happening because the user of the interface implicated on the crash // This is happening because the user of the interface implicated on the crash
// stack has queued up an unreasonable number of messages, namely // stack has queued up an unreasonable number of messages, namely
// |quota_used|. // |total_quota_used|.
base::debug::DumpWithoutCrashing(); base::debug::DumpWithoutCrashing();
} }
int64_t ToSamplingInterval(base::TimeTicks when) {
return (when - base::TimeTicks::UnixEpoch()).InSeconds() /
MessageQuotaChecker::DecayingRateAverage::kSecondsPerSamplingInterval;
}
} // namespace } // namespace
constexpr size_t
MessageQuotaChecker::DecayingRateAverage::kSecondsPerSamplingInterval;
constexpr double MessageQuotaChecker::DecayingRateAverage::kSampleWeight;
// static // static
scoped_refptr<MessageQuotaChecker> MessageQuotaChecker::MaybeCreate() { scoped_refptr<MessageQuotaChecker> MessageQuotaChecker::MaybeCreate() {
static const Configuration config = GetConfiguration(); static const Configuration config = GetConfiguration();
...@@ -73,11 +90,13 @@ scoped_refptr<MessageQuotaChecker> MessageQuotaChecker::MaybeCreate() { ...@@ -73,11 +90,13 @@ scoped_refptr<MessageQuotaChecker> MessageQuotaChecker::MaybeCreate() {
} }
void MessageQuotaChecker::BeforeWrite() { void MessageQuotaChecker::BeforeWrite() {
++messages_written_;
QuotaCheckImpl(0u); QuotaCheckImpl(0u);
} }
void MessageQuotaChecker::BeforeMessagesEnqueued(size_t num) { void MessageQuotaChecker::BeforeMessagesEnqueued(size_t num) {
DCHECK_NE(num, 0u); DCHECK_NE(num, 0u);
messages_enqueued_ += num;
QuotaCheckImpl(num); QuotaCheckImpl(num);
} }
...@@ -85,7 +104,7 @@ void MessageQuotaChecker::AfterMessagesDequeued(size_t num) { ...@@ -85,7 +104,7 @@ void MessageQuotaChecker::AfterMessagesDequeued(size_t num) {
base::AutoLock hold(lock_); base::AutoLock hold(lock_);
DCHECK_LE(num, consumed_quota_); DCHECK_LE(num, consumed_quota_);
DCHECK_NE(num, 0u); DCHECK_NE(num, 0u);
messages_dequeued_ += num;
consumed_quota_ -= num; consumed_quota_ -= num;
} }
...@@ -129,7 +148,7 @@ scoped_refptr<MessageQuotaChecker> MessageQuotaChecker::MaybeCreateForTesting( ...@@ -129,7 +148,7 @@ scoped_refptr<MessageQuotaChecker> MessageQuotaChecker::MaybeCreateForTesting(
} }
MessageQuotaChecker::MessageQuotaChecker(const Configuration* config) MessageQuotaChecker::MessageQuotaChecker(const Configuration* config)
: config_(config) {} : config_(config), creation_time_(base::TimeTicks::Now()) {}
MessageQuotaChecker::~MessageQuotaChecker() = default; MessageQuotaChecker::~MessageQuotaChecker() = default;
// static // static
...@@ -185,12 +204,14 @@ void MessageQuotaChecker::QuotaCheckImpl(size_t num_enqueued) { ...@@ -185,12 +204,14 @@ void MessageQuotaChecker::QuotaCheckImpl(size_t num_enqueued) {
// local and the message pipe queues into individual variables, then pass them // local and the message pipe queues into individual variables, then pass them
// into the crashing function. // into the crashing function.
size_t total_quota_used = 0u; size_t total_quota_used = 0u;
base::TimeTicks now;
double average_write_rate = 0.0;
base::Optional<size_t> message_pipe_quota_used; base::Optional<size_t> message_pipe_quota_used;
{ {
base::AutoLock hold(lock_); base::AutoLock hold(lock_);
message_pipe_quota_used = GetCurrentMessagePipeQuota(); message_pipe_quota_used = GetCurrentMessagePipeQuota();
now = base::TimeTicks::Now();
if (num_enqueued) { if (num_enqueued) {
consumed_quota_ += num_enqueued; consumed_quota_ += num_enqueued;
} else { } else {
...@@ -200,6 +221,9 @@ void MessageQuotaChecker::QuotaCheckImpl(size_t num_enqueued) { ...@@ -200,6 +221,9 @@ void MessageQuotaChecker::QuotaCheckImpl(size_t num_enqueued) {
DCHECK(message_pipe_); DCHECK(message_pipe_);
DCHECK(message_pipe_quota_used.has_value()); DCHECK(message_pipe_quota_used.has_value());
// Accrue this write event to the write rate average.
write_rate_average_.AccrueEvent(now);
// Account for the message about to be written to the message pipe in the // Account for the message about to be written to the message pipe in the
// the full tally. // the full tally.
++message_pipe_quota_used.value(); ++message_pipe_quota_used.value();
...@@ -212,14 +236,72 @@ void MessageQuotaChecker::QuotaCheckImpl(size_t num_enqueued) { ...@@ -212,14 +236,72 @@ void MessageQuotaChecker::QuotaCheckImpl(size_t num_enqueued) {
if (total_quota_used > max_consumed_quota_) { if (total_quota_used > max_consumed_quota_) {
max_consumed_quota_ = total_quota_used; max_consumed_quota_ = total_quota_used;
new_max = true; new_max = true;
// Retrieve the average rate, in the case that a crash is imminent.
average_write_rate = write_rate_average_.GetDecayedRateAverage(now);
} }
} }
if (new_max && config_->crash_threshold != 0 && if (new_max && config_->crash_threshold != 0 &&
total_quota_used >= config_->crash_threshold) { total_quota_used >= config_->crash_threshold) {
config_->maybe_crash_function(total_quota_used, message_pipe_quota_used); DCHECK(!now.is_null());
int64_t seconds_since_construction = (now - creation_time_).InSeconds();
config_->maybe_crash_function(total_quota_used, message_pipe_quota_used,
seconds_since_construction,
average_write_rate);
} }
} }
MessageQuotaChecker::DecayingRateAverage::DecayingRateAverage() {
events_sampling_interval_ = ToSamplingInterval(base::TimeTicks::Now());
// Pretend the current decayed average is one sampling interval old to
// maintain an easy invariant that
// |events_sampling_interval_| > |decayed_average_sampling_interval_|.
decayed_average_sampling_interval_ = events_sampling_interval_ - 1;
}
void MessageQuotaChecker::DecayingRateAverage::AccrueEvent(
base::TimeTicks when) {
DCHECK_GT(events_sampling_interval_, decayed_average_sampling_interval_);
const int64_t sampling_interval = ToSamplingInterval(when);
DCHECK_GE(sampling_interval, events_sampling_interval_);
if (sampling_interval == events_sampling_interval_) {
// The time is still in the sampling interval, just add the event.
++events_;
return;
}
DCHECK_GT(sampling_interval, decayed_average_sampling_interval_);
// Add the new sample and decay it to the previous event sampling interval.
// A new sample is weighed at kSampleWeight into the average, whereas the old
// average is weighed at (1-kSampleWeight)^age;
const int64_t avg_age =
events_sampling_interval_ - decayed_average_sampling_interval_;
decayed_average_ = decayed_average_ * pow(1 - kSampleWeight, avg_age) +
kSampleWeight * events_;
decayed_average_sampling_interval_ = events_sampling_interval_;
// Start a new event sampling interval.
events_ = 1;
events_sampling_interval_ = sampling_interval;
}
double MessageQuotaChecker::DecayingRateAverage::GetDecayedRateAverage(
base::TimeTicks when) const {
DCHECK_GT(events_sampling_interval_, decayed_average_sampling_interval_);
// Compute the current rate average as of |events_sampling_interval_|.
const int64_t avg_age =
events_sampling_interval_ - decayed_average_sampling_interval_;
double avg = decayed_average_ * pow(1 - kSampleWeight, avg_age) +
kSampleWeight * events_;
// Age the average to |when|.
const int64_t sampling_interval = ToSamplingInterval(when);
DCHECK_GE(sampling_interval, events_sampling_interval_);
return avg *
pow(1 - kSampleWeight, sampling_interval - events_sampling_interval_);
}
} // namespace internal } // namespace internal
} // namespace mojo } // namespace mojo
...@@ -6,11 +6,13 @@ ...@@ -6,11 +6,13 @@
#define MOJO_PUBLIC_CPP_BINDINGS_LIB_MESSAGE_QUOTA_CHECKER_H_ #define MOJO_PUBLIC_CPP_BINDINGS_LIB_MESSAGE_QUOTA_CHECKER_H_
#include <stdint.h> #include <stdint.h>
#include <memory>
#include "base/component_export.h" #include "base/component_export.h"
#include "base/memory/ref_counted.h" #include "base/memory/ref_counted.h"
#include "base/optional.h" #include "base/optional.h"
#include "base/synchronization/lock.h" #include "base/synchronization/lock.h"
#include "base/time/time.h"
#include "mojo/public/cpp/system/message_pipe.h" #include "mojo/public/cpp/system/message_pipe.h"
namespace mojo { namespace mojo {
...@@ -39,6 +41,35 @@ namespace internal { ...@@ -39,6 +41,35 @@ namespace internal {
class COMPONENT_EXPORT(MOJO_CPP_BINDINGS) MessageQuotaChecker class COMPONENT_EXPORT(MOJO_CPP_BINDINGS) MessageQuotaChecker
: public base::RefCountedThreadSafe<MessageQuotaChecker> { : public base::RefCountedThreadSafe<MessageQuotaChecker> {
public: public:
// A helper class to maintain a decaying average for the rate of events per
// sampling interval over time.
class COMPONENT_EXPORT(MOJO_CPP_BINDINGS) DecayingRateAverage {
public:
DecayingRateAverage();
// Accrues one event at time |when|. Note that |when| must increase
// monotonically from one event to the next.
void AccrueEvent(base::TimeTicks when);
// Retrieves the current rate average, decayed to |when|.
double GetDecayedRateAverage(base::TimeTicks when) const;
// The length of a sampling interval in seconds.
static constexpr size_t kSecondsPerSamplingInterval = 5;
private:
// A new sample is weighed at this rate into the average, whereas the old
// average is weighed at (1-kSampleWeight)^age;
static constexpr double kSampleWeight = 0.5;
// The event count for the current or most recent sampling interval.
size_t events_ = 0;
int64_t events_sampling_interval_;
// The so-far accrued average and the sampling interval it covers.
double decayed_average_ = 0.0;
int64_t decayed_average_sampling_interval_;
};
// Returns a new instance if this invocation has been sampled for quota // Returns a new instance if this invocation has been sampled for quota
// checking. // checking.
static scoped_refptr<MessageQuotaChecker> MaybeCreate(); static scoped_refptr<MessageQuotaChecker> MaybeCreate();
...@@ -79,15 +110,26 @@ class COMPONENT_EXPORT(MOJO_CPP_BINDINGS) MessageQuotaChecker ...@@ -79,15 +110,26 @@ class COMPONENT_EXPORT(MOJO_CPP_BINDINGS) MessageQuotaChecker
const Configuration* config_; const Configuration* config_;
// The time ticks when this instance was created.
const base::TimeTicks creation_time_;
// Locks all local state. // Locks all local state.
base::Lock lock_; base::Lock lock_;
// Cumulative counts for the number of messages enqueued with
// |BeforeMessagesEnqueued()| and dequeued with |BeforeMessagesDequeued()|.
std::atomic<uint64_t> messages_enqueued_;
std::atomic<uint64_t> messages_dequeued_;
std::atomic<uint64_t> messages_written_;
// A decaying average of the rate of call to BeforeWrite per second.
DecayingRateAverage write_rate_average_;
// The locally consumed quota, e.g. the difference between the counts passed // The locally consumed quota, e.g. the difference between the counts passed
// to |BeforeMessagesEnqueued()| and |BeforeMessagesDequeued()|. // to |BeforeMessagesEnqueued()| and |BeforeMessagesDequeued()|.
size_t consumed_quota_ = 0u; size_t consumed_quota_ = 0u;
// The high watermark consumed quota observed. // The high watermark consumed quota observed.
size_t max_consumed_quota_ = 0u; size_t max_consumed_quota_ = 0u;
// The quota level that triggers a crash dump, or zero to disable crashing.
size_t crash_threshold_ = 0u;
// The message pipe this instance observes, if any. // The message pipe this instance observes, if any.
MessagePipeHandle message_pipe_; MessagePipeHandle message_pipe_;
}; };
...@@ -98,7 +140,9 @@ struct MessageQuotaChecker::Configuration { ...@@ -98,7 +140,9 @@ struct MessageQuotaChecker::Configuration {
size_t unread_message_count_quota = 0u; size_t unread_message_count_quota = 0u;
size_t crash_threshold = 0u; size_t crash_threshold = 0u;
void (*maybe_crash_function)(size_t quota_used, void (*maybe_crash_function)(size_t quota_used,
base::Optional<size_t> message_pipe_quota_used); base::Optional<size_t> message_pipe_quota_used,
int64_t seconds_since_construction,
double average_write_rate);
}; };
} // namespace internal } // namespace internal
......
...@@ -6,6 +6,8 @@ ...@@ -6,6 +6,8 @@
#include "base/optional.h" #include "base/optional.h"
#include "base/test/scoped_feature_list.h" #include "base/test/scoped_feature_list.h"
#include "base/test/task_environment.h"
#include "base/time/time.h"
#include "mojo/public/c/system/quota.h" #include "mojo/public/c/system/quota.h"
#include "mojo/public/cpp/bindings/features.h" #include "mojo/public/cpp/bindings/features.h"
#include "mojo/public/cpp/system/message_pipe.h" #include "mojo/public/cpp/system/message_pipe.h"
...@@ -30,23 +32,41 @@ class MessageQuotaCheckerTest : public testing::Test { ...@@ -30,23 +32,41 @@ class MessageQuotaCheckerTest : public testing::Test {
using MessageQuotaChecker = internal::MessageQuotaChecker; using MessageQuotaChecker = internal::MessageQuotaChecker;
using Configuration = MessageQuotaChecker::Configuration; using Configuration = MessageQuotaChecker::Configuration;
static void RecordDumpAttempt( static constexpr base::TimeDelta kOneSamplingInterval =
size_t total_quota_used, base::TimeDelta::FromSeconds(MessageQuotaChecker::DecayingRateAverage::
base::Optional<size_t> message_pipe_quota_used) { kSecondsPerSamplingInterval);
void Advance(base::TimeDelta delta) {
task_environment_.FastForwardBy(delta);
}
static void RecordDumpAttempt(size_t total_quota_used,
base::Optional<size_t> message_pipe_quota_used,
int64_t seconds_since_construction,
double average_write_rate) {
++instance_->num_dumps_; ++instance_->num_dumps_;
instance_->last_dump_total_quota_used_ = total_quota_used; instance_->last_dump_total_quota_used_ = total_quota_used;
instance_->last_dump_message_pipe_quota_used_ = message_pipe_quota_used; instance_->last_dump_message_pipe_quota_used_ = message_pipe_quota_used;
instance_->last_seconds_since_construction_ = seconds_since_construction;
instance_->last_average_write_rate_ = average_write_rate;
} }
// Mock time to allow testing
base::test::TaskEnvironment task_environment_{
base::test::TaskEnvironment::TimeSource::MOCK_TIME};
size_t num_dumps_ = false; size_t num_dumps_ = false;
size_t last_dump_total_quota_used_ = 0u; size_t last_dump_total_quota_used_ = 0u;
base::Optional<size_t> last_dump_message_pipe_quota_used_; base::Optional<size_t> last_dump_message_pipe_quota_used_;
int64_t last_seconds_since_construction_ = 0;
double last_average_write_rate_ = 0.0;
static const Configuration enabled_config_; static const Configuration enabled_config_;
static MessageQuotaCheckerTest* instance_; static MessageQuotaCheckerTest* instance_;
}; };
constexpr base::TimeDelta MessageQuotaCheckerTest::kOneSamplingInterval;
const MessageQuotaCheckerTest::Configuration const MessageQuotaCheckerTest::Configuration
MessageQuotaCheckerTest::enabled_config_ = {true, 1, 100, 200, MessageQuotaCheckerTest::enabled_config_ = {true, 1, 100, 200,
&RecordDumpAttempt}; &RecordDumpAttempt};
...@@ -159,6 +179,9 @@ TEST_F(MessageQuotaCheckerTest, DumpsCoreOnOverrun) { ...@@ -159,6 +179,9 @@ TEST_F(MessageQuotaCheckerTest, DumpsCoreOnOverrun) {
scoped_refptr<MessageQuotaChecker> checker = scoped_refptr<MessageQuotaChecker> checker =
MessageQuotaChecker::MaybeCreateForTesting(enabled_config_); MessageQuotaChecker::MaybeCreateForTesting(enabled_config_);
// Fast forward time by a few sampling intervals.
Advance(10 * kOneSamplingInterval);
// Queue up 100 messages. // Queue up 100 messages.
checker->SetMessagePipe(pipe.handle0.get()); checker->SetMessagePipe(pipe.handle0.get());
const char kMessage[] = "hello"; const char kMessage[] = "hello";
...@@ -177,24 +200,69 @@ TEST_F(MessageQuotaCheckerTest, DumpsCoreOnOverrun) { ...@@ -177,24 +200,69 @@ TEST_F(MessageQuotaCheckerTest, DumpsCoreOnOverrun) {
ASSERT_EQ(0u, num_dumps_); ASSERT_EQ(0u, num_dumps_);
checker->BeforeMessagesEnqueued(50); checker->BeforeMessagesEnqueued(50);
ASSERT_EQ(1u, num_dumps_); EXPECT_EQ(1u, num_dumps_);
ASSERT_EQ(200u, last_dump_total_quota_used_); EXPECT_EQ(200u, last_dump_total_quota_used_);
ASSERT_EQ(100u, last_dump_message_pipe_quota_used_); EXPECT_EQ(100u, last_dump_message_pipe_quota_used_);
EXPECT_EQ((10 * kOneSamplingInterval).InSeconds(),
last_seconds_since_construction_);
EXPECT_EQ(50.0, last_average_write_rate_);
Advance(kOneSamplingInterval);
checker->BeforeWrite(); checker->BeforeWrite();
ASSERT_EQ(MOJO_RESULT_OK, ASSERT_EQ(MOJO_RESULT_OK,
WriteMessageRaw(pipe.handle0.get(), kMessage, sizeof(kMessage), WriteMessageRaw(pipe.handle0.get(), kMessage, sizeof(kMessage),
nullptr, 0, MOJO_WRITE_MESSAGE_FLAG_NONE)); nullptr, 0, MOJO_WRITE_MESSAGE_FLAG_NONE));
ASSERT_EQ(2u, num_dumps_); EXPECT_EQ(2u, num_dumps_);
ASSERT_EQ(201u, last_dump_total_quota_used_); EXPECT_EQ(201u, last_dump_total_quota_used_);
ASSERT_EQ(101u, last_dump_message_pipe_quota_used_); EXPECT_EQ(101u, last_dump_message_pipe_quota_used_);
EXPECT_EQ((11 * kOneSamplingInterval).InSeconds(),
last_seconds_since_construction_);
EXPECT_EQ(25.5, last_average_write_rate_);
Advance(kOneSamplingInterval);
checker->SetMessagePipe(mojo::MessagePipeHandle()); checker->SetMessagePipe(mojo::MessagePipeHandle());
checker->BeforeMessagesEnqueued(250); checker->BeforeMessagesEnqueued(250);
ASSERT_EQ(3u, num_dumps_); EXPECT_EQ(3u, num_dumps_);
ASSERT_EQ(350u, last_dump_total_quota_used_); EXPECT_EQ(350u, last_dump_total_quota_used_);
ASSERT_FALSE(last_dump_message_pipe_quota_used_.has_value()); EXPECT_FALSE(last_dump_message_pipe_quota_used_.has_value());
EXPECT_EQ((12 * kOneSamplingInterval).InSeconds(),
last_seconds_since_construction_);
EXPECT_EQ(12.75, last_average_write_rate_);
}
TEST_F(MessageQuotaCheckerTest, DecayingRateAverage) {
MessageQuotaChecker::DecayingRateAverage avg;
base::TimeTicks t0 = base::TimeTicks::Now();
EXPECT_EQ(0.0, avg.GetDecayedRateAverage(t0));
// Tally two events in the same second.
avg.AccrueEvent(t0);
avg.AccrueEvent(t0);
// The decayed average rate is half of the rate this sampling interval.
EXPECT_EQ(1.0, avg.GetDecayedRateAverage(t0));
t0 += kOneSamplingInterval;
// Tally another two events in a subsequent sampling interval.
avg.AccrueEvent(t0);
avg.AccrueEvent(t0);
EXPECT_EQ(1.5, avg.GetDecayedRateAverage(t0));
t0 += kOneSamplingInterval;
// Tally another two events in a subsequent sampling interval.
avg.AccrueEvent(t0);
avg.AccrueEvent(t0);
EXPECT_EQ(1.75, avg.GetDecayedRateAverage(t0));
// Make sure the average is decayed with time.
EXPECT_EQ(0.875, avg.GetDecayedRateAverage(t0 + kOneSamplingInterval));
EXPECT_EQ(0.4375, avg.GetDecayedRateAverage(t0 + 2 * kOneSamplingInterval));
t0 += 10 * kOneSamplingInterval;
avg.AccrueEvent(t0);
avg.AccrueEvent(t0);
// The previous average should have decayed by 1/1024.
EXPECT_EQ(1.0 + 1.75 / 1024.0, avg.GetDecayedRateAverage(t0));
} }
} // namespace } // namespace
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment