Skip to content

Commit 3475726

Browse files
committed
Add remote write v2 api
Signed-off-by: SungJin1212 <[email protected]>
1 parent 6865ff8 commit 3475726

File tree

6 files changed

+416
-43
lines changed

6 files changed

+416
-43
lines changed

pkg/api/api.go

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"github.com/cortexproject/cortex/pkg/alertmanager/alertmanagerpb"
2424
"github.com/cortexproject/cortex/pkg/compactor"
2525
"github.com/cortexproject/cortex/pkg/cortexpb"
26+
"github.com/cortexproject/cortex/pkg/cortexpbv2"
2627
"github.com/cortexproject/cortex/pkg/distributor"
2728
"github.com/cortexproject/cortex/pkg/distributor/distributorpb"
2829
frontendv1 "github.com/cortexproject/cortex/pkg/frontend/v1"
@@ -45,6 +46,9 @@ import (
4546

4647
// DistributorPushWrapper wraps around a push. It is similar to middleware.Interface.
4748
type DistributorPushWrapper func(next push.Func) push.Func
49+
50+
// DistributorPushWrapperV2 wraps around a push. It is similar to middleware.Interface.
51+
type DistributorPushWrapperV2 func(next push.FuncV2) push.FuncV2
4852
type ConfigHandler func(actualCfg interface{}, defaultCfg interface{}) http.HandlerFunc
4953

5054
type Config struct {
@@ -60,7 +64,8 @@ type Config struct {
6064

6165
// This allows downstream projects to wrap the distributor push function
6266
// and access the deserialized write requests before/after they are pushed.
63-
DistributorPushWrapper DistributorPushWrapper `yaml:"-"`
67+
DistributorPushWrapper DistributorPushWrapper `yaml:"-"`
68+
DistributorPushWrapperV2 DistributorPushWrapperV2 `yaml:"-"`
6469

6570
// The CustomConfigHandler allows for providing a different handler for the
6671
// `/config` endpoint. If this field is set _before_ the API module is
@@ -107,6 +112,15 @@ func (cfg *Config) Validate() error {
107112
return nil
108113
}
109114

115+
// Push either wraps the distributor push function as configured or returns the distributor push directly.
116+
func (cfg *Config) wrapDistributorPushV2(d *distributor.Distributor) push.FuncV2 {
117+
if cfg.DistributorPushWrapperV2 != nil {
118+
return cfg.DistributorPushWrapperV2(d.PushV2)
119+
}
120+
121+
return d.PushV2
122+
}
123+
110124
// Push either wraps the distributor push function as configured or returns the distributor push directly.
111125
func (cfg *Config) wrapDistributorPush(d *distributor.Distributor) push.Func {
112126
if cfg.DistributorPushWrapper != nil {
@@ -277,7 +291,7 @@ func (a *API) RegisterRuntimeConfig(runtimeConfigHandler http.HandlerFunc) {
277291
func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distributor.Config, overrides *validation.Overrides) {
278292
distributorpb.RegisterDistributorServer(a.server.GRPC, d)
279293

280-
a.RegisterRoute("/api/v1/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST")
294+
a.RegisterRoute("/api/v1/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.wrapDistributorPush(d), a.cfg.wrapDistributorPushV2(d)), true, "POST")
281295
a.RegisterRoute("/api/v1/otlp/v1/metrics", push.OTLPHandler(overrides, pushConfig.OTLPConfig, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST")
282296

283297
a.indexPage.AddLink(SectionAdminEndpoints, "/distributor/ring", "Distributor Ring Status")
@@ -289,7 +303,7 @@ func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distrib
289303
a.RegisterRoute("/distributor/ha_tracker", d.HATracker, false, "GET")
290304

291305
// Legacy Routes
292-
a.RegisterRoute(path.Join(a.cfg.LegacyHTTPPrefix, "/push"), push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST")
306+
a.RegisterRoute(path.Join(a.cfg.LegacyHTTPPrefix, "/push"), push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.wrapDistributorPush(d), a.cfg.wrapDistributorPushV2(d)), true, "POST")
293307
a.RegisterRoute("/all_user_stats", http.HandlerFunc(d.AllUserStatsHandler), false, "GET")
294308
a.RegisterRoute("/ha-tracker", d.HATracker, false, "GET")
295309
}
@@ -304,6 +318,7 @@ type Ingester interface {
304318
AllUserStatsHandler(http.ResponseWriter, *http.Request)
305319
ModeHandler(http.ResponseWriter, *http.Request)
306320
Push(context.Context, *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error)
321+
PushV2(context.Context, *cortexpbv2.WriteRequest) (*cortexpbv2.WriteResponse, error)
307322
}
308323

309324
// RegisterIngester registers the ingesters HTTP and GRPC service
@@ -322,12 +337,12 @@ func (a *API) RegisterIngester(i Ingester, pushConfig distributor.Config) {
322337
a.RegisterRoute("/ingester/renewTokens", http.HandlerFunc(i.RenewTokenHandler), false, "GET", "POST")
323338
a.RegisterRoute("/ingester/all_user_stats", http.HandlerFunc(i.AllUserStatsHandler), false, "GET")
324339
a.RegisterRoute("/ingester/mode", http.HandlerFunc(i.ModeHandler), false, "GET", "POST")
325-
a.RegisterRoute("/ingester/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, i.Push), true, "POST") // For testing and debugging.
340+
a.RegisterRoute("/ingester/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, i.Push, i.PushV2), true, "POST") // For testing and debugging.
326341

327342
// Legacy Routes
328343
a.RegisterRoute("/flush", http.HandlerFunc(i.FlushHandler), false, "GET", "POST")
329344
a.RegisterRoute("/shutdown", http.HandlerFunc(i.ShutdownHandler), false, "GET", "POST")
330-
a.RegisterRoute("/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, i.Push), true, "POST") // For testing and debugging.
345+
a.RegisterRoute("/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, i.Push, i.PushV2), true, "POST") // For testing and debugging.
331346
}
332347

333348
func (a *API) RegisterTenantDeletion(api *purger.TenantDeletionAPI) {

pkg/ruler/compat.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"github.com/weaveworks/common/user"
2323

2424
"github.com/cortexproject/cortex/pkg/cortexpb"
25+
"github.com/cortexproject/cortex/pkg/cortexpbv2"
2526
"github.com/cortexproject/cortex/pkg/querier"
2627
"github.com/cortexproject/cortex/pkg/querier/stats"
2728
"github.com/cortexproject/cortex/pkg/ring/client"
@@ -33,6 +34,7 @@ import (
3334
// Pusher is an ingester server that accepts pushes.
3435
type Pusher interface {
3536
Push(context.Context, *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error)
37+
PushV2(ctx context.Context, req *cortexpbv2.WriteRequest) (*cortexpbv2.WriteResponse, error)
3638
}
3739

3840
type PusherAppender struct {

pkg/ruler/compat_test.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,22 @@ import (
2323
"github.com/weaveworks/common/httpgrpc"
2424

2525
"github.com/cortexproject/cortex/pkg/cortexpb"
26+
"github.com/cortexproject/cortex/pkg/cortexpbv2"
2627
"github.com/cortexproject/cortex/pkg/querier/stats"
2728
"github.com/cortexproject/cortex/pkg/util/validation"
2829
)
2930

3031
type fakePusher struct {
31-
request *cortexpb.WriteRequest
32-
response *cortexpb.WriteResponse
33-
err error
32+
request *cortexpb.WriteRequest
33+
requestV2 *cortexpbv2.WriteRequest
34+
response *cortexpb.WriteResponse
35+
responseV2 *cortexpbv2.WriteResponse
36+
err error
37+
}
38+
39+
func (p *fakePusher) PushV2(ctx context.Context, r *cortexpbv2.WriteRequest) (*cortexpbv2.WriteResponse, error) {
40+
p.requestV2 = r
41+
return p.responseV2, p.err
3442
}
3543

3644
func (p *fakePusher) Push(ctx context.Context, r *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) {

pkg/ruler/pusher_mock_test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"github.com/stretchr/testify/mock"
77

88
"github.com/cortexproject/cortex/pkg/cortexpb"
9+
"github.com/cortexproject/cortex/pkg/cortexpbv2"
910
)
1011

1112
type pusherMock struct {
@@ -16,6 +17,11 @@ func newPusherMock() *pusherMock {
1617
return &pusherMock{}
1718
}
1819

20+
func (m *pusherMock) PushV2(ctx context.Context, req *cortexpbv2.WriteRequest) (*cortexpbv2.WriteResponse, error) {
21+
args := m.Called(ctx, req)
22+
return args.Get(0).(*cortexpbv2.WriteResponse), args.Error(1)
23+
}
24+
1925
func (m *pusherMock) Push(ctx context.Context, req *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) {
2026
args := m.Called(ctx, req)
2127
return args.Get(0).(*cortexpb.WriteResponse), args.Error(1)

pkg/util/push/push.go

Lines changed: 131 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2,22 +2,44 @@ package push
22

33
import (
44
"context"
5+
"fmt"
56
"net/http"
7+
"strconv"
8+
"strings"
69

710
"github.com/go-kit/log/level"
11+
"github.com/prometheus/prometheus/config"
12+
"github.com/prometheus/prometheus/storage/remote"
813
"github.com/weaveworks/common/httpgrpc"
914
"github.com/weaveworks/common/middleware"
1015

1116
"github.com/cortexproject/cortex/pkg/cortexpb"
17+
"github.com/cortexproject/cortex/pkg/cortexpbv2"
1218
"github.com/cortexproject/cortex/pkg/util"
1319
"github.com/cortexproject/cortex/pkg/util/log"
1420
)
1521

22+
const (
23+
remoteWriteVersionHeader = "X-Prometheus-Remote-Write-Version"
24+
remoteWriteVersion1HeaderValue = "0.1.0"
25+
remoteWriteVersion20HeaderValue = "2.0.0"
26+
appProtoContentType = "application/x-protobuf"
27+
appProtoV1ContentType = "application/x-protobuf;proto=prometheus.WriteRequest"
28+
appProtoV2ContentType = "application/x-protobuf;proto=io.prometheus.write.v2.Request"
29+
30+
rw20WrittenSamplesHeader = "X-Prometheus-Remote-Write-Samples-Written"
31+
rw20WrittenHistogramsHeader = "X-Prometheus-Remote-Write-Histograms-Written"
32+
rw20WrittenExemplarsHeader = "X-Prometheus-Remote-Write-Exemplars-Written"
33+
)
34+
1635
// Func defines the type of the push. It is similar to http.HandlerFunc.
1736
type Func func(context.Context, *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error)
1837

38+
// FuncV2 defines the type of the pushV2. It is similar to http.HandlerFunc.
39+
type FuncV2 func(ctx context.Context, request *cortexpbv2.WriteRequest) (*cortexpbv2.WriteResponse, error)
40+
1941
// Handler is a http.Handler which accepts WriteRequests.
20-
func Handler(maxRecvMsgSize int, sourceIPs *middleware.SourceIPExtractor, push Func) http.Handler {
42+
func Handler(maxRecvMsgSize int, sourceIPs *middleware.SourceIPExtractor, push Func, pushV2 FuncV2) http.Handler {
2143
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
2244
ctx := r.Context()
2345
logger := log.WithContext(ctx, log.Logger)
@@ -28,31 +50,123 @@ func Handler(maxRecvMsgSize int, sourceIPs *middleware.SourceIPExtractor, push F
2850
logger = log.WithSourceIPs(source, logger)
2951
}
3052
}
31-
var req cortexpb.PreallocWriteRequest
32-
err := util.ParseProtoReader(ctx, r.Body, int(r.ContentLength), maxRecvMsgSize, &req, util.RawSnappy)
53+
54+
contentType := r.Header.Get("Content-Type")
55+
if contentType == "" {
56+
contentType = appProtoContentType
57+
}
58+
59+
msgType, err := parseProtoMsg(contentType)
3360
if err != nil {
34-
level.Error(logger).Log("err", err.Error())
35-
http.Error(w, err.Error(), http.StatusBadRequest)
61+
level.Error(logger).Log("Error decoding remote write request", "err", err)
62+
http.Error(w, err.Error(), http.StatusUnsupportedMediaType)
3663
return
3764
}
3865

39-
req.SkipLabelNameValidation = false
40-
if req.Source == 0 {
41-
req.Source = cortexpb.API
66+
if msgType != config.RemoteWriteProtoMsgV1 && msgType != config.RemoteWriteProtoMsgV2 {
67+
level.Error(logger).Log("Error decoding remote write request", "err", err)
68+
http.Error(w, err.Error(), http.StatusUnsupportedMediaType)
69+
return
70+
}
71+
72+
enc := r.Header.Get("Content-Encoding")
73+
if enc == "" {
74+
} else if enc != string(remote.SnappyBlockCompression) {
75+
err := fmt.Errorf("%v encoding (compression) is not accepted by this server; only %v is acceptable", enc, remote.SnappyBlockCompression)
76+
level.Error(logger).Log("Error decoding remote write request", "err", err)
77+
http.Error(w, err.Error(), http.StatusUnsupportedMediaType)
78+
return
4279
}
4380

44-
if _, err := push(ctx, &req.WriteRequest); err != nil {
45-
resp, ok := httpgrpc.HTTPResponseFromError(err)
46-
if !ok {
47-
http.Error(w, err.Error(), http.StatusInternalServerError)
81+
switch msgType {
82+
case config.RemoteWriteProtoMsgV1:
83+
var req cortexpb.PreallocWriteRequest
84+
err := util.ParseProtoReader(ctx, r.Body, int(r.ContentLength), maxRecvMsgSize, &req, util.RawSnappy)
85+
if err != nil {
86+
level.Error(logger).Log("err", err.Error())
87+
http.Error(w, err.Error(), http.StatusBadRequest)
88+
return
89+
}
90+
91+
req.SkipLabelNameValidation = false
92+
if req.Source == 0 {
93+
req.Source = cortexpb.API
94+
}
95+
96+
if _, err := push(ctx, &req.WriteRequest); err != nil {
97+
resp, ok := httpgrpc.HTTPResponseFromError(err)
98+
if !ok {
99+
http.Error(w, err.Error(), http.StatusInternalServerError)
100+
return
101+
}
102+
if resp.GetCode()/100 == 5 {
103+
level.Error(logger).Log("msg", "push error", "err", err)
104+
} else if resp.GetCode() != http.StatusAccepted && resp.GetCode() != http.StatusTooManyRequests {
105+
level.Warn(logger).Log("msg", "push refused", "err", err)
106+
}
107+
http.Error(w, string(resp.Body), int(resp.Code))
108+
}
109+
case config.RemoteWriteProtoMsgV2:
110+
var req cortexpbv2.WriteRequest
111+
err := util.ParseProtoReader(ctx, r.Body, int(r.ContentLength), maxRecvMsgSize, &req, util.RawSnappy)
112+
if err != nil {
113+
fmt.Println("err", err)
114+
level.Error(logger).Log("err", err.Error())
115+
http.Error(w, err.Error(), http.StatusBadRequest)
48116
return
49117
}
50-
if resp.GetCode()/100 == 5 {
51-
level.Error(logger).Log("msg", "push error", "err", err)
52-
} else if resp.GetCode() != http.StatusAccepted && resp.GetCode() != http.StatusTooManyRequests {
53-
level.Warn(logger).Log("msg", "push refused", "err", err)
118+
119+
req.SkipLabelNameValidation = false
120+
if req.Source == 0 {
121+
req.Source = cortexpbv2.API
122+
}
123+
124+
if resp, err := pushV2(ctx, &req); err != nil {
125+
resp, ok := httpgrpc.HTTPResponseFromError(err)
126+
w.Header().Set(rw20WrittenSamplesHeader, "0")
127+
w.Header().Set(rw20WrittenHistogramsHeader, "0")
128+
w.Header().Set(rw20WrittenExemplarsHeader, "0")
129+
if !ok {
130+
http.Error(w, err.Error(), http.StatusInternalServerError)
131+
return
132+
}
133+
if resp.GetCode()/100 == 5 {
134+
level.Error(logger).Log("msg", "push error", "err", err)
135+
} else if resp.GetCode() != http.StatusAccepted && resp.GetCode() != http.StatusTooManyRequests {
136+
level.Warn(logger).Log("msg", "push refused", "err", err)
137+
}
138+
http.Error(w, string(resp.Body), int(resp.Code))
139+
} else {
140+
w.Header().Set(rw20WrittenSamplesHeader, strconv.FormatInt(resp.Samples, 10))
141+
w.Header().Set(rw20WrittenHistogramsHeader, strconv.FormatInt(resp.Histograms, 10))
142+
w.Header().Set(rw20WrittenExemplarsHeader, strconv.FormatInt(resp.Exemplars, 10))
54143
}
55-
http.Error(w, string(resp.Body), int(resp.Code))
56144
}
57145
})
58146
}
147+
148+
// Refer to parseProtoMsg in https://github.com/prometheus/prometheus/blob/main/storage/remote/write_handler.go
149+
func parseProtoMsg(contentType string) (config.RemoteWriteProtoMsg, error) {
150+
contentType = strings.TrimSpace(contentType)
151+
152+
parts := strings.Split(contentType, ";")
153+
if parts[0] != appProtoContentType {
154+
return "", fmt.Errorf("expected %v as the first (media) part, got %v content-type", appProtoContentType, contentType)
155+
}
156+
// Parse potential https://www.rfc-editor.org/rfc/rfc9110#parameter
157+
for _, p := range parts[1:] {
158+
pair := strings.Split(p, "=")
159+
if len(pair) != 2 {
160+
return "", fmt.Errorf("as per https://www.rfc-editor.org/rfc/rfc9110#parameter expected parameters to be key-values, got %v in %v content-type", p, contentType)
161+
}
162+
if pair[0] == "proto" {
163+
ret := config.RemoteWriteProtoMsg(pair[1])
164+
if err := ret.Validate(); err != nil {
165+
return "", fmt.Errorf("got %v content type; %w", contentType, err)
166+
}
167+
return ret, nil
168+
}
169+
}
170+
// No "proto=" parameter, assuming v1.
171+
return config.RemoteWriteProtoMsgV1, nil
172+
}

0 commit comments

Comments
 (0)