refactor: simplify runtime metrics collection architecture #20

Draft · wants to merge 1 commit into base: main
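
This PR folds the old `runtimeMetricStore` type into `Emitter` itself: `NewEmitter` now builds the metric map, resolves base tags once, and starts the reporting goroutine, so callers only interact with a single type. As a minimal sketch of that surface: the module import path (`example.com/yourmodule/pkg/runtimemetrics`) and the use of a datadog-go v5 client are assumptions for illustration, not something shown in this diff.

```go
package main

import (
	"log"
	"log/slog"
	"time"

	"github.com/DataDog/datadog-go/v5/statsd"

	"example.com/yourmodule/pkg/runtimemetrics" // placeholder import path; adjust to the real module
)

func main() {
	// The emitter only needs the small statsd subset declared by
	// partialStatsdClientInterface; a recent datadog-go v5 client is assumed
	// to provide GaugeWithTimestamp, CountWithTimestamp and DistributionSamples.
	client, err := statsd.New("127.0.0.1:8125")
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Every Options field is optional: Logger defaults to slog.Default(),
	// Period to 10s, and Tags are appended to the package's base tags.
	emitter := runtimemetrics.NewEmitter(client, &runtimemetrics.Options{
		Logger: slog.Default(),
		Tags:   []string{"service:my-service"},
		Period: 10 * time.Second,
	})
	defer emitter.Stop() // stops the background reporting goroutine

	// ... application work; runtime/metrics are flushed every Period.
}
```

`Stop()` is the same shutdown call the updated tests rely on (`defer emitter.Stop()`).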
113 changes: 51 additions & 62 deletions pkg/runtimemetrics/runtime_metrics.go
@@ -39,14 +39,20 @@ func NewEmitter(statsd partialStatsdClientInterface, opts *Options) *Emitter {
if opts == nil {
opts = &Options{}
}
baseTags := append(getBaseTags(), opts.Tags...)
e := &Emitter{
statsd: statsd,
logger: cmp.Or(opts.Logger, slog.Default()),
tags: opts.Tags,
stop: make(chan struct{}),
stopped: make(chan struct{}),
period: cmp.Or(opts.Period, 10*time.Second),
statsd: statsd,
logger: cmp.Or(opts.Logger, slog.Default()),
tags: opts.Tags,
stop: make(chan struct{}),
stopped: make(chan struct{}),
period: cmp.Or(opts.Period, 10*time.Second),
metrics: map[string]*runtimeMetric{},
baseTags: baseTags,
unknownMetricLogOnce: &sync.Once{},
unsupportedKindLogOnce: &sync.Once{},
}
e.initializeMetrics()
go e.emit()
return e
}
@@ -60,15 +66,18 @@ type Emitter struct {

stop chan struct{}
stopped chan struct{}

// the map key is the name of the metric in runtime/metrics
metrics map[string]*runtimeMetric
baseTags []string
unknownMetricLogOnce *sync.Once
unsupportedKindLogOnce *sync.Once
}

// emit emits runtime/metrics to statsd on a regular interval.
func (e *Emitter) emit() {
descs := metrics.All()
tags := append(getBaseTags(), e.tags...)
rms := newRuntimeMetricStore(descs, e.statsd, e.logger, tags)
// TODO: Go services experiencing high scheduling latency might see a
// large variance for the period in between rms.report calls. This might
// large variance for the period in between e.report calls. This might
// cause spikes in cumulative metric reporting. Should we try to correct
// for this by measuring the actual reporting time delta to adjust
// the numbers?
@@ -87,7 +96,7 @@ func (e *Emitter) emit() {
close(e.stopped)
return
case <-tick:
rms.report()
e.report()
}
}
}
@@ -112,16 +121,6 @@ type runtimeMetric struct {
previousValue metrics.Value
}

// the map key is the name of the metric in runtime/metrics
type runtimeMetricStore struct {
metrics map[string]*runtimeMetric
statsd partialStatsdClientInterface
logger *slog.Logger
baseTags []string
unknownMetricLogOnce *sync.Once
unsupportedKindLogOnce *sync.Once
}

// partialStatsdClientInterface is the subset of statsd.ClientInterface that is
// used by this package.
type partialStatsdClientInterface interface {
@@ -133,16 +132,8 @@ type partialStatsdClientInterface interface {
DistributionSamples(name string, values []float64, tags []string, rate float64) error
}

func newRuntimeMetricStore(descs []metrics.Description, statsdClient partialStatsdClientInterface, logger *slog.Logger, tags []string) runtimeMetricStore {
rms := runtimeMetricStore{
metrics: map[string]*runtimeMetric{},
statsd: statsdClient,
logger: logger,
baseTags: tags,
unknownMetricLogOnce: &sync.Once{},
unsupportedKindLogOnce: &sync.Once{},
}

func (e *Emitter) initializeMetrics() {
descs := metrics.All()
for _, d := range descs {
cumulative := d.Cumulative

@@ -155,49 +146,47 @@ func newRuntimeMetricStore(descs []metrics.Description, statsdClient partialStat

ddMetricName, err := datadogMetricName(d.Name)
if err != nil {
rms.logger.Warn("runtimemetrics: not reporting one of the runtime metrics", slog.Attr{Key: "error", Value: slog.StringValue(err.Error())})
e.logger.Warn("runtimemetrics: not reporting one of the runtime metrics", slog.Attr{Key: "error", Value: slog.StringValue(err.Error())})
continue
}

rms.metrics[d.Name] = &runtimeMetric{
e.metrics[d.Name] = &runtimeMetric{
ddMetricName: ddMetricName,
cumulative: cumulative,
}
}

rms.update()

return rms
e.update()
}

func (rms runtimeMetricStore) update() {
func (e *Emitter) update() {
// TODO: Reuse this slice to avoid allocations? Note: I don't see these
// allocs show up in profiling.
samples := make([]metrics.Sample, len(rms.metrics))
samples := make([]metrics.Sample, len(e.metrics))
i := 0
// NOTE: Map iteration in Go is randomized, so we end up randomizing the
// samples slice. In theory this should not impact correctness, but it's
// worth keeping in mind in case problems are observed in the future.
for name := range rms.metrics {
for name := range e.metrics {
samples[i].Name = name
i++
}
metrics.Read(samples)
for _, s := range samples {
runtimeMetric := rms.metrics[s.Name]
runtimeMetric := e.metrics[s.Name]

runtimeMetric.previousValue = runtimeMetric.currentValue
runtimeMetric.currentValue = s.Value
}
}

func (rms runtimeMetricStore) report() {
func (e *Emitter) report() {
ts := time.Now()
rms.update()
e.update()
samples := []distributionSample{}

rms.statsd.GaugeWithTimestamp(datadogMetricPrefix+"enabled", 1, rms.baseTags, 1, ts)
for name, rm := range rms.metrics {
e.statsd.GaugeWithTimestamp(datadogMetricPrefix+"enabled", 1, e.baseTags, 1, ts)
for name, rm := range e.metrics {
switch rm.currentValue.Kind() {
case metrics.KindUint64:
v := rm.currentValue.Uint64()
@@ -219,10 +208,10 @@ func (rms runtimeMetricStore) report() {
// This is known to happen with the '/memory/classes/heap/unused:bytes' metric: https://github.com/golang/go/blob/go1.22.1/src/runtime/metrics.go#L364
// Until this bug is fixed, we log the problematic value and skip submitting that point to avoid spurious spikes in graphs.
if v > math.MaxUint64/2 {
tags := make([]string, 0, len(rms.baseTags)+1)
tags = append(tags, rms.baseTags...)
tags := make([]string, 0, len(e.baseTags)+1)
tags = append(tags, e.baseTags...)
tags = append(tags, "metric_name:"+rm.ddMetricName)
rms.statsd.CountWithTimestamp(datadogMetricPrefix+"skipped_values", 1, tags, 1, ts)
e.statsd.CountWithTimestamp(datadogMetricPrefix+"skipped_values", 1, tags, 1, ts)

// Some metrics are ~sort of expected to report this high value (e.g.
// "runtime.go.metrics.gc_gogc.percent" will consistently report "MaxUint64 - 1" if
@@ -241,18 +230,18 @@ func (rms runtimeMetricStore) report() {
}

// Append all Uint64 values for maximum observability
for name, rm := range rms.metrics {
for name, rm := range e.metrics {
if rm.currentValue.Kind() == metrics.KindUint64 {
logAttrs = append(logAttrs, slog.Attr{Key: name, Value: slog.Uint64Value(rm.currentValue.Uint64())})
}
}

rms.logger.Warn("runtimemetrics: skipped submission of absurd value", logAttrs...)
e.logger.Warn("runtimemetrics: skipped submission of absurd value", logAttrs...)
}
continue
}

rms.statsd.GaugeWithTimestamp(rm.ddMetricName, float64(v), rms.baseTags, 1, ts)
e.statsd.GaugeWithTimestamp(rm.ddMetricName, float64(v), e.baseTags, 1, ts)
case metrics.KindFloat64:
v := rm.currentValue.Float64()
// if the value didn't change between two reporting
@@ -264,7 +253,7 @@ func (rms runtimeMetricStore) report() {
if rm.cumulative && v != 0 && v == rm.previousValue.Float64() {
continue
}
rms.statsd.GaugeWithTimestamp(rm.ddMetricName, v, rms.baseTags, 1, ts)
e.statsd.GaugeWithTimestamp(rm.ddMetricName, v, e.baseTags, 1, ts)
case metrics.KindFloat64Histogram:
v := rm.currentValue.Float64Histogram()
var equal bool
@@ -284,30 +273,30 @@ func (rms runtimeMetricStore) report() {
values := make([]float64, len(distSamples))
for i, ds := range distSamples {
values[i] = ds.Value
rms.statsd.DistributionSamples(rm.ddMetricName, values[i:i+1], rms.baseTags, ds.Rate)
e.statsd.DistributionSamples(rm.ddMetricName, values[i:i+1], e.baseTags, ds.Rate)
}

stats := statsFromHist(v)
// TODO: Could/should we use datadog distribution metrics for this?
rms.statsd.GaugeWithTimestamp(rm.ddMetricName+".avg", stats.Avg, rms.baseTags, 1, ts)
rms.statsd.GaugeWithTimestamp(rm.ddMetricName+".min", stats.Min, rms.baseTags, 1, ts)
rms.statsd.GaugeWithTimestamp(rm.ddMetricName+".max", stats.Max, rms.baseTags, 1, ts)
rms.statsd.GaugeWithTimestamp(rm.ddMetricName+".median", stats.Median, rms.baseTags, 1, ts)
rms.statsd.GaugeWithTimestamp(rm.ddMetricName+".p95", stats.P95, rms.baseTags, 1, ts)
rms.statsd.GaugeWithTimestamp(rm.ddMetricName+".p99", stats.P99, rms.baseTags, 1, ts)
e.statsd.GaugeWithTimestamp(rm.ddMetricName+".avg", stats.Avg, e.baseTags, 1, ts)
e.statsd.GaugeWithTimestamp(rm.ddMetricName+".min", stats.Min, e.baseTags, 1, ts)
e.statsd.GaugeWithTimestamp(rm.ddMetricName+".max", stats.Max, e.baseTags, 1, ts)
e.statsd.GaugeWithTimestamp(rm.ddMetricName+".median", stats.Median, e.baseTags, 1, ts)
e.statsd.GaugeWithTimestamp(rm.ddMetricName+".p95", stats.P95, e.baseTags, 1, ts)
e.statsd.GaugeWithTimestamp(rm.ddMetricName+".p99", stats.P99, e.baseTags, 1, ts)
case metrics.KindBad:
// This should never happen because all metrics are supported
// by construction.
rms.unknownMetricLogOnce.Do(func() {
rms.logger.Error("runtimemetrics: encountered an unknown metric, this should never happen and might indicate a bug", slog.Attr{Key: "metric_name", Value: slog.StringValue(name)})
e.unknownMetricLogOnce.Do(func() {
e.logger.Error("runtimemetrics: encountered an unknown metric, this should never happen and might indicate a bug", slog.Attr{Key: "metric_name", Value: slog.StringValue(name)})
})
default:
// This may happen as new metric kinds get added.
//
// The safest thing to do here is to simply log it somewhere once
// as something to look into, but ignore it for now.
rms.unsupportedKindLogOnce.Do(func() {
rms.logger.Error("runtimemetrics: unsupported metric kind, support for that kind should be added in pkg/runtimemetrics",
e.unsupportedKindLogOnce.Do(func() {
e.logger.Error("runtimemetrics: unsupported metric kind, support for that kind should be added in pkg/runtimemetrics",
slog.Attr{Key: "metric_name", Value: slog.StringValue(name)},
slog.Attr{Key: "kind", Value: slog.AnyValue(rm.currentValue.Kind())},
)
67 changes: 45 additions & 22 deletions pkg/runtimemetrics/runtime_metrics_test.go
@@ -77,12 +77,12 @@ func TestMetricKinds(t *testing.T) {
t.Run("Cumulative", func(t *testing.T) {
// Note: This test could fail if an unexpected GC occurs. This
// should be extremely unlikely.
mock, rms := reportMetric("/gc/cycles/total:gc-cycles", metrics.KindUint64)
mock, emitter := reportMetric("/gc/cycles/total:gc-cycles", metrics.KindUint64)
require.GreaterOrEqual(t, mockCallWithSuffix(t, mock.GaugeCalls(), ".gc_cycles_total.gc_cycles").value, 1.0)
// Note: Only these two GC cycles are expected to occur here
runtime.GC()
runtime.GC()
rms.report()
emitter.report()
calls := mockCallsWithSuffix(mock.GaugeCalls(), ".gc_cycles_total.gc_cycles")
require.Equal(t, 2, len(calls))
require.Greater(t, calls[1].value, calls[0].value)
@@ -103,14 +103,14 @@ func TestMetricKinds(t *testing.T) {
t.Run("Cumulative", func(t *testing.T) {
// Note: This test could fail if we get extremely unlucky with the
// scheduling. This should be extremely unlikely.
mock, rms := reportMetric("/sync/mutex/wait/total:seconds", metrics.KindFloat64)
mock, emitter := reportMetric("/sync/mutex/wait/total:seconds", metrics.KindFloat64)

// With Go 1.22: mutex wait sometimes increments when calling runtime.GC().
// This does not seem to happen with Go <= 1.21
beforeCalls := mockCallsWithSuffix(mock.GaugeCalls(), ".sync_mutex_wait_total.seconds")
require.LessOrEqual(t, len(beforeCalls), 1)
createLockContention(100 * time.Millisecond)
rms.report()
emitter.report()
afterCalls := mockCallsWithSuffix(mock.GaugeCalls(), ".sync_mutex_wait_total.seconds")
require.Equal(t, len(beforeCalls)+1, len(afterCalls))
require.Greater(t, afterCalls[len(afterCalls)-1].value, 0.0)
@@ -133,7 +133,7 @@ func TestMetricKinds(t *testing.T) {
summaries := []string{"avg", "min", "max", "median", "p95", "p99"}
// Note: This test could fail if an unexpected GC occurs. This
// should be extremely unlikely.
mock, rms := reportMetric("/gc/pauses:seconds", metrics.KindFloat64Histogram)
mock, emitter := reportMetric("/gc/pauses:seconds", metrics.KindFloat64Histogram)
calls1 := mockCallsWith(mock.GaugeCalls(), func(c statsdCall[float64]) bool {
return strings.Contains(c.name, ".gc_pauses.seconds.")
})
@@ -162,15 +162,15 @@ func TestMetricKinds(t *testing.T) {
if !found {
t.Errorf("missing %s metric", want)
}
rms.report()
emitter.report()
// Note: No GC cycle is expected to occur here
calls2 := mockCallsWith(mock.GaugeCalls(), func(c statsdCall[float64]) bool {
return strings.Contains(c.name, ".gc_pauses.seconds.")
})
require.Equal(t, len(summaries), len(calls2))
// Note: Only this GC cycle is expected to occur here
runtime.GC()
rms.report()
emitter.report()
calls3 := mockCallsWith(mock.GaugeCalls(), func(c statsdCall[float64]) bool {
return strings.Contains(c.name, ".gc_pauses.seconds.")
})
@@ -183,19 +183,20 @@ func TestMetricKinds(t *testing.T) {
// metrics and check that we don't crash or produce a very unexpected number of
// metrics.
func TestSmoke(t *testing.T) {
// Initialize store for all metrics with a mocked statsd client.
descs := metrics.All()
// Initialize emitter for all metrics with a mocked statsd client.
mock := &statsdClientMock{}
rms := newRuntimeMetricStore(descs, mock, slog.Default(), []string{})
emitter := NewEmitter(mock, &Options{Logger: slog.Default()})
defer emitter.Stop()

// This populates most runtime/metrics.
runtime.GC()

// But nothing should be sent to statsd yet.
assert.Equal(t, 0, len(mock.GaugeCalls()))
// But nothing should have been sent to statsd yet.
// Give the emitter a moment in case it emits anything early.
time.Sleep(1 * time.Millisecond)

// Flush the current metrics to our statsd mock.
rms.report()
emitter.report()

// The exact number of statsd calls depends on the metric values and may
// also change as new version of Go are being released. So we assert that we
@@ -211,31 +212,53 @@ func TestSmoke(t *testing.T) {
// and discarding them in a statsd mock. This can be used as a stress test,
// identify regressions and to inform decisions about pollFrequency.
func BenchmarkReport(b *testing.B) {
// Initialize store for all metrics with a mocked statsd client.
descs := metrics.All()
// Initialize emitter for all metrics with a mocked statsd client.
mock := &statsdClientMock{Discard: true}
rms := newRuntimeMetricStore(descs, mock, slog.Default(), []string{})
emitter := NewEmitter(mock, &Options{Logger: slog.Default()})
defer emitter.Stop()

// Benchmark report method
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
rms.report()
emitter.report()
}
}

// reportMetric creates a metrics store for the given metric, hooks it up to a
// reportMetric creates an emitter for the given metric, hooks it up to a
// mock statsd client, triggers a GC cycle, calls report, and then returns
// both. Callers are expected to observe the calls recorded by the mock and/or
// trigger more activity.
func reportMetric(name string, kind metrics.ValueKind) (*statsdClientMock, runtimeMetricStore) {
func reportMetric(name string, kind metrics.ValueKind) (*statsdClientMock, *Emitter) {
desc := metricDesc(name, kind)
mock := &statsdClientMock{}
rms := newRuntimeMetricStore([]metrics.Description{desc}, mock, slog.Default(), []string{})
// Create a new emitter with just the specific metric
emitter := NewEmitter(mock, &Options{Logger: slog.Default()})

// Clear all metrics and initialize only the one we want to test
emitter.metrics = map[string]*runtimeMetric{}
cumulative := desc.Cumulative
if desc.Name == "/sched/latencies:seconds" {
cumulative = true
}
ddMetricName, err := datadogMetricName(desc.Name)
if err != nil {
panic(fmt.Sprintf("failed to get datadog metric name for %s: %v", desc.Name, err))
}
emitter.metrics[desc.Name] = &runtimeMetric{
ddMetricName: ddMetricName,
cumulative: cumulative,
}

// Initialize the metric values first
emitter.update()
// Populate metrics. Tests implicitly expect this to be the only GC cycle to happen before report finishes.
runtime.GC()
rms.report()
return mock, rms
emitter.report()

// Stop the emitter after testing
emitter.Stop()
return mock, emitter
}

// metricDesc looks up a metric by name and kind. The name alone should be