Skip to content

Commit d914875

Browse files
committed
Upadte GRPC + Create unit test
Signed-off-by: alanprot <[email protected]>
1 parent 648356c commit d914875

File tree

123 files changed

+7754
-4807
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

123 files changed

+7754
-4807
lines changed

go.mod

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,6 @@ require (
136136
github.com/docker/go-units v0.5.0 // indirect
137137
github.com/edsrzf/mmap-go v1.2.0 // indirect
138138
github.com/efficientgo/tools/extkingpin v0.0.0-20230505153745-6b7392939a60 // indirect
139-
github.com/envoyproxy/go-control-plane v0.12.0 // indirect
140139
github.com/fatih/color v1.18.0 // indirect
141140
github.com/felixge/httpsnoop v1.0.4 // indirect
142141
github.com/fsnotify/fsnotify v1.9.0 // indirect
@@ -301,9 +300,6 @@ replace github.com/google/gnostic => github.com/googleapis/gnostic v0.6.9
301300
// https://github.com/thanos-io/thanos/blob/fdeea3917591fc363a329cbe23af37c6fff0b5f0/go.mod#L265
302301
replace gopkg.in/alecthomas/kingpin.v2 => github.com/alecthomas/kingpin v1.3.8-0.20210301060133-17f40c25f497
303302

304-
// gRPC 1.66 introduced memory pooling which breaks Cortex queries. Pin 1.65.0 until we have a fix.
305-
replace google.golang.org/grpc => google.golang.org/grpc v1.65.0
306-
307303
replace github.com/thanos-io/objstore => github.com/thanos-io/objstore v0.0.0-20241111205755-d1dd89d41f97
308304

309305
replace github.com/prometheus/prometheus => github.com/prometheus/prometheus v0.302.1

go.sum

Lines changed: 66 additions & 1101 deletions
Large diffs are not rendered by default.

integration/grpc_server_test.go

Lines changed: 233 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,233 @@
1+
//go:build requires_docker
2+
// +build requires_docker
3+
4+
package integration
5+
6+
import (
7+
"context"
8+
"flag"
9+
"fmt"
10+
"math/rand"
11+
"net"
12+
"strconv"
13+
"sync"
14+
"testing"
15+
"time"
16+
17+
"github.com/prometheus/client_golang/prometheus"
18+
"github.com/stretchr/testify/assert"
19+
"github.com/stretchr/testify/require"
20+
"github.com/weaveworks/common/server"
21+
"google.golang.org/grpc"
22+
"google.golang.org/grpc/metadata"
23+
24+
"github.com/cortexproject/cortex/pkg/cortexpb"
25+
"github.com/cortexproject/cortex/pkg/distributor/distributorpb"
26+
ingester_client "github.com/cortexproject/cortex/pkg/ingester/client"
27+
"github.com/cortexproject/cortex/pkg/util/grpcclient"
28+
)
29+
30+
type mockGprcServer struct {
31+
ingester_client.IngesterServer
32+
}
33+
34+
func (m mockGprcServer) QueryStream(_ *ingester_client.QueryRequest, streamServer ingester_client.Ingester_QueryStreamServer) error {
35+
md, _ := metadata.FromIncomingContext(streamServer.Context())
36+
i, _ := strconv.Atoi(md["i"][0])
37+
return streamServer.Send(createStreamResponse(i))
38+
}
39+
40+
func (m mockGprcServer) Push(ctx context.Context, request *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) {
41+
time.Sleep(time.Duration(rand.Int31n(100)) * time.Millisecond)
42+
md, _ := metadata.FromIncomingContext(ctx)
43+
i, _ := strconv.Atoi(md["i"][0])
44+
expected := createRequest(i)
45+
46+
if expected.String() != request.String() {
47+
return nil, fmt.Errorf("expected %v, got %v", expected, request)
48+
}
49+
return &cortexpb.WriteResponse{}, nil
50+
}
51+
52+
func run(t *testing.T, cfg server.Config, register func(s *grpc.Server), validate func(t *testing.T, con *grpc.ClientConn)) {
53+
savedRegistry := prometheus.DefaultRegisterer
54+
prometheus.DefaultRegisterer = prometheus.NewRegistry()
55+
defer func() {
56+
prometheus.DefaultRegisterer = savedRegistry
57+
}()
58+
59+
grpcPort, closeGrpcPort, err := getLocalHostPort()
60+
require.NoError(t, err)
61+
httpPort, closeHTTPPort, err := getLocalHostPort()
62+
require.NoError(t, err)
63+
64+
err = closeGrpcPort()
65+
require.NoError(t, err)
66+
err = closeHTTPPort()
67+
require.NoError(t, err)
68+
69+
cfg.HTTPListenPort = httpPort
70+
cfg.GRPCListenPort = grpcPort
71+
72+
serv, err := server.New(cfg)
73+
require.NoError(t, err)
74+
register(serv.GRPC)
75+
76+
go func() {
77+
err := serv.Run()
78+
require.NoError(t, err)
79+
}()
80+
81+
defer serv.Shutdown()
82+
83+
grpcHost := fmt.Sprintf("localhost:%d", grpcPort)
84+
85+
clientConfig := grpcclient.Config{}
86+
clientConfig.RegisterFlags(flag.NewFlagSet("fake", flag.ContinueOnError))
87+
88+
dialOptions, err := clientConfig.DialOption(nil, nil)
89+
assert.NoError(t, err)
90+
dialOptions = append([]grpc.DialOption{grpc.WithDefaultCallOptions(clientConfig.CallOptions()...)}, dialOptions...)
91+
92+
conn, err := grpc.NewClient(grpcHost, dialOptions...)
93+
assert.NoError(t, err)
94+
validate(t, conn)
95+
}
96+
97+
func TestConcurrentGrpcCalls(t *testing.T) {
98+
cfg := server.Config{}
99+
(&cfg).RegisterFlags(flag.NewFlagSet("fake", flag.ContinueOnError))
100+
101+
tc := map[string]struct {
102+
cfg server.Config
103+
register func(s *grpc.Server)
104+
validate func(t *testing.T, con *grpc.ClientConn)
105+
}{
106+
"distributor": {
107+
cfg: cfg,
108+
register: func(s *grpc.Server) {
109+
d := &mockGprcServer{}
110+
distributorpb.RegisterDistributorServer(s, d)
111+
},
112+
validate: func(t *testing.T, conn *grpc.ClientConn) {
113+
client := distributorpb.NewDistributorClient(conn)
114+
wg := sync.WaitGroup{}
115+
n := 10000
116+
wg.Add(n)
117+
for i := 0; i < n; i++ {
118+
go func(i int) {
119+
defer wg.Done()
120+
ctx := context.Background()
121+
ctx = metadata.NewOutgoingContext(ctx, metadata.MD{"i": []string{strconv.Itoa(i)}})
122+
_, err := client.Push(ctx, createRequest(i))
123+
require.NoError(t, err)
124+
}(i)
125+
}
126+
127+
wg.Wait()
128+
},
129+
},
130+
"ingester": {
131+
cfg: cfg,
132+
register: func(s *grpc.Server) {
133+
d := &mockGprcServer{}
134+
ingester_client.RegisterIngesterServer(s, d)
135+
},
136+
validate: func(t *testing.T, conn *grpc.ClientConn) {
137+
client := ingester_client.NewIngesterClient(conn)
138+
wg := sync.WaitGroup{}
139+
n := 10000
140+
wg.Add(n)
141+
for i := 0; i < n; i++ {
142+
go func(i int) {
143+
defer wg.Done()
144+
ctx := context.Background()
145+
ctx = metadata.NewOutgoingContext(ctx, metadata.MD{"i": []string{strconv.Itoa(i)}})
146+
s, err := client.QueryStream(ctx, &ingester_client.QueryRequest{})
147+
require.NoError(t, err)
148+
resp, err := s.Recv()
149+
require.NoError(t, err)
150+
expected := createStreamResponse(i)
151+
require.Equal(t, expected.String(), resp.String())
152+
}(i)
153+
}
154+
155+
wg.Wait()
156+
},
157+
},
158+
}
159+
160+
for name, c := range tc {
161+
t.Run(name, func(t *testing.T) {
162+
run(t, c.cfg, c.register, c.validate)
163+
})
164+
}
165+
}
166+
167+
func createStreamResponse(i int) *ingester_client.QueryStreamResponse {
168+
return &ingester_client.QueryStreamResponse{Chunkseries: []ingester_client.TimeSeriesChunk{
169+
{
170+
FromIngesterId: strconv.Itoa(i),
171+
Labels: createLabels(i),
172+
Chunks: []ingester_client.Chunk{
173+
{
174+
StartTimestampMs: int64(i),
175+
EndTimestampMs: int64(i),
176+
Encoding: int32(i),
177+
Data: []byte(strconv.Itoa(i)),
178+
},
179+
},
180+
},
181+
}}
182+
}
183+
184+
func createRequest(i int) *cortexpb.WriteRequest {
185+
labels := createLabels(i)
186+
return &cortexpb.WriteRequest{
187+
Timeseries: []cortexpb.PreallocTimeseries{
188+
{
189+
TimeSeries: &cortexpb.TimeSeries{
190+
Labels: labels,
191+
Samples: []cortexpb.Sample{
192+
{TimestampMs: int64(i), Value: float64(i)},
193+
},
194+
Exemplars: []cortexpb.Exemplar{
195+
{
196+
Labels: labels,
197+
Value: float64(i),
198+
TimestampMs: int64(i),
199+
},
200+
},
201+
},
202+
},
203+
},
204+
}
205+
}
206+
207+
func createLabels(i int) []cortexpb.LabelAdapter {
208+
labels := make([]cortexpb.LabelAdapter, 0, 100)
209+
for j := 0; j < 100; j++ {
210+
labels = append(labels, cortexpb.LabelAdapter{
211+
Name: fmt.Sprintf("test%d_%d", i, j),
212+
Value: fmt.Sprintf("test%d_%d", i, j),
213+
})
214+
}
215+
return labels
216+
}
217+
218+
func getLocalHostPort() (int, func() error, error) {
219+
addr, err := net.ResolveTCPAddr("tcp", "localhost:0")
220+
if err != nil {
221+
return 0, nil, err
222+
}
223+
224+
l, err := net.ListenTCP("tcp", addr)
225+
if err != nil {
226+
return 0, nil, err
227+
}
228+
229+
closePort := func() error {
230+
return l.Close()
231+
}
232+
return l.Addr().(*net.TCPAddr).Port, closePort, nil
233+
}

pkg/ring/client/pool_test.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,13 @@ type mockClient struct {
2121
status grpc_health_v1.HealthCheckResponse_ServingStatus
2222
}
2323

24+
func (i mockClient) List(ctx context.Context, in *grpc_health_v1.HealthListRequest, opts ...grpc.CallOption) (*grpc_health_v1.HealthListResponse, error) {
25+
if !i.happy {
26+
return nil, fmt.Errorf("Fail")
27+
}
28+
return &grpc_health_v1.HealthListResponse{}, nil
29+
}
30+
2431
func (i mockClient) Check(ctx context.Context, in *grpc_health_v1.HealthCheckRequest, opts ...grpc.CallOption) (*grpc_health_v1.HealthCheckResponse, error) {
2532
if !i.happy {
2633
return nil, fmt.Errorf("Fail")

pkg/util/grpcutil/health_check.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,19 @@ func (h *HealthCheck) Check(_ context.Context, _ *grpc_health_v1.HealthCheckRequ
3232
return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_SERVING}, nil
3333
}
3434

35+
func (h *HealthCheck) List(ctx context.Context, request *grpc_health_v1.HealthListRequest) (*grpc_health_v1.HealthListResponse, error) {
36+
checkResp, err := h.Check(ctx, nil)
37+
if err != nil {
38+
return &grpc_health_v1.HealthListResponse{}, err
39+
}
40+
41+
return &grpc_health_v1.HealthListResponse{
42+
Statuses: map[string]*grpc_health_v1.HealthCheckResponse{
43+
"server": checkResp,
44+
},
45+
}, nil
46+
}
47+
3548
// Watch implements the grpc healthcheck.
3649
func (h *HealthCheck) Watch(_ *grpc_health_v1.HealthCheckRequest, _ grpc_health_v1.Health_WatchServer) error {
3750
return status.Error(codes.Unimplemented, "Watching is not supported")

pkg/util/tls/test/tls_integration_test.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,19 @@ type grpcHealthCheck struct {
3939
healthy bool
4040
}
4141

42+
func (h *grpcHealthCheck) List(ctx context.Context, request *grpc_health_v1.HealthListRequest) (*grpc_health_v1.HealthListResponse, error) {
43+
checkResp, err := h.Check(ctx, nil)
44+
if err != nil {
45+
return &grpc_health_v1.HealthListResponse{}, err
46+
}
47+
48+
return &grpc_health_v1.HealthListResponse{
49+
Statuses: map[string]*grpc_health_v1.HealthCheckResponse{
50+
"server": checkResp,
51+
},
52+
}, nil
53+
}
54+
4255
func (h *grpcHealthCheck) Check(_ context.Context, _ *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) {
4356
if !h.healthy {
4457
return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_NOT_SERVING}, nil

vendor/google.golang.org/grpc/CONTRIBUTING.md

Lines changed: 8 additions & 8 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vendor/google.golang.org/grpc/MAINTAINERS.md

Lines changed: 20 additions & 13 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vendor/google.golang.org/grpc/SECURITY.md

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vendor/google.golang.org/grpc/backoff/backoff.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)