From afd4603023ce0f245687856eb05d9a30875b8bac Mon Sep 17 00:00:00 2001 From: Armando Ruocco Date: Thu, 28 Nov 2024 14:04:50 +0100 Subject: [PATCH] fix: ensure restore configuration points to manager `wal-restore` (#68) Signed-off-by: Armando Ruocco Signed-off-by: Leonardo Cecchi Signed-off-by: Francesco Canovai Co-authored-by: Leonardo Cecchi Co-authored-by: Francesco Canovai --- internal/cmd/instance/main.go | 2 ++ internal/cmd/restore/main.go | 4 +++ internal/cnpgi/{instance => common}/wal.go | 32 ++++------------- .../cnpgi/{instance => common}/wal_import.go | 2 +- internal/cnpgi/instance/backup.go | 14 ++------ internal/cnpgi/instance/manager.go | 1 + internal/cnpgi/instance/start.go | 7 +++- internal/cnpgi/operator/config/config.go | 11 ++++++ internal/cnpgi/operator/lifecycle.go | 23 +++++++++--- internal/cnpgi/restore/identity.go | 7 ++++ internal/cnpgi/restore/manager.go | 20 +++++++++-- internal/cnpgi/restore/restore.go | 35 +++++-------------- internal/cnpgi/restore/start.go | 18 ++++++++++ 13 files changed, 104 insertions(+), 72 deletions(-) rename internal/cnpgi/{instance => common}/wal.go (92%) rename internal/cnpgi/{instance => common}/wal_import.go (99%) diff --git a/internal/cmd/instance/main.go b/internal/cmd/instance/main.go index 090660e..8c02b47 100644 --- a/internal/cmd/instance/main.go +++ b/internal/cmd/instance/main.go @@ -22,6 +22,7 @@ func NewCmd() *cobra.Command { "cluster-name", "pod-name", "spool-directory", + "server-name", } for _, k := range requiredSettings { @@ -40,6 +41,7 @@ func NewCmd() *cobra.Command { _ = viper.BindEnv("pod-name", "POD_NAME") _ = viper.BindEnv("pgdata", "PGDATA") _ = viper.BindEnv("spool-directory", "SPOOL_DIRECTORY") + _ = viper.BindEnv("server-name", "SERVER_NAME") return cmd } diff --git a/internal/cmd/restore/main.go b/internal/cmd/restore/main.go index bc79b1e..16d45eb 100644 --- a/internal/cmd/restore/main.go +++ b/internal/cmd/restore/main.go @@ -23,6 +23,8 @@ func NewCmd() *cobra.Command { "cluster-name", "pod-name", "spool-directory", + "barman-object-name", + "server-name", } for _, k := range requiredSettings { @@ -40,6 +42,8 @@ func NewCmd() *cobra.Command { _ = viper.BindEnv("pod-name", "POD_NAME") _ = viper.BindEnv("pgdata", "PGDATA") _ = viper.BindEnv("spool-directory", "SPOOL_DIRECTORY") + _ = viper.BindEnv("barman-object-name", "BARMAN_OBJECT_NAME") + _ = viper.BindEnv("server-name", "SERVER_NAME") return cmd } diff --git a/internal/cnpgi/instance/wal.go b/internal/cnpgi/common/wal.go similarity index 92% rename from internal/cnpgi/instance/wal.go rename to internal/cnpgi/common/wal.go index 12c8d2a..fa212b9 100644 --- a/internal/cnpgi/instance/wal.go +++ b/internal/cnpgi/common/wal.go @@ -1,4 +1,4 @@ -package instance +package common import ( "context" @@ -19,12 +19,12 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" barmancloudv1 "github.com/cloudnative-pg/plugin-barman-cloud/api/v1" - "github.com/cloudnative-pg/plugin-barman-cloud/internal/cnpgi/common" "github.com/cloudnative-pg/plugin-barman-cloud/internal/cnpgi/metadata" ) // WALServiceImplementation is the implementation of the WAL Service type WALServiceImplementation struct { + ServerName string BarmanObjectKey client.ObjectKey ClusterObjectKey client.ObjectKey Client client.Client @@ -73,16 +73,6 @@ func (w WALServiceImplementation) Archive( return nil, err } - // TODO: refactor this code elsewhere - serverName := cluster.Name - for _, plugin := range cluster.Spec.Plugins { - if plugin.IsEnabled() && plugin.Name == metadata.PluginName { - if pluginServerName, ok := plugin.Parameters["serverName"]; ok { - serverName = pluginServerName - } - } - } - var objectStore barmancloudv1.ObjectStore if err := w.Client.Get(ctx, w.BarmanObjectKey, &objectStore); err != nil { return nil, err @@ -112,7 +102,7 @@ func (w WALServiceImplementation) Archive( return nil, err } - options, err := arch.BarmanCloudWalArchiveOptions(ctx, &objectStore.Spec.Configuration, serverName) + options, err := arch.BarmanCloudWalArchiveOptions(ctx, &objectStore.Spec.Configuration, w.ServerName) if err != nil { return nil, err } @@ -152,7 +142,7 @@ func (w WALServiceImplementation) Restore( barmanConfiguration := &objectStore.Spec.Configuration - env := common.GetRestoreCABundleEnv(barmanConfiguration) + env := GetRestoreCABundleEnv(barmanConfiguration) credentialsEnv, err := barmanCredentials.EnvSetBackupCloudCredentials( ctx, w.Client, @@ -163,19 +153,9 @@ func (w WALServiceImplementation) Restore( if err != nil { return nil, fmt.Errorf("while getting recover credentials: %w", err) } - env = common.MergeEnv(env, credentialsEnv) + env = MergeEnv(env, credentialsEnv) - // TODO: refactor this code elsewhere - serverName := cluster.Name - for _, plugin := range cluster.Spec.Plugins { - if plugin.IsEnabled() && plugin.Name == metadata.PluginName { - if pluginServerName, ok := plugin.Parameters["serverName"]; ok { - serverName = pluginServerName - } - } - } - - options, err := barmanCommand.CloudWalRestoreOptions(ctx, barmanConfiguration, serverName) + options, err := barmanCommand.CloudWalRestoreOptions(ctx, barmanConfiguration, w.ServerName) if err != nil { return nil, fmt.Errorf("while getting barman-cloud-wal-restore options: %w", err) } diff --git a/internal/cnpgi/instance/wal_import.go b/internal/cnpgi/common/wal_import.go similarity index 99% rename from internal/cnpgi/instance/wal_import.go rename to internal/cnpgi/common/wal_import.go index a2005dd..0c4ca62 100644 --- a/internal/cnpgi/instance/wal_import.go +++ b/internal/cnpgi/common/wal_import.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package instance +package common import ( "errors" diff --git a/internal/cnpgi/instance/backup.go b/internal/cnpgi/instance/backup.go index 9ebe96a..5e1e042 100644 --- a/internal/cnpgi/instance/backup.go +++ b/internal/cnpgi/instance/backup.go @@ -31,6 +31,7 @@ type BackupServiceImplementation struct { ClusterObjectKey client.ObjectKey Client client.Client InstanceName string + ServerName string backup.UnimplementedBackupServer } @@ -111,21 +112,12 @@ func (b BackupServiceImplementation) Backup( return nil, err } - serverName := cluster.Name - for _, plugin := range cluster.Spec.Plugins { - if plugin.IsEnabled() && plugin.Name == metadata.PluginName { - if pluginServerName, ok := plugin.Parameters["serverName"]; ok { - serverName = pluginServerName - } - } - } - backupName := fmt.Sprintf("backup-%v", pgTime.ToCompactISO8601(time.Now())) if err = backupCmd.Take( ctx, backupName, - serverName, + b.ServerName, env, barmanCloudExecutor{}, postgres.BackupTemporaryDirectory, @@ -137,7 +129,7 @@ func (b BackupServiceImplementation) Backup( executedBackupInfo, err := backupCmd.GetExecutedBackupInfo( ctx, backupName, - serverName, + b.ServerName, barmanCloudExecutor{}, env) if err != nil { diff --git a/internal/cnpgi/instance/manager.go b/internal/cnpgi/instance/manager.go index dfcb75c..0c4dfa2 100644 --- a/internal/cnpgi/instance/manager.go +++ b/internal/cnpgi/instance/manager.go @@ -81,6 +81,7 @@ func Start(ctx context.Context) error { Name: clusterName, }, BarmanObjectKey: barmanObjectKey, + ServerName: viper.GetString("server-name"), InstanceName: podName, // TODO: improve PGDataPath: viper.GetString("pgdata"), diff --git a/internal/cnpgi/instance/start.go b/internal/cnpgi/instance/start.go index 34cb89a..ce1c6d3 100644 --- a/internal/cnpgi/instance/start.go +++ b/internal/cnpgi/instance/start.go @@ -8,12 +8,15 @@ import ( "github.com/cloudnative-pg/cnpg-i/pkg/wal" "google.golang.org/grpc" "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/cloudnative-pg/plugin-barman-cloud/internal/cnpgi/common" ) // CNPGI is the implementation of the PostgreSQL sidecar type CNPGI struct { Client client.Client BarmanObjectKey client.ObjectKey + ServerName string ClusterObjectKey client.ObjectKey PGDataPath string PGWALPath string @@ -26,9 +29,10 @@ type CNPGI struct { // Start starts the GRPC service func (c *CNPGI) Start(ctx context.Context) error { enrich := func(server *grpc.Server) error { - wal.RegisterWALServer(server, WALServiceImplementation{ + wal.RegisterWALServer(server, common.WALServiceImplementation{ BarmanObjectKey: c.BarmanObjectKey, ClusterObjectKey: c.ClusterObjectKey, + ServerName: c.ServerName, InstanceName: c.InstanceName, Client: c.Client, SpoolDirectory: c.SpoolDirectory, @@ -38,6 +42,7 @@ func (c *CNPGI) Start(ctx context.Context) error { backup.RegisterBackupServer(server, BackupServiceImplementation{ Client: c.Client, BarmanObjectKey: c.BarmanObjectKey, + ServerName: c.ServerName, ClusterObjectKey: c.ClusterObjectKey, InstanceName: c.InstanceName, }) diff --git a/internal/cnpgi/operator/config/config.go b/internal/cnpgi/operator/config/config.go index 53e0de3..9215cda 100644 --- a/internal/cnpgi/operator/config/config.go +++ b/internal/cnpgi/operator/config/config.go @@ -45,6 +45,7 @@ func (e *ConfigurationError) IsEmpty() bool { // PluginConfiguration is the configuration of the plugin type PluginConfiguration struct { BarmanObjectName string + ServerName string RecoveryBarmanObjectName string RecoveryBarmanServerName string } @@ -56,6 +57,15 @@ func NewFromCluster(cluster *cnpgv1.Cluster) *PluginConfiguration { metadata.PluginName, ) + serverName := cluster.Name + for _, plugin := range cluster.Spec.Plugins { + if plugin.IsEnabled() && plugin.Name == metadata.PluginName { + if pluginServerName, ok := plugin.Parameters["serverName"]; ok { + serverName = pluginServerName + } + } + } + recoveryServerName := "" recoveryBarmanObjectName := "" @@ -70,6 +80,7 @@ func NewFromCluster(cluster *cnpgv1.Cluster) *PluginConfiguration { result := &PluginConfiguration{ // used for the backup/archive BarmanObjectName: helper.Parameters["barmanObjectName"], + ServerName: serverName, // used for restore/wal_restore RecoveryBarmanServerName: recoveryServerName, RecoveryBarmanObjectName: recoveryBarmanObjectName, diff --git a/internal/cnpgi/operator/lifecycle.go b/internal/cnpgi/operator/lifecycle.go index 8155b37..acceb16 100644 --- a/internal/cnpgi/operator/lifecycle.go +++ b/internal/cnpgi/operator/lifecycle.go @@ -115,6 +115,13 @@ func reconcileJob( return nil, nil } + // Since we're recovering from an existing object store, + // we set our primary object store name to the recovery one. + // This won't be needed anymore when wal-restore will be able + // to check two object stores + pluginConfiguration.BarmanObjectName = pluginConfiguration.RecoveryBarmanObjectName + pluginConfiguration.ServerName = pluginConfiguration.RecoveryBarmanServerName + var job batchv1.Job if err := decoder.DecodeObject( request.GetObjectDefinition(), @@ -175,10 +182,14 @@ func reconcilePod( mutatedPod := pod.DeepCopy() - if err := reconcilePodSpec(pluginConfiguration, cluster, &mutatedPod.Spec, "postgres", corev1.Container{ - Args: []string{"instance"}, - }); err != nil { - return nil, fmt.Errorf("while reconciling pod spec for pod: %w", err) + if len(pluginConfiguration.BarmanObjectName) != 0 { + if err := reconcilePodSpec(pluginConfiguration, cluster, &mutatedPod.Spec, "postgres", corev1.Container{ + Args: []string{"instance"}, + }); err != nil { + return nil, fmt.Errorf("while reconciling pod spec for pod: %w", err) + } + } else { + contextLogger.Debug("No need to mutate instance with no backup & archiving configuration") } patch, err := object.CreatePatch(mutatedPod, pod) @@ -212,6 +223,10 @@ func reconcilePodSpec( Name: "BARMAN_OBJECT_NAME", Value: cfg.BarmanObjectName, }, + { + Name: "SERVER_NAME", + Value: cfg.ServerName, + }, { // TODO: should we really use this one? // should we mount an emptyDir volume just for that? diff --git a/internal/cnpgi/restore/identity.go b/internal/cnpgi/restore/identity.go index 6e70eec..2308898 100644 --- a/internal/cnpgi/restore/identity.go +++ b/internal/cnpgi/restore/identity.go @@ -38,6 +38,13 @@ func (i IdentityImplementation) GetPluginCapabilities( }, }, }, + { + Type: &identity.PluginCapability_Service_{ + Service: &identity.PluginCapability_Service{ + Type: identity.PluginCapability_Service_TYPE_WAL_SERVICE, + }, + }, + }, }, }, nil } diff --git a/internal/cnpgi/restore/manager.go b/internal/cnpgi/restore/manager.go index 9bd79c4..ac00640 100644 --- a/internal/cnpgi/restore/manager.go +++ b/internal/cnpgi/restore/manager.go @@ -33,6 +33,7 @@ func Start(ctx context.Context) error { setupLog.Info("Starting barman cloud instance plugin") namespace := viper.GetString("namespace") clusterName := viper.GetString("cluster-name") + boName := viper.GetString("barman-object-name") objs := map[client.Object]cache.ByObject{ &cnpgv1.Cluster{}: { @@ -43,6 +44,15 @@ func Start(ctx context.Context) error { }, } + if boName != "" { + objs[&barmancloudv1.ObjectStore{}] = cache.ByObject{ + Field: fields.OneTermEqualSelector("metadata.name", boName), + Namespaces: map[string]cache.Config{ + namespace: {}, + }, + } + } + mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{ Scheme: scheme, Cache: cache.Options{ @@ -65,12 +75,18 @@ func Start(ctx context.Context) error { if err := mgr.Add(&CNPGI{ PluginPath: viper.GetString("plugin-path"), SpoolDirectory: viper.GetString("spool-directory"), + BarmanObjectKey: client.ObjectKey{ + Namespace: namespace, + Name: boName, + }, ClusterObjectKey: client.ObjectKey{ Namespace: namespace, Name: clusterName, }, - Client: mgr.GetClient(), - PGDataPath: viper.GetString("pgdata"), + Client: mgr.GetClient(), + PGDataPath: viper.GetString("pgdata"), + InstanceName: viper.GetString("pod-name"), + ServerName: viper.GetString("server-name"), }); err != nil { setupLog.Error(err, "unable to create CNPGI runnable") return err diff --git a/internal/cnpgi/restore/restore.go b/internal/cnpgi/restore/restore.go index 4ad1745..696cd97 100644 --- a/internal/cnpgi/restore/restore.go +++ b/internal/cnpgi/restore/restore.go @@ -7,7 +7,6 @@ import ( "os" "os/exec" "path" - "strings" "github.com/cloudnative-pg/barman-cloud/pkg/api" barmanArchiver "github.com/cloudnative-pg/barman-cloud/pkg/archiver" @@ -17,6 +16,7 @@ import ( barmanCredentials "github.com/cloudnative-pg/barman-cloud/pkg/credentials" barmanRestorer "github.com/cloudnative-pg/barman-cloud/pkg/restorer" cnpgv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1" + "github.com/cloudnative-pg/cloudnative-pg/pkg/postgres" "github.com/cloudnative-pg/cloudnative-pg/pkg/utils" "github.com/cloudnative-pg/cnpg-i-machinery/pkg/pluginhelper/decoder" restore "github.com/cloudnative-pg/cnpg-i/pkg/restore/job" @@ -152,10 +152,7 @@ func (impl JobHookImpl) Restore( } } - config, err := getRestoreWalConfig(ctx, backup, &recoveryObjectStore.Spec.Configuration) - if err != nil { - return nil, err - } + config := getRestoreWalConfig() contextLogger.Info("sending restore response", "config", config, "env", env) return &restore.RestoreResponse{ @@ -336,33 +333,17 @@ func (impl JobHookImpl) restoreCustomWalDir(ctx context.Context) (bool, error) { // getRestoreWalConfig obtains the content to append to `custom.conf` allowing PostgreSQL // to complete the WAL recovery from the object storage and then start // as a new primary -func getRestoreWalConfig( - ctx context.Context, - backup *cnpgv1.Backup, - barmanConfiguration *cnpgv1.BarmanObjectStoreConfiguration, -) (string, error) { - var err error - - cmd := []string{barmanCapabilities.BarmanCloudWalRestore} - if backup.Status.EndpointURL != "" { - cmd = append(cmd, "--endpoint-url", backup.Status.EndpointURL) - } - cmd = append(cmd, backup.Status.DestinationPath) - cmd = append(cmd, backup.Status.ServerName) - - cmd, err = barmanCommand.AppendCloudProviderOptionsFromConfiguration(ctx, cmd, barmanConfiguration) - if err != nil { - return "", err - } - - cmd = append(cmd, "%f", "%p") +func getRestoreWalConfig() string { + restoreCmd := fmt.Sprintf( + "/controller/manager wal-restore --log-destination %s/%s.json %%f %%p", + postgres.LogPath, postgres.LogFileName) recoveryFileContents := fmt.Sprintf( "recovery_target_action = promote\n"+ "restore_command = '%s'\n", - strings.Join(cmd, " ")) + restoreCmd) - return recoveryFileContents, nil + return recoveryFileContents } // loadBackupObjectFromExternalCluster generates an in-memory Backup structure given a reference to diff --git a/internal/cnpgi/restore/start.go b/internal/cnpgi/restore/start.go index da06988..2d5411f 100644 --- a/internal/cnpgi/restore/start.go +++ b/internal/cnpgi/restore/start.go @@ -2,20 +2,27 @@ package restore import ( "context" + "path" "github.com/cloudnative-pg/cnpg-i-machinery/pkg/pluginhelper/http" restore "github.com/cloudnative-pg/cnpg-i/pkg/restore/job" + "github.com/cloudnative-pg/cnpg-i/pkg/wal" "google.golang.org/grpc" "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/cloudnative-pg/plugin-barman-cloud/internal/cnpgi/common" ) // CNPGI is the implementation of the PostgreSQL sidecar type CNPGI struct { PluginPath string SpoolDirectory string + BarmanObjectKey client.ObjectKey ClusterObjectKey client.ObjectKey Client client.Client PGDataPath string + InstanceName string + ServerName string } // Start starts the GRPC service @@ -24,6 +31,17 @@ func (c *CNPGI) Start(ctx context.Context) error { const PgWalVolumePgWalPath = "/var/lib/postgresql/wal/pg_wal" enrich := func(server *grpc.Server) error { + wal.RegisterWALServer(server, common.WALServiceImplementation{ + BarmanObjectKey: c.BarmanObjectKey, + ClusterObjectKey: c.ClusterObjectKey, + InstanceName: c.InstanceName, + Client: c.Client, + SpoolDirectory: c.SpoolDirectory, + PGDataPath: c.PGDataPath, + PGWALPath: path.Join(c.PGDataPath, "pg_wal"), + ServerName: c.ServerName, + }) + restore.RegisterRestoreJobHooksServer(server, &JobHookImpl{ Client: c.Client, ClusterObjectKey: c.ClusterObjectKey,