mirror of
https://github.com/cloudnative-pg/plugin-barman-cloud.git
synced 2026-01-12 05:33:11 +01:00
fix(object-cache): improve reliability of object cache management (#508)
* make sure objects expire after DefaultTTLSeconds * make cached objects have GKV information * fix cache retrieval and removal logic Closes #502 Signed-off-by: Marco Nenciarini <marco.nenciarini@enterprisedb.com> Signed-off-by: Armando Ruocco <armando.ruocco@enterprisedb.com> Co-authored-by: Armando Ruocco <armando.ruocco@enterprisedb.com>
This commit is contained in:
parent
3c0d8c3a33
commit
8c3db955ef
@ -9,7 +9,6 @@ import (
|
|||||||
|
|
||||||
"github.com/cloudnative-pg/machinery/pkg/log"
|
"github.com/cloudnative-pg/machinery/pkg/log"
|
||||||
corev1 "k8s.io/api/core/v1"
|
corev1 "k8s.io/api/core/v1"
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
|
||||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||||
|
|
||||||
pluginBarman "github.com/cloudnative-pg/plugin-barman-cloud/api/v1"
|
pluginBarman "github.com/cloudnative-pg/plugin-barman-cloud/api/v1"
|
||||||
@ -21,11 +20,11 @@ const DefaultTTLSeconds = 10
|
|||||||
type cachedEntry struct {
|
type cachedEntry struct {
|
||||||
entry client.Object
|
entry client.Object
|
||||||
fetchUnixTime int64
|
fetchUnixTime int64
|
||||||
ttl time.Duration
|
ttlSeconds int64
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *cachedEntry) isExpired() bool {
|
func (e *cachedEntry) isExpired() bool {
|
||||||
return time.Now().Unix()-e.fetchUnixTime > int64(e.ttl)
|
return time.Now().Unix()-e.fetchUnixTime > e.ttlSeconds
|
||||||
}
|
}
|
||||||
|
|
||||||
// ExtendedClient is an extended client that is capable of caching multiple secrets without relying on informers
|
// ExtendedClient is an extended client that is capable of caching multiple secrets without relying on informers
|
||||||
@ -91,7 +90,7 @@ func (e *ExtendedClient) getCachedObject(
|
|||||||
if cacheEntry.entry.GetNamespace() != key.Namespace || cacheEntry.entry.GetName() != key.Name {
|
if cacheEntry.entry.GetNamespace() != key.Namespace || cacheEntry.entry.GetName() != key.Name {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if cacheEntry.entry.GetObjectKind().GroupVersionKind() != obj.GetObjectKind().GroupVersionKind() {
|
if reflect.TypeOf(cacheEntry.entry) != reflect.TypeOf(obj) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if cacheEntry.isExpired() {
|
if cacheEntry.isExpired() {
|
||||||
@ -121,8 +120,9 @@ func (e *ExtendedClient) getCachedObject(
|
|||||||
}
|
}
|
||||||
|
|
||||||
cs := cachedEntry{
|
cs := cachedEntry{
|
||||||
entry: obj.(runtime.Object).DeepCopyObject().(client.Object),
|
entry: obj.DeepCopyObject().(client.Object),
|
||||||
fetchUnixTime: time.Now().Unix(),
|
fetchUnixTime: time.Now().Unix(),
|
||||||
|
ttlSeconds: DefaultTTLSeconds,
|
||||||
}
|
}
|
||||||
|
|
||||||
contextLogger.Debug("setting object in the cache")
|
contextLogger.Debug("setting object in the cache")
|
||||||
@ -143,7 +143,7 @@ func (e *ExtendedClient) removeObject(object client.Object) {
|
|||||||
for i, cache := range e.cachedObjects {
|
for i, cache := range e.cachedObjects {
|
||||||
if cache.entry.GetNamespace() == object.GetNamespace() &&
|
if cache.entry.GetNamespace() == object.GetNamespace() &&
|
||||||
cache.entry.GetName() == object.GetName() &&
|
cache.entry.GetName() == object.GetName() &&
|
||||||
cache.entry.GetObjectKind().GroupVersionKind() != object.GetObjectKind().GroupVersionKind() {
|
reflect.TypeOf(cache.entry) == reflect.TypeOf(object) {
|
||||||
e.cachedObjects = append(e.cachedObjects[:i], e.cachedObjects[i+1:]...)
|
e.cachedObjects = append(e.cachedObjects[:i], e.cachedObjects[i+1:]...)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|||||||
@ -9,7 +9,7 @@ import (
|
|||||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||||
"sigs.k8s.io/controller-runtime/pkg/client/fake"
|
"sigs.k8s.io/controller-runtime/pkg/client/fake"
|
||||||
|
|
||||||
v1 "github.com/cloudnative-pg/plugin-barman-cloud/api/v1"
|
barmancloudv1 "github.com/cloudnative-pg/plugin-barman-cloud/api/v1"
|
||||||
|
|
||||||
. "github.com/onsi/ginkgo/v2"
|
. "github.com/onsi/ginkgo/v2"
|
||||||
. "github.com/onsi/gomega"
|
. "github.com/onsi/gomega"
|
||||||
@ -20,16 +20,26 @@ var scheme = buildScheme()
|
|||||||
func buildScheme() *runtime.Scheme {
|
func buildScheme() *runtime.Scheme {
|
||||||
scheme := runtime.NewScheme()
|
scheme := runtime.NewScheme()
|
||||||
_ = corev1.AddToScheme(scheme)
|
_ = corev1.AddToScheme(scheme)
|
||||||
_ = v1.AddToScheme(scheme)
|
_ = barmancloudv1.AddToScheme(scheme)
|
||||||
|
|
||||||
return scheme
|
return scheme
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func addToCache(c *ExtendedClient, obj client.Object, fetchUnixTime int64) {
|
||||||
|
ce := cachedEntry{
|
||||||
|
entry: obj.DeepCopyObject().(client.Object),
|
||||||
|
fetchUnixTime: fetchUnixTime,
|
||||||
|
ttlSeconds: DefaultTTLSeconds,
|
||||||
|
}
|
||||||
|
ce.entry.SetResourceVersion("from cache")
|
||||||
|
c.cachedObjects = append(c.cachedObjects, ce)
|
||||||
|
}
|
||||||
|
|
||||||
var _ = Describe("ExtendedClient Get", func() {
|
var _ = Describe("ExtendedClient Get", func() {
|
||||||
var (
|
var (
|
||||||
extendedClient *ExtendedClient
|
extendedClient *ExtendedClient
|
||||||
secretInClient *corev1.Secret
|
secretInClient *corev1.Secret
|
||||||
objectStore *v1.ObjectStore
|
objectStore *barmancloudv1.ObjectStore
|
||||||
)
|
)
|
||||||
|
|
||||||
BeforeEach(func() {
|
BeforeEach(func() {
|
||||||
@ -39,12 +49,12 @@ var _ = Describe("ExtendedClient Get", func() {
|
|||||||
Name: "test-secret",
|
Name: "test-secret",
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
objectStore = &v1.ObjectStore{
|
objectStore = &barmancloudv1.ObjectStore{
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
Namespace: "default",
|
Namespace: "default",
|
||||||
Name: "test-object-store",
|
Name: "test-object-store",
|
||||||
},
|
},
|
||||||
Spec: v1.ObjectStoreSpec{},
|
Spec: barmancloudv1.ObjectStoreSpec{},
|
||||||
}
|
}
|
||||||
|
|
||||||
baseClient := fake.NewClientBuilder().
|
baseClient := fake.NewClientBuilder().
|
||||||
@ -61,35 +71,34 @@ var _ = Describe("ExtendedClient Get", func() {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
// manually add the secret to the cache, this is not present in the fake client so we are sure it is from the
|
// manually add the secret to the cache, this is not present in the fake client,
|
||||||
// cache
|
// so we are sure it is from the cache
|
||||||
extendedClient.cachedObjects = []cachedEntry{
|
addToCache(extendedClient, secretNotInClient, time.Now().Unix())
|
||||||
{
|
|
||||||
entry: secretNotInClient,
|
|
||||||
fetchUnixTime: time.Now().Unix(),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
err := extendedClient.Get(ctx, client.ObjectKeyFromObject(secretNotInClient), secretInClient)
|
err := extendedClient.Get(ctx, client.ObjectKeyFromObject(secretNotInClient), secretInClient)
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
Expect(secretNotInClient).To(Equal(extendedClient.cachedObjects[0].entry))
|
Expect(secretInClient).To(Equal(extendedClient.cachedObjects[0].entry))
|
||||||
|
Expect(secretInClient.GetResourceVersion()).To(Equal("from cache"))
|
||||||
})
|
})
|
||||||
|
|
||||||
It("fetches secret from base client if cache is expired", func(ctx SpecContext) {
|
It("fetches secret from base client if cache is expired", func(ctx SpecContext) {
|
||||||
extendedClient.cachedObjects = []cachedEntry{
|
addToCache(extendedClient, secretInClient, time.Now().Add(-2*time.Minute).Unix())
|
||||||
{
|
|
||||||
entry: secretInClient.DeepCopy(),
|
|
||||||
fetchUnixTime: time.Now().Add(-2 * time.Minute).Unix(),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
err := extendedClient.Get(ctx, client.ObjectKeyFromObject(secretInClient), secretInClient)
|
err := extendedClient.Get(ctx, client.ObjectKeyFromObject(secretInClient), secretInClient)
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(secretInClient.GetResourceVersion()).NotTo(Equal("from cache"))
|
||||||
|
|
||||||
|
// the cache is updated with the new value
|
||||||
|
Expect(extendedClient.cachedObjects).To(HaveLen(1))
|
||||||
|
Expect(extendedClient.cachedObjects[0].entry.GetResourceVersion()).NotTo(Equal("from cache"))
|
||||||
})
|
})
|
||||||
|
|
||||||
It("fetches secret from base client if not in cache", func(ctx SpecContext) {
|
It("fetches secret from base client if not in cache", func(ctx SpecContext) {
|
||||||
err := extendedClient.Get(ctx, client.ObjectKeyFromObject(secretInClient), secretInClient)
|
err := extendedClient.Get(ctx, client.ObjectKeyFromObject(secretInClient), secretInClient)
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
|
// the cache is updated with the new value
|
||||||
|
Expect(extendedClient.cachedObjects).To(HaveLen(1))
|
||||||
})
|
})
|
||||||
|
|
||||||
It("does not cache non-secret objects", func(ctx SpecContext) {
|
It("does not cache non-secret objects", func(ctx SpecContext) {
|
||||||
@ -106,4 +115,33 @@ var _ = Describe("ExtendedClient Get", func() {
|
|||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
Expect(extendedClient.cachedObjects).To(BeEmpty())
|
Expect(extendedClient.cachedObjects).To(BeEmpty())
|
||||||
})
|
})
|
||||||
|
|
||||||
|
It("returns the correct object from cache when multiple objects with the same object key are cached",
|
||||||
|
func(ctx SpecContext) {
|
||||||
|
secretNotInClient := &corev1.Secret{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Namespace: "default",
|
||||||
|
Name: "common-name",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
objectStoreNotInClient := &barmancloudv1.ObjectStore{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Namespace: "default",
|
||||||
|
Name: "common-name",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// manually add the objects to the cache, these are not present in the fake client,
|
||||||
|
// so we are sure they are from the cache
|
||||||
|
addToCache(extendedClient, secretNotInClient, time.Now().Unix())
|
||||||
|
addToCache(extendedClient, objectStoreNotInClient, time.Now().Unix())
|
||||||
|
|
||||||
|
err := extendedClient.Get(ctx, client.ObjectKeyFromObject(secretNotInClient), secretInClient)
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
err = extendedClient.Get(ctx, client.ObjectKeyFromObject(objectStoreNotInClient), objectStore)
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
|
Expect(secretInClient.GetResourceVersion()).To(Equal("from cache"))
|
||||||
|
Expect(objectStore.GetResourceVersion()).To(Equal("from cache"))
|
||||||
|
})
|
||||||
})
|
})
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user