From c09225129194070b214a39caa27772e00cc22183 Mon Sep 17 00:00:00 2001 From: Evgenii Guguchkin Date: Fri, 27 Mar 2026 13:40:30 +0300 Subject: [PATCH 1/2] feat(fracmanager): implement fraction snapshots with wait group reference counting --- asyncsearcher/async_searcher.go | 27 +- asyncsearcher/async_searcher_test.go | 4 +- fracmanager/fracmanager.go | 19 +- fracmanager/fracmanager_for_tests.go | 2 +- fracmanager/fracmanager_test.go | 24 +- fracmanager/fracs_stats.go | 4 + fracmanager/fraction_provider.go | 26 +- fracmanager/fraction_registry.go | 431 ++++++++++---------------- fracmanager/fractions_snapshot.go | 136 ++++++++ fracmanager/lifecycle_manager.go | 99 +++--- fracmanager/lifecycle_manager_test.go | 123 ++++++-- fracmanager/partitioned_collection.go | 117 +++++++ fracmanager/proxy_frac.go | 201 ------------ fracmanager/sync_appender.go | 82 +++++ skipmaskmanager/skip_mask_manager.go | 15 +- storeapi/grpc_fetch.go | 5 +- storeapi/grpc_search.go | 16 +- util/min_heap.go | 117 +++++++ 18 files changed, 846 insertions(+), 602 deletions(-) create mode 100644 fracmanager/fractions_snapshot.go create mode 100644 fracmanager/partitioned_collection.go delete mode 100644 fracmanager/proxy_frac.go create mode 100644 fracmanager/sync_appender.go create mode 100644 util/min_heap.go diff --git a/asyncsearcher/async_searcher.go b/asyncsearcher/async_searcher.go index a0fa6368..72bbaa8a 100644 --- a/asyncsearcher/async_searcher.go +++ b/asyncsearcher/async_searcher.go @@ -74,11 +74,11 @@ type AsyncSearcherConfig struct { } type fractionAcquirer interface { - Fractions() fracmanager.List + AcquireFractions() (_ fracmanager.List, release func()) AcquireFraction(name string) (_ frac.Fraction, release func(), ok bool) } -func MustStartAsync(config AsyncSearcherConfig, mp MappingProvider, fracs fractionAcquirer) *AsyncSearcher { +func MustStartAsync(config AsyncSearcherConfig, mp MappingProvider, fracProvider fractionAcquirer) *AsyncSearcher { if config.DataDir == "" { logger.Fatal("can't start async searcher: DataDir is empty") } @@ -107,7 +107,7 @@ func MustStartAsync(config AsyncSearcherConfig, mp MappingProvider, fracs fracti for _, id := range notProcessedIDs { asyncSearchActiveSearches.Add(1) as.processWg.Add(1) - go as.processRequest(id, fracs) + go as.processRequest(id, fracProvider) } // set limit metrics that allow us to calculate alerts' thresholds @@ -209,7 +209,7 @@ func (i *asyncSearchInfo) Status() AsyncSearchStatus { return status } -func (as *AsyncSearcher) StartSearch(r AsyncSearchRequest, fracs fractionAcquirer) error { +func (as *AsyncSearcher) StartSearch(r AsyncSearchRequest, fracProvider fractionAcquirer) error { if as.readOnly.Load() { return fmt.Errorf("cannot start search on read-only mode") } @@ -240,14 +240,17 @@ func (as *AsyncSearcher) StartSearch(r AsyncSearchRequest, fracs fractionAcquire return fmt.Errorf("retention time should be less than %s, got %s", maxRetention, r.Retention) } - fracNames := fracs.Fractions().FilterInRange(r.Params.From, r.Params.To).Names() + fracs, release := fracProvider.AcquireFractions() + defer release() + + fracNames := fracs.FilterInRange(r.Params.From, r.Params.To).Names() if ok := as.saveSearchInfo(r, fracNames); !ok { // Request was saved previously, skip it return nil } asyncSearchActiveSearches.Add(1) as.processWg.Add(1) - go as.processRequest(r.ID, fracs) + go as.processRequest(r.ID, fracProvider) return nil } @@ -301,17 +304,17 @@ func (as *AsyncSearcher) createDataDir() { }) } -func (as *AsyncSearcher) processRequest(asyncSearchID string, fracs fractionAcquirer) { +func (as *AsyncSearcher) processRequest(asyncSearchID string, fracProvider fractionAcquirer) { defer as.processWg.Done() as.rateLimit <- struct{}{} defer func() { <-as.rateLimit }() - as.doSearch(asyncSearchID, fracs) + as.doSearch(asyncSearchID, fracProvider) asyncSearchActiveSearches.Add(-1) } -func (as *AsyncSearcher) doSearch(id string, fracs fractionAcquirer) { +func (as *AsyncSearcher) doSearch(id string, fracProvider fractionAcquirer) { qprPaths, err := as.findQPRs(id) if err != nil { panic(fmt.Errorf("can't find QPRs for id %q: %s", id, err)) @@ -353,7 +356,7 @@ func (as *AsyncSearcher) doSearch(id string, fracs fractionAcquirer) { if as.shouldStopSearch(id) { break } - if err := as.acquireAndProcessFrac(fracInfo, info, fracs); err != nil { + if err := as.acquireAndProcessFrac(fracInfo, info, fracProvider); err != nil { as.updateSearchInfo(id, func(info *asyncSearchInfo) { info.Error = err.Error() }) @@ -395,8 +398,8 @@ func compressQPR(qpr *seq.QPR, cb func(compressed []byte) error) error { return nil } -func (as *AsyncSearcher) acquireAndProcessFrac(fracInfo fracSearchState, searchInfo asyncSearchInfo, fracs fractionAcquirer) (err error) { - f, release, ok := fracs.AcquireFraction(fracInfo.Name) +func (as *AsyncSearcher) acquireAndProcessFrac(fracInfo fracSearchState, searchInfo asyncSearchInfo, fracProvider fractionAcquirer) (err error) { + f, release, ok := fracProvider.AcquireFraction(fracInfo.Name) if !ok { // oldest fracs may already be removed logger.Info( "async search: skip missing fraction", diff --git a/asyncsearcher/async_searcher_test.go b/asyncsearcher/async_searcher_test.go index 8ef37bc3..d6311ef5 100644 --- a/asyncsearcher/async_searcher_test.go +++ b/asyncsearcher/async_searcher_test.go @@ -49,8 +49,8 @@ func (fp fakeFractionProvider) AcquireFraction(name string) (frac.Fraction, func return nil, func() {}, false } -func (fp fakeFractionProvider) Fractions() fracmanager.List { - return fracmanager.List(fp) +func (fp fakeFractionProvider) AcquireFractions() (fracmanager.List, func()) { + return fracmanager.List(fp), func() {} } func TestAsyncSearcherMaintain(t *testing.T) { diff --git a/fracmanager/fracmanager.go b/fracmanager/fracmanager.go index 6b5b7c87..77a73c78 100644 --- a/fracmanager/fracmanager.go +++ b/fracmanager/fracmanager.go @@ -76,18 +76,19 @@ func New(ctx context.Context, cfg *Config, s3cli *s3.Client, skipMaskProvider sk cancel() wg.Wait() - // freeze active fraction to prevent new writes - active := lc.registry.Active() - if err := active.Finalize(); err != nil { + // finalize appender to prevent new writes + appender := lc.registry.Appender() + if err := appender.Finalize(); err != nil { logger.Fatal("shutdown fraction freezing error", zap.Error(err)) } - active.WaitWriteIdle() + appender.WaitWriteIdle() stopIdx() lc.SyncInfoCache() - sealOnShutdown(active.instance, provider, cfg.MinSealFracSize) + // Seal active fraction + sealOnShutdown(appender.Active, provider, cfg.MinSealFracSize) logger.Info("fracmanager's workers are stopped", zap.Int64("took_ms", time.Since(n).Milliseconds())) } @@ -96,11 +97,11 @@ func New(ctx context.Context, cfg *Config, s3cli *s3.Client, skipMaskProvider sk } func (fm *FracManager) AcquireFraction(name string) (frac.Fraction, func(), bool) { - return fm.lc.registry.AcquireFraction(name) + return fm.lc.registry.AcquireOneFraction(name) } -func (fm *FracManager) Fractions() List { - return fm.lc.registry.AllFractions() +func (fm *FracManager) AcquireFractions() (List, func()) { + return fm.lc.registry.AcquireAllFractions() } func (fm *FracManager) Oldest() uint64 { @@ -120,7 +121,7 @@ func (fm *FracManager) Append(ctx context.Context, docs storage.DocBlock, metas return ctx.Err() default: // Try to append data to the currently active fraction - err := fm.lc.registry.Active().Append(docs, metas) + err := fm.lc.registry.Appender().Append(docs, metas) if err != nil { logger.Info("append fail", zap.Error(err)) if err == ErrFractionNotWritable { diff --git a/fracmanager/fracmanager_for_tests.go b/fracmanager/fracmanager_for_tests.go index ab7cd851..c4ec1cad 100644 --- a/fracmanager/fracmanager_for_tests.go +++ b/fracmanager/fracmanager_for_tests.go @@ -3,7 +3,7 @@ package fracmanager import "sync" func (fm *FracManager) WaitIdleForTests() { - fm.lc.registry.Active().WaitWriteIdle() + fm.lc.registry.Appender().WaitWriteIdle() } func (fm *FracManager) SealForcedForTests() { diff --git a/fracmanager/fracmanager_test.go b/fracmanager/fracmanager_test.go index 663dedec..d92e13b8 100644 --- a/fracmanager/fracmanager_test.go +++ b/fracmanager/fracmanager_test.go @@ -61,30 +61,32 @@ func TestSealingOnShutdown(t *testing.T) { cfg.MinSealFracSize = 0 // to ensure that the frac will not be sealed on shutdown cfg, fm, stop := setupFracManager(t, cfg) appendDocsToFracManager(t, fm, 10) - activeName := fm.Fractions()[0].Info().Name() + + activeName := fm.lc.registry.all.fractions[0].Info().Name() + stop() // second start cfg.MinSealFracSize = 1 // to ensure that the frac will be sealed on shutdown cfg, fm, stop = setupFracManager(t, cfg) - assert.Equal(t, 1, len(fm.Fractions()), "should have one fraction") - assert.Equal(t, activeName, fm.Fractions()[0].Info().Name(), "fraction should have the same name") - _, ok := fm.Fractions()[0].(*fractionProxy).impl.(*frac.Active) + allFractions := fm.lc.registry.all.fractions + assert.Equal(t, 1, len(allFractions), "should have one fraction") + assert.Equal(t, activeName, allFractions[0].Info().Name(), "fraction should have the same name") + _, ok := allFractions[0].(*syncAppender) assert.True(t, ok, "fraction should be active") - stop() // third start _, fm, stop = setupFracManager(t, cfg) - assert.Equal(t, 2, len(fm.Fractions()), "should have 2 fraction: new active and old sealed") - _, ok = fm.Fractions()[0].(*fractionProxy).impl.(*frac.Sealed) + allFractions = fm.lc.registry.all.fractions + assert.Equal(t, 2, len(allFractions), "should have 2 fraction: new active and old sealed") + _, ok = allFractions[0].(*refCountedSealed) assert.True(t, ok, "first fraction should be sealed") - assert.Equal(t, activeName, fm.Fractions()[0].Info().Name(), "sealed fraction should have the same name") - assert.Equal(t, uint32(0), fm.Fractions()[1].Info().DocsTotal, "active fraction should be empty") - _, ok = fm.Fractions()[1].(*fractionProxy).impl.(*frac.Active) + assert.Equal(t, activeName, allFractions[0].Info().Name(), "sealed fraction should have the same name") + assert.Equal(t, uint32(0), allFractions[1].Info().DocsTotal, "active fraction should be empty") + _, ok = allFractions[1].(*syncAppender) assert.True(t, ok, "new fraction should be active") - stop() } diff --git a/fracmanager/fracs_stats.go b/fracmanager/fracs_stats.go index 968b8b41..c70bbd37 100644 --- a/fracmanager/fracs_stats.go +++ b/fracmanager/fracs_stats.go @@ -95,3 +95,7 @@ func (s *registryStats) SetMetrics() { s.offloading.SetMetrics(dataSizeTotal, "offloading") s.remotes.SetMetrics(dataSizeTotal, "remotes") } + +func (s registryStats) TotalSizeOnDiskLocal() uint64 { + return s.sealing.totalSizeOnDisk + s.sealed.totalSizeOnDisk +} diff --git a/fracmanager/fraction_provider.go b/fracmanager/fraction_provider.go index 66e6477b..e3a4d46b 100644 --- a/fracmanager/fraction_provider.go +++ b/fracmanager/fraction_provider.go @@ -8,14 +8,17 @@ import ( "time" "github.com/oklog/ulid/v2" + "go.uber.org/zap" "github.com/ozontech/seq-db/frac" "github.com/ozontech/seq-db/frac/common" "github.com/ozontech/seq-db/frac/sealed" "github.com/ozontech/seq-db/frac/sealed/sealing" + "github.com/ozontech/seq-db/logger" "github.com/ozontech/seq-db/node" "github.com/ozontech/seq-db/storage" "github.com/ozontech/seq-db/storage/s3" + "github.com/ozontech/seq-db/util" ) const fileBasePattern = "seq-db-" @@ -123,8 +126,11 @@ func (fp *fractionProvider) CreateActive() *frac.Active { // Seal converts an active fraction to a sealed one // Process includes sorting, indexing, and data optimization for reading -func (fp *fractionProvider) Seal(active *frac.Active) (*frac.Sealed, error) { - src, err := frac.NewActiveSealingSource(active, fp.config.SealParams) +func (fp *fractionProvider) Seal(a *frac.Active) (*frac.Sealed, error) { + sealsTotal.Inc() + now := time.Now() + + src, err := frac.NewActiveSealingSource(a, fp.config.SealParams) if err != nil { return nil, err } @@ -133,9 +139,19 @@ func (fp *fractionProvider) Seal(active *frac.Active) (*frac.Sealed, error) { return nil, err } - sealedFrac := fp.NewSealedPreloaded(active.BaseFileName, preloaded) - fp.skipMaskProvider.RefreshFrac(sealedFrac) - return sealedFrac, nil + s := fp.NewSealedPreloaded(a.BaseFileName, preloaded) + fp.skipMaskProvider.RefreshFrac(s) + + sealingTime := time.Since(now) + sealsDoneSeconds.Observe(sealingTime.Seconds()) + + logger.Info( + "fraction sealed", + zap.String("fraction", filepath.Base(s.BaseFileName)), + zap.Float64("time_spent_s", util.DurationToUnit(sealingTime, "s")), + ) + + return s, nil } // Offload uploads fraction to S3 storage and returns a remote fraction diff --git a/fracmanager/fraction_registry.go b/fracmanager/fraction_registry.go index 0f79c28d..b0667c04 100644 --- a/fracmanager/fraction_registry.go +++ b/fracmanager/fraction_registry.go @@ -15,94 +15,94 @@ import ( // fractionRegistry manages fraction queues at different lifecycle stages. // Tracks fractions through different stages: active → sealing → sealed → offloading → remote -// Ensures correct state transitions while maintaining chronological order. // The entire structure is thread-safe due to internal synchronization. -// Lifecycle: created once, persists through application lifetime. type fractionRegistry struct { mu sync.RWMutex // main mutex for protecting registry state - // lifecycle queues (FIFO order, oldest at lower indexes) - sealing []*activeProxy // fractions being sealed (0-5 typical) - sealed []*sealedProxy // local sealed fractions (can be thousands) - offloading []*sealedProxy // fractions being offloaded (0-5 typical) - remotes []*remoteProxy // offloaded fractions (can be thousands) + sealing map[string]*syncAppender // fractions being sealed (0-5 typical) + sealed PartitionedCollection[*refCountedSealed] // local sealed fractions (can be thousands) + offloading PartitionedCollection[*refCountedSealed] // fractions being offloaded (0-5 typical) + remotes PartitionedCollection[*refCountedRemote] // offloaded fractions (can be thousands) - stats registryStats // size statistics for monitoring - oldestTotal uint64 // creation time of oldest fraction in all list including remote - oldestLocal uint64 // creation time of oldest fraction in local or offloading queues + stats registryStats // size statistics for monitoring - muAll sync.RWMutex // protects active, all, and oldestTotal fields - active *activeProxy // currently active writable fraction - all []frac.Fraction // all fractions in creation order (read-only view) - allMap map[string]frac.Fraction + muAppender sync.RWMutex + appender *syncAppender // currently active writable fraction + + muAll sync.RWMutex + all fractionsSnapshot // all fractions } // NewFractionRegistry creates and initializes a new fraction registry instance. // Populates the registry with existing active, sealed and remote fractions. -// Rebuilds the complete fractions list in chronological order. func NewFractionRegistry(active *frac.Active, sealed []*frac.Sealed, remotes []*frac.Remote) (*fractionRegistry, error) { if active == nil { return nil, errors.New("active fraction must be specified") } - r := fractionRegistry{ - active: &activeProxy{ - proxy: &fractionProxy{impl: active}, - instance: active, - }, + creationTime := func(f frac.Fraction) uint64 { return f.Info().CreationTime } + + lastDocTime := func(f frac.Fraction) uint64 { + aligned := f.Info().To.Time(). + Add(-time.Nanosecond). + Truncate(time.Minute). + Add(time.Minute) + return uint64(aligned.UnixMilli()) + } + + reg := fractionRegistry{ + appender: &syncAppender{refCountedActive: refCountedActive{Active: active}}, + + sealing: map[string]*syncAppender{}, + sealed: NewPartitionedCollection(func(rcs *refCountedSealed) uint64 { return creationTime(rcs) }), + offloading: NewPartitionedCollection(func(rcs *refCountedSealed) uint64 { return lastDocTime(rcs) }), + remotes: NewPartitionedCollection(func(rcr *refCountedRemote) uint64 { return lastDocTime(rcr) }), } // initialize local sealed fractions - for _, sealed := range sealed { - r.stats.sealed.Add(sealed.Info()) - r.sealed = append(r.sealed, &sealedProxy{ - proxy: &fractionProxy{impl: sealed}, - instance: sealed, - }) + for _, s := range sealed { + reg.stats.sealed.Add(s.Info()) + reg.sealed.Add(s.Info().Name(), &refCountedSealed{Sealed: s}) } // initialize remote fractions - for _, remote := range remotes { - r.stats.remotes.Add(remote.Info()) - r.remotes = append(r.remotes, &remoteProxy{ - proxy: &fractionProxy{impl: remote}, - instance: remote, - }) + for _, r := range remotes { + reg.stats.remotes.Add(r.Info()) + reg.remotes.Add(r.Info().Name(), &refCountedRemote{Remote: r}) } - r.updateOldestLocal() - r.rebuildAllFractions() + reg.rebuildSnapshot() - return &r, nil + return ®, nil } -// Active returns the currently active writable fraction. -func (r *fractionRegistry) Active() *activeProxy { - r.muAll.RLock() - defer r.muAll.RUnlock() - return r.active +// Appender returns the currently active writable fraction. +func (r *fractionRegistry) Appender() *syncAppender { + r.muAppender.RLock() + defer r.muAppender.RUnlock() + return r.appender } -func (r *fractionRegistry) AcquireFraction(name string) (frac.Fraction, func(), bool) { +func (r *fractionRegistry) AcquireOneFraction(name string) (frac.Fraction, func(), bool) { r.muAll.RLock() defer r.muAll.RUnlock() - f, ok := r.allMap[name] - return f, func() {}, ok + return r.all.AcquireOne(name) } -// AllFractions returns a read-only view of all fractions in creation order. -func (r *fractionRegistry) AllFractions() []frac.Fraction { +// AcquireAllFractions returns a read-only view of all fractions +func (r *fractionRegistry) AcquireAllFractions() ([]frac.Fraction, func()) { r.muAll.RLock() defer r.muAll.RUnlock() - return r.all + + return r.all.AcquireAll() } // Stats returns current size statistics of the registry. func (r *fractionRegistry) Stats() registryStats { r.mu.RLock() s := r.stats - i := r.active.instance.Info() + i := r.appender.Info() r.mu.RUnlock() s.active.Set(i) @@ -113,40 +113,57 @@ func (r *fractionRegistry) Stats() registryStats { func (r *fractionRegistry) OldestTotal() uint64 { r.muAll.RLock() defer r.muAll.RUnlock() - return r.oldestTotal + return r.all.oldestTotal } // OldestLocal returns the creation time of the oldest local fraction in the registry. func (r *fractionRegistry) OldestLocal() uint64 { - r.mu.RLock() - defer r.mu.RUnlock() - return r.oldestLocal + r.muAll.RLock() + defer r.muAll.RUnlock() + return r.all.oldestLocal +} + +type activeProvider interface { + CreateActive() *frac.Active +} + +func (r *fractionRegistry) setAppender(appender *syncAppender) { + r.muAppender.Lock() + defer r.muAppender.Unlock() + + r.appender = appender + + r.muAll.Lock() + defer r.muAll.Unlock() + + r.all.AddActive(appender) } // RotateIfFull completes the current active fraction and starts a new one. // Moves previous active fraction to sealing queue. -// Updates statistics and maintains chronological order. -// Should be called when creating a new fraction. -func (r *fractionRegistry) RotateIfFull(maxSize uint64, newActive func() *activeProxy) (*activeProxy, func(), error) { +// Should be called when the current active fraction reaches size limit and needs to be rotated +func (r *fractionRegistry) RotateIfFull(maxSize uint64, ap activeProvider) (*refCountedActive, func(), error) { r.mu.Lock() defer r.mu.Unlock() - if r.active.instance.Info().DocsOnDisk <= maxSize { + if r.appender.Info().DocsOnDisk <= maxSize { return nil, nil, nil } - old := r.active - r.sealing = append(r.sealing, old) - r.addActive(newActive()) + old := r.appender + + r.sealing[old.Info().Name()] = old + + r.setAppender(&syncAppender{refCountedActive: refCountedActive{Active: ap.CreateActive()}}) if err := old.Finalize(); err != nil { - return old, nil, err + return nil, nil, err } - curInfo := old.instance.Info() + curInfo := old.Info() r.stats.sealing.Add(curInfo) - r.active.Suspend(old.Suspended()) + r.appender.Suspend(old.Suspended()) wg := sync.WaitGroup{} wg.Add(1) @@ -156,7 +173,7 @@ func (r *fractionRegistry) RotateIfFull(maxSize uint64, newActive func() *active defer wg.Done() old.WaitWriteIdle() // can be long enough - finalInfo := old.instance.Info() + finalInfo := old.Info() r.mu.Lock() defer r.mu.Unlock() @@ -167,14 +184,14 @@ func (r *fractionRegistry) RotateIfFull(maxSize uint64, newActive func() *active r.stats.sealing.Add(finalInfo) }() - return old, wg.Wait, nil + return &old.refCountedActive, wg.Wait, nil } func (r *fractionRegistry) SuspendIfOverCapacity(maxQueue, maxSize uint64) { r.mu.Lock() defer r.mu.Unlock() - suspended := r.active.Suspended() + suspended := r.appender.Suspended() if maxQueue > 0 && r.stats.sealing.count >= int(maxQueue) { if !suspended { @@ -182,7 +199,7 @@ func (r *fractionRegistry) SuspendIfOverCapacity(maxQueue, maxSize uint64) { zap.String("reason", "sealing queue size exceeded"), zap.Uint64("limit", maxQueue), zap.Int("queue_size", r.stats.sealing.count)) - r.active.Suspend(true) + r.appender.Suspend(true) } return } @@ -195,7 +212,7 @@ func (r *fractionRegistry) SuspendIfOverCapacity(maxQueue, maxSize uint64) { zap.String("reason", "occupied space limit exceeded"), zap.Float64("queue_size_limit_gb", util.Float64ToPrec(util.SizeToUnit(maxSize, "gb"), 2)), zap.Float64("occupied_space_gb", util.Float64ToPrec(util.SizeToUnit(du, "gb"), 2))) - r.active.Suspend(true) + r.appender.Suspend(true) } return } @@ -206,66 +223,67 @@ func (r *fractionRegistry) SuspendIfOverCapacity(maxQueue, maxSize uint64) { zap.Float64("occupied_space_gb", util.Float64ToPrec(util.SizeToUnit(du, "gb"), 2)), zap.Uint64("sealing_queue_size_limit", maxQueue), zap.Int("queue_size", r.stats.sealing.count)) - r.active.Suspend(false) + r.appender.Suspend(false) } } func (r *fractionRegistry) diskUsage() uint64 { - return r.active.instance.Info().FullSize() + + return r.appender.Info().FullSize() + r.stats.sealed.totalSizeOnDisk + r.stats.sealing.totalSizeOnDisk + r.stats.offloading.totalSizeOnDisk } -// addActive sets a new active fraction and updates the complete fractions list. -func (r *fractionRegistry) addActive(a *activeProxy) { - r.muAll.Lock() - defer r.muAll.Unlock() +// EvictLocalForDelete removes oldest local fractions to free disk space. +// Returns evicted fractions or error if insufficient space is released. +func (r *fractionRegistry) EvictLocalForDelete(sizeLimit uint64) (evicted []*refCountedSealed, err error) { + r.mu.Lock() + defer r.mu.Unlock() - r.active = a - r.all = append(r.all, a.proxy) - r.allMap[a.instance.Info().Name()] = a.proxy -} + if evicted, err = r.evictLocal(sizeLimit); err != nil { + return nil, err + } -// trimAll removes the oldest fractions from the complete fractions list. -// Used when fractions are evicted or deleted from the system. -func (r *fractionRegistry) trimAll(count int) { - r.muAll.Lock() - defer r.muAll.Unlock() + r.rebuildSnapshot() - for _, f := range r.all[:count] { - delete(r.allMap, f.Info().Name()) - } - r.all = r.all[count:] - r.updateOldestTotal() + return evicted, nil } -// EvictLocal removes oldest local fractions to free disk space. -// If shouldOffload is true, moves fractions to offloading queue instead of deleting. +// EvictLocalForOffload removes oldest local fractions to moves it to offloading queue. // Returns evicted fractions or error if insufficient space is released. -func (r *fractionRegistry) EvictLocal(shouldOffload bool, sizeLimit uint64) ([]*sealedProxy, error) { +func (r *fractionRegistry) EvictLocalForOffload(sizeLimit uint64) ([]*refCountedSealed, error) { r.mu.Lock() defer r.mu.Unlock() - var ( - count int - releasingSize uint64 - ) + evicted, err := r.evictLocal(sizeLimit) + if err != nil { + return nil, err + } + + for _, sealed := range evicted { + r.offloading.Add(sealed.Info().Name(), sealed) + r.stats.offloading.Add(sealed.Info()) + } + + return evicted, nil +} + +func (r *fractionRegistry) evictLocal(sizeLimit uint64) ([]*refCountedSealed, error) { + var releasingSize uint64 // calculate total used disk space - totalUsedSize := r.stats.sealed.totalSizeOnDisk + - r.stats.sealing.totalSizeOnDisk + - r.active.instance.Info().FullSize() + totalUsedSize := r.stats.TotalSizeOnDiskLocal() + r.appender.Info().FullSize() + + evicted := []*refCountedSealed{} - // determine how many oldest fractions need to be removed to meet size limit - for _, item := range r.sealed { - if totalUsedSize-releasingSize <= sizeLimit { - break + for r.sealed.Len() > 0 && totalUsedSize-releasingSize > sizeLimit { + for _, s := range r.sealed.GetByPartition(r.sealed.MinPartition()) { + info := s.Info() + releasingSize += info.FullSize() + r.stats.sealed.Sub(info) + r.sealed.Del(info.Name()) + evicted = append(evicted, s) } - info := item.instance.Info() - releasingSize += info.FullSize() - r.stats.sealed.Sub(info) - count++ } // check if enough space will be freed @@ -275,28 +293,13 @@ func (r *fractionRegistry) EvictLocal(shouldOffload bool, sizeLimit uint64) ([]* (totalUsedSize-releasingSize)-sizeLimit, totalUsedSize, releasingSize, sizeLimit) } - // extract fractions to evict - evicted := r.sealed[:count] - r.sealed = r.sealed[count:] - - // either offload or completely remove the fractions - if shouldOffload { - for _, item := range evicted { - r.offloading = append(r.offloading, item) - r.stats.offloading.Add(item.instance.Info()) - } - } else { - r.trimAll(count) // permanently remove - r.updateOldestLocal() // oldest local can be changed here - } - return evicted, nil } // EvictRemote removes oldest remote fractions based on retention policy. // Fractions older than retention period are permanently deleted. // Returns removed fractions or empty slice if nothing to remove. -func (r *fractionRegistry) EvictRemote(retention time.Duration) []*remoteProxy { +func (r *fractionRegistry) EvictRemote(retention time.Duration) []*refCountedRemote { if retention == 0 { return nil } @@ -304,28 +307,24 @@ func (r *fractionRegistry) EvictRemote(retention time.Duration) []*remoteProxy { r.mu.Lock() defer r.mu.Unlock() - count := 0 - // find fractions older than retention period - for _, item := range r.remotes { - info := item.instance.Info() - if time.Since(time.UnixMilli(int64(info.CreationTime))) <= retention { - break // stop at first fraction within retention + evicted := []*refCountedRemote{} + for r.remotes.Len() > 0 && time.Since(time.UnixMilli(int64(r.remotes.MinPartition()))) > retention { + for _, remote := range r.remotes.GetByPartition(r.remotes.MinPartition()) { + info := remote.Info() + r.stats.remotes.Sub(info) + evicted = append(evicted, remote) + r.remotes.Del(info.Name()) } - r.stats.remotes.Sub(info) - count++ } - evicted := r.remotes[:count] - r.remotes = r.remotes[count:] - r.trimAll(count) // remove from complete list + r.rebuildSnapshot() return evicted } // EvictOverflowed removes oldest fractions from offloading queue when it exceeds size limit. -// Selects fractions that haven't finished offloading yet to minimize data loss. // Used when offloading queue grows too large due to slow remote storage performance. -func (r *fractionRegistry) EvictOverflowed(sizeLimit uint64) []*sealedProxy { +func (r *fractionRegistry) EvictOverflowed(sizeLimit uint64) (evicted []*refCountedSealed) { if sizeLimit == 0 { return nil } @@ -338,168 +337,80 @@ func (r *fractionRegistry) EvictOverflowed(sizeLimit uint64) []*sealedProxy { return nil } - count := 0 - evicted := []*sealedProxy{} +loop: // filter fractions - for _, item := range r.offloading { - // keep items that are within limits or already offloaded - if r.stats.offloading.totalSizeOnDisk <= sizeLimit || item.remote != nil { - r.offloading[count] = item - count++ - continue + for r.offloading.Len() > 0 { + for _, s := range r.offloading.GetByPartition(r.offloading.MinPartition()) { + evicted = append(evicted, s) + r.stats.offloading.Sub(s.Info()) + r.offloading.Del(s.Info().Name()) + if r.stats.offloading.totalSizeOnDisk <= sizeLimit { + break loop + } } - evicted = append(evicted, item) - r.stats.offloading.Sub(item.instance.Info()) } - r.offloading = r.offloading[:count] - r.rebuildAllFractions() + r.rebuildSnapshot() return evicted } // PromoteToSealed moves fractions from sealing to local queue when sealing completes. -// Maintains strict ordering - younger fractions wait for older ones to seal first. -func (r *fractionRegistry) PromoteToSealed(active *activeProxy, sealed *frac.Sealed) { +func (r *fractionRegistry) PromoteToSealed(active *refCountedActive, sealed *frac.Sealed) { r.mu.Lock() defer r.mu.Unlock() - active.sealed = sealed + r.sealed.Add(sealed.Info().Name(), &refCountedSealed{Sealed: sealed}) + r.stats.sealed.Add(sealed.Info()) + r.stats.sealing.Sub(active.Info()) - promotedCount := 0 - // process sealing queue in order, promoting completed fractions - for _, item := range r.sealing { - if item.sealed == nil { - break // maintain order - wait for previous fractions to complete - } - promotedCount++ - r.sealed = append(r.sealed, &sealedProxy{ - proxy: item.proxy, - instance: item.sealed, - }) - r.stats.sealed.Add(item.sealed.Info()) - r.stats.sealing.Sub(item.instance.Info()) - } + delete(r.sealing, active.Info().Name()) - // remove promoted fractions from sealing queue - r.sealing = r.sealing[promotedCount:] + r.rebuildSnapshot() } // PromoteToRemote moves fractions from offloading to remote queue when offloading completes. // Special case: handles fractions that don't require offloading (remote == nil). -// Maintains strict ordering - younger fractions wait for older ones to offload. -func (r *fractionRegistry) PromoteToRemote(sealed *sealedProxy, remote *frac.Remote) { +func (r *fractionRegistry) PromoteToRemote(sealed *refCountedSealed, remote *frac.Remote) { r.mu.Lock() defer r.mu.Unlock() - sealed.remote = remote - - // special case: remote == nil means fraction doesn't require offloading - if remote == nil { - r.removeFromOffloading(sealed) + if remote != nil { + r.remotes.Add(remote.Info().Name(), &refCountedRemote{Remote: remote}) + r.stats.remotes.Add(remote.Info()) } - promotedCount := 0 - // process offloading queue in order, promoting completed fractions - for _, item := range r.offloading { - if item.remote == nil { - break // maintain order - wait for previous fractions to complete - } - promotedCount++ - r.remotes = append(r.remotes, &remoteProxy{ - proxy: item.proxy, - instance: item.remote, - }) - - r.stats.remotes.Add(item.remote.Info()) - r.stats.offloading.Sub(item.instance.Info()) - } - if promotedCount > 0 { - // remove promoted fractions from offloading queue - r.offloading = r.offloading[promotedCount:] - r.updateOldestLocal() - } + r.stats.offloading.Sub(sealed.Info()) + r.offloading.Del(sealed.Info().Name()) + r.rebuildSnapshot() } -// removeFromOffloading removes a specific fraction from offloading queue. -// O(n) operation that rebuilds the all fractions list. -func (r *fractionRegistry) removeFromOffloading(sealed *sealedProxy) { - count := 0 - // filter out the target fraction - for _, item := range r.offloading { - if sealed != item { - r.offloading[count] = item - count++ - } - } - - if count == len(r.offloading) { // not found to remove (can be removed earlier in EvictOverflowed) - return - } - - r.offloading = r.offloading[:count] - r.stats.offloading.Sub(sealed.instance.Info()) - - // oldest local can be changed here - r.updateOldestLocal() - - // rebuild complete list since we modified the middle of the queue - r.rebuildAllFractions() -} +// rebuildSnapshot reconstructs the all fractions list +func (r *fractionRegistry) rebuildSnapshot() { + capacity := r.remotes.Len() + r.offloading.Len() + r.sealed.Len() + len(r.sealing) + 1 -// rebuildAllFractions reconstructs the all fractions list in correct chronological order. -// Order: remote (oldest) → offloading → sealed → sealing → active (newest) -// Expensive O(n) operation used when direct list modification is insufficient. -func (r *fractionRegistry) rebuildAllFractions() { - all := make([]frac.Fraction, 0, len(r.all)) - allMap := make(map[string]frac.Fraction, len(r.all)) + // allocate extra capacity to accommodate appender rotation that may occur during snapshot lifetime + all := newFractionsSnapshot(capacity + 1) - add := func(f frac.Fraction) { - all = append(all, f) - allMap[f.Info().Name()] = f + for r := range r.remotes.All() { + all.AddRemote(r) } - // collect fractions in correct chronological order: from oldest (remote) to newest (active) - for _, remote := range r.remotes { - add(remote.proxy) + for o := range r.offloading.All() { + all.AddSealed(o) } - for _, offloaded := range r.offloading { - add(offloaded.proxy) - } - for _, sealed := range r.sealed { - add(sealed.proxy) + + for s := range r.sealed.All() { + all.AddSealed(s) } - for _, active := range r.sealing { - add(active.proxy) + + for _, a := range r.sealing { + all.AddActive(a) } - add(r.active.proxy) + all.AddActive(r.appender) r.muAll.Lock() defer r.muAll.Unlock() - r.all = all - r.allMap = allMap - r.updateOldestTotal() -} - -// updateOldestTotal recalculates the creation time of the oldest fraction. -// Called after modifications of the complete fractions list. -func (r *fractionRegistry) updateOldestTotal() { - r.oldestTotal = r.all[0].Info().CreationTime -} - -// updateOldestLocal recalculates the creation time of the oldest local fraction. -// Priority order: offloading queue → sealed queue → sealing queue → active fraction. -// Called after modifications -func (r *fractionRegistry) updateOldestLocal() { - if len(r.offloading) > 0 { - r.oldestLocal = r.offloading[0].proxy.Info().CreationTime - } else if len(r.sealed) > 0 { - r.oldestLocal = r.sealed[0].proxy.Info().CreationTime - } else if len(r.sealing) > 0 { - r.oldestLocal = r.sealing[0].proxy.Info().CreationTime - } else { - r.oldestLocal = r.active.proxy.Info().CreationTime - } } diff --git a/fracmanager/fractions_snapshot.go b/fracmanager/fractions_snapshot.go new file mode 100644 index 00000000..9d561f63 --- /dev/null +++ b/fracmanager/fractions_snapshot.go @@ -0,0 +1,136 @@ +package fracmanager + +import ( + "math" + "sync" + + "github.com/ozontech/seq-db/frac" +) + +// RefCounter provides reference counting capability. +type RefCounter interface { + Inc() + Dec() +} + +// fractionsSnapshot represents a point-in-time view of multiple fractions +// with associated reference counters to keep them alive. +type fractionsSnapshot struct { + counters []RefCounter // Reference counters to keep fractions alive + fractions []frac.Fraction // The actual fractions in chronological order + names map[string]int + oldestLocal uint64 + oldestTotal uint64 +} + +func newFractionsSnapshot(capacity int) fractionsSnapshot { + return fractionsSnapshot{ + counters: make([]RefCounter, 0, capacity), + fractions: make([]frac.Fraction, 0, capacity), + names: make(map[string]int, capacity), + oldestLocal: math.MaxUint64, + oldestTotal: math.MaxUint64, + } +} + +func (fs *fractionsSnapshot) AddActive(a *syncAppender) { + fs.names[a.Info().Name()] = len(fs.fractions) + + fs.counters = append(fs.counters, a) + fs.fractions = append(fs.fractions, a) + + fs.oldestLocal = min(fs.oldestLocal, a.Info().CreationTime) + fs.oldestTotal = min(fs.oldestTotal, fs.oldestLocal) +} + +func (fs *fractionsSnapshot) AddSealed(s *refCountedSealed) { + fs.names[s.Info().Name()] = len(fs.fractions) + + fs.counters = append(fs.counters, s) + fs.fractions = append(fs.fractions, s) + + fs.oldestLocal = min(fs.oldestLocal, s.Info().CreationTime) + fs.oldestTotal = min(fs.oldestTotal, fs.oldestLocal) +} + +func (fs *fractionsSnapshot) AddRemote(r *refCountedRemote) { + fs.names[r.Info().Name()] = len(fs.fractions) + + fs.counters = append(fs.counters, r) + fs.fractions = append(fs.fractions, r) + + fs.oldestTotal = min(fs.oldestTotal, r.Info().CreationTime) +} + +// AcquireAll returns the fractions and a release function. +// Caller must call the release function when done to decrement reference counts. +func (fs *fractionsSnapshot) AcquireAll() ([]frac.Fraction, func()) { + for _, c := range fs.counters { + c.Inc() + } + + counters := fs.counters // make copy of counters + return fs.fractions, func() { + for _, c := range counters { + c.Dec() + } + } +} + +func (fs *fractionsSnapshot) AcquireOne(name string) (frac.Fraction, func(), bool) { + i, ok := fs.names[name] + if !ok { + return nil, func() {}, false + } + + c := fs.counters[i] + f := fs.fractions[i] + + c.Inc() + return f, c.Dec, true +} + +type refCounterWg struct { + wg sync.WaitGroup +} + +func (p *refCounterWg) Inc() { p.wg.Add(1) } + +func (p *refCounterWg) Dec() { p.wg.Done() } + +// refCountedActive wraps frac.Active with reference counting. +// Destroy releases the underlying Active after all references are gone. +type refCountedActive struct { + refCounterWg + *frac.Active +} + +// Destroy waits for all references to be released and then releases the Active. +func (p *refCountedActive) Destroy() { + p.wg.Wait() + p.Release() +} + +// refCountedSealed wraps frac.Sealed with reference counting. +type refCountedSealed struct { + refCounterWg + *frac.Sealed +} + +// Destroy waits for all references to be released and then destroys the Sealed. +func (p *refCountedSealed) Destroy() { + p.wg.Wait() + p.Suicide() +} + +// refCountedRemote wraps frac.Remote with reference counting. +type refCountedRemote struct { + refCounterWg + *frac.Remote +} + +// Destroy waits for all references to be released and then destroys the Remote. +func (p *refCountedRemote) Destroy() { + p.wg.Wait() + p.Suicide() +} diff --git a/fracmanager/lifecycle_manager.go b/fracmanager/lifecycle_manager.go index cd1c4bd3..24025c23 100644 --- a/fracmanager/lifecycle_manager.go +++ b/fracmanager/lifecycle_manager.go @@ -2,7 +2,6 @@ package fracmanager import ( "context" - "path/filepath" "sync" "time" @@ -10,7 +9,6 @@ import ( "github.com/ozontech/seq-db/frac" "github.com/ozontech/seq-db/logger" - "github.com/ozontech/seq-db/util" ) // lifecycleManager manages the complete lifecycle of fractions. @@ -23,7 +21,7 @@ type lifecycleManager struct { registry *fractionRegistry // fraction state registry tasks *TaskManager // Background offloading tasks - sealingWg sync.WaitGroup + sealingWg sync.WaitGroup // todo: get rid after removing SealAll in tests } func newLifecycleManager( @@ -67,41 +65,14 @@ func (lc *lifecycleManager) SyncInfoCache() { } } -// seal converts an active fraction to sealed state. -// It freezes writes, waits for pending operations, then seals the fraction. -func (lc *lifecycleManager) seal(active *activeProxy) error { - sealsTotal.Inc() - now := time.Now() - sealed, err := lc.provider.Seal(active.instance) - if err != nil { - return err - } - sealingTime := time.Since(now) - sealsDoneSeconds.Observe(sealingTime.Seconds()) - - logger.Info( - "fraction sealed", - zap.String("fraction", filepath.Base(sealed.BaseFileName)), - zap.Float64("time_spent_s", util.DurationToUnit(sealingTime, "s")), - ) - - lc.infoCache.Add(sealed.Info()) - lc.registry.PromoteToSealed(active, sealed) - active.proxy.Redirect(sealed) - active.instance.Release() - return nil -} - // rotate checks if active fraction needs rotation based on size limit. // Creates new active fraction and starts sealing the previous one. func (lc *lifecycleManager) rotate(maxSize uint64, wg *sync.WaitGroup) { - activeToSeal, waitBeforeSealing, err := lc.registry.RotateIfFull(maxSize, func() *activeProxy { - return newActiveProxy(lc.provider.CreateActive()) - }) + active, waitBeforeSealing, err := lc.registry.RotateIfFull(maxSize, lc.provider) if err != nil { logger.Fatal("active fraction rotation error", zap.Error(err)) } - if activeToSeal == nil { + if active == nil { return } @@ -112,37 +83,39 @@ func (lc *lifecycleManager) rotate(maxSize uint64, wg *sync.WaitGroup) { defer lc.sealingWg.Done() waitBeforeSealing() - if err := lc.seal(activeToSeal); err != nil { + sealed, err := lc.provider.Seal(active.Active) + if err != nil { logger.Fatal("sealing error", zap.Error(err)) } + + lc.infoCache.Add(sealed.Info()) + lc.registry.PromoteToSealed(active, sealed) + active.Destroy() }() } // offloadLocal starts offloading of local fractions to remote storage. // Selects fractions based on disk space usage and retention policy. func (lc *lifecycleManager) offloadLocal(ctx context.Context, sizeLimit uint64, retryDelay time.Duration, wg *sync.WaitGroup) { - toOffload, err := lc.registry.EvictLocal(true, sizeLimit) + toOffload, err := lc.registry.EvictLocalForOffload(sizeLimit) if err != nil { logger.Fatal("error releasing old fractions:", zap.Error(err)) } - for _, sealed := range toOffload { + for _, frac := range toOffload { wg.Add(1) - _, err := lc.tasks.Run(sealed.instance.BaseFileName, ctx, func(ctx context.Context) { + _, err := lc.tasks.Run(frac.BaseFileName, ctx, func(ctx context.Context) { defer wg.Done() - remote := lc.offloadWithRetry(ctx, sealed.instance, retryDelay) + remote := lc.offloadWithRetry(ctx, frac.Sealed, retryDelay) - lc.registry.PromoteToRemote(sealed, remote) + lc.registry.PromoteToRemote(frac, remote) if remote == nil { - sealed.proxy.Redirect(emptyFraction{}) - lc.infoCache.Remove(sealed.instance.Info().Name()) - } else { - sealed.proxy.Redirect(remote) + lc.infoCache.Remove(frac.Info().Name()) } // free up local resources - sealed.instance.Suicide() + frac.Destroy() maintenanceTruncateTotal.Add(1) }) if err != nil { @@ -209,20 +182,19 @@ func (lc *lifecycleManager) tryOffload(ctx context.Context, sealed *frac.Sealed) // cleanRemote deletes outdated remote fractions based on retention policy. func (lc *lifecycleManager) cleanRemote(retention time.Duration, wg *sync.WaitGroup) { toDelete := lc.registry.EvictRemote(retention) - wg.Add(1) - go func() { - defer wg.Done() - for _, remote := range toDelete { - remote.proxy.Redirect(emptyFraction{}) - lc.infoCache.Remove(remote.instance.Info().Name()) - remote.instance.Suicide() - } - }() + wg.Add(len(toDelete)) + for _, remote := range toDelete { + go func() { + defer wg.Done() + lc.infoCache.Remove(remote.Info().Name()) + remote.Destroy() + }() + } } // cleanLocal deletes outdated local fractions when offloading is disabled. func (lc *lifecycleManager) cleanLocal(sizeLimit uint64, wg *sync.WaitGroup) { - toDelete, err := lc.registry.EvictLocal(false, sizeLimit) + toDelete, err := lc.registry.EvictLocalForDelete(sizeLimit) if err != nil { logger.Fatal("error releasing old fractions:", zap.Error(err)) } @@ -232,16 +204,15 @@ func (lc *lifecycleManager) cleanLocal(sizeLimit uint64, wg *sync.WaitGroup) { } } - wg.Add(1) - go func() { - defer wg.Done() - for _, sealed := range toDelete { - sealed.proxy.Redirect(emptyFraction{}) - lc.infoCache.Remove(sealed.instance.Info().Name()) - sealed.instance.Suicide() + wg.Add(len(toDelete)) + for _, frac := range toDelete { + go func() { + defer wg.Done() + lc.infoCache.Remove(frac.Info().Name()) + frac.Destroy() maintenanceTruncateTotal.Add(1) - } - }() + }() + } } // updateOldestMetric updates the prometheus metric with oldest fraction timestamp. @@ -254,13 +225,13 @@ func (lc *lifecycleManager) updateOldestMetric() { // Stops ongoing offloading tasks and cleans up both local and remote resources. func (lc *lifecycleManager) removeOverflowed(sizeLimit uint64, wg *sync.WaitGroup) { evicted := lc.registry.EvictOverflowed(sizeLimit) - for _, item := range evicted { + for _, sealed := range evicted { wg.Add(1) go func() { defer wg.Done() // Cancel the offloading task - this operation may take significant time // hence executed in a separate goroutine to avoid blocking - lc.tasks.Cancel(item.instance.BaseFileName) + lc.tasks.Cancel(sealed.BaseFileName) }() } } diff --git a/fracmanager/lifecycle_manager_test.go b/fracmanager/lifecycle_manager_test.go index abd180e2..cb9ab1e0 100644 --- a/fracmanager/lifecycle_manager_test.go +++ b/fracmanager/lifecycle_manager_test.go @@ -1,14 +1,20 @@ package fracmanager import ( + "math" "math/rand" "path/filepath" "sync" "testing" + "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/ozontech/seq-db/consts" + "github.com/ozontech/seq-db/frac/processor" + "github.com/ozontech/seq-db/parser" + "github.com/ozontech/seq-db/seq" ) func setupLifecycle(t testing.TB, cfg *Config) (*lifecycleManager, func()) { @@ -31,21 +37,18 @@ func TestFracInfoCache(t *testing.T) { lc, tearDown := setupLifecycle(t, nil) defer tearDown() - var total uint64 - fillRotateAndCheck := func(names map[string]struct{}) { - active := lc.registry.Active() - appendDocsToActive(t, active.instance, 10+rand.Intn(10)) + appender := lc.registry.Appender() + appendDocsToActive(t, appender.Active, 10+rand.Intn(10)) wg := sync.WaitGroup{} lc.rotate(0, &wg) wg.Wait() - info := active.proxy.Info() + info := appender.Info() _, ok := lc.infoCache.Get(info.Name()) assert.True(t, ok) - total += info.FullSize() names[info.Name()] = struct{}{} } @@ -53,12 +56,13 @@ func TestFracInfoCache(t *testing.T) { for range 10 { fillRotateAndCheck(first) } - halfSize := total + halfSize := lc.registry.Stats().TotalSizeOnDiskLocal() second := map[string]struct{}{} for range 10 { fillRotateAndCheck(second) } + total := lc.registry.Stats().TotalSizeOnDiskLocal() wg := sync.WaitGroup{} lc.cleanLocal(total-halfSize, &wg) @@ -80,18 +84,14 @@ func TestCapacityExceeded(t *testing.T) { defer tearDown() const fracsCount = 10 - var total uint64 fillAndRotate := func() { - active := lc.registry.Active() - appendDocsToActive(t, active.instance, 10+rand.Intn(10)) + appender := lc.registry.Appender() + appendDocsToActive(t, appender.Active, 10+rand.Intn(10)) wg := sync.WaitGroup{} lc.rotate(0, &wg) wg.Wait() - - info := active.proxy.Info() - total += info.FullSize() } assert.False(t, lc.flags.IsCapacityExceeded(), "expect data dir is empty") @@ -102,6 +102,8 @@ func TestCapacityExceeded(t *testing.T) { } assert.False(t, lc.flags.IsCapacityExceeded(), "there should be no deletions and the flag is false") + total := lc.registry.Stats().TotalSizeOnDiskLocal() + wg := sync.WaitGroup{} lc.cleanLocal(total, &wg) wg.Wait() @@ -121,20 +123,15 @@ func TestOldestMetrics(t *testing.T) { defer tearDown() const fracsCount = 10 - var total uint64 - fillAndRotate := func() { - active := lc.registry.Active() - appendDocsToActive(t, active.instance, 10+rand.Intn(10)) + appender := lc.registry.Appender() + appendDocsToActive(t, appender.Active, 10+rand.Intn(10)) wg := sync.WaitGroup{} lc.rotate(0, &wg) wg.Wait() - - info := active.proxy.Info() - total += info.FullSize() } - firstFracTime := lc.registry.Active().proxy.Info().CreationTime + firstFracTime := lc.registry.Appender().Info().CreationTime for range fracsCount { fillAndRotate() } @@ -143,12 +140,15 @@ func TestOldestMetrics(t *testing.T) { assert.Equal(t, firstFracTime, lc.registry.OldestTotal(), "should point to the very first fraction when all data is local") assert.Equal(t, firstFracTime, lc.registry.OldestLocal(), "should point to the first fraction when nothing is offloaded") - halfSize := total - halfwayFracTime := lc.registry.Active().proxy.Info().CreationTime + halfSize := lc.registry.Stats().TotalSizeOnDiskLocal() + + halfwayFracTime := lc.registry.Appender().Info().CreationTime for range fracsCount { fillAndRotate() } + total := lc.registry.Stats().TotalSizeOnDiskLocal() + wg := sync.WaitGroup{} lc.offloadLocal(t.Context(), total-halfSize, 0, &wg) wg.Wait() @@ -158,3 +158,80 @@ func TestOldestMetrics(t *testing.T) { assert.Equal(t, firstFracTime, lc.registry.OldestTotal(), "should still reference the first fraction after offload") assert.Equal(t, halfwayFracTime, lc.registry.OldestLocal(), "should point to the oldest remaining local fraction after offload") } + +func TestPendingDestroy(t *testing.T) { + lc, tearDown := setupLifecycle(t, nil) + defer tearDown() + + const ( + fracsCount = 10 + docsPerFrac = 10 + ) + // appending docs to `fracsCount` fractions where the last is active and the rest are sealed + wg := sync.WaitGroup{} + for range fracsCount - 1 { + appendDocsToActive(t, lc.registry.Appender().Active, docsPerFrac) + lc.rotate(0, &wg) + } + appendDocsToActive(t, lc.registry.Appender().Active, docsPerFrac) + + // wait sealing complete + wg.Wait() + + // take all fracs to search + fractions1, release1 := lc.registry.AcquireAllFractions() + + // delete all sealing fracs + lc.cleanLocal(lc.registry.Appender().Info().FullSize(), &wg) + + var ( + beforeRelease time.Time + afterCleanup time.Time + ) + + cleanup := sync.WaitGroup{} + cleanup.Add(1) + go func() { + // cleanup is pending, so run it in a goroutine + // waiting for cleanup to finish + defer cleanup.Done() + wg.Wait() + afterCleanup = time.Now() + }() + + queryAst, err := parser.ParseSeqQL("*", seq.Mapping{}) + require.NoError(t, err, "failed to parse query") + params := processor.SearchParams{ + AST: queryAst.Root, + From: seq.MID(0), + To: seq.MID(math.MaxUint64), + Limit: math.MaxInt32, + } + + for _, f := range fractions1 { + qpr, err := f.Search(t.Context(), params) + assert.NoError(t, err, "failed to search") + assert.Equal(t, docsPerFrac, len(qpr.IDs)) + } + + beforeRelease = time.Now() + release1() + + cleanup.Wait() + assert.Less(t, beforeRelease, afterCleanup, "we expect cleanup to happen after release") + + fractions2, release2 := lc.registry.AcquireAllFractions() + + assert.Len(t, fractions2, 1, "only one active fraction should remain") + singleName := fractions2[0].Info().Name() + + for _, f := range fractions1 { + if f.Info().Name() == singleName { + continue + } + assert.Panics(t, func() { + _, _ = f.Search(t.Context(), params) + }, "searching by destroyed faction is expected to trigger a panic") + } + release2() +} diff --git a/fracmanager/partitioned_collection.go b/fracmanager/partitioned_collection.go new file mode 100644 index 00000000..7f37f045 --- /dev/null +++ b/fracmanager/partitioned_collection.go @@ -0,0 +1,117 @@ +package fracmanager + +import ( + "iter" + + "github.com/ozontech/seq-db/util" +) + +// PartitionedCollection manages a collection of objects grouped into partitions by a user‑defined value. +// Each partition is identified by a uint64. +type PartitionedCollection[T any] struct { + getPartition func(T) uint64 // function to extract partition ID from object + byKey map[string]T // primary index: key -> object + byPartition map[uint64]map[string]T // partition ID -> map[key]object + minPartition *util.MinHeap[uint64] // min‑heap of partition IDs for O(1) MinPartition +} + +// NewPartitionedCollection creates a new empty PartitionedCollection. +func NewPartitionedCollection[T any](getPartition func(T) uint64) PartitionedCollection[T] { + return PartitionedCollection[T]{ + getPartition: getPartition, + byKey: make(map[string]T), + byPartition: make(map[uint64]map[string]T), + minPartition: util.NewMinHeap[uint64](), + } +} + +// Add inserts a new object into the collection. +func (c *PartitionedCollection[T]) Add(key string, obj T) { + if _, ok := c.byKey[key]; ok { + return + } + + partitionID := c.getPartition(obj) + if _, ok := c.byPartition[partitionID]; !ok { + c.minPartition.Push(partitionID) + c.byPartition[partitionID] = make(map[string]T) + } + c.byPartition[partitionID][key] = obj + c.byKey[key] = obj +} + +// Delete removes an object from the collection by its key. +// Does nothing if the key doesn't exist. +func (c *PartitionedCollection[T]) Del(key string) { + obj, ok := c.byKey[key] + if !ok { + return + } + + partitionID := c.getPartition(obj) + delete(c.byPartition[partitionID], key) + if len(c.byPartition[partitionID]) == 0 { + c.minPartition.Remove(partitionID) + delete(c.byPartition, partitionID) + } + delete(c.byKey, key) +} + +// MinPartition returns the smallest partition ID among all stored objects. +// Returns 0 if the collection is empty. +func (c *PartitionedCollection[T]) MinPartition() uint64 { + if val, ok := c.minPartition.Min(); ok { + return val + } + return 0 +} + +// GetByPartition returns all objects in the specified partition. +func (c *PartitionedCollection[T]) GetByPartition(partitionID uint64) []T { + partitionMap, ok := c.byPartition[partitionID] + if !ok { + return nil + } + res := make([]T, 0, len(partitionMap)) + for _, obj := range partitionMap { + res = append(res, obj) + } + return res +} + +// Get retrieves an object by its key. +// Returns the object and true if found, zero value and false otherwise. +func (c *PartitionedCollection[T]) Get(key string) (T, bool) { + obj, ok := c.byKey[key] + return obj, ok +} + +// All returns all objects in the collection. +// The order is not guaranteed. +func (c *PartitionedCollection[T]) All() iter.Seq[T] { + return func(yield func(T) bool) { + for _, obj := range c.byKey { + if !yield(obj) { + return + } + } + } +} + +// Len returns the number of objects in the collection. +func (c *PartitionedCollection[T]) Len() int { + return len(c.byKey) +} + +// GetAllPartitions returns a map of all partitions in the collection. +func (c *PartitionedCollection[T]) GetAllPartitions() map[uint64][]T { + result := make(map[uint64][]T, len(c.byPartition)) + for partitionID, objects := range c.byPartition { + partition := make([]T, 0, len(objects)) + for _, obj := range objects { + partition = append(partition, obj) + } + result[partitionID] = partition + } + return result +} diff --git a/fracmanager/proxy_frac.go b/fracmanager/proxy_frac.go deleted file mode 100644 index 6d4df41f..00000000 --- a/fracmanager/proxy_frac.go +++ /dev/null @@ -1,201 +0,0 @@ -package fracmanager - -import ( - "context" - "errors" - "math" - "sync" - "time" - - "go.uber.org/zap" - - "github.com/ozontech/seq-db/frac" - "github.com/ozontech/seq-db/frac/common" - "github.com/ozontech/seq-db/frac/processor" - "github.com/ozontech/seq-db/logger" - "github.com/ozontech/seq-db/metric" - "github.com/ozontech/seq-db/seq" - "github.com/ozontech/seq-db/util" -) - -var ( - _ frac.Fraction = (*fractionProxy)(nil) - _ frac.Fraction = (*emptyFraction)(nil) - - ErrFractionNotWritable = errors.New("fraction is not writable") - ErrFractionSuspended = errors.New("write operations temporarily suspended - database capacity exceeded") -) - -// fractionProxy provides thread-safe access to a fraction with atomic replacement -// Used to switch fraction implementations (active → sealed → remote) without blocking readers. -// Lifecycle: Created for each fraction, persists through state transitions. -type fractionProxy struct { - mu sync.RWMutex - impl frac.Fraction // Current fraction implementation -} - -func (p *fractionProxy) Redirect(f frac.Fraction) { - p.mu.Lock() - defer p.mu.Unlock() - p.impl = f -} - -func (p *fractionProxy) Info() *common.Info { - p.mu.RLock() - defer p.mu.RUnlock() - return p.impl.Info() -} - -func (p *fractionProxy) IsIntersecting(from, to seq.MID) bool { - p.mu.RLock() - defer p.mu.RUnlock() - return p.impl.IsIntersecting(from, to) -} - -func (p *fractionProxy) Contains(mid seq.MID) bool { - p.mu.RLock() - defer p.mu.RUnlock() - return p.impl.Contains(mid) -} - -func (p *fractionProxy) Fetch(ctx context.Context, ids []seq.ID) ([][]byte, error) { - p.mu.RLock() - defer p.mu.RUnlock() - return p.impl.Fetch(ctx, ids) -} - -func (p *fractionProxy) Search(ctx context.Context, params processor.SearchParams) (*seq.QPR, error) { - p.mu.RLock() - defer p.mu.RUnlock() - return p.impl.Search(ctx, params) -} - -func (p *fractionProxy) FindLIDs(ctx context.Context, ids []seq.ID) ([]seq.LID, error) { - return p.impl.FindLIDs(ctx, ids) -} - -// activeProxy manages an active (writable) fraction -// Tracks pending write operations and provides freeze capability. -// Lifecycle: Created when fraction becomes active, destroyed after sealing. -type activeProxy struct { - proxy *fractionProxy // Thread-safe fraction access - instance *frac.Active // Actual active fraction instance - sealed *frac.Sealed // Sealed version (set after sealing) - - mu sync.RWMutex // Protects readonly state - wg sync.WaitGroup // Tracks pending write operations - - finalized bool // Whether fraction is frozen for writes - suspended bool // Temporarily suspended for writes -} - -func newActiveProxy(active *frac.Active) *activeProxy { - return &activeProxy{ - proxy: &fractionProxy{impl: active}, - instance: active, - } -} - -// Append adds documents to the active fraction -func (p *activeProxy) Append(docs, meta []byte) error { - p.mu.RLock() - if p.finalized { - p.mu.RUnlock() - return ErrFractionNotWritable - } - if p.suspended { - p.mu.RUnlock() - return ErrFractionSuspended - } - p.wg.Add(1) // Important: wg.Add() inside lock to prevent race with WaitWriteIdle() - p.mu.RUnlock() - - return p.instance.Append(docs, meta, &p.wg) -} - -// WaitWriteIdle waits for all pending write operations to complete -// Used before sealing to ensure data consistency. -func (p *activeProxy) WaitWriteIdle() { - start := time.Now() - logger.Info("waiting fraction to stop write...", zap.String("name", p.instance.BaseFileName)) - p.wg.Wait() - waitTime := util.DurationToUnit(time.Since(start), "s") - logger.Info("write is stopped", - zap.String("name", p.instance.BaseFileName), - zap.Float64("time_wait_s", waitTime)) -} - -func (p *activeProxy) Suspended() bool { - p.mu.Lock() - defer p.mu.Unlock() - - return p.suspended -} - -func (p *activeProxy) Suspend(value bool) { - p.mu.Lock() - p.suspended = value - p.mu.Unlock() -} - -// Finalize marks the fraction as read-only and prevents new writes from starting after finalize. -func (p *activeProxy) Finalize() error { - p.mu.Lock() - defer p.mu.Unlock() - - if p.finalized { - return errors.New("fraction is already finalized") - } - p.finalized = true - - return nil -} - -// sealedProxy represents a sealed fraction that may be offloaded -// Tracks both local sealed instance and remote version if offloaded. -type sealedProxy struct { - proxy *fractionProxy // Thread-safe fraction access - instance *frac.Sealed // Local sealed fraction - remote *frac.Remote // Remote version (if offloaded) -} - -// remoteProxy represents an offloaded fraction -type remoteProxy struct { - proxy *fractionProxy // Thread-safe fraction access - instance *frac.Remote // Remote fraction instance -} - -// emptyFraction represents a missing or deleted fraction -// Returns empty results for all operations. -// Used as placeholder when fraction is removed but references still exist. -type emptyFraction struct { -} - -func (emptyFraction) Info() *common.Info { - return &common.Info{ - Path: "empty", - From: math.MaxUint64, - To: 0, - } -} - -func (emptyFraction) IsIntersecting(_, _ seq.MID) bool { - return false -} - -func (emptyFraction) Contains(mid seq.MID) bool { - return false -} - -func (emptyFraction) Fetch(ctx context.Context, ids []seq.ID) ([][]byte, error) { - return nil, nil -} - -func (emptyFraction) Search(_ context.Context, params processor.SearchParams) (*seq.QPR, error) { - metric.CountersTotal.WithLabelValues("empty_data_provider").Inc() - return &seq.QPR{Aggs: make([]seq.AggregatableSamples, len(params.AggQ))}, nil -} - -func (emptyFraction) FindLIDs(_ context.Context, _ []seq.ID) ([]seq.LID, error) { - return nil, nil -} diff --git a/fracmanager/sync_appender.go b/fracmanager/sync_appender.go new file mode 100644 index 00000000..b8d93ab4 --- /dev/null +++ b/fracmanager/sync_appender.go @@ -0,0 +1,82 @@ +package fracmanager + +import ( + "errors" + "sync" + "time" + + "go.uber.org/zap" + + "github.com/ozontech/seq-db/logger" + "github.com/ozontech/seq-db/util" +) + +var ( + ErrFractionNotWritable = errors.New("fraction is not writable") + ErrFractionSuspended = errors.New("write operations temporarily suspended - database capacity exceeded") +) + +type syncAppender struct { + refCountedActive // Actual active fraction instance + + mu sync.RWMutex // Protects readonly state + wg sync.WaitGroup // Tracks pending write operations + + finalized bool // Whether fraction is frozen for writes + suspended bool // Temporarily suspended for writes +} + +// Append adds documents to the active fraction +func (a *syncAppender) Append(docs, meta []byte) error { + a.mu.RLock() + if a.finalized { + a.mu.RUnlock() + return ErrFractionNotWritable + } + if a.suspended { + a.mu.RUnlock() + return ErrFractionSuspended + } + a.wg.Add(1) // Important: wg.Add() inside lock to prevent race with WaitWriteIdle() + a.mu.RUnlock() + + return a.refCountedActive.Append(docs, meta, &a.wg) +} + +func (a *syncAppender) Suspended() bool { + a.mu.Lock() + defer a.mu.Unlock() + + return a.suspended +} + +func (a *syncAppender) Suspend(value bool) { + a.mu.Lock() + a.suspended = value + a.mu.Unlock() +} + +// WaitWriteIdle waits for all pending write operations to complete +// Used before sealing to ensure data consistency. +func (a *syncAppender) WaitWriteIdle() { + start := time.Now() + logger.Info("waiting fraction to stop write...", zap.String("name", a.BaseFileName)) + a.wg.Wait() + waitTime := util.DurationToUnit(time.Since(start), "s") + logger.Info("write is stopped", + zap.String("name", a.BaseFileName), + zap.Float64("time_wait_s", waitTime)) +} + +// Finalize marks the fraction as read-only and prevents new writes from starting after finalize. +func (a *syncAppender) Finalize() error { + a.mu.Lock() + if a.finalized { + a.mu.Unlock() + return errors.New("fraction is already finalized") + } + a.finalized = true + a.mu.Unlock() + + return nil +} diff --git a/skipmaskmanager/skip_mask_manager.go b/skipmaskmanager/skip_mask_manager.go index 79f6680d..40f48480 100644 --- a/skipmaskmanager/skip_mask_manager.go +++ b/skipmaskmanager/skip_mask_manager.go @@ -45,7 +45,7 @@ type MappingProvider interface { } type fractionAcquirer interface { - Fractions() fracmanager.List + AcquireFractions() (fracmanager.List, func()) AcquireFraction(name string) (_ frac.Fraction, release func(), ok bool) } @@ -142,7 +142,7 @@ func New( // - Begins asynchronous processing of all skip mask queries // // This method must be called before using the manager. -func (smm *SkipMaskManager) Start(fracs fractionAcquirer) { +func (smm *SkipMaskManager) Start(fracProvider fractionAcquirer) { smm.createDataDir() err := smm.loadSkipMasks() @@ -150,7 +150,10 @@ func (smm *SkipMaskManager) Start(fracs fractionAcquirer) { logger.Fatal("failed to load previous skip masks", zap.Error(err)) } - err = smm.buildQueue(fracs.Fractions()) + fracs, release := fracProvider.AcquireFractions() + defer release() + + err = smm.buildQueue(fracs) if err != nil { logger.Fatal("failed to build skip mask manager queue", zap.Error(err)) } @@ -171,7 +174,7 @@ func (smm *SkipMaskManager) Start(fracs fractionAcquirer) { } sm.ast = ast - smm.processSkipMask(sm, fracs) + smm.processSkipMask(sm, fracProvider) } }() } @@ -439,7 +442,7 @@ func (smm *SkipMaskManager) buildQueue(fracs fracmanager.List) error { // It processes each fraction with a .queue file, running search queries in parallel // (limited by the rate limiter). Each successful search writes results to a .skipmask // file. The skip mask status is set to Done when all fractions are processed. -func (smm *SkipMaskManager) processSkipMask(skipMask *SkipMask, fracs fractionAcquirer) { +func (smm *SkipMaskManager) processSkipMask(skipMask *SkipMask, fracProvider fractionAcquirer) { skipMaskDes, err := os.ReadDir(skipMask.dirPath) if err != nil { panic(fmt.Errorf("BUG: reading directory must be successful: %s", err)) @@ -457,7 +460,7 @@ func (smm *SkipMaskManager) processSkipMask(skipMask *SkipMask, fracs fractionAc defer skipMask.processWg.Done() defer func() { <-smm.rateLimit }() - f, release, ok := fracs.AcquireFraction(fracNameFromFilePath(name)) + f, release, ok := fracProvider.AcquireFraction(fracNameFromFilePath(name)) if !ok { // skip missing fracs return } diff --git a/storeapi/grpc_fetch.go b/storeapi/grpc_fetch.go index d640618c..9eb14147 100644 --- a/storeapi/grpc_fetch.go +++ b/storeapi/grpc_fetch.go @@ -68,7 +68,10 @@ func (g *GrpcV1) doFetch(ctx context.Context, req *storeapi.FetchRequest, stream dp := acquireDocFieldsFilter(req.FieldsFilter) defer releaseDocFieldsFilter(dp) - docsStream := newDocsStream(ctx, ids, g.fetchData.docFetcher, g.fracManager.Fractions()) + fracs, release := g.fracManager.AcquireFractions() + defer release() + + docsStream := newDocsStream(ctx, ids, g.fetchData.docFetcher, fracs) for _, id := range ids { workTime := time.Now() doc, err := docsStream.Next() diff --git a/storeapi/grpc_search.go b/storeapi/grpc_search.go index 9eb89e73..e5eedd98 100644 --- a/storeapi/grpc_search.go +++ b/storeapi/grpc_search.go @@ -189,18 +189,13 @@ func (g *GrpcV1) doSearch( } searchTr := tr.NewChild("search iteratively") - qpr, err := g.searchData.searcher.SearchDocs( - ctx, - g.fracManager.Fractions(), - searchParams, - tr, - ) + qpr, err := g.searchDocs(ctx, searchParams, tr) searchTr.Done() + if err != nil { if code, ok := parseStoreError(err); ok { return &storeapi.SearchResponse{Code: code}, nil } - return nil, err } @@ -229,6 +224,13 @@ func (g *GrpcV1) doSearch( return buildSearchResponse(qpr), nil } +func (g *GrpcV1) searchDocs(ctx context.Context, sp processor.SearchParams, tr *querytracer.Tracer) (*seq.QPR, error) { + fracs, release := g.fracManager.AcquireFractions() + defer release() + + return g.searchData.searcher.SearchDocs(ctx, fracs, sp, tr) +} + func (g *GrpcV1) parseQuery(query string) (*parser.ASTNode, error) { seqql, err := parser.ParseSeqQL(query, g.mappingProvider.GetMapping()) if err != nil { diff --git a/util/min_heap.go b/util/min_heap.go new file mode 100644 index 00000000..5ec79aee --- /dev/null +++ b/util/min_heap.go @@ -0,0 +1,117 @@ +package util + +import ( + "cmp" + "container/heap" +) + +// MinHeap is a min‑heap for any comparable type. +// Maintains both a heap structure and a map for fast lookup of items. +type MinHeap[T cmp.Ordered] struct { + items []*heapItem[T] // Heap elements + indexMap map[T]*heapItem[T] // Value → item mapping for O(1) lookup +} + +// heapItem represents an element in the heap. +type heapItem[T comparable] struct { + value T // Stored value + index int // Current index in the heap +} + +// NewMinHeap creates and initializes a new MinHeap instance. +func NewMinHeap[T cmp.Ordered]() *MinHeap[T] { + h := &MinHeap[T]{ + items: make([]*heapItem[T], 0), + indexMap: make(map[T]*heapItem[T]), + } + heap.Init((*heapWrapper[T])(h)) + return h +} + +// Push adds a value to the heap if it doesn't already exist (no duplicates). +func (h *MinHeap[T]) Push(value T) { + if _, ok := h.indexMap[value]; !ok { + item := &heapItem[T]{ + value: value, + index: -1, + } + h.indexMap[value] = item + heap.Push((*heapWrapper[T])(h), item) + } +} + +// Remove deletes one occurrence of the specified value from the heap. +// Does nothing if the value doesn't exist. +func (h *MinHeap[T]) Remove(value T) { + item, ok := h.indexMap[value] + if !ok { + return + } + heap.Remove((*heapWrapper[T])(h), item.index) + delete(h.indexMap, value) +} + +// PopMin removes and returns the minimum value from the heap. +// Returns (zero value, false) if the heap is empty. +func (h *MinHeap[T]) PopMin() (T, bool) { + var zero T + if len(h.items) == 0 { + return zero, false + } + item := h.items[0] + value := item.value + heap.Pop((*heapWrapper[T])(h)) + return value, true +} + +// Min returns the minimum value in the heap without removing it. +// Returns (zero value, false) if the heap is empty. +func (h *MinHeap[T]) Min() (T, bool) { + var zero T + if len(h.items) == 0 { + return zero, false + } + return h.items[0].value, true +} + +// Len returns the current number of elements in the heap. +func (h *MinHeap[T]) Len() int { + return len(h.items) +} + +// heapWrapper is a type alias for MinHeap to implement heap.Interface. +type heapWrapper[T cmp.Ordered] MinHeap[T] + +// Len is part of heap.Interface — returns the number of elements. +func (hw *heapWrapper[T]) Len() int { + return len(hw.items) +} + +// Less is part of heap.Interface — defines min‑heap order (smaller values first). +func (hw *heapWrapper[T]) Less(i, j int) bool { + return hw.items[i].value < hw.items[j].value +} + +// Swap is part of heap.Interface — swaps elements and updates their indices. +func (hw *heapWrapper[T]) Swap(i, j int) { + hw.items[i], hw.items[j] = hw.items[j], hw.items[i] + hw.items[i].index = i + hw.items[j].index = j +} + +// Push is part of heap.Interface — adds a new element to the heap. +func (hw *heapWrapper[T]) Push(x interface{}) { + item := x.(*heapItem[T]) + item.index = len(hw.items) + hw.items = append(hw.items, item) +} + +// Pop is part of heap.Interface — removes and returns the last element. +func (hw *heapWrapper[T]) Pop() interface{} { + old := hw.items + n := len(old) - 1 + item := old[n] + item.index = -1 + hw.items = old[0:n] + return item +} From e447d7f40a8192df5117219beb0e7f3dcfe1dd62 Mon Sep 17 00:00:00 2001 From: Evgenii Guguchkin Date: Wed, 20 May 2026 17:02:42 +0300 Subject: [PATCH 2/2] review fixes --- fracmanager/sync_appender.go | 6 ++++-- util/min_heap.go | 4 ++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/fracmanager/sync_appender.go b/fracmanager/sync_appender.go index b8d93ab4..76cf4ee0 100644 --- a/fracmanager/sync_appender.go +++ b/fracmanager/sync_appender.go @@ -63,9 +63,11 @@ func (a *syncAppender) WaitWriteIdle() { logger.Info("waiting fraction to stop write...", zap.String("name", a.BaseFileName)) a.wg.Wait() waitTime := util.DurationToUnit(time.Since(start), "s") - logger.Info("write is stopped", + logger.Info( + "write is stopped", zap.String("name", a.BaseFileName), - zap.Float64("time_wait_s", waitTime)) + zap.Float64("time_wait_s", waitTime), + ) } // Finalize marks the fraction as read-only and prevents new writes from starting after finalize. diff --git a/util/min_heap.go b/util/min_heap.go index 5ec79aee..70b92fcf 100644 --- a/util/min_heap.go +++ b/util/min_heap.go @@ -100,14 +100,14 @@ func (hw *heapWrapper[T]) Swap(i, j int) { } // Push is part of heap.Interface — adds a new element to the heap. -func (hw *heapWrapper[T]) Push(x interface{}) { +func (hw *heapWrapper[T]) Push(x any) { item := x.(*heapItem[T]) item.index = len(hw.items) hw.items = append(hw.items, item) } // Pop is part of heap.Interface — removes and returns the last element. -func (hw *heapWrapper[T]) Pop() interface{} { +func (hw *heapWrapper[T]) Pop() any { old := hw.items n := len(old) - 1 item := old[n]