chore: review

Signed-off-by: Leonardo Cecchi <leonardo.cecchi@enterprisedb.com>
This commit is contained in:
Leonardo Cecchi 2024-11-26 12:24:51 +01:00 committed by Francesco Canovai
parent cacf3fdfcb
commit 56e41cd81f
11 changed files with 56 additions and 37 deletions

View File

@ -22,6 +22,7 @@ func NewCmd() *cobra.Command {
"cluster-name", "cluster-name",
"pod-name", "pod-name",
"spool-directory", "spool-directory",
"server-name",
} }
for _, k := range requiredSettings { for _, k := range requiredSettings {
@ -40,6 +41,7 @@ func NewCmd() *cobra.Command {
_ = viper.BindEnv("pod-name", "POD_NAME") _ = viper.BindEnv("pod-name", "POD_NAME")
_ = viper.BindEnv("pgdata", "PGDATA") _ = viper.BindEnv("pgdata", "PGDATA")
_ = viper.BindEnv("spool-directory", "SPOOL_DIRECTORY") _ = viper.BindEnv("spool-directory", "SPOOL_DIRECTORY")
_ = viper.BindEnv("server-name", "SERVER_NAME")
return cmd return cmd
} }

View File

@ -23,6 +23,8 @@ func NewCmd() *cobra.Command {
"cluster-name", "cluster-name",
"pod-name", "pod-name",
"spool-directory", "spool-directory",
"barman-object-name",
"server-name",
} }
for _, k := range requiredSettings { for _, k := range requiredSettings {
@ -40,6 +42,8 @@ func NewCmd() *cobra.Command {
_ = viper.BindEnv("pod-name", "POD_NAME") _ = viper.BindEnv("pod-name", "POD_NAME")
_ = viper.BindEnv("pgdata", "PGDATA") _ = viper.BindEnv("pgdata", "PGDATA")
_ = viper.BindEnv("spool-directory", "SPOOL_DIRECTORY") _ = viper.BindEnv("spool-directory", "SPOOL_DIRECTORY")
_ = viper.BindEnv("barman-object-name", "BARMAN_OBJECT_NAME")
_ = viper.BindEnv("server-name", "SERVER_NAME")
return cmd return cmd
} }

View File

@ -24,6 +24,7 @@ import (
// WALServiceImplementation is the implementation of the WAL Service // WALServiceImplementation is the implementation of the WAL Service
type WALServiceImplementation struct { type WALServiceImplementation struct {
ServerName string
BarmanObjectKey client.ObjectKey BarmanObjectKey client.ObjectKey
ClusterObjectKey client.ObjectKey ClusterObjectKey client.ObjectKey
Client client.Client Client client.Client
@ -72,16 +73,6 @@ func (w WALServiceImplementation) Archive(
return nil, err return nil, err
} }
// TODO: refactor this code elsewhere
serverName := cluster.Name
for _, plugin := range cluster.Spec.Plugins {
if plugin.IsEnabled() && plugin.Name == metadata.PluginName {
if pluginServerName, ok := plugin.Parameters["serverName"]; ok {
serverName = pluginServerName
}
}
}
var objectStore barmancloudv1.ObjectStore var objectStore barmancloudv1.ObjectStore
if err := w.Client.Get(ctx, w.BarmanObjectKey, &objectStore); err != nil { if err := w.Client.Get(ctx, w.BarmanObjectKey, &objectStore); err != nil {
return nil, err return nil, err
@ -111,7 +102,7 @@ func (w WALServiceImplementation) Archive(
return nil, err return nil, err
} }
options, err := arch.BarmanCloudWalArchiveOptions(ctx, &objectStore.Spec.Configuration, serverName) options, err := arch.BarmanCloudWalArchiveOptions(ctx, &objectStore.Spec.Configuration, w.ServerName)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -164,17 +155,7 @@ func (w WALServiceImplementation) Restore(
} }
env = MergeEnv(env, credentialsEnv) env = MergeEnv(env, credentialsEnv)
// TODO: refactor this code elsewhere options, err := barmanCommand.CloudWalRestoreOptions(ctx, barmanConfiguration, w.ServerName)
serverName := cluster.Name
for _, plugin := range cluster.Spec.Plugins {
if plugin.IsEnabled() && plugin.Name == metadata.PluginName {
if pluginServerName, ok := plugin.Parameters["serverName"]; ok {
serverName = pluginServerName
}
}
}
options, err := barmanCommand.CloudWalRestoreOptions(ctx, barmanConfiguration, serverName)
if err != nil { if err != nil {
return nil, fmt.Errorf("while getting barman-cloud-wal-restore options: %w", err) return nil, fmt.Errorf("while getting barman-cloud-wal-restore options: %w", err)
} }

View File

@ -31,6 +31,7 @@ type BackupServiceImplementation struct {
ClusterObjectKey client.ObjectKey ClusterObjectKey client.ObjectKey
Client client.Client Client client.Client
InstanceName string InstanceName string
ServerName string
backup.UnimplementedBackupServer backup.UnimplementedBackupServer
} }
@ -111,21 +112,12 @@ func (b BackupServiceImplementation) Backup(
return nil, err return nil, err
} }
serverName := cluster.Name
for _, plugin := range cluster.Spec.Plugins {
if plugin.IsEnabled() && plugin.Name == metadata.PluginName {
if pluginServerName, ok := plugin.Parameters["serverName"]; ok {
serverName = pluginServerName
}
}
}
backupName := fmt.Sprintf("backup-%v", pgTime.ToCompactISO8601(time.Now())) backupName := fmt.Sprintf("backup-%v", pgTime.ToCompactISO8601(time.Now()))
if err = backupCmd.Take( if err = backupCmd.Take(
ctx, ctx,
backupName, backupName,
serverName, b.ServerName,
env, env,
barmanCloudExecutor{}, barmanCloudExecutor{},
postgres.BackupTemporaryDirectory, postgres.BackupTemporaryDirectory,
@ -137,7 +129,7 @@ func (b BackupServiceImplementation) Backup(
executedBackupInfo, err := backupCmd.GetExecutedBackupInfo( executedBackupInfo, err := backupCmd.GetExecutedBackupInfo(
ctx, ctx,
backupName, backupName,
serverName, b.ServerName,
barmanCloudExecutor{}, barmanCloudExecutor{},
env) env)
if err != nil { if err != nil {

View File

@ -81,6 +81,7 @@ func Start(ctx context.Context) error {
Name: clusterName, Name: clusterName,
}, },
BarmanObjectKey: barmanObjectKey, BarmanObjectKey: barmanObjectKey,
ServerName: viper.GetString("server-name"),
InstanceName: podName, InstanceName: podName,
// TODO: improve // TODO: improve
PGDataPath: viper.GetString("pgdata"), PGDataPath: viper.GetString("pgdata"),

View File

@ -16,6 +16,7 @@ import (
type CNPGI struct { type CNPGI struct {
Client client.Client Client client.Client
BarmanObjectKey client.ObjectKey BarmanObjectKey client.ObjectKey
ServerName string
ClusterObjectKey client.ObjectKey ClusterObjectKey client.ObjectKey
PGDataPath string PGDataPath string
PGWALPath string PGWALPath string
@ -31,6 +32,7 @@ func (c *CNPGI) Start(ctx context.Context) error {
wal.RegisterWALServer(server, common.WALServiceImplementation{ wal.RegisterWALServer(server, common.WALServiceImplementation{
BarmanObjectKey: c.BarmanObjectKey, BarmanObjectKey: c.BarmanObjectKey,
ClusterObjectKey: c.ClusterObjectKey, ClusterObjectKey: c.ClusterObjectKey,
ServerName: c.ServerName,
InstanceName: c.InstanceName, InstanceName: c.InstanceName,
Client: c.Client, Client: c.Client,
SpoolDirectory: c.SpoolDirectory, SpoolDirectory: c.SpoolDirectory,
@ -40,6 +42,7 @@ func (c *CNPGI) Start(ctx context.Context) error {
backup.RegisterBackupServer(server, BackupServiceImplementation{ backup.RegisterBackupServer(server, BackupServiceImplementation{
Client: c.Client, Client: c.Client,
BarmanObjectKey: c.BarmanObjectKey, BarmanObjectKey: c.BarmanObjectKey,
ServerName: c.ServerName,
ClusterObjectKey: c.ClusterObjectKey, ClusterObjectKey: c.ClusterObjectKey,
InstanceName: c.InstanceName, InstanceName: c.InstanceName,
}) })

View File

@ -45,6 +45,7 @@ func (e *ConfigurationError) IsEmpty() bool {
// PluginConfiguration is the configuration of the plugin // PluginConfiguration is the configuration of the plugin
type PluginConfiguration struct { type PluginConfiguration struct {
BarmanObjectName string BarmanObjectName string
ServerName string
RecoveryBarmanObjectName string RecoveryBarmanObjectName string
RecoveryBarmanServerName string RecoveryBarmanServerName string
} }
@ -56,6 +57,15 @@ func NewFromCluster(cluster *cnpgv1.Cluster) *PluginConfiguration {
metadata.PluginName, metadata.PluginName,
) )
serverName := cluster.Name
for _, plugin := range cluster.Spec.Plugins {
if plugin.IsEnabled() && plugin.Name == metadata.PluginName {
if pluginServerName, ok := plugin.Parameters["serverName"]; ok {
serverName = pluginServerName
}
}
}
recoveryServerName := "" recoveryServerName := ""
recoveryBarmanObjectName := "" recoveryBarmanObjectName := ""
@ -70,6 +80,7 @@ func NewFromCluster(cluster *cnpgv1.Cluster) *PluginConfiguration {
result := &PluginConfiguration{ result := &PluginConfiguration{
// used for the backup/archive // used for the backup/archive
BarmanObjectName: helper.Parameters["barmanObjectName"], BarmanObjectName: helper.Parameters["barmanObjectName"],
ServerName: serverName,
// used for restore/wal_restore // used for restore/wal_restore
RecoveryBarmanServerName: recoveryServerName, RecoveryBarmanServerName: recoveryServerName,
RecoveryBarmanObjectName: recoveryBarmanObjectName, RecoveryBarmanObjectName: recoveryBarmanObjectName,

View File

@ -115,6 +115,13 @@ func reconcileJob(
return nil, nil return nil, nil
} }
// Since we're recoverying from an existing object store,
// we set our primary object store name to the recovery one.
// This won't be needed anymore when wal-restore will be able
// to check two object stores
pluginConfiguration.BarmanObjectName = pluginConfiguration.RecoveryBarmanObjectName
pluginConfiguration.ServerName = pluginConfiguration.RecoveryBarmanServerName
var job batchv1.Job var job batchv1.Job
if err := decoder.DecodeObject( if err := decoder.DecodeObject(
request.GetObjectDefinition(), request.GetObjectDefinition(),
@ -175,10 +182,14 @@ func reconcilePod(
mutatedPod := pod.DeepCopy() mutatedPod := pod.DeepCopy()
if err := reconcilePodSpec(pluginConfiguration, cluster, &mutatedPod.Spec, "postgres", corev1.Container{ if len(pluginConfiguration.BarmanObjectName) != 0 {
Args: []string{"instance"}, if err := reconcilePodSpec(pluginConfiguration, cluster, &mutatedPod.Spec, "postgres", corev1.Container{
}); err != nil { Args: []string{"instance"},
return nil, fmt.Errorf("while reconciling pod spec for pod: %w", err) }); err != nil {
return nil, fmt.Errorf("while reconciling pod spec for pod: %w", err)
}
} else {
contextLogger.Debug("No need to mutate instance with no backup & archiving confniguration")
} }
patch, err := object.CreatePatch(mutatedPod, pod) patch, err := object.CreatePatch(mutatedPod, pod)
@ -212,6 +223,10 @@ func reconcilePodSpec(
Name: "BARMAN_OBJECT_NAME", Name: "BARMAN_OBJECT_NAME",
Value: cfg.BarmanObjectName, Value: cfg.BarmanObjectName,
}, },
{
Name: "SERVER_NAME",
Value: cfg.ServerName,
},
{ {
// TODO: should we really use this one? // TODO: should we really use this one?
// should we mount an emptyDir volume just for that? // should we mount an emptyDir volume just for that?

View File

@ -38,6 +38,13 @@ func (i IdentityImplementation) GetPluginCapabilities(
}, },
}, },
}, },
{
Type: &identity.PluginCapability_Service_{
Service: &identity.PluginCapability_Service{
Type: identity.PluginCapability_Service_TYPE_WAL_SERVICE,
},
},
},
}, },
}, nil }, nil
} }

View File

@ -86,6 +86,7 @@ func Start(ctx context.Context) error {
Client: mgr.GetClient(), Client: mgr.GetClient(),
PGDataPath: viper.GetString("pgdata"), PGDataPath: viper.GetString("pgdata"),
InstanceName: viper.GetString("pod-name"), InstanceName: viper.GetString("pod-name"),
ServerName: viper.GetString("server-name"),
}); err != nil { }); err != nil {
setupLog.Error(err, "unable to create CNPGI runnable") setupLog.Error(err, "unable to create CNPGI runnable")
return err return err

View File

@ -22,6 +22,7 @@ type CNPGI struct {
Client client.Client Client client.Client
PGDataPath string PGDataPath string
InstanceName string InstanceName string
ServerName string
} }
// Start starts the GRPC service // Start starts the GRPC service
@ -38,6 +39,7 @@ func (c *CNPGI) Start(ctx context.Context) error {
SpoolDirectory: c.SpoolDirectory, SpoolDirectory: c.SpoolDirectory,
PGDataPath: c.PGDataPath, PGDataPath: c.PGDataPath,
PGWALPath: path.Join(c.PGDataPath, "pg_wal"), PGWALPath: path.Join(c.PGDataPath, "pg_wal"),
ServerName: c.ServerName,
}) })
restore.RegisterRestoreJobHooksServer(server, &JobHookImpl{ restore.RegisterRestoreJobHooksServer(server, &JobHookImpl{