mirror of
https://github.com/cloudnative-pg/plugin-barman-cloud.git
synced 2026-01-11 21:23:12 +01:00
Signed-off-by: Leonardo Cecchi <leonardo.cecchi@enterprisedb.com> Signed-off-by: Francesco Canovai <francesco.canovai@enterprisedb.com> Co-authored-by: Francesco Canovai <francesco.canovai@enterprisedb.com>
488 lines
12 KiB
Go
488 lines
12 KiB
Go
package operator
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
|
|
cnpgv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1"
|
|
"github.com/cloudnative-pg/cloudnative-pg/pkg/utils"
|
|
"github.com/cloudnative-pg/cnpg-i-machinery/pkg/pluginhelper/decoder"
|
|
"github.com/cloudnative-pg/cnpg-i-machinery/pkg/pluginhelper/object"
|
|
"github.com/cloudnative-pg/cnpg-i/pkg/lifecycle"
|
|
"github.com/cloudnative-pg/machinery/pkg/log"
|
|
"github.com/spf13/viper"
|
|
batchv1 "k8s.io/api/batch/v1"
|
|
corev1 "k8s.io/api/core/v1"
|
|
"k8s.io/apimachinery/pkg/types"
|
|
"k8s.io/utils/ptr"
|
|
"sigs.k8s.io/controller-runtime/pkg/client"
|
|
|
|
barmancloudv1 "github.com/cloudnative-pg/plugin-barman-cloud/api/v1"
|
|
"github.com/cloudnative-pg/plugin-barman-cloud/internal/cnpgi/metadata"
|
|
"github.com/cloudnative-pg/plugin-barman-cloud/internal/cnpgi/operator/config"
|
|
)
|
|
|
|
// LifecycleImplementation is the implementation of the lifecycle handler
|
|
type LifecycleImplementation struct {
|
|
lifecycle.UnimplementedOperatorLifecycleServer
|
|
Client client.Client
|
|
}
|
|
|
|
// GetCapabilities exposes the lifecycle capabilities
|
|
func (impl LifecycleImplementation) GetCapabilities(
|
|
_ context.Context,
|
|
_ *lifecycle.OperatorLifecycleCapabilitiesRequest,
|
|
) (*lifecycle.OperatorLifecycleCapabilitiesResponse, error) {
|
|
return &lifecycle.OperatorLifecycleCapabilitiesResponse{
|
|
LifecycleCapabilities: []*lifecycle.OperatorLifecycleCapabilities{
|
|
{
|
|
Group: "",
|
|
Kind: "Pod",
|
|
OperationTypes: []*lifecycle.OperatorOperationType{
|
|
{
|
|
Type: lifecycle.OperatorOperationType_TYPE_CREATE,
|
|
},
|
|
{
|
|
Type: lifecycle.OperatorOperationType_TYPE_PATCH,
|
|
},
|
|
},
|
|
},
|
|
{
|
|
Group: batchv1.GroupName,
|
|
Kind: "Job",
|
|
OperationTypes: []*lifecycle.OperatorOperationType{
|
|
{
|
|
Type: lifecycle.OperatorOperationType_TYPE_CREATE,
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}, nil
|
|
}
|
|
|
|
// LifecycleHook is called when creating Kubernetes services
|
|
func (impl LifecycleImplementation) LifecycleHook(
|
|
ctx context.Context,
|
|
request *lifecycle.OperatorLifecycleRequest,
|
|
) (*lifecycle.OperatorLifecycleResponse, error) {
|
|
contextLogger := log.FromContext(ctx).WithName("lifecycle")
|
|
contextLogger.Info("Lifecycle hook reconciliation start")
|
|
operation := request.GetOperationType().GetType().Enum()
|
|
if operation == nil {
|
|
return nil, errors.New("no operation set")
|
|
}
|
|
|
|
kind, err := object.GetKind(request.GetObjectDefinition())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var cluster cnpgv1.Cluster
|
|
if err := decoder.DecodeObject(
|
|
request.GetClusterDefinition(),
|
|
&cluster,
|
|
cnpgv1.GroupVersion.WithKind("Cluster"),
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
pluginConfiguration := config.NewFromCluster(&cluster)
|
|
|
|
// barman object is required for both the archive and restore process
|
|
if err := pluginConfiguration.Validate(); err != nil {
|
|
contextLogger.Info("pluginConfiguration invalid, skipping lifecycle", "error", err)
|
|
return nil, nil
|
|
}
|
|
|
|
switch kind {
|
|
case "Pod":
|
|
contextLogger.Info("Reconciling pod")
|
|
return impl.reconcilePod(ctx, &cluster, request, pluginConfiguration)
|
|
case "Job":
|
|
contextLogger.Info("Reconciling job")
|
|
return impl.reconcileJob(ctx, &cluster, request, pluginConfiguration)
|
|
default:
|
|
return nil, fmt.Errorf("unsupported kind: %s", kind)
|
|
}
|
|
}
|
|
|
|
func (impl LifecycleImplementation) collectAdditionalEnvs(
|
|
ctx context.Context,
|
|
namespace string,
|
|
pluginConfiguration *config.PluginConfiguration,
|
|
) ([]corev1.EnvVar, error) {
|
|
var result []corev1.EnvVar
|
|
|
|
if len(pluginConfiguration.BarmanObjectName) > 0 {
|
|
envs, err := impl.collectObjectStoreEnvs(
|
|
ctx,
|
|
types.NamespacedName{
|
|
Name: pluginConfiguration.BarmanObjectName,
|
|
Namespace: namespace,
|
|
},
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
result = append(result, envs...)
|
|
}
|
|
|
|
if len(pluginConfiguration.RecoveryBarmanObjectName) > 0 {
|
|
envs, err := impl.collectObjectStoreEnvs(
|
|
ctx,
|
|
types.NamespacedName{
|
|
Name: pluginConfiguration.RecoveryBarmanObjectName,
|
|
Namespace: namespace,
|
|
},
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
result = append(result, envs...)
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
func (impl LifecycleImplementation) collectObjectStoreEnvs(
|
|
ctx context.Context,
|
|
barmanObjectKey types.NamespacedName,
|
|
) ([]corev1.EnvVar, error) {
|
|
var objectStore barmancloudv1.ObjectStore
|
|
if err := impl.Client.Get(ctx, barmanObjectKey, &objectStore); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return objectStore.Spec.InstanceSidecarConfiguration.Env, nil
|
|
}
|
|
|
|
func (impl LifecycleImplementation) reconcileJob(
|
|
ctx context.Context,
|
|
cluster *cnpgv1.Cluster,
|
|
request *lifecycle.OperatorLifecycleRequest,
|
|
pluginConfiguration *config.PluginConfiguration,
|
|
) (*lifecycle.OperatorLifecycleResponse, error) {
|
|
env, err := impl.collectAdditionalEnvs(ctx, cluster.Namespace, pluginConfiguration)
|
|
if err != nil {
|
|
return nil, nil
|
|
}
|
|
|
|
return reconcileJob(ctx, cluster, request, pluginConfiguration, env)
|
|
}
|
|
|
|
func reconcileJob(
|
|
ctx context.Context,
|
|
cluster *cnpgv1.Cluster,
|
|
request *lifecycle.OperatorLifecycleRequest,
|
|
pluginConfiguration *config.PluginConfiguration,
|
|
env []corev1.EnvVar,
|
|
) (*lifecycle.OperatorLifecycleResponse, error) {
|
|
contextLogger := log.FromContext(ctx).WithName("lifecycle")
|
|
if pluginConfig := cluster.GetRecoverySourcePlugin(); pluginConfig == nil || pluginConfig.Name != metadata.PluginName {
|
|
contextLogger.Debug("cluster does not use the this plugin for recovery, skipping")
|
|
return nil, nil
|
|
}
|
|
|
|
var job batchv1.Job
|
|
if err := decoder.DecodeObject(
|
|
request.GetObjectDefinition(),
|
|
&job,
|
|
batchv1.SchemeGroupVersion.WithKind("Job"),
|
|
); err != nil {
|
|
contextLogger.Error(err, "failed to decode job")
|
|
return nil, err
|
|
}
|
|
|
|
contextLogger = log.FromContext(ctx).WithName("plugin-barman-cloud-lifecycle").
|
|
WithValues("jobName", job.Name)
|
|
contextLogger.Debug("starting job reconciliation")
|
|
|
|
if job.Spec.Template.Labels[utils.JobRoleLabelName] != "full-recovery" {
|
|
contextLogger.Debug("job is not a recovery job, skipping")
|
|
return nil, nil
|
|
}
|
|
|
|
mutatedJob := job.DeepCopy()
|
|
|
|
if err := reconcilePodSpec(
|
|
pluginConfiguration,
|
|
cluster,
|
|
&mutatedJob.Spec.Template.Spec,
|
|
"full-recovery",
|
|
corev1.Container{
|
|
Args: []string{"restore"},
|
|
},
|
|
env,
|
|
); err != nil {
|
|
return nil, fmt.Errorf("while reconciling pod spec for job: %w", err)
|
|
}
|
|
|
|
patch, err := object.CreatePatch(mutatedJob, &job)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
contextLogger.Debug("generated patch", "content", string(patch))
|
|
return &lifecycle.OperatorLifecycleResponse{
|
|
JsonPatch: patch,
|
|
}, nil
|
|
}
|
|
|
|
func (impl LifecycleImplementation) reconcilePod(
|
|
ctx context.Context,
|
|
cluster *cnpgv1.Cluster,
|
|
request *lifecycle.OperatorLifecycleRequest,
|
|
pluginConfiguration *config.PluginConfiguration,
|
|
) (*lifecycle.OperatorLifecycleResponse, error) {
|
|
env, err := impl.collectAdditionalEnvs(ctx, cluster.Namespace, pluginConfiguration)
|
|
if err != nil {
|
|
return nil, nil
|
|
}
|
|
|
|
return reconcilePod(ctx, cluster, request, pluginConfiguration, env)
|
|
}
|
|
|
|
func reconcilePod(
|
|
ctx context.Context,
|
|
cluster *cnpgv1.Cluster,
|
|
request *lifecycle.OperatorLifecycleRequest,
|
|
pluginConfiguration *config.PluginConfiguration,
|
|
env []corev1.EnvVar,
|
|
) (*lifecycle.OperatorLifecycleResponse, error) {
|
|
pod, err := decoder.DecodePodJSON(request.GetObjectDefinition())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
contextLogger := log.FromContext(ctx).WithName("plugin-barman-cloud-lifecycle").
|
|
WithValues("podName", pod.Name)
|
|
|
|
mutatedPod := pod.DeepCopy()
|
|
|
|
if len(pluginConfiguration.BarmanObjectName) != 0 {
|
|
if err := reconcilePodSpec(
|
|
pluginConfiguration,
|
|
cluster,
|
|
&mutatedPod.Spec,
|
|
"postgres",
|
|
corev1.Container{
|
|
Args: []string{"instance"},
|
|
},
|
|
env,
|
|
); err != nil {
|
|
return nil, fmt.Errorf("while reconciling pod spec for pod: %w", err)
|
|
}
|
|
} else {
|
|
contextLogger.Debug("No need to mutate instance with no backup & archiving configuration")
|
|
}
|
|
|
|
patch, err := object.CreatePatch(mutatedPod, pod)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
contextLogger.Debug("generated patch", "content", string(patch))
|
|
return &lifecycle.OperatorLifecycleResponse{
|
|
JsonPatch: patch,
|
|
}, nil
|
|
}
|
|
|
|
func reconcilePodSpec(
|
|
cfg *config.PluginConfiguration,
|
|
cluster *cnpgv1.Cluster,
|
|
spec *corev1.PodSpec,
|
|
mainContainerName string,
|
|
sidecarConfig corev1.Container,
|
|
additionalEnvs []corev1.EnvVar,
|
|
) error {
|
|
envs := []corev1.EnvVar{
|
|
{
|
|
Name: "NAMESPACE",
|
|
Value: cluster.Namespace,
|
|
},
|
|
{
|
|
Name: "CLUSTER_NAME",
|
|
Value: cluster.Name,
|
|
},
|
|
{
|
|
// TODO: should we really use this one?
|
|
// should we mount an emptyDir volume just for that?
|
|
Name: "SPOOL_DIRECTORY",
|
|
Value: "/controller/wal-restore-spool",
|
|
},
|
|
}
|
|
|
|
if len(cfg.BarmanObjectName) > 0 {
|
|
envs = append(envs,
|
|
corev1.EnvVar{
|
|
Name: "BARMAN_OBJECT_NAME",
|
|
Value: cfg.BarmanObjectName,
|
|
},
|
|
corev1.EnvVar{
|
|
Name: "SERVER_NAME",
|
|
Value: cfg.ServerName,
|
|
},
|
|
)
|
|
}
|
|
|
|
if len(cfg.RecoveryBarmanObjectName) > 0 {
|
|
envs = append(envs,
|
|
corev1.EnvVar{
|
|
Name: "RECOVERY_BARMAN_OBJECT_NAME",
|
|
Value: cfg.RecoveryBarmanObjectName,
|
|
},
|
|
corev1.EnvVar{
|
|
Name: "RECOVERY_SERVER_NAME",
|
|
Value: cfg.RecoveryServerName,
|
|
},
|
|
)
|
|
}
|
|
|
|
envs = append(envs, additionalEnvs...)
|
|
|
|
baseProbe := &corev1.Probe{
|
|
FailureThreshold: 3,
|
|
ProbeHandler: corev1.ProbeHandler{
|
|
Exec: &corev1.ExecAction{
|
|
Command: []string{"/manager", "healthcheck", "unix"},
|
|
},
|
|
},
|
|
}
|
|
|
|
// fixed values
|
|
sidecarConfig.Name = "plugin-barman-cloud"
|
|
sidecarConfig.Image = viper.GetString("sidecar-image")
|
|
sidecarConfig.ImagePullPolicy = cluster.Spec.ImagePullPolicy
|
|
sidecarConfig.LivenessProbe = baseProbe.DeepCopy()
|
|
sidecarConfig.StartupProbe = baseProbe.DeepCopy()
|
|
|
|
// merge the main container envs if they aren't already set
|
|
for _, container := range spec.Containers {
|
|
if container.Name == mainContainerName {
|
|
for _, env := range container.Env {
|
|
found := false
|
|
for _, existingEnv := range sidecarConfig.Env {
|
|
if existingEnv.Name == env.Name {
|
|
found = true
|
|
break
|
|
}
|
|
}
|
|
if !found {
|
|
sidecarConfig.Env = append(sidecarConfig.Env, env)
|
|
}
|
|
}
|
|
break
|
|
}
|
|
}
|
|
|
|
// merge the default envs if they aren't already set
|
|
for _, env := range envs {
|
|
found := false
|
|
for _, existingEnv := range sidecarConfig.Env {
|
|
if existingEnv.Name == env.Name {
|
|
found = true
|
|
break
|
|
}
|
|
}
|
|
if !found {
|
|
sidecarConfig.Env = append(sidecarConfig.Env, env)
|
|
}
|
|
}
|
|
|
|
if err := InjectPluginSidecarPodSpec(spec, &sidecarConfig, mainContainerName, true); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// TODO: move to machinery once the logic is finalized
|
|
|
|
// InjectPluginVolumePodSpec injects the plugin volume into a CNPG Pod spec.
|
|
func InjectPluginVolumePodSpec(spec *corev1.PodSpec, mainContainerName string) {
|
|
const (
|
|
pluginVolumeName = "plugins"
|
|
pluginMountPath = "/plugins"
|
|
)
|
|
|
|
foundPluginVolume := false
|
|
for i := range spec.Volumes {
|
|
if spec.Volumes[i].Name == pluginVolumeName {
|
|
foundPluginVolume = true
|
|
}
|
|
}
|
|
|
|
if foundPluginVolume {
|
|
return
|
|
}
|
|
|
|
spec.Volumes = append(spec.Volumes, corev1.Volume{
|
|
Name: pluginVolumeName,
|
|
VolumeSource: corev1.VolumeSource{
|
|
EmptyDir: &corev1.EmptyDirVolumeSource{},
|
|
},
|
|
})
|
|
|
|
for i := range spec.Containers {
|
|
if spec.Containers[i].Name == mainContainerName {
|
|
spec.Containers[i].VolumeMounts = append(
|
|
spec.Containers[i].VolumeMounts,
|
|
corev1.VolumeMount{
|
|
Name: pluginVolumeName,
|
|
MountPath: pluginMountPath,
|
|
},
|
|
)
|
|
}
|
|
}
|
|
}
|
|
|
|
// InjectPluginSidecarPodSpec injects a plugin sidecar into a CNPG Pod spec.
|
|
//
|
|
// If the "injectMainContainerVolumes" flag is true, this will append all the volume
|
|
// mounts that are used in the instance manager Pod to the passed sidecar
|
|
// container, granting it superuser access to the PostgreSQL instance.
|
|
func InjectPluginSidecarPodSpec(
|
|
spec *corev1.PodSpec,
|
|
sidecar *corev1.Container,
|
|
mainContainerName string,
|
|
injectMainContainerVolumes bool,
|
|
) error {
|
|
sidecar = sidecar.DeepCopy()
|
|
InjectPluginVolumePodSpec(spec, mainContainerName)
|
|
|
|
var volumeMounts []corev1.VolumeMount
|
|
sidecarContainerFound := false
|
|
mainContainerFound := false
|
|
for i := range spec.Containers {
|
|
if spec.Containers[i].Name == mainContainerName {
|
|
volumeMounts = spec.Containers[i].VolumeMounts
|
|
mainContainerFound = true
|
|
}
|
|
}
|
|
|
|
if !mainContainerFound {
|
|
return errors.New("main container not found")
|
|
}
|
|
|
|
for i := range spec.InitContainers {
|
|
if spec.InitContainers[i].Name == sidecar.Name {
|
|
sidecarContainerFound = true
|
|
}
|
|
}
|
|
|
|
if sidecarContainerFound {
|
|
// The sidecar container was already added
|
|
return nil
|
|
}
|
|
|
|
// Do not modify the passed sidecar definition
|
|
if injectMainContainerVolumes {
|
|
sidecar.VolumeMounts = append(sidecar.VolumeMounts, volumeMounts...)
|
|
}
|
|
sidecar.RestartPolicy = ptr.To(corev1.ContainerRestartPolicyAlways)
|
|
spec.InitContainers = append(spec.InitContainers, *sidecar)
|
|
|
|
return nil
|
|
}
|