Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
go.opentelemetry.io/otel v1.43.0
go.opentelemetry.io/otel/metric v1.43.0
go.opentelemetry.io/otel/sdk v1.43.0
go.opentelemetry.io/otel/sdk/metric v1.43.0
go.opentelemetry.io/otel/trace v1.43.0
)

Expand Down
18 changes: 9 additions & 9 deletions meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,22 @@ const (
pgxPoolEmptyAcquireWaitTime = "pgxpool.empty_acquire_wait_time"
)

// RecordStats records database statistics for provided pgxpool.Pool at a default 1 second interval
// RecordStats records database statistics for provided [pgxpool.Pool] at a default 1 second interval
// unless otherwise specified by the WithMinimumReadDBStatsInterval StatsOption.
//
// Attributes provided via WithStatsAttributes override the library-supplied defaults
// (db.system.name, db.client.connection.pool.name) on key collision, since
// attribute.NewSet applies last-value-wins semantics over the resulting slice.
func RecordStats(db PoolStats, opts ...StatsOption) error {
connCfg := db.Config().ConnConfig
poolName := fmt.Sprintf("%s:%d/%s", connCfg.Host, connCfg.Port, connCfg.Database)

o := statsOptions{
meterProvider: otel.GetMeterProvider(),
minimumReadDBStatsInterval: defaultMinimumReadDBStatsInterval,
defaultAttributes: []attribute.KeyValue{
semconv.DBSystemNamePostgreSQL,
semconv.DBClientConnectionPoolName(poolName),
},
}

Expand Down Expand Up @@ -93,12 +101,6 @@ func recordStats(
lock sync.Mutex
)

serverAddress := semconv.ServerAddress(db.Config().ConnConfig.Host)
serverPort := semconv.ServerPort(int(db.Config().ConnConfig.Port))
dbNamespace := semconv.DBNamespace(db.Config().ConnConfig.Database)
poolName := fmt.Sprintf("%s:%d/%s", serverAddress.Value.AsString(), serverPort.Value.AsInt64(), dbNamespace.Value.AsString())
dbClientConnectionPoolName := semconv.DBClientConnectionPoolName(poolName)

lock.Lock()
defer lock.Unlock()

Expand Down Expand Up @@ -195,8 +197,6 @@ func recordStats(
return fmt.Errorf("failed to create asynchronous metric: %s with error: %w", pgxPoolEmptyAcquireWaitTime, err)
}

attrs = append(attrs, dbClientConnectionPoolName)

observeOptions = []metric.ObserveOption{
metric.WithAttributeSet(attribute.NewSet(attrs...)),
}
Expand Down
99 changes: 99 additions & 0 deletions meter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package otelpgx

import (
"context"
"testing"

"github.com/jackc/pgx/v5/pgxpool"
"go.opentelemetry.io/otel/attribute"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
semconv "go.opentelemetry.io/otel/semconv/v1.40.0"
)

func TestRecordStats_UserAttrsOverrideLibraryDefaults(t *testing.T) {
ctx := context.Background()

cfg, err := pgxpool.ParseConfig("postgres://user@127.0.0.1:5432/somedb")
if err != nil {
t.Fatalf("pgxpool.ParseConfig: %v", err)
}

pool, err := pgxpool.NewWithConfig(ctx, cfg)
if err != nil {
t.Fatalf("pgxpool.NewWithConfig: %v", err)
}
t.Cleanup(pool.Close)

reader := sdkmetric.NewManualReader()
provider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))

const overridePoolName = "my-logical-pool"

err = RecordStats(pool,
WithStatsMeterProvider(provider),
WithStatsAttributes(semconv.DBClientConnectionPoolName(overridePoolName)),
)
if err != nil {
t.Fatalf("RecordStats: %v", err)
}

var rm metricdata.ResourceMetrics
if err := reader.Collect(ctx, &rm); err != nil {
t.Fatalf("reader.Collect: %v", err)
}

if len(rm.ScopeMetrics) == 0 {
t.Fatal("expected at least one scope metric to be collected")
}

var checked int
for _, sm := range rm.ScopeMetrics {
for _, m := range sm.Metrics {
for _, attrs := range dataPointAttributes(m.Data) {
poolNameVal, ok := attrs.Value(semconv.DBClientConnectionPoolNameKey)
if !ok {
t.Errorf("metric %q: missing %s attribute", m.Name, semconv.DBClientConnectionPoolNameKey)
continue
}
if poolNameVal.AsString() != overridePoolName {
t.Errorf("metric %q: %s = %q, want user-supplied override %q",
m.Name, semconv.DBClientConnectionPoolNameKey, poolNameVal.AsString(), overridePoolName)
}

// db.system.name should still carry the library default since the user did not override it.
dbSystemVal, ok := attrs.Value(semconv.DBSystemNameKey)
if !ok {
t.Errorf("metric %q: missing %s attribute", m.Name, semconv.DBSystemNameKey)
} else if dbSystemVal.AsString() != semconv.DBSystemNamePostgreSQL.Value.AsString() {
t.Errorf("metric %q: %s = %q, want library default %q",
m.Name, semconv.DBSystemNameKey, dbSystemVal.AsString(), semconv.DBSystemNamePostgreSQL.Value.AsString())
}

checked++
}
}
}

if checked == 0 {
t.Fatal("collection produced no data points to verify attributes on")
}
}

// dataPointAttributes returns the attribute set attached to every data point
// in the given aggregation, across the metric shapes that recordStats emits
// (Sum[int64] for counters/up-down counters, Gauge[int64] for the max gauge).
func dataPointAttributes(data metricdata.Aggregation) []attribute.Set {
var sets []attribute.Set
switch d := data.(type) {
case metricdata.Sum[int64]:
for _, dp := range d.DataPoints {
sets = append(sets, dp.Attributes)
}
case metricdata.Gauge[int64]:
for _, dp := range d.DataPoints {
sets = append(sets, dp.Attributes)
}
}
return sets
}
Loading