Commit 01e8a28e authored by Benoit Lize's avatar Benoit Lize Committed by Chromium LUCI CQ

[PartitionAlloc] Add periodic reclaiming to the thread cache.

The thread cache can accumulate a bounded but non-trivial amount of
memory. Add a periodic (timer-driven) reclaiming mechanism. However,
since the timer should not wake up an otherwise idle process, it stops
unless enough allocations occurred on the main thread since the last
purge.

Bug: 998048
Change-Id: I7e2bb6126b03a00d9c6723d1105d53fe728f8c7b
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2527172
Commit-Queue: Benoit L <lizeb@chromium.org>
Reviewed-by: default avatarKentaro Hara <haraken@chromium.org>
Reviewed-by: default avatarBartek Nowierski <bartekn@chromium.org>
Cr-Commit-Position: refs/heads/master@{#833742}
parent afff371a
...@@ -10,6 +10,7 @@ ...@@ -10,6 +10,7 @@
#include "base/allocator/partition_allocator/partition_alloc.h" #include "base/allocator/partition_allocator/partition_alloc.h"
#include "base/allocator/partition_allocator/partition_alloc_check.h" #include "base/allocator/partition_allocator/partition_alloc_check.h"
#include "base/threading/thread_task_runner_handle.h"
namespace base { namespace base {
...@@ -30,6 +31,8 @@ static std::atomic<bool> g_has_instance; ...@@ -30,6 +31,8 @@ static std::atomic<bool> g_has_instance;
} // namespace } // namespace
constexpr base::TimeDelta ThreadCacheRegistry::kPurgeInterval;
// static // static
ThreadCacheRegistry& ThreadCacheRegistry::Instance() { ThreadCacheRegistry& ThreadCacheRegistry::Instance() {
return g_instance; return g_instance;
...@@ -102,6 +105,74 @@ void ThreadCacheRegistry::PurgeAll() { ...@@ -102,6 +105,74 @@ void ThreadCacheRegistry::PurgeAll() {
current_thread_tcache->Purge(); current_thread_tcache->Purge();
} }
// Kicks off periodic thread cache reclaiming by posting the first delayed
// purge task to the current thread's task runner. Must be called on a thread
// with a ThreadTaskRunnerHandle; subsequent purges run on this same thread
// (intended to be the main thread — TODO confirm with callers).
void ThreadCacheRegistry::StartPeriodicPurge() {
  PostDelayedPurgeTask();
}
// Schedules a single PeriodicPurge() invocation after |kPurgeInterval| on the
// current thread's task runner. At most one purge task may be pending at a
// time (enforced by the DCHECK and tracked via |has_pending_purge_task_|).
// base::Unretained(this) is safe assuming the registry is never destroyed —
// presumably it is the process-wide singleton returned by Instance(); verify.
void ThreadCacheRegistry::PostDelayedPurgeTask() {
  PA_DCHECK(!has_pending_purge_task_);
  ThreadTaskRunnerHandle::Get()->PostDelayedTask(
      FROM_HERE,
      base::BindOnce(&ThreadCacheRegistry::PeriodicPurge,
                     base::Unretained(this)),
      kPurgeInterval);
  has_pending_purge_task_ = true;
}
// Timer callback: purges all registered thread caches, then decides whether
// to reschedule itself based on how many allocations the current (main)
// thread's cache has seen since the previous purge. Runs on the thread that
// called StartPeriodicPurge().
void ThreadCacheRegistry::PeriodicPurge() {
  // The pending task is the one currently executing; clear the flag first so
  // PostDelayedPurgeTask() below can assert there is no other pending task.
  has_pending_purge_task_ = false;
  ThreadCache* tcache = ThreadCache::Get();
  PA_DCHECK(tcache);
  // |alloc_count| is cumulative, so the difference with the snapshot taken at
  // the last purge yields the activity since then.
  uint64_t allocations = tcache->stats_.alloc_count;
  uint64_t allocations_since_last_purge =
      allocations - allocations_at_last_purge_;
  // Purge should not run when there is little activity in the process. We
  // assume that the main thread is a reasonable proxy for the process activity,
  // where the main thread is the current one.
  //
  // If we didn't see enough allocations since the last purge, don't schedule a
  // new one, and ask the thread cache to notify us of deallocations. This makes
  // the next |kMinMainThreadAllocationsForPurging| deallocations slightly
  // slower.
  //
  // Once the threshold is reached, reschedule a purge task. We count
  // deallocations rather than allocations because these are the ones that fill
  // the cache, and also because we already have a check on the deallocation
  // path, not on the allocation one that we don't want to slow down.
  bool enough_allocations =
      allocations_since_last_purge >= kMinMainThreadAllocationsForPurging;
  // When activity is low, switch the main thread's cache into notification
  // mode so OnDeallocation() can restart the timer once activity resumes.
  tcache->SetNotifiesRegistry(!enough_allocations);
  deallocations_ = 0;
  PurgeAll();
  if (enough_allocations) {
    // Snapshot the counter so the next run measures activity from this point.
    allocations_at_last_purge_ = allocations;
    PostDelayedPurgeTask();
  }
}
// Called by the main thread's ThreadCache (while in notification mode) on
// each deallocation. Once enough deallocations accumulate — evidence of
// renewed activity — notifications are switched off and the periodic purge
// task is restarted.
void ThreadCacheRegistry::OnDeallocation() {
  deallocations_++;
  if (deallocations_ > kMinMainThreadAllocationsForPurging) {
    ThreadCache* tcache = ThreadCache::Get();
    PA_DCHECK(tcache);
    deallocations_ = 0;
    // Stop per-deallocation notifications; the timer takes over from here.
    tcache->SetNotifiesRegistry(false);
    if (has_pending_purge_task_)
      return;
    // This is called from the thread cache, which is called from the central
    // allocator. This means that any allocation made by task posting will make
    // it reentrant, unless we disable the thread cache.
    tcache->Disable();
    PostDelayedPurgeTask();
    tcache->Enable();
  }
}
// static // static
void ThreadCache::Init(PartitionRoot<ThreadSafe>* root) { void ThreadCache::Init(PartitionRoot<ThreadSafe>* root) {
PA_CHECK(root->buckets[kBucketCount - 1].slot_size == kSizeThreshold); PA_CHECK(root->buckets[kBucketCount - 1].slot_size == kSizeThreshold);
...@@ -147,8 +218,13 @@ ThreadCache* ThreadCache::Create(PartitionRoot<internal::ThreadSafe>* root) { ...@@ -147,8 +218,13 @@ ThreadCache* ThreadCache::Create(PartitionRoot<internal::ThreadSafe>* root) {
} }
ThreadCache::ThreadCache(PartitionRoot<ThreadSafe>* root) ThreadCache::ThreadCache(PartitionRoot<ThreadSafe>* root)
: buckets_(), stats_(), root_(root), next_(nullptr), prev_(nullptr) { : buckets_(),
ThreadCacheRegistry::Instance().RegisterThreadCache(this); stats_(),
root_(root),
registry_(&ThreadCacheRegistry::Instance()),
next_(nullptr),
prev_(nullptr) {
registry_->RegisterThreadCache(this);
for (int index = 0; index < kBucketCount; index++) { for (int index = 0; index < kBucketCount; index++) {
const auto& root_bucket = root->buckets[index]; const auto& root_bucket = root->buckets[index];
...@@ -170,7 +246,7 @@ ThreadCache::ThreadCache(PartitionRoot<ThreadSafe>* root) ...@@ -170,7 +246,7 @@ ThreadCache::ThreadCache(PartitionRoot<ThreadSafe>* root)
} }
ThreadCache::~ThreadCache() { ThreadCache::~ThreadCache() {
ThreadCacheRegistry::Instance().UnregisterThreadCache(this); registry_->UnregisterThreadCache(this);
Purge(); Purge();
} }
...@@ -261,6 +337,22 @@ void ThreadCache::ClearBucket(ThreadCache::Bucket& bucket, size_t limit) { ...@@ -261,6 +337,22 @@ void ThreadCache::ClearBucket(ThreadCache::Bucket& bucket, size_t limit) {
PA_DCHECK(bucket.count == limit); PA_DCHECK(bucket.count == limit);
} }
// Slow path taken on the deallocation fast path when |mode_| is not kNormal:
// either performs a previously requested purge, or reports the deallocation
// to the registry so it can decide whether to restart periodic purging.
void ThreadCache::HandleNonNormalMode() {
  switch (mode_.load(std::memory_order_relaxed)) {
    case Mode::kPurge:
      // A purge was requested (possibly from another thread via
      // SetShouldPurge()); carry it out now and return to normal operation.
      PurgeInternal();
      mode_.store(Mode::kNormal, std::memory_order_relaxed);
      break;
    case Mode::kNotifyRegistry:
      // Stay in notification mode; only the registry switches it off, via
      // SetNotifiesRegistry(false).
      registry_->OnDeallocation();
      break;
    default:
      break;
  }
}
void ThreadCache::AccumulateStats(ThreadCacheStats* stats) const { void ThreadCache::AccumulateStats(ThreadCacheStats* stats) const {
stats->alloc_count += stats_.alloc_count; stats->alloc_count += stats_.alloc_count;
stats->alloc_hits += stats_.alloc_hits; stats->alloc_hits += stats_.alloc_hits;
...@@ -281,9 +373,22 @@ void ThreadCache::AccumulateStats(ThreadCacheStats* stats) const { ...@@ -281,9 +373,22 @@ void ThreadCache::AccumulateStats(ThreadCacheStats* stats) const {
} }
void ThreadCache::SetShouldPurge() { void ThreadCache::SetShouldPurge() {
// Purge may be triggered by an external event, in which case it should not
// take precedence over the notification mode, otherwise we risk disabling
// periodic purge entirely.
//
// Also, no other thread can set this to notification mode.
if (mode_.load(std::memory_order_relaxed) != Mode::kNormal)
return;
// We don't need any synchronization, and don't really care if the purge is // We don't need any synchronization, and don't really care if the purge is
// carried out "right away", hence relaxed atomics. // carried out "right away", hence relaxed atomics.
should_purge_.store(true, std::memory_order_relaxed); mode_.store(Mode::kPurge, std::memory_order_relaxed);
}
void ThreadCache::SetNotifiesRegistry(bool enabled) {
mode_.store(enabled ? Mode::kNotifyRegistry : Mode::kNormal,
std::memory_order_relaxed);
} }
void ThreadCache::Purge() { void ThreadCache::Purge() {
...@@ -294,8 +399,14 @@ void ThreadCache::Purge() { ...@@ -294,8 +399,14 @@ void ThreadCache::Purge() {
void ThreadCache::PurgeInternal() { void ThreadCache::PurgeInternal() {
for (auto& bucket : buckets_) for (auto& bucket : buckets_)
ClearBucket(bucket, 0); ClearBucket(bucket, 0);
}
// Turns off thread cache usage for the associated partition root. Used by the
// registry to avoid allocator reentrancy while posting a task from within a
// deallocation (see ThreadCacheRegistry::OnDeallocation()).
void ThreadCache::Disable() {
  root_->with_thread_cache = false;
}
should_purge_.store(false, std::memory_order_relaxed); void ThreadCache::Enable() {
root_->with_thread_cache = true;
} }
} // namespace internal } // namespace internal
......
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
#include "base/macros.h" #include "base/macros.h"
#include "base/no_destructor.h" #include "base/no_destructor.h"
#include "base/partition_alloc_buildflags.h" #include "base/partition_alloc_buildflags.h"
#include "base/sequenced_task_runner.h"
#include "base/synchronization/lock.h" #include "base/synchronization/lock.h"
// Need TLS support. // Need TLS support.
...@@ -65,13 +66,27 @@ class BASE_EXPORT ThreadCacheRegistry { ...@@ -65,13 +66,27 @@ class BASE_EXPORT ThreadCacheRegistry {
// a later point (during a deallocation). // a later point (during a deallocation).
void PurgeAll(); void PurgeAll();
// Starts a periodic timer on the current thread to purge all thread caches.
void StartPeriodicPurge();
void OnDeallocation();
static PartitionLock& GetLock() { return Instance().lock_; } static PartitionLock& GetLock() { return Instance().lock_; }
bool has_pending_purge_task() const { return has_pending_purge_task_; }
static constexpr TimeDelta kPurgeInterval = TimeDelta::FromSeconds(1);
static constexpr int kMinMainThreadAllocationsForPurging = 1000;
private: private:
void PeriodicPurge();
void PostDelayedPurgeTask();
friend class NoDestructor<ThreadCacheRegistry>; friend class NoDestructor<ThreadCacheRegistry>;
// Not using base::Lock as the object's constructor must be constexpr. // Not using base::Lock as the object's constructor must be constexpr.
PartitionLock lock_; PartitionLock lock_;
ThreadCache* list_head_ GUARDED_BY(GetLock()) = nullptr; ThreadCache* list_head_ GUARDED_BY(GetLock()) = nullptr;
uint64_t allocations_at_last_purge_ = 0;
int deallocations_ = 0;
bool has_pending_purge_task_ = false;
}; };
constexpr ThreadCacheRegistry::ThreadCacheRegistry() = default; constexpr ThreadCacheRegistry::ThreadCacheRegistry() = default;
...@@ -175,12 +190,17 @@ class BASE_EXPORT ThreadCache { ...@@ -175,12 +190,17 @@ class BASE_EXPORT ThreadCache {
// Asks this cache to trigger |Purge()| at a later point. Can be called from // Asks this cache to trigger |Purge()| at a later point. Can be called from
// any thread. // any thread.
void SetShouldPurge(); void SetShouldPurge();
void SetNotifiesRegistry(bool enabled);
// Empties the cache. // Empties the cache.
// The Partition lock must *not* be held when calling this. // The Partition lock must *not* be held when calling this.
// Must be called from the thread this cache is for. // Must be called from the thread this cache is for.
void Purge(); void Purge();
void AccumulateStats(ThreadCacheStats* stats) const; void AccumulateStats(ThreadCacheStats* stats) const;
// Disables the thread cache for its associated root.
void Disable();
void Enable();
size_t bucket_count_for_testing(size_t index) const { size_t bucket_count_for_testing(size_t index) const {
return buckets_[index].count; return buckets_[index].count;
} }
...@@ -194,6 +214,7 @@ class BASE_EXPORT ThreadCache { ...@@ -194,6 +214,7 @@ class BASE_EXPORT ThreadCache {
uint16_t count; uint16_t count;
uint16_t limit; uint16_t limit;
}; };
enum class Mode { kNormal, kPurge, kNotifyRegistry };
explicit ThreadCache(PartitionRoot<ThreadSafe>* root); explicit ThreadCache(PartitionRoot<ThreadSafe>* root);
static void Delete(void* thread_cache_ptr); static void Delete(void* thread_cache_ptr);
...@@ -203,6 +224,7 @@ class BASE_EXPORT ThreadCache { ...@@ -203,6 +224,7 @@ class BASE_EXPORT ThreadCache {
// Empties the |bucket| until there are at most |limit| objects in it. // Empties the |bucket| until there are at most |limit| objects in it.
void ClearBucket(Bucket& bucket, size_t limit); void ClearBucket(Bucket& bucket, size_t limit);
ALWAYS_INLINE void PutInBucket(Bucket& bucket, void* ptr); ALWAYS_INLINE void PutInBucket(Bucket& bucket, void* ptr);
void HandleNonNormalMode();
// TODO(lizeb): Optimize the threshold. // TODO(lizeb): Optimize the threshold.
static constexpr size_t kSizeThreshold = 512; static constexpr size_t kSizeThreshold = 512;
...@@ -214,10 +236,11 @@ class BASE_EXPORT ThreadCache { ...@@ -214,10 +236,11 @@ class BASE_EXPORT ThreadCache {
kBucketCount < kNumBuckets, kBucketCount < kNumBuckets,
"Cannot have more cached buckets than what the allocator supports"); "Cannot have more cached buckets than what the allocator supports");
std::atomic<bool> should_purge_{false}; std::atomic<Mode> mode_{Mode::kNormal};
Bucket buckets_[kBucketCount]; Bucket buckets_[kBucketCount];
ThreadCacheStats stats_; ThreadCacheStats stats_;
PartitionRoot<ThreadSafe>* const root_; PartitionRoot<ThreadSafe>* const root_;
ThreadCacheRegistry* const registry_;
#if DCHECK_IS_ON() #if DCHECK_IS_ON()
bool is_in_thread_cache_ = false; bool is_in_thread_cache_ = false;
#endif #endif
...@@ -257,8 +280,8 @@ ALWAYS_INLINE bool ThreadCache::MaybePutInCache(void* address, ...@@ -257,8 +280,8 @@ ALWAYS_INLINE bool ThreadCache::MaybePutInCache(void* address,
ClearBucket(bucket, bucket.limit / 2); ClearBucket(bucket, bucket.limit / 2);
} }
if (UNLIKELY(should_purge_.load(std::memory_order_relaxed))) if (UNLIKELY(mode_.load(std::memory_order_relaxed) != Mode::kNormal))
PurgeInternal(); HandleNonNormalMode();
return true; return true;
} }
......
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
#include "base/callback.h" #include "base/callback.h"
#include "base/synchronization/lock.h" #include "base/synchronization/lock.h"
#include "base/test/bind.h" #include "base/test/bind.h"
#include "base/test/task_environment.h"
#include "base/threading/platform_thread.h" #include "base/threading/platform_thread.h"
#include "build/build_config.h" #include "build/build_config.h"
#include "testing/gtest/include/gtest/gtest.h" #include "testing/gtest/include/gtest/gtest.h"
...@@ -24,9 +25,7 @@ ...@@ -24,9 +25,7 @@
// With *SAN, PartitionAlloc is replaced in partition_alloc.h by ASAN, so we // With *SAN, PartitionAlloc is replaced in partition_alloc.h by ASAN, so we
// cannot test the thread cache. // cannot test the thread cache.
// //
// Finally, the thread cache currently uses `thread_local`, which causes issues // Finally, the thread cache is not supported on all platforms.
// on Windows 7 (at least). As long as it doesn't use something else on Windows,
// disable the cache (and tests)
#if !BUILDFLAG(USE_PARTITION_ALLOC_AS_MALLOC) && \ #if !BUILDFLAG(USE_PARTITION_ALLOC_AS_MALLOC) && \
!defined(MEMORY_TOOL_REPLACES_ALLOCATOR) && \ !defined(MEMORY_TOOL_REPLACES_ALLOCATOR) && \
defined(PA_THREAD_CACHE_SUPPORTED) defined(PA_THREAD_CACHE_SUPPORTED)
...@@ -103,7 +102,14 @@ class ThreadCacheTest : public ::testing::Test { ...@@ -103,7 +102,14 @@ class ThreadCacheTest : public ::testing::Test {
ASSERT_TRUE(tcache); ASSERT_TRUE(tcache);
tcache->Purge(); tcache->Purge();
} }
void TearDown() override {}
void TearDown() override {
task_env_.FastForwardUntilNoTasksRemain();
ASSERT_FALSE(ThreadCacheRegistry::Instance().has_pending_purge_task());
}
base::test::TaskEnvironment task_env_{
base::test::TaskEnvironment::TimeSource::MOCK_TIME};
}; };
TEST_F(ThreadCacheTest, Simple) { TEST_F(ThreadCacheTest, Simple) {
...@@ -421,6 +427,103 @@ TEST_F(ThreadCacheTest, PurgeAll) NO_THREAD_SAFETY_ANALYSIS { ...@@ -421,6 +427,103 @@ TEST_F(ThreadCacheTest, PurgeAll) NO_THREAD_SAFETY_ANALYSIS {
PlatformThread::Join(thread_handle); PlatformThread::Join(thread_handle);
} }
// Verifies that the periodic purge task synchronously purges the main
// thread's cache when it fires, does not purge other threads' caches
// directly, and that another thread's cache is instead purged lazily on its
// next deallocation (not on allocation).
TEST_F(ThreadCacheTest, PeriodicPurge) {
  ThreadCacheRegistry::Instance().StartPeriodicPurge();
  EXPECT_TRUE(ThreadCacheRegistry::Instance().has_pending_purge_task());
  std::atomic<bool> other_thread_started{false};
  std::atomic<bool> purge_called{false};
  size_t bucket_index = FillThreadCacheAndReturnIndex(kMediumSize);
  ThreadCache* this_thread_tcache = g_root->thread_cache_for_testing();
  ThreadCache* other_thread_tcache = nullptr;
  LambdaThreadDelegate delegate{
      BindLambdaForTesting([&]() NO_THREAD_SAFETY_ANALYSIS {
        FillThreadCacheAndReturnIndex(kMediumSize);
        other_thread_tcache = g_root->thread_cache_for_testing();
        other_thread_started.store(true, std::memory_order_release);
        // Busy-wait until the main thread has run the purge task.
        while (!purge_called.load(std::memory_order_acquire)) {
        }
        // Purge() was not triggered from the other thread.
        EXPECT_EQ(kFillCountForMediumBucket,
                  other_thread_tcache->bucket_count_for_testing(bucket_index));
        // Allocations do not trigger Purge().
        void* data = g_root->Alloc(1, "");
        EXPECT_EQ(kFillCountForMediumBucket,
                  other_thread_tcache->bucket_count_for_testing(bucket_index));
        // But deallocations do.
        g_root->Free(data);
        EXPECT_EQ(0u,
                  other_thread_tcache->bucket_count_for_testing(bucket_index));
      })};
  PlatformThreadHandle thread_handle;
  PlatformThread::Create(0, &delegate, &thread_handle);
  // Wait for the other thread's cache to exist before inspecting it.
  while (!other_thread_started.load(std::memory_order_acquire)) {
  }
  EXPECT_EQ(kFillCountForMediumBucket,
            this_thread_tcache->bucket_count_for_testing(bucket_index));
  EXPECT_EQ(kFillCountForMediumBucket,
            other_thread_tcache->bucket_count_for_testing(bucket_index));
  // Advance mock time so the purge task fires.
  task_env_.FastForwardBy(ThreadCacheRegistry::kPurgeInterval);
  // Not enough allocations since last purge, don't reschedule it.
  EXPECT_FALSE(ThreadCacheRegistry::Instance().has_pending_purge_task());
  // This thread is synchronously purged.
  EXPECT_EQ(0u, this_thread_tcache->bucket_count_for_testing(bucket_index));
  // Not the other one.
  EXPECT_EQ(kFillCountForMediumBucket,
            other_thread_tcache->bucket_count_for_testing(bucket_index));
  purge_called.store(true, std::memory_order_release);
  PlatformThread::Join(thread_handle);
}
// Verifies the start/stop policy: the purge task stops rescheduling when
// there is not enough allocation activity, restarts once the deallocation
// threshold is crossed, and stops again when activity drops off.
TEST_F(ThreadCacheTest, PeriodicPurgeStopsAndRestarts) {
  const size_t kTestSize = 100;
  ThreadCacheRegistry::Instance().StartPeriodicPurge();
  EXPECT_TRUE(ThreadCacheRegistry::Instance().has_pending_purge_task());
  size_t bucket_index = FillThreadCacheAndReturnIndex(kTestSize);
  auto* tcache = ThreadCache::Get();
  EXPECT_GT(tcache->bucket_count_for_testing(bucket_index), 0u);
  task_env_.FastForwardBy(ThreadCacheRegistry::kPurgeInterval);
  // Not enough allocations since last purge, don't reschedule it.
  EXPECT_FALSE(ThreadCacheRegistry::Instance().has_pending_purge_task());
  // This thread is synchronously purged.
  EXPECT_EQ(0u, tcache->bucket_count_for_testing(bucket_index));
  // 1 allocation is not enough to restart it.
  FillThreadCacheAndReturnIndex(kTestSize);
  EXPECT_FALSE(ThreadCacheRegistry::Instance().has_pending_purge_task());
  // Crossing the threshold (in notification mode, each fill's deallocations
  // are reported to the registry) restarts the task.
  for (int i = 0; i < ThreadCacheRegistry::kMinMainThreadAllocationsForPurging;
       i++) {
    FillThreadCacheAndReturnIndex(kTestSize);
  }
  EXPECT_TRUE(ThreadCacheRegistry::Instance().has_pending_purge_task());
  EXPECT_GT(tcache->bucket_count_for_testing(bucket_index), 0u);
  task_env_.FastForwardBy(ThreadCacheRegistry::kPurgeInterval);
  EXPECT_EQ(0u, tcache->bucket_count_for_testing(bucket_index));
  // Since there were enough allocations, another task is posted.
  EXPECT_TRUE(ThreadCacheRegistry::Instance().has_pending_purge_task());
  FillThreadCacheAndReturnIndex(kTestSize);
  task_env_.FastForwardBy(ThreadCacheRegistry::kPurgeInterval);
  EXPECT_EQ(0u, tcache->bucket_count_for_testing(bucket_index));
  // Not enough this time.
  EXPECT_FALSE(ThreadCacheRegistry::Instance().has_pending_purge_task());
}
} // namespace internal } // namespace internal
} // namespace base } // namespace base
......
...@@ -170,6 +170,10 @@ ...@@ -170,6 +170,10 @@
#include "content/common/android/cpu_affinity.h" #include "content/common/android/cpu_affinity.h"
#endif #endif
#if BUILDFLAG(USE_PARTITION_ALLOC_AS_MALLOC)
#include "base/allocator/partition_allocator/thread_cache.h"
#endif
namespace content { namespace content {
extern int GpuMain(const content::MainFunctionParams&); extern int GpuMain(const content::MainFunctionParams&);
#if BUILDFLAG(ENABLE_PLUGINS) #if BUILDFLAG(ENABLE_PLUGINS)
...@@ -1009,6 +1013,10 @@ int ContentMainRunnerImpl::RunBrowser(MainFunctionParams& main_params, ...@@ -1009,6 +1013,10 @@ int ContentMainRunnerImpl::RunBrowser(MainFunctionParams& main_params,
// Enable PCScan once we are certain that FeatureList was initialized. // Enable PCScan once we are certain that FeatureList was initialized.
EnablePCScanForMallocPartitionsIfNeeded(); EnablePCScanForMallocPartitionsIfNeeded();
#if BUILDFLAG(USE_PARTITION_ALLOC_AS_MALLOC)
base::internal::ThreadCacheRegistry::Instance().StartPeriodicPurge();
#endif
if (should_start_minimal_browser) { if (should_start_minimal_browser) {
DVLOG(0) << "Chrome is running in minimal browser mode."; DVLOG(0) << "Chrome is running in minimal browser mode.";
return -1; return -1;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment