Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datadog/dogstatsd/max_sample_metric.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ def skip_sample(self):

def flush(self):
# type: () -> List[MetricAggregator]
rate = self.stored_metric_samples / self.total_metric_samples
with self.lock:
rate = self.stored_metric_samples / self.total_metric_samples
return [
# casting self.data[i] to float as it is officially Optional[float]
# but always float between 0 and self.stored_metric_samples - 1
Expand Down
61 changes: 61 additions & 0 deletions tests/unit/dogstatsd/test_max_sample_metrics.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import threading
import unittest
from mock import patch
from datadog.dogstatsd.max_sample_metric import HistogramMetric, DistributionMetric, TimingMetric
Expand Down Expand Up @@ -102,6 +103,66 @@ def test_maybe_keep_sample_work_unsafe(self):
self.assertEqual(s.metric_type, MetricType.HISTOGRAM)
self.assertEqual(s.cardinality, None)

def test_flush_rate_reflects_skipped_samples(self):
    """Skipped samples must lower the flushed rate to stored/total (3/5 here)."""
    metric = HistogramMetric(name="test", tags=[], max_metric_samples=0, rate=1.0, cardinality=None)
    # Keep three samples, then record two that were dropped by client-side sampling.
    for value in (1, 2, 3):
        metric.maybe_keep_sample_work_unsafe(value)
    metric.skip_sample()
    metric.skip_sample()

    flushed = metric.flush()
    self.assertEqual(len(flushed), 3)
    for sample in flushed:
        self.assertAlmostEqual(sample.rate, 3 / 5)

def test_flush_rate_reflects_bounded_reservoir_sampling(self):
    """With a reservoir capped at 2, three offered samples yield rate 2/3."""
    metric = HistogramMetric(name="test", tags=[], max_metric_samples=2, rate=1.0, cardinality=None)
    for value in (1, 2, 3):
        metric.maybe_keep_sample_work_unsafe(value)

    flushed = metric.flush()
    # Only two samples fit the reservoir; rate reports kept/seen.
    self.assertEqual(len(flushed), 2)
    for sample in flushed:
        self.assertAlmostEqual(sample.rate, 2 / 3)

def test_flush_rate_computed_inside_lock(self):
    """rate must be computed inside the lock so it matches the returned samples.

    A wrapper lock injects a skip_sample() call immediately before the real
    lock is acquired, simulating a concurrent thread bumping
    total_metric_samples at the worst possible moment. The old code computed
    the rate *before* taking the lock (stored=10, total=10 -> 1.0) and kept
    that stale value even though the injected skip raised total to 11. With
    the fix the rate is computed *inside* the lock, after the injection, so
    it correctly comes out as 10/11.
    """
    metric = HistogramMetric(name="test", tags=[], max_metric_samples=0, rate=1.0, cardinality=None)
    for value in range(10):
        metric.maybe_keep_sample_work_unsafe(value)
    # At this point: stored=10, total=10.

    inner_lock = threading.Lock()

    class SkipOnAcquireLock:
        # Stand-in for metric.lock: performs one skip_sample() just before
        # the real lock is taken, emulating a racing writer thread.
        def __enter__(self):
            metric.skip_sample()  # total becomes 11 right before acquisition
            inner_lock.acquire()
            return self

        def __exit__(self, *exc_info):
            inner_lock.release()

    metric.lock = SkipOnAcquireLock()
    flushed = metric.flush()

    # stored stayed at 10 while the injected skip pushed total to 11;
    # a correctly lock-guarded flush must therefore report rate = 10/11.
    self.assertEqual(len(flushed), 10)
    for sample in flushed:
        self.assertAlmostEqual(sample.rate, 10 / 11)

class TestMaxSampleMetricContexts(unittest.TestCase):

@patch('datadog.dogstatsd.max_sample_metric_context.random.random', return_value=0.0)
Expand Down
Loading