From a59f69d83a51aee723cbb98c5fc0f882547701dd Mon Sep 17 00:00:00 2001 From: Armando Ruocco Date: Thu, 26 Sep 2024 16:48:56 +0200 Subject: [PATCH] wip: restore Signed-off-by: Armando Ruocco --- cmd/instance/main.go | 3 +- internal/cnpgi/instance/identity.go | 1 + internal/cnpgi/instance/start.go | 1 + internal/cnpgi/instance/wal.go | 235 +++++++++++++++++++++++++- internal/cnpgi/instance/wal_import.go | 194 +++++++++++++++++++++ 5 files changed, 429 insertions(+), 5 deletions(-) create mode 100644 internal/cnpgi/instance/wal_import.go diff --git a/cmd/instance/main.go b/cmd/instance/main.go index f85c6d3..2d81206 100644 --- a/cmd/instance/main.go +++ b/cmd/instance/main.go @@ -1,13 +1,14 @@ package main import ( + "os" + barmancloudv1 "github.com/cloudnative-pg/plugin-barman-cloud/api/v1" "github.com/cloudnative-pg/plugin-barman-cloud/internal/cnpgi/instance" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" - "os" controllerruntime "sigs.k8s.io/controller-runtime" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/cache" diff --git a/internal/cnpgi/instance/identity.go b/internal/cnpgi/instance/identity.go index 7cf6154..ae82efe 100644 --- a/internal/cnpgi/instance/identity.go +++ b/internal/cnpgi/instance/identity.go @@ -3,6 +3,7 @@ package instance import ( "context" "fmt" + barmancloudv1 "github.com/cloudnative-pg/plugin-barman-cloud/api/v1" "sigs.k8s.io/controller-runtime/pkg/client" diff --git a/internal/cnpgi/instance/start.go b/internal/cnpgi/instance/start.go index bd6dc3d..e662f20 100644 --- a/internal/cnpgi/instance/start.go +++ b/internal/cnpgi/instance/start.go @@ -2,6 +2,7 @@ package instance import ( "context" + "github.com/cloudnative-pg/cnpg-i-machinery/pkg/pluginhelper/http" "github.com/cloudnative-pg/cnpg-i/pkg/backup" "github.com/cloudnative-pg/cnpg-i/pkg/wal" diff --git a/internal/cnpgi/instance/wal.go b/internal/cnpgi/instance/wal.go index 7aee72b..6e3f4a3 100644 --- a/internal/cnpgi/instance/wal.go +++ b/internal/cnpgi/instance/wal.go @@ -3,11 +3,20 @@ package instance import ( "context" "errors" - barmanCredentials "github.com/cloudnative-pg/barman-cloud/pkg/credentials" - apierrors "k8s.io/apimachinery/pkg/api/errors" + "fmt" + "strings" + "time" + + barmanapi "github.com/cloudnative-pg/barman-cloud/pkg/api" + "github.com/cloudnative-pg/machinery/pkg/log" + "os" + barmanCommand "github.com/cloudnative-pg/barman-cloud/pkg/command" + barmanCredentials "github.com/cloudnative-pg/barman-cloud/pkg/credentials" + barmanRestorer "github.com/cloudnative-pg/barman-cloud/pkg/restorer" barmancloudv1 "github.com/cloudnative-pg/plugin-barman-cloud/api/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" "sigs.k8s.io/controller-runtime/pkg/client" "github.com/cloudnative-pg/barman-cloud/pkg/archiver" @@ -81,8 +90,124 @@ func (w WALServiceImplementation) Archive(ctx context.Context, request *wal.WALA } func (w WALServiceImplementation) Restore(ctx context.Context, request *wal.WALRestoreRequest) (*wal.WALRestoreResult, error) { - // TODO implement me - panic("implement me") + contextLogger := log.FromContext(ctx) + startTime := time.Now() + + var objectStore barmancloudv1.ObjectStore + if err := w.Client.Get(ctx, w.BarmanObjectKey, &objectStore); err != nil { + return nil, err + } + + // TODO: build full paths + walName := request.GetSourceWalName() + destinationPath := request.GetDestinationFileName() + + barmanConfiguration := &objectStore.Spec.Configuration + + env := getRestoreCABundleEnv(barmanConfiguration) + credentialsEnv, err := barmanCredentials.EnvSetBackupCloudCredentials( + ctx, + w.Client, + objectStore.Namespace, + &objectStore.Spec.Configuration, + os.Environ(), + ) + if err != nil { + return nil, fmt.Errorf("while getting recover credentials: %w", err) + } + mergeEnv(env, credentialsEnv) + + options, err := barmanCommand.CloudWalRestoreOptions(ctx, barmanConfiguration, objectStore.Name) + if err != nil { + return nil, fmt.Errorf("while getting barman-cloud-wal-restore options: %w", err) + } + + // Create the restorer + var walRestorer *barmanRestorer.WALRestorer + if walRestorer, err = barmanRestorer.New(ctx, env, w.SpoolDirectory); err != nil { + return nil, fmt.Errorf("while creating the restorer: %w", err) + } + + // Step 1: check if this WAL file is not already in the spool + var wasInSpool bool + if wasInSpool, err = walRestorer.RestoreFromSpool(walName, destinationPath); err != nil { + return nil, fmt.Errorf("while restoring a file from the spool directory: %w", err) + } + if wasInSpool { + contextLogger.Info("Restored WAL file from spool (parallel)", + "walName", walName, + ) + return nil, nil + } + + // TODO + // Step 2: return error if the end-of-wal-stream flag is set. + // We skip this step if streaming connection is not available + //if isStreamingAvailable(cluster, podName) { + // if err := checkEndOfWALStreamFlag(walRestorer); err != nil { + // return nil, err + // } + //} + + // Step 3: gather the WAL files names to restore. If the required file isn't a regular WAL, we download it directly. + var walFilesList []string + maxParallel := 1 + if barmanConfiguration.Wal != nil && barmanConfiguration.Wal.MaxParallel > 1 { + maxParallel = barmanConfiguration.Wal.MaxParallel + } + if IsWALFile(walName) { + // If this is a regular WAL file, we try to prefetch + if walFilesList, err = gatherWALFilesToRestore(walName, maxParallel); err != nil { + return nil, fmt.Errorf("while generating the list of WAL files to restore: %w", err) + } + } else { + // This is not a regular WAL file, we fetch it directly + walFilesList = []string{walName} + } + + // Step 4: download the WAL files into the required place + downloadStartTime := time.Now() + walStatus := walRestorer.RestoreList(ctx, walFilesList, destinationPath, options) + + // We return immediately if the first WAL has errors, because the first WAL + // is the one that PostgreSQL has requested to restore. + // The failure has already been logged in walRestorer.RestoreList method + if walStatus[0].Err != nil { + return nil, walStatus[0].Err + } + + // TODO + // Step 5: set end-of-wal-stream flag if any download job returned file-not-found + // We skip this step if streaming connection is not available + //endOfWALStream := isEndOfWALStream(walStatus) + //if isStreamingAvailable(cluster, podName) && endOfWALStream { + // contextLogger.Info( + // "Set end-of-wal-stream flag as one of the WAL files to be prefetched was not found") + // + // err = walRestorer.SetEndOfWALStream() + // if err != nil { + // return nil, err + // } + //} + + successfulWalRestore := 0 + for idx := range walStatus { + if walStatus[idx].Err == nil { + successfulWalRestore++ + } + } + + contextLogger.Info("WAL restore command completed (parallel)", + "walName", walName, + "maxParallel", maxParallel, + "successfulWalRestore", successfulWalRestore, + "failedWalRestore", maxParallel-successfulWalRestore, + "startTime", startTime, + "downloadStartTime", downloadStartTime, + "downloadTotalTime", time.Since(downloadStartTime), + "totalTime", time.Since(startTime)) + + return &wal.WALRestoreResult{}, nil } func (w WALServiceImplementation) Status(ctx context.Context, request *wal.WALStatusRequest) (*wal.WALStatusResult, error) { @@ -94,3 +219,105 @@ func (w WALServiceImplementation) SetFirstRequired(ctx context.Context, request // TODO implement me panic("implement me") } + +// mergeEnv merges all the values inside incomingEnv into env +func mergeEnv(env []string, incomingEnv []string) { + for _, incomingItem := range incomingEnv { + incomingKV := strings.SplitAfterN(incomingItem, "=", 2) + if len(incomingKV) != 2 { + continue + } + for idx, item := range env { + if strings.HasPrefix(item, incomingKV[0]) { + env[idx] = incomingItem + } + } + } +} + +// TODO: refactor +const ( + // ScratchDataDirectory is the directory to be used for scratch data + ScratchDataDirectory = "/controller" + + // CertificatesDir location to store the certificates + CertificatesDir = ScratchDataDirectory + "/certificates/" + + // BarmanBackupEndpointCACertificateLocation is the location where the barman endpoint + // CA certificate is stored + BarmanBackupEndpointCACertificateLocation = CertificatesDir + BarmanBackupEndpointCACertificateFileName + + // BarmanBackupEndpointCACertificateFileName is the name of the file in which the barman endpoint + // CA certificate for backups is stored + BarmanBackupEndpointCACertificateFileName = "backup-" + BarmanEndpointCACertificateFileName + + // BarmanRestoreEndpointCACertificateFileName is the name of the file in which the barman endpoint + // CA certificate for restores is stored + BarmanRestoreEndpointCACertificateFileName = "restore-" + BarmanEndpointCACertificateFileName + + // BarmanEndpointCACertificateFileName is the name of the file in which the barman endpoint + // CA certificate is stored + BarmanEndpointCACertificateFileName = "barman-ca.crt" +) + +func getRestoreCABundleEnv(configuration *barmanapi.BarmanObjectStoreConfiguration) []string { + var env []string + + if configuration.EndpointCA != nil && configuration.BarmanCredentials.AWS != nil { + env = append(env, fmt.Sprintf("AWS_CA_BUNDLE=%s", BarmanBackupEndpointCACertificateLocation)) + } else if configuration.EndpointCA != nil && configuration.BarmanCredentials.Azure != nil { + env = append(env, fmt.Sprintf("REQUESTS_CA_BUNDLE=%s", BarmanBackupEndpointCACertificateLocation)) + } + return env +} + +//// isStreamingAvailable checks if this pod can replicate via streaming connection +//func isStreamingAvailable(cluster *apiv1.Cluster, podName string) bool { +// if cluster == nil { +// return false +// } +// +// // Easy case: If this pod is a replica, the streaming is always available +// if cluster.Status.CurrentPrimary != podName { +// return true +// } +// +// // Designated primary in a replica cluster: return true if the external cluster has streaming connection +// if cluster.IsReplica() { +// externalCluster, found := cluster.ExternalCluster(cluster.Spec.ReplicaCluster.Source) +// +// // This is a configuration error +// if !found { +// return false +// } +// +// return externalCluster.ConnectionParameters != nil +// } +// +// // Primary, we do not replicate from nobody +// return false +//} + +// gatherWALFilesToRestore files a list of possible WAL files to restore, always +// including as the first one the requested WAL file +func gatherWALFilesToRestore(walName string, parallel int) (walList []string, err error) { + var segment Segment + + segment, err = SegmentFromName(walName) + if err != nil { + // This seems an invalid segment name. It's not a problem + // because PostgreSQL may request also other files such as + // backup, history, etc. + // Let's just avoid prefetching in this case + return []string{walName}, nil + } + // NextSegments would accept postgresVersion and segmentSize, + // but we do not have this info here, so we pass nil. + segmentList := segment.NextSegments(parallel, nil, nil) + walList = make([]string, len(segmentList)) + for idx := range segmentList { + walList[idx] = segmentList[idx].Name() + } + + return walList, err +} diff --git a/internal/cnpgi/instance/wal_import.go b/internal/cnpgi/instance/wal_import.go new file mode 100644 index 0000000..0fb8cf2 --- /dev/null +++ b/internal/cnpgi/instance/wal_import.go @@ -0,0 +1,194 @@ +/* +Copyright The CloudNativePG Contributors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package instance + +import ( + "errors" + "fmt" + "path" + "regexp" + "strconv" +) + +// TODO: remove this file migrate in cnpg-machinery + +const ( + // DefaultWALSegmentSize is the default size of a single WAL file + // This must be a power of 2 + DefaultWALSegmentSize = int64(1 << 24) + + // WALHexOctetRe is a regex to match 8 Hex characters + WALHexOctetRe = `([\dA-Fa-f]{8})` + + // WALTimeLineRe is a regex to match the timeline in a WAL filename + WALTimeLineRe = WALHexOctetRe + + // WALSegmentNameRe is a regex to match the segment parent log file and segment id + WALSegmentNameRe = WALHexOctetRe + WALHexOctetRe +) + +var ( + // WALRe is the file segment name parser + WALRe = regexp.MustCompile(`^` + + // everything has a timeline + WALTimeLineRe + + // (1) optional + `(?:` + + // segment name, if a wal file + WALSegmentNameRe + + // and (2) optional + `(?:` + + // offset, if a backup label + `\.[\dA-Fa-f]{8}\.backup` + + // or + `|` + + // partial, if a partial file + `\.partial` + + // close (2) + `)?` + + // or + `|` + + // only .history, if a history file + `\.history` + + // close (1) + `)$`) + + // WALSegmentRe is the file segment name parser + WALSegmentRe = regexp.MustCompile(`^` + + // everything has a timeline + WALTimeLineRe + + // segment name, if a wal file + WALSegmentNameRe + + `$`) + + // ErrorBadWALSegmentName is raised when parsing an invalid segment name + ErrorBadWALSegmentName = errors.New("invalid WAL segment name") +) + +// Segment contains the information inside a WAL segment name +type Segment struct { + // Timeline number + Tli int32 + + // Log number + Log int32 + + // Segment number + Seg int32 +} + +// IsWALFile check if the passed file name is a regular WAL file. +// It supports either a full file path or a simple file name +func IsWALFile(name string) bool { + baseName := path.Base(name) + return WALSegmentRe.MatchString(baseName) +} + +// SegmentFromName retrieves the timeline, log ID and segment ID +// from the name of a xlog segment, and can also handle a full path +// or a simple file name +func SegmentFromName(name string) (Segment, error) { + var tli, log, seg int64 + var err error + + baseName := path.Base(name) + // We could have used WALSegmentRe directly, but we wanted to adhere to barman code + subMatches := WALRe.FindStringSubmatch(baseName) + if len(subMatches) != 4 { + return Segment{}, ErrorBadWALSegmentName + } + + if len(subMatches[0]) != 24 { + return Segment{}, ErrorBadWALSegmentName + } + + if tli, err = strconv.ParseInt(subMatches[1], 16, 32); err != nil { + return Segment{}, ErrorBadWALSegmentName + } + + if log, err = strconv.ParseInt(subMatches[2], 16, 32); err != nil { + return Segment{}, ErrorBadWALSegmentName + } + + if seg, err = strconv.ParseInt(subMatches[3], 16, 32); err != nil { + return Segment{}, ErrorBadWALSegmentName + } + + return Segment{ + Tli: int32(tli), + Log: int32(log), + Seg: int32(seg), + }, nil +} + +// MustSegmentFromName is analogous to SegmentFromName but panics +// if the segment name is invalid +func MustSegmentFromName(name string) Segment { + result, err := SegmentFromName(name) + if err != nil { + panic(err) + } + + return result +} + +// Name gets the name of the segment +func (segment Segment) Name() string { + return fmt.Sprintf("%08X%08X%08X", segment.Tli, segment.Log, segment.Seg) +} + +// WalSegmentsPerFile is the number of WAL Segments in a WAL File +func WalSegmentsPerFile(walSegmentSize int64) int32 { + // Given that segment section is represented by 8 hex characters, + // we compute the number of wal segments in a file, by dividing + // the "max segment number" by the wal segment size. + return int32(0xFFFFFFFF / walSegmentSize) //nolint:gosec +} + +// NextSegments generate the list of all possible segment names starting +// from `segment`, until the specified size is reached. This function will +// not ever generate timeline changes. +// If postgresVersion == nil, the latest postgres version is assumed. +// If segmentSize == nil, wal_segment_size=DefaultWALSegmentSize is assumed. +func (segment Segment) NextSegments(size int, postgresVersion *int, segmentSize *int64) []Segment { + result := make([]Segment, 0, size) + + var walSegPerFile int32 + if segmentSize == nil { + walSegPerFile = WalSegmentsPerFile(DefaultWALSegmentSize) + } else { + walSegPerFile = WalSegmentsPerFile(*segmentSize) + } + + skipLastSegment := postgresVersion != nil && *postgresVersion < 90300 + + currentSegment := segment + for len(result) < size { + result = append(result, Segment{ + Tli: currentSegment.Tli, + Log: currentSegment.Log, + Seg: currentSegment.Seg, + }) + currentSegment.Seg++ + if currentSegment.Seg > walSegPerFile || (skipLastSegment && currentSegment.Seg == walSegPerFile) { + currentSegment.Log++ + currentSegment.Seg = 0 + } + } + + return result +}