Skip to content

Commit 49b41da

Browse files
committed
add aligned ketama hashring algorithm
add aligned ketama hashring algorithm add unit test add sanity check, remove print add panic check fix check Revert "add panic check" This reverts commit 7e36958. polish code fix lint clean up code clean up code clean up code resolve comments
1 parent adaad9f commit 49b41da

File tree

6 files changed

+660
-3
lines changed

6 files changed

+660
-3
lines changed

cmd/thanos/receive.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1049,7 +1049,7 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {
10491049
hashringAlgorithmsHelptext := strings.Join([]string{string(receive.AlgorithmHashmod), string(receive.AlgorithmKetama)}, ", ")
10501050
cmd.Flag("receive.hashrings-algorithm", "The algorithm used when distributing series in the hashrings. Must be one of "+hashringAlgorithmsHelptext+". Will be overwritten by the tenant-specific algorithm in the hashring config.").
10511051
Default(string(receive.AlgorithmHashmod)).
1052-
EnumVar(&rc.hashringsAlgorithm, string(receive.AlgorithmHashmod), string(receive.AlgorithmKetama))
1052+
EnumVar(&rc.hashringsAlgorithm, string(receive.AlgorithmHashmod), string(receive.AlgorithmKetama), string(receive.AlgorithmAlignedKetama))
10531053

10541054
rc.refreshInterval = extkingpin.ModelDuration(cmd.Flag("receive.hashrings-file-refresh-interval", "Refresh interval to re-read the hashring configuration file. (used as a fallback)").
10551055
Default("5m"))

pkg/receive/aligned_hashring.go

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
// Copyright (c) The Thanos Authors.
2+
// Licensed under the Apache License 2.0.
3+
4+
package receive
5+
6+
import (
7+
"fmt"
8+
"sort"
9+
"strconv"
10+
11+
"github.com/cespare/xxhash"
12+
"github.com/pkg/errors"
13+
14+
"github.com/thanos-io/thanos/pkg/strutil"
15+
)
16+
17+
// groupByAZ groups endpoints by Availability Zone and sorts them by their inferred ordinal in a k8s statefulset.
18+
// It returns a 2D slice where each inner slice represents an AZ (sorted alphabetically)
19+
// and contains endpoints sorted by ordinal. All inner slices are truncated to the
20+
// length of the largest common sequence of ordinals starting from 0 across all AZs.
21+
// All endpoint addresses must be valid k8s DNS names with 0-index ordinals at the end of the pod name.
22+
func groupByAZ(endpoints []Endpoint) ([][]Endpoint, error) {
23+
if len(endpoints) == 0 {
24+
return nil, errors.New("no endpoints provided")
25+
}
26+
27+
// Group endpoints by AZ and then by ordinal.
28+
azEndpoints := make(map[string]map[int]Endpoint)
29+
for _, ep := range endpoints {
30+
ordinal, err := strutil.ExtractPodOrdinal(ep.Address)
31+
if err != nil {
32+
return nil, errors.Wrapf(err, "failed to extract ordinal from address %s", ep.Address)
33+
}
34+
if _, ok := azEndpoints[ep.AZ]; !ok {
35+
azEndpoints[ep.AZ] = make(map[int]Endpoint)
36+
}
37+
if _, exists := azEndpoints[ep.AZ][ordinal]; exists {
38+
return nil, fmt.Errorf("duplicate endpoint ordinal %d for address %s in AZ %s", ordinal, ep.Address, ep.AZ)
39+
}
40+
azEndpoints[ep.AZ][ordinal] = ep
41+
}
42+
43+
// Get sorted list of AZ names.
44+
sortedAZs := make([]string, 0, len(azEndpoints))
45+
for az := range azEndpoints {
46+
sortedAZs = append(sortedAZs, az)
47+
}
48+
sort.Strings(sortedAZs)
49+
50+
// Determine the maximum common ordinal across all AZs.
51+
maxCommonOrdinal := -1
52+
for i := 0; ; i++ {
53+
presentInAllAZs := true
54+
for _, az := range sortedAZs {
55+
if _, ok := azEndpoints[az][i]; !ok {
56+
presentInAllAZs = false
57+
if i == 0 {
58+
return nil, fmt.Errorf("AZ %q is missing endpoint with ordinal 0", az)
59+
}
60+
break
61+
}
62+
}
63+
if !presentInAllAZs {
64+
maxCommonOrdinal = i - 1
65+
break
66+
}
67+
}
68+
if maxCommonOrdinal < 0 {
69+
return nil, errors.New("no common endpoints with ordinal 0 found across all AZs")
70+
}
71+
numAZs := len(sortedAZs)
72+
result := make([][]Endpoint, numAZs)
73+
for i, az := range sortedAZs {
74+
result[i] = make([]Endpoint, 0, maxCommonOrdinal+1)
75+
for j := 0; j <= maxCommonOrdinal; j++ {
76+
result[i] = append(result[i], azEndpoints[az][j])
77+
}
78+
}
79+
return result, nil
80+
}
81+
82+
// newAlignedKetamaHashring creates a Ketama hash ring where replicas are strictly aligned across Availability Zones.
83+
// Each section on the hash ring corresponds to a primary endpoint (taken from the first AZ) and its
84+
// aligned replicas in other AZs (endpoints with the same ordinal). The hash for a section is calculated
85+
// based *only* on the primary endpoint's address.
86+
func newAlignedKetamaHashring(endpoints []Endpoint, sectionsPerNode int, replicationFactor uint64) (*ketamaHashring, error) {
87+
if replicationFactor == 0 {
88+
return nil, errors.New("replication factor cannot be zero")
89+
}
90+
if sectionsPerNode <= 0 {
91+
return nil, errors.New("sections per node must be positive")
92+
}
93+
groupedEndpoints, err := groupByAZ(endpoints)
94+
if err != nil {
95+
return nil, errors.Wrap(err, "failed to group endpoints by AZ")
96+
}
97+
numAZs := len(groupedEndpoints)
98+
if numAZs == 0 {
99+
return nil, errors.New("no endpoint groups found after grouping by AZ")
100+
}
101+
if uint64(numAZs) != replicationFactor {
102+
return nil, fmt.Errorf("number of AZs (%d) must equal replication factor (%d)", numAZs, replicationFactor)
103+
}
104+
numEndpointsPerAZ := len(groupedEndpoints[0])
105+
if numEndpointsPerAZ == 0 {
106+
return nil, errors.New("AZ groups are empty after grouping")
107+
}
108+
totalEndpoints := numAZs * numEndpointsPerAZ
109+
flatEndpoints := make([]Endpoint, 0, totalEndpoints)
110+
for azIndex := 0; azIndex < numAZs; azIndex++ {
111+
flatEndpoints = append(flatEndpoints, groupedEndpoints[azIndex]...)
112+
}
113+
hasher := xxhash.New()
114+
ringSections := make(sections, 0, numEndpointsPerAZ*sectionsPerNode)
115+
116+
// Iterate through primary endpoints (those in the first AZ) to define sections.
117+
for primaryOrdinalIndex := 0; primaryOrdinalIndex < numEndpointsPerAZ; primaryOrdinalIndex++ {
118+
primaryEndpoint := groupedEndpoints[0][primaryOrdinalIndex]
119+
for sectionIndex := 1; sectionIndex <= sectionsPerNode; sectionIndex++ {
120+
hasher.Reset()
121+
_, _ = hasher.Write([]byte(primaryEndpoint.Address + ":" + strconv.Itoa(sectionIndex)))
122+
sectionHash := hasher.Sum64()
123+
sec := &section{
124+
hash: sectionHash,
125+
az: primaryEndpoint.AZ,
126+
endpointIndex: uint64(primaryOrdinalIndex),
127+
replicas: make([]uint64, 0, replicationFactor),
128+
}
129+
130+
// Find indices of all replicas (including primary) in the flat list and verify alignment.
131+
for azIndex := 0; azIndex < numAZs; azIndex++ {
132+
replicaFlatIndex := azIndex*numEndpointsPerAZ + primaryOrdinalIndex
133+
replicaEndpoint := flatEndpoints[replicaFlatIndex]
134+
replicaOrdinal, err := strutil.ExtractPodOrdinal(replicaEndpoint.Address)
135+
if err != nil {
136+
return nil, errors.Wrapf(err, "failed to extract ordinal from replica endpoint %s in AZ %s", replicaEndpoint.Address, replicaEndpoint.AZ)
137+
}
138+
if replicaOrdinal != primaryOrdinalIndex {
139+
return nil, fmt.Errorf("ordinal mismatch for primary endpoint %s (ordinal %d): replica %s in AZ %s has ordinal %d",
140+
primaryEndpoint.Address, primaryOrdinalIndex, replicaEndpoint.Address, replicaEndpoint.AZ, replicaOrdinal)
141+
}
142+
sec.replicas = append(sec.replicas, uint64(replicaFlatIndex))
143+
}
144+
ringSections = append(ringSections, sec)
145+
}
146+
}
147+
sort.Sort(ringSections)
148+
return &ketamaHashring{
149+
endpoints: flatEndpoints,
150+
sections: ringSections,
151+
numEndpoints: uint64(totalEndpoints),
152+
}, nil
153+
}

0 commit comments

Comments
 (0)