diff --git a/chainweb.cabal b/chainweb.cabal index 6c1101f2cb..87df577aa9 100644 --- a/chainweb.cabal +++ b/chainweb.cabal @@ -256,6 +256,8 @@ library , Chainweb.Utils.RequestLog , Chainweb.Utils.Rule , Chainweb.Utils.Serialization + , Chainweb.Utils.Throttle + , Chainweb.Utils.TokenLimiting , Chainweb.VerifierPlugin , Chainweb.VerifierPlugin.Allow , Chainweb.VerifierPlugin.Hyperlane.Announcement @@ -380,6 +382,7 @@ library , base64-bytestring-kadena == 0.1 , binary >= 0.8 , bytestring >= 0.10.12 + , cache >= 0.1.1.2 , case-insensitive >= 1.2 , cassava >= 0.5.1 , chainweb-storage >= 0.1 @@ -455,7 +458,7 @@ library , time >= 1.12.2 , tls >=2.1.4 , tls-session-manager >= 0.0 - , token-bucket >= 0.1 + , token-limiter >= 0.1 , transformers >= 0.5 , trifecta >= 2.1 , unliftio >= 0.2 @@ -467,7 +470,6 @@ library , wai-app-static >= 3.1.6.3 , wai-cors >= 0.2.7 , wai-extra >= 3.0.28 - , wai-middleware-throttle >= 0.3 , wai-middleware-validation , warp >= 3.3.6 , warp-tls >= 3.4 @@ -665,6 +667,7 @@ test-suite chainweb-tests Chainweb.Test.SPV Chainweb.Test.SPV.EventProof Chainweb.Test.Sync.WebBlockHeaderStore + Chainweb.Test.Throttle Chainweb.Test.TreeDB Chainweb.Test.TreeDB.RemoteDB Chainweb.Test.Version diff --git a/src/Chainweb/Chainweb.hs b/src/Chainweb/Chainweb.hs index a1ae6439f4..ecb39ab6f3 100644 --- a/src/Chainweb/Chainweb.hs +++ b/src/Chainweb/Chainweb.hs @@ -61,9 +61,6 @@ module Chainweb.Chainweb , chainwebPeer , chainwebPayloadDb , chainwebPactData -, chainwebThrottler -, chainwebPutPeerThrottler -, chainwebMempoolThrottler , chainwebConfig , chainwebServiceSocket , chainwebBackup @@ -79,15 +76,7 @@ module Chainweb.Chainweb , runChainweb -- * Throttler -, mkGenericThrottler -, mkPutPeerThrottler , checkPathPrefix -, mkThrottler - -, ThrottlingConfig(..) -, throttlingRate -, throttlingPeerRate -, defaultThrottlingConfig -- * Cut Config , CutConfig(..) @@ -128,11 +117,9 @@ import Network.Wai import Network.Wai.Handler.Warp hiding (Port) import Network.Wai.Handler.WarpTLS (WarpTLSException(..)) import Network.Wai.Middleware.RequestSizeLimit -import Network.Wai.Middleware.Throttle import Prelude hiding (log) -import System.Clock import System.LogLevel -- internal modules @@ -184,6 +171,7 @@ import P2P.Peer import qualified Pact.Types.ChainMeta as P import qualified Pact.Types.Command as P +import qualified Chainweb.Utils.Throttle as Throttle -- -------------------------------------------------------------------------- -- -- Chainweb Resources @@ -199,9 +187,6 @@ data Chainweb logger tbl = Chainweb , _chainwebPayloadDb :: !(PayloadDb tbl) , _chainwebManager :: !HTTP.Manager , _chainwebPactData :: ![(ChainId, PactServerData logger tbl)] - , _chainwebThrottler :: !(Throttle Address) - , _chainwebPutPeerThrottler :: !(Throttle Address) - , _chainwebMempoolThrottler :: !(Throttle Address) , _chainwebConfig :: !ChainwebConfiguration , _chainwebServiceSocket :: !(Port, Socket) , _chainwebBackup :: !(BackupEnv logger) @@ -488,13 +473,6 @@ withChainwebInternal conf logger peer serviceSock rocksDb pactDbDir backupDir re let !mLogger = setComponent "miner" logger !mConf = _configMining conf !mCutDb = _cutResCutDb cuts - !throt = _configThrottling conf - - -- initialize throttler - throttler <- mkGenericThrottler $ _throttlingRate throt - putPeerThrottler <- mkPutPeerThrottler $ _throttlingPeerRate throt - mempoolThrottler <- mkMempoolThrottler $ _throttlingMempoolRate throt - logg Debug "initialized throttlers" -- synchronize pact dbs with latest cut before we start the server -- and clients and begin mining. @@ -588,9 +566,6 @@ withChainwebInternal conf logger peer serviceSock rocksDb pactDbDir backupDir re , _chainwebPayloadDb = view cutDbPayloadDb $ _cutResCutDb cuts , _chainwebManager = mgr , _chainwebPactData = pactData - , _chainwebThrottler = throttler - , _chainwebPutPeerThrottler = putPeerThrottler - , _chainwebMempoolThrottler = mempoolThrottler , _chainwebConfig = conf , _chainwebServiceSocket = serviceSock , _chainwebBackup = BackupEnv @@ -650,17 +625,6 @@ withChainwebInternal conf logger peer serviceSock rocksDb pactDbDir backupDir re -- -------------------------------------------------------------------------- -- -- Throttling -mkGenericThrottler :: Double -> IO (Throttle Address) -mkGenericThrottler rate = mkThrottler 30 rate (const True) - -mkPutPeerThrottler :: Double -> IO (Throttle Address) -mkPutPeerThrottler rate = mkThrottler 30 rate $ \r -> - elem "peer" (pathInfo r) && requestMethod r == "PUT" - -mkMempoolThrottler :: Double -> IO (Throttle Address) -mkMempoolThrottler rate = mkThrottler 30 rate $ \r -> - elem "mempool" (pathInfo r) - checkPathPrefix :: [T.Text] -- ^ the base rate granted to users of the endpoing @@ -668,23 +632,6 @@ checkPathPrefix -> Bool checkPathPrefix endpoint r = endpoint `isPrefixOf` drop 3 (pathInfo r) --- | The period is 1 second. Burst is 2*rate. --- -mkThrottler - :: Double - -- ^ expiration of a stall bucket in seconds - -> Double - -- ^ the base rate granted to users of the endpoint (requests per second) - -> (Request -> Bool) - -- ^ Predicate to select requests that are throttled - -> IO (Throttle Address) -mkThrottler e rate c = initThrottler (defaultThrottleSettings $ TimeSpec (ceiling e) 0) -- expiration - { throttleSettingsRate = rate -- number of allowed requests per period - , throttleSettingsPeriod = 1_000_000 -- 1 second - , throttleSettingsBurst = 4 * ceiling rate - , throttleSettingsIsThrottled = c - } - -- -------------------------------------------------------------------------- -- -- Run Chainweb @@ -720,28 +667,40 @@ runChainweb cw nowServing = do logg Warn $ "OpenAPI spec validation enabled on service API, make sure this is what you want" mkValidationMiddleware else return id - - concurrentlies_ - - -- 1. Start serving Rest API - [ (if tls then serve else servePlain) - $ httpLog - . throttle (_chainwebPutPeerThrottler cw) - . throttle (_chainwebMempoolThrottler cw) - . throttle (_chainwebThrottler cw) - . p2pRequestSizeLimit - . p2pValidationMiddleware - - -- 2. Start Clients (with a delay of 500ms) - , threadDelay 500000 >> clients - - -- 3. Start serving local API - , threadDelay 500000 >> do - serveServiceApi - $ serviceHttpLog - . serviceRequestSizeLimit - . serviceApiValidationMiddleware - ] + let theP2pThrottleConfig = cw ^. chainwebConfig . configP2p . p2pConfigThrottleConfig + let theServiceApiThrottleConfig = cw ^. chainwebConfig . configServiceApi . serviceApiConfigThrottleConfig + let withP2pThrottleMiddleware = + if _enableConfigEnabled theP2pThrottleConfig + then Throttle.throttleMiddleware (logFunction $ _chainwebLogger cw) "p2p" (_enableConfigConfig theP2pThrottleConfig) + else \k -> k id + let withServiceApiThrottleMiddleware = + if _enableConfigEnabled theServiceApiThrottleConfig + then Throttle.throttleMiddleware (logFunction $ _chainwebLogger cw) "p2p" (_enableConfigConfig theServiceApiThrottleConfig) + else \k -> k id + + withP2pThrottleMiddleware $ \p2pThrottler -> + withServiceApiThrottleMiddleware $ \serviceThrottler -> + + concurrentlies_ + + -- 1. Start serving Rest API + [ (if tls then serve else servePlain) + $ httpLog + . p2pRequestSizeLimit + . p2pThrottler + . p2pValidationMiddleware + + -- 2. Start Clients (with a delay of 500ms) + , threadDelay 500000 >> clients + + -- 3. Start serving local API + , threadDelay 500000 >> do + serveServiceApi + $ serviceHttpLog + . serviceRequestSizeLimit + . serviceThrottler + . serviceApiValidationMiddleware + ] where @@ -805,12 +764,16 @@ runChainweb cw nowServing = do when (defaultShouldDisplayException e) $ logg Debug $ loggServerError msg r e + onExceptionResponse ex = + fromMaybe (defaultOnExceptionResponse ex) (Throttle.throttledResponse ex) + -- P2P Server serverSettings :: Counter "clientClosedConnections" -> Settings serverSettings clientClosedConnectionsCounter = peerServerSettings (_peerResPeer $ _chainwebPeer cw) & setOnException (logWarpException "P2P API" clientClosedConnectionsCounter) + & setOnExceptionResponse onExceptionResponse & setBeforeMainLoop (nowServing (nowServingP2PAPI .~ True)) monitorConnectionsClosedByClient :: Counter "clientClosedConnections" -> IO () @@ -893,6 +856,7 @@ runChainweb cw nowServing = do & setHost interface & setOnException (logWarpException "Service API" clientClosedConnectionsCounter) + & setOnExceptionResponse onExceptionResponse & setBeforeMainLoop (nowServing (nowServingServiceAPI .~ True)) & setServerName "Chainweb Service API" diff --git a/src/Chainweb/Chainweb/Configuration.hs b/src/Chainweb/Chainweb/Configuration.hs index 0f27adee78..7ad8c17771 100644 --- a/src/Chainweb/Chainweb/Configuration.hs +++ b/src/Chainweb/Chainweb/Configuration.hs @@ -20,15 +20,8 @@ -- module Chainweb.Chainweb.Configuration ( --- * Throttling Configuration - ThrottlingConfig(..) -, throttlingRate -, throttlingPeerRate -, throttlingMempoolRate -, defaultThrottlingConfig - -- * Cut Configuration -, ChainDatabaseGcConfig(..) + ChainDatabaseGcConfig(..) , chainDatabaseGcToText , chainDatabaseGcFromText @@ -44,6 +37,7 @@ module Chainweb.Chainweb.Configuration , ServiceApiConfig(..) , serviceApiConfigPort , serviceApiConfigInterface +, serviceApiConfigThrottleConfig , defaultServiceApiConfig , pServiceApiConfig @@ -63,7 +57,6 @@ module Chainweb.Chainweb.Configuration , configP2p , configBlockGasLimit , configMinGasPrice -, configThrottling , configReorgLimit , configFullHistoricPactState , configBackup @@ -124,42 +117,7 @@ import Chainweb.Time import P2P.Node.Configuration import Chainweb.Pact.Backend.DbCache (DbCacheLimitBytes) - --- -------------------------------------------------------------------------- -- --- Throttling Configuration - -data ThrottlingConfig = ThrottlingConfig - { _throttlingRate :: !Double - , _throttlingPeerRate :: !Double - -- ^ This should throttle aggressively. This endpoint does an expensive - -- check of the client. And we want to keep bad actors out of the - -- system. There should be no need for a client to call this endpoint on - -- the same node more often than at most few times peer minute. - , _throttlingMempoolRate :: !Double - } - deriving stock (Eq, Show) - -makeLenses ''ThrottlingConfig - -defaultThrottlingConfig :: ThrottlingConfig -defaultThrottlingConfig = ThrottlingConfig - { _throttlingRate = 50 -- per second, in a 100 burst - , _throttlingPeerRate = 11 -- per second, 1 for each p2p network - , _throttlingMempoolRate = 20 -- one every seconds per mempool. - } - -instance ToJSON ThrottlingConfig where - toJSON o = object - [ "global" .= _throttlingRate o - , "putPeer" .= _throttlingPeerRate o - , "mempool" .= _throttlingMempoolRate o - ] - -instance FromJSON (ThrottlingConfig -> ThrottlingConfig) where - parseJSON = withObject "ThrottlingConfig" $ \o -> id - <$< throttlingRate ..: "global" % o - <*< throttlingPeerRate ..: "putPeer" % o - <*< throttlingMempoolRate ..: "mempool" % o +import qualified Chainweb.Utils.Throttle as Throttle -- -------------------------------------------------------------------------- -- -- Cut Configuration @@ -276,6 +234,7 @@ data ServiceApiConfig = ServiceApiConfig , _serviceApiPayloadBatchLimit :: PayloadBatchLimit -- ^ maximum size for payload batches on the service API. Default is -- 'Chainweb.Payload.RestAPI.defaultServicePayloadBatchLimit'. + , _serviceApiConfigThrottleConfig :: !(EnableConfig Throttle.ThrottleConfig) } deriving (Show, Eq, Generic) @@ -287,6 +246,15 @@ defaultServiceApiConfig = ServiceApiConfig , _serviceApiConfigInterface = "*" , _serviceApiConfigValidateSpec = False , _serviceApiPayloadBatchLimit = defaultServicePayloadBatchLimit + , _serviceApiConfigThrottleConfig = defaultEnableConfig + Throttle.ThrottleConfig + { Throttle._requestCost = 10 + , Throttle._requestBody100ByteCost = 1 + , Throttle._responseBody100ByteCost = 2 + , Throttle._maxBudget = 50_000 + , Throttle._tokenBucketRefillPerSecond = 1_000 + , Throttle._throttleExpiry = 30 + } } instance ToJSON ServiceApiConfig where @@ -295,6 +263,7 @@ instance ToJSON ServiceApiConfig where , "interface" .= hostPreferenceToText (_serviceApiConfigInterface o) , "validateSpec" .= _serviceApiConfigValidateSpec o , "payloadBatchLimit" .= _serviceApiPayloadBatchLimit o + , "throttling" .= _serviceApiConfigThrottleConfig o ] instance FromJSON (ServiceApiConfig -> ServiceApiConfig) where @@ -303,6 +272,7 @@ instance FromJSON (ServiceApiConfig -> ServiceApiConfig) where <*< setProperty serviceApiConfigInterface "interface" (parseJsonFromText "interface") o <*< serviceApiConfigValidateSpec ..: "validateSpec" % o <*< serviceApiPayloadBatchLimit ..: "payloadBatchLimit" % o + <*< serviceApiConfigThrottleConfig %.: "throttling" % o pServiceApiConfig :: MParser ServiceApiConfig pServiceApiConfig = id @@ -317,6 +287,9 @@ pServiceApiConfig = id <*< serviceApiConfigValidateSpec .:: enableDisableFlag % prefixLong service "validate-spec" <> internal -- hidden option, for expert use + <*< serviceApiConfigThrottleConfig . enableConfigEnabled .:: enableDisableFlag + % prefixLong service "throttling" + <> suffixHelp service "enable HTTP throttling" where service = Just "service" @@ -385,7 +358,6 @@ data ChainwebConfiguration = ChainwebConfiguration , _configHeaderStream :: !Bool , _configReintroTxs :: !Bool , _configP2p :: !P2pConfiguration - , _configThrottling :: !ThrottlingConfig , _configMempoolP2p :: !(EnableConfig MempoolP2pConfig) , _configBlockGasLimit :: !Mempool.GasLimit , _configLogGas :: !Bool @@ -450,7 +422,6 @@ defaultChainwebConfiguration v = ChainwebConfiguration , _configHeaderStream = False , _configReintroTxs = True , _configP2p = defaultP2pConfiguration - , _configThrottling = defaultThrottlingConfig , _configMempoolP2p = defaultEnableConfig defaultMempoolP2pConfig , _configBlockGasLimit = 150_000 , _configLogGas = False @@ -477,7 +448,6 @@ instance ToJSON ChainwebConfiguration where , "headerStream" .= _configHeaderStream o , "reintroTxs" .= _configReintroTxs o , "p2p" .= _configP2p o - , "throttling" .= _configThrottling o , "mempoolP2p" .= _configMempoolP2p o , "gasLimitOfBlock" .= J.toJsonViaEncode (_configBlockGasLimit o) , "logGas" .= _configLogGas o @@ -508,7 +478,6 @@ instance FromJSON (ChainwebConfiguration -> ChainwebConfiguration) where <*< configHeaderStream ..: "headerStream" % o <*< configReintroTxs ..: "reintroTxs" % o <*< configP2p %.: "p2p" % o - <*< configThrottling %.: "throttling" % o <*< configMempoolP2p %.: "mempoolP2p" % o <*< configBlockGasLimit ..: "gasLimitOfBlock" % o <*< configLogGas ..: "logGas" % o diff --git a/src/Chainweb/Utils.hs b/src/Chainweb/Utils.hs index 1515b9ccd5..366744c050 100644 --- a/src/Chainweb/Utils.hs +++ b/src/Chainweb/Utils.hs @@ -25,6 +25,7 @@ {-# OPTIONS_GHC -fno-warn-orphans #-} {-# OPTIONS_GHC -fno-warn-deprecations #-} +{-# LANGUAGE RecordWildCards #-} -- | -- Module: Chainweb.Utils @@ -245,7 +246,7 @@ import Configuration.Utils hiding (Error, Lens) import Control.Concurrent (threadDelay) import Control.Concurrent.Async import Control.Concurrent.MVar -import Control.Concurrent.TokenBucket +import Control.Concurrent.TokenLimiter import Control.DeepSeq import Control.Exception (SomeAsyncException(..), evaluate) import Control.Lens hiding ((.=)) @@ -1058,9 +1059,13 @@ runForeverThrottled -> IO () -> IO () runForeverThrottled logfun name burst rate a = mask $ \umask -> do - tokenBucket <- newTokenBucket + let config = defaultLimitConfig + { maxBucketTokens = fromIntegral burst + , bucketRefillTokensPerSecond = fromIntegral rate + } + tokenBucket <- newRateLimiter config logfun Debug $ "start " <> name - let runThrottled = tokenBucketWait tokenBucket burst rate >> a + let runThrottled = waitDebit config tokenBucket 1 >> a go = do forever (umask runThrottled) `catchAllSynchronous` \e -> logfun Error $ name <> " failed: " <> sshow e <> ". Restarting ..." @@ -1115,7 +1120,12 @@ instance FromJSON (a -> a) => FromJSON (EnableConfig a -> EnableConfig a) where parseJSON = withObject "EnableConfig" $ \o -> id <$< enableConfigEnabled ..: "enabled" % o <*< enableConfigConfig %.: "configuration" % o - {-# INLINE parseJSON #-} + +instance FromJSON a => FromJSON (EnableConfig a) where + parseJSON = withObject "EnableConfig" $ \o -> do + _enableConfigEnabled <- o .: "enabled" + _enableConfigConfig <- o .: "configuration" + return EnableConfig {..} validateEnableConfig :: ConfigValidation a l -> ConfigValidation (EnableConfig a) l validateEnableConfig v c = when (_enableConfigEnabled c) $ v (_enableConfigConfig c) diff --git a/src/Chainweb/Utils/Throttle.hs b/src/Chainweb/Utils/Throttle.hs new file mode 100644 index 0000000000..958a7fd2f1 --- /dev/null +++ b/src/Chainweb/Utils/Throttle.hs @@ -0,0 +1,219 @@ +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE RecordWildCards #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE DeriveAnyClass #-} +{-# LANGUAGE DerivingStrategies #-} +{-# LANGUAGE GeneralizedNewtypeDeriving #-} +{-# LANGUAGE ImportQualifiedPost #-} +{-# LANGUAGE DeriveGeneric #-} +{-# LANGUAGE TemplateHaskell #-} +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE TypeApplications #-} +{-# LANGUAGE ViewPatterns #-} + +module Chainweb.Utils.Throttle + ( ThrottleConfig(..) + , ThrottledException(..) + , throttleMiddleware + , throttledResponse + ) where + +import Configuration.Utils +import Control.Exception.Safe +import Control.Lens hiding ((.=)) +import Control.Monad +import Data.Int +import Data.LogMessage +import Data.ByteString qualified as BS +import Data.ByteString.Builder qualified as BSB +import Data.ByteString.Lazy qualified as LBS +import Data.Hashable +import Data.Text (Text) +import Data.Text qualified as T +import GHC.Generics (Generic) +import Network.HTTP.Types +import Network.Socket (SockAddr(..)) +import Network.Wai qualified as Wai +import Network.Wai.Internal qualified as Wai.Internal +import Numeric.Natural (Natural) +import qualified System.Clock as Clock +import System.IO.Unsafe (unsafeInterleaveIO) + +import Chainweb.Utils.TokenLimiting +import Chainweb.Utils (int) +import Chainweb.Time + +data ThrottleConfig = ThrottleConfig + { _requestCost :: Int + , _requestBody100ByteCost :: Int + , _responseBody100ByteCost :: Int + , _maxBudget :: Int + -- TODO: charge for time, per second + , _tokenBucketRefillPerSecond :: Int + , _throttleExpiry :: Seconds + } deriving stock (Show, Eq) + +makeLenses ''ThrottleConfig + +instance ToJSON ThrottleConfig where + toJSON o = object + [ "requestCost" .= _requestCost o + , "requestBody100ByteCost" .= _requestBody100ByteCost o + , "responseBody100ByteCost" .= _responseBody100ByteCost o + , "maxBudget" .= _maxBudget o + , "tokenBucketRefillPerSecond" .= _tokenBucketRefillPerSecond o + , "throttleExpiry" .= int @Seconds @Int (_throttleExpiry o) + ] + +instance FromJSON (ThrottleConfig -> ThrottleConfig) where + parseJSON = withObject "ThrottleConfig" $ \o -> id + <$< requestCost ..: "requestCost" % o + <*< requestBody100ByteCost ..: "requestBody100ByteCost" % o + <*< responseBody100ByteCost ..: "responseBody100ByteCost" % o + <*< maxBudget ..: "maxBudget" % o + <*< tokenBucketRefillPerSecond ..: "tokenBucketRefillPerSecond" % o + <*< throttleExpiry . (iso (int @Seconds @Int) (int @Int @Seconds)) ..: "throttleExpiry" % o + +instance FromJSON ThrottleConfig where + parseJSON = withObject "ThrottleConfig" $ \o -> do + _requestCost <- o .: "requestCost" + _requestBody100ByteCost <- o .: "requestBody100ByteCost" + _responseBody100ByteCost <- o .: "responseBody100ByteCost" + _maxBudget <- o .: "maxBudget" + _tokenBucketRefillPerSecond <- o .: "tokenBucketRefillPerSecond" + _throttleExpiry <- int @Natural @Seconds <$> o .: "throttleExpiry" + return ThrottleConfig {..} + +-- TODO: make an uncaught ThrottledException translate to a 429 in warp when possible +newtype ThrottledException = ThrottledException Text + deriving stock (Show, Eq, Generic) + deriving anyclass (Exception) + +hashWithSalt' :: Hashable a => a -> Int -> Int +hashWithSalt' = flip hashWithSalt + +newtype HashableSockAddr = HashableSockAddr SockAddr + deriving newtype (Show) + +instance Eq HashableSockAddr where + HashableSockAddr sockAddr1 == HashableSockAddr sockAddr2 = case (sockAddr1, sockAddr2) of + (SockAddrInet _port1 hostAddr1, SockAddrInet _port2 hostAddr2) -> + -- constructor port not used deliberately, requests can come from different ports + hostAddr1 == hostAddr2 + (SockAddrInet6 _port1 flowInfo1 hostAddr1 scopeId1, SockAddrInet6 _port2 flowInfo2 hostAddr2 scopeId2) -> + flowInfo1 == flowInfo2 && hostAddr1 == hostAddr2 && scopeId1 == scopeId2 + (SockAddrUnix sock1, SockAddrUnix sock2) -> + sock1 == sock2 + _ -> False + +instance Hashable HashableSockAddr where + hashWithSalt salt (HashableSockAddr sockAddr) = case sockAddr of + SockAddrInet _port hostAddr -> + -- constructor tag + hashWithSalt' (1 :: Word) + -- port not used deliberately, requests can come from different ports + . hashWithSalt' hostAddr + $ salt + SockAddrInet6 _port flowInfo hostAddr scopeId -> + hashWithSalt' (2 :: Word) + . hashWithSalt' flowInfo + . hashWithSalt' hostAddr + . hashWithSalt' scopeId + $ salt + SockAddrUnix sock -> + hashWithSalt' (3 :: Word) + . hashWithSalt' sock + $ salt + +debitOrDie :: (Hashable k) => TokenLimitMap k -> (Text, k) -> Int -> IO () +debitOrDie tokenLimitMap (name, k) cost = do + tryDebit cost k tokenLimitMap >>= \case + True -> return () + False -> throwIO (ThrottledException name) + +-- meant to be used with Warp's setOnExceptionResponse to translate thrown exceptions into responses +throttledResponse :: SomeException -> Maybe Wai.Response +throttledResponse (fromException -> Just (ThrottledException _)) = + Just (Wai.responseLBS status429 [] "host throttled") +throttledResponse _ = Nothing + +throttleMiddleware :: LogFunction -> Text -> ThrottleConfig -> (Wai.Middleware -> IO r) -> IO r +throttleMiddleware logfun name ThrottleConfig{..} k = + withTokenLimitMap logfun ("request-throttler-" <> name) limitCachePolicy limitConfig $ \tokenLimitMap -> do + k $ middleware tokenLimitMap + where + middleware tokenLimitMap app request respond = do + debitOrDie' _requestCost + meteredRequest <- meterRequest debitOrDie' request + -- if response chunks are being sent back, it's too late to decide to return a 429. + -- warp probably doesn't know what to do if the response streaming itself throws an error. + -- so we use penalize instead, which can bring the token bucket negative and won't throw an error. + app meteredRequest (meterResponse penalize' respond) + where + host = HashableSockAddr $ Wai.remoteHost request + hostText = T.pack $ show (Wai.remoteHost request) + debitOrDie' c = do + debitOrDie tokenLimitMap (hostText, host) c + penalize' tks = void $ penalize tks host tokenLimitMap + + limitCachePolicy = TokenLimitCachePolicy (Clock.TimeSpec { sec = int @Seconds @Int64 _throttleExpiry, nsec = 0 }) + limitConfig = defaultLimitConfig + { maxBucketTokens = _maxBudget + , initialBucketTokens = _maxBudget + , bucketRefillTokensPerSecond = _tokenBucketRefillPerSecond + } + + meterRequest debit request + | _requestBody100ByteCost == 0 = return request + | otherwise = case Wai.requestBodyLength request of + Wai.KnownLength requestBodyLen -> do + () <- debit $ (_requestBody100ByteCost * fromIntegral (max requestBodyLen 100)) `div` 100 + return request + Wai.ChunkedBody -> + return (Wai.setRequestBodyChunks (getMeteredRequestBodyChunk debit request) request) + + getMeteredRequestBodyChunk debit request = do + chunk <- Wai.getRequestBodyChunk request + -- charge *after* receiving a request body chunk + () <- debit $ (_requestBody100ByteCost * max (BS.length chunk) 100) `div` 100 + return chunk + + -- the only way to match on responses without using internal API is via + -- responseToStream, which converts any response into a streaming response. + -- unfortunately: + -- * all of the responses produced by servant are builder responses, + -- not streaming responses + -- * streaming responses are not supported by http2; we try to use http2 + -- (see https://hackage.haskell.org/package/http2-5.3.5/docs/src/Network.HTTP2.Server.Run.html#runIO) + -- * a streaming response body may be less efficient than a builder + -- response body, in particular because it needs to use a chunked + -- encoding + -- + meterResponse + :: (Int -> IO ()) + -> (Wai.Response -> IO a) -> Wai.Response -> IO a + meterResponse _ respond response + | _responseBody100ByteCost == 0 = respond response + meterResponse debit respond (Wai.Internal.ResponseStream status headers responseBody) = do + respond + $ Wai.responseStream status headers + $ meterStreamingResponseBody debit responseBody + meterResponse debit respond (Wai.Internal.ResponseBuilder status headers responseBody) = do + respond + <$> Wai.responseLBS status headers . LBS.fromChunks + =<< meterBuilderResponseBody debit (LBS.toChunks $ BSB.toLazyByteString responseBody) + meterResponse _ _ _ = error "unrecognized response type" + + meterStreamingResponseBody debit responseBody send flush = responseBody + (\chunkBSBuilder -> do + let chunkBS = BS.toStrict (BSB.toLazyByteString chunkBSBuilder) + () <- debit $ (_responseBody100ByteCost * max (BS.length chunkBS) 100) `div` 100 + -- charger *before* sending a response body chunk + send (BSB.byteString chunkBS) + ) + flush + meterBuilderResponseBody debit (chunk:chunks) = unsafeInterleaveIO $ do + () <- debit $ (_responseBody100ByteCost * max (BS.length chunk) 100) `div` 100 + (chunk:) <$> meterBuilderResponseBody debit chunks + meterBuilderResponseBody _ [] = return [] diff --git a/src/Chainweb/Utils/TokenLimiting.hs b/src/Chainweb/Utils/TokenLimiting.hs new file mode 100644 index 0000000000..f2e4c3d709 --- /dev/null +++ b/src/Chainweb/Utils/TokenLimiting.hs @@ -0,0 +1,103 @@ +{-# LANGUAGE BangPatterns #-} +{-# LANGUAGE DeriveGeneric #-} +{-# LANGUAGE GeneralizedNewtypeDeriving #-} +{-# LANGUAGE LambdaCase #-} + +-- | A concurrent, expiring map from @k@ to RateLimiter. +module Chainweb.Utils.TokenLimiting +( TokenLimitMap +, TokenLimitCachePolicy(..) +, LimitConfig(..) +, withTokenLimitMap +, defaultLimitConfig +, makeLimitConfig +, getLimiter +, getLimitPolicy +, tryDebit +, waitDebit +, penalize +) where + +import qualified Control.Concurrent.Async as Async +import Control.Concurrent.TokenLimiter (LimitConfig(..), RateLimiter) +import qualified Control.Concurrent.TokenLimiter as TL +import Control.Exception +import Data.Cache (Cache) +import qualified Data.Cache as Cache +import Data.Hashable (Hashable) +import Data.Text (Text) +import GHC.Generics +import System.Clock (TimeSpec) + +import Chainweb.Utils +import Data.LogMessage + +data TokenLimitMap k = TokenLimitMap + { _tlmMap :: !(Cache k RateLimiter) + , _tlmLimitPolicy :: !LimitConfig + -- ^ token bucket rate limiting policy (max num tokens, refill rate, etc) + , _tlmCachePolicy :: !TokenLimitCachePolicy + -- ^ inactivity period before your key's rate limiter is expired from the + -- cache + } deriving (Generic) + +newtype TokenLimitCachePolicy = TokenLimitCachePolicy + { policyExpirationTime :: TimeSpec + } deriving (Generic, Eq, Ord, Num, Show) + +makeLimitConfig :: Int -> Int -> Int -> LimitConfig +makeLimitConfig mx it ref = + defaultLimitConfig + { maxBucketTokens = mx + , initialBucketTokens = it + , bucketRefillTokensPerSecond = ref + } + +withTokenLimitMap + :: (Eq k, Hashable k) + => LogFunctionText + -> Text + -> TokenLimitCachePolicy + -> LimitConfig + -> (TokenLimitMap k -> IO a) + -> IO a +withTokenLimitMap logfun mapName expPolicy@(TokenLimitCachePolicy expTSpec) lcfg act = + mask $ \restore -> do + cache <- restore (Cache.newCache (Just expTSpec)) + Async.withAsync (reaper logfun mapName cache) $ \_ -> do + let m = TokenLimitMap cache lcfg expPolicy + restore $ act m + +reaper + :: (Eq k, Hashable k) + => LogFunctionText + -> Text + -> Cache k v -> IO () +reaper logfun mapName cache = runForever logfun mapName $ do + approximateThreadDelay (2 * 60 * 1000000) -- two minute cycle time + Cache.purgeExpired cache + +getLimiter :: (Eq k, Hashable k) => k -> TokenLimitMap k -> IO RateLimiter +getLimiter k (TokenLimitMap cache limitPolicy _) = + Cache.fetchWithCache cache k (\_ -> TL.newRateLimiter limitPolicy) + +penalize :: (Eq k, Hashable k) => Int -> k -> TokenLimitMap k -> IO Int +penalize ndebits k tlm = do + rl <- getLimiter k tlm + TL.penalize rl ndebits + +tryDebit :: (Eq k, Hashable k) => Int -> k -> TokenLimitMap k -> IO Bool +tryDebit ndebits k tlm = do + rl <- getLimiter k tlm + TL.tryDebit (_tlmLimitPolicy tlm) rl ndebits + +waitDebit :: (Eq k, Hashable k) => Int -> k -> TokenLimitMap k -> IO () +waitDebit ndebits k tlm = do + rl <- getLimiter k tlm + TL.waitDebit (_tlmLimitPolicy tlm) rl ndebits + +defaultLimitConfig :: LimitConfig +defaultLimitConfig = TL.defaultLimitConfig + +getLimitPolicy :: TokenLimitMap k -> LimitConfig +getLimitPolicy = _tlmLimitPolicy diff --git a/src/P2P/Node/Configuration.hs b/src/P2P/Node/Configuration.hs index 5228065663..1e133305c3 100644 --- a/src/P2P/Node/Configuration.hs +++ b/src/P2P/Node/Configuration.hs @@ -8,6 +8,7 @@ {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE TemplateHaskell #-} +{-# LANGUAGE NumericUnderscores #-} -- | -- Module: P2P.Node.Configuration @@ -29,6 +30,7 @@ module P2P.Node.Configuration , p2pConfigKnownPeers , p2pConfigIgnoreBootstrapNodes , p2pConfigBootstrapReachability +, p2pConfigThrottleConfig , defaultP2pConfiguration , validateP2pConfiguration , pP2pConfiguration @@ -50,6 +52,7 @@ import Numeric.Natural import Chainweb.Time import Chainweb.Utils hiding (check) +import qualified Chainweb.Utils.Throttle as Throttle import P2P.Peer @@ -101,6 +104,7 @@ data P2pConfiguration = P2pConfiguration , _p2pConfigValidateSpec :: !Bool -- ^ enable OpenAPI specification validation for requests and responses. -- this will likely cause significant performance degradation. + , _p2pConfigThrottleConfig :: !(EnableConfig Throttle.ThrottleConfig) } deriving (Show, Eq, Generic) @@ -125,6 +129,14 @@ defaultP2pConfiguration = P2pConfiguration , _p2pConfigBootstrapReachability = 0.5 , _p2pConfigTls = True , _p2pConfigValidateSpec = False + , _p2pConfigThrottleConfig = defaultEnableConfig Throttle.ThrottleConfig + { Throttle._requestCost = 10 + , Throttle._requestBody100ByteCost = 1 + , Throttle._responseBody100ByteCost = 2 + , Throttle._maxBudget = 35_000 + , Throttle._tokenBucketRefillPerSecond = 750 + , Throttle._throttleExpiry = 30 + } } validateP2pConfiguration :: Applicative a => ConfigValidation P2pConfiguration a @@ -169,6 +181,7 @@ instance ToJSON P2pConfiguration where , "ignoreBootstrapNodes" .= _p2pConfigIgnoreBootstrapNodes o , "private" .= _p2pConfigPrivate o , "bootstrapReachability" .= _p2pConfigBootstrapReachability o + , "throttling" .= _p2pConfigThrottleConfig o ] -- hidden: Do not print the default value. <> [ "tls" .= _p2pConfigTls o | not (_p2pConfigTls o) ] @@ -186,6 +199,7 @@ instance FromJSON (P2pConfiguration -> P2pConfiguration) where <*< p2pConfigBootstrapReachability ..: "bootstrapReachability" % o <*< p2pConfigTls ..: "tls" % o <*< p2pConfigValidateSpec ..: "validateSpec" % o + <*< p2pConfigThrottleConfig %.: "throttling" % o instance FromJSON P2pConfiguration where parseJSON = withObject "P2pExampleConfig" $ \o -> P2pConfiguration @@ -199,6 +213,7 @@ instance FromJSON P2pConfiguration where <*> o .: "bootstrapReachability" <*> o .:? "tls" .!= True <*> o .:? "validateSpec" .!= False + <*> o .: "throttling" pP2pConfiguration :: MParser P2pConfiguration pP2pConfiguration = id @@ -230,6 +245,9 @@ pP2pConfiguration = id <*< p2pConfigValidateSpec .:: enableDisableFlag % prefixLong net "validate-spec" <> internal -- hidden option, only for expert use + <*< p2pConfigThrottleConfig . enableConfigEnabled .:: enableDisableFlag + % prefixLong net "throttling" + <> suffixHelp net "enable HTTP throttling" where net = Nothing diff --git a/test/lib/Chainweb/Test/MultiNode.hs b/test/lib/Chainweb/Test/MultiNode.hs index d825d3f974..eb45f0eff8 100644 --- a/test/lib/Chainweb/Test/MultiNode.hs +++ b/test/lib/Chainweb/Test/MultiNode.hs @@ -176,9 +176,6 @@ multiConfig v n = defaultChainwebConfiguration v & set configReintroTxs True -- enable transaction re-introduction - & set configThrottling throttling - -- throttling is effectively disabled to not slow down the test nodes - & set (configServiceApi . serviceApiConfigPort) 0 & set (configServiceApi . serviceApiConfigInterface) interface & set (configCuts . cutFetchTimeout) 10_000_000 @@ -189,11 +186,6 @@ multiConfig v n = defaultChainwebConfiguration v , _nodeTestMiners = MinerCount n } - throttling = defaultThrottlingConfig - { _throttlingRate = 10_000 -- per second - , _throttlingPeerRate = 10_000 -- per second, one for each p2p network - } - -- | Configure a bootstrap node -- multiBootstrapConfig diff --git a/test/lib/Chainweb/Test/Orphans/Internal.hs b/test/lib/Chainweb/Test/Orphans/Internal.hs index e223ee6f03..4180fe07fc 100644 --- a/test/lib/Chainweb/Test/Orphans/Internal.hs +++ b/test/lib/Chainweb/Test/Orphans/Internal.hs @@ -166,6 +166,7 @@ import P2P.Test.Orphans () import System.Logger.Types import Utils.Logging +import Chainweb.Utils.Throttle (ThrottleConfig(..)) -- -------------------------------------------------------------------------- -- -- Utils @@ -230,13 +231,18 @@ instance Arbitrary HashDifficulty where -- -------------------------------------------------------------------------- -- -- P2P +instance Arbitrary ThrottleConfig where + arbitrary = ThrottleConfig + <$> arbitrary <*> arbitrary <*> arbitrary + <*> arbitrary <*> arbitrary <*> (int @Natural @Seconds <$> arbitrary) + instance Arbitrary P2pConfiguration where arbitrary = P2pConfiguration <$> arbitrary <*> arbitrary <*> arbitrary <*> arbitrary <*> arbitrary <*> arbitrary <*> arbitrary <*> arbitrary <*> arbitrary - <*> arbitrary + <*> arbitrary <*> arbitrary instance Arbitrary PeerEntry where arbitrary = PeerEntry @@ -799,13 +805,6 @@ instance Arbitrary ChainDatabaseGcConfig where instance Arbitrary a => Arbitrary (EnableConfig a) where arbitrary = EnableConfig <$> arbitrary <*> arbitrary --- | Helper instance for JSON roundtrip tests --- -instance FromJSON (EnableConfig MiningConfig) where - parseJSON v = do - f <- parseJSON v - return $ f $ defaultEnableConfig defaultMining - instance Arbitrary a => Arbitrary (NextItem a) where arbitrary = oneof [ Inclusive <$> arbitrary diff --git a/test/unit/Chainweb/Test/Throttle.hs b/test/unit/Chainweb/Test/Throttle.hs new file mode 100644 index 0000000000..90988aa34d --- /dev/null +++ b/test/unit/Chainweb/Test/Throttle.hs @@ -0,0 +1,104 @@ +{-# language OverloadedStrings #-} +{-# LANGUAGE NumericUnderscores #-} + +module Chainweb.Test.Throttle (tests) where + +import Control.Concurrent.Async (withAsync) +import Control.Concurrent.MVar +import Data.Maybe +import qualified Data.ByteString.Builder as BSB +import qualified Data.ByteString.Char8 as BSC +import qualified Data.ByteString.Lazy as LBS + +import qualified Network.HTTP.Client as Client +import Network.HTTP.Types (status200, status429) +import qualified Network.Wai as Wai +import Network.Wai.Handler.Warp + +import Chainweb.Time +import Chainweb.Utils ((&)) +import Chainweb.Utils.Throttle + +import Test.Tasty +import Test.Tasty.HUnit +import qualified PropertyMatchers as P +import PropertyMatchers ((?)) + +tests :: TestTree +tests = testGroup "Chainweb.Test.Throttle" + [ testCase "request cost" $ runTest + mempty + ThrottleConfig + { _requestCost = 1 + , _requestBody100ByteCost = 0 + , _responseBody100ByteCost = 0 + , _maxBudget = 1 + , _tokenBucketRefillPerSecond = 1 + , _throttleExpiry = Seconds 20 + } + $ \req manager -> do + Client.httpLbs req manager + >>= P.fun Client.responseStatus + ? P.equals status200 + Client.httpLbs req manager + >>= throttled + , testCase "request body size cost" $ runTest + mempty + ThrottleConfig + { _requestCost = 1 + , _requestBody100ByteCost = 1 + , _responseBody100ByteCost = 0 + , _maxBudget = 2 + , _tokenBucketRefillPerSecond = 1 + , _throttleExpiry = Seconds 20 + } + $ \req manager -> do + let req' = req { Client.requestBody = Client.RequestBodyBS $ BSC.replicate 200 'a' } + Client.httpLbs req' manager + >>= throttled + , testCase "response body size penalties do not interrupt the response" $ runTest + (BSB.byteString $ BSC.replicate 100 'a') + ThrottleConfig + { _requestCost = 1 + , _requestBody100ByteCost = 0 + , _responseBody100ByteCost = 1 + , _maxBudget = 2 + , _tokenBucketRefillPerSecond = 1 + , _throttleExpiry = Seconds 20 + } + $ \req manager -> do + Client.httpLbs req manager + >>= P.fun Client.responseBody + ? P.fun LBS.length + ? P.equals 100 + Client.httpLbs req manager + >>= throttled + ] + + +throttled :: HasCallStack => P.Prop (Client.Response LBS.LazyByteString) +throttled = P.checkAll + [ P.fun Client.responseBody ? P.equals "host throttled" + , P.fun Client.responseStatus ? P.equals status429 + ] + +runTest + :: BSB.Builder + -> ThrottleConfig + -> (Client.Request -> Client.Manager -> IO ()) + -> IO () +runTest respBody throttleConfig f = do + throttleMiddleware (\_ _ -> return ()) "test" throttleConfig $ \mw -> do + let app = mw $ \_req resp -> resp $ Wai.responseBuilder status200 [] respBody + (sockPort, sock) <- openFreePort + readyVar <- newEmptyMVar + let settings = defaultSettings + & setBeforeMainLoop (putMVar readyVar ()) + & setOnExceptionResponse (\ex -> fromMaybe (defaultOnExceptionResponse ex) (throttledResponse ex)) + & setOnException (\_ _ -> return ()) + withAsync (runSettingsSocket settings sock app) $ \_ -> do + takeMVar readyVar + manager <- Client.newManager Client.defaultManagerSettings + req <- Client.parseRequest $ "http://127.0.0.1:" <> show sockPort + + f req manager diff --git a/test/unit/ChainwebTests.hs b/test/unit/ChainwebTests.hs index 9017062eeb..568b1e0016 100644 --- a/test/unit/ChainwebTests.hs +++ b/test/unit/ChainwebTests.hs @@ -84,6 +84,7 @@ import qualified Chainweb.Test.Roundtrips (tests) import qualified Chainweb.Test.SPV (tests) import qualified Chainweb.Test.SPV.EventProof (properties) import qualified Chainweb.Test.Sync.WebBlockHeaderStore (properties) +import qualified Chainweb.Test.Throttle (tests) import qualified Chainweb.Test.TreeDB (properties) import qualified Chainweb.Test.TreeDB.RemoteDB import qualified Chainweb.Test.Version (tests) @@ -179,6 +180,7 @@ suite rdb = , Chainweb.Test.Misc.tests , Chainweb.Test.BlockHeader.Genesis.tests , Chainweb.Test.BlockHeader.Validation.tests + , Chainweb.Test.Throttle.tests , Chainweb.Test.Version.tests , testProperties "Chainweb.Test.Chainweb.Utils.Paging" Chainweb.Test.Chainweb.Utils.Paging.properties , testProperties "Chainweb.Test.HostAddress" Chainweb.Test.HostAddress.properties