diff --git a/api/v1alpha1/etcdcluster_types.go b/api/v1alpha1/etcdcluster_types.go index f007ae04..e4d06d25 100644 --- a/api/v1alpha1/etcdcluster_types.go +++ b/api/v1alpha1/etcdcluster_types.go @@ -185,6 +185,124 @@ type EtcdClusterStatus struct { // +listType=map // +listMapKey=type Conditions []metav1.Condition `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type"` + + // Recovery captures the state of an in-progress (or last attempted) automatic + // disaster recovery from quorum loss. It is managed entirely by the operator's + // quorum-loss recovery state machine and is nil when no recovery has ever been + // attempted. + // +optional + Recovery *RecoveryStatus `json:"recovery,omitempty"` +} + +// RecoveryPhase enumerates the stages of the automatic quorum-loss recovery +// state machine. The phases form a strict, idempotent progression so that the +// controller can resume recovery safely after a restart or a transient error. +type RecoveryPhase string + +const ( + // RecoveryPhaseDetecting means the controller has observed a candidate + // quorum-loss event but has not yet confirmed it is sustained (it may still + // be a transient blip that self-heals before the grace window elapses). + RecoveryPhaseDetecting RecoveryPhase = "Detecting" + // RecoveryPhaseRebuilding means sustained quorum loss was confirmed and the + // controller is rebuilding a single-member cluster from a surviving member + // using --force-new-cluster. + RecoveryPhaseRebuilding RecoveryPhase = "Rebuilding" + // RecoveryPhaseScalingOut means the single-member cluster is healthy again + // and the controller is re-adding the remaining members one at a time via + // the normal learner-add path. + RecoveryPhaseScalingOut RecoveryPhase = "ScalingOut" + // RecoveryPhaseCompleted means the cluster was restored to its desired size + // and quorum. + RecoveryPhaseCompleted RecoveryPhase = "Completed" +) + +// RecoveryStatus records the progress of the quorum-loss recovery state machine. +type RecoveryStatus struct { + // Phase is the current stage of the recovery state machine. + // +optional + Phase RecoveryPhase `json:"phase,omitempty"` + + // SurvivorOrdinal is the StatefulSet pod ordinal whose data directory was + // chosen as the surviving source of truth for the rebuild. It is always 0 + // today (the operator keeps ordinal-0's PVC) but is recorded explicitly so + // the choice is auditable and future survivor-selection policies remain + // backward compatible. + // +optional + SurvivorOrdinal int32 `json:"survivorOrdinal,omitempty"` + + // DetectedTime is the first time a sustained-quorum-loss candidate was + // observed. It anchors the grace window used to distinguish true quorum loss + // from transient single-member failures. + // +optional + DetectedTime *metav1.Time `json:"detectedTime,omitempty"` + + // LastTransitionTime is the time the recovery phase last changed. + // +optional + LastTransitionTime *metav1.Time `json:"lastTransitionTime,omitempty"` + + // Message is a human-readable description of the current recovery step. + // +optional + Message string `json:"message,omitempty"` + + // Attempts is the number of times the operator has committed to a destructive + // rebuild for this cluster (i.e. entered the Rebuilding phase from Detecting). + // It is a monotonically increasing counter that survives across recoveries and + // is never reset, giving operators a durable signal of how often this cluster + // has needed disaster recovery — repeated recoveries usually point at an + // underlying infrastructure problem rather than a one-off event. + // +optional + Attempts int32 `json:"attempts,omitempty"` + + // DataLoss records the data-loss accounting for the most recent rebuild. + // + // Quorum-loss recovery via --force-new-cluster is NOT a lossless operation: + // the rebuilt cluster retains only the writes that were committed to the + // surviving member's local data directory. Any write that a now-destroyed + // majority had committed but had not yet replicated to the survivor is GONE. + // This field surfaces that fact explicitly (alongside a Warning Event, the + // DataLossPossible condition, and structured logs) so the loss is auditable + // and never silent. It is nil until a rebuild from a survivor completes. + // +optional + DataLoss *DataLossInfo `json:"dataLoss,omitempty"` +} + +// DataLossInfo captures what the operator knows about the data retained by — and +// therefore the data potentially lost during — a force-new-cluster rebuild. +// +// The operator cannot enumerate exactly which keys were lost (the members that +// held the un-replicated writes are gone), so this records the provable lower +// bound on retained state: the survivor's identity and its last committed +// revision. Everything the destroyed majority committed beyond SurvivorRevision +// is unrecoverable. +type DataLossInfo struct { + // SurvivorMemberID is the hex-encoded etcd member ID of the survivor whose + // data directory was used to rebuild the cluster. + // +optional + SurvivorMemberID string `json:"survivorMemberID,omitempty"` + + // SurvivorRevision is the key-value store revision present on the survivor at + // the moment the single-member cluster came back healthy. It is the highest + // revision guaranteed to be retained; any revision the lost majority committed + // above this value did not survive the rebuild. + // +optional + SurvivorRevision int64 `json:"survivorRevision,omitempty"` + + // RaftIndex is the survivor's raft committed index at rebuild time, recorded + // for forensic correlation with member logs. + // +optional + RaftIndex uint64 `json:"raftIndex,omitempty"` + + // RecoveredTime is when the rebuilt single-member cluster was confirmed + // healthy and this accounting was captured. + // +optional + RecoveredTime *metav1.Time `json:"recoveredTime,omitempty"` + + // Message is a human-readable, operator-facing summary of the data-loss + // situation, e.g. "recovered with possible data loss; rebuilt from member + // at revision ". + // +optional + Message string `json:"message,omitempty"` } // MemberStatus defines the observed state of a single etcd member. diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index 9db6ca12..7cd81c81 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -23,7 +23,7 @@ package v1alpha1 import ( "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - runtime "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime" netx "net" ) @@ -79,6 +79,25 @@ func (in *CommonConfig) DeepCopy() *CommonConfig { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *DataLossInfo) DeepCopyInto(out *DataLossInfo) { + *out = *in + if in.RecoveredTime != nil { + in, out := &in.RecoveredTime, &out.RecoveredTime + *out = (*in).DeepCopy() + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DataLossInfo. +func (in *DataLossInfo) DeepCopy() *DataLossInfo { + if in == nil { + return nil + } + out := new(DataLossInfo) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *EtcdCluster) DeepCopyInto(out *EtcdCluster) { *out = *in @@ -188,6 +207,11 @@ func (in *EtcdClusterStatus) DeepCopyInto(out *EtcdClusterStatus) { (*in)[i].DeepCopyInto(&(*out)[i]) } } + if in.Recovery != nil { + in, out := &in.Recovery, &out.Recovery + *out = new(RecoveryStatus) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new EtcdClusterStatus. @@ -360,6 +384,34 @@ func (in *ProviderConfig) DeepCopy() *ProviderConfig { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *RecoveryStatus) DeepCopyInto(out *RecoveryStatus) { + *out = *in + if in.DetectedTime != nil { + in, out := &in.DetectedTime, &out.DetectedTime + *out = (*in).DeepCopy() + } + if in.LastTransitionTime != nil { + in, out := &in.LastTransitionTime, &out.LastTransitionTime + *out = (*in).DeepCopy() + } + if in.DataLoss != nil { + in, out := &in.DataLoss, &out.DataLoss + *out = new(DataLossInfo) + (*in).DeepCopyInto(*out) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RecoveryStatus. +func (in *RecoveryStatus) DeepCopy() *RecoveryStatus { + if in == nil { + return nil + } + out := new(RecoveryStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *StorageSpec) DeepCopyInto(out *StorageSpec) { *out = *in diff --git a/config/crd/bases/operator.etcd.io_etcdclusters.yaml b/config/crd/bases/operator.etcd.io_etcdclusters.yaml index 0ae1e80e..8fb78397 100644 --- a/config/crd/bases/operator.etcd.io_etcdclusters.yaml +++ b/config/crd/bases/operator.etcd.io_etcdclusters.yaml @@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.20.1 + controller-gen.kubebuilder.io/version: v0.21.0 name: etcdclusters.operator.etcd.io spec: group: operator.etcd.io @@ -1332,6 +1332,97 @@ spec: This reflects the .status.readyReplicas of the underlying StatefulSet. format: int32 type: integer + recovery: + description: |- + Recovery captures the state of an in-progress (or last attempted) automatic + disaster recovery from quorum loss. It is managed entirely by the operator's + quorum-loss recovery state machine and is nil when no recovery has ever been + attempted. + properties: + attempts: + description: |- + Attempts is the number of times the operator has committed to a destructive + rebuild for this cluster (i.e. entered the Rebuilding phase from Detecting). + It is a monotonically increasing counter that survives across recoveries and + is never reset, giving operators a durable signal of how often this cluster + has needed disaster recovery — repeated recoveries usually point at an + underlying infrastructure problem rather than a one-off event. + format: int32 + type: integer + dataLoss: + description: |- + DataLoss records the data-loss accounting for the most recent rebuild. + + Quorum-loss recovery via --force-new-cluster is NOT a lossless operation: + the rebuilt cluster retains only the writes that were committed to the + surviving member's local data directory. Any write that a now-destroyed + majority had committed but had not yet replicated to the survivor is GONE. + This field surfaces that fact explicitly (alongside a Warning Event, the + DataLossPossible condition, and structured logs) so the loss is auditable + and never silent. It is nil until a rebuild from a survivor completes. + properties: + message: + description: |- + Message is a human-readable, operator-facing summary of the data-loss + situation, e.g. "recovered with possible data loss; rebuilt from member + at revision ". + type: string + raftIndex: + description: |- + RaftIndex is the survivor's raft committed index at rebuild time, recorded + for forensic correlation with member logs. + format: int64 + type: integer + recoveredTime: + description: |- + RecoveredTime is when the rebuilt single-member cluster was confirmed + healthy and this accounting was captured. + format: date-time + type: string + survivorMemberID: + description: |- + SurvivorMemberID is the hex-encoded etcd member ID of the survivor whose + data directory was used to rebuild the cluster. + type: string + survivorRevision: + description: |- + SurvivorRevision is the key-value store revision present on the survivor at + the moment the single-member cluster came back healthy. It is the highest + revision guaranteed to be retained; any revision the lost majority committed + above this value did not survive the rebuild. + format: int64 + type: integer + type: object + detectedTime: + description: |- + DetectedTime is the first time a sustained-quorum-loss candidate was + observed. It anchors the grace window used to distinguish true quorum loss + from transient single-member failures. + format: date-time + type: string + lastTransitionTime: + description: LastTransitionTime is the time the recovery phase + last changed. + format: date-time + type: string + message: + description: Message is a human-readable description of the current + recovery step. + type: string + phase: + description: Phase is the current stage of the recovery state + machine. + type: string + survivorOrdinal: + description: |- + SurvivorOrdinal is the StatefulSet pod ordinal whose data directory was + chosen as the surviving source of truth for the rebuild. It is always 0 + today (the operator keeps ordinal-0's PVC) but is recorded explicitly so + the choice is auditable and future survivor-selection policies remain + backward compatible. + format: int32 + type: integer + type: object type: object type: object served: true diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index 6c9ac65e..edc824ae 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -28,6 +28,14 @@ rules: - list - patch - update +- apiGroups: + - "" + resources: + - pods + verbs: + - get + - list + - watch - apiGroups: - apps resources: diff --git a/internal/controller/etcdcluster_controller.go b/internal/controller/etcdcluster_controller.go index 867886a3..1b60027b 100644 --- a/internal/controller/etcdcluster_controller.go +++ b/internal/controller/etcdcluster_controller.go @@ -50,6 +50,21 @@ type EtcdClusterReconciler struct { Scheme *runtime.Scheme Recorder events.EventRecorder ImageRegistry string + + // clusterHealthFn probes the health of the given endpoints. It defaults to + // etcdutils.ClusterHealth and exists as a seam so the recovery state machine's + // survivor-health gate can be unit-tested without a live etcd. Resolved lazily + // via clusterHealth; nil means "use the real implementation". + clusterHealthFn func(eps []string) ([]etcdutils.EpHealth, error) +} + +// clusterHealth probes endpoint health via the injected seam, falling back to the +// real implementation when unset (the production path). +func (r *EtcdClusterReconciler) clusterHealth(eps []string) ([]etcdutils.EpHealth, error) { + if r.clusterHealthFn != nil { + return r.clusterHealthFn(eps) + } + return etcdutils.ClusterHealth(eps) } // reconcileState holds all transient data for a single reconciliation loop. @@ -68,6 +83,9 @@ type reconcileState struct { // +kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=core,resources=configmaps,verbs=get;list;watch;create;update;patch;delete +// Quorum-loss recovery reads the survivor pod (cached client => list+watch) to +// confirm it exists before arming the irreversible --force-new-cluster rebuild. +// +kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch // +kubebuilder:rbac:groups="",resources=events,verbs=create;patch;get;list;update // +kubebuilder:rbac:groups="",resources=secrets,verbs=get;list;watch;create;patch;update;delete // +kubebuilder:rbac:groups="cert-manager.io",resources=certificates,verbs=get;list;watch;create;patch;update;delete @@ -105,8 +123,25 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) return res, err } - if err = r.performHealthChecks(ctx, state); err != nil { - return ctrl.Result{}, err + healthErr := r.performHealthChecks(ctx, state) + + // Quorum-loss recovery gate. A failed health check on a multi-member cluster + // can mean the cluster has permanently lost quorum (a majority of members are + // gone) and cannot self-heal. maybeRecoverQuorum inspects the observed member + // health, and — only on sustained, true quorum loss — drives an idempotent + // disaster-recovery state machine (rebuild-from-survivor + re-add members). + // While it owns the reconcile (handled=true) we requeue and skip normal + // scaling so the two paths never fight. See quorum_recovery.go. + if handled, requeueAfter, recErr := r.maybeRecoverQuorum(ctx, state, state.memberHealth, healthErr); handled || recErr != nil { + return ctrl.Result{RequeueAfter: requeueAfter}, recErr + } + + // During recovery scale-out the recovery state machine delegates membership + // re-adds to reconcileClusterState below; a transient per-member health error + // (e.g. a freshly added learner not yet caught up) must NOT short-circuit that + // path, or recovery would stall. Outside recovery, a health error is fatal. + if healthErr != nil && !recoveryActive(state.cluster) { + return ctrl.Result{}, healthErr } return r.reconcileClusterState(ctx, state) diff --git a/internal/controller/quorum_recovery.go b/internal/controller/quorum_recovery.go new file mode 100644 index 00000000..6e021217 --- /dev/null +++ b/internal/controller/quorum_recovery.go @@ -0,0 +1,624 @@ +/* +Copyright 2025. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +// Quorum-loss disaster recovery. +// +// An etcd cluster makes progress only while a majority (quorum) of its voting +// members are reachable and can elect a leader. If a majority is permanently +// lost — e.g. two of three members' pods AND their data volumes are destroyed — +// the cluster can never elect a leader again on its own. etcd's documented +// disaster-recovery procedure for this is: +// +// 1. Take the data directory of a SURVIVING member. +// 2. Restart that one member with `--force-new-cluster`, which rewrites its +// raft membership to contain only itself, producing a healthy +// single-member cluster that retains all committed key/value data. +// 3. Re-add the other members one at a time. +// +// This file implements that procedure as an idempotent controller state machine +// that maps cleanly onto the operator's StatefulSet model: +// +// - The survivor is always pod ordinal 0, whose PVC the operator preserves. +// - "Restart pod-0 with --force-new-cluster" is expressed by patching the +// StatefulSet down to a single replica and injecting the +// `--force-new-cluster` flag (plus an existing-state config) onto the etcd +// container, then rolling pod-0. +// - Once pod-0 is a healthy single-member cluster, the flag is removed and the +// EXISTING reconcile loop's scale-out path (learner add + promote) rebuilds +// the cluster back up to Spec.Size. We deliberately reuse that path rather +// than duplicating membership logic here. +// +// Detection is guarded so it ONLY fires on true quorum loss: +// +// - A MAJORITY of the expected members must be unreachable / no leader must be +// electable. A single failed member out of three never triggers recovery — +// the cluster still has quorum and self-heals via normal reconciliation. +// - The condition must persist for quorumLossGracePeriod. A transient blip +// (rolling restart, brief network partition, node reboot) that clears within +// the window is ignored. The first-observed time is persisted on status so +// the window survives controller restarts. + +import ( + "context" + "fmt" + "time" + + "github.com/go-logr/logr" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log" + + ecv1alpha1 "go.etcd.io/etcd-operator/api/v1alpha1" + "go.etcd.io/etcd-operator/internal/etcdutils" + clientv3 "go.etcd.io/etcd/client/v3" +) + +const ( + // quorumLossGracePeriod is how long a candidate quorum-loss condition must + // persist before the controller commits to a disruptive rebuild. It must be + // comfortably longer than a normal rolling restart / leader election so we + // never recover a cluster that was about to heal on its own. + quorumLossGracePeriod = 60 * time.Second + + // forceNewClusterArg is the etcd flag that rewrites a member's raft + // membership to contain only itself, bootstrapping a new single-member + // cluster from existing data. + forceNewClusterArg = "--force-new-cluster" + + // recoveryForceNewClusterAnnotation is set on the StatefulSet while a rebuild + // is in progress. It is the single source of truth for "is --force-new-cluster + // currently injected", making the rebuild step idempotent and observable with + // `kubectl get sts -o yaml` even mid-recovery. + recoveryForceNewClusterAnnotation = "operator.etcd.io/force-new-cluster" + + // ConditionRecovering is the status condition type the operator raises while + // a quorum-loss recovery is in progress. + ConditionRecovering = "Recovering" + + // ConditionDataLossPossible is the status condition type the operator raises + // when it rebuilds a cluster from a single survivor. The rebuild retains only + // the data committed to that survivor, so any write the destroyed majority had + // committed but not yet replicated to the survivor is lost. This condition is + // set True and LEFT True after a rebuild so the loss remains visible long after + // the (transient) Recovering condition clears — it is an audit marker, not a + // liveness signal, and a human is expected to acknowledge it. + ConditionDataLossPossible = "DataLossPossible" +) + +// Event reasons surfaced on the EtcdCluster object during recovery. +const ( + EventReasonQuorumLossDetected = "QuorumLossDetected" + EventReasonRecoveryStarted = "RecoveryStarted" + EventReasonRecoveryRebuilding = "RecoveryRebuilding" + EventReasonRecoveryScalingOut = "RecoveryScalingOut" + EventReasonRecoveryCompleted = "RecoveryCompleted" + // EventReasonPossibleDataLoss is a Warning surfaced LOUDLY when a rebuild + // from a survivor completes. It is the user-visible counterpart of the + // DataLossPossible condition and the DataLoss status accounting. + EventReasonPossibleDataLoss = "PossibleDataLoss" +) + +// quorumAssessment is the outcome of inspecting member health for quorum loss. +type quorumAssessment struct { + // lost is true when a majority of expected members are unreachable AND no + // leader is electable — i.e. the cluster cannot make progress on its own. + lost bool + // expected is the desired voting-member count (cluster Spec.Size). + expected int + // reachable is the number of members that answered a status/health probe. + reachable int + // hasLeader is true if any reachable member reports a current leader. + hasLeader bool + // reason is a short human-readable explanation, used in logs/events. + reason string +} + +// assessQuorum is a pure function (no I/O) that decides whether the observed +// member health represents true quorum loss for a cluster of the given desired +// size. It is the single, table-testable decision point for detection. +// +// memberListErr is the error (if any) returned by the etcd member-list call; +// when the whole cluster is down this call fails and health is empty. We treat a +// failed member list combined with zero reachable members as a strong +// quorum-loss signal, but still require the majority-unreachable arithmetic to +// hold so a 1-node cluster's single restart is never misclassified. +func assessQuorum(desiredSize int, health []etcdutils.EpHealth, memberListErr error) quorumAssessment { + a := quorumAssessment{expected: desiredSize} + + for i := range health { + // Only count voting members toward quorum. A learner answers health + // probes but cannot vote in an election, so a healthy learner does not + // contribute to the quorum arithmetic. This mirrors countHealthyVoting. + if health[i].Health && (health[i].Status == nil || !health[i].Status.IsLearner) { + a.reachable++ + } + if health[i].Status != nil && health[i].Status.Leader != 0 { + a.hasLeader = true + } + } + + // Quorum for an N-member cluster is floor(N/2)+1. The cluster has lost quorum + // when fewer than that many members are reachable. + quorum := desiredSize/2 + 1 + + // A single-member cluster has a quorum of 1 and no fault tolerance; a "down" + // single member is an ordinary pod restart, not a disaster we can recover + // from another survivor. Never auto-rebuild a size-1 cluster. + if desiredSize <= 1 { + a.reason = "single-member cluster has no quorum-loss recovery path" + return a + } + + // If a leader is visible, quorum exists by definition — do not recover. + if a.hasLeader { + a.reason = fmt.Sprintf("leader present, %d/%d members reachable", a.reachable, desiredSize) + return a + } + + if a.reachable >= quorum { + // Enough members are up to (re)elect a leader; this is a transient + // leaderless window, not quorum loss. Let normal reconciliation wait it out. + a.reason = fmt.Sprintf("no leader yet but %d/%d members reachable (quorum %d) — electable", a.reachable, desiredSize, quorum) + return a + } + + // Majority unreachable and no leader: true quorum loss. + a.lost = true + if memberListErr != nil { + a.reason = fmt.Sprintf("quorum lost: %d/%d members reachable (quorum %d), member list failed: %v", + a.reachable, desiredSize, quorum, memberListErr) + } else { + a.reason = fmt.Sprintf("quorum lost: %d/%d members reachable (quorum %d), no leader electable", + a.reachable, desiredSize, quorum) + } + return a +} + +// recoveryActive reports whether a COMMITTED recovery is currently in flight, +// i.e. the controller has actually started rebuilding (Rebuilding or ScalingOut). +// The Detecting phase is deliberately excluded: during the grace window the +// cluster has only a *suspected* quorum loss that may still self-heal, so normal +// reconciliation must keep running and detection must remain cancellable. +// Callers use this to short-circuit normal reconciliation so scale logic doesn't +// fight the recovery state machine once a rebuild is underway. +func recoveryActive(ec *ecv1alpha1.EtcdCluster) bool { + r := ec.Status.Recovery + if r == nil { + return false + } + return r.Phase == ecv1alpha1.RecoveryPhaseRebuilding || r.Phase == ecv1alpha1.RecoveryPhaseScalingOut +} + +// maybeRecoverQuorum is the single entry point the controller calls from its +// health/reconcile path. It returns handled=true when it has taken ownership of +// this reconcile (the caller should requeue and not run normal scaling), along +// with the requeue delay. It is fully idempotent: calling it repeatedly with the +// same observed state advances the state machine by at most one safe step. +// +// health/memberListErr describe what the health check observed this loop. When a +// recovery is already in flight, those observations are re-derived against the +// current single-member cluster. +func (r *EtcdClusterReconciler) maybeRecoverQuorum( + ctx context.Context, + s *reconcileState, + health []etcdutils.EpHealth, + memberListErr error, +) (handled bool, requeueAfter time.Duration, err error) { + logger := log.FromContext(ctx).WithName("quorum-recovery") + + // If a recovery is already running, drive it forward regardless of the + // detection heuristic (the cluster is intentionally degraded mid-rebuild). + if recoveryActive(s.cluster) { + return r.advanceRecovery(ctx, logger, s, memberListErr) + } + + a := assessQuorum(s.cluster.Spec.Size, health, memberListErr) + if !a.lost { + // Healthy or transiently-degraded: clear any stale detection timestamp so + // a future blip starts its grace window fresh. + if s.cluster.Status.Recovery != nil && s.cluster.Status.Recovery.Phase == ecv1alpha1.RecoveryPhaseDetecting { + logger.Info("Candidate quorum loss cleared before grace period elapsed; cancelling detection", "detail", a.reason) + s.cluster.Status.Recovery = nil + meta.RemoveStatusCondition(&s.cluster.Status.Conditions, ConditionRecovering) + } + return false, 0, nil + } + + // Quorum loss observed. Start (or continue) the grace-period clock. + now := metav1.Now() + if s.cluster.Status.Recovery == nil || s.cluster.Status.Recovery.Phase != ecv1alpha1.RecoveryPhaseDetecting { + logger.Info("Candidate quorum loss observed; starting grace period before recovery", "grace", quorumLossGracePeriod, "detail", a.reason) + s.cluster.Status.Recovery = &ecv1alpha1.RecoveryStatus{ + Phase: ecv1alpha1.RecoveryPhaseDetecting, + DetectedTime: &now, + LastTransitionTime: &now, + Message: a.reason, + } + r.setRecoveringCondition(s.cluster, metav1.ConditionFalse, "QuorumLossSuspected", a.reason) + r.eventf(s.cluster, corev1.EventTypeWarning, EventReasonQuorumLossDetected, + "Suspected quorum loss: %s. Waiting %s to confirm before recovery.", a.reason, quorumLossGracePeriod) + return true, requeueDuration, nil + } + + // Already in Detecting: has the condition persisted long enough? + detected := s.cluster.Status.Recovery.DetectedTime + if detected != nil && now.Sub(detected.Time) < quorumLossGracePeriod { + remaining := quorumLossGracePeriod - now.Sub(detected.Time) + logger.Info("Quorum loss still within grace period; not recovering yet", "remaining", remaining.Round(time.Second), "detail", a.reason) + return true, requeueDuration, nil + } + + // Grace period elapsed and quorum is still lost: commit to recovery. This is + // the one place we cross from "suspected" to "destructive action", so it is + // where the durable attempt counter is bumped. + s.cluster.Status.Recovery.Attempts++ + logger.Info("Sustained quorum loss confirmed; initiating disaster recovery", + "detail", a.reason, "attempt", s.cluster.Status.Recovery.Attempts) + r.eventf(s.cluster, corev1.EventTypeWarning, EventReasonRecoveryStarted, + "Quorum loss sustained for %s (recovery attempt #%d); rebuilding cluster from survivor pod %s-0. WARNING: rebuilding from a single survivor may lose writes not yet replicated to it.", + quorumLossGracePeriod, s.cluster.Status.Recovery.Attempts, s.cluster.Name) + r.transitionRecovery(s.cluster, ecv1alpha1.RecoveryPhaseRebuilding, + fmt.Sprintf("rebuilding single-member cluster from survivor ordinal 0 (%s)", a.reason)) + return r.advanceRecovery(ctx, logger, s, memberListErr) +} + +// advanceRecovery executes one step of the recovery state machine based on the +// persisted phase. Each branch is idempotent. +func (r *EtcdClusterReconciler) advanceRecovery( + ctx context.Context, + logger logr.Logger, + s *reconcileState, + memberListErr error, +) (bool, time.Duration, error) { + switch s.cluster.Status.Recovery.Phase { + case ecv1alpha1.RecoveryPhaseRebuilding: + return r.recoveryRebuild(ctx, logger, s) + case ecv1alpha1.RecoveryPhaseScalingOut: + return r.recoveryScaleOut(logger, s, memberListErr) + default: + // Detecting handled by caller; Completed/empty means nothing to do. + return false, 0, nil + } +} + +// recoveryRebuild forces pod-0 to bootstrap a fresh single-member cluster from +// its surviving data directory. It is idempotent across these sub-steps: +// +// 1. Patch the StatefulSet to 1 replica and inject --force-new-cluster (marked +// by recoveryForceNewClusterAnnotation), rolling pod-0. +// 2. Wait until the single member is healthy and reports itself as leader. +// 3. Remove --force-new-cluster (so a future pod-0 restart doesn't re-fork the +// cluster) and advance to ScalingOut. +func (r *EtcdClusterReconciler) recoveryRebuild( + ctx context.Context, + logger logr.Logger, + s *reconcileState, +) (bool, time.Duration, error) { + sts := s.sts + + if !stsHasForceNewCluster(sts) { + // Safety gate before the destructive flag goes on: confirm the survivor pod + // actually exists. --force-new-cluster is irreversible (it rewrites raft + // membership), so we must not arm it for a pod the StatefulSet has not yet + // created — doing so against a not-yet-scheduled / freshly-replaced ordinal-0 + // risks forking from an empty data dir. If the pod is absent this loop, hold + // and requeue; the StatefulSet will (re)create it. + survivorPodName := fmt.Sprintf("%s-%d", sts.Name, s.cluster.Status.Recovery.SurvivorOrdinal) + var survivor corev1.Pod + if err := r.Get(ctx, client.ObjectKey{Namespace: sts.Namespace, Name: survivorPodName}, &survivor); err != nil { + if apierrors.IsNotFound(err) { + logger.Info("Rebuild step 1: survivor pod not present yet; holding before arming --force-new-cluster", + "survivorPod", survivorPodName) + return true, requeueDuration, nil + } + return true, 0, fmt.Errorf("failed to read survivor pod %s/%s before rebuild: %w", sts.Namespace, survivorPodName, err) + } + + logger.Info("Rebuild step 1: scaling StatefulSet to single survivor and injecting --force-new-cluster", + "survivorOrdinal", s.cluster.Status.Recovery.SurvivorOrdinal, "survivorPod", survivorPodName) + r.eventf(s.cluster, corev1.EventTypeWarning, EventReasonRecoveryRebuilding, + "Rebuilding: restarting %s with --force-new-cluster from surviving data.", survivorPodName) + // Realign the state ConfigMap with the single-member bootstrap. The + // ConfigMap is consumed by pod-0 via EnvFrom and otherwise still advertises + // a multi-member "existing" cluster, which contradicts --force-new-cluster. + // --force-new-cluster rewrites membership from pod-0's surviving data dir + // and ignores ETCD_INITIAL_CLUSTER*, so a surviving data dir masks the + // inconsistency — but on an empty/replaced PVC etcd would otherwise try to + // bootstrap a fresh 3-member "existing" cluster and hang. Rewriting to a + // single-member "new" state removes that footgun; the scale-out path + // restores the multi-member state per member as it re-adds them. + if err := applyEtcdClusterState(ctx, s.cluster, 1, r.Client, r.Scheme, logger); err != nil { + return true, 0, fmt.Errorf("failed to reset cluster state ConfigMap for rebuild: %w", err) + } + if err := r.patchStatefulSetForForceNewCluster(ctx, sts, true); err != nil { + return true, 0, fmt.Errorf("failed to inject --force-new-cluster: %w", err) + } + // Give the pod time to roll and bootstrap. + return true, requeueDuration, nil + } + + // Force-new-cluster is injected; check whether the single survivor is healthy. + singleEndpoint := []string{clientEndpointForOrdinalIndex(sts, 0)} + health, healthErr := r.clusterHealth(singleEndpoint) + if healthErr != nil || len(health) == 0 || !health[0].Health || health[0].Status == nil { + logger.Info("Rebuild step 2: waiting for survivor to bootstrap single-member cluster", + "healthErr", healthErr) + return true, requeueDuration, nil + } + + st := health[0].Status + if st.Leader == 0 || st.Leader != st.Header.MemberId { + logger.Info("Rebuild step 2: survivor up but not yet self-leader; waiting", + "leader", st.Leader, "self", st.Header.MemberId) + return true, requeueDuration, nil + } + + // Single-member cluster is healthy and is its own leader. Before we move on, + // account for the data loss this rebuild may have incurred: the cluster now + // contains exactly what THIS survivor had on disk. Anything the destroyed + // majority committed beyond the survivor's revision is gone. Surface that + // loudly and durably — Event + condition + structured log + status — so it is + // never silent. Record it once (the first time we confirm the survivor) so a + // requeue can't overwrite the captured revision with a later, post-write one. + if s.cluster.Status.Recovery.DataLoss == nil { + r.recordPossibleDataLoss(logger, s.cluster, st) + } + + // Drop the flag so the pod won't re-fork on its next restart, then move to + // scale-out. + logger.Info("Rebuild complete: survivor is a healthy single-member cluster; removing --force-new-cluster") + if err := r.patchStatefulSetForForceNewCluster(ctx, sts, false); err != nil { + return true, 0, fmt.Errorf("failed to remove --force-new-cluster after rebuild: %w", err) + } + r.transitionRecovery(s.cluster, ecv1alpha1.RecoveryPhaseScalingOut, + "single-member cluster restored; re-adding members one at a time") + r.eventf(s.cluster, corev1.EventTypeNormal, EventReasonRecoveryScalingOut, + "Survivor healthy; re-adding members to reach desired size %d.", s.cluster.Spec.Size) + return true, requeueDuration, nil +} + +// recordPossibleDataLoss captures and LOUDLY surfaces the data-loss accounting +// for a completed rebuild. The survivor's status (st) is the source of truth for +// what was retained: its member ID and the highest revision committed to its +// local store. Anything the destroyed majority committed above that revision did +// not survive --force-new-cluster and is unrecoverable. +// +// This is deliberately multi-channel so the loss cannot be missed: +// - status.recovery.dataLoss: machine-readable accounting that persists. +// - DataLossPossible condition (True): visible in `kubectl get`/`describe`, +// left True until a human acknowledges it. +// - a Warning Event: shows up in `kubectl describe` and event streams/alerts. +// - a structured log at the operator. +func (r *EtcdClusterReconciler) recordPossibleDataLoss( + logger logr.Logger, + ec *ecv1alpha1.EtcdCluster, + st *clientv3.StatusResponse, +) { + now := metav1.Now() + + var ( + memberID string + revision int64 + raftIdx uint64 + ) + if st.Header != nil { + memberID = fmt.Sprintf("%x", st.Header.MemberId) + revision = st.Header.Revision + } + raftIdx = st.RaftIndex + + msg := fmt.Sprintf( + "recovered with possible data loss; rebuilt from survivor member %s (ordinal %d) at revision %d. "+ + "Writes committed by the lost majority above revision %d were NOT retained.", + memberID, ec.Status.Recovery.SurvivorOrdinal, revision, revision) + + ec.Status.Recovery.DataLoss = &ecv1alpha1.DataLossInfo{ + SurvivorMemberID: memberID, + SurvivorRevision: revision, + RaftIndex: raftIdx, + RecoveredTime: &now, + Message: msg, + } + + // Audit marker: set True and leave it True. This is not a liveness signal; + // it records that a lossy recovery happened and is awaiting human review. + meta.SetStatusCondition(&ec.Status.Conditions, metav1.Condition{ + Type: ConditionDataLossPossible, + Status: metav1.ConditionTrue, + ObservedGeneration: ec.Generation, + Reason: "RebuiltFromSurvivor", + Message: msg, + }) + + logger.Info("POSSIBLE DATA LOSS during quorum-loss recovery", + "survivorMemberID", memberID, + "survivorRevision", revision, + "survivorRaftIndex", raftIdx, + "survivorOrdinal", ec.Status.Recovery.SurvivorOrdinal, + "detail", msg) + + r.eventf(ec, corev1.EventTypeWarning, EventReasonPossibleDataLoss, "%s", msg) +} + +// recoveryScaleOut checks whether the cluster has been rebuilt to its desired +// size and quorum. It does NOT itself add members — it hands control back to the +// normal reconcile path (which owns the learner add/promote logic) and only +// declares completion once Spec.Size healthy voting members exist. This keeps +// all membership-change code in one place. +func (r *EtcdClusterReconciler) recoveryScaleOut( + logger logr.Logger, + s *reconcileState, + memberListErr error, +) (bool, time.Duration, error) { + // Guard the delegation to reconcileClusterState. That path derives the + // member count from s.memberListResp and, when it is nil, treats the cluster + // as having zero members — which makes targetReplica != memberCnt scale the + // StatefulSet DOWN, removing a pod mid-rebuild and reversing recovery. A + // transient member-list failure (rolling learner pod, DNS hiccup) must NOT be + // interpreted as "a member was removed", so we keep ownership of the loop and + // requeue until the observation is usable again. + if memberListErr != nil || s.memberListResp == nil { + logger.Info("Recovery scale-out: member list unavailable this loop; holding instead of delegating", + "memberListErr", memberListErr) + return true, requeueDuration, nil + } + + healthy, leaderPresent := countHealthyVoting(s.memberHealth) + + if leaderPresent && healthy >= s.cluster.Spec.Size { + logger.Info("Recovery complete: cluster restored to desired size with quorum", + "healthyVoting", healthy, "desiredSize", s.cluster.Spec.Size) + r.transitionRecovery(s.cluster, ecv1alpha1.RecoveryPhaseCompleted, + fmt.Sprintf("recovered to %d healthy members with quorum", healthy)) + r.setRecoveringCondition(s.cluster, metav1.ConditionFalse, "RecoveryCompleted", + fmt.Sprintf("Cluster recovered to %d healthy members.", healthy)) + r.eventf(s.cluster, corev1.EventTypeNormal, EventReasonRecoveryCompleted, + "Quorum-loss recovery complete: %d healthy members with quorum.", healthy) + // Hand back to normal reconciliation for this and subsequent loops. + return false, 0, nil + } + + // Not yet at desired size: let the normal scale-out path run this loop (it + // adds at most one learner per loop). We stay in ScalingOut but return + // handled=false so reconcileClusterState executes. + logger.Info("Recovery scale-out in progress; delegating member re-add to normal reconcile path", + "healthyVoting", healthy, "desiredSize", s.cluster.Spec.Size, "leaderPresent", leaderPresent) + return false, 0, nil +} + +// countHealthyVoting returns the number of healthy, non-learner members and +// whether a leader is present among them. +func countHealthyVoting(health []etcdutils.EpHealth) (healthy int, leaderPresent bool) { + for i := range health { + h := health[i] + if !h.Health || h.Status == nil { + continue + } + if h.Status.IsLearner { + continue + } + healthy++ + if h.Status.Leader != 0 { + leaderPresent = true + } + } + return healthy, leaderPresent +} + +// stsHasForceNewCluster reports whether the StatefulSet currently carries the +// recovery marker annotation (and therefore the --force-new-cluster flag). +func stsHasForceNewCluster(sts *appsv1.StatefulSet) bool { + if sts == nil || sts.Annotations == nil { + return false + } + return sts.Annotations[recoveryForceNewClusterAnnotation] == "true" +} + +// patchStatefulSetForForceNewCluster toggles single-member rebuild mode on the +// StatefulSet: it sets replicas to 1, adds/removes the --force-new-cluster flag +// on the etcd container, and stamps/clears the marker annotation. It patches the +// live object in place so the change is minimal and survives operator restarts. +func (r *EtcdClusterReconciler) patchStatefulSetForForceNewCluster( + ctx context.Context, + sts *appsv1.StatefulSet, + enable bool, +) error { + base := sts.DeepCopy() + + if sts.Annotations == nil { + sts.Annotations = map[string]string{} + } + + one := int32(1) + if len(sts.Spec.Template.Spec.Containers) == 0 { + return fmt.Errorf("statefulset %s/%s has no containers", sts.Namespace, sts.Name) + } + container := &sts.Spec.Template.Spec.Containers[0] + + if enable { + sts.Spec.Replicas = &one + sts.Annotations[recoveryForceNewClusterAnnotation] = "true" + if !containsArg(container.Args, forceNewClusterArg) { + container.Args = append(container.Args, forceNewClusterArg) + } + } else { + delete(sts.Annotations, recoveryForceNewClusterAnnotation) + container.Args = removeArg(container.Args, forceNewClusterArg) + } + + return r.Patch(ctx, sts, client.MergeFrom(base)) +} + +func containsArg(args []string, target string) bool { + for _, a := range args { + if a == target { + return true + } + } + return false +} + +func removeArg(args []string, target string) []string { + out := args[:0] + for _, a := range args { + if a != target { + out = append(out, a) + } + } + return out +} + +// transitionRecovery moves the recovery state machine to a new phase, stamping +// the transition time and message and keeping the Recovering condition in sync. +func (r *EtcdClusterReconciler) transitionRecovery(ec *ecv1alpha1.EtcdCluster, phase ecv1alpha1.RecoveryPhase, msg string) { + now := metav1.Now() + if ec.Status.Recovery == nil { + ec.Status.Recovery = &ecv1alpha1.RecoveryStatus{DetectedTime: &now} + } + ec.Status.Recovery.Phase = phase + ec.Status.Recovery.LastTransitionTime = &now + ec.Status.Recovery.Message = msg + + if phase != ecv1alpha1.RecoveryPhaseCompleted { + r.setRecoveringCondition(ec, metav1.ConditionTrue, string(phase), msg) + } +} + +// setRecoveringCondition sets the standard Recovering condition on the cluster. +func (r *EtcdClusterReconciler) setRecoveringCondition(ec *ecv1alpha1.EtcdCluster, status metav1.ConditionStatus, reason, msg string) { + meta.SetStatusCondition(&ec.Status.Conditions, metav1.Condition{ + Type: ConditionRecovering, + Status: status, + ObservedGeneration: ec.Generation, + Reason: reason, + Message: msg, + }) +} + +// eventf records an Event on the cluster if a recorder is configured. It is nil-safe +// so unit tests can exercise the state machine without wiring an event recorder. +func (r *EtcdClusterReconciler) eventf(ec *ecv1alpha1.EtcdCluster, eventType, reason, msgFmt string, args ...interface{}) { + if r.Recorder == nil { + return + } + r.Recorder.Eventf(ec, nil, eventType, reason, reason, msgFmt, args...) +} diff --git a/internal/controller/quorum_recovery_test.go b/internal/controller/quorum_recovery_test.go new file mode 100644 index 00000000..f3e0ac49 --- /dev/null +++ b/internal/controller/quorum_recovery_test.go @@ -0,0 +1,592 @@ +/* +Copyright 2025. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "context" + "errors" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/tools/events" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + + ecv1alpha1 "go.etcd.io/etcd-operator/api/v1alpha1" + "go.etcd.io/etcd-operator/internal/etcdutils" + "go.etcd.io/etcd/api/v3/etcdserverpb" + clientv3 "go.etcd.io/etcd/client/v3" +) + +// member builds an EpHealth entry for detection tests. leader==0 means the +// member reports no leader. +func member(id uint64, healthy, learner bool, leader uint64) etcdutils.EpHealth { + return etcdutils.EpHealth{ + Ep: "http://etcd-x:2379", + Health: healthy, + Status: &clientv3.StatusResponse{ + Header: &etcdserverpb.ResponseHeader{MemberId: id}, + Leader: leader, + IsLearner: learner, + }, + } +} + +// TestAssessQuorum exercises the pure detection decision across the cases that +// distinguish true quorum loss from transient or recoverable-by-normal-means +// degradation. This is the safety-critical guard: a false positive triggers a +// disruptive rebuild, so the table leans heavily on negative cases. +func TestAssessQuorum(t *testing.T) { + cases := []struct { + name string + desiredSize int + health []etcdutils.EpHealth + memberListErr error + wantLost bool + }{ + { + name: "3-node all healthy with leader => not lost", + desiredSize: 3, + health: []etcdutils.EpHealth{member(1, true, false, 1), member(2, true, false, 1), member(3, true, false, 1)}, + wantLost: false, + }, + { + name: "3-node single member down, quorum intact => not lost", + desiredSize: 3, + health: []etcdutils.EpHealth{member(1, true, false, 1), member(2, true, false, 1), member(3, false, false, 0)}, + wantLost: false, + }, + { + name: "3-node two members down, no leader => quorum LOST", + desiredSize: 3, + health: []etcdutils.EpHealth{member(1, true, false, 0), member(2, false, false, 0), member(3, false, false, 0)}, + wantLost: true, + }, + { + name: "3-node total outage, member list errored => quorum LOST", + desiredSize: 3, + health: nil, + memberListErr: errors.New("context deadline exceeded"), + wantLost: true, + }, + { + name: "3-node leaderless but majority reachable => electable, not lost", + desiredSize: 3, + health: []etcdutils.EpHealth{member(1, true, false, 0), member(2, true, false, 0), member(3, false, false, 0)}, + wantLost: false, + }, + { + name: "1-node down => never auto-recover (no survivor path)", + desiredSize: 1, + health: []etcdutils.EpHealth{member(1, false, false, 0)}, + wantLost: false, + }, + { + name: "1-node total outage with error => still never recover", + desiredSize: 1, health: nil, memberListErr: errors.New("down"), + wantLost: false, + }, + { + name: "5-node three down, no leader => quorum LOST", + desiredSize: 5, + health: []etcdutils.EpHealth{ + member(1, true, false, 0), member(2, true, false, 0), + member(3, false, false, 0), member(4, false, false, 0), member(5, false, false, 0), + }, + wantLost: true, + }, + { + name: "5-node two down => quorum intact, not lost", + desiredSize: 5, + health: []etcdutils.EpHealth{ + member(1, true, false, 1), member(2, true, false, 1), member(3, true, false, 1), + member(4, false, false, 0), member(5, false, false, 0), + }, + wantLost: false, + }, + { + name: "stale leader still visible on one node => treat as has-quorum, not lost", + desiredSize: 3, + health: []etcdutils.EpHealth{member(1, true, false, 1), member(2, false, false, 0), member(3, false, false, 0)}, + wantLost: false, + }, + { + // A learner answers health probes but cannot vote, so a surviving + // voter + healthy learner is NOT a quorum for a 3-node cluster. + name: "3-node survivor voter + healthy learner, no leader => quorum LOST", + desiredSize: 3, + health: []etcdutils.EpHealth{member(1, true, false, 0), member(2, true, true, 0), member(3, false, false, 0)}, + wantLost: true, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + got := assessQuorum(tc.desiredSize, tc.health, tc.memberListErr) + assert.Equal(t, tc.wantLost, got.lost, "reason: %s", got.reason) + assert.NotEmpty(t, got.reason) + }) + } +} + +func newTestReconciler() *EtcdClusterReconciler { + scheme := runtime.NewScheme() + _ = corev1.AddToScheme(scheme) + _ = appsv1.AddToScheme(scheme) + _ = ecv1alpha1.AddToScheme(scheme) + + cl := fake.NewClientBuilder().WithScheme(scheme).Build() + return &EtcdClusterReconciler{Client: cl, Scheme: scheme} +} + +func objKey(name, namespace string) client.ObjectKey { + return client.ObjectKey{Name: name, Namespace: namespace} +} + +func threeNodeCluster() *ecv1alpha1.EtcdCluster { + return &ecv1alpha1.EtcdCluster{ + ObjectMeta: metav1.ObjectMeta{Name: "etcd", Namespace: "default"}, + Spec: ecv1alpha1.EtcdClusterSpec{Size: 3, Version: "3.5.17"}, + } +} + +func threeReplicaSTS() *appsv1.StatefulSet { + three := int32(3) + return &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{Name: "etcd", Namespace: "default"}, + Spec: appsv1.StatefulSetSpec{ + Replicas: &three, + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{ + Name: "etcd", + Args: []string{"--name=$(POD_NAME)", "--listen-peer-urls=http://0.0.0.0:2380"}, + }}, + }, + }, + }, + } +} + +// quorumLostHealth is the observation for a 3-node cluster that has lost quorum. +func quorumLostHealth() []etcdutils.EpHealth { + return []etcdutils.EpHealth{member(1, true, false, 0), member(2, false, false, 0), member(3, false, false, 0)} +} + +// survivorPod builds the ordinal-0 pod that the rebuild step requires to exist +// before it arms --force-new-cluster. Tests that drive the Rebuilding phase must +// create it so the survivor-presence gate passes. +func survivorPod() *corev1.Pod { + return &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "etcd-0", Namespace: "default"}, + Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "etcd"}}}, + } +} + +// TestMaybeRecoverQuorum_GracePeriod verifies the detection state machine waits +// out the grace window and only commits to recovery once quorum loss is +// sustained — and that it cancels cleanly if quorum returns first. +func TestMaybeRecoverQuorum_GracePeriod(t *testing.T) { + t.Run("transient loss within grace period does not recover", func(t *testing.T) { + r := newTestReconciler() + ec := threeNodeCluster() + s := &reconcileState{cluster: ec, sts: threeReplicaSTS()} + + // First observation: enters Detecting, requeues, does NOT recover. + handled, _, err := r.maybeRecoverQuorum(context.Background(), s, quorumLostHealth(), nil) + require.NoError(t, err) + assert.True(t, handled) + require.NotNil(t, ec.Status.Recovery) + assert.Equal(t, ecv1alpha1.RecoveryPhaseDetecting, ec.Status.Recovery.Phase) + + // Quorum returns (leader visible) before grace elapses => detection cancelled. + healthy := []etcdutils.EpHealth{member(1, true, false, 1), member(2, true, false, 1), member(3, true, false, 1)} + handled, _, err = r.maybeRecoverQuorum(context.Background(), s, healthy, nil) + require.NoError(t, err) + assert.False(t, handled) + assert.Nil(t, ec.Status.Recovery, "detection should be cleared when quorum returns") + assert.Nil(t, meta.FindStatusCondition(ec.Status.Conditions, ConditionRecovering)) + }) + + t.Run("sustained loss past grace period transitions to Rebuilding", func(t *testing.T) { + sts := threeReplicaSTS() + r := newTestReconciler() + ec := threeNodeCluster() + require.NoError(t, r.Create(context.Background(), sts)) + require.NoError(t, r.Create(context.Background(), survivorPod())) + s := &reconcileState{cluster: ec, sts: sts} + + // Enter Detecting. + _, _, err := r.maybeRecoverQuorum(context.Background(), s, quorumLostHealth(), nil) + require.NoError(t, err) + require.NotNil(t, ec.Status.Recovery) + + // Backdate DetectedTime so the grace period is considered elapsed. + past := metav1.NewTime(time.Now().Add(-2 * quorumLossGracePeriod)) + ec.Status.Recovery.DetectedTime = &past + + // Next observation still shows loss => commit to recovery (Rebuilding) and + // the rebuild step injects --force-new-cluster + scales to 1. + handled, _, err := r.maybeRecoverQuorum(context.Background(), s, quorumLostHealth(), nil) + require.NoError(t, err) + assert.True(t, handled) + assert.Equal(t, ecv1alpha1.RecoveryPhaseRebuilding, ec.Status.Recovery.Phase) + + // The StatefulSet must now be scaled to a single survivor with the flag. + var got appsv1.StatefulSet + require.NoError(t, r.Get(context.Background(), objKey("etcd", "default"), &got)) + require.NotNil(t, got.Spec.Replicas) + assert.Equal(t, int32(1), *got.Spec.Replicas) + assert.True(t, stsHasForceNewCluster(&got)) + assert.Contains(t, got.Spec.Template.Spec.Containers[0].Args, forceNewClusterArg) + + // The state ConfigMap must be realigned to a single-member "new" bootstrap + // so pod-0's environment is consistent with --force-new-cluster. + var cm corev1.ConfigMap + require.NoError(t, r.Get(context.Background(), objKey("etcd-state", "default"), &cm)) + assert.Equal(t, "new", cm.Data["ETCD_INITIAL_CLUSTER_STATE"]) + assert.NotContains(t, cm.Data["ETCD_INITIAL_CLUSTER"], ",", "single-member initial cluster has no comma") + + cond := meta.FindStatusCondition(ec.Status.Conditions, ConditionRecovering) + require.NotNil(t, cond) + assert.Equal(t, metav1.ConditionTrue, cond.Status) + }) +} + +// TestRecoveryRebuild_FlagRemovedWhenSurvivorHealthy is covered indirectly via +// patch helpers; here we unit-test the idempotent patch toggling directly. +func TestPatchStatefulSetForForceNewCluster(t *testing.T) { + sts := threeReplicaSTS() + r := newTestReconciler() + require.NoError(t, r.Create(context.Background(), sts)) + + // Enable twice — must be idempotent (single flag, replicas==1). + require.NoError(t, r.patchStatefulSetForForceNewCluster(context.Background(), sts, true)) + require.NoError(t, r.patchStatefulSetForForceNewCluster(context.Background(), sts, true)) + + var got appsv1.StatefulSet + require.NoError(t, r.Get(context.Background(), objKey("etcd", "default"), &got)) + assert.Equal(t, int32(1), *got.Spec.Replicas) + assert.True(t, stsHasForceNewCluster(&got)) + count := 0 + for _, a := range got.Spec.Template.Spec.Containers[0].Args { + if a == forceNewClusterArg { + count++ + } + } + assert.Equal(t, 1, count, "flag must appear exactly once") + + // Disable removes the flag and the marker annotation. + require.NoError(t, r.patchStatefulSetForForceNewCluster(context.Background(), &got, false)) + var got2 appsv1.StatefulSet + require.NoError(t, r.Get(context.Background(), objKey("etcd", "default"), &got2)) + assert.False(t, stsHasForceNewCluster(&got2)) + assert.NotContains(t, got2.Spec.Template.Spec.Containers[0].Args, forceNewClusterArg) +} + +// TestRecoveryScaleOut_CompletesAtDesiredSize verifies the scale-out phase hands +// back to normal reconciliation while degraded and declares completion once the +// desired number of healthy voting members with a leader is observed. +func TestRecoveryScaleOut_CompletesAtDesiredSize(t *testing.T) { + r := newTestReconciler() + ec := threeNodeCluster() + now := metav1.Now() + ec.Status.Recovery = &ecv1alpha1.RecoveryStatus{ + Phase: ecv1alpha1.RecoveryPhaseScalingOut, + DetectedTime: &now, + } + sts := threeReplicaSTS() + + t.Run("still degraded delegates to normal reconcile, stays ScalingOut", func(t *testing.T) { + s := &reconcileState{cluster: ec, sts: sts, + memberListResp: memberListResp(1), // member list is usable this loop + memberHealth: []etcdutils.EpHealth{member(1, true, false, 1)}} // only survivor + handled, _, err := r.maybeRecoverQuorum(context.Background(), s, s.memberHealth, nil) + require.NoError(t, err) + assert.False(t, handled, "scale-out must delegate to reconcileClusterState") + assert.Equal(t, ecv1alpha1.RecoveryPhaseScalingOut, ec.Status.Recovery.Phase) + }) + + t.Run("unusable member list holds instead of delegating (no scale-down)", func(t *testing.T) { + // memberListResp nil and/or a member-list error must NOT delegate to + // reconcileClusterState, which would read memberCnt=0 and scale the STS + // down, reversing the rebuild. The state machine keeps the loop and requeues. + s := &reconcileState{cluster: ec, sts: sts, + memberHealth: []etcdutils.EpHealth{member(1, true, false, 1)}} + handled, requeue, err := r.maybeRecoverQuorum(context.Background(), s, s.memberHealth, errors.New("context deadline exceeded")) + require.NoError(t, err) + assert.True(t, handled, "must hold ownership when member list is unusable") + assert.Greater(t, requeue, time.Duration(0), "should requeue to retry the observation") + assert.Equal(t, ecv1alpha1.RecoveryPhaseScalingOut, ec.Status.Recovery.Phase) + }) + + t.Run("desired size reached marks Completed", func(t *testing.T) { + full := []etcdutils.EpHealth{member(1, true, false, 1), member(2, true, false, 1), member(3, true, false, 1)} + s := &reconcileState{cluster: ec, sts: sts, memberListResp: memberListResp(3), memberHealth: full} + handled, _, err := r.maybeRecoverQuorum(context.Background(), s, full, nil) + require.NoError(t, err) + assert.False(t, handled) + require.NotNil(t, ec.Status.Recovery) + assert.Equal(t, ecv1alpha1.RecoveryPhaseCompleted, ec.Status.Recovery.Phase) + assert.False(t, recoveryActive(ec), "completed recovery is not active") + + cond := meta.FindStatusCondition(ec.Status.Conditions, ConditionRecovering) + require.NotNil(t, cond) + assert.Equal(t, metav1.ConditionFalse, cond.Status) + assert.Equal(t, "RecoveryCompleted", cond.Reason) + }) +} + +// memberListResp builds a minimal MemberListResponse with n members, enough for +// recoveryScaleOut's "is the member list usable this loop" guard. +func memberListResp(n int) *clientv3.MemberListResponse { + members := make([]*etcdserverpb.Member, 0, n) + for i := 0; i < n; i++ { + members = append(members, &etcdserverpb.Member{ID: uint64(i + 1)}) + } + return &clientv3.MemberListResponse{Members: members} +} + +func TestCountHealthyVoting(t *testing.T) { + health := []etcdutils.EpHealth{ + member(1, true, false, 1), // healthy voter + leader + member(2, true, false, 1), // healthy voter + member(3, true, true, 0), // healthy but learner => excluded + member(4, false, false, 0), // unhealthy => excluded + } + healthy, leader := countHealthyVoting(health) + assert.Equal(t, 2, healthy) + assert.True(t, leader) +} + +// TestRecoveryRebuild_RemovesFlagAndAdvances drives the dangerous Rebuilding leg +// to completion: once the single survivor reports healthy and self-leader, the +// rebuild MUST remove --force-new-cluster (so a future pod-0 restart can't re-fork +// the cluster) and advance to ScalingOut. The survivor-health probe is injected +// via the clusterHealthFn seam so no live etcd is required. A FakeRecorder proves +// the scale-out transition emits an observable event. +func TestRecoveryRebuild_RemovesFlagAndAdvances(t *testing.T) { + r := newTestReconciler() + rec := events.NewFakeRecorder(16) + r.Recorder = rec + + ec := threeNodeCluster() + sts := threeReplicaSTS() + require.NoError(t, r.Create(context.Background(), sts)) + require.NoError(t, r.Create(context.Background(), survivorPod())) + + // Enter Rebuilding with the grace period already elapsed (sustained loss). + _, _, err := r.maybeRecoverQuorum(context.Background(), &reconcileState{cluster: ec, sts: sts}, quorumLostHealth(), nil) + require.NoError(t, err) + require.NotNil(t, ec.Status.Recovery) + past := metav1.NewTime(time.Now().Add(-2 * quorumLossGracePeriod)) + ec.Status.Recovery.DetectedTime = &past + _, _, err = r.maybeRecoverQuorum(context.Background(), &reconcileState{cluster: ec, sts: sts}, quorumLostHealth(), nil) + require.NoError(t, err) + require.Equal(t, ecv1alpha1.RecoveryPhaseRebuilding, ec.Status.Recovery.Phase) + + // Re-read the patched STS (flag injected, replicas==1). + var rebuilding appsv1.StatefulSet + require.NoError(t, r.Get(context.Background(), objKey("etcd", "default"), &rebuilding)) + require.True(t, stsHasForceNewCluster(&rebuilding)) + + // First post-injection loop: survivor not yet healthy => stay in Rebuilding, + // flag still present. + r.clusterHealthFn = func(eps []string) ([]etcdutils.EpHealth, error) { + return []etcdutils.EpHealth{member(1, false, false, 0)}, nil + } + handled, _, err := r.maybeRecoverQuorum(context.Background(), &reconcileState{cluster: ec, sts: &rebuilding}, nil, nil) + require.NoError(t, err) + assert.True(t, handled) + assert.Equal(t, ecv1alpha1.RecoveryPhaseRebuilding, ec.Status.Recovery.Phase) + + // Next loop: survivor is healthy AND its own leader => drop the flag and + // advance to ScalingOut. + r.clusterHealthFn = func(eps []string) ([]etcdutils.EpHealth, error) { + return []etcdutils.EpHealth{member(1, true, false, 1)}, nil // leader==self id 1 + } + handled, _, err = r.maybeRecoverQuorum(context.Background(), &reconcileState{cluster: ec, sts: &rebuilding}, nil, nil) + require.NoError(t, err) + assert.True(t, handled) + assert.Equal(t, ecv1alpha1.RecoveryPhaseScalingOut, ec.Status.Recovery.Phase) + + // The flag and marker annotation MUST be gone from the live StatefulSet. + var afterRebuild appsv1.StatefulSet + require.NoError(t, r.Get(context.Background(), objKey("etcd", "default"), &afterRebuild)) + assert.False(t, stsHasForceNewCluster(&afterRebuild), "marker annotation must be cleared after rebuild") + assert.NotContains(t, afterRebuild.Spec.Template.Spec.Containers[0].Args, forceNewClusterArg, + "--force-new-cluster must be removed so pod-0 can't re-fork on restart") + + // At least one event must mention the scale-out transition. + sawScaleOut := false + for drain := true; drain; { + select { + case e := <-rec.Events: + if strings.Contains(e, EventReasonRecoveryScalingOut) { + sawScaleOut = true + } + default: + drain = false + } + } + assert.True(t, sawScaleOut, "expected a RecoveryScalingOut event when the rebuild completes") +} + +func TestRecoveryActive(t *testing.T) { + ec := threeNodeCluster() + assert.False(t, recoveryActive(ec)) + + ec.Status.Recovery = &ecv1alpha1.RecoveryStatus{Phase: ecv1alpha1.RecoveryPhaseRebuilding} + assert.True(t, recoveryActive(ec)) + + ec.Status.Recovery.Phase = ecv1alpha1.RecoveryPhaseCompleted + assert.False(t, recoveryActive(ec)) +} + +// survivorHealth builds the single-survivor health observation reported once the +// force-new-cluster bootstrap is up: healthy, self-leader, at a given revision. +func survivorHealth(memberID uint64, revision int64, raftIndex uint64) []etcdutils.EpHealth { + return []etcdutils.EpHealth{{ + Ep: "http://etcd-0:2379", + Health: true, + Status: &clientv3.StatusResponse{ + Header: &etcdserverpb.ResponseHeader{MemberId: memberID, Revision: revision}, + Leader: memberID, // self-leader + RaftIndex: raftIndex, + }, + }} +} + +// TestRecoveryRebuild_RecordsPossibleDataLoss is the core data-loss-accounting +// test. When a rebuild from the survivor completes, the operator MUST surface the +// possible loss on every channel: the DataLoss status accounting (with the +// survivor's id + retained revision), the DataLossPossible condition set True, +// and a Warning Event. The accounting must be captured exactly once and must NOT +// be overwritten by a later loop observing a higher (post-recovery-write) revision. +func TestRecoveryRebuild_RecordsPossibleDataLoss(t *testing.T) { + r := newTestReconciler() + rec := events.NewFakeRecorder(32) + r.Recorder = rec + + ec := threeNodeCluster() + sts := threeReplicaSTS() + require.NoError(t, r.Create(context.Background(), sts)) + require.NoError(t, r.Create(context.Background(), survivorPod())) + + // Commit to recovery (grace already elapsed) => Rebuilding, flag injected. + _, _, err := r.maybeRecoverQuorum(context.Background(), &reconcileState{cluster: ec, sts: sts}, quorumLostHealth(), nil) + require.NoError(t, err) + require.NotNil(t, ec.Status.Recovery) + past := metav1.NewTime(time.Now().Add(-2 * quorumLossGracePeriod)) + ec.Status.Recovery.DetectedTime = &past + _, _, err = r.maybeRecoverQuorum(context.Background(), &reconcileState{cluster: ec, sts: sts}, quorumLostHealth(), nil) + require.NoError(t, err) + require.Equal(t, ecv1alpha1.RecoveryPhaseRebuilding, ec.Status.Recovery.Phase) + + // Committing to a destructive rebuild bumps the durable attempt counter. + assert.Equal(t, int32(1), ec.Status.Recovery.Attempts, "first commit to rebuild => attempt #1") + + var rebuilding appsv1.StatefulSet + require.NoError(t, r.Get(context.Background(), objKey("etcd", "default"), &rebuilding)) + + // Survivor comes up healthy and self-leader at revision 4242 / raftIndex 9000. + r.clusterHealthFn = func(eps []string) ([]etcdutils.EpHealth, error) { + return survivorHealth(0xabc, 4242, 9000), nil + } + handled, _, err := r.maybeRecoverQuorum(context.Background(), &reconcileState{cluster: ec, sts: &rebuilding}, nil, nil) + require.NoError(t, err) + assert.True(t, handled) + require.Equal(t, ecv1alpha1.RecoveryPhaseScalingOut, ec.Status.Recovery.Phase) + + // (1) DataLoss accounting captured with survivor id + retained revision. + dl := ec.Status.Recovery.DataLoss + require.NotNil(t, dl, "DataLoss accounting must be recorded on rebuild completion") + assert.Equal(t, "abc", dl.SurvivorMemberID, "member id is hex-encoded") + assert.Equal(t, int64(4242), dl.SurvivorRevision) + assert.Equal(t, uint64(9000), dl.RaftIndex) + require.NotNil(t, dl.RecoveredTime) + assert.Contains(t, dl.Message, "possible data loss") + assert.Contains(t, dl.Message, "revision 4242") + + // (2) DataLossPossible condition set True (audit marker, left True). + cond := meta.FindStatusCondition(ec.Status.Conditions, ConditionDataLossPossible) + require.NotNil(t, cond, "DataLossPossible condition must be set") + assert.Equal(t, metav1.ConditionTrue, cond.Status) + assert.Equal(t, "RebuiltFromSurvivor", cond.Reason) + + // (3) A Warning Event surfaced the loss loudly. + sawDataLoss := false + for drain := true; drain; { + select { + case e := <-rec.Events: + if strings.Contains(e, EventReasonPossibleDataLoss) && strings.Contains(e, "Warning") { + sawDataLoss = true + } + default: + drain = false + } + } + assert.True(t, sawDataLoss, "expected a Warning PossibleDataLoss event on rebuild completion") + + // Idempotency: a subsequent rebuild loop observing a HIGHER revision (a write + // landed post-recovery) must NOT clobber the originally-retained revision. We + // re-enter Rebuilding to exercise the "DataLoss already set" guard directly. + ec.Status.Recovery.Phase = ecv1alpha1.RecoveryPhaseRebuilding + // Re-arm the flag annotation so recoveryRebuild takes the "survivor healthy" leg. + require.NoError(t, r.patchStatefulSetForForceNewCluster(context.Background(), &rebuilding, true)) + r.clusterHealthFn = func(eps []string) ([]etcdutils.EpHealth, error) { + return survivorHealth(0xabc, 5000, 9999), nil + } + _, _, err = r.maybeRecoverQuorum(context.Background(), &reconcileState{cluster: ec, sts: &rebuilding}, nil, nil) + require.NoError(t, err) + assert.Equal(t, int64(4242), ec.Status.Recovery.DataLoss.SurvivorRevision, + "data-loss revision must be captured once and not overwritten by later loops") +} + +// TestRecoveryRebuild_HoldsWhenSurvivorPodMissing verifies the survivor-presence +// safety gate: the operator must NOT arm the irreversible --force-new-cluster flag +// while the survivor pod does not exist. It holds (handled, no flag) and requeues. +func TestRecoveryRebuild_HoldsWhenSurvivorPodMissing(t *testing.T) { + r := newTestReconciler() + ec := threeNodeCluster() + sts := threeReplicaSTS() + require.NoError(t, r.Create(context.Background(), sts)) + // Deliberately do NOT create the survivor pod. + + ec.Status.Recovery = &ecv1alpha1.RecoveryStatus{ + Phase: ecv1alpha1.RecoveryPhaseRebuilding, + DetectedTime: &metav1.Time{Time: time.Now()}, + } + + handled, requeue, err := r.maybeRecoverQuorum(context.Background(), &reconcileState{cluster: ec, sts: sts}, nil, nil) + require.NoError(t, err) + assert.True(t, handled, "must keep ownership while waiting for the survivor pod") + assert.Greater(t, requeue, time.Duration(0)) + + var got appsv1.StatefulSet + require.NoError(t, r.Get(context.Background(), objKey("etcd", "default"), &got)) + assert.False(t, stsHasForceNewCluster(&got), + "--force-new-cluster must NOT be armed until the survivor pod exists") +} diff --git a/test/e2e/quorum_recovery_test.go b/test/e2e/quorum_recovery_test.go new file mode 100644 index 00000000..cc5cb2c2 --- /dev/null +++ b/test/e2e/quorum_recovery_test.go @@ -0,0 +1,467 @@ +/* +Copyright 2025. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package e2e + +import ( + "context" + "fmt" + "strings" + "testing" + "time" + + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/e2e-framework/klient/wait" + "sigs.k8s.io/e2e-framework/pkg/envconf" + "sigs.k8s.io/e2e-framework/pkg/features" + + ecv1alpha1 "go.etcd.io/etcd-operator/api/v1alpha1" +) + +// TestQuorumLossRecovery exercises the operator's automatic quorum-loss disaster +// recovery end to end: +// +// 1. Create a 3-member EtcdCluster backed by PVCs and write sentinel keys. +// 2. Break quorum by destroying two of the three members' pods AND their PVCs, +// and HOLDING them down (the StatefulSet eagerly recreates them with empty +// data and etcd's own membership lets an empty pod rejoin and re-form quorum, +// so a single delete self-heals before the operator ever acts — see +// holdMajorityDown). Keeping a majority continuously unreachable past the +// operator's detection + grace window is what produces the textbook +// unrecoverable-by-restart scenario the recovery path exists for. +// 3. Assert the operator detects sustained quorum loss and rebuilds a +// single-member cluster from the surviving member with --force-new-cluster, +// then re-adds the other members back to a healthy 3-member cluster. +// 4. Assert the irreversible --force-new-cluster residue is cleared, all members +// share ONE cluster id (no split-brain re-fork), the possible-data-loss +// accounting is surfaced, and the sentinel data both survived the rebuild and +// provably replicated to a re-added member. +// +// The whole flow is bounded by explicit timeouts so a wedged recovery fails the +// test rather than hanging. +func TestQuorumLossRecovery(t *testing.T) { + const ( + clusterName = "etcd-quorum-recovery" + size = 3 + sentinelKey = "quorum-recovery-canary" + sentinelVal = "survived-the-disaster" + // extraKeys are written alongside the sentinel so the survival check + // exercises the keyspace (and its revision), not a single round-trip. + extraKeys = 5 + ) + + feature := features.New("quorum-loss-recovery") + + feature.Setup(func(ctx context.Context, t *testing.T, c *envconf.Config) context.Context { + // Build a 3-node cluster on PVCs so deleting a PVC truly destroys that + // member's data (the precondition for an unrecoverable majority loss). + createEtcdClusterWithPVC(ctx, t, c, clusterName, size) + waitForSTSReadiness(t, c, clusterName, size) + // A ready 3-replica StatefulSet on a fresh cluster already implies three + // voting members with no learners, so we do not re-probe here; the + // post-recovery waitForNoLearners is the load-bearing one. + + // Write the sentinel plus a handful of extra keys. Capture the resulting + // revision so we can later assert the survivor retained the full keyspace, + // not just that one key happens to read back. + verifyDataOperations(t, c, clusterName, sentinelKey, sentinelVal) + for i := 0; i < extraKeys; i++ { + putKey(t, c, fmt.Sprintf("%s-0", clusterName), + fmt.Sprintf("%s-extra-%d", sentinelKey, i), fmt.Sprintf("val-%d", i)) + } + return ctx + }) + + feature.Assess("break quorum by destroying and holding down a majority of members", + func(ctx context.Context, t *testing.T, c *envconf.Config) context.Context { + // Precondition: the survivor's PVC (ordinal 0) must exist — the entire + // recovery rebuilds from it. Guard against a future STS retention-policy + // change silently wiping it. + assertPVCExists(ctx, t, c, fmt.Sprintf("etcd-data-%s-0", clusterName)) + + // Destroy ordinals 1 and 2 (pods + PVCs) and KEEP them destroyed until + // the operator commits to recovery. A one-shot delete is not enough: the + // StatefulSet recreates the empty pods within seconds and etcd's persisted + // membership lets them rejoin and re-form quorum, self-healing before the + // operator's grace window elapses. holdMajorityDown force-deletes the + // recreated pods+PVCs on a tight loop so a majority stays continuously + // unreachable, which is the only state assessQuorum classifies as true, + // sustained quorum loss. Ordinal 0 (the survivor) is never touched. + holdMajorityDown(ctx, t, c, clusterName, []int{2, 1}, 5*time.Minute) + t.Logf("held %s-1 and %s-2 down (pods + PVCs) until the operator committed to recovery", clusterName, clusterName) + return ctx + }, + ) + + feature.Assess("operator recovers cluster to 3 healthy members", + func(ctx context.Context, t *testing.T, c *envconf.Config) context.Context { + // Recovery: rebuild single-member cluster, then re-add 2 members via the + // learner path. Allow a generous bound; fail (not hang) if exceeded. + waitForRecoveryCompleted(t, c, clusterName, 12*time.Minute) + waitForSTSReadiness(t, c, clusterName, size) + waitForNoLearners(t, c, fmt.Sprintf("%s-0", clusterName), size, 5*time.Minute) + return ctx + }, + ) + + feature.Assess("rebuild flag cleared and cluster did not split-brain", + func(ctx context.Context, t *testing.T, c *envconf.Config) context.Context { + // The single most dangerous failure mode is leaving --force-new-cluster + // behind: pod-0 would re-fork the cluster on its next restart, or a pod + // that already force-forked now diverges. Once recovery is Completed both + // the marker annotation and the flag MUST be gone from the StatefulSet, + // AND all three live members must report ONE shared cluster id — a re-fork + // produces a divergent cluster id, which the spec-only check cannot catch. + assertForceNewClusterCleared(ctx, t, c, clusterName) + assertSingleClusterID(t, c, clusterName, size) + return ctx + }, + ) + + feature.Assess("possible data loss is surfaced (condition + status accounting)", + func(ctx context.Context, t *testing.T, c *envconf.Config) context.Context { + // Recovery via --force-new-cluster is not lossless: it must NEVER be + // silent. After a rebuild the operator must record the data-loss + // accounting (survivor id + the RETAINED revision) and bump the durable + // attempt counter, and raise the DataLossPossible condition so a human can + // audit the loss. + assertPossibleDataLossSurfaced(ctx, t, c, clusterName) + return ctx + }, + ) + + feature.Assess("sentinel data survived the rebuild and replicated to a re-added member", + func(ctx context.Context, t *testing.T, c *envconf.Config) context.Context { + // Read from the survivor (proves the data survived the force-new-cluster + // bootstrap) AND from a member that was destroyed and re-added. The + // re-added member is read with a node-LOCAL, serializable read so it is + // provably served from that member's own store — a linearizable read + // would route to the leader (almost always the survivor) and could pass + // even if replication to the rejoined member never happened. + survivor := fmt.Sprintf("%s-0", clusterName) + if got := readKey(t, c, survivor, sentinelKey); got != sentinelVal { + t.Fatalf("sentinel missing on survivor %s after recovery: %q=%q, want %q", + survivor, sentinelKey, got, sentinelVal) + } + t.Logf("sentinel intact on survivor %s: %s=%s", survivor, sentinelKey, sentinelVal) + + readded := fmt.Sprintf("%s-1", clusterName) + if got := readKeyLocalSerializable(t, c, readded, sentinelKey); got != sentinelVal { + t.Fatalf("sentinel did not replicate to re-added member %s (local serializable read): %q=%q, want %q", + readded, sentinelKey, got, sentinelVal) + } + t.Logf("sentinel replicated to re-added member %s (local serializable read): %s=%s", readded, sentinelKey, sentinelVal) + + // The whole keyspace, not just one key, must be present on the re-added + // member. Count keys under the sentinel prefix locally on etcd-1. + wantKeys := extraKeys + 1 // the sentinel + the extras + if got := countKeysLocalSerializable(t, c, readded, sentinelKey); got != wantKeys { + t.Fatalf("re-added member %s holds %d keys under %q prefix, want %d (keyspace did not fully replicate)", + readded, got, sentinelKey, wantKeys) + } + t.Logf("full keyspace (%d keys) replicated to re-added member %s", wantKeys, readded) + return ctx + }, + ) + + feature.Teardown(func(ctx context.Context, t *testing.T, c *envconf.Config) context.Context { + cleanupEtcdCluster(ctx, t, c, clusterName) + return ctx + }) + + _ = testEnv.Test(t, feature.Feature()) +} + +// holdMajorityDown destroys the given member ordinals (pods + PVCs) and keeps them +// destroyed — force-deleting whatever the StatefulSet recreates — until the +// operator has COMMITTED to recovery (Recovery phase Rebuilding/ScalingOut/ +// Completed). Holding is required because a single delete self-heals: the +// StatefulSet recreates the empty pods and etcd's persisted membership lets them +// rejoin and re-form quorum long before the operator's detection + grace window +// elapses. Once the operator scales the StatefulSet down to the single survivor +// to inject --force-new-cluster, the recreated majority pods stop coming back and +// the loop completes. Ordinal 0 (the survivor) is never touched. +func holdMajorityDown(ctx context.Context, t *testing.T, c *envconf.Config, clusterName string, ordinals []int, timeout time.Duration) { + t.Helper() + committed := func(ec *ecv1alpha1.EtcdCluster) bool { + if ec.Status.Recovery == nil { + return false + } + switch ec.Status.Recovery.Phase { + case ecv1alpha1.RecoveryPhaseRebuilding, + ecv1alpha1.RecoveryPhaseScalingOut, + ecv1alpha1.RecoveryPhaseCompleted: + return true + } + return false + } + + err := wait.For(func(ctx context.Context) (bool, error) { + ec := getEtcdCluster(ctx, t, c, clusterName) + if committed(ec) { + t.Logf("operator committed to recovery, phase=%s msg=%q; stopping the majority-down hold", + ec.Status.Recovery.Phase, ec.Status.Recovery.Message) + return true, nil + } + // Re-destroy the majority's data and pods. Best-effort: any of these may be + // transiently absent/Terminating between recreations. + for _, ord := range ordinals { + deleteMemberPVCAndPod(ctx, t, c, clusterName, ord) + } + if rec := ec.Status.Recovery; rec != nil { + t.Logf("holding majority down; recovery phase=%s detail=%q", rec.Phase, rec.Message) + } + return false, nil + }, wait.WithTimeout(timeout), wait.WithInterval(3*time.Second)) + if err != nil { + t.Fatalf("operator did not commit to quorum-loss recovery within %s while a majority was held down: %v", timeout, err) + } +} + +// deleteMemberPVCAndPod force-deletes a single etcd member's pod and its PVC, +// destroying that member's data. The PVC is deleted first so the StatefulSet +// cannot re-bind the old volume when it recreates the pod, and the pod is removed +// with grace period 0 so it goes away immediately rather than draining — both are +// needed for the majority to be simultaneously, durably unreachable. +func deleteMemberPVCAndPod(ctx context.Context, t *testing.T, c *envconf.Config, clusterName string, ordinal int) { + t.Helper() + client := c.Client() + podName := fmt.Sprintf("%s-%d", clusterName, ordinal) + // PVC name follows the StatefulSet volumeClaimTemplate convention: -. + pvcName := fmt.Sprintf("etcd-data-%s-%d", clusterName, ordinal) + + pvc := &corev1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{Name: pvcName, Namespace: namespace}, + } + if err := client.Resources().Delete(ctx, pvc); err != nil && !apierrors.IsNotFound(err) { + t.Logf("deleting PVC %s (best-effort): %v", pvcName, err) + } + + zero := int64(0) + pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: podName, Namespace: namespace}} + if err := client.Resources().Delete(ctx, pod, func(o *metav1.DeleteOptions) { + o.GracePeriodSeconds = &zero + }); err != nil && !apierrors.IsNotFound(err) { + t.Logf("deleting pod %s (best-effort): %v", podName, err) + } +} + +// assertPVCExists fails if the named PVC is absent. Used to assert the survivor's +// volume is intact before (and implicitly the precondition recovery relies on). +func assertPVCExists(ctx context.Context, t *testing.T, c *envconf.Config, pvcName string) { + t.Helper() + var pvc corev1.PersistentVolumeClaim + if err := c.Client().Resources().Get(ctx, pvcName, namespace, &pvc); err != nil { + t.Fatalf("survivor PVC %s must exist before breaking quorum, but: %v", pvcName, err) + } +} + +// assertForceNewClusterCleared verifies the recovery left no residue on the +// StatefulSet: neither the marker annotation nor the --force-new-cluster flag may +// remain, or pod-0 would re-fork the cluster on its next restart. +func assertForceNewClusterCleared(ctx context.Context, t *testing.T, c *envconf.Config, clusterName string) { + t.Helper() + // Mirror the controller's constants (not importable from the e2e package). + const ( + forceNewClusterAnnotation = "operator.etcd.io/force-new-cluster" + forceNewClusterArg = "--force-new-cluster" + ) + var sts appsv1.StatefulSet + if err := c.Client().Resources().Get(ctx, clusterName, namespace, &sts); err != nil { + t.Fatalf("failed to get StatefulSet %s: %v", clusterName, err) + } + if _, ok := sts.Annotations[forceNewClusterAnnotation]; ok { + t.Fatalf("StatefulSet %s still carries the %s annotation after recovery", clusterName, forceNewClusterAnnotation) + } + for _, ct := range sts.Spec.Template.Spec.Containers { + for _, a := range ct.Args { + if a == forceNewClusterArg { + t.Fatalf("container %q still carries %s after recovery", ct.Name, forceNewClusterArg) + } + } + } + t.Logf("StatefulSet %s is clean: no force-new-cluster annotation or flag", clusterName) +} + +// assertSingleClusterID verifies every member reports the SAME etcd cluster id. +// A --force-new-cluster re-fork produces a divergent cluster id; this is the +// cheapest strong proof that the rebuild re-formed one coherent cluster rather +// than leaving a pod running a forked raft. It also confirms the expected member +// count. +func assertSingleClusterID(t *testing.T, c *envconf.Config, clusterName string, expectedMembers int) { + t.Helper() + ml := getEtcdMemberListPB(t, c, fmt.Sprintf("%s-0", clusterName)) + if ml.Header == nil || ml.Header.ClusterId == 0 { + t.Fatalf("member list response has no cluster id") + } + if len(ml.Members) != expectedMembers { + t.Fatalf("member list shows %d members, want %d", len(ml.Members), expectedMembers) + } + // Cross-check each member's view of the cluster id from its own endpoint. + want := ml.Header.ClusterId + for i := 0; i < expectedMembers; i++ { + pod := fmt.Sprintf("%s-%d", clusterName, i) + got := getEtcdMemberListPB(t, c, pod) + if got.Header == nil || got.Header.ClusterId != want { + var have uint64 + if got.Header != nil { + have = got.Header.ClusterId + } + t.Fatalf("member %s reports cluster id %x, want %x — the cluster split-brained during recovery", + pod, have, want) + } + } + t.Logf("all %d members share one cluster id %x — no split-brain re-fork", expectedMembers, want) +} + +// assertPossibleDataLossSurfaced verifies the operator did NOT silently recover: +// after a force-new-cluster rebuild it must raise the DataLossPossible condition +// (True) and populate status.recovery.dataLoss with the survivor's identity and +// the RETAINED revision, and it must have bumped the durable attempt counter, so +// the loss is auditable. +func assertPossibleDataLossSurfaced(ctx context.Context, t *testing.T, c *envconf.Config, clusterName string) { + t.Helper() + // Mirror the controller's condition type (not importable from the e2e package). + const conditionDataLossPossible = "DataLossPossible" + + ec := getEtcdCluster(ctx, t, c, clusterName) + + cond := meta.FindStatusCondition(ec.Status.Conditions, conditionDataLossPossible) + if cond == nil { + t.Fatalf("expected %s condition after a force-new-cluster rebuild; recovery must not be silent", conditionDataLossPossible) + } + if cond.Status != metav1.ConditionTrue { + t.Fatalf("%s condition = %q, want True", conditionDataLossPossible, cond.Status) + } + + if ec.Status.Recovery == nil { + t.Fatalf("expected status.recovery after rebuild, got nil") + } + if ec.Status.Recovery.Attempts < 1 { + t.Fatalf("expected status.recovery.attempts >= 1 after a committed recovery, got %d", ec.Status.Recovery.Attempts) + } + if ec.Status.Recovery.DataLoss == nil { + t.Fatalf("expected status.recovery.dataLoss accounting after rebuild, got nil") + } + dl := ec.Status.Recovery.DataLoss + if dl.SurvivorMemberID == "" { + t.Fatalf("data-loss accounting missing survivor member id") + } + // The retained revision is the load-bearing accounting value. We wrote the + // sentinel + extra keys before the disaster, so the survivor's store is well + // past the empty-cluster revision; a zero here means the capture is broken + // (e.g. a nil header silently zeroed it). + if dl.SurvivorRevision <= 1 { + t.Fatalf("data-loss accounting has SurvivorRevision=%d; expected the survivor's real (>1) revision after writes", dl.SurvivorRevision) + } + if dl.RecoveredTime == nil { + t.Fatalf("data-loss accounting missing recoveredTime") + } + if !strings.Contains(dl.Message, "data loss") && !strings.Contains(dl.Message, "NOT retained") { + t.Fatalf("data-loss message %q does not describe the loss", dl.Message) + } + t.Logf("possible data loss surfaced: member=%s revision=%d attempts=%d (%s)", + dl.SurvivorMemberID, dl.SurvivorRevision, ec.Status.Recovery.Attempts, dl.Message) +} + +// getEtcdCluster fetches the EtcdCluster CR. +func getEtcdCluster(ctx context.Context, t *testing.T, c *envconf.Config, name string) *ecv1alpha1.EtcdCluster { + t.Helper() + var ec ecv1alpha1.EtcdCluster + if err := c.Client().Resources().Get(ctx, name, namespace, &ec); err != nil { + t.Fatalf("failed to get EtcdCluster %s: %v", name, err) + } + return &ec +} + +// waitForRecoveryCompleted blocks until the recovery state machine reports the +// Completed phase. +func waitForRecoveryCompleted( + t *testing.T, c *envconf.Config, name string, timeout time.Duration, +) { + t.Helper() + err := wait.For(func(ctx context.Context) (bool, error) { + ec := getEtcdCluster(ctx, t, c, name) + if ec.Status.Recovery != nil && ec.Status.Recovery.Phase == ecv1alpha1.RecoveryPhaseCompleted { + t.Logf("recovery completed: %s", ec.Status.Recovery.Message) + return true, nil + } + return false, nil + }, wait.WithTimeout(timeout), wait.WithInterval(5*time.Second)) + if err != nil { + t.Fatalf("quorum-loss recovery did not complete within %s: %v", timeout, err) + } +} + +// putKey writes a single key via etcdctl inside the given pod. +func putKey(t *testing.T, c *envconf.Config, podName, key, value string) { + t.Helper() + _, stderr, err := execInPod(t, c, podName, namespace, []string{"etcdctl", "put", key, value}) + if err != nil { + t.Fatalf("failed to put key %q on %s: %v, stderr: %s", key, podName, err, stderr) + } +} + +// readKey reads a single key's value from the given pod via etcdctl (default +// linearizable read — may be served by the leader). +func readKey(t *testing.T, c *envconf.Config, podName, key string) string { + t.Helper() + stdout, stderr, err := execInPod(t, c, podName, namespace, + []string{"etcdctl", "get", key, "--print-value-only"}) + if err != nil { + t.Fatalf("failed to read key %q from %s: %v, stderr: %s", key, podName, err, stderr) + } + return strings.TrimSpace(stdout) +} + +// readKeyLocalSerializable reads a key from the pod's OWN local store: it pins the +// endpoint to the in-pod address and forces a serializable read so the answer is +// served by that member, not routed to the leader. This is what proves data +// actually replicated to a re-added member rather than being read back from the +// untouched survivor. +func readKeyLocalSerializable(t *testing.T, c *envconf.Config, podName, key string) string { + t.Helper() + stdout, stderr, err := execInPod(t, c, podName, namespace, + []string{"etcdctl", "get", key, "--print-value-only", + "--endpoints=http://127.0.0.1:2379", "--consistency=s"}) + if err != nil { + t.Fatalf("failed local serializable read of %q from %s: %v, stderr: %s", key, podName, err, stderr) + } + return strings.TrimSpace(stdout) +} + +// countKeysLocalSerializable counts the keys under the given prefix from the pod's +// own local store (serializable, node-local). +func countKeysLocalSerializable(t *testing.T, c *envconf.Config, podName, prefix string) int { + t.Helper() + stdout, stderr, err := execInPod(t, c, podName, namespace, + []string{"etcdctl", "get", prefix, "--prefix", "--keys-only", + "--endpoints=http://127.0.0.1:2379", "--consistency=s"}) + if err != nil { + t.Fatalf("failed local serializable key count for %q from %s: %v, stderr: %s", prefix, podName, err, stderr) + } + n := 0 + for _, line := range strings.Split(strings.TrimSpace(stdout), "\n") { + if strings.TrimSpace(line) != "" { + n++ + } + } + return n +}