plugin-barman-cloud/internal/cnpgi/operator/lifecycle.go
Tudor Golubenco 8c2e72a7a6 Move the startup probe configuration for the sidecar in the ObjectStore config
Signed-off-by: Tudor Golubenco <tudor@xata.io>
2025-09-14 10:07:58 -07:00

570 lines
15 KiB
Go

package operator
import (
"context"
"errors"
"fmt"
"strings"
cnpgv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1"
"github.com/cloudnative-pg/cnpg-i-machinery/pkg/pluginhelper/decoder"
"github.com/cloudnative-pg/cnpg-i-machinery/pkg/pluginhelper/object"
"github.com/cloudnative-pg/cnpg-i/pkg/lifecycle"
"github.com/cloudnative-pg/machinery/pkg/log"
"github.com/spf13/viper"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
barmancloudv1 "github.com/cloudnative-pg/plugin-barman-cloud/api/v1"
"github.com/cloudnative-pg/plugin-barman-cloud/internal/cnpgi/metadata"
"github.com/cloudnative-pg/plugin-barman-cloud/internal/cnpgi/operator/config"
)
// LifecycleImplementation is the implementation of the lifecycle handler.
//
// It embeds lifecycle.UnimplementedOperatorLifecycleServer and overrides
// GetCapabilities and LifecycleHook.
type LifecycleImplementation struct {
lifecycle.UnimplementedOperatorLifecycleServer
// Client is the controller-runtime client made available to the handler's methods.
Client client.Client
}
// GetCapabilities exposes the lifecycle capabilities of this plugin: it asks
// to be involved in the creation and evaluation of Pods and in the creation
// of Jobs.
func (impl LifecycleImplementation) GetCapabilities(
	_ context.Context,
	_ *lifecycle.OperatorLifecycleCapabilitiesRequest,
) (*lifecycle.OperatorLifecycleCapabilitiesResponse, error) {
	// Pods belong to the core ("") API group.
	podCapabilities := &lifecycle.OperatorLifecycleCapabilities{
		Group: "",
		Kind:  "Pod",
		OperationTypes: []*lifecycle.OperatorOperationType{
			{Type: lifecycle.OperatorOperationType_TYPE_CREATE},
			{Type: lifecycle.OperatorOperationType_TYPE_EVALUATE},
		},
	}

	jobCapabilities := &lifecycle.OperatorLifecycleCapabilities{
		Group: batchv1.GroupName,
		Kind:  "Job",
		OperationTypes: []*lifecycle.OperatorOperationType{
			{Type: lifecycle.OperatorOperationType_TYPE_CREATE},
		},
	}

	response := &lifecycle.OperatorLifecycleCapabilitiesResponse{
		LifecycleCapabilities: []*lifecycle.OperatorLifecycleCapabilities{
			podCapabilities,
			jobCapabilities,
		},
	}
	return response, nil
}
// LifecycleHook is the entry point for lifecycle requests. It decodes the
// cluster carried by the request, derives the plugin configuration from it
// and dispatches to the Pod or Job reconciler according to the kind of the
// object being processed.
//
// A nil response with a nil error is returned when the plugin configuration
// does not validate, meaning the object is left untouched. Any kind other
// than "Pod" or "Job" results in an error.
func (impl LifecycleImplementation) LifecycleHook(
	ctx context.Context,
	request *lifecycle.OperatorLifecycleRequest,
) (*lifecycle.OperatorLifecycleResponse, error) {
	logger := log.FromContext(ctx).WithName("lifecycle")
	logger.Info("Lifecycle hook reconciliation start")

	// NOTE(review): Enum() on generated protobuf enums usually returns a
	// non-nil pointer, so this guard may never trigger - confirm.
	if request.GetOperationType().GetType().Enum() == nil {
		return nil, errors.New("no operation set")
	}

	objectKind, err := object.GetKind(request.GetObjectDefinition())
	if err != nil {
		return nil, err
	}

	var cluster cnpgv1.Cluster
	decodeErr := decoder.DecodeObjectLenient(request.GetClusterDefinition(), &cluster)
	if decodeErr != nil {
		return nil, decodeErr
	}

	// barman object is required for both the archive and restore process
	pluginConfiguration := config.NewFromCluster(&cluster)
	if validationErr := pluginConfiguration.Validate(); validationErr != nil {
		logger.Info("pluginConfiguration invalid, skipping lifecycle", "error", validationErr)
		return nil, nil
	}

	if objectKind == "Pod" {
		logger.Info("Reconciling pod")
		return impl.reconcilePod(ctx, &cluster, request, pluginConfiguration)
	}
	if objectKind == "Job" {
		logger.Info("Reconciling job")
		return impl.reconcileJob(ctx, &cluster, request, pluginConfiguration)
	}
	return nil, fmt.Errorf("unsupported kind: %s", objectKind)
}
// reconcileJob gathers the sidecar settings needed by a recovery Job
// (additional environment variables, additional certificates, resources and
// startup probe) and hands them to the package-level reconcileJob function.
// The first error raised while collecting a setting is returned as is.
func (impl LifecycleImplementation) reconcileJob(
	ctx context.Context,
	cluster *cnpgv1.Cluster,
	request *lifecycle.OperatorLifecycleRequest,
	pluginConfiguration *config.PluginConfiguration,
) (*lifecycle.OperatorLifecycleResponse, error) {
	var (
		sidecar sidecarConfiguration
		err     error
	)

	if sidecar.env, err = impl.collectAdditionalEnvs(ctx, cluster.Namespace, pluginConfiguration); err != nil {
		return nil, err
	}
	if sidecar.certificates, err = impl.collectAdditionalCertificates(ctx, pluginConfiguration); err != nil {
		return nil, err
	}
	if sidecar.resources, err = impl.collectSidecarResourcesForRecoveryJob(ctx, pluginConfiguration); err != nil {
		return nil, err
	}
	if sidecar.startupProbe, err = impl.collectSidecarStartupProbeForRecoveryJob(ctx, pluginConfiguration); err != nil {
		return nil, err
	}

	return reconcileJob(ctx, cluster, request, sidecar)
}
// sidecarConfiguration groups the settings applied to the plugin sidecar
// container when it is injected into a pod spec.
type sidecarConfiguration struct {
// env holds the additional environment variables considered for the sidecar,
// after the built-in ones.
env []corev1.EnvVar
// certificates are the sources of the projected certificates volume; when
// empty, that volume and its mount are removed.
certificates []corev1.VolumeProjection
// resources is assigned as-is to the sidecar container resources.
resources corev1.ResourceRequirements
// startupProbe optionally overrides the timings and thresholds of the
// sidecar startup probe; when nil, default values are used.
startupProbe *barmancloudv1.ProbeConfig
}
// reconcileJob injects the plugin sidecar into a CNPG recovery Job and
// returns the JSON patch turning the received Job into the mutated one.
//
// A nil response with a nil error is returned, leaving the Job untouched,
// when:
//   - the cluster recovery source does not reference this plugin;
//   - the Job role is neither "full-recovery" nor "snapshot-recovery".
//
// The sidecar is started with the "restore" argument and the container named
// after the Job role is treated as the main container.
func reconcileJob(
	ctx context.Context,
	cluster *cnpgv1.Cluster,
	request *lifecycle.OperatorLifecycleRequest,
	config sidecarConfiguration,
) (*lifecycle.OperatorLifecycleResponse, error) {
	contextLogger := log.FromContext(ctx).WithName("lifecycle")
	if pluginConfig := cluster.GetRecoverySourcePlugin(); pluginConfig == nil || pluginConfig.Name != metadata.PluginName {
		contextLogger.Debug("cluster does not use this plugin for recovery, skipping")
		return nil, nil
	}

	var job batchv1.Job
	if err := decoder.DecodeObjectStrict(
		request.GetObjectDefinition(),
		&job,
		batchv1.SchemeGroupVersion.WithKind("Job"),
	); err != nil {
		contextLogger.Error(err, "failed to decode job")
		return nil, err
	}

	// From here on every log entry carries the name of the Job.
	contextLogger = log.FromContext(ctx).WithName("plugin-barman-cloud-lifecycle").
		WithValues("jobName", job.Name)
	contextLogger.Debug("starting job reconciliation")

	jobRole := getCNPGJobRole(&job)
	if jobRole != "full-recovery" &&
		jobRole != "snapshot-recovery" {
		contextLogger.Debug("job is not a recovery job, skipping")
		return nil, nil
	}

	// Work on a copy so that the patch can be computed against the original.
	mutatedJob := job.DeepCopy()
	if err := reconcilePodSpec(
		cluster,
		&mutatedJob.Spec.Template.Spec,
		jobRole,
		corev1.Container{
			Args: []string{"restore"},
		},
		config,
	); err != nil {
		return nil, fmt.Errorf("while reconciling pod spec for job: %w", err)
	}

	patch, err := object.CreatePatch(mutatedJob, &job)
	if err != nil {
		return nil, err
	}

	contextLogger.Debug("generated patch", "content", string(patch))
	return &lifecycle.OperatorLifecycleResponse{
		JsonPatch: patch,
	}, nil
}
// reconcilePod gathers the sidecar settings needed by an instance Pod
// (additional environment variables, additional certificates, resources and
// startup probe) and hands them to the package-level reconcilePod function.
// The first error raised while collecting a setting is returned as is.
func (impl LifecycleImplementation) reconcilePod(
	ctx context.Context,
	cluster *cnpgv1.Cluster,
	request *lifecycle.OperatorLifecycleRequest,
	pluginConfiguration *config.PluginConfiguration,
) (*lifecycle.OperatorLifecycleResponse, error) {
	var (
		sidecar sidecarConfiguration
		err     error
	)

	if sidecar.env, err = impl.collectAdditionalEnvs(ctx, cluster.Namespace, pluginConfiguration); err != nil {
		return nil, err
	}
	if sidecar.certificates, err = impl.collectAdditionalCertificates(ctx, pluginConfiguration); err != nil {
		return nil, err
	}
	if sidecar.resources, err = impl.collectSidecarResourcesForPod(ctx, pluginConfiguration); err != nil {
		return nil, err
	}
	if sidecar.startupProbe, err = impl.collectSidecarStartupProbeForInstancePod(ctx, pluginConfiguration); err != nil {
		return nil, err
	}

	return reconcilePod(ctx, cluster, request, pluginConfiguration, sidecar)
}
// reconcilePod decodes the Pod carried by the request and, when the plugin
// configuration names a Barman object (either the main one or the replica
// source one), injects the plugin sidecar next to the "postgres" container.
// The sidecar is started with the "instance" argument.
//
// The response always carries the JSON patch between the received Pod and
// the (possibly unchanged) mutated copy.
func reconcilePod(
	ctx context.Context,
	cluster *cnpgv1.Cluster,
	request *lifecycle.OperatorLifecycleRequest,
	pluginConfiguration *config.PluginConfiguration,
	config sidecarConfiguration,
) (*lifecycle.OperatorLifecycleResponse, error) {
	pod, err := decoder.DecodePodJSON(request.GetObjectDefinition())
	if err != nil {
		return nil, err
	}

	logger := log.FromContext(ctx).
		WithName("plugin-barman-cloud-lifecycle").
		WithValues("podName", pod.Name)

	// Work on a copy so that the patch can be computed against the original.
	mutatedPod := pod.DeepCopy()

	usesObjectStore := len(pluginConfiguration.BarmanObjectName) != 0 ||
		len(pluginConfiguration.ReplicaSourceBarmanObjectName) != 0
	if !usesObjectStore {
		logger.Debug("No need to mutate instance with no backup & archiving configuration")
	} else {
		sidecarTemplate := corev1.Container{
			Args: []string{"instance"},
		}
		if specErr := reconcilePodSpec(cluster, &mutatedPod.Spec, "postgres", sidecarTemplate, config); specErr != nil {
			return nil, fmt.Errorf("while reconciling pod spec for pod: %w", specErr)
		}
	}

	patch, err := object.CreatePatch(mutatedPod, pod)
	if err != nil {
		return nil, err
	}
	logger.Debug("generated patch", "content", string(patch))

	response := &lifecycle.OperatorLifecycleResponse{
		JsonPatch: patch,
	}
	return response, nil
}
// reconcilePodSpec completes sidecarTemplate and injects it into spec.
//
// The template receives a fixed name, the image read from the
// "sidecar-image" setting, the cluster image pull policy, a startup probe, a
// restrictive security context, an "Always" restart policy and the resources
// found in config.
//
// Environment variables are merged by name and the first definition wins, in
// this order: the ones already in the template, the ones of the main
// container, the built-in ones and finally config.env.
//
// When config.certificates is not empty a projected volume holding them is
// ensured in spec and mounted in the sidecar; otherwise both the volume and
// the mount are removed.
//
// An error is returned when the sidecar cannot be injected.
func reconcilePodSpec(
	cluster *cnpgv1.Cluster,
	spec *corev1.PodSpec,
	mainContainerName string,
	sidecarTemplate corev1.Container,
	config sidecarConfiguration,
) error {
	clusterGVK := cluster.GetObjectKind().GroupVersionKind()
	defaultEnvs := append([]corev1.EnvVar{
		{Name: "NAMESPACE", Value: cluster.Namespace},
		{Name: "CLUSTER_NAME", Value: cluster.Name},
		// TODO: should we really use this one?
		// should we mount an emptyDir volume just for that?
		{Name: "SPOOL_DIRECTORY", Value: "/controller/wal-restore-spool"},
		{Name: "CUSTOM_CNPG_GROUP", Value: clusterGVK.Group},
		{Name: "CUSTOM_CNPG_VERSION", Value: clusterGVK.Version},
	}, config.env...)

	// The thresholds set here are the fallback values, entirely replaced
	// below when the user supplied a probe configuration.
	startupProbe := &corev1.Probe{
		ProbeHandler: corev1.ProbeHandler{
			Exec: &corev1.ExecAction{
				Command: []string{"/manager", "healthcheck", "unix"},
			},
		},
		FailureThreshold: 10,
		TimeoutSeconds:   10,
	}
	if userProbe := config.startupProbe; userProbe != nil {
		startupProbe.InitialDelaySeconds = userProbe.InitialDelaySeconds
		startupProbe.TimeoutSeconds = userProbe.TimeoutSeconds
		startupProbe.PeriodSeconds = userProbe.PeriodSeconds
		startupProbe.FailureThreshold = userProbe.FailureThreshold
		startupProbe.SuccessThreshold = userProbe.SuccessThreshold
	}

	// fixed values
	sidecarTemplate.Name = "plugin-barman-cloud"
	sidecarTemplate.Image = viper.GetString("sidecar-image")
	sidecarTemplate.ImagePullPolicy = cluster.Spec.ImagePullPolicy
	sidecarTemplate.StartupProbe = startupProbe
	sidecarTemplate.SecurityContext = &corev1.SecurityContext{
		AllowPrivilegeEscalation: ptr.To(false),
		RunAsNonRoot:             ptr.To(true),
		Privileged:               ptr.To(false),
		ReadOnlyRootFilesystem:   ptr.To(true),
		SeccompProfile: &corev1.SeccompProfile{
			Type: corev1.SeccompProfileTypeRuntimeDefault,
		},
		Capabilities: &corev1.Capabilities{
			Drop: []corev1.Capability{"ALL"},
		},
	}
	sidecarTemplate.RestartPolicy = ptr.To(corev1.ContainerRestartPolicyAlways)
	sidecarTemplate.Resources = config.resources

	// addEnvIfMissing appends candidate to the sidecar environment unless a
	// variable with the same name is already there.
	addEnvIfMissing := func(candidate corev1.EnvVar) {
		for idx := range sidecarTemplate.Env {
			if sidecarTemplate.Env[idx].Name == candidate.Name {
				return
			}
		}
		sidecarTemplate.Env = append(sidecarTemplate.Env, candidate)
	}

	// merge the main container envs if they aren't already set; only the
	// first container carrying the main container name is considered
	for idx := range spec.Containers {
		if spec.Containers[idx].Name != mainContainerName {
			continue
		}
		for _, mainEnv := range spec.Containers[idx].Env {
			addEnvIfMissing(mainEnv)
		}
		break
	}

	// merge the default envs if they aren't already set
	for _, defaultEnv := range defaultEnvs {
		addEnvIfMissing(defaultEnv)
	}

	if len(config.certificates) == 0 {
		sidecarTemplate.VolumeMounts = removeVolumeMount(sidecarTemplate.VolumeMounts, barmanCertificatesVolumeName)
		spec.Volumes = removeVolume(spec.Volumes, barmanCertificatesVolumeName)
	} else {
		certificatesMount := corev1.VolumeMount{
			Name:      barmanCertificatesVolumeName,
			MountPath: metadata.BarmanCertificatesPath,
		}
		certificatesVolume := corev1.Volume{
			Name: barmanCertificatesVolumeName,
			VolumeSource: corev1.VolumeSource{
				Projected: &corev1.ProjectedVolumeSource{
					Sources: config.certificates,
				},
			},
		}
		sidecarTemplate.VolumeMounts = ensureVolumeMount(sidecarTemplate.VolumeMounts, certificatesMount)
		spec.Volumes = ensureVolume(spec.Volumes, certificatesVolume)
	}

	return injectPluginSidecarPodSpec(spec, &sidecarTemplate, mainContainerName)
}
// TODO: move to machinery once the logic is finalized

// InjectPluginVolumePodSpec injects the plugin volume into a CNPG Pod spec.
//
// When spec has no volume named "plugins", an emptyDir volume with that name
// is added and mounted at "/plugins" in every container named
// mainContainerName. When the volume is already there, spec is left
// untouched, container mounts included.
func InjectPluginVolumePodSpec(spec *corev1.PodSpec, mainContainerName string) {
	const (
		pluginVolumeName = "plugins"
		pluginMountPath  = "/plugins"
	)

	for idx := range spec.Volumes {
		if spec.Volumes[idx].Name == pluginVolumeName {
			// Already injected: nothing left to do.
			return
		}
	}

	// The volume is known to be missing, so it can simply be appended.
	spec.Volumes = append(spec.Volumes, corev1.Volume{
		Name: pluginVolumeName,
		VolumeSource: corev1.VolumeSource{
			EmptyDir: &corev1.EmptyDirVolumeSource{},
		},
	})

	pluginMount := corev1.VolumeMount{
		Name:      pluginVolumeName,
		MountPath: pluginMountPath,
	}
	for idx := range spec.Containers {
		target := &spec.Containers[idx]
		if target.Name != mainContainerName {
			continue
		}
		target.VolumeMounts = ensureVolumeMount(target.VolumeMounts, pluginMount)
	}
}
// injectPluginSidecarPodSpec injects a plugin sidecar into a CNPG Pod spec.
//
// The function works on a copy of sidecar, so the caller's container is never
// modified. The plugin volume is injected first, then the copy inherits the
// volume mounts of every container named mainContainerName and is finally
// stored among the init containers of spec, replacing any init container
// with the same name or being appended when there is none.
//
// An error is returned when no container is named mainContainerName; by then
// the plugin volume may already have been added to spec.
func injectPluginSidecarPodSpec(
	spec *corev1.PodSpec,
	sidecar *corev1.Container,
	mainContainerName string,
) error {
	injected := sidecar.DeepCopy()

	InjectPluginVolumePodSpec(spec, mainContainerName)

	hasMainContainer := false
	for idx := range spec.Containers {
		if spec.Containers[idx].Name != mainContainerName {
			continue
		}
		hasMainContainer = true
		injected.VolumeMounts = ensureVolumeMount(injected.VolumeMounts, spec.Containers[idx].VolumeMounts...)
	}
	if !hasMainContainer {
		return errors.New("main container not found")
	}

	replaced := false
	for idx := range spec.InitContainers {
		if spec.InitContainers[idx].Name != injected.Name {
			continue
		}
		spec.InitContainers[idx] = *injected
		replaced = true
	}
	if !replaced {
		spec.InitContainers = append(spec.InitContainers, *injected)
	}

	return nil
}
// ensureVolume makes sure the passed volume is present in the list of volumes.
// Every volume sharing the name of the passed one is overwritten in place;
// when there is none, the volume is appended. The resulting list is returned.
func ensureVolume(volumes []corev1.Volume, volume corev1.Volume) []corev1.Volume {
	replaced := false
	for idx := range volumes {
		if volumes[idx].Name != volume.Name {
			continue
		}
		volumes[idx] = volume
		replaced = true
	}

	if replaced {
		return volumes
	}
	return append(volumes, volume)
}
// ensureVolumeMount makes sure the passed volume mounts are present in the list of volume mounts.
// Mounts are matched by volume name: the first existing mount with the same
// name is overwritten in place, and a mount without a match is appended.
// The resulting list is returned.
func ensureVolumeMount(mounts []corev1.VolumeMount, volumeMounts ...corev1.VolumeMount) []corev1.VolumeMount {
nextMount:
	for _, wanted := range volumeMounts {
		for idx := range mounts {
			if mounts[idx].Name == wanted.Name {
				mounts[idx] = wanted
				continue nextMount
			}
		}
		mounts = append(mounts, wanted)
	}

	return mounts
}
// removeVolume removes a volume with a known name from a list of volumes.
// The surviving volumes are copied, in order, into a new list; the result is
// nil when no volume survives.
func removeVolume(volumes []corev1.Volume, name string) []corev1.Volume {
	var kept []corev1.Volume
	for idx := range volumes {
		if volumes[idx].Name == name {
			continue
		}
		kept = append(kept, volumes[idx])
	}
	return kept
}
// removeVolumeMount removes the volume mounts with a known name from a list
// of volume mounts. The surviving mounts are copied, in order, into a new
// list; the result is nil when no mount survives.
func removeVolumeMount(mounts []corev1.VolumeMount, name string) []corev1.VolumeMount {
	var kept []corev1.VolumeMount
	for idx := range mounts {
		if mounts[idx].Name == name {
			continue
		}
		kept = append(kept, mounts[idx])
	}
	return kept
}
// getCNPGJobRole gets the role associated to a CNPG job.
//
// The role is the value of the pod template label whose key ends with
// "/jobRole"; the empty string is returned when no label matches.
//
// Go randomizes the iteration order of maps, so when several labels match,
// the one with the lexicographically smallest key is chosen to keep the
// result stable across calls.
func getCNPGJobRole(job *batchv1.Job) string {
	const jobRoleLabelSuffix = "/jobRole"

	var (
		selectedKey string
		role        string
		found       bool
	)
	for key, value := range job.Spec.Template.Labels {
		if !strings.HasSuffix(key, jobRoleLabelSuffix) {
			continue
		}
		if !found || key < selectedKey {
			selectedKey, role, found = key, value, true
		}
	}

	return role
}