Skip to content

Commit cb174a0

Browse files
authored
Merge pull request #2241 from crossplane-contrib/backport-2240-to-release-0.54
[Backport release-0.54] fix(kafkacluster): sort broker endpoints strings for consistent connectionDetails
2 parents 10ab227 + 39c6cd4 commit cb174a0

File tree

2 files changed

+89
-9
lines changed

2 files changed

+89
-9
lines changed

pkg/controller/kafka/cluster/setup.go

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package cluster
1818

1919
import (
2020
"context"
21+
"sort"
2122
"strings"
2223

2324
"github.com/aws/aws-sdk-go/aws"
@@ -159,8 +160,8 @@ func (u *hooks) postObserve(ctx context.Context, cr *svcapitypes.Cluster, obj *s
159160
obs.ConnectionDetails = managed.ConnectionDetails{
160161
// see: https://docs.aws.amazon.com/msk/latest/developerguide/client-access.html
161162
// no endpoint informations available in DescribeClusterOutput only endpoints for zookeeperPlain/Tls
162-
"zookeeperEndpointPlain": []byte(pointer.StringValue(obj.ClusterInfo.ZookeeperConnectString)),
163-
"zookeeperEndpointTls": []byte(pointer.StringValue(obj.ClusterInfo.ZookeeperConnectStringTls)),
163+
"zookeeperEndpointPlain": []byte(sortBootstrapBrokerString(pointer.StringValue(obj.ClusterInfo.ZookeeperConnectString))),
164+
"zookeeperEndpointTls": []byte(sortBootstrapBrokerString(pointer.StringValue(obj.ClusterInfo.ZookeeperConnectStringTls))),
164165
}
165166

166167
switch pointer.StringValue(obj.ClusterInfo.State) {
@@ -174,13 +175,13 @@ func (u *hooks) postObserve(ctx context.Context, cr *svcapitypes.Cluster, obj *s
174175
if err != nil {
175176
return obs, errorutils.Wrap(err, errGetBootstrapBrokers)
176177
}
177-
obs.ConnectionDetails["clusterEndpointPlain"] = []byte(pointer.StringValue(endpoints.BootstrapBrokerString))
178-
obs.ConnectionDetails["clusterEndpointTls"] = []byte(pointer.StringValue(endpoints.BootstrapBrokerStringTls))
179-
obs.ConnectionDetails["clusterEndpointIAM"] = []byte(pointer.StringValue(endpoints.BootstrapBrokerStringSaslIam))
180-
obs.ConnectionDetails["clusterEndpointScram"] = []byte(pointer.StringValue(endpoints.BootstrapBrokerStringSaslScram))
181-
obs.ConnectionDetails["clusterEndpointTlsPublic"] = []byte(pointer.StringValue(endpoints.BootstrapBrokerStringPublicTls))
182-
obs.ConnectionDetails["clusterEndpointIAMPublic"] = []byte(pointer.StringValue(endpoints.BootstrapBrokerStringPublicSaslIam))
183-
obs.ConnectionDetails["clusterEndpointScramPublic"] = []byte(pointer.StringValue(endpoints.BootstrapBrokerStringPublicSaslScram))
178+
obs.ConnectionDetails["clusterEndpointPlain"] = []byte(sortBootstrapBrokerString(pointer.StringValue(endpoints.BootstrapBrokerString)))
179+
obs.ConnectionDetails["clusterEndpointTls"] = []byte(sortBootstrapBrokerString(pointer.StringValue(endpoints.BootstrapBrokerStringTls)))
180+
obs.ConnectionDetails["clusterEndpointIAM"] = []byte(sortBootstrapBrokerString(pointer.StringValue(endpoints.BootstrapBrokerStringSaslIam)))
181+
obs.ConnectionDetails["clusterEndpointScram"] = []byte(sortBootstrapBrokerString(pointer.StringValue(endpoints.BootstrapBrokerStringSaslScram)))
182+
obs.ConnectionDetails["clusterEndpointTlsPublic"] = []byte(sortBootstrapBrokerString(pointer.StringValue(endpoints.BootstrapBrokerStringPublicTls)))
183+
obs.ConnectionDetails["clusterEndpointIAMPublic"] = []byte(sortBootstrapBrokerString(pointer.StringValue(endpoints.BootstrapBrokerStringPublicSaslIam)))
184+
obs.ConnectionDetails["clusterEndpointScramPublic"] = []byte(sortBootstrapBrokerString(pointer.StringValue(endpoints.BootstrapBrokerStringPublicSaslScram)))
184185

185186
case string(svcapitypes.ClusterState_CREATING):
186187
cr.SetConditions(xpv1.Creating())
@@ -1192,3 +1193,17 @@ func generateOpenMonitorinInput(wanted *svcapitypes.OpenMonitoringInfo) *svcsdk.
11921193
return output
11931194

11941195
}
1196+
1197+
// sortBootstrapBrokerString splits the provided string at ",",
1198+
// trims potential spaces, sorts the resulting list of endpoints
1199+
// and returns this a consistent string value with the endpoints joined by "," again.
1200+
func sortBootstrapBrokerString(endpoints string) string {
1201+
endpointList := strings.Split(endpoints, ",")
1202+
sort.Strings(endpointList)
1203+
// for potential edge case, where someday AWS string contains spaces
1204+
for i, endpoint := range endpointList {
1205+
endpointList[i] = strings.TrimSpace(endpoint)
1206+
}
1207+
sort.Strings(endpointList)
1208+
return strings.Join(endpointList, ",")
1209+
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package cluster
2+
3+
import (
4+
"testing"
5+
)
6+
7+
func TestSortBootstrapBrokerString(t *testing.T) {
8+
type args struct {
9+
input string
10+
}
11+
12+
type want struct {
13+
result string
14+
}
15+
16+
cases := map[string]struct {
17+
args
18+
want
19+
}{
20+
"EmptyString": {
21+
args: args{
22+
input: "",
23+
},
24+
want: want{
25+
result: "",
26+
},
27+
},
28+
29+
"SingleEndpointString": {
30+
args: args{
31+
input: "b-1.test.0abcde.c6.kafka.eu-central-1.amazonaws.com:9098",
32+
},
33+
want: want{
34+
result: "b-1.test.0abcde.c6.kafka.eu-central-1.amazonaws.com:9098",
35+
},
36+
},
37+
38+
"MultipleEndpointsString": {
39+
args: args{
40+
input: "b-3.test.0abcde.c6.kafka.eu-central-1.amazonaws.com:9098,b-2.test.0abcde.c6.kafka.eu-central-1.amazonaws.com:9098,b-1.test.0abcde.c6.kafka.eu-central-1.amazonaws.com:9098",
41+
},
42+
want: want{
43+
result: "b-1.test.0abcde.c6.kafka.eu-central-1.amazonaws.com:9098,b-2.test.0abcde.c6.kafka.eu-central-1.amazonaws.com:9098,b-3.test.0abcde.c6.kafka.eu-central-1.amazonaws.com:9098",
44+
},
45+
},
46+
47+
"WithSpacesEndpointsString": {
48+
args: args{
49+
input: "b-3.test.0abcde.c6.kafka.eu-central-1.amazonaws.com:9098, b-2.test.0abcde.c6.kafka.eu-central-1.amazonaws.com:9098, b-1.test.0abcde.c6.kafka.eu-central-1.amazonaws.com:9098",
50+
},
51+
want: want{
52+
result: "b-1.test.0abcde.c6.kafka.eu-central-1.amazonaws.com:9098,b-2.test.0abcde.c6.kafka.eu-central-1.amazonaws.com:9098,b-3.test.0abcde.c6.kafka.eu-central-1.amazonaws.com:9098",
53+
},
54+
},
55+
}
56+
57+
for name, tc := range cases {
58+
t.Run(name, func(t *testing.T) {
59+
out := sortBootstrapBrokerString(tc.input)
60+
if out != tc.result {
61+
t.Errorf("For input '%s', expected '%s', but got '%s'", tc.input, tc.result, out)
62+
}
63+
})
64+
}
65+
}

0 commit comments

Comments
 (0)