Skip to content

Commit b97b25d

Browse files
committed
compact: add some concurrency around deletion
Deleting one by one is REALLY slow if you have thousands of blocks to delete. Use some concurrency by default. Signed-off-by: Giedrius Statkevičius <[email protected]>
1 parent 7488f83 commit b97b25d

File tree

1 file changed

+46
-12
lines changed

1 file changed

+46
-12
lines changed

pkg/compact/blocks_cleaner.go

Lines changed: 46 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package compact
55

66
import (
77
"context"
8+
"sync"
89
"time"
910

1011
"github.com/go-kit/log"
@@ -15,6 +16,8 @@ import (
1516
"github.com/thanos-io/objstore"
1617

1718
"github.com/thanos-io/thanos/pkg/block"
19+
"github.com/thanos-io/thanos/pkg/block/metadata"
20+
"github.com/thanos-io/thanos/pkg/errutil"
1821
)
1922

2023
// BlocksCleaner is a struct that deletes blocks from bucket which are marked for deletion.
@@ -42,23 +45,54 @@ func NewBlocksCleaner(logger log.Logger, bkt objstore.Bucket, ignoreDeletionMark
4245
// DeleteMarkedBlocks uses ignoreDeletionMarkFilter to gather the blocks that are marked for deletion and deletes those
4346
// if older than given deleteDelay.
4447
func (s *BlocksCleaner) DeleteMarkedBlocks(ctx context.Context) (map[ulid.ULID]struct{}, error) {
48+
const conc = 32
49+
4550
level.Info(s.logger).Log("msg", "started cleaning of blocks marked for deletion")
4651

47-
deletedBlocks := make(map[ulid.ULID]struct{}, 0)
52+
var (
53+
merr errutil.SyncMultiError
54+
deletedBlocksMtx sync.Mutex
55+
deletedBlocks = make(map[ulid.ULID]struct{}, 0)
56+
deletionMarkMap = s.ignoreDeletionMarkFilter.DeletionMarkBlocks()
57+
wg sync.WaitGroup
58+
dm = make(chan *metadata.DeletionMark, conc)
59+
)
4860

49-
deletionMarkMap := s.ignoreDeletionMarkFilter.DeletionMarkBlocks()
50-
for _, deletionMark := range deletionMarkMap {
51-
if time.Since(time.Unix(deletionMark.DeletionTime, 0)).Seconds() > s.deleteDelay.Seconds() {
52-
if err := block.Delete(ctx, s.logger, s.bkt, deletionMark.ID); err != nil {
53-
s.blockCleanupFailures.Inc()
54-
return deletedBlocks, errors.Wrap(err, "delete block")
61+
for range conc {
62+
wg.Go(func() {
63+
defer wg.Done()
64+
65+
for deletionMark := range dm {
66+
if ctx.Err() != nil {
67+
return
68+
}
69+
if time.Since(time.Unix(deletionMark.DeletionTime, 0)).Seconds() > s.deleteDelay.Seconds() {
70+
if err := block.Delete(ctx, s.logger, s.bkt, deletionMark.ID); err != nil {
71+
s.blockCleanupFailures.Inc()
72+
merr.Add(errors.Wrap(err, "delete block"))
73+
continue
74+
}
75+
s.blocksCleaned.Inc()
76+
level.Info(s.logger).Log("msg", "deleted block marked for deletion", "block", deletionMark.ID)
77+
78+
deletedBlocksMtx.Lock()
79+
deletedBlocks[deletionMark.ID] = struct{}{}
80+
deletedBlocksMtx.Unlock()
81+
}
5582
}
56-
s.blocksCleaned.Inc()
57-
level.Info(s.logger).Log("msg", "deleted block marked for deletion", "block", deletionMark.ID)
58-
deletedBlocks[deletionMark.ID] = struct{}{}
59-
}
83+
})
84+
}
85+
86+
for _, deletionMark := range deletionMarkMap {
87+
dm <- deletionMark
88+
}
89+
close(dm)
90+
wg.Wait()
91+
92+
if ctx.Err() != nil {
93+
return deletedBlocks, ctx.Err()
6094
}
6195

6296
level.Info(s.logger).Log("msg", "cleaning of blocks marked for deletion done")
63-
return deletedBlocks, nil
97+
return deletedBlocks, merr.Err()
6498
}

0 commit comments

Comments
 (0)