Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 15 additions & 12 deletions asyncsearcher/async_searcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,11 @@ type AsyncSearcherConfig struct {
}

type fractionAcquirer interface {
Fractions() fracmanager.List
AcquireFractions() (_ fracmanager.List, release func())
AcquireFraction(name string) (_ frac.Fraction, release func(), ok bool)
}

func MustStartAsync(config AsyncSearcherConfig, mp MappingProvider, fracs fractionAcquirer) *AsyncSearcher {
func MustStartAsync(config AsyncSearcherConfig, mp MappingProvider, fracProvider fractionAcquirer) *AsyncSearcher {
if config.DataDir == "" {
logger.Fatal("can't start async searcher: DataDir is empty")
}
Expand Down Expand Up @@ -107,7 +107,7 @@ func MustStartAsync(config AsyncSearcherConfig, mp MappingProvider, fracs fracti
for _, id := range notProcessedIDs {
asyncSearchActiveSearches.Add(1)
as.processWg.Add(1)
go as.processRequest(id, fracs)
go as.processRequest(id, fracProvider)
}

// set limit metrics that allow us to calculate alerts' thresholds
Expand Down Expand Up @@ -209,7 +209,7 @@ func (i *asyncSearchInfo) Status() AsyncSearchStatus {
return status
}

func (as *AsyncSearcher) StartSearch(r AsyncSearchRequest, fracs fractionAcquirer) error {
func (as *AsyncSearcher) StartSearch(r AsyncSearchRequest, fracProvider fractionAcquirer) error {
if as.readOnly.Load() {
return fmt.Errorf("cannot start search on read-only mode")
}
Expand Down Expand Up @@ -240,14 +240,17 @@ func (as *AsyncSearcher) StartSearch(r AsyncSearchRequest, fracs fractionAcquire
return fmt.Errorf("retention time should be less than %s, got %s", maxRetention, r.Retention)
}

fracNames := fracs.Fractions().FilterInRange(r.Params.From, r.Params.To).Names()
fracs, release := fracProvider.AcquireFractions()
defer release()

fracNames := fracs.FilterInRange(r.Params.From, r.Params.To).Names()
if ok := as.saveSearchInfo(r, fracNames); !ok {
// Request was saved previously, skip it
return nil
}
asyncSearchActiveSearches.Add(1)
as.processWg.Add(1)
go as.processRequest(r.ID, fracs)
go as.processRequest(r.ID, fracProvider)
return nil
}

Expand Down Expand Up @@ -301,17 +304,17 @@ func (as *AsyncSearcher) createDataDir() {
})
}

func (as *AsyncSearcher) processRequest(asyncSearchID string, fracs fractionAcquirer) {
func (as *AsyncSearcher) processRequest(asyncSearchID string, fracProvider fractionAcquirer) {
defer as.processWg.Done()

as.rateLimit <- struct{}{}
defer func() { <-as.rateLimit }()

as.doSearch(asyncSearchID, fracs)
as.doSearch(asyncSearchID, fracProvider)
asyncSearchActiveSearches.Add(-1)
}

func (as *AsyncSearcher) doSearch(id string, fracs fractionAcquirer) {
func (as *AsyncSearcher) doSearch(id string, fracProvider fractionAcquirer) {
qprPaths, err := as.findQPRs(id)
if err != nil {
panic(fmt.Errorf("can't find QPRs for id %q: %s", id, err))
Expand Down Expand Up @@ -353,7 +356,7 @@ func (as *AsyncSearcher) doSearch(id string, fracs fractionAcquirer) {
if as.shouldStopSearch(id) {
break
}
if err := as.acquireAndProcessFrac(fracInfo, info, fracs); err != nil {
if err := as.acquireAndProcessFrac(fracInfo, info, fracProvider); err != nil {
as.updateSearchInfo(id, func(info *asyncSearchInfo) {
info.Error = err.Error()
})
Expand Down Expand Up @@ -395,8 +398,8 @@ func compressQPR(qpr *seq.QPR, cb func(compressed []byte) error) error {
return nil
}

func (as *AsyncSearcher) acquireAndProcessFrac(fracInfo fracSearchState, searchInfo asyncSearchInfo, fracs fractionAcquirer) (err error) {
f, release, ok := fracs.AcquireFraction(fracInfo.Name)
func (as *AsyncSearcher) acquireAndProcessFrac(fracInfo fracSearchState, searchInfo asyncSearchInfo, fracProvider fractionAcquirer) (err error) {
f, release, ok := fracProvider.AcquireFraction(fracInfo.Name)
if !ok { // oldest fracs may already be removed
logger.Info(
"async search: skip missing fraction",
Expand Down
4 changes: 2 additions & 2 deletions asyncsearcher/async_searcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ func (fp fakeFractionProvider) AcquireFraction(name string) (frac.Fraction, func
return nil, func() {}, false
}

func (fp fakeFractionProvider) Fractions() fracmanager.List {
return fracmanager.List(fp)
func (fp fakeFractionProvider) AcquireFractions() (fracmanager.List, func()) {
return fracmanager.List(fp), func() {}
}

func TestAsyncSearcherMaintain(t *testing.T) {
Expand Down
19 changes: 10 additions & 9 deletions fracmanager/fracmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,18 +76,19 @@ func New(ctx context.Context, cfg *Config, s3cli *s3.Client, skipMaskProvider sk
cancel()
wg.Wait()

// freeze active fraction to prevent new writes
active := lc.registry.Active()
if err := active.Finalize(); err != nil {
// finalize appender to prevent new writes
appender := lc.registry.Appender()
if err := appender.Finalize(); err != nil {
logger.Fatal("shutdown fraction freezing error", zap.Error(err))
}
active.WaitWriteIdle()
appender.WaitWriteIdle()

stopIdx()

lc.SyncInfoCache()

sealOnShutdown(active.instance, provider, cfg.MinSealFracSize)
// Seal active fraction
sealOnShutdown(appender.Active, provider, cfg.MinSealFracSize)

logger.Info("fracmanager's workers are stopped", zap.Int64("took_ms", time.Since(n).Milliseconds()))
}
Expand All @@ -96,11 +97,11 @@ func New(ctx context.Context, cfg *Config, s3cli *s3.Client, skipMaskProvider sk
}

func (fm *FracManager) AcquireFraction(name string) (frac.Fraction, func(), bool) {
return fm.lc.registry.AcquireFraction(name)
return fm.lc.registry.AcquireOneFraction(name)
}

func (fm *FracManager) Fractions() List {
return fm.lc.registry.AllFractions()
func (fm *FracManager) AcquireFractions() (List, func()) {
return fm.lc.registry.AcquireAllFractions()
}

func (fm *FracManager) Oldest() uint64 {
Expand All @@ -120,7 +121,7 @@ func (fm *FracManager) Append(ctx context.Context, docs storage.DocBlock, metas
return ctx.Err()
default:
// Try to append data to the currently active fraction
err := fm.lc.registry.Active().Append(docs, metas)
err := fm.lc.registry.Appender().Append(docs, metas)
if err != nil {
logger.Info("append fail", zap.Error(err))
if err == ErrFractionNotWritable {
Expand Down
2 changes: 1 addition & 1 deletion fracmanager/fracmanager_for_tests.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package fracmanager
import "sync"

func (fm *FracManager) WaitIdleForTests() {
fm.lc.registry.Active().WaitWriteIdle()
fm.lc.registry.Appender().WaitWriteIdle()
}

func (fm *FracManager) SealForcedForTests() {
Expand Down
24 changes: 13 additions & 11 deletions fracmanager/fracmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,30 +61,32 @@ func TestSealingOnShutdown(t *testing.T) {
cfg.MinSealFracSize = 0 // to ensure that the frac will not be sealed on shutdown
cfg, fm, stop := setupFracManager(t, cfg)
appendDocsToFracManager(t, fm, 10)
activeName := fm.Fractions()[0].Info().Name()

activeName := fm.lc.registry.all.fractions[0].Info().Name()

stop()

// second start
cfg.MinSealFracSize = 1 // to ensure that the frac will be sealed on shutdown
cfg, fm, stop = setupFracManager(t, cfg)

assert.Equal(t, 1, len(fm.Fractions()), "should have one fraction")
assert.Equal(t, activeName, fm.Fractions()[0].Info().Name(), "fraction should have the same name")
_, ok := fm.Fractions()[0].(*fractionProxy).impl.(*frac.Active)
allFractions := fm.lc.registry.all.fractions
assert.Equal(t, 1, len(allFractions), "should have one fraction")
assert.Equal(t, activeName, allFractions[0].Info().Name(), "fraction should have the same name")
_, ok := allFractions[0].(*syncAppender)
assert.True(t, ok, "fraction should be active")

stop()

// third start
_, fm, stop = setupFracManager(t, cfg)

assert.Equal(t, 2, len(fm.Fractions()), "should have 2 fraction: new active and old sealed")
_, ok = fm.Fractions()[0].(*fractionProxy).impl.(*frac.Sealed)
allFractions = fm.lc.registry.all.fractions
assert.Equal(t, 2, len(allFractions), "should have 2 fraction: new active and old sealed")
_, ok = allFractions[0].(*refCountedSealed)
assert.True(t, ok, "first fraction should be sealed")
assert.Equal(t, activeName, fm.Fractions()[0].Info().Name(), "sealed fraction should have the same name")
assert.Equal(t, uint32(0), fm.Fractions()[1].Info().DocsTotal, "active fraction should be empty")
_, ok = fm.Fractions()[1].(*fractionProxy).impl.(*frac.Active)
assert.Equal(t, activeName, allFractions[0].Info().Name(), "sealed fraction should have the same name")
assert.Equal(t, uint32(0), allFractions[1].Info().DocsTotal, "active fraction should be empty")
_, ok = allFractions[1].(*syncAppender)
assert.True(t, ok, "new fraction should be active")

stop()
}
4 changes: 4 additions & 0 deletions fracmanager/fracs_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,7 @@ func (s *registryStats) SetMetrics() {
s.offloading.SetMetrics(dataSizeTotal, "offloading")
s.remotes.SetMetrics(dataSizeTotal, "remotes")
}

func (s registryStats) TotalSizeOnDiskLocal() uint64 {
return s.sealing.totalSizeOnDisk + s.sealed.totalSizeOnDisk
Comment thread
dkharms marked this conversation as resolved.
}
26 changes: 21 additions & 5 deletions fracmanager/fraction_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,17 @@ import (
"time"

"github.com/oklog/ulid/v2"
"go.uber.org/zap"

"github.com/ozontech/seq-db/frac"
"github.com/ozontech/seq-db/frac/common"
"github.com/ozontech/seq-db/frac/sealed"
"github.com/ozontech/seq-db/frac/sealed/sealing"
"github.com/ozontech/seq-db/logger"
"github.com/ozontech/seq-db/node"
"github.com/ozontech/seq-db/storage"
"github.com/ozontech/seq-db/storage/s3"
"github.com/ozontech/seq-db/util"
)

const fileBasePattern = "seq-db-"
Expand Down Expand Up @@ -123,8 +126,11 @@ func (fp *fractionProvider) CreateActive() *frac.Active {

// Seal converts an active fraction to a sealed one
// Process includes sorting, indexing, and data optimization for reading
func (fp *fractionProvider) Seal(active *frac.Active) (*frac.Sealed, error) {
src, err := frac.NewActiveSealingSource(active, fp.config.SealParams)
func (fp *fractionProvider) Seal(a *frac.Active) (*frac.Sealed, error) {
sealsTotal.Inc()
now := time.Now()

src, err := frac.NewActiveSealingSource(a, fp.config.SealParams)
if err != nil {
return nil, err
}
Expand All @@ -133,9 +139,19 @@ func (fp *fractionProvider) Seal(active *frac.Active) (*frac.Sealed, error) {
return nil, err
}

sealedFrac := fp.NewSealedPreloaded(active.BaseFileName, preloaded)
fp.skipMaskProvider.RefreshFrac(sealedFrac)
return sealedFrac, nil
s := fp.NewSealedPreloaded(a.BaseFileName, preloaded)
fp.skipMaskProvider.RefreshFrac(s)

sealingTime := time.Since(now)
sealsDoneSeconds.Observe(sealingTime.Seconds())

logger.Info(
"fraction sealed",
zap.String("fraction", filepath.Base(s.BaseFileName)),
zap.Float64("time_spent_s", util.DurationToUnit(sealingTime, "s")),
)

return s, nil
}

// Offload uploads fraction to S3 storage and returns a remote fraction
Expand Down
Loading
Loading