Skip to content

Commit 59a934b

Browse files
authored
Merge pull request #8564 from thanos-io/reworkprunejob
receive/multitsdb: fix pruning job to avoid inconsistent state
2 parents b3b08ef + a7f04c2 commit 59a934b

File tree

13 files changed

+179
-60
lines changed

13 files changed

+179
-60
lines changed

pkg/api/query/v1_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ import (
3030
"testing"
3131
"time"
3232

33+
"go.uber.org/atomic"
34+
3335
"github.com/efficientgo/core/testutil"
3436
"github.com/go-kit/log"
3537
"github.com/prometheus/client_golang/prometheus"
@@ -669,7 +671,7 @@ func TestQueryAnalyzeEndpoints(t *testing.T) {
669671
func newProxyStoreWithTSDBStore(db store.TSDBReader) *store.ProxyStore {
670672
c := &storetestutil.TestClient{
671673
Name: "1",
672-
StoreClient: storepb.ServerAsClient(store.NewTSDBStore(nil, db, component.Query, labels.EmptyLabels())),
674+
StoreClient: storepb.ServerAsClient(store.NewTSDBStore(nil, db, component.Query, labels.EmptyLabels()), atomic.Bool{}),
673675
MinTime: math.MinInt64, MaxTime: math.MaxInt64,
674676
}
675677

pkg/query/querier_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ import (
1616
"testing"
1717
"time"
1818

19+
"go.uber.org/atomic"
20+
1921
"github.com/efficientgo/core/testutil"
2022
"github.com/go-kit/log"
2123
"github.com/pkg/errors"
@@ -848,7 +850,7 @@ func newProxyStore(storeAPIs ...storepb.StoreServer) *store.ProxyStore {
848850
}
849851
cls[i] = &storetestutil.TestClient{
850852
Name: fmt.Sprintf("%v", i),
851-
StoreClient: storepb.ServerAsClient(s),
853+
StoreClient: storepb.ServerAsClient(s, atomic.Bool{}),
852854
MinTime: math.MinInt64, MaxTime: math.MaxInt64,
853855
WithoutReplicaLabelsEnabled: withoutReplicaLabelsEnabled,
854856
}

pkg/query/query_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/prometheus/prometheus/model/labels"
1717
"github.com/prometheus/prometheus/promql/parser"
1818
"github.com/prometheus/prometheus/storage"
19+
"go.uber.org/atomic"
1920

2021
"github.com/thanos-io/thanos/pkg/component"
2122
"github.com/thanos-io/thanos/pkg/dedup"
@@ -86,7 +87,7 @@ func TestQuerier_Proxy(t *testing.T) {
8687
// TODO(bwplotka): Parse external labels.
8788
sc.append(&storetestutil.TestClient{
8889
Name: fmt.Sprintf("store number %v", i),
89-
StoreClient: storepb.ServerAsClient(selectedStore(store.NewTSDBStore(logger, st.storage.DB, component.Debug, labels.EmptyLabels()), m, st.mint, st.maxt)),
90+
StoreClient: storepb.ServerAsClient(selectedStore(store.NewTSDBStore(logger, st.storage.DB, component.Debug, labels.EmptyLabels()), m, st.mint, st.maxt), atomic.Bool{}),
9091
MinTime: st.mint,
9192
MaxTime: st.maxt,
9293
})

pkg/receive/multitsdb.go

Lines changed: 38 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package receive
66
import (
77
"context"
88
"fmt"
9+
"maps"
910
"os"
1011
"path"
1112
"path/filepath"
@@ -201,10 +202,10 @@ type localClient struct {
201202
client storepb.StoreClient
202203
}
203204

204-
func newLocalClient(store *store.TSDBStore) *localClient {
205+
func newLocalClient(store *store.TSDBStore, readOnly atomic.Bool) *localClient {
205206
return &localClient{
206207
store: store,
207-
client: storepb.ServerAsClient(store),
208+
client: storepb.ServerAsClient(store, readOnly),
208209
}
209210
}
210211

@@ -268,13 +269,19 @@ func (l *localClient) SupportsWithoutReplicaLabels() bool {
268269
return true
269270
}
270271

272+
func (t *tenant) setReadOnly(ro bool) {
273+
t.readOnly.Store(ro)
274+
}
275+
271276
type tenant struct {
272277
readyS *ReadyStorage
273278
storeTSDB *store.TSDBStore
274279
exemplarsTSDB *exemplars.TSDB
275280
ship *shipper.Shipper
276281
reg *UnRegisterer
277282

283+
readOnly atomic.Bool
284+
278285
mtx *sync.RWMutex
279286
tsdb *tsdb.DB
280287

@@ -338,7 +345,7 @@ func (t *tenant) client() store.Client {
338345
return nil
339346
}
340347

341-
return newLocalClient(tsdbStore)
348+
return newLocalClient(tsdbStore, t.readOnly)
342349
}
343350

344351
func (t *tenant) exemplars() *exemplars.TSDB {
@@ -472,14 +479,21 @@ func (t *MultiTSDB) Prune(ctx context.Context) error {
472479

473480
prunedTenants []string
474481
pmtx sync.Mutex
482+
483+
tenants = make(map[string]*tenant)
475484
)
485+
476486
t.mtx.RLock()
477-
for tenantID, tenantInstance := range t.tenants {
487+
maps.Copy(tenants, t.tenants)
488+
t.mtx.RUnlock()
489+
490+
begin := time.Now()
491+
for tenantID, tenantInstance := range tenants {
478492
wg.Add(1)
479493
go func(tenantID string, tenantInstance *tenant) {
480494
defer wg.Done()
481-
tlog := log.With(t.logger, "tenant", tenantID)
482-
pruned, err := t.pruneTSDB(ctx, tlog, tenantInstance)
495+
496+
pruned, err := t.pruneTSDB(ctx, log.With(t.logger, "tenant", tenantID), tenantInstance, tenantID)
483497
if err != nil {
484498
merr.Add(err)
485499
return
@@ -493,50 +507,35 @@ func (t *MultiTSDB) Prune(ctx context.Context) error {
493507
}(tenantID, tenantInstance)
494508
}
495509
wg.Wait()
496-
t.mtx.RUnlock()
497510

498-
t.mtx.Lock()
499-
defer t.mtx.Unlock()
500-
for _, tenantID := range prunedTenants {
501-
// Check that the tenant hasn't been reinitialized in-between locks.
502-
if t.tenants[tenantID].readyStorage().get() != nil {
503-
continue
504-
}
505-
506-
level.Info(t.logger).Log("msg", "Pruned tenant", "tenant", tenantID)
507-
t.removeTenantUnlocked(tenantID)
508-
}
511+
level.Info(t.logger).Log("msg", "Pruning job completed", "pruned_tenants_count", len(prunedTenants), "pruned_tenants", prunedTenants, "took_seconds", time.Since(begin).Seconds())
509512

510513
return merr.Err()
511514
}
512515

513516
// pruneTSDB removes a TSDB if its past the retention period.
514517
// It compacts the TSDB head, sends all remaining blocks to S3 and removes the TSDB from disk.
515-
func (t *MultiTSDB) pruneTSDB(ctx context.Context, logger log.Logger, tenantInstance *tenant) (pruned bool, rerr error) {
518+
func (t *MultiTSDB) pruneTSDB(ctx context.Context, logger log.Logger, tenantInstance *tenant, tenantID string) (pruned bool, rerr error) {
516519
tenantTSDB := tenantInstance.readyStorage()
517520
if tenantTSDB == nil {
518521
return false, nil
519522
}
520-
tenantTSDB.mtx.RLock()
521-
if tenantTSDB.a == nil || tenantTSDB.a.db == nil {
522-
tenantTSDB.mtx.RUnlock()
523+
524+
tdb := tenantTSDB.Get()
525+
if tdb == nil {
523526
return false, nil
524527
}
525528

526-
tdb := tenantTSDB.a.db
527529
head := tdb.Head()
528530
if head.MaxTime() < 0 {
529-
tenantTSDB.mtx.RUnlock()
530531
return false, nil
531532
}
532533

533534
sinceLastAppendMillis := time.Since(time.UnixMilli(head.MaxTime())).Milliseconds()
534535
compactThreshold := int64(1.5 * float64(t.tsdbOpts.MaxBlockDuration))
535536
if sinceLastAppendMillis <= compactThreshold {
536-
tenantTSDB.mtx.RUnlock()
537537
return false, nil
538538
}
539-
tenantTSDB.mtx.RUnlock()
540539

541540
// Acquire a write lock and check that no writes have occurred in-between locks.
542541
tenantTSDB.mtx.Lock()
@@ -585,6 +584,15 @@ func (t *MultiTSDB) pruneTSDB(ctx context.Context, logger log.Logger, tenantInst
585584
}
586585
}
587586

587+
tenantInstance.setReadOnly(true)
588+
defer func() {
589+
if pruned {
590+
return
591+
}
592+
593+
tenantInstance.setReadOnly(false)
594+
}()
595+
588596
if err := tdb.Close(); err != nil {
589597
return false, err
590598
}
@@ -598,6 +606,10 @@ func (t *MultiTSDB) pruneTSDB(ctx context.Context, logger log.Logger, tenantInst
598606
tenantInstance.setComponents(nil, nil, nil, nil, nil)
599607
tenantInstance.mtx.Unlock()
600608

609+
t.mtx.Lock()
610+
t.removeTenantUnlocked(tenantID)
611+
t.mtx.Unlock()
612+
601613
return true, nil
602614
}
603615

pkg/receive/multitsdb_test.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -959,3 +959,60 @@ func TestMultiTSDBDoesNotDeleteNotUploadedBlocks(t *testing.T) {
959959
}, tenant.blocksToDelete(nil))
960960
})
961961
}
962+
963+
func TestMultiTSDBDoesNotReturnPrunedTenants(t *testing.T) {
964+
t.Parallel()
965+
966+
dir := t.TempDir()
967+
968+
m := NewMultiTSDB(dir, log.NewNopLogger(), prometheus.NewRegistry(), &tsdb.Options{
969+
MinBlockDuration: (2 * time.Hour).Milliseconds(),
970+
MaxBlockDuration: (2 * time.Hour).Milliseconds(),
971+
RetentionDuration: (6 * time.Hour).Milliseconds(),
972+
}, labels.FromStrings("replica", "test"), "tenant_id", objstore.NewInMemBucket(), false, false, metadata.NoneFunc)
973+
t.Cleanup(func() {
974+
testutil.Ok(t, m.Close())
975+
})
976+
977+
ctx, cancel := context.WithCancel(context.Background())
978+
t.Cleanup(cancel)
979+
980+
const iterations = 200
981+
982+
wg := sync.WaitGroup{}
983+
wg.Go(func() {
984+
for i := range iterations {
985+
tenant := fmt.Sprintf("pruned-tenant-%d", i)
986+
987+
testutil.Ok(t, appendSample(m, tenant, time.UnixMilli(int64(10))))
988+
989+
testutil.Ok(t, m.Prune(ctx))
990+
}
991+
})
992+
993+
wg.Go(func() {
994+
for range iterations {
995+
clients := m.TSDBLocalClients()
996+
req := &storepb.SeriesRequest{
997+
MinTime: 0,
998+
MaxTime: 10,
999+
Matchers: []storepb.LabelMatcher{{Name: "foo", Value: ".*", Type: storepb.LabelMatcher_RE}},
1000+
}
1001+
1002+
for _, c := range clients {
1003+
sc, err := c.Series(ctx, req)
1004+
testutil.Ok(t, err)
1005+
1006+
for {
1007+
_, err := sc.Recv()
1008+
if err == io.EOF {
1009+
break
1010+
}
1011+
testutil.Ok(t, err)
1012+
}
1013+
}
1014+
}
1015+
})
1016+
1017+
wg.Wait()
1018+
}

pkg/store/acceptance_test.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/efficientgo/core/testutil"
1818
"github.com/go-kit/log"
1919
"github.com/pkg/errors"
20+
"go.uber.org/atomic"
2021

2122
"github.com/prometheus/common/model"
2223
"github.com/prometheus/prometheus/model/labels"
@@ -1163,7 +1164,7 @@ func TestProxyStoreWithTSDBSelector_Acceptance(t *testing.T) {
11631164
p1 := startNestedStore(tt, appendFn, extLset1, extLset2, extLset3)
11641165

11651166
clients := []Client{
1166-
storetestutil.TestClient{StoreClient: storepb.ServerAsClient(p1), ExtLset: []labels.Labels{extLset1, extLset2, extLset3}},
1167+
storetestutil.TestClient{StoreClient: storepb.ServerAsClient(p1, atomic.Bool{}), ExtLset: []labels.Labels{extLset1, extLset2, extLset3}},
11671168
}
11681169

11691170
relabelCfgs := []*relabel.Config{{
@@ -1226,8 +1227,8 @@ func TestProxyStoreWithReplicas_Acceptance(t *testing.T) {
12261227
p2 := startNestedStore(tt, extLset2, appendFn)
12271228

12281229
clients := []Client{
1229-
storetestutil.TestClient{StoreClient: storepb.ServerAsClient(p1), ExtLset: []labels.Labels{extLset1}},
1230-
storetestutil.TestClient{StoreClient: storepb.ServerAsClient(p2), ExtLset: []labels.Labels{extLset2}},
1230+
storetestutil.TestClient{StoreClient: storepb.ServerAsClient(p1, atomic.Bool{}), ExtLset: []labels.Labels{extLset1}},
1231+
storetestutil.TestClient{StoreClient: storepb.ServerAsClient(p2, atomic.Bool{}), ExtLset: []labels.Labels{extLset2}},
12311232
}
12321233

12331234
return NewProxyStore(nil, nil, func() []Client { return clients }, component.Query, labels.EmptyLabels(), 0*time.Second, RetrievalStrategy(EagerRetrieval))
@@ -1261,11 +1262,11 @@ func TestTSDBSelectorFilteringBehavior(t *testing.T) {
12611262

12621263
clients := []Client{
12631264
storetestutil.TestClient{
1264-
StoreClient: storepb.ServerAsClient(store1),
1265+
StoreClient: storepb.ServerAsClient(store1, atomic.Bool{}),
12651266
ExtLset: []labels.Labels{extLset},
12661267
},
12671268
storetestutil.TestClient{
1268-
StoreClient: storepb.ServerAsClient(store2),
1269+
StoreClient: storepb.ServerAsClient(store2, atomic.Bool{}),
12691270
ExtLset: []labels.Labels{droppedLset},
12701271
},
12711272
}

pkg/store/limiter_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/prometheus/client_golang/prometheus/promauto"
1515
prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"
1616
"github.com/prometheus/prometheus/model/labels"
17+
"go.uber.org/atomic"
1718

1819
"github.com/thanos-io/thanos/pkg/store/storepb"
1920
)
@@ -101,7 +102,7 @@ func TestRateLimitedServer(t *testing.T) {
101102
defer cancel()
102103

103104
store := NewLimitedStoreServer(newStoreServerStub(test.series), prometheus.NewRegistry(), test.limits)
104-
client := storepb.ServerAsClient(store)
105+
client := storepb.ServerAsClient(store, atomic.Bool{})
105106
seriesClient, err := client.Series(ctx, &storepb.SeriesRequest{})
106107
testutil.Ok(t, err)
107108
for {

pkg/store/proxy_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"github.com/prometheus/prometheus/model/timestamp"
2626
"github.com/prometheus/prometheus/tsdb"
2727
"github.com/prometheus/prometheus/tsdb/chunkenc"
28+
"go.uber.org/atomic"
2829
"google.golang.org/grpc"
2930

3031
"github.com/thanos-io/thanos/pkg/block"
@@ -679,7 +680,7 @@ func TestProxyStore_Series(t *testing.T) {
679680
},
680681
},
681682
}
682-
}, component.Store, labels.FromStrings("role", "proxy"), 1*time.Minute, EagerRetrieval)),
683+
}, component.Store, labels.FromStrings("role", "proxy"), 1*time.Minute, EagerRetrieval), atomic.Bool{}),
683684
},
684685
&storetestutil.TestClient{
685686
MinTime: 1,
@@ -2345,7 +2346,7 @@ func TestProxyStore_NotLeakingOnPrematureFinish(t *testing.T) {
23452346
storeSeriesResponse(t, labels.FromStrings("b", "b"), []sample{{0, 0}, {2, 1}, {3, 2}}),
23462347
storeSeriesResponse(t, labels.FromStrings("b", "c"), []sample{{0, 0}, {2, 1}, {3, 2}}),
23472348
},
2348-
}),
2349+
}, atomic.Bool{}),
23492350
MinTime: math.MinInt64,
23502351
MaxTime: math.MaxInt64,
23512352
},

pkg/store/recover_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99

1010
"github.com/efficientgo/core/testutil"
1111
"github.com/go-kit/log"
12+
"go.uber.org/atomic"
1213

1314
"github.com/thanos-io/thanos/pkg/store/storepb"
1415
)
@@ -20,7 +21,7 @@ func TestRecoverableServer(t *testing.T) {
2021
store := NewRecoverableStoreServer(logger, &panicStoreServer{})
2122

2223
ctx := t.Context()
23-
client := storepb.ServerAsClient(store)
24+
client := storepb.ServerAsClient(store, atomic.Bool{})
2425
seriesClient, err := client.Series(ctx, &storepb.SeriesRequest{})
2526
testutil.Ok(t, err)
2627

0 commit comments

Comments
 (0)