mirror of
https://github.com/cloudnative-pg/plugin-barman-cloud.git
synced 2026-01-11 21:23:12 +01:00
feat: secret cache
Signed-off-by: Armando Ruocco <armando.ruocco@enterprisedb.com>
This commit is contained in:
parent
40c2b39e64
commit
7165fe9652
4
Makefile
4
Makefile
@ -45,11 +45,11 @@ help: ## Display this help.
|
||||
|
||||
.PHONY: manifests
|
||||
manifests: controller-gen ## Generate WebhookConfiguration, ClusterRole and CustomResourceDefinition objects.
|
||||
$(CONTROLLER_GEN) rbac:roleName=plugin-barman-cloud crd webhook paths="./..." output:crd:artifacts:config=config/crd/bases
|
||||
$(CONTROLLER_GEN) rbac:roleName=plugin-barman-cloud crd webhook paths="./api/..." output:crd:artifacts:config=config/crd/bases
|
||||
|
||||
.PHONY: generate
|
||||
generate: controller-gen ## Generate code containing DeepCopy, DeepCopyInto, and DeepCopyObject method implementations.
|
||||
$(CONTROLLER_GEN) object:headerFile="hack/boilerplate.go.txt" paths="./..."
|
||||
$(CONTROLLER_GEN) object:headerFile="hack/boilerplate.go.txt" paths="./api/..."
|
||||
|
||||
.PHONY: fmt
|
||||
fmt: ## Run go fmt against code.
|
||||
|
||||
@ -21,14 +21,29 @@ import (
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
)
|
||||
|
||||
// EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN!
|
||||
// NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized.
|
||||
// InstanceSidecarConfiguration defines the configuration for the sidecar that runs in the instance pods.
|
||||
type InstanceSidecarConfiguration struct {
|
||||
// The expiration time of the cache entries not managed by the informers. Expressed in seconds.
|
||||
// +optional
|
||||
// +kubebuilder:validation:Minimum=0
|
||||
// +kubebuilder:validation:Maximum=3600
|
||||
// +kubebuilder:default=180
|
||||
CacheTTL *int `json:"cacheTTL,omitempty"`
|
||||
}
|
||||
|
||||
// GetCacheTTL returns the cache TTL value, defaulting to 180 seconds if not set.
|
||||
func (i InstanceSidecarConfiguration) GetCacheTTL() int {
|
||||
if i.CacheTTL == nil {
|
||||
return 180
|
||||
}
|
||||
return *i.CacheTTL
|
||||
}
|
||||
|
||||
// ObjectStoreSpec defines the desired state of ObjectStore.
|
||||
type ObjectStoreSpec struct {
|
||||
Configuration barmanapi.BarmanObjectStoreConfiguration `json:"configuration"`
|
||||
|
||||
// TODO: we add here any exclusive fields for our plugin CRD
|
||||
InstanceSidecarConfiguration InstanceSidecarConfiguration `json:"instanceSidecarConfiguration,omitempty"`
|
||||
}
|
||||
|
||||
// ObjectStoreStatus defines the observed state of ObjectStore.
|
||||
|
||||
@ -24,6 +24,26 @@ import (
|
||||
runtime "k8s.io/apimachinery/pkg/runtime"
|
||||
)
|
||||
|
||||
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
|
||||
func (in *InstanceSidecarConfiguration) DeepCopyInto(out *InstanceSidecarConfiguration) {
|
||||
*out = *in
|
||||
if in.CacheTTL != nil {
|
||||
in, out := &in.CacheTTL, &out.CacheTTL
|
||||
*out = new(int)
|
||||
**out = **in
|
||||
}
|
||||
}
|
||||
|
||||
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new InstanceSidecarConfiguration.
|
||||
func (in *InstanceSidecarConfiguration) DeepCopy() *InstanceSidecarConfiguration {
|
||||
if in == nil {
|
||||
return nil
|
||||
}
|
||||
out := new(InstanceSidecarConfiguration)
|
||||
in.DeepCopyInto(out)
|
||||
return out
|
||||
}
|
||||
|
||||
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
|
||||
func (in *ObjectStore) DeepCopyInto(out *ObjectStore) {
|
||||
*out = *in
|
||||
@ -87,6 +107,7 @@ func (in *ObjectStoreList) DeepCopyObject() runtime.Object {
|
||||
func (in *ObjectStoreSpec) DeepCopyInto(out *ObjectStoreSpec) {
|
||||
*out = *in
|
||||
in.Configuration.DeepCopyInto(&out.Configuration)
|
||||
in.InstanceSidecarConfiguration.DeepCopyInto(&out.InstanceSidecarConfiguration)
|
||||
}
|
||||
|
||||
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ObjectStoreSpec.
|
||||
|
||||
@ -378,6 +378,18 @@ spec:
|
||||
required:
|
||||
- destinationPath
|
||||
type: object
|
||||
instanceSidecarConfiguration:
|
||||
description: InstanceSidecarConfiguration defines the configuration
|
||||
for the sidecar that runs in the instance pods.
|
||||
properties:
|
||||
cacheTTL:
|
||||
default: 180
|
||||
description: The expiration time of the cache entries not managed
|
||||
by the informers. Expressed in seconds.
|
||||
maximum: 3600
|
||||
minimum: 0
|
||||
type: integer
|
||||
type: object
|
||||
required:
|
||||
- configuration
|
||||
type: object
|
||||
|
||||
@ -40,7 +40,6 @@ func NewCmd() *cobra.Command {
|
||||
_ = viper.BindEnv("pod-name", "POD_NAME")
|
||||
_ = viper.BindEnv("pgdata", "PGDATA")
|
||||
_ = viper.BindEnv("spool-directory", "SPOOL_DIRECTORY")
|
||||
_ = viper.BindEnv("secret-cache-ttl", "SECRET_CACHE_TTL")
|
||||
|
||||
return cmd
|
||||
}
|
||||
|
||||
@ -2,10 +2,12 @@ package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/cloudnative-pg/machinery/pkg/log"
|
||||
v1 "github.com/cloudnative-pg/plugin-barman-cloud/api/v1"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
)
|
||||
@ -15,23 +17,38 @@ type cachedSecret struct {
|
||||
fetchUnixTime int64
|
||||
}
|
||||
|
||||
// ExtendedClient is an extended client that is capable of caching multiple secrets without relying on 'list and watch'
|
||||
// ExtendedClient is an extended client that is capable of caching multiple secrets without relying on informers
|
||||
type ExtendedClient struct {
|
||||
client.Client
|
||||
cachedSecrets []*cachedSecret
|
||||
mux *sync.Mutex
|
||||
ttl int64
|
||||
barmanObjectKey client.ObjectKey
|
||||
cachedSecrets []*cachedSecret
|
||||
mux *sync.Mutex
|
||||
ttl int
|
||||
}
|
||||
|
||||
// NewExtendedClient returns an extended client capable of caching secrets on the 'Get' operation
|
||||
func NewExtendedClient(baseClient client.Client, ttl int64) client.Client {
|
||||
func NewExtendedClient(
|
||||
baseClient client.Client,
|
||||
objectStoreKey client.ObjectKey,
|
||||
) client.Client {
|
||||
return &ExtendedClient{
|
||||
Client: baseClient,
|
||||
ttl: ttl,
|
||||
mux: &sync.Mutex{},
|
||||
Client: baseClient,
|
||||
barmanObjectKey: objectStoreKey,
|
||||
mux: &sync.Mutex{},
|
||||
}
|
||||
}
|
||||
|
||||
func (e *ExtendedClient) refreshTTL(ctx context.Context) error {
|
||||
var object v1.ObjectStore
|
||||
if err := e.Get(ctx, e.barmanObjectKey, &object); err != nil {
|
||||
return fmt.Errorf("failed to get the object store while refreshing the TTL parameter: %w", err)
|
||||
}
|
||||
|
||||
e.ttl = object.Spec.InstanceSidecarConfiguration.GetCacheTTL()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *ExtendedClient) Get(
|
||||
ctx context.Context,
|
||||
key client.ObjectKey,
|
||||
@ -42,14 +59,21 @@ func (e *ExtendedClient) Get(
|
||||
WithName("extended_client").
|
||||
WithValues("name", key.Name, "namespace", key.Namespace)
|
||||
|
||||
if e.isCacheDisabled() {
|
||||
return e.Client.Get(ctx, key, obj, opts...)
|
||||
}
|
||||
|
||||
if _, ok := obj.(*corev1.Secret); !ok {
|
||||
contextLogger.Trace("not a secret, skipping")
|
||||
return e.Client.Get(ctx, key, obj, opts...)
|
||||
}
|
||||
|
||||
if err := e.refreshTTL(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if e.isCacheDisabled() {
|
||||
contextLogger.Trace("cache is disabled")
|
||||
return e.Client.Get(ctx, key, obj, opts...)
|
||||
}
|
||||
|
||||
contextLogger.Trace("locking the cache")
|
||||
e.mux.Lock()
|
||||
defer e.mux.Unlock()
|
||||
|
||||
@ -64,7 +88,7 @@ func (e *ExtendedClient) Get(
|
||||
expiredSecretIndex = idx
|
||||
break
|
||||
}
|
||||
contextLogger.Trace("secret found, loading it from cache")
|
||||
contextLogger.Debug("secret found, loading it from cache")
|
||||
cache.secret.DeepCopyInto(obj.(*corev1.Secret))
|
||||
return nil
|
||||
}
|
||||
@ -78,6 +102,7 @@ func (e *ExtendedClient) Get(
|
||||
fetchUnixTime: time.Now().Unix(),
|
||||
}
|
||||
|
||||
contextLogger.Debug("setting secret in the cache")
|
||||
if expiredSecretIndex != -1 {
|
||||
e.cachedSecrets[expiredSecretIndex] = cs
|
||||
} else {
|
||||
@ -88,7 +113,7 @@ func (e *ExtendedClient) Get(
|
||||
}
|
||||
|
||||
func (e *ExtendedClient) isExpired(unixTime int64) bool {
|
||||
return time.Now().Unix()-unixTime > e.ttl
|
||||
return time.Now().Unix()-unixTime > int64(e.ttl)
|
||||
}
|
||||
|
||||
func (e *ExtendedClient) isCacheDisabled() bool {
|
||||
@ -1,20 +1,35 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/utils/ptr"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client/fake"
|
||||
"time"
|
||||
|
||||
. "github.com/onsi/ginkgo/v2"
|
||||
. "github.com/onsi/gomega"
|
||||
v1 "github.com/cloudnative-pg/plugin-barman-cloud/api/v1"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
|
||||
. "github.com/onsi/ginkgo/v2"
|
||||
. "github.com/onsi/gomega"
|
||||
)
|
||||
|
||||
var scheme = buildScheme()
|
||||
|
||||
func buildScheme() *runtime.Scheme {
|
||||
scheme := runtime.NewScheme()
|
||||
_ = corev1.AddToScheme(scheme)
|
||||
_ = v1.AddToScheme(scheme)
|
||||
|
||||
return scheme
|
||||
}
|
||||
|
||||
var _ = Describe("ExtendedClient Get", func() {
|
||||
var (
|
||||
extendedClient *ExtendedClient
|
||||
secretInClient *corev1.Secret
|
||||
objectStore *v1.ObjectStore
|
||||
)
|
||||
|
||||
BeforeEach(func() {
|
||||
@ -24,8 +39,22 @@ var _ = Describe("ExtendedClient Get", func() {
|
||||
Name: "test-secret",
|
||||
},
|
||||
}
|
||||
baseClient := fake.NewClientBuilder().WithObjects(secretInClient).Build()
|
||||
extendedClient = NewExtendedClient(baseClient, 60).(*ExtendedClient)
|
||||
objectStore = &v1.ObjectStore{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Namespace: "default",
|
||||
Name: "test-object-store",
|
||||
},
|
||||
Spec: v1.ObjectStoreSpec{
|
||||
InstanceSidecarConfiguration: v1.InstanceSidecarConfiguration{
|
||||
CacheTTL: ptr.To(60),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
baseClient := fake.NewClientBuilder().
|
||||
WithScheme(scheme).
|
||||
WithObjects(secretInClient, objectStore).Build()
|
||||
extendedClient = NewExtendedClient(baseClient, client.ObjectKeyFromObject(objectStore)).(*ExtendedClient)
|
||||
})
|
||||
|
||||
It("returns secret from cache if not expired", func(ctx SpecContext) {
|
||||
@ -1,3 +1,3 @@
|
||||
// Package client provides an extended client that is capable of caching multiple secrets without relying on
|
||||
// 'list and watch'
|
||||
// informers
|
||||
package client
|
||||
@ -2,6 +2,7 @@ package instance
|
||||
|
||||
import (
|
||||
"context"
|
||||
extendedclient "github.com/cloudnative-pg/plugin-barman-cloud/internal/cnpgi/instance/internal/client"
|
||||
"os"
|
||||
"path"
|
||||
|
||||
@ -18,7 +19,6 @@ import (
|
||||
"sigs.k8s.io/controller-runtime/pkg/log"
|
||||
|
||||
barmancloudv1 "github.com/cloudnative-pg/plugin-barman-cloud/api/v1"
|
||||
extendedclient "github.com/cloudnative-pg/plugin-barman-cloud/internal/client"
|
||||
)
|
||||
|
||||
var scheme = runtime.NewScheme()
|
||||
@ -37,7 +37,6 @@ func Start(ctx context.Context) error {
|
||||
boName := viper.GetString("barman-object-name")
|
||||
clusterName := viper.GetString("cluster-name")
|
||||
podName := viper.GetString("pod-name")
|
||||
secretCacheTTL := viper.GetInt64("secret-cache-ttl")
|
||||
|
||||
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
|
||||
Scheme: scheme,
|
||||
@ -70,17 +69,19 @@ func Start(ctx context.Context) error {
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
barmanObjectKey := client.ObjectKey{
|
||||
Namespace: namespace,
|
||||
Name: boName,
|
||||
}
|
||||
|
||||
if err := mgr.Add(&CNPGI{
|
||||
Client: extendedclient.NewExtendedClient(mgr.GetClient(), secretCacheTTL),
|
||||
Client: extendedclient.NewExtendedClient(mgr.GetClient(), barmanObjectKey),
|
||||
ClusterObjectKey: client.ObjectKey{
|
||||
Namespace: namespace,
|
||||
Name: clusterName,
|
||||
},
|
||||
BarmanObjectKey: client.ObjectKey{
|
||||
Namespace: namespace,
|
||||
Name: boName,
|
||||
},
|
||||
InstanceName: podName,
|
||||
BarmanObjectKey: barmanObjectKey,
|
||||
InstanceName: podName,
|
||||
// TODO: improve
|
||||
PGDataPath: viper.GetString("pgdata"),
|
||||
PGWALPath: path.Join(viper.GetString("pgdata"), "pg_wal"),
|
||||
|
||||
@ -65,6 +65,9 @@ func (w WALServiceImplementation) Archive(
|
||||
ctx context.Context,
|
||||
request *wal.WALArchiveRequest,
|
||||
) (*wal.WALArchiveResult, error) {
|
||||
contextLogger := log.FromContext(ctx)
|
||||
contextLogger.Debug("starting wal archive")
|
||||
|
||||
var cluster cnpgv1.Cluster
|
||||
if err := w.Client.Get(ctx, w.ClusterObjectKey, &cluster); err != nil {
|
||||
return nil, err
|
||||
|
||||
@ -62,8 +62,8 @@ func NewFromCluster(cluster *cnpgv1.Cluster) *PluginConfiguration {
|
||||
return result
|
||||
}
|
||||
|
||||
// ValidateBarmanObjectName checks if the barmanObjectName is set
|
||||
func (p *PluginConfiguration) ValidateBarmanObjectName() error {
|
||||
// Validate checks if the barmanObjectName is set
|
||||
func (p *PluginConfiguration) Validate() error {
|
||||
err := NewConfigurationError()
|
||||
if len(p.BarmanObjectName) != 0 {
|
||||
return nil
|
||||
|
||||
@ -85,6 +85,12 @@ func (impl LifecycleImplementation) LifecycleHook(
|
||||
|
||||
pluginConfiguration := config.NewFromCluster(&cluster)
|
||||
|
||||
// barman object is required for both the archive and restore process
|
||||
if err := pluginConfiguration.Validate(); err != nil {
|
||||
contextLogger.Info("pluginConfiguration invalid, skipping lifecycle", "error", err)
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
switch kind {
|
||||
case "Pod":
|
||||
contextLogger.Info("Reconciling pod")
|
||||
@ -160,10 +166,6 @@ func reconcilePod(
|
||||
pluginConfiguration *config.PluginConfiguration,
|
||||
) (*lifecycle.OperatorLifecycleResponse, error) {
|
||||
contextLogger := log.FromContext(ctx).WithName("lifecycle")
|
||||
if err := pluginConfiguration.ValidateBarmanObjectName(); err != nil {
|
||||
contextLogger.Info("no barman object name set, skipping pod sidecar injection")
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
pod, err := decoder.DecodePodJSON(request.GetObjectDefinition())
|
||||
if err != nil {
|
||||
|
||||
Loading…
Reference in New Issue
Block a user