-
Notifications
You must be signed in to change notification settings - Fork 4k
feat: implement otlp prom exporter #24158
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
ea63eac
c373791
3dc7abf
38664e2
cbc1c77
c05a3a5
0cd6572
7472934
527a99a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -90,6 +90,12 @@ type Config struct { | |
// DatadogHostname defines the hostname to use when emitting metrics to | ||
// Datadog. Only utilized if MetricsSink is set to "dogstatsd". | ||
DatadogHostname string `mapstructure:"datadog-hostname"` | ||
|
||
// Otlp Exporter fields | ||
OtlpExporterEnabled bool `mapstructure:"otlp-exporter-enabled"` | ||
OtlpCollectorGrpcAddr string `mapstructure:"otlp-collector-grpc-addr"` | ||
PrometheusEndpoint string `mapstructure:"prometheus-endpoint"` | ||
|
||
OtlpServiceName string `mapstructure:"otlp-service-name"` | ||
} | ||
|
||
// Metrics defines a wrapper around application telemetry functionality. It allows | ||
|
Original file line number | Diff line number | Diff line change | ||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,153 @@ | ||||||||||||||||||||
package telemetry | ||||||||||||||||||||
|
||||||||||||||||||||
import ( | ||||||||||||||||||||
"context" | ||||||||||||||||||||
"log" | ||||||||||||||||||||
"math" | ||||||||||||||||||||
"net/http" | ||||||||||||||||||||
"time" | ||||||||||||||||||||
|
||||||||||||||||||||
dto "github.com/prometheus/client_model/go" | ||||||||||||||||||||
"github.com/prometheus/common/expfmt" | ||||||||||||||||||||
"go.opentelemetry.io/otel" | ||||||||||||||||||||
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" | ||||||||||||||||||||
otmetric "go.opentelemetry.io/otel/metric" | ||||||||||||||||||||
"go.opentelemetry.io/otel/sdk/metric" | ||||||||||||||||||||
"go.opentelemetry.io/otel/sdk/resource" | ||||||||||||||||||||
semconv "go.opentelemetry.io/otel/semconv/v1.21.0" | ||||||||||||||||||||
) | ||||||||||||||||||||
|
||||||||||||||||||||
func StartOtlpExporter(cfg Config) { | ||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. lets have a comment on what this does and how it works |
||||||||||||||||||||
ctx := context.Background() | ||||||||||||||||||||
|
||||||||||||||||||||
exporter, err := otlpmetricgrpc.New(ctx, | ||||||||||||||||||||
otlpmetricgrpc.WithEndpoint(cfg.OtlpCollectorGrpcAddr), | ||||||||||||||||||||
otlpmetricgrpc.WithInsecure(), | ||||||||||||||||||||
) | ||||||||||||||||||||
if err != nil { | ||||||||||||||||||||
log.Fatalf("OTLP exporter setup failed: %v", err) | ||||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
Comment on lines
+34
to
+37
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Avoid
- if err != nil {
- log.Fatalf("OTLP exporter setup failed: %v", err)
- }
+ if err != nil {
+ return fmt.Errorf("OTLP exporter setup failed: %w", err)
+ } …and bubble it up to the caller as noted in the 📝 Committable suggestion
Suggested change
|
||||||||||||||||||||
res, _ := resource.New(ctx, resource.WithAttributes( | ||||||||||||||||||||
semconv.ServiceName(cfg.OtlpServiceName), | ||||||||||||||||||||
)) | ||||||||||||||||||||
Comment on lines
+38
to
+40
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. do we need to check the ignored error here? if not, can we comment why we are able to ignore it? |
||||||||||||||||||||
|
||||||||||||||||||||
Comment on lines
+38
to
+41
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Handle and log the error returned by The second return value is currently discarded. If the resource cannot be created, the exporter will run with incomplete metadata. -res, _ := resource.New(ctx, resource.WithAttributes(
- semconv.ServiceName(cfg.OtlpServiceName),
-))
+res, rErr := resource.New(ctx, resource.WithAttributes(
+ semconv.ServiceName(cfg.OtlpServiceName),
+))
+if rErr != nil {
+ return fmt.Errorf("failed to initialise OTLP resource: %w", rErr)
+} 📝 Committable suggestion
Suggested change
|
||||||||||||||||||||
meterProvider := metric.NewMeterProvider( | ||||||||||||||||||||
metric.WithReader(metric.NewPeriodicReader(exporter, | ||||||||||||||||||||
metric.WithInterval(15*time.Second))), | ||||||||||||||||||||
|
||||||||||||||||||||
metric.WithResource(res), | ||||||||||||||||||||
) | ||||||||||||||||||||
otel.SetMeterProvider(meterProvider) | ||||||||||||||||||||
meter := otel.Meter("cosmos-sdk-otlp-exporter") | ||||||||||||||||||||
|
||||||||||||||||||||
|
||||||||||||||||||||
gauges := make(map[string]otmetric.Float64Gauge) | ||||||||||||||||||||
histograms := make(map[string]otmetric.Float64Histogram) | ||||||||||||||||||||
|
||||||||||||||||||||
go func() { | ||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. how does this behave when shutting a node down? Do we want any kind of graceful shutdown here? |
||||||||||||||||||||
for { | ||||||||||||||||||||
if err := scrapeAndPushMetrics(ctx, cfg.PrometheusEndpoint, meter, gauges, histograms); err != nil { | ||||||||||||||||||||
log.Printf("error scraping metrics: %v", err) | ||||||||||||||||||||
} | ||||||||||||||||||||
time.Sleep(15 * time.Second) | ||||||||||||||||||||
} | ||||||||||||||||||||
}() | ||||||||||||||||||||
|
||||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
func scrapeAndPushMetrics(ctx context.Context, promEndpoint string, meter otmetric.Meter, gauges map[string]otmetric.Float64Gauge, histograms map[string]otmetric.Float64Histogram) error { | ||||||||||||||||||||
resp, err := http.Get(promEndpoint) | ||||||||||||||||||||
|
||||||||||||||||||||
if err != nil { | ||||||||||||||||||||
return err | ||||||||||||||||||||
} | ||||||||||||||||||||
defer resp.Body.Close() | ||||||||||||||||||||
|
||||||||||||||||||||
parser := expfmt.TextParser{} | ||||||||||||||||||||
metricsFamilies, err := parser.TextToMetricFamilies(resp.Body) | ||||||||||||||||||||
if err != nil { | ||||||||||||||||||||
return err | ||||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
for name, mf := range metricsFamilies { | ||||||||||||||||||||
for _, m := range mf.GetMetric() { | ||||||||||||||||||||
|
||||||||||||||||||||
Check failure on line 71 in telemetry/otlp_exporter.go
|
||||||||||||||||||||
switch { | ||||||||||||||||||||
case m.Gauge != nil: | ||||||||||||||||||||
recordGauge(ctx, meter, gauges, name, mf.GetHelp(), m.Gauge.GetValue()) | ||||||||||||||||||||
|
||||||||||||||||||||
case m.Counter != nil: | ||||||||||||||||||||
recordGauge(ctx, meter, gauges, name, mf.GetHelp(), m.Counter.GetValue()) | ||||||||||||||||||||
|
||||||||||||||||||||
case m.Histogram != nil: | ||||||||||||||||||||
recordHistogram(ctx, meter, histograms, name, mf.GetHelp(), m.Histogram) | ||||||||||||||||||||
|
||||||||||||||||||||
case m.Summary != nil: | ||||||||||||||||||||
continue // TODO: decide whether to support | ||||||||||||||||||||
|
||||||||||||||||||||
default: | ||||||||||||||||||||
continue | ||||||||||||||||||||
} | ||||||||||||||||||||
} | ||||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
|
||||||||||||||||||||
return nil | ||||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
func recordGauge(ctx context.Context, meter otmetric.Meter, gauges map[string]otmetric.Float64Gauge, name, help string, val float64) { | ||||||||||||||||||||
g, ok := gauges[name] | ||||||||||||||||||||
if !ok { | ||||||||||||||||||||
var err error | ||||||||||||||||||||
g, err = meter.Float64Gauge(name, otmetric.WithDescription(help)) | ||||||||||||||||||||
if err != nil { | ||||||||||||||||||||
log.Printf("failed to create gauge %q: %v", name, err) | ||||||||||||||||||||
return | ||||||||||||||||||||
} | ||||||||||||||||||||
gauges[name] = g | ||||||||||||||||||||
} | ||||||||||||||||||||
g.Record(ctx, val) | ||||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
func recordHistogram(ctx context.Context, meter otmetric.Meter, histograms map[string]otmetric.Float64Histogram, name, help string, h *dto.Histogram) { | ||||||||||||||||||||
boundaries := make([]float64, 0, len(h.Bucket)-1) // excluding +Inf | ||||||||||||||||||||
bucketCounts := make([]uint64, 0, len(h.Bucket)) | ||||||||||||||||||||
|
||||||||||||||||||||
for _, bucket := range h.Bucket { | ||||||||||||||||||||
if math.IsInf(bucket.GetUpperBound(), +1) { | ||||||||||||||||||||
continue // Skip +Inf bucket boundary explicitly | ||||||||||||||||||||
} | ||||||||||||||||||||
Comment on lines
+114
to
+116
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why do we skip this? i know it says +Inf boundary, but as someone who is unfamiliar with OTL, i am not sure what the significance is |
||||||||||||||||||||
boundaries = append(boundaries, bucket.GetUpperBound()) | ||||||||||||||||||||
bucketCounts = append(bucketCounts, bucket.GetCumulativeCount()) | ||||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
hist, ok := histograms[name] | ||||||||||||||||||||
if !ok { | ||||||||||||||||||||
var err error | ||||||||||||||||||||
hist, err = meter.Float64Histogram( | ||||||||||||||||||||
name, | ||||||||||||||||||||
otmetric.WithDescription(help), | ||||||||||||||||||||
otmetric.WithExplicitBucketBoundaries(boundaries...), | ||||||||||||||||||||
) | ||||||||||||||||||||
if err != nil { | ||||||||||||||||||||
log.Printf("failed to create histogram %s: %v", name, err) | ||||||||||||||||||||
return | ||||||||||||||||||||
} | ||||||||||||||||||||
histograms[name] = hist | ||||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
prevCount := uint64(0) | ||||||||||||||||||||
for i, count := range bucketCounts { | ||||||||||||||||||||
countInBucket := count - prevCount | ||||||||||||||||||||
prevCount = count | ||||||||||||||||||||
|
||||||||||||||||||||
// Explicitly record the mid-point of the bucket as approximation: | ||||||||||||||||||||
var value float64 | ||||||||||||||||||||
if i == 0 { | ||||||||||||||||||||
value = boundaries[0] / 2.0 | ||||||||||||||||||||
Check noticeCode scanning / CodeQL Floating point arithmetic Note
Floating point arithmetic operations are not associative and a possible source of non-determinism
|
||||||||||||||||||||
} else { | ||||||||||||||||||||
value = (boundaries[i-1] + boundaries[i]) / 2.0 | ||||||||||||||||||||
Check noticeCode scanning / CodeQL Floating point arithmetic Note
Floating point arithmetic operations are not associative and a possible source of non-determinism
Check noticeCode scanning / CodeQL Floating point arithmetic Note
Floating point arithmetic operations are not associative and a possible source of non-determinism
|
||||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
// Record `countInBucket` number of observations explicitly (approximation): | ||||||||||||||||||||
for j := uint64(0); j < countInBucket; j++ { | ||||||||||||||||||||
hist.Record(ctx, value) | ||||||||||||||||||||
} | ||||||||||||||||||||
Comment on lines
+150
to
+152
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||||||
} | ||||||||||||||||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Exporter is started without lifecycle management or error propagation
StartOtlpExporter
(1) blocks fatal‑logging on failure and (2) launches a goroutine that never stops.Starting it here means:
graceDuration
, tests, etc.).telemetry.New
registers the Prom sink.Recommend returning a cancel/cleanup func and wiring it into the existing
errgroup
, then starting aftertelemetry.New
: