From d5d0008dc1fd4f2e9abbe84e55497597a11aef3c Mon Sep 17 00:00:00 2001
From: Leonardo Cecchi
Date: Fri, 29 Nov 2024 11:50:42 +0100
Subject: [PATCH] feat: separate recovery and cluster object store

Signed-off-by: Leonardo Cecchi
---
 docs/examples/cluster-restore-archive.yaml    | 27 ++++++
 docs/examples/minio-store-bis.yaml            | 23 +++++
 docs/minio-bis/minio-deployment.yaml          | 42 ++++++++++
 docs/minio-bis/minio-pvc.yaml                 | 11 +++
 docs/minio-bis/minio-secret.yaml              |  7 ++
 docs/minio-bis/minio-service.yaml             | 11 +++
 internal/cmd/instance/main.go                 |  6 +-
 internal/cmd/restore/main.go                  | 11 ++-
 internal/cnpgi/common/wal.go                  | 83 ++++++++++++-----
 .../cnpgi/instance/internal/client/client.go  | 36 +++++---
 .../instance/internal/client/client_test.go   |  4 +-
 internal/cnpgi/instance/manager.go            | 57 +++++++----
 internal/cnpgi/instance/start.go              | 16 +++-
 internal/cnpgi/operator/config/config.go      |  4 +-
 internal/cnpgi/operator/lifecycle.go          | 41 +++++----
 internal/cnpgi/operator/reconciler.go         | 58 +++++-------
 internal/cnpgi/operator/specs/role.go         | 35 ++++----
 internal/cnpgi/restore/manager.go             | 28 +++++--
 internal/cnpgi/restore/restore.go             | 66 ++++-----------
 internal/cnpgi/restore/start.go               | 34 ++++++--
 20 files changed, 408 insertions(+), 192 deletions(-)
 create mode 100644 docs/examples/cluster-restore-archive.yaml
 create mode 100644 docs/examples/minio-store-bis.yaml
 create mode 100644 docs/minio-bis/minio-deployment.yaml
 create mode 100644 docs/minio-bis/minio-pvc.yaml
 create mode 100644 docs/minio-bis/minio-secret.yaml
 create mode 100644 docs/minio-bis/minio-service.yaml

diff --git a/docs/examples/cluster-restore-archive.yaml b/docs/examples/cluster-restore-archive.yaml
new file mode 100644
index 0000000..5c133de
--- /dev/null
+++ b/docs/examples/cluster-restore-archive.yaml
@@ -0,0 +1,27 @@
+apiVersion: postgresql.cnpg.io/v1
+kind: Cluster
+metadata:
+  name: cluster-restore
+spec:
+  instances: 3
+  imagePullPolicy: IfNotPresent
+
+  bootstrap:
+    recovery:
+      source: source
+
+  plugins:
+  - name: barman-cloud.cloudnative-pg.io
+    parameters:
+      barmanObjectName: minio-store-bis
+
+  externalClusters:
+  - name: source
+    plugin:
+      name: barman-cloud.cloudnative-pg.io
+      parameters:
+        barmanObjectName: minio-store
+        serverName: cluster-example
+
+  storage:
+    size: 1Gi
diff --git a/docs/examples/minio-store-bis.yaml b/docs/examples/minio-store-bis.yaml
new file mode 100644
index 0000000..e7bcfa0
--- /dev/null
+++ b/docs/examples/minio-store-bis.yaml
@@ -0,0 +1,23 @@
+apiVersion: barmancloud.cnpg.io/v1
+kind: ObjectStore
+metadata:
+  name: minio-store-bis
+spec:
+  configuration:
+    destinationPath: s3://backups/
+    endpointURL: http://minio-bis:9000
+    s3Credentials:
+      accessKeyId:
+        name: minio-bis
+        key: ACCESS_KEY_ID
+      secretAccessKey:
+        name: minio-bis
+        key: ACCESS_SECRET_KEY
+    wal:
+      compression: gzip
+    data:
+      additionalCommandArgs:
+      - "--min-chunk-size=5MB"
+      - "--read-timeout=60"
+      - "-vv"
+
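Note on the example pair above: `cluster-restore` bootstraps from the `minio-store` object store through the `source` external cluster, while its own `plugins` entry points WAL archiving and backups at the new `minio-store-bis` store. This is exactly the recovery/cluster split the patch introduces; before it, a restored cluster had to keep using the recovery object store as its archive (see the workaround removed from `reconcileJob` further down).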
diff --git a/docs/minio-bis/minio-deployment.yaml b/docs/minio-bis/minio-deployment.yaml
new file mode 100644
index 0000000..c92e11e
--- /dev/null
+++ b/docs/minio-bis/minio-deployment.yaml
@@ -0,0 +1,42 @@
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: minio-bis
+  labels:
+    app: minio-bis
+spec:
+  replicas: 1
+  selector:
+    matchLabels:
+      app: minio-bis
+  template:
+    metadata:
+      labels:
+        app: minio-bis
+    spec:
+      containers:
+      - name: minio
+        image: minio/minio
+        ports:
+        - containerPort: 9000
+        volumeMounts:
+        - mountPath: /data
+          name: data
+        args:
+        - server
+        - /data
+        env:
+        - name: MINIO_ACCESS_KEY
+          valueFrom:
+            secretKeyRef:
+              name: minio-bis
+              key: ACCESS_KEY_ID
+        - name: MINIO_SECRET_KEY
+          valueFrom:
+            secretKeyRef:
+              name: minio-bis
+              key: ACCESS_SECRET_KEY
+      volumes:
+      - name: data
+        persistentVolumeClaim:
+          claimName: minio-bis
diff --git a/docs/minio-bis/minio-pvc.yaml b/docs/minio-bis/minio-pvc.yaml
new file mode 100644
index 0000000..26a7429
--- /dev/null
+++ b/docs/minio-bis/minio-pvc.yaml
@@ -0,0 +1,11 @@
+apiVersion: v1
+kind: PersistentVolumeClaim
+metadata:
+  name: minio-bis
+spec:
+  accessModes:
+  - ReadWriteOnce
+  volumeMode: Filesystem
+  resources:
+    requests:
+      storage: 1Gi
diff --git a/docs/minio-bis/minio-secret.yaml b/docs/minio-bis/minio-secret.yaml
new file mode 100644
index 0000000..d7aee12
--- /dev/null
+++ b/docs/minio-bis/minio-secret.yaml
@@ -0,0 +1,7 @@
+apiVersion: v1
+data:
+  ACCESS_KEY_ID: dVo5YWVndW9OZ29vY2g4bG9odHU4aXRlaTJhaHY0ZGE=
+  ACCESS_SECRET_KEY: ZWV6b2hkOFNpbm9oeG9od2VpbmdvbjhhaXI1T2h5b2g=
+kind: Secret
+metadata:
+  name: minio-bis
diff --git a/docs/minio-bis/minio-service.yaml b/docs/minio-bis/minio-service.yaml
new file mode 100644
index 0000000..d48f368
--- /dev/null
+++ b/docs/minio-bis/minio-service.yaml
@@ -0,0 +1,11 @@
+apiVersion: v1
+kind: Service
+metadata:
+  name: minio-bis
+spec:
+  selector:
+    app: minio-bis
+  ports:
+  - protocol: TCP
+    port: 9000
+    targetPort: 9000
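The manifests under docs/minio-bis/ stand up the second MinIO instance ("minio-bis") that backs `minio-store-bis`: a Deployment reading its credentials from the `minio-bis` Secret, a PVC for /data, and a Service exposing port 9000 at the `minio-bis` host name referenced by the ObjectStore's `endpointURL`. The Secret's `ACCESS_KEY_ID`/`ACCESS_SECRET_KEY` keys are the same ones named by the store's `s3Credentials`; the base64 values are sample credentials for the demo only.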
+ "recovery-barman-object-name", + "recovery-server-name", } for _, k := range requiredSettings { @@ -42,8 +45,12 @@ func NewCmd() *cobra.Command { _ = viper.BindEnv("pod-name", "POD_NAME") _ = viper.BindEnv("pgdata", "PGDATA") _ = viper.BindEnv("spool-directory", "SPOOL_DIRECTORY") + _ = viper.BindEnv("barman-object-name", "BARMAN_OBJECT_NAME") _ = viper.BindEnv("server-name", "SERVER_NAME") + _ = viper.BindEnv("recovery-barman-object-name", "RECOVERY_BARMAN_OBJECT_NAME") + _ = viper.BindEnv("recovery-server-name", "RECOVERY_SERVER_NAME") + return cmd } diff --git a/internal/cnpgi/common/wal.go b/internal/cnpgi/common/wal.go index fa212b9..8880ac3 100644 --- a/internal/cnpgi/common/wal.go +++ b/internal/cnpgi/common/wal.go @@ -24,15 +24,19 @@ import ( // WALServiceImplementation is the implementation of the WAL Service type WALServiceImplementation struct { - ServerName string - BarmanObjectKey client.ObjectKey + wal.UnimplementedWALServer ClusterObjectKey client.ObjectKey Client client.Client InstanceName string SpoolDirectory string PGDataPath string PGWALPath string - wal.UnimplementedWALServer + + BarmanObjectKey client.ObjectKey + ServerName string + + RecoveryBarmanObjectKey client.ObjectKey + RecoveryServerName string } // GetCapabilities implements the WALService interface @@ -123,8 +127,9 @@ func (w WALServiceImplementation) Restore( ctx context.Context, request *wal.WALRestoreRequest, ) (*wal.WALRestoreResult, error) { - contextLogger := log.FromContext(ctx) - startTime := time.Now() + // TODO: build full paths + walName := request.GetSourceWalName() + destinationPath := request.GetDestinationFileName() var cluster cnpgv1.Cluster if err := w.Client.Get(ctx, w.ClusterObjectKey, &cluster); err != nil { @@ -132,13 +137,45 @@ func (w WALServiceImplementation) Restore( } var objectStore barmancloudv1.ObjectStore - if err := w.Client.Get(ctx, w.BarmanObjectKey, &objectStore); err != nil { - return nil, err + var serverName string + + switch { + case cluster.IsReplica() && cluster.Status.CurrentPrimary == w.InstanceName: + // Designated primary on replica cluster, using recovery object store + serverName = w.RecoveryServerName + if err := w.Client.Get(ctx, w.RecoveryBarmanObjectKey, &objectStore); err != nil { + return nil, err + } + + case cluster.Status.CurrentPrimary == "": + // Recovery from object store, using recovery object store + serverName = w.RecoveryServerName + if err := w.Client.Get(ctx, w.RecoveryBarmanObjectKey, &objectStore); err != nil { + return nil, err + } + + default: + // Using cluster object store + serverName = w.ServerName + if err := w.Client.Get(ctx, w.BarmanObjectKey, &objectStore); err != nil { + return nil, err + } } - // TODO: build full paths - walName := request.GetSourceWalName() - destinationPath := request.GetDestinationFileName() + return &wal.WALRestoreResult{}, w.restoreFromBarmanObjectStore( + ctx, &cluster, &objectStore, serverName, walName, destinationPath) +} + +func (w WALServiceImplementation) restoreFromBarmanObjectStore( + ctx context.Context, + cluster *cnpgv1.Cluster, + objectStore *barmancloudv1.ObjectStore, + serverName string, + walName string, + destinationPath string, +) error { + contextLogger := log.FromContext(ctx) + startTime := time.Now() barmanConfiguration := &objectStore.Spec.Configuration @@ -151,37 +188,37 @@ func (w WALServiceImplementation) Restore( os.Environ(), ) if err != nil { - return nil, fmt.Errorf("while getting recover credentials: %w", err) + return fmt.Errorf("while getting recover credentials: %w", err) } 
diff --git a/internal/cnpgi/common/wal.go b/internal/cnpgi/common/wal.go
index fa212b9..8880ac3 100644
--- a/internal/cnpgi/common/wal.go
+++ b/internal/cnpgi/common/wal.go
@@ -24,15 +24,19 @@ import (

 // WALServiceImplementation is the implementation of the WAL Service
 type WALServiceImplementation struct {
-    ServerName string
-    BarmanObjectKey client.ObjectKey
+    wal.UnimplementedWALServer
     ClusterObjectKey client.ObjectKey
     Client client.Client
     InstanceName string
     SpoolDirectory string
     PGDataPath string
     PGWALPath string
-    wal.UnimplementedWALServer
+
+    BarmanObjectKey client.ObjectKey
+    ServerName string
+
+    RecoveryBarmanObjectKey client.ObjectKey
+    RecoveryServerName string
 }

 // GetCapabilities implements the WALService interface
@@ -123,8 +127,9 @@ func (w WALServiceImplementation) Restore(
     ctx context.Context,
     request *wal.WALRestoreRequest,
 ) (*wal.WALRestoreResult, error) {
-    contextLogger := log.FromContext(ctx)
-    startTime := time.Now()
+    // TODO: build full paths
+    walName := request.GetSourceWalName()
+    destinationPath := request.GetDestinationFileName()

     var cluster cnpgv1.Cluster
     if err := w.Client.Get(ctx, w.ClusterObjectKey, &cluster); err != nil {
         return nil, err
     }

     var objectStore barmancloudv1.ObjectStore
-    if err := w.Client.Get(ctx, w.BarmanObjectKey, &objectStore); err != nil {
-        return nil, err
+    var serverName string
+
+    switch {
+    case cluster.IsReplica() && cluster.Status.CurrentPrimary == w.InstanceName:
+        // Designated primary on replica cluster, using recovery object store
+        serverName = w.RecoveryServerName
+        if err := w.Client.Get(ctx, w.RecoveryBarmanObjectKey, &objectStore); err != nil {
+            return nil, err
+        }
+
+    case cluster.Status.CurrentPrimary == "":
+        // Recovery from object store, using recovery object store
+        serverName = w.RecoveryServerName
+        if err := w.Client.Get(ctx, w.RecoveryBarmanObjectKey, &objectStore); err != nil {
+            return nil, err
+        }
+
+    default:
+        // Using cluster object store
+        serverName = w.ServerName
+        if err := w.Client.Get(ctx, w.BarmanObjectKey, &objectStore); err != nil {
+            return nil, err
+        }
     }

-    // TODO: build full paths
-    walName := request.GetSourceWalName()
-    destinationPath := request.GetDestinationFileName()
+    return &wal.WALRestoreResult{}, w.restoreFromBarmanObjectStore(
+        ctx, &cluster, &objectStore, serverName, walName, destinationPath)
+}
+
+func (w WALServiceImplementation) restoreFromBarmanObjectStore(
+    ctx context.Context,
+    cluster *cnpgv1.Cluster,
+    objectStore *barmancloudv1.ObjectStore,
+    serverName string,
+    walName string,
+    destinationPath string,
+) error {
+    contextLogger := log.FromContext(ctx)
+    startTime := time.Now()

     barmanConfiguration := &objectStore.Spec.Configuration

     env, err := barmanCredentials.EnvSetRestoreCloudCredentials(
         ctx,
         ...
         os.Environ(),
     )
     if err != nil {
-        return nil, fmt.Errorf("while getting recover credentials: %w", err)
+        return fmt.Errorf("while getting recover credentials: %w", err)
     }
     env = MergeEnv(env, credentialsEnv)

-    options, err := barmanCommand.CloudWalRestoreOptions(ctx, barmanConfiguration, w.ServerName)
+    options, err := barmanCommand.CloudWalRestoreOptions(ctx, barmanConfiguration, serverName)
     if err != nil {
-        return nil, fmt.Errorf("while getting barman-cloud-wal-restore options: %w", err)
+        return fmt.Errorf("while getting barman-cloud-wal-restore options: %w", err)
     }

     // Create the restorer
     var walRestorer *barmanRestorer.WALRestorer
     if walRestorer, err = barmanRestorer.New(ctx, env, w.SpoolDirectory); err != nil {
-        return nil, fmt.Errorf("while creating the restorer: %w", err)
+        return fmt.Errorf("while creating the restorer: %w", err)
     }

     // Step 1: check if this WAL file is not already in the spool
     var wasInSpool bool
     if wasInSpool, err = walRestorer.RestoreFromSpool(walName, destinationPath); err != nil {
-        return nil, fmt.Errorf("while restoring a file from the spool directory: %w", err)
+        return fmt.Errorf("while restoring a file from the spool directory: %w", err)
     }
     if wasInSpool {
         contextLogger.Info("Restored WAL file from spool (parallel)",
             "walName", walName,
         )
-        return nil, nil
+        return nil
     }

     // We skip this step if streaming connection is not available
-    if isStreamingAvailable(&cluster, w.InstanceName) {
+    if isStreamingAvailable(cluster, w.InstanceName) {
         if err := checkEndOfWALStreamFlag(walRestorer); err != nil {
-            return nil, err
+            return err
         }
     }

     if IsWALFile(walName) {
         // If this is a regular WAL file, we try to prefetch
         if walFilesList, err = gatherWALFilesToRestore(walName, maxParallel); err != nil {
-            return nil, fmt.Errorf("while generating the list of WAL files to restore: %w", err)
+            return fmt.Errorf("while generating the list of WAL files to restore: %w", err)
         }
     } else {
         // This is not a regular WAL file, we fetch it directly

     // is the one that PostgreSQL has requested to restore.
     // The failure has already been logged in walRestorer.RestoreList method
     if walStatus[0].Err != nil {
-        return nil, walStatus[0].Err
+        return walStatus[0].Err
     }

     // We skip this step if streaming connection is not available
     endOfWALStream := isEndOfWALStream(walStatus)
-    if isStreamingAvailable(&cluster, w.InstanceName) && endOfWALStream {
+    if isStreamingAvailable(cluster, w.InstanceName) && endOfWALStream {
         contextLogger.Info(
             "Set end-of-wal-stream flag as one of the WAL files to be prefetched was not found")
         err = walRestorer.SetEndOfWALStream()
         if err != nil {
-            return nil, err
+            return err
         }
     }

         "downloadTotalTime", time.Since(downloadStartTime),
         "totalTime", time.Since(startTime))

-    return &wal.WALRestoreResult{}, nil
+    return nil
 }

 // Status implements the WALService interface
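The new switch is the core behavioral change: WAL restore no longer assumes a single object store. As an illustration only (not code from the patch), the server-name selection can be read as this standalone helper, where `cnpgv1` is the CloudNativePG API package wal.go already imports:

    func walRestoreServerName(
        cluster *cnpgv1.Cluster,
        instanceName string,
        serverName string,
        recoveryServerName string,
    ) string {
        switch {
        case cluster.IsReplica() && cluster.Status.CurrentPrimary == instanceName:
            // Designated primary of a replica cluster: WALs keep coming
            // from the recovery object store.
            return recoveryServerName
        case cluster.Status.CurrentPrimary == "":
            // No current primary yet: the cluster is being bootstrapped
            // from a backup, so use the recovery object store.
            return recoveryServerName
        default:
            // Normal operation: the cluster's own archive object store.
            return serverName
        }
    }

The same three-way decision picks which ObjectStore resource is fetched: `RecoveryBarmanObjectKey` in the first two cases, `BarmanObjectKey` otherwise.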
diff --git a/internal/cnpgi/instance/internal/client/client.go b/internal/cnpgi/instance/internal/client/client.go
index 66da622..cab7ebc 100644
--- a/internal/cnpgi/instance/internal/client/client.go
+++ b/internal/cnpgi/instance/internal/client/client.go
@@ -3,6 +3,7 @@ package client
 import (
     "context"
     "fmt"
+    "math"
     "sync"
     "time"
@@ -21,32 +22,41 @@ type cachedSecret struct {
 // ExtendedClient is an extended client that is capable of caching multiple secrets without relying on informers
 type ExtendedClient struct {
     client.Client
-    barmanObjectKey client.ObjectKey
-    cachedSecrets []*cachedSecret
-    mux *sync.Mutex
-    ttl int
+    barmanObjectKeys []client.ObjectKey
+    cachedSecrets []*cachedSecret
+    mux *sync.Mutex
+    ttl int
 }

 // NewExtendedClient returns an extended client capable of caching secrets on the 'Get' operation
 func NewExtendedClient(
     baseClient client.Client,
-    objectStoreKey client.ObjectKey,
+    objectStoreKeys []client.ObjectKey,
 ) client.Client {
     return &ExtendedClient{
-        Client: baseClient,
-        barmanObjectKey: objectStoreKey,
-        mux: &sync.Mutex{},
+        Client: baseClient,
+        barmanObjectKeys: objectStoreKeys,
+        mux: &sync.Mutex{},
     }
 }

 func (e *ExtendedClient) refreshTTL(ctx context.Context) error {
-    var object v1.ObjectStore
-    if err := e.Get(ctx, e.barmanObjectKey, &object); err != nil {
-        return fmt.Errorf("failed to get the object store while refreshing the TTL parameter: %w", err)
+    minTTL := math.MaxInt
+
+    for _, key := range e.barmanObjectKeys {
+        var object v1.ObjectStore
+
+        if err := e.Get(ctx, key, &object); err != nil {
+            return fmt.Errorf("failed to get the object store while refreshing the TTL parameter: %w", err)
+        }
+
+        currentTTL := object.Spec.InstanceSidecarConfiguration.GetCacheTTL()
+        if currentTTL < minTTL {
+            minTTL = currentTTL
+        }
     }
-    e.ttl = object.Spec.InstanceSidecarConfiguration.GetCacheTTL()
-
+
+    e.ttl = minTTL
     return nil
 }
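With several object stores in play, the secret-cache refresh interval becomes the minimum `GetCacheTTL()` across all of them, so the store with the most aggressive TTL wins and no cached secret outlives the strictest configuration.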
diff --git a/internal/cnpgi/instance/internal/client/client_test.go b/internal/cnpgi/instance/internal/client/client_test.go
index d5fb68d..be51749 100644
--- a/internal/cnpgi/instance/internal/client/client_test.go
+++ b/internal/cnpgi/instance/internal/client/client_test.go
@@ -55,7 +55,9 @@ var _ = Describe("ExtendedClient Get", func() {
         baseClient := fake.NewClientBuilder().
             WithScheme(scheme).
             WithObjects(secretInClient, objectStore).Build()
-        extendedClient = NewExtendedClient(baseClient, client.ObjectKeyFromObject(objectStore)).(*ExtendedClient)
+        extendedClient = NewExtendedClient(baseClient, []client.ObjectKey{
+            client.ObjectKeyFromObject(objectStore),
+        }).(*ExtendedClient)
     })

     It("returns secret from cache if not expired", func(ctx SpecContext) {
diff --git a/internal/cnpgi/instance/manager.go b/internal/cnpgi/instance/manager.go
index a28fd81..7bc8e83 100644
--- a/internal/cnpgi/instance/manager.go
+++ b/internal/cnpgi/instance/manager.go
@@ -9,6 +9,7 @@ import (
     corev1 "k8s.io/api/core/v1"
     "k8s.io/apimachinery/pkg/fields"
     "k8s.io/apimachinery/pkg/runtime"
+    "k8s.io/apimachinery/pkg/types"
     utilruntime "k8s.io/apimachinery/pkg/util/runtime"
     clientgoscheme "k8s.io/client-go/kubernetes/scheme"
     ctrl "sigs.k8s.io/controller-runtime"
@@ -33,20 +34,16 @@ func Start(ctx context.Context) error {
     setupLog := log.FromContext(ctx)
     setupLog.Info("Starting barman cloud instance plugin")
     namespace := viper.GetString("namespace")
-    boName := viper.GetString("barman-object-name")
     clusterName := viper.GetString("cluster-name")
     podName := viper.GetString("pod-name")

-    mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
+    barmanObjectName := viper.GetString("barman-object-name")
+    recoveryBarmanObjectName := viper.GetString("recovery-barman-object-name")
+
+    controllerOptions := ctrl.Options{
         Scheme: scheme,
         Cache: cache.Options{
             ByObject: map[client.Object]cache.ByObject{
-                &barmancloudv1.ObjectStore{}: {
-                    Field: fields.OneTermEqualSelector("metadata.name", boName),
-                    Namespaces: map[string]cache.Config{
-                        namespace: {},
-                    },
-                },
                 &cnpgv1.Cluster{}: {
                     Field: fields.OneTermEqualSelector("metadata.name", clusterName),
                     Namespaces: map[string]cache.Config{
                 },
             },
         },
-    })
+    }
+
+    if len(recoveryBarmanObjectName) == 0 {
+        controllerOptions.Cache.ByObject[&barmancloudv1.ObjectStore{}] = cache.ByObject{
+            Field: fields.OneTermEqualSelector("metadata.name", barmanObjectName),
+            Namespaces: map[string]cache.Config{
+                namespace: {},
+            },
+        }
+    } else {
+        controllerOptions.Client.Cache.DisableFor = append(
+            controllerOptions.Client.Cache.DisableFor,
+            &barmancloudv1.ObjectStore{},
+        )
+    }
+
+    mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), controllerOptions)
     if err != nil {
         setupLog.Error(err, "unable to start manager")
         return err
     }

     barmanObjectKey := client.ObjectKey{
         Namespace: namespace,
-        Name: boName,
+        Name: barmanObjectName,
+    }
+    recoveryBarmanObjectKey := client.ObjectKey{
+        Namespace: namespace,
+        Name: recoveryBarmanObjectName,
+    }
+
+    involvedObjectStores := make([]types.NamespacedName, 0, 2)
+    if len(barmanObjectName) > 0 {
+        involvedObjectStores = append(involvedObjectStores, barmanObjectKey)
+    }
+    if len(recoveryBarmanObjectName) > 0 {
+        involvedObjectStores = append(involvedObjectStores, recoveryBarmanObjectKey)
     }

     if err := mgr.Add(&CNPGI{
-        Client: extendedclient.NewExtendedClient(mgr.GetClient(), barmanObjectKey),
+        Client: extendedclient.NewExtendedClient(mgr.GetClient(), involvedObjectStores),
         ClusterObjectKey: client.ObjectKey{
             Namespace: namespace,
             Name: clusterName,
         },
-        BarmanObjectKey: barmanObjectKey,
-        ServerName: viper.GetString("server-name"),
-        InstanceName: podName,
+        InstanceName: podName,

         // TODO: improve
         PGDataPath: viper.GetString("pgdata"),
         PGWALPath: path.Join(viper.GetString("pgdata"), "pg_wal"),
         SpoolDirectory: viper.GetString("spool-directory"),
         PluginPath: viper.GetString("plugin-path"),
+
+        BarmanObjectKey: barmanObjectKey,
+        ServerName: viper.GetString("server-name"),
+
+        RecoveryBarmanObjectKey: recoveryBarmanObjectKey,
+        RecoveryServerName: viper.GetString("recovery-server-name"),
     }); err != nil {
         setupLog.Error(err, "unable to create CNPGI runnable")
         return err
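Worth noting in the manager wiring: a cache `ByObject` entry uses a one-term field selector on `metadata.name`, which can only match a single ObjectStore. When a recovery store is also configured, the patch therefore disables caching for ObjectStore resources altogether (`Client.Cache.DisableFor`) and relies on the TTL-based ExtendedClient above, which now receives the keys of every involved store.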
viper.GetString("spool-directory"), PluginPath: viper.GetString("plugin-path"), + + BarmanObjectKey: barmanObjectKey, + ServerName: viper.GetString("server-name"), + + RecoveryBarmanObjectKey: recoveryBarmanObjectKey, + RecoveryServerName: viper.GetString("recovery-server-name"), }); err != nil { setupLog.Error(err, "unable to create CNPGI runnable") return err diff --git a/internal/cnpgi/instance/start.go b/internal/cnpgi/instance/start.go index 5c4a319..6e7dedc 100644 --- a/internal/cnpgi/instance/start.go +++ b/internal/cnpgi/instance/start.go @@ -15,8 +15,6 @@ import ( // CNPGI is the implementation of the PostgreSQL sidecar type CNPGI struct { Client client.Client - BarmanObjectKey client.ObjectKey - ServerName string ClusterObjectKey client.ObjectKey PGDataPath string PGWALPath string @@ -24,20 +22,30 @@ type CNPGI struct { // mutually exclusive with serverAddress PluginPath string InstanceName string + + BarmanObjectKey client.ObjectKey + ServerName string + + RecoveryBarmanObjectKey client.ObjectKey + RecoveryServerName string } // Start starts the GRPC service func (c *CNPGI) Start(ctx context.Context) error { enrich := func(server *grpc.Server) error { wal.RegisterWALServer(server, common.WALServiceImplementation{ - BarmanObjectKey: c.BarmanObjectKey, ClusterObjectKey: c.ClusterObjectKey, - ServerName: c.ServerName, InstanceName: c.InstanceName, Client: c.Client, SpoolDirectory: c.SpoolDirectory, PGDataPath: c.PGDataPath, PGWALPath: c.PGWALPath, + + BarmanObjectKey: c.BarmanObjectKey, + ServerName: c.ServerName, + + RecoveryBarmanObjectKey: c.RecoveryBarmanObjectKey, + RecoveryServerName: c.RecoveryServerName, }) backup.RegisterBackupServer(server, BackupServiceImplementation{ Client: c.Client, diff --git a/internal/cnpgi/operator/config/config.go b/internal/cnpgi/operator/config/config.go index 9215cda..7c6e231 100644 --- a/internal/cnpgi/operator/config/config.go +++ b/internal/cnpgi/operator/config/config.go @@ -47,7 +47,7 @@ type PluginConfiguration struct { BarmanObjectName string ServerName string RecoveryBarmanObjectName string - RecoveryBarmanServerName string + RecoveryServerName string } // NewFromCluster extracts the configuration from the cluster @@ -82,7 +82,7 @@ func NewFromCluster(cluster *cnpgv1.Cluster) *PluginConfiguration { BarmanObjectName: helper.Parameters["barmanObjectName"], ServerName: serverName, // used for restore/wal_restore - RecoveryBarmanServerName: recoveryServerName, + RecoveryServerName: recoveryServerName, RecoveryBarmanObjectName: recoveryBarmanObjectName, } diff --git a/internal/cnpgi/operator/lifecycle.go b/internal/cnpgi/operator/lifecycle.go index c7ab18b..841ec3c 100644 --- a/internal/cnpgi/operator/lifecycle.go +++ b/internal/cnpgi/operator/lifecycle.go @@ -115,13 +115,6 @@ func reconcileJob( return nil, nil } - // Since we're recovering from an existing object store, - // we set our primary object store name to the recovery one. - // This won't be needed anymore when wal-restore will be able - // to check two object stores - pluginConfiguration.BarmanObjectName = pluginConfiguration.RecoveryBarmanObjectName - pluginConfiguration.ServerName = pluginConfiguration.RecoveryBarmanServerName - var job batchv1.Job if err := decoder.DecodeObject( request.GetObjectDefinition(), @@ -219,14 +212,6 @@ func reconcilePodSpec( Name: "CLUSTER_NAME", Value: cluster.Name, }, - { - Name: "BARMAN_OBJECT_NAME", - Value: cfg.BarmanObjectName, - }, - { - Name: "SERVER_NAME", - Value: cfg.ServerName, - }, { // TODO: should we really use this one? 
diff --git a/internal/cnpgi/operator/reconciler.go b/internal/cnpgi/operator/reconciler.go
index ee66415..c00bbea 100644
--- a/internal/cnpgi/operator/reconciler.go
+++ b/internal/cnpgi/operator/reconciler.go
@@ -15,7 +15,6 @@ import (
     "sigs.k8s.io/controller-runtime/pkg/client"

     barmancloudv1 "github.com/cloudnative-pg/plugin-barman-cloud/api/v1"
-    "github.com/cloudnative-pg/plugin-barman-cloud/internal/cnpgi/metadata"
     "github.com/cloudnative-pg/plugin-barman-cloud/internal/cnpgi/operator/config"
     "github.com/cloudnative-pg/plugin-barman-cloud/internal/cnpgi/operator/specs"
 )
@@ -78,7 +77,6 @@ func (r ReconcilerImplementation) Pre(

     var barmanObjects []barmancloudv1.ObjectStore

-    // this could be empty during recoveries
     if pluginConfiguration.BarmanObjectName != "" {
         var barmanObject barmancloudv1.ObjectStore
         if err := r.Client.Get(ctx, client.ObjectKey{
@@ -86,7 +84,10 @@ func (r ReconcilerImplementation) Pre(
             Name: pluginConfiguration.BarmanObjectName,
         }, &barmanObject); err != nil {
             if apierrs.IsNotFound(err) {
-                contextLogger.Info("barman object configuration not found, requeuing")
+                contextLogger.Info(
+                    "barman object configuration not found, requeuing",
+                    "name", pluginConfiguration.BarmanObjectName,
+                    "namespace", cluster.Namespace)
                 return &reconciler.ReconcilerHooksResult{
                     Behavior: reconciler.ReconcilerHooksResult_BEHAVIOR_REQUEUE,
                 }, nil
@@ -98,15 +99,26 @@ func (r ReconcilerImplementation) Pre(
         barmanObjects = append(barmanObjects, barmanObject)
     }

-    if barmanObject, err := r.getRecoveryBarmanObject(ctx, &cluster); err != nil {
-        if apierrs.IsNotFound(err) {
-            contextLogger.Info("barman recovery object configuration not found, requeuing")
-            return &reconciler.ReconcilerHooksResult{
-                Behavior: reconciler.ReconcilerHooksResult_BEHAVIOR_REQUEUE,
-            }, nil
+    if pluginConfiguration.RecoveryBarmanObjectName != "" {
+        var barmanObject barmancloudv1.ObjectStore
+        if err := r.Client.Get(ctx, client.ObjectKey{
+            Namespace: cluster.Namespace,
+            Name: pluginConfiguration.RecoveryBarmanObjectName,
+        }, &barmanObject); err != nil {
+            if apierrs.IsNotFound(err) {
+                contextLogger.Info(
+                    "barman recovery object configuration not found, requeuing",
+                    "name", pluginConfiguration.RecoveryBarmanObjectName,
+                    "namespace", cluster.Namespace,
+                )
+                return &reconciler.ReconcilerHooksResult{
+                    Behavior: reconciler.ReconcilerHooksResult_BEHAVIOR_REQUEUE,
+                }, nil
+            }
+            return nil, err
         }
-    } else if barmanObject != nil {
-        barmanObjects = append(barmanObjects, *barmanObject)
+
+        barmanObjects = append(barmanObjects, barmanObject)
     }

     var additionalSecretNames []string
@@ -124,30 +136,6 @@ func (r ReconcilerImplementation) Pre(
         }, nil
     }

-func (r ReconcilerImplementation) getRecoveryBarmanObject(
-    ctx context.Context,
-    cluster *cnpgv1.Cluster,
-) (*barmancloudv1.ObjectStore, error) {
-    recoveryConfig := cluster.GetRecoverySourcePlugin()
-    if recoveryConfig != nil && recoveryConfig.Name == metadata.PluginName {
-        // TODO: refactor -> cnpg-i-machinery should be able to help us on getting
-        // the configuration for a recovery plugin
-        if recoveryObjectStore, ok := recoveryConfig.Parameters["barmanObjectName"]; ok {
-            var barmanObject barmancloudv1.ObjectStore
-            if err := r.Client.Get(ctx, client.ObjectKey{
-                Namespace: cluster.Namespace,
-                Name: recoveryObjectStore,
-            }, &barmanObject); err != nil {
-                return nil, err
-            }
-
-            return &barmanObject, nil
-        }
-    }
-
-    return nil, nil
-}
-
 // Post implements the reconciler interface
 func (r ReconcilerImplementation) Post(
     _ context.Context,
diff --git a/internal/cnpgi/operator/specs/role.go b/internal/cnpgi/operator/specs/role.go
index 2fe521a..0b1cdc7 100644
--- a/internal/cnpgi/operator/specs/role.go
+++ b/internal/cnpgi/operator/specs/role.go
@@ -27,24 +27,10 @@ func BuildRole(
     }

     secretsSet := stringset.New()
-    for _, barmanObject := range barmanObjects {
-        role.Rules = append(role.Rules, rbacv1.PolicyRule{
-            APIGroups: []string{
-                "barmancloud.cnpg.io",
-            },
-            Verbs: []string{
-                "get",
-                "watch",
-                "list",
-            },
-            Resources: []string{
-                "objectstores",
-            },
-            ResourceNames: []string{
-                barmanObject.Name,
-            },
-        })
+    barmanObjectsSet := stringset.New()
+    for _, barmanObject := range barmanObjects {
+        barmanObjectsSet.Put(barmanObject.Name)
         for _, secret := range CollectSecretNamesFromCredentials(&barmanObject.Spec.Configuration.BarmanCredentials) {
             secretsSet.Put(secret)
         }
@@ -54,6 +40,21 @@ func BuildRole(
         secretsSet.Put(secret)
     }

+    role.Rules = append(role.Rules, rbacv1.PolicyRule{
+        APIGroups: []string{
+            "barmancloud.cnpg.io",
+        },
+        Verbs: []string{
+            "get",
+            "watch",
+            "list",
+        },
+        Resources: []string{
+            "objectstores",
+        },
+        ResourceNames: barmanObjectsSet.ToSortedList(),
+    })
+
     role.Rules = append(role.Rules, rbacv1.PolicyRule{
         APIGroups: []string{
             "",
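BuildRole consequently collapses the per-store RBAC rules into one: every involved store name goes into a string set, and a single `objectstores` rule is emitted with the sorted list as `ResourceNames`, alongside the merged secret names. For two stores the resulting rule would look like this (a sketch assuming the example names from this patch):

    rbacv1.PolicyRule{
        APIGroups:     []string{"barmancloud.cnpg.io"},
        Verbs:         []string{"get", "watch", "list"},
        Resources:     []string{"objectstores"},
        ResourceNames: []string{"minio-store", "minio-store-bis"},
    }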
diff --git a/internal/cnpgi/restore/manager.go b/internal/cnpgi/restore/manager.go
index 7571491..8926d15 100644
--- a/internal/cnpgi/restore/manager.go
+++ b/internal/cnpgi/restore/manager.go
@@ -32,7 +32,12 @@ func Start(ctx context.Context) error {
     setupLog.Info("Starting barman cloud instance plugin")
     namespace := viper.GetString("namespace")
     clusterName := viper.GetString("cluster-name")
-    boName := viper.GetString("barman-object-name")
+
+    recoveryBarmanObjectName := viper.GetString("recovery-barman-object-name")
+    recoveryServerName := viper.GetString("recovery-server-name")
+
+    barmanObjectName := viper.GetString("barman-object-name")
+    serverName := viper.GetString("server-name")

     objs := map[client.Object]cache.ByObject{
         &cnpgv1.Cluster{}: {
@@ -43,9 +48,9 @@ func Start(ctx context.Context) error {
         },
     }

-    if boName != "" {
+    if recoveryBarmanObjectName != "" {
         objs[&barmancloudv1.ObjectStore{}] = cache.ByObject{
-            Field: fields.OneTermEqualSelector("metadata.name", boName),
+            Field: fields.OneTermEqualSelector("metadata.name", recoveryBarmanObjectName),
             Namespaces: map[string]cache.Config{
                 namespace: {},
             },
@@ -74,10 +79,6 @@ func Start(ctx context.Context) error {
     if err := mgr.Add(&CNPGI{
         PluginPath: viper.GetString("plugin-path"),
         SpoolDirectory: viper.GetString("spool-directory"),
-        BarmanObjectKey: client.ObjectKey{
-            Namespace: namespace,
-            Name: boName,
-        },
         ClusterObjectKey: client.ObjectKey{
             Namespace: namespace,
             Name: clusterName,
@@ -85,7 +86,18 @@ func Start(ctx context.Context) error {
         Client: mgr.GetClient(),
         PGDataPath: viper.GetString("pgdata"),
         InstanceName: viper.GetString("pod-name"),
-        ServerName: viper.GetString("server-name"),
+
+        ServerName: serverName,
+        BarmanObjectKey: client.ObjectKey{
+            Namespace: namespace,
+            Name: barmanObjectName,
+        },
+
+        RecoveryServerName: recoveryServerName,
+        RecoveryBarmanObjectKey: client.ObjectKey{
+            Namespace: namespace,
+            Name: recoveryBarmanObjectName,
+        },
     }); err != nil {
         setupLog.Error(err, "unable to create CNPGI runnable")
         return err
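Here the restore sidecar's manager caches only the recovery ObjectStore — presumably the one it is guaranteed to read during a restore — while both object-store keys are still handed to the CNPGI runnable, so the archive store (if any) can be fetched uncached when the WAL service or the job hooks need it.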
diff --git a/internal/cnpgi/restore/restore.go b/internal/cnpgi/restore/restore.go
index 696cd97..2837053 100644
--- a/internal/cnpgi/restore/restore.go
+++ b/internal/cnpgi/restore/restore.go
@@ -43,9 +43,16 @@ const (
 // JobHookImpl is the implementation of the restore job hooks
 type JobHookImpl struct {
     restore.UnimplementedRestoreJobHooksServer
-    Client client.Client
-    ClusterObjectKey client.ObjectKey
-    BackupToRestore client.ObjectKey
+
+    Client client.Client
+    ClusterObjectKey client.ObjectKey
+
+    BarmanObjectKey types.NamespacedName
+    ServerName string
+
+    RecoveryBarmanObjectKey types.NamespacedName
+    RecoveryServerName string
+
     SpoolDirectory string
     PgDataPath string
     PgWalFolderToSymlink string
@@ -80,38 +87,17 @@ func (impl JobHookImpl) Restore(
         return nil, err
     }

-    recoveryPluginConfiguration := cluster.GetRecoverySourcePlugin()
-
     var recoveryObjectStore barmancloudv1.ObjectStore
-    if err := impl.Client.Get(ctx, types.NamespacedName{
-        Namespace: cluster.Namespace,
-        // TODO: refactor -> cnpg-i-machinery should be able to help us on getting
-        // the configuration for a recovery plugin
-        Name: recoveryPluginConfiguration.Parameters["barmanObjectName"],
-    }, &recoveryObjectStore); err != nil {
+    if err := impl.Client.Get(ctx, impl.RecoveryBarmanObjectKey, &recoveryObjectStore); err != nil {
         return nil, err
     }

-    var targetObjectStoreName types.NamespacedName
-    for _, plugin := range cluster.Spec.Plugins {
-        if plugin.IsEnabled() && plugin.Name == metadata.PluginName {
-            targetObjectStoreName = types.NamespacedName{
-                Namespace: cluster.Namespace,
-                Name: plugin.Parameters["barmanObjectName"],
-            }
-        }
-    }
-
-    var targetObjectStore barmancloudv1.ObjectStore
-    if targetObjectStoreName.Name != "" {
-        if err := impl.Client.Get(ctx, targetObjectStoreName, &targetObjectStore); err != nil {
+    if impl.BarmanObjectKey.Name != "" {
+        var targetObjectStore barmancloudv1.ObjectStore
+        if err := impl.Client.Get(ctx, impl.BarmanObjectKey, &targetObjectStore); err != nil {
             return nil, err
         }
-    }

-    // Before starting the restore we check if the archive destination is safe to use,
-    // otherwise we stop creating the cluster
-    if targetObjectStoreName.Name != "" {
         if err := impl.checkBackupDestination(ctx, &cluster, &targetObjectStore.Spec.Configuration); err != nil {
             return nil, err
         }
@@ -123,6 +109,7 @@ func (impl JobHookImpl) Restore(
         impl.Client,
         &cluster,
         &recoveryObjectStore.Spec.Configuration,
+        impl.RecoveryServerName,
     )
     if err != nil {
         return nil, err
@@ -353,30 +340,13 @@ func loadBackupObjectFromExternalCluster(
     typedClient client.Client,
     cluster *cnpgv1.Cluster,
     recoveryObjectStore *api.BarmanObjectStoreConfiguration,
+    serverName string,
 ) (*cnpgv1.Backup, []string, error) {
     contextLogger := log.FromContext(ctx)
-    sourceName := cluster.Spec.Bootstrap.Recovery.Source
-
-    if sourceName == "" {
-        return nil, nil, fmt.Errorf("recovery source not specified")
-    }
-
-    server, found := cluster.ExternalCluster(sourceName)
-    if !found {
-        return nil, nil, fmt.Errorf("missing external cluster: %v", sourceName)
-    }
-
-    // TODO: document this, should this be in the helper?
-    var serverName string
-    if pluginServerName, ok := server.PluginConfiguration.Parameters["serverName"]; ok {
-        serverName = pluginServerName
-    } else {
-        serverName = server.Name
-    }

     contextLogger.Info("Recovering from external cluster",
-        "sourceName", sourceName,
-        "serverName", serverName)
+        "serverName", serverName,
+        "objectStore", recoveryObjectStore)

     env, err := barmanCredentials.EnvSetRestoreCloudCredentials(
         ctx,
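The server name used against the recovery object store is now resolved once in the operator's plugin configuration (`RecoveryServerName` in config.go, evidently mirroring the fallback logic removed here: the external cluster's `serverName` parameter, defaulting to the external cluster name) and passed down explicitly, so `loadBackupObjectFromExternalCluster` no longer re-derives it from `cluster.Spec.Bootstrap.Recovery.Source`.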
diff --git a/internal/cnpgi/restore/start.go b/internal/cnpgi/restore/start.go
index aef1449..0e0f0cc 100644
--- a/internal/cnpgi/restore/start.go
+++ b/internal/cnpgi/restore/start.go
@@ -15,14 +15,20 @@ import (
 // CNPGI is the implementation of the PostgreSQL sidecar
 type CNPGI struct {
-    PluginPath string
-    SpoolDirectory string
-    BarmanObjectKey client.ObjectKey
+    PluginPath string
+    SpoolDirectory string
+
+    BarmanObjectKey client.ObjectKey
+    ServerName string
+
+    RecoveryBarmanObjectKey client.ObjectKey
+    RecoveryServerName string
+
     ClusterObjectKey client.ObjectKey
-    Client client.Client
-    PGDataPath string
-    InstanceName string
-    ServerName string
+
+    Client client.Client
+    PGDataPath string
+    InstanceName string
 }

 // Start starts the GRPC service
 func (c *CNPGI) Start(ctx context.Context) error {

     enrich := func(server *grpc.Server) error {
         wal.RegisterWALServer(server, common.WALServiceImplementation{
-            BarmanObjectKey: c.BarmanObjectKey,
             ClusterObjectKey: c.ClusterObjectKey,
             InstanceName: c.InstanceName,
             Client: c.Client,
             SpoolDirectory: c.SpoolDirectory,
             PGDataPath: c.PGDataPath,
             PGWALPath: path.Join(c.PGDataPath, "pg_wal"),
-            ServerName: c.ServerName,
+
+            BarmanObjectKey: c.BarmanObjectKey,
+            ServerName: c.ServerName,
+
+            RecoveryBarmanObjectKey: c.RecoveryBarmanObjectKey,
+            RecoveryServerName: c.RecoveryServerName,
         })

         restore.RegisterRestoreJobHooksServer(server, &JobHookImpl{
@@ -48,6 +58,12 @@ func (c *CNPGI) Start(ctx context.Context) error {
             SpoolDirectory: c.SpoolDirectory,
             PgDataPath: c.PGDataPath,
             PgWalFolderToSymlink: PgWalVolumePgWalPath,
+
+            BarmanObjectKey: c.BarmanObjectKey,
+            ServerName: c.ServerName,
+
+            RecoveryBarmanObjectKey: c.RecoveryBarmanObjectKey,
+            RecoveryServerName: c.RecoveryServerName,
         })

         common.AddHealthCheck(server)