plugin-barman-cloud/internal/cnpgi/operator/lifecycle.go
Leonardo Cecchi 13e3fab268
feat: lenient decoding of CNPG resources (#192)
This patch enables the barman-cloud plugin to function with an operator
that is structurally identical to CNPG but works with a different API group.

It achieves this through lenient decoding of the provided CNPG resources
and injecting the detected GVK into the sidecar, enabling it to correctly
encode and decode the Kubernetes resources.

Signed-off-by: Leonardo Cecchi <leonardo.cecchi@enterprisedb.com>
Signed-off-by: Marco Nenciarini <marco.nenciarini@enterprisedb.com>
Co-authored-by: Marco Nenciarini <marco.nenciarini@enterprisedb.com>
2025-03-14 12:23:23 +01:00

467 lines
12 KiB
Go

package operator
import (
"context"
"errors"
"fmt"
"strings"
cnpgv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1"
"github.com/cloudnative-pg/cnpg-i-machinery/pkg/pluginhelper/decoder"
"github.com/cloudnative-pg/cnpg-i-machinery/pkg/pluginhelper/object"
"github.com/cloudnative-pg/cnpg-i/pkg/lifecycle"
"github.com/cloudnative-pg/machinery/pkg/log"
"github.com/spf13/viper"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/cloudnative-pg/plugin-barman-cloud/internal/cnpgi/metadata"
"github.com/cloudnative-pg/plugin-barman-cloud/internal/cnpgi/operator/config"
)
// LifecycleImplementation is the implementation of the lifecycle handler.
//
// It embeds lifecycle.UnimplementedOperatorLifecycleServer, so any lifecycle
// server method not defined on this type falls back to the embedded one.
type LifecycleImplementation struct {
lifecycle.UnimplementedOperatorLifecycleServer
// Client is a controller-runtime client. It is not used directly in the
// methods visible here; presumably the helpers that collect additional
// environment variables and certificates rely on it — TODO confirm.
Client client.Client
}
// GetCapabilities exposes the lifecycle capabilities of this plugin.
//
// The plugin asks to be involved in the creation and patching of Pods and in
// the creation of Jobs. Both the context and the request are ignored, and the
// returned error is always nil.
func (impl LifecycleImplementation) GetCapabilities(
	_ context.Context,
	_ *lifecycle.OperatorLifecycleCapabilitiesRequest,
) (*lifecycle.OperatorLifecycleCapabilitiesResponse, error) {
	// Pods are declared with an empty API group name.
	podCapabilities := &lifecycle.OperatorLifecycleCapabilities{
		Group: "",
		Kind:  "Pod",
		OperationTypes: []*lifecycle.OperatorOperationType{
			{Type: lifecycle.OperatorOperationType_TYPE_CREATE},
			{Type: lifecycle.OperatorOperationType_TYPE_PATCH},
		},
	}

	jobCapabilities := &lifecycle.OperatorLifecycleCapabilities{
		Group: batchv1.GroupName,
		Kind:  "Job",
		OperationTypes: []*lifecycle.OperatorOperationType{
			{Type: lifecycle.OperatorOperationType_TYPE_CREATE},
		},
	}

	response := &lifecycle.OperatorLifecycleCapabilitiesResponse{
		LifecycleCapabilities: []*lifecycle.OperatorLifecycleCapabilities{
			podCapabilities,
			jobCapabilities,
		},
	}

	return response, nil
}
// LifecycleHook handles a lifecycle request coming from the operator.
//
// It reads the kind of the object carried by the request, decodes the
// cluster definition, builds the plugin configuration from the cluster and
// then dispatches to the Pod or to the Job reconciler.
//
// It returns:
//   - an error when the operation type is missing, when the kind or the
//     cluster cannot be decoded, or when the kind is neither "Pod" nor "Job";
//   - a nil response and a nil error when the plugin configuration does not
//     validate, meaning that the object is left untouched;
//   - otherwise, whatever the selected reconciler returns.
func (impl LifecycleImplementation) LifecycleHook(
	ctx context.Context,
	request *lifecycle.OperatorLifecycleRequest,
) (*lifecycle.OperatorLifecycleResponse, error) {
	logger := log.FromContext(ctx).WithName("lifecycle")
	logger.Info("Lifecycle hook reconciliation start")

	if operation := request.GetOperationType().GetType().Enum(); operation == nil {
		return nil, errors.New("no operation set")
	}

	kind, err := object.GetKind(request.GetObjectDefinition())
	if err != nil {
		return nil, err
	}

	// The cluster is decoded with the lenient decoder, while the Job handled
	// further down the call chain is decoded strictly.
	cluster := &cnpgv1.Cluster{}
	if err = decoder.DecodeObjectLenient(request.GetClusterDefinition(), cluster); err != nil {
		return nil, err
	}

	pluginConfiguration := config.NewFromCluster(cluster)

	// barman object is required for both the archive and restore process
	if validationErr := pluginConfiguration.Validate(); validationErr != nil {
		logger.Info("pluginConfiguration invalid, skipping lifecycle", "error", validationErr)
		return nil, nil
	}

	if kind == "Pod" {
		logger.Info("Reconciling pod")
		return impl.reconcilePod(ctx, cluster, request, pluginConfiguration)
	}

	if kind == "Job" {
		logger.Info("Reconciling job")
		return impl.reconcileJob(ctx, cluster, request, pluginConfiguration)
	}

	return nil, fmt.Errorf("unsupported kind: %s", kind)
}
// reconcileJob collects the additional environment variables and the
// additional certificate projections for the namespace of the cluster, then
// delegates to the package-level reconcileJob function.
//
// Any error raised while collecting them is returned as is.
func (impl LifecycleImplementation) reconcileJob(
	ctx context.Context,
	cluster *cnpgv1.Cluster,
	request *lifecycle.OperatorLifecycleRequest,
	pluginConfiguration *config.PluginConfiguration,
) (*lifecycle.OperatorLifecycleResponse, error) {
	namespace := cluster.Namespace

	additionalEnvs, envErr := impl.collectAdditionalEnvs(ctx, namespace, pluginConfiguration)
	if envErr != nil {
		return nil, envErr
	}

	projections, certErr := impl.collectAdditionalCertificates(ctx, namespace, pluginConfiguration)
	if certErr != nil {
		return nil, certErr
	}

	return reconcileJob(ctx, cluster, request, additionalEnvs, projections)
}
// reconcileJob computes the JSON patch injecting the plugin sidecar into a
// recovery Job.
//
// A nil response with a nil error is returned, leaving the Job untouched,
// when the recovery source plugin of the cluster is not this plugin or when
// the role of the Job is not "full-recovery".
//
// env and certificates are handed over to reconcilePodSpec, which uses them
// for the environment of the sidecar and for the projected certificates
// volume.
func reconcileJob(
	ctx context.Context,
	cluster *cnpgv1.Cluster,
	request *lifecycle.OperatorLifecycleRequest,
	env []corev1.EnvVar,
	certificates []corev1.VolumeProjection,
) (*lifecycle.OperatorLifecycleResponse, error) {
	const (
		// recoveryJobRole is the only Job role this function acts upon
		recoveryJobRole = "full-recovery"
		// recoveryContainerName is passed to reconcilePodSpec as the name of
		// the main container of the Job Pod template
		recoveryContainerName = "full-recovery"
	)

	logger := log.FromContext(ctx).WithName("lifecycle")

	recoveryPlugin := cluster.GetRecoverySourcePlugin()
	if recoveryPlugin == nil || recoveryPlugin.Name != metadata.PluginName {
		logger.Debug("cluster does not use the this plugin for recovery, skipping")
		return nil, nil
	}

	originalJob := &batchv1.Job{}
	decodeErr := decoder.DecodeObjectStrict(
		request.GetObjectDefinition(),
		originalJob,
		batchv1.SchemeGroupVersion.WithKind("Job"),
	)
	if decodeErr != nil {
		logger.Error(decodeErr, "failed to decode job")
		return nil, decodeErr
	}

	// From now on the log entries carry the name of the Job
	logger = log.FromContext(ctx).
		WithName("plugin-barman-cloud-lifecycle").
		WithValues("jobName", originalJob.Name)
	logger.Debug("starting job reconciliation")

	if role := getCNPGJobRole(originalJob); role != recoveryJobRole {
		logger.Debug("job is not a recovery job, skipping")
		return nil, nil
	}

	// The patch is the difference between the decoded Job and a mutated copy
	mutatedJob := originalJob.DeepCopy()
	sidecarTemplate := corev1.Container{
		Args: []string{"restore"},
	}
	reconcileErr := reconcilePodSpec(
		cluster,
		&mutatedJob.Spec.Template.Spec,
		recoveryContainerName,
		sidecarTemplate,
		env,
		certificates,
	)
	if reconcileErr != nil {
		return nil, fmt.Errorf("while reconciling pod spec for job: %w", reconcileErr)
	}

	patch, patchErr := object.CreatePatch(mutatedJob, originalJob)
	if patchErr != nil {
		return nil, patchErr
	}

	logger.Debug("generated patch", "content", string(patch))

	response := &lifecycle.OperatorLifecycleResponse{
		JsonPatch: patch,
	}
	return response, nil
}
// reconcilePod collects the additional environment variables and the
// additional certificate projections for the namespace of the cluster, then
// delegates to the package-level reconcilePod function.
//
// Any error raised while collecting them is returned as is.
func (impl LifecycleImplementation) reconcilePod(
	ctx context.Context,
	cluster *cnpgv1.Cluster,
	request *lifecycle.OperatorLifecycleRequest,
	pluginConfiguration *config.PluginConfiguration,
) (*lifecycle.OperatorLifecycleResponse, error) {
	namespace := cluster.Namespace

	additionalEnvs, envErr := impl.collectAdditionalEnvs(ctx, namespace, pluginConfiguration)
	if envErr != nil {
		return nil, envErr
	}

	projections, certErr := impl.collectAdditionalCertificates(ctx, namespace, pluginConfiguration)
	if certErr != nil {
		return nil, certErr
	}

	return reconcilePod(ctx, cluster, request, pluginConfiguration, additionalEnvs, projections)
}
// reconcilePod computes the JSON patch injecting the plugin sidecar into an
// instance Pod.
//
// The Pod spec is mutated only when the plugin configuration has a non-empty
// BarmanObjectName. Otherwise the patch is computed between the Pod and an
// unmodified copy of it.
//
// env and certificates are handed over to reconcilePodSpec, which uses them
// for the environment of the sidecar and for the projected certificates
// volume.
func reconcilePod(
	ctx context.Context,
	cluster *cnpgv1.Cluster,
	request *lifecycle.OperatorLifecycleRequest,
	pluginConfiguration *config.PluginConfiguration,
	env []corev1.EnvVar,
	certificates []corev1.VolumeProjection,
) (*lifecycle.OperatorLifecycleResponse, error) {
	pod, err := decoder.DecodePodJSON(request.GetObjectDefinition())
	if err != nil {
		return nil, err
	}

	logger := log.FromContext(ctx).
		WithName("plugin-barman-cloud-lifecycle").
		WithValues("podName", pod.Name)

	// The patch is the difference between the decoded Pod and a mutated copy
	mutatedPod := pod.DeepCopy()

	if len(pluginConfiguration.BarmanObjectName) == 0 {
		logger.Debug("No need to mutate instance with no backup & archiving configuration")
	} else {
		sidecarTemplate := corev1.Container{
			Args: []string{"instance"},
		}
		reconcileErr := reconcilePodSpec(
			cluster,
			&mutatedPod.Spec,
			"postgres",
			sidecarTemplate,
			env,
			certificates,
		)
		if reconcileErr != nil {
			return nil, fmt.Errorf("while reconciling pod spec for pod: %w", reconcileErr)
		}
	}

	patch, err := object.CreatePatch(mutatedPod, pod)
	if err != nil {
		return nil, err
	}

	logger.Debug("generated patch", "content", string(patch))

	response := &lifecycle.OperatorLifecycleResponse{
		JsonPatch: patch,
	}
	return response, nil
}
// reconcilePodSpec mutates the passed Pod spec in place, injecting the plugin
// sidecar and the projected volume containing the certificates.
//
// sidecarConfig is received by value and used as a template: its name, image,
// image pull policy and startup probe are always overwritten. Its environment
// is then completed, in this order and never replacing a variable whose name
// is already present:
//
//  1. with the environment of the first container named mainContainerName;
//  2. with the default variables computed from the cluster, followed by
//     additionalEnvs.
//
// The error returned by injectPluginSidecarPodSpec, if any, is passed on to
// the caller; in that case the certificates volume is not added.
func reconcilePodSpec(
	cluster *cnpgv1.Cluster,
	spec *corev1.PodSpec,
	mainContainerName string,
	sidecarConfig corev1.Container,
	additionalEnvs []corev1.EnvVar,
	certificates []corev1.VolumeProjection,
) error {
	clusterGVK := cluster.GetObjectKind().GroupVersionKind()

	defaultEnvs := []corev1.EnvVar{
		{Name: "NAMESPACE", Value: cluster.Namespace},
		{Name: "CLUSTER_NAME", Value: cluster.Name},
		// TODO: should we really use this one?
		// should we mount an emptyDir volume just for that?
		{Name: "SPOOL_DIRECTORY", Value: "/controller/wal-restore-spool"},
		{Name: "CUSTOM_CNPG_GROUP", Value: clusterGVK.Group},
		{Name: "CUSTOM_CNPG_VERSION", Value: clusterGVK.Version},
	}
	defaultEnvs = append(defaultEnvs, additionalEnvs...)

	// fixed values
	sidecarConfig.Name = "plugin-barman-cloud"
	sidecarConfig.Image = viper.GetString("sidecar-image")
	sidecarConfig.ImagePullPolicy = cluster.Spec.ImagePullPolicy
	sidecarConfig.StartupProbe = &corev1.Probe{
		FailureThreshold: 10,
		TimeoutSeconds:   10,
		ProbeHandler: corev1.ProbeHandler{
			Exec: &corev1.ExecAction{
				Command: []string{"/manager", "healthcheck", "unix"},
			},
		},
	}

	// addEnvIfMissing appends the variable to the sidecar environment unless
	// a variable with the same name is already there.
	addEnvIfMissing := func(candidate corev1.EnvVar) {
		for idx := range sidecarConfig.Env {
			if sidecarConfig.Env[idx].Name == candidate.Name {
				return
			}
		}
		sidecarConfig.Env = append(sidecarConfig.Env, candidate)
	}

	// merge the main container envs if they aren't already set; only the
	// first container with a matching name is considered
	for idx := range spec.Containers {
		if spec.Containers[idx].Name != mainContainerName {
			continue
		}
		for _, mainEnv := range spec.Containers[idx].Env {
			addEnvIfMissing(mainEnv)
		}
		break
	}

	// merge the default envs if they aren't already set
	for _, defaultEnv := range defaultEnvs {
		addEnvIfMissing(defaultEnv)
	}

	if err := injectPluginSidecarPodSpec(spec, &sidecarConfig, mainContainerName); err != nil {
		return err
	}

	// inject the volume containing the certificates if needed
	if volumeListHasVolume(spec.Volumes, barmanCertificatesVolumeName) {
		return nil
	}

	certificatesVolume := corev1.Volume{
		Name: barmanCertificatesVolumeName,
		VolumeSource: corev1.VolumeSource{
			Projected: &corev1.ProjectedVolumeSource{
				Sources: certificates,
			},
		},
	}
	spec.Volumes = append(spec.Volumes, certificatesVolume)

	return nil
}
// InjectPluginVolumePodSpec injects the plugin volume into a CNPG Pod spec.
//
// When the spec has no volume named "plugins", an emptyDir volume with that
// name is added and mounted at "/plugins" in every container named
// mainContainerName. When such a volume already exists the spec is left
// untouched, volume mounts included.
//
// TODO: move to machinery once the logic is finalized
func InjectPluginVolumePodSpec(spec *corev1.PodSpec, mainContainerName string) {
	const (
		pluginVolumeName = "plugins"
		pluginMountPath  = "/plugins"
	)

	// Nothing to do if the volume has already been injected
	for idx := range spec.Volumes {
		if spec.Volumes[idx].Name == pluginVolumeName {
			return
		}
	}

	pluginVolume := corev1.Volume{
		Name: pluginVolumeName,
		VolumeSource: corev1.VolumeSource{
			EmptyDir: &corev1.EmptyDirVolumeSource{},
		},
	}
	spec.Volumes = append(spec.Volumes, pluginVolume)

	pluginMount := corev1.VolumeMount{
		Name:      pluginVolumeName,
		MountPath: pluginMountPath,
	}
	for idx := range spec.Containers {
		mainContainer := &spec.Containers[idx]
		if mainContainer.Name != mainContainerName {
			continue
		}
		mainContainer.VolumeMounts = append(mainContainer.VolumeMounts, pluginMount)
	}
}
// injectPluginSidecarPodSpec injects a plugin sidecar into a CNPG Pod spec.
//
// The plugin volume is injected first via InjectPluginVolumePodSpec. The
// sidecar is then appended to the init containers of the spec with the
// "Always" restart policy and with the following volume mounts, added after
// the ones it already declares:
//
//   - the barman certificates volume, mounted at metadata.BarmanCertificatesPath;
//   - all the volume mounts of the main container.
//
// The passed sidecar definition is never modified: the function works on a
// deep copy of it.
//
// An error is returned when no container is named mainContainerName; note
// that the plugin volume may have been added to the spec by then. If an init
// container with the name of the sidecar already exists, the function returns
// nil without adding another one.
func injectPluginSidecarPodSpec(
	spec *corev1.PodSpec,
	sidecar *corev1.Container,
	mainContainerName string,
) error {
	// Do not modify the passed sidecar definition
	sidecarCopy := sidecar.DeepCopy()

	// This must happen before reading the volume mounts of the main
	// container, as it may add one to them.
	InjectPluginVolumePodSpec(spec, mainContainerName)

	// If several containers share the name, the last one is used
	var mainContainer *corev1.Container
	for idx := range spec.Containers {
		if spec.Containers[idx].Name == mainContainerName {
			mainContainer = &spec.Containers[idx]
		}
	}
	if mainContainer == nil {
		return errors.New("main container not found")
	}

	for idx := range spec.InitContainers {
		if spec.InitContainers[idx].Name == sidecarCopy.Name {
			// The sidecar container was already added
			return nil
		}
	}

	certificatesMount := corev1.VolumeMount{
		Name:      barmanCertificatesVolumeName,
		MountPath: metadata.BarmanCertificatesPath,
	}
	sidecarCopy.VolumeMounts = append(sidecarCopy.VolumeMounts, certificatesMount)
	sidecarCopy.VolumeMounts = append(sidecarCopy.VolumeMounts, mainContainer.VolumeMounts...)
	sidecarCopy.RestartPolicy = ptr.To(corev1.ContainerRestartPolicyAlways)

	spec.InitContainers = append(spec.InitContainers, *sidecarCopy)

	return nil
}
// volumeListHasVolume reports whether the volume list contains a volume
// having the passed name. It returns false for an empty or nil list.
func volumeListHasVolume(volumes []corev1.Volume, name string) bool {
	for idx := 0; idx < len(volumes); idx++ {
		if volumes[idx].Name != name {
			continue
		}
		return true
	}

	return false
}
// getCNPGJobRole gets the role associated to a CNPG job.
//
// The role is the value of the label of the Job Pod template whose key ends
// with "/jobRole". The key is matched by suffix and not by its full name, so
// the part before the slash is not constrained.
//
// Go does not define the iteration order of maps, so when several labels
// match, the one with the lexicographically smallest key is selected: this
// makes the result deterministic across calls. An empty string is returned
// when no label matches.
func getCNPGJobRole(job *batchv1.Job) string {
	const jobRoleLabelSuffix = "/jobRole"

	var (
		selectedKey string
		role        string
		found       bool
	)

	for key, value := range job.Spec.Template.Labels {
		if !strings.HasSuffix(key, jobRoleLabelSuffix) {
			continue
		}

		// Keep the smallest matching key so that the outcome does not depend
		// on the map iteration order.
		if !found || key < selectedKey {
			selectedKey, role, found = key, value, true
		}
	}

	return role
}