mirror of
https://github.com/cloudnative-pg/plugin-barman-cloud.git
synced 2026-01-14 06:33:10 +01:00
Compare commits
3 Commits
5cac225e33
...
df32ec1ce0
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
df32ec1ce0 | ||
|
|
1a4b70a1b2 | ||
|
|
921b20c249 |
@ -128,6 +128,7 @@ pluginConfiguration
|
|||||||
podName
|
podName
|
||||||
postgres
|
postgres
|
||||||
postgresql
|
postgresql
|
||||||
|
pprof
|
||||||
primaryUpdateStrategy
|
primaryUpdateStrategy
|
||||||
rbac
|
rbac
|
||||||
rc
|
rc
|
||||||
|
|||||||
@ -52,6 +52,13 @@ func NewCmd() *cobra.Command {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
cmd.Flags().String("pprof-server",
|
||||||
|
"",
|
||||||
|
"The address where pprof server should be exposed, for example: 0.0.0.0:6061. "+
|
||||||
|
"Empty string means disabled. Disabled by default",
|
||||||
|
)
|
||||||
|
_ = viper.BindPFlag("pprof-server", cmd.Flags().Lookup("pprof-server"))
|
||||||
|
|
||||||
_ = viper.BindEnv("namespace", "NAMESPACE")
|
_ = viper.BindEnv("namespace", "NAMESPACE")
|
||||||
_ = viper.BindEnv("cluster-name", "CLUSTER_NAME")
|
_ = viper.BindEnv("cluster-name", "CLUSTER_NAME")
|
||||||
_ = viper.BindEnv("pod-name", "POD_NAME")
|
_ = viper.BindEnv("pod-name", "POD_NAME")
|
||||||
|
|||||||
@ -36,6 +36,9 @@ import (
|
|||||||
// DefaultTTLSeconds is the default TTL in seconds of cache entries
|
// DefaultTTLSeconds is the default TTL in seconds of cache entries
|
||||||
const DefaultTTLSeconds = 10
|
const DefaultTTLSeconds = 10
|
||||||
|
|
||||||
|
// DefaultCleanupIntervalSeconds is the default interval in seconds for cache cleanup
|
||||||
|
const DefaultCleanupIntervalSeconds = 30
|
||||||
|
|
||||||
type cachedEntry struct {
|
type cachedEntry struct {
|
||||||
entry client.Object
|
entry client.Object
|
||||||
fetchUnixTime int64
|
fetchUnixTime int64
|
||||||
@ -49,18 +52,30 @@ func (e *cachedEntry) isExpired() bool {
|
|||||||
// ExtendedClient is an extended client that is capable of caching multiple secrets without relying on informers
|
// ExtendedClient is an extended client that is capable of caching multiple secrets without relying on informers
|
||||||
type ExtendedClient struct {
|
type ExtendedClient struct {
|
||||||
client.Client
|
client.Client
|
||||||
cachedObjects []cachedEntry
|
cachedObjects []cachedEntry
|
||||||
mux *sync.Mutex
|
mux *sync.Mutex
|
||||||
|
cleanupInterval time.Duration
|
||||||
|
cleanupDone chan struct{} // Signals when cleanup routine exits
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewExtendedClient returns an extended client capable of caching secrets on the 'Get' operation
|
// NewExtendedClient returns an extended client capable of caching secrets on the 'Get' operation.
|
||||||
|
// It starts a background goroutine that periodically cleans up expired cache entries.
|
||||||
|
// The cleanup routine will stop when the provided context is cancelled.
|
||||||
func NewExtendedClient(
|
func NewExtendedClient(
|
||||||
|
ctx context.Context,
|
||||||
baseClient client.Client,
|
baseClient client.Client,
|
||||||
) client.Client {
|
) client.Client {
|
||||||
return &ExtendedClient{
|
ec := &ExtendedClient{
|
||||||
Client: baseClient,
|
Client: baseClient,
|
||||||
mux: &sync.Mutex{},
|
mux: &sync.Mutex{},
|
||||||
|
cleanupInterval: DefaultCleanupIntervalSeconds * time.Second,
|
||||||
|
cleanupDone: make(chan struct{}),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Start the background cleanup routine
|
||||||
|
go ec.startCleanupRoutine(ctx)
|
||||||
|
|
||||||
|
return ec
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *ExtendedClient) isObjectCached(obj client.Object) bool {
|
func (e *ExtendedClient) isObjectCached(obj client.Object) bool {
|
||||||
@ -208,3 +223,55 @@ func (e *ExtendedClient) Patch(
|
|||||||
|
|
||||||
return e.Client.Patch(ctx, obj, patch, opts...)
|
return e.Client.Patch(ctx, obj, patch, opts...)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// startCleanupRoutine periodically removes expired entries from the cache.
|
||||||
|
// It runs until the context is cancelled.
|
||||||
|
func (e *ExtendedClient) startCleanupRoutine(ctx context.Context) {
|
||||||
|
defer close(e.cleanupDone)
|
||||||
|
contextLogger := log.FromContext(ctx).WithName("extended_client_cleanup")
|
||||||
|
ticker := time.NewTicker(e.cleanupInterval)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
contextLogger.Debug("stopping cache cleanup routine")
|
||||||
|
return
|
||||||
|
case <-ticker.C:
|
||||||
|
// Check context before cleanup to avoid unnecessary work during shutdown
|
||||||
|
if ctx.Err() != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
e.cleanupExpiredEntries(ctx)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// cleanupExpiredEntries removes all expired entries from the cache.
|
||||||
|
func (e *ExtendedClient) cleanupExpiredEntries(ctx context.Context) {
|
||||||
|
contextLogger := log.FromContext(ctx).WithName("extended_client_cleanup")
|
||||||
|
|
||||||
|
e.mux.Lock()
|
||||||
|
defer e.mux.Unlock()
|
||||||
|
|
||||||
|
initialCount := len(e.cachedObjects)
|
||||||
|
if initialCount == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create a new slice with only non-expired entries
|
||||||
|
validEntries := make([]cachedEntry, 0, initialCount)
|
||||||
|
for _, entry := range e.cachedObjects {
|
||||||
|
if !entry.isExpired() {
|
||||||
|
validEntries = append(validEntries, entry)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
removedCount := initialCount - len(validEntries)
|
||||||
|
if removedCount > 0 {
|
||||||
|
e.cachedObjects = validEntries
|
||||||
|
contextLogger.Debug("cleaned up expired cache entries",
|
||||||
|
"removedCount", removedCount,
|
||||||
|
"remainingCount", len(validEntries))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@ -20,6 +20,7 @@ SPDX-License-Identifier: Apache-2.0
|
|||||||
package client
|
package client
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
corev1 "k8s.io/api/core/v1"
|
corev1 "k8s.io/api/core/v1"
|
||||||
@ -59,6 +60,7 @@ var _ = Describe("ExtendedClient Get", func() {
|
|||||||
extendedClient *ExtendedClient
|
extendedClient *ExtendedClient
|
||||||
secretInClient *corev1.Secret
|
secretInClient *corev1.Secret
|
||||||
objectStore *barmancloudv1.ObjectStore
|
objectStore *barmancloudv1.ObjectStore
|
||||||
|
cancelCtx context.CancelFunc
|
||||||
)
|
)
|
||||||
|
|
||||||
BeforeEach(func() {
|
BeforeEach(func() {
|
||||||
@ -79,7 +81,14 @@ var _ = Describe("ExtendedClient Get", func() {
|
|||||||
baseClient := fake.NewClientBuilder().
|
baseClient := fake.NewClientBuilder().
|
||||||
WithScheme(scheme).
|
WithScheme(scheme).
|
||||||
WithObjects(secretInClient, objectStore).Build()
|
WithObjects(secretInClient, objectStore).Build()
|
||||||
extendedClient = NewExtendedClient(baseClient).(*ExtendedClient)
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
cancelCtx = cancel
|
||||||
|
extendedClient = NewExtendedClient(ctx, baseClient).(*ExtendedClient)
|
||||||
|
})
|
||||||
|
|
||||||
|
AfterEach(func() {
|
||||||
|
// Cancel the context to stop the cleanup routine
|
||||||
|
cancelCtx()
|
||||||
})
|
})
|
||||||
|
|
||||||
It("returns secret from cache if not expired", func(ctx SpecContext) {
|
It("returns secret from cache if not expired", func(ctx SpecContext) {
|
||||||
@ -164,3 +173,141 @@ var _ = Describe("ExtendedClient Get", func() {
|
|||||||
Expect(objectStore.GetResourceVersion()).To(Equal("from cache"))
|
Expect(objectStore.GetResourceVersion()).To(Equal("from cache"))
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
|
var _ = Describe("ExtendedClient Cache Cleanup", func() {
|
||||||
|
var (
|
||||||
|
extendedClient *ExtendedClient
|
||||||
|
cancelCtx context.CancelFunc
|
||||||
|
)
|
||||||
|
|
||||||
|
BeforeEach(func() {
|
||||||
|
baseClient := fake.NewClientBuilder().
|
||||||
|
WithScheme(scheme).
|
||||||
|
Build()
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
cancelCtx = cancel
|
||||||
|
extendedClient = NewExtendedClient(ctx, baseClient).(*ExtendedClient)
|
||||||
|
})
|
||||||
|
|
||||||
|
AfterEach(func() {
|
||||||
|
cancelCtx()
|
||||||
|
})
|
||||||
|
|
||||||
|
It("cleans up expired entries", func(ctx SpecContext) {
|
||||||
|
// Add some expired entries
|
||||||
|
expiredSecret1 := &corev1.Secret{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Namespace: "default",
|
||||||
|
Name: "expired-secret-1",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
expiredSecret2 := &corev1.Secret{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Namespace: "default",
|
||||||
|
Name: "expired-secret-2",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
validSecret := &corev1.Secret{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Namespace: "default",
|
||||||
|
Name: "valid-secret",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add expired entries (2 minutes ago)
|
||||||
|
addToCache(extendedClient, expiredSecret1, time.Now().Add(-2*time.Minute).Unix())
|
||||||
|
addToCache(extendedClient, expiredSecret2, time.Now().Add(-2*time.Minute).Unix())
|
||||||
|
// Add valid entry (just now)
|
||||||
|
addToCache(extendedClient, validSecret, time.Now().Unix())
|
||||||
|
|
||||||
|
Expect(extendedClient.cachedObjects).To(HaveLen(3))
|
||||||
|
|
||||||
|
// Trigger cleanup
|
||||||
|
extendedClient.cleanupExpiredEntries(ctx)
|
||||||
|
|
||||||
|
// Only the valid entry should remain
|
||||||
|
Expect(extendedClient.cachedObjects).To(HaveLen(1))
|
||||||
|
Expect(extendedClient.cachedObjects[0].entry.GetName()).To(Equal("valid-secret"))
|
||||||
|
})
|
||||||
|
|
||||||
|
It("does nothing when all entries are valid", func(ctx SpecContext) {
|
||||||
|
validSecret1 := &corev1.Secret{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Namespace: "default",
|
||||||
|
Name: "valid-secret-1",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
validSecret2 := &corev1.Secret{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Namespace: "default",
|
||||||
|
Name: "valid-secret-2",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
addToCache(extendedClient, validSecret1, time.Now().Unix())
|
||||||
|
addToCache(extendedClient, validSecret2, time.Now().Unix())
|
||||||
|
|
||||||
|
Expect(extendedClient.cachedObjects).To(HaveLen(2))
|
||||||
|
|
||||||
|
// Trigger cleanup
|
||||||
|
extendedClient.cleanupExpiredEntries(ctx)
|
||||||
|
|
||||||
|
// Both entries should remain
|
||||||
|
Expect(extendedClient.cachedObjects).To(HaveLen(2))
|
||||||
|
})
|
||||||
|
|
||||||
|
It("does nothing when cache is empty", func(ctx SpecContext) {
|
||||||
|
Expect(extendedClient.cachedObjects).To(BeEmpty())
|
||||||
|
|
||||||
|
// Trigger cleanup
|
||||||
|
extendedClient.cleanupExpiredEntries(ctx)
|
||||||
|
|
||||||
|
Expect(extendedClient.cachedObjects).To(BeEmpty())
|
||||||
|
})
|
||||||
|
|
||||||
|
It("removes all entries when all are expired", func(ctx SpecContext) {
|
||||||
|
expiredSecret1 := &corev1.Secret{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Namespace: "default",
|
||||||
|
Name: "expired-secret-1",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
expiredSecret2 := &corev1.Secret{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Namespace: "default",
|
||||||
|
Name: "expired-secret-2",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
addToCache(extendedClient, expiredSecret1, time.Now().Add(-2*time.Minute).Unix())
|
||||||
|
addToCache(extendedClient, expiredSecret2, time.Now().Add(-2*time.Minute).Unix())
|
||||||
|
|
||||||
|
Expect(extendedClient.cachedObjects).To(HaveLen(2))
|
||||||
|
|
||||||
|
// Trigger cleanup
|
||||||
|
extendedClient.cleanupExpiredEntries(ctx)
|
||||||
|
|
||||||
|
Expect(extendedClient.cachedObjects).To(BeEmpty())
|
||||||
|
})
|
||||||
|
|
||||||
|
It("stops cleanup routine when context is cancelled", func() {
|
||||||
|
// Create a new client with a short cleanup interval for testing
|
||||||
|
baseClient := fake.NewClientBuilder().
|
||||||
|
WithScheme(scheme).
|
||||||
|
Build()
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
ec := NewExtendedClient(ctx, baseClient).(*ExtendedClient)
|
||||||
|
ec.cleanupInterval = 10 * time.Millisecond
|
||||||
|
|
||||||
|
// Cancel the context immediately
|
||||||
|
cancel()
|
||||||
|
|
||||||
|
// Verify the cleanup routine actually stops by waiting for the done channel
|
||||||
|
select {
|
||||||
|
case <-ec.cleanupDone:
|
||||||
|
// Success: cleanup routine exited as expected
|
||||||
|
case <-time.After(1 * time.Second):
|
||||||
|
Fail("cleanup routine did not stop within timeout")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|||||||
@ -52,7 +52,8 @@ func Start(ctx context.Context) error {
|
|||||||
namespace := viper.GetString("namespace")
|
namespace := viper.GetString("namespace")
|
||||||
|
|
||||||
controllerOptions := ctrl.Options{
|
controllerOptions := ctrl.Options{
|
||||||
Scheme: scheme,
|
PprofBindAddress: viper.GetString("pprof-server"),
|
||||||
|
Scheme: scheme,
|
||||||
Client: client.Options{
|
Client: client.Options{
|
||||||
// Important: the caching options below are used by
|
// Important: the caching options below are used by
|
||||||
// controller-runtime only.
|
// controller-runtime only.
|
||||||
@ -83,7 +84,7 @@ func Start(ctx context.Context) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
customCacheClient := extendedclient.NewExtendedClient(mgr.GetClient())
|
customCacheClient := extendedclient.NewExtendedClient(ctx, mgr.GetClient())
|
||||||
|
|
||||||
if err := mgr.Add(&CNPGI{
|
if err := mgr.Add(&CNPGI{
|
||||||
Client: customCacheClient,
|
Client: customCacheClient,
|
||||||
|
|||||||
@ -74,3 +74,24 @@ spec:
|
|||||||
|
|
||||||
For a complete list of supported options, refer to the
|
For a complete list of supported options, refer to the
|
||||||
[official Barman Cloud documentation](https://docs.pgbarman.org/release/latest/).
|
[official Barman Cloud documentation](https://docs.pgbarman.org/release/latest/).
|
||||||
|
|
||||||
|
## Enable the pprof debug server for the sidecar
|
||||||
|
|
||||||
|
You can enable the instance sidecar's pprof debug HTTP server by adding the `--pprof-server=<address>` flag to the container's
|
||||||
|
arguments via `.spec.instanceSidecarConfiguration.additionalContainerArgs`.
|
||||||
|
|
||||||
|
Pass a bind address in the form `<host>:<port>` (for example, `0.0.0.0:6061`).
|
||||||
|
An empty value disables the server (disabled by default).
|
||||||
|
|
||||||
|
### Example
|
||||||
|
|
||||||
|
```yaml
|
||||||
|
apiVersion: barmancloud.cnpg.io/v1
|
||||||
|
kind: ObjectStore
|
||||||
|
metadata:
|
||||||
|
name: my-store
|
||||||
|
spec:
|
||||||
|
instanceSidecarConfiguration:
|
||||||
|
additionalContainerArgs:
|
||||||
|
- "--pprof-server=0.0.0.0:6061"
|
||||||
|
```
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user