mirror of
https://github.com/cloudnative-pg/plugin-barman-cloud.git
synced 2026-03-10 12:42:20 +01:00
Compare commits
5 Commits
74b4ce9ea2
...
9aa1b5da3a
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9aa1b5da3a | ||
|
|
5bc006b035 | ||
|
|
97675a7685 | ||
|
|
1eeed9f783 | ||
|
|
6ec77fb159 |
2
Makefile
2
Makefile
@ -159,7 +159,7 @@ GOLANGCI_LINT = $(LOCALBIN)/golangci-lint
|
||||
|
||||
## Tool Versions
|
||||
KUSTOMIZE_VERSION ?= v5.4.3
|
||||
CONTROLLER_TOOLS_VERSION ?= v0.16.1
|
||||
CONTROLLER_TOOLS_VERSION ?= v0.19.0
|
||||
ENVTEST_VERSION ?= release-0.19
|
||||
GOLANGCI_LINT_VERSION ?= v1.64.8
|
||||
|
||||
|
||||
@ -94,6 +94,9 @@ type RecoveryWindow struct {
|
||||
|
||||
// The last failed backup time
|
||||
LastFailedBackupTime *metav1.Time `json:"lastFailedBackupTime,omitempty"`
|
||||
|
||||
// The last time a WAL file was successfully archived by this plugin
|
||||
LastArchivedWALTime *metav1.Time `json:"lastArchivedWALTime,omitempty"`
|
||||
}
|
||||
|
||||
// +kubebuilder:object:root=true
|
||||
|
||||
@ -169,6 +169,10 @@ func (in *RecoveryWindow) DeepCopyInto(out *RecoveryWindow) {
|
||||
in, out := &in.LastFailedBackupTime, &out.LastFailedBackupTime
|
||||
*out = (*in).DeepCopy()
|
||||
}
|
||||
if in.LastArchivedWALTime != nil {
|
||||
in, out := &in.LastArchivedWALTime, &out.LastArchivedWALTime
|
||||
*out = (*in).DeepCopy()
|
||||
}
|
||||
}
|
||||
|
||||
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RecoveryWindow.
|
||||
|
||||
@ -676,6 +676,11 @@ spec:
|
||||
restored.
|
||||
format: date-time
|
||||
type: string
|
||||
lastArchivedWALTime:
|
||||
description: The last time a WAL file was successfully archived
|
||||
by this plugin
|
||||
format: date-time
|
||||
type: string
|
||||
lastFailedBackupTime:
|
||||
description: The last failed backup time
|
||||
format: date-time
|
||||
|
||||
@ -25,6 +25,7 @@ import (
|
||||
"fmt"
|
||||
"os"
|
||||
"path"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/cloudnative-pg/barman-cloud/pkg/archiver"
|
||||
@ -38,7 +39,10 @@ import (
|
||||
walUtils "github.com/cloudnative-pg/machinery/pkg/fileutils/wals"
|
||||
"github.com/cloudnative-pg/machinery/pkg/log"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/client-go/util/retry"
|
||||
"k8s.io/utils/ptr"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
|
||||
barmancloudv1 "github.com/cloudnative-pg/plugin-barman-cloud/api/v1"
|
||||
@ -67,6 +71,11 @@ func (e *SpoolManagementError) Unwrap() error {
|
||||
return e.err
|
||||
}
|
||||
|
||||
const (
|
||||
// walStatusUpdateThrottle is the minimum time between status updates for WAL archiving
|
||||
walStatusUpdateThrottle = 5 * time.Minute
|
||||
)
|
||||
|
||||
// WALServiceImplementation is the implementation of the WAL Service
|
||||
type WALServiceImplementation struct {
|
||||
wal.UnimplementedWALServer
|
||||
@ -75,6 +84,9 @@ type WALServiceImplementation struct {
|
||||
SpoolDirectory string
|
||||
PGDataPath string
|
||||
PGWALPath string
|
||||
// LastStatusUpdate tracks the last time we updated the status for each ObjectStore+ServerName
|
||||
// Key format: "namespace/objectStoreName/serverName"
|
||||
LastStatusUpdate *sync.Map
|
||||
}
|
||||
|
||||
// GetCapabilities implements the WALService interface
|
||||
@ -102,6 +114,37 @@ func (w WALServiceImplementation) GetCapabilities(
|
||||
}, nil
|
||||
}
|
||||
|
||||
// shouldUpdateStatus checks if we should update the status based on the throttle.
|
||||
// It returns true if walStatusUpdateThrottle minutes have passed since the last update, or if this is the first update.
|
||||
func (w WALServiceImplementation) shouldUpdateStatus(objectStoreKey client.ObjectKey, serverName string) bool {
|
||||
if w.LastStatusUpdate == nil {
|
||||
return true
|
||||
}
|
||||
|
||||
key := fmt.Sprintf("%s/%s", objectStoreKey.String(), serverName)
|
||||
lastUpdate, ok := w.LastStatusUpdate.Load(key)
|
||||
if !ok {
|
||||
return true
|
||||
}
|
||||
|
||||
lastUpdateTime, ok := lastUpdate.(time.Time)
|
||||
if !ok {
|
||||
return true
|
||||
}
|
||||
|
||||
return time.Since(lastUpdateTime) >= walStatusUpdateThrottle
|
||||
}
|
||||
|
||||
// recordStatusUpdate records that we just updated the status for a given ObjectStore and server.
|
||||
func (w WALServiceImplementation) recordStatusUpdate(objectStoreKey client.ObjectKey, serverName string) {
|
||||
if w.LastStatusUpdate == nil {
|
||||
return
|
||||
}
|
||||
|
||||
key := fmt.Sprintf("%s/%s", objectStoreKey.String(), serverName)
|
||||
w.LastStatusUpdate.Store(key, time.Now())
|
||||
}
|
||||
|
||||
// Archive implements the WALService interface
|
||||
func (w WALServiceImplementation) Archive(
|
||||
ctx context.Context,
|
||||
@ -220,6 +263,28 @@ func (w WALServiceImplementation) Archive(
|
||||
}
|
||||
}
|
||||
|
||||
// Update the last archived WAL time in the ObjectStore status
|
||||
// Only update if walStatusUpdateThrottle minutes have passed since the last update to avoid hitting the API server too often
|
||||
objectStoreKey := configuration.GetBarmanObjectKey()
|
||||
if w.shouldUpdateStatus(objectStoreKey, configuration.ServerName) {
|
||||
contextLogger.Debug("Updating last archived WAL time", "serverName", configuration.ServerName)
|
||||
if err := setLastArchivedWALTime(
|
||||
ctx,
|
||||
w.Client,
|
||||
objectStoreKey,
|
||||
configuration.ServerName,
|
||||
time.Now(),
|
||||
); err != nil {
|
||||
// Log the error but don't fail the archive operation
|
||||
contextLogger.Error(err, "Error updating last archived WAL time in ObjectStore status")
|
||||
} else {
|
||||
contextLogger.Debug("Successfully updated last archived WAL time")
|
||||
w.recordStatusUpdate(objectStoreKey, configuration.ServerName)
|
||||
}
|
||||
} else {
|
||||
contextLogger.Debug("Skipping status update due to throttle", "serverName", configuration.ServerName)
|
||||
}
|
||||
|
||||
return &wal.WALArchiveResult{}, nil
|
||||
}
|
||||
|
||||
@ -509,3 +574,30 @@ func isEndOfWALStream(results []barmanRestorer.Result) bool {
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
// SetLastArchivedWALTime sets the last archived WAL time in the
|
||||
// passed object store, for the passed server name.
|
||||
func setLastArchivedWALTime(
|
||||
ctx context.Context,
|
||||
c client.Client,
|
||||
objectStoreKey client.ObjectKey,
|
||||
serverName string,
|
||||
lastArchivedWALTime time.Time,
|
||||
) error {
|
||||
return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
|
||||
var objectStore barmancloudv1.ObjectStore
|
||||
|
||||
if err := c.Get(ctx, objectStoreKey, &objectStore); err != nil {
|
||||
return err
|
||||
}
|
||||
recoveryWindow := objectStore.Status.ServerRecoveryWindow[serverName]
|
||||
recoveryWindow.LastArchivedWALTime = ptr.To(metav1.NewTime(lastArchivedWALTime))
|
||||
|
||||
if objectStore.Status.ServerRecoveryWindow == nil {
|
||||
objectStore.Status.ServerRecoveryWindow = make(map[string]barmancloudv1.RecoveryWindow)
|
||||
}
|
||||
objectStore.Status.ServerRecoveryWindow[serverName] = recoveryWindow
|
||||
|
||||
return c.Status().Update(ctx, &objectStore)
|
||||
})
|
||||
}
|
||||
|
||||
@ -51,6 +51,7 @@ var (
|
||||
firstRecoverabilityPointMetricName = buildFqName("first_recoverability_point")
|
||||
lastAvailableBackupTimestampMetricName = buildFqName("last_available_backup_timestamp")
|
||||
lastFailedBackupTimestampMetricName = buildFqName("last_failed_backup_timestamp")
|
||||
lastArchivedWALTimestampMetricName = buildFqName("last_archived_wal_timestamp")
|
||||
)
|
||||
|
||||
func (m metricsImpl) GetCapabilities(
|
||||
@ -97,6 +98,11 @@ func (m metricsImpl) Define(
|
||||
Help: "The last failed backup as a unix timestamp",
|
||||
ValueType: &metrics.MetricType{Type: metrics.MetricType_TYPE_GAUGE},
|
||||
},
|
||||
{
|
||||
FqName: lastArchivedWALTimestampMetricName,
|
||||
Help: "The last archived WAL timestamp as a unix timestamp",
|
||||
ValueType: &metrics.MetricType{Type: metrics.MetricType_TYPE_GAUGE},
|
||||
},
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
@ -136,6 +142,10 @@ func (m metricsImpl) Collect(
|
||||
FqName: lastFailedBackupTimestampMetricName,
|
||||
Value: 0,
|
||||
},
|
||||
{
|
||||
FqName: lastArchivedWALTimestampMetricName,
|
||||
Value: 0,
|
||||
},
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
@ -143,6 +153,7 @@ func (m metricsImpl) Collect(
|
||||
var firstRecoverabilityPoint float64
|
||||
var lastAvailableBackup float64
|
||||
var lastFailedBackup float64
|
||||
var lastArchivedWAL float64
|
||||
if x.FirstRecoverabilityPoint != nil {
|
||||
firstRecoverabilityPoint = float64(x.FirstRecoverabilityPoint.Unix())
|
||||
}
|
||||
@ -152,6 +163,9 @@ func (m metricsImpl) Collect(
|
||||
if x.LastFailedBackupTime != nil {
|
||||
lastFailedBackup = float64(x.LastFailedBackupTime.Unix())
|
||||
}
|
||||
if x.LastArchivedWALTime != nil {
|
||||
lastArchivedWAL = float64(x.LastArchivedWALTime.Unix())
|
||||
}
|
||||
|
||||
return &metrics.CollectMetricsResult{
|
||||
Metrics: []*metrics.CollectMetric{
|
||||
@ -167,6 +181,10 @@ func (m metricsImpl) Collect(
|
||||
FqName: lastFailedBackupTimestampMetricName,
|
||||
Value: lastFailedBackup,
|
||||
},
|
||||
{
|
||||
FqName: lastArchivedWALTimestampMetricName,
|
||||
Value: lastArchivedWAL,
|
||||
},
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
@ -22,10 +22,11 @@ package instance
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"time"
|
||||
|
||||
cnpgv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1"
|
||||
"github.com/cloudnative-pg/plugin-barman-cloud/internal/cnpgi/metadata"
|
||||
"k8s.io/utils/ptr"
|
||||
"time"
|
||||
|
||||
"github.com/cloudnative-pg/cnpg-i/pkg/metrics"
|
||||
barmancloudv1 "github.com/cloudnative-pg/plugin-barman-cloud/api/v1"
|
||||
@ -117,7 +118,7 @@ var _ = Describe("Metrics Collect method", func() {
|
||||
res, err := m.Collect(ctx, req)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(res).ToNot(BeNil())
|
||||
Expect(res.Metrics).To(HaveLen(3))
|
||||
Expect(res.Metrics).To(HaveLen(4))
|
||||
|
||||
// Verify the metrics
|
||||
metricsMap := make(map[string]float64)
|
||||
@ -131,6 +132,13 @@ var _ = Describe("Metrics Collect method", func() {
|
||||
|
||||
expectedLastBackup, _ := metricsMap[lastAvailableBackupTimestampMetricName]
|
||||
Expect(expectedLastBackup).To(BeNumerically("~", float64(objectStore.Status.ServerRecoveryWindow[clusterName].LastSuccessfulBackupTime.Unix()), 1))
|
||||
|
||||
// Check that unset timestamps are 0
|
||||
expectedLastFailedBackup, _ := metricsMap[lastFailedBackupTimestampMetricName]
|
||||
Expect(expectedLastFailedBackup).To(BeZero())
|
||||
|
||||
expectedLastArchivedWAL, _ := metricsMap[lastArchivedWALTimestampMetricName]
|
||||
Expect(expectedLastArchivedWAL).To(BeZero())
|
||||
})
|
||||
|
||||
It("should return an error if the object store is not found", func() {
|
||||
|
||||
@ -21,6 +21,7 @@ package instance
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"github.com/cloudnative-pg/cnpg-i-machinery/pkg/pluginhelper/http"
|
||||
"github.com/cloudnative-pg/cnpg-i/pkg/backup"
|
||||
@ -47,11 +48,12 @@ type CNPGI struct {
|
||||
func (c *CNPGI) Start(ctx context.Context) error {
|
||||
enrich := func(server *grpc.Server) error {
|
||||
wal.RegisterWALServer(server, common.WALServiceImplementation{
|
||||
InstanceName: c.InstanceName,
|
||||
Client: c.Client,
|
||||
SpoolDirectory: c.SpoolDirectory,
|
||||
PGDataPath: c.PGDataPath,
|
||||
PGWALPath: c.PGWALPath,
|
||||
InstanceName: c.InstanceName,
|
||||
Client: c.Client,
|
||||
SpoolDirectory: c.SpoolDirectory,
|
||||
PGDataPath: c.PGDataPath,
|
||||
PGWALPath: c.PGWALPath,
|
||||
LastStatusUpdate: &sync.Map{},
|
||||
})
|
||||
backup.RegisterBackupServer(server, BackupServiceImplementation{
|
||||
Client: c.Client,
|
||||
|
||||
@ -22,6 +22,7 @@ package restore
|
||||
import (
|
||||
"context"
|
||||
"path"
|
||||
"sync"
|
||||
|
||||
"github.com/cloudnative-pg/cnpg-i-machinery/pkg/pluginhelper/http"
|
||||
restore "github.com/cloudnative-pg/cnpg-i/pkg/restore/job"
|
||||
@ -49,11 +50,12 @@ func (c *CNPGI) Start(ctx context.Context) error {
|
||||
|
||||
enrich := func(server *grpc.Server) error {
|
||||
wal.RegisterWALServer(server, common.WALServiceImplementation{
|
||||
InstanceName: c.InstanceName,
|
||||
Client: c.Client,
|
||||
SpoolDirectory: c.SpoolDirectory,
|
||||
PGDataPath: c.PGDataPath,
|
||||
PGWALPath: path.Join(c.PGDataPath, "pg_wal"),
|
||||
InstanceName: c.InstanceName,
|
||||
Client: c.Client,
|
||||
SpoolDirectory: c.SpoolDirectory,
|
||||
PGDataPath: c.PGDataPath,
|
||||
PGWALPath: path.Join(c.PGDataPath, "pg_wal"),
|
||||
LastStatusUpdate: &sync.Map{},
|
||||
})
|
||||
|
||||
restore.RegisterRestoreJobHooksServer(server, &JobHookImpl{
|
||||
|
||||
@ -11,6 +11,17 @@
|
||||
],
|
||||
rebaseWhen: 'never',
|
||||
prConcurrentLimit: 5,
|
||||
// Override default ignorePaths to scan test/e2e for emulator image dependencies
|
||||
// Removed: '**/test/**'
|
||||
ignorePaths: [
|
||||
'**/node_modules/**',
|
||||
'**/bower_components/**',
|
||||
'**/vendor/**',
|
||||
'**/examples/**',
|
||||
'**/__tests__/**',
|
||||
'**/tests/**',
|
||||
'**/__fixtures__/**',
|
||||
],
|
||||
lockFileMaintenance: {
|
||||
enabled: true,
|
||||
},
|
||||
@ -28,7 +39,7 @@
|
||||
{
|
||||
customType: 'regex',
|
||||
managerFilePatterns: [
|
||||
'/(^Taskfile\\.yml$)/',
|
||||
'/(^|/)Taskfile\\.yml$/',
|
||||
],
|
||||
matchStrings: [
|
||||
'# renovate: datasource=(?<datasource>[a-z-.]+?) depName=(?<depName>[^\\s]+?)(?: (?:lookupName|packageName)=(?<packageName>[^\\s]+?))?(?: versioning=(?<versioning>[^\\s]+?))?(?: extractVersion=(?<extractVersion>[^\\s]+?))?(?: currentValue=(?<currentValue>[^\\s]+?))?\\s+[A-Za-z0-9_]+?_SHA\\s*:\\s*["\']?(?<currentDigest>[a-f0-9]+?)["\']?\\s',
|
||||
@ -38,7 +49,16 @@
|
||||
{
|
||||
customType: 'regex',
|
||||
managerFilePatterns: [
|
||||
'/(^docs/config\\.yaml$)/',
|
||||
'/\\.go$/',
|
||||
],
|
||||
matchStrings: [
|
||||
'//\\s*renovate:\\s*datasource=(?<datasource>[a-z-.]+?)\\s+depName=(?<depName>[^\\s]+?)(?:\\s+versioning=(?<versioning>[^\\s]+?))?\\s*\\n\\s*//\\s*Version:\\s*(?<currentValue>[^\\s]+?)\\s*\\n\\s*Image:\\s*"[^@]+@(?<currentDigest>sha256:[a-f0-9]+)"',
|
||||
],
|
||||
},
|
||||
{
|
||||
customType: 'regex',
|
||||
managerFilePatterns: [
|
||||
'/(^|/)docs/config\\.yaml$/',
|
||||
],
|
||||
matchStrings: [
|
||||
'# renovate: datasource=(?<datasource>[a-z-.]+?) depName=(?<depName>[^\\s]+?)(?: (?:lookupName|packageName)=(?<packageName>[^\\s]+?))?(?: versioning=(?<versioning>[^\\s]+?))?(?: extractVersion=(?<extractVersion>[^\\s]+?))?\\s+kubernetesVersion:\\s*["\']?(?<currentValue>.+?)["\']?\\s',
|
||||
|
||||
@ -71,8 +71,15 @@ func newAzuriteDeployment(namespace, name string) *appsv1.Deployment {
|
||||
Containers: []corev1.Container{
|
||||
{
|
||||
Name: name,
|
||||
// TODO: renovate the image
|
||||
Image: "mcr.microsoft.com/azure-storage/azurite",
|
||||
// renovate: datasource=docker depName=mcr.microsoft.com/azure-storage/azurite versioning=docker
|
||||
// Version: 3.35.0
|
||||
Image: "mcr.microsoft.com/azure-storage/azurite@sha256:647c63a91102a9d8e8000aab803436e1fc85fbb285e7ce830a82ee5d6661cf37",
|
||||
Args: []string{
|
||||
"azurite-blob",
|
||||
"--blobHost",
|
||||
"0.0.0.0",
|
||||
"--skipApiVersionCheck",
|
||||
},
|
||||
Ports: []corev1.ContainerPort{
|
||||
{
|
||||
ContainerPort: 10000,
|
||||
|
||||
@ -71,7 +71,9 @@ func newGCSDeployment(namespace, name string) *appsv1.Deployment {
|
||||
Containers: []corev1.Container{
|
||||
{
|
||||
Name: name,
|
||||
Image: "fsouza/fake-gcs-server:latest",
|
||||
// renovate: datasource=docker depName=fsouza/fake-gcs-server versioning=docker
|
||||
// Version: 1.52.3
|
||||
Image: "fsouza/fake-gcs-server@sha256:666f86b873120818b10a5e68d99401422fcf8b00c1f27fe89599c35236f48b4c",
|
||||
Ports: []corev1.ContainerPort{
|
||||
{
|
||||
ContainerPort: 4443,
|
||||
|
||||
@ -71,8 +71,9 @@ func newMinioDeployment(namespace, name string) *appsv1.Deployment {
|
||||
Containers: []corev1.Container{
|
||||
{
|
||||
Name: name,
|
||||
// TODO: renovate the image
|
||||
Image: "minio/minio:latest",
|
||||
// renovate: datasource=docker depName=minio/minio versioning=docker
|
||||
// Version: RELEASE.2025-09-07T16-13-09Z
|
||||
Image: "minio/minio@sha256:14cea493d9a34af32f524e538b8346cf79f3321eff8e708c1e2960462bd8936e",
|
||||
Args: []string{"server", "/data"},
|
||||
Ports: []corev1.ContainerPort{
|
||||
{
|
||||
|
||||
Loading…
Reference in New Issue
Block a user