Skip to content

Commit 80da27f

Browse files
committed
cluster: scale-out done check fixed
when scaling the cluster out, check joining nodes before finishing action Fixes #182
1 parent c14fb66 commit 80da27f

File tree

6 files changed

+156
-47
lines changed

6 files changed

+156
-47
lines changed

controllers/cassandracluster/cassandra_status.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -372,14 +372,28 @@ func (rcc *CassandraClusterReconciler) UpdateStatusIfActionEnded(ctx context.Con
372372
//Does the Scaling ended ?
373373
if nodesPerRacks == storedStatefulSet.Status.Replicas {
374374

375-
podsList, err := rcc.ListPods(ctx, cc.Namespace, k8s.LabelsForCassandraDCRack(cc, dcName, rackName))
375+
podsList, err := rcc.ListPodsOrderByNameAscending(ctx, cc.Namespace, k8s.LabelsForCassandraDCRack(cc, dcName, rackName))
376376
nb := len(podsList.Items)
377377
if err != nil || nb < 1 {
378378
return false
379379
}
380+
if nb < int(nodesPerRacks) {
381+
logrus.WithFields(logrus.Fields{"cluster": cc.Name, "rack": dcRackName}).Warn(fmt.Sprintf(
382+
"Although statefulSet has %d replicas, only %d matching pods found", nodesPerRacks, nb))
383+
return false
384+
}
380385
pod := podsList.Items[nodesPerRacks-1]
386+
381387
//We need lastPod to be running to consider ScaleUp ended
382388
if cassandraPodIsReady(&pod) {
389+
if hasJoiningNodes, err := rcc.hasJoiningNodes(ctx, cc); err != nil {
390+
return false
391+
} else if hasJoiningNodes {
392+
logrus.WithFields(logrus.Fields{"cluster": cc.Name, "dc": dcName, "rack": rackName,
393+
"err": err}).Info("Cluster has joining nodes, ScaleUp not yet completed")
394+
return false
395+
}
396+
383397
logrus.WithFields(logrus.Fields{"cluster": cc.Name, "rack": dcRackName}).Info("ScaleUp is Done")
384398
rackLastAction.Status = api.StatusDone
385399
rackLastAction.EndTime = &now

controllers/cassandracluster/cassandra_status_test.go

Lines changed: 49 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,8 @@ func HelperInitCluster(t *testing.T, name string) (*CassandraClusterReconciler,
9797
var cc api.CassandraCluster
9898
yaml.Unmarshal(common.HelperLoadBytes(t, name), &cc)
9999

100+
cc.UID = "123456789" //We need to set a UID so PatchMaker does not fail when comparing owner references
101+
100102
ccList := api.CassandraClusterList{}
101103
//Create Fake client
102104
//Objects to track in the Fake client
@@ -203,33 +205,11 @@ func helperCreateCassandraCluster(ctx context.Context, t *testing.T, cassandraCl
203205
rcc.Client.Status().Update(ctx, sts)
204206

205207
//Create Statefulsets associated fake Pods
206-
podTemplate := v1.Pod{
207-
ObjectMeta: metav1.ObjectMeta{
208-
Name: "template",
209-
Namespace: namespace,
210-
Labels: map[string]string{
211-
"cluster": cc.Labels["cluster"],
212-
"dc-rack": dcRackName,
213-
"cassandraclusters.db.orange.com.dc": dc.Name,
214-
"cassandraclusters.db.orange.com.rack": rack.Name,
215-
"app": "cassandracluster",
216-
"cassandracluster": cc.Name,
217-
},
218-
},
219-
Status: v1.PodStatus{
220-
Phase: v1.PodRunning,
221-
ContainerStatuses: []v1.ContainerStatus{
222-
{
223-
Name: "cassandra",
224-
Ready: true,
225-
},
226-
},
227-
},
228-
}
208+
podTemplate := fakePodTemplate(cc, dc.Name, rack.Name)
229209

230210
for i := 0; i < int(sts.Status.Replicas); i++ {
231211
pod := podTemplate.DeepCopy()
232-
pod.Name = sts.Name + strconv.Itoa(i)
212+
pod.Name = sts.Name + "-" + strconv.Itoa(i)
233213
pod.Spec.Hostname = pod.Name
234214
pod.Spec.Subdomain = cc.Name
235215
if err = rcc.CreatePod(ctx, pod); err != nil {
@@ -265,6 +245,33 @@ func helperCreateCassandraCluster(ctx context.Context, t *testing.T, cassandraCl
265245
return rcc, &req
266246
}
267247

248+
func fakePodTemplate(cc *api.CassandraCluster, dcName, rackName string) v1.Pod {
249+
dcRackName := cc.GetDCRackName(dcName, rackName)
250+
return v1.Pod{
251+
ObjectMeta: metav1.ObjectMeta{
252+
Name: "template",
253+
Namespace: namespace,
254+
Labels: map[string]string{
255+
"cluster": cc.Labels["cluster"],
256+
"dc-rack": dcRackName,
257+
"cassandraclusters.db.orange.com.dc": dcName,
258+
"cassandraclusters.db.orange.com.rack": rackName,
259+
"app": "cassandracluster",
260+
"cassandracluster": cc.Name,
261+
},
262+
},
263+
Status: v1.PodStatus{
264+
Phase: v1.PodRunning,
265+
ContainerStatuses: []v1.ContainerStatus{
266+
{
267+
Name: "cassandra",
268+
Ready: true,
269+
},
270+
},
271+
},
272+
}
273+
}
274+
268275
func TestCassandraClusterReconciler(t *testing.T) {
269276
// tests speed-up
270277
httpmock.Activate()
@@ -476,6 +483,24 @@ func TestUpdateStatusIfDockerImageHasChanged(t *testing.T) {
476483

477484
}
478485

486+
func assertRackStatusPhase(assert *assert.Assertions, rcc *CassandraClusterReconciler, dcRackName string, expectedPhase api.ClusterStateInfo) {
487+
assert.Equal(expectedPhase.Name, rcc.cc.Status.CassandraRackStatus[dcRackName].Phase, dcRackName + " phase")
488+
}
489+
490+
func assertClusterStatusPhase(assert *assert.Assertions, rcc *CassandraClusterReconciler, expectedPhase api.ClusterStateInfo) {
491+
assert.Equal(expectedPhase.Name, rcc.cc.Status.Phase, "cluster phase")
492+
}
493+
494+
func assertRackStatusLastAction(assert *assert.Assertions, rcc *CassandraClusterReconciler, dcRackName string, expectedActionType api.ClusterStateInfo, expectedActionStatus string) {
495+
assert.Equal(expectedActionType.Name, rcc.cc.Status.CassandraRackStatus[dcRackName].CassandraLastAction.Name, "dc1-rack1 last action type")
496+
assert.Equal(expectedActionStatus, rcc.cc.Status.CassandraRackStatus[dcRackName].CassandraLastAction.Status, "dc1-rack1 last action status")
497+
}
498+
499+
func assertClusterStatusLastAction(assert *assert.Assertions, rcc *CassandraClusterReconciler, expectedActionType api.ClusterStateInfo, expectedActionStatus string) {
500+
assert.Equal(expectedActionType.Name, rcc.cc.Status.LastClusterAction, "cluster last action type")
501+
assert.Equal(expectedActionStatus, rcc.cc.Status.LastClusterActionStatus, "cluster last action status")
502+
}
503+
479504
func overrideDelayWaitWithNoDelay() {
480505
delayWait = func() time.Duration {
481506
return 0

controllers/cassandracluster/decommission_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ type podName struct {
6969
}
7070

7171
func podHost(stfsName string, id int8, rcc *CassandraClusterReconciler) podName {
72-
name := stfsName + strconv.Itoa(int(id))
72+
name := stfsName + "-" + strconv.Itoa(int(id))
7373
return podName{name, name + "." + rcc.cc.Name}
7474
}
7575

controllers/cassandracluster/pod.go

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -88,12 +88,7 @@ func GetLastOrFirstPodItem(podsList []v1.Pod, last bool) (*v1.Pod, error) {
8888

8989
items := podsList[:]
9090

91-
// Sort pod list using ending number in field ObjectMeta.Name
92-
sort.Slice(items, func(i, j int) bool {
93-
id1, _ := strconv.Atoi(reEndingNumber.FindString(items[i].ObjectMeta.Name))
94-
id2, _ := strconv.Atoi(reEndingNumber.FindString(items[j].ObjectMeta.Name))
95-
return id1 < id2
96-
})
91+
sortPodsList(items)
9792

9893
idx := 0
9994
if last {
@@ -105,6 +100,15 @@ func GetLastOrFirstPodItem(podsList []v1.Pod, last bool) (*v1.Pod, error) {
105100
return &pod, nil
106101
}
107102

103+
// sortPodsList sorts pod list using ending number in field ObjectMeta.Name
104+
func sortPodsList(items []v1.Pod) {
105+
sort.Slice(items, func(i, j int) bool {
106+
id1, _ := strconv.Atoi(reEndingNumber.FindString(items[i].ObjectMeta.Name))
107+
id2, _ := strconv.Atoi(reEndingNumber.FindString(items[j].ObjectMeta.Name))
108+
return id1 < id2
109+
})
110+
}
111+
108112
// GetFirstPod returns the first pod satisfying the selector and being in the namespace
109113
func (rcc *CassandraClusterReconciler) GetFirstPod(ctx context.Context, namespace string, selector map[string]string) (*v1.Pod, error) {
110114
podsList, err := rcc.ListPods(ctx, namespace, selector)
@@ -182,7 +186,6 @@ func (rcc *CassandraClusterReconciler) hasUnschedulablePod(ctx context.Context,
182186
}
183187

184188
func (rcc *CassandraClusterReconciler) ListPods(ctx context.Context, namespace string, selector map[string]string) (*v1.PodList, error) {
185-
186189
clientOpt := &client.ListOptions{
187190
Namespace: namespace,
188191
LabelSelector: labels.SelectorFromSet(selector),
@@ -196,6 +199,14 @@ func (rcc *CassandraClusterReconciler) ListPods(ctx context.Context, namespace s
196199
return pl, rcc.Client.List(ctx, pl, opt...)
197200
}
198201

202+
func (rcc *CassandraClusterReconciler) ListPodsOrderByNameAscending(ctx context.Context, namespace string, selector map[string]string) (*v1.PodList, error) {
203+
pods, err := rcc.ListPods(ctx, namespace, selector)
204+
if pods != nil {
205+
sortPodsList(pods.Items)
206+
}
207+
return pods, err
208+
}
209+
199210
func (rcc *CassandraClusterReconciler) CreatePod(ctx context.Context, pod *v1.Pod) error {
200211
err := rcc.Client.Create(ctx, pod)
201212
if err != nil {

controllers/cassandracluster/pod_operation.go

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -112,21 +112,9 @@ func (rcc *CassandraClusterReconciler) handlePodOperation(ctx context.Context, c
112112
return breakResyncLoopSwitch, err
113113
}
114114

115-
podsList, err := rcc.ListCassandraClusterPods(ctx, cc)
116-
if err != nil {
117-
return true, err
118-
}
119-
firstPod, err := GetLastOrFirstPodReady(podsList, false)
120-
if err != nil {
121-
return true, err
122-
}
123-
124-
hostName := k8s.PodHostname(*firstPod)
125-
jolokiaClient, _ := NewJolokiaClient(ctx, hostName, JolokiaPort, rcc, cc.Spec.ImageJolokiaSecret, cc.Namespace)
126-
127-
hasJoiningNodes, err := jolokiaClient.hasJoiningNodes()
115+
hasJoiningNodes, err := rcc.hasJoiningNodes(ctx, cc)
128116
if err != nil {
129-
return true, err
117+
return breakResyncLoop, err
130118
}
131119
if hasJoiningNodes {
132120
logrus.WithFields(logrus.Fields{"cluster": cc.Name, "dc": dcName, "rack": rackName,
@@ -151,6 +139,24 @@ func (rcc *CassandraClusterReconciler) handlePodOperation(ctx context.Context, c
151139
return breakResyncLoopSwitch, err
152140
}
153141

142+
func (rcc *CassandraClusterReconciler) hasJoiningNodes(ctx context.Context, cc *api.CassandraCluster) (bool, error) {
143+
podsList, err := rcc.ListCassandraClusterPods(ctx, cc)
144+
if err != nil {
145+
return false, err
146+
}
147+
firstPod, err := GetLastOrFirstPodReady(podsList, false)
148+
if err != nil {
149+
return false, err
150+
}
151+
152+
hostName := k8s.PodHostname(*firstPod)
153+
jolokiaClient, err := NewJolokiaClient(ctx, hostName, JolokiaPort, rcc, cc.Spec.ImageJolokiaSecret, cc.Namespace)
154+
if err != nil {
155+
return false, err
156+
}
157+
return jolokiaClient.hasJoiningNodes()
158+
}
159+
154160
// addPodOperationLabels will add Pod Labels labels on all Pod in the Current dcRackName
155161
func (rcc *CassandraClusterReconciler) addPodOperationLabels(ctx context.Context, cc *api.CassandraCluster, dcName string,
156162
rackName string, labels map[string]string) {

controllers/cassandracluster/scaleup_test.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@ package cassandracluster
33
import (
44
"context"
55
"fmt"
6+
"strconv"
67
"strings"
78
"testing"
89

10+
api "github.com/cscetbon/casskop/api/v2"
911
"github.com/jarcoal/httpmock"
1012
"github.com/stretchr/testify/assert"
1113
)
@@ -33,6 +35,31 @@ func registerJolokiaOperationJoiningNodes(host podName, numberOfJoiningNodes int
3335
"status": 200}`, stringOfSlice(joiningNodes))))
3436
}
3537

38+
func simulateNewPodsReady(t *testing.T, rcc *CassandraClusterReconciler, stfsName string, dc api.DC, scaleFrom, scaleTo int) {
39+
assert := assert.New(t)
40+
41+
sts, err := rcc.GetStatefulSet(ctx, rcc.cc.Namespace, stfsName)
42+
assert.NoError(err, "get sts")
43+
44+
//Now simulate sts to be ready for CassKop
45+
sts.Status.Replicas = *sts.Spec.Replicas
46+
sts.Status.ReadyReplicas = *sts.Spec.Replicas
47+
err = rcc.Client.Status().Update(ctx, sts)
48+
assert.NoError(err, "update sts status")
49+
50+
// create new fake Pods (consequence of scale-out) so action may finish
51+
podTemplate := fakePodTemplate(rcc.cc, dc.Name, dc.Rack[0].Name)
52+
for i := scaleFrom; i < scaleTo; i++ {
53+
pod := podTemplate.DeepCopy()
54+
pod.Name = sts.Name + "-" + strconv.Itoa(i)
55+
pod.Spec.Hostname = pod.Name
56+
pod.Spec.Subdomain = rcc.cc.Name
57+
if err = rcc.CreatePod(ctx, pod); err != nil {
58+
t.Fatalf("can't create pod: (%v)", err)
59+
}
60+
}
61+
}
62+
3663
func TestAddTwoNodes(t *testing.T) {
3764
overrideDelayWaitWithNoDelay()
3865
defer restoreDefaultDelayWait()
@@ -78,4 +105,30 @@ func TestAddTwoNodes(t *testing.T) {
78105
assert.GreaterOrEqual(jolokiaCallsCount(firstPod), 1)
79106
assertStatefulsetReplicas(ctx, t, rcc, expectedReplicas+1, cassandraCluster.Namespace, stfsName)
80107
}
108+
109+
assertClusterStatusPhase(assert, rcc, api.ClusterPhasePending)
110+
assertRackStatusPhase(assert, rcc, "dc1-rack1", api.ClusterPhasePending)
111+
assertClusterStatusLastAction(assert, rcc, api.ActionScaleUp, api.StatusOngoing)
112+
assertRackStatusLastAction(assert, rcc, "dc1-rack1", api.ActionScaleUp, api.StatusOngoing)
113+
114+
//Reconcile does not end the action even when sts and all new pods are ready, because there are joining nodes
115+
simulateNewPodsReady(t, rcc, stfsName, dc, 3, 5)
116+
registerJolokiaOperationJoiningNodes(firstPod, 1)
117+
for reconcileIteration := 0; reconcileIteration <= 2; reconcileIteration++ {
118+
reconcileValidation(t, rcc, *req)
119+
assert.GreaterOrEqual(jolokiaCallsCount(firstPod), 1)
120+
assertClusterStatusPhase(assert, rcc, api.ClusterPhasePending)
121+
assertRackStatusPhase(assert, rcc, "dc1-rack1", api.ClusterPhaseRunning)
122+
assertClusterStatusLastAction(assert, rcc, api.ActionScaleUp, api.StatusOngoing)
123+
assertRackStatusLastAction(assert, rcc, "dc1-rack1", api.ActionScaleUp, api.StatusOngoing)
124+
}
125+
126+
//Reconcile ends the action when sts and all new pods are ready and there are no joining nodes
127+
registerJolokiaOperationJoiningNodes(firstPod, 0)
128+
reconcileValidation(t, rcc, *req)
129+
assert.GreaterOrEqual(jolokiaCallsCount(firstPod), 1)
130+
assertClusterStatusPhase(assert, rcc, api.ClusterPhaseRunning)
131+
assertRackStatusPhase(assert, rcc, "dc1-rack1", api.ClusterPhaseRunning)
132+
assertClusterStatusLastAction(assert, rcc, api.ActionScaleUp, api.StatusDone)
133+
assertRackStatusLastAction(assert, rcc, "dc1-rack1", api.ActionScaleUp, api.StatusDone)
81134
}

0 commit comments

Comments
 (0)