Skip to content

Commit fb6f4c0

Browse files
committed
Refactor queryapi
Signed-off-by: SungJin1212 <[email protected]>
1 parent 0d621ff commit fb6f4c0

File tree

15 files changed

+368
-257
lines changed

15 files changed

+368
-257
lines changed

pkg/api/queryapi/query_api.go

Lines changed: 66 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -2,22 +2,22 @@ package queryapi
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"net/http"
78
"time"
89

910
"github.com/go-kit/log"
1011
"github.com/go-kit/log/level"
1112
"github.com/grafana/regexp"
13+
jsoniter "github.com/json-iterator/go"
1214
"github.com/munnerz/goautoneg"
1315
"github.com/prometheus/prometheus/promql"
1416
"github.com/prometheus/prometheus/storage"
1517
"github.com/prometheus/prometheus/util/annotations"
1618
"github.com/prometheus/prometheus/util/httputil"
1719
v1 "github.com/prometheus/prometheus/web/api/v1"
18-
"github.com/weaveworks/common/httpgrpc"
1920

20-
"github.com/cortexproject/cortex/pkg/util"
2121
"github.com/cortexproject/cortex/pkg/util/api"
2222
)
2323

@@ -51,53 +51,54 @@ func NewQueryAPI(
5151
}
5252

5353
func (q *QueryAPI) RangeQueryHandler(r *http.Request) (result apiFuncResult) {
54-
// TODO(Sungjin1212): Change to emit basic error (not gRPC)
55-
start, err := util.ParseTime(r.FormValue("start"))
54+
start, err := api.ParseTime(r.FormValue("start"))
5655
if err != nil {
5756
return invalidParamError(err, "start")
5857
}
59-
end, err := util.ParseTime(r.FormValue("end"))
58+
end, err := api.ParseTime(r.FormValue("end"))
6059
if err != nil {
6160
return invalidParamError(err, "end")
6261
}
63-
if end < start {
64-
return invalidParamError(ErrEndBeforeStart, "end")
62+
63+
if end.Before(start) {
64+
return invalidParamError(errors.New("end timestamp must not be before start time"), "end")
6565
}
6666

67-
step, err := util.ParseDurationMs(r.FormValue("step"))
67+
step, err := api.ParseDuration(r.FormValue("step"))
6868
if err != nil {
6969
return invalidParamError(err, "step")
7070
}
7171

7272
if step <= 0 {
73-
return invalidParamError(ErrNegativeStep, "step")
73+
return invalidParamError(errors.New("zero or negative query resolution step widths are not accepted. Try a positive integer"), "step")
7474
}
7575

7676
// For safety, limit the number of returned points per timeseries.
7777
// This is sufficient for 60s resolution for a week or 1h resolution for a year.
78-
if (end-start)/step > 11000 {
79-
return apiFuncResult{nil, &apiError{errorBadData, ErrStepTooSmall}, nil, nil}
78+
if end.Sub(start)/step > 11000 {
79+
err := errors.New("exceeded maximum resolution of 11,000 points per timeseries. Try decreasing the query resolution (?step=XX)")
80+
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
8081
}
8182

8283
ctx := r.Context()
8384
if to := r.FormValue("timeout"); to != "" {
8485
var cancel context.CancelFunc
85-
timeout, err := util.ParseDurationMs(to)
86+
timeout, err := api.ParseDuration(to)
8687
if err != nil {
8788
return invalidParamError(err, "timeout")
8889
}
8990

90-
ctx, cancel = context.WithTimeout(ctx, convertMsToDuration(timeout))
91+
ctx, cancel = context.WithTimeout(ctx, timeout)
9192
defer cancel()
9293
}
9394

94-
opts, err := extractQueryOpts(r)
95+
opts, err := api.ExtractQueryOpts(r)
9596
if err != nil {
9697
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
9798
}
98-
qry, err := q.queryEngine.NewRangeQuery(ctx, q.queryable, opts, r.FormValue("query"), convertMsToTime(start), convertMsToTime(end), convertMsToDuration(step))
99+
qry, err := q.queryEngine.NewRangeQuery(ctx, q.queryable, opts, r.FormValue("query"), start, end, step)
99100
if err != nil {
100-
return invalidParamError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), "query")
101+
return invalidParamError(err, "query")
101102
}
102103
// From now on, we must only return with a finalizer in the result (to
103104
// be called by the caller) or call qry.Close ourselves (which is
@@ -126,31 +127,30 @@ func (q *QueryAPI) RangeQueryHandler(r *http.Request) (result apiFuncResult) {
126127
}
127128

128129
func (q *QueryAPI) InstantQueryHandler(r *http.Request) (result apiFuncResult) {
129-
// TODO(Sungjin1212): Change to emit basic error (not gRPC)
130-
ts, err := util.ParseTimeParam(r, "time", q.now().Unix())
130+
ts, err := api.ParseTimeParam(r, "time", q.now())
131131
if err != nil {
132132
return invalidParamError(err, "time")
133133
}
134134

135135
ctx := r.Context()
136136
if to := r.FormValue("timeout"); to != "" {
137137
var cancel context.CancelFunc
138-
timeout, err := util.ParseDurationMs(to)
138+
timeout, err := api.ParseDuration(to)
139139
if err != nil {
140140
return invalidParamError(err, "timeout")
141141
}
142142

143-
ctx, cancel = context.WithDeadline(ctx, q.now().Add(convertMsToDuration(timeout)))
143+
ctx, cancel = context.WithDeadline(ctx, q.now().Add(timeout))
144144
defer cancel()
145145
}
146146

147-
opts, err := extractQueryOpts(r)
147+
opts, err := api.ExtractQueryOpts(r)
148148
if err != nil {
149149
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
150150
}
151-
qry, err := q.queryEngine.NewInstantQuery(ctx, q.queryable, opts, r.FormValue("query"), convertMsToTime(ts))
151+
qry, err := q.queryEngine.NewInstantQuery(ctx, q.queryable, opts, r.FormValue("query"), ts)
152152
if err != nil {
153-
return invalidParamError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), "query")
153+
return invalidParamError(err, "query")
154154
}
155155

156156
// From now on, we must only return with a finalizer in the result (to
@@ -189,7 +189,7 @@ func (q *QueryAPI) Wrap(f apiFunc) http.HandlerFunc {
189189
}
190190

191191
if result.err != nil {
192-
api.RespondFromGRPCError(q.logger, w, result.err.err)
192+
q.respondError(w, result.err, result.data)
193193
return
194194
}
195195

@@ -201,6 +201,47 @@ func (q *QueryAPI) Wrap(f apiFunc) http.HandlerFunc {
201201
}
202202
}
203203

204+
func (q *QueryAPI) respondError(w http.ResponseWriter, apiErr *apiError, data interface{}) {
205+
json := jsoniter.ConfigCompatibleWithStandardLibrary
206+
b, err := json.Marshal(&Response{
207+
Status: statusError,
208+
ErrorType: apiErr.typ,
209+
Error: apiErr.err.Error(),
210+
Data: data,
211+
})
212+
if err != nil {
213+
level.Error(q.logger).Log("error marshaling json response", "err", err)
214+
http.Error(w, err.Error(), http.StatusInternalServerError)
215+
return
216+
}
217+
218+
var code int
219+
switch apiErr.typ {
220+
case errorBadData:
221+
code = http.StatusBadRequest
222+
case errorExec:
223+
code = http.StatusUnprocessableEntity
224+
case errorCanceled:
225+
code = statusClientClosedConnection
226+
case errorTimeout:
227+
code = http.StatusServiceUnavailable
228+
case errorInternal:
229+
code = http.StatusInternalServerError
230+
case errorNotFound:
231+
code = http.StatusNotFound
232+
case errorNotAcceptable:
233+
code = http.StatusNotAcceptable
234+
default:
235+
code = http.StatusInternalServerError
236+
}
237+
238+
w.Header().Set("Content-Type", "application/json")
239+
w.WriteHeader(code)
240+
if n, err := w.Write(b); err != nil {
241+
level.Error(q.logger).Log("error writing response", "bytesWritten", n, "err", err)
242+
}
243+
}
244+
204245
func (q *QueryAPI) respond(w http.ResponseWriter, req *http.Request, data interface{}, warnings annotations.Annotations, query string) {
205246
warn, info := warnings.AsStrings(query, 10, 10)
206247

@@ -213,7 +254,7 @@ func (q *QueryAPI) respond(w http.ResponseWriter, req *http.Request, data interf
213254

214255
codec, err := q.negotiateCodec(req, resp)
215256
if err != nil {
216-
api.RespondFromGRPCError(q.logger, w, httpgrpc.Errorf(http.StatusNotAcceptable, "%s", &apiError{errorNotAcceptable, err}))
257+
q.respondError(w, &apiError{errorNotAcceptable, err}, nil)
217258
return
218259
}
219260

pkg/api/queryapi/query_api_test.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -94,25 +94,25 @@ func Test_CustomAPI(t *testing.T) {
9494
name: "[Range Query] empty start",
9595
path: "/api/v1/query_range?end=1536673680&query=test&step=5",
9696
expectedCode: http.StatusBadRequest,
97-
expectedBody: "{\"status\":\"error\",\"errorType\":\"bad_data\",\"error\":\"invalid parameter \\\"start\\\"; cannot parse \\\"\\\" to a valid timestamp\"}",
97+
expectedBody: "{\"status\":\"error\",\"errorType\":\"bad_data\",\"error\":\"invalid parameter \\\"start\\\": cannot parse \\\"\\\" to a valid timestamp\"}",
9898
},
9999
{
100100
name: "[Range Query] empty end",
101101
path: "/api/v1/query_range?query=test&start=1536673665&step=5",
102102
expectedCode: http.StatusBadRequest,
103-
expectedBody: "{\"status\":\"error\",\"errorType\":\"bad_data\",\"error\":\"invalid parameter \\\"end\\\"; cannot parse \\\"\\\" to a valid timestamp\"}",
103+
expectedBody: "{\"status\":\"error\",\"errorType\":\"bad_data\",\"error\":\"invalid parameter \\\"end\\\": cannot parse \\\"\\\" to a valid timestamp\"}",
104104
},
105105
{
106106
name: "[Range Query] start is greater than end",
107107
path: "/api/v1/query_range?end=1536673680&query=test&start=1536673681&step=5",
108108
expectedCode: http.StatusBadRequest,
109-
expectedBody: "{\"status\":\"error\",\"errorType\":\"bad_data\",\"error\":\"invalid parameter \\\"end\\\"; end timestamp must not be before start time\"}",
109+
expectedBody: "{\"status\":\"error\",\"errorType\":\"bad_data\",\"error\":\"invalid parameter \\\"end\\\": end timestamp must not be before start time\"}",
110110
},
111111
{
112112
name: "[Range Query] negative step",
113113
path: "/api/v1/query_range?end=1536673680&query=test&start=1536673665&step=-1",
114114
expectedCode: http.StatusBadRequest,
115-
expectedBody: "{\"status\":\"error\",\"errorType\":\"bad_data\",\"error\":\"invalid parameter \\\"step\\\"; zero or negative query resolution step widths are not accepted. Try a positive integer\"}",
115+
expectedBody: "{\"status\":\"error\",\"errorType\":\"bad_data\",\"error\":\"invalid parameter \\\"step\\\": zero or negative query resolution step widths are not accepted. Try a positive integer\"}",
116116
},
117117
{
118118
name: "[Range Query] returned points are over 11000",
@@ -124,19 +124,19 @@ func Test_CustomAPI(t *testing.T) {
124124
name: "[Range Query] empty query",
125125
path: "/api/v1/query_range?end=1536673680&start=1536673665&step=5",
126126
expectedCode: http.StatusBadRequest,
127-
expectedBody: "{\"status\":\"error\",\"errorType\":\"bad_data\",\"error\":\"invalid parameter \\\"query\\\"; unknown position: parse error: no expression found in input\"}",
127+
expectedBody: "{\"status\":\"error\",\"errorType\":\"bad_data\",\"error\":\"invalid parameter \\\"query\\\": unknown position: parse error: no expression found in input\"}",
128128
},
129129
{
130130
name: "[Range Query] invalid lookback delta",
131131
path: "/api/v1/query_range?end=1536673680&query=test&start=1536673665&step=5&lookback_delta=dummy",
132132
expectedCode: http.StatusBadRequest,
133-
expectedBody: "{\"status\":\"error\",\"errorType\":\"bad_data\",\"error\":\"error parsing lookback delta duration: rpc error: code = Code(400) desc = cannot parse \\\"dummy\\\" to a valid duration\"}",
133+
expectedBody: "{\"status\":\"error\",\"errorType\":\"bad_data\",\"error\":\"error parsing lookback delta duration: cannot parse \\\"dummy\\\" to a valid duration\"}",
134134
},
135135
{
136136
name: "[Range Query] invalid timeout delta",
137137
path: "/api/v1/query_range?end=1536673680&query=test&start=1536673665&step=5&timeout=dummy",
138138
expectedCode: http.StatusBadRequest,
139-
expectedBody: "{\"status\":\"error\",\"errorType\":\"bad_data\",\"error\":\"invalid parameter \\\"timeout\\\"; cannot parse \\\"dummy\\\" to a valid duration\"}",
139+
expectedBody: "{\"status\":\"error\",\"errorType\":\"bad_data\",\"error\":\"invalid parameter \\\"timeout\\\": cannot parse \\\"dummy\\\" to a valid duration\"}",
140140
},
141141
{
142142
name: "[Range Query] normal case",
@@ -148,19 +148,19 @@ func Test_CustomAPI(t *testing.T) {
148148
name: "[Instant Query] empty query",
149149
path: "/api/v1/query",
150150
expectedCode: http.StatusBadRequest,
151-
expectedBody: "{\"status\":\"error\",\"errorType\":\"bad_data\",\"error\":\"invalid parameter \\\"query\\\"; unknown position: parse error: no expression found in input\"}",
151+
expectedBody: "{\"status\":\"error\",\"errorType\":\"bad_data\",\"error\":\"invalid parameter \\\"query\\\": unknown position: parse error: no expression found in input\"}",
152152
},
153153
{
154154
name: "[Instant Query] invalid lookback delta",
155155
path: "/api/v1/query?lookback_delta=dummy",
156156
expectedCode: http.StatusBadRequest,
157-
expectedBody: "{\"status\":\"error\",\"errorType\":\"bad_data\",\"error\":\"error parsing lookback delta duration: rpc error: code = Code(400) desc = cannot parse \\\"dummy\\\" to a valid duration\"}",
157+
expectedBody: "{\"status\":\"error\",\"errorType\":\"bad_data\",\"error\":\"error parsing lookback delta duration: cannot parse \\\"dummy\\\" to a valid duration\"}",
158158
},
159159
{
160160
name: "[Instant Query] invalid timeout",
161161
path: "/api/v1/query?timeout=dummy",
162162
expectedCode: http.StatusBadRequest,
163-
expectedBody: "{\"status\":\"error\",\"errorType\":\"bad_data\",\"error\":\"invalid parameter \\\"timeout\\\"; cannot parse \\\"dummy\\\" to a valid duration\"}",
163+
expectedBody: "{\"status\":\"error\",\"errorType\":\"bad_data\",\"error\":\"invalid parameter \\\"timeout\\\": cannot parse \\\"dummy\\\" to a valid duration\"}",
164164
},
165165
{
166166
name: "[Instant Query] normal case",

pkg/api/queryapi/response.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package queryapi
2+
3+
// Response defines the Prometheus response format.
4+
type Response struct {
5+
Status string `json:"status"`
6+
Data interface{} `json:"data,omitempty"`
7+
ErrorType errorType `json:"errorType,omitempty"`
8+
Error string `json:"error,omitempty"`
9+
Warnings []string `json:"warnings,omitempty"`
10+
Infos []string `json:"infos,omitempty"`
11+
}

pkg/api/queryapi/util.go

Lines changed: 8 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -5,38 +5,14 @@ import (
55
"errors"
66
"fmt"
77
"net/http"
8-
"time"
98

10-
"github.com/gogo/status"
119
"github.com/prometheus/prometheus/promql"
1210
"github.com/prometheus/prometheus/util/annotations"
13-
"github.com/weaveworks/common/httpgrpc"
14-
15-
"github.com/cortexproject/cortex/pkg/util"
16-
)
17-
18-
var (
19-
ErrEndBeforeStart = httpgrpc.Errorf(http.StatusBadRequest, "%s", "end timestamp must not be before start time")
20-
ErrNegativeStep = httpgrpc.Errorf(http.StatusBadRequest, "%s", "zero or negative query resolution step widths are not accepted. Try a positive integer")
21-
ErrStepTooSmall = httpgrpc.Errorf(http.StatusBadRequest, "%s", "exceeded maximum resolution of 11,000 points per timeseries. Try decreasing the query resolution (?step=XX)")
2211
)
2312

24-
func extractQueryOpts(r *http.Request) (promql.QueryOpts, error) {
25-
var duration time.Duration
26-
27-
if strDuration := r.FormValue("lookback_delta"); strDuration != "" {
28-
parsedDuration, err := util.ParseDurationMs(strDuration)
29-
if err != nil {
30-
return nil, httpgrpc.Errorf(http.StatusBadRequest, "error parsing lookback delta duration: %v", err)
31-
}
32-
duration = convertMsToDuration(parsedDuration)
33-
}
34-
35-
return promql.NewPrometheusQueryOpts(r.FormValue("stats") == "all", duration), nil
36-
}
37-
3813
const (
3914
statusSuccess = "success"
15+
statusError = "error"
4016

4117
// Non-standard status code (originally introduced by nginx) for the case when a client closes
4218
// the connection while the server is still processing the request.
@@ -51,6 +27,7 @@ const (
5127
errorExec errorType = "execution"
5228
errorBadData errorType = "bad_data"
5329
errorInternal errorType = "internal"
30+
errorNotFound errorType = "not_found"
5431
errorNotAcceptable errorType = "not_acceptable"
5532
)
5633

@@ -74,18 +51,18 @@ func returnAPIError(err error) *apiError {
7451

7552
switch {
7653
case errors.As(err, &eqc):
77-
return &apiError{errorCanceled, httpgrpc.Errorf(statusClientClosedConnection, "%v", err)}
54+
return &apiError{errorCanceled, err}
7855
case errors.As(err, &eqt):
79-
return &apiError{errorTimeout, httpgrpc.Errorf(http.StatusServiceUnavailable, "%v", err)}
56+
return &apiError{errorTimeout, err}
8057
case errors.As(err, &es):
81-
return &apiError{errorInternal, httpgrpc.Errorf(http.StatusInternalServerError, "%v", err)}
58+
return &apiError{errorInternal, err}
8259
}
8360

8461
if errors.Is(err, context.Canceled) {
85-
return &apiError{errorCanceled, httpgrpc.Errorf(statusClientClosedConnection, "%v", err)}
62+
return &apiError{errorCanceled, err}
8663
}
8764

88-
return &apiError{errorExec, httpgrpc.Errorf(http.StatusUnprocessableEntity, "%v", err)}
65+
return &apiError{errorExec, err}
8966
}
9067

9168
type apiFuncResult struct {
@@ -99,22 +76,6 @@ type apiFunc func(r *http.Request) apiFuncResult
9976

10077
func invalidParamError(err error, parameter string) apiFuncResult {
10178
return apiFuncResult{nil, &apiError{
102-
errorBadData, DecorateWithParamName(err, parameter),
79+
errorBadData, fmt.Errorf("invalid parameter %q: %w", parameter, err),
10380
}, nil, nil}
10481
}
105-
106-
func convertMsToTime(unixMs int64) time.Time {
107-
return time.Unix(0, unixMs*int64(time.Millisecond))
108-
}
109-
110-
func convertMsToDuration(unixMs int64) time.Duration {
111-
return time.Duration(unixMs) * time.Millisecond
112-
}
113-
114-
func DecorateWithParamName(err error, field string) error {
115-
errTmpl := "invalid parameter %q; %v"
116-
if status, ok := status.FromError(err); ok {
117-
return httpgrpc.Errorf(int(status.Code()), errTmpl, field, status.Message())
118-
}
119-
return fmt.Errorf(errTmpl, field, err)
120-
}

pkg/api/queryapi/util_test.go

Lines changed: 0 additions & 15 deletions
This file was deleted.

0 commit comments

Comments
 (0)