fix: remove todos, ensure env are present

Signed-off-by: Armando Ruocco <armando.ruocco@enterprisedb.com>
This commit is contained in:
Armando Ruocco 2024-09-30 10:38:30 +02:00
parent 93ea6af1b1
commit a4f312a2bf
8 changed files with 203 additions and 110 deletions

View File

@ -1,10 +1,10 @@
package main
import (
"errors"
"os"
barmancloudv1 "github.com/cloudnative-pg/plugin-barman-cloud/api/v1"
"github.com/cloudnative-pg/plugin-barman-cloud/internal/cnpgi/instance"
cnpgv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
@ -13,6 +13,9 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
barmancloudv1 "github.com/cloudnative-pg/plugin-barman-cloud/api/v1"
"github.com/cloudnative-pg/plugin-barman-cloud/internal/cnpgi/instance"
)
var (
@ -29,8 +32,10 @@ func init() {
func main() {
setupLog.Info("Starting barman cloud instance plugin")
namespace := os.Getenv("NAMESPACE")
boName := os.Getenv("BARMAN_OBJECT_NAME")
namespace := mustGetEnv("NAMESPACE")
boName := mustGetEnv("BARMAN_OBJECT_NAME")
clusterName := mustGetEnv("CLUSTER_NAME")
instanceName := mustGetEnv("INSTANCE_NAME")
mgr, err := controllerruntime.NewManager(controllerruntime.GetConfigOrDie(), controllerruntime.Options{
Scheme: scheme,
@ -42,6 +47,12 @@ func main() {
namespace: {},
},
},
&cnpgv1.Cluster{}: {
Field: fields.OneTermEqualSelector("metadata.name", clusterName),
Namespaces: map[string]cache.Config{
namespace: {},
},
},
},
},
})
@ -52,19 +63,24 @@ func main() {
if err := mgr.Add(&instance.CNPGI{
Client: mgr.GetClient(),
ClusterObjectKey: client.ObjectKey{
Namespace: namespace,
Name: clusterName,
},
BarmanObjectKey: client.ObjectKey{
Namespace: namespace,
Name: boName,
},
InstanceName: instanceName,
// TODO: improve
PGDataPath: os.Getenv("PGDATA"),
PGWALPath: os.Getenv("PGWAL"),
SpoolDirectory: os.Getenv("SPOOL_DIRECTORY"),
ServerCertPath: os.Getenv("SERVER_CERT"),
ServerKeyPath: os.Getenv("SERVER_KEY"),
ClientCertPath: os.Getenv("CLIENT_CERT"),
ServerAddress: os.Getenv("SERVER_ADDRESS"),
PluginPath: os.Getenv("PLUGIN_PATH"),
PGDataPath: mustGetEnv("PGDATA"),
PGWALPath: mustGetEnv("PGWAL"),
SpoolDirectory: mustGetEnv("SPOOL_DIRECTORY"),
ServerCertPath: mustGetEnv("SERVER_CERT"),
ServerKeyPath: mustGetEnv("SERVER_KEY"),
ClientCertPath: mustGetEnv("CLIENT_CERT"),
ServerAddress: mustGetEnv("SERVER_ADDRESS"),
PluginPath: mustGetEnv("PLUGIN_PATH"),
}); err != nil {
setupLog.Error(err, "unable to create CNPGI runnable")
os.Exit(1)
@ -75,3 +91,17 @@ func main() {
os.Exit(1)
}
}
func mustGetEnv(envName string) string {
value := os.Getenv(envName)
if value == "" {
setupLog.Error(
errors.New("missing required env variable"),
"while fetching env variables",
"name",
envName,
)
os.Exit(1)
}
return value
}

9
go.mod
View File

@ -4,12 +4,12 @@ go 1.22.0
require (
github.com/cloudnative-pg/barman-cloud v0.0.0-20240924124724-92831d48562a
github.com/cloudnative-pg/cloudnative-pg v1.24.0
github.com/cloudnative-pg/cnpg-i v0.0.0-20240902182059-c9f193bf825b
github.com/cloudnative-pg/cnpg-i-machinery v0.0.0-20240926095718-27da985944d4
github.com/cloudnative-pg/machinery v0.0.0-20240919131343-9dd62b9257c7
github.com/onsi/ginkgo/v2 v2.20.2
github.com/onsi/gomega v1.34.2
github.com/spf13/cobra v1.8.1
google.golang.org/grpc v1.66.0
k8s.io/apimachinery v0.31.1
k8s.io/client-go v0.31.0
@ -46,6 +46,7 @@ require (
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/pprof v0.0.0-20240910150728-a0b0bb1d4134 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
@ -54,24 +55,30 @@ require (
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/kubernetes-csi/external-snapshotter/client/v8 v8.0.0 // indirect
github.com/lib/pq v1.10.9 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/moby/spdystream v0.4.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f // indirect
github.com/pelletier/go-toml/v2 v2.2.2 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring v0.75.2 // indirect
github.com/prometheus/client_golang v1.20.3 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.59.1 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
github.com/robfig/cron v1.2.0 // indirect
github.com/sagikazarmark/locafero v0.4.0 // indirect
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
github.com/sourcegraph/conc v0.3.0 // indirect
github.com/spf13/afero v1.11.0 // indirect
github.com/spf13/cast v1.6.0 // indirect
github.com/spf13/cobra v1.8.1 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/spf13/viper v1.19.0 // indirect
github.com/stoewer/go-strcase v1.3.0 // indirect

16
go.sum
View File

@ -1,5 +1,7 @@
github.com/antlr4-go/antlr/v4 v4.13.0 h1:lxCg3LAv+EUK6t1i0y1V6/SLeUi0eKEKdhQAlS8TVTI=
github.com/antlr4-go/antlr/v4 v4.13.0/go.mod h1:pfChB/xh/Unjila75QW7+VU4TSnWnnk9UTnmpPaOR2g=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a h1:idn718Q4B6AGu/h5Sxe66HYVdqdGu2l9Iebqhi/AEoA=
github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
@ -14,6 +16,8 @@ github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UF
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cloudnative-pg/barman-cloud v0.0.0-20240924124724-92831d48562a h1:0v1ML9Eibfq3helbT9GtU0EstqFtG91k/MPO9azY5ME=
github.com/cloudnative-pg/barman-cloud v0.0.0-20240924124724-92831d48562a/go.mod h1:Jm0tOp5oB7utpt8wz6RfSv31h1mThOtffjfyxVupriE=
github.com/cloudnative-pg/cloudnative-pg v1.24.0 h1:lY9IP/Gnh5ogNcFGoPnbD2eOiHdSdbdEp0PaUdOAMDQ=
github.com/cloudnative-pg/cloudnative-pg v1.24.0/go.mod h1:n7Qqax6os+x3+7Qu/GojUUeKlL1ELGV63dcO/tzIfB4=
github.com/cloudnative-pg/cnpg-i v0.0.0-20240902182059-c9f193bf825b h1:DKMuZrvZJkaZ/rNswcA4q2YST5TRhKPlxFxL5wQXRRs=
github.com/cloudnative-pg/cnpg-i v0.0.0-20240902182059-c9f193bf825b/go.mod h1:LhDyBxtoGsWNf0+YXj+uIhFlzHeY8N0QEkO6Kqv8yKY=
github.com/cloudnative-pg/cnpg-i-machinery v0.0.0-20240926095718-27da985944d4 h1:ZZkI87sz/Bn6O6jaMB4BDpZihGSAk8HkwXB6+ECKlgA=
@ -74,6 +78,8 @@ github.com/google/pprof v0.0.0-20240910150728-a0b0bb1d4134 h1:c5FlPPgxOn7kJz3VoP
github.com/google/pprof v0.0.0-20240910150728-a0b0bb1d4134/go.mod h1:vavhavw2zAxS5dIdcRluK6cSGGPlZynqzFM8NdvU144=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0 h1:pRhl55Yx1eC7BZ1N+BBWwnKaMyD8uC+34TLdndZMAKk=
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0/go.mod h1:XKMd7iuf/RGPSMJ/U4HP0zS2Z9Fh8Ps9a+6X26m/tmI=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 h1:bkypFPDjIYGfCYD5mRBvpqxfYX1YCS1PXdKYWi8FsN0=
@ -96,6 +102,8 @@ github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kubernetes-csi/external-snapshotter/client/v8 v8.0.0 h1:mjQG0Vakr2h246kEDR85U8y8ZhPgT3bguTCajRa/jaw=
github.com/kubernetes-csi/external-snapshotter/client/v8 v8.0.0/go.mod h1:E3vdYxHj2C2q6qo8/Da4g7P+IcwqRZyy3gJBzYybV9Y=
github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
@ -106,6 +114,8 @@ github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0
github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/moby/spdystream v0.4.0 h1:Vy79D6mHeJJjiPdFEL2yku1kl0chZpJfZcPpb16BRl8=
github.com/moby/spdystream v0.4.0/go.mod h1:xBAYlnt/ay+11ShkdFKNAG7LsyK/tmNBVvVOwrfMgdI=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
@ -113,6 +123,8 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f h1:y5//uYreIhSUg3J1GEMiLbxo1LJaP8RfCpH6pymGZus=
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw=
github.com/onsi/ginkgo/v2 v2.20.2 h1:7NVCeyIWROIAheY21RLS+3j2bb52W0W82tkberYytp4=
github.com/onsi/ginkgo/v2 v2.20.2/go.mod h1:K9gyxPIlb+aIvnZ8bd9Ak+YP18w3APlR+5coaZoE2ag=
github.com/onsi/gomega v1.34.2 h1:pNCwDkzrsv7MS9kpaQvVb1aVLahQXyJ/Tv5oAZMI3i8=
@ -124,6 +136,8 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring v0.75.2 h1:6UsAv+jAevuGO2yZFU/BukV4o9NKnFMOuoouSA4G0ns=
github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring v0.75.2/go.mod h1:XYrdZw5dW12Cjkt4ndbeNZZTBp4UCHtW0ccR9+sTtPU=
github.com/prometheus/client_golang v1.20.3 h1:oPksm4K8B+Vt35tUhw6GbSNSgVlVSBH0qELP/7u83l4=
github.com/prometheus/client_golang v1.20.3/go.mod h1:PIEt8X02hGcP8JWbeHyeZ53Y/jReSnHgO035n//V5WE=
github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E=
@ -132,6 +146,8 @@ github.com/prometheus/common v0.59.1 h1:LXb1quJHWm1P6wq/U824uxYi4Sg0oGvNeUm1z5dJ
github.com/prometheus/common v0.59.1/go.mod h1:GpWM7dewqmVYcd7SmRaiWVe9SSqjf0UrwnYnpEZNuT0=
github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc=
github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk=
github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ=
github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k=
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=

View File

@ -4,10 +4,10 @@ import (
"context"
"fmt"
barmancloudv1 "github.com/cloudnative-pg/plugin-barman-cloud/api/v1"
"github.com/cloudnative-pg/cnpg-i/pkg/identity"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/cloudnative-pg/cnpg-i/pkg/identity"
barmancloudv1 "github.com/cloudnative-pg/plugin-barman-cloud/api/v1"
)
type IdentityImplementation struct {

View File

@ -13,6 +13,7 @@ import (
type CNPGI struct {
Client client.Client
BarmanObjectKey client.ObjectKey
ClusterObjectKey client.ObjectKey
PGDataPath string
PGWALPath string
SpoolDirectory string
@ -23,12 +24,15 @@ type CNPGI struct {
ServerAddress string
// mutually exclusive with serverAddress
PluginPath string
InstanceName string
}
func (c *CNPGI) Start(ctx context.Context) error {
enrich := func(server *grpc.Server) error {
wal.RegisterWALServer(server, WALServiceImplementation{
BarmanObjectKey: c.BarmanObjectKey,
ClusterObjectKey: c.ClusterObjectKey,
InstanceName: c.InstanceName,
Client: c.Client,
SpoolDirectory: c.SpoolDirectory,
PGDataPath: c.PGDataPath,

View File

@ -4,28 +4,29 @@ import (
"context"
"errors"
"fmt"
"os"
"strings"
"time"
barmanapi "github.com/cloudnative-pg/barman-cloud/pkg/api"
"github.com/cloudnative-pg/machinery/pkg/log"
"os"
"github.com/cloudnative-pg/barman-cloud/pkg/archiver"
barmanCommand "github.com/cloudnative-pg/barman-cloud/pkg/command"
barmanCredentials "github.com/cloudnative-pg/barman-cloud/pkg/credentials"
barmanRestorer "github.com/cloudnative-pg/barman-cloud/pkg/restorer"
barmancloudv1 "github.com/cloudnative-pg/plugin-barman-cloud/api/v1"
cnpgv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1"
"github.com/cloudnative-pg/cnpg-i/pkg/wal"
"github.com/cloudnative-pg/machinery/pkg/log"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/cloudnative-pg/barman-cloud/pkg/archiver"
"github.com/cloudnative-pg/cnpg-i/pkg/wal"
barmancloudv1 "github.com/cloudnative-pg/plugin-barman-cloud/api/v1"
)
type WALServiceImplementation struct {
BarmanObjectKey client.ObjectKey
ClusterObjectKey client.ObjectKey
Client client.Client
InstanceName string
SpoolDirectory string
PGDataPath string
PGWALPath string
@ -93,6 +94,12 @@ func (w WALServiceImplementation) Restore(ctx context.Context, request *wal.WALR
contextLogger := log.FromContext(ctx)
startTime := time.Now()
var cluster *cnpgv1.Cluster
if err := w.Client.Get(ctx, w.ClusterObjectKey, cluster); err != nil {
return nil, err
}
var objectStore barmancloudv1.ObjectStore
if err := w.Client.Get(ctx, w.BarmanObjectKey, &objectStore); err != nil {
return nil, err
@ -140,14 +147,12 @@ func (w WALServiceImplementation) Restore(ctx context.Context, request *wal.WALR
return nil, nil
}
// TODO
// Step 2: return error if the end-of-wal-stream flag is set.
// We skip this step if streaming connection is not available
//if isStreamingAvailable(cluster, podName) {
// if err := checkEndOfWALStreamFlag(walRestorer); err != nil {
// return nil, err
// }
//}
if isStreamingAvailable(cluster, w.InstanceName) {
if err := checkEndOfWALStreamFlag(walRestorer); err != nil {
return nil, err
}
}
// Step 3: gather the WAL files names to restore. If the required file isn't a regular WAL, we download it directly.
var walFilesList []string
@ -176,19 +181,17 @@ func (w WALServiceImplementation) Restore(ctx context.Context, request *wal.WALR
return nil, walStatus[0].Err
}
// TODO
// Step 5: set end-of-wal-stream flag if any download job returned file-not-found
// We skip this step if streaming connection is not available
//endOfWALStream := isEndOfWALStream(walStatus)
//if isStreamingAvailable(cluster, podName) && endOfWALStream {
// contextLogger.Info(
// "Set end-of-wal-stream flag as one of the WAL files to be prefetched was not found")
//
// err = walRestorer.SetEndOfWALStream()
// if err != nil {
// return nil, err
// }
//}
//We skip this step if streaming connection is not available
endOfWALStream := isEndOfWALStream(walStatus)
if isStreamingAvailable(cluster, w.InstanceName) && endOfWALStream {
contextLogger.Info(
"Set end-of-wal-stream flag as one of the WAL files to be prefetched was not found")
err = walRestorer.SetEndOfWALStream()
if err != nil {
return nil, err
}
}
successfulWalRestore := 0
for idx := range walStatus {
@ -220,7 +223,7 @@ func (w WALServiceImplementation) SetFirstRequired(ctx context.Context, request
panic("implement me")
}
// mergeEnv merges all the values inside incomingEnv into env
// mergeEnv merges all the values inside incomingEnv into env.
func mergeEnv(env []string, incomingEnv []string) {
for _, incomingItem := range incomingEnv {
incomingKV := strings.SplitAfterN(incomingItem, "=", 2)
@ -235,28 +238,28 @@ func mergeEnv(env []string, incomingEnv []string) {
}
}
// TODO: refactor
// TODO: refactor.
const (
// ScratchDataDirectory is the directory to be used for scratch data
// ScratchDataDirectory is the directory to be used for scratch data.
ScratchDataDirectory = "/controller"
// CertificatesDir location to store the certificates
// CertificatesDir location to store the certificates.
CertificatesDir = ScratchDataDirectory + "/certificates/"
// BarmanBackupEndpointCACertificateLocation is the location where the barman endpoint
// CA certificate is stored
// CA certificate is stored.
BarmanBackupEndpointCACertificateLocation = CertificatesDir + BarmanBackupEndpointCACertificateFileName
// BarmanBackupEndpointCACertificateFileName is the name of the file in which the barman endpoint
// CA certificate for backups is stored
// CA certificate for backups is stored.
BarmanBackupEndpointCACertificateFileName = "backup-" + BarmanEndpointCACertificateFileName
// BarmanRestoreEndpointCACertificateFileName is the name of the file in which the barman endpoint
// CA certificate for restores is stored
// CA certificate for restores is stored.
BarmanRestoreEndpointCACertificateFileName = "restore-" + BarmanEndpointCACertificateFileName
// BarmanEndpointCACertificateFileName is the name of the file in which the barman endpoint
// CA certificate is stored
// CA certificate is stored.
BarmanEndpointCACertificateFileName = "barman-ca.crt"
)
@ -271,35 +274,35 @@ func getRestoreCABundleEnv(configuration *barmanapi.BarmanObjectStoreConfigurati
return env
}
//// isStreamingAvailable checks if this pod can replicate via streaming connection
//func isStreamingAvailable(cluster *apiv1.Cluster, podName string) bool {
// if cluster == nil {
// return false
// }
//
// // Easy case: If this pod is a replica, the streaming is always available
// if cluster.Status.CurrentPrimary != podName {
// return true
// }
//
// // Designated primary in a replica cluster: return true if the external cluster has streaming connection
// if cluster.IsReplica() {
// externalCluster, found := cluster.ExternalCluster(cluster.Spec.ReplicaCluster.Source)
//
// // This is a configuration error
// if !found {
// return false
// }
//
// return externalCluster.ConnectionParameters != nil
// }
//
// // Primary, we do not replicate from nobody
// return false
//}
// isStreamingAvailable checks if this pod can replicate via streaming connection.
func isStreamingAvailable(cluster *cnpgv1.Cluster, podName string) bool {
if cluster == nil {
return false
}
// Easy case: If this pod is a replica, the streaming is always available
if cluster.Status.CurrentPrimary != podName {
return true
}
// Designated primary in a replica cluster: return true if the external cluster has streaming connection
if cluster.IsReplica() {
externalCluster, found := cluster.ExternalCluster(cluster.Spec.ReplicaCluster.Source)
// This is a configuration error
if !found {
return false
}
return externalCluster.ConnectionParameters != nil
}
// Primary, we do not replicate from nobody
return false
}
// gatherWALFilesToRestore files a list of possible WAL files to restore, always
// including as the first one the requested WAL file
// including as the first one the requested WAL file.
func gatherWALFilesToRestore(walName string, parallel int) (walList []string, err error) {
var segment Segment
@ -321,3 +324,36 @@ func gatherWALFilesToRestore(walName string, parallel int) (walList []string, er
return walList, err
}
// ErrEndOfWALStreamReached is returned when end of WAL is detected in the cloud archive
var ErrEndOfWALStreamReached = errors.New("end of WAL reached")
// checkEndOfWALStreamFlag returns ErrEndOfWALStreamReached if the flag is set in the restorer.
func checkEndOfWALStreamFlag(walRestorer *barmanRestorer.WALRestorer) error {
contain, err := walRestorer.IsEndOfWALStream()
if err != nil {
return err
}
if contain {
err := walRestorer.ResetEndOfWalStream()
if err != nil {
return err
}
return ErrEndOfWALStreamReached
}
return nil
}
// isEndOfWALStream returns true if one of the downloads has returned
// a file-not-found error.
func isEndOfWALStream(results []barmanRestorer.Result) bool {
for _, result := range results {
if errors.Is(result.Err, barmanRestorer.ErrWALNotFound) {
return true
}
}
return false
}

View File

@ -28,21 +28,21 @@ import (
const (
// DefaultWALSegmentSize is the default size of a single WAL file
// This must be a power of 2
// This must be a power of 2.
DefaultWALSegmentSize = int64(1 << 24)
// WALHexOctetRe is a regex to match 8 Hex characters
// WALHexOctetRe is a regex to match 8 Hex characters.
WALHexOctetRe = `([\dA-Fa-f]{8})`
// WALTimeLineRe is a regex to match the timeline in a WAL filename
// WALTimeLineRe is a regex to match the timeline in a WAL filename.
WALTimeLineRe = WALHexOctetRe
// WALSegmentNameRe is a regex to match the segment parent log file and segment id
// WALSegmentNameRe is a regex to match the segment parent log file and segment id.
WALSegmentNameRe = WALHexOctetRe + WALHexOctetRe
)
var (
// WALRe is the file segment name parser
// WALRe is the file segment name parser.
WALRe = regexp.MustCompile(`^` +
// everything has a timeline
WALTimeLineRe +
@ -67,7 +67,7 @@ var (
// close (1)
`)$`)
// WALSegmentRe is the file segment name parser
// WALSegmentRe is the file segment name parser.
WALSegmentRe = regexp.MustCompile(`^` +
// everything has a timeline
WALTimeLineRe +
@ -75,11 +75,11 @@ var (
WALSegmentNameRe +
`$`)
// ErrorBadWALSegmentName is raised when parsing an invalid segment name
// ErrorBadWALSegmentName is raised when parsing an invalid segment name.
ErrorBadWALSegmentName = errors.New("invalid WAL segment name")
)
// Segment contains the information inside a WAL segment name
// Segment contains the information inside a WAL segment name.
type Segment struct {
// Timeline number
Tli int32
@ -92,7 +92,7 @@ type Segment struct {
}
// IsWALFile check if the passed file name is a regular WAL file.
// It supports either a full file path or a simple file name
// It supports either a full file path or a simple file name.
func IsWALFile(name string) bool {
baseName := path.Base(name)
return WALSegmentRe.MatchString(baseName)
@ -100,7 +100,7 @@ func IsWALFile(name string) bool {
// SegmentFromName retrieves the timeline, log ID and segment ID
// from the name of a xlog segment, and can also handle a full path
// or a simple file name
// or a simple file name.
func SegmentFromName(name string) (Segment, error) {
var tli, log, seg int64
var err error
@ -136,7 +136,7 @@ func SegmentFromName(name string) (Segment, error) {
}
// MustSegmentFromName is analogous to SegmentFromName but panics
// if the segment name is invalid
// if the segment name is invalid.
func MustSegmentFromName(name string) Segment {
result, err := SegmentFromName(name)
if err != nil {
@ -146,12 +146,12 @@ func MustSegmentFromName(name string) Segment {
return result
}
// Name gets the name of the segment
// Name gets the name of the segment.
func (segment Segment) Name() string {
return fmt.Sprintf("%08X%08X%08X", segment.Tli, segment.Log, segment.Seg)
}
// WalSegmentsPerFile is the number of WAL Segments in a WAL File
// WalSegmentsPerFile is the number of WAL Segments in a WAL File.
func WalSegmentsPerFile(walSegmentSize int64) int32 {
// Given that segment section is represented by 8 hex characters,
// we compute the number of wal segments in a file, by dividing

View File

@ -57,7 +57,7 @@ func TestControllers(t *testing.T) {
var _ = BeforeSuite(func() {
logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true)))
ctx, cancel = context.WithCancel(context.TODO()) //nolint: fatcontext
ctx, cancel = context.WithCancel(context.TODO())
By("bootstrapping test environment")
testEnv = &envtest.Environment{