diff --git a/.github/workflows/e2e_integration_test.yaml b/.github/workflows/e2e_integration_test.yaml
index 691bb163..78724bf8 100644
--- a/.github/workflows/e2e_integration_test.yaml
+++ b/.github/workflows/e2e_integration_test.yaml
@@ -34,7 +34,7 @@ jobs:
         shell: bash
 
       - name: Run Docker Compose (Without RPC)
-        run: docker compose -f docker-compose.yaml up --build -V -d --scale stellar-rpc=0
+        run: docker compose -f docker-compose.yaml up --build -V -d api ingest --wait --scale stellar-rpc=0
         shell: bash
 
       - name: Run Integration Tests
diff --git a/cmd/ingest.go b/cmd/ingest.go
index f28fa7d8..9068913a 100644
--- a/cmd/ingest.go
+++ b/cmd/ingest.go
@@ -29,6 +29,7 @@ func (c *ingestCmd) Command() *cobra.Command {
 		utils.StartLedgerOption(&cfg.StartLedger),
 		utils.EndLedgerOption(&cfg.EndLedger),
 		utils.NetworkPassphraseOption(&cfg.NetworkPassphrase),
+		utils.IngestServerPortOption(&cfg.ServerPort),
 		{
 			Name:        "ledger-cursor-name",
 			Usage:       "Name of last synced ledger cursor, used to keep track of the last ledger ingested by the service. When starting up, ingestion will resume from the ledger number stored in this record. It should be an unique name per container as different containers would overwrite the cursor value of its peers when using the same cursor name.",
@@ -37,14 +38,6 @@ func (c *ingestCmd) Command() *cobra.Command {
 			FlagDefault: "live_ingest_cursor",
 			Required:    true,
 		},
-		{
-			Name:        "start",
-			Usage:       "Ledger number from which ingestion should start. When not present, ingestion will resume from last synced ledger.",
-			OptType:     types.Int,
-			ConfigKey:   &cfg.StartLedger,
-			FlagDefault: 0,
-			Required:    false,
-		},
 	}
 
 	cmd := &cobra.Command{
diff --git a/cmd/utils/global_options.go b/cmd/utils/global_options.go
index de3fcd90..1946dc60 100644
--- a/cmd/utils/global_options.go
+++ b/cmd/utils/global_options.go
@@ -10,6 +10,18 @@ import (
 	"github.com/stellar/wallet-backend/internal/signing"
 )
 
+// IngestServerPortOption builds the optional "ingest-server-port" integer option (default 8002) bound to configKey.
+func IngestServerPortOption(configKey *int) *config.ConfigOption {
+	return &config.ConfigOption{
+		Name:        "ingest-server-port",
+		Usage:       "The port for the ingest server.",
+		OptType:     types.Int,
+		ConfigKey:   configKey,
+		FlagDefault: 8002,
+		Required:    false,
+	}
+}
+
 func DatabaseURLOption(configKey *string) *config.ConfigOption {
 	return &config.ConfigOption{
 		Name:        "database-url",
@@ -144,7 +156,7 @@ func DistributionAccountSignatureClientProviderOption(configKey *signing.Signatu
 func StartLedgerOption(configKey *int) *config.ConfigOption {
 	return &config.ConfigOption{
 		Name:        "start-ledger",
-		Usage:       "ledger number to start getting transactions from",
+		Usage:       "ledger number from which ingestion should start. When not present, ingestion will resume from last synced ledger.",
 		OptType:     types.Int,
 		ConfigKey:   configKey,
 		FlagDefault: 0,
diff --git a/docker-compose.yaml b/docker-compose.yaml
index 7d6484cb..4236d21f 100644
--- a/docker-compose.yaml
+++ b/docker-compose.yaml
@@ -38,7 +38,7 @@ services:
     container_name: api
     image: stellar/wallet-backend:development
     healthcheck:
-      test: "curl --fail --silent --show-error --location 'http://localhost:8001/health' | grep -q '\"status\": \"pass\"'"
+      test: "curl --fail --silent --show-error --location 'http://localhost:8001/health' | grep -q '\"status\": \"ok\"'"
       interval: 10s
       timeout: 10s
       retries: 3
@@ -50,6 +50,8 @@ services:
         condition: service_healthy
       stellar-rpc:
         condition: service_started
+      ingest:
+        condition: service_healthy
     ports:
       - 8001:8001
     entrypoint: ""
@@ -57,7 +59,6 @@ services:
       - sh
       - -c
       - |
-        ./wallet-backend migrate up
         ./wallet-backend channel-account ensure ${NUMBER_CHANNEL_ACCOUNTS:-2}
         ./wallet-backend serve
     environment:
@@ -95,21 +96,36 @@ services:
   ingest:
     container_name: ingest
     image: stellar/wallet-backend:development
+    healthcheck:
+      test: "curl --fail --silent --show-error --location 'http://localhost:8002/health' | grep -q '\"status\": \"ok\"'"
+      interval: 10s
+      timeout: 10s
+      retries: 3
     build:
       context: ./
       dockerfile: Dockerfile
     depends_on:
       db:
         condition: service_healthy
-      api:
-        condition: service_healthy
       stellar-rpc:
         condition: service_started
     entrypoint: ""
     command:
       - sh
       - -c
-      - ./wallet-backend ingest
+      - |
+        ./wallet-backend migrate up
+        if [ "$STELLAR_ENVIRONMENT" = "GITHUB_WORKFLOW" ]; then
+          HEALTH_RESPONSE=$(curl -s -X POST -H "Content-Type: application/json" -d '{"jsonrpc":"2.0","id":1,"method":"getHealth"}' "$${RPC_URL}")
+          LATEST_LEDGER=$(echo "$$HEALTH_RESPONSE" | grep -oE '"latestLedger":[0-9]+' | grep -oE '[0-9]+' || true)
+          if [ -z "$$LATEST_LEDGER" ]; then
+            ./wallet-backend ingest
+          else
+            ./wallet-backend ingest --start-ledger "$$LATEST_LEDGER"
+          fi
+        else
+          ./wallet-backend ingest
+        fi
     environment:
       RPC_URL: ${RPC_URL:-http://stellar-rpc:8000}
       DATABASE_URL: postgres://postgres@db:5432/wallet-backend?sslmode=disable
diff --git a/internal/ingest/ingest.go b/internal/ingest/ingest.go
index dcfdfada..c0a276ce 100644
--- a/internal/ingest/ingest.go
+++ b/internal/ingest/ingest.go
@@ -4,6 +4,9 @@ import (
 	"context"
 	"fmt"
 	"net/http"
+	"os"
+	"os/signal"
+	"syscall"
 	"time"
 
 	"github.com/prometheus/client_golang/prometheus/promhttp"
@@ -14,13 +17,20 @@ import (
 	"github.com/stellar/wallet-backend/internal/data"
 	"github.com/stellar/wallet-backend/internal/db"
 	"github.com/stellar/wallet-backend/internal/metrics"
+	httphandler "github.com/stellar/wallet-backend/internal/serve/httphandler"
 	"github.com/stellar/wallet-backend/internal/services"
 	"github.com/stellar/wallet-backend/internal/signing/store"
 	cache "github.com/stellar/wallet-backend/internal/store"
 )
 
+const (
+	// ServerShutdownTimeout bounds how long the ingest HTTP server may take to shut down gracefully.
+	ServerShutdownTimeout = 10 * time.Second
+)
+
 type Configs struct {
 	DatabaseURL       string
+	ServerPort        int
 	LedgerCursorName  string
 	StartLedger       int
 	EndLedger         int
@@ -76,13 +86,52 @@ func setupDeps(cfg Configs) (services.IngestService, error) {
 		return nil, fmt.Errorf("instantiating ingest service: %w", err)
 	}
 
-	http.Handle("/ingest-metrics", promhttp.HandlerFor(metricsService.GetRegistry(), promhttp.HandlerOpts{}))
+	// Start ingest server which serves metrics and health check endpoints.
+	server := startServers(cfg, models, rpcService, metricsService)
+
+	// Wait for termination signal to gracefully shut down the server.
+	// NOTE(review): signal.Notify takes over SIGINT/SIGTERM handling from the Go runtime, so
+	// confirm the ingest loop itself is stopped by other means once these signals arrive.
+	quit := make(chan os.Signal, 1)
+	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
 	go func() {
-		err := http.ListenAndServe(":8002", nil)
-		if err != nil {
-			log.Ctx(context.Background()).Fatalf("starting ingest metrics server: %v", err)
+		<-quit
+		log.Info("Shutting down server...")
+
+		ctx, cancel := context.WithTimeout(context.Background(), ServerShutdownTimeout)
+		defer cancel()
+
+		if err := server.Shutdown(ctx); err != nil {
+			log.Errorf("Server forced to shutdown: %v", err)
 		}
+		log.Info("Server gracefully stopped")
 	}()
 
 	return ingestService, nil
 }
+
+// startServers initializes and starts the ingest server which serves metrics and health check endpoints.
+func startServers(cfg Configs, models *data.Models, rpcService services.RPCService, metricsSvc metrics.MetricsService) *http.Server {
+	mux := http.NewServeMux()
+	server := &http.Server{
+		Addr:    fmt.Sprintf(":%d", cfg.ServerPort),
+		Handler: mux,
+	}
+
+	healthHandler := httphandler.HealthHandler{
+		Models:     models,
+		RPCService: rpcService,
+		AppTracker: cfg.AppTracker,
+	}
+	mux.Handle("/ingest-metrics", promhttp.HandlerFor(metricsSvc.GetRegistry(), promhttp.HandlerOpts{}))
+	mux.Handle("/health", http.HandlerFunc(healthHandler.GetHealth))
+
+	go func() {
+		// http.ErrServerClosed is what ListenAndServe returns after Shutdown; any other error is fatal.
+		if err := server.ListenAndServe(); err != http.ErrServerClosed {
+			log.Ctx(context.Background()).Fatalf("starting server on %s: %v", server.Addr, err)
+		}
+	}()
+
+	return server
+}
diff --git a/internal/serve/httperror/errors.go b/internal/serve/httperror/errors.go
index 45037a43..a94a908d 100644
--- a/internal/serve/httperror/errors.go
+++ b/internal/serve/httperror/errors.go
@@ -79,3 +79,23 @@ func InternalServerError(ctx context.Context, message string, err error, extras
 		Extras: extras,
 	}
 }
+
+// ServiceUnavailable logs err, reports it to appTracker when one is provided, and returns a 503
+// ErrorResponse carrying message (or a default message when it is empty) and extras.
+func ServiceUnavailable(ctx context.Context, message string, err error, extras map[string]interface{}, appTracker apptracker.AppTracker) *ErrorResponse {
+	if message == "" {
+		message = "The service is unavailable."
+	}
+	log.Ctx(ctx).Error(err)
+	if appTracker != nil {
+		appTracker.CaptureException(err)
+	} else {
+		log.Warn("App Tracker is nil")
+	}
+
+	return &ErrorResponse{
+		Status: http.StatusServiceUnavailable,
+		Error:  message,
+		Extras: extras,
+	}
+}
diff --git a/internal/serve/httphandler/health.go b/internal/serve/httphandler/health.go
new file mode 100644
index 00000000..3b7ad4c1
--- /dev/null
+++ b/internal/serve/httphandler/health.go
@@ -0,0 +1,73 @@
+package httphandler
+
+import (
+	"errors"
+	"fmt"
+	"net/http"
+
+	"github.com/stellar/go/support/render/httpjson"
+
+	"github.com/stellar/wallet-backend/internal/apptracker"
+	"github.com/stellar/wallet-backend/internal/data"
+	"github.com/stellar/wallet-backend/internal/serve/httperror"
+	"github.com/stellar/wallet-backend/internal/services"
+)
+
+// HealthHandler serves the health check endpoint, comparing the RPC's reported health with the
+// ingestion cursor stored by the wallet backend.
+type HealthHandler struct {
+	Models     *data.Models
+	RPCService services.RPCService
+	AppTracker apptracker.AppTracker
+}
+
+const (
+	// ledgerCursorName is the ingest store key read as the backend's latest synced ledger.
+	ledgerCursorName = "live_ingest_cursor"
+	// ledgerHealthThreshold is the maximum number of ledgers the backend may trail the RPC by
+	// while still being reported as healthy.
+	ledgerHealthThreshold = uint32(50)
+)
+
+// GetHealth responds with 200 and the backend's latest ledger when the RPC is healthy and the
+// backend trails it by at most ledgerHealthThreshold ledgers. It responds with 500 when the RPC
+// health or the stored cursor cannot be read, and with 503 when the RPC is not healthy or the
+// backend is too far behind.
+func (h HealthHandler) GetHealth(w http.ResponseWriter, r *http.Request) {
+	ctx := r.Context()
+
+	rpcHealth, err := h.RPCService.GetHealth()
+	if err != nil {
+		err = fmt.Errorf("failed to get RPC health: %w", err)
+		httperror.InternalServerError(ctx, err.Error(), err, nil, h.AppTracker).Render(w)
+		return
+	}
+	if rpcHealth.Status != "healthy" {
+		err = errors.New("rpc is not healthy")
+		httperror.ServiceUnavailable(ctx, err.Error(), err, nil, h.AppTracker).Render(w)
+		return
+	}
+
+	backendLatestLedger, err := h.Models.IngestStore.GetLatestLedgerSynced(ctx, ledgerCursorName)
+	if err != nil {
+		err = fmt.Errorf("failed to get backend latest ledger: %w", err)
+		httperror.InternalServerError(ctx, err.Error(), err, nil, h.AppTracker).Render(w)
+		return
+	}
+	// The operands are uint32 and the cursor is read after the RPC snapshot, so it may already be
+	// ahead of it. Subtract only when the RPC is ahead; otherwise the difference would wrap around
+	// and a healthy backend would be reported as out of sync.
+	if rpcHealth.LatestLedger > backendLatestLedger && rpcHealth.LatestLedger-backendLatestLedger > ledgerHealthThreshold {
+		err = errors.New("wallet backend is not in sync with the RPC")
+		httperror.ServiceUnavailable(ctx, err.Error(), err, map[string]interface{}{
+			"rpc_latest_ledger":     rpcHealth.LatestLedger,
+			"backend_latest_ledger": backendLatestLedger,
+		}, h.AppTracker).Render(w)
+		return
+	}
+
+	httpjson.Render(w, map[string]any{
+		"status":                "ok",
+		"backend_latest_ledger": backendLatestLedger,
+	}, httpjson.JSON)
+}
diff --git a/internal/serve/httphandler/health_test.go b/internal/serve/httphandler/health_test.go
new file mode 100644
index 00000000..8e407eb1
--- /dev/null
+++ b/internal/serve/httphandler/health_test.go
@@ -0,0 +1,227 @@
+// Health handler tests for wallet backend health check endpoint
+package httphandler
+
+import (
+	"context"
+	"encoding/json"
+	"errors"
+	"net/http"
+	"net/http/httptest"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/mock"
+	"github.com/stretchr/testify/require"
+
+	"github.com/stellar/wallet-backend/internal/apptracker"
+	"github.com/stellar/wallet-backend/internal/data"
+	"github.com/stellar/wallet-backend/internal/db"
+	"github.com/stellar/wallet-backend/internal/db/dbtest"
+	"github.com/stellar/wallet-backend/internal/entities"
+	"github.com/stellar/wallet-backend/internal/metrics"
+	"github.com/stellar/wallet-backend/internal/services"
+)
+
+func TestHealthHandler_GetHealth(t *testing.T) {
+	dbt := dbtest.Open(t)
+	defer dbt.Close()
+
+	dbConnectionPool, err := db.OpenDBConnectionPool(dbt.DSN)
+	require.NoError(t, err)
+	defer dbConnectionPool.Close()
+
+	mockMetricsService := metrics.NewMockMetricsService()
+	mockMetricsService.On("ObserveDBQueryDuration", "SELECT", "ingest_store", mock.AnythingOfType("float64")).Return()
+	mockMetricsService.On("IncDBQuery", "SELECT", "ingest_store").Return()
+	defer mockMetricsService.AssertExpectations(t)
+
+	models, err := data.NewModels(dbConnectionPool, mockMetricsService)
+	require.NoError(t, err)
+
+	t.Run("healthy - RPC and backend in sync", func(t *testing.T) {
+		ctx := context.Background()
+		_, err := dbConnectionPool.ExecContext(ctx, "DELETE FROM ingest_store WHERE key = 'live_ingest_cursor'")
+		require.NoError(t, err)
+		_, err = dbConnectionPool.ExecContext(ctx, "INSERT INTO ingest_store (key, value) VALUES ('live_ingest_cursor', $1)", uint32(98))
+		require.NoError(t, err)
+
+		mockRPCService := &services.RPCServiceMock{}
+		mockAppTracker := apptracker.NewMockAppTracker(t)
+		mockAppTracker.On("CaptureException", mock.Anything).Return().Maybe()
+		defer mockAppTracker.AssertExpectations(t)
+
+		handler := &HealthHandler{
+			Models:     models,
+			RPCService: mockRPCService,
+			AppTracker: mockAppTracker,
+		}
+
+		rpcHealthResult := entities.RPCGetHealthResult{
+			Status:       "healthy",
+			LatestLedger: 100,
+		}
+		mockRPCService.On("GetHealth").Return(rpcHealthResult, nil)
+		defer mockRPCService.AssertExpectations(t)
+
+		req := httptest.NewRequest(http.MethodGet, "/health", nil)
+		recorder := httptest.NewRecorder()
+		handler.GetHealth(recorder, req)
+		assert.Equal(t, http.StatusOK, recorder.Code)
+
+		var response map[string]any
+		err = json.Unmarshal(recorder.Body.Bytes(), &response)
+		require.NoError(t, err)
+
+		assert.Equal(t, "ok", response["status"])
+		assert.Equal(t, float64(98), response["backend_latest_ledger"])
+
+		_, cleanupErr := dbConnectionPool.ExecContext(ctx, "DELETE FROM ingest_store WHERE key = 'live_ingest_cursor'")
+		require.NoError(t, cleanupErr)
+	})
+
+	t.Run("unhealthy - RPC service error", func(t *testing.T) {
+		ctx := context.Background()
+		_, err := dbConnectionPool.ExecContext(ctx, "DELETE FROM ingest_store WHERE key = 'live_ingest_cursor'")
+		require.NoError(t, err)
+
+		mockRPCService := &services.RPCServiceMock{}
+		mockAppTracker := apptracker.NewMockAppTracker(t)
+		mockAppTracker.On("CaptureException", mock.Anything).Return().Maybe()
+		defer mockAppTracker.AssertExpectations(t)
+
+		handler := &HealthHandler{
+			Models:     models,
+			RPCService: mockRPCService,
+			AppTracker: mockAppTracker,
+		}
+
+		mockRPCService.On("GetHealth").Return(entities.RPCGetHealthResult{}, errors.New("RPC connection failed"))
+		defer mockRPCService.AssertExpectations(t)
+
+		req := httptest.NewRequest(http.MethodGet, "/health", nil)
+		recorder := httptest.NewRecorder()
+		handler.GetHealth(recorder, req)
+		assert.Equal(t, http.StatusInternalServerError, recorder.Code)
+		assert.Contains(t, recorder.Body.String(), "failed to get RPC health")
+
+		_, cleanupErr := dbConnectionPool.ExecContext(ctx, "DELETE FROM ingest_store WHERE key = 'live_ingest_cursor'")
+		require.NoError(t, cleanupErr)
+	})
+
+	t.Run("unhealthy - RPC status not healthy", func(t *testing.T) {
+		ctx := context.Background()
+		_, err := dbConnectionPool.ExecContext(ctx, "DELETE FROM ingest_store WHERE key = 'live_ingest_cursor'")
+		require.NoError(t, err)
+		_, err = dbConnectionPool.ExecContext(ctx, "INSERT INTO ingest_store (key, value) VALUES ('live_ingest_cursor', $1)", uint32(98))
+		require.NoError(t, err)
+
+		mockRPCService := &services.RPCServiceMock{}
+		mockAppTracker := apptracker.NewMockAppTracker(t)
+		mockAppTracker.On("CaptureException", mock.Anything).Return().Maybe()
+		defer mockAppTracker.AssertExpectations(t)
+
+		handler := &HealthHandler{
+			Models:     models,
+			RPCService: mockRPCService,
+			AppTracker: mockAppTracker,
+		}
+
+		rpcHealthResult := entities.RPCGetHealthResult{
+			Status:       "unhealthy",
+			LatestLedger: 100,
+		}
+		mockRPCService.On("GetHealth").Return(rpcHealthResult, nil)
+		defer mockRPCService.AssertExpectations(t)
+
+		req := httptest.NewRequest(http.MethodGet, "/health", nil)
+		recorder := httptest.NewRecorder()
+		handler.GetHealth(recorder, req)
+		assert.Equal(t, http.StatusServiceUnavailable, recorder.Code)
+
+		_, cleanupErr := dbConnectionPool.ExecContext(ctx, "DELETE FROM ingest_store WHERE key = 'live_ingest_cursor'")
+		require.NoError(t, cleanupErr)
+	})
+
+	t.Run("unhealthy - backend significantly behind", func(t *testing.T) {
+		ctx := context.Background()
+		_, err := dbConnectionPool.ExecContext(ctx, "DELETE FROM ingest_store WHERE key = 'live_ingest_cursor'")
+		require.NoError(t, err)
+		_, err = dbConnectionPool.ExecContext(ctx, "INSERT INTO ingest_store (key, value) VALUES ('live_ingest_cursor', $1)", uint32(900))
+		require.NoError(t, err)
+
+		mockRPCService := &services.RPCServiceMock{}
+		mockAppTracker := apptracker.NewMockAppTracker(t)
+		mockAppTracker.On("CaptureException", mock.Anything).Return().Maybe()
+		defer mockAppTracker.AssertExpectations(t)
+
+		handler := &HealthHandler{
+			Models:     models,
+			RPCService: mockRPCService,
+			AppTracker: mockAppTracker,
+		}
+
+		rpcHealthResult := entities.RPCGetHealthResult{
+			Status:       "healthy",
+			LatestLedger: 1000,
+		}
+		mockRPCService.On("GetHealth").Return(rpcHealthResult, nil)
+		defer mockRPCService.AssertExpectations(t)
+
+		req := httptest.NewRequest(http.MethodGet, "/health", nil)
+		recorder := httptest.NewRecorder()
+		handler.GetHealth(recorder, req)
+		assert.Equal(t, http.StatusServiceUnavailable, recorder.Code)
+
+		var response map[string]any
+		err = json.Unmarshal(recorder.Body.Bytes(), &response)
+		require.NoError(t, err)
+
+		assert.Contains(t, recorder.Body.String(), "wallet backend is not in sync with the RPC")
+		assert.Equal(t, float64(1000), response["extras"].(map[string]any)["rpc_latest_ledger"])
+		assert.Equal(t, float64(900), response["extras"].(map[string]any)["backend_latest_ledger"])
+
+		_, cleanupErr := dbConnectionPool.ExecContext(ctx, "DELETE FROM ingest_store WHERE key = 'live_ingest_cursor'")
+		require.NoError(t, cleanupErr)
+	})
+
+	t.Run("healthy - backend ahead of RPC snapshot", func(t *testing.T) {
+		ctx := context.Background()
+		_, err := dbConnectionPool.ExecContext(ctx, "DELETE FROM ingest_store WHERE key = 'live_ingest_cursor'")
+		require.NoError(t, err)
+		_, err = dbConnectionPool.ExecContext(ctx, "INSERT INTO ingest_store (key, value) VALUES ('live_ingest_cursor', $1)", uint32(105))
+		require.NoError(t, err)
+
+		mockRPCService := &services.RPCServiceMock{}
+		mockAppTracker := apptracker.NewMockAppTracker(t)
+		mockAppTracker.On("CaptureException", mock.Anything).Return().Maybe()
+		defer mockAppTracker.AssertExpectations(t)
+
+		handler := &HealthHandler{
+			Models:     models,
+			RPCService: mockRPCService,
+			AppTracker: mockAppTracker,
+		}
+
+		rpcHealthResult := entities.RPCGetHealthResult{
+			Status:       "healthy",
+			LatestLedger: 100,
+		}
+		mockRPCService.On("GetHealth").Return(rpcHealthResult, nil)
+		defer mockRPCService.AssertExpectations(t)
+
+		req := httptest.NewRequest(http.MethodGet, "/health", nil)
+		recorder := httptest.NewRecorder()
+		handler.GetHealth(recorder, req)
+		assert.Equal(t, http.StatusOK, recorder.Code)
+
+		var response map[string]any
+		err = json.Unmarshal(recorder.Body.Bytes(), &response)
+		require.NoError(t, err)
+
+		assert.Equal(t, "ok", response["status"])
+		assert.Equal(t, float64(105), response["backend_latest_ledger"])
+
+		_, cleanupErr := dbConnectionPool.ExecContext(ctx, "DELETE FROM ingest_store WHERE key = 'live_ingest_cursor'")
+		require.NoError(t, cleanupErr)
+	})
+}
diff --git a/internal/serve/serve.go b/internal/serve/serve.go
index 3421dd1e..671a8f37 100644
--- a/internal/serve/serve.go
+++ b/internal/serve/serve.go
@@ -12,7 +12,6 @@ import (
 	"github.com/sirupsen/logrus"
 	supporthttp "github.com/stellar/go/support/http"
 	"github.com/stellar/go/support/log"
-	"github.com/stellar/go/support/render/health"
 	"github.com/stellar/go/xdr"
 
 	"github.com/stellar/wallet-backend/internal/apptracker"
@@ -72,6 +71,7 @@ type handlerDeps struct {
 	PaymentService            services.PaymentService
 	MetricsService            metrics.MetricsService
 	TransactionService        txservices.TransactionService
+	RPCService                services.RPCService
 
 	// Error Tracker
 	AppTracker apptracker.AppTracker
@@ -193,6 +193,7 @@ func initHandlerDeps(ctx context.Context, cfg Configs) (handlerDeps, error) {
 		AccountSponsorshipService: accountSponsorshipService,
 		PaymentService:            paymentService,
 		MetricsService:            metricsService,
+		RPCService:                rpcService,
 		AppTracker:                cfg.AppTracker,
 		NetworkPassphrase:         cfg.NetworkPassphrase,
 		TransactionService:        txService,
@@ -218,7 +219,11 @@ func handler(deps handlerDeps) http.Handler {
 	mux.Use(middleware.MetricsMiddleware(deps.MetricsService))
 	mux.Use(middleware.RecoverHandler(deps.AppTracker))
 
-	mux.Get("/health", health.PassHandler{}.ServeHTTP)
+	mux.Get("/health", httphandler.HealthHandler{
+		Models:     deps.Models,
+		RPCService: deps.RPCService,
+		AppTracker: deps.AppTracker,
+	}.GetHealth)
 	mux.Get("/api-metrics", promhttp.HandlerFor(deps.MetricsService.GetRegistry(), promhttp.HandlerOpts{}).ServeHTTP)
 
 	// Authenticated routes