Skip to content

wip: new rate limiter #2146

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions chainweb.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,8 @@ library
, Chainweb.Utils.RequestLog
, Chainweb.Utils.Rule
, Chainweb.Utils.Serialization
, Chainweb.Utils.Throttle
, Chainweb.Utils.TokenLimiting
, Chainweb.VerifierPlugin
, Chainweb.VerifierPlugin.Allow
, Chainweb.VerifierPlugin.Hyperlane.Announcement
Expand Down Expand Up @@ -380,6 +382,7 @@ library
, base64-bytestring-kadena == 0.1
, binary >= 0.8
, bytestring >= 0.10.12
, cache >= 0.1.1.2
, case-insensitive >= 1.2
, cassava >= 0.5.1
, chainweb-storage >= 0.1
Expand Down Expand Up @@ -455,7 +458,7 @@ library
, time >= 1.12.2
, tls >=2.1.4
, tls-session-manager >= 0.0
, token-bucket >= 0.1
, token-limiter >= 0.1
, transformers >= 0.5
, trifecta >= 2.1
, unliftio >= 0.2
Expand All @@ -467,7 +470,6 @@ library
, wai-app-static >= 3.1.6.3
, wai-cors >= 0.2.7
, wai-extra >= 3.0.28
, wai-middleware-throttle >= 0.3
, wai-middleware-validation
, warp >= 3.3.6
, warp-tls >= 3.4
Expand Down Expand Up @@ -665,6 +667,7 @@ test-suite chainweb-tests
Chainweb.Test.SPV
Chainweb.Test.SPV.EventProof
Chainweb.Test.Sync.WebBlockHeaderStore
Chainweb.Test.Throttle
Chainweb.Test.TreeDB
Chainweb.Test.TreeDB.RemoteDB
Chainweb.Test.Version
Expand Down
116 changes: 40 additions & 76 deletions src/Chainweb/Chainweb.hs
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,6 @@ module Chainweb.Chainweb
, chainwebPeer
, chainwebPayloadDb
, chainwebPactData
, chainwebThrottler
, chainwebPutPeerThrottler
, chainwebMempoolThrottler
, chainwebConfig
, chainwebServiceSocket
, chainwebBackup
Expand All @@ -79,15 +76,7 @@ module Chainweb.Chainweb
, runChainweb

-- * Throttler
, mkGenericThrottler
, mkPutPeerThrottler
, checkPathPrefix
, mkThrottler

, ThrottlingConfig(..)
, throttlingRate
, throttlingPeerRate
, defaultThrottlingConfig

-- * Cut Config
, CutConfig(..)
Expand Down Expand Up @@ -128,11 +117,9 @@ import Network.Wai
import Network.Wai.Handler.Warp hiding (Port)
import Network.Wai.Handler.WarpTLS (WarpTLSException(..))
import Network.Wai.Middleware.RequestSizeLimit
import Network.Wai.Middleware.Throttle

import Prelude hiding (log)

import System.Clock
import System.LogLevel

-- internal modules
Expand Down Expand Up @@ -184,6 +171,7 @@ import P2P.Peer

import qualified Pact.Types.ChainMeta as P
import qualified Pact.Types.Command as P
import qualified Chainweb.Utils.Throttle as Throttle

-- -------------------------------------------------------------------------- --
-- Chainweb Resources
Expand All @@ -199,9 +187,6 @@ data Chainweb logger tbl = Chainweb
, _chainwebPayloadDb :: !(PayloadDb tbl)
, _chainwebManager :: !HTTP.Manager
, _chainwebPactData :: ![(ChainId, PactServerData logger tbl)]
, _chainwebThrottler :: !(Throttle Address)
, _chainwebPutPeerThrottler :: !(Throttle Address)
, _chainwebMempoolThrottler :: !(Throttle Address)
, _chainwebConfig :: !ChainwebConfiguration
, _chainwebServiceSocket :: !(Port, Socket)
, _chainwebBackup :: !(BackupEnv logger)
Expand Down Expand Up @@ -488,13 +473,6 @@ withChainwebInternal conf logger peer serviceSock rocksDb pactDbDir backupDir re
let !mLogger = setComponent "miner" logger
!mConf = _configMining conf
!mCutDb = _cutResCutDb cuts
!throt = _configThrottling conf

-- initialize throttler
throttler <- mkGenericThrottler $ _throttlingRate throt
putPeerThrottler <- mkPutPeerThrottler $ _throttlingPeerRate throt
mempoolThrottler <- mkMempoolThrottler $ _throttlingMempoolRate throt
logg Debug "initialized throttlers"

-- synchronize pact dbs with latest cut before we start the server
-- and clients and begin mining.
Expand Down Expand Up @@ -588,9 +566,6 @@ withChainwebInternal conf logger peer serviceSock rocksDb pactDbDir backupDir re
, _chainwebPayloadDb = view cutDbPayloadDb $ _cutResCutDb cuts
, _chainwebManager = mgr
, _chainwebPactData = pactData
, _chainwebThrottler = throttler
, _chainwebPutPeerThrottler = putPeerThrottler
, _chainwebMempoolThrottler = mempoolThrottler
, _chainwebConfig = conf
, _chainwebServiceSocket = serviceSock
, _chainwebBackup = BackupEnv
Expand Down Expand Up @@ -650,41 +625,13 @@ withChainwebInternal conf logger peer serviceSock rocksDb pactDbDir backupDir re
-- -------------------------------------------------------------------------- --
-- Throttling

mkGenericThrottler :: Double -> IO (Throttle Address)
mkGenericThrottler rate = mkThrottler 30 rate (const True)

mkPutPeerThrottler :: Double -> IO (Throttle Address)
mkPutPeerThrottler rate = mkThrottler 30 rate $ \r ->
elem "peer" (pathInfo r) && requestMethod r == "PUT"

mkMempoolThrottler :: Double -> IO (Throttle Address)
mkMempoolThrottler rate = mkThrottler 30 rate $ \r ->
elem "mempool" (pathInfo r)

checkPathPrefix
:: [T.Text]
-- ^ the base rate granted to users of the endpoing
-> Request
-> Bool
checkPathPrefix endpoint r = endpoint `isPrefixOf` drop 3 (pathInfo r)

-- | The period is 1 second. Burst is 2*rate.
--
mkThrottler
:: Double
-- ^ expiration of a stall bucket in seconds
-> Double
-- ^ the base rate granted to users of the endpoint (requests per second)
-> (Request -> Bool)
-- ^ Predicate to select requests that are throttled
-> IO (Throttle Address)
mkThrottler e rate c = initThrottler (defaultThrottleSettings $ TimeSpec (ceiling e) 0) -- expiration
{ throttleSettingsRate = rate -- number of allowed requests per period
, throttleSettingsPeriod = 1_000_000 -- 1 second
, throttleSettingsBurst = 4 * ceiling rate
, throttleSettingsIsThrottled = c
}

-- -------------------------------------------------------------------------- --
-- Run Chainweb

Expand Down Expand Up @@ -720,28 +667,40 @@ runChainweb cw nowServing = do
logg Warn $ "OpenAPI spec validation enabled on service API, make sure this is what you want"
mkValidationMiddleware
else return id

concurrentlies_

-- 1. Start serving Rest API
[ (if tls then serve else servePlain)
$ httpLog
. throttle (_chainwebPutPeerThrottler cw)
. throttle (_chainwebMempoolThrottler cw)
. throttle (_chainwebThrottler cw)
. p2pRequestSizeLimit
. p2pValidationMiddleware

-- 2. Start Clients (with a delay of 500ms)
, threadDelay 500000 >> clients

-- 3. Start serving local API
, threadDelay 500000 >> do
serveServiceApi
$ serviceHttpLog
. serviceRequestSizeLimit
. serviceApiValidationMiddleware
]
let theP2pThrottleConfig = cw ^. chainwebConfig . configP2p . p2pConfigThrottleConfig
let theServiceApiThrottleConfig = cw ^. chainwebConfig . configServiceApi . serviceApiConfigThrottleConfig
let withP2pThrottleMiddleware =
if _enableConfigEnabled theP2pThrottleConfig
then Throttle.throttleMiddleware (logFunction $ _chainwebLogger cw) "p2p" (_enableConfigConfig theP2pThrottleConfig)
else \k -> k id
let withServiceApiThrottleMiddleware =
if _enableConfigEnabled theServiceApiThrottleConfig
then Throttle.throttleMiddleware (logFunction $ _chainwebLogger cw) "p2p" (_enableConfigConfig theServiceApiThrottleConfig)
else \k -> k id

withP2pThrottleMiddleware $ \p2pThrottler ->
withServiceApiThrottleMiddleware $ \serviceThrottler ->

concurrentlies_

-- 1. Start serving Rest API
[ (if tls then serve else servePlain)
$ httpLog
. p2pRequestSizeLimit
. p2pThrottler
. p2pValidationMiddleware

-- 2. Start Clients (with a delay of 500ms)
, threadDelay 500000 >> clients

-- 3. Start serving local API
, threadDelay 500000 >> do
serveServiceApi
$ serviceHttpLog
. serviceRequestSizeLimit
. serviceThrottler
. serviceApiValidationMiddleware
]

where

Expand Down Expand Up @@ -805,12 +764,16 @@ runChainweb cw nowServing = do
when (defaultShouldDisplayException e) $
logg Debug $ loggServerError msg r e

onExceptionResponse ex =
fromMaybe (defaultOnExceptionResponse ex) (Throttle.throttledResponse ex)

-- P2P Server

serverSettings :: Counter "clientClosedConnections" -> Settings
serverSettings clientClosedConnectionsCounter =
peerServerSettings (_peerResPeer $ _chainwebPeer cw)
& setOnException (logWarpException "P2P API" clientClosedConnectionsCounter)
& setOnExceptionResponse onExceptionResponse
& setBeforeMainLoop (nowServing (nowServingP2PAPI .~ True))

monitorConnectionsClosedByClient :: Counter "clientClosedConnections" -> IO ()
Expand Down Expand Up @@ -893,6 +856,7 @@ runChainweb cw nowServing = do
& setHost interface
& setOnException
(logWarpException "Service API" clientClosedConnectionsCounter)
& setOnExceptionResponse onExceptionResponse
& setBeforeMainLoop (nowServing (nowServingServiceAPI .~ True))
& setServerName "Chainweb Service API"

Expand Down
Loading
Loading