chore: minor review for retention.go

Signed-off-by: Armando Ruocco <armando.ruocco@enterprisedb.com>
Armando Ruocco 2025-03-10 13:50:41 +01:00 committed by Marco Nenciarini
parent 6aba7c7502
commit 027ddf1276
2 changed files with 18 additions and 41 deletions

View File

@@ -73,7 +73,7 @@ func Start(ctx context.Context) error {
 			Namespace: namespace,
 			Name:      clusterName,
 		},
-		PodName: podName,
+		CurrentPodName: podName,
 	}); err != nil {
 		setupLog.Error(err, "unable to policy enforcement runnable")
 		return err

View File

@@ -32,14 +32,10 @@ const defaultRetentionPolicyInterval = time.Minute * 5
 // RetentionPolicyRunnable executes the retention policy described
 // in the BarmanObjectStore object periodically.
 type RetentionPolicyRunnable struct {
 	Client   client.Client
 	Recorder record.EventRecorder
-
-	// ClusterKey are the coordinates at which the cluster is stored
-	ClusterKey types.NamespacedName
-
-	// PodName is the current pod name
-	PodName string
+	ClusterKey     types.NamespacedName
+	CurrentPodName string
 }
 
 // Start enforce the backup retention policies periodically, using the
@@ -61,15 +57,11 @@ func (c *RetentionPolicyRunnable) Start(ctx context.Context) error {
 
 		// Wait before running another cycle
 		t := time.NewTimer(period)
-		defer func() {
-			t.Stop()
-		}()
+		defer t.Stop()
 
 		select {
 		case <-ctx.Done():
-			// The context was canceled
 			return nil
 		case <-t.C:
 		}
 	}
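
The hunk above keeps the per-cycle wait as a timer raced against the context. A minimal, self-contained sketch of that pattern, assuming placeholder names (run, period) that are not taken from this repository:

package main

import (
	"context"
	"fmt"
	"time"
)

// run repeats some work until the context is canceled, sleeping
// `period` between cycles without blocking cancellation.
func run(ctx context.Context, period time.Duration) error {
	for {
		fmt.Println("cycle") // stand-in for the retention-policy work

		t := time.NewTimer(period)
		select {
		case <-ctx.Done():
			t.Stop() // stopping explicitly avoids stacking deferred calls inside the loop
			return nil
		case <-t.C:
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond)
	defer cancel()
	_ = run(ctx, 100*time.Millisecond)
}

Note that `defer t.Stop()` inside a loop, as in the commit, only runs when Start returns; since Start here runs until cancellation anyway, that is a reasonable simplification, and the explicit Stop above is just one alternative.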
@@ -107,32 +99,30 @@ func (c *RetentionPolicyRunnable) applyRetentionPolicy(
 	objectStore *barmancloudv1.ObjectStore,
 ) error {
 	contextLogger := log.FromContext(ctx)
 
 	configuration := config.NewFromCluster(cluster)
 
 	retentionPolicy := objectStore.Spec.RetentionPolicy
 	if len(retentionPolicy) == 0 {
 		contextLogger.Info("Skipping retention policy enforcement, no retention policy specified")
 		return nil
 	}
 
-	if cluster.Status.CurrentPrimary != c.PodName {
+	if cluster.Status.CurrentPrimary != c.CurrentPodName {
 		contextLogger.Info(
 			"Skipping retention policy enforcement, not the current primary",
-			"currentPrimary", cluster.Status.CurrentPrimary, "podName", c.PodName)
+			"currentPrimary", cluster.Status.CurrentPrimary, "podName", c.CurrentPodName)
 		return nil
 	}
 
 	contextLogger.Info("Applying backup retention policy",
 		"retentionPolicy", retentionPolicy)
 
-	osEnvironment := os.Environ()
-	caBundleEnvironment := common.GetRestoreCABundleEnv(&objectStore.Spec.Configuration)
 	env, err := barmanCredentials.EnvSetBackupCloudCredentials(
 		ctx,
 		c.Client,
 		objectStore.Namespace,
 		&objectStore.Spec.Configuration,
-		common.MergeEnv(osEnvironment, caBundleEnvironment))
+		common.MergeEnv(os.Environ(), common.GetRestoreCABundleEnv(&objectStore.Spec.Configuration)))
 	if err != nil {
 		contextLogger.Error(err, "while setting backup cloud credentials")
 		return err
@@ -182,26 +172,20 @@ func (c *RetentionPolicyRunnable) updateRecoveryWindow(
 		if t == nil {
 			return nil
 		}
 		return ptr.To(metav1.NewTime(*t))
 	}
 
-	firstRecoverabilityPoint := backupList.GetFirstRecoverabilityPoint()
-	lastSuccessfulBackupTime := backupList.GetLastSuccessfulBackupTime()
 	recoveryWindow := barmancloudv1.RecoveryWindow{
-		FirstRecoverabilityPoint: convertTime(firstRecoverabilityPoint),
-		LastSuccessfulBackupTime: convertTime(lastSuccessfulBackupTime),
+		FirstRecoverabilityPoint: convertTime(backupList.GetFirstRecoverabilityPoint()),
+		LastSuccessfulBackupTime: convertTime(backupList.GetLastSuccessfulBackupTime()),
 	}
 
 	if objectStore.Status.ServerRecoveryWindow == nil {
 		objectStore.Status.ServerRecoveryWindow = make(map[string]barmancloudv1.RecoveryWindow)
 	}
 	objectStore.Status.ServerRecoveryWindow[serverName] = recoveryWindow
 
-	if err := c.Client.Status().Update(ctx, objectStore); err != nil {
-		return err
-	}
-
-	return nil
+	return c.Client.Status().Update(ctx, objectStore)
 }
 
 // deleteBackupsNotInCatalog deletes all Backup objects pointing to the given cluster that are not
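
The convertTime closure in the hunk above is the usual nil-guarding conversion wrapper. A standalone sketch of the same shape, with time.Time and string standing in for the metav1 types (which are not imported here):

package main

import (
	"fmt"
	"time"
)

// convertTime mirrors the helper above: return nil for nil input,
// otherwise convert and return a pointer to the new value.
func convertTime(t *time.Time) *string {
	if t == nil {
		return nil
	}
	s := t.UTC().Format(time.RFC3339)
	return &s
}

func main() {
	now := time.Now()
	fmt.Println(convertTime(nil))   // <nil>
	fmt.Println(*convertTime(&now)) // e.g. 2025-03-10T12:50:41Z
}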
@@ -234,8 +218,7 @@ func deleteBackupsNotInCatalog(
 	// We chose to go with B
 
 	backups := cnpgv1.BackupList{}
-	err := cli.List(ctx, &backups, client.InNamespace(cluster.GetNamespace()))
-	if err != nil {
+	if err := cli.List(ctx, &backups, client.InNamespace(cluster.GetNamespace())); err != nil {
 		return fmt.Errorf("while getting backups: %w", err)
 	}
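
Both this hunk and the Delete call in the next one switch to the statement-scoped error form, `if err := f(); err != nil { ... }`. A minimal illustration of why that scoping helps:

package main

import (
	"errors"
	"fmt"
)

func list() error { return errors.New("boom") }

func main() {
	// err exists only inside the if statement...
	if err := list(); err != nil {
		fmt.Println("while getting backups:", err)
	}
	// ...so it cannot be reused, shadowed, or accidentally
	// checked again further down the function.
}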
@@ -250,8 +233,7 @@ func deleteBackupsNotInCatalog(
 		// here we could add further checks, e.g. if the backup is not found but would still
 		// be in the retention policy we could either not delete it or update it is status
 		if !slices.Contains(backupIDs, backup.Status.BackupID) {
-			err := cli.Delete(ctx, &backups.Items[id])
-			if err != nil {
+			if err := cli.Delete(ctx, &backups.Items[id]); err != nil {
 				errors = append(errors, fmt.Errorf(
 					"while deleting backup %s/%s: %w",
 					backup.Namespace,
@@ -262,7 +244,7 @@ func deleteBackupsNotInCatalog(
 		}
 	}
 
-	if errors != nil {
+	if len(errors) > 0 {
 		return fmt.Errorf("got errors while deleting Backups not in the cluster: %v", errors)
 	}
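
Replacing `errors != nil` with `len(errors) > 0` matters because a non-nil slice can still be empty. A quick demonstration:

package main

import "fmt"

func main() {
	var errs []error // nil slice
	fmt.Println(errs != nil, len(errs) > 0) // false false

	errs = make([]error, 0) // non-nil but empty
	fmt.Println(errs != nil, len(errs) > 0) // true false
}

In this function the slice only becomes non-nil through append, so both checks happen to agree today; the length check simply stays correct if that ever changes.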
@@ -271,15 +253,10 @@ func deleteBackupsNotInCatalog(
 
 // useSameBackupLocation checks whether the given backup was taken using the same configuration as provided
 func useSameBackupLocation(backup *cnpgv1.BackupStatus, cluster *cnpgv1.Cluster) bool {
-	if cluster.Spec.Backup == nil {
-		return false
-	}
-
-	if backup.Method != cnpgv1.BackupMethodPlugin {
+	if cluster.Spec.Backup == nil || backup.Method != cnpgv1.BackupMethodPlugin {
 		return false
 	}
 
 	meta := newBackupResultMetadataFromMap(backup.PluginMetadata)
 	return meta.clusterUID == string(cluster.UID) && meta.pluginName == metadata.PluginName
 }
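
The final hunk folds two early-return guards into one condition; since both branches return false, `a || b` is exactly equivalent. A small sketch with hypothetical stand-in parameters:

package main

import "fmt"

// eligible shows the merged-guard shape from the hunk: two reasons to
// bail out, one combined early return. The parameters are placeholders.
func eligible(hasBackupSpec bool, method string) bool {
	if !hasBackupSpec || method != "plugin" {
		return false
	}
	return true
}

func main() {
	fmt.Println(eligible(true, "plugin"))  // true
	fmt.Println(eligible(false, "plugin")) // false
	fmt.Println(eligible(true, "barman"))  // false
}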