package instance import ( "context" "errors" "fmt" "strings" "time" barmanapi "github.com/cloudnative-pg/barman-cloud/pkg/api" "github.com/cloudnative-pg/machinery/pkg/log" "os" barmanCommand "github.com/cloudnative-pg/barman-cloud/pkg/command" barmanCredentials "github.com/cloudnative-pg/barman-cloud/pkg/credentials" barmanRestorer "github.com/cloudnative-pg/barman-cloud/pkg/restorer" barmancloudv1 "github.com/cloudnative-pg/plugin-barman-cloud/api/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" "sigs.k8s.io/controller-runtime/pkg/client" "github.com/cloudnative-pg/barman-cloud/pkg/archiver" "github.com/cloudnative-pg/cnpg-i/pkg/wal" ) type WALServiceImplementation struct { BarmanObjectKey client.ObjectKey Client client.Client SpoolDirectory string PGDataPath string PGWALPath string wal.UnimplementedWALServer } func (w WALServiceImplementation) GetCapabilities(ctx context.Context, request *wal.WALCapabilitiesRequest) (*wal.WALCapabilitiesResult, error) { return &wal.WALCapabilitiesResult{ Capabilities: []*wal.WALCapability{ { Type: &wal.WALCapability_Rpc{ Rpc: &wal.WALCapability_RPC{ Type: wal.WALCapability_RPC_TYPE_ARCHIVE_WAL, }, }, }, { Type: &wal.WALCapability_Rpc{ Rpc: &wal.WALCapability_RPC{ Type: wal.WALCapability_RPC_TYPE_RESTORE_WAL, }, }, }, }, }, nil } func (w WALServiceImplementation) Archive(ctx context.Context, request *wal.WALArchiveRequest) (*wal.WALArchiveResult, error) { var objectStore barmancloudv1.ObjectStore if err := w.Client.Get(ctx, w.BarmanObjectKey, &objectStore); err != nil { return nil, err } envArchive, err := barmanCredentials.EnvSetBackupCloudCredentials( ctx, w.Client, objectStore.Namespace, &objectStore.Spec.Configuration, os.Environ()) if apierrors.IsForbidden(err) { return nil, errors.New("backup credentials don't yet have access permissions. Will retry reconciliation loop") } arch, err := archiver.New(ctx, envArchive, w.SpoolDirectory, w.PGDataPath, w.PGWALPath) if err != nil { return nil, err } options, err := arch.BarmanCloudWalArchiveOptions(ctx, &objectStore.Spec.Configuration, objectStore.Name) if err != nil { return nil, err } walList := arch.GatherWALFilesToArchive(ctx, request.GetSourceFileName(), 1) result := arch.ArchiveList(ctx, walList, options) for _, archiverResult := range result { if archiverResult.Err != nil { return nil, archiverResult.Err } } return &wal.WALArchiveResult{}, nil } func (w WALServiceImplementation) Restore(ctx context.Context, request *wal.WALRestoreRequest) (*wal.WALRestoreResult, error) { contextLogger := log.FromContext(ctx) startTime := time.Now() var objectStore barmancloudv1.ObjectStore if err := w.Client.Get(ctx, w.BarmanObjectKey, &objectStore); err != nil { return nil, err } // TODO: build full paths walName := request.GetSourceWalName() destinationPath := request.GetDestinationFileName() barmanConfiguration := &objectStore.Spec.Configuration env := getRestoreCABundleEnv(barmanConfiguration) credentialsEnv, err := barmanCredentials.EnvSetBackupCloudCredentials( ctx, w.Client, objectStore.Namespace, &objectStore.Spec.Configuration, os.Environ(), ) if err != nil { return nil, fmt.Errorf("while getting recover credentials: %w", err) } mergeEnv(env, credentialsEnv) options, err := barmanCommand.CloudWalRestoreOptions(ctx, barmanConfiguration, objectStore.Name) if err != nil { return nil, fmt.Errorf("while getting barman-cloud-wal-restore options: %w", err) } // Create the restorer var walRestorer *barmanRestorer.WALRestorer if walRestorer, err = barmanRestorer.New(ctx, env, w.SpoolDirectory); err != nil { return nil, fmt.Errorf("while creating the restorer: %w", err) } // Step 1: check if this WAL file is not already in the spool var wasInSpool bool if wasInSpool, err = walRestorer.RestoreFromSpool(walName, destinationPath); err != nil { return nil, fmt.Errorf("while restoring a file from the spool directory: %w", err) } if wasInSpool { contextLogger.Info("Restored WAL file from spool (parallel)", "walName", walName, ) return nil, nil } // TODO // Step 2: return error if the end-of-wal-stream flag is set. // We skip this step if streaming connection is not available //if isStreamingAvailable(cluster, podName) { // if err := checkEndOfWALStreamFlag(walRestorer); err != nil { // return nil, err // } //} // Step 3: gather the WAL files names to restore. If the required file isn't a regular WAL, we download it directly. var walFilesList []string maxParallel := 1 if barmanConfiguration.Wal != nil && barmanConfiguration.Wal.MaxParallel > 1 { maxParallel = barmanConfiguration.Wal.MaxParallel } if IsWALFile(walName) { // If this is a regular WAL file, we try to prefetch if walFilesList, err = gatherWALFilesToRestore(walName, maxParallel); err != nil { return nil, fmt.Errorf("while generating the list of WAL files to restore: %w", err) } } else { // This is not a regular WAL file, we fetch it directly walFilesList = []string{walName} } // Step 4: download the WAL files into the required place downloadStartTime := time.Now() walStatus := walRestorer.RestoreList(ctx, walFilesList, destinationPath, options) // We return immediately if the first WAL has errors, because the first WAL // is the one that PostgreSQL has requested to restore. // The failure has already been logged in walRestorer.RestoreList method if walStatus[0].Err != nil { return nil, walStatus[0].Err } // TODO // Step 5: set end-of-wal-stream flag if any download job returned file-not-found // We skip this step if streaming connection is not available //endOfWALStream := isEndOfWALStream(walStatus) //if isStreamingAvailable(cluster, podName) && endOfWALStream { // contextLogger.Info( // "Set end-of-wal-stream flag as one of the WAL files to be prefetched was not found") // // err = walRestorer.SetEndOfWALStream() // if err != nil { // return nil, err // } //} successfulWalRestore := 0 for idx := range walStatus { if walStatus[idx].Err == nil { successfulWalRestore++ } } contextLogger.Info("WAL restore command completed (parallel)", "walName", walName, "maxParallel", maxParallel, "successfulWalRestore", successfulWalRestore, "failedWalRestore", maxParallel-successfulWalRestore, "startTime", startTime, "downloadStartTime", downloadStartTime, "downloadTotalTime", time.Since(downloadStartTime), "totalTime", time.Since(startTime)) return &wal.WALRestoreResult{}, nil } func (w WALServiceImplementation) Status(ctx context.Context, request *wal.WALStatusRequest) (*wal.WALStatusResult, error) { // TODO implement me panic("implement me") } func (w WALServiceImplementation) SetFirstRequired(ctx context.Context, request *wal.SetFirstRequiredRequest) (*wal.SetFirstRequiredResult, error) { // TODO implement me panic("implement me") } // mergeEnv merges all the values inside incomingEnv into env func mergeEnv(env []string, incomingEnv []string) { for _, incomingItem := range incomingEnv { incomingKV := strings.SplitAfterN(incomingItem, "=", 2) if len(incomingKV) != 2 { continue } for idx, item := range env { if strings.HasPrefix(item, incomingKV[0]) { env[idx] = incomingItem } } } } // TODO: refactor const ( // ScratchDataDirectory is the directory to be used for scratch data ScratchDataDirectory = "/controller" // CertificatesDir location to store the certificates CertificatesDir = ScratchDataDirectory + "/certificates/" // BarmanBackupEndpointCACertificateLocation is the location where the barman endpoint // CA certificate is stored BarmanBackupEndpointCACertificateLocation = CertificatesDir + BarmanBackupEndpointCACertificateFileName // BarmanBackupEndpointCACertificateFileName is the name of the file in which the barman endpoint // CA certificate for backups is stored BarmanBackupEndpointCACertificateFileName = "backup-" + BarmanEndpointCACertificateFileName // BarmanRestoreEndpointCACertificateFileName is the name of the file in which the barman endpoint // CA certificate for restores is stored BarmanRestoreEndpointCACertificateFileName = "restore-" + BarmanEndpointCACertificateFileName // BarmanEndpointCACertificateFileName is the name of the file in which the barman endpoint // CA certificate is stored BarmanEndpointCACertificateFileName = "barman-ca.crt" ) func getRestoreCABundleEnv(configuration *barmanapi.BarmanObjectStoreConfiguration) []string { var env []string if configuration.EndpointCA != nil && configuration.BarmanCredentials.AWS != nil { env = append(env, fmt.Sprintf("AWS_CA_BUNDLE=%s", BarmanBackupEndpointCACertificateLocation)) } else if configuration.EndpointCA != nil && configuration.BarmanCredentials.Azure != nil { env = append(env, fmt.Sprintf("REQUESTS_CA_BUNDLE=%s", BarmanBackupEndpointCACertificateLocation)) } return env } //// isStreamingAvailable checks if this pod can replicate via streaming connection //func isStreamingAvailable(cluster *apiv1.Cluster, podName string) bool { // if cluster == nil { // return false // } // // // Easy case: If this pod is a replica, the streaming is always available // if cluster.Status.CurrentPrimary != podName { // return true // } // // // Designated primary in a replica cluster: return true if the external cluster has streaming connection // if cluster.IsReplica() { // externalCluster, found := cluster.ExternalCluster(cluster.Spec.ReplicaCluster.Source) // // // This is a configuration error // if !found { // return false // } // // return externalCluster.ConnectionParameters != nil // } // // // Primary, we do not replicate from nobody // return false //} // gatherWALFilesToRestore files a list of possible WAL files to restore, always // including as the first one the requested WAL file func gatherWALFilesToRestore(walName string, parallel int) (walList []string, err error) { var segment Segment segment, err = SegmentFromName(walName) if err != nil { // This seems an invalid segment name. It's not a problem // because PostgreSQL may request also other files such as // backup, history, etc. // Let's just avoid prefetching in this case return []string{walName}, nil } // NextSegments would accept postgresVersion and segmentSize, // but we do not have this info here, so we pass nil. segmentList := segment.NextSegments(parallel, nil, nil) walList = make([]string, len(segmentList)) for idx := range segmentList { walList[idx] = segmentList[idx].Name() } return walList, err }