Skip to content

Commit 440a327

Browse files
committed
fix non-atomic replica number read populating hashring
1 parent d2757dd commit 440a327

File tree

1 file changed

+8
-5
lines changed

1 file changed

+8
-5
lines changed

main.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -647,6 +647,7 @@ func (c *controller) sync(ctx context.Context) {
647647
}
648648

649649
statefulsets := make(map[string][]*appsv1.StatefulSet)
650+
stsReplicas := make(map[string]int32)
650651

651652
for _, obj := range c.ssetInf.GetStore().List() {
652653
sts, ok := obj.(*appsv1.StatefulSet)
@@ -665,6 +666,7 @@ func (c *controller) sync(ctx context.Context) {
665666
if err != nil {
666667
level.Error(c.logger).Log("msg", "failed to get desired replicas for Statefulset", "sts", sts.Name, "err", err)
667668
}
669+
stsReplicas[sts.Name] = desiredReplicas
668670
// If hashring is not initialized, need to wait for all pods ready within statefulset before generating hashring
669671
if !exist && c.options.allowOnlyReadyReplicas {
670672
for i := range desiredReplicas {
@@ -711,7 +713,7 @@ func (c *controller) sync(ctx context.Context) {
711713
return
712714
}
713715

714-
c.populate(ctx, hashrings, statefulsets)
716+
c.populate(ctx, hashrings, statefulsets, stsReplicas)
715717
level.Info(c.logger).Log("msg", "hashring populated", "hashring", fmt.Sprintf("%+v", hashrings))
716718

717719
err = c.saveHashring(ctx, hashrings, cm)
@@ -757,7 +759,7 @@ func (c *controller) waitForPod(ctx context.Context, name string) error {
757759
})
758760
}
759761

760-
func (c *controller) populate(ctx context.Context, hashrings []receive.HashringConfig, statefulsets map[string][]*appsv1.StatefulSet) {
762+
func (c *controller) populate(ctx context.Context, hashrings []receive.HashringConfig, statefulsets map[string][]*appsv1.StatefulSet, stsReplicas map[string]int32) {
761763
for i, h := range hashrings {
762764
stsList, exists := statefulsets[h.Hashring]
763765
// Sort by sts name for deterministic endpoints order
@@ -772,9 +774,10 @@ func (c *controller) populate(ctx context.Context, hashrings []receive.HashringC
772774
var endpoints []receive.Endpoint
773775

774776
for _, sts := range stsList {
775-
desiredReplicas, err := c.getStsDesiredReplicas(ctx, sts)
776-
if err != nil {
777-
level.Error(c.logger).Log("msg", "failed to get desired replicas for Statefulset", "sts", sts.Name, "err", err)
777+
desiredReplicas, ok := stsReplicas[sts.Name]
778+
if !ok {
779+
level.Error(c.logger).Log("msg", "failed to get desired replicas for Statefulset", "sts", sts.Name)
780+
continue
778781
}
779782
for i := range desiredReplicas {
780783
podName := fmt.Sprintf("%s-%d", sts.Name, i)

0 commit comments

Comments
 (0)