Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 43 additions & 22 deletions tsdb/index/tsi1/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"unsafe"

Expand Down Expand Up @@ -62,7 +63,7 @@ type Partition struct {
// Fieldset shared with engine.
fieldset *tsdb.MeasurementFieldSet

currentCompactionN int // counter of in-progress compactions
currentCompactionN atomic.Int32 // counter of in-progress compactions

// Directory of the Partition's index files.
path string
Expand Down Expand Up @@ -348,22 +349,38 @@ func (p *Partition) buildSeriesSet() error {
}

// CurrentCompactionN returns the number of compactions currently running.
func (p *Partition) CurrentCompactionN() int {
p.mu.RLock()
defer p.mu.RUnlock()
return p.currentCompactionN
func (p *Partition) CurrentCompactionN() int32 {
return p.currentCompactionN.Load()
}

// Wait will block until all compactions are finished.
// Must only be called while they are disabled.
func (p *Partition) Wait() {
ticker := time.NewTicker(10 * time.Millisecond)
defer ticker.Stop()

// Debug level timeout
timeoutDuration := 24 * time.Hour
startTime := time.Now()

for {
if p.CurrentCompactionN() == 0 {
return
}
<-ticker.C
select {
case <-ticker.C:
elapsed := time.Since(startTime)
if elapsed >= timeoutDuration {
files := make([]string, 0)
for _, v := range p.fileSet.Files() {
files = append(files, v.Path())
}
p.logger.Warn("Partition.Wait() timed out waiting for compactions to complete",
zap.Int32("stuck_compactions", p.CurrentCompactionN()), zap.Duration("timeout", timeoutDuration),
zap.Strings("files", files))
startTime = time.Now()
}
}
}
}

Expand Down Expand Up @@ -1040,14 +1057,17 @@ func (p *Partition) compact() {
}
// Mark the level as compacting.
p.levelCompacting[0] = true
p.currentCompactionN++
p.currentCompactionN.Add(1)
go func() {
defer func() {
p.mu.Lock()
p.currentCompactionN.Add(-1)
p.levelCompacting[0] = false
p.mu.Unlock()
p.Compact()
}()

p.compactLogFile(logFile)
p.mu.Lock()
p.currentCompactionN--
p.levelCompacting[0] = false
p.mu.Unlock()
p.Compact()
}()
}
}
Expand Down Expand Up @@ -1079,20 +1099,21 @@ func (p *Partition) compact() {
// Execute in closure to save reference to the group within the loop.
func(files []*IndexFile, level int) {
// Start compacting in a separate goroutine.
p.currentCompactionN++
p.currentCompactionN.Add(1)
go func() {
defer func() {
// Ensure compaction lock for the level is released.
p.mu.Lock()
p.levelCompacting[level] = false
p.currentCompactionN.Add(-1)
p.mu.Unlock()

// Check for new compactions
p.Compact()
}()

// Compact to a new level.
p.compactToLevel(files, level+1, interrupt)

// Ensure compaction lock for the level is released.
p.mu.Lock()
p.levelCompacting[level] = false
p.currentCompactionN--
p.mu.Unlock()

// Check for new compactions
p.Compact()
}()
}(files, level)
}
Expand Down