Skip to content
27 changes: 26 additions & 1 deletion cmd/hydrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package main

import (
"errors"
"fmt"
"time"

"github.com/pokt-network/poktroll/pkg/polylog"
Expand Down Expand Up @@ -45,10 +46,34 @@ func setupEndpointHydrator(
return nil, errors.New("endpoint hydrator enabled but no protocol provided. this should never happen")
}

// Get configured service IDs from the protocol instance.
gatewayServiceIDs := protocolInstance.ConfiguredServiceIDs()

// Filter out any service IDs that are manually disabled by the user.
activeQoSServices := make(map[protocol.ServiceID]gateway.QoSService)
for serviceID, qosService := range qosServices {
activeQoSServices[serviceID] = qosService
}

for _, disabledQoSServiceIDForGateway := range hydratorConfig.QoSDisabledServiceIDs {
// Throw error if any manually disabled service IDs are not found in the protocol's configured service IDs.
if _, found := gatewayServiceIDs[disabledQoSServiceIDForGateway]; !found {
return nil, fmt.Errorf("[INVALID CONFIGURATION] QoS manually disabled for service ID: %s BUT NOT not found in protocol's configured service IDs", disabledQoSServiceIDForGateway)
}
logger.Info().Msgf("Gateway manually disabled QoS for service ID: %s", disabledQoSServiceIDForGateway)
delete(activeQoSServices, disabledQoSServiceIDForGateway)
}

// Check if all QoS services were disabled after filtering
if len(activeQoSServices) == 0 {
logger.Warn().Msg("endpoint hydrator is fully disabled: all QoS services were manually disabled")
return nil, nil
}

endpointHydrator := gateway.EndpointHydrator{
Logger: cmdLogger,
Protocol: protocolInstance,
ActiveQoSServices: qosServices,
ActiveQoSServices: activeQoSServices,
RunInterval: hydratorConfig.RunInterval,
MaxEndpointCheckWorkers: hydratorConfig.MaxEndpointCheckWorkers,
MetricsReporter: metricsReporter,
Expand Down
87 changes: 9 additions & 78 deletions cmd/qos.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,13 @@
package main

import (
"fmt"
"strings"

"github.com/pokt-network/poktroll/pkg/polylog"

"github.com/buildwithgrove/path/config"
"github.com/buildwithgrove/path/gateway"
"github.com/buildwithgrove/path/protocol"
"github.com/buildwithgrove/path/qos/cosmos"
"github.com/buildwithgrove/path/qos/evm"
"github.com/buildwithgrove/path/qos/solana"
)

// getServiceQoSInstances returns all QoS instances to be used by the Gateway and the EndpointHydrator.
Expand All @@ -20,10 +16,6 @@ func getServiceQoSInstances(
gatewayConfig config.GatewayConfig,
protocolInstance gateway.Protocol,
) (map[protocol.ServiceID]gateway.QoSService, error) {
// TODO_TECHDEBT(@adshmh): refactor this function to remove the
// need to manually add entries for every new QoS implementation.
qosServices := make(map[protocol.ServiceID]gateway.QoSService)

// Create a logger for this function's own messages with method-specific context
hydratedLogger := logger.With("module", "qos").With("method", "getServiceQoSInstances").With("protocol", protocolInstance.Name())

Expand All @@ -42,69 +34,18 @@ func getServiceQoSInstances(
gatewayServiceIDs := protocolInstance.ConfiguredServiceIDs()
logGatewayServiceIDs(hydratedLogger, gatewayServiceIDs)

// Remove any service IDs that are manually disabled by the user.
for _, disabledQoSServiceIDForGateway := range gatewayConfig.HydratorConfig.QoSDisabledServiceIDs {
// Throw error if any manually disabled service IDs are not found in the protocol's configured service IDs.
if _, found := gatewayServiceIDs[disabledQoSServiceIDForGateway]; !found {
return nil, fmt.Errorf("[INVALID CONFIGURATION] QoS manually disabled for service ID: %s BUT NOT not found in protocol's configured service IDs", disabledQoSServiceIDForGateway)
}
hydratedLogger.Info().Msgf("Gateway manually disabled QoS for service ID: %s", disabledQoSServiceIDForGateway)
delete(gatewayServiceIDs, disabledQoSServiceIDForGateway)
// TODO_TECHDEBT(@adshmh): Refactor to move the Validate method call to GatewayConfig struct.
//
// Validate the QoS services config.
if err := gatewayConfig.ServicesQoSConfigs.Validate(hydratedLogger, gatewayServiceIDs); err != nil {
return nil, err
}

// Get the service configs for the current protocol
qosServiceConfigs := config.QoSServiceConfigs.GetServiceConfigs(gatewayConfig)
logQoSServiceConfigs(hydratedLogger, qosServiceConfigs)

// Initialize QoS services for all service IDs with a corresponding QoS
// implementation, as defined in the `config/service_qos.go` file.
for _, qosServiceConfig := range qosServiceConfigs {
serviceID := qosServiceConfig.GetServiceID()
// Skip service IDs that are not configured for the PATH instance.
if _, found := gatewayServiceIDs[serviceID]; !found {
hydratedLogger.Warn().Msgf("⚠️ 🔍 Service ID '%s' has QoS configuration defined BUT no owned apps configured! 🚫 This service will fallback to NoOp QoS and likely fail. Configure owned app private keys for this service to enable proper QoS.", serviceID)
continue
}

switch qosServiceConfig.GetServiceQoSType() {
case evm.QoSType:
evmServiceQoSConfig, ok := qosServiceConfig.(evm.EVMServiceQoSConfig)
if !ok {
return nil, fmt.Errorf("SHOULD NEVER HAPPEN: error building QoS instances: service ID %q is not an EVM service", serviceID)
}

evmQoS := evm.NewQoSInstance(qosLogger, evmServiceQoSConfig)
qosServices[serviceID] = evmQoS

hydratedLogger.With("service_id", serviceID).Debug().Msg("Added EVM QoS instance for the service ID.")

case cosmos.QoSType:
cosmosSDKServiceQoSConfig, ok := qosServiceConfig.(cosmos.CosmosSDKServiceQoSConfig)
if !ok {
return nil, fmt.Errorf("SHOULD NEVER HAPPEN: error building QoS instances: service ID %q is not a CosmosSDK service", serviceID)
}

cosmosSDKQoS := cosmos.NewQoSInstance(qosLogger, cosmosSDKServiceQoSConfig)
qosServices[serviceID] = cosmosSDKQoS

hydratedLogger.With("service_id", serviceID).Debug().Msg("Added CosmosSDK QoS instance for the service ID.")
// Log services QoS configs.
gatewayConfig.ServicesQoSConfigs.LogServicesConfigs(hydratedLogger)

case solana.QoSType:
solanaServiceQoSConfig, ok := qosServiceConfig.(solana.SolanaServiceQoSConfig)
if !ok {
return nil, fmt.Errorf("SHOULD NEVER HAPPEN: error building QoS instances: service ID %q is not a Solana service", serviceID)
}

solanaQoS := solana.NewQoSInstance(qosLogger, solanaServiceQoSConfig)
qosServices[serviceID] = solanaQoS

hydratedLogger.With("service_id", serviceID).Debug().Msg("Added Solana QoS instance for the service ID.")
default:
return nil, fmt.Errorf("SHOULD NEVER HAPPEN: error building QoS instances: service ID %q not supported by PATH", serviceID)
}
}

return qosServices, nil
// Use the services QoS configs to build QoS instances.
return gatewayConfig.ServicesQoSConfigs.BuildQoSInstances(qosLogger)
}

// logGatewayServiceIDs outputs the available service IDs for the gateway.
Expand All @@ -116,13 +57,3 @@ func logGatewayServiceIDs(logger polylog.Logger, serviceConfigs map[protocol.Ser
}
logger.Info().Msgf("Service IDs configured by the gateway: %s.", strings.Join(serviceIDs, ", "))
}

// logQoSServiceConfigs outputs the configured service IDs for the gateway.
func logQoSServiceConfigs(logger polylog.Logger, serviceConfigs []config.ServiceQoSConfig) {
// Output service IDs with QoS configurations
serviceIDs := make([]string, 0, len(serviceConfigs))
for _, serviceConfig := range serviceConfigs {
serviceIDs = append(serviceIDs, string(serviceConfig.GetServiceID()))
}
logger.Info().Msgf("Service IDs with available QoS configurations: %s.", strings.Join(serviceIDs, ", "))
}
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type GatewayConfig struct {
HydratorConfig EndpointHydratorConfig `yaml:"hydrator_config"`
MessagingConfig MessagingConfig `yaml:"messaging_config"`
DataReporterConfig HTTPDataReporterConfig `yaml:"data_reporter_config"`
ServicesQoSConfigs *ServicesQoSConfig `yaml:"services_qos_configs"`
}

// LoadGatewayConfigFromYAML reads a YAML configuration file from the specified path
Expand Down
105 changes: 105 additions & 0 deletions config/config.schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -220,3 +220,108 @@ properties:
description: "Timeout in milliseconds for HTTP POST operations. If zero or negative, a default timeout of 10000ms (10s) is used."
type: integer
default: 10000

# Services QoS Configuration (optional)
services_qos_configs:
description: "Configuration for Quality of Service (QoS) checks for each supported service. Services not listed here will use NoOp QoS with random endpoint selection and no monitoring."
type: object
additionalProperties: false
properties:
services:
description: "Map of service IDs to their QoS configurations."
type: object
patternProperties:
"^[a-zA-Z0-9_-]+$":
description: "Service QoS configuration. Each service must specify exactly one of: evm, cosmos, or solana."
type: object
additionalProperties: false
properties:
evm:
description: "EVM-specific service configuration."
type: object
additionalProperties: false
required:
- chain_id
- sync_allowance
- supported_apis
properties:
chain_id:
description: "Chain ID in hex format (e.g., '0x1' for Ethereum mainnet)."
type: string
pattern: "^0x[0-9a-fA-F]+$"
sync_allowance:
description: "Maximum number of blocks an endpoint can be behind before being considered out of sync."
type: integer
minimum: 1
default: 5
supported_apis:
description: "List of supported RPC types for this service."
type: array
minItems: 1
uniqueItems: true
items:
type: string
enum: ["JSON_RPC", "WEBSOCKET", "REST", "COMET_BFT"]
archival_check:
description: "Optional archival check configuration for verifying node has historical data."
type: object
additionalProperties: false
required:
- contract_address
- contract_start_block
- threshold
properties:
contract_address:
description: "Contract address to check for archival balance (in hex format)."
type: string
pattern: "^(0x)?[0-9a-fA-F]{40}$|^one1[0-9a-z]{38}$"
contract_start_block:
description: "The block number when the contract first had a balance."
type: integer
minimum: 0
threshold:
description: "Minimum balance threshold for archival check."
type: integer
minimum: 1
default: 128
cosmos:
description: "Cosmos SDK-specific service configuration."
type: object
additionalProperties: false
required:
- chain_id
- evm_chain_id
- sync_allowance
- supported_apis
properties:
chain_id:
description: "Cosmos SDK chain ID (e.g., 'cosmoshub-4')."
type: string
minLength: 1
evm_chain_id:
description: "EVM chain ID in hex format for Cosmos chains with native EVM support (e.g., XRPLEVM). Use empty string if not applicable."
type: string
sync_allowance:
description: "Maximum number of blocks an endpoint can be behind before being considered out of sync."
type: integer
minimum: 1
default: 5
supported_apis:
description: "List of supported RPC types for this service."
type: array
minItems: 1
uniqueItems: true
items:
type: string
enum: ["JSON_RPC", "REST", "COMET_BFT", "WEBSOCKET"]
solana:
description: "Solana-specific service configuration."
type: object
additionalProperties: false
required:
- chain_id
properties:
chain_id:
description: "Solana chain ID (e.g., 'solana', 'mainnet-beta')."
type: string
minLength: 1
Loading
Loading