diff --git a/cmd/instance/main.go b/cmd/instance/main.go index 2d81206..50073e2 100644 --- a/cmd/instance/main.go +++ b/cmd/instance/main.go @@ -1,10 +1,10 @@ package main import ( + "errors" "os" - barmancloudv1 "github.com/cloudnative-pg/plugin-barman-cloud/api/v1" - "github.com/cloudnative-pg/plugin-barman-cloud/internal/cnpgi/instance" + cnpgv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" @@ -13,6 +13,9 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" + + barmancloudv1 "github.com/cloudnative-pg/plugin-barman-cloud/api/v1" + "github.com/cloudnative-pg/plugin-barman-cloud/internal/cnpgi/instance" ) var ( @@ -29,8 +32,10 @@ func init() { func main() { setupLog.Info("Starting barman cloud instance plugin") - namespace := os.Getenv("NAMESPACE") - boName := os.Getenv("BARMAN_OBJECT_NAME") + namespace := mustGetEnv("NAMESPACE") + boName := mustGetEnv("BARMAN_OBJECT_NAME") + clusterName := mustGetEnv("CLUSTER_NAME") + instanceName := mustGetEnv("INSTANCE_NAME") mgr, err := controllerruntime.NewManager(controllerruntime.GetConfigOrDie(), controllerruntime.Options{ Scheme: scheme, @@ -42,6 +47,12 @@ func main() { namespace: {}, }, }, + &cnpgv1.Cluster{}: { + Field: fields.OneTermEqualSelector("metadata.name", clusterName), + Namespaces: map[string]cache.Config{ + namespace: {}, + }, + }, }, }, }) @@ -52,19 +63,24 @@ func main() { if err := mgr.Add(&instance.CNPGI{ Client: mgr.GetClient(), + ClusterObjectKey: client.ObjectKey{ + Namespace: namespace, + Name: clusterName, + }, BarmanObjectKey: client.ObjectKey{ Namespace: namespace, Name: boName, }, + InstanceName: instanceName, // TODO: improve - PGDataPath: os.Getenv("PGDATA"), - PGWALPath: os.Getenv("PGWAL"), - SpoolDirectory: os.Getenv("SPOOL_DIRECTORY"), - ServerCertPath: os.Getenv("SERVER_CERT"), - ServerKeyPath: os.Getenv("SERVER_KEY"), - ClientCertPath: os.Getenv("CLIENT_CERT"), - ServerAddress: os.Getenv("SERVER_ADDRESS"), - PluginPath: os.Getenv("PLUGIN_PATH"), + PGDataPath: mustGetEnv("PGDATA"), + PGWALPath: mustGetEnv("PGWAL"), + SpoolDirectory: mustGetEnv("SPOOL_DIRECTORY"), + ServerCertPath: mustGetEnv("SERVER_CERT"), + ServerKeyPath: mustGetEnv("SERVER_KEY"), + ClientCertPath: mustGetEnv("CLIENT_CERT"), + ServerAddress: mustGetEnv("SERVER_ADDRESS"), + PluginPath: mustGetEnv("PLUGIN_PATH"), }); err != nil { setupLog.Error(err, "unable to create CNPGI runnable") os.Exit(1) @@ -75,3 +91,17 @@ func main() { os.Exit(1) } } + +func mustGetEnv(envName string) string { + value := os.Getenv(envName) + if value == "" { + setupLog.Error( + errors.New("missing required env variable"), + "while fetching env variables", + "name", + envName, + ) + os.Exit(1) + } + return value +} diff --git a/go.mod b/go.mod index 12ad57f..24c5133 100644 --- a/go.mod +++ b/go.mod @@ -4,12 +4,12 @@ go 1.22.0 require ( github.com/cloudnative-pg/barman-cloud v0.0.0-20240924124724-92831d48562a + github.com/cloudnative-pg/cloudnative-pg v1.24.0 github.com/cloudnative-pg/cnpg-i v0.0.0-20240902182059-c9f193bf825b github.com/cloudnative-pg/cnpg-i-machinery v0.0.0-20240926095718-27da985944d4 github.com/cloudnative-pg/machinery v0.0.0-20240919131343-9dd62b9257c7 github.com/onsi/ginkgo/v2 v2.20.2 github.com/onsi/gomega v1.34.2 - github.com/spf13/cobra v1.8.1 google.golang.org/grpc v1.66.0 k8s.io/apimachinery v0.31.1 k8s.io/client-go v0.31.0 @@ -46,6 +46,7 @@ require ( github.com/google/gofuzz v1.2.0 // indirect github.com/google/pprof v0.0.0-20240910150728-a0b0bb1d4134 // indirect github.com/google/uuid v1.6.0 // indirect + github.com/gorilla/websocket v1.5.0 // indirect github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 // indirect github.com/hashicorp/hcl v1.0.0 // indirect @@ -54,24 +55,30 @@ require ( github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/compress v1.17.9 // indirect + github.com/kubernetes-csi/external-snapshotter/client/v8 v8.0.0 // indirect github.com/lib/pq v1.10.9 // indirect github.com/magiconair/properties v1.8.7 // indirect github.com/mailru/easyjson v0.7.7 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect + github.com/moby/spdystream v0.4.0 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f // indirect github.com/pelletier/go-toml/v2 v2.2.2 // indirect github.com/pkg/errors v0.9.1 // indirect + github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring v0.75.2 // indirect github.com/prometheus/client_golang v1.20.3 // indirect github.com/prometheus/client_model v0.6.1 // indirect github.com/prometheus/common v0.59.1 // indirect github.com/prometheus/procfs v0.15.1 // indirect + github.com/robfig/cron v1.2.0 // indirect github.com/sagikazarmark/locafero v0.4.0 // indirect github.com/sagikazarmark/slog-shim v0.1.0 // indirect github.com/sourcegraph/conc v0.3.0 // indirect github.com/spf13/afero v1.11.0 // indirect github.com/spf13/cast v1.6.0 // indirect + github.com/spf13/cobra v1.8.1 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/spf13/viper v1.19.0 // indirect github.com/stoewer/go-strcase v1.3.0 // indirect diff --git a/go.sum b/go.sum index 3cbbb50..c47f300 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,7 @@ github.com/antlr4-go/antlr/v4 v4.13.0 h1:lxCg3LAv+EUK6t1i0y1V6/SLeUi0eKEKdhQAlS8TVTI= github.com/antlr4-go/antlr/v4 v4.13.0/go.mod h1:pfChB/xh/Unjila75QW7+VU4TSnWnnk9UTnmpPaOR2g= +github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio= +github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs= github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a h1:idn718Q4B6AGu/h5Sxe66HYVdqdGu2l9Iebqhi/AEoA= github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= @@ -14,6 +16,8 @@ github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UF github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cloudnative-pg/barman-cloud v0.0.0-20240924124724-92831d48562a h1:0v1ML9Eibfq3helbT9GtU0EstqFtG91k/MPO9azY5ME= github.com/cloudnative-pg/barman-cloud v0.0.0-20240924124724-92831d48562a/go.mod h1:Jm0tOp5oB7utpt8wz6RfSv31h1mThOtffjfyxVupriE= +github.com/cloudnative-pg/cloudnative-pg v1.24.0 h1:lY9IP/Gnh5ogNcFGoPnbD2eOiHdSdbdEp0PaUdOAMDQ= +github.com/cloudnative-pg/cloudnative-pg v1.24.0/go.mod h1:n7Qqax6os+x3+7Qu/GojUUeKlL1ELGV63dcO/tzIfB4= github.com/cloudnative-pg/cnpg-i v0.0.0-20240902182059-c9f193bf825b h1:DKMuZrvZJkaZ/rNswcA4q2YST5TRhKPlxFxL5wQXRRs= github.com/cloudnative-pg/cnpg-i v0.0.0-20240902182059-c9f193bf825b/go.mod h1:LhDyBxtoGsWNf0+YXj+uIhFlzHeY8N0QEkO6Kqv8yKY= github.com/cloudnative-pg/cnpg-i-machinery v0.0.0-20240926095718-27da985944d4 h1:ZZkI87sz/Bn6O6jaMB4BDpZihGSAk8HkwXB6+ECKlgA= @@ -74,6 +78,8 @@ github.com/google/pprof v0.0.0-20240910150728-a0b0bb1d4134 h1:c5FlPPgxOn7kJz3VoP github.com/google/pprof v0.0.0-20240910150728-a0b0bb1d4134/go.mod h1:vavhavw2zAxS5dIdcRluK6cSGGPlZynqzFM8NdvU144= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= +github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0 h1:pRhl55Yx1eC7BZ1N+BBWwnKaMyD8uC+34TLdndZMAKk= github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0/go.mod h1:XKMd7iuf/RGPSMJ/U4HP0zS2Z9Fh8Ps9a+6X26m/tmI= github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 h1:bkypFPDjIYGfCYD5mRBvpqxfYX1YCS1PXdKYWi8FsN0= @@ -96,6 +102,8 @@ github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/kubernetes-csi/external-snapshotter/client/v8 v8.0.0 h1:mjQG0Vakr2h246kEDR85U8y8ZhPgT3bguTCajRa/jaw= +github.com/kubernetes-csi/external-snapshotter/client/v8 v8.0.0/go.mod h1:E3vdYxHj2C2q6qo8/Da4g7P+IcwqRZyy3gJBzYybV9Y= github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= @@ -106,6 +114,8 @@ github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0 github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= +github.com/moby/spdystream v0.4.0 h1:Vy79D6mHeJJjiPdFEL2yku1kl0chZpJfZcPpb16BRl8= +github.com/moby/spdystream v0.4.0/go.mod h1:xBAYlnt/ay+11ShkdFKNAG7LsyK/tmNBVvVOwrfMgdI= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= @@ -113,6 +123,8 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f h1:y5//uYreIhSUg3J1GEMiLbxo1LJaP8RfCpH6pymGZus= +github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw= github.com/onsi/ginkgo/v2 v2.20.2 h1:7NVCeyIWROIAheY21RLS+3j2bb52W0W82tkberYytp4= github.com/onsi/ginkgo/v2 v2.20.2/go.mod h1:K9gyxPIlb+aIvnZ8bd9Ak+YP18w3APlR+5coaZoE2ag= github.com/onsi/gomega v1.34.2 h1:pNCwDkzrsv7MS9kpaQvVb1aVLahQXyJ/Tv5oAZMI3i8= @@ -124,6 +136,8 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring v0.75.2 h1:6UsAv+jAevuGO2yZFU/BukV4o9NKnFMOuoouSA4G0ns= +github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring v0.75.2/go.mod h1:XYrdZw5dW12Cjkt4ndbeNZZTBp4UCHtW0ccR9+sTtPU= github.com/prometheus/client_golang v1.20.3 h1:oPksm4K8B+Vt35tUhw6GbSNSgVlVSBH0qELP/7u83l4= github.com/prometheus/client_golang v1.20.3/go.mod h1:PIEt8X02hGcP8JWbeHyeZ53Y/jReSnHgO035n//V5WE= github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E= @@ -132,6 +146,8 @@ github.com/prometheus/common v0.59.1 h1:LXb1quJHWm1P6wq/U824uxYi4Sg0oGvNeUm1z5dJ github.com/prometheus/common v0.59.1/go.mod h1:GpWM7dewqmVYcd7SmRaiWVe9SSqjf0UrwnYnpEZNuT0= github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc= github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= +github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ= +github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k= github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= diff --git a/internal/cnpgi/instance/identity.go b/internal/cnpgi/instance/identity.go index b8c0875..85e2f0a 100644 --- a/internal/cnpgi/instance/identity.go +++ b/internal/cnpgi/instance/identity.go @@ -4,10 +4,10 @@ import ( "context" "fmt" - barmancloudv1 "github.com/cloudnative-pg/plugin-barman-cloud/api/v1" + "github.com/cloudnative-pg/cnpg-i/pkg/identity" "sigs.k8s.io/controller-runtime/pkg/client" - "github.com/cloudnative-pg/cnpg-i/pkg/identity" + barmancloudv1 "github.com/cloudnative-pg/plugin-barman-cloud/api/v1" ) type IdentityImplementation struct { diff --git a/internal/cnpgi/instance/start.go b/internal/cnpgi/instance/start.go index e662f20..1501ee2 100644 --- a/internal/cnpgi/instance/start.go +++ b/internal/cnpgi/instance/start.go @@ -11,28 +11,32 @@ import ( ) type CNPGI struct { - Client client.Client - BarmanObjectKey client.ObjectKey - PGDataPath string - PGWALPath string - SpoolDirectory string - ServerCertPath string - ServerKeyPath string - ClientCertPath string + Client client.Client + BarmanObjectKey client.ObjectKey + ClusterObjectKey client.ObjectKey + PGDataPath string + PGWALPath string + SpoolDirectory string + ServerCertPath string + ServerKeyPath string + ClientCertPath string // mutually exclusive with pluginPath ServerAddress string // mutually exclusive with serverAddress - PluginPath string + PluginPath string + InstanceName string } func (c *CNPGI) Start(ctx context.Context) error { enrich := func(server *grpc.Server) error { wal.RegisterWALServer(server, WALServiceImplementation{ - BarmanObjectKey: c.BarmanObjectKey, - Client: c.Client, - SpoolDirectory: c.SpoolDirectory, - PGDataPath: c.PGDataPath, - PGWALPath: c.PGWALPath, + BarmanObjectKey: c.BarmanObjectKey, + ClusterObjectKey: c.ClusterObjectKey, + InstanceName: c.InstanceName, + Client: c.Client, + SpoolDirectory: c.SpoolDirectory, + PGDataPath: c.PGDataPath, + PGWALPath: c.PGWALPath, }) backup.RegisterBackupServer(server, BackupServiceImplementation{}) return nil diff --git a/internal/cnpgi/instance/wal.go b/internal/cnpgi/instance/wal.go index 6e3f4a3..764d370 100644 --- a/internal/cnpgi/instance/wal.go +++ b/internal/cnpgi/instance/wal.go @@ -4,31 +4,32 @@ import ( "context" "errors" "fmt" + "os" "strings" "time" barmanapi "github.com/cloudnative-pg/barman-cloud/pkg/api" - "github.com/cloudnative-pg/machinery/pkg/log" - - "os" - + "github.com/cloudnative-pg/barman-cloud/pkg/archiver" barmanCommand "github.com/cloudnative-pg/barman-cloud/pkg/command" barmanCredentials "github.com/cloudnative-pg/barman-cloud/pkg/credentials" barmanRestorer "github.com/cloudnative-pg/barman-cloud/pkg/restorer" - barmancloudv1 "github.com/cloudnative-pg/plugin-barman-cloud/api/v1" + cnpgv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1" + "github.com/cloudnative-pg/cnpg-i/pkg/wal" + "github.com/cloudnative-pg/machinery/pkg/log" apierrors "k8s.io/apimachinery/pkg/api/errors" "sigs.k8s.io/controller-runtime/pkg/client" - "github.com/cloudnative-pg/barman-cloud/pkg/archiver" - "github.com/cloudnative-pg/cnpg-i/pkg/wal" + barmancloudv1 "github.com/cloudnative-pg/plugin-barman-cloud/api/v1" ) type WALServiceImplementation struct { - BarmanObjectKey client.ObjectKey - Client client.Client - SpoolDirectory string - PGDataPath string - PGWALPath string + BarmanObjectKey client.ObjectKey + ClusterObjectKey client.ObjectKey + Client client.Client + InstanceName string + SpoolDirectory string + PGDataPath string + PGWALPath string wal.UnimplementedWALServer } @@ -93,6 +94,12 @@ func (w WALServiceImplementation) Restore(ctx context.Context, request *wal.WALR contextLogger := log.FromContext(ctx) startTime := time.Now() + var cluster *cnpgv1.Cluster + + if err := w.Client.Get(ctx, w.ClusterObjectKey, cluster); err != nil { + return nil, err + } + var objectStore barmancloudv1.ObjectStore if err := w.Client.Get(ctx, w.BarmanObjectKey, &objectStore); err != nil { return nil, err @@ -140,14 +147,12 @@ func (w WALServiceImplementation) Restore(ctx context.Context, request *wal.WALR return nil, nil } - // TODO - // Step 2: return error if the end-of-wal-stream flag is set. // We skip this step if streaming connection is not available - //if isStreamingAvailable(cluster, podName) { - // if err := checkEndOfWALStreamFlag(walRestorer); err != nil { - // return nil, err - // } - //} + if isStreamingAvailable(cluster, w.InstanceName) { + if err := checkEndOfWALStreamFlag(walRestorer); err != nil { + return nil, err + } + } // Step 3: gather the WAL files names to restore. If the required file isn't a regular WAL, we download it directly. var walFilesList []string @@ -176,19 +181,17 @@ func (w WALServiceImplementation) Restore(ctx context.Context, request *wal.WALR return nil, walStatus[0].Err } - // TODO - // Step 5: set end-of-wal-stream flag if any download job returned file-not-found - // We skip this step if streaming connection is not available - //endOfWALStream := isEndOfWALStream(walStatus) - //if isStreamingAvailable(cluster, podName) && endOfWALStream { - // contextLogger.Info( - // "Set end-of-wal-stream flag as one of the WAL files to be prefetched was not found") - // - // err = walRestorer.SetEndOfWALStream() - // if err != nil { - // return nil, err - // } - //} + //We skip this step if streaming connection is not available + endOfWALStream := isEndOfWALStream(walStatus) + if isStreamingAvailable(cluster, w.InstanceName) && endOfWALStream { + contextLogger.Info( + "Set end-of-wal-stream flag as one of the WAL files to be prefetched was not found") + + err = walRestorer.SetEndOfWALStream() + if err != nil { + return nil, err + } + } successfulWalRestore := 0 for idx := range walStatus { @@ -220,7 +223,7 @@ func (w WALServiceImplementation) SetFirstRequired(ctx context.Context, request panic("implement me") } -// mergeEnv merges all the values inside incomingEnv into env +// mergeEnv merges all the values inside incomingEnv into env. func mergeEnv(env []string, incomingEnv []string) { for _, incomingItem := range incomingEnv { incomingKV := strings.SplitAfterN(incomingItem, "=", 2) @@ -235,28 +238,28 @@ func mergeEnv(env []string, incomingEnv []string) { } } -// TODO: refactor +// TODO: refactor. const ( - // ScratchDataDirectory is the directory to be used for scratch data + // ScratchDataDirectory is the directory to be used for scratch data. ScratchDataDirectory = "/controller" - // CertificatesDir location to store the certificates + // CertificatesDir location to store the certificates. CertificatesDir = ScratchDataDirectory + "/certificates/" // BarmanBackupEndpointCACertificateLocation is the location where the barman endpoint - // CA certificate is stored + // CA certificate is stored. BarmanBackupEndpointCACertificateLocation = CertificatesDir + BarmanBackupEndpointCACertificateFileName // BarmanBackupEndpointCACertificateFileName is the name of the file in which the barman endpoint - // CA certificate for backups is stored + // CA certificate for backups is stored. BarmanBackupEndpointCACertificateFileName = "backup-" + BarmanEndpointCACertificateFileName // BarmanRestoreEndpointCACertificateFileName is the name of the file in which the barman endpoint - // CA certificate for restores is stored + // CA certificate for restores is stored. BarmanRestoreEndpointCACertificateFileName = "restore-" + BarmanEndpointCACertificateFileName // BarmanEndpointCACertificateFileName is the name of the file in which the barman endpoint - // CA certificate is stored + // CA certificate is stored. BarmanEndpointCACertificateFileName = "barman-ca.crt" ) @@ -271,35 +274,35 @@ func getRestoreCABundleEnv(configuration *barmanapi.BarmanObjectStoreConfigurati return env } -//// isStreamingAvailable checks if this pod can replicate via streaming connection -//func isStreamingAvailable(cluster *apiv1.Cluster, podName string) bool { -// if cluster == nil { -// return false -// } -// -// // Easy case: If this pod is a replica, the streaming is always available -// if cluster.Status.CurrentPrimary != podName { -// return true -// } -// -// // Designated primary in a replica cluster: return true if the external cluster has streaming connection -// if cluster.IsReplica() { -// externalCluster, found := cluster.ExternalCluster(cluster.Spec.ReplicaCluster.Source) -// -// // This is a configuration error -// if !found { -// return false -// } -// -// return externalCluster.ConnectionParameters != nil -// } -// -// // Primary, we do not replicate from nobody -// return false -//} +// isStreamingAvailable checks if this pod can replicate via streaming connection. +func isStreamingAvailable(cluster *cnpgv1.Cluster, podName string) bool { + if cluster == nil { + return false + } + + // Easy case: If this pod is a replica, the streaming is always available + if cluster.Status.CurrentPrimary != podName { + return true + } + + // Designated primary in a replica cluster: return true if the external cluster has streaming connection + if cluster.IsReplica() { + externalCluster, found := cluster.ExternalCluster(cluster.Spec.ReplicaCluster.Source) + + // This is a configuration error + if !found { + return false + } + + return externalCluster.ConnectionParameters != nil + } + + // Primary, we do not replicate from nobody + return false +} // gatherWALFilesToRestore files a list of possible WAL files to restore, always -// including as the first one the requested WAL file +// including as the first one the requested WAL file. func gatherWALFilesToRestore(walName string, parallel int) (walList []string, err error) { var segment Segment @@ -321,3 +324,36 @@ func gatherWALFilesToRestore(walName string, parallel int) (walList []string, er return walList, err } + +// ErrEndOfWALStreamReached is returned when end of WAL is detected in the cloud archive +var ErrEndOfWALStreamReached = errors.New("end of WAL reached") + +// checkEndOfWALStreamFlag returns ErrEndOfWALStreamReached if the flag is set in the restorer. +func checkEndOfWALStreamFlag(walRestorer *barmanRestorer.WALRestorer) error { + contain, err := walRestorer.IsEndOfWALStream() + if err != nil { + return err + } + + if contain { + err := walRestorer.ResetEndOfWalStream() + if err != nil { + return err + } + + return ErrEndOfWALStreamReached + } + return nil +} + +// isEndOfWALStream returns true if one of the downloads has returned +// a file-not-found error. +func isEndOfWALStream(results []barmanRestorer.Result) bool { + for _, result := range results { + if errors.Is(result.Err, barmanRestorer.ErrWALNotFound) { + return true + } + } + + return false +} diff --git a/internal/cnpgi/instance/wal_import.go b/internal/cnpgi/instance/wal_import.go index 0fb8cf2..c10ad4e 100644 --- a/internal/cnpgi/instance/wal_import.go +++ b/internal/cnpgi/instance/wal_import.go @@ -28,21 +28,21 @@ import ( const ( // DefaultWALSegmentSize is the default size of a single WAL file - // This must be a power of 2 + // This must be a power of 2. DefaultWALSegmentSize = int64(1 << 24) - // WALHexOctetRe is a regex to match 8 Hex characters + // WALHexOctetRe is a regex to match 8 Hex characters. WALHexOctetRe = `([\dA-Fa-f]{8})` - // WALTimeLineRe is a regex to match the timeline in a WAL filename + // WALTimeLineRe is a regex to match the timeline in a WAL filename. WALTimeLineRe = WALHexOctetRe - // WALSegmentNameRe is a regex to match the segment parent log file and segment id + // WALSegmentNameRe is a regex to match the segment parent log file and segment id. WALSegmentNameRe = WALHexOctetRe + WALHexOctetRe ) var ( - // WALRe is the file segment name parser + // WALRe is the file segment name parser. WALRe = regexp.MustCompile(`^` + // everything has a timeline WALTimeLineRe + @@ -67,7 +67,7 @@ var ( // close (1) `)$`) - // WALSegmentRe is the file segment name parser + // WALSegmentRe is the file segment name parser. WALSegmentRe = regexp.MustCompile(`^` + // everything has a timeline WALTimeLineRe + @@ -75,11 +75,11 @@ var ( WALSegmentNameRe + `$`) - // ErrorBadWALSegmentName is raised when parsing an invalid segment name + // ErrorBadWALSegmentName is raised when parsing an invalid segment name. ErrorBadWALSegmentName = errors.New("invalid WAL segment name") ) -// Segment contains the information inside a WAL segment name +// Segment contains the information inside a WAL segment name. type Segment struct { // Timeline number Tli int32 @@ -92,7 +92,7 @@ type Segment struct { } // IsWALFile check if the passed file name is a regular WAL file. -// It supports either a full file path or a simple file name +// It supports either a full file path or a simple file name. func IsWALFile(name string) bool { baseName := path.Base(name) return WALSegmentRe.MatchString(baseName) @@ -100,7 +100,7 @@ func IsWALFile(name string) bool { // SegmentFromName retrieves the timeline, log ID and segment ID // from the name of a xlog segment, and can also handle a full path -// or a simple file name +// or a simple file name. func SegmentFromName(name string) (Segment, error) { var tli, log, seg int64 var err error @@ -136,7 +136,7 @@ func SegmentFromName(name string) (Segment, error) { } // MustSegmentFromName is analogous to SegmentFromName but panics -// if the segment name is invalid +// if the segment name is invalid. func MustSegmentFromName(name string) Segment { result, err := SegmentFromName(name) if err != nil { @@ -146,12 +146,12 @@ func MustSegmentFromName(name string) Segment { return result } -// Name gets the name of the segment +// Name gets the name of the segment. func (segment Segment) Name() string { return fmt.Sprintf("%08X%08X%08X", segment.Tli, segment.Log, segment.Seg) } -// WalSegmentsPerFile is the number of WAL Segments in a WAL File +// WalSegmentsPerFile is the number of WAL Segments in a WAL File. func WalSegmentsPerFile(walSegmentSize int64) int32 { // Given that segment section is represented by 8 hex characters, // we compute the number of wal segments in a file, by dividing diff --git a/internal/controller/suite_test.go b/internal/controller/suite_test.go index 1537778..47b937c 100644 --- a/internal/controller/suite_test.go +++ b/internal/controller/suite_test.go @@ -57,7 +57,7 @@ func TestControllers(t *testing.T) { var _ = BeforeSuite(func() { logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true))) - ctx, cancel = context.WithCancel(context.TODO()) //nolint: fatcontext + ctx, cancel = context.WithCancel(context.TODO()) By("bootstrapping test environment") testEnv = &envtest.Environment{