Skip to content

smp server: refactor subscriptions and delivery in order to always response SOK on subscription with an optional message to follow. #1573

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/Simplex/Messaging/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -1179,8 +1179,8 @@ okSMPCommands cmd c qs = L.map process <$> sendProtocolCommands c cs
Left e -> Left e

-- | Send SMP command
sendSMPCommand :: PartyI p => SMPClient -> Maybe C.APrivateAuthKey -> QueueId -> Command p -> ExceptT SMPClientError IO BrokerMsg
sendSMPCommand c pKey qId cmd = sendProtocolCommand c pKey qId (Cmd sParty cmd)
sendSMPCommand :: PartyI p => SMPClient -> Maybe C.APrivateAuthKey -> EntityId -> Command p -> ExceptT SMPClientError IO BrokerMsg
sendSMPCommand c pKey entId cmd = sendProtocolCommand c pKey entId (Cmd sParty cmd)
{-# INLINE sendSMPCommand #-}

type PCTransmission err msg = (Either TransportError SentRawTransmission, Request err msg)
Expand Down
4 changes: 4 additions & 0 deletions src/Simplex/Messaging/Protocol.hs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ module Simplex.Messaging.Protocol
BasicAuth (..),
SrvLoc (..),
CorrId (..),
pattern NoCorrId,
EntityId (..),
pattern NoEntity,
QueueId,
Expand Down Expand Up @@ -1370,6 +1371,9 @@ newtype CorrId = CorrId {bs :: ByteString}
deriving (Eq, Ord, Show)
deriving newtype (Encoding)

pattern NoCorrId :: CorrId
pattern NoCorrId = CorrId ""

instance IsString CorrId where
fromString = CorrId . fromString
{-# INLINE fromString #-}
Expand Down
399 changes: 213 additions & 186 deletions src/Simplex/Messaging/Server.hs

Large diffs are not rendered by default.

13 changes: 8 additions & 5 deletions src/Simplex/Messaging/Server/Env/STM.hs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ module Simplex.Messaging.Server.Env.STM
MsgStore (..),
AStoreType (..),
VerifiedTransmission,
ResponseAndMessage,
newEnv,
mkJournalStoreConfig,
msgStore,
Expand Down Expand Up @@ -377,7 +378,7 @@ sameClient c cv = maybe False (sameClientId c) <$> readTVar cv
data ClientSub
= CSClient QueueId (Maybe ServiceId) (Maybe ServiceId) -- includes previous and new associated service IDs
| CSDeleted QueueId (Maybe ServiceId) -- includes previously associated service IDs
| CSService ServiceId -- only send END to idividual client subs on message delivery, not of SSUB/NSSUB
| CSService ServiceId Int64 -- only send END to idividual client subs on message delivery, not of SSUB/NSSUB

newtype ProxyAgent = ProxyAgent
{ smpAgent :: SMPClientAgent 'Sender
Expand All @@ -392,7 +393,7 @@ data Client s = Client
serviceSubsCount :: TVar Int64, -- only one service can be subscribed, based on its certificate, this is subscription count
ntfServiceSubsCount :: TVar Int64, -- only one service can be subscribed, based on its certificate, this is subscription count
rcvQ :: TBQueue (NonEmpty (VerifiedTransmission s)),
sndQ :: TBQueue (NonEmpty (Transmission BrokerMsg)),
sndQ :: TBQueue (NonEmpty (Transmission BrokerMsg), [Transmission BrokerMsg]),
msgQ :: TBQueue (NonEmpty (Transmission BrokerMsg)),
procThreads :: TVar Int,
endThreads :: TVar (IntMap (Weak ThreadId)),
Expand All @@ -406,13 +407,15 @@ data Client s = Client

type VerifiedTransmission s = (Maybe (StoreQueue s, QueueRec), Transmission Cmd)

type ResponseAndMessage = (Transmission BrokerMsg, Maybe (Transmission BrokerMsg))

data ServerSub = ServerSub (TVar SubscriptionThread) | ProhibitSub

data SubscriptionThread = NoSub | SubPending | SubThread (Weak ThreadId)

data Sub = Sub
{ subThread :: ServerSub, -- Nothing value indicates that sub
delivered :: TMVar MsgId
delivered :: TVar (Maybe (MsgId, RoundedSystemTime))
}

newServer :: IO (Server s)
Expand Down Expand Up @@ -491,13 +494,13 @@ newClient clientId qSize clientTHParams createdAt = do

newSubscription :: SubscriptionThread -> STM Sub
newSubscription st = do
delivered <- newEmptyTMVar
delivered <- newTVar Nothing
subThread <- ServerSub <$> newTVar st
return Sub {subThread, delivered}

newProhibitedSub :: STM Sub
newProhibitedSub = do
delivered <- newEmptyTMVar
delivered <- newTVar Nothing
return Sub {subThread = ProhibitSub, delivered}

newEnv :: ServerConfig s -> IO (Env s)
Expand Down
4 changes: 2 additions & 2 deletions src/Simplex/Messaging/Server/MsgStore/Journal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -356,8 +356,8 @@ instance QueueStoreClass (JournalQueue s) (QStore s) where
{-# INLINE setQueueService #-}
getQueueNtfServices = withQS (getQueueNtfServices @(JournalQueue s))
{-# INLINE getQueueNtfServices #-}
getNtfServiceQueueCount = withQS (getNtfServiceQueueCount @(JournalQueue s))
{-# INLINE getNtfServiceQueueCount #-}
getServiceQueueCount = withQS (getServiceQueueCount @(JournalQueue s))
{-# INLINE getServiceQueueCount #-}

makeQueue_ :: JournalMsgStore s -> RecipientId -> QueueRec -> Lock -> IO (JournalQueue s)
makeQueue_ JournalMsgStore {sharedLock} rId qr queueLock = do
Expand Down
25 changes: 25 additions & 0 deletions src/Simplex/Messaging/Server/Prometheus.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
module Simplex.Messaging.Server.Prometheus where

import Data.Int (Int64)
import qualified Data.IntMap as IM
import Data.List (mapAccumL)
import Data.Text (Text)
import qualified Data.Text as T
import Data.Time.Clock (UTCTime (..), diffUTCTime)
Expand Down Expand Up @@ -35,11 +37,18 @@ data RealTimeMetrics = RealTimeMetrics
threadsCount :: Int,
clientsCount :: Int,
deliveredSubs :: RTSubscriberMetrics,
deliveredTimes :: TimeAggregations,
smpSubs :: RTSubscriberMetrics,
ntfSubs :: RTSubscriberMetrics,
loadedCounts :: LoadedQueueCounts
}

data TimeAggregations = TimeAggregations
{ sumTime :: Int64,
maxTime :: Int64,
timeBuckets :: IM.IntMap Int
}

data RTSubscriberMetrics = RTSubscriberMetrics
{ subsCount :: Int,
subClientsCount :: Int,
Expand All @@ -57,6 +66,7 @@ prometheusMetrics sm rtm ts =
threadsCount,
clientsCount,
deliveredSubs,
deliveredTimes,
smpSubs,
ntfSubs,
loadedCounts
Expand Down Expand Up @@ -436,6 +446,17 @@ prometheusMetrics sm rtm ts =
\# TYPE simplex_smp_delivered_clients_total gauge\n\
\simplex_smp_delivered_clients_total " <> mshow (subClientsCount deliveredSubs) <> "\n# delivered.subClientsCount\n\
\\n\
\# HELP simplex_smp_delivery_ack_time Times to confirm message delivery\n\
\# TYPE simplex_smp_delivery_ack_time histogram\n\
\simplex_smp_delivery_ack_time_sum " <> mshow (sumTime deliveredTimes) <> "\n# delivered.sumTime\n\
\simplex_smp_delivery_ack_time_count " <> mshow (subsCount deliveredSubs) <> "\n# delivered.subsCount\n"
<> showTimeBuckets (timeBuckets deliveredTimes)
<> showTimeBucket "+Inf" (subsCount deliveredSubs)
<> "\n\
\# HELP simplex_smp_delivery_ack_time_max Max time to confirm message delivery\n\
\# TYPE simplex_smp_delivery_ack_time_max gauge\n\
\simplex_smp_delivery_ack_time_max " <> mshow (maxTime deliveredTimes) <> "\n# delivered.maxTime\n\
\\n\
\# HELP simplex_smp_subscribtion_total Total SMP subscriptions\n\
\# TYPE simplex_smp_subscribtion_total gauge\n\
\simplex_smp_subscribtion_total " <> mshow (subsCount smpSubs) <> "\n# smp.subsCount\n\
Expand Down Expand Up @@ -480,6 +501,10 @@ prometheusMetrics sm rtm ts =
\# TYPE simplex_smp_loaded_queues_ntf_lock_count gauge\n\
\simplex_smp_loaded_queues_ntf_lock_count " <> mshow (notifierLockCount loadedCounts) <> "\n# loadedCounts.notifierLockCount\n"

showTimeBuckets :: IM.IntMap Int -> Text
showTimeBuckets = T.concat . snd . mapAccumL (\total (sec, cnt) -> (total + cnt, showTimeBucket (tshow sec) (total + cnt))) 0 . IM.assocs
showTimeBucket :: Text -> Int -> Text
showTimeBucket sec count = "simplex_smp_delivery_ack_time_bucket{le=\"" <> sec <> "\"} " <> mshow count <> "\n# delivered.timeBuckets\n"
socketsMetric :: (SocketStats -> Int) -> Text -> Text -> Text
socketsMetric sel metric descr =
"# HELP " <> metric <> " " <> descr <> "\n"
Expand Down
3 changes: 3 additions & 0 deletions src/Simplex/Messaging/Server/QueueStore.hs
Original file line number Diff line number Diff line change
Expand Up @@ -127,3 +127,6 @@ getRoundedSystemTime prec = (\t -> RoundedSystemTime $ (systemSeconds t `div` pr

getSystemDate :: IO RoundedSystemTime
getSystemDate = getRoundedSystemTime 86400

getSystemSeconds :: IO RoundedSystemTime
getSystemSeconds = RoundedSystemTime . systemSeconds <$> getSystemTime
21 changes: 12 additions & 9 deletions src/Simplex/Messaging/Server/QueueStore/Postgres.hs
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,13 @@ import qualified Data.ByteString.Builder as BB
import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Lazy as LB
import Data.Bitraversable (bimapM)
import Data.Either (fromRight, lefts, rights)
import Data.Either (fromRight, lefts)
import Data.Functor (($>))
import Data.Int (Int64)
import Data.List (foldl', intersperse, partition)
import Data.List.NonEmpty (NonEmpty)
import qualified Data.List.NonEmpty as L
import qualified Data.Map.Strict as M
import Data.Maybe (catMaybes, fromMaybe, mapMaybe)
import Data.Maybe (catMaybes, fromMaybe)
import qualified Data.Set as S
import Data.Text (Text)
import Data.Time.Clock.System (SystemTime (..), getSystemTime)
Expand All @@ -64,7 +63,7 @@ import Database.PostgreSQL.Simple.ToField (Action (..), ToField (..))
import Database.PostgreSQL.Simple.Errors (ConstraintViolation (..), constraintViolation)
import Database.PostgreSQL.Simple.SqlQQ (sql)
import GHC.IO (catchAny)
import Simplex.Messaging.Agent.Client (withLockMap, withLocksMap)
import Simplex.Messaging.Agent.Client (withLockMap)
import Simplex.Messaging.Agent.Lock (Lock)
import Simplex.Messaging.Agent.Store.AgentStore ()
import Simplex.Messaging.Agent.Store.Postgres (createDBStore, closeDBStore)
Expand All @@ -83,7 +82,7 @@ import Simplex.Messaging.Server.StoreLog
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Transport (SMPServiceRole (..))
import Simplex.Messaging.Util (eitherToMaybe, firstRow, ifM, maybeFirstRow, tshow, (<$$>), ($>>=))
import Simplex.Messaging.Util (eitherToMaybe, firstRow, ifM, maybeFirstRow, tshow, (<$$>))
import System.Exit (exitFailure)
import System.IO (IOMode (..), hFlush, stdout)
import UnliftIO.STM
Expand Down Expand Up @@ -486,11 +485,15 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where
let (sNtfs, restNtfs) = partition (\(nId, _) -> S.member nId snIds) ntfs'
in ((serviceId, sNtfs) : ssNtfs, restNtfs)

getNtfServiceQueueCount :: PostgresQueueStore q -> ServiceId -> IO (Either ErrorType Int64)
getNtfServiceQueueCount st serviceId =
E.uninterruptibleMask_ $ runExceptT $ withDB' "getNtfServiceQueueCount" st $ \db ->
getServiceQueueCount :: (PartyI p, ServiceParty p) => PostgresQueueStore q -> SParty p -> ServiceId -> IO (Either ErrorType Int64)
getServiceQueueCount st party serviceId =
E.uninterruptibleMask_ $ runExceptT $ withDB' "getServiceQueueCount" st $ \db ->
fmap (fromMaybe 0) $ maybeFirstRow fromOnly $
DB.query db "SELECT count(1) FROM msg_queues WHERE ntf_service_id = ? AND deleted_at IS NULL" (Only serviceId)
DB.query db query (Only serviceId)
where
query = case party of
SRecipientService -> "SELECT count(1) FROM msg_queues WHERE rcv_service_id = ? AND deleted_at IS NULL"
SNotifierService -> "SELECT count(1) FROM msg_queues WHERE ntf_service_id = ? AND deleted_at IS NULL"

batchInsertServices :: [STMService] -> PostgresQueueStore q -> IO Int64
batchInsertServices services' toStore =
Expand Down
11 changes: 8 additions & 3 deletions src/Simplex/Messaging/Server/QueueStore/STM.hs
Original file line number Diff line number Diff line change
Expand Up @@ -346,10 +346,15 @@ instance StoreQueueClass q => QueueStoreClass q (STMQueueStore q) where
let (sNtfs, restNtfs) = partition (\(nId, _) -> S.member nId snIds) ntfs'
pure ((Just serviceId, sNtfs) : ssNtfs, restNtfs)

getNtfServiceQueueCount :: STMQueueStore q -> ServiceId -> IO (Either ErrorType Int64)
getNtfServiceQueueCount st serviceId =
getServiceQueueCount :: (PartyI p, ServiceParty p) => STMQueueStore q -> SParty p -> ServiceId -> IO (Either ErrorType Int64)
getServiceQueueCount st party serviceId =
TM.lookupIO serviceId (services st) >>=
maybe (pure $ Left AUTH) (fmap (Right . fromIntegral . S.size) . readTVarIO . serviceNtfQueues)
maybe (pure $ Left AUTH) (fmap (Right . fromIntegral . S.size) . readTVarIO . serviceSel)
where
serviceSel :: STMService -> TVar (Set QueueId)
serviceSel = case party of
SRecipientService -> serviceRcvQueues
SNotifierService -> serviceNtfQueues

withQueueRec :: TVar (Maybe QueueRec) -> (QueueRec -> STM a) -> IO (Either ErrorType a)
withQueueRec qr a = atomically $ readQueueRec qr >>= mapM a
Expand Down
2 changes: 1 addition & 1 deletion src/Simplex/Messaging/Server/QueueStore/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class StoreQueueClass q => QueueStoreClass q s where
getCreateService :: s -> ServiceRec -> IO (Either ErrorType ServiceId)
setQueueService :: (PartyI p, ServiceParty p) => s -> q -> SParty p -> Maybe ServiceId -> IO (Either ErrorType ())
getQueueNtfServices :: s -> [(NotifierId, a)] -> IO (Either ErrorType ([(Maybe ServiceId, [(NotifierId, a)])], [(NotifierId, a)]))
getNtfServiceQueueCount :: s -> ServiceId -> IO (Either ErrorType Int64)
getServiceQueueCount :: (PartyI p, ServiceParty p) => s -> SParty p -> ServiceId -> IO (Either ErrorType Int64)

data EntityCounts = EntityCounts
{ queueCount :: Int,
Expand Down
31 changes: 31 additions & 0 deletions tests/CoreTests/BatchingTests.hs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ batchingTests = do
it "should batch with 135 subscriptions per batch" testClientBatchSubscriptions
it "should batch with 255 ENDs per batch" testClientBatchENDs
it "should batch with 80 NMSGs per batch" testClientBatchNMSGs
it "should batch subscription responses with message" testBatchSubResponses
it "should break on message that does not fit" testClientBatchWithMessage
it "should break on large message" testClientBatchWithLargeMessage

Expand Down Expand Up @@ -207,6 +208,20 @@ testClientBatchNMSGs = do
(length rs1, length rs2, length rs3) `shouldBe` (40, 80, 80)
all lenOk [s1, s2, s3] `shouldBe` True

-- 4 responses are used in Simplex.Messaging.Server / `send`
testBatchSubResponses :: IO ()
testBatchSubResponses = do
client <- testClientStub
soks <- replicateM 4 $ randomSOK
msg <- randomMSG
let msgs = map (\t -> Right (Nothing, encodeTransmission (thParams client) t)) (soks <> [msg])
batches = batchTransmissions (thParams client) $ L.fromList msgs
length batches `shouldBe` 1
soks' <- replicateM 5 $ randomSOK
let msgs' = map (\t -> Right (Nothing, encodeTransmission (thParams client) t)) (soks' <> [msg])
batches' = batchTransmissions (thParams client) $ L.fromList msgs'
length batches' `shouldBe` 2

testClientBatchWithMessageV6 :: IO ()
testClientBatchWithMessageV6 = do
client <- testClientStubV6
Expand Down Expand Up @@ -361,6 +376,22 @@ randomNMSGCmd ts = do
Right encNMsgMeta <- pure $ C.cbEncrypt (C.dh' k pk) nonce (smpEncode msgMeta) 128
pure (CorrId "", EntityId nId, NMSG nonce encNMsgMeta)

randomSOK :: IO (Transmission BrokerMsg)
randomSOK = do
g <- C.newRandom
corrId <- atomically $ C.randomBytes 24 g
rId <- atomically $ C.randomBytes 24 g
pure (CorrId corrId, EntityId rId, SOK Nothing)

randomMSG :: IO (Transmission BrokerMsg)
randomMSG = do
g <- C.newRandom
corrId <- atomically $ C.randomBytes 24 g
rId <- atomically $ C.randomBytes 24 g
msgId <- atomically $ C.randomBytes 24 g
msg <- atomically $ C.randomBytes (maxMessageLength currentClientSMPRelayVersion) g
pure (CorrId corrId, EntityId rId, MSG RcvMessage {msgId, msgBody = EncRcvMsgBody msg})

randomSENDv6 :: ByteString -> Int -> IO (Either TransportError (Maybe TAuthorizations, ByteString))
randomSENDv6 = randomSEND_ C.SEd25519 minServerSMPRelayVersion

Expand Down
Loading
Loading