fix: prevent memory leak by periodically cleaning up expired cache entries

Signed-off-by: Armando Ruocco <armando.ruocco@enterprisedb.com>
This commit is contained in:
Armando Ruocco 2025-11-25 17:54:42 +01:00 committed by Marco Nenciarini
parent 3f8d4f7257
commit 62b579101f
3 changed files with 217 additions and 8 deletions

View File

@ -36,6 +36,9 @@ import (
// DefaultTTLSeconds is the default TTL in seconds of cache entries // DefaultTTLSeconds is the default TTL in seconds of cache entries
const DefaultTTLSeconds = 10 const DefaultTTLSeconds = 10
// DefaultCleanupIntervalSeconds is the default interval in seconds for cache cleanup
const DefaultCleanupIntervalSeconds = 30
type cachedEntry struct { type cachedEntry struct {
entry client.Object entry client.Object
fetchUnixTime int64 fetchUnixTime int64
@ -49,18 +52,28 @@ func (e *cachedEntry) isExpired() bool {
// ExtendedClient is an extended client that is capable of caching multiple secrets without relying on informers // ExtendedClient is an extended client that is capable of caching multiple secrets without relying on informers
type ExtendedClient struct { type ExtendedClient struct {
client.Client client.Client
cachedObjects []cachedEntry cachedObjects []cachedEntry
mux *sync.Mutex mux *sync.Mutex
cleanupInterval time.Duration
} }
// NewExtendedClient returns an extended client capable of caching secrets on the 'Get' operation // NewExtendedClient returns an extended client capable of caching secrets on the 'Get' operation.
// It starts a background goroutine that periodically cleans up expired cache entries.
// The cleanup routine will stop when the provided context is cancelled.
func NewExtendedClient( func NewExtendedClient(
ctx context.Context,
baseClient client.Client, baseClient client.Client,
) client.Client { ) client.Client {
return &ExtendedClient{ ec := &ExtendedClient{
Client: baseClient, Client: baseClient,
mux: &sync.Mutex{}, mux: &sync.Mutex{},
cleanupInterval: DefaultCleanupIntervalSeconds * time.Second,
} }
// Start the background cleanup routine
go ec.startCleanupRoutine(ctx)
return ec
} }
func (e *ExtendedClient) isObjectCached(obj client.Object) bool { func (e *ExtendedClient) isObjectCached(obj client.Object) bool {
@ -208,3 +221,54 @@ func (e *ExtendedClient) Patch(
return e.Client.Patch(ctx, obj, patch, opts...) return e.Client.Patch(ctx, obj, patch, opts...)
} }
// startCleanupRoutine periodically removes expired entries from the cache.
// It blocks until ctx is cancelled, so it is meant to be run in its own
// goroutine.
//
// A non-positive cleanupInterval (for example on an ExtendedClient that was
// not built through NewExtendedClient) falls back to
// DefaultCleanupIntervalSeconds, because time.NewTicker panics when given
// such a duration.
func (e *ExtendedClient) startCleanupRoutine(ctx context.Context) {
	contextLogger := log.FromContext(ctx).WithName("extended_client_cleanup")

	// Read the interval once; the ticker period is fixed for the lifetime of
	// this routine.
	interval := e.cleanupInterval
	if interval <= 0 {
		interval = DefaultCleanupIntervalSeconds * time.Second
	}

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			contextLogger.Debug("stopping cache cleanup routine")
			return
		case <-ticker.C:
			// When both channels are ready select picks one at random, so
			// re-check the context to avoid doing cleanup work during shutdown.
			if ctx.Err() != nil {
				contextLogger.Debug("stopping cache cleanup routine")
				return
			}
			e.cleanupExpiredEntries(ctx)
		}
	}
}
// cleanupExpiredEntries removes all expired entries from the cache.
//
// The whole operation runs while holding e.mux. When nothing has expired the
// cache slice is left untouched and no memory is allocated; otherwise the
// cache is replaced by a new slice sized to the surviving entries, so the old
// backing array (and the objects it referenced) can be garbage collected.
func (e *ExtendedClient) cleanupExpiredEntries(ctx context.Context) {
	e.mux.Lock()
	defer e.mux.Unlock()

	initialCount := len(e.cachedObjects)
	if initialCount == 0 {
		return
	}

	// First pass: count survivors so the common "nothing expired" tick is
	// allocation-free and the replacement slice can be sized exactly.
	liveCount := 0
	for i := range e.cachedObjects {
		if !e.cachedObjects[i].isExpired() {
			liveCount++
		}
	}
	if liveCount == initialCount {
		return
	}

	// Second pass: copy the survivors into a fresh, right-sized slice.
	// Expiry is re-evaluated here, so an entry that expired between the two
	// passes is simply dropped as well.
	validEntries := make([]cachedEntry, 0, liveCount)
	for i := range e.cachedObjects {
		if !e.cachedObjects[i].isExpired() {
			validEntries = append(validEntries, e.cachedObjects[i])
		}
	}

	removedCount := initialCount - len(validEntries)
	if removedCount > 0 {
		e.cachedObjects = validEntries
		log.FromContext(ctx).WithName("extended_client_cleanup").Debug(
			"cleaned up expired cache entries",
			"removedCount", removedCount,
			"remainingCount", len(validEntries))
	}
}

View File

@ -20,6 +20,7 @@ SPDX-License-Identifier: Apache-2.0
package client package client
import ( import (
"context"
"time" "time"
corev1 "k8s.io/api/core/v1" corev1 "k8s.io/api/core/v1"
@ -59,6 +60,7 @@ var _ = Describe("ExtendedClient Get", func() {
extendedClient *ExtendedClient extendedClient *ExtendedClient
secretInClient *corev1.Secret secretInClient *corev1.Secret
objectStore *barmancloudv1.ObjectStore objectStore *barmancloudv1.ObjectStore
cancelCtx context.CancelFunc
) )
BeforeEach(func() { BeforeEach(func() {
@ -79,7 +81,14 @@ var _ = Describe("ExtendedClient Get", func() {
baseClient := fake.NewClientBuilder(). baseClient := fake.NewClientBuilder().
WithScheme(scheme). WithScheme(scheme).
WithObjects(secretInClient, objectStore).Build() WithObjects(secretInClient, objectStore).Build()
extendedClient = NewExtendedClient(baseClient).(*ExtendedClient) ctx, cancel := context.WithCancel(context.Background())
cancelCtx = cancel
extendedClient = NewExtendedClient(ctx, baseClient).(*ExtendedClient)
})
AfterEach(func() {
// Cancel the context to stop the cleanup routine
cancelCtx()
}) })
It("returns secret from cache if not expired", func(ctx SpecContext) { It("returns secret from cache if not expired", func(ctx SpecContext) {
@ -164,3 +173,139 @@ var _ = Describe("ExtendedClient Get", func() {
Expect(objectStore.GetResourceVersion()).To(Equal("from cache")) Expect(objectStore.GetResourceVersion()).To(Equal("from cache"))
}) })
}) })
// Specs for the expired-entry cleanup of ExtendedClient: the synchronous
// cleanupExpiredEntries pass and the lifecycle of the background routine.
var _ = Describe("ExtendedClient Cache Cleanup", func() {
	var (
		extendedClient *ExtendedClient
		cancelCtx      context.CancelFunc
	)

	// newSecret builds a bare Secret in the "default" namespace.
	newSecret := func(name string) *corev1.Secret {
		return &corev1.Secret{
			ObjectMeta: metav1.ObjectMeta{
				Namespace: "default",
				Name:      name,
			},
		}
	}

	// staleFetchTime returns a fetch timestamp two minutes in the past, which
	// the specs below expect the cache to treat as expired.
	staleFetchTime := func() int64 {
		return time.Now().Add(-2 * time.Minute).Unix()
	}

	BeforeEach(func() {
		baseClient := fake.NewClientBuilder().
			WithScheme(scheme).
			Build()
		ctx, cancel := context.WithCancel(context.Background())
		cancelCtx = cancel
		extendedClient = NewExtendedClient(ctx, baseClient).(*ExtendedClient)
	})

	AfterEach(func() {
		// Cancel the context to stop the cleanup routine
		cancelCtx()
	})

	It("cleans up expired entries", func(ctx SpecContext) {
		// Two expired entries and one fetched just now
		addToCache(extendedClient, newSecret("expired-secret-1"), staleFetchTime())
		addToCache(extendedClient, newSecret("expired-secret-2"), staleFetchTime())
		addToCache(extendedClient, newSecret("valid-secret"), time.Now().Unix())

		Expect(extendedClient.cachedObjects).To(HaveLen(3))

		// Trigger cleanup
		extendedClient.cleanupExpiredEntries(ctx)

		// Only the valid entry should remain
		Expect(extendedClient.cachedObjects).To(HaveLen(1))
		Expect(extendedClient.cachedObjects[0].entry.GetName()).To(Equal("valid-secret"))
	})

	It("does nothing when all entries are valid", func(ctx SpecContext) {
		addToCache(extendedClient, newSecret("valid-secret-1"), time.Now().Unix())
		addToCache(extendedClient, newSecret("valid-secret-2"), time.Now().Unix())

		Expect(extendedClient.cachedObjects).To(HaveLen(2))

		// Trigger cleanup
		extendedClient.cleanupExpiredEntries(ctx)

		// Both entries should remain
		Expect(extendedClient.cachedObjects).To(HaveLen(2))
	})

	It("does nothing when cache is empty", func(ctx SpecContext) {
		Expect(extendedClient.cachedObjects).To(BeEmpty())

		// Trigger cleanup
		extendedClient.cleanupExpiredEntries(ctx)

		Expect(extendedClient.cachedObjects).To(BeEmpty())
	})

	It("removes all entries when all are expired", func(ctx SpecContext) {
		addToCache(extendedClient, newSecret("expired-secret-1"), staleFetchTime())
		addToCache(extendedClient, newSecret("expired-secret-2"), staleFetchTime())

		Expect(extendedClient.cachedObjects).To(HaveLen(2))

		// Trigger cleanup
		extendedClient.cleanupExpiredEntries(ctx)

		Expect(extendedClient.cachedObjects).To(BeEmpty())
	})

	It("stops cleanup routine when context is cancelled", func() {
		// Build the client by hand so the short interval is in place before
		// the routine starts: NewExtendedClient launches the goroutine itself,
		// and changing cleanupInterval afterwards would race with the routine
		// reading it. The fixture's mutex is reused only so that a tick firing
		// before cancellation finds a usable lock.
		ec := &ExtendedClient{
			Client:          extendedClient.Client,
			mux:             extendedClient.mux,
			cleanupInterval: 10 * time.Millisecond,
		}

		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()

		// stopped is closed once startCleanupRoutine returns.
		stopped := make(chan struct{})
		go func() {
			defer close(stopped)
			ec.startCleanupRoutine(ctx)
		}()

		cancel()

		// The routine must return after cancellation instead of hanging.
		Eventually(stopped).Should(BeClosed())
	})
})

View File

@ -84,7 +84,7 @@ func Start(ctx context.Context) error {
return err return err
} }
customCacheClient := extendedclient.NewExtendedClient(mgr.GetClient()) customCacheClient := extendedclient.NewExtendedClient(ctx, mgr.GetClient())
if err := mgr.Add(&CNPGI{ if err := mgr.Add(&CNPGI{
Client: customCacheClient, Client: customCacheClient,