@@ -5,6 +5,7 @@ package compact
55
66import (
77 "context"
8+ "sync"
89 "time"
910
1011 "github.com/go-kit/log"
@@ -15,6 +16,8 @@ import (
1516 "github.com/thanos-io/objstore"
1617
1718 "github.com/thanos-io/thanos/pkg/block"
19+ "github.com/thanos-io/thanos/pkg/block/metadata"
20+ "github.com/thanos-io/thanos/pkg/errutil"
1821)
1922
2023// BlocksCleaner is a struct that deletes blocks from bucket which are marked for deletion.
@@ -42,23 +45,52 @@ func NewBlocksCleaner(logger log.Logger, bkt objstore.Bucket, ignoreDeletionMark
4245// DeleteMarkedBlocks uses ignoreDeletionMarkFilter to gather the blocks that are marked for deletion and deletes those
4346// if older than given deleteDelay.
4447func (s * BlocksCleaner ) DeleteMarkedBlocks (ctx context.Context ) (map [ulid.ULID ]struct {}, error ) {
48+ const conc = 32
49+
4550 level .Info (s .logger ).Log ("msg" , "started cleaning of blocks marked for deletion" )
4651
47- deletedBlocks := make (map [ulid.ULID ]struct {}, 0 )
52+ var (
53+ merr errutil.SyncMultiError
54+ deletedBlocksMtx sync.Mutex
55+ deletedBlocks = make (map [ulid.ULID ]struct {}, 0 )
56+ deletionMarkMap = s .ignoreDeletionMarkFilter .DeletionMarkBlocks ()
57+ wg sync.WaitGroup
58+ dm = make (chan * metadata.DeletionMark , conc )
59+ )
4860
49- deletionMarkMap := s .ignoreDeletionMarkFilter .DeletionMarkBlocks ()
50- for _ , deletionMark := range deletionMarkMap {
51- if time .Since (time .Unix (deletionMark .DeletionTime , 0 )).Seconds () > s .deleteDelay .Seconds () {
52- if err := block .Delete (ctx , s .logger , s .bkt , deletionMark .ID ); err != nil {
53- s .blockCleanupFailures .Inc ()
54- return deletedBlocks , errors .Wrap (err , "delete block" )
61+ for range conc {
62+ wg .Go (func () {
63+ for deletionMark := range dm {
64+ if ctx .Err () != nil {
65+ return
66+ }
67+ if time .Since (time .Unix (deletionMark .DeletionTime , 0 )).Seconds () > s .deleteDelay .Seconds () {
68+ if err := block .Delete (ctx , s .logger , s .bkt , deletionMark .ID ); err != nil {
69+ s .blockCleanupFailures .Inc ()
70+ merr .Add (errors .Wrap (err , "delete block" ))
71+ continue
72+ }
73+ s .blocksCleaned .Inc ()
74+ level .Info (s .logger ).Log ("msg" , "deleted block marked for deletion" , "block" , deletionMark .ID )
75+
76+ deletedBlocksMtx .Lock ()
77+ deletedBlocks [deletionMark .ID ] = struct {}{}
78+ deletedBlocksMtx .Unlock ()
79+ }
5580 }
56- s .blocksCleaned .Inc ()
57- level .Info (s .logger ).Log ("msg" , "deleted block marked for deletion" , "block" , deletionMark .ID )
58- deletedBlocks [deletionMark .ID ] = struct {}{}
59- }
81+ })
82+ }
83+
84+ for _ , deletionMark := range deletionMarkMap {
85+ dm <- deletionMark
86+ }
87+ close (dm )
88+ wg .Wait ()
89+
90+ if ctx .Err () != nil {
91+ return deletedBlocks , ctx .Err ()
6092 }
6193
6294 level .Info (s .logger ).Log ("msg" , "cleaning of blocks marked for deletion done" )
63- return deletedBlocks , nil
95+ return deletedBlocks , merr . Err ()
6496}
0 commit comments