Files
Artyom Babiy 2485c599d5 Fix v25
2025-07-28 15:33:30 +02:00

225 lines
7.2 KiB
Go

package replication
import (
"context"
"fmt"
"time"
"github.com/go-logr/logr"
mariadbv1alpha1 "github.com/mariadb-operator/mariadb-operator/v25/api/v1alpha1"
"github.com/mariadb-operator/mariadb-operator/v25/pkg/builder"
"github.com/mariadb-operator/mariadb-operator/v25/pkg/controller/configmap"
"github.com/mariadb-operator/mariadb-operator/v25/pkg/controller/secret"
"github.com/mariadb-operator/mariadb-operator/v25/pkg/controller/service"
"github.com/mariadb-operator/mariadb-operator/v25/pkg/health"
"github.com/mariadb-operator/mariadb-operator/v25/pkg/refresolver"
"github.com/mariadb-operator/mariadb-operator/v25/pkg/statefulset"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
)
type Option func(*ReplicationReconciler)
func WithRefResolver(rr *refresolver.RefResolver) Option {
return func(r *ReplicationReconciler) {
r.refResolver = rr
}
}
func WithSecretReconciler(sr *secret.SecretReconciler) Option {
return func(rr *ReplicationReconciler) {
rr.secretReconciler = sr
}
}
func WithServiceReconciler(sr *service.ServiceReconciler) Option {
return func(rr *ReplicationReconciler) {
rr.serviceReconciler = sr
}
}
type ReplicationReconciler struct {
client.Client
recorder record.EventRecorder
builder *builder.Builder
replConfig *ReplicationConfig
refResolver *refresolver.RefResolver
secretReconciler *secret.SecretReconciler
configMapreconciler *configmap.ConfigMapReconciler
serviceReconciler *service.ServiceReconciler
}
func NewReplicationReconciler(client client.Client, recorder record.EventRecorder, builder *builder.Builder, replConfig *ReplicationConfig,
opts ...Option) (*ReplicationReconciler, error) {
r := &ReplicationReconciler{
Client: client,
recorder: recorder,
builder: builder,
replConfig: replConfig,
}
for _, setOpt := range opts {
setOpt(r)
}
if r.refResolver == nil {
r.refResolver = refresolver.New(client)
}
if r.secretReconciler == nil {
reconciler, err := secret.NewSecretReconciler(client, builder)
if err != nil {
return nil, err
}
r.secretReconciler = reconciler
}
if r.configMapreconciler == nil {
r.configMapreconciler = configmap.NewConfigMapReconciler(client, builder)
}
if r.serviceReconciler == nil {
r.serviceReconciler = service.NewServiceReconciler(client)
}
return r, nil
}
type reconcileRequest struct {
mariadb *mariadbv1alpha1.MariaDB
key types.NamespacedName
clientSet *ReplicationClientSet
}
func (r *ReplicationReconciler) Reconcile(ctx context.Context, mdb *mariadbv1alpha1.MariaDB) (ctrl.Result, error) {
if !mdb.Replication().Enabled {
return ctrl.Result{}, nil
}
logger := log.FromContext(ctx).WithName("replication")
switchoverLogger := log.FromContext(ctx).WithName("switchover")
if !mdb.IsMaxScaleEnabled() && mdb.IsSwitchingPrimary() {
clientSet, err := NewReplicationClientSet(mdb, r.refResolver)
if err != nil {
return ctrl.Result{}, fmt.Errorf("error creating mariadb clientset: %v", err)
}
defer clientSet.close()
req := reconcileRequest{
mariadb: mdb,
key: client.ObjectKeyFromObject(mdb),
clientSet: clientSet,
}
return ctrl.Result{}, r.reconcileSwitchover(ctx, &req, switchoverLogger)
}
healthy, err := health.IsStatefulSetHealthy(
ctx,
r.Client,
client.ObjectKeyFromObject(mdb),
health.WithDesiredReplicas(mdb.Spec.Replicas),
health.WithPort(mdb.Spec.Port),
health.WithEndpointPolicy(health.EndpointPolicyAll),
)
if err != nil {
return ctrl.Result{}, fmt.Errorf("error checking MariaDB health: %v", err)
}
if !healthy {
return ctrl.Result{RequeueAfter: 1 * time.Second}, nil
}
clientSet, err := NewReplicationClientSet(mdb, r.refResolver)
if err != nil {
return ctrl.Result{}, fmt.Errorf("error creating mariadb clientset: %v", err)
}
defer clientSet.close()
req := reconcileRequest{
mariadb: mdb,
key: client.ObjectKeyFromObject(mdb),
clientSet: clientSet,
}
if result, err := r.reconcileReplication(ctx, &req, logger); !result.IsZero() || err != nil {
return result, err
}
return ctrl.Result{}, r.reconcileSwitchover(ctx, &req, switchoverLogger)
}
// nolint:lll
func (r *ReplicationReconciler) ReconcileProbeConfigMap(ctx context.Context, configMapKeyRef mariadbv1alpha1.ConfigMapKeySelector,
mdb *mariadbv1alpha1.MariaDB) error {
if !mdb.Replication().Enabled {
return nil
}
req := configmap.ReconcileRequest{
Metadata: mdb.Spec.InheritMetadata,
Owner: mdb,
Key: types.NamespacedName{
Name: configMapKeyRef.Name,
Namespace: mdb.Namespace,
},
Data: map[string]string{
configMapKeyRef.Key: `#!/bin/bash
if [[ $(mariadb -u root -p"${MARIADB_ROOT_PASSWORD}" -e "SHOW VARIABLES LIKE 'rpl_semi_sync_slave_enabled';" --skip-column-names | grep -c "ON") -eq 1 ]]; then
mariadb -u root -p"${MARIADB_ROOT_PASSWORD}" -e "SHOW SLAVE STATUS\G" | grep -c "Slave_IO_Running: Yes"
else
mariadb -u root -p"${MARIADB_ROOT_PASSWORD}" -e "SELECT 1;"
fi
`,
},
}
return r.configMapreconciler.Reconcile(ctx, &req)
}
func (r *ReplicationReconciler) reconcileReplication(ctx context.Context, req *reconcileRequest, logger logr.Logger) (ctrl.Result, error) {
if req.mariadb.IsSwitchingPrimary() {
return ctrl.Result{}, nil
}
if req.mariadb.Status.CurrentPrimaryPodIndex == nil {
return ctrl.Result{RequeueAfter: 1 * time.Second}, nil
}
for i := 0; i < int(req.mariadb.Spec.Replicas); i++ {
pod := statefulset.PodName(req.mariadb.ObjectMeta, i)
if req.mariadb.Status.ReplicationStatus == nil {
if err := r.reconcileReplicationInPod(ctx, req, logger, i); err != nil {
return ctrl.Result{}, fmt.Errorf("error configuring replication in Pod '%s': %v", pod, err)
}
}
state, ok := req.mariadb.Status.ReplicationStatus[pod]
if !ok || state == mariadbv1alpha1.ReplicationStateNotConfigured {
if err := r.reconcileReplicationInPod(ctx, req, logger, i); err != nil {
return ctrl.Result{}, fmt.Errorf("error configuring replication in Pod '%s': %v", pod, err)
}
}
}
return ctrl.Result{}, nil
}
func (r *ReplicationReconciler) reconcileReplicationInPod(ctx context.Context, req *reconcileRequest, logger logr.Logger, index int) error {
pod := statefulset.PodName(req.mariadb.ObjectMeta, index)
primaryPodIndex := *req.mariadb.Status.CurrentPrimaryPodIndex
if primaryPodIndex == index {
logger.Info("Configuring primary", "pod", pod)
client, err := req.clientSet.currentPrimaryClient(ctx)
if err != nil {
return fmt.Errorf("error getting current primary client: %v", err)
}
return r.replConfig.ConfigurePrimary(ctx, req.mariadb, client, index)
}
logger.Info("Configuring replica", "pod", pod)
client, err := req.clientSet.clientForIndex(ctx, index)
if err != nil {
return fmt.Errorf("error getting replica client: %v", err)
}
return r.replConfig.ConfigureReplica(ctx, req.mariadb, client, index, primaryPodIndex, false)
}
func (r *ReplicationReconciler) patchStatus(ctx context.Context, mariadb *mariadbv1alpha1.MariaDB,
patcher func(*mariadbv1alpha1.MariaDBStatus)) error {
patch := client.MergeFrom(mariadb.DeepCopy())
patcher(&mariadb.Status)
return r.Status().Patch(ctx, mariadb, patch)
}