15 changes: 6 additions & 9 deletions pkg/backends/alb/mech/fr/first_response.go
@@ -103,22 +103,19 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// otherwise iterate the fanout
wc := newResponderClaim(l)
var wg sync.WaitGroup
- wg.Add(l)
for i := range l {
// only the one of these i fanouts to respond will be mapped back to the
// end user based on the methodology and the rest will have their
// contexts canceled
- go func(j int) {
- if hl[j] == nil {
- wg.Done()
+ wg.Go(func() {
+ if hl[i] == nil {
return
}
- wm := newFirstResponseGate(w, wc, j, h.fgr)
+ wm := newFirstResponseGate(w, wc, i, h.fgr)
r2, _ := request.Clone(r)
- r2 = r2.WithContext(wc.contexts[j])
- hl[j].ServeHTTP(wm, r2)
- wg.Done()
- }(i)
+ r2 = r2.WithContext(wc.contexts[i])
+ hl[i].ServeHTTP(wm, r2)
+ })
}
wg.Wait()
}
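
Reviewer note: the same refactor repeats in every file below. The manual wg.Add(n) / go func(j int) { ...; wg.Done() }(i) fanout is replaced with sync.WaitGroup.Go, added in Go 1.25, which increments the counter, runs the function in its own goroutine, and calls Done when it returns. A minimal sketch of the before/after shapes (items and the printed strings are illustrative, not from this PR):

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	items := []string{"a", "b", "c"} // illustrative data
	var wg sync.WaitGroup

	// Old shape: manual counter bookkeeping plus explicit index capture,
	// a habit from before Go 1.22 made loop variables per-iteration.
	wg.Add(len(items))
	for i := range items {
		go func(j int) {
			defer wg.Done()
			fmt.Println("old:", items[j])
		}(i)
	}
	wg.Wait()

	// New shape (Go 1.25+): wg.Go handles Add/Done itself, and the
	// per-iteration loop variable i can be captured directly.
	for i := range items {
		wg.Go(func() {
			fmt.Println("new:", items[i])
		})
	}
	wg.Wait()
}
```

Besides being shorter, wg.Go removes the failure mode where an early return that skips wg.Done() leaves wg.Wait() hanging.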
14 changes: 6 additions & 8 deletions pkg/backends/alb/mech/nlm/newest_last_modified.go
@@ -82,21 +82,19 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// otherwise iterate the fanout
nrm := newNewestResponseMux(l)
var wg sync.WaitGroup
- wg.Add(l)
for i := range l {
// only one of these i fanouts to respond will be mapped back to
// the end user based on the methodology and the rest will have their
// contexts canceled
- go func(j int) {
- if hl[j] == nil {
+ wg.Go(func() {
+ if hl[i] == nil {
return
}
- nrg := newNewestResponseGate(w, j, nrm)
+ nrg := newNewestResponseGate(w, i, nrm)
r2, _ := request.Clone(r)
- r2 = r2.WithContext(nrm.contexts[j])
- hl[j].ServeHTTP(nrg, r2)
- wg.Done()
- }(i)
+ r2 = r2.WithContext(nrm.contexts[i])
+ hl[i].ServeHTTP(nrg, r2)
+ })
}
wg.Wait()
}
13 changes: 5 additions & 8 deletions pkg/backends/alb/mech/tsm/time_series_merge.go
@@ -146,20 +146,17 @@ func GetResponseGates(w http.ResponseWriter, r *http.Request,
var wg sync.WaitGroup
l := len(hl)
mgs := make(merge.ResponseGates, l)
- wg.Add(l)
for i := range l {
- go func(j int) {
- if hl[j] == nil {
- wg.Done()
+ wg.Go(func() {
+ if hl[i] == nil {
return
}
r2, _ := request.Clone(r)
rsc := request.GetResources(r2)
rsc.IsMergeMember = true
- mgs[j] = merge.NewResponseGate(w, r2, rsc)
- hl[j].ServeHTTP(mgs[j], r2)
- wg.Done()
- }(i)
+ mgs[i] = merge.NewResponseGate(w, r2, rsc)
+ hl[i].ServeHTTP(mgs[i], r2)
+ })
}
wg.Wait()
return mgs.Compress()
4 changes: 1 addition & 3 deletions pkg/backends/clickhouse/model/unmarshal.go
@@ -156,12 +156,10 @@ func typeToFieldDataType(input string) timeseries.FieldDataType {
return timeseries.Uint64
case "Float32", "Float64", "Decimal", "Decimal32", "Decimal64", "Decimal128", "Decimal256":
return timeseries.Float64
case "DateTime":
case "DateTime", "DateTime64":
return timeseries.DateTimeSQL
case "Date":
return timeseries.DateSQL
case "DateTime64":
return timeseries.DateTimeSQL
case "Nothing":
return timeseries.Null
}
6 changes: 2 additions & 4 deletions pkg/backends/healthcheck/target.go
@@ -181,21 +181,19 @@ func (t *target) Stop() {
}

func (t *target) probeLoop(ctx context.Context) {
- t.wg.Add(1)
t.probe(ctx) // perform initial probe
ticker := time.NewTicker(t.interval)
- go func() {
+ t.wg.Go(func() {
for {
select {
case <-ctx.Done():
- t.wg.Done()
ticker.Stop()
return // probe complete, stop loop and prevent goroutine leak
case <-ticker.C:
t.probe(ctx)
}
}
- }()
+ })
}

func (t *target) probe(ctx context.Context) {
4 changes: 1 addition & 3 deletions pkg/backends/influxdb/flux/unmarshal.go
@@ -135,7 +135,7 @@ func loadFieldDef(n, d, g, v string, pos int) timeseries.FieldDefinition {
// typeToFieldDataType is the DataTypeParserFunc passed to the Parser
func typeToFieldDataType(input string) timeseries.FieldDataType {
switch input {
- case TypeString:
+ case TypeString, TypeDuration:
return timeseries.String
case TypeLong:
return timeseries.Int64
@@ -145,8 +145,6 @@ func typeToFieldDataType(input string) timeseries.FieldDataType {
return timeseries.Float64
case TypeBool:
return timeseries.Bool
- case TypeDuration:
- return timeseries.String
case TypeRFC3339:
return timeseries.DateTimeRFC3339
case TypeRFC3339Nano, timeColumnName:
18 changes: 7 additions & 11 deletions pkg/backends/influxdb/influxql/unmarshal.go
@@ -99,29 +99,25 @@ func UnmarshalTimeseriesReader(reader io.Reader, trq *timeseries.TimeRangeQuery)
var sz int64
var wg sync.WaitGroup
errs := make([]error, len(wfd.Results[i].SeriesList[j].Values))
- wg.Add(len(wfd.Results[i].SeriesList[j].Values))
for vi, v := range wfd.Results[i].SeriesList[j].Values {
- go func(vals []any, idx int) {
- pt, cols, err := pointFromValues(vals, sh.TimestampField.OutputPosition)
+ wg.Go(func() {
+ pt, cols, err := pointFromValues(v, sh.TimestampField.OutputPosition)
if err != nil {
- errs[idx] = err
- wg.Done()
+ errs[vi] = err
return
}
if pt.Epoch == 0 {
- wg.Done()
return
}
- if idx == 0 {
+ if vi == 0 {
for x := range cols {
sh.ValueFieldsList[x].DataType = cols[x]
}
}
- pts[idx] = pt
+ pts[vi] = pt
atomic.AddInt64(&sz, int64(pt.Size))
- wfd.Results[i].SeriesList[j].Values[idx] = nil
- wg.Done()
- }(v, vi)
+ wfd.Results[i].SeriesList[j].Values[vi] = nil
+ })
}
wg.Wait()
if err := errors.Join(errs...); err != nil {
10 changes: 4 additions & 6 deletions pkg/backends/prometheus/model/timeseries.go
@@ -239,16 +239,14 @@ func populateSeries(ds *dataset.DataSet, result []*WFResult,
if !isVector && l > 0 {
pts = make(dataset.Points, l)
var wg sync.WaitGroup
- wg.Add(len(pr.Values))
for i, v := range pr.Values {
- go func(index int, vals []any) {
- pt, _ := pointFromValues(vals)
+ wg.Go(func() {
+ pt, _ := pointFromValues(v)
if pt.Epoch > 0 {
atomic.AddInt64(&ps, int64(pt.Size))
- pts[index] = pt
+ pts[i] = pt
}
- wg.Done()
- }(i, v)
+ })
}
wg.Wait()
} else if isVector && len(pr.Value) == 2 {
62 changes: 27 additions & 35 deletions pkg/proxy/engines/cache.go
@@ -151,9 +151,8 @@ func QueryCache(ctx context.Context, c cache.Cache, key string,
// Derive subkey
subkey := getSubKey(key, chunkExtent)
// Query
- wg.Add(1)
- go func(outIdx int) {
- defer wg.Done()
+ outIdx := resi
+ wg.Go(func() {
qr := queryConcurrent(ctx, c, subkey)
if qr.lookupStatus != status.LookupStatusHit &&
(qr.err == nil || qr.err == cache.ErrKNF) {
@@ -177,7 +176,8 @@ func QueryCache(ctx context.Context, c cache.Cache, key string,
if qr.d.timeseries != nil {
ress[outIdx] = qr.d.timeseries
}
- }(resi)
+ })
+
resi++
}
// Wait on queries
@@ -214,12 +214,12 @@ func QueryCache(ctx context.Context, c cache.Cache, key string,
// Determine subkey
subkey := key + chunkRange.String()
// Query subdocument
- wg.Add(1)
- go func(index int) {
+
+ index := i
+ wg.Go(func() {
qr := queryConcurrent(ctx, c, subkey)
cr[index] = qr
- wg.Done()
- }(i)
+ })
i++
}
// Wait on queries to finish (result channel is buffered and doesn't hold for receive)
@@ -237,22 +237,20 @@ func QueryCache(ctx context.Context, c cache.Cache, key string,
// Merge with meta document on success
// We can do this concurrently since chunk ranges don't overlap

- wg.Add(1)
- go func(qrc *queryResult) {
- defer wg.Done()
- if qrc.d.IsMeta {
+ wg.Go(func() {
+ if qr.d.IsMeta {
return
}
- if qrc.lookupStatus == status.LookupStatusHit {
- for _, r := range qrc.d.Ranges {
- content := qrc.d.Body[r.Start%size : r.End%size+1]
+ if qr.lookupStatus == status.LookupStatusHit {
+ for _, r := range qr.d.Ranges {
+ content := qr.d.Body[r.Start%size : r.End%size+1]
r.Copy(d.Body, content)
if v := atomic.LoadInt64(&dbl); r.End+1 > v {
atomic.CompareAndSwapInt64(&dbl, v, r.End+1)
}
}
}
- }(qr)
+ })
}
wg.Wait()
if len(d.Ranges) > 1 {
@@ -425,24 +423,21 @@ func WriteCache(ctx context.Context, c cache.Cache, key string, d *HTTPDocument,
}
// Derive subkey
subkey := getSubKey(key, chunkExtent)
- // Query
- wg.Add(1)
- go func(index int) {
+ // Write
+ index := i
+ wg.Go(func() {
cd := d.GetTimeseriesChunk(chunkExtent)
if c.Configuration().Provider != "memory" {
cd.Body, _ = marshal(cd.timeseries, nil, 0)
}
cr[index] = writeConcurrent(ctx, c, subkey, cd, compress, ttl)
- wg.Done()
- }(i)
+ })
i++
}
// Store metadocument
- wg.Add(1)
- go func(index int) {
- cr[index] = writeConcurrent(ctx, c, key, meta, compress, ttl)
- wg.Done()
- }(i)
+ wg.Go(func() {
+ cr[i] = writeConcurrent(ctx, c, key, meta, compress, ttl)
+ })
// Wait on writes to finish (result channel is buffered and doesn't hold for receive)
wg.Wait()
// Handle results
@@ -474,19 +469,16 @@ func WriteCache(ctx context.Context, c cache.Cache, key string, d *HTTPDocument,
// Get chunk
cd := d.GetByterangeChunk(chunkRange, size)
// Store subdocument
- wg.Add(1)
- go func(index int) {
+ index := i
+ wg.Go(func() {
cr[index] = writeConcurrent(ctx, c, subkey, cd, compress, ttl)
- wg.Done()
- }(i)
+ })
i++
}
// Store metadocument
- wg.Add(1)
- go func(index int) {
- cr[index] = writeConcurrent(ctx, c, key, meta, compress, ttl)
- wg.Done()
- }(i)
+ wg.Go(func() {
+ cr[i] = writeConcurrent(ctx, c, key, meta, compress, ttl)
+ })
// Wait on writes to finish (result channel is buffered and doesn't hold for receive)
wg.Wait()
err = errors.Join(cr...)
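
Reviewer note on the QueryCache/WriteCache hunks above: unlike range loop variables (per-iteration since Go 1.22), the counters here (resi, i) are single variables advanced with ++ after the closure is scheduled, so the new code snapshots them (outIdx := resi, index := i) before calling wg.Go. A minimal sketch of why that copy matters, using illustrative names (results, index) not taken from this PR:

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	results := make([]int, 3)
	var wg sync.WaitGroup

	i := 0
	for i < 3 {
		index := i // snapshot: i keeps changing after wg.Go returns
		wg.Go(func() {
			results[index] = index * 10 // reads the stable copy, not the shared i
		})
		i++
	}
	wg.Wait()
	fmt.Println(results) // [0 10 20]
}
```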
20 changes: 9 additions & 11 deletions pkg/proxy/engines/deltaproxycache.go
@@ -296,9 +296,7 @@
// Only fast forward if configured and the user request is for the absolute latest datapoint
if (!rlo.FastForwardDisable) &&
(trq.Extent.End.Equal(normalizedNow.Extent.End)) {
- wg.Add(1)
- go func() {
- defer wg.Done()
+ wg.Go(func() {
_, span := tspan.NewChildSpan(ctx, rsc.Tracer, "FetchFastForward")
if span != nil {
ffReq = ffReq.WithContext(trace.ContextWithSpan(ffReq.Context(), span))
@@ -324,7 +322,7 @@ checkCache:
} else {
ffStatus = "err"
}
- }()
+ })
}

// while fast forward fetching is occurring in a goroutine, this section will
@@ -592,11 +590,11 @@ func fetchExtents(el timeseries.ExtentList, rsc *request.Resources, h http.Heade
mresp := &http.Response{Header: h}

// iterate each time range that the client needs and fetch from the upstream origin
- wg.Add(el.Len())
for i := range el {
// This concurrently fetches gaps from the origin and adds their datasets to the merge list
- go func(index int, e *timeseries.Extent, rq *proxyRequest) {
- defer wg.Done()
+ wg.Go(func() {
+ e := &el[i]
+ rq := pr.Clone()
mrsc := rsc.Clone()
rq.upstreamRequest = rq.upstreamRequest.WithContext(tctx.WithResources(
trace.ContextWithSpan(context.Background(), span),
@@ -625,7 +623,7 @@
if ferr != nil {
logger.Error("proxy object unmarshaling failed",
logging.Pairs{"detail": ferr.Error()})
- errs[index] = ferr
+ errs[i] = ferr
return
}
uncachedValueCount.Add(nts.ValueCount())
@@ -634,9 +632,9 @@ func fetchExtents(el timeseries.ExtentList, rsc *request.Resources, h http.Heade
appendLock.Lock()
headers.Merge(h, resp.Header)
appendLock.Unlock()
- mts[index] = nts
+ mts[i] = nts
} else if resp.StatusCode != http.StatusOK {
- errs[index] = tpe.ErrUnexpectedUpstreamResponse
+ errs[i] = tpe.ErrUnexpectedUpstreamResponse
var b []byte
var s string
if resp.Body != nil {
@@ -663,7 +661,7 @@ func fetchExtents(el timeseries.ExtentList, rsc *request.Resources, h http.Heade
},
)
}
- }(i, &el[i], pr.Clone())
+ })
}
wg.Wait()
return mts, uncachedValueCount.Load(), mresp, errors.Join(errs...)