From 88fd3e504f35e004fab47ca33a2e67dd40120e2c Mon Sep 17 00:00:00 2001 From: MichaluxPL <68371308+MichaluxPL@users.noreply.github.com> Date: Tue, 22 Apr 2025 10:21:17 +0200 Subject: [PATCH] feat(wal): parallel WAL archiving (#262) This patch allows the plugin to archive WAL files in parallel. Fix: #260 Fix: #266 Signed-off-by: MichaluxPL <68371308+MichaluxPL@users.noreply.github.com> Signed-off-by: Leonardo Cecchi Signed-off-by: Francesco Canovai Co-authored-by: Leonardo Cecchi Co-authored-by: Francesco Canovai --- docs/examples/minio-store.yaml | 1 + internal/cnpgi/common/wal.go | 75 ++++++++++++++++++++++++++++++++-- 2 files changed, 73 insertions(+), 3 deletions(-) diff --git a/docs/examples/minio-store.yaml b/docs/examples/minio-store.yaml index a6575f5..38b116f 100644 --- a/docs/examples/minio-store.yaml +++ b/docs/examples/minio-store.yaml @@ -21,6 +21,7 @@ spec: key: ACCESS_SECRET_KEY wal: compression: gzip + maxParallel: 8 data: additionalCommandArgs: - "--min-chunk-size=5MB" diff --git a/internal/cnpgi/common/wal.go b/internal/cnpgi/common/wal.go index 586991d..0da8a39 100644 --- a/internal/cnpgi/common/wal.go +++ b/internal/cnpgi/common/wal.go @@ -14,6 +14,7 @@ import ( barmanRestorer "github.com/cloudnative-pg/barman-cloud/pkg/restorer" cnpgv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1" "github.com/cloudnative-pg/cnpg-i/pkg/wal" + walUtils "github.com/cloudnative-pg/machinery/pkg/fileutils/wals" "github.com/cloudnative-pg/machinery/pkg/log" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" @@ -24,6 +25,34 @@ import ( "github.com/cloudnative-pg/plugin-barman-cloud/internal/cnpgi/operator/config" ) +// ErrMissingPermissions is raised when the sidecar has no +// permission to download the credentials needed to reach +// the object storage. +// This will be fixed by the reconciliation loop in the +// operator plugin. 
+var ErrMissingPermissions = fmt.Errorf("no permission to download the backup credentials, retrying") + +// SpoolManagementError is raised when a spool management +// error has been detected +type SpoolManagementError struct { + walName string + err error +} + +// Error implements the error interface +func (e *SpoolManagementError) Error() string { + return fmt.Sprintf( + "while testing the existence of the WAL file (%s) in the spool directory: %v", + e.walName, + e.err.Error(), + ) +} + +// Unwrap returns the wrapped error so that errors.Is and errors.As can inspect it +func (e *SpoolManagementError) Unwrap() error { + return e.err +} + // WALServiceImplementation is the implementation of the WAL Service type WALServiceImplementation struct { wal.UnimplementedWALServer @@ -67,6 +96,10 @@ func (w WALServiceImplementation) Archive( contextLogger := log.FromContext(ctx) contextLogger.Debug("starting wal archive") + baseWalName := path.Base(request.GetSourceFileName()) + + // Step 1: parse the configuration and get the environment variables needed + // for barman-cloud-wal-archive configuration, err := config.NewFromClusterJSON(request.ClusterDefinition) if err != nil { return nil, err @@ -87,7 +120,7 @@ func (w WALServiceImplementation) Archive( ) if err != nil { if apierrors.IsForbidden(err) { - return nil, errors.New("backup credentials don't yet have access permissions. 
Will retry reconciliation loop") + return nil, ErrMissingPermissions } return nil, err } @@ -103,12 +136,48 @@ func (w WALServiceImplementation) Archive( return nil, err } + // Step 2: check whether this WAL file has already been archived + var isDeletedFromSpool bool + isDeletedFromSpool, err = arch.DeleteFromSpool(baseWalName) + if err != nil { + return nil, &SpoolManagementError{ + walName: baseWalName, + err: err, + } + } + if isDeletedFromSpool { + contextLogger.Info("WAL file already archived, skipping", + "walName", baseWalName) + return nil, nil + } + + // Step 3: gather the names of the WAL files to archive options, err := arch.BarmanCloudWalArchiveOptions(ctx, &objectStore.Spec.Configuration, configuration.ServerName) if err != nil { return nil, err } - walList := arch.GatherWALFilesToArchive(ctx, request.GetSourceFileName(), 1) - result := arch.ArchiveList(ctx, walList, options) + + maxParallel := 1 + if objectStore.Spec.Configuration.Wal != nil && objectStore.Spec.Configuration.Wal.MaxParallel > 0 { + maxParallel = objectStore.Spec.Configuration.Wal.MaxParallel + } + + maxResults := maxParallel - 1 + walFilesList := walUtils.GatherReadyWALFiles( + ctx, + walUtils.GatherReadyWALFilesConfig{ + MaxResults: maxResults, + SkipWALs: []string{baseWalName}, + PgDataPath: w.PGDataPath, + }, + ) + + // Ensure the requested WAL file is always the first one being + // archived + walFilesList.Ready = append([]string{request.GetSourceFileName()}, walFilesList.Ready...) + contextLogger.Debug("WAL files to archive", "walFilesListReady", walFilesList.Ready) + + result := arch.ArchiveList(ctx, walFilesList.ReadyItemsToSlice(), options) for _, archiverResult := range result { if archiverResult.Err != nil { return nil, archiverResult.Err