Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -647,24 +647,29 @@ func (c *controller) sync(ctx context.Context) {
}

statefulsets := make(map[string][]*appsv1.StatefulSet)
stsReplicas := make(map[string]int32)

for _, obj := range c.ssetInf.GetStore().List() {
sts, ok := obj.(*appsv1.StatefulSet)

if !ok {
level.Error(c.logger).Log("msg", "failed type assertion from expected StatefulSet")
continue
}

hashring, ok := sts.Labels[hashringLabelKey]
if !ok {
level.Error(c.logger).Log("msg", "failed to get hashring label from StatefulSet", "sts", sts.Name)
continue
}

stsReplica, exist := c.replicas[sts.Name]
desiredReplicas, err := c.getStsDesiredReplicas(ctx, sts)
if err != nil {
level.Error(c.logger).Log("msg", "failed to get desired replicas for Statefulset", "sts", sts.Name, "err", err)
continue
}
stsReplicas[sts.Name] = desiredReplicas
// If hashring is not initialized, need to wait for all pods ready within statefulset before generating hashring
if !exist && c.options.allowOnlyReadyReplicas {
for i := range desiredReplicas {
Expand Down Expand Up @@ -711,7 +716,7 @@ func (c *controller) sync(ctx context.Context) {
return
}

c.populate(ctx, hashrings, statefulsets)
c.populate(ctx, hashrings, statefulsets, stsReplicas)
level.Info(c.logger).Log("msg", "hashring populated", "hashring", fmt.Sprintf("%+v", hashrings))

err = c.saveHashring(ctx, hashrings, cm)
Expand Down Expand Up @@ -757,7 +762,7 @@ func (c *controller) waitForPod(ctx context.Context, name string) error {
})
}

func (c *controller) populate(ctx context.Context, hashrings []receive.HashringConfig, statefulsets map[string][]*appsv1.StatefulSet) {
func (c *controller) populate(ctx context.Context, hashrings []receive.HashringConfig, statefulsets map[string][]*appsv1.StatefulSet, stsReplicas map[string]int32) {
for i, h := range hashrings {
stsList, exists := statefulsets[h.Hashring]
// Sort by sts name for deterministic endpoints order
Expand All @@ -772,10 +777,7 @@ func (c *controller) populate(ctx context.Context, hashrings []receive.HashringC
var endpoints []receive.Endpoint

for _, sts := range stsList {
desiredReplicas, err := c.getStsDesiredReplicas(ctx, sts)
if err != nil {
level.Error(c.logger).Log("msg", "failed to get desired replicas for Statefulset", "sts", sts.Name, "err", err)
}
desiredReplicas := stsReplicas[sts.Name]
for i := range desiredReplicas {
podName := fmt.Sprintf("%s-%d", sts.Name, i)
pod, err := c.klient.CoreV1().Pods(c.options.namespace).Get(ctx, podName, metav1.GetOptions{})
Expand Down