Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,18 @@ func TestServeWithMemoryProtectionMiddleware(t *testing.T) {

serverToken := "mykey"
serveResource, err := pool.RunWithOptions(&dockertest.RunOptions{
Repository: "authzed/spicedb",
Tag: "ci",
Cmd: []string{"serve", "--log-level=debug", "--grpc-preshared-key", serverToken, "--telemetry-endpoint=\"\""},
Repository: "authzed/spicedb",
Tag: "ci",
Cmd: []string{
"serve",
"--log-level=debug",
"--grpc-preshared-key", serverToken,
"--telemetry-endpoint", "",
// With very low GOMEMLIMIT values, percentage-based dispatch cache defaults
// can round down to zero and fail startup.
"--dispatch-cache-max-cost", "8KiB",
"--dispatch-cluster-cache-max-cost", "8KiB",
},
ExposedPorts: []string{"50051/tcp"},
Env: []string{"GOMEMLIMIT=1B"}, // NOTE: Absurdly low on purpose
}, func(config *docker.HostConfig) {
Expand Down
48 changes: 25 additions & 23 deletions internal/datastore/crdb/crdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func newCRDBDatastore(ctx context.Context, url string, options ...Option) (datas
initCtx, initCancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer initCancel()

healthChecker, err := pool.NewNodeHealthChecker(url)
healthChecker, err := pool.NewNodeHealthChecker(url, config.prometheusRegisterer)
if err != nil {
return nil, common.RedactAndLogSensitiveConnString(ctx, errUnableToInstantiate, err, url)
}
Expand All @@ -103,7 +103,7 @@ func newCRDBDatastore(ctx context.Context, url string, options ...Option) (datas
// interfere with pool setup.
initPoolConfig := readPoolConfig.Copy()
initPoolConfig.MinConns = 1
initPool, err := pool.NewRetryPool(initCtx, "init", initPoolConfig, healthChecker, config.maxRetries, config.connectRate)
initPool, err := pool.NewRetryPool(initCtx, "init", initPoolConfig, healthChecker, config.maxRetries, config.connectRate, config.prometheusRegisterer)
if err != nil {
return nil, common.RedactAndLogSensitiveConnString(ctx, errUnableToInstantiate, err, url)
}
Expand Down Expand Up @@ -197,23 +197,25 @@ func newCRDBDatastore(ctx context.Context, url string, options ...Option) (datas
gcWindow: config.gcWindow,
watchEnabled: !config.watchDisabled,
schema: *schema.Schema(config.columnOptimizationOption, config.withIntegrity, false),
prometheusRegisterer: config.prometheusRegisterer,
prometheusUnregisterFunction: func() {},
}
ds.SetNowFunc(ds.headRevisionInternal)

// this ctx and cancel is tied to the lifetime of the datastore
ds.ctx, ds.cancel = context.WithCancel(context.Background())
ds.writePool, err = pool.NewRetryPool(ds.ctx, "write", writePoolConfig, healthChecker, config.maxRetries, config.connectRate)
ds.writePool, err = pool.NewRetryPool(ds.ctx, "write", writePoolConfig, healthChecker, config.maxRetries, config.connectRate, config.prometheusRegisterer)
if err != nil {
ds.cancel()
return nil, common.RedactAndLogSensitiveConnString(ctx, errUnableToInstantiate, err, url)
}
ds.readPool, err = pool.NewRetryPool(ds.ctx, "read", readPoolConfig, healthChecker, config.maxRetries, config.connectRate)
ds.readPool, err = pool.NewRetryPool(ds.ctx, "read", readPoolConfig, healthChecker, config.maxRetries, config.connectRate, config.prometheusRegisterer)
if err != nil {
ds.cancel()
return nil, common.RedactAndLogSensitiveConnString(ctx, errUnableToInstantiate, err, url)
}

err = ds.registerPrometheusCollectors(config.enablePrometheusStats)
err = ds.registerPrometheusCollectors(config.prometheusRegisterer, config.enablePrometheusStats)
if err != nil {
ds.cancel()
return nil, err
Expand All @@ -225,6 +227,15 @@ func newCRDBDatastore(ctx context.Context, url string, options ...Option) (datas
// Start goroutines for pruning
if config.enableConnectionBalancing {
log.Ctx(initCtx).Info().Msg("starting cockroach connection balancer")
balancerUnregister, _ := pool.RegisterNodeConnectionBalancerMetrics(config.prometheusRegisterer)
if balancerUnregister != nil {
previousUnregister := ds.prometheusUnregisterFunction
ds.prometheusUnregisterFunction = func() {
previousUnregister()
balancerUnregister()
}
}

ds.pruneGroup, ds.ctx = errgroup.WithContext(ds.ctx)
writePoolBalancer := pool.NewNodeConnectionBalancer(ds.writePool, healthChecker, 5*time.Second)
readPoolBalancer := pool.NewNodeConnectionBalancer(ds.readPool, healthChecker, 5*time.Second)
Expand Down Expand Up @@ -263,7 +274,8 @@ type crdbDatastore struct {

dburl string
readPool, writePool *pool.RetryPool
collectors []prometheus.Collector
prometheusRegisterer prometheus.Registerer
prometheusUnregisterFunction func()
watchBufferLength uint16
watchChangeBufferMaximumSize uint64
watchBufferWriteTimeout time.Duration
Expand Down Expand Up @@ -481,12 +493,8 @@ func (cds *crdbDatastore) Close() error {
}
cds.readPool.Close()
cds.writePool.Close()
for _, collector := range cds.collectors {
ok := prometheus.Unregister(collector)
if !ok {
errs = append(errs, errors.New("could not unregister collector for CRDB datastore"))
}
}
cds.prometheusUnregisterFunction()

return errors.Join(errs...)
}

Expand Down Expand Up @@ -658,7 +666,7 @@ func readClusterTTLNanos(ctx context.Context, conn pgxcommon.DBFuncQuerier) (int
return gcSeconds * 1_000_000_000, nil
}

func (cds *crdbDatastore) registerPrometheusCollectors(enablePrometheusStats bool) error {
func (cds *crdbDatastore) registerPrometheusCollectors(registerer prometheus.Registerer, enablePrometheusStats bool) error {
if !enablePrometheusStats {
return nil
}
Expand All @@ -668,20 +676,14 @@ func (cds *crdbDatastore) registerPrometheusCollectors(enablePrometheusStats boo
"pool_usage": "read",
})

if err := prometheus.Register(readCollector); err != nil {
return fmt.Errorf("failed to register prometheus read collector: %w", err)
}
cds.collectors = append(cds.collectors, readCollector)

writeCollector := pgxpoolprometheus.NewCollector(cds.writePool, map[string]string{
"db_name": "spicedb",
"pool_usage": "write",
})

if err := prometheus.Register(writeCollector); err != nil {
return fmt.Errorf("failed to register prometheus write collector: %w", err)
}
cds.collectors = append(cds.collectors, writeCollector)
unregister, err := datastore.RegisterPrometheusCollectors(registerer, "failed to register crdb pool metrics", readCollector, writeCollector)

return nil
cds.prometheusUnregisterFunction = unregister

return err
}
14 changes: 6 additions & 8 deletions internal/datastore/crdb/crdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -896,15 +896,15 @@ func TestRegisterPrometheusCollectors(t *testing.T) {
// Create read & write pools
readPoolConfig, err := pgxpool.ParseConfig(fmt.Sprintf("postgres://db:password@pg.example.com:5432/mydb?pool_max_conns=%d", readMaxConns))
require.NoError(t, err)
readPool, err := pool.NewRetryPool(t.Context(), "read", readPoolConfig, nil, 18, 20)
readPool, err := pool.NewRetryPool(t.Context(), "read", readPoolConfig, nil, 18, 20, nil)
require.NoError(t, err)
t.Cleanup(func() {
readPool.Close()
})

writePoolConfig, err := pgxpool.ParseConfig(fmt.Sprintf("postgres://db:password@pg.example.com:5432/mydb?pool_max_conns=%d", writeMaxConns))
require.NoError(t, err)
writePool, err := pool.NewRetryPool(t.Context(), "read", writePoolConfig, nil, 18, 20)
writePool, err := pool.NewRetryPool(t.Context(), "write", writePoolConfig, nil, 18, 20, nil)
require.NoError(t, err)

// Create datastore with those pools
Expand All @@ -913,14 +913,12 @@ func TestRegisterPrometheusCollectors(t *testing.T) {
_ = cds.Close()
})

err = cds.registerPrometheusCollectors(false)
err = cds.registerPrometheusCollectors(prometheus.NewPedanticRegistry(), false)
require.NoError(t, err)
require.Empty(t, cds.collectors)

// Register collectors and verify the values of the metrics
err = cds.registerPrometheusCollectors(true)
err = cds.registerPrometheusCollectors(prometheus.NewPedanticRegistry(), true)
require.NoError(t, err)
require.Len(t, cds.collectors, 2)

metricFamily, err := prometheus.DefaultGatherer.Gather()
require.NoError(t, err)
Expand Down Expand Up @@ -977,9 +975,9 @@ func TestVersionReading(t *testing.T) {
// Set up a raw connection to the DB
initPoolConfig, err := pgxpool.ParseConfig(uri)
require.NoError(err)
checker, err := pool.NewNodeHealthChecker(uri)
checker, err := pool.NewNodeHealthChecker(uri, nil)
require.NoError(err)
initPool, err := pool.NewRetryPool(t.Context(), "pool", initPoolConfig, checker, 18, 20)
initPool, err := pool.NewRetryPool(t.Context(), "pool", initPoolConfig, checker, 18, 20, nil)
require.NoError(err)
t.Cleanup(func() {
initPool.Close()
Expand Down
9 changes: 9 additions & 0 deletions internal/datastore/crdb/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"time"

"github.com/prometheus/client_golang/prometheus"
"k8s.io/utils/ptr"

"github.com/authzed/spicedb/internal/datastore/common"
Expand All @@ -29,6 +30,7 @@ type crdbOptions struct {
enableConnectionBalancing bool
analyzeBeforeStatistics bool
filterMaximumIDCount uint16
prometheusRegisterer prometheus.Registerer
enablePrometheusStats bool
withIntegrity bool
allowedMigrations []string
Expand Down Expand Up @@ -127,6 +129,13 @@ func generateConfig(options []Option) (crdbOptions, error) {
return computed, nil
}

// WithPrometheusRegisterer configures the prometheus.Registerer that the
// CockroachDB datastore uses when registering its metric collectors.
func WithPrometheusRegisterer(registerer prometheus.Registerer) Option {
	return func(opts *crdbOptions) {
		opts.prometheusRegisterer = registerer
	}
}

// ReadConnHealthCheckInterval is the frequency at which both idle and max
// lifetime connections are checked, and also the frequency at which the
// minimum number of connections is checked.
Expand Down
15 changes: 9 additions & 6 deletions internal/datastore/crdb/pool/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"golang.org/x/sync/semaphore"

log "github.com/authzed/spicedb/internal/logging"
"github.com/authzed/spicedb/pkg/datastore"
"github.com/authzed/spicedb/pkg/genutil"
)

Expand All @@ -33,11 +34,6 @@ var (
}, []string{"pool"})
)

func init() {
prometheus.MustRegister(connectionsPerCRDBNodeCountGauge)
prometheus.MustRegister(pruningTimeHistogram)
}

type balancePoolConn[C balanceConn] interface {
Conn() C
Release()
Expand Down Expand Up @@ -67,9 +63,16 @@ type NodeConnectionBalancer struct {
nodeConnectionBalancer[*pgxpool.Conn, *pgx.Conn]
}

// RegisterNodeConnectionBalancerMetrics registers the package-level connection
// balancer collectors with the given registerer. It returns an unregister
// callback along with any registration error.
func RegisterNodeConnectionBalancerMetrics(registerer prometheus.Registerer) (func(), error) {
	return datastore.RegisterPrometheusCollectors(
		registerer,
		"failed to register crdb connection balancer metrics",
		connectionsPerCRDBNodeCountGauge,
		pruningTimeHistogram,
	)
}

// NewNodeConnectionBalancer builds a new nodeConnectionBalancer for a given connection pool and health tracker.
func NewNodeConnectionBalancer(pool *RetryPool, healthTracker *NodeHealthTracker, interval time.Duration) *NodeConnectionBalancer {
	// NOTE(review): the pasted diff contained both the pre- and post-change
	// return statements; this is the post-change (multi-line) form only.
	return &NodeConnectionBalancer{
		*newNodeConnectionBalancer[*pgxpool.Conn, *pgx.Conn](pool, healthTracker, interval),
	}
}

// nodeConnectionBalancer is generic over underlying connection types for
Expand Down
4 changes: 3 additions & 1 deletion internal/datastore/crdb/pool/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"testing"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -139,7 +140,8 @@ func TestNodeConnectionBalancerPrune(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tracker, err := NewNodeHealthChecker("")
reg := prometheus.NewRegistry()
tracker, err := NewNodeHealthChecker("", reg)
require.NoError(t, err)
for _, n := range tt.nodes {
tracker.healthyNodes[n] = struct{}{}
Expand Down
19 changes: 10 additions & 9 deletions internal/datastore/crdb/pool/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

pgxcommon "github.com/authzed/spicedb/internal/datastore/postgres/common"
log "github.com/authzed/spicedb/internal/logging"
"github.com/authzed/spicedb/pkg/datastore"
)

const errorBurst = 2
Expand All @@ -23,37 +24,37 @@ var healthyCRDBNodeCountGauge = prometheus.NewGauge(prometheus.GaugeOpts{
Help: "the number of healthy crdb nodes detected by spicedb",
})

func init() {
prometheus.MustRegister(healthyCRDBNodeCountGauge)
}

// NodeHealthTracker detects changes in the node pool by polling the cluster periodically and recording
// the node ids that are seen. This is used to detect new nodes that come online that have either previously
// been marked unhealthy due to connection errors or due to scale up.
//
// Consumers can manually mark a node healthy or unhealthy as well.
type NodeHealthTracker struct {
	sync.RWMutex
	connConfig                   *pgx.ConnConfig
	healthyNodes                 map[uint32]struct{}      // GUARDED_BY(RWMutex)
	nodesEverSeen                map[uint32]*rate.Limiter // GUARDED_BY(RWMutex)
	newLimiter                   func() *rate.Limiter
	prometheusUnregisterFunction func()
}

// NewNodeHealthChecker builds a health checker that polls the cluster at the given url.
func NewNodeHealthChecker(url string) (*NodeHealthTracker, error) {
func NewNodeHealthChecker(url string, registerer prometheus.Registerer) (*NodeHealthTracker, error) {
connConfig, err := pgxcommon.ParseConfigWithInstrumentation(url)
if err != nil {
return nil, err
}

unregister, _ := datastore.RegisterPrometheusCollectors(registerer, "failed to register crdb health metrics", healthyCRDBNodeCountGauge)

return &NodeHealthTracker{
connConfig: connConfig,
healthyNodes: make(map[uint32]struct{}, 0),
nodesEverSeen: make(map[uint32]*rate.Limiter, 0),
newLimiter: func() *rate.Limiter {
return rate.NewLimiter(rate.Every(1*time.Minute), errorBurst)
},
prometheusUnregisterFunction: unregister,
}, nil
}

Expand Down
27 changes: 14 additions & 13 deletions internal/datastore/crdb/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/authzed/spicedb/internal/datastore/postgres/common"
log "github.com/authzed/spicedb/internal/logging"
"github.com/authzed/spicedb/pkg/datastore"
"github.com/authzed/spicedb/pkg/spiceerrors"
)

Expand All @@ -34,10 +35,6 @@ var resetHistogram = prometheus.NewHistogram(prometheus.HistogramOpts{
Buckets: []float64{0, 1, 2, 5, 10, 20, 50},
})

func init() {
prometheus.MustRegister(resetHistogram)
}

// ctxDisableRetries is the context key type used to flag that retry behavior
// should be disabled for a particular call.
type ctxDisableRetries struct{}

var (
Expand All @@ -51,19 +48,23 @@ type RetryPool struct {
healthTracker *NodeHealthTracker

sync.RWMutex
maxRetries uint8
nodeForConn map[*pgx.Conn]uint32 // GUARDED_BY(RWMutex)
gc map[*pgx.Conn]struct{} // GUARDED_BY(RWMutex)
maxRetries uint8
nodeForConn map[*pgx.Conn]uint32 // GUARDED_BY(RWMutex)
gc map[*pgx.Conn]struct{} // GUARDED_BY(RWMutex)
prometheusUnregisterFunction func()
}

func NewRetryPool(ctx context.Context, name string, config *pgxpool.Config, healthTracker *NodeHealthTracker, maxRetries uint8, connectRate time.Duration) (*RetryPool, error) {
func NewRetryPool(ctx context.Context, name string, config *pgxpool.Config, healthTracker *NodeHealthTracker, maxRetries uint8, connectRate time.Duration, registerer prometheus.Registerer) (*RetryPool, error) {
unregister, _ := datastore.RegisterPrometheusCollectors(registerer, "failed to register crdb pool metrics", resetHistogram)

config = config.Copy()
p := &RetryPool{
id: name,
maxRetries: maxRetries,
healthTracker: healthTracker,
nodeForConn: make(map[*pgx.Conn]uint32, 0),
gc: make(map[*pgx.Conn]struct{}, 0),
id: name,
maxRetries: maxRetries,
healthTracker: healthTracker,
nodeForConn: make(map[*pgx.Conn]uint32, 0),
gc: make(map[*pgx.Conn]struct{}, 0),
prometheusUnregisterFunction: unregister,
}

limiter := rate.NewLimiter(rate.Every(connectRate), 1)
Expand Down
Loading
Loading