diff --git a/main.go b/main.go index 65915bd..11ba741 100644 --- a/main.go +++ b/main.go @@ -647,16 +647,19 @@ func (c *controller) sync(ctx context.Context) { } statefulsets := make(map[string][]*appsv1.StatefulSet) + stsReplicas := make(map[string]int32) for _, obj := range c.ssetInf.GetStore().List() { sts, ok := obj.(*appsv1.StatefulSet) if !ok { level.Error(c.logger).Log("msg", "failed type assertion from expected StatefulSet") + continue } hashring, ok := sts.Labels[hashringLabelKey] if !ok { + level.Error(c.logger).Log("msg", "failed to get hashring label from StatefulSet", "sts", sts.Name) continue } @@ -664,7 +667,9 @@ func (c *controller) sync(ctx context.Context) { desiredReplicas, err := c.getStsDesiredReplicas(ctx, sts) if err != nil { level.Error(c.logger).Log("msg", "failed to get desired replicas for Statefulset", "sts", sts.Name, "err", err) + continue } + stsReplicas[sts.Name] = desiredReplicas // If hashring is not initialized, need to wait for all pods ready within statefulset before generating hashring if !exist && c.options.allowOnlyReadyReplicas { for i := range desiredReplicas { @@ -711,7 +716,7 @@ func (c *controller) sync(ctx context.Context) { return } - c.populate(ctx, hashrings, statefulsets) + c.populate(ctx, hashrings, statefulsets, stsReplicas) level.Info(c.logger).Log("msg", "hashring populated", "hashring", fmt.Sprintf("%+v", hashrings)) err = c.saveHashring(ctx, hashrings, cm) @@ -757,7 +762,7 @@ func (c *controller) waitForPod(ctx context.Context, name string) error { }) } -func (c *controller) populate(ctx context.Context, hashrings []receive.HashringConfig, statefulsets map[string][]*appsv1.StatefulSet) { +func (c *controller) populate(ctx context.Context, hashrings []receive.HashringConfig, statefulsets map[string][]*appsv1.StatefulSet, stsReplicas map[string]int32) { for i, h := range hashrings { stsList, exists := statefulsets[h.Hashring] // Sort by sts name for deterministic endpoints order @@ -772,10 +777,7 @@ func (c *controller) populate(ctx context.Context, hashrings []receive.HashringC var endpoints []receive.Endpoint for _, sts := range stsList { - desiredReplicas, err := c.getStsDesiredReplicas(ctx, sts) - if err != nil { - level.Error(c.logger).Log("msg", "failed to get desired replicas for Statefulset", "sts", sts.Name, "err", err) - } + desiredReplicas := stsReplicas[sts.Name] for i := range desiredReplicas { podName := fmt.Sprintf("%s-%d", sts.Name, i) pod, err := c.klient.CoreV1().Pods(c.options.namespace).Get(ctx, podName, metav1.GetOptions{})