diff --git a/pkg/runtimemetrics/runtime_metrics.go b/pkg/runtimemetrics/runtime_metrics.go
index cde5525..63c1f64 100644
--- a/pkg/runtimemetrics/runtime_metrics.go
+++ b/pkg/runtimemetrics/runtime_metrics.go
@@ -39,14 +39,20 @@ func NewEmitter(statsd partialStatsdClientInterface, opts *Options) *Emitter {
 	if opts == nil {
 		opts = &Options{}
 	}
+	baseTags := append(getBaseTags(), opts.Tags...)
 	e := &Emitter{
-		statsd:  statsd,
-		logger:  cmp.Or(opts.Logger, slog.Default()),
-		tags:    opts.Tags,
-		stop:    make(chan struct{}),
-		stopped: make(chan struct{}),
-		period:  cmp.Or(opts.Period, 10*time.Second),
+		statsd:                 statsd,
+		logger:                 cmp.Or(opts.Logger, slog.Default()),
+		tags:                   opts.Tags,
+		stop:                   make(chan struct{}),
+		stopped:                make(chan struct{}),
+		period:                 cmp.Or(opts.Period, 10*time.Second),
+		metrics:                map[string]*runtimeMetric{},
+		baseTags:               baseTags,
+		unknownMetricLogOnce:   &sync.Once{},
+		unsupportedKindLogOnce: &sync.Once{},
 	}
+	e.initializeMetrics()
 	go e.emit()
 	return e
 }
@@ -60,15 +66,18 @@ type Emitter struct {
 	stop    chan struct{}
 	stopped chan struct{}
+
+	// the map key is the name of the metric in runtime/metrics
+	metrics                map[string]*runtimeMetric
+	baseTags               []string
+	unknownMetricLogOnce   *sync.Once
+	unsupportedKindLogOnce *sync.Once
 }
 
 // emit emits runtime/metrics to statsd on a regular interval.
 func (e *Emitter) emit() {
-	descs := metrics.All()
-	tags := append(getBaseTags(), e.tags...)
-	rms := newRuntimeMetricStore(descs, e.statsd, e.logger, tags)
 	// TODO: Go services experiencing high scheduling latency might see a
-	// large variance for the period in between rms.report calls. This might
+	// large variance for the period in between e.report calls. This might
 	// cause spikes in cumulative metric reporting. Should we try to correct
 	// for this by measuring the actual reporting time delta to adjust
 	// the numbers?
@@ -87,7 +96,7 @@ func (e *Emitter) emit() {
 			close(e.stopped)
 			return
 		case <-tick:
-			rms.report()
+			e.report()
 		}
 	}
 }
@@ -112,16 +121,6 @@ type runtimeMetric struct {
 	previousValue metrics.Value
 }
 
-// the map key is the name of the metric in runtime/metrics
-type runtimeMetricStore struct {
-	metrics                map[string]*runtimeMetric
-	statsd                 partialStatsdClientInterface
-	logger                 *slog.Logger
-	baseTags               []string
-	unknownMetricLogOnce   *sync.Once
-	unsupportedKindLogOnce *sync.Once
-}
-
 // partialStatsdClientInterface is the subset of statsd.ClientInterface that is
 // used by this package.
 type partialStatsdClientInterface interface {
@@ -133,16 +132,8 @@ type partialStatsdClientInterface interface {
 	DistributionSamples(name string, values []float64, tags []string, rate float64) error
 }
 
-func newRuntimeMetricStore(descs []metrics.Description, statsdClient partialStatsdClientInterface, logger *slog.Logger, tags []string) runtimeMetricStore {
-	rms := runtimeMetricStore{
-		metrics:                map[string]*runtimeMetric{},
-		statsd:                 statsdClient,
-		logger:                 logger,
-		baseTags:               tags,
-		unknownMetricLogOnce:   &sync.Once{},
-		unsupportedKindLogOnce: &sync.Once{},
-	}
-
+func (e *Emitter) initializeMetrics() {
+	descs := metrics.All()
 	for _, d := range descs {
 		cumulative := d.Cumulative
@@ -155,49 +146,47 @@
 		ddMetricName, err := datadogMetricName(d.Name)
 		if err != nil {
-			rms.logger.Warn("runtimemetrics: not reporting one of the runtime metrics", slog.Attr{Key: "error", Value: slog.StringValue(err.Error())})
+			e.logger.Warn("runtimemetrics: not reporting one of the runtime metrics", slog.Attr{Key: "error", Value: slog.StringValue(err.Error())})
 			continue
 		}
-		rms.metrics[d.Name] = &runtimeMetric{
+		e.metrics[d.Name] = &runtimeMetric{
 			ddMetricName: ddMetricName,
 			cumulative:   cumulative,
 		}
 	}
-	rms.update()
-
-	return rms
+	e.update()
 }
 
-func (rms runtimeMetricStore) update() {
+func (e *Emitter) update() {
 	// TODO: Reuse this slice to avoid allocations? Note: I don't see these
 	// allocs show up in profiling.
-	samples := make([]metrics.Sample, len(rms.metrics))
+	samples := make([]metrics.Sample, len(e.metrics))
 	i := 0
 	// NOTE: Map iteration in Go is randomized, so we end up randomizing the
 	// samples slice. In theory this should not impact correctness, but it's
 	// worth keeping in mind in case problems are observed in the future.
-	for name := range rms.metrics {
+	for name := range e.metrics {
 		samples[i].Name = name
 		i++
 	}
 	metrics.Read(samples)
 	for _, s := range samples {
-		runtimeMetric := rms.metrics[s.Name]
+		runtimeMetric := e.metrics[s.Name]
 		runtimeMetric.previousValue = runtimeMetric.currentValue
 		runtimeMetric.currentValue = s.Value
 	}
 }
 
-func (rms runtimeMetricStore) report() {
+func (e *Emitter) report() {
 	ts := time.Now()
-	rms.update()
+	e.update()
 	samples := []distributionSample{}
-	rms.statsd.GaugeWithTimestamp(datadogMetricPrefix+"enabled", 1, rms.baseTags, 1, ts)
-	for name, rm := range rms.metrics {
+	e.statsd.GaugeWithTimestamp(datadogMetricPrefix+"enabled", 1, e.baseTags, 1, ts)
+	for name, rm := range e.metrics {
 		switch rm.currentValue.Kind() {
 		case metrics.KindUint64:
 			v := rm.currentValue.Uint64()
@@ -219,10 +208,10 @@
 			// This is known to happen with the '/memory/classes/heap/unused:bytes' metric: https://github.com/golang/go/blob/go1.22.1/src/runtime/metrics.go#L364
 			// Until this bug is fixed, we log the problematic value and skip submitting that point to avoid spurious spikes in graphs.
 			if v > math.MaxUint64/2 {
-				tags := make([]string, 0, len(rms.baseTags)+1)
-				tags = append(tags, rms.baseTags...)
+				tags := make([]string, 0, len(e.baseTags)+1)
+				tags = append(tags, e.baseTags...)
 				tags = append(tags, "metric_name:"+rm.ddMetricName)
-				rms.statsd.CountWithTimestamp(datadogMetricPrefix+"skipped_values", 1, tags, 1, ts)
+				e.statsd.CountWithTimestamp(datadogMetricPrefix+"skipped_values", 1, tags, 1, ts)
 
 				// Some metrics are ~sort of expected to report this high value (e.g.
// "runtime.go.metrics.gc_gogc.percent" will consistently report "MaxUint64 - 1" if @@ -241,18 +230,18 @@ func (rms runtimeMetricStore) report() { } // Append all Uint64 values for maximum observability - for name, rm := range rms.metrics { + for name, rm := range e.metrics { if rm.currentValue.Kind() == metrics.KindUint64 { logAttrs = append(logAttrs, slog.Attr{Key: name, Value: slog.Uint64Value(rm.currentValue.Uint64())}) } } - rms.logger.Warn("runtimemetrics: skipped submission of absurd value", logAttrs...) + e.logger.Warn("runtimemetrics: skipped submission of absurd value", logAttrs...) } continue } - rms.statsd.GaugeWithTimestamp(rm.ddMetricName, float64(v), rms.baseTags, 1, ts) + e.statsd.GaugeWithTimestamp(rm.ddMetricName, float64(v), e.baseTags, 1, ts) case metrics.KindFloat64: v := rm.currentValue.Float64() // if the value didn't change between two reporting @@ -264,7 +253,7 @@ func (rms runtimeMetricStore) report() { if rm.cumulative && v != 0 && v == rm.previousValue.Float64() { continue } - rms.statsd.GaugeWithTimestamp(rm.ddMetricName, v, rms.baseTags, 1, ts) + e.statsd.GaugeWithTimestamp(rm.ddMetricName, v, e.baseTags, 1, ts) case metrics.KindFloat64Histogram: v := rm.currentValue.Float64Histogram() var equal bool @@ -284,30 +273,30 @@ func (rms runtimeMetricStore) report() { values := make([]float64, len(distSamples)) for i, ds := range distSamples { values[i] = ds.Value - rms.statsd.DistributionSamples(rm.ddMetricName, values[i:i+1], rms.baseTags, ds.Rate) + e.statsd.DistributionSamples(rm.ddMetricName, values[i:i+1], e.baseTags, ds.Rate) } stats := statsFromHist(v) // TODO: Could/should we use datadog distribution metrics for this? - rms.statsd.GaugeWithTimestamp(rm.ddMetricName+".avg", stats.Avg, rms.baseTags, 1, ts) - rms.statsd.GaugeWithTimestamp(rm.ddMetricName+".min", stats.Min, rms.baseTags, 1, ts) - rms.statsd.GaugeWithTimestamp(rm.ddMetricName+".max", stats.Max, rms.baseTags, 1, ts) - rms.statsd.GaugeWithTimestamp(rm.ddMetricName+".median", stats.Median, rms.baseTags, 1, ts) - rms.statsd.GaugeWithTimestamp(rm.ddMetricName+".p95", stats.P95, rms.baseTags, 1, ts) - rms.statsd.GaugeWithTimestamp(rm.ddMetricName+".p99", stats.P99, rms.baseTags, 1, ts) + e.statsd.GaugeWithTimestamp(rm.ddMetricName+".avg", stats.Avg, e.baseTags, 1, ts) + e.statsd.GaugeWithTimestamp(rm.ddMetricName+".min", stats.Min, e.baseTags, 1, ts) + e.statsd.GaugeWithTimestamp(rm.ddMetricName+".max", stats.Max, e.baseTags, 1, ts) + e.statsd.GaugeWithTimestamp(rm.ddMetricName+".median", stats.Median, e.baseTags, 1, ts) + e.statsd.GaugeWithTimestamp(rm.ddMetricName+".p95", stats.P95, e.baseTags, 1, ts) + e.statsd.GaugeWithTimestamp(rm.ddMetricName+".p99", stats.P99, e.baseTags, 1, ts) case metrics.KindBad: // This should never happen because all metrics are supported // by construction. - rms.unknownMetricLogOnce.Do(func() { - rms.logger.Error("runtimemetrics: encountered an unknown metric, this should never happen and might indicate a bug", slog.Attr{Key: "metric_name", Value: slog.StringValue(name)}) + e.unknownMetricLogOnce.Do(func() { + e.logger.Error("runtimemetrics: encountered an unknown metric, this should never happen and might indicate a bug", slog.Attr{Key: "metric_name", Value: slog.StringValue(name)}) }) default: // This may happen as new metric kinds get added. // // The safest thing to do here is to simply log it somewhere once // as something to look into, but ignore it for now. 
-			rms.unsupportedKindLogOnce.Do(func() {
-				rms.logger.Error("runtimemetrics: unsupported metric kind, support for that kind should be added in pkg/runtimemetrics",
+			e.unsupportedKindLogOnce.Do(func() {
+				e.logger.Error("runtimemetrics: unsupported metric kind, support for that kind should be added in pkg/runtimemetrics",
 					slog.Attr{Key: "metric_name", Value: slog.StringValue(name)},
 					slog.Attr{Key: "kind", Value: slog.AnyValue(rm.currentValue.Kind())},
 				)
diff --git a/pkg/runtimemetrics/runtime_metrics_test.go b/pkg/runtimemetrics/runtime_metrics_test.go
index 36d7640..3b4e017 100644
--- a/pkg/runtimemetrics/runtime_metrics_test.go
+++ b/pkg/runtimemetrics/runtime_metrics_test.go
@@ -77,12 +77,12 @@ func TestMetricKinds(t *testing.T) {
 	t.Run("Cumulative", func(t *testing.T) {
 		// Note: This test could fail if an unexpected GC occurs. This
 		// should be extremely unlikely.
-		mock, rms := reportMetric("/gc/cycles/total:gc-cycles", metrics.KindUint64)
+		mock, emitter := reportMetric("/gc/cycles/total:gc-cycles", metrics.KindUint64)
 		require.GreaterOrEqual(t, mockCallWithSuffix(t, mock.GaugeCalls(), ".gc_cycles_total.gc_cycles").value, 1.0)
 		// Note: Only these two GC cycles are expected to occur here
 		runtime.GC()
 		runtime.GC()
-		rms.report()
+		emitter.report()
 		calls := mockCallsWithSuffix(mock.GaugeCalls(), ".gc_cycles_total.gc_cycles")
 		require.Equal(t, 2, len(calls))
 		require.Greater(t, calls[1].value, calls[0].value)
@@ -103,14 +103,14 @@ func TestMetricKinds(t *testing.T) {
 	t.Run("Cumulative", func(t *testing.T) {
 		// Note: This test could fail if we get extremely unlucky with the
 		// scheduling. This should be extremely unlikely.
-		mock, rms := reportMetric("/sync/mutex/wait/total:seconds", metrics.KindFloat64)
+		mock, emitter := reportMetric("/sync/mutex/wait/total:seconds", metrics.KindFloat64)
 		// With Go 1.22: mutex wait sometimes increments when calling runtime.GC().
 		// This does not seem to happen with Go <= 1.21
 		beforeCalls := mockCallsWithSuffix(mock.GaugeCalls(), ".sync_mutex_wait_total.seconds")
 		require.LessOrEqual(t, len(beforeCalls), 1)
 
 		createLockContention(100 * time.Millisecond)
-		rms.report()
+		emitter.report()
 		afterCalls := mockCallsWithSuffix(mock.GaugeCalls(), ".sync_mutex_wait_total.seconds")
 		require.Equal(t, len(beforeCalls)+1, len(afterCalls))
 		require.Greater(t, afterCalls[len(afterCalls)-1].value, 0.0)
@@ -133,7 +133,7 @@ func TestMetricKinds(t *testing.T) {
 		summaries := []string{"avg", "min", "max", "median", "p95", "p99"}
 		// Note: This test could fail if an unexpected GC occurs. This
 		// should be extremely unlikely.
- mock, rms := reportMetric("/gc/pauses:seconds", metrics.KindFloat64Histogram) + mock, emitter := reportMetric("/gc/pauses:seconds", metrics.KindFloat64Histogram) calls1 := mockCallsWith(mock.GaugeCalls(), func(c statsdCall[float64]) bool { return strings.Contains(c.name, ".gc_pauses.seconds.") }) @@ -162,7 +162,7 @@ func TestMetricKinds(t *testing.T) { if !found { t.Errorf("missing %s metric", want) } - rms.report() + emitter.report() // Note: No GC cycle is expected to occur here calls2 := mockCallsWith(mock.GaugeCalls(), func(c statsdCall[float64]) bool { return strings.Contains(c.name, ".gc_pauses.seconds.") @@ -170,7 +170,7 @@ func TestMetricKinds(t *testing.T) { require.Equal(t, len(summaries), len(calls2)) // Note: Only this GC cycle is expected to occur here runtime.GC() - rms.report() + emitter.report() calls3 := mockCallsWith(mock.GaugeCalls(), func(c statsdCall[float64]) bool { return strings.Contains(c.name, ".gc_pauses.seconds.") }) @@ -183,19 +183,20 @@ func TestMetricKinds(t *testing.T) { // metrics and check that we don't crash or produce a very unexpected number of // metrics. func TestSmoke(t *testing.T) { - // Initialize store for all metrics with a mocked statsd client. - descs := metrics.All() + // Initialize emitter for all metrics with a mocked statsd client. mock := &statsdClientMock{} - rms := newRuntimeMetricStore(descs, mock, slog.Default(), []string{}) + emitter := NewEmitter(mock, &Options{Logger: slog.Default()}) + defer emitter.Stop() // This poulates most runtime/metrics. runtime.GC() - // But nothing should be sent to statsd yet. - assert.Equal(t, 0, len(mock.GaugeCalls())) + // But nothing should be sent to statsd yet initially. + // Give it a moment for potential initial emissions + time.Sleep(1 * time.Millisecond) // Flush the current metrics to our statsd mock. - rms.report() + emitter.report() // The exact number of statsd calls depends on the metric values and may // also change as new version of Go are being released. So we assert that we @@ -211,31 +212,53 @@ func TestSmoke(t *testing.T) { // and discarding them in a statsd mock. This can be used as a stress test, // identify regressions and to inform decisions about pollFrequency. func BenchmarkReport(b *testing.B) { - // Initialize store for all metrics with a mocked statsd client. - descs := metrics.All() + // Initialize emitter for all metrics with a mocked statsd client. mock := &statsdClientMock{Discard: true} - rms := newRuntimeMetricStore(descs, mock, slog.Default(), []string{}) + emitter := NewEmitter(mock, &Options{Logger: slog.Default()}) + defer emitter.Stop() // Benchmark report method b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { - rms.report() + emitter.report() } } -// reportMetric creates a metrics store for the given metric, hooks it up to a +// reportMetric creates an emitter for the given metric, hooks it up to a // mock statsd client, triggers a GC cycle, calls report, and then returns // both. Callers are expected to observe the calls recorded by the mock and/or // trigger more activity. 
-func reportMetric(name string, kind metrics.ValueKind) (*statsdClientMock, runtimeMetricStore) {
+func reportMetric(name string, kind metrics.ValueKind) (*statsdClientMock, *Emitter) {
 	desc := metricDesc(name, kind)
 	mock := &statsdClientMock{}
-	rms := newRuntimeMetricStore([]metrics.Description{desc}, mock, slog.Default(), []string{})
+	// Create a new emitter backed by the mock statsd client.
+	emitter := NewEmitter(mock, &Options{Logger: slog.Default()})
+
+	// Clear all metrics and register only the one we want to test.
+	emitter.metrics = map[string]*runtimeMetric{}
+	cumulative := desc.Cumulative
+	if desc.Name == "/sched/latencies:seconds" {
+		cumulative = true
+	}
+	ddMetricName, err := datadogMetricName(desc.Name)
+	if err != nil {
+		panic(fmt.Sprintf("failed to get datadog metric name for %s: %v", desc.Name, err))
+	}
+	emitter.metrics[desc.Name] = &runtimeMetric{
+		ddMetricName: ddMetricName,
+		cumulative:   cumulative,
+	}
+
+	// Read the metric once so current/previous values are populated before the GC below.
+	emitter.update()
 
 	// Populate Metrics. Test implicitly expect this to be the only GC cycle to happen before report is finished.
 	runtime.GC()
-	rms.report()
-	return mock, rms
+	emitter.report()
+
+	// Stop the background goroutine; callers can still invoke report() directly.
+	emitter.Stop()
+	return mock, emitter
 }
 
 // metricDesc looks up a metric by name and kind. The name alone should be