chore: add secrets cache (#47)

Signed-off-by: Armando Ruocco <armando.ruocco@enterprisedb.com>
Signed-off-by: Francesco Canovai <francesco.canovai@enterprisedb.com>
Co-authored-by: Francesco Canovai <francesco.canovai@enterprisedb.com>
This commit is contained in:
Armando Ruocco 2024-11-14 16:10:17 +01:00 committed by GitHub
parent 74bc9e221e
commit beef96e12f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 435 additions and 28 deletions

View File

@ -45,11 +45,11 @@ help: ## Display this help.
.PHONY: manifests
manifests: controller-gen ## Generate WebhookConfiguration, ClusterRole and CustomResourceDefinition objects.
$(CONTROLLER_GEN) rbac:roleName=plugin-barman-cloud crd webhook paths="./..." output:crd:artifacts:config=config/crd/bases
$(CONTROLLER_GEN) rbac:roleName=plugin-barman-cloud crd webhook paths="./api/..." output:crd:artifacts:config=config/crd/bases
.PHONY: generate
generate: controller-gen ## Generate code containing DeepCopy, DeepCopyInto, and DeepCopyObject method implementations.
$(CONTROLLER_GEN) object:headerFile="hack/boilerplate.go.txt" paths="./..."
$(CONTROLLER_GEN) object:headerFile="hack/boilerplate.go.txt" paths="./api/..."
.PHONY: fmt
fmt: ## Run go fmt against code.

20
api/v1/doc.go Normal file
View File

@ -0,0 +1,20 @@
/*
Copyright The CloudNativePG Contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package v1 contains API Schema definitions for the barmancloud v1 API group
// +kubebuilder:object:generate=true
// +groupName=barmancloud.cnpg.io
package v1

View File

@ -21,14 +21,30 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN!
// NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized.
// InstanceSidecarConfiguration defines the configuration for the sidecar that runs in the instance pods.
type InstanceSidecarConfiguration struct {
// The expiration time of the cache entries not managed by the informers. Expressed in seconds.
// +optional
// +kubebuilder:validation:Minimum=0
// +kubebuilder:validation:Maximum=3600
// +kubebuilder:default=180
CacheTTL *int `json:"cacheTTL,omitempty"`
}
// GetCacheTTL returns the cache TTL value, defaulting to 180 seconds if not set.
func (i InstanceSidecarConfiguration) GetCacheTTL() int {
if i.CacheTTL == nil {
return 180
}
return *i.CacheTTL
}
// ObjectStoreSpec defines the desired state of ObjectStore.
type ObjectStoreSpec struct {
Configuration barmanapi.BarmanObjectStoreConfiguration `json:"configuration"`
// TODO: we add here any exclusive fields for our plugin CRD
// +optional
InstanceSidecarConfiguration InstanceSidecarConfiguration `json:"instanceSidecarConfiguration,omitempty"`
}
// ObjectStoreStatus defines the observed state of ObjectStore.
@ -39,13 +55,16 @@ type ObjectStoreStatus struct {
// +kubebuilder:object:root=true
// +kubebuilder:subresource:status
// +genclient
// +kubebuilder:storageversion
// ObjectStore is the Schema for the objectstores API.
type ObjectStore struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
metav1.ObjectMeta `json:"metadata"`
Spec ObjectStoreSpec `json:"spec,omitempty"`
Spec ObjectStoreSpec `json:"spec"`
// +optional
Status ObjectStoreStatus `json:"status,omitempty"`
}

View File

@ -24,6 +24,26 @@ import (
runtime "k8s.io/apimachinery/pkg/runtime"
)
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *InstanceSidecarConfiguration) DeepCopyInto(out *InstanceSidecarConfiguration) {
*out = *in
if in.CacheTTL != nil {
in, out := &in.CacheTTL, &out.CacheTTL
*out = new(int)
**out = **in
}
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new InstanceSidecarConfiguration.
func (in *InstanceSidecarConfiguration) DeepCopy() *InstanceSidecarConfiguration {
if in == nil {
return nil
}
out := new(InstanceSidecarConfiguration)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ObjectStore) DeepCopyInto(out *ObjectStore) {
*out = *in
@ -87,6 +107,7 @@ func (in *ObjectStoreList) DeepCopyObject() runtime.Object {
func (in *ObjectStoreSpec) DeepCopyInto(out *ObjectStoreSpec) {
*out = *in
in.Configuration.DeepCopyInto(&out.Configuration)
in.InstanceSidecarConfiguration.DeepCopyInto(&out.InstanceSidecarConfiguration)
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ObjectStoreSpec.

View File

@ -378,6 +378,18 @@ spec:
required:
- destinationPath
type: object
instanceSidecarConfiguration:
description: InstanceSidecarConfiguration defines the configuration
for the sidecar that runs in the instance pods.
properties:
cacheTTL:
default: 180
description: The expiration time of the cache entries not managed
by the informers. Expressed in seconds.
maximum: 3600
minimum: 0
type: integer
type: object
required:
- configuration
type: object

4
go.mod
View File

@ -9,8 +9,8 @@ require (
github.com/cloudnative-pg/barman-cloud v0.0.0-20241105055149-ae6c2408bd14
github.com/cloudnative-pg/cloudnative-pg v1.24.1-0.20241113134512-8608232c2813
github.com/cloudnative-pg/cnpg-i v0.0.0-20241109002750-8abd359df734
github.com/cloudnative-pg/cnpg-i-machinery v0.0.0-20241014090747-e9c2b3738d19
github.com/cloudnative-pg/machinery v0.0.0-20241030141148-670a0f16f836
github.com/cloudnative-pg/cnpg-i-machinery v0.0.0-20241030141108-7e59fc9f4797
github.com/cloudnative-pg/machinery v0.0.0-20241105070525-042a028b767c
github.com/onsi/ginkgo/v2 v2.21.0
github.com/onsi/gomega v1.35.1
github.com/spf13/cobra v1.8.1

8
go.sum
View File

@ -24,10 +24,10 @@ github.com/cloudnative-pg/cloudnative-pg v1.24.1-0.20241113134512-8608232c2813 h
github.com/cloudnative-pg/cloudnative-pg v1.24.1-0.20241113134512-8608232c2813/go.mod h1:f4hObdRVoQtMmVtWqZ6VDZBrI6ok9Td/UMhojQ+EPmk=
github.com/cloudnative-pg/cnpg-i v0.0.0-20241109002750-8abd359df734 h1:4jq/FUrlAKxu0Kw9PL5lj5Njq8pAnmUpP/kXKOrJAaE=
github.com/cloudnative-pg/cnpg-i v0.0.0-20241109002750-8abd359df734/go.mod h1:3U7miYasKr2rYCQzrn/IvbSQc0OpYF8ieZt2FKG4nv0=
github.com/cloudnative-pg/cnpg-i-machinery v0.0.0-20241014090747-e9c2b3738d19 h1:qy+LrScvQpIwt4qeg9FfCJuoC9CbX/kpFGLF8vSobXg=
github.com/cloudnative-pg/cnpg-i-machinery v0.0.0-20241014090747-e9c2b3738d19/go.mod h1:X6r1fRuUEIAv4+5SSBY2RmQ201K6GcptOXgnmaX/8tY=
github.com/cloudnative-pg/machinery v0.0.0-20241030141148-670a0f16f836 h1:Hhg+I2QcaPNN5XaSsYb7Xw3PbQlvCA9eDY+SvVf902Q=
github.com/cloudnative-pg/machinery v0.0.0-20241030141148-670a0f16f836/go.mod h1:+mUFdys1IX+qwQUrV+/i56Tey/mYh8ZzWZYttwivRns=
github.com/cloudnative-pg/cnpg-i-machinery v0.0.0-20241030141108-7e59fc9f4797 h1:8iaPgTx16yzx8rrhOi99u+GWGp47kqveF9NShElsYKM=
github.com/cloudnative-pg/cnpg-i-machinery v0.0.0-20241030141108-7e59fc9f4797/go.mod h1:X6r1fRuUEIAv4+5SSBY2RmQ201K6GcptOXgnmaX/8tY=
github.com/cloudnative-pg/machinery v0.0.0-20241105070525-042a028b767c h1:t0RBU2gBiwJQ9XGeXlHPBYpsTscSKHgB5TfcWaiwanc=
github.com/cloudnative-pg/machinery v0.0.0-20241105070525-042a028b767c/go.mod h1:uBHGRIk5rt07mO4zjIC1uvGBWTH6PqIiD1PfpvPGZKU=
github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=

View File

@ -0,0 +1,199 @@
package client
import (
"context"
"fmt"
"sync"
"time"
"github.com/cloudnative-pg/machinery/pkg/log"
corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
v1 "github.com/cloudnative-pg/plugin-barman-cloud/api/v1"
)
type cachedSecret struct {
secret *corev1.Secret
fetchUnixTime int64
}
// ExtendedClient is an extended client that is capable of caching multiple secrets without relying on informers
type ExtendedClient struct {
client.Client
barmanObjectKey client.ObjectKey
cachedSecrets []*cachedSecret
mux *sync.Mutex
ttl int
}
// NewExtendedClient returns an extended client capable of caching secrets on the 'Get' operation
func NewExtendedClient(
baseClient client.Client,
objectStoreKey client.ObjectKey,
) client.Client {
return &ExtendedClient{
Client: baseClient,
barmanObjectKey: objectStoreKey,
mux: &sync.Mutex{},
}
}
func (e *ExtendedClient) refreshTTL(ctx context.Context) error {
var object v1.ObjectStore
if err := e.Get(ctx, e.barmanObjectKey, &object); err != nil {
return fmt.Errorf("failed to get the object store while refreshing the TTL parameter: %w", err)
}
e.ttl = object.Spec.InstanceSidecarConfiguration.GetCacheTTL()
return nil
}
// Get behaves like the original Get method, but uses a cache for secrets
func (e *ExtendedClient) Get(
ctx context.Context,
key client.ObjectKey,
obj client.Object,
opts ...client.GetOption,
) error {
contextLogger := log.FromContext(ctx).
WithName("extended_client").
WithValues("name", key.Name, "namespace", key.Namespace)
if _, ok := obj.(*corev1.Secret); !ok {
contextLogger.Trace("not a secret, skipping")
return e.Client.Get(ctx, key, obj, opts...)
}
if err := e.refreshTTL(ctx); err != nil {
return err
}
if e.isCacheDisabled() {
contextLogger.Trace("cache is disabled")
return e.Client.Get(ctx, key, obj, opts...)
}
contextLogger.Trace("locking the cache")
e.mux.Lock()
defer e.mux.Unlock()
expiredSecretIndex := -1
// check if in cache
for idx, cache := range e.cachedSecrets {
if cache.secret.Namespace != key.Namespace || cache.secret.Name != key.Name {
continue
}
if e.isExpired(cache.fetchUnixTime) {
contextLogger.Trace("secret found, but it is expired")
expiredSecretIndex = idx
break
}
contextLogger.Debug("secret found, loading it from cache")
cache.secret.DeepCopyInto(obj.(*corev1.Secret))
return nil
}
if err := e.Client.Get(ctx, key, obj); err != nil {
return err
}
cs := &cachedSecret{
secret: obj.(*corev1.Secret).DeepCopy(),
fetchUnixTime: time.Now().Unix(),
}
contextLogger.Debug("setting secret in the cache")
if expiredSecretIndex != -1 {
e.cachedSecrets[expiredSecretIndex] = cs
} else {
e.cachedSecrets = append(e.cachedSecrets, cs)
}
return nil
}
func (e *ExtendedClient) isExpired(unixTime int64) bool {
return time.Now().Unix()-unixTime > int64(e.ttl)
}
func (e *ExtendedClient) isCacheDisabled() bool {
const noCache = 0
return e.ttl == noCache
}
// RemoveSecret ensures that a secret is not present in the cache
func (e *ExtendedClient) RemoveSecret(key client.ObjectKey) {
if e.isCacheDisabled() {
return
}
e.mux.Lock()
defer e.mux.Unlock()
for i, cache := range e.cachedSecrets {
if cache.secret.Namespace == key.Namespace && cache.secret.Name == key.Name {
e.cachedSecrets = append(e.cachedSecrets[:i], e.cachedSecrets[i+1:]...)
return
}
}
}
// Update behaves like the original Update method, but on secrets it removes the secret from the cache
func (e *ExtendedClient) Update(
ctx context.Context,
obj client.Object,
opts ...client.UpdateOption,
) error {
if e.isCacheDisabled() {
return e.Client.Update(ctx, obj, opts...)
}
if _, ok := obj.(*corev1.Secret); !ok {
return e.Client.Update(ctx, obj, opts...)
}
e.RemoveSecret(client.ObjectKeyFromObject(obj))
return e.Client.Update(ctx, obj, opts...)
}
// Delete behaves like the original Delete method, but on secrets it removes the secret from the cache
func (e *ExtendedClient) Delete(
ctx context.Context,
obj client.Object,
opts ...client.DeleteOption,
) error {
if e.isCacheDisabled() {
return e.Client.Delete(ctx, obj, opts...)
}
if _, ok := obj.(*corev1.Secret); !ok {
return e.Client.Delete(ctx, obj, opts...)
}
e.RemoveSecret(client.ObjectKeyFromObject(obj))
return e.Client.Delete(ctx, obj, opts...)
}
// Patch behaves like the original Patch method, but on secrets it removes the secret from the cache
func (e *ExtendedClient) Patch(
ctx context.Context,
obj client.Object,
patch client.Patch,
opts ...client.PatchOption,
) error {
if e.isCacheDisabled() {
return e.Client.Patch(ctx, obj, patch, opts...)
}
if _, ok := obj.(*corev1.Secret); !ok {
return e.Client.Patch(ctx, obj, patch, opts...)
}
e.RemoveSecret(client.ObjectKeyFromObject(obj))
return e.Client.Patch(ctx, obj, patch, opts...)
}

View File

@ -0,0 +1,114 @@
package client
import (
"time"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
v1 "github.com/cloudnative-pg/plugin-barman-cloud/api/v1"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
var scheme = buildScheme()
func buildScheme() *runtime.Scheme {
scheme := runtime.NewScheme()
_ = corev1.AddToScheme(scheme)
_ = v1.AddToScheme(scheme)
return scheme
}
var _ = Describe("ExtendedClient Get", func() {
var (
extendedClient *ExtendedClient
secretInClient *corev1.Secret
objectStore *v1.ObjectStore
)
BeforeEach(func() {
secretInClient = &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "test-secret",
},
}
objectStore = &v1.ObjectStore{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "test-object-store",
},
Spec: v1.ObjectStoreSpec{
InstanceSidecarConfiguration: v1.InstanceSidecarConfiguration{
CacheTTL: ptr.To(60),
},
},
}
baseClient := fake.NewClientBuilder().
WithScheme(scheme).
WithObjects(secretInClient, objectStore).Build()
extendedClient = NewExtendedClient(baseClient, client.ObjectKeyFromObject(objectStore)).(*ExtendedClient)
})
It("returns secret from cache if not expired", func(ctx SpecContext) {
secretNotInClient := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "test-secret-not-in-client",
},
}
// manually add the secret to the cache, this is not present in the fake client so we are sure it is from the
// cache
extendedClient.cachedSecrets = []*cachedSecret{
{
secret: secretNotInClient,
fetchUnixTime: time.Now().Unix(),
},
}
err := extendedClient.Get(ctx, client.ObjectKeyFromObject(secretNotInClient), secretInClient)
Expect(err).NotTo(HaveOccurred())
Expect(secretNotInClient).To(Equal(extendedClient.cachedSecrets[0].secret))
})
It("fetches secret from base client if cache is expired", func(ctx SpecContext) {
extendedClient.cachedSecrets = []*cachedSecret{
{
secret: secretInClient.DeepCopy(),
fetchUnixTime: time.Now().Add(-2 * time.Minute).Unix(),
},
}
err := extendedClient.Get(ctx, client.ObjectKeyFromObject(secretInClient), secretInClient)
Expect(err).NotTo(HaveOccurred())
})
It("fetches secret from base client if not in cache", func(ctx SpecContext) {
err := extendedClient.Get(ctx, client.ObjectKeyFromObject(secretInClient), secretInClient)
Expect(err).NotTo(HaveOccurred())
})
It("does not cache non-secret objects", func(ctx SpecContext) {
configMap := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "test-configmap",
},
}
err := extendedClient.Create(ctx, configMap)
Expect(err).ToNot(HaveOccurred())
err = extendedClient.Get(ctx, client.ObjectKeyFromObject(configMap), configMap)
Expect(err).NotTo(HaveOccurred())
Expect(extendedClient.cachedSecrets).To(BeEmpty())
})
})

View File

@ -0,0 +1,3 @@
// Package client provides an extended client that is capable of caching multiple secrets without relying on
// informers
package client

View File

@ -0,0 +1,13 @@
package client_test
import (
"testing"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
func TestClient(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Client Suite")
}

View File

@ -18,6 +18,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/log"
barmancloudv1 "github.com/cloudnative-pg/plugin-barman-cloud/api/v1"
extendedclient "github.com/cloudnative-pg/plugin-barman-cloud/internal/cnpgi/instance/internal/client"
)
var scheme = runtime.NewScheme()
@ -68,17 +69,19 @@ func Start(ctx context.Context) error {
os.Exit(1)
}
barmanObjectKey := client.ObjectKey{
Namespace: namespace,
Name: boName,
}
if err := mgr.Add(&CNPGI{
Client: mgr.GetClient(),
Client: extendedclient.NewExtendedClient(mgr.GetClient(), barmanObjectKey),
ClusterObjectKey: client.ObjectKey{
Namespace: namespace,
Name: clusterName,
},
BarmanObjectKey: client.ObjectKey{
Namespace: namespace,
Name: boName,
},
InstanceName: podName,
BarmanObjectKey: barmanObjectKey,
InstanceName: podName,
// TODO: improve
PGDataPath: viper.GetString("pgdata"),
PGWALPath: path.Join(viper.GetString("pgdata"), "pg_wal"),

View File

@ -65,6 +65,9 @@ func (w WALServiceImplementation) Archive(
ctx context.Context,
request *wal.WALArchiveRequest,
) (*wal.WALArchiveResult, error) {
contextLogger := log.FromContext(ctx)
contextLogger.Debug("starting wal archive")
var cluster cnpgv1.Cluster
if err := w.Client.Get(ctx, w.ClusterObjectKey, &cluster); err != nil {
return nil, err

View File

@ -62,8 +62,8 @@ func NewFromCluster(cluster *cnpgv1.Cluster) *PluginConfiguration {
return result
}
// ValidateBarmanObjectName checks if the barmanObjectName is set
func (p *PluginConfiguration) ValidateBarmanObjectName() error {
// Validate checks if the barmanObjectName is set
func (p *PluginConfiguration) Validate() error {
err := NewConfigurationError()
if len(p.BarmanObjectName) != 0 {
return nil

View File

@ -85,6 +85,12 @@ func (impl LifecycleImplementation) LifecycleHook(
pluginConfiguration := config.NewFromCluster(&cluster)
// barman object is required for both the archive and restore process
if err := pluginConfiguration.Validate(); err != nil {
contextLogger.Info("pluginConfiguration invalid, skipping lifecycle", "error", err)
return nil, nil
}
switch kind {
case "Pod":
contextLogger.Info("Reconciling pod")
@ -159,18 +165,12 @@ func reconcilePod(
request *lifecycle.OperatorLifecycleRequest,
pluginConfiguration *config.PluginConfiguration,
) (*lifecycle.OperatorLifecycleResponse, error) {
contextLogger := log.FromContext(ctx).WithName("lifecycle")
if err := pluginConfiguration.ValidateBarmanObjectName(); err != nil {
contextLogger.Info("no barman object name set, skipping pod sidecar injection")
return nil, nil
}
pod, err := decoder.DecodePodJSON(request.GetObjectDefinition())
if err != nil {
return nil, err
}
contextLogger = log.FromContext(ctx).WithName("plugin-barman-cloud-lifecycle").
contextLogger := log.FromContext(ctx).WithName("plugin-barman-cloud-lifecycle").
WithValues("podName", pod.Name)
mutatedPod := pod.DeepCopy()