From 87fdb801c0bc4e9676a98f15510cfb68c1df1e64 Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Wed, 20 May 2026 20:14:56 +0700 Subject: [PATCH 1/7] add shared cache for cardinality action --- plugin/action/cardinality/cache_test.go | 24 ++++++++++++++++++++++++ plugin/action/cardinality/cardinality.go | 11 ++++++++++- 2 files changed, 34 insertions(+), 1 deletion(-) diff --git a/plugin/action/cardinality/cache_test.go b/plugin/action/cardinality/cache_test.go index e550b0ee6..5c26e723c 100644 --- a/plugin/action/cardinality/cache_test.go +++ b/plugin/action/cardinality/cache_test.go @@ -1,6 +1,8 @@ package cardinality import ( + "fmt" + "math/rand" "sync" "testing" "time" @@ -179,6 +181,28 @@ func TestConcurrentOperations(t *testing.T) { wg.Wait() } +func TestCountPrefixWith10kElements(t *testing.T) { + cache := NewCache(time.Minute) + n := 10000 + prefix := randString(64) + for i := 0; i < n; i++ { + key := fmt.Sprintf("%s%s", prefix, randString(48)) + cache.Set(key) + cache.Set(key) + assert.Equal(t, i+1, cache.CountPrefix(prefix)) + } + +} + +func randString(n int) string { + const letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" + b := make([]byte, n) + for i := range b { + b[i] = letters[rand.Intn(len(letters))] + } + return string(b) +} + func TestTTL(t *testing.T) { cache := NewCache(100 * time.Millisecond) diff --git a/plugin/action/cardinality/cardinality.go b/plugin/action/cardinality/cardinality.go index cb1e5d19f..6aa9381a7 100644 --- a/plugin/action/cardinality/cardinality.go +++ b/plugin/action/cardinality/cardinality.go @@ -3,6 +3,7 @@ package cardinality import ( "fmt" "strings" + "sync" "time" "github.com/ozontech/file.d/cfg" @@ -73,6 +74,8 @@ The resulting events: ``` }*/ +var sharedCaches sync.Map + type Plugin struct { cache *Cache config *Config @@ -179,11 +182,17 @@ func factory() (pipeline.AnyPlugin, pipeline.AnyConfig) { return &Plugin{}, &Config{} } +func getSharedCache(ttl time.Duration, pipelineName string, index int) *Cache { + key := pipelineName + "/" + fmt.Sprint(index) + actual, _ := sharedCaches.LoadOrStore(key, NewCache(ttl)) + return actual.(*Cache) +} + func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.ActionPluginParams) { p.config = config.(*Config) p.logger = params.Logger.Desugar() - p.cache = NewCache(p.config.TTL_) + p.cache = getSharedCache(p.config.TTL_, params.PipelineName, params.Index) if len(p.config.Fields) == 0 { p.logger.Fatal("you have to set fields") From be2c7db86497aa1e16cab284688c6c0ad89060cd Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Tue, 26 May 2026 09:09:59 +0700 Subject: [PATCH 2/7] add metric cardinality_unique_values_count_total --- plugin/action/cardinality/cache_test.go | 1 - plugin/action/cardinality/cardinality.go | 16 +++++++++++++++- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/plugin/action/cardinality/cache_test.go b/plugin/action/cardinality/cache_test.go index 5c26e723c..4ca0d3715 100644 --- a/plugin/action/cardinality/cache_test.go +++ b/plugin/action/cardinality/cache_test.go @@ -191,7 +191,6 @@ func TestCountPrefixWith10kElements(t *testing.T) { cache.Set(key) assert.Equal(t, i+1, cache.CountPrefix(prefix)) } - } func randString(n int) string { diff --git a/plugin/action/cardinality/cardinality.go b/plugin/action/cardinality/cardinality.go index 6aa9381a7..4ba64f1df 100644 --- a/plugin/action/cardinality/cardinality.go +++ b/plugin/action/cardinality/cardinality.go @@ -86,6 +86,7 @@ type Plugin struct { cardinalityUniqueValuesLimit *metric.Gauge cardinalityUniqueValuesGauge *metric.GaugeVec + cardinalityUniqueValuesTotal *metric.CounterVec } type parsedField struct { @@ -231,7 +232,19 @@ func (p *Plugin) registerMetrics(ctl *metric.Ctl, prefix string) { } p.cardinalityUniqueValuesGauge = ctl.RegisterGaugeVec( metricName, - "Count of unique values", + "Current count of unique values observed per key group", + keyMetricLabels(p.keys)..., + ) + + var metricTotalName string + if prefix == "" { + metricTotalName = "cardinality_unique_values_count_total" + } else { + metricTotalName = fmt.Sprintf(`cardinality_%s_unique_values_count_total`, prefix) + } + p.cardinalityUniqueValuesTotal = ctl.RegisterCounterVec( + metricTotalName, + "Cumulative number of newly seen unique values per key group since process start", keyMetricLabels(p.keys)..., ) @@ -300,6 +313,7 @@ func (p *Plugin) Do(event *pipeline.Event) pipeline.ActionResult { // is new value keysCount++ p.cardinalityUniqueValuesGauge.WithLabelValues(p.keys.valsBuf...).Set(float64(keysCount)) + p.cardinalityUniqueValuesTotal.WithLabelValues(p.keys.valsBuf...).Inc() } return pipeline.ActionPass From 97abcd54be46ae3124d36704435b36d1a5e53829 Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Mon, 8 Jun 2026 19:46:31 +0700 Subject: [PATCH 3/7] optimize cardinality cache --- plugin/action/cardinality/cache.go | 88 +++++++++----- plugin/action/cardinality/cache_test.go | 108 +++++++++--------- plugin/action/cardinality/cardinality.go | 2 +- plugin/action/cardinality/cardinality_test.go | 2 +- 4 files changed, 116 insertions(+), 84 deletions(-) diff --git a/plugin/action/cardinality/cache.go b/plugin/action/cardinality/cache.go index 956a9ccc5..6931b26ee 100644 --- a/plugin/action/cardinality/cache.go +++ b/plugin/action/cardinality/cache.go @@ -4,66 +4,96 @@ import ( "sync" "time" - radix "github.com/armon/go-radix" "github.com/ozontech/file.d/xtime" ) +// entry holds a timestamp for a single cached key. +type entry struct { + ts int64 +} + type Cache struct { mu *sync.RWMutex - tree *radix.Tree + tree map[string]*map[string]*entry // prefix -> (full key -> timestamp) ttl int64 } func NewCache(ttl time.Duration) *Cache { return &Cache{ - tree: radix.New(), + tree: make(map[string]*map[string]*entry), ttl: ttl.Nanoseconds(), mu: &sync.RWMutex{}, } } -func (c *Cache) Set(key string) bool { +// Set stores a full key under the given prefix bucket. +// It returns true if the key already existed, false otherwise. +func (c *Cache) Set(prefix, key string) bool { c.mu.Lock() defer c.mu.Unlock() - _, result := c.tree.Insert(key, xtime.GetInaccurateUnixNano()) - return result -} - -func (c *Cache) isExpire(now, value int64) bool { - diff := now - value - return diff > c.ttl -} - -func (c *Cache) delete(keysToDelete ...string) { - if len(keysToDelete) == 0 { - return + bucket, ok := c.tree[prefix] + if !ok { + bucket = &map[string]*entry{} + c.tree[prefix] = bucket } - c.mu.Lock() - defer c.mu.Unlock() - for _, key := range keysToDelete { - c.tree.Delete(key) + e, exists := (*bucket)[key] + if exists { + e.ts = xtime.GetInaccurateUnixNano() + return true } + + (*bucket)[key] = &entry{ts: xtime.GetInaccurateUnixNano()} + return false } +func (c *Cache) isExpire(now, value int64) bool { + return now-value > c.ttl +} + +// CountPrefix returns the number of non-expired keys under the prefix. +// Expired keys are scheduled for async deletion. func (c *Cache) CountPrefix(prefix string) (count int) { var keysToDelete []string now := xtime.GetInaccurateUnixNano() + c.mu.RLock() - c.tree.WalkPrefix(prefix, func(s string, v any) bool { - timeValue := v.(int64) - if c.isExpire(now, timeValue) { - keysToDelete = append(keysToDelete, s) - } else { - count++ + bucket := c.tree[prefix] + if bucket != nil { + for key, e := range *bucket { + if c.isExpire(now, e.ts) { + keysToDelete = append(keysToDelete, key) + } else { + count++ + } } - return false - }) + } c.mu.RUnlock() if len(keysToDelete) > 0 { - go c.delete(keysToDelete...) + go c.delete(prefix, keysToDelete...) } return } + +func (c *Cache) delete(prefix string, keysToDelete ...string) { + if len(keysToDelete) == 0 { + return + } + c.mu.Lock() + defer c.mu.Unlock() + + bucket := c.tree[prefix] + if bucket == nil { + return + } + + for _, key := range keysToDelete { + delete(*bucket, key) + } + + if len(*bucket) == 0 { + delete(c.tree, prefix) + } +} diff --git a/plugin/action/cardinality/cache_test.go b/plugin/action/cardinality/cache_test.go index 4ca0d3715..155668928 100644 --- a/plugin/action/cardinality/cache_test.go +++ b/plugin/action/cardinality/cache_test.go @@ -19,17 +19,17 @@ func TestSetAndExists(t *testing.T) { cache := NewCache(time.Minute) t.Run("basic set and get", func(t *testing.T) { - key := "test-key" - assert.False(t, cache.Set(key)) + prefix, key := "prefix", "prefix-test-key" + assert.False(t, cache.Set(prefix, key)) - found := cacheKeyIsExists(cache, key) + found := cacheKeyIsExists(cache, prefix, key) assert.True(t, found) - assert.True(t, cache.Set(key)) + assert.True(t, cache.Set(prefix, key)) }) t.Run("non-existent key", func(t *testing.T) { - found := cacheKeyIsExists(cache, "non-existent") + found := cacheKeyIsExists(cache, "prefix", "non-existent") assert.False(t, found) }) } @@ -38,41 +38,43 @@ func TestDelete(t *testing.T) { cache := NewCache(time.Minute) t.Run("delete existing key", func(t *testing.T) { - key := "to-delete-1" - cache.Set(key) + prefix, key := "prefix", "prefix-to-delete-1" + cache.Set(prefix, key) - cache.delete(key) + cache.delete(prefix, key) - found := cacheKeyIsExists(cache, key) + found := cacheKeyIsExists(cache, prefix, key) assert.False(t, found, "Key should be deleted") }) t.Run("delete non-existent key", func(t *testing.T) { + prefix := "prefix" // Should not panic or cause issues assert.NotPanics(t, func() { - cache.delete("never-existed-1") + cache.delete(prefix, "never-existed-1") }) // Verify cache is still functional - key := "test-after-non-existent" - cache.Set(key) - found := cacheKeyIsExists(cache, key) + key := "prefix-test-after-non-existent" + cache.Set(prefix, key) + found := cacheKeyIsExists(cache, prefix, key) assert.True(t, found, "Cache should still work after deleting non-existent key") }) t.Run("delete many existing key", func(t *testing.T) { - key1 := "to-delete-1" - cache.Set(key1) + prefix := "prefix" + key1 := "prefix-to-delete-1" + cache.Set(prefix, key1) - key2 := "to-delete-2" - cache.Set(key2) + key2 := "prefix-to-delete-2" + cache.Set(prefix, key2) - cache.delete(key1, key2) + cache.delete(prefix, key1, key2) - found := cacheKeyIsExists(cache, key1) + found := cacheKeyIsExists(cache, prefix, key1) assert.False(t, found, "Key should be deleted") - found = cacheKeyIsExists(cache, key2) + found = cacheKeyIsExists(cache, prefix, key2) assert.False(t, found, "Key should be deleted") }) } @@ -80,17 +82,12 @@ func TestDelete(t *testing.T) { func TestCountPrefix(t *testing.T) { cache := NewCache(time.Minute) - keys := []string{ - "key1_subkey1", - "key1_subkey1", - "key1_subkey2", + prefix1, prefix2 := "key1", "key2" - "key2_subkey1", - } - - for _, key := range keys { - cache.Set(key) - } + cache.Set(prefix1, prefix1+"_subkey1") + cache.Set(prefix1, prefix1+"_subkey1") + cache.Set(prefix1, prefix1+"_subkey2") + cache.Set(prefix2, prefix2+"_subkey1") testCases := []struct { prefix string @@ -108,8 +105,8 @@ func TestCountPrefix(t *testing.T) { } t.Run("count after delete", func(t *testing.T) { - cache.delete("key1_subkey1") - assert.Equal(t, 1, cache.CountPrefix("key1")) + cache.delete(prefix1, prefix1+"_subkey1") + assert.Equal(t, 1, cache.CountPrefix(prefix1)) }) } @@ -117,7 +114,8 @@ func TestConcurrentOperations(t *testing.T) { cache := NewCache(time.Minute) var wg sync.WaitGroup - keys := []string{"key1", "key2", "key3"} + prefix := "prefix" + keys := []string{"prefix-key1", "prefix-key2", "prefix-key3"} // Test concurrent sets wg.Add(len(keys)) @@ -125,7 +123,7 @@ func TestConcurrentOperations(t *testing.T) { go func(k string) { defer wg.Done() for i := 0; i < 100; i++ { - cache.Set(k) + cache.Set(prefix, k) } }(key) } @@ -133,7 +131,7 @@ func TestConcurrentOperations(t *testing.T) { // Verify all keys were set for _, key := range keys { - found := cacheKeyIsExists(cache, key) + found := cacheKeyIsExists(cache, prefix, key) assert.True(t, found) } @@ -143,8 +141,8 @@ func TestConcurrentOperations(t *testing.T) { go func(k string) { defer wg.Done() for i := 0; i < 100; i++ { - cacheKeyIsExists(cache, k) - cache.Set(k + "-new") + cacheKeyIsExists(cache, prefix, k) + cache.Set(prefix, k+"-new") } }(key) } @@ -156,7 +154,7 @@ func TestConcurrentOperations(t *testing.T) { go func(k string) { defer wg.Done() for i := 0; i < 100; i++ { - cache.delete(k) + cache.delete(prefix, k) } }(key) } @@ -167,15 +165,15 @@ func TestConcurrentOperations(t *testing.T) { go func() { defer wg.Done() for i := 0; i < 100; i++ { - cache.CountPrefix("key") + cache.CountPrefix(prefix) } }() go func() { defer wg.Done() for i := 0; i < 100; i++ { - cache.Set("key-x") - cache.Set("key-y") - cache.delete("key-x") + cache.Set(prefix, "prefix-key-x") + cache.Set(prefix, "prefix-key-y") + cache.delete(prefix, "prefix-key-x") } }() wg.Wait() @@ -186,9 +184,9 @@ func TestCountPrefixWith10kElements(t *testing.T) { n := 10000 prefix := randString(64) for i := 0; i < n; i++ { - key := fmt.Sprintf("%s%s", prefix, randString(48)) - cache.Set(key) - cache.Set(key) + key := fmt.Sprintf("%s-%s", prefix, randString(48)) + cache.Set(prefix, key) + cache.Set(prefix, key) assert.Equal(t, i+1, cache.CountPrefix(prefix)) } } @@ -205,28 +203,32 @@ func randString(n int) string { func TestTTL(t *testing.T) { cache := NewCache(100 * time.Millisecond) - key := "ttl-key" - cache.Set(key) + prefix, key := "ttl-key", "ttl-key-sub" + cache.Set(prefix, key) t.Run("key exists before TTL", func(t *testing.T) { - assert.Equal(t, 1, cache.CountPrefix(key)) - found := cacheKeyIsExists(cache, key) + assert.Equal(t, 1, cache.CountPrefix(prefix)) + found := cacheKeyIsExists(cache, prefix, key) assert.True(t, found) }) t.Run("key expires after TTL", func(t *testing.T) { time.Sleep(1 * time.Second) - assert.Equal(t, 0, cache.CountPrefix(key)) + assert.Equal(t, 0, cache.CountPrefix(prefix)) time.Sleep(100 * time.Millisecond) // cause delete in async - found := cacheKeyIsExists(cache, key) + found := cacheKeyIsExists(cache, prefix, key) assert.False(t, found) }) } -func cacheKeyIsExists(c *Cache, key string) bool { +func cacheKeyIsExists(c *Cache, prefix, key string) bool { c.mu.RLock() defer c.mu.RUnlock() - _, found := c.tree.Get(key) + bucket := c.tree[prefix] + if bucket == nil { + return false + } + _, found := (*bucket)[key] return found } diff --git a/plugin/action/cardinality/cardinality.go b/plugin/action/cardinality/cardinality.go index 4ba64f1df..333559cf5 100644 --- a/plugin/action/cardinality/cardinality.go +++ b/plugin/action/cardinality/cardinality.go @@ -308,7 +308,7 @@ func (p *Plugin) Do(event *pipeline.Event) pipeline.ActionResult { p.fields.valsBuf[i] = value } - isOldValue := p.cache.Set(string(p.fields.appendTo(prefixKey))) + isOldValue := p.cache.Set(string(prefixKey), string(p.fields.appendTo(prefixKey))) if !isOldValue { // is new value keysCount++ diff --git a/plugin/action/cardinality/cardinality_test.go b/plugin/action/cardinality/cardinality_test.go index 20a72539b..0687ae31d 100644 --- a/plugin/action/cardinality/cardinality_test.go +++ b/plugin/action/cardinality/cardinality_test.go @@ -198,7 +198,7 @@ func TestSetAndCountPrefix(t *testing.T) { var buf []byte prefixKey := key.appendTo(buf) - cache.Set(string(value.appendTo(prefixKey))) + cache.Set(string(prefixKey), string(value.appendTo(prefixKey))) keysCount := cache.CountPrefix(string(prefixKey)) assert.Equal(t, 1, keysCount, "wrong in events count") From b7d369c1b84c1eee8729d4573a90ab12204bacd2 Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Tue, 9 Jun 2026 16:20:15 +0700 Subject: [PATCH 4/7] cardinality: optimize isExpire check in CountPrefix loop --- plugin/action/cardinality/cache.go | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/plugin/action/cardinality/cache.go b/plugin/action/cardinality/cache.go index 6931b26ee..b78135b41 100644 --- a/plugin/action/cardinality/cache.go +++ b/plugin/action/cardinality/cache.go @@ -48,21 +48,17 @@ func (c *Cache) Set(prefix, key string) bool { return false } -func (c *Cache) isExpire(now, value int64) bool { - return now-value > c.ttl -} - // CountPrefix returns the number of non-expired keys under the prefix. // Expired keys are scheduled for async deletion. func (c *Cache) CountPrefix(prefix string) (count int) { var keysToDelete []string - now := xtime.GetInaccurateUnixNano() + threshold := xtime.GetInaccurateUnixNano() - c.ttl c.mu.RLock() bucket := c.tree[prefix] if bucket != nil { for key, e := range *bucket { - if c.isExpire(now, e.ts) { + if e.ts < threshold { keysToDelete = append(keysToDelete, key) } else { count++ From b7c6407e0071cff3e7ff3562c69ba5cda51a765a Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Tue, 9 Jun 2026 18:24:44 +0700 Subject: [PATCH 5/7] cardinality: O(1) CountPrefix via minTs fast-path and reduced indirection --- go.mod | 1 - go.sum | 2 - plugin/action/cardinality/cache.go | 113 +++++++++++++----- plugin/action/cardinality/cache_bench_test.go | 89 ++++++++++++++ plugin/action/cardinality/cache_test.go | 8 +- 5 files changed, 175 insertions(+), 38 deletions(-) create mode 100644 plugin/action/cardinality/cache_bench_test.go diff --git a/go.mod b/go.mod index b182dbc18..26f633e58 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,6 @@ require ( github.com/alecthomas/kingpin v2.2.6+incompatible github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 github.com/alicebob/miniredis/v2 v2.35.0 - github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310 github.com/bitly/go-simplejson v0.5.1 github.com/bmatcuk/doublestar/v4 v4.8.1 github.com/bufbuild/protocompile v0.13.0 diff --git a/go.sum b/go.sum index 8c125d74a..5a8f56710 100644 --- a/go.sum +++ b/go.sum @@ -16,8 +16,6 @@ github.com/alicebob/miniredis/v2 v2.35.0 h1:QwLphYqCEAo1eu1TqPRN2jgVMPBweeQcR21j github.com/alicebob/miniredis/v2 v2.35.0/go.mod h1:TcL7YfarKPGDAthEtl5NBeHZfeUQj6OXMm/+iu5cLMM= github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs= github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= -github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310 h1:BUAU3CGlLvorLI26FmByPp2eC2qla6E1Tw+scpcg/to= -github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bitly/go-simplejson v0.5.1 h1:xgwPbetQScXt1gh9BmoJ6j9JMr3TElvuIyjR8pgdoow= diff --git a/plugin/action/cardinality/cache.go b/plugin/action/cardinality/cache.go index b78135b41..457901742 100644 --- a/plugin/action/cardinality/cache.go +++ b/plugin/action/cardinality/cache.go @@ -7,20 +7,21 @@ import ( "github.com/ozontech/file.d/xtime" ) -// entry holds a timestamp for a single cached key. -type entry struct { - ts int64 +// bucket holds keys for a single prefix with fast counting. +type bucket struct { + keys map[string]int64 + minTs int64 // oldest key timestamp; 0 if empty } type Cache struct { mu *sync.RWMutex - tree map[string]*map[string]*entry // prefix -> (full key -> timestamp) + tree map[string]*bucket ttl int64 } func NewCache(ttl time.Duration) *Cache { return &Cache{ - tree: make(map[string]*map[string]*entry), + tree: make(map[string]*bucket), ttl: ttl.Nanoseconds(), mu: &sync.RWMutex{}, } @@ -32,45 +33,81 @@ func (c *Cache) Set(prefix, key string) bool { c.mu.Lock() defer c.mu.Unlock() - bucket, ok := c.tree[prefix] + b, ok := c.tree[prefix] if !ok { - bucket = &map[string]*entry{} - c.tree[prefix] = bucket + b = &bucket{keys: make(map[string]int64)} + c.tree[prefix] = b } - e, exists := (*bucket)[key] - if exists { - e.ts = xtime.GetInaccurateUnixNano() + ts := xtime.GetInaccurateUnixNano() + if _, exists := b.keys[key]; exists { + b.keys[key] = ts return true } - (*bucket)[key] = &entry{ts: xtime.GetInaccurateUnixNano()} + b.keys[key] = ts + if b.minTs == 0 || ts < b.minTs { + b.minTs = ts + } return false } // CountPrefix returns the number of non-expired keys under the prefix. -// Expired keys are scheduled for async deletion. -func (c *Cache) CountPrefix(prefix string) (count int) { - var keysToDelete []string +// Expired keys are cleaned synchronously on the first call that detects them. +func (c *Cache) CountPrefix(prefix string) int { threshold := xtime.GetInaccurateUnixNano() - c.ttl c.mu.RLock() - bucket := c.tree[prefix] - if bucket != nil { - for key, e := range *bucket { - if e.ts < threshold { - keysToDelete = append(keysToDelete, key) - } else { - count++ - } - } + b := c.tree[prefix] + if b == nil { + c.mu.RUnlock() + return 0 + } + + // Oldest key hasn't expired → no keys expired. Return count in O(1). + if b.minTs >= threshold { + count := len(b.keys) + c.mu.RUnlock() + return count } + + // Some keys may have expired — upgrade to write lock and scan. c.mu.RUnlock() - if len(keysToDelete) > 0 { - go c.delete(prefix, keysToDelete...) + c.mu.Lock() + b = c.tree[prefix] + if b == nil { + c.mu.Unlock() + return 0 + } + + // Recompute threshold and re-check under write lock. + threshold = xtime.GetInaccurateUnixNano() - c.ttl + if b.minTs >= threshold { + count := len(b.keys) + c.mu.Unlock() + return count + } + + count := 0 + newMinTs := int64(0) + for key, ts := range b.keys { + if ts >= threshold { + count++ + if newMinTs == 0 || ts < newMinTs { + newMinTs = ts + } + } else { + delete(b.keys, key) + } + } + b.minTs = newMinTs + if len(b.keys) == 0 { + delete(c.tree, prefix) } - return + c.mu.Unlock() + + return count } func (c *Cache) delete(prefix string, keysToDelete ...string) { @@ -80,16 +117,30 @@ func (c *Cache) delete(prefix string, keysToDelete ...string) { c.mu.Lock() defer c.mu.Unlock() - bucket := c.tree[prefix] - if bucket == nil { + b := c.tree[prefix] + if b == nil { return } + minTsDeleted := false for _, key := range keysToDelete { - delete(*bucket, key) + if ts, ok := b.keys[key]; ok { + if ts == b.minTs { + minTsDeleted = true + } + delete(b.keys, key) + } } - if len(*bucket) == 0 { + if len(b.keys) == 0 { delete(c.tree, prefix) + b.minTs = 0 + } else if minTsDeleted { + b.minTs = 0 + for _, ts := range b.keys { + if b.minTs == 0 || ts < b.minTs { + b.minTs = ts + } + } } } diff --git a/plugin/action/cardinality/cache_bench_test.go b/plugin/action/cardinality/cache_bench_test.go new file mode 100644 index 000000000..1df4f5f91 --- /dev/null +++ b/plugin/action/cardinality/cache_bench_test.go @@ -0,0 +1,89 @@ +package cardinality + +import ( + "fmt" + "math/rand" + "testing" + "time" +) + +func benchSetup(b *testing.B, suffixCount int) (*Cache, string) { + seed := uint64(b.N) + rng := rand.New(rand.NewSource(int64(seed))) + + b.Helper() + cache := NewCache(time.Hour) + prefix := makeRandStr(rng, 16) + + for i := 0; i < suffixCount; i++ { + key := fmt.Sprintf("%s-%s", prefix, makeRandStr(rng, 16)) + cache.Set(prefix, key) + } + + return cache, prefix +} + +func makeRandStr(rng *rand.Rand, n int) string { + const letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" + b := make([]byte, n) + for i := range b { + b[i] = letters[rng.Intn(len(letters))] + } + return string(b) +} + +func BenchmarkCountPrefix_100(b *testing.B) { + cache, prefix := benchSetup(b, 100) + b.ResetTimer() + for i := 0; i < b.N; i++ { + cache.CountPrefix(prefix) + } +} + +func BenchmarkCountPrefix_1k(b *testing.B) { + cache, prefix := benchSetup(b, 1000) + b.ResetTimer() + for i := 0; i < b.N; i++ { + cache.CountPrefix(prefix) + } +} + +func BenchmarkCountPrefix_10k(b *testing.B) { + cache, prefix := benchSetup(b, 10000) + b.ResetTimer() + for i := 0; i < b.N; i++ { + cache.CountPrefix(prefix) + } +} + +func BenchmarkCountPrefix_100k(b *testing.B) { + cache, prefix := benchSetup(b, 100000) + b.ResetTimer() + for i := 0; i < b.N; i++ { + cache.CountPrefix(prefix) + } +} + +func BenchmarkSet_100(b *testing.B) { + cache, prefix := benchSetup(b, 100) + b.ResetTimer() + for i := 0; i < b.N; i++ { + cache.Set(prefix, "new-"+fmt.Sprint(i)) + } +} + +func BenchmarkSet_1k(b *testing.B) { + cache, prefix := benchSetup(b, 1000) + b.ResetTimer() + for i := 0; i < b.N; i++ { + cache.Set(prefix, "new-"+fmt.Sprint(i)) + } +} + +func BenchmarkSet_10k(b *testing.B) { + cache, prefix := benchSetup(b, 10000) + b.ResetTimer() + for i := 0; i < b.N; i++ { + cache.Set(prefix, "new-"+fmt.Sprint(i)) + } +} diff --git a/plugin/action/cardinality/cache_test.go b/plugin/action/cardinality/cache_test.go index 155668928..a2a742413 100644 --- a/plugin/action/cardinality/cache_test.go +++ b/plugin/action/cardinality/cache_test.go @@ -215,7 +215,7 @@ func TestTTL(t *testing.T) { t.Run("key expires after TTL", func(t *testing.T) { time.Sleep(1 * time.Second) assert.Equal(t, 0, cache.CountPrefix(prefix)) - time.Sleep(100 * time.Millisecond) // cause delete in async + // CountPrefix now cleans expired keys synchronously, so no need to wait. found := cacheKeyIsExists(cache, prefix, key) assert.False(t, found) }) @@ -224,11 +224,11 @@ func TestTTL(t *testing.T) { func cacheKeyIsExists(c *Cache, prefix, key string) bool { c.mu.RLock() defer c.mu.RUnlock() - bucket := c.tree[prefix] - if bucket == nil { + b := c.tree[prefix] + if b == nil { return false } - _, found := (*bucket)[key] + _, found := b.keys[key] return found } From 3bf28596e7e0e595ab5a38b73ce0b9b3798542f7 Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Thu, 11 Jun 2026 14:15:48 +0700 Subject: [PATCH 6/7] deduplicate CountPrefix fast-path into fastPath --- plugin/action/cardinality/cache.go | 57 ++++++++++++++++++------------ 1 file changed, 34 insertions(+), 23 deletions(-) diff --git a/plugin/action/cardinality/cache.go b/plugin/action/cardinality/cache.go index 457901742..f6d898221 100644 --- a/plugin/action/cardinality/cache.go +++ b/plugin/action/cardinality/cache.go @@ -52,43 +52,38 @@ func (c *Cache) Set(prefix, key string) bool { return false } -// CountPrefix returns the number of non-expired keys under the prefix. -// Expired keys are cleaned synchronously on the first call that detects them. -func (c *Cache) CountPrefix(prefix string) int { +// fastPath checks whether a bucket is nil or fully valid under the given lock. +// It returns (count, true) when the result is conclusive, or (0, false) when +// a slow-path scan is needed. On return false the lock is still held. +func (c *Cache) fastPath(prefix string, lock, unlock func()) (int, bool) { threshold := xtime.GetInaccurateUnixNano() - c.ttl - c.mu.RLock() + lock() b := c.tree[prefix] if b == nil { - c.mu.RUnlock() - return 0 + unlock() + return 0, true } // Oldest key hasn't expired → no keys expired. Return count in O(1). if b.minTs >= threshold { count := len(b.keys) - c.mu.RUnlock() - return count + unlock() + return count, true } - // Some keys may have expired — upgrade to write lock and scan. - c.mu.RUnlock() + return 0, false // lock still held +} - c.mu.Lock() - b = c.tree[prefix] +// cleanBucket scans a bucket under write lock, deletes expired keys, and +// updates minTs. The lock must already be held. +func (c *Cache) cleanBucket(prefix string) (int, int64) { + b := c.tree[prefix] if b == nil { - c.mu.Unlock() - return 0 - } - - // Recompute threshold and re-check under write lock. - threshold = xtime.GetInaccurateUnixNano() - c.ttl - if b.minTs >= threshold { - count := len(b.keys) - c.mu.Unlock() - return count + return 0, 0 } + threshold := xtime.GetInaccurateUnixNano() - c.ttl count := 0 newMinTs := int64(0) for key, ts := range b.keys { @@ -105,8 +100,24 @@ func (c *Cache) CountPrefix(prefix string) int { if len(b.keys) == 0 { delete(c.tree, prefix) } - c.mu.Unlock() + return count, newMinTs +} + +// CountPrefix returns the number of non-expired keys under the prefix. +// Expired keys are cleaned synchronously on the first call that detects them. +func (c *Cache) CountPrefix(prefix string) int { + if count, ok := c.fastPath(prefix, c.mu.RLock, c.mu.RUnlock); ok { + return count + } + c.mu.RUnlock() + + if count, ok := c.fastPath(prefix, c.mu.Lock, c.mu.Unlock); ok { + return count + } + + count, _ := c.cleanBucket(prefix) + c.mu.Unlock() return count } From 3015d76f0c070be7b24b39d90ac588cd58433c34 Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Thu, 11 Jun 2026 15:05:54 +0700 Subject: [PATCH 7/7] cardinality: use map without sync for shared caches --- plugin/action/cardinality/cardinality.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/plugin/action/cardinality/cardinality.go b/plugin/action/cardinality/cardinality.go index 333559cf5..d89b070fd 100644 --- a/plugin/action/cardinality/cardinality.go +++ b/plugin/action/cardinality/cardinality.go @@ -3,7 +3,6 @@ package cardinality import ( "fmt" "strings" - "sync" "time" "github.com/ozontech/file.d/cfg" @@ -74,7 +73,7 @@ The resulting events: ``` }*/ -var sharedCaches sync.Map +var sharedCaches = make(map[string]*Cache) type Plugin struct { cache *Cache @@ -185,8 +184,12 @@ func factory() (pipeline.AnyPlugin, pipeline.AnyConfig) { func getSharedCache(ttl time.Duration, pipelineName string, index int) *Cache { key := pipelineName + "/" + fmt.Sprint(index) - actual, _ := sharedCaches.LoadOrStore(key, NewCache(ttl)) - return actual.(*Cache) + cache, ok := sharedCaches[key] + if !ok { + cache = NewCache(ttl) + sharedCaches[key] = cache + } + return cache } func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.ActionPluginParams) {