Commit a8cac9ec authored by Benoit Lize, committed by Chromium LUCI CQ

[PartitionAlloc] Batch Thread cache allocations.

Allocate multiple objects at a time for the thread cache.
This is meant to amortize allocation costs.

Bug: 998048
Change-Id: I837216fcb3cb76302a6d09e7890b52313ebf8fa2
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2562302
Commit-Queue: Benoit L <lizeb@chromium.org>
Reviewed-by: Bartek Nowierski <bartekn@chromium.org>
Cr-Commit-Position: refs/heads/master@{#832357}
parent ed7c5b6a
@@ -181,6 +181,62 @@ void ThreadCache::Delete(void* tcache_ptr) {
root->RawFree(tcache_ptr);
}
void ThreadCache::FillBucket(size_t bucket_index) {
// Filling multiple elements from the central allocator at a time has several
// advantages:
// - Amortize lock acquisition
// - Increase hit rate
// - Can improve locality, as consecutive allocations from the central
// allocator will likely return close addresses, especially early on.
//
// However, do not take too many items, to prevent memory bloat.
//
// Cache filling / purging policy:
// We aim at keeping the buckets neither empty nor full, while minimizing
// requests to the central allocator.
//
// For each bucket, there is a |limit| of how many cached objects there are in
// the bucket, so |count| < |limit| at all times.
// - Clearing: limit -> limit / 2
// - Filling: 0 -> limit / 4
//
// These thresholds are somewhat arbitrary, with these considerations:
// (1) Batched filling should not completely fill the bucket
// (2) Batched clearing should not completely clear the bucket
// (3) Batched filling should not be too eager
//
// If (1) and (2) do not hold, we risk oscillations of bucket filling /
// clearing which would greatly increase calls to the central allocator. (3)
// tries to keep memory usage low. So clearing half of the bucket and filling
// a quarter of it are sensible defaults; see the worked example after this
// function.
Bucket& bucket = buckets_[bucket_index];
int count = bucket.limit / 4;
size_t utilized_slot_size;
bool is_already_zeroed;
// Same as calling RawAlloc() |count| times, but acquires the lock only once.
internal::ScopedGuard<internal::ThreadSafe> guard(root_->lock_);
for (int i = 0; i < count; i++) {
// We allow the allocator to return nullptr, since filling the cache may
// safely fail, and the proper flag will be handled by the central
// allocator.
//
// |raw_size| is set to the slot size, as we don't know it. However, it is
// only used for direct-mapped allocations and single-slot ones anyway,
// which are not handled here.
void* ptr = root_->AllocFromBucket(
&root_->buckets[bucket_index], PartitionAllocReturnNull,
root_->buckets[bucket_index].slot_size /* raw_size */,
&utilized_slot_size, &is_already_zeroed);
// Central allocator is out of memory.
if (!ptr)
break;
PutInBucket(bucket, ptr);
}
}
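A worked example of the policy above, with illustrative numbers (the actual per-bucket limits are set elsewhere and are not part of this hunk):

#include <cstddef>

// Illustrative fill/clear arithmetic, assuming a bucket with limit == 64.
constexpr size_t kLimit = 64;
constexpr size_t kFillTarget = kLimit / 4;   // FillBucket(): 0 -> 16 entries.
constexpr size_t kClearTarget = kLimit / 2;  // ClearBucket(): 64 -> 32 entries.
static_assert(kFillTarget < kClearTarget,
              "a batched fill must not immediately trigger a clear");
// After a clear, 32 frees are absorbed before the next batched clear; after a
// fill, 16 allocations are served before the next batched fill. The central
// allocator's lock is thus taken roughly once per 16-32 cache operations,
// rather than on every allocation and deallocation.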
void ThreadCache::ClearBucket(ThreadCache::Bucket& bucket, size_t limit) {
// Avoids acquiring the lock needlessly.
if (!bucket.count)
......
@@ -195,8 +195,11 @@ class BASE_EXPORT ThreadCache {
explicit ThreadCache(PartitionRoot<ThreadSafe>* root);
static void Delete(void* thread_cache_ptr);
void PurgeInternal();
// Fills a bucket from the central allocator.
void FillBucket(size_t bucket_index);
// Empties the |bucket| until there are at most |limit| objects in it.
void ClearBucket(Bucket& bucket, size_t limit);
ALWAYS_INLINE void PutInBucket(Bucket& bucket, void* ptr);
// TODO(lizeb): Optimize the threshold.
static constexpr size_t kSizeThreshold = 512;
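This threshold gates which allocations may use the thread cache at all. A minimal sketch of such a gate; the helper name and the exact boundary handling are hypothetical, not this CL's code:

#include <cstddef>

constexpr size_t kSizeThreshold = 512;

// Hypothetical eligibility check: small allocations may be served from the
// thread cache, larger ones go straight to the central allocator.
inline bool IsCacheableSize(size_t size) {
  return size <= kSizeThreshold;  // Boundary handling is illustrative.
}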
@@ -208,7 +211,7 @@ class BASE_EXPORT ThreadCache {
kBucketCount < kNumBuckets,
"Cannot have more cached buckets than what the allocator supports");
std::atomic<bool> should_purge_;
std::atomic<bool> should_purge_{false};
Bucket buckets_[kBucketCount];
ThreadCacheStats stats_;
PartitionRoot<ThreadSafe>* const root_;
@@ -232,9 +235,6 @@
ALWAYS_INLINE bool ThreadCache::MaybePutInCache(void* address,
size_t bucket_index) {
PA_REENTRANCY_GUARD(is_in_thread_cache_);
if (UNLIKELY(should_purge_.load(std::memory_order_relaxed)))
PurgeInternal();
INCREMENT_COUNTER(stats_.cache_fill_count);
if (UNLIKELY(bucket_index >= kBucketCount)) {
@@ -246,18 +246,17 @@ ALWAYS_INLINE bool ThreadCache::MaybePutInCache(void* address,
PA_DCHECK(bucket.count != 0 || bucket.freelist_head == nullptr);
auto* entry = reinterpret_cast<PartitionFreelistEntry*>(address);
entry->SetNextForThreadCache(bucket.freelist_head);
bucket.freelist_head = entry;
bucket.count++;
PutInBucket(bucket, address);
INCREMENT_COUNTER(stats_.cache_fill_hits);
// Batched deallocation, amortizing lock acquisitions.
if (UNLIKELY(bucket.count >= bucket.limit)) {
ClearBucket(bucket, bucket.limit >> 1);
ClearBucket(bucket, bucket.limit / 2);
}
if (UNLIKELY(should_purge_.load(std::memory_order_relaxed)))
PurgeInternal();
return true;
}
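ClearBucket() is the batched counterpart on the deallocation side (its body is truncated in the hunk above). A self-contained model of trimming a bucket under a single lock acquisition, using simplified stand-in types rather than PartitionAlloc's real API:

#include <cstddef>
#include <mutex>

struct Entry { Entry* next = nullptr; };
struct Bucket { Entry* freelist_head = nullptr; size_t count = 0; };

std::mutex g_central_lock;   // Models the central allocator's lock.
void CentralFree(Entry*) {}  // Stand-in for returning a slot to the root.

// Trims |bucket| down to |limit| entries, returning the excess to the central
// allocator while acquiring its lock only once.
void ClearBucketModel(Bucket& bucket, size_t limit) {
  if (bucket.count <= limit)
    return;  // Also avoids taking the lock needlessly.
  Entry* excess;
  if (limit == 0) {
    excess = bucket.freelist_head;
    bucket.freelist_head = nullptr;
  } else {
    Entry* cursor = bucket.freelist_head;
    for (size_t i = 1; i < limit; i++)  // Keep the first |limit| entries.
      cursor = cursor->next;
    excess = cursor->next;
    cursor->next = nullptr;
  }
  bucket.count = limit;
  std::lock_guard<std::mutex> guard(g_central_lock);  // Single acquisition.
  while (excess) {
    Entry* next = excess->next;
    CentralFree(excess);
    excess = next;
  }
}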
@@ -272,25 +271,39 @@ ALWAYS_INLINE void* ThreadCache::GetFromCache(size_t bucket_index) {
}
auto& bucket = buckets_[bucket_index];
auto* result = bucket.freelist_head;
if (UNLIKELY(!result)) {
if (LIKELY(bucket.freelist_head)) {
INCREMENT_COUNTER(stats_.alloc_hits);
} else {
PA_DCHECK(bucket.count == 0);
INCREMENT_COUNTER(stats_.alloc_miss_empty);
INCREMENT_COUNTER(stats_.alloc_misses);
return nullptr;
FillBucket(bucket_index);
// Very unlikely, means that the central allocator is out of memory. Let it
// deal with it (may return nullptr, may crash).
if (UNLIKELY(!bucket.freelist_head))
return nullptr;
}
PA_DCHECK(bucket.count != 0);
auto* result = bucket.freelist_head;
auto* next = result->GetNext();
PA_DCHECK(result != next);
bucket.count--;
PA_DCHECK(bucket.count != 0 || !next);
bucket.freelist_head = next;
INCREMENT_COUNTER(stats_.alloc_hits);
return result;
}
ALWAYS_INLINE void ThreadCache::PutInBucket(Bucket& bucket, void* ptr) {
auto* entry = reinterpret_cast<PartitionFreelistEntry*>(ptr);
entry->SetNextForThreadCache(bucket.freelist_head);
bucket.freelist_head = entry;
bucket.count++;
}
} // namespace internal
} // namespace base
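Taken together, the hot paths are a per-thread LIFO freelist; no lock is needed because each cache is only touched by its owning thread. A self-contained model of the PutInBucket()/GetFromCache() pair, with simplified types standing in for PartitionFreelistEntry:

#include <cstddef>

struct Entry { Entry* next = nullptr; };
struct Bucket { Entry* freelist_head = nullptr; size_t count = 0; };

// Free fast path: push the slot onto the bucket's freelist. O(1), no lock.
// (The real cache clears half the bucket once count reaches the limit.)
void Put(Bucket& bucket, void* ptr) {
  Entry* entry = static_cast<Entry*>(ptr);
  entry->next = bucket.freelist_head;
  bucket.freelist_head = entry;
  bucket.count++;
}

// Alloc fast path: pop the most recently freed slot. O(1), no lock. Returns
// nullptr on an empty bucket; the real cache then calls FillBucket().
void* Get(Bucket& bucket) {
  Entry* entry = bucket.freelist_head;
  if (!entry)
    return nullptr;
  bucket.freelist_head = entry->next;
  bucket.count--;
  return entry;
}

LIFO order also helps locality: the most recently freed, and therefore likely still cache-hot, slot is handed out first.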
......
@@ -35,6 +35,14 @@ namespace internal {
namespace {
constexpr size_t kSmallSize = 12;
constexpr size_t kMaxCountForSmallBucket = 128;
constexpr size_t kFillCountForSmallBucket = kMaxCountForSmallBucket / 4;
constexpr size_t kMediumSize = 200;
constexpr size_t kMaxCountForMediumBucket = 64;
constexpr size_t kFillCountForMediumBucket = kMaxCountForMediumBucket / 4;
class LambdaThreadDelegate : public PlatformThread::Delegate {
public:
explicit LambdaThreadDelegate(OnceClosure f) : f_(std::move(f)) {}
@@ -96,83 +104,89 @@ class ThreadCacheTest : public ::testing::Test {
};
TEST_F(ThreadCacheTest, Simple) {
const size_t kTestSize = 12;
void* ptr = g_root->Alloc(kTestSize, "");
void* ptr = g_root->Alloc(kSmallSize, "");
ASSERT_TRUE(ptr);
// There is a cache.
auto* tcache = g_root->thread_cache_for_testing();
EXPECT_TRUE(tcache);
uint16_t index = PartitionRoot<ThreadSafe>::SizeToBucketIndex(kTestSize);
EXPECT_EQ(0u, tcache->bucket_count_for_testing(index));
uint16_t index = PartitionRoot<ThreadSafe>::SizeToBucketIndex(kSmallSize);
EXPECT_EQ(kFillCountForSmallBucket - 1,
tcache->bucket_count_for_testing(index));
g_root->Free(ptr);
// Freeing fills the thread cache.
EXPECT_EQ(1u, tcache->bucket_count_for_testing(index));
EXPECT_EQ(kFillCountForSmallBucket, tcache->bucket_count_for_testing(index));
void* ptr2 = g_root->Alloc(kTestSize, "");
void* ptr2 = g_root->Alloc(kSmallSize, "");
EXPECT_EQ(ptr, ptr2);
// Allocated from the thread cache.
EXPECT_EQ(0u, tcache->bucket_count_for_testing(index));
EXPECT_EQ(kFillCountForSmallBucket - 1,
tcache->bucket_count_for_testing(index));
}
TEST_F(ThreadCacheTest, InexactSizeMatch) {
const size_t kTestSize = 12;
void* ptr = g_root->Alloc(kTestSize, "");
void* ptr = g_root->Alloc(kSmallSize, "");
ASSERT_TRUE(ptr);
// There is a cache.
auto* tcache = g_root->thread_cache_for_testing();
EXPECT_TRUE(tcache);
uint16_t index = PartitionRoot<ThreadSafe>::SizeToBucketIndex(kTestSize);
EXPECT_EQ(0u, tcache->bucket_count_for_testing(index));
uint16_t index = PartitionRoot<ThreadSafe>::SizeToBucketIndex(kSmallSize);
EXPECT_EQ(kFillCountForSmallBucket - 1,
tcache->bucket_count_for_testing(index));
g_root->Free(ptr);
// Freeing fills the thread cache.
EXPECT_EQ(1u, tcache->bucket_count_for_testing(index));
EXPECT_EQ(kFillCountForSmallBucket, tcache->bucket_count_for_testing(index));
void* ptr2 = g_root->Alloc(kTestSize + 1, "");
void* ptr2 = g_root->Alloc(kSmallSize + 1, "");
EXPECT_EQ(ptr, ptr2);
// Allocated from the thread cache.
EXPECT_EQ(0u, tcache->bucket_count_for_testing(index));
EXPECT_EQ(kFillCountForSmallBucket - 1,
tcache->bucket_count_for_testing(index));
}
TEST_F(ThreadCacheTest, MultipleObjectsCachedPerBucket) {
size_t bucket_index = FillThreadCacheAndReturnIndex(100, 10);
size_t bucket_index =
FillThreadCacheAndReturnIndex(kMediumSize, kFillCountForMediumBucket + 2);
auto* tcache = g_root->thread_cache_for_testing();
EXPECT_EQ(10u, tcache->bucket_count_for_testing(bucket_index));
EXPECT_EQ(2 * kFillCountForMediumBucket,
tcache->bucket_count_for_testing(bucket_index));
}
TEST_F(ThreadCacheTest, ObjectsCachedCountIsLimited) {
size_t bucket_index = FillThreadCacheAndReturnIndex(100, 1000);
size_t bucket_index = FillThreadCacheAndReturnIndex(kMediumSize, 1000);
auto* tcache = g_root->thread_cache_for_testing();
EXPECT_LT(tcache->bucket_count_for_testing(bucket_index), 1000u);
}
TEST_F(ThreadCacheTest, Purge) {
size_t bucket_index = FillThreadCacheAndReturnIndex(100, 10);
size_t bucket_index = FillThreadCacheAndReturnIndex(kMediumSize, 10);
auto* tcache = g_root->thread_cache_for_testing();
EXPECT_EQ(10u, tcache->bucket_count_for_testing(bucket_index));
EXPECT_EQ(kFillCountForMediumBucket,
tcache->bucket_count_for_testing(bucket_index));
tcache->Purge();
EXPECT_EQ(0u, tcache->bucket_count_for_testing(bucket_index));
}
TEST_F(ThreadCacheTest, NoCrossPartitionCache) {
const size_t kTestSize = 12;
ThreadSafePartitionRoot root{{PartitionOptions::Alignment::kAlignedAlloc,
PartitionOptions::ThreadCache::kDisabled}};
size_t bucket_index = FillThreadCacheAndReturnIndex(kTestSize);
void* ptr = root.Alloc(kTestSize, "");
size_t bucket_index = FillThreadCacheAndReturnIndex(kSmallSize);
void* ptr = root.Alloc(kSmallSize, "");
ASSERT_TRUE(ptr);
auto* tcache = g_root->thread_cache_for_testing();
EXPECT_EQ(1u, tcache->bucket_count_for_testing(bucket_index));
EXPECT_EQ(kFillCountForSmallBucket,
tcache->bucket_count_for_testing(bucket_index));
ThreadSafePartitionRoot::Free(ptr);
EXPECT_EQ(1u, tcache->bucket_count_for_testing(bucket_index));
EXPECT_EQ(kFillCountForSmallBucket,
tcache->bucket_count_for_testing(bucket_index));
}
#if defined(PA_ENABLE_THREAD_CACHE_STATISTICS) // Required to record hits and
@@ -201,14 +215,13 @@ TEST_F(ThreadCacheTest, DirectMappedAllocationsAreNotCached) {
}
TEST_F(ThreadCacheTest, MultipleThreadCaches) {
const size_t kTestSize = 100;
FillThreadCacheAndReturnIndex(kTestSize);
FillThreadCacheAndReturnIndex(kMediumSize);
auto* parent_thread_tcache = g_root->thread_cache_for_testing();
ASSERT_TRUE(parent_thread_tcache);
LambdaThreadDelegate delegate{BindLambdaForTesting([&]() {
EXPECT_FALSE(g_root->thread_cache_for_testing()); // No allocations yet.
FillThreadCacheAndReturnIndex(kTestSize);
FillThreadCacheAndReturnIndex(kMediumSize);
auto* tcache = g_root->thread_cache_for_testing();
EXPECT_TRUE(tcache);
@@ -221,15 +234,20 @@ TEST_F(ThreadCacheTest, MultipleThreadCaches) {
}
TEST_F(ThreadCacheTest, ThreadCacheReclaimedWhenThreadExits) {
const size_t kTestSize = 100;
// Make sure that there is always at least one object allocated in the test
// bucket, so that the PartitionPage is not reclaimed.
void* tmp = g_root->Alloc(kTestSize, "");
//
// Allocate enough objects to force a cache fill at the next allocation.
std::vector<void*> tmp;
for (size_t i = 0; i < kMaxCountForMediumBucket / 4; i++) {
tmp.push_back(g_root->Alloc(kMediumSize, ""));
}
void* other_thread_ptr;
LambdaThreadDelegate delegate{BindLambdaForTesting([&]() {
EXPECT_FALSE(g_root->thread_cache_for_testing()); // No allocations yet.
other_thread_ptr = g_root->Alloc(kTestSize, "");
other_thread_ptr = g_root->Alloc(kMediumSize, "");
g_root->Free(other_thread_ptr);
// |other_thread_ptr| is now in the thread cache.
})};
@@ -238,22 +256,23 @@ TEST_F(ThreadCacheTest, ThreadCacheReclaimedWhenThreadExits) {
PlatformThread::Create(0, &delegate, &thread_handle);
PlatformThread::Join(thread_handle);
void* this_thread_ptr = g_root->Alloc(kTestSize, "");
void* this_thread_ptr = g_root->Alloc(kMediumSize, "");
// |other_thread_ptr| was returned to the central allocator, and is returned
// |here, as is comes from the freelist.
// here, as it comes from the freelist.
EXPECT_EQ(this_thread_ptr, other_thread_ptr);
g_root->Free(other_thread_ptr);
g_root->Free(tmp);
for (void* ptr : tmp)
g_root->Free(ptr);
}
TEST_F(ThreadCacheTest, ThreadCacheRegistry) {
const size_t kTestSize = 100;
auto* parent_thread_tcache = g_root->thread_cache_for_testing();
ASSERT_TRUE(parent_thread_tcache);
LambdaThreadDelegate delegate{BindLambdaForTesting([&]() {
EXPECT_FALSE(g_root->thread_cache_for_testing()); // No allocations yet.
FillThreadCacheAndReturnIndex(kTestSize);
FillThreadCacheAndReturnIndex(kSmallSize);
auto* tcache = g_root->thread_cache_for_testing();
EXPECT_TRUE(tcache);
@@ -273,7 +292,6 @@ TEST_F(ThreadCacheTest, ThreadCacheRegistry) {
#if defined(PA_ENABLE_THREAD_CACHE_STATISTICS)
TEST_F(ThreadCacheTest, RecordStats) {
const size_t kTestSize = 100;
auto* tcache = g_root->thread_cache_for_testing();
DeltaCounter alloc_counter{tcache->stats_.alloc_count};
DeltaCounter alloc_hits_counter{tcache->stats_.alloc_hits};
@@ -286,7 +304,7 @@
DeltaCounter cache_fill_misses_counter{tcache->stats_.cache_fill_misses};
// Cache has been purged, first allocation is a miss.
void* data = g_root->Alloc(kTestSize, "");
void* data = g_root->Alloc(kMediumSize, "");
EXPECT_EQ(1u, alloc_counter.Delta());
EXPECT_EQ(1u, alloc_miss_counter.Delta());
EXPECT_EQ(0u, alloc_hits_counter.Delta());
@@ -299,38 +317,36 @@
tcache->Purge();
cache_fill_counter.Reset();
constexpr size_t kMaxCountForBucket = 128;
// Buckets are never full, fill always succeeds.
size_t bucket_index =
FillThreadCacheAndReturnIndex(kTestSize, kMaxCountForBucket + 10);
EXPECT_EQ(kMaxCountForBucket + 10, cache_fill_counter.Delta());
FillThreadCacheAndReturnIndex(kMediumSize, kMaxCountForMediumBucket + 10);
EXPECT_EQ(kMaxCountForMediumBucket + 10, cache_fill_counter.Delta());
EXPECT_EQ(0u, cache_fill_misses_counter.Delta());
// Memory footprint.
ThreadCacheStats stats;
ThreadCacheRegistry::Instance().DumpStats(true, &stats);
// Bucket was cleared (count halved, then refilled).
EXPECT_EQ(
g_root->buckets[bucket_index].slot_size * (kMaxCountForBucket / 2 + 10),
stats.bucket_total_memory);
EXPECT_EQ(g_root->buckets[bucket_index].slot_size *
(kMaxCountForMediumBucket / 2 + kFillCountForMediumBucket),
stats.bucket_total_memory);
EXPECT_EQ(sizeof(ThreadCache), stats.metadata_overhead);
}
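The expected count above works out as follows, assuming the fill helper allocates the requested number of objects and then frees them all (an assumption; the helper's body is not shown in this diff):

// Illustrative bookkeeping, with limit == 64 and fill count == 16:
// - Alloc phase: 74 allocations consume five batched fills of 16 entries
//   each, leaving 5 * 16 - 74 = 6 entries cached.
// - Free phase: 74 pushes on top of those 6 reach the limit of 64 after 58
//   frees, triggering one clear down to 32; the remaining 16 frees leave
//   32 + 16 = kMaxCountForMediumBucket / 2 + kFillCountForMediumBucket = 48
//   cached slots, matching the expectation.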
TEST_F(ThreadCacheTest, MultipleThreadCachesAccounting) {
const size_t kTestSize = 100;
void* data = g_root->Alloc(kTestSize, "");
g_root->Free(data);
FillThreadCacheAndReturnIndex(kMediumSize);
uint64_t alloc_count = g_root->thread_cache_for_testing()->stats_.alloc_count;
LambdaThreadDelegate delegate{BindLambdaForTesting([&]() {
EXPECT_FALSE(g_root->thread_cache_for_testing()); // No allocations yet.
size_t bucket_index = FillThreadCacheAndReturnIndex(kTestSize);
size_t bucket_index = FillThreadCacheAndReturnIndex(kMediumSize);
ThreadCacheStats stats;
ThreadCacheRegistry::Instance().DumpStats(false, &stats);
// 2x: accounts for this thread's cache and the parent thread's one.
EXPECT_EQ(2 * g_root->buckets[bucket_index].slot_size,
stats.bucket_total_memory);
EXPECT_EQ(
2 * g_root->buckets[bucket_index].slot_size * kFillCountForMediumBucket,
stats.bucket_total_memory);
EXPECT_EQ(2 * sizeof(ThreadCache), stats.metadata_overhead);
uint64_t this_thread_alloc_count =
@@ -349,14 +365,13 @@ TEST_F(ThreadCacheTest, PurgeAll) NO_THREAD_SAFETY_ANALYSIS {
std::atomic<bool> other_thread_started{false};
std::atomic<bool> purge_called{false};
const size_t kTestSize = 100;
size_t bucket_index = FillThreadCacheAndReturnIndex(kTestSize);
size_t bucket_index = FillThreadCacheAndReturnIndex(kSmallSize);
ThreadCache* this_thread_tcache = g_root->thread_cache_for_testing();
ThreadCache* other_thread_tcache = nullptr;
LambdaThreadDelegate delegate{
BindLambdaForTesting([&]() NO_THREAD_SAFETY_ANALYSIS {
FillThreadCacheAndReturnIndex(kTestSize);
FillThreadCacheAndReturnIndex(kSmallSize);
other_thread_tcache = g_root->thread_cache_for_testing();
other_thread_started.store(true, std::memory_order_release);
@@ -364,11 +379,11 @@ TEST_F(ThreadCacheTest, PurgeAll) NO_THREAD_SAFETY_ANALYSIS {
}
// Purge() was not triggered from the other thread.
EXPECT_EQ(1u,
EXPECT_EQ(kFillCountForSmallBucket,
other_thread_tcache->bucket_count_for_testing(bucket_index));
// Allocations do not trigger Purge().
void* data = g_root->Alloc(1, "");
EXPECT_EQ(1u,
void* data = g_root->Alloc(kSmallSize, "");
EXPECT_EQ(kFillCountForSmallBucket - 1,
other_thread_tcache->bucket_count_for_testing(bucket_index));
// But deallocations do.
g_root->Free(data);
@@ -382,14 +397,17 @@ TEST_F(ThreadCacheTest, PurgeAll) NO_THREAD_SAFETY_ANALYSIS {
while (!other_thread_started.load(std::memory_order_acquire)) {
}
EXPECT_EQ(1u, this_thread_tcache->bucket_count_for_testing(bucket_index));
EXPECT_EQ(1u, other_thread_tcache->bucket_count_for_testing(bucket_index));
EXPECT_EQ(kFillCountForSmallBucket,
this_thread_tcache->bucket_count_for_testing(bucket_index));
EXPECT_EQ(kFillCountForSmallBucket,
other_thread_tcache->bucket_count_for_testing(bucket_index));
ThreadCacheRegistry::Instance().PurgeAll();
// This thread is synchronously purged.
EXPECT_EQ(0u, this_thread_tcache->bucket_count_for_testing(bucket_index));
// Not the other one.
EXPECT_EQ(1u, other_thread_tcache->bucket_count_for_testing(bucket_index));
EXPECT_EQ(kFillCountForSmallBucket,
other_thread_tcache->bucket_count_for_testing(bucket_index));
purge_called.store(true, std::memory_order_release);
PlatformThread::Join(thread_handle);
......
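The PurgeAll() behavior exercised above relies on a deferred purge: another thread cannot safely touch this thread's cache, so it only sets an atomic flag, which the owning thread checks on its next deallocation (see MaybePutInCache() earlier in this CL). A minimal model of the pattern, with hypothetical names rather than the real ThreadCacheRegistry:

#include <atomic>

class CacheModel {
 public:
  // May be called from any thread, e.g. by a registry's PurgeAll().
  void RequestPurge() { should_purge_.store(true, std::memory_order_relaxed); }

  // Called on the owning thread's deallocation path, after the freed slot has
  // been pushed into the cache.
  void OnFree() {
    if (should_purge_.load(std::memory_order_relaxed)) {
      should_purge_.store(false, std::memory_order_relaxed);
      PurgeInternal();
    }
  }

 private:
  // Drains every bucket back to the central allocator, e.g. via
  // ClearBucket(bucket, 0) on each one.
  void PurgeInternal() {}

  std::atomic<bool> should_purge_{false};
};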