diff --git a/datadog/dogstatsd/max_sample_metric.py b/datadog/dogstatsd/max_sample_metric.py
index ed9f67ac0..d2bbe098d 100644
--- a/datadog/dogstatsd/max_sample_metric.py
+++ b/datadog/dogstatsd/max_sample_metric.py
@@ -49,8 +49,8 @@ def skip_sample(self):
 
     def flush(self):
         # type: () -> List[MetricAggregator]
-        rate = self.stored_metric_samples / self.total_metric_samples
         with self.lock:
+            rate = self.stored_metric_samples / self.total_metric_samples
             return [
                 # casting self.data[i] to float as it is officially Optional[float]
                 # but always float between 0 and self.stored_metric_samples - 1
diff --git a/tests/unit/dogstatsd/test_max_sample_metrics.py b/tests/unit/dogstatsd/test_max_sample_metrics.py
index 6dcb4033c..1543b1050 100644
--- a/tests/unit/dogstatsd/test_max_sample_metrics.py
+++ b/tests/unit/dogstatsd/test_max_sample_metrics.py
@@ -1,3 +1,4 @@
+import threading
 import unittest
 from mock import patch
 from datadog.dogstatsd.max_sample_metric import HistogramMetric, DistributionMetric, TimingMetric
@@ -102,6 +103,66 @@ def test_maybe_keep_sample_work_unsafe(self):
         self.assertEqual(s.metric_type, MetricType.HISTOGRAM)
         self.assertEqual(s.cardinality, None)
 
+    def test_flush_rate_reflects_skipped_samples(self):
+        s = HistogramMetric(name="test", tags=[], max_metric_samples=0, rate=1.0, cardinality=None)
+        s.maybe_keep_sample_work_unsafe(1)
+        s.maybe_keep_sample_work_unsafe(2)
+        s.maybe_keep_sample_work_unsafe(3)
+        s.skip_sample()
+        s.skip_sample()
+
+        metrics = s.flush()
+        self.assertEqual(len(metrics), 3)
+        for m in metrics:
+            self.assertAlmostEqual(m.rate, 3 / 5)
+
+    def test_flush_rate_reflects_bounded_reservoir_sampling(self):
+        s = HistogramMetric(name="test", tags=[], max_metric_samples=2, rate=1.0, cardinality=None)
+        s.maybe_keep_sample_work_unsafe(1)
+        s.maybe_keep_sample_work_unsafe(2)
+        s.maybe_keep_sample_work_unsafe(3)
+
+        metrics = s.flush()
+        self.assertEqual(len(metrics), 2)
+        for m in metrics:
+            self.assertAlmostEqual(m.rate, 2 / 3)
+
+    def test_flush_rate_computed_inside_lock(self):
+        """rate must be computed inside the lock so it is consistent with the returned samples.
+
+        An intercepted lock injects a skip_sample() call right before the lock is
+        actually acquired, simulating a concurrent thread that increments
+        total_metric_samples at the worst possible moment. With the old code the
+        rate was calculated *before* lock acquisition (stored=10, total=10 → 1.0),
+        and the injected skip would push total to 11 while the stale 1.0 was kept.
+        With the fix, rate is calculated *inside* the lock, after the injection, so
+        it correctly reflects 10/11.
+        """
+        s = HistogramMetric(name="test", tags=[], max_metric_samples=0, rate=1.0, cardinality=None)
+        for i in range(10):
+            s.maybe_keep_sample_work_unsafe(i)
+        # stored=10, total=10
+
+        real_lock = threading.Lock()
+
+        class _InterceptedLock:
+            def __enter__(self_inner):
+                s.skip_sample()  # total becomes 11 right before the lock is acquired
+                real_lock.acquire()
+                return self_inner
+
+            def __exit__(self_inner, *args):
+                real_lock.release()
+
+        s.lock = _InterceptedLock()
+        metrics = s.flush()
+
+        # The injected skip_sample made total=11 while stored stayed at 10.
+        # rate must reflect the state *after* the injection: 10/11.
+        self.assertEqual(len(metrics), 10)
+        for m in metrics:
+            self.assertAlmostEqual(m.rate, 10 / 11)
+
 
 class TestMaxSampleMetricContexts(unittest.TestCase):
     @patch('datadog.dogstatsd.max_sample_metric_context.random.random', return_value=0.0)