mirror of
https://github.com/mariadb-operator/mariadb-operator.git
synced 2025-08-08 06:35:58 +00:00
225 lines
7.2 KiB
Go
225 lines
7.2 KiB
Go
package replication
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/go-logr/logr"
|
|
mariadbv1alpha1 "github.com/mariadb-operator/mariadb-operator/v25/api/v1alpha1"
|
|
"github.com/mariadb-operator/mariadb-operator/v25/pkg/builder"
|
|
"github.com/mariadb-operator/mariadb-operator/v25/pkg/controller/configmap"
|
|
"github.com/mariadb-operator/mariadb-operator/v25/pkg/controller/secret"
|
|
"github.com/mariadb-operator/mariadb-operator/v25/pkg/controller/service"
|
|
"github.com/mariadb-operator/mariadb-operator/v25/pkg/health"
|
|
"github.com/mariadb-operator/mariadb-operator/v25/pkg/refresolver"
|
|
"github.com/mariadb-operator/mariadb-operator/v25/pkg/statefulset"
|
|
"k8s.io/apimachinery/pkg/types"
|
|
"k8s.io/client-go/tools/record"
|
|
ctrl "sigs.k8s.io/controller-runtime"
|
|
"sigs.k8s.io/controller-runtime/pkg/client"
|
|
"sigs.k8s.io/controller-runtime/pkg/log"
|
|
)
|
|
|
|
type Option func(*ReplicationReconciler)
|
|
|
|
func WithRefResolver(rr *refresolver.RefResolver) Option {
|
|
return func(r *ReplicationReconciler) {
|
|
r.refResolver = rr
|
|
}
|
|
}
|
|
|
|
func WithSecretReconciler(sr *secret.SecretReconciler) Option {
|
|
return func(rr *ReplicationReconciler) {
|
|
rr.secretReconciler = sr
|
|
}
|
|
}
|
|
|
|
func WithServiceReconciler(sr *service.ServiceReconciler) Option {
|
|
return func(rr *ReplicationReconciler) {
|
|
rr.serviceReconciler = sr
|
|
}
|
|
}
|
|
|
|
type ReplicationReconciler struct {
|
|
client.Client
|
|
recorder record.EventRecorder
|
|
builder *builder.Builder
|
|
replConfig *ReplicationConfig
|
|
refResolver *refresolver.RefResolver
|
|
secretReconciler *secret.SecretReconciler
|
|
configMapreconciler *configmap.ConfigMapReconciler
|
|
serviceReconciler *service.ServiceReconciler
|
|
}
|
|
|
|
func NewReplicationReconciler(client client.Client, recorder record.EventRecorder, builder *builder.Builder, replConfig *ReplicationConfig,
|
|
opts ...Option) (*ReplicationReconciler, error) {
|
|
r := &ReplicationReconciler{
|
|
Client: client,
|
|
recorder: recorder,
|
|
builder: builder,
|
|
replConfig: replConfig,
|
|
}
|
|
for _, setOpt := range opts {
|
|
setOpt(r)
|
|
}
|
|
if r.refResolver == nil {
|
|
r.refResolver = refresolver.New(client)
|
|
}
|
|
if r.secretReconciler == nil {
|
|
reconciler, err := secret.NewSecretReconciler(client, builder)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
r.secretReconciler = reconciler
|
|
}
|
|
if r.configMapreconciler == nil {
|
|
r.configMapreconciler = configmap.NewConfigMapReconciler(client, builder)
|
|
}
|
|
if r.serviceReconciler == nil {
|
|
r.serviceReconciler = service.NewServiceReconciler(client)
|
|
}
|
|
return r, nil
|
|
}
|
|
|
|
type reconcileRequest struct {
|
|
mariadb *mariadbv1alpha1.MariaDB
|
|
key types.NamespacedName
|
|
clientSet *ReplicationClientSet
|
|
}
|
|
|
|
func (r *ReplicationReconciler) Reconcile(ctx context.Context, mdb *mariadbv1alpha1.MariaDB) (ctrl.Result, error) {
|
|
if !mdb.Replication().Enabled {
|
|
return ctrl.Result{}, nil
|
|
}
|
|
logger := log.FromContext(ctx).WithName("replication")
|
|
switchoverLogger := log.FromContext(ctx).WithName("switchover")
|
|
|
|
if !mdb.IsMaxScaleEnabled() && mdb.IsSwitchingPrimary() {
|
|
clientSet, err := NewReplicationClientSet(mdb, r.refResolver)
|
|
if err != nil {
|
|
return ctrl.Result{}, fmt.Errorf("error creating mariadb clientset: %v", err)
|
|
}
|
|
defer clientSet.close()
|
|
|
|
req := reconcileRequest{
|
|
mariadb: mdb,
|
|
key: client.ObjectKeyFromObject(mdb),
|
|
clientSet: clientSet,
|
|
}
|
|
return ctrl.Result{}, r.reconcileSwitchover(ctx, &req, switchoverLogger)
|
|
}
|
|
healthy, err := health.IsStatefulSetHealthy(
|
|
ctx,
|
|
r.Client,
|
|
client.ObjectKeyFromObject(mdb),
|
|
health.WithDesiredReplicas(mdb.Spec.Replicas),
|
|
health.WithPort(mdb.Spec.Port),
|
|
health.WithEndpointPolicy(health.EndpointPolicyAll),
|
|
)
|
|
if err != nil {
|
|
return ctrl.Result{}, fmt.Errorf("error checking MariaDB health: %v", err)
|
|
}
|
|
if !healthy {
|
|
return ctrl.Result{RequeueAfter: 1 * time.Second}, nil
|
|
}
|
|
|
|
clientSet, err := NewReplicationClientSet(mdb, r.refResolver)
|
|
if err != nil {
|
|
return ctrl.Result{}, fmt.Errorf("error creating mariadb clientset: %v", err)
|
|
}
|
|
defer clientSet.close()
|
|
|
|
req := reconcileRequest{
|
|
mariadb: mdb,
|
|
key: client.ObjectKeyFromObject(mdb),
|
|
clientSet: clientSet,
|
|
}
|
|
if result, err := r.reconcileReplication(ctx, &req, logger); !result.IsZero() || err != nil {
|
|
return result, err
|
|
}
|
|
return ctrl.Result{}, r.reconcileSwitchover(ctx, &req, switchoverLogger)
|
|
}
|
|
|
|
// nolint:lll
|
|
func (r *ReplicationReconciler) ReconcileProbeConfigMap(ctx context.Context, configMapKeyRef mariadbv1alpha1.ConfigMapKeySelector,
|
|
mdb *mariadbv1alpha1.MariaDB) error {
|
|
if !mdb.Replication().Enabled {
|
|
return nil
|
|
}
|
|
req := configmap.ReconcileRequest{
|
|
Metadata: mdb.Spec.InheritMetadata,
|
|
Owner: mdb,
|
|
Key: types.NamespacedName{
|
|
Name: configMapKeyRef.Name,
|
|
Namespace: mdb.Namespace,
|
|
},
|
|
Data: map[string]string{
|
|
configMapKeyRef.Key: `#!/bin/bash
|
|
|
|
if [[ $(mariadb -u root -p"${MARIADB_ROOT_PASSWORD}" -e "SHOW VARIABLES LIKE 'rpl_semi_sync_slave_enabled';" --skip-column-names | grep -c "ON") -eq 1 ]]; then
|
|
mariadb -u root -p"${MARIADB_ROOT_PASSWORD}" -e "SHOW SLAVE STATUS\G" | grep -c "Slave_IO_Running: Yes"
|
|
else
|
|
mariadb -u root -p"${MARIADB_ROOT_PASSWORD}" -e "SELECT 1;"
|
|
fi
|
|
`,
|
|
},
|
|
}
|
|
return r.configMapreconciler.Reconcile(ctx, &req)
|
|
}
|
|
|
|
func (r *ReplicationReconciler) reconcileReplication(ctx context.Context, req *reconcileRequest, logger logr.Logger) (ctrl.Result, error) {
|
|
if req.mariadb.IsSwitchingPrimary() {
|
|
return ctrl.Result{}, nil
|
|
}
|
|
if req.mariadb.Status.CurrentPrimaryPodIndex == nil {
|
|
return ctrl.Result{RequeueAfter: 1 * time.Second}, nil
|
|
}
|
|
|
|
for i := 0; i < int(req.mariadb.Spec.Replicas); i++ {
|
|
pod := statefulset.PodName(req.mariadb.ObjectMeta, i)
|
|
|
|
if req.mariadb.Status.ReplicationStatus == nil {
|
|
if err := r.reconcileReplicationInPod(ctx, req, logger, i); err != nil {
|
|
return ctrl.Result{}, fmt.Errorf("error configuring replication in Pod '%s': %v", pod, err)
|
|
}
|
|
}
|
|
|
|
state, ok := req.mariadb.Status.ReplicationStatus[pod]
|
|
if !ok || state == mariadbv1alpha1.ReplicationStateNotConfigured {
|
|
if err := r.reconcileReplicationInPod(ctx, req, logger, i); err != nil {
|
|
return ctrl.Result{}, fmt.Errorf("error configuring replication in Pod '%s': %v", pod, err)
|
|
}
|
|
}
|
|
}
|
|
return ctrl.Result{}, nil
|
|
}
|
|
|
|
func (r *ReplicationReconciler) reconcileReplicationInPod(ctx context.Context, req *reconcileRequest, logger logr.Logger, index int) error {
|
|
pod := statefulset.PodName(req.mariadb.ObjectMeta, index)
|
|
primaryPodIndex := *req.mariadb.Status.CurrentPrimaryPodIndex
|
|
|
|
if primaryPodIndex == index {
|
|
logger.Info("Configuring primary", "pod", pod)
|
|
client, err := req.clientSet.currentPrimaryClient(ctx)
|
|
if err != nil {
|
|
return fmt.Errorf("error getting current primary client: %v", err)
|
|
}
|
|
return r.replConfig.ConfigurePrimary(ctx, req.mariadb, client, index)
|
|
}
|
|
|
|
logger.Info("Configuring replica", "pod", pod)
|
|
client, err := req.clientSet.clientForIndex(ctx, index)
|
|
if err != nil {
|
|
return fmt.Errorf("error getting replica client: %v", err)
|
|
}
|
|
return r.replConfig.ConfigureReplica(ctx, req.mariadb, client, index, primaryPodIndex, false)
|
|
}
|
|
|
|
func (r *ReplicationReconciler) patchStatus(ctx context.Context, mariadb *mariadbv1alpha1.MariaDB,
|
|
patcher func(*mariadbv1alpha1.MariaDBStatus)) error {
|
|
patch := client.MergeFrom(mariadb.DeepCopy())
|
|
patcher(&mariadb.Status)
|
|
return r.Status().Patch(ctx, mariadb, patch)
|
|
}
|