Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ require (
github.com/alecthomas/kingpin v2.2.6+incompatible
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137
github.com/alicebob/miniredis/v2 v2.35.0
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310
github.com/bitly/go-simplejson v0.5.1
github.com/bmatcuk/doublestar/v4 v4.8.1
github.com/bufbuild/protocompile v0.13.0
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ github.com/alicebob/miniredis/v2 v2.35.0 h1:QwLphYqCEAo1eu1TqPRN2jgVMPBweeQcR21j
github.com/alicebob/miniredis/v2 v2.35.0/go.mod h1:TcL7YfarKPGDAthEtl5NBeHZfeUQj6OXMm/+iu5cLMM=
github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs=
github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310 h1:BUAU3CGlLvorLI26FmByPp2eC2qla6E1Tw+scpcg/to=
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bitly/go-simplejson v0.5.1 h1:xgwPbetQScXt1gh9BmoJ6j9JMr3TElvuIyjR8pgdoow=
Expand Down
135 changes: 106 additions & 29 deletions plugin/action/cardinality/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,66 +4,143 @@ import (
"sync"
"time"

radix "github.com/armon/go-radix"
"github.com/ozontech/file.d/xtime"
)

// bucket holds keys for a single prefix with fast counting.
type bucket struct {
keys map[string]int64
minTs int64 // oldest key timestamp; 0 if empty
}

type Cache struct {
mu *sync.RWMutex
tree *radix.Tree
tree map[string]*bucket
ttl int64
}

func NewCache(ttl time.Duration) *Cache {
return &Cache{
tree: radix.New(),
tree: make(map[string]*bucket),
ttl: ttl.Nanoseconds(),
mu: &sync.RWMutex{},
}
}

func (c *Cache) Set(key string) bool {
// Set stores a full key under the given prefix bucket.
// It returns true if the key already existed, false otherwise.
func (c *Cache) Set(prefix, key string) bool {
c.mu.Lock()
defer c.mu.Unlock()

_, result := c.tree.Insert(key, xtime.GetInaccurateUnixNano())
return result
b, ok := c.tree[prefix]
if !ok {
b = &bucket{keys: make(map[string]int64)}
c.tree[prefix] = b
}

ts := xtime.GetInaccurateUnixNano()
if _, exists := b.keys[key]; exists {
b.keys[key] = ts
return true
}

b.keys[key] = ts
if b.minTs == 0 || ts < b.minTs {
b.minTs = ts
}
return false
}

func (c *Cache) isExpire(now, value int64) bool {
diff := now - value
return diff > c.ttl
// CountPrefix returns the number of non-expired keys under the prefix.
// Expired keys are cleaned synchronously on the first call that detects them.
func (c *Cache) CountPrefix(prefix string) int {
threshold := xtime.GetInaccurateUnixNano() - c.ttl

c.mu.RLock()
b := c.tree[prefix]
if b == nil {
c.mu.RUnlock()
return 0
}

// Oldest key hasn't expired → no keys expired. Return count in O(1).
if b.minTs >= threshold {
count := len(b.keys)
c.mu.RUnlock()
return count
}

// Some keys may have expired — upgrade to write lock and scan.
c.mu.RUnlock()

c.mu.Lock()
b = c.tree[prefix]
if b == nil {
c.mu.Unlock()
return 0
}

// Recompute threshold and re-check under write lock.
threshold = xtime.GetInaccurateUnixNano() - c.ttl
if b.minTs >= threshold {
count := len(b.keys)
c.mu.Unlock()
return count
}

count := 0
newMinTs := int64(0)
for key, ts := range b.keys {
if ts >= threshold {
count++
if newMinTs == 0 || ts < newMinTs {
newMinTs = ts
}
} else {
delete(b.keys, key)
}
}
b.minTs = newMinTs
if len(b.keys) == 0 {
delete(c.tree, prefix)
}
c.mu.Unlock()

return count
}

func (c *Cache) delete(keysToDelete ...string) {
func (c *Cache) delete(prefix string, keysToDelete ...string) {
if len(keysToDelete) == 0 {
return
}
c.mu.Lock()
defer c.mu.Unlock()

for _, key := range keysToDelete {
c.tree.Delete(key)
b := c.tree[prefix]
if b == nil {
return
}
}

func (c *Cache) CountPrefix(prefix string) (count int) {
var keysToDelete []string
now := xtime.GetInaccurateUnixNano()
c.mu.RLock()
c.tree.WalkPrefix(prefix, func(s string, v any) bool {
timeValue := v.(int64)
if c.isExpire(now, timeValue) {
keysToDelete = append(keysToDelete, s)
} else {
count++
minTsDeleted := false
for _, key := range keysToDelete {
if ts, ok := b.keys[key]; ok {
if ts == b.minTs {
minTsDeleted = true
}
delete(b.keys, key)
}
return false
})
c.mu.RUnlock()
}

if len(keysToDelete) > 0 {
go c.delete(keysToDelete...)
if len(b.keys) == 0 {
delete(c.tree, prefix)
b.minTs = 0
} else if minTsDeleted {
b.minTs = 0
for _, ts := range b.keys {
if b.minTs == 0 || ts < b.minTs {
b.minTs = ts
}
}
}
return
}
89 changes: 89 additions & 0 deletions plugin/action/cardinality/cache_bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package cardinality

import (
"fmt"
"math/rand"
"testing"
"time"
)

func benchSetup(b *testing.B, suffixCount int) (*Cache, string) {
seed := uint64(b.N)
rng := rand.New(rand.NewSource(int64(seed)))

b.Helper()
cache := NewCache(time.Hour)
prefix := makeRandStr(rng, 16)

for i := 0; i < suffixCount; i++ {
key := fmt.Sprintf("%s-%s", prefix, makeRandStr(rng, 16))
cache.Set(prefix, key)
}

return cache, prefix
}

func makeRandStr(rng *rand.Rand, n int) string {
const letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
b := make([]byte, n)
for i := range b {
b[i] = letters[rng.Intn(len(letters))]
}
return string(b)
}

func BenchmarkCountPrefix_100(b *testing.B) {
cache, prefix := benchSetup(b, 100)
b.ResetTimer()
for i := 0; i < b.N; i++ {
cache.CountPrefix(prefix)
}
}

func BenchmarkCountPrefix_1k(b *testing.B) {
cache, prefix := benchSetup(b, 1000)
b.ResetTimer()
for i := 0; i < b.N; i++ {
cache.CountPrefix(prefix)
}
}

func BenchmarkCountPrefix_10k(b *testing.B) {
cache, prefix := benchSetup(b, 10000)
b.ResetTimer()
for i := 0; i < b.N; i++ {
cache.CountPrefix(prefix)
}
}

func BenchmarkCountPrefix_100k(b *testing.B) {
cache, prefix := benchSetup(b, 100000)
b.ResetTimer()
for i := 0; i < b.N; i++ {
cache.CountPrefix(prefix)
}
}

func BenchmarkSet_100(b *testing.B) {
cache, prefix := benchSetup(b, 100)
b.ResetTimer()
for i := 0; i < b.N; i++ {
cache.Set(prefix, "new-"+fmt.Sprint(i))
}
}

func BenchmarkSet_1k(b *testing.B) {
cache, prefix := benchSetup(b, 1000)
b.ResetTimer()
for i := 0; i < b.N; i++ {
cache.Set(prefix, "new-"+fmt.Sprint(i))
}
}

func BenchmarkSet_10k(b *testing.B) {
cache, prefix := benchSetup(b, 10000)
b.ResetTimer()
for i := 0; i < b.N; i++ {
cache.Set(prefix, "new-"+fmt.Sprint(i))
}
}
Loading
Loading