Skip to content

Commit e2264ec

Browse files
authored
Graveler parallel delete using workpool (#9716)
* Graveler parallel delete using workpool - Refactored graveler to use a workpool for parallel deletes - Refactor how graveler is created using a config struct - Reuse the catalog workpool for graveler * Add delete batch test to controller tests
1 parent cfd6894 commit e2264ec

File tree

6 files changed

+123
-42
lines changed

6 files changed

+123
-42
lines changed

pkg/api/controller_test.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3536,6 +3536,41 @@ func TestController_ObjectsDeleteObjectHandler(t *testing.T) {
35363536
t.Fatalf("expected file to be gone now")
35373537
}
35383538
})
3539+
3540+
t.Run("delete entries batch", func(t *testing.T) {
3541+
repo := testUniqueRepoName()
3542+
const branch = "main"
3543+
_, err := deps.catalog.CreateRepository(ctx, repo, config.SingleBlockstoreID, onBlock(deps, "batch-delete-bucket"), branch, false)
3544+
testutil.Must(t, err)
3545+
3546+
// Create multiple entries
3547+
paths := []string{"batch/file1", "batch/file2", "batch/file3"}
3548+
for _, path := range paths {
3549+
testutil.Must(t, deps.catalog.CreateEntry(ctx, repo, branch, catalog.DBEntry{
3550+
Path: path,
3551+
PhysicalAddress: onBlock(deps, path),
3552+
CreationDate: time.Now(),
3553+
Size: 100,
3554+
Checksum: "checksum",
3555+
}))
3556+
}
3557+
3558+
// Commit the entries
3559+
_, err = deps.catalog.Commit(ctx, repo, branch, "add batch files", "testuser", nil, nil, nil, false)
3560+
testutil.Must(t, err)
3561+
3562+
// Delete entries using DeleteEntries (which calls DeleteBatch)
3563+
err = deps.catalog.DeleteEntries(ctx, repo, branch, paths)
3564+
testutil.Must(t, err)
3565+
3566+
// Verify entries are deleted
3567+
for _, path := range paths {
3568+
_, err := deps.catalog.GetEntry(ctx, repo, branch, path, catalog.GetEntryParams{})
3569+
if !errors.Is(err, graveler.ErrNotFound) {
3570+
t.Fatalf("expected entry %s to be deleted, but got error: %v", path, err)
3571+
}
3572+
}
3573+
})
35393574
}
35403575

35413576
func TestController_CreatePolicyHandler(t *testing.T) {

pkg/catalog/catalog.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -261,8 +261,6 @@ const (
261261
ListPullsLimitMax = 1000
262262
ListEntriesLimitMax = 10000
263263
sharedWorkers = 30
264-
pendingTasksPerWorker = 3
265-
workersMaxDrainDuration = 5 * time.Second
266264
)
267265

268266
type ImportPathType string
@@ -405,10 +403,19 @@ func New(ctx context.Context, cfg Config) (*Catalog, error) {
405403
}
406404
deleteSensor = graveler.NewDeleteSensor(baseCfg.Graveler.CompactionSensorThreshold, cb)
407405
}
408-
gStore := graveler.NewGraveler(committedManager, stagingManager, refManager, gcManager, protectedBranchesManager, deleteSensor)
409406

410407
// The size of the workPool is determined by the number of workers and the number of desired pending tasks for each worker.
411408
workPool := pond.NewPool(sharedWorkers, pond.WithContext(ctx))
409+
410+
gStore := graveler.NewGraveler(graveler.GravelerConfig{
411+
CommittedManager: committedManager,
412+
StagingManager: stagingManager,
413+
RefManager: refManager,
414+
GarbageCollectionManager: gcManager,
415+
ProtectedBranchesManager: protectedBranchesManager,
416+
DeleteSensor: deleteSensor,
417+
WorkPool: workPool,
418+
})
412419
closers = append(closers, &ctxCloser{cancelFn})
413420
return &Catalog{
414421
BlockAdapter: tierFSParams.Adapter,

pkg/graveler/errors.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ func (e *HookAbortError) Unwrap() error {
104104
return e.Err
105105
}
106106

107-
// DeleteError single delete error used by DeleteBatch's multierror.Error to report each key that failed
107+
// DeleteError single delete error used to report individual key deletion failures
108108
type DeleteError struct {
109109
Key Key
110110
Err error

pkg/graveler/graveler.go

Lines changed: 63 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"strings"
1111
"time"
1212

13+
"github.com/alitto/pond/v2"
1314
"github.com/cenkalti/backoff/v4"
1415
"github.com/google/uuid"
1516
"github.com/hashicorp/go-multierror"
@@ -23,6 +24,8 @@ import (
2324
"google.golang.org/protobuf/types/known/timestamppb"
2425
)
2526

27+
const defaultSharedWorkers = 5
28+
2629
var kvRetriesCounter = promauto.NewCounterVec(
2730
prometheus.CounterOpts{
2831
Name: "graveler_kv_retries",
@@ -31,6 +34,17 @@ var kvRetriesCounter = promauto.NewCounterVec(
3134
[]string{"operation"},
3235
)
3336

37+
// GravelerConfig holds the configuration for creating a Graveler instance
38+
type GravelerConfig struct {
39+
CommittedManager CommittedManager
40+
StagingManager StagingManager
41+
RefManager RefManager
42+
GarbageCollectionManager GarbageCollectionManager
43+
ProtectedBranchesManager ProtectedBranchesManager
44+
DeleteSensor *DeleteSensor
45+
WorkPool pond.Pool
46+
}
47+
3448
//go:generate go run github.com/golang/mock/[email protected] -source=graveler.go -destination=mock/graveler.go -package=mock
3549

3650
const (
@@ -198,27 +212,29 @@ func WithStageOnly(v bool) GetOptionsFunc {
198212
}
199213
}
200214

201-
type ConditionFunc func(currentValue *Value) error
202-
type SetOptions struct {
203-
// MaxTries set number of times we try to perform the operation before we fail with BranchWriteMaxTries.
204-
// By default, 0 - we try BranchWriteMaxTries
205-
MaxTries int
206-
// Force set to true will bypass repository read-only protection.
207-
Force bool
208-
// AllowEmpty set to true will allow committing an empty commit.
209-
AllowEmpty bool
210-
// Hidden Will create the branch with the hidden property
211-
Hidden bool
212-
// SquashMerge causes merge commits to be "squashed", losing parent
213-
// information about the merged-from branch.
214-
SquashMerge bool
215-
// NoTombstone will try to remove entry without setting a tombstone in KV
216-
NoTombstone bool
217-
// Condition is a function that validates the current value before performing the Set.
218-
// If the condition returns an error, the Set operation fails with that error.
219-
// If the condition succeeds, the Set is performed using SetIf with the current value.
220-
Condition ConditionFunc
221-
}
215+
type (
216+
ConditionFunc func(currentValue *Value) error
217+
SetOptions struct {
218+
// MaxTries set number of times we try to perform the operation before we fail with BranchWriteMaxTries.
219+
// By default, 0 - we try BranchWriteMaxTries
220+
MaxTries int
221+
// Force set to true will bypass repository read-only protection.
222+
Force bool
223+
// AllowEmpty set to true will allow committing an empty commit.
224+
AllowEmpty bool
225+
// Hidden Will create the branch with the hidden property
226+
Hidden bool
227+
// SquashMerge causes merge commits to be "squashed", losing parent
228+
// information about the merged-from branch.
229+
SquashMerge bool
230+
// NoTombstone will try to remove entry without setting a tombstone in KV
231+
NoTombstone bool
232+
// Condition is a function that validates the current value before performing the Set.
233+
// If the condition returns an error, the Set operation fails with that error.
234+
// If the condition succeeds, the Set is performed using SetIf with the current value.
235+
Condition ConditionFunc
236+
}
237+
)
222238

223239
type SetOptionsFunc func(opts *SetOptions)
224240

@@ -1205,22 +1221,30 @@ type Graveler struct {
12051221
logger logging.Logger
12061222
BranchUpdateBackOff backoff.BackOff
12071223
deleteSensor *DeleteSensor
1224+
workPool pond.Pool
12081225
}
12091226

1210-
func NewGraveler(committedManager CommittedManager, stagingManager StagingManager, refManager RefManager, gcManager GarbageCollectionManager, protectedBranchesManager ProtectedBranchesManager, deleteSensor *DeleteSensor) *Graveler {
1227+
func NewGraveler(cfg GravelerConfig) *Graveler {
12111228
branchUpdateBackOff := backoff.NewExponentialBackOff()
12121229
branchUpdateBackOff.MaxInterval = BranchUpdateMaxInterval
12131230

1231+
workPool := cfg.WorkPool
1232+
if workPool == nil {
1233+
// Create a default work pool with 5 workers for basic operations
1234+
workPool = pond.NewPool(defaultSharedWorkers)
1235+
}
1236+
12141237
return &Graveler{
12151238
hooks: &HooksNoOp{},
1216-
CommittedManager: committedManager,
1217-
RefManager: refManager,
1218-
StagingManager: stagingManager,
1239+
CommittedManager: cfg.CommittedManager,
1240+
RefManager: cfg.RefManager,
1241+
StagingManager: cfg.StagingManager,
12191242
BranchUpdateBackOff: branchUpdateBackOff,
1220-
protectedBranchesManager: protectedBranchesManager,
1221-
garbageCollectionManager: gcManager,
1243+
protectedBranchesManager: cfg.ProtectedBranchesManager,
1244+
garbageCollectionManager: cfg.GarbageCollectionManager,
12221245
logger: logging.ContextUnavailable().WithField("service_name", "graveler_graveler"),
1223-
deleteSensor: deleteSensor,
1246+
deleteSensor: cfg.DeleteSensor,
1247+
workPool: workPool,
12241248
}
12251249
}
12261250

@@ -1991,8 +2015,7 @@ func (g *Graveler) Delete(ctx context.Context, repository *RepositoryRecord, bra
19912015
return err
19922016
}
19932017

1994-
// DeleteBatch delete batch of keys. Keys length is limited to DeleteKeysMaxSize. Return error can be of type
1995-
// 'multi-error' holds DeleteError with each key/error that failed as part of the batch.
2018+
// DeleteBatch delete batch of keys. Keys length is limited to DeleteKeysMaxSize. Returns the first error encountered during deletion.
19962019
func (g *Graveler) DeleteBatch(ctx context.Context, repository *RepositoryRecord, branchID BranchID, keys []Key, opts ...SetOptionsFunc) error {
19972020
isProtected, err := g.protectedBranchesManager.IsBlocked(ctx, repository, branchID, BranchProtectionBlockedAction_STAGING_WRITE)
19982021
if err != nil {
@@ -2011,20 +2034,24 @@ func (g *Graveler) DeleteBatch(ctx context.Context, repository *RepositoryRecord
20112034
return fmt.Errorf("keys length (%d) passed the maximum allowed(%d): %w", len(keys), DeleteKeysMaxSize, ErrInvalidValue)
20122035
}
20132036

2014-
var m *multierror.Error
20152037
log := g.log(ctx).WithField("operation", "delete_keys")
20162038
deleteFunc := g.deleteUnsafe
20172039
if options.NoTombstone {
20182040
deleteFunc = g.deleteNoTombstoneUnsafe
20192041
}
20202042
err = g.safeBranchWrite(ctx, log, repository, branchID, safeBranchWriteOptions{}, func(branch *Branch) error {
2043+
// Use workpool for parallel deletion
2044+
workerGroup := g.workPool.NewGroupContext(ctx)
2045+
2046+
// Submit delete tasks to workpool
20212047
for _, key := range keys {
2022-
err = deleteFunc(ctx, repository, key, BranchRecord{branchID, branch})
2023-
if err != nil {
2024-
m = multierror.Append(m, &DeleteError{Key: key, Err: err})
2025-
}
2048+
workerGroup.SubmitErr(func() error {
2049+
return deleteFunc(ctx, repository, key, BranchRecord{branchID, branch})
2050+
})
20262051
}
2027-
return m.ErrorOrNil()
2052+
2053+
// Wait for the first error or completion
2054+
return workerGroup.Wait()
20282055
}, "delete_keys")
20292056
return err
20302057
}

pkg/graveler/graveler_test.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,13 @@ func newGraveler(t *testing.T, committedManager graveler.CommittedManager, stagi
191191
) catalog.Store {
192192
t.Helper()
193193

194-
return graveler.NewGraveler(committedManager, stagingManager, refManager, gcManager, protectedBranchesManager, nil)
194+
return graveler.NewGraveler(graveler.GravelerConfig{
195+
CommittedManager: committedManager,
196+
StagingManager: stagingManager,
197+
RefManager: refManager,
198+
GarbageCollectionManager: gcManager,
199+
ProtectedBranchesManager: protectedBranchesManager,
200+
})
195201
}
196202

197203
func TestGraveler_List(t *testing.T) {

pkg/graveler/testutil/graveler_mock.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,13 @@ func InitGravelerTest(t *testing.T) *GravelerTest {
3535
KVStore: kvmock.NewMockStore(ctrl),
3636
}
3737

38-
test.Sut = graveler.NewGraveler(test.CommittedManager, test.StagingManager, test.RefManager, test.GarbageCollectionManager, test.ProtectedBranchesManager, nil)
38+
test.Sut = graveler.NewGraveler(graveler.GravelerConfig{
39+
CommittedManager: test.CommittedManager,
40+
StagingManager: test.StagingManager,
41+
RefManager: test.RefManager,
42+
GarbageCollectionManager: test.GarbageCollectionManager,
43+
ProtectedBranchesManager: test.ProtectedBranchesManager,
44+
})
3945

4046
return test
4147
}

0 commit comments

Comments
 (0)