feat: finish implementation

Signed-off-by: Armando Ruocco <armando.ruocco@enterprisedb.com>
This commit is contained in:
Armando Ruocco 2024-11-06 16:37:13 +01:00 committed by Marco Nenciarini
parent cf0ecf19f0
commit 40c2b39e64
No known key found for this signature in database
GPG Key ID: 589F03F01BA55038
5 changed files with 126 additions and 21 deletions

View File

@ -5,6 +5,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/cloudnative-pg/machinery/pkg/log"
corev1 "k8s.io/api/core/v1" corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client"
) )
@ -27,6 +28,7 @@ func NewExtendedClient(baseClient client.Client, ttl int64) client.Client {
return &ExtendedClient{ return &ExtendedClient{
Client: baseClient, Client: baseClient,
ttl: ttl, ttl: ttl,
mux: &sync.Mutex{},
} }
} }
@ -36,6 +38,10 @@ func (e *ExtendedClient) Get(
obj client.Object, obj client.Object,
opts ...client.GetOption, opts ...client.GetOption,
) error { ) error {
contextLogger := log.FromContext(ctx).
WithName("extended_client").
WithValues("name", key.Name, "namespace", key.Namespace)
if e.isCacheDisabled() { if e.isCacheDisabled() {
return e.Client.Get(ctx, key, obj, opts...) return e.Client.Get(ctx, key, obj, opts...)
} }
@ -47,37 +53,36 @@ func (e *ExtendedClient) Get(
e.mux.Lock() e.mux.Lock()
defer e.mux.Unlock() defer e.mux.Unlock()
expiredSecretIndex := -1
// check if in cache // check if in cache
for _, cache := range e.cachedSecrets { for idx, cache := range e.cachedSecrets {
if cache.secret.Namespace == key.Namespace && cache.secret.Name == key.Name { if cache.secret.Namespace != key.Namespace || cache.secret.Name != key.Name {
if !e.isExpired(cache.fetchUnixTime) { continue
cache.secret.DeepCopyInto(obj.(*corev1.Secret)) }
return nil if e.isExpired(cache.fetchUnixTime) {
} contextLogger.Trace("secret found, but it is expired")
expiredSecretIndex = idx
break break
} }
contextLogger.Trace("secret found, loading it from cache")
cache.secret.DeepCopyInto(obj.(*corev1.Secret))
return nil
} }
if err := e.Client.Get(ctx, key, obj); err != nil { if err := e.Client.Get(ctx, key, obj); err != nil {
return err return err
} }
secret := obj.(*corev1.Secret) cs := &cachedSecret{
secret: obj.(*corev1.Secret).DeepCopy(),
// check if the secret is already in cache if so replace it fetchUnixTime: time.Now().Unix(),
for _, cache := range e.cachedSecrets {
if cache.secret.Namespace == key.Namespace && cache.secret.Name == key.Name {
cache.secret = secret.DeepCopy()
cache.fetchUnixTime = time.Now().Unix()
return nil
}
} }
// otherwise add it to the cache if expiredSecretIndex != -1 {
e.cachedSecrets = append(e.cachedSecrets, &cachedSecret{ e.cachedSecrets[expiredSecretIndex] = cs
secret: secret.DeepCopy(), } else {
fetchUnixTime: time.Now().Unix(), e.cachedSecrets = append(e.cachedSecrets, cs)
}) }
return nil return nil
} }

View File

@ -0,0 +1,84 @@
package client
import (
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"time"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
)
var _ = Describe("ExtendedClient Get", func() {
var (
extendedClient *ExtendedClient
secretInClient *corev1.Secret
)
BeforeEach(func() {
secretInClient = &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "test-secret",
},
}
baseClient := fake.NewClientBuilder().WithObjects(secretInClient).Build()
extendedClient = NewExtendedClient(baseClient, 60).(*ExtendedClient)
})
It("returns secret from cache if not expired", func(ctx SpecContext) {
secretNotInClient := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "test-secret-not-in-client",
},
}
// manually add the secret to the cache, this is not present in the fake client so we are sure it is from the
// cache
extendedClient.cachedSecrets = []*cachedSecret{
{
secret: secretNotInClient,
fetchUnixTime: time.Now().Unix(),
},
}
err := extendedClient.Get(ctx, client.ObjectKeyFromObject(secretNotInClient), secretInClient)
Expect(err).NotTo(HaveOccurred())
Expect(secretNotInClient).To(Equal(extendedClient.cachedSecrets[0].secret))
})
It("fetches secret from base client if cache is expired", func(ctx SpecContext) {
extendedClient.cachedSecrets = []*cachedSecret{
{
secret: secretInClient.DeepCopy(),
fetchUnixTime: time.Now().Add(-2 * time.Minute).Unix(),
},
}
err := extendedClient.Get(ctx, client.ObjectKeyFromObject(secretInClient), secretInClient)
Expect(err).NotTo(HaveOccurred())
})
It("fetches secret from base client if not in cache", func(ctx SpecContext) {
err := extendedClient.Get(ctx, client.ObjectKeyFromObject(secretInClient), secretInClient)
Expect(err).NotTo(HaveOccurred())
})
It("does not cache non-secret objects", func(ctx SpecContext) {
configMap := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "test-configmap",
},
}
err := extendedClient.Create(ctx, configMap)
Expect(err).ToNot(HaveOccurred())
err = extendedClient.Get(ctx, client.ObjectKeyFromObject(configMap), configMap)
Expect(err).NotTo(HaveOccurred())
Expect(extendedClient.cachedSecrets).To(BeEmpty())
})
})

View File

@ -0,0 +1,13 @@
package client_test
import (
"testing"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
func TestClient(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Client Suite")
}

View File

@ -40,6 +40,7 @@ func NewCmd() *cobra.Command {
_ = viper.BindEnv("pod-name", "POD_NAME") _ = viper.BindEnv("pod-name", "POD_NAME")
_ = viper.BindEnv("pgdata", "PGDATA") _ = viper.BindEnv("pgdata", "PGDATA")
_ = viper.BindEnv("spool-directory", "SPOOL_DIRECTORY") _ = viper.BindEnv("spool-directory", "SPOOL_DIRECTORY")
_ = viper.BindEnv("secret-cache-ttl", "SECRET_CACHE_TTL")
return cmd return cmd
} }

View File

@ -18,6 +18,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/log"
barmancloudv1 "github.com/cloudnative-pg/plugin-barman-cloud/api/v1" barmancloudv1 "github.com/cloudnative-pg/plugin-barman-cloud/api/v1"
extendedclient "github.com/cloudnative-pg/plugin-barman-cloud/internal/client"
) )
var scheme = runtime.NewScheme() var scheme = runtime.NewScheme()
@ -36,6 +37,7 @@ func Start(ctx context.Context) error {
boName := viper.GetString("barman-object-name") boName := viper.GetString("barman-object-name")
clusterName := viper.GetString("cluster-name") clusterName := viper.GetString("cluster-name")
podName := viper.GetString("pod-name") podName := viper.GetString("pod-name")
secretCacheTTL := viper.GetInt64("secret-cache-ttl")
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{ mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
Scheme: scheme, Scheme: scheme,
@ -69,7 +71,7 @@ func Start(ctx context.Context) error {
} }
if err := mgr.Add(&CNPGI{ if err := mgr.Add(&CNPGI{
Client: mgr.GetClient(), Client: extendedclient.NewExtendedClient(mgr.GetClient(), secretCacheTTL),
ClusterObjectKey: client.ObjectKey{ ClusterObjectKey: client.ObjectKey{
Namespace: namespace, Namespace: namespace,
Name: clusterName, Name: clusterName,