@@ -5,6 +5,7 @@ package compact
55
66import (
77 "context"
8+ "sync"
89 "time"
910
1011 "github.com/go-kit/log"
@@ -15,6 +16,8 @@ import (
1516 "github.com/thanos-io/objstore"
1617
1718 "github.com/thanos-io/thanos/pkg/block"
19+ "github.com/thanos-io/thanos/pkg/block/metadata"
20+ "github.com/thanos-io/thanos/pkg/errutil"
1821)
1922
2023// BlocksCleaner is a struct that deletes blocks from bucket which are marked for deletion.
@@ -42,23 +45,55 @@ func NewBlocksCleaner(logger log.Logger, bkt objstore.Bucket, ignoreDeletionMark
4245// DeleteMarkedBlocks uses ignoreDeletionMarkFilter to gather the blocks that are marked for deletion and deletes those
4346// if older than given deleteDelay.
4447func (s * BlocksCleaner ) DeleteMarkedBlocks (ctx context.Context ) (map [ulid.ULID ]struct {}, error ) {
48+ const conc = 32
49+
4550 level .Info (s .logger ).Log ("msg" , "started cleaning of blocks marked for deletion" )
4651
47- deletedBlocks := make (map [ulid.ULID ]struct {}, 0 )
52+ var (
53+ merr errutil.SyncMultiError
54+ deletedBlocksMtx sync.Mutex
55+ deletedBlocks = make (map [ulid.ULID ]struct {}, 0 )
56+ deletionMarkMap = s .ignoreDeletionMarkFilter .DeletionMarkBlocks ()
57+ wg sync.WaitGroup
58+ dm = make (chan * metadata.DeletionMark , conc )
59+ )
4860
49- deletionMarkMap := s .ignoreDeletionMarkFilter .DeletionMarkBlocks ()
50- for _ , deletionMark := range deletionMarkMap {
51- if time .Since (time .Unix (deletionMark .DeletionTime , 0 )).Seconds () > s .deleteDelay .Seconds () {
52- if err := block .Delete (ctx , s .logger , s .bkt , deletionMark .ID ); err != nil {
53- s .blockCleanupFailures .Inc ()
54- return deletedBlocks , errors .Wrap (err , "delete block" )
61+ wg .Add (conc )
62+ for range conc {
63+ go func () {
64+ defer wg .Done ()
65+
66+ for deletionMark := range dm {
67+ if ctx .Err () != nil {
68+ return
69+ }
70+ if time .Since (time .Unix (deletionMark .DeletionTime , 0 )).Seconds () > s .deleteDelay .Seconds () {
71+ if err := block .Delete (ctx , s .logger , s .bkt , deletionMark .ID ); err != nil {
72+ s .blockCleanupFailures .Inc ()
73+ merr .Add (errors .Wrap (err , "delete block" ))
74+ continue
75+ }
76+ s .blocksCleaned .Inc ()
77+ level .Info (s .logger ).Log ("msg" , "deleted block marked for deletion" , "block" , deletionMark .ID )
78+
79+ deletedBlocksMtx .Lock ()
80+ deletedBlocks [deletionMark .ID ] = struct {}{}
81+ deletedBlocksMtx .Unlock ()
82+ }
5583 }
56- s .blocksCleaned .Inc ()
57- level .Info (s .logger ).Log ("msg" , "deleted block marked for deletion" , "block" , deletionMark .ID )
58- deletedBlocks [deletionMark .ID ] = struct {}{}
59- }
84+ }()
85+ }
86+
87+ for _ , deletionMark := range deletionMarkMap {
88+ dm <- deletionMark
89+ }
90+ close (dm )
91+ wg .Wait ()
92+
93+ if ctx .Err () != nil {
94+ return deletedBlocks , ctx .Err ()
6095 }
6196
6297 level .Info (s .logger ).Log ("msg" , "cleaning of blocks marked for deletion done" )
63- return deletedBlocks , nil
98+ return deletedBlocks , merr . Err ()
6499}
0 commit comments