From 1eeed9f7837675d4519b55ab34fec9e01f4a8f98 Mon Sep 17 00:00:00 2001 From: Simona Pencea Date: Tue, 25 Nov 2025 11:21:22 +0100 Subject: [PATCH] fix: throttle updates to lastwalarchtime to 5min Signed-off-by: Simona Pencea --- internal/cnpgi/common/wal.go | 69 +++++++++++++++++++++---- internal/cnpgi/instance/metrics_test.go | 12 ++++- internal/cnpgi/instance/start.go | 12 +++-- internal/cnpgi/restore/start.go | 12 +++-- 4 files changed, 82 insertions(+), 23 deletions(-) diff --git a/internal/cnpgi/common/wal.go b/internal/cnpgi/common/wal.go index 3649897..6aebf7e 100644 --- a/internal/cnpgi/common/wal.go +++ b/internal/cnpgi/common/wal.go @@ -25,6 +25,7 @@ import ( "fmt" "os" "path" + "sync" "time" "github.com/cloudnative-pg/barman-cloud/pkg/archiver" @@ -70,6 +71,11 @@ func (e *SpoolManagementError) Unwrap() error { return e.err } +const ( + // walStatusUpdateThrottle is the minimum time between status updates for WAL archiving + walStatusUpdateThrottle = 5 * time.Minute +) + // WALServiceImplementation is the implementation of the WAL Service type WALServiceImplementation struct { wal.UnimplementedWALServer @@ -78,6 +84,9 @@ type WALServiceImplementation struct { SpoolDirectory string PGDataPath string PGWALPath string + // LastStatusUpdate tracks the last time we updated the status for each ObjectStore+ServerName + // Key format: "namespace/objectStoreName/serverName" + LastStatusUpdate *sync.Map } // GetCapabilities implements the WALService interface @@ -105,6 +114,37 @@ func (w WALServiceImplementation) GetCapabilities( }, nil } +// shouldUpdateStatus checks if we should update the status based on the throttle. +// It returns true if walStatusUpdateThrottle minutes have passed since the last update, or if this is the first update. +func (w WALServiceImplementation) shouldUpdateStatus(objectStoreKey client.ObjectKey, serverName string) bool { + if w.LastStatusUpdate == nil { + return true + } + + key := fmt.Sprintf("%s/%s", objectStoreKey.String(), serverName) + lastUpdate, ok := w.LastStatusUpdate.Load(key) + if !ok { + return true + } + + lastUpdateTime, ok := lastUpdate.(time.Time) + if !ok { + return true + } + + return time.Since(lastUpdateTime) >= walStatusUpdateThrottle +} + +// recordStatusUpdate records that we just updated the status for a given ObjectStore and server. +func (w WALServiceImplementation) recordStatusUpdate(objectStoreKey client.ObjectKey, serverName string) { + if w.LastStatusUpdate == nil { + return + } + + key := fmt.Sprintf("%s/%s", objectStoreKey.String(), serverName) + w.LastStatusUpdate.Store(key, time.Now()) +} + // Archive implements the WALService interface func (w WALServiceImplementation) Archive( ctx context.Context, @@ -224,18 +264,25 @@ func (w WALServiceImplementation) Archive( } // Update the last archived WAL time in the ObjectStore status - contextLogger.Debug("Updating last archived WAL time", "serverName", configuration.ServerName) - if err := setLastArchivedWALTime( - ctx, - w.Client, - configuration.GetBarmanObjectKey(), - configuration.ServerName, - time.Now(), - ); err != nil { - // Log the error but don't fail the archive operation - contextLogger.Error(err, "Error updating last archived WAL time in ObjectStore status") + // Only update if walStatusUpdateThrottle minutes have passed since the last update to avoid hitting the API server too often + objectStoreKey := configuration.GetBarmanObjectKey() + if w.shouldUpdateStatus(objectStoreKey, configuration.ServerName) { + contextLogger.Debug("Updating last archived WAL time", "serverName", configuration.ServerName) + if err := setLastArchivedWALTime( + ctx, + w.Client, + objectStoreKey, + configuration.ServerName, + time.Now(), + ); err != nil { + // Log the error but don't fail the archive operation + contextLogger.Error(err, "Error updating last archived WAL time in ObjectStore status") + } else { + contextLogger.Debug("Successfully updated last archived WAL time") + w.recordStatusUpdate(objectStoreKey, configuration.ServerName) + } } else { - contextLogger.Debug("Successfully updated last archived WAL time") + contextLogger.Debug("Skipping status update due to throttle", "serverName", configuration.ServerName) } return &wal.WALArchiveResult{}, nil diff --git a/internal/cnpgi/instance/metrics_test.go b/internal/cnpgi/instance/metrics_test.go index 963332f..c362a21 100644 --- a/internal/cnpgi/instance/metrics_test.go +++ b/internal/cnpgi/instance/metrics_test.go @@ -22,10 +22,11 @@ package instance import ( "context" "encoding/json" + "time" + cnpgv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1" "github.com/cloudnative-pg/plugin-barman-cloud/internal/cnpgi/metadata" "k8s.io/utils/ptr" - "time" "github.com/cloudnative-pg/cnpg-i/pkg/metrics" barmancloudv1 "github.com/cloudnative-pg/plugin-barman-cloud/api/v1" @@ -117,7 +118,7 @@ var _ = Describe("Metrics Collect method", func() { res, err := m.Collect(ctx, req) Expect(err).ToNot(HaveOccurred()) Expect(res).ToNot(BeNil()) - Expect(res.Metrics).To(HaveLen(3)) + Expect(res.Metrics).To(HaveLen(4)) // Verify the metrics metricsMap := make(map[string]float64) @@ -131,6 +132,13 @@ var _ = Describe("Metrics Collect method", func() { expectedLastBackup, _ := metricsMap[lastAvailableBackupTimestampMetricName] Expect(expectedLastBackup).To(BeNumerically("~", float64(objectStore.Status.ServerRecoveryWindow[clusterName].LastSuccessfulBackupTime.Unix()), 1)) + + // Check that unset timestamps are 0 + expectedLastFailedBackup, _ := metricsMap[lastFailedBackupTimestampMetricName] + Expect(expectedLastFailedBackup).To(BeZero()) + + expectedLastArchivedWAL, _ := metricsMap[lastArchivedWALTimestampMetricName] + Expect(expectedLastArchivedWAL).To(BeZero()) }) It("should return an error if the object store is not found", func() { diff --git a/internal/cnpgi/instance/start.go b/internal/cnpgi/instance/start.go index b222653..ffb205b 100644 --- a/internal/cnpgi/instance/start.go +++ b/internal/cnpgi/instance/start.go @@ -21,6 +21,7 @@ package instance import ( "context" + "sync" "github.com/cloudnative-pg/cnpg-i-machinery/pkg/pluginhelper/http" "github.com/cloudnative-pg/cnpg-i/pkg/backup" @@ -47,11 +48,12 @@ type CNPGI struct { func (c *CNPGI) Start(ctx context.Context) error { enrich := func(server *grpc.Server) error { wal.RegisterWALServer(server, common.WALServiceImplementation{ - InstanceName: c.InstanceName, - Client: c.Client, - SpoolDirectory: c.SpoolDirectory, - PGDataPath: c.PGDataPath, - PGWALPath: c.PGWALPath, + InstanceName: c.InstanceName, + Client: c.Client, + SpoolDirectory: c.SpoolDirectory, + PGDataPath: c.PGDataPath, + PGWALPath: c.PGWALPath, + LastStatusUpdate: &sync.Map{}, }) backup.RegisterBackupServer(server, BackupServiceImplementation{ Client: c.Client, diff --git a/internal/cnpgi/restore/start.go b/internal/cnpgi/restore/start.go index efb7828..f377715 100644 --- a/internal/cnpgi/restore/start.go +++ b/internal/cnpgi/restore/start.go @@ -22,6 +22,7 @@ package restore import ( "context" "path" + "sync" "github.com/cloudnative-pg/cnpg-i-machinery/pkg/pluginhelper/http" restore "github.com/cloudnative-pg/cnpg-i/pkg/restore/job" @@ -49,11 +50,12 @@ func (c *CNPGI) Start(ctx context.Context) error { enrich := func(server *grpc.Server) error { wal.RegisterWALServer(server, common.WALServiceImplementation{ - InstanceName: c.InstanceName, - Client: c.Client, - SpoolDirectory: c.SpoolDirectory, - PGDataPath: c.PGDataPath, - PGWALPath: path.Join(c.PGDataPath, "pg_wal"), + InstanceName: c.InstanceName, + Client: c.Client, + SpoolDirectory: c.SpoolDirectory, + PGDataPath: c.PGDataPath, + PGWALPath: path.Join(c.PGDataPath, "pg_wal"), + LastStatusUpdate: &sync.Map{}, }) restore.RegisterRestoreJobHooksServer(server, &JobHookImpl{