mirror of
https://github.com/mariadb-operator/mariadb-operator.git
synced 2026-01-20 08:30:11 +00:00
216 lines
6.5 KiB
Go
216 lines
6.5 KiB
Go
package sql
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"math/rand"
|
|
"time"
|
|
|
|
"github.com/hashicorp/go-multierror"
|
|
mariadbv1alpha1 "github.com/mariadb-operator/mariadb-operator/v25/api/v1alpha1"
|
|
condition "github.com/mariadb-operator/mariadb-operator/v25/pkg/condition"
|
|
"github.com/mariadb-operator/mariadb-operator/v25/pkg/health"
|
|
"github.com/mariadb-operator/mariadb-operator/v25/pkg/interfaces"
|
|
"github.com/mariadb-operator/mariadb-operator/v25/pkg/refresolver"
|
|
sqlClient "github.com/mariadb-operator/mariadb-operator/v25/pkg/sql"
|
|
ctrl "sigs.k8s.io/controller-runtime"
|
|
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
|
|
"sigs.k8s.io/controller-runtime/pkg/log"
|
|
)
|
|
|
|
type SqlOptions struct {
|
|
RequeueInterval time.Duration
|
|
RequeueMaxOffset time.Duration
|
|
LogSql bool
|
|
}
|
|
|
|
type SqlOpt func(*SqlOptions)
|
|
|
|
func WithRequeueInterval(interval time.Duration) SqlOpt {
|
|
return func(opts *SqlOptions) {
|
|
opts.RequeueInterval = interval
|
|
}
|
|
}
|
|
|
|
func WithRequeueMaxOffset(offset time.Duration) SqlOpt {
|
|
return func(opts *SqlOptions) {
|
|
opts.RequeueMaxOffset = offset
|
|
}
|
|
}
|
|
|
|
func WithLogSql(logSql bool) SqlOpt {
|
|
return func(opts *SqlOptions) {
|
|
opts.LogSql = logSql
|
|
}
|
|
}
|
|
|
|
type SqlReconciler struct {
|
|
Client ctrlclient.Client
|
|
RefResolver *refresolver.RefResolver
|
|
ConditionReady *condition.Ready
|
|
|
|
WrappedReconciler WrappedReconciler
|
|
Finalizer Finalizer
|
|
|
|
SqlOptions
|
|
}
|
|
|
|
func NewSqlReconciler(client ctrlclient.Client, cr *condition.Ready, wr WrappedReconciler, f Finalizer,
|
|
opts ...SqlOpt) Reconciler {
|
|
reconciler := &SqlReconciler{
|
|
Client: client,
|
|
RefResolver: refresolver.New(client),
|
|
ConditionReady: cr,
|
|
WrappedReconciler: wr,
|
|
Finalizer: f,
|
|
SqlOptions: SqlOptions{
|
|
RequeueInterval: 10 * time.Hour,
|
|
RequeueMaxOffset: 1 * time.Hour,
|
|
LogSql: false,
|
|
},
|
|
}
|
|
for _, setOpt := range opts {
|
|
setOpt(&reconciler.SqlOptions)
|
|
}
|
|
return reconciler
|
|
}
|
|
|
|
func (r *SqlReconciler) Reconcile(ctx context.Context, resource Resource) (ctrl.Result, error) {
|
|
if resource.IsBeingDeleted() {
|
|
if result, err := r.Finalizer.Finalize(ctx, resource); !result.IsZero() || err != nil {
|
|
return result, err
|
|
}
|
|
return ctrl.Result{}, nil
|
|
}
|
|
|
|
mariadb, err := r.RefResolver.MariaDBObject(ctx, resource.MariaDBRef(), resource.GetNamespace())
|
|
if err != nil {
|
|
var errBundle *multierror.Error
|
|
errBundle = multierror.Append(errBundle, err)
|
|
|
|
err = r.WrappedReconciler.PatchStatus(ctx, r.ConditionReady.PatcherRefResolver(err, mariadb))
|
|
errBundle = multierror.Append(errBundle, err)
|
|
|
|
return ctrl.Result{}, fmt.Errorf("error getting MariaDB: %v", errBundle)
|
|
}
|
|
|
|
if result, err := waitForMariaDB(ctx, r.Client, mariadb, r.LogSql); !result.IsZero() || err != nil {
|
|
var errBundle *multierror.Error
|
|
|
|
if err != nil {
|
|
errBundle = multierror.Append(errBundle, err)
|
|
|
|
err := r.WrappedReconciler.PatchStatus(ctx, r.ConditionReady.PatcherWithError(err))
|
|
errBundle = multierror.Append(errBundle, err)
|
|
}
|
|
|
|
return result, errBundle.ErrorOrNil()
|
|
}
|
|
|
|
// TODO: connection pooling. See https://github.com/mariadb-operator/mariadb-operator/issues/7.
|
|
mdbClient, err := sqlClient.NewClientWithMariaDB(ctx, mariadb, r.RefResolver)
|
|
if err != nil {
|
|
var errBundle *multierror.Error
|
|
errBundle = multierror.Append(errBundle, err)
|
|
|
|
msg := fmt.Sprintf("Error connecting to MariaDB: %v", err)
|
|
err = r.WrappedReconciler.PatchStatus(ctx, r.ConditionReady.PatcherFailed(msg))
|
|
errBundle = multierror.Append(errBundle, err)
|
|
|
|
return r.retryResult(ctx, resource, errBundle)
|
|
}
|
|
defer mdbClient.Close()
|
|
|
|
err = r.WrappedReconciler.Reconcile(ctx, mdbClient)
|
|
var errBundle *multierror.Error
|
|
errBundle = multierror.Append(errBundle, err)
|
|
|
|
if err := errBundle.ErrorOrNil(); err != nil {
|
|
msg := fmt.Sprintf("Error creating %s: %v", resource.GetName(), err)
|
|
err = r.WrappedReconciler.PatchStatus(ctx, r.ConditionReady.PatcherFailed(msg))
|
|
errBundle = multierror.Append(errBundle, err)
|
|
|
|
return r.retryResult(ctx, resource, errBundle)
|
|
}
|
|
|
|
if err = r.Finalizer.AddFinalizer(ctx); err != nil {
|
|
errBundle = multierror.Append(errBundle, fmt.Errorf("error adding finalizer to %s: %v", resource.GetName(), err))
|
|
}
|
|
|
|
err = r.WrappedReconciler.PatchStatus(ctx, r.ConditionReady.PatcherWithError(errBundle.ErrorOrNil()))
|
|
errBundle = multierror.Append(errBundle, err)
|
|
|
|
return r.requeueResult(ctx, resource, errBundle.ErrorOrNil())
|
|
}
|
|
|
|
func (r *SqlReconciler) retryResult(ctx context.Context, resource Resource, err error) (ctrl.Result, error) {
|
|
if resource.RetryInterval() != nil {
|
|
log.FromContext(ctx).Error(err, "Error reconciling SQL resource", "resource", resource.GetName())
|
|
return ctrl.Result{RequeueAfter: resource.RetryInterval().Duration}, nil
|
|
}
|
|
if err != nil {
|
|
if r.LogSql {
|
|
log.FromContext(ctx).V(1).Info("Error reconciling SQL resource", "err", err)
|
|
}
|
|
return ctrl.Result{Requeue: true}, nil
|
|
}
|
|
return ctrl.Result{}, nil
|
|
}
|
|
|
|
func (r *SqlReconciler) requeueResult(ctx context.Context, resource Resource, err error) (ctrl.Result, error) {
|
|
if err != nil {
|
|
log.FromContext(ctx).V(1).Info("Error reconciling SQL resource", "err", err)
|
|
return ctrl.Result{Requeue: true}, nil
|
|
}
|
|
if resource.RequeueInterval() != nil {
|
|
requeueInterval := r.addRequeueIntervalOffset(resource.RequeueInterval().Duration)
|
|
if r.LogSql {
|
|
log.FromContext(ctx).V(1).Info("Requeuing SQL resource")
|
|
}
|
|
return ctrl.Result{RequeueAfter: requeueInterval}, nil
|
|
}
|
|
if r.RequeueInterval > 0 {
|
|
requeueInterval := r.addRequeueIntervalOffset(r.RequeueInterval)
|
|
if r.LogSql {
|
|
log.FromContext(ctx).V(1).Info("Requeuing SQL resource")
|
|
}
|
|
return ctrl.Result{RequeueAfter: requeueInterval}, nil
|
|
}
|
|
return ctrl.Result{}, nil
|
|
}
|
|
|
|
func (r *SqlReconciler) addRequeueIntervalOffset(duration time.Duration) time.Duration {
|
|
if r.RequeueMaxOffset > 0 {
|
|
return duration + time.Duration(rand.Int63()%int64(r.RequeueMaxOffset))
|
|
}
|
|
return duration
|
|
}
|
|
|
|
func waitForMariaDB(ctx context.Context, client ctrlclient.Client, mdb interfaces.MariaDBObject,
|
|
logSql bool) (ctrl.Result, error) {
|
|
|
|
kind := mdb.GetObjectKind()
|
|
if kind.GroupVersionKind().Kind == mariadbv1alpha1.ExternalMariaDBKind {
|
|
return ctrl.Result{}, nil
|
|
}
|
|
|
|
healthy, err := health.IsStatefulSetHealthy(
|
|
ctx,
|
|
client,
|
|
ctrlclient.ObjectKeyFromObject(mdb),
|
|
health.WithDesiredReplicas(mdb.GetReplicas()),
|
|
health.WithPort(mdb.GetPort()),
|
|
health.WithEndpointPolicy(health.EndpointPolicyAll),
|
|
)
|
|
if err != nil {
|
|
return ctrl.Result{}, err
|
|
}
|
|
if !healthy {
|
|
if logSql {
|
|
log.FromContext(ctx).V(1).Info("MariaDB unhealthy. Requeuing SQL resource")
|
|
}
|
|
return ctrl.Result{RequeueAfter: 1 * time.Second}, nil
|
|
}
|
|
return ctrl.Result{}, nil
|
|
}
|