Files
mariadb-operator/internal/controller/physicalbackup_controller_snapshot.go
Artyom Babiy 2485c599d5 Fix v25
2025-07-28 15:33:30 +02:00

344 lines
12 KiB
Go

package controller
import (
"context"
"errors"
"fmt"
"time"
volumesnapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v8/apis/volumesnapshot/v1"
mariadbv1alpha1 "github.com/mariadb-operator/mariadb-operator/v25/api/v1alpha1"
"github.com/mariadb-operator/mariadb-operator/v25/pkg/builder"
"github.com/mariadb-operator/mariadb-operator/v25/pkg/metadata"
"github.com/mariadb-operator/mariadb-operator/v25/pkg/predicate"
"github.com/mariadb-operator/mariadb-operator/v25/pkg/sql"
mdbsnapshot "github.com/mariadb-operator/mariadb-operator/v25/pkg/volumesnapshot"
"github.com/robfig/cron/v3"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
ctrlbuilder "sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)
// reconcileSnapshots reconciles a PhysicalBackup that is backed by VolumeSnapshots.
//
// Steps, in order:
//  1. Check via discovery that the VolumeSnapshot CRD is installed. When it is
//     not, a warning event is recorded, the problem is logged and the request is
//     requeued after 10 seconds without returning an error.
//  2. List the VolumeSnapshots for the backup.
//  3. Refresh the backup status from that list.
//  4. Clean up old snapshots.
//  5. Stop early if waitForInProgressSnapshots returns a non-zero result or an error.
//  6. Delegate to reconcileTemplate, passing the number of listed snapshots and
//     a callback that calls scheduleSnapshot.
//
// Errors from steps 2-4 are wrapped with %w so callers can inspect the cause.
func (r *PhysicalBackupReconciler) reconcileSnapshots(ctx context.Context, backup *mariadbv1alpha1.PhysicalBackup,
	mariadb *mariadbv1alpha1.MariaDB) (ctrl.Result, error) {
	exist, err := r.Discovery.VolumeSnapshotExist()
	if err != nil {
		return ctrl.Result{}, err
	}
	if !exist {
		r.Recorder.Event(backup, corev1.EventTypeWarning, mariadbv1alpha1.ReasonCRDNotFound,
			"Unable to reconcile PhysicalBackup: VolumeSnapshot CRD not installed in the cluster")
		log.FromContext(ctx).Error(errors.New("VolumeSnapshot CRD not installed in the cluster"), "Unable to reconcile PhysicalBackup")
		// Not returned as an error: the request is simply retried later.
		return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
	}

	snapshotList, err := mdbsnapshot.ListVolumeSnapshots(ctx, r.Client, backup)
	if err != nil {
		return ctrl.Result{}, fmt.Errorf("error listing VolumeSnapshots: %w", err)
	}
	if err := r.reconcileSnapshotStatus(ctx, backup, snapshotList); err != nil {
		return ctrl.Result{}, fmt.Errorf("error reconciling status: %w", err)
	}
	if err := r.cleanupSnapshots(ctx, backup, snapshotList); err != nil {
		return ctrl.Result{}, fmt.Errorf("error cleaning up VolumeSnapshots: %w", err)
	}
	if result, err := r.waitForInProgressSnapshots(ctx, backup, snapshotList); !result.IsZero() || err != nil {
		return result, err
	}

	return r.reconcileTemplate(ctx, backup, len(snapshotList.Items), func(now time.Time, cronSchedule cron.Schedule) (ctrl.Result, error) {
		return r.scheduleSnapshot(ctx, backup, mariadb, now, cronSchedule)
	})
}
// watchSnapshots registers a watch on VolumeSnapshot objects with the given
// controller builder, but only when discovery reports that the VolumeSnapshot
// kind exists. The watch is filtered by a predicate built from
// metadata.PhysicalBackupNameLabel and maps events to requests through
// mapVolumeSnapshotsToRequests.
//
// It returns an error only when discovery itself fails.
func (r *PhysicalBackupReconciler) watchSnapshots(ctx context.Context, builder *ctrlbuilder.Builder) error {
	found, err := r.Discovery.VolumeSnapshotExist()
	if err != nil {
		return fmt.Errorf("error discovering VolumeSnapshot: %v", err)
	}
	if !found {
		// Nothing to watch: the kind is not served by the API server.
		return nil
	}

	watchLogger := log.FromContext(ctx).
		WithName("watcher").
		WithValues("kind", "VolumeSnapshot", "label", metadata.PhysicalBackupNameLabel)
	watchLogger.Info("Watching labeled VolumeSnapshots")

	builder.Watches(
		&volumesnapshotv1.VolumeSnapshot{},
		handler.EnqueueRequestsFromMapFunc(r.mapVolumeSnapshotsToRequests),
		ctrlbuilder.WithPredicates(predicate.PredicateWithLabel(metadata.PhysicalBackupNameLabel)),
	)
	return nil
}
// mapVolumeSnapshotsToRequests converts an object event into a reconcile request
// for the PhysicalBackup named by the object's metadata.PhysicalBackupNameLabel
// label, in the object's own namespace. Objects without that label yield no
// requests.
func (r *PhysicalBackupReconciler) mapVolumeSnapshotsToRequests(ctx context.Context, obj client.Object) []reconcile.Request {
	backupName, labeled := obj.GetLabels()[metadata.PhysicalBackupNameLabel]
	if !labeled {
		return nil
	}

	target := types.NamespacedName{
		Namespace: obj.GetNamespace(),
		Name:      backupName,
	}
	return []reconcile.Request{{NamespacedName: target}}
}
// reconcileSnapshotStatus derives the Complete condition of the backup from the
// listed VolumeSnapshots and patches it into the backup status.
//
// Exactly one condition is written per call, chosen in this order:
//   - schedule suspended             -> False / SnapshotSuspended, "Suspended"
//   - first snapshot reporting error -> False / SnapshotFailed, the snapshot's
//     error message (or "Error" when the message is unset)
//   - all snapshots ready (>= 1)     -> True  / SnapshotComplete, "Success"
//   - some snapshot not ready        -> False / SnapshotInProgress, "In progress"
//   - no snapshots                   -> False / SnapshotNotComplete, "Scheduled"
//     when a schedule is set, "Not complete" otherwise
//
// A failure to patch the status is only logged at verbosity 1; the function
// always returns nil.
func (r *PhysicalBackupReconciler) reconcileSnapshotStatus(ctx context.Context, backup *mariadbv1alpha1.PhysicalBackup,
	snapshotList *volumesnapshotv1.VolumeSnapshotList) error {
	statusLogger := log.FromContext(ctx).WithName("status").V(1)

	// setComplete patches the Complete condition; patch errors are logged, not returned.
	setComplete := func(condStatus metav1.ConditionStatus, reason, message string) {
		patchErr := r.patchStatus(ctx, backup, func(backupStatus *mariadbv1alpha1.PhysicalBackupStatus) {
			backupStatus.SetCondition(metav1.Condition{
				Type:    mariadbv1alpha1.ConditionTypeComplete,
				Status:  condStatus,
				Reason:  reason,
				Message: message,
			})
		})
		if patchErr != nil {
			statusLogger.Info("error patching status", "err", patchErr)
		}
	}

	if ptr.Deref(backup.Spec.Schedule, mariadbv1alpha1.PhysicalBackupSchedule{}).Suspend {
		setComplete(metav1.ConditionFalse, mariadbv1alpha1.ConditionReasonSnapshotSuspended, "Suspended")
		return nil
	}

	total := len(snapshotList.Items)
	readyCount := 0
	for i := range snapshotList.Items {
		snapStatus := ptr.Deref(snapshotList.Items[i].Status, volumesnapshotv1.VolumeSnapshotStatus{})
		if snapStatus.Error != nil {
			// The first failing snapshot decides the condition.
			setComplete(
				metav1.ConditionFalse,
				mariadbv1alpha1.ConditionReasonSnapshotFailed,
				ptr.Deref(snapStatus.Error.Message, "Error"),
			)
			return nil
		}
		if ptr.Deref(snapStatus.ReadyToUse, false) {
			readyCount++
		}
	}

	switch {
	case total > 0 && readyCount == total:
		setComplete(metav1.ConditionTrue, mariadbv1alpha1.ConditionReasonSnapshotComplete, "Success")
	case total > 0:
		setComplete(metav1.ConditionFalse, mariadbv1alpha1.ConditionReasonSnapshotInProgress, "In progress")
	default:
		pendingMessage := "Not complete"
		if backup.Spec.Schedule != nil {
			pendingMessage = "Scheduled"
		}
		setComplete(metav1.ConditionFalse, mariadbv1alpha1.ConditionReasonSnapshotNotComplete, pendingMessage)
	}
	return nil
}
// cleanupSnapshots deletes VolumeSnapshots of a scheduled backup that the
// BackupProcessor reports as old.
//
// It is a no-op when the backup has no schedule. Only snapshots reported ready
// by mdbsnapshot.IsVolumeSnapshotReady are candidates; their names are handed to
// BackupProcessor.GetOldBackupFiles together with the backup's MaxRetention
// (falling back to DefaultPhysicalBackupMaxRetention when it is zero). Each
// returned name is fetched and deleted in the backup's namespace; snapshots that
// are already gone are tolerated.
func (r *PhysicalBackupReconciler) cleanupSnapshots(ctx context.Context, backup *mariadbv1alpha1.PhysicalBackup,
	snapshotList *volumesnapshotv1.VolumeSnapshotList) error {
	if backup.Spec.Schedule == nil {
		return nil
	}

	var readyNames []string
	for _, candidate := range snapshotList.Items {
		if !mdbsnapshot.IsVolumeSnapshotReady(&candidate) {
			continue
		}
		readyNames = append(readyNames, candidate.Name)
	}

	retention := backup.Spec.MaxRetention
	if retention.Duration == 0 {
		retention = mariadbv1alpha1.DefaultPhysicalBackupMaxRetention
	}

	snapshotLogger := log.FromContext(ctx).WithName("snapshot")
	expiredNames := r.BackupProcessor.GetOldBackupFiles(readyNames, retention.Duration, snapshotLogger)

	for _, expiredName := range expiredNames {
		key := types.NamespacedName{
			Namespace: backup.Namespace,
			Name:      expiredName,
		}

		var stale volumesnapshotv1.VolumeSnapshot
		switch getErr := r.Get(ctx, key, &stale); {
		case apierrors.IsNotFound(getErr):
			// Already removed; move on to the next name.
			continue
		case getErr != nil:
			return fmt.Errorf("error getting VolumeSnapshot \"%s\": %v", key.Name, getErr)
		}

		if delErr := r.Delete(ctx, &stale); delErr != nil && !apierrors.IsNotFound(delErr) {
			return fmt.Errorf("error deleting VolumeSnapshot \"%s\": %v", stale.Name, delErr)
		}
		snapshotLogger.V(1).Info("Deleted old Snapshot", "snapshot", key.Name, "physicalbackup", backup.Name)
	}
	return nil
}
// scheduleSnapshot creates a VolumeSnapshot for the backup and records the
// scheduling times in the backup status.
//
// When the MariaDB has no current primary Pod index yet, nothing is created and
// the request is requeued after 5 seconds. Otherwise the snapshot is created in
// the MariaDB namespace under the name returned by getObjectName(backup, now),
// and LastScheduleCheckTime and LastScheduleTime are set to now. NextScheduleTime
// is set to schedule.Next(now) only when schedule is non-nil.
func (r *PhysicalBackupReconciler) scheduleSnapshot(ctx context.Context, backup *mariadbv1alpha1.PhysicalBackup,
	mariadb *mariadbv1alpha1.MariaDB, now time.Time, schedule cron.Schedule) (ctrl.Result, error) {
	if mariadb.Status.CurrentPrimaryPodIndex == nil {
		log.FromContext(ctx).V(1).Info("Current primary not set. Requeuing...")
		return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
	}

	key := types.NamespacedName{
		Namespace: mariadb.Namespace,
		Name:      getObjectName(backup, now),
	}
	if err := r.createVolumeSnapshot(ctx, key, backup, mariadb); err != nil {
		return ctrl.Result{}, fmt.Errorf("error creating VolumeSnapshot: %v", err)
	}

	recordSchedule := func(backupStatus *mariadbv1alpha1.PhysicalBackupStatus) {
		checkedAt := metav1.NewTime(now)
		scheduledAt := metav1.NewTime(now)
		backupStatus.LastScheduleCheckTime = &checkedAt
		backupStatus.LastScheduleTime = &scheduledAt
		if schedule == nil {
			return
		}
		nextAt := metav1.NewTime(schedule.Next(now))
		backupStatus.NextScheduleTime = &nextAt
	}
	if err := r.patchStatus(ctx, backup, recordSchedule); err != nil {
		return ctrl.Result{}, fmt.Errorf("error patching status: %v", err)
	}
	return ctrl.Result{}, nil
}
// createVolumeSnapshot creates the VolumeSnapshot identified by snapshotKey from
// the storage PVC of the current primary Pod, holding a read lock on the primary's
// tables while the VolumeSnapshot object is created.
//
// Behaviour:
//   - Returns an error when mariadb.Status.CurrentPrimaryPodIndex is nil.
//   - Returns nil without touching the database when the VolumeSnapshot already
//     exists, so the read lock is never taken for a no-op.
//   - Otherwise builds the desired VolumeSnapshot, opens a SQL client against the
//     primary Pod, locks the tables, creates the object and records a
//     VolumeSnapshotCreated event. Tables are unlocked and the SQL client is
//     closed on return, in that order.
//   - An AlreadyExists error from Create (the object appeared between the lookup
//     and the create) is treated as success, same as the already-exists lookup.
//
// Returned errors wrap the underlying cause with %w.
func (r *PhysicalBackupReconciler) createVolumeSnapshot(ctx context.Context, snapshotKey types.NamespacedName,
	backup *mariadbv1alpha1.PhysicalBackup, mariadb *mariadbv1alpha1.MariaDB) error {
	if mariadb.Status.CurrentPrimaryPodIndex == nil {
		return errors.New("CurrentPrimaryPodIndex must be set")
	}
	podIndex := *mariadb.Status.CurrentPrimaryPodIndex
	logger := log.FromContext(ctx).
		WithName("snapshot").
		WithValues(
			"mariadb", mariadb.Name,
			"pod-index", podIndex,
		)

	// Look the snapshot up before connecting to the database: when it already
	// exists there is nothing to do and no reason to block writes on the primary.
	var existing volumesnapshotv1.VolumeSnapshot
	err := r.Get(ctx, snapshotKey, &existing)
	if err == nil {
		logger.V(1).Info("VolumeSnapshot already exists. Skipping creation", "snapshot", snapshotKey.Name)
		return nil
	}
	if !apierrors.IsNotFound(err) {
		return fmt.Errorf("error getting VolumeSnapshot: %w", err)
	}

	primaryPvcKey := mariadb.PVCKey(builder.StorageVolume, podIndex)
	desiredSnapshot, err := r.Builder.BuildVolumeSnapshot(snapshotKey, backup, primaryPvcKey)
	if err != nil {
		return fmt.Errorf("error building VolumeSnapshot: %w", err)
	}

	// Named sqlClient so it does not shadow the imported controller-runtime client package.
	sqlClient, err := sql.NewInternalClientWithPodIndex(ctx, mariadb, r.RefResolver, podIndex)
	if err != nil {
		return fmt.Errorf("error getting SQL client: %w", err)
	}
	defer sqlClient.Close()

	logger.V(1).Info("Locking tables with read lock")
	if err := sqlClient.LockTablesWithReadLock(ctx); err != nil {
		return fmt.Errorf("error locking tables with read lock: %w", err)
	}
	// Deferred after Close, so it runs first: unlock, then close the connection.
	defer func() {
		logger.V(1).Info("Unlocking tables with read lock")
		if err := sqlClient.UnlockTables(ctx); err != nil {
			logger.Error(err, "error unlocking tables")
		}
	}()

	if err := r.Create(ctx, desiredSnapshot); err != nil {
		if apierrors.IsAlreadyExists(err) {
			// Lost a race with another creator; the snapshot is there, which is what we want.
			return nil
		}
		return fmt.Errorf("error creating VolumeSnapshot: %w", err)
	}
	r.Recorder.Eventf(
		backup,
		corev1.EventTypeNormal,
		mariadbv1alpha1.ReasonVolumeSnapshotCreated,
		"VolumeSnapshot %s scheduled",
		desiredSnapshot.Name,
	)
	return nil
}
// waitForInProgressSnapshots inspects the listed VolumeSnapshots and reacts to
// the first one that mdbsnapshot.IsVolumeSnapshotReady reports as not ready:
//
//   - If the backup defines a Timeout, the snapshot has a creation timestamp and
//     its age exceeds that timeout, the snapshot is deleted and an immediate
//     requeue is requested.
//   - Otherwise the request is requeued after 10 seconds.
//
// When every snapshot is ready (or the list is empty) a zero result is returned
// so the caller can proceed.
func (r *PhysicalBackupReconciler) waitForInProgressSnapshots(ctx context.Context, backup *mariadbv1alpha1.PhysicalBackup,
	snapshotList *volumesnapshotv1.VolumeSnapshotList) (ctrl.Result, error) {
	for _, pending := range snapshotList.Items {
		if mdbsnapshot.IsVolumeSnapshotReady(&pending) {
			continue
		}
		logger := log.FromContext(ctx)

		timedOut := backup.Spec.Timeout != nil &&
			!pending.CreationTimestamp.IsZero() &&
			time.Since(pending.CreationTimestamp.Time) > backup.Spec.Timeout.Duration
		if timedOut {
			logger.Info("PhysicalBackup VolumeSnapshot timed out. Deleting...", "snapshot", pending.Name)
			if err := r.Delete(ctx, &pending); err != nil {
				return ctrl.Result{}, fmt.Errorf("error deleting expired VolumeSnapshot: %v", err)
			}
			return ctrl.Result{Requeue: true}, nil
		}

		pendingStatus := ptr.Deref(pending.Status, volumesnapshotv1.VolumeSnapshotStatus{})
		logger.V(1).Info(
			"PhysicalBackup VolumeSnapshot is not ready. Requeuing...",
			"snapshot", pending.Name,
			"ready", ptr.Deref(pendingStatus.ReadyToUse, false),
			"error", pendingStatus.Error,
		)
		return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
	}
	return ctrl.Result{}, nil
}