fix: throttle updates to lastwalarchtime to 5min

Signed-off-by: Simona Pencea <simona@xata.io>
This commit is contained in:
Simona Pencea 2025-11-25 11:21:22 +01:00
parent 6ec77fb159
commit 1eeed9f783
4 changed files with 82 additions and 23 deletions

View File

@ -25,6 +25,7 @@ import (
"fmt" "fmt"
"os" "os"
"path" "path"
"sync"
"time" "time"
"github.com/cloudnative-pg/barman-cloud/pkg/archiver" "github.com/cloudnative-pg/barman-cloud/pkg/archiver"
@ -70,6 +71,11 @@ func (e *SpoolManagementError) Unwrap() error {
return e.err return e.err
} }
const (
// walStatusUpdateThrottle is the minimum time between status updates for WAL archiving
walStatusUpdateThrottle = 5 * time.Minute
)
// WALServiceImplementation is the implementation of the WAL Service // WALServiceImplementation is the implementation of the WAL Service
type WALServiceImplementation struct { type WALServiceImplementation struct {
wal.UnimplementedWALServer wal.UnimplementedWALServer
@ -78,6 +84,9 @@ type WALServiceImplementation struct {
SpoolDirectory string SpoolDirectory string
PGDataPath string PGDataPath string
PGWALPath string PGWALPath string
// LastStatusUpdate tracks the last time we updated the status for each ObjectStore+ServerName
// Key format: "namespace/objectStoreName/serverName"
LastStatusUpdate *sync.Map
} }
// GetCapabilities implements the WALService interface // GetCapabilities implements the WALService interface
@ -105,6 +114,37 @@ func (w WALServiceImplementation) GetCapabilities(
}, nil }, nil
} }
// shouldUpdateStatus checks if we should update the status based on the throttle.
// It returns true if walStatusUpdateThrottle minutes have passed since the last update, or if this is the first update.
func (w WALServiceImplementation) shouldUpdateStatus(objectStoreKey client.ObjectKey, serverName string) bool {
if w.LastStatusUpdate == nil {
return true
}
key := fmt.Sprintf("%s/%s", objectStoreKey.String(), serverName)
lastUpdate, ok := w.LastStatusUpdate.Load(key)
if !ok {
return true
}
lastUpdateTime, ok := lastUpdate.(time.Time)
if !ok {
return true
}
return time.Since(lastUpdateTime) >= walStatusUpdateThrottle
}
// recordStatusUpdate records that we just updated the status for a given ObjectStore and server.
func (w WALServiceImplementation) recordStatusUpdate(objectStoreKey client.ObjectKey, serverName string) {
if w.LastStatusUpdate == nil {
return
}
key := fmt.Sprintf("%s/%s", objectStoreKey.String(), serverName)
w.LastStatusUpdate.Store(key, time.Now())
}
// Archive implements the WALService interface // Archive implements the WALService interface
func (w WALServiceImplementation) Archive( func (w WALServiceImplementation) Archive(
ctx context.Context, ctx context.Context,
@ -224,18 +264,25 @@ func (w WALServiceImplementation) Archive(
} }
// Update the last archived WAL time in the ObjectStore status // Update the last archived WAL time in the ObjectStore status
contextLogger.Debug("Updating last archived WAL time", "serverName", configuration.ServerName) // Only update if walStatusUpdateThrottle minutes have passed since the last update to avoid hitting the API server too often
if err := setLastArchivedWALTime( objectStoreKey := configuration.GetBarmanObjectKey()
ctx, if w.shouldUpdateStatus(objectStoreKey, configuration.ServerName) {
w.Client, contextLogger.Debug("Updating last archived WAL time", "serverName", configuration.ServerName)
configuration.GetBarmanObjectKey(), if err := setLastArchivedWALTime(
configuration.ServerName, ctx,
time.Now(), w.Client,
); err != nil { objectStoreKey,
// Log the error but don't fail the archive operation configuration.ServerName,
contextLogger.Error(err, "Error updating last archived WAL time in ObjectStore status") time.Now(),
); err != nil {
// Log the error but don't fail the archive operation
contextLogger.Error(err, "Error updating last archived WAL time in ObjectStore status")
} else {
contextLogger.Debug("Successfully updated last archived WAL time")
w.recordStatusUpdate(objectStoreKey, configuration.ServerName)
}
} else { } else {
contextLogger.Debug("Successfully updated last archived WAL time") contextLogger.Debug("Skipping status update due to throttle", "serverName", configuration.ServerName)
} }
return &wal.WALArchiveResult{}, nil return &wal.WALArchiveResult{}, nil

View File

@ -22,10 +22,11 @@ package instance
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"time"
cnpgv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1" cnpgv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1"
"github.com/cloudnative-pg/plugin-barman-cloud/internal/cnpgi/metadata" "github.com/cloudnative-pg/plugin-barman-cloud/internal/cnpgi/metadata"
"k8s.io/utils/ptr" "k8s.io/utils/ptr"
"time"
"github.com/cloudnative-pg/cnpg-i/pkg/metrics" "github.com/cloudnative-pg/cnpg-i/pkg/metrics"
barmancloudv1 "github.com/cloudnative-pg/plugin-barman-cloud/api/v1" barmancloudv1 "github.com/cloudnative-pg/plugin-barman-cloud/api/v1"
@ -117,7 +118,7 @@ var _ = Describe("Metrics Collect method", func() {
res, err := m.Collect(ctx, req) res, err := m.Collect(ctx, req)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(res).ToNot(BeNil()) Expect(res).ToNot(BeNil())
Expect(res.Metrics).To(HaveLen(3)) Expect(res.Metrics).To(HaveLen(4))
// Verify the metrics // Verify the metrics
metricsMap := make(map[string]float64) metricsMap := make(map[string]float64)
@ -131,6 +132,13 @@ var _ = Describe("Metrics Collect method", func() {
expectedLastBackup, _ := metricsMap[lastAvailableBackupTimestampMetricName] expectedLastBackup, _ := metricsMap[lastAvailableBackupTimestampMetricName]
Expect(expectedLastBackup).To(BeNumerically("~", float64(objectStore.Status.ServerRecoveryWindow[clusterName].LastSuccessfulBackupTime.Unix()), 1)) Expect(expectedLastBackup).To(BeNumerically("~", float64(objectStore.Status.ServerRecoveryWindow[clusterName].LastSuccessfulBackupTime.Unix()), 1))
// Check that unset timestamps are 0
expectedLastFailedBackup, _ := metricsMap[lastFailedBackupTimestampMetricName]
Expect(expectedLastFailedBackup).To(BeZero())
expectedLastArchivedWAL, _ := metricsMap[lastArchivedWALTimestampMetricName]
Expect(expectedLastArchivedWAL).To(BeZero())
}) })
It("should return an error if the object store is not found", func() { It("should return an error if the object store is not found", func() {

View File

@ -21,6 +21,7 @@ package instance
import ( import (
"context" "context"
"sync"
"github.com/cloudnative-pg/cnpg-i-machinery/pkg/pluginhelper/http" "github.com/cloudnative-pg/cnpg-i-machinery/pkg/pluginhelper/http"
"github.com/cloudnative-pg/cnpg-i/pkg/backup" "github.com/cloudnative-pg/cnpg-i/pkg/backup"
@ -47,11 +48,12 @@ type CNPGI struct {
func (c *CNPGI) Start(ctx context.Context) error { func (c *CNPGI) Start(ctx context.Context) error {
enrich := func(server *grpc.Server) error { enrich := func(server *grpc.Server) error {
wal.RegisterWALServer(server, common.WALServiceImplementation{ wal.RegisterWALServer(server, common.WALServiceImplementation{
InstanceName: c.InstanceName, InstanceName: c.InstanceName,
Client: c.Client, Client: c.Client,
SpoolDirectory: c.SpoolDirectory, SpoolDirectory: c.SpoolDirectory,
PGDataPath: c.PGDataPath, PGDataPath: c.PGDataPath,
PGWALPath: c.PGWALPath, PGWALPath: c.PGWALPath,
LastStatusUpdate: &sync.Map{},
}) })
backup.RegisterBackupServer(server, BackupServiceImplementation{ backup.RegisterBackupServer(server, BackupServiceImplementation{
Client: c.Client, Client: c.Client,

View File

@ -22,6 +22,7 @@ package restore
import ( import (
"context" "context"
"path" "path"
"sync"
"github.com/cloudnative-pg/cnpg-i-machinery/pkg/pluginhelper/http" "github.com/cloudnative-pg/cnpg-i-machinery/pkg/pluginhelper/http"
restore "github.com/cloudnative-pg/cnpg-i/pkg/restore/job" restore "github.com/cloudnative-pg/cnpg-i/pkg/restore/job"
@ -49,11 +50,12 @@ func (c *CNPGI) Start(ctx context.Context) error {
enrich := func(server *grpc.Server) error { enrich := func(server *grpc.Server) error {
wal.RegisterWALServer(server, common.WALServiceImplementation{ wal.RegisterWALServer(server, common.WALServiceImplementation{
InstanceName: c.InstanceName, InstanceName: c.InstanceName,
Client: c.Client, Client: c.Client,
SpoolDirectory: c.SpoolDirectory, SpoolDirectory: c.SpoolDirectory,
PGDataPath: c.PGDataPath, PGDataPath: c.PGDataPath,
PGWALPath: path.Join(c.PGDataPath, "pg_wal"), PGWALPath: path.Join(c.PGDataPath, "pg_wal"),
LastStatusUpdate: &sync.Map{},
}) })
restore.RegisterRestoreJobHooksServer(server, &JobHookImpl{ restore.RegisterRestoreJobHooksServer(server, &JobHookImpl{