fix: ensure restore configuration points to manager wal-restore (#68)

Signed-off-by: Armando Ruocco <armando.ruocco@enterprisedb.com>
Signed-off-by: Leonardo Cecchi <leonardo.cecchi@enterprisedb.com>
Signed-off-by: Francesco Canovai <francesco.canovai@enterprisedb.com>
Co-authored-by: Leonardo Cecchi <leonardo.cecchi@enterprisedb.com>
Co-authored-by: Francesco Canovai <francesco.canovai@enterprisedb.com>
This commit is contained in:
Armando Ruocco 2024-11-28 14:04:50 +01:00 committed by GitHub
parent 74d4f5d190
commit afd4603023
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 104 additions and 72 deletions

View File

@ -22,6 +22,7 @@ func NewCmd() *cobra.Command {
"cluster-name", "cluster-name",
"pod-name", "pod-name",
"spool-directory", "spool-directory",
"server-name",
} }
for _, k := range requiredSettings { for _, k := range requiredSettings {
@ -40,6 +41,7 @@ func NewCmd() *cobra.Command {
_ = viper.BindEnv("pod-name", "POD_NAME") _ = viper.BindEnv("pod-name", "POD_NAME")
_ = viper.BindEnv("pgdata", "PGDATA") _ = viper.BindEnv("pgdata", "PGDATA")
_ = viper.BindEnv("spool-directory", "SPOOL_DIRECTORY") _ = viper.BindEnv("spool-directory", "SPOOL_DIRECTORY")
_ = viper.BindEnv("server-name", "SERVER_NAME")
return cmd return cmd
} }

View File

@ -23,6 +23,8 @@ func NewCmd() *cobra.Command {
"cluster-name", "cluster-name",
"pod-name", "pod-name",
"spool-directory", "spool-directory",
"barman-object-name",
"server-name",
} }
for _, k := range requiredSettings { for _, k := range requiredSettings {
@ -40,6 +42,8 @@ func NewCmd() *cobra.Command {
_ = viper.BindEnv("pod-name", "POD_NAME") _ = viper.BindEnv("pod-name", "POD_NAME")
_ = viper.BindEnv("pgdata", "PGDATA") _ = viper.BindEnv("pgdata", "PGDATA")
_ = viper.BindEnv("spool-directory", "SPOOL_DIRECTORY") _ = viper.BindEnv("spool-directory", "SPOOL_DIRECTORY")
_ = viper.BindEnv("barman-object-name", "BARMAN_OBJECT_NAME")
_ = viper.BindEnv("server-name", "SERVER_NAME")
return cmd return cmd
} }

View File

@ -1,4 +1,4 @@
package instance package common
import ( import (
"context" "context"
@ -19,12 +19,12 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client"
barmancloudv1 "github.com/cloudnative-pg/plugin-barman-cloud/api/v1" barmancloudv1 "github.com/cloudnative-pg/plugin-barman-cloud/api/v1"
"github.com/cloudnative-pg/plugin-barman-cloud/internal/cnpgi/common"
"github.com/cloudnative-pg/plugin-barman-cloud/internal/cnpgi/metadata" "github.com/cloudnative-pg/plugin-barman-cloud/internal/cnpgi/metadata"
) )
// WALServiceImplementation is the implementation of the WAL Service // WALServiceImplementation is the implementation of the WAL Service
type WALServiceImplementation struct { type WALServiceImplementation struct {
ServerName string
BarmanObjectKey client.ObjectKey BarmanObjectKey client.ObjectKey
ClusterObjectKey client.ObjectKey ClusterObjectKey client.ObjectKey
Client client.Client Client client.Client
@ -73,16 +73,6 @@ func (w WALServiceImplementation) Archive(
return nil, err return nil, err
} }
// TODO: refactor this code elsewhere
serverName := cluster.Name
for _, plugin := range cluster.Spec.Plugins {
if plugin.IsEnabled() && plugin.Name == metadata.PluginName {
if pluginServerName, ok := plugin.Parameters["serverName"]; ok {
serverName = pluginServerName
}
}
}
var objectStore barmancloudv1.ObjectStore var objectStore barmancloudv1.ObjectStore
if err := w.Client.Get(ctx, w.BarmanObjectKey, &objectStore); err != nil { if err := w.Client.Get(ctx, w.BarmanObjectKey, &objectStore); err != nil {
return nil, err return nil, err
@ -112,7 +102,7 @@ func (w WALServiceImplementation) Archive(
return nil, err return nil, err
} }
options, err := arch.BarmanCloudWalArchiveOptions(ctx, &objectStore.Spec.Configuration, serverName) options, err := arch.BarmanCloudWalArchiveOptions(ctx, &objectStore.Spec.Configuration, w.ServerName)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -152,7 +142,7 @@ func (w WALServiceImplementation) Restore(
barmanConfiguration := &objectStore.Spec.Configuration barmanConfiguration := &objectStore.Spec.Configuration
env := common.GetRestoreCABundleEnv(barmanConfiguration) env := GetRestoreCABundleEnv(barmanConfiguration)
credentialsEnv, err := barmanCredentials.EnvSetBackupCloudCredentials( credentialsEnv, err := barmanCredentials.EnvSetBackupCloudCredentials(
ctx, ctx,
w.Client, w.Client,
@ -163,19 +153,9 @@ func (w WALServiceImplementation) Restore(
if err != nil { if err != nil {
return nil, fmt.Errorf("while getting recover credentials: %w", err) return nil, fmt.Errorf("while getting recover credentials: %w", err)
} }
env = common.MergeEnv(env, credentialsEnv) env = MergeEnv(env, credentialsEnv)
// TODO: refactor this code elsewhere options, err := barmanCommand.CloudWalRestoreOptions(ctx, barmanConfiguration, w.ServerName)
serverName := cluster.Name
for _, plugin := range cluster.Spec.Plugins {
if plugin.IsEnabled() && plugin.Name == metadata.PluginName {
if pluginServerName, ok := plugin.Parameters["serverName"]; ok {
serverName = pluginServerName
}
}
}
options, err := barmanCommand.CloudWalRestoreOptions(ctx, barmanConfiguration, serverName)
if err != nil { if err != nil {
return nil, fmt.Errorf("while getting barman-cloud-wal-restore options: %w", err) return nil, fmt.Errorf("while getting barman-cloud-wal-restore options: %w", err)
} }

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package instance package common
import ( import (
"errors" "errors"

View File

@ -31,6 +31,7 @@ type BackupServiceImplementation struct {
ClusterObjectKey client.ObjectKey ClusterObjectKey client.ObjectKey
Client client.Client Client client.Client
InstanceName string InstanceName string
ServerName string
backup.UnimplementedBackupServer backup.UnimplementedBackupServer
} }
@ -111,21 +112,12 @@ func (b BackupServiceImplementation) Backup(
return nil, err return nil, err
} }
serverName := cluster.Name
for _, plugin := range cluster.Spec.Plugins {
if plugin.IsEnabled() && plugin.Name == metadata.PluginName {
if pluginServerName, ok := plugin.Parameters["serverName"]; ok {
serverName = pluginServerName
}
}
}
backupName := fmt.Sprintf("backup-%v", pgTime.ToCompactISO8601(time.Now())) backupName := fmt.Sprintf("backup-%v", pgTime.ToCompactISO8601(time.Now()))
if err = backupCmd.Take( if err = backupCmd.Take(
ctx, ctx,
backupName, backupName,
serverName, b.ServerName,
env, env,
barmanCloudExecutor{}, barmanCloudExecutor{},
postgres.BackupTemporaryDirectory, postgres.BackupTemporaryDirectory,
@ -137,7 +129,7 @@ func (b BackupServiceImplementation) Backup(
executedBackupInfo, err := backupCmd.GetExecutedBackupInfo( executedBackupInfo, err := backupCmd.GetExecutedBackupInfo(
ctx, ctx,
backupName, backupName,
serverName, b.ServerName,
barmanCloudExecutor{}, barmanCloudExecutor{},
env) env)
if err != nil { if err != nil {

View File

@ -81,6 +81,7 @@ func Start(ctx context.Context) error {
Name: clusterName, Name: clusterName,
}, },
BarmanObjectKey: barmanObjectKey, BarmanObjectKey: barmanObjectKey,
ServerName: viper.GetString("server-name"),
InstanceName: podName, InstanceName: podName,
// TODO: improve // TODO: improve
PGDataPath: viper.GetString("pgdata"), PGDataPath: viper.GetString("pgdata"),

View File

@ -8,12 +8,15 @@ import (
"github.com/cloudnative-pg/cnpg-i/pkg/wal" "github.com/cloudnative-pg/cnpg-i/pkg/wal"
"google.golang.org/grpc" "google.golang.org/grpc"
"sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client"
"github.com/cloudnative-pg/plugin-barman-cloud/internal/cnpgi/common"
) )
// CNPGI is the implementation of the PostgreSQL sidecar // CNPGI is the implementation of the PostgreSQL sidecar
type CNPGI struct { type CNPGI struct {
Client client.Client Client client.Client
BarmanObjectKey client.ObjectKey BarmanObjectKey client.ObjectKey
ServerName string
ClusterObjectKey client.ObjectKey ClusterObjectKey client.ObjectKey
PGDataPath string PGDataPath string
PGWALPath string PGWALPath string
@ -26,9 +29,10 @@ type CNPGI struct {
// Start starts the GRPC service // Start starts the GRPC service
func (c *CNPGI) Start(ctx context.Context) error { func (c *CNPGI) Start(ctx context.Context) error {
enrich := func(server *grpc.Server) error { enrich := func(server *grpc.Server) error {
wal.RegisterWALServer(server, WALServiceImplementation{ wal.RegisterWALServer(server, common.WALServiceImplementation{
BarmanObjectKey: c.BarmanObjectKey, BarmanObjectKey: c.BarmanObjectKey,
ClusterObjectKey: c.ClusterObjectKey, ClusterObjectKey: c.ClusterObjectKey,
ServerName: c.ServerName,
InstanceName: c.InstanceName, InstanceName: c.InstanceName,
Client: c.Client, Client: c.Client,
SpoolDirectory: c.SpoolDirectory, SpoolDirectory: c.SpoolDirectory,
@ -38,6 +42,7 @@ func (c *CNPGI) Start(ctx context.Context) error {
backup.RegisterBackupServer(server, BackupServiceImplementation{ backup.RegisterBackupServer(server, BackupServiceImplementation{
Client: c.Client, Client: c.Client,
BarmanObjectKey: c.BarmanObjectKey, BarmanObjectKey: c.BarmanObjectKey,
ServerName: c.ServerName,
ClusterObjectKey: c.ClusterObjectKey, ClusterObjectKey: c.ClusterObjectKey,
InstanceName: c.InstanceName, InstanceName: c.InstanceName,
}) })

View File

@ -45,6 +45,7 @@ func (e *ConfigurationError) IsEmpty() bool {
// PluginConfiguration is the configuration of the plugin // PluginConfiguration is the configuration of the plugin
type PluginConfiguration struct { type PluginConfiguration struct {
BarmanObjectName string BarmanObjectName string
ServerName string
RecoveryBarmanObjectName string RecoveryBarmanObjectName string
RecoveryBarmanServerName string RecoveryBarmanServerName string
} }
@ -56,6 +57,15 @@ func NewFromCluster(cluster *cnpgv1.Cluster) *PluginConfiguration {
metadata.PluginName, metadata.PluginName,
) )
serverName := cluster.Name
for _, plugin := range cluster.Spec.Plugins {
if plugin.IsEnabled() && plugin.Name == metadata.PluginName {
if pluginServerName, ok := plugin.Parameters["serverName"]; ok {
serverName = pluginServerName
}
}
}
recoveryServerName := "" recoveryServerName := ""
recoveryBarmanObjectName := "" recoveryBarmanObjectName := ""
@ -70,6 +80,7 @@ func NewFromCluster(cluster *cnpgv1.Cluster) *PluginConfiguration {
result := &PluginConfiguration{ result := &PluginConfiguration{
// used for the backup/archive // used for the backup/archive
BarmanObjectName: helper.Parameters["barmanObjectName"], BarmanObjectName: helper.Parameters["barmanObjectName"],
ServerName: serverName,
// used for restore/wal_restore // used for restore/wal_restore
RecoveryBarmanServerName: recoveryServerName, RecoveryBarmanServerName: recoveryServerName,
RecoveryBarmanObjectName: recoveryBarmanObjectName, RecoveryBarmanObjectName: recoveryBarmanObjectName,

View File

@ -115,6 +115,13 @@ func reconcileJob(
return nil, nil return nil, nil
} }
// Since we're recovering from an existing object store,
// we set our primary object store name to the recovery one.
// This won't be needed anymore when wal-restore will be able
// to check two object stores
pluginConfiguration.BarmanObjectName = pluginConfiguration.RecoveryBarmanObjectName
pluginConfiguration.ServerName = pluginConfiguration.RecoveryBarmanServerName
var job batchv1.Job var job batchv1.Job
if err := decoder.DecodeObject( if err := decoder.DecodeObject(
request.GetObjectDefinition(), request.GetObjectDefinition(),
@ -175,10 +182,14 @@ func reconcilePod(
mutatedPod := pod.DeepCopy() mutatedPod := pod.DeepCopy()
if err := reconcilePodSpec(pluginConfiguration, cluster, &mutatedPod.Spec, "postgres", corev1.Container{ if len(pluginConfiguration.BarmanObjectName) != 0 {
Args: []string{"instance"}, if err := reconcilePodSpec(pluginConfiguration, cluster, &mutatedPod.Spec, "postgres", corev1.Container{
}); err != nil { Args: []string{"instance"},
return nil, fmt.Errorf("while reconciling pod spec for pod: %w", err) }); err != nil {
return nil, fmt.Errorf("while reconciling pod spec for pod: %w", err)
}
} else {
contextLogger.Debug("No need to mutate instance with no backup & archiving configuration")
} }
patch, err := object.CreatePatch(mutatedPod, pod) patch, err := object.CreatePatch(mutatedPod, pod)
@ -212,6 +223,10 @@ func reconcilePodSpec(
Name: "BARMAN_OBJECT_NAME", Name: "BARMAN_OBJECT_NAME",
Value: cfg.BarmanObjectName, Value: cfg.BarmanObjectName,
}, },
{
Name: "SERVER_NAME",
Value: cfg.ServerName,
},
{ {
// TODO: should we really use this one? // TODO: should we really use this one?
// should we mount an emptyDir volume just for that? // should we mount an emptyDir volume just for that?

View File

@ -38,6 +38,13 @@ func (i IdentityImplementation) GetPluginCapabilities(
}, },
}, },
}, },
{
Type: &identity.PluginCapability_Service_{
Service: &identity.PluginCapability_Service{
Type: identity.PluginCapability_Service_TYPE_WAL_SERVICE,
},
},
},
}, },
}, nil }, nil
} }

View File

@ -33,6 +33,7 @@ func Start(ctx context.Context) error {
setupLog.Info("Starting barman cloud instance plugin") setupLog.Info("Starting barman cloud instance plugin")
namespace := viper.GetString("namespace") namespace := viper.GetString("namespace")
clusterName := viper.GetString("cluster-name") clusterName := viper.GetString("cluster-name")
boName := viper.GetString("barman-object-name")
objs := map[client.Object]cache.ByObject{ objs := map[client.Object]cache.ByObject{
&cnpgv1.Cluster{}: { &cnpgv1.Cluster{}: {
@ -43,6 +44,15 @@ func Start(ctx context.Context) error {
}, },
} }
if boName != "" {
objs[&barmancloudv1.ObjectStore{}] = cache.ByObject{
Field: fields.OneTermEqualSelector("metadata.name", boName),
Namespaces: map[string]cache.Config{
namespace: {},
},
}
}
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{ mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
Scheme: scheme, Scheme: scheme,
Cache: cache.Options{ Cache: cache.Options{
@ -65,12 +75,18 @@ func Start(ctx context.Context) error {
if err := mgr.Add(&CNPGI{ if err := mgr.Add(&CNPGI{
PluginPath: viper.GetString("plugin-path"), PluginPath: viper.GetString("plugin-path"),
SpoolDirectory: viper.GetString("spool-directory"), SpoolDirectory: viper.GetString("spool-directory"),
BarmanObjectKey: client.ObjectKey{
Namespace: namespace,
Name: boName,
},
ClusterObjectKey: client.ObjectKey{ ClusterObjectKey: client.ObjectKey{
Namespace: namespace, Namespace: namespace,
Name: clusterName, Name: clusterName,
}, },
Client: mgr.GetClient(), Client: mgr.GetClient(),
PGDataPath: viper.GetString("pgdata"), PGDataPath: viper.GetString("pgdata"),
InstanceName: viper.GetString("pod-name"),
ServerName: viper.GetString("server-name"),
}); err != nil { }); err != nil {
setupLog.Error(err, "unable to create CNPGI runnable") setupLog.Error(err, "unable to create CNPGI runnable")
return err return err

View File

@ -7,7 +7,6 @@ import (
"os" "os"
"os/exec" "os/exec"
"path" "path"
"strings"
"github.com/cloudnative-pg/barman-cloud/pkg/api" "github.com/cloudnative-pg/barman-cloud/pkg/api"
barmanArchiver "github.com/cloudnative-pg/barman-cloud/pkg/archiver" barmanArchiver "github.com/cloudnative-pg/barman-cloud/pkg/archiver"
@ -17,6 +16,7 @@ import (
barmanCredentials "github.com/cloudnative-pg/barman-cloud/pkg/credentials" barmanCredentials "github.com/cloudnative-pg/barman-cloud/pkg/credentials"
barmanRestorer "github.com/cloudnative-pg/barman-cloud/pkg/restorer" barmanRestorer "github.com/cloudnative-pg/barman-cloud/pkg/restorer"
cnpgv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1" cnpgv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1"
"github.com/cloudnative-pg/cloudnative-pg/pkg/postgres"
"github.com/cloudnative-pg/cloudnative-pg/pkg/utils" "github.com/cloudnative-pg/cloudnative-pg/pkg/utils"
"github.com/cloudnative-pg/cnpg-i-machinery/pkg/pluginhelper/decoder" "github.com/cloudnative-pg/cnpg-i-machinery/pkg/pluginhelper/decoder"
restore "github.com/cloudnative-pg/cnpg-i/pkg/restore/job" restore "github.com/cloudnative-pg/cnpg-i/pkg/restore/job"
@ -152,10 +152,7 @@ func (impl JobHookImpl) Restore(
} }
} }
config, err := getRestoreWalConfig(ctx, backup, &recoveryObjectStore.Spec.Configuration) config := getRestoreWalConfig()
if err != nil {
return nil, err
}
contextLogger.Info("sending restore response", "config", config, "env", env) contextLogger.Info("sending restore response", "config", config, "env", env)
return &restore.RestoreResponse{ return &restore.RestoreResponse{
@ -336,33 +333,17 @@ func (impl JobHookImpl) restoreCustomWalDir(ctx context.Context) (bool, error) {
// getRestoreWalConfig obtains the content to append to `custom.conf` allowing PostgreSQL // getRestoreWalConfig obtains the content to append to `custom.conf` allowing PostgreSQL
// to complete the WAL recovery from the object storage and then start // to complete the WAL recovery from the object storage and then start
// as a new primary // as a new primary
func getRestoreWalConfig( func getRestoreWalConfig() string {
ctx context.Context, restoreCmd := fmt.Sprintf(
backup *cnpgv1.Backup, "/controller/manager wal-restore --log-destination %s/%s.json %%f %%p",
barmanConfiguration *cnpgv1.BarmanObjectStoreConfiguration, postgres.LogPath, postgres.LogFileName)
) (string, error) {
var err error
cmd := []string{barmanCapabilities.BarmanCloudWalRestore}
if backup.Status.EndpointURL != "" {
cmd = append(cmd, "--endpoint-url", backup.Status.EndpointURL)
}
cmd = append(cmd, backup.Status.DestinationPath)
cmd = append(cmd, backup.Status.ServerName)
cmd, err = barmanCommand.AppendCloudProviderOptionsFromConfiguration(ctx, cmd, barmanConfiguration)
if err != nil {
return "", err
}
cmd = append(cmd, "%f", "%p")
recoveryFileContents := fmt.Sprintf( recoveryFileContents := fmt.Sprintf(
"recovery_target_action = promote\n"+ "recovery_target_action = promote\n"+
"restore_command = '%s'\n", "restore_command = '%s'\n",
strings.Join(cmd, " ")) restoreCmd)
return recoveryFileContents, nil return recoveryFileContents
} }
// loadBackupObjectFromExternalCluster generates an in-memory Backup structure given a reference to // loadBackupObjectFromExternalCluster generates an in-memory Backup structure given a reference to

View File

@ -2,20 +2,27 @@ package restore
import ( import (
"context" "context"
"path"
"github.com/cloudnative-pg/cnpg-i-machinery/pkg/pluginhelper/http" "github.com/cloudnative-pg/cnpg-i-machinery/pkg/pluginhelper/http"
restore "github.com/cloudnative-pg/cnpg-i/pkg/restore/job" restore "github.com/cloudnative-pg/cnpg-i/pkg/restore/job"
"github.com/cloudnative-pg/cnpg-i/pkg/wal"
"google.golang.org/grpc" "google.golang.org/grpc"
"sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client"
"github.com/cloudnative-pg/plugin-barman-cloud/internal/cnpgi/common"
) )
// CNPGI is the implementation of the PostgreSQL sidecar // CNPGI is the implementation of the PostgreSQL sidecar
type CNPGI struct { type CNPGI struct {
PluginPath string PluginPath string
SpoolDirectory string SpoolDirectory string
BarmanObjectKey client.ObjectKey
ClusterObjectKey client.ObjectKey ClusterObjectKey client.ObjectKey
Client client.Client Client client.Client
PGDataPath string PGDataPath string
InstanceName string
ServerName string
} }
// Start starts the GRPC service // Start starts the GRPC service
@ -24,6 +31,17 @@ func (c *CNPGI) Start(ctx context.Context) error {
const PgWalVolumePgWalPath = "/var/lib/postgresql/wal/pg_wal" const PgWalVolumePgWalPath = "/var/lib/postgresql/wal/pg_wal"
enrich := func(server *grpc.Server) error { enrich := func(server *grpc.Server) error {
wal.RegisterWALServer(server, common.WALServiceImplementation{
BarmanObjectKey: c.BarmanObjectKey,
ClusterObjectKey: c.ClusterObjectKey,
InstanceName: c.InstanceName,
Client: c.Client,
SpoolDirectory: c.SpoolDirectory,
PGDataPath: c.PGDataPath,
PGWALPath: path.Join(c.PGDataPath, "pg_wal"),
ServerName: c.ServerName,
})
restore.RegisterRestoreJobHooksServer(server, &JobHookImpl{ restore.RegisterRestoreJobHooksServer(server, &JobHookImpl{
Client: c.Client, Client: c.Client,
ClusterObjectKey: c.ClusterObjectKey, ClusterObjectKey: c.ClusterObjectKey,