From 1c86ff65747b5b348fb1ed2b0e5b0594fd156116 Mon Sep 17 00:00:00 2001 From: Armando Ruocco Date: Tue, 1 Oct 2024 11:02:12 +0200 Subject: [PATCH] feat(spike): wal-archive and wal-restore methods (#4) Signed-off-by: Armando Ruocco Co-authored-by: Leonardo Cecchi --- Dockerfile | 4 +- cmd/instance/main.go | 107 ++++++++ cmd/{ => operator}/main.go | 0 go.mod | 22 +- go.sum | 40 ++- internal/cnpgi/instance/cmd.go | 22 -- internal/cnpgi/instance/identity.go | 13 +- internal/cnpgi/instance/start.go | 59 +++++ internal/cnpgi/instance/wal.go | 335 ++++++++++++++++++++++++-- internal/cnpgi/instance/wal_import.go | 194 +++++++++++++++ internal/controller/suite_test.go | 2 +- 11 files changed, 739 insertions(+), 59 deletions(-) create mode 100644 cmd/instance/main.go rename cmd/{ => operator}/main.go (100%) delete mode 100644 internal/cnpgi/instance/cmd.go create mode 100644 internal/cnpgi/instance/start.go create mode 100644 internal/cnpgi/instance/wal_import.go diff --git a/Dockerfile b/Dockerfile index 5c73c7f..31c9ae6 100644 --- a/Dockerfile +++ b/Dockerfile @@ -12,7 +12,7 @@ COPY go.sum go.sum RUN go mod download # Copy the go source -COPY cmd/main.go cmd/main.go +COPY cmd/instance/main.go cmd/instance/main.go COPY api/ api/ COPY internal/ internal/ @@ -21,7 +21,7 @@ COPY internal/ internal/ # was called. For example, if we call make docker-build in a local env which has the Apple Silicon M1 SO # the docker BUILDPLATFORM arg will be linux/arm64 when for Apple x86 it will be linux/amd64. Therefore, # by leaving it empty we can ensure that the container and binary shipped on it will have the same platform. -RUN CGO_ENABLED=0 GOOS=${TARGETOS:-linux} GOARCH=${TARGETARCH} go build -a -o manager cmd/main.go +RUN CGO_ENABLED=0 GOOS=${TARGETOS:-linux} GOARCH=${TARGETARCH} go build -a -o manager cmd/instance/main.go # Use distroless as minimal base image to package the manager binary # Refer to https://github.com/GoogleContainerTools/distroless for more details diff --git a/cmd/instance/main.go b/cmd/instance/main.go new file mode 100644 index 0000000..50073e2 --- /dev/null +++ b/cmd/instance/main.go @@ -0,0 +1,107 @@ +package main + +import ( + "errors" + "os" + + cnpgv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/runtime" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + clientgoscheme "k8s.io/client-go/kubernetes/scheme" + controllerruntime "sigs.k8s.io/controller-runtime" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/cache" + "sigs.k8s.io/controller-runtime/pkg/client" + + barmancloudv1 "github.com/cloudnative-pg/plugin-barman-cloud/api/v1" + "github.com/cloudnative-pg/plugin-barman-cloud/internal/cnpgi/instance" +) + +var ( + scheme = runtime.NewScheme() + setupLog = ctrl.Log.WithName("setup") +) + +func init() { + utilruntime.Must(clientgoscheme.AddToScheme(scheme)) + + utilruntime.Must(barmancloudv1.AddToScheme(scheme)) + // +kubebuilder:scaffold:scheme +} + +func main() { + setupLog.Info("Starting barman cloud instance plugin") + namespace := mustGetEnv("NAMESPACE") + boName := mustGetEnv("BARMAN_OBJECT_NAME") + clusterName := mustGetEnv("CLUSTER_NAME") + instanceName := mustGetEnv("INSTANCE_NAME") + + mgr, err := controllerruntime.NewManager(controllerruntime.GetConfigOrDie(), controllerruntime.Options{ + Scheme: scheme, + Cache: cache.Options{ + ByObject: map[client.Object]cache.ByObject{ + &barmancloudv1.ObjectStore{}: { + Field: fields.OneTermEqualSelector("metadata.name", boName), + Namespaces: map[string]cache.Config{ + namespace: {}, + }, + }, + &cnpgv1.Cluster{}: { + Field: fields.OneTermEqualSelector("metadata.name", clusterName), + Namespaces: map[string]cache.Config{ + namespace: {}, + }, + }, + }, + }, + }) + if err != nil { + setupLog.Error(err, "unable to start manager") + os.Exit(1) + } + + if err := mgr.Add(&instance.CNPGI{ + Client: mgr.GetClient(), + ClusterObjectKey: client.ObjectKey{ + Namespace: namespace, + Name: clusterName, + }, + BarmanObjectKey: client.ObjectKey{ + Namespace: namespace, + Name: boName, + }, + InstanceName: instanceName, + // TODO: improve + PGDataPath: mustGetEnv("PGDATA"), + PGWALPath: mustGetEnv("PGWAL"), + SpoolDirectory: mustGetEnv("SPOOL_DIRECTORY"), + ServerCertPath: mustGetEnv("SERVER_CERT"), + ServerKeyPath: mustGetEnv("SERVER_KEY"), + ClientCertPath: mustGetEnv("CLIENT_CERT"), + ServerAddress: mustGetEnv("SERVER_ADDRESS"), + PluginPath: mustGetEnv("PLUGIN_PATH"), + }); err != nil { + setupLog.Error(err, "unable to create CNPGI runnable") + os.Exit(1) + } + + if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil { + setupLog.Error(err, "problem running manager") + os.Exit(1) + } +} + +func mustGetEnv(envName string) string { + value := os.Getenv(envName) + if value == "" { + setupLog.Error( + errors.New("missing required env variable"), + "while fetching env variables", + "name", + envName, + ) + os.Exit(1) + } + return value +} diff --git a/cmd/main.go b/cmd/operator/main.go similarity index 100% rename from cmd/main.go rename to cmd/operator/main.go diff --git a/go.mod b/go.mod index bb75be0..24c5133 100644 --- a/go.mod +++ b/go.mod @@ -4,14 +4,15 @@ go 1.22.0 require ( github.com/cloudnative-pg/barman-cloud v0.0.0-20240924124724-92831d48562a - github.com/cloudnative-pg/cnpg-i v0.0.0-20240924030516-c5636170f248 - github.com/cloudnative-pg/cnpg-i-machinery v0.0.0-20240926153929-09e2c6f6689b + github.com/cloudnative-pg/cloudnative-pg v1.24.0 + github.com/cloudnative-pg/cnpg-i v0.0.0-20240902182059-c9f193bf825b + github.com/cloudnative-pg/cnpg-i-machinery v0.0.0-20240926095718-27da985944d4 + github.com/cloudnative-pg/machinery v0.0.0-20240919131343-9dd62b9257c7 github.com/onsi/ginkgo/v2 v2.20.2 github.com/onsi/gomega v1.34.2 - github.com/spf13/cobra v1.8.1 - google.golang.org/grpc v1.67.0 + google.golang.org/grpc v1.66.0 k8s.io/apimachinery v0.31.1 - k8s.io/client-go v0.31.1 + k8s.io/client-go v0.31.0 sigs.k8s.io/controller-runtime v0.19.0 ) @@ -19,10 +20,10 @@ require ( github.com/antlr4-go/antlr/v4 v4.13.0 // indirect github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a // indirect github.com/beorn7/perks v1.0.1 // indirect + github.com/blang/semver v3.5.1+incompatible // indirect github.com/blang/semver/v4 v4.0.0 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect - github.com/cloudnative-pg/machinery v0.0.0-20240919131343-9dd62b9257c7 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/emicklei/go-restful/v3 v3.12.1 // indirect github.com/evanphx/json-patch/v5 v5.9.0 // indirect @@ -45,6 +46,7 @@ require ( github.com/google/gofuzz v1.2.0 // indirect github.com/google/pprof v0.0.0-20240910150728-a0b0bb1d4134 // indirect github.com/google/uuid v1.6.0 // indirect + github.com/gorilla/websocket v1.5.0 // indirect github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 // indirect github.com/hashicorp/hcl v1.0.0 // indirect @@ -53,27 +55,35 @@ require ( github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/compress v1.17.9 // indirect + github.com/kubernetes-csi/external-snapshotter/client/v8 v8.0.0 // indirect + github.com/lib/pq v1.10.9 // indirect github.com/magiconair/properties v1.8.7 // indirect github.com/mailru/easyjson v0.7.7 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect + github.com/moby/spdystream v0.4.0 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f // indirect github.com/pelletier/go-toml/v2 v2.2.2 // indirect github.com/pkg/errors v0.9.1 // indirect + github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring v0.75.2 // indirect github.com/prometheus/client_golang v1.20.3 // indirect github.com/prometheus/client_model v0.6.1 // indirect github.com/prometheus/common v0.59.1 // indirect github.com/prometheus/procfs v0.15.1 // indirect + github.com/robfig/cron v1.2.0 // indirect github.com/sagikazarmark/locafero v0.4.0 // indirect github.com/sagikazarmark/slog-shim v0.1.0 // indirect github.com/sourcegraph/conc v0.3.0 // indirect github.com/spf13/afero v1.11.0 // indirect github.com/spf13/cast v1.6.0 // indirect + github.com/spf13/cobra v1.8.1 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/spf13/viper v1.19.0 // indirect github.com/stoewer/go-strcase v1.3.0 // indirect github.com/subosito/gotenv v1.6.0 // indirect + github.com/thoas/go-funk v0.9.3 // indirect github.com/x448/float16 v0.8.4 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.53.0 // indirect go.opentelemetry.io/otel v1.28.0 // indirect diff --git a/go.sum b/go.sum index 68572de..c47f300 100644 --- a/go.sum +++ b/go.sum @@ -1,9 +1,13 @@ github.com/antlr4-go/antlr/v4 v4.13.0 h1:lxCg3LAv+EUK6t1i0y1V6/SLeUi0eKEKdhQAlS8TVTI= github.com/antlr4-go/antlr/v4 v4.13.0/go.mod h1:pfChB/xh/Unjila75QW7+VU4TSnWnnk9UTnmpPaOR2g= +github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio= +github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs= github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a h1:idn718Q4B6AGu/h5Sxe66HYVdqdGu2l9Iebqhi/AEoA= github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/blang/semver v3.5.1+incompatible h1:cQNTCjp13qL8KC3Nbxr/y2Bqb63oX6wdnnjpJbkM4JQ= +github.com/blang/semver v3.5.1+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnwebNt5EWlYSAyrTnjyyk= github.com/blang/semver/v4 v4.0.0 h1:1PFHFE6yCCTv8C1TeyNNarDzntLi7wMI5i/pzqYIsAM= github.com/blang/semver/v4 v4.0.0/go.mod h1:IbckMUScFkM3pff0VJDNKRiT6TG/YpiHIM2yvyW5YoQ= github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= @@ -12,10 +16,12 @@ github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UF github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cloudnative-pg/barman-cloud v0.0.0-20240924124724-92831d48562a h1:0v1ML9Eibfq3helbT9GtU0EstqFtG91k/MPO9azY5ME= github.com/cloudnative-pg/barman-cloud v0.0.0-20240924124724-92831d48562a/go.mod h1:Jm0tOp5oB7utpt8wz6RfSv31h1mThOtffjfyxVupriE= -github.com/cloudnative-pg/cnpg-i v0.0.0-20240924030516-c5636170f248 h1:eUGzb7YNjVLilwhgZoe4hDOO70fci3oqb/ZzQFbN3xg= -github.com/cloudnative-pg/cnpg-i v0.0.0-20240924030516-c5636170f248/go.mod h1:K9/4eAT3rh2bKIWyujoN8BIPRXa4d1Ls+eBY8PE8y6w= -github.com/cloudnative-pg/cnpg-i-machinery v0.0.0-20240926153929-09e2c6f6689b h1:T9G61tzOBoB5yvlDPULUoiUl6QxPmti3pkNFhQYGGQY= -github.com/cloudnative-pg/cnpg-i-machinery v0.0.0-20240926153929-09e2c6f6689b/go.mod h1:dV1+nE7jWENm/fcnKBeKsaScMz685rQPbPCCDydJgsY= +github.com/cloudnative-pg/cloudnative-pg v1.24.0 h1:lY9IP/Gnh5ogNcFGoPnbD2eOiHdSdbdEp0PaUdOAMDQ= +github.com/cloudnative-pg/cloudnative-pg v1.24.0/go.mod h1:n7Qqax6os+x3+7Qu/GojUUeKlL1ELGV63dcO/tzIfB4= +github.com/cloudnative-pg/cnpg-i v0.0.0-20240902182059-c9f193bf825b h1:DKMuZrvZJkaZ/rNswcA4q2YST5TRhKPlxFxL5wQXRRs= +github.com/cloudnative-pg/cnpg-i v0.0.0-20240902182059-c9f193bf825b/go.mod h1:LhDyBxtoGsWNf0+YXj+uIhFlzHeY8N0QEkO6Kqv8yKY= +github.com/cloudnative-pg/cnpg-i-machinery v0.0.0-20240926095718-27da985944d4 h1:ZZkI87sz/Bn6O6jaMB4BDpZihGSAk8HkwXB6+ECKlgA= +github.com/cloudnative-pg/cnpg-i-machinery v0.0.0-20240926095718-27da985944d4/go.mod h1:dV1+nE7jWENm/fcnKBeKsaScMz685rQPbPCCDydJgsY= github.com/cloudnative-pg/machinery v0.0.0-20240919131343-9dd62b9257c7 h1:glRSFwMeX1tb1wlN6ZxihPH3nMXL9ZlwU1/xvNFB0iE= github.com/cloudnative-pg/machinery v0.0.0-20240919131343-9dd62b9257c7/go.mod h1:bWp1Es5zlxElg4Z/c5f0RKOkDcyNvDHdYIvNcPQU4WM= github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= @@ -72,6 +78,8 @@ github.com/google/pprof v0.0.0-20240910150728-a0b0bb1d4134 h1:c5FlPPgxOn7kJz3VoP github.com/google/pprof v0.0.0-20240910150728-a0b0bb1d4134/go.mod h1:vavhavw2zAxS5dIdcRluK6cSGGPlZynqzFM8NdvU144= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= +github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0 h1:pRhl55Yx1eC7BZ1N+BBWwnKaMyD8uC+34TLdndZMAKk= github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0/go.mod h1:XKMd7iuf/RGPSMJ/U4HP0zS2Z9Fh8Ps9a+6X26m/tmI= github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 h1:bkypFPDjIYGfCYD5mRBvpqxfYX1YCS1PXdKYWi8FsN0= @@ -94,14 +102,20 @@ github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/kubernetes-csi/external-snapshotter/client/v8 v8.0.0 h1:mjQG0Vakr2h246kEDR85U8y8ZhPgT3bguTCajRa/jaw= +github.com/kubernetes-csi/external-snapshotter/client/v8 v8.0.0/go.mod h1:E3vdYxHj2C2q6qo8/Da4g7P+IcwqRZyy3gJBzYybV9Y= github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= +github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= +github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY= github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0= github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= +github.com/moby/spdystream v0.4.0 h1:Vy79D6mHeJJjiPdFEL2yku1kl0chZpJfZcPpb16BRl8= +github.com/moby/spdystream v0.4.0/go.mod h1:xBAYlnt/ay+11ShkdFKNAG7LsyK/tmNBVvVOwrfMgdI= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= @@ -109,6 +123,8 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f h1:y5//uYreIhSUg3J1GEMiLbxo1LJaP8RfCpH6pymGZus= +github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw= github.com/onsi/ginkgo/v2 v2.20.2 h1:7NVCeyIWROIAheY21RLS+3j2bb52W0W82tkberYytp4= github.com/onsi/ginkgo/v2 v2.20.2/go.mod h1:K9gyxPIlb+aIvnZ8bd9Ak+YP18w3APlR+5coaZoE2ag= github.com/onsi/gomega v1.34.2 h1:pNCwDkzrsv7MS9kpaQvVb1aVLahQXyJ/Tv5oAZMI3i8= @@ -120,6 +136,8 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring v0.75.2 h1:6UsAv+jAevuGO2yZFU/BukV4o9NKnFMOuoouSA4G0ns= +github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring v0.75.2/go.mod h1:XYrdZw5dW12Cjkt4ndbeNZZTBp4UCHtW0ccR9+sTtPU= github.com/prometheus/client_golang v1.20.3 h1:oPksm4K8B+Vt35tUhw6GbSNSgVlVSBH0qELP/7u83l4= github.com/prometheus/client_golang v1.20.3/go.mod h1:PIEt8X02hGcP8JWbeHyeZ53Y/jReSnHgO035n//V5WE= github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E= @@ -128,6 +146,8 @@ github.com/prometheus/common v0.59.1 h1:LXb1quJHWm1P6wq/U824uxYi4Sg0oGvNeUm1z5dJ github.com/prometheus/common v0.59.1/go.mod h1:GpWM7dewqmVYcd7SmRaiWVe9SSqjf0UrwnYnpEZNuT0= github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc= github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= +github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ= +github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k= github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= @@ -154,6 +174,7 @@ github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSS github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= @@ -162,6 +183,8 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8= github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU= +github.com/thoas/go-funk v0.9.3 h1:7+nAEx3kn5ZJcnDm2Bh23N2yOtweO14bi//dvRtgLpw= +github.com/thoas/go-funk v0.9.3/go.mod h1:+IWnUfUmFO1+WVYQWQtIJHeRRdaIyyYglZN7xzUPe4Q= github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= @@ -237,8 +260,8 @@ google.golang.org/genproto/googleapis/api v0.0.0-20240814211410-ddb44dafa142 h1: google.golang.org/genproto/googleapis/api v0.0.0-20240814211410-ddb44dafa142/go.mod h1:d6be+8HhtEtucleCbxpPW9PA9XwISACu8nvpPqF0BVo= google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142 h1:e7S5W7MGGLaSu8j3YjdezkZ+m1/Nm0uRVRMEMGk26Xs= google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU= -google.golang.org/grpc v1.67.0 h1:IdH9y6PF5MPSdAntIcpjQ+tXO41pcQsfZV2RxtQgVcw= -google.golang.org/grpc v1.67.0/go.mod h1:1gLDyUQU7CTLJI90u3nXZ9ekeghjeM7pTDZlqFNg2AA= +google.golang.org/grpc v1.66.0 h1:DibZuoBznOxbDQxRINckZcUvnCEvrW9pcWIE2yF9r1c= +google.golang.org/grpc v1.66.0/go.mod h1:s3/l6xSSCURdVfAnL+TqCNMyTDAGN6+lZeVxnZR128Y= google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= @@ -250,6 +273,7 @@ gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA= gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= @@ -264,8 +288,8 @@ k8s.io/apimachinery v0.31.1 h1:mhcUBbj7KUjaVhyXILglcVjuS4nYXiwC+KKFBgIVy7U= k8s.io/apimachinery v0.31.1/go.mod h1:rsPdaZJfTfLsNJSQzNHQvYoTmxhoOEofxtOsF3rtsMo= k8s.io/apiserver v0.31.0 h1:p+2dgJjy+bk+B1Csz+mc2wl5gHwvNkC9QJV+w55LVrY= k8s.io/apiserver v0.31.0/go.mod h1:KI9ox5Yu902iBnnyMmy7ajonhKnkeZYJhTZ/YI+WEMk= -k8s.io/client-go v0.31.1 h1:f0ugtWSbWpxHR7sjVpQwuvw9a3ZKLXX0u0itkFXufb0= -k8s.io/client-go v0.31.1/go.mod h1:sKI8871MJN2OyeqRlmA4W4KM9KBdBUpDLu/43eGemCg= +k8s.io/client-go v0.31.0 h1:QqEJzNjbN2Yv1H79SsS+SWnXkBgVu4Pj3CJQgbx0gI8= +k8s.io/client-go v0.31.0/go.mod h1:Y9wvC76g4fLjmU0BA+rV+h2cncoadjvjjkkIGoTLcGU= k8s.io/component-base v0.31.0 h1:/KIzGM5EvPNQcYgwq5NwoQBaOlVFrghoVGr8lG6vNRs= k8s.io/component-base v0.31.0/go.mod h1:TYVuzI1QmN4L5ItVdMSXKvH7/DtvIuas5/mm8YT3rTo= k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk= diff --git a/internal/cnpgi/instance/cmd.go b/internal/cnpgi/instance/cmd.go deleted file mode 100644 index f5db9fa..0000000 --- a/internal/cnpgi/instance/cmd.go +++ /dev/null @@ -1,22 +0,0 @@ -package instance - -import ( - "github.com/cloudnative-pg/cnpg-i-machinery/pkg/pluginhelper/http" - "github.com/cloudnative-pg/cnpg-i/pkg/backup" - "github.com/cloudnative-pg/cnpg-i/pkg/wal" - "github.com/spf13/cobra" - "google.golang.org/grpc" -) - -func NewCMD() *cobra.Command { - cmd := http.CreateMainCmd(IdentityImplementation{}, func(server *grpc.Server) error { - // Register the declared implementations - wal.RegisterWALServer(server, WALServiceImplementation{}) - backup.RegisterBackupServer(server, BackupServiceImplementation{}) - return nil - }) - - cmd.Use = "plugin-instance" - - return cmd -} diff --git a/internal/cnpgi/instance/identity.go b/internal/cnpgi/instance/identity.go index e70e7c5..85e2f0a 100644 --- a/internal/cnpgi/instance/identity.go +++ b/internal/cnpgi/instance/identity.go @@ -2,12 +2,18 @@ package instance import ( "context" + "fmt" "github.com/cloudnative-pg/cnpg-i/pkg/identity" + "sigs.k8s.io/controller-runtime/pkg/client" + + barmancloudv1 "github.com/cloudnative-pg/plugin-barman-cloud/api/v1" ) type IdentityImplementation struct { identity.UnimplementedIdentityServer + BarmanObjectKey client.ObjectKey + Client client.Client } func (i IdentityImplementation) GetPluginMetadata( @@ -42,9 +48,14 @@ func (i IdentityImplementation) GetPluginCapabilities( } func (i IdentityImplementation) Probe( - _ context.Context, + ctx context.Context, _ *identity.ProbeRequest, ) (*identity.ProbeResponse, error) { + var obj barmancloudv1.ObjectStore + if err := i.Client.Get(ctx, i.BarmanObjectKey, &obj); err != nil { + return nil, fmt.Errorf("while fetching object store %s: %w", i.BarmanObjectKey.Name, err) + } + return &identity.ProbeResponse{ Ready: true, }, nil diff --git a/internal/cnpgi/instance/start.go b/internal/cnpgi/instance/start.go new file mode 100644 index 0000000..1501ee2 --- /dev/null +++ b/internal/cnpgi/instance/start.go @@ -0,0 +1,59 @@ +package instance + +import ( + "context" + + "github.com/cloudnative-pg/cnpg-i-machinery/pkg/pluginhelper/http" + "github.com/cloudnative-pg/cnpg-i/pkg/backup" + "github.com/cloudnative-pg/cnpg-i/pkg/wal" + "google.golang.org/grpc" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +type CNPGI struct { + Client client.Client + BarmanObjectKey client.ObjectKey + ClusterObjectKey client.ObjectKey + PGDataPath string + PGWALPath string + SpoolDirectory string + ServerCertPath string + ServerKeyPath string + ClientCertPath string + // mutually exclusive with pluginPath + ServerAddress string + // mutually exclusive with serverAddress + PluginPath string + InstanceName string +} + +func (c *CNPGI) Start(ctx context.Context) error { + enrich := func(server *grpc.Server) error { + wal.RegisterWALServer(server, WALServiceImplementation{ + BarmanObjectKey: c.BarmanObjectKey, + ClusterObjectKey: c.ClusterObjectKey, + InstanceName: c.InstanceName, + Client: c.Client, + SpoolDirectory: c.SpoolDirectory, + PGDataPath: c.PGDataPath, + PGWALPath: c.PGWALPath, + }) + backup.RegisterBackupServer(server, BackupServiceImplementation{}) + return nil + } + + srv := http.Server{ + IdentityImpl: IdentityImplementation{ + Client: c.Client, + BarmanObjectKey: c.BarmanObjectKey, + }, + Enrichers: []http.ServerEnricher{enrich}, + ServerCertPath: c.ServerCertPath, + ServerKeyPath: c.ServerKeyPath, + ClientCertPath: c.ClientCertPath, + ServerAddress: c.ServerAddress, + PluginPath: c.PluginPath, + } + + return srv.Start(ctx) +} diff --git a/internal/cnpgi/instance/wal.go b/internal/cnpgi/instance/wal.go index fb13480..764d370 100644 --- a/internal/cnpgi/instance/wal.go +++ b/internal/cnpgi/instance/wal.go @@ -2,17 +2,38 @@ package instance import ( "context" + "errors" + "fmt" + "os" + "strings" + "time" + barmanapi "github.com/cloudnative-pg/barman-cloud/pkg/api" + "github.com/cloudnative-pg/barman-cloud/pkg/archiver" + barmanCommand "github.com/cloudnative-pg/barman-cloud/pkg/command" + barmanCredentials "github.com/cloudnative-pg/barman-cloud/pkg/credentials" + barmanRestorer "github.com/cloudnative-pg/barman-cloud/pkg/restorer" + cnpgv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1" "github.com/cloudnative-pg/cnpg-i/pkg/wal" + "github.com/cloudnative-pg/machinery/pkg/log" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "sigs.k8s.io/controller-runtime/pkg/client" + + barmancloudv1 "github.com/cloudnative-pg/plugin-barman-cloud/api/v1" ) type WALServiceImplementation struct { + BarmanObjectKey client.ObjectKey + ClusterObjectKey client.ObjectKey + Client client.Client + InstanceName string + SpoolDirectory string + PGDataPath string + PGWALPath string wal.UnimplementedWALServer } -func (w WALServiceImplementation) GetCapabilities( - _ context.Context, _ *wal.WALCapabilitiesRequest, -) (*wal.WALCapabilitiesResult, error) { +func (w WALServiceImplementation) GetCapabilities(ctx context.Context, request *wal.WALCapabilitiesRequest) (*wal.WALCapabilitiesResult, error) { return &wal.WALCapabilitiesResult{ Capabilities: []*wal.WALCapability{ { @@ -33,30 +54,306 @@ func (w WALServiceImplementation) GetCapabilities( }, nil } -func (w WALServiceImplementation) Archive(_ context.Context, _ *wal.WALArchiveRequest) (*wal.WALArchiveResult, - error, -) { +func (w WALServiceImplementation) Archive(ctx context.Context, request *wal.WALArchiveRequest) (*wal.WALArchiveResult, error) { + var objectStore barmancloudv1.ObjectStore + if err := w.Client.Get(ctx, w.BarmanObjectKey, &objectStore); err != nil { + return nil, err + } + + envArchive, err := barmanCredentials.EnvSetBackupCloudCredentials( + ctx, + w.Client, + objectStore.Namespace, + &objectStore.Spec.Configuration, + os.Environ()) + if apierrors.IsForbidden(err) { + return nil, errors.New("backup credentials don't yet have access permissions. Will retry reconciliation loop") + } + + arch, err := archiver.New(ctx, envArchive, w.SpoolDirectory, w.PGDataPath, w.PGWALPath) + if err != nil { + return nil, err + } + + options, err := arch.BarmanCloudWalArchiveOptions(ctx, &objectStore.Spec.Configuration, objectStore.Name) + if err != nil { + return nil, err + } + walList := arch.GatherWALFilesToArchive(ctx, request.GetSourceFileName(), 1) + result := arch.ArchiveList(ctx, walList, options) + for _, archiverResult := range result { + if archiverResult.Err != nil { + return nil, archiverResult.Err + } + } + + return &wal.WALArchiveResult{}, nil +} + +func (w WALServiceImplementation) Restore(ctx context.Context, request *wal.WALRestoreRequest) (*wal.WALRestoreResult, error) { + contextLogger := log.FromContext(ctx) + startTime := time.Now() + + var cluster *cnpgv1.Cluster + + if err := w.Client.Get(ctx, w.ClusterObjectKey, cluster); err != nil { + return nil, err + } + + var objectStore barmancloudv1.ObjectStore + if err := w.Client.Get(ctx, w.BarmanObjectKey, &objectStore); err != nil { + return nil, err + } + + // TODO: build full paths + walName := request.GetSourceWalName() + destinationPath := request.GetDestinationFileName() + + barmanConfiguration := &objectStore.Spec.Configuration + + env := getRestoreCABundleEnv(barmanConfiguration) + credentialsEnv, err := barmanCredentials.EnvSetBackupCloudCredentials( + ctx, + w.Client, + objectStore.Namespace, + &objectStore.Spec.Configuration, + os.Environ(), + ) + if err != nil { + return nil, fmt.Errorf("while getting recover credentials: %w", err) + } + mergeEnv(env, credentialsEnv) + + options, err := barmanCommand.CloudWalRestoreOptions(ctx, barmanConfiguration, objectStore.Name) + if err != nil { + return nil, fmt.Errorf("while getting barman-cloud-wal-restore options: %w", err) + } + + // Create the restorer + var walRestorer *barmanRestorer.WALRestorer + if walRestorer, err = barmanRestorer.New(ctx, env, w.SpoolDirectory); err != nil { + return nil, fmt.Errorf("while creating the restorer: %w", err) + } + + // Step 1: check if this WAL file is not already in the spool + var wasInSpool bool + if wasInSpool, err = walRestorer.RestoreFromSpool(walName, destinationPath); err != nil { + return nil, fmt.Errorf("while restoring a file from the spool directory: %w", err) + } + if wasInSpool { + contextLogger.Info("Restored WAL file from spool (parallel)", + "walName", walName, + ) + return nil, nil + } + + // We skip this step if streaming connection is not available + if isStreamingAvailable(cluster, w.InstanceName) { + if err := checkEndOfWALStreamFlag(walRestorer); err != nil { + return nil, err + } + } + + // Step 3: gather the WAL files names to restore. If the required file isn't a regular WAL, we download it directly. + var walFilesList []string + maxParallel := 1 + if barmanConfiguration.Wal != nil && barmanConfiguration.Wal.MaxParallel > 1 { + maxParallel = barmanConfiguration.Wal.MaxParallel + } + if IsWALFile(walName) { + // If this is a regular WAL file, we try to prefetch + if walFilesList, err = gatherWALFilesToRestore(walName, maxParallel); err != nil { + return nil, fmt.Errorf("while generating the list of WAL files to restore: %w", err) + } + } else { + // This is not a regular WAL file, we fetch it directly + walFilesList = []string{walName} + } + + // Step 4: download the WAL files into the required place + downloadStartTime := time.Now() + walStatus := walRestorer.RestoreList(ctx, walFilesList, destinationPath, options) + + // We return immediately if the first WAL has errors, because the first WAL + // is the one that PostgreSQL has requested to restore. + // The failure has already been logged in walRestorer.RestoreList method + if walStatus[0].Err != nil { + return nil, walStatus[0].Err + } + + //We skip this step if streaming connection is not available + endOfWALStream := isEndOfWALStream(walStatus) + if isStreamingAvailable(cluster, w.InstanceName) && endOfWALStream { + contextLogger.Info( + "Set end-of-wal-stream flag as one of the WAL files to be prefetched was not found") + + err = walRestorer.SetEndOfWALStream() + if err != nil { + return nil, err + } + } + + successfulWalRestore := 0 + for idx := range walStatus { + if walStatus[idx].Err == nil { + successfulWalRestore++ + } + } + + contextLogger.Info("WAL restore command completed (parallel)", + "walName", walName, + "maxParallel", maxParallel, + "successfulWalRestore", successfulWalRestore, + "failedWalRestore", maxParallel-successfulWalRestore, + "startTime", startTime, + "downloadStartTime", downloadStartTime, + "downloadTotalTime", time.Since(downloadStartTime), + "totalTime", time.Since(startTime)) + + return &wal.WALRestoreResult{}, nil +} + +func (w WALServiceImplementation) Status(ctx context.Context, request *wal.WALStatusRequest) (*wal.WALStatusResult, error) { // TODO implement me panic("implement me") } -func (w WALServiceImplementation) Restore(_ context.Context, _ *wal.WALRestoreRequest) (*wal.WALRestoreResult, - error, -) { +func (w WALServiceImplementation) SetFirstRequired(ctx context.Context, request *wal.SetFirstRequiredRequest) (*wal.SetFirstRequiredResult, error) { // TODO implement me panic("implement me") } -func (w WALServiceImplementation) Status(_ context.Context, _ *wal.WALStatusRequest) (*wal.WALStatusResult, - error, -) { - // TODO implement me - panic("implement me") +// mergeEnv merges all the values inside incomingEnv into env. +func mergeEnv(env []string, incomingEnv []string) { + for _, incomingItem := range incomingEnv { + incomingKV := strings.SplitAfterN(incomingItem, "=", 2) + if len(incomingKV) != 2 { + continue + } + for idx, item := range env { + if strings.HasPrefix(item, incomingKV[0]) { + env[idx] = incomingItem + } + } + } } -func (w WALServiceImplementation) SetFirstRequired( - _ context.Context, _ *wal.SetFirstRequiredRequest, -) (*wal.SetFirstRequiredResult, error) { - // TODO implement me - panic("implement me") +// TODO: refactor. +const ( + // ScratchDataDirectory is the directory to be used for scratch data. + ScratchDataDirectory = "/controller" + + // CertificatesDir location to store the certificates. + CertificatesDir = ScratchDataDirectory + "/certificates/" + + // BarmanBackupEndpointCACertificateLocation is the location where the barman endpoint + // CA certificate is stored. + BarmanBackupEndpointCACertificateLocation = CertificatesDir + BarmanBackupEndpointCACertificateFileName + + // BarmanBackupEndpointCACertificateFileName is the name of the file in which the barman endpoint + // CA certificate for backups is stored. + BarmanBackupEndpointCACertificateFileName = "backup-" + BarmanEndpointCACertificateFileName + + // BarmanRestoreEndpointCACertificateFileName is the name of the file in which the barman endpoint + // CA certificate for restores is stored. + BarmanRestoreEndpointCACertificateFileName = "restore-" + BarmanEndpointCACertificateFileName + + // BarmanEndpointCACertificateFileName is the name of the file in which the barman endpoint + // CA certificate is stored. + BarmanEndpointCACertificateFileName = "barman-ca.crt" +) + +func getRestoreCABundleEnv(configuration *barmanapi.BarmanObjectStoreConfiguration) []string { + var env []string + + if configuration.EndpointCA != nil && configuration.BarmanCredentials.AWS != nil { + env = append(env, fmt.Sprintf("AWS_CA_BUNDLE=%s", BarmanBackupEndpointCACertificateLocation)) + } else if configuration.EndpointCA != nil && configuration.BarmanCredentials.Azure != nil { + env = append(env, fmt.Sprintf("REQUESTS_CA_BUNDLE=%s", BarmanBackupEndpointCACertificateLocation)) + } + return env +} + +// isStreamingAvailable checks if this pod can replicate via streaming connection. +func isStreamingAvailable(cluster *cnpgv1.Cluster, podName string) bool { + if cluster == nil { + return false + } + + // Easy case: If this pod is a replica, the streaming is always available + if cluster.Status.CurrentPrimary != podName { + return true + } + + // Designated primary in a replica cluster: return true if the external cluster has streaming connection + if cluster.IsReplica() { + externalCluster, found := cluster.ExternalCluster(cluster.Spec.ReplicaCluster.Source) + + // This is a configuration error + if !found { + return false + } + + return externalCluster.ConnectionParameters != nil + } + + // Primary, we do not replicate from nobody + return false +} + +// gatherWALFilesToRestore files a list of possible WAL files to restore, always +// including as the first one the requested WAL file. +func gatherWALFilesToRestore(walName string, parallel int) (walList []string, err error) { + var segment Segment + + segment, err = SegmentFromName(walName) + if err != nil { + // This seems an invalid segment name. It's not a problem + // because PostgreSQL may request also other files such as + // backup, history, etc. + // Let's just avoid prefetching in this case + return []string{walName}, nil + } + // NextSegments would accept postgresVersion and segmentSize, + // but we do not have this info here, so we pass nil. + segmentList := segment.NextSegments(parallel, nil, nil) + walList = make([]string, len(segmentList)) + for idx := range segmentList { + walList[idx] = segmentList[idx].Name() + } + + return walList, err +} + +// ErrEndOfWALStreamReached is returned when end of WAL is detected in the cloud archive +var ErrEndOfWALStreamReached = errors.New("end of WAL reached") + +// checkEndOfWALStreamFlag returns ErrEndOfWALStreamReached if the flag is set in the restorer. +func checkEndOfWALStreamFlag(walRestorer *barmanRestorer.WALRestorer) error { + contain, err := walRestorer.IsEndOfWALStream() + if err != nil { + return err + } + + if contain { + err := walRestorer.ResetEndOfWalStream() + if err != nil { + return err + } + + return ErrEndOfWALStreamReached + } + return nil +} + +// isEndOfWALStream returns true if one of the downloads has returned +// a file-not-found error. +func isEndOfWALStream(results []barmanRestorer.Result) bool { + for _, result := range results { + if errors.Is(result.Err, barmanRestorer.ErrWALNotFound) { + return true + } + } + + return false } diff --git a/internal/cnpgi/instance/wal_import.go b/internal/cnpgi/instance/wal_import.go new file mode 100644 index 0000000..c10ad4e --- /dev/null +++ b/internal/cnpgi/instance/wal_import.go @@ -0,0 +1,194 @@ +/* +Copyright The CloudNativePG Contributors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package instance + +import ( + "errors" + "fmt" + "path" + "regexp" + "strconv" +) + +// TODO: remove this file migrate in cnpg-machinery + +const ( + // DefaultWALSegmentSize is the default size of a single WAL file + // This must be a power of 2. + DefaultWALSegmentSize = int64(1 << 24) + + // WALHexOctetRe is a regex to match 8 Hex characters. + WALHexOctetRe = `([\dA-Fa-f]{8})` + + // WALTimeLineRe is a regex to match the timeline in a WAL filename. + WALTimeLineRe = WALHexOctetRe + + // WALSegmentNameRe is a regex to match the segment parent log file and segment id. + WALSegmentNameRe = WALHexOctetRe + WALHexOctetRe +) + +var ( + // WALRe is the file segment name parser. + WALRe = regexp.MustCompile(`^` + + // everything has a timeline + WALTimeLineRe + + // (1) optional + `(?:` + + // segment name, if a wal file + WALSegmentNameRe + + // and (2) optional + `(?:` + + // offset, if a backup label + `\.[\dA-Fa-f]{8}\.backup` + + // or + `|` + + // partial, if a partial file + `\.partial` + + // close (2) + `)?` + + // or + `|` + + // only .history, if a history file + `\.history` + + // close (1) + `)$`) + + // WALSegmentRe is the file segment name parser. + WALSegmentRe = regexp.MustCompile(`^` + + // everything has a timeline + WALTimeLineRe + + // segment name, if a wal file + WALSegmentNameRe + + `$`) + + // ErrorBadWALSegmentName is raised when parsing an invalid segment name. + ErrorBadWALSegmentName = errors.New("invalid WAL segment name") +) + +// Segment contains the information inside a WAL segment name. +type Segment struct { + // Timeline number + Tli int32 + + // Log number + Log int32 + + // Segment number + Seg int32 +} + +// IsWALFile check if the passed file name is a regular WAL file. +// It supports either a full file path or a simple file name. +func IsWALFile(name string) bool { + baseName := path.Base(name) + return WALSegmentRe.MatchString(baseName) +} + +// SegmentFromName retrieves the timeline, log ID and segment ID +// from the name of a xlog segment, and can also handle a full path +// or a simple file name. +func SegmentFromName(name string) (Segment, error) { + var tli, log, seg int64 + var err error + + baseName := path.Base(name) + // We could have used WALSegmentRe directly, but we wanted to adhere to barman code + subMatches := WALRe.FindStringSubmatch(baseName) + if len(subMatches) != 4 { + return Segment{}, ErrorBadWALSegmentName + } + + if len(subMatches[0]) != 24 { + return Segment{}, ErrorBadWALSegmentName + } + + if tli, err = strconv.ParseInt(subMatches[1], 16, 32); err != nil { + return Segment{}, ErrorBadWALSegmentName + } + + if log, err = strconv.ParseInt(subMatches[2], 16, 32); err != nil { + return Segment{}, ErrorBadWALSegmentName + } + + if seg, err = strconv.ParseInt(subMatches[3], 16, 32); err != nil { + return Segment{}, ErrorBadWALSegmentName + } + + return Segment{ + Tli: int32(tli), + Log: int32(log), + Seg: int32(seg), + }, nil +} + +// MustSegmentFromName is analogous to SegmentFromName but panics +// if the segment name is invalid. +func MustSegmentFromName(name string) Segment { + result, err := SegmentFromName(name) + if err != nil { + panic(err) + } + + return result +} + +// Name gets the name of the segment. +func (segment Segment) Name() string { + return fmt.Sprintf("%08X%08X%08X", segment.Tli, segment.Log, segment.Seg) +} + +// WalSegmentsPerFile is the number of WAL Segments in a WAL File. +func WalSegmentsPerFile(walSegmentSize int64) int32 { + // Given that segment section is represented by 8 hex characters, + // we compute the number of wal segments in a file, by dividing + // the "max segment number" by the wal segment size. + return int32(0xFFFFFFFF / walSegmentSize) //nolint:gosec +} + +// NextSegments generate the list of all possible segment names starting +// from `segment`, until the specified size is reached. This function will +// not ever generate timeline changes. +// If postgresVersion == nil, the latest postgres version is assumed. +// If segmentSize == nil, wal_segment_size=DefaultWALSegmentSize is assumed. +func (segment Segment) NextSegments(size int, postgresVersion *int, segmentSize *int64) []Segment { + result := make([]Segment, 0, size) + + var walSegPerFile int32 + if segmentSize == nil { + walSegPerFile = WalSegmentsPerFile(DefaultWALSegmentSize) + } else { + walSegPerFile = WalSegmentsPerFile(*segmentSize) + } + + skipLastSegment := postgresVersion != nil && *postgresVersion < 90300 + + currentSegment := segment + for len(result) < size { + result = append(result, Segment{ + Tli: currentSegment.Tli, + Log: currentSegment.Log, + Seg: currentSegment.Seg, + }) + currentSegment.Seg++ + if currentSegment.Seg > walSegPerFile || (skipLastSegment && currentSegment.Seg == walSegPerFile) { + currentSegment.Log++ + currentSegment.Seg = 0 + } + } + + return result +} diff --git a/internal/controller/suite_test.go b/internal/controller/suite_test.go index 1537778..47b937c 100644 --- a/internal/controller/suite_test.go +++ b/internal/controller/suite_test.go @@ -57,7 +57,7 @@ func TestControllers(t *testing.T) { var _ = BeforeSuite(func() { logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true))) - ctx, cancel = context.WithCancel(context.TODO()) //nolint: fatcontext + ctx, cancel = context.WithCancel(context.TODO()) By("bootstrapping test environment") testEnv = &envtest.Environment{