plugin-barman-cloud/internal/cnpgi/operator/lifecycle.go
Jonathan Gonzalez V. b7b62f1cb4
chore: add pprof server to operator and sidecar
We add the pprof server to the operator and the sidecar. To avoid any
conflict with the CloudNativePG operator and clusters, we use port
6061.

Closes #421

Signed-off-by: Jonathan Gonzalez V. <jonathan.gonzalez@enterprisedb.com>
2025-06-26 16:57:56 +02:00
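For reference, exposing a pprof endpoint on a dedicated port in Go usually amounts to importing net/http/pprof and serving its handlers. The following is a minimal standalone sketch, not part of this file and not necessarily how the operator wires it up, using only the standard library:

package main

import (
	"log"
	"net/http"
	// The blank import registers the /debug/pprof/* handlers on http.DefaultServeMux.
	_ "net/http/pprof"
)

func main() {
	// 6061 is the port mentioned in the commit message above, chosen to avoid
	// conflicting with ports already used by the CloudNativePG operator and clusters.
	log.Println(http.ListenAndServe(":6061", nil))
}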

package operator
import (
"context"
"errors"
"fmt"
"strings"
cnpgv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1"
"github.com/cloudnative-pg/cnpg-i-machinery/pkg/pluginhelper/decoder"
"github.com/cloudnative-pg/cnpg-i-machinery/pkg/pluginhelper/object"
"github.com/cloudnative-pg/cnpg-i/pkg/lifecycle"
"github.com/cloudnative-pg/machinery/pkg/log"
"github.com/spf13/viper"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/cloudnative-pg/plugin-barman-cloud/internal/cnpgi/metadata"
"github.com/cloudnative-pg/plugin-barman-cloud/internal/cnpgi/operator/config"
)
// LifecycleImplementation is the implementation of the lifecycle handler
type LifecycleImplementation struct {
lifecycle.UnimplementedOperatorLifecycleServer
Client client.Client
}
// GetCapabilities exposes the lifecycle capabilities
func (impl LifecycleImplementation) GetCapabilities(
_ context.Context,
_ *lifecycle.OperatorLifecycleCapabilitiesRequest,
) (*lifecycle.OperatorLifecycleCapabilitiesResponse, error) {
return &lifecycle.OperatorLifecycleCapabilitiesResponse{
LifecycleCapabilities: []*lifecycle.OperatorLifecycleCapabilities{
{
Group: "",
Kind: "Pod",
OperationTypes: []*lifecycle.OperatorOperationType{
{
Type: lifecycle.OperatorOperationType_TYPE_CREATE,
},
{
Type: lifecycle.OperatorOperationType_TYPE_EVALUATE,
},
},
},
{
Group: batchv1.GroupName,
Kind: "Job",
OperationTypes: []*lifecycle.OperatorOperationType{
{
Type: lifecycle.OperatorOperationType_TYPE_CREATE,
},
},
},
},
}, nil
}
// LifecycleHook is called on the lifecycle events advertised in GetCapabilities and patches the Pods and Jobs of clusters that use this plugin
func (impl LifecycleImplementation) LifecycleHook(
ctx context.Context,
request *lifecycle.OperatorLifecycleRequest,
) (*lifecycle.OperatorLifecycleResponse, error) {
contextLogger := log.FromContext(ctx).WithName("lifecycle")
contextLogger.Info("Lifecycle hook reconciliation start")
operation := request.GetOperationType().GetType().Enum()
if operation == nil {
return nil, errors.New("no operation set")
}
kind, err := object.GetKind(request.GetObjectDefinition())
if err != nil {
return nil, err
}
var cluster cnpgv1.Cluster
if err := decoder.DecodeObjectLenient(
request.GetClusterDefinition(),
&cluster,
); err != nil {
return nil, err
}
pluginConfiguration := config.NewFromCluster(&cluster)
// the barman object is required for both the archive and the restore process
if err := pluginConfiguration.Validate(); err != nil {
contextLogger.Info("pluginConfiguration invalid, skipping lifecycle", "error", err)
return nil, nil
}
switch kind {
case "Pod":
contextLogger.Info("Reconciling pod")
return impl.reconcilePod(ctx, &cluster, request, pluginConfiguration)
case "Job":
contextLogger.Info("Reconciling job")
return impl.reconcileJob(ctx, &cluster, request, pluginConfiguration)
default:
return nil, fmt.Errorf("unsupported kind: %s", kind)
}
}
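// reconcileJob gathers the additional environment variables, certificates, and
// sidecar resource requirements configured for the recovery job, then delegates
// the actual patching to the package-level reconcileJob function.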
func (impl LifecycleImplementation) reconcileJob(
ctx context.Context,
cluster *cnpgv1.Cluster,
request *lifecycle.OperatorLifecycleRequest,
pluginConfiguration *config.PluginConfiguration,
) (*lifecycle.OperatorLifecycleResponse, error) {
env, err := impl.collectAdditionalEnvs(ctx, cluster.Namespace, pluginConfiguration)
if err != nil {
return nil, err
}
certificates, err := impl.collectAdditionalCertificates(ctx, pluginConfiguration)
if err != nil {
return nil, err
}
resources, err := impl.collectSidecarResourcesForRecoveryJob(ctx, pluginConfiguration)
if err != nil {
return nil, err
}
return reconcileJob(ctx, cluster, request, sidecarConfiguration{
env: env,
certificates: certificates,
resources: resources,
})
}
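// sidecarConfiguration groups the settings injected into the plugin sidecar
// container: additional environment variables, projected certificate volumes,
// and resource requirements.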
type sidecarConfiguration struct {
env []corev1.EnvVar
certificates []corev1.VolumeProjection
resources corev1.ResourceRequirements
}
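// reconcileJob patches the recovery Jobs of clusters that use this plugin as
// their recovery source, injecting the plugin sidecar (running the "restore"
// command) into the Job's pod template and returning the resulting JSON patch.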
func reconcileJob(
ctx context.Context,
cluster *cnpgv1.Cluster,
request *lifecycle.OperatorLifecycleRequest,
config sidecarConfiguration,
) (*lifecycle.OperatorLifecycleResponse, error) {
contextLogger := log.FromContext(ctx).WithName("lifecycle")
if pluginConfig := cluster.GetRecoverySourcePlugin(); pluginConfig == nil || pluginConfig.Name != metadata.PluginName {
contextLogger.Debug("cluster does not use the this plugin for recovery, skipping")
return nil, nil
}
var job batchv1.Job
if err := decoder.DecodeObjectStrict(
request.GetObjectDefinition(),
&job,
batchv1.SchemeGroupVersion.WithKind("Job"),
); err != nil {
contextLogger.Error(err, "failed to decode job")
return nil, err
}
contextLogger = log.FromContext(ctx).WithName("plugin-barman-cloud-lifecycle").
WithValues("jobName", job.Name)
contextLogger.Debug("starting job reconciliation")
jobRole := getCNPGJobRole(&job)
if jobRole != "full-recovery" &&
jobRole != "snapshot-recovery" {
contextLogger.Debug("job is not a recovery job, skipping")
return nil, nil
}
mutatedJob := job.DeepCopy()
if err := reconcilePodSpec(
cluster,
&mutatedJob.Spec.Template.Spec,
jobRole,
corev1.Container{
Args: []string{"restore"},
},
config,
); err != nil {
return nil, fmt.Errorf("while reconciling pod spec for job: %w", err)
}
patch, err := object.CreatePatch(mutatedJob, &job)
if err != nil {
return nil, err
}
contextLogger.Debug("generated patch", "content", string(patch))
return &lifecycle.OperatorLifecycleResponse{
JsonPatch: patch,
}, nil
}
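// reconcilePod gathers the additional environment variables, certificates, and
// sidecar resource requirements configured for instance Pods, then delegates
// the actual patching to the package-level reconcilePod function.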
func (impl LifecycleImplementation) reconcilePod(
ctx context.Context,
cluster *cnpgv1.Cluster,
request *lifecycle.OperatorLifecycleRequest,
pluginConfiguration *config.PluginConfiguration,
) (*lifecycle.OperatorLifecycleResponse, error) {
env, err := impl.collectAdditionalEnvs(ctx, cluster.Namespace, pluginConfiguration)
if err != nil {
return nil, err
}
certificates, err := impl.collectAdditionalCertificates(ctx, pluginConfiguration)
if err != nil {
return nil, err
}
resources, err := impl.collectSidecarResourcesForPod(ctx, pluginConfiguration)
if err != nil {
return nil, err
}
return reconcilePod(ctx, cluster, request, pluginConfiguration, sidecarConfiguration{
env: env,
certificates: certificates,
resources: resources,
})
}
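// reconcilePod patches instance Pods: when backup or replica-source archiving
// is configured, it injects the plugin sidecar (running the "instance" command)
// into the Pod spec and returns the resulting JSON patch.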
func reconcilePod(
ctx context.Context,
cluster *cnpgv1.Cluster,
request *lifecycle.OperatorLifecycleRequest,
pluginConfiguration *config.PluginConfiguration,
config sidecarConfiguration,
) (*lifecycle.OperatorLifecycleResponse, error) {
pod, err := decoder.DecodePodJSON(request.GetObjectDefinition())
if err != nil {
return nil, err
}
contextLogger := log.FromContext(ctx).WithName("plugin-barman-cloud-lifecycle").
WithValues("podName", pod.Name)
mutatedPod := pod.DeepCopy()
if len(pluginConfiguration.BarmanObjectName) != 0 ||
len(pluginConfiguration.ReplicaSourceBarmanObjectName) != 0 {
if err := reconcilePodSpec(
cluster,
&mutatedPod.Spec,
"postgres",
corev1.Container{
Args: []string{"instance"},
},
config,
); err != nil {
return nil, fmt.Errorf("while reconciling pod spec for pod: %w", err)
}
} else {
contextLogger.Debug("No need to mutate instance with no backup & archiving configuration")
}
patch, err := object.CreatePatch(mutatedPod, pod)
if err != nil {
return nil, err
}
contextLogger.Debug("generated patch", "content", string(patch))
return &lifecycle.OperatorLifecycleResponse{
JsonPatch: patch,
}, nil
}
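// reconcilePodSpec builds the plugin sidecar from the given template and injects
// it into the pod spec: it sets the default environment variables, startup probe,
// security context, pprof port, and resources, mounts the projected certificate
// volume when certificates are configured, and finally adds the sidecar next to
// the named main container.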
func reconcilePodSpec(
cluster *cnpgv1.Cluster,
spec *corev1.PodSpec,
mainContainerName string,
sidecarTemplate corev1.Container,
config sidecarConfiguration,
) error {
envs := []corev1.EnvVar{
{
Name: "NAMESPACE",
Value: cluster.Namespace,
},
{
Name: "CLUSTER_NAME",
Value: cluster.Name,
},
{
// TODO: should we really use this one?
// should we mount an emptyDir volume just for that?
Name: "SPOOL_DIRECTORY",
Value: "/controller/wal-restore-spool",
},
{
Name: "CUSTOM_CNPG_GROUP",
Value: cluster.GetObjectKind().GroupVersionKind().Group,
},
{
Name: "CUSTOM_CNPG_VERSION",
Value: cluster.GetObjectKind().GroupVersionKind().Version,
},
}
envs = append(envs, config.env...)
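// probe executed inside the sidecar: the manager's health check over its Unix
// socket; reused below as the startup probe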
baseProbe := &corev1.Probe{
FailureThreshold: 10,
TimeoutSeconds: 10,
ProbeHandler: corev1.ProbeHandler{
Exec: &corev1.ExecAction{
Command: []string{"/manager", "healthcheck", "unix"},
},
},
}
// fixed values
sidecarTemplate.Name = "plugin-barman-cloud"
sidecarTemplate.Image = viper.GetString("sidecar-image")
sidecarTemplate.ImagePullPolicy = cluster.Spec.ImagePullPolicy
sidecarTemplate.StartupProbe = baseProbe.DeepCopy()
sidecarTemplate.SecurityContext = &corev1.SecurityContext{
AllowPrivilegeEscalation: ptr.To(false),
RunAsNonRoot: ptr.To(true),
Privileged: ptr.To(false),
ReadOnlyRootFilesystem: ptr.To(true),
SeccompProfile: &corev1.SeccompProfile{
Type: corev1.SeccompProfileTypeRuntimeDefault,
},
Capabilities: &corev1.Capabilities{
Drop: []corev1.Capability{"ALL"},
},
}
sidecarTemplate.RestartPolicy = ptr.To(corev1.ContainerRestartPolicyAlways)
sidecarTemplate.Resources = config.resources
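// expose the pprof server on a dedicated port; 6061 is used to avoid conflicts
// with the CloudNativePG operator and clusters (see the commit message above)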
sidecarTemplate.Ports = []corev1.ContainerPort{
{
Name: "pprof",
ContainerPort: 6061,
Protocol: corev1.ProtocolTCP,
},
}
// merge the main container envs if they aren't already set
for _, container := range spec.Containers {
if container.Name == mainContainerName {
for _, env := range container.Env {
found := false
for _, existingEnv := range sidecarTemplate.Env {
if existingEnv.Name == env.Name {
found = true
break
}
}
if !found {
sidecarTemplate.Env = append(sidecarTemplate.Env, env)
}
}
break
}
}
// merge the default envs if they aren't already set
for _, env := range envs {
found := false
for _, existingEnv := range sidecarTemplate.Env {
if existingEnv.Name == env.Name {
found = true
break
}
}
if !found {
sidecarTemplate.Env = append(sidecarTemplate.Env, env)
}
}
if len(config.certificates) > 0 {
sidecarTemplate.VolumeMounts = ensureVolumeMount(
sidecarTemplate.VolumeMounts,
corev1.VolumeMount{
Name: barmanCertificatesVolumeName,
MountPath: metadata.BarmanCertificatesPath,
})
spec.Volumes = ensureVolume(spec.Volumes, corev1.Volume{
Name: barmanCertificatesVolumeName,
VolumeSource: corev1.VolumeSource{
Projected: &corev1.ProjectedVolumeSource{
Sources: config.certificates,
},
},
})
} else {
sidecarTemplate.VolumeMounts = removeVolumeMount(sidecarTemplate.VolumeMounts, barmanCertificatesVolumeName)
spec.Volumes = removeVolume(spec.Volumes, barmanCertificatesVolumeName)
}
if err := injectPluginSidecarPodSpec(spec, &sidecarTemplate, mainContainerName); err != nil {
return err
}
return nil
}
// TODO: move to machinery once the logic is finalized
// InjectPluginVolumePodSpec injects the plugin volume into a CNPG Pod spec.
func InjectPluginVolumePodSpec(spec *corev1.PodSpec, mainContainerName string) {
const (
pluginVolumeName = "plugins"
pluginMountPath = "/plugins"
)
foundPluginVolume := false
for i := range spec.Volumes {
if spec.Volumes[i].Name == pluginVolumeName {
foundPluginVolume = true
}
}
if foundPluginVolume {
return
}
spec.Volumes = ensureVolume(spec.Volumes, corev1.Volume{
Name: pluginVolumeName,
VolumeSource: corev1.VolumeSource{
EmptyDir: &corev1.EmptyDirVolumeSource{},
},
})
for i := range spec.Containers {
if spec.Containers[i].Name == mainContainerName {
spec.Containers[i].VolumeMounts = ensureVolumeMount(
spec.Containers[i].VolumeMounts,
corev1.VolumeMount{
Name: pluginVolumeName,
MountPath: pluginMountPath,
},
)
}
}
}
// injectPluginSidecarPodSpec injects a plugin sidecar into a CNPG Pod spec.
func injectPluginSidecarPodSpec(
spec *corev1.PodSpec,
sidecar *corev1.Container,
mainContainerName string,
) error {
sidecar = sidecar.DeepCopy()
InjectPluginVolumePodSpec(spec, mainContainerName)
sidecarContainerFound := false
mainContainerFound := false
for i := range spec.Containers {
if spec.Containers[i].Name == mainContainerName {
sidecar.VolumeMounts = ensureVolumeMount(sidecar.VolumeMounts, spec.Containers[i].VolumeMounts...)
mainContainerFound = true
}
}
if !mainContainerFound {
return errors.New("main container not found")
}
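// the sidecar is added to the init containers; since reconcilePodSpec sets its
// restart policy to Always, Kubernetes runs it as a native sidecar container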
for i := range spec.InitContainers {
if spec.InitContainers[i].Name == sidecar.Name {
sidecarContainerFound = true
spec.InitContainers[i] = *sidecar
}
}
if !sidecarContainerFound {
spec.InitContainers = append(spec.InitContainers, *sidecar)
}
return nil
}
// ensureVolume makes sure the passed volume is present in the list of volumes.
// If the volume is already present, it is updated.
func ensureVolume(volumes []corev1.Volume, volume corev1.Volume) []corev1.Volume {
volumeFound := false
for i := range volumes {
if volumes[i].Name == volume.Name {
volumeFound = true
volumes[i] = volume
}
}
if !volumeFound {
volumes = append(volumes, volume)
}
return volumes
}
// ensureVolumeMount makes sure the passed volume mounts are present in the list of volume mounts.
// If a volume mount is already present, it is updated.
func ensureVolumeMount(mounts []corev1.VolumeMount, volumeMounts ...corev1.VolumeMount) []corev1.VolumeMount {
for _, mount := range volumeMounts {
mountFound := false
for i := range mounts {
if mounts[i].Name == mount.Name {
mountFound = true
mounts[i] = mount
break
}
}
if !mountFound {
mounts = append(mounts, mount)
}
}
return mounts
}
// removeVolume removes a volume with a known name from a list of volumes.
func removeVolume(volumes []corev1.Volume, name string) []corev1.Volume {
var filteredVolumes []corev1.Volume
for _, volume := range volumes {
if volume.Name != name {
filteredVolumes = append(filteredVolumes, volume)
}
}
return filteredVolumes
}
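// removeVolumeMount removes a volume mount with a known name from a list of
// volume mounts.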
func removeVolumeMount(mounts []corev1.VolumeMount, name string) []corev1.VolumeMount {
var filteredMounts []corev1.VolumeMount
for _, mount := range mounts {
if mount.Name != name {
filteredMounts = append(filteredMounts, mount)
}
}
return filteredMounts
}
// getCNPGJobRole returns the role associated with a CNPG job, read from the jobRole label on the job's pod template
func getCNPGJobRole(job *batchv1.Job) string {
const jobRoleLabelSuffix = "/jobRole"
for k, v := range job.Spec.Template.Labels {
if strings.HasSuffix(k, jobRoleLabelSuffix) {
return v
}
}
return ""
}