Skip to content

Commit 2d3ea03

Browse files
authored
fix non-atomic replica number read populating hashring (#16)
this causes some ingestion dips when scaling up
2 parents d2757dd + 099b76b commit 2d3ea03

File tree

1 file changed

+8
-6
lines changed

1 file changed

+8
-6
lines changed

main.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -647,24 +647,29 @@ func (c *controller) sync(ctx context.Context) {
647647
}
648648

649649
statefulsets := make(map[string][]*appsv1.StatefulSet)
650+
stsReplicas := make(map[string]int32)
650651

651652
for _, obj := range c.ssetInf.GetStore().List() {
652653
sts, ok := obj.(*appsv1.StatefulSet)
653654

654655
if !ok {
655656
level.Error(c.logger).Log("msg", "failed type assertion from expected StatefulSet")
657+
continue
656658
}
657659

658660
hashring, ok := sts.Labels[hashringLabelKey]
659661
if !ok {
662+
level.Error(c.logger).Log("msg", "failed to get hashring label from StatefulSet", "sts", sts.Name)
660663
continue
661664
}
662665

663666
stsReplica, exist := c.replicas[sts.Name]
664667
desiredReplicas, err := c.getStsDesiredReplicas(ctx, sts)
665668
if err != nil {
666669
level.Error(c.logger).Log("msg", "failed to get desired replicas for Statefulset", "sts", sts.Name, "err", err)
670+
continue
667671
}
672+
stsReplicas[sts.Name] = desiredReplicas
668673
// If hashring is not initialized, need to wait for all pods ready within statefulset before generating hashring
669674
if !exist && c.options.allowOnlyReadyReplicas {
670675
for i := range desiredReplicas {
@@ -711,7 +716,7 @@ func (c *controller) sync(ctx context.Context) {
711716
return
712717
}
713718

714-
c.populate(ctx, hashrings, statefulsets)
719+
c.populate(ctx, hashrings, statefulsets, stsReplicas)
715720
level.Info(c.logger).Log("msg", "hashring populated", "hashring", fmt.Sprintf("%+v", hashrings))
716721

717722
err = c.saveHashring(ctx, hashrings, cm)
@@ -757,7 +762,7 @@ func (c *controller) waitForPod(ctx context.Context, name string) error {
757762
})
758763
}
759764

760-
func (c *controller) populate(ctx context.Context, hashrings []receive.HashringConfig, statefulsets map[string][]*appsv1.StatefulSet) {
765+
func (c *controller) populate(ctx context.Context, hashrings []receive.HashringConfig, statefulsets map[string][]*appsv1.StatefulSet, stsReplicas map[string]int32) {
761766
for i, h := range hashrings {
762767
stsList, exists := statefulsets[h.Hashring]
763768
// Sort by sts name for deterministic endpoints order
@@ -772,10 +777,7 @@ func (c *controller) populate(ctx context.Context, hashrings []receive.HashringC
772777
var endpoints []receive.Endpoint
773778

774779
for _, sts := range stsList {
775-
desiredReplicas, err := c.getStsDesiredReplicas(ctx, sts)
776-
if err != nil {
777-
level.Error(c.logger).Log("msg", "failed to get desired replicas for Statefulset", "sts", sts.Name, "err", err)
778-
}
780+
desiredReplicas := stsReplicas[sts.Name]
779781
for i := range desiredReplicas {
780782
podName := fmt.Sprintf("%s-%d", sts.Name, i)
781783
pod, err := c.klient.CoreV1().Pods(c.options.namespace).Get(ctx, podName, metav1.GetOptions{})

0 commit comments

Comments
 (0)