feat: separate recovery object store from replica source (#83)

Signed-off-by: Leonardo Cecchi <leonardo.cecchi@enterprisedb.com>
Signed-off-by: Francesco Canovai <francesco.canovai@enterprisedb.com>
Signed-off-by: Armando Ruocco <armando.ruocco@enterprisedb.com>
Co-authored-by: Francesco Canovai <francesco.canovai@enterprisedb.com>
Co-authored-by: Armando Ruocco <armando.ruocco@enterprisedb.com>
This commit is contained in:
Leonardo Cecchi 2024-12-09 13:29:20 +01:00 committed by GitHub
parent 56a163b46c
commit e4735a2f85
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
24 changed files with 344 additions and 475 deletions

View File

@ -24,26 +24,11 @@ import (
// InstanceSidecarConfiguration defines the configuration for the sidecar that runs in the instance pods.
type InstanceSidecarConfiguration struct {
// The expiration time of the cache entries not managed by the informers. Expressed in seconds.
// +optional
// +kubebuilder:validation:Minimum=0
// +kubebuilder:validation:Maximum=3600
// +kubebuilder:default=180
CacheTTL *int `json:"cacheTTL,omitempty"`
// The environment to be explicitly passed to the sidecar
// +optional
Env []corev1.EnvVar `json:"env,omitempty"`
}
// GetCacheTTL returns the cache TTL value, defaulting to 180 seconds if not set.
func (i InstanceSidecarConfiguration) GetCacheTTL() int {
if i.CacheTTL == nil {
return 180
}
return *i.CacheTTL
}
// ObjectStoreSpec defines the desired state of ObjectStore.
type ObjectStoreSpec struct {
Configuration barmanapi.BarmanObjectStoreConfiguration `json:"configuration"`

View File

@ -28,11 +28,6 @@ import (
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *InstanceSidecarConfiguration) DeepCopyInto(out *InstanceSidecarConfiguration) {
*out = *in
if in.CacheTTL != nil {
in, out := &in.CacheTTL, &out.CacheTTL
*out = new(int)
**out = **in
}
if in.Env != nil {
in, out := &in.Env, &out.Env
*out = make([]corev1.EnvVar, len(*in))

View File

@ -382,13 +382,6 @@ spec:
description: InstanceSidecarConfiguration defines the configuration
for the sidecar that runs in the instance pods.
properties:
cacheTTL:
default: 180
description: The expiration time of the cache entries not managed
by the informers. Expressed in seconds.
maximum: 3600
minimum: 0
type: integer
env:
description: The environment to be explicitly passed to the sidecar
items:

View File

@ -4,7 +4,7 @@ metadata:
name: cluster-example
spec:
instances: 3
imagePullPolicy: IfNotPresent
imagePullPolicy: Always
plugins:
- name: barman-cloud.cloudnative-pg.io
parameters:

2
go.mod
View File

@ -10,7 +10,7 @@ require (
github.com/cloudnative-pg/barman-cloud v0.0.0-20241105055149-ae6c2408bd14
github.com/cloudnative-pg/cloudnative-pg v1.24.1-0.20241113134512-8608232c2813
github.com/cloudnative-pg/cnpg-i v0.0.0-20241109002750-8abd359df734
github.com/cloudnative-pg/cnpg-i-machinery v0.0.0-20241129144432-bd94f16685d3
github.com/cloudnative-pg/cnpg-i-machinery v0.0.0-20241205093641-958e207b8afe
github.com/cloudnative-pg/machinery v0.0.0-20241105070525-042a028b767c
github.com/docker/docker v27.3.1+incompatible
github.com/onsi/ginkgo/v2 v2.21.0

4
go.sum
View File

@ -32,8 +32,8 @@ github.com/cloudnative-pg/cloudnative-pg v1.24.1-0.20241113134512-8608232c2813 h
github.com/cloudnative-pg/cloudnative-pg v1.24.1-0.20241113134512-8608232c2813/go.mod h1:f4hObdRVoQtMmVtWqZ6VDZBrI6ok9Td/UMhojQ+EPmk=
github.com/cloudnative-pg/cnpg-i v0.0.0-20241109002750-8abd359df734 h1:4jq/FUrlAKxu0Kw9PL5lj5Njq8pAnmUpP/kXKOrJAaE=
github.com/cloudnative-pg/cnpg-i v0.0.0-20241109002750-8abd359df734/go.mod h1:3U7miYasKr2rYCQzrn/IvbSQc0OpYF8ieZt2FKG4nv0=
github.com/cloudnative-pg/cnpg-i-machinery v0.0.0-20241129144432-bd94f16685d3 h1:hKTlmgyOq5ZS7t1eVa4SY1hH361gZ7VIb0an+BH9rJs=
github.com/cloudnative-pg/cnpg-i-machinery v0.0.0-20241129144432-bd94f16685d3/go.mod h1:X6r1fRuUEIAv4+5SSBY2RmQ201K6GcptOXgnmaX/8tY=
github.com/cloudnative-pg/cnpg-i-machinery v0.0.0-20241205093641-958e207b8afe h1:gUGqx4eTHreM0QWbszSx6wnbBw9Vavp5uYl4uA9fh1k=
github.com/cloudnative-pg/cnpg-i-machinery v0.0.0-20241205093641-958e207b8afe/go.mod h1:X6r1fRuUEIAv4+5SSBY2RmQ201K6GcptOXgnmaX/8tY=
github.com/cloudnative-pg/machinery v0.0.0-20241105070525-042a028b767c h1:t0RBU2gBiwJQ9XGeXlHPBYpsTscSKHgB5TfcWaiwanc=
github.com/cloudnative-pg/machinery v0.0.0-20241105070525-042a028b767c/go.mod h1:uBHGRIk5rt07mO4zjIC1uvGBWTH6PqIiD1PfpvPGZKU=
github.com/containerd/log v0.1.0 h1:TCJt7ioM2cr/tfR8GPbGf9/VRAX8D2B4PjzCpfX540I=

View File

@ -18,11 +18,8 @@ func NewCmd() *cobra.Command {
RunE: func(cmd *cobra.Command, _ []string) error {
requiredSettings := []string{
"namespace",
"barman-object-name",
"cluster-name",
"pod-name",
"spool-directory",
"server-name",
}
for _, k := range requiredSettings {
@ -36,16 +33,9 @@ func NewCmd() *cobra.Command {
}
_ = viper.BindEnv("namespace", "NAMESPACE")
_ = viper.BindEnv("cluster-name", "CLUSTER_NAME")
_ = viper.BindEnv("pod-name", "POD_NAME")
_ = viper.BindEnv("pgdata", "PGDATA")
_ = viper.BindEnv("spool-directory", "SPOOL_DIRECTORY")
_ = viper.BindEnv("barman-object-name", "BARMAN_OBJECT_NAME")
_ = viper.BindEnv("server-name", "SERVER_NAME")
_ = viper.BindEnv("recovery-barman-object-name", "RECOVERY_BARMAN_OBJECT_NAME")
_ = viper.BindEnv("recovery-server-name", "RECOVERY_SERVER_NAME")
return cmd
}

View File

@ -20,14 +20,8 @@ func NewCmd() *cobra.Command {
RunE: func(cmd *cobra.Command, _ []string) error {
requiredSettings := []string{
"namespace",
"cluster-name",
"pod-name",
"spool-directory",
// IMPORTANT: barman-object-name and server-name are not required
// to restore a cluster.
"recovery-barman-object-name",
"recovery-server-name",
}
for _, k := range requiredSettings {
@ -41,16 +35,9 @@ func NewCmd() *cobra.Command {
}
_ = viper.BindEnv("namespace", "NAMESPACE")
_ = viper.BindEnv("cluster-name", "CLUSTER_NAME")
_ = viper.BindEnv("pod-name", "POD_NAME")
_ = viper.BindEnv("pgdata", "PGDATA")
_ = viper.BindEnv("spool-directory", "SPOOL_DIRECTORY")
_ = viper.BindEnv("barman-object-name", "BARMAN_OBJECT_NAME")
_ = viper.BindEnv("server-name", "SERVER_NAME")
_ = viper.BindEnv("recovery-barman-object-name", "RECOVERY_BARMAN_OBJECT_NAME")
_ = viper.BindEnv("recovery-server-name", "RECOVERY_SERVER_NAME")
return cmd
}

View File

@ -0,0 +1,16 @@
package common
// walNotFoundError is raised when a WAL file has not been found in the object store
type walNotFoundError struct{}
func newWALNotFoundError() *walNotFoundError { return &walNotFoundError{} }
// ShouldPrintStackTrace tells whether the sidecar log stream should contain the stack trace
func (e walNotFoundError) ShouldPrintStackTrace() bool {
return false
}
// Error implements the error interface
func (e walNotFoundError) Error() string {
return "WAL file not found"
}

View File

@ -20,23 +20,17 @@ import (
barmancloudv1 "github.com/cloudnative-pg/plugin-barman-cloud/api/v1"
"github.com/cloudnative-pg/plugin-barman-cloud/internal/cnpgi/metadata"
"github.com/cloudnative-pg/plugin-barman-cloud/internal/cnpgi/operator/config"
)
// WALServiceImplementation is the implementation of the WAL Service
type WALServiceImplementation struct {
wal.UnimplementedWALServer
ClusterObjectKey client.ObjectKey
Client client.Client
InstanceName string
SpoolDirectory string
PGDataPath string
PGWALPath string
BarmanObjectKey client.ObjectKey
ServerName string
RecoveryBarmanObjectKey client.ObjectKey
RecoveryServerName string
Client client.Client
InstanceName string
SpoolDirectory string
PGDataPath string
PGWALPath string
}
// GetCapabilities implements the WALService interface
@ -72,13 +66,13 @@ func (w WALServiceImplementation) Archive(
contextLogger := log.FromContext(ctx)
contextLogger.Debug("starting wal archive")
var cluster cnpgv1.Cluster
if err := w.Client.Get(ctx, w.ClusterObjectKey, &cluster); err != nil {
configuration, err := config.NewFromClusterJSON(request.ClusterDefinition)
if err != nil {
return nil, err
}
var objectStore barmancloudv1.ObjectStore
if err := w.Client.Get(ctx, w.BarmanObjectKey, &objectStore); err != nil {
if err := w.Client.Get(ctx, configuration.GetBarmanObjectKey(), &objectStore); err != nil {
return nil, err
}
@ -106,7 +100,7 @@ func (w WALServiceImplementation) Archive(
return nil, err
}
options, err := arch.BarmanCloudWalArchiveOptions(ctx, &objectStore.Spec.Configuration, w.ServerName)
options, err := arch.BarmanCloudWalArchiveOptions(ctx, &objectStore.Spec.Configuration, configuration.ServerName)
if err != nil {
return nil, err
}
@ -127,12 +121,13 @@ func (w WALServiceImplementation) Restore(
ctx context.Context,
request *wal.WALRestoreRequest,
) (*wal.WALRestoreResult, error) {
// TODO: build full paths
contextLogger := log.FromContext(ctx)
walName := request.GetSourceWalName()
destinationPath := request.GetDestinationFileName()
var cluster cnpgv1.Cluster
if err := w.Client.Get(ctx, w.ClusterObjectKey, &cluster); err != nil {
configuration, err := config.NewFromClusterJSON(request.ClusterDefinition)
if err != nil {
return nil, err
}
@ -140,30 +135,35 @@ func (w WALServiceImplementation) Restore(
var serverName string
switch {
case cluster.IsReplica() && cluster.Status.CurrentPrimary == w.InstanceName:
// Designated primary on replica cluster, using recovery object store
serverName = w.RecoveryServerName
if err := w.Client.Get(ctx, w.RecoveryBarmanObjectKey, &objectStore); err != nil {
case configuration.Cluster.IsReplica() && configuration.Cluster.Status.CurrentPrimary == w.InstanceName:
// Designated primary on replica cluster, using replica source object store
serverName = configuration.ReplicaSourceServerName
if err := w.Client.Get(ctx, configuration.GetReplicaSourceBarmanObjectKey(), &objectStore); err != nil {
return nil, err
}
case cluster.Status.CurrentPrimary == "":
case configuration.Cluster.Status.CurrentPrimary == "":
// Recovery from object store, using recovery object store
serverName = w.RecoveryServerName
if err := w.Client.Get(ctx, w.RecoveryBarmanObjectKey, &objectStore); err != nil {
serverName = configuration.RecoveryServerName
if err := w.Client.Get(ctx, configuration.GetRecoveryBarmanObjectKey(), &objectStore); err != nil {
return nil, err
}
default:
// Using cluster object store
serverName = w.ServerName
if err := w.Client.Get(ctx, w.BarmanObjectKey, &objectStore); err != nil {
serverName = configuration.ServerName
if err := w.Client.Get(ctx, configuration.GetBarmanObjectKey(), &objectStore); err != nil {
return nil, err
}
}
contextLogger.Info(
"Restoring WAL file",
"objectStore", objectStore.Name,
"serverName", serverName,
"walName", walName)
return &wal.WALRestoreResult{}, w.restoreFromBarmanObjectStore(
ctx, &cluster, &objectStore, serverName, walName, destinationPath)
ctx, configuration.Cluster, &objectStore, serverName, walName, destinationPath)
}
func (w WALServiceImplementation) restoreFromBarmanObjectStore(
@ -246,6 +246,10 @@ func (w WALServiceImplementation) restoreFromBarmanObjectStore(
// is the one that PostgreSQL has requested to restore.
// The failure has already been logged in walRestorer.RestoreList method
if walStatus[0].Err != nil {
if errors.Is(walStatus[0].Err, barmanRestorer.ErrWALNotFound) {
return newWALNotFoundError()
}
return walStatus[0].Err
}

View File

@ -10,7 +10,6 @@ import (
barmanBackup "github.com/cloudnative-pg/barman-cloud/pkg/backup"
barmanCapabilities "github.com/cloudnative-pg/barman-cloud/pkg/capabilities"
barmanCredentials "github.com/cloudnative-pg/barman-cloud/pkg/credentials"
cnpgv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1"
"github.com/cloudnative-pg/cloudnative-pg/pkg/postgres"
"github.com/cloudnative-pg/cnpg-i/pkg/backup"
"github.com/cloudnative-pg/machinery/pkg/fileutils"
@ -22,16 +21,14 @@ import (
barmancloudv1 "github.com/cloudnative-pg/plugin-barman-cloud/api/v1"
"github.com/cloudnative-pg/plugin-barman-cloud/internal/cnpgi/common"
"github.com/cloudnative-pg/plugin-barman-cloud/internal/cnpgi/metadata"
"github.com/cloudnative-pg/plugin-barman-cloud/internal/cnpgi/operator/config"
)
// BackupServiceImplementation is the implementation
// of the Backup CNPG capability
type BackupServiceImplementation struct {
BarmanObjectKey client.ObjectKey
ClusterObjectKey client.ObjectKey
Client client.Client
InstanceName string
ServerName string
Client client.Client
InstanceName string
backup.UnimplementedBackupServer
}
@ -65,20 +62,20 @@ func (b BackupServiceImplementation) GetCapabilities(
// Backup implements the Backup interface
func (b BackupServiceImplementation) Backup(
ctx context.Context,
_ *backup.BackupRequest,
request *backup.BackupRequest,
) (*backup.BackupResult, error) {
contextLogger := log.FromContext(ctx)
contextLogger.Info("Starting backup")
var cluster cnpgv1.Cluster
if err := b.Client.Get(ctx, b.ClusterObjectKey, &cluster); err != nil {
configuration, err := config.NewFromClusterJSON(request.ClusterDefinition)
if err != nil {
return nil, err
}
var objectStore barmancloudv1.ObjectStore
if err := b.Client.Get(ctx, b.BarmanObjectKey, &objectStore); err != nil {
contextLogger.Error(err, "while getting object store", "key", b.BarmanObjectKey)
if err := b.Client.Get(ctx, configuration.GetBarmanObjectKey(), &objectStore); err != nil {
contextLogger.Error(err, "while getting object store", "key", configuration.GetRecoveryBarmanObjectKey())
return nil, err
}
@ -117,7 +114,7 @@ func (b BackupServiceImplementation) Backup(
if err = backupCmd.Take(
ctx,
backupName,
b.ServerName,
configuration.ServerName,
env,
barmanCloudExecutor{},
postgres.BackupTemporaryDirectory,
@ -129,7 +126,7 @@ func (b BackupServiceImplementation) Backup(
executedBackupInfo, err := backupCmd.GetExecutedBackupInfo(
ctx,
backupName,
b.ServerName,
configuration.ServerName,
barmanCloudExecutor{},
env)
if err != nil {

View File

@ -2,20 +2,17 @@ package instance
import (
"context"
"fmt"
"github.com/cloudnative-pg/cnpg-i/pkg/identity"
"sigs.k8s.io/controller-runtime/pkg/client"
barmancloudv1 "github.com/cloudnative-pg/plugin-barman-cloud/api/v1"
"github.com/cloudnative-pg/plugin-barman-cloud/internal/cnpgi/metadata"
)
// IdentityImplementation implements IdentityServer
type IdentityImplementation struct {
identity.UnimplementedIdentityServer
BarmanObjectKey client.ObjectKey
Client client.Client
Client client.Client
}
// GetPluginMetadata implements IdentityServer
@ -53,14 +50,9 @@ func (i IdentityImplementation) GetPluginCapabilities(
// Probe implements IdentityServer
func (i IdentityImplementation) Probe(
ctx context.Context,
_ context.Context,
_ *identity.ProbeRequest,
) (*identity.ProbeResponse, error) {
var obj barmancloudv1.ObjectStore
if err := i.Client.Get(ctx, i.BarmanObjectKey, &obj); err != nil {
return nil, fmt.Errorf("while fetching object store %s: %w", i.BarmanObjectKey.Name, err)
}
return &identity.ProbeResponse{
Ready: true,
}, nil

View File

@ -3,61 +3,56 @@ package client
import (
"context"
"fmt"
"math"
"reflect"
"sync"
"time"
"github.com/cloudnative-pg/machinery/pkg/log"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
v1 "github.com/cloudnative-pg/plugin-barman-cloud/api/v1"
)
type cachedSecret struct {
secret *corev1.Secret
// DefaultTTLSeconds is the default TTL in seconds of cache entries
const DefaultTTLSeconds = 10
type cachedEntry struct {
entry client.Object
fetchUnixTime int64
ttl time.Duration
}
func (e *cachedEntry) isExpired() bool {
return time.Now().Unix()-e.fetchUnixTime > int64(e.ttl)
}
// ExtendedClient is an extended client that is capable of caching multiple secrets without relying on informers
type ExtendedClient struct {
client.Client
barmanObjectKeys []client.ObjectKey
cachedSecrets []*cachedSecret
mux *sync.Mutex
ttl int
cachedObjects []cachedEntry
mux *sync.Mutex
}
// NewExtendedClient returns an extended client capable of caching secrets on the 'Get' operation
func NewExtendedClient(
baseClient client.Client,
objectStoreKeys []client.ObjectKey,
) client.Client {
return &ExtendedClient{
Client: baseClient,
barmanObjectKeys: objectStoreKeys,
mux: &sync.Mutex{},
Client: baseClient,
mux: &sync.Mutex{},
}
}
func (e *ExtendedClient) refreshTTL(ctx context.Context) error {
minTTL := math.MaxInt
for _, key := range e.barmanObjectKeys {
var object v1.ObjectStore
if err := e.Get(ctx, key, &object); err != nil {
return fmt.Errorf("failed to get the object store while refreshing the TTL parameter: %w", err)
}
currentTTL := object.Spec.InstanceSidecarConfiguration.GetCacheTTL()
if currentTTL < minTTL {
minTTL = currentTTL
}
func (e *ExtendedClient) isObjectCached(obj client.Object) bool {
if _, isSecret := obj.(*corev1.Secret); isSecret {
return true
}
e.ttl = minTTL
return nil
if _, isObjectStore := obj.(*corev1.Secret); isObjectStore {
return true
}
return false
}
// Get behaves like the original Get method, but uses a cache for secrets
@ -66,85 +61,88 @@ func (e *ExtendedClient) Get(
key client.ObjectKey,
obj client.Object,
opts ...client.GetOption,
) error {
if !e.isObjectCached(obj) {
return e.Client.Get(ctx, key, obj, opts...)
}
return e.getCachedObject(ctx, key, obj, opts...)
}
func (e *ExtendedClient) getCachedObject(
ctx context.Context,
key client.ObjectKey,
obj client.Object,
opts ...client.GetOption,
) error {
contextLogger := log.FromContext(ctx).
WithName("extended_client").
WithValues("name", key.Name, "namespace", key.Namespace)
if _, ok := obj.(*corev1.Secret); !ok {
contextLogger.Trace("not a secret, skipping")
return e.Client.Get(ctx, key, obj, opts...)
}
if err := e.refreshTTL(ctx); err != nil {
return err
}
if e.isCacheDisabled() {
contextLogger.Trace("cache is disabled")
return e.Client.Get(ctx, key, obj, opts...)
}
contextLogger.Trace("locking the cache")
e.mux.Lock()
defer e.mux.Unlock()
expiredSecretIndex := -1
// check if in cache
for idx, cache := range e.cachedSecrets {
if cache.secret.Namespace != key.Namespace || cache.secret.Name != key.Name {
expiredObjectIndex := -1
for idx, cacheEntry := range e.cachedObjects {
if cacheEntry.entry.GetNamespace() != key.Namespace || cacheEntry.entry.GetName() != key.Name {
continue
}
if e.isExpired(cache.fetchUnixTime) {
contextLogger.Trace("secret found, but it is expired")
expiredSecretIndex = idx
if cacheEntry.entry.GetObjectKind().GroupVersionKind() != obj.GetObjectKind().GroupVersionKind() {
continue
}
if cacheEntry.isExpired() {
contextLogger.Trace("expired object found")
expiredObjectIndex = idx
break
}
contextLogger.Debug("secret found, loading it from cache")
cache.secret.DeepCopyInto(obj.(*corev1.Secret))
contextLogger.Debug("object found, loading it from cache")
// Yes, this is a terrible hack, but that's exactly the way
// controller-runtime works.
// https://github.com/kubernetes-sigs/controller-runtime/blob/
// 717b32aede14c921d239cf1b974a11e718949865/pkg/cache/internal/cache_reader.go#L92
outVal := reflect.ValueOf(obj)
objVal := reflect.ValueOf(cacheEntry.entry)
if !objVal.Type().AssignableTo(outVal.Type()) {
return fmt.Errorf("cache had type %s, but %s was asked for", objVal.Type(), outVal.Type())
}
reflect.Indirect(outVal).Set(reflect.Indirect(objVal))
return nil
}
if err := e.Client.Get(ctx, key, obj); err != nil {
if err := e.Client.Get(ctx, key, obj, opts...); err != nil {
return err
}
cs := &cachedSecret{
secret: obj.(*corev1.Secret).DeepCopy(),
cs := cachedEntry{
entry: obj.(runtime.Object).DeepCopyObject().(client.Object),
fetchUnixTime: time.Now().Unix(),
}
contextLogger.Debug("setting secret in the cache")
if expiredSecretIndex != -1 {
e.cachedSecrets[expiredSecretIndex] = cs
contextLogger.Debug("setting object in the cache")
if expiredObjectIndex != -1 {
e.cachedObjects[expiredObjectIndex] = cs
} else {
e.cachedSecrets = append(e.cachedSecrets, cs)
e.cachedObjects = append(e.cachedObjects, cs)
}
return nil
}
func (e *ExtendedClient) isExpired(unixTime int64) bool {
return time.Now().Unix()-unixTime > int64(e.ttl)
}
func (e *ExtendedClient) isCacheDisabled() bool {
const noCache = 0
return e.ttl == noCache
}
// RemoveSecret ensures that a secret is not present in the cache
func (e *ExtendedClient) RemoveSecret(key client.ObjectKey) {
if e.isCacheDisabled() {
return
}
// removeObject ensures that a object is not present in the cache
func (e *ExtendedClient) removeObject(object client.Object) {
e.mux.Lock()
defer e.mux.Unlock()
for i, cache := range e.cachedSecrets {
if cache.secret.Namespace == key.Namespace && cache.secret.Name == key.Name {
e.cachedSecrets = append(e.cachedSecrets[:i], e.cachedSecrets[i+1:]...)
for i, cache := range e.cachedObjects {
if cache.entry.GetNamespace() == object.GetNamespace() &&
cache.entry.GetName() == object.GetName() &&
cache.entry.GetObjectKind().GroupVersionKind() != object.GetObjectKind().GroupVersionKind() {
e.cachedObjects = append(e.cachedObjects[:i], e.cachedObjects[i+1:]...)
return
}
}
@ -156,16 +154,10 @@ func (e *ExtendedClient) Update(
obj client.Object,
opts ...client.UpdateOption,
) error {
if e.isCacheDisabled() {
return e.Client.Update(ctx, obj, opts...)
if e.isObjectCached(obj) {
e.removeObject(obj)
}
if _, ok := obj.(*corev1.Secret); !ok {
return e.Client.Update(ctx, obj, opts...)
}
e.RemoveSecret(client.ObjectKeyFromObject(obj))
return e.Client.Update(ctx, obj, opts...)
}
@ -175,16 +167,10 @@ func (e *ExtendedClient) Delete(
obj client.Object,
opts ...client.DeleteOption,
) error {
if e.isCacheDisabled() {
return e.Client.Delete(ctx, obj, opts...)
if e.isObjectCached(obj) {
e.removeObject(obj)
}
if _, ok := obj.(*corev1.Secret); !ok {
return e.Client.Delete(ctx, obj, opts...)
}
e.RemoveSecret(client.ObjectKeyFromObject(obj))
return e.Client.Delete(ctx, obj, opts...)
}
@ -195,15 +181,9 @@ func (e *ExtendedClient) Patch(
patch client.Patch,
opts ...client.PatchOption,
) error {
if e.isCacheDisabled() {
return e.Client.Patch(ctx, obj, patch, opts...)
if e.isObjectCached(obj) {
e.removeObject(obj)
}
if _, ok := obj.(*corev1.Secret); !ok {
return e.Client.Patch(ctx, obj, patch, opts...)
}
e.RemoveSecret(client.ObjectKeyFromObject(obj))
return e.Client.Patch(ctx, obj, patch, opts...)
}

View File

@ -6,7 +6,6 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
@ -45,19 +44,13 @@ var _ = Describe("ExtendedClient Get", func() {
Namespace: "default",
Name: "test-object-store",
},
Spec: v1.ObjectStoreSpec{
InstanceSidecarConfiguration: v1.InstanceSidecarConfiguration{
CacheTTL: ptr.To(60),
},
},
Spec: v1.ObjectStoreSpec{},
}
baseClient := fake.NewClientBuilder().
WithScheme(scheme).
WithObjects(secretInClient, objectStore).Build()
extendedClient = NewExtendedClient(baseClient, []client.ObjectKey{
client.ObjectKeyFromObject(objectStore),
}).(*ExtendedClient)
extendedClient = NewExtendedClient(baseClient).(*ExtendedClient)
})
It("returns secret from cache if not expired", func(ctx SpecContext) {
@ -70,22 +63,22 @@ var _ = Describe("ExtendedClient Get", func() {
// manually add the secret to the cache, this is not present in the fake client so we are sure it is from the
// cache
extendedClient.cachedSecrets = []*cachedSecret{
extendedClient.cachedObjects = []cachedEntry{
{
secret: secretNotInClient,
entry: secretNotInClient,
fetchUnixTime: time.Now().Unix(),
},
}
err := extendedClient.Get(ctx, client.ObjectKeyFromObject(secretNotInClient), secretInClient)
Expect(err).NotTo(HaveOccurred())
Expect(secretNotInClient).To(Equal(extendedClient.cachedSecrets[0].secret))
Expect(secretNotInClient).To(Equal(extendedClient.cachedObjects[0].entry))
})
It("fetches secret from base client if cache is expired", func(ctx SpecContext) {
extendedClient.cachedSecrets = []*cachedSecret{
extendedClient.cachedObjects = []cachedEntry{
{
secret: secretInClient.DeepCopy(),
entry: secretInClient.DeepCopy(),
fetchUnixTime: time.Now().Add(-2 * time.Minute).Unix(),
},
}
@ -111,6 +104,6 @@ var _ = Describe("ExtendedClient Get", func() {
err = extendedClient.Get(ctx, client.ObjectKeyFromObject(configMap), configMap)
Expect(err).NotTo(HaveOccurred())
Expect(extendedClient.cachedSecrets).To(BeEmpty())
Expect(extendedClient.cachedObjects).To(BeEmpty())
})
})

View File

@ -7,13 +7,10 @@ import (
cnpgv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1"
"github.com/spf13/viper"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
@ -33,89 +30,35 @@ func init() {
func Start(ctx context.Context) error {
setupLog := log.FromContext(ctx)
setupLog.Info("Starting barman cloud instance plugin")
namespace := viper.GetString("namespace")
clusterName := viper.GetString("cluster-name")
podName := viper.GetString("pod-name")
barmanObjectName := viper.GetString("barman-object-name")
recoveryBarmanObjectName := viper.GetString("recovery-barman-object-name")
controllerOptions := ctrl.Options{
Scheme: scheme,
Cache: cache.Options{
ByObject: map[client.Object]cache.ByObject{
&cnpgv1.Cluster{}: {
Field: fields.OneTermEqualSelector("metadata.name", clusterName),
Namespaces: map[string]cache.Config{
namespace: {},
},
},
},
},
Client: client.Options{
Cache: &client.CacheOptions{
DisableFor: []client.Object{
&corev1.Secret{},
&barmancloudv1.ObjectStore{},
&cnpgv1.Cluster{},
},
},
},
}
if len(recoveryBarmanObjectName) == 0 {
controllerOptions.Cache.ByObject[&barmancloudv1.ObjectStore{}] = cache.ByObject{
Field: fields.OneTermEqualSelector("metadata.name", barmanObjectName),
Namespaces: map[string]cache.Config{
namespace: {},
},
}
} else {
controllerOptions.Client.Cache.DisableFor = append(
controllerOptions.Client.Cache.DisableFor,
&barmancloudv1.ObjectStore{},
)
}
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), controllerOptions)
if err != nil {
setupLog.Error(err, "unable to start manager")
return err
}
barmanObjectKey := client.ObjectKey{
Namespace: namespace,
Name: barmanObjectName,
}
recoveryBarmanObjectKey := client.ObjectKey{
Namespace: namespace,
Name: recoveryBarmanObjectName,
}
involvedObjectStores := make([]types.NamespacedName, 0, 2)
if len(barmanObjectName) > 0 {
involvedObjectStores = append(involvedObjectStores, barmanObjectKey)
}
if len(recoveryBarmanObjectName) > 0 {
involvedObjectStores = append(involvedObjectStores, recoveryBarmanObjectKey)
}
if err := mgr.Add(&CNPGI{
Client: extendedclient.NewExtendedClient(mgr.GetClient(), involvedObjectStores),
ClusterObjectKey: client.ObjectKey{
Namespace: namespace,
Name: clusterName,
},
Client: extendedclient.NewExtendedClient(mgr.GetClient()),
InstanceName: podName,
// TODO: improve
PGDataPath: viper.GetString("pgdata"),
PGWALPath: path.Join(viper.GetString("pgdata"), "pg_wal"),
SpoolDirectory: viper.GetString("spool-directory"),
PluginPath: viper.GetString("plugin-path"),
BarmanObjectKey: barmanObjectKey,
ServerName: viper.GetString("server-name"),
RecoveryBarmanObjectKey: recoveryBarmanObjectKey,
RecoveryServerName: viper.GetString("recovery-server-name"),
}); err != nil {
setupLog.Error(err, "unable to create CNPGI runnable")
return err

View File

@ -14,45 +14,28 @@ import (
// CNPGI is the implementation of the PostgreSQL sidecar
type CNPGI struct {
Client client.Client
ClusterObjectKey client.ObjectKey
PGDataPath string
PGWALPath string
SpoolDirectory string
Client client.Client
PGDataPath string
PGWALPath string
SpoolDirectory string
// mutually exclusive with serverAddress
PluginPath string
InstanceName string
BarmanObjectKey client.ObjectKey
ServerName string
RecoveryBarmanObjectKey client.ObjectKey
RecoveryServerName string
}
// Start starts the GRPC service
func (c *CNPGI) Start(ctx context.Context) error {
enrich := func(server *grpc.Server) error {
wal.RegisterWALServer(server, common.WALServiceImplementation{
ClusterObjectKey: c.ClusterObjectKey,
InstanceName: c.InstanceName,
Client: c.Client,
SpoolDirectory: c.SpoolDirectory,
PGDataPath: c.PGDataPath,
PGWALPath: c.PGWALPath,
BarmanObjectKey: c.BarmanObjectKey,
ServerName: c.ServerName,
RecoveryBarmanObjectKey: c.RecoveryBarmanObjectKey,
RecoveryServerName: c.RecoveryServerName,
InstanceName: c.InstanceName,
Client: c.Client,
SpoolDirectory: c.SpoolDirectory,
PGDataPath: c.PGDataPath,
PGWALPath: c.PGWALPath,
})
backup.RegisterBackupServer(server, BackupServiceImplementation{
Client: c.Client,
BarmanObjectKey: c.BarmanObjectKey,
ServerName: c.ServerName,
ClusterObjectKey: c.ClusterObjectKey,
InstanceName: c.InstanceName,
Client: c.Client,
InstanceName: c.InstanceName,
})
common.AddHealthCheck(server)
return nil
@ -60,8 +43,7 @@ func (c *CNPGI) Start(ctx context.Context) error {
srv := http.Server{
IdentityImpl: IdentityImplementation{
Client: c.Client,
BarmanObjectKey: c.BarmanObjectKey,
Client: c.Client,
},
Enrichers: []http.ServerEnricher{enrich},
PluginPath: c.PluginPath,

View File

@ -4,6 +4,9 @@ import (
"strings"
cnpgv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1"
"github.com/cloudnative-pg/cnpg-i-machinery/pkg/pluginhelper/decoder"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"github.com/cloudnative-pg/plugin-barman-cloud/internal/cnpgi/metadata"
)
@ -44,10 +47,77 @@ func (e *ConfigurationError) IsEmpty() bool {
// PluginConfiguration is the configuration of the plugin
type PluginConfiguration struct {
BarmanObjectName string
ServerName string
Cluster *cnpgv1.Cluster
BarmanObjectName string
ServerName string
RecoveryBarmanObjectName string
RecoveryServerName string
ReplicaSourceBarmanObjectName string
ReplicaSourceServerName string
}
// GetBarmanObjectKey gets the namespaced name of the barman object
func (config *PluginConfiguration) GetBarmanObjectKey() types.NamespacedName {
return types.NamespacedName{
Namespace: config.Cluster.Namespace,
Name: config.BarmanObjectName,
}
}
// GetRecoveryBarmanObjectKey gets the namespaced name of the recovery barman object
func (config *PluginConfiguration) GetRecoveryBarmanObjectKey() types.NamespacedName {
return types.NamespacedName{
Namespace: config.Cluster.Namespace,
Name: config.RecoveryBarmanObjectName,
}
}
// GetReplicaSourceBarmanObjectKey gets the namespaced name of the replica source barman object
func (config *PluginConfiguration) GetReplicaSourceBarmanObjectKey() types.NamespacedName {
return types.NamespacedName{
Namespace: config.Cluster.Namespace,
Name: config.ReplicaSourceBarmanObjectName,
}
}
// GetReferredBarmanObjectsKey gets the list of barman objects referred by this
// plugin configuration
func (config *PluginConfiguration) GetReferredBarmanObjectsKey() []types.NamespacedName {
result := make([]types.NamespacedName, 0, 3)
if len(config.BarmanObjectName) > 0 {
result = append(result, config.GetBarmanObjectKey())
}
if len(config.RecoveryBarmanObjectName) > 0 {
result = append(result, config.GetRecoveryBarmanObjectKey())
}
if len(config.ReplicaSourceBarmanObjectName) > 0 {
result = append(result, config.GetReplicaSourceBarmanObjectKey())
}
return result
}
func getClusterGVK() schema.GroupVersionKind {
return schema.GroupVersionKind{
Group: cnpgv1.GroupVersion.Group,
Version: cnpgv1.GroupVersion.Version,
Kind: cnpgv1.ClusterKind,
}
}
// NewFromClusterJSON decodes a JSON representation of a cluster.
func NewFromClusterJSON(clusterJSON []byte) (*PluginConfiguration, error) {
var result cnpgv1.Cluster
if err := decoder.DecodeObject(clusterJSON, &result, getClusterGVK()); err != nil {
return nil, err
}
return NewFromCluster(&result), nil
}
// NewFromCluster extracts the configuration from the cluster
@ -68,7 +138,6 @@ func NewFromCluster(cluster *cnpgv1.Cluster) *PluginConfiguration {
recoveryServerName := ""
recoveryBarmanObjectName := ""
if recoveryParameters := getRecoveryParameters(cluster); recoveryParameters != nil {
recoveryBarmanObjectName = recoveryParameters["barmanObjectName"]
recoveryServerName = recoveryParameters["serverName"]
@ -77,20 +146,34 @@ func NewFromCluster(cluster *cnpgv1.Cluster) *PluginConfiguration {
}
}
replicaSourceServerName := ""
replicaSourceBarmanObjectName := ""
if replicaSourceParameters := getReplicaSourceParameters(cluster); replicaSourceParameters != nil {
replicaSourceBarmanObjectName = replicaSourceParameters["barmanObjectName"]
replicaSourceServerName = replicaSourceParameters["serverName"]
if len(replicaSourceServerName) == 0 {
replicaSourceServerName = cluster.Name
}
}
result := &PluginConfiguration{
Cluster: cluster,
// used for the backup/archive
BarmanObjectName: helper.Parameters["barmanObjectName"],
ServerName: serverName,
// used for restore/wal_restore
// used for restore and wal_restore during backup recovery
RecoveryServerName: recoveryServerName,
RecoveryBarmanObjectName: recoveryBarmanObjectName,
// used for wal_restore in the designed primary of a replica cluster
ReplicaSourceServerName: replicaSourceServerName,
ReplicaSourceBarmanObjectName: replicaSourceBarmanObjectName,
}
return result
}
func getRecoveryParameters(cluster *cnpgv1.Cluster) map[string]string {
recoveryPluginConfiguration := cluster.GetRecoverySourcePlugin()
recoveryPluginConfiguration := getRecoverySourcePlugin(cluster)
if recoveryPluginConfiguration == nil {
return nil
}
@ -102,11 +185,67 @@ func getRecoveryParameters(cluster *cnpgv1.Cluster) map[string]string {
return recoveryPluginConfiguration.Parameters
}
func getReplicaSourceParameters(cluster *cnpgv1.Cluster) map[string]string {
replicaSourcePluginConfiguration := getReplicaSourcePlugin(cluster)
if replicaSourcePluginConfiguration == nil {
return nil
}
if replicaSourcePluginConfiguration.Name != metadata.PluginName {
return nil
}
return replicaSourcePluginConfiguration.Parameters
}
// getRecoverySourcePlugin returns the configuration of the plugin being
// the recovery source of the cluster. If no such plugin have been configured,
// nil is returned
func getRecoverySourcePlugin(cluster *cnpgv1.Cluster) *cnpgv1.PluginConfiguration {
if cluster.Spec.Bootstrap == nil || cluster.Spec.Bootstrap.Recovery == nil {
return nil
}
recoveryConfig := cluster.Spec.Bootstrap.Recovery
if len(recoveryConfig.Source) == 0 {
// Plugin-based recovery is supported only with
// An external cluster definition
return nil
}
recoveryExternalCluster, found := cluster.ExternalCluster(recoveryConfig.Source)
if !found {
// This error should have already been detected
// by the validating webhook.
return nil
}
return recoveryExternalCluster.PluginConfiguration
}
// getRecoverySourcePlugin returns the configuration of the plugin being
// the recovery source of the cluster. If no such plugin have been configured,
// nil is returned
func getReplicaSourcePlugin(cluster *cnpgv1.Cluster) *cnpgv1.PluginConfiguration {
if cluster.Spec.ReplicaCluster == nil || len(cluster.Spec.ReplicaCluster.Source) == 0 {
return nil
}
recoveryExternalCluster, found := cluster.ExternalCluster(cluster.Spec.ReplicaCluster.Source)
if !found {
// This error should have already been detected
// by the validating webhook.
return nil
}
return recoveryExternalCluster.PluginConfiguration
}
// Validate checks if the barmanObjectName is set
func (p *PluginConfiguration) Validate() error {
func (config *PluginConfiguration) Validate() error {
err := NewConfigurationError()
if len(p.BarmanObjectName) == 0 && len(p.RecoveryBarmanObjectName) == 0 {
if len(config.BarmanObjectName) == 0 && len(config.RecoveryBarmanObjectName) == 0 {
return err.WithMessage("no reference to barmanObjectName have been included")
}

View File

@ -168,14 +168,13 @@ func (impl LifecycleImplementation) reconcileJob(
return nil, nil
}
return reconcileJob(ctx, cluster, request, pluginConfiguration, env)
return reconcileJob(ctx, cluster, request, env)
}
func reconcileJob(
ctx context.Context,
cluster *cnpgv1.Cluster,
request *lifecycle.OperatorLifecycleRequest,
pluginConfiguration *config.PluginConfiguration,
env []corev1.EnvVar,
) (*lifecycle.OperatorLifecycleResponse, error) {
contextLogger := log.FromContext(ctx).WithName("lifecycle")
@ -206,7 +205,6 @@ func reconcileJob(
mutatedJob := job.DeepCopy()
if err := reconcilePodSpec(
pluginConfiguration,
cluster,
&mutatedJob.Spec.Template.Spec,
"full-recovery",
@ -262,7 +260,6 @@ func reconcilePod(
if len(pluginConfiguration.BarmanObjectName) != 0 {
if err := reconcilePodSpec(
pluginConfiguration,
cluster,
&mutatedPod.Spec,
"postgres",
@ -289,7 +286,6 @@ func reconcilePod(
}
func reconcilePodSpec(
cfg *config.PluginConfiguration,
cluster *cnpgv1.Cluster,
spec *corev1.PodSpec,
mainContainerName string,
@ -313,32 +309,6 @@ func reconcilePodSpec(
},
}
if len(cfg.BarmanObjectName) > 0 {
envs = append(envs,
corev1.EnvVar{
Name: "BARMAN_OBJECT_NAME",
Value: cfg.BarmanObjectName,
},
corev1.EnvVar{
Name: "SERVER_NAME",
Value: cfg.ServerName,
},
)
}
if len(cfg.RecoveryBarmanObjectName) > 0 {
envs = append(envs,
corev1.EnvVar{
Name: "RECOVERY_BARMAN_OBJECT_NAME",
Value: cfg.RecoveryBarmanObjectName,
},
corev1.EnvVar{
Name: "RECOVERY_SERVER_NAME",
Value: cfg.RecoveryServerName,
},
)
}
envs = append(envs, additionalEnvs...)
baseProbe := &corev1.Probe{

View File

@ -107,7 +107,7 @@ var _ = Describe("LifecycleImplementation", func() {
ObjectDefinition: jobJSON,
}
response, err := reconcileJob(ctx, cluster, request, pluginConfiguration, nil)
response, err := reconcileJob(ctx, cluster, request, nil)
Expect(err).NotTo(HaveOccurred())
Expect(response).NotTo(BeNil())
Expect(response.JsonPatch).NotTo(BeEmpty())
@ -128,7 +128,7 @@ var _ = Describe("LifecycleImplementation", func() {
ObjectDefinition: jobJSON,
}
response, err := reconcileJob(ctx, cluster, request, pluginConfiguration, nil)
response, err := reconcileJob(ctx, cluster, request, nil)
Expect(err).NotTo(HaveOccurred())
Expect(response).To(BeNil())
})
@ -138,7 +138,7 @@ var _ = Describe("LifecycleImplementation", func() {
ObjectDefinition: []byte("invalid-json"),
}
response, err := reconcileJob(ctx, cluster, request, pluginConfiguration, nil)
response, err := reconcileJob(ctx, cluster, request, nil)
Expect(err).To(HaveOccurred())
Expect(response).To(BeNil())
})
@ -165,7 +165,7 @@ var _ = Describe("LifecycleImplementation", func() {
ObjectDefinition: jobJSON,
}
response, err := reconcileJob(ctx, cluster, request, pluginConfiguration, nil)
response, err := reconcileJob(ctx, cluster, request, nil)
Expect(err).NotTo(HaveOccurred())
Expect(response).To(BeNil())
})

View File

@ -75,14 +75,10 @@ func (r ReconcilerImplementation) Pre(
contextLogger.Debug("parsing barman object configuration")
var barmanObjects []barmancloudv1.ObjectStore
if pluginConfiguration.BarmanObjectName != "" {
barmanObjects := make([]barmancloudv1.ObjectStore, 0, len(pluginConfiguration.GetReferredBarmanObjectsKey()))
for _, barmanObjectKey := range pluginConfiguration.GetReferredBarmanObjectsKey() {
var barmanObject barmancloudv1.ObjectStore
if err := r.Client.Get(ctx, client.ObjectKey{
Namespace: cluster.Namespace,
Name: pluginConfiguration.BarmanObjectName,
}, &barmanObject); err != nil {
if err := r.Client.Get(ctx, barmanObjectKey, &barmanObject); err != nil {
if apierrs.IsNotFound(err) {
contextLogger.Info(
"barman object configuration not found, requeuing",
@ -99,30 +95,7 @@ func (r ReconcilerImplementation) Pre(
barmanObjects = append(barmanObjects, barmanObject)
}
if pluginConfiguration.RecoveryBarmanObjectName != "" {
var barmanObject barmancloudv1.ObjectStore
if err := r.Client.Get(ctx, client.ObjectKey{
Namespace: cluster.Namespace,
Name: pluginConfiguration.RecoveryBarmanObjectName,
}, &barmanObject); err != nil {
if apierrs.IsNotFound(err) {
contextLogger.Info(
"barman recovery object configuration not found, requeuing",
"name", pluginConfiguration.RecoveryBarmanObjectName,
"namespace", cluster.Namespace,
)
return &reconciler.ReconcilerHooksResult{
Behavior: reconciler.ReconcilerHooksResult_BEHAVIOR_REQUEUE,
}, nil
}
return nil, err
}
barmanObjects = append(barmanObjects, barmanObject)
}
var additionalSecretNames []string
if err := r.ensureRole(ctx, &cluster, barmanObjects, additionalSecretNames); err != nil {
if err := r.ensureRole(ctx, &cluster, barmanObjects); err != nil {
return nil, err
}
@ -150,10 +123,9 @@ func (r ReconcilerImplementation) ensureRole(
ctx context.Context,
cluster *cnpgv1.Cluster,
barmanObjects []barmancloudv1.ObjectStore,
additionalSecretNames []string,
) error {
contextLogger := log.FromContext(ctx)
newRole := specs.BuildRole(cluster, barmanObjects, additionalSecretNames)
newRole := specs.BuildRole(cluster, barmanObjects)
var role rbacv1.Role
if err := r.Client.Get(ctx, client.ObjectKey{

View File

@ -15,7 +15,6 @@ import (
func BuildRole(
cluster *cnpgv1.Cluster,
barmanObjects []barmancloudv1.ObjectStore,
additionalSecretNames []string,
) *rbacv1.Role {
role := &rbacv1.Role{
ObjectMeta: metav1.ObjectMeta{
@ -36,10 +35,6 @@ func BuildRole(
}
}
for _, secret := range additionalSecretNames {
secretsSet.Put(secret)
}
role.Rules = append(role.Rules, rbacv1.PolicyRule{
APIGroups: []string{
"barmancloud.cnpg.io",

View File

@ -33,12 +33,6 @@ func Start(ctx context.Context) error {
namespace := viper.GetString("namespace")
clusterName := viper.GetString("cluster-name")
recoveryBarmanObjectName := viper.GetString("recovery-barman-object-name")
recoveryServerName := viper.GetString("recovery-server-name")
barmanObjectName := viper.GetString("barman-object-name")
serverName := viper.GetString("server-name")
objs := map[client.Object]cache.ByObject{
&cnpgv1.Cluster{}: {
Field: fields.OneTermEqualSelector("metadata.name", clusterName),
@ -48,15 +42,6 @@ func Start(ctx context.Context) error {
},
}
if recoveryBarmanObjectName != "" {
objs[&barmancloudv1.ObjectStore{}] = cache.ByObject{
Field: fields.OneTermEqualSelector("metadata.name", recoveryBarmanObjectName),
Namespaces: map[string]cache.Config{
namespace: {},
},
}
}
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
Scheme: scheme,
Cache: cache.Options{
@ -79,25 +64,9 @@ func Start(ctx context.Context) error {
if err := mgr.Add(&CNPGI{
PluginPath: viper.GetString("plugin-path"),
SpoolDirectory: viper.GetString("spool-directory"),
ClusterObjectKey: client.ObjectKey{
Namespace: namespace,
Name: clusterName,
},
Client: mgr.GetClient(),
PGDataPath: viper.GetString("pgdata"),
InstanceName: viper.GetString("pod-name"),
ServerName: serverName,
BarmanObjectKey: client.ObjectKey{
Namespace: namespace,
Name: barmanObjectName,
},
RecoveryServerName: recoveryServerName,
RecoveryBarmanObjectKey: client.ObjectKey{
Namespace: namespace,
Name: recoveryBarmanObjectName,
},
Client: mgr.GetClient(),
PGDataPath: viper.GetString("pgdata"),
InstanceName: viper.GetString("pod-name"),
}); err != nil {
setupLog.Error(err, "unable to create CNPGI runnable")
return err

View File

@ -18,17 +18,16 @@ import (
cnpgv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1"
"github.com/cloudnative-pg/cloudnative-pg/pkg/postgres"
"github.com/cloudnative-pg/cloudnative-pg/pkg/utils"
"github.com/cloudnative-pg/cnpg-i-machinery/pkg/pluginhelper/decoder"
restore "github.com/cloudnative-pg/cnpg-i/pkg/restore/job"
"github.com/cloudnative-pg/machinery/pkg/execlog"
"github.com/cloudnative-pg/machinery/pkg/fileutils"
"github.com/cloudnative-pg/machinery/pkg/log"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
barmancloudv1 "github.com/cloudnative-pg/plugin-barman-cloud/api/v1"
"github.com/cloudnative-pg/plugin-barman-cloud/internal/cnpgi/metadata"
"github.com/cloudnative-pg/plugin-barman-cloud/internal/cnpgi/operator/config"
)
const (
@ -44,14 +43,7 @@ const (
type JobHookImpl struct {
restore.UnimplementedRestoreJobHooksServer
Client client.Client
ClusterObjectKey client.ObjectKey
BarmanObjectKey types.NamespacedName
ServerName string
RecoveryBarmanObjectKey types.NamespacedName
RecoveryServerName string
Client client.Client
SpoolDirectory string
PgDataPath string
@ -78,27 +70,24 @@ func (impl JobHookImpl) Restore(
req *restore.RestoreRequest,
) (*restore.RestoreResponse, error) {
contextLogger := log.FromContext(ctx)
var cluster cnpgv1.Cluster
if err := decoder.DecodeObject(
req.GetClusterDefinition(),
&cluster,
cnpgv1.GroupVersion.WithKind("Cluster"),
); err != nil {
configuration, err := config.NewFromClusterJSON(req.ClusterDefinition)
if err != nil {
return nil, err
}
var recoveryObjectStore barmancloudv1.ObjectStore
if err := impl.Client.Get(ctx, impl.RecoveryBarmanObjectKey, &recoveryObjectStore); err != nil {
if err := impl.Client.Get(ctx, configuration.GetRecoveryBarmanObjectKey(), &recoveryObjectStore); err != nil {
return nil, err
}
if impl.BarmanObjectKey.Name != "" {
if configuration.BarmanObjectName != "" {
var targetObjectStore barmancloudv1.ObjectStore
if err := impl.Client.Get(ctx, impl.BarmanObjectKey, &targetObjectStore); err != nil {
if err := impl.Client.Get(ctx, configuration.GetBarmanObjectKey(), &targetObjectStore); err != nil {
return nil, err
}
if err := impl.checkBackupDestination(ctx, &cluster, &targetObjectStore.Spec.Configuration); err != nil {
if err := impl.checkBackupDestination(ctx, configuration.Cluster, &targetObjectStore.Spec.Configuration); err != nil {
return nil, err
}
}
@ -107,9 +96,9 @@ func (impl JobHookImpl) Restore(
backup, env, err := loadBackupObjectFromExternalCluster(
ctx,
impl.Client,
&cluster,
configuration.Cluster,
&recoveryObjectStore.Spec.Configuration,
impl.RecoveryServerName,
configuration.RecoveryServerName,
)
if err != nil {
return nil, err
@ -133,7 +122,7 @@ func (impl JobHookImpl) Restore(
return nil, err
}
if cluster.Spec.WalStorage != nil {
if configuration.Cluster.Spec.WalStorage != nil {
if _, err := impl.restoreCustomWalDir(ctx); err != nil {
return nil, err
}

View File

@ -18,14 +18,6 @@ type CNPGI struct {
PluginPath string
SpoolDirectory string
BarmanObjectKey client.ObjectKey
ServerName string
RecoveryBarmanObjectKey client.ObjectKey
RecoveryServerName string
ClusterObjectKey client.ObjectKey
Client client.Client
PGDataPath string
InstanceName string
@ -38,32 +30,18 @@ func (c *CNPGI) Start(ctx context.Context) error {
enrich := func(server *grpc.Server) error {
wal.RegisterWALServer(server, common.WALServiceImplementation{
ClusterObjectKey: c.ClusterObjectKey,
InstanceName: c.InstanceName,
Client: c.Client,
SpoolDirectory: c.SpoolDirectory,
PGDataPath: c.PGDataPath,
PGWALPath: path.Join(c.PGDataPath, "pg_wal"),
BarmanObjectKey: c.BarmanObjectKey,
ServerName: c.ServerName,
RecoveryBarmanObjectKey: c.RecoveryBarmanObjectKey,
RecoveryServerName: c.RecoveryServerName,
InstanceName: c.InstanceName,
Client: c.Client,
SpoolDirectory: c.SpoolDirectory,
PGDataPath: c.PGDataPath,
PGWALPath: path.Join(c.PGDataPath, "pg_wal"),
})
restore.RegisterRestoreJobHooksServer(server, &JobHookImpl{
Client: c.Client,
ClusterObjectKey: c.ClusterObjectKey,
SpoolDirectory: c.SpoolDirectory,
PgDataPath: c.PGDataPath,
PgWalFolderToSymlink: PgWalVolumePgWalPath,
BarmanObjectKey: c.BarmanObjectKey,
ServerName: c.ServerName,
RecoveryBarmanObjectKey: c.RecoveryBarmanObjectKey,
RecoveryServerName: c.RecoveryServerName,
})
common.AddHealthCheck(server)