test(e2e): replica cluster (#88)

Signed-off-by: Francesco Canovai <francesco.canovai@enterprisedb.com>
Signed-off-by: Leonardo Cecchi <leonardo.cecchi@enterprisedb.com>
Co-authored-by: Leonardo Cecchi <leonardo.cecchi@enterprisedb.com>
This commit is contained in:
Francesco Canovai 2024-12-09 16:05:36 +01:00 committed by GitHub
parent c4623066ce
commit e5a004d20d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 605 additions and 25 deletions

View File

@ -204,7 +204,7 @@ tasks:
deps: deps:
- build-images - build-images
cmds: cmds:
- go test -timeout 30m -v ./test/e2e - go test -timeout 60m -v ./test/e2e
ci: ci:
desc: Run the CI pipeline desc: Run the CI pipeline

View File

@ -41,6 +41,7 @@ import (
"github.com/cloudnative-pg/plugin-barman-cloud/test/e2e/internal/kustomize" "github.com/cloudnative-pg/plugin-barman-cloud/test/e2e/internal/kustomize"
_ "github.com/cloudnative-pg/plugin-barman-cloud/test/e2e/internal/tests/backup" _ "github.com/cloudnative-pg/plugin-barman-cloud/test/e2e/internal/tests/backup"
_ "github.com/cloudnative-pg/plugin-barman-cloud/test/e2e/internal/tests/replicacluster"
. "github.com/onsi/ginkgo/v2" . "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"

View File

@ -31,8 +31,9 @@ import (
pluginBarmanCloudV1 "github.com/cloudnative-pg/plugin-barman-cloud/api/v1" pluginBarmanCloudV1 "github.com/cloudnative-pg/plugin-barman-cloud/api/v1"
) )
func NewAzuriteObjectStoreResources(namespace, name string) Resources { // NewAzuriteObjectStoreResources creates the resources required to create an Azurite object store.
return Resources{ func NewAzuriteObjectStoreResources(namespace, name string) *Resources {
return &Resources{
Deployment: newAzuriteDeployment(namespace, name), Deployment: newAzuriteDeployment(namespace, name),
Service: newAzuriteService(namespace, name), Service: newAzuriteService(namespace, name),
PVC: newAzuritePVC(namespace, name), PVC: newAzuritePVC(namespace, name),
@ -174,6 +175,7 @@ func newAzuritePVC(namespace, name string) *corev1.PersistentVolumeClaim {
} }
} }
// NewAzuriteObjectStore creates a new ObjectStore object for Azurite.
func NewAzuriteObjectStore(namespace, name, azuriteOSName string) *pluginBarmanCloudV1.ObjectStore { func NewAzuriteObjectStore(namespace, name, azuriteOSName string) *pluginBarmanCloudV1.ObjectStore {
return &pluginBarmanCloudV1.ObjectStore{ return &pluginBarmanCloudV1.ObjectStore{
TypeMeta: metav1.TypeMeta{ TypeMeta: metav1.TypeMeta{

View File

@ -31,8 +31,9 @@ import (
pluginBarmanCloudV1 "github.com/cloudnative-pg/plugin-barman-cloud/api/v1" pluginBarmanCloudV1 "github.com/cloudnative-pg/plugin-barman-cloud/api/v1"
) )
func NewGCSObjectStoreResources(namespace, name string) Resources { // NewGCSObjectStoreResources creates the resources required to create a GCS object store.
return Resources{ func NewGCSObjectStoreResources(namespace, name string) *Resources {
return &Resources{
Deployment: newGCSDeployment(namespace, name), Deployment: newGCSDeployment(namespace, name),
Service: newGCSService(namespace, name), Service: newGCSService(namespace, name),
Secret: newGCSSecret(namespace, name), Secret: newGCSSecret(namespace, name),
@ -76,7 +77,8 @@ func newGCSDeployment(namespace, name string) *appsv1.Deployment {
}, },
}, },
Command: []string{"fake-gcs-server"}, Command: []string{"fake-gcs-server"},
Args: []string{"-scheme", Args: []string{
"-scheme",
"http", "http",
"-port", "-port",
"4443", "4443",
@ -167,6 +169,7 @@ func newGCSPVC(namespace, name string) *corev1.PersistentVolumeClaim {
} }
} }
// NewGCSObjectStore creates a new GCS object store.
func NewGCSObjectStore(namespace, name, gcsOSName string) *pluginBarmanCloudV1.ObjectStore { func NewGCSObjectStore(namespace, name, gcsOSName string) *pluginBarmanCloudV1.ObjectStore {
return &pluginBarmanCloudV1.ObjectStore{ return &pluginBarmanCloudV1.ObjectStore{
TypeMeta: metav1.TypeMeta{ TypeMeta: metav1.TypeMeta{

View File

@ -31,8 +31,9 @@ import (
pluginBarmanCloudV1 "github.com/cloudnative-pg/plugin-barman-cloud/api/v1" pluginBarmanCloudV1 "github.com/cloudnative-pg/plugin-barman-cloud/api/v1"
) )
func NewMinioObjectStoreResources(namespace, name string) Resources { // NewMinioObjectStoreResources creates the resources required to create a Minio object store.
return Resources{ func NewMinioObjectStoreResources(namespace, name string) *Resources {
return &Resources{
Deployment: newMinioDeployment(namespace, name), Deployment: newMinioDeployment(namespace, name),
Service: newMinioService(namespace, name), Service: newMinioService(namespace, name),
PVC: newMinioPVC(namespace, name), PVC: newMinioPVC(namespace, name),
@ -189,6 +190,7 @@ func newMinioPVC(namespace, name string) *corev1.PersistentVolumeClaim {
} }
} }
// NewMinioObjectStore creates a new Minio object store.
func NewMinioObjectStore(namespace, name, minioOSName string) *pluginBarmanCloudV1.ObjectStore { func NewMinioObjectStore(namespace, name, minioOSName string) *pluginBarmanCloudV1.ObjectStore {
return &pluginBarmanCloudV1.ObjectStore{ return &pluginBarmanCloudV1.ObjectStore{
TypeMeta: metav1.TypeMeta{ TypeMeta: metav1.TypeMeta{

View File

@ -20,37 +20,45 @@ import (
"context" "context"
"fmt" "fmt"
"k8s.io/api/apps/v1" appsv1 "k8s.io/api/apps/v1"
v2 "k8s.io/api/core/v1" corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client"
) )
const ( const (
// Size of the PVCs for the object stores // DefaultSize is the default size of the PVCs for the object stores.
DefaultSize = "1Gi" DefaultSize = "1Gi"
) )
// Resources represents the resources required to create an object store. // Resources represents the resources required to create an object store.
type Resources struct { type Resources struct {
Deployment *v1.Deployment Deployment *appsv1.Deployment
Service *v2.Service Service *corev1.Service
Secret *v2.Secret Secret *corev1.Secret
PVC *v2.PersistentVolumeClaim PVC *corev1.PersistentVolumeClaim
} }
// Create creates the object store resources. // Create creates the object store resources.
func (osr Resources) Create(ctx context.Context, cl client.Client) error { func (osr Resources) Create(ctx context.Context, cl client.Client) error {
if err := cl.Create(ctx, osr.PVC); err != nil { if osr.PVC != nil {
return fmt.Errorf("failed to create PVC: %w", err) if err := cl.Create(ctx, osr.PVC); err != nil {
return fmt.Errorf("failed to create PVC: %w", err)
}
} }
if err := cl.Create(ctx, osr.Secret); err != nil { if osr.Secret != nil {
return fmt.Errorf("failed to create secret: %w", err) if err := cl.Create(ctx, osr.Secret); err != nil {
return fmt.Errorf("failed to create secret: %w", err)
}
} }
if err := cl.Create(ctx, osr.Deployment); err != nil { if osr.Deployment != nil {
return fmt.Errorf("failed to create deployment: %w", err) if err := cl.Create(ctx, osr.Deployment); err != nil {
return fmt.Errorf("failed to create deployment: %w", err)
}
} }
if err := cl.Create(ctx, osr.Service); err != nil { if osr.Service != nil {
return fmt.Errorf("failed to create service: %w", err) if err := cl.Create(ctx, osr.Service); err != nil {
return fmt.Errorf("failed to create service: %w", err)
}
} }
return nil return nil

View File

@ -34,7 +34,7 @@ const (
minio = "minio" minio = "minio"
azurite = "azurite" azurite = "azurite"
gcs = "gcs" gcs = "gcs"
// Size of the PVCs for the object stores and the cluster instances // Size of the PVCs for the object stores and the cluster instances.
size = "1Gi" size = "1Gi"
srcClusterName = "source" srcClusterName = "source"
srcBackupName = "source" srcBackupName = "source"
@ -48,7 +48,7 @@ type testCaseFactory interface {
} }
type backupRestoreTestResources struct { type backupRestoreTestResources struct {
ObjectStoreResources objectstore.Resources ObjectStoreResources *objectstore.Resources
ObjectStore *pluginBarmanCloudV1.ObjectStore ObjectStore *pluginBarmanCloudV1.ObjectStore
SrcCluster *cloudnativepgv1.Cluster SrcCluster *cloudnativepgv1.Cluster
SrcBackup *cloudnativepgv1.Backup SrcBackup *cloudnativepgv1.Backup

View File

@ -0,0 +1,19 @@
/*
Copyright 2024.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package replicacluster contains tests validating replica clusters
// using the Barman Cloud Plugin.
package replicacluster

View File

@ -0,0 +1,272 @@
package replicacluster
import (
cloudnativepgv1 "github.com/cloudnative-pg/api/pkg/api/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
pluginBarmanCloudV1 "github.com/cloudnative-pg/plugin-barman-cloud/api/v1"
"github.com/cloudnative-pg/plugin-barman-cloud/test/e2e/internal/objectstore"
)
type testCaseFactory interface {
createReplicaClusterTestResources(namespace string) replicaClusterTestResources
}
const (
// Size of the PVCs for the object stores and the cluster instances.
size = "1Gi"
srcObjectStoreName = "source"
srcClusterName = "source"
srcBackupName = "source"
replicaObjectStoreName = "replica"
replicaClusterName = "replica"
replicaBackupName = "replica"
minioSrc = "minio-src"
minioReplica = "minio-replica"
gcsSrc = "fakegcs-src"
azuriteSrc = "azurite-src"
azuriteReplica = "azurite-replica"
)
type replicaClusterTestResources struct {
SrcObjectStoreResources *objectstore.Resources
SrcObjectStore *pluginBarmanCloudV1.ObjectStore
SrcCluster *cloudnativepgv1.Cluster
SrcBackup *cloudnativepgv1.Backup
ReplicaObjectStoreResources *objectstore.Resources
ReplicaObjectStore *pluginBarmanCloudV1.ObjectStore
ReplicaCluster *cloudnativepgv1.Cluster
ReplicaBackup *cloudnativepgv1.Backup
}
type s3ReplicaClusterFactory struct{}
func (f s3ReplicaClusterFactory) createReplicaClusterTestResources(namespace string) replicaClusterTestResources {
result := replicaClusterTestResources{}
result.SrcObjectStoreResources = objectstore.NewMinioObjectStoreResources(namespace, minioSrc)
result.SrcObjectStore = objectstore.NewMinioObjectStore(namespace, srcObjectStoreName, minioSrc)
result.SrcCluster = newSrcCluster(namespace)
result.SrcBackup = newSrcBackup(namespace)
result.ReplicaObjectStoreResources = objectstore.NewMinioObjectStoreResources(namespace, minioReplica)
result.ReplicaObjectStore = objectstore.NewMinioObjectStore(namespace, replicaObjectStoreName, minioReplica)
result.ReplicaCluster = newReplicaCluster(namespace)
result.ReplicaBackup = newReplicaBackup(namespace)
return result
}
type gcsReplicaClusterFactory struct{}
func (f gcsReplicaClusterFactory) createReplicaClusterTestResources(namespace string) replicaClusterTestResources {
result := replicaClusterTestResources{}
result.SrcObjectStoreResources = objectstore.NewGCSObjectStoreResources(namespace, gcsSrc)
result.SrcObjectStore = objectstore.NewGCSObjectStore(namespace, srcObjectStoreName, gcsSrc)
result.SrcCluster = newSrcCluster(namespace)
result.SrcCluster.Spec.ExternalClusters[1].PluginConfiguration.Parameters["barmanObjectName"] = srcObjectStoreName
result.SrcBackup = newSrcBackup(namespace)
// fake-gcs-server requires the STORAGE_EMULATOR_HOST environment variable to be set.
// We would have to set that variable to different values to point to the different fake-gcs-server instances,
// however the plugin does not support injecting in the sidecar variables with the same name and different values,
// so we can only point to a single instance. However, this reflects the real-world scenario, since GCS always
// points to the same endpoint.
result.ReplicaObjectStoreResources = &objectstore.Resources{}
result.ReplicaObjectStore = nil
result.ReplicaCluster = newReplicaCluster(namespace)
result.ReplicaCluster.Spec.Plugins[0].Parameters["barmanObjectName"] = srcObjectStoreName
result.ReplicaCluster.Spec.ExternalClusters[1].PluginConfiguration.Parameters["barmanObjectName"] = srcObjectStoreName
result.ReplicaBackup = newReplicaBackup(namespace)
return result
}
type azuriteReplicaClusterFactory struct{}
func (f azuriteReplicaClusterFactory) createReplicaClusterTestResources(namespace string) replicaClusterTestResources {
result := replicaClusterTestResources{}
result.SrcObjectStoreResources = objectstore.NewAzuriteObjectStoreResources(namespace, azuriteSrc)
result.SrcObjectStore = objectstore.NewAzuriteObjectStore(namespace, srcObjectStoreName, azuriteSrc)
result.SrcCluster = newSrcCluster(namespace)
result.SrcBackup = newSrcBackup(namespace)
result.ReplicaObjectStoreResources = objectstore.NewAzuriteObjectStoreResources(namespace, azuriteReplica)
result.ReplicaObjectStore = objectstore.NewAzuriteObjectStore(namespace, replicaObjectStoreName, azuriteReplica)
result.ReplicaCluster = newReplicaCluster(namespace)
result.ReplicaBackup = newReplicaBackup(namespace)
return result
}
func newSrcCluster(namespace string) *cloudnativepgv1.Cluster {
cluster := &cloudnativepgv1.Cluster{
TypeMeta: metav1.TypeMeta{
Kind: "Cluster",
APIVersion: "postgresql.cnpg.io/v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: srcClusterName,
Namespace: namespace,
},
Spec: cloudnativepgv1.ClusterSpec{
Instances: 2,
ImagePullPolicy: corev1.PullAlways,
Plugins: cloudnativepgv1.PluginConfigurationList{
{
Name: "barman-cloud.cloudnative-pg.io",
Parameters: map[string]string{
"barmanObjectName": srcObjectStoreName,
},
},
},
PostgresConfiguration: cloudnativepgv1.PostgresConfiguration{
Parameters: map[string]string{
"log_min_messages": "DEBUG4",
},
},
StorageConfiguration: cloudnativepgv1.StorageConfiguration{
Size: size,
},
ReplicaCluster: &cloudnativepgv1.ReplicaClusterConfiguration{
Primary: "source",
Source: "replica",
},
ExternalClusters: []cloudnativepgv1.ExternalCluster{
{
Name: "source",
PluginConfiguration: &cloudnativepgv1.PluginConfiguration{
Name: "barman-cloud.cloudnative-pg.io",
Parameters: map[string]string{
"barmanObjectName": srcObjectStoreName,
"serverName": srcClusterName,
},
},
},
{
Name: "replica",
PluginConfiguration: &cloudnativepgv1.PluginConfiguration{
Name: "barman-cloud.cloudnative-pg.io",
Parameters: map[string]string{
"barmanObjectName": replicaObjectStoreName,
"serverName": replicaObjectStoreName,
},
},
},
},
},
}
return cluster
}
func newSrcBackup(namespace string) *cloudnativepgv1.Backup {
return &cloudnativepgv1.Backup{
TypeMeta: metav1.TypeMeta{
Kind: "Backup",
APIVersion: "postgresql.cnpg.io/v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: srcBackupName,
Namespace: namespace,
},
Spec: cloudnativepgv1.BackupSpec{
Cluster: cloudnativepgv1.LocalObjectReference{
Name: srcClusterName,
},
Method: "plugin",
PluginConfiguration: &cloudnativepgv1.BackupPluginConfiguration{
Name: "barman-cloud.cloudnative-pg.io",
},
Target: "primary",
},
}
}
func newReplicaBackup(namespace string) *cloudnativepgv1.Backup {
return &cloudnativepgv1.Backup{
TypeMeta: metav1.TypeMeta{
Kind: "Backup",
APIVersion: "postgresql.cnpg.io/v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: replicaBackupName,
Namespace: namespace,
},
Spec: cloudnativepgv1.BackupSpec{
Cluster: cloudnativepgv1.LocalObjectReference{
Name: replicaClusterName,
},
Method: "plugin",
PluginConfiguration: &cloudnativepgv1.BackupPluginConfiguration{
Name: "barman-cloud.cloudnative-pg.io",
},
},
}
}
func newReplicaCluster(namespace string) *cloudnativepgv1.Cluster {
cluster := &cloudnativepgv1.Cluster{
TypeMeta: metav1.TypeMeta{
Kind: "Cluster",
APIVersion: "postgresql.cnpg.io/v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: replicaClusterName,
Namespace: namespace,
},
Spec: cloudnativepgv1.ClusterSpec{
Instances: 2,
ImagePullPolicy: corev1.PullAlways,
Bootstrap: &cloudnativepgv1.BootstrapConfiguration{
Recovery: &cloudnativepgv1.BootstrapRecovery{
Source: "source",
},
},
Plugins: cloudnativepgv1.PluginConfigurationList{
{
Name: "barman-cloud.cloudnative-pg.io",
Parameters: map[string]string{
"barmanObjectName": replicaObjectStoreName,
},
},
},
PostgresConfiguration: cloudnativepgv1.PostgresConfiguration{
Parameters: map[string]string{
"log_min_messages": "DEBUG4",
},
},
ExternalClusters: []cloudnativepgv1.ExternalCluster{
{
Name: "source",
PluginConfiguration: &cloudnativepgv1.PluginConfiguration{
Name: "barman-cloud.cloudnative-pg.io",
Parameters: map[string]string{
"barmanObjectName": srcObjectStoreName,
"serverName": srcClusterName,
},
},
},
{
Name: "replica",
PluginConfiguration: &cloudnativepgv1.PluginConfiguration{
Name: "barman-cloud.cloudnative-pg.io",
Parameters: map[string]string{
"barmanObjectName": replicaObjectStoreName,
"serverName": replicaObjectStoreName,
},
},
},
},
ReplicaCluster: &cloudnativepgv1.ReplicaClusterConfiguration{
Primary: "source",
Source: "source",
},
StorageConfiguration: cloudnativepgv1.StorageConfiguration{
Size: size,
},
},
}
return cluster
}

View File

@ -0,0 +1,273 @@
/*
Copyright 2024.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package replicacluster
import (
"fmt"
"strings"
"time"
cloudnativepgv1 "github.com/cloudnative-pg/api/pkg/api/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
internalClient "github.com/cloudnative-pg/plugin-barman-cloud/test/e2e/internal/client"
cluster2 "github.com/cloudnative-pg/plugin-barman-cloud/test/e2e/internal/cluster"
"github.com/cloudnative-pg/plugin-barman-cloud/test/e2e/internal/command"
nmsp "github.com/cloudnative-pg/plugin-barman-cloud/test/e2e/internal/namespace"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
var _ = Describe("Replica cluster", func() {
var namespace *corev1.Namespace
var cl client.Client
BeforeEach(func(ctx SpecContext) {
var err error
cl, _, err = internalClient.NewClient()
Expect(err).NotTo(HaveOccurred())
namespace, err = nmsp.CreateUniqueNamespace(ctx, cl, "replica-cluster")
Expect(err).NotTo(HaveOccurred())
})
AfterEach(func(ctx SpecContext) {
Expect(cl.Delete(ctx, namespace)).To(Succeed())
})
DescribeTable("can switchover to a replica cluster",
func(
ctx SpecContext,
factory testCaseFactory,
) {
testResources := factory.createReplicaClusterTestResources(namespace.Name)
By("starting the ObjectStore deployments")
Expect(testResources.SrcObjectStoreResources.Create(ctx, cl)).To(Succeed())
Expect(testResources.ReplicaObjectStoreResources.Create(ctx, cl)).To(Succeed())
By("creating the ObjectStores")
Expect(cl.Create(ctx, testResources.SrcObjectStore)).To(Succeed())
// We do not need to create the replica object store if we are using the same object store for both clusters.
if testResources.ReplicaObjectStore != nil {
Expect(cl.Create(ctx, testResources.ReplicaObjectStore)).To(Succeed())
}
By("Creating a CloudNativePG cluster")
src := testResources.SrcCluster
Expect(cl.Create(ctx, testResources.SrcCluster)).To(Succeed())
By("Having the Cluster ready")
Eventually(func(g Gomega) {
g.Expect(cl.Get(
ctx,
types.NamespacedName{
Name: src.Name,
Namespace: src.Namespace,
},
src)).To(Succeed())
g.Expect(cluster2.IsReady(*src)).To(BeTrue())
}).WithTimeout(5 * time.Minute).WithPolling(5 * time.Second).Should(Succeed())
By("Adding data to PostgreSQL")
clientSet, cfg, err := internalClient.NewClientSet()
Expect(err).NotTo(HaveOccurred())
_, _, err = command.ExecuteInContainer(ctx,
*clientSet,
cfg,
command.ContainerLocator{
NamespaceName: src.Namespace,
PodName: fmt.Sprintf("%v-1", src.Name),
ContainerName: "postgres",
},
nil,
[]string{"psql", "-tAc", "CREATE TABLE test (i int); INSERT INTO test VALUES (1);"})
Expect(err).NotTo(HaveOccurred())
By("Creating a backup")
backup := testResources.SrcBackup
Expect(cl.Create(ctx, backup)).To(Succeed())
By("Waiting for the backup to complete")
Eventually(func(g Gomega) {
g.Expect(cl.Get(ctx, types.NamespacedName{Name: backup.Name, Namespace: backup.Namespace},
backup)).To(Succeed())
g.Expect(backup.Status.Phase).To(BeEquivalentTo(cloudnativepgv1.BackupPhaseCompleted))
}).Within(2 * time.Minute).WithPolling(5 * time.Second).Should(Succeed())
By("Creating a replica cluster")
replica := testResources.ReplicaCluster
Expect(cl.Create(ctx, testResources.ReplicaCluster)).To(Succeed())
By("Having the replica cluster ready")
Eventually(func(g Gomega) {
g.Expect(cl.Get(
ctx,
types.NamespacedName{
Name: replica.Name,
Namespace: replica.Namespace,
},
replica)).To(Succeed())
g.Expect(cluster2.IsReady(*replica)).To(BeTrue())
}).WithTimeout(5 * time.Minute).WithPolling(5 * time.Second).Should(Succeed())
By("Checking the data in the replica cluster")
output, _, err := command.ExecuteInContainer(ctx,
*clientSet,
cfg,
command.ContainerLocator{
NamespaceName: replica.Namespace,
PodName: fmt.Sprintf("%v-1", replica.Name),
ContainerName: "postgres",
},
nil,
[]string{"psql", "-tAc", "SELECT count(*) FROM test;"})
Expect(err).NotTo(HaveOccurred())
Expect(output).To(BeEquivalentTo("1\n"))
// We want to check if the WALs archived by the operator outside the standard PostgreSQL archive_command
// are correctly archived.
By("Demoting the source to a replica")
err = cl.Get(ctx, types.NamespacedName{Name: src.Name, Namespace: src.Namespace}, src)
Expect(err).ToNot(HaveOccurred())
oldSrc := src.DeepCopy()
src.Spec.ReplicaCluster.Primary = replicaClusterName
Expect(cl.Patch(ctx, src, client.MergeFrom(oldSrc))).To(Succeed())
By("Waiting for all the source pods to be in recovery")
for i := 1; i <= src.Spec.Instances; i++ {
Eventually(func() (string, error) {
stdOut, stdErr, err := command.ExecuteInContainer(
ctx,
*clientSet,
cfg,
command.ContainerLocator{
NamespaceName: src.Namespace,
PodName: fmt.Sprintf("%v-%v", src.Name, i),
ContainerName: "postgres",
},
ptr.To(5*time.Second),
[]string{"psql", "-tAc", "SELECT pg_is_in_recovery();"})
if err != nil {
GinkgoWriter.Printf("stdout: %v\ntderr: %v\n", stdOut, stdErr)
}
return strings.Trim(stdOut, "\n"), err
}, 300, 10).Should(BeEquivalentTo("t"))
}
By("Getting the demotion token")
err = cl.Get(ctx, types.NamespacedName{Name: src.Name, Namespace: src.Namespace}, src)
Expect(err).ToNot(HaveOccurred())
token := src.Status.DemotionToken
By("Promoting the replica")
err = cl.Get(ctx, types.NamespacedName{Name: replica.Name, Namespace: replica.Namespace}, replica)
Expect(err).ToNot(HaveOccurred())
oldReplica := replica.DeepCopy()
replica.Spec.ReplicaCluster.PromotionToken = token
replica.Spec.ReplicaCluster.Primary = replica.Name
Expect(cl.Patch(ctx, replica, client.MergeFrom(oldReplica))).To(Succeed())
By("Waiting for the replica to be promoted")
Eventually(func() (string, error) {
stdOut, stdErr, err := command.ExecuteInContainer(
ctx,
*clientSet,
cfg,
command.ContainerLocator{
NamespaceName: replica.Namespace,
PodName: fmt.Sprintf("%v-1", replica.Name),
ContainerName: "postgres",
},
ptr.To(5*time.Second),
[]string{"psql", "-tAc", "SELECT pg_is_in_recovery();"})
if err != nil {
GinkgoWriter.Printf("stdout: %v\ntderr: %v\n", stdOut, stdErr)
}
return strings.Trim(stdOut, "\n"), err
}, 300, 10).Should(BeEquivalentTo("f"))
By("Adding new data to PostgreSQL")
clientSet, cfg, err = internalClient.NewClientSet()
Expect(err).NotTo(HaveOccurred())
_, _, err = command.ExecuteInContainer(ctx,
*clientSet,
cfg,
command.ContainerLocator{
NamespaceName: replica.Namespace,
PodName: fmt.Sprintf("%v-1", replica.Name),
ContainerName: "postgres",
},
nil,
[]string{"psql", "-tAc", "INSERT INTO test VALUES (2);"})
Expect(err).NotTo(HaveOccurred())
_, _, err = command.ExecuteInContainer(ctx,
*clientSet,
cfg,
command.ContainerLocator{
NamespaceName: replica.Namespace,
PodName: fmt.Sprintf("%v-1", replica.Name),
ContainerName: "postgres",
},
nil,
[]string{"psql", "-tAc", "SELECT pg_switch_wal();"})
Expect(err).NotTo(HaveOccurred())
By("Creating a backup in the replica cluster")
replicaBackup := testResources.ReplicaBackup
Expect(cl.Create(ctx, replicaBackup)).To(Succeed())
By("Waiting for the backup to complete")
Eventually(func(g Gomega) {
g.Expect(cl.Get(ctx, types.NamespacedName{Name: replicaBackup.Name, Namespace: replicaBackup.Namespace},
replicaBackup)).To(Succeed())
g.Expect(replicaBackup.Status.Phase).To(BeEquivalentTo(cloudnativepgv1.BackupPhaseCompleted))
}).Within(2 * time.Minute).WithPolling(5 * time.Second).Should(Succeed())
By("Checking the data in the former primary cluster")
Eventually(func(g Gomega) {
output, _, err = command.ExecuteInContainer(ctx,
*clientSet,
cfg,
command.ContainerLocator{
NamespaceName: src.Namespace,
PodName: fmt.Sprintf("%v-1", src.Name),
ContainerName: "postgres",
},
nil,
[]string{"psql", "-tAc", "SELECT count(*) FROM test;"})
g.Expect(err).NotTo(HaveOccurred())
g.Expect(output).To(BeEquivalentTo("2\n"))
}).Within(2 * time.Minute).WithPolling(5 * time.Second).Should(Succeed())
},
Entry(
"with MinIO",
s3ReplicaClusterFactory{},
),
Entry(
"with Azurite",
azuriteReplicaClusterFactory{},
),
Entry(
"with fake-gcs-server",
gcsReplicaClusterFactory{},
),
)
})