diff --git a/src/Simplex/Messaging/Client.hs b/src/Simplex/Messaging/Client.hs index 16b1169d4..ddb9b8e3c 100644 --- a/src/Simplex/Messaging/Client.hs +++ b/src/Simplex/Messaging/Client.hs @@ -1179,8 +1179,8 @@ okSMPCommands cmd c qs = L.map process <$> sendProtocolCommands c cs Left e -> Left e -- | Send SMP command -sendSMPCommand :: PartyI p => SMPClient -> Maybe C.APrivateAuthKey -> QueueId -> Command p -> ExceptT SMPClientError IO BrokerMsg -sendSMPCommand c pKey qId cmd = sendProtocolCommand c pKey qId (Cmd sParty cmd) +sendSMPCommand :: PartyI p => SMPClient -> Maybe C.APrivateAuthKey -> EntityId -> Command p -> ExceptT SMPClientError IO BrokerMsg +sendSMPCommand c pKey entId cmd = sendProtocolCommand c pKey entId (Cmd sParty cmd) {-# INLINE sendSMPCommand #-} type PCTransmission err msg = (Either TransportError SentRawTransmission, Request err msg) diff --git a/src/Simplex/Messaging/Protocol.hs b/src/Simplex/Messaging/Protocol.hs index b3a76f37e..4d5a2f551 100644 --- a/src/Simplex/Messaging/Protocol.hs +++ b/src/Simplex/Messaging/Protocol.hs @@ -114,6 +114,7 @@ module Simplex.Messaging.Protocol BasicAuth (..), SrvLoc (..), CorrId (..), + pattern NoCorrId, EntityId (..), pattern NoEntity, QueueId, @@ -1370,6 +1371,9 @@ newtype CorrId = CorrId {bs :: ByteString} deriving (Eq, Ord, Show) deriving newtype (Encoding) +pattern NoCorrId :: CorrId +pattern NoCorrId = CorrId "" + instance IsString CorrId where fromString = CorrId . fromString {-# INLINE fromString #-} diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 194cded59..f3695650c 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -61,17 +61,18 @@ import qualified Data.ByteString.Lazy.Char8 as LB import Data.Constraint (Dict (..)) import Data.Dynamic (toDyn) import Data.Either (fromRight, partitionEithers) +import Data.Foldable (foldrM) import Data.Functor (($>)) import Data.IORef import Data.Int (Int64) import qualified Data.IntMap.Strict as IM import qualified Data.IntSet as IS -import Data.List (foldl', intercalate, mapAccumR) +import Data.List (foldl', intercalate) import Data.List.NonEmpty (NonEmpty (..), (<|)) import qualified Data.List.NonEmpty as L import Data.Map.Strict (Map) import qualified Data.Map.Strict as M -import Data.Maybe (catMaybes, fromMaybe, isJust, isNothing) +import Data.Maybe (fromMaybe, isJust, isNothing) import Data.Semigroup (Sum (..)) import qualified Data.Set as S import Data.Text (Text) @@ -271,6 +272,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt as'' <- if prevServiceId == serviceId_ then pure [] else endServiceSub prevServiceId qId END case serviceId_ of Just serviceId -> do + modifyTVar' totalServiceSubs (+ 1) -- server count for all services as <- endQueueSub qId END as' <- cancelServiceSubs serviceId =<< upsertSubscribedClient serviceId c serviceSubscribers pure $ as ++ as' ++ as'' @@ -282,8 +284,9 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt as <- endQueueSub qId DELD as' <- endServiceSub serviceId qId DELD pure $ as ++ as' - CSService serviceId -> do + CSService serviceId count -> do modifyTVar' subClients $ IS.insert clntId -- add ID to server's subscribed cients + modifyTVar' totalServiceSubs (+ count) -- server count for all services cancelServiceSubs serviceId =<< upsertSubscribedClient serviceId c serviceSubscribers updateSubDisconnected = case clntSub of -- do not insert client if it is already disconnected, but send END/DELD to any other client subscribed to this queue or service @@ -296,7 +299,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt as <- endQueueSub qId DELD as' <- endServiceSub serviceId qId DELD pure $ as ++ as' - CSService serviceId -> cancelServiceSubs serviceId =<< lookupSubscribedClient serviceId serviceSubscribers + CSService serviceId _ -> cancelServiceSubs serviceId =<< lookupSubscribedClient serviceId serviceSubscribers endQueueSub :: QueueId -> BrokerMsg -> STM [PrevClientSub s] endQueueSub qId msg = prevSub qId msg (CSAEndSub qId) =<< lookupDeleteSubscribedClient qId queueSubscribers endServiceSub :: Maybe ServiceId -> QueueId -> BrokerMsg -> STM [PrevClientSub s] @@ -382,7 +385,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt let cancelNtfs s = throwSTM $ userError $ s <> ", " <> show (length ts_) <> " ntfs kept" unlessM (currentClient readTVar c) $ cancelNtfs "not current client" whenM (isFullTBQueue sndQ) $ cancelNtfs "sending queue full" - writeTBQueue sndQ ts + writeTBQueue sndQ (ts, []) pure $ length ts_ currentClient :: Monad m => (forall a. TVar a -> m a) -> Client s' -> m Bool currentClient rd Client {clientId, connected} = (&&) <$> rd connected <*> (IS.member clientId <$> rd subClients) @@ -393,7 +396,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt ntfs -> do writeTVar v [] pure $ foldl' (\acc' ntf -> nmsg nId ntf : acc') acc ntfs -- reverses, to order by time - nmsg nId MsgNtf {ntfNonce, ntfEncMeta} = (CorrId "", nId, NMSG ntfNonce ntfEncMeta) + nmsg nId MsgNtf {ntfNonce, ntfEncMeta} = (NoCorrId, nId, NMSG ntfNonce ntfEncMeta) updateNtfStats :: Client s' -> Either SomeException Int -> IO () updateNtfStats Client {clientId} = \case Right 0 -> pure () @@ -419,14 +422,14 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt where enqueueEvts evts c@Client {connected, sndQ} = whenM (readTVarIO connected) $ do - sent <- atomically $ tryWriteTBQueue sndQ ts + sent <- atomically $ tryWriteTBQueue sndQ (ts, []) if sent then updateEndStats else -- if queue is full it can block - forkClient c ("sendPendingEvtsThread.queueEvts") $ - atomically (writeTBQueue sndQ ts) >> updateEndStats + forkClient c "sendPendingEvtsThread.queueEvts" $ + atomically (writeTBQueue sndQ (ts, [])) >> updateEndStats where - ts = L.map (\(entId, evt) -> (CorrId "", entId, evt)) evts + ts = L.map (\(entId, evt) -> (NoCorrId, entId, evt)) evts -- this accounts for both END and DELD events updateEndStats = do let len = L.length evts @@ -684,24 +687,44 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt let threadsCount = 0 #endif clientsCount <- IM.size <$> getServerClients srv - deliveredSubs <- getDeliveredMetrics + (deliveredSubs, deliveredTimes) <- getDeliveredMetrics =<< getSystemSeconds smpSubs <- getSubscribersMetrics subscribers ntfSubs <- getSubscribersMetrics ntfSubscribers loadedCounts <- loadedQueueCounts $ fromMsgStore ms - pure RealTimeMetrics {socketStats, threadsCount, clientsCount, deliveredSubs, smpSubs, ntfSubs, loadedCounts} + pure RealTimeMetrics {socketStats, threadsCount, clientsCount, deliveredSubs, deliveredTimes, smpSubs, ntfSubs, loadedCounts} where getSubscribersMetrics ServerSubscribers {queueSubscribers, serviceSubscribers, subClients} = do subsCount <- M.size <$> getSubscribedClients queueSubscribers subClientsCount <- IS.size <$> readTVarIO subClients subServicesCount <- M.size <$> getSubscribedClients serviceSubscribers pure RTSubscriberMetrics {subsCount, subClientsCount, subServicesCount} - getDeliveredMetrics = foldM countClnt (RTSubscriberMetrics 0 0 0) =<< getServerClients srv - countClnt metrics Client {subscriptions} = do - cnt <- foldM countSubs 0 =<< readTVarIO subscriptions - pure $ if cnt > 0 - then metrics {subsCount = subsCount metrics + cnt, subClientsCount = subClientsCount metrics + 1} - else metrics - countSubs !cnt Sub {delivered} = (\empty -> if empty then cnt else cnt + 1) <$> atomically (isEmptyTMVar delivered) + getDeliveredMetrics (RoundedSystemTime ts') = foldM countClnt (RTSubscriberMetrics 0 0 0, TimeAggregations 0 0 IM.empty) =<< getServerClients srv + where + countClnt acc@(metrics, times) Client {subscriptions} = do + (cnt, times') <- foldM countSubs (0, times) =<< readTVarIO subscriptions + pure $ if cnt > 0 + then (metrics {subsCount = subsCount metrics + cnt, subClientsCount = subClientsCount metrics + 1}, times') + else acc + countSubs acc@(!cnt, TimeAggregations {sumTime, maxTime, timeBuckets}) Sub {delivered} = do + delivered_ <- readTVarIO delivered + pure $ case delivered_ of + Nothing -> acc + Just (_, RoundedSystemTime ts) -> + let t = ts' - ts + seconds + | t <= 5 = fromIntegral t + | t <= 30 = t `toBucket` 5 + | t <= 60 = t `toBucket` 10 + | t <= 180 = t `toBucket` 30 + | otherwise = t `toBucket` 60 + toBucket n m = - fromIntegral (((- n) `div` m) * m) -- round up + times' = + TimeAggregations + { sumTime = sumTime + t, + maxTime = max maxTime t, + timeBuckets = IM.alter (Just . maybe 1 (+ 1)) seconds timeBuckets + } + in (cnt + 1, times') runClient :: Transport c => X.CertificateChain -> C.APrivateSignKey -> TProxy c 'TServer -> c 'TServer -> M s () runClient srvCert srvSignKey tp h = do @@ -1090,7 +1113,7 @@ receive h@THandle {params = THandleParams {thAuth, sessionId}} ms Client {rcvQ, atomically . (writeTVar rcvActiveAt $!) =<< getSystemTime let (es, ts') = partitionEithers $ L.toList ts errs = map (second ERR) es - case ts' of + errs' <- case ts' of (_, _, (_, _, Cmd p cmd)) : rest -> do let service = peerClientService =<< thAuth (errs', cmds) <- partitionEithers <$> case batchParty p of @@ -1100,9 +1123,10 @@ receive h@THandle {params = THandleParams {thAuth, sessionId}} ms Client {rcvQ, qs <- getQueueRecs ms p $ map queueId ts' zipWithM (\t -> verified stats t . verifyLoadedQueue service thAuth t) ts' qs _ -> mapM (\t -> verified stats t =<< verifyTransmission ms service thAuth t) ts' - write rcvQ cmds - write sndQ $ errs ++ errs' - [] -> write sndQ errs + mapM_ (atomically . writeTBQueue rcvQ) $ L.nonEmpty cmds + pure $ errs ++ errs' + [] -> pure errs + mapM_ (atomically . writeTBQueue sndQ . (,[])) $ L.nonEmpty errs' where sameParty :: SParty p -> SignedTransmission Cmd -> Bool sameParty p (_, _, (_, _, Cmd p' _)) = isJust $ testEquality p p' @@ -1124,32 +1148,27 @@ receive h@THandle {params = THandleParams {thAuth, sessionId}} ms Client {rcvQ, NSUB -> incStat $ ntfSubAuth stats GET -> incStat $ msgGetAuth stats _ -> pure () - write q = mapM_ (atomically . writeTBQueue q) . L.nonEmpty send :: Transport c => MVar (THandleSMP c 'TServer) -> Client s -> IO () send th c@Client {sndQ, msgQ, clientTHParams = THandleParams {sessionId}} = do labelMyThread . B.unpack $ "client $" <> encode sessionId <> " send" forever $ atomically (readTBQueue sndQ) >>= sendTransmissions where - sendTransmissions :: NonEmpty (Transmission BrokerMsg) -> IO () - sendTransmissions ts - | L.length ts <= 2 = tSend th c ts + -- If the request had batched subscriptions + -- this will reply SOKs to all SUBs in the first batched transmission, + -- to reduce client timeouts. + -- After that all messages will be sent in separate transmissions, + -- without any client response timeouts, and allowing them to interleave + -- with other requests responses. + sendTransmissions :: (NonEmpty (Transmission BrokerMsg), [Transmission BrokerMsg]) -> IO () + sendTransmissions (ts, []) = tSend th c ts + sendTransmissions (ts, msg : msgs) + | length ts <= 4 = do -- up to 4 SOKs can be in one block with MSG (see testBatchSubResponses test) + tSend th c $ ts <> [msg] + mapM_ (atomically . writeTBQueue msgQ) $ L.nonEmpty msgs | otherwise = do - let (msgs_, ts') = mapAccumR splitMessages [] ts - -- If the request had batched subscriptions and L.length ts > 2 - -- this will reply OK to all SUBs in the first batched transmission, - -- to reduce client timeouts. - tSend th c ts' - -- After that all messages will be sent in separate transmissions, - -- without any client response timeouts, and allowing them to interleave - -- with other requests responses. - mapM_ (atomically . writeTBQueue msgQ) $ L.nonEmpty msgs_ - where - splitMessages :: [Transmission BrokerMsg] -> Transmission BrokerMsg -> ([Transmission BrokerMsg], Transmission BrokerMsg) - splitMessages msgs t@(corrId, entId, cmd) = case cmd of - -- replace MSG response with OK, accumulating MSG in a separate list. - MSG {} -> ((CorrId "", entId, cmd) : msgs, (corrId, entId, OK)) - _ -> (msgs, t) + tSend th c ts + atomically $ writeTBQueue msgQ (msg :| msgs) sendMsg :: Transport c => MVar (THandleSMP c 'TServer) -> Client s -> IO () sendMsg th c@Client {msgQ, clientTHParams = THandleParams {sessionId}} = do @@ -1195,7 +1214,7 @@ verifyLoadedQueue service thAuth t@(tAuth, authorized, (corrId, _, _)) = \case Left e -> VRFailed e verifyQueueTransmission :: forall s. Maybe THPeerClientService -> Maybe (THandleAuth 'TServer) -> SignedTransmission Cmd -> Maybe (StoreQueue s, QueueRec) -> VerificationResult s -verifyQueueTransmission service thAuth (tAuth, authorized, (corrId, _, command@(Cmd p cmd))) q_ +verifyQueueTransmission service thAuth (tAuth, authorized, (corrId, entId, command@(Cmd p cmd))) q_ | not checkRole = VRFailed $ CMD PROHIBITED | not verifyServiceSig = VRFailed SERVICE | otherwise = vc p cmd @@ -1221,8 +1240,8 @@ verifyQueueTransmission service thAuth (tAuth, authorized, (corrId, _, command@( verify = verifyCmdAuthorization thAuth tAuth authorized' corrId verifyServiceCmd :: VerificationResult s verifyServiceCmd = case (service, tAuth) of - (Just THClientService {serviceKey = k}, Just (TASignature (C.ASignature C.SEd25519 s), Nothing)) - | C.verify' k s authorized -> VRVerified Nothing + (Just THClientService {serviceId, serviceKey = k}, Just (TASignature (C.ASignature C.SEd25519 s), Nothing)) + | entId == serviceId && C.verify' k s authorized -> VRVerified Nothing _ -> VRFailed SERVICE -- this function verify service signature for commands that use it in service sessions verifyServiceSig @@ -1316,19 +1335,20 @@ client -- TODO [certs rcv] rcv subscriptions Server {subscribers, ntfSubscribers} ms - clnt@Client {clientId, subscriptions, ntfSubscriptions, serviceSubsCount = _todo', ntfServiceSubsCount, rcvQ, sndQ, clientTHParams = thParams'@THandleParams {sessionId}, procThreads} = do + clnt@Client {clientId, ntfSubscriptions, ntfServiceSubsCount, rcvQ, sndQ, clientTHParams = thParams'@THandleParams {sessionId}, procThreads} = do labelMyThread . B.unpack $ "client $" <> encode sessionId <> " commands" let THandleParams {thVersion} = thParams' - service = peerClientService =<< thAuth thParams' + clntServiceId = (\THClientService {serviceId} -> serviceId) <$> (peerClientService =<< thAuth thParams') + process t acc@(rs, msgs) = + (maybe acc (\(!r, !msg_) -> (r : rs, maybe msgs (: msgs) msg_))) + <$> processCommand clntServiceId thVersion t forever $ atomically (readTBQueue rcvQ) - >>= mapM (processCommand service thVersion) - >>= mapM_ reply . L.nonEmpty . catMaybes . L.toList + >>= foldrM process ([], []) + >>= \(rs_, msgs) -> mapM_ (atomically . writeTBQueue sndQ . (,msgs)) (L.nonEmpty rs_) where - reply :: MonadIO m => NonEmpty (Transmission BrokerMsg) -> m () - reply = atomically . writeTBQueue sndQ - processProxiedCmd :: Transmission (Command 'ProxiedClient) -> M s (Maybe (Transmission BrokerMsg)) - processProxiedCmd (corrId, EntityId sessId, command) = (corrId,EntityId sessId,) <$$> case command of + processProxiedCmd :: Transmission (Command 'ProxiedClient) -> M s (Maybe ResponseAndMessage) + processProxiedCmd (corrId, EntityId sessId, command) = (\t -> ((corrId, EntityId sessId, t), Nothing)) <$$> case command of PRXY srv auth -> ifM allowProxy getRelay (pure $ Just $ ERR $ PROXY BASIC_AUTH) where allowProxy = do @@ -1391,7 +1411,7 @@ client forkProxiedCmd cmdAction = do bracket_ wait signal . forkClient clnt (B.unpack $ "client $" <> encode sessionId <> " proxy") $ do -- commands MUST be processed under a reasonable timeout or the client would halt - cmdAction >>= \t -> reply [(corrId, EntityId sessId, t)] + cmdAction >>= \t -> atomically $ writeTBQueue sndQ ([(corrId, EntityId sessId, t)], []) pure Nothing where wait = do @@ -1407,32 +1427,32 @@ client mkIncProxyStats ps psOwn own sel = do incStat $ sel ps when own $ incStat $ sel psOwn - processCommand :: Maybe THPeerClientService -> VersionSMP -> VerifiedTransmission s -> M s (Maybe (Transmission BrokerMsg)) - processCommand service clntVersion (q_, (corrId, entId, cmd)) = case cmd of + processCommand :: Maybe ServiceId -> VersionSMP -> VerifiedTransmission s -> M s (Maybe ResponseAndMessage) + processCommand clntServiceId clntVersion (q_, (corrId, entId, cmd)) = case cmd of Cmd SProxiedClient command -> processProxiedCmd (corrId, entId, command) - Cmd SSender command -> Just <$> case command of + Cmd SSender command -> case command of SKEY k -> withQueue $ \q qr -> checkMode QMMessaging qr $ secureQueue_ q k - SEND flags msgBody -> withQueue_ False $ sendMessage flags msgBody - Cmd SIdleClient PING -> pure $ Just (corrId, NoEntity, PONG) - Cmd SProxyService (RFWD encBlock) -> Just . (corrId,NoEntity,) <$> processForwardedCommand encBlock - Cmd SSenderLink command -> Just <$> case command of + SEND flags msgBody -> response <$> withQueue_ False err (sendMessage flags msgBody) + Cmd SIdleClient PING -> pure $ response (corrId, NoEntity, PONG) + Cmd SProxyService (RFWD encBlock) -> response . (corrId,NoEntity,) <$> processForwardedCommand encBlock + Cmd SSenderLink command -> case command of LKEY k -> withQueue $ \q qr -> checkMode QMMessaging qr $ secureQueue_ q k $>> getQueueLink_ q qr LGET -> withQueue $ \q qr -> checkContact qr $ getQueueLink_ q qr - Cmd SNotifier NSUB -> Just . (corrId,entId,) <$> case q_ of + Cmd SNotifier NSUB -> response . (corrId,entId,) <$> case q_ of Just (q, QueueRec {notifier = Just ntfCreds}) -> subscribeNotifications q ntfCreds _ -> pure $ ERR INTERNAL - Cmd SNotifierService NSUBS -> Just . (corrId,entId,) <$> case service of - Just s -> subscribeServiceNotifications s + Cmd SNotifierService NSUBS -> response . (corrId,entId,) <$> case clntServiceId of + Just serviceId -> subscribeServiceNotifications serviceId Nothing -> pure $ ERR INTERNAL Cmd SCreator (NEW nqr@NewQueueReq {auth_}) -> - Just <$> ifM allowNew (createQueue nqr) (pure (corrId, entId, ERR AUTH)) + response <$> ifM allowNew (createQueue nqr) (pure (corrId, entId, ERR AUTH)) where allowNew = do ServerConfig {allowNewQueues, newQueueBasicAuth} <- asks config pure $ allowNewQueues && maybe True ((== auth_) . Just) newQueueBasicAuth Cmd SRecipient command -> - Just <$> case command of - SUB -> withQueue subscribeQueue + case command of + SUB -> withQueue' subscribeQueueAndDeliver GET -> withQueue getMessage ACK msgId -> withQueue $ acknowledgeMsg msgId KEY sKey -> withQueue $ \q _ -> either err (corrId,entId,) <$> secureQueue_ q sKey @@ -1448,15 +1468,15 @@ client Nothing -> pure ok NKEY nKey dhKey -> withQueue $ \q _ -> addQueueNotifier_ q nKey dhKey NDEL -> withQueue $ \q _ -> deleteQueueNotifier_ q - OFF -> maybe (pure $ err INTERNAL) suspendQueue_ q_ - DEL -> maybe (pure $ err INTERNAL) delQueueAndMsgs q_ + OFF -> response <$> maybe (pure $ err INTERNAL) suspendQueue_ q_ + DEL -> response <$> maybe (pure $ err INTERNAL) delQueueAndMsgs q_ QUE -> withQueue $ \q qr -> (corrId,entId,) <$> getQueueInfo q qr - Cmd SRecipientService SUBS -> pure $ Just $ err (CMD PROHIBITED) -- "TODO [certs rcv]" + Cmd SRecipientService SUBS -> pure $ response $ err (CMD PROHIBITED) -- "TODO [certs rcv]" where createQueue :: NewQueueReq -> M s (Transmission BrokerMsg) createQueue NewQueueReq {rcvAuthKey, rcvDhKey, subMode, queueReqData} - | isJust service && subMode == SMOnlyCreate = pure (corrId, entId, ERR $ CMD PROHIBITED) - | otherwise = time "NEW" $ do + | isJust clntServiceId && subMode == SMOnlyCreate = pure (corrId, entId, ERR $ CMD PROHIBITED) + | otherwise = do g <- asks random idSize <- asks $ queueIdBytes . config updatedAt <- Just <$> liftIO getSystemDate @@ -1486,7 +1506,6 @@ client -- notifierId <- randId -- pure (NtfCreds {notifierId, notifierKey, rcvNtfDhSecret}, ServerNtfCreds notifierId rcvPubDhKey) let queueMode = queueReqMode <$> queueReqData - rcvServiceId = (\THClientService {serviceId} -> serviceId) <$> service qr = QueueRec { senderId = sndId, @@ -1499,7 +1518,7 @@ client notifier = Nothing, -- fst <$> ntf, status = EntityActive, updatedAt, - rcvServiceId + rcvServiceId = clntServiceId } liftIO (addQueue ms rcvId qr) >>= \case Left DUPLICATE_ -- TODO [short links] possibly, we somehow need to understand which IDs caused collision to retry if it's not client-supplied? @@ -1514,8 +1533,8 @@ client -- when (isJust ntf) $ incStat $ ntfCreated stats case subMode of SMOnlyCreate -> pure () - SMSubscribe -> void $ subscribeNewQueue rcvId qr -- no need to check if message is available, it's a new queue - pure $ IDS QIK {rcvId, sndId, rcvPublicDhKey, queueMode, linkId = fst <$> queueData, serviceId = rcvServiceId} -- , serverNtfCreds = snd <$> ntf + SMSubscribe -> subscribeNewQueue rcvId qr -- no need to check if message is available, it's a new queue + pure $ IDS QIK {rcvId, sndId, rcvPublicDhKey, queueMode, linkId = fst <$> queueData, serviceId = clntServiceId} -- , serverNtfCreds = snd <$> ntf (corrId,entId,) <$> tryCreate (3 :: Int) -- this check allows to support contact queues created prior to SKEY, @@ -1539,7 +1558,7 @@ client getQueueLink_ q qr = liftIO $ LNK (senderId qr) <$$> getQueueLinkData (queueStore ms) q entId addQueueNotifier_ :: StoreQueue s -> NtfPublicAuthKey -> RcvNtfPublicDhKey -> M s (Transmission BrokerMsg) - addQueueNotifier_ q notifierKey dhKey = time "NKEY" $ do + addQueueNotifier_ q notifierKey dhKey = do (rcvPublicDhKey, privDhKey) <- atomically . C.generateKeyPair =<< asks random let rcvNtfDhSecret = C.dh' dhKey privDhKey (corrId,entId,) <$> addNotifierRetry 3 rcvPublicDhKey rcvNtfDhSecret @@ -1576,47 +1595,60 @@ client suspendQueue_ (q, _) = liftIO $ either err (const ok) <$> suspendQueue (queueStore ms) q -- TODO [certs rcv] if serviceId is passed, associate with the service and respond with SOK - subscribeQueue :: StoreQueue s -> QueueRec -> M s (Transmission BrokerMsg) - subscribeQueue q qr = - liftIO (TM.lookupIO rId subscriptions) >>= \case - Nothing -> subscribeNewQueue rId qr >>= deliver True + subscribeQueueAndDeliver :: StoreQueue s -> QueueRec -> M s ResponseAndMessage + subscribeQueueAndDeliver q qr = + liftIO (TM.lookupIO entId $ subscriptions clnt) >>= \case + Nothing -> subscribeRcvQueue qr >>= deliver True Just s@Sub {subThread} -> do stats <- asks serverStats case subThread of ProhibitSub -> do -- cannot use SUB in the same connection where GET was used incStat $ qSubProhibited stats - pure (corrId, rId, ERR $ CMD PROHIBITED) + pure (err (CMD PROHIBITED), Nothing) _ -> do incStat $ qSubDuplicate stats - atomically (tryTakeTMVar $ delivered s) >> deliver False s + atomically (writeTVar (delivered s) Nothing) >> deliver False s where - rId = recipientId q - deliver :: Bool -> Sub -> M s (Transmission BrokerMsg) - deliver inc sub = do + deliver :: Bool -> Sub -> M s ResponseAndMessage + deliver hasSub sub = do stats <- asks serverStats - fmap (either (\e -> (corrId, rId, ERR e)) id) $ liftIO $ runExceptT $ do + fmap (either ((,Nothing) . err) id) $ liftIO $ runExceptT $ do msg_ <- tryPeekMsg ms q - liftIO $ when (inc && isJust msg_) $ incStat (qSub stats) - liftIO $ deliverMessage "SUB" qr rId sub msg_ - - subscribeNewQueue :: RecipientId -> QueueRec -> M s Sub - subscribeNewQueue rId QueueRec {rcvServiceId} = time "SUB newSub" . atomically $ do - writeTQueue (subQ subscribers) (CSClient rId rcvServiceId Nothing, clientId) + msg' <- forM msg_ $ \msg -> liftIO $ do + ts <- getSystemSeconds + atomically $ setDelivered sub msg ts + unless hasSub $ incStat $ qSub stats + pure (NoCorrId, entId, MSG (encryptMsg qr msg)) + pure ((corrId, entId, SOK clntServiceId), msg') + + -- TODO [certs rcv] combine with subscribing ntf queues + subscribeRcvQueue :: QueueRec -> M s Sub + subscribeRcvQueue QueueRec {rcvServiceId} = atomically $ do + writeTQueue (subQ subscribers) (CSClient entId rcvServiceId Nothing, clientId) sub <- newSubscription NoSub - TM.insert rId sub subscriptions + TM.insert entId sub $ subscriptions clnt pure sub + subscribeNewQueue :: RecipientId -> QueueRec -> M s () + subscribeNewQueue rId QueueRec {rcvServiceId} = do + case rcvServiceId of + Just _ -> atomically $ modifyTVar' (serviceSubsCount clnt) (+ 1) + Nothing -> do + sub <- atomically $ newSubscription NoSub + atomically $ TM.insert rId sub $ subscriptions clnt + atomically $ writeTQueue (subQ subscribers) (CSClient rId rcvServiceId rcvServiceId, clientId) + -- clients that use GET are not added to server subscribers getMessage :: StoreQueue s -> QueueRec -> M s (Transmission BrokerMsg) - getMessage q qr = time "GET" $ do - atomically (TM.lookup entId subscriptions) >>= \case + getMessage q qr = do + atomically (TM.lookup entId $ subscriptions clnt) >>= \case Nothing -> atomically newSub >>= (`getMessage_` Nothing) Just s@Sub {subThread} -> case subThread of ProhibitSub -> - atomically (tryTakeTMVar $ delivered s) + atomically (swapTVar (delivered s) Nothing) >>= getMessage_ s -- cannot use GET in the same connection where there is an active subscription _ -> do @@ -1627,13 +1659,13 @@ client newSub :: STM Sub newSub = do s <- newProhibitedSub - TM.insert entId s subscriptions + TM.insert entId s $ subscriptions clnt -- Here we don't account for this client as subscribed in the server -- and don't notify other subscribed clients. -- This is tracked as "subscription" in the client to prevent these -- clients from being able to subscribe. pure s - getMessage_ :: Sub -> Maybe MsgId -> M s (Transmission BrokerMsg) + getMessage_ :: Sub -> Maybe (MsgId, RoundedSystemTime) -> M s (Transmission BrokerMsg) getMessage_ s delivered_ = do stats <- asks serverStats fmap (either err id) $ liftIO $ runExceptT $ @@ -1641,30 +1673,36 @@ client Just msg -> do let encMsg = encryptMsg qr msg incStat $ (if isJust delivered_ then msgGetDuplicate else msgGet) stats - atomically $ setDelivered s msg $> (corrId, entId, MSG encMsg) + ts <- liftIO getSystemSeconds + atomically $ setDelivered s msg ts $> (corrId, entId, MSG encMsg) Nothing -> incStat (msgGetNoMsg stats) $> ok - withQueue :: (StoreQueue s -> QueueRec -> M s (Transmission BrokerMsg)) -> M s (Transmission BrokerMsg) - withQueue = withQueue_ True + withQueue :: (StoreQueue s -> QueueRec -> M s (Transmission BrokerMsg)) -> M s (Maybe ResponseAndMessage) + withQueue = fmap response . withQueue_ True err + {-# INLINE withQueue #-} + + withQueue' :: (StoreQueue s -> QueueRec -> M s ResponseAndMessage) -> M s (Maybe ResponseAndMessage) + withQueue' = fmap Just . withQueue_ True ((,Nothing) . err) + {-# INLINE withQueue' #-} -- SEND passes queueNotBlocked False here to update time, but it fails anyway on blocked queues (see code for SEND). - withQueue_ :: Bool -> (StoreQueue s -> QueueRec -> M s (Transmission BrokerMsg)) -> M s (Transmission BrokerMsg) - withQueue_ queueNotBlocked action = case q_ of - Nothing -> pure $ err INTERNAL + withQueue_ :: Bool -> (ErrorType -> r) -> (StoreQueue s -> QueueRec -> M s r) -> M s r + withQueue_ queueNotBlocked err' action = case q_ of + Nothing -> pure $ err' INTERNAL Just (q, qr@QueueRec {status, updatedAt}) -> case status of - EntityBlocked info | queueNotBlocked -> pure $ err $ BLOCKED info + EntityBlocked info | queueNotBlocked -> pure $ err' $ BLOCKED info _ -> do t <- liftIO getSystemDate if updatedAt == Just t then action q qr - else liftIO (updateQueueTime (queueStore ms) q t) >>= either (pure . err) (action q) + else liftIO (updateQueueTime (queueStore ms) q t) >>= either (pure . err') (action q) subscribeNotifications :: StoreQueue s -> NtfCreds -> M s BrokerMsg subscribeNotifications q NtfCreds {ntfServiceId} = do stats <- asks serverStats let incNtfSrvStat sel = incStat $ sel $ ntfServices stats - case service of - Just THClientService {serviceId} + case clntServiceId of + Just serviceId | ntfServiceId == Just serviceId -> do -- duplicate queue-service association - can only happen in case of response error/timeout hasSub <- atomically $ ifM hasServiceSub (pure True) (False <$ newServiceQueueSub) @@ -1713,24 +1751,24 @@ client incStat $ if hasSub then ntfSubDuplicate stats else ntfSub stats pure $ SOK Nothing - subscribeServiceNotifications :: THPeerClientService -> M s BrokerMsg - subscribeServiceNotifications THClientService {serviceId} = do + subscribeServiceNotifications :: ServiceId -> M s BrokerMsg + subscribeServiceNotifications serviceId = do srvSubs <- readTVarIO ntfServiceSubsCount if srvSubs == 0 then - liftIO (getNtfServiceQueueCount @(StoreQueue s) (queueStore ms) serviceId) >>= \case + liftIO (getServiceQueueCount @(StoreQueue s) (queueStore ms) SNotifierService serviceId) >>= \case Left e -> pure $ ERR e Right count -> do atomically $ do modifyTVar' ntfServiceSubsCount (+ count) -- service count modifyTVar' (totalServiceSubs ntfSubscribers) (+ count) -- server count for all services - atomically $ writeTQueue (subQ ntfSubscribers) (CSService serviceId, clientId) + atomically $ writeTQueue (subQ ntfSubscribers) (CSService serviceId count, clientId) pure $ SOKS count else pure $ SOKS srvSubs acknowledgeMsg :: MsgId -> StoreQueue s -> QueueRec -> M s (Transmission BrokerMsg) - acknowledgeMsg msgId q qr = time "ACK" $ do - liftIO (TM.lookupIO entId subscriptions) >>= \case + acknowledgeMsg msgId q qr = + liftIO (TM.lookupIO entId $ subscriptions clnt) >>= \case Nothing -> pure $ err NO_MSG Just sub -> atomically (getDelivered sub) >>= \case @@ -1744,16 +1782,20 @@ client pure ok _ -> do (deletedMsg_, msg_) <- tryDelPeekMsg ms q msgId - liftIO $ mapM_ (updateStats stats False) deletedMsg_ - liftIO $ deliverMessage "ACK" qr entId sub msg_ + liftIO $ do + mapM_ (updateStats stats False) deletedMsg_ + forM_ msg_ $ \msg -> do + ts <- getSystemSeconds + atomically $ setDelivered sub msg ts + pure (corrId, entId, maybe OK (MSG . encryptMsg qr) msg_) _ -> pure $ err NO_MSG where getDelivered :: Sub -> STM (Maybe ServerSub) getDelivered Sub {delivered, subThread} = do - tryTakeTMVar delivered $>>= \msgId' -> + readTVar delivered $>>= \(msgId', _) -> if msgId == msgId' || B.null msgId - then pure $ Just subThread - else putTMVar delivered msgId' $> Nothing + then writeTVar delivered Nothing $> Just subThread + else pure Nothing updateStats :: ServerStats -> Bool -> Message -> IO () updateStats stats isGet = \case MessageQuota {} -> pure () @@ -1796,7 +1838,7 @@ client deleteQueueLinkData (queueStore ms) q ServerConfig {messageExpiration, msgIdBytes} <- asks config msgId <- randomId' msgIdBytes - msg_ <- liftIO $ time "SEND" $ runExceptT $ do + msg_ <- liftIO $ runExceptT $ do expireMessages messageExpiration stats msg <- liftIO $ mkMessage msgId body writeMsg ms q True msg @@ -1805,7 +1847,7 @@ client Right Nothing -> do incStat $ msgSentQuota stats pure $ err QUOTA - Right (Just (msg, wasEmpty)) -> time "SEND ok" $ do + Right (Just (msg, wasEmpty)) -> do when wasEmpty $ liftIO $ tryDeliverMessage msg when (notification msgFlags) $ do mapM_ (`enqueueNotification` msg) (notifier qr) @@ -1839,33 +1881,35 @@ client -- the subscribed client var is read outside of STM to avoid transaction cost -- in case no client is subscribed. getSubscribedClient rId (queueSubscribers subscribers) - $>>= atomically . deliverToSub + $>>= deliverToSub >>= mapM_ forkDeliver where rId = recipientId q - deliverToSub rcv = - -- reading client TVar in the same transaction, - -- so that if subscription ends, it re-evalutates - -- and delivery is cancelled - - -- the new client will receive message in response to SUB. - readTVar rcv - $>>= \rc@Client {subscriptions = subs, sndQ = sndQ'} -> TM.lookup rId subs - $>>= \s@Sub {subThread, delivered} -> case subThread of - ProhibitSub -> pure Nothing - ServerSub st -> readTVar st >>= \case - NoSub -> - tryTakeTMVar delivered >>= \case - Just _ -> pure Nothing -- if a message was already delivered, should not deliver more - Nothing -> - ifM - (isFullTBQueue sndQ') - (writeTVar st SubPending $> Just (rc, s, st)) - (deliver sndQ' s $> Nothing) - _ -> pure Nothing - deliver sndQ' s = do + deliverToSub rcv = do + ts <- getSystemSeconds + atomically $ + -- reading client TVar in the same transaction, + -- so that if subscription ends, it re-evalutates + -- and delivery is cancelled - + -- the new client will receive message in response to SUB. + readTVar rcv + $>>= \rc@Client {subscriptions = subs, sndQ = sndQ'} -> TM.lookup rId subs + $>>= \s@Sub {subThread, delivered} -> case subThread of + ProhibitSub -> pure Nothing + ServerSub st -> readTVar st >>= \case + NoSub -> + readTVar delivered >>= \case + Just _ -> pure Nothing -- if a message was already delivered, should not deliver more + Nothing -> + ifM + (isFullTBQueue sndQ') + (writeTVar st SubPending $> Just (rc, s, st)) + (deliver sndQ' s ts $> Nothing) + _ -> pure Nothing + deliver sndQ' s ts = do let encMsg = encryptMsg qr msg - writeTBQueue sndQ' [(CorrId "", rId, MSG encMsg)] - void $ setDelivered s msg + writeTBQueue sndQ' ([(NoCorrId, rId, MSG encMsg)], []) + setDelivered s msg ts forkDeliver (rc@Client {sndQ = sndQ'}, s@Sub {delivered}, st) = do t <- mkWeakThreadId =<< forkIO deliverThread atomically $ modifyTVar' st $ \case @@ -1878,13 +1922,14 @@ client -- lookup can be outside of STM transaction, -- as long as the check that it is the same client is inside. getSubscribedClient rId (queueSubscribers subscribers) >>= mapM_ deliverIfSame - deliverIfSame rcv = time "deliver" . atomically $ - whenM (sameClient rc rcv) $ - tryTakeTMVar delivered >>= \case + deliverIfSame rcv = do + ts <- getSystemSeconds + atomically $ whenM (sameClient rc rcv) $ + readTVar delivered >>= \case Just _ -> pure () -- if a message was already delivered, should not deliver more Nothing -> do -- a separate thread is needed because it blocks when client sndQ is full. - deliver sndQ' s + deliver sndQ' s ts writeTVar st NoSub enqueueNotification :: NtfCreds -> Message -> M s () @@ -1924,7 +1969,8 @@ client Left r -> pure r -- rejectOrVerify filters allowed commands, no need to repeat it here. -- INTERNAL is used because processCommand never returns Nothing for sender commands (could be extracted for better types). - Right t''@(_, (corrId', entId', _)) -> fromMaybe (corrId', entId', ERR INTERNAL) <$> lift (processCommand Nothing fwdVersion t'') + -- `fst` removes empty message that is only returned for `SUB` command + Right t''@(_, (corrId', entId', _)) -> maybe (corrId', entId', ERR INTERNAL) fst <$> lift (processCommand Nothing fwdVersion t'') -- encode response r' <- case batchTransmissions clntTHParams [Right (Nothing, encodeTransmission clntTHParams r)] of [] -> throwE INTERNAL -- at least 1 item is guaranteed from NonEmpty/Right @@ -1957,21 +2003,6 @@ client VRVerified q -> Right (q, t'') VRFailed e -> Left (corrId', entId', ERR e) - deliverMessage :: T.Text -> QueueRec -> RecipientId -> Sub -> Maybe Message -> IO (Transmission BrokerMsg) - deliverMessage name qr rId s@Sub {subThread} msg_ = time (name <> " deliver") . atomically $ - case subThread of - ProhibitSub -> pure resp - _ -> case msg_ of - Just msg -> - let encMsg = encryptMsg qr msg - in setDelivered s msg $> (corrId, rId, MSG encMsg) - _ -> pure resp - where - resp = (corrId, rId, OK) - - time :: MonadIO m => T.Text -> m a -> m a - time name = timed name entId - encryptMsg :: QueueRec -> Message -> RcvMessage encryptMsg qr msg = encrypt . encodeRcvMsgBody $ case msg of Message {msgFlags, msgBody} -> RcvMsgBody {msgTs = msgTs', msgFlags, msgBody} @@ -1982,19 +2013,21 @@ client msgId' = messageId msg msgTs' = messageTs msg - setDelivered :: Sub -> Message -> STM Bool - setDelivered s msg = tryPutTMVar (delivered s) $! messageId msg + setDelivered :: Sub -> Message -> RoundedSystemTime -> STM () + setDelivered Sub {delivered} msg !ts = do + let !msgId = messageId msg + writeTVar delivered $ Just (msgId, ts) delQueueAndMsgs :: (StoreQueue s, QueueRec) -> M s (Transmission BrokerMsg) delQueueAndMsgs (q, QueueRec {rcvServiceId}) = do liftIO (deleteQueue ms q) >>= \case Right qr -> do -- Possibly, the same should be done if the queue is suspended, but currently we do not use it - atomically $ do - writeTQueue (subQ subscribers) (CSDeleted entId rcvServiceId, clientId) - -- queue is usually deleted by the same client that is currently subscribed, - -- we delete subscription here, so the client with no subscriptions can be disconnected. - TM.delete entId subscriptions + -- queue is usually deleted by the same client that is currently subscribed, + -- we delete subscription here, so the client with no subscriptions can be disconnected. + sub <- atomically $ TM.lookupDelete entId $ subscriptions clnt + liftIO $ mapM_ cancelSub sub + atomically $ writeTQueue (subQ subscribers) (CSDeleted entId rcvServiceId, clientId) forM_ (notifier qr) $ \NtfCreds {notifierId = nId, ntfServiceId} -> do -- queue is deleted by a different client from the one subscribed to notifications, -- so we don't need to remove subscription from the current client. @@ -2009,7 +2042,7 @@ client getQueueInfo :: StoreQueue s -> QueueRec -> M s BrokerMsg getQueueInfo q QueueRec {senderKey, notifier} = do fmap (either ERR INFO) $ liftIO $ runExceptT $ do - qiSub <- liftIO $ TM.lookupIO entId subscriptions >>= mapM mkQSub + qiSub <- liftIO $ TM.lookupIO entId (subscriptions clnt) >>= mapM mkQSub qiSize <- getQueueSize ms q qiMsg <- toMsgInfo <$$> tryPeekMsg ms q let info = QueueInfo {qiSnd = isJust senderKey, qiNtf = isJust notifier, qiSub, qiSize, qiMsg} @@ -2024,14 +2057,20 @@ client SubPending -> QSubPending SubThread _ -> QSubThread ProhibitSub -> pure QProhibitSub - qDelivered <- atomically $ decodeLatin1 . encode <$$> tryReadTMVar delivered + qDelivered <- decodeLatin1 . encode . fst <$$> readTVarIO delivered pure QSub {qSubThread, qDelivered} ok :: Transmission BrokerMsg ok = (corrId, entId, OK) + {-# INLINE ok #-} err :: ErrorType -> Transmission BrokerMsg err e = (corrId, entId, ERR e) + {-# INLINE err #-} + + response :: Transmission BrokerMsg -> Maybe ResponseAndMessage + response = Just . (,Nothing) + {-# INLINE response #-} updateDeletedStats :: QueueRec -> M s () updateDeletedStats q = do @@ -2045,18 +2084,6 @@ incStat :: MonadIO m => IORef Int -> m () incStat r = liftIO $ atomicModifyIORef'_ r (+ 1) {-# INLINE incStat #-} -timed :: MonadIO m => T.Text -> RecipientId -> m a -> m a -timed name (EntityId qId) a = do - t <- liftIO getSystemTime - r <- a - t' <- liftIO getSystemTime - let int = diff t t' - when (int > sec) . logDebug $ T.unwords [name, tshow $ encode qId, tshow int] - pure r - where - diff t t' = (systemSeconds t' - systemSeconds t) * sec + fromIntegral (systemNanoseconds t' - systemNanoseconds t) - sec = 1000_000000 - randomId' :: Int -> M s ByteString randomId' n = atomically . C.randomBytes n =<< asks random diff --git a/src/Simplex/Messaging/Server/Env/STM.hs b/src/Simplex/Messaging/Server/Env/STM.hs index 627c6079a..fd38d3d73 100644 --- a/src/Simplex/Messaging/Server/Env/STM.hs +++ b/src/Simplex/Messaging/Server/Env/STM.hs @@ -40,6 +40,7 @@ module Simplex.Messaging.Server.Env.STM MsgStore (..), AStoreType (..), VerifiedTransmission, + ResponseAndMessage, newEnv, mkJournalStoreConfig, msgStore, @@ -377,7 +378,7 @@ sameClient c cv = maybe False (sameClientId c) <$> readTVar cv data ClientSub = CSClient QueueId (Maybe ServiceId) (Maybe ServiceId) -- includes previous and new associated service IDs | CSDeleted QueueId (Maybe ServiceId) -- includes previously associated service IDs - | CSService ServiceId -- only send END to idividual client subs on message delivery, not of SSUB/NSSUB + | CSService ServiceId Int64 -- only send END to idividual client subs on message delivery, not of SSUB/NSSUB newtype ProxyAgent = ProxyAgent { smpAgent :: SMPClientAgent 'Sender @@ -392,7 +393,7 @@ data Client s = Client serviceSubsCount :: TVar Int64, -- only one service can be subscribed, based on its certificate, this is subscription count ntfServiceSubsCount :: TVar Int64, -- only one service can be subscribed, based on its certificate, this is subscription count rcvQ :: TBQueue (NonEmpty (VerifiedTransmission s)), - sndQ :: TBQueue (NonEmpty (Transmission BrokerMsg)), + sndQ :: TBQueue (NonEmpty (Transmission BrokerMsg), [Transmission BrokerMsg]), msgQ :: TBQueue (NonEmpty (Transmission BrokerMsg)), procThreads :: TVar Int, endThreads :: TVar (IntMap (Weak ThreadId)), @@ -406,13 +407,15 @@ data Client s = Client type VerifiedTransmission s = (Maybe (StoreQueue s, QueueRec), Transmission Cmd) +type ResponseAndMessage = (Transmission BrokerMsg, Maybe (Transmission BrokerMsg)) + data ServerSub = ServerSub (TVar SubscriptionThread) | ProhibitSub data SubscriptionThread = NoSub | SubPending | SubThread (Weak ThreadId) data Sub = Sub { subThread :: ServerSub, -- Nothing value indicates that sub - delivered :: TMVar MsgId + delivered :: TVar (Maybe (MsgId, RoundedSystemTime)) } newServer :: IO (Server s) @@ -491,13 +494,13 @@ newClient clientId qSize clientTHParams createdAt = do newSubscription :: SubscriptionThread -> STM Sub newSubscription st = do - delivered <- newEmptyTMVar + delivered <- newTVar Nothing subThread <- ServerSub <$> newTVar st return Sub {subThread, delivered} newProhibitedSub :: STM Sub newProhibitedSub = do - delivered <- newEmptyTMVar + delivered <- newTVar Nothing return Sub {subThread = ProhibitSub, delivered} newEnv :: ServerConfig s -> IO (Env s) diff --git a/src/Simplex/Messaging/Server/MsgStore/Journal.hs b/src/Simplex/Messaging/Server/MsgStore/Journal.hs index 59357f0af..78f9c1393 100644 --- a/src/Simplex/Messaging/Server/MsgStore/Journal.hs +++ b/src/Simplex/Messaging/Server/MsgStore/Journal.hs @@ -356,8 +356,8 @@ instance QueueStoreClass (JournalQueue s) (QStore s) where {-# INLINE setQueueService #-} getQueueNtfServices = withQS (getQueueNtfServices @(JournalQueue s)) {-# INLINE getQueueNtfServices #-} - getNtfServiceQueueCount = withQS (getNtfServiceQueueCount @(JournalQueue s)) - {-# INLINE getNtfServiceQueueCount #-} + getServiceQueueCount = withQS (getServiceQueueCount @(JournalQueue s)) + {-# INLINE getServiceQueueCount #-} makeQueue_ :: JournalMsgStore s -> RecipientId -> QueueRec -> Lock -> IO (JournalQueue s) makeQueue_ JournalMsgStore {sharedLock} rId qr queueLock = do diff --git a/src/Simplex/Messaging/Server/Prometheus.hs b/src/Simplex/Messaging/Server/Prometheus.hs index 5d9dcc786..a4f721cad 100644 --- a/src/Simplex/Messaging/Server/Prometheus.hs +++ b/src/Simplex/Messaging/Server/Prometheus.hs @@ -6,6 +6,8 @@ module Simplex.Messaging.Server.Prometheus where import Data.Int (Int64) +import qualified Data.IntMap as IM +import Data.List (mapAccumL) import Data.Text (Text) import qualified Data.Text as T import Data.Time.Clock (UTCTime (..), diffUTCTime) @@ -35,11 +37,18 @@ data RealTimeMetrics = RealTimeMetrics threadsCount :: Int, clientsCount :: Int, deliveredSubs :: RTSubscriberMetrics, + deliveredTimes :: TimeAggregations, smpSubs :: RTSubscriberMetrics, ntfSubs :: RTSubscriberMetrics, loadedCounts :: LoadedQueueCounts } +data TimeAggregations = TimeAggregations + { sumTime :: Int64, + maxTime :: Int64, + timeBuckets :: IM.IntMap Int + } + data RTSubscriberMetrics = RTSubscriberMetrics { subsCount :: Int, subClientsCount :: Int, @@ -57,6 +66,7 @@ prometheusMetrics sm rtm ts = threadsCount, clientsCount, deliveredSubs, + deliveredTimes, smpSubs, ntfSubs, loadedCounts @@ -436,6 +446,17 @@ prometheusMetrics sm rtm ts = \# TYPE simplex_smp_delivered_clients_total gauge\n\ \simplex_smp_delivered_clients_total " <> mshow (subClientsCount deliveredSubs) <> "\n# delivered.subClientsCount\n\ \\n\ + \# HELP simplex_smp_delivery_ack_time Times to confirm message delivery\n\ + \# TYPE simplex_smp_delivery_ack_time histogram\n\ + \simplex_smp_delivery_ack_time_sum " <> mshow (sumTime deliveredTimes) <> "\n# delivered.sumTime\n\ + \simplex_smp_delivery_ack_time_count " <> mshow (subsCount deliveredSubs) <> "\n# delivered.subsCount\n" + <> showTimeBuckets (timeBuckets deliveredTimes) + <> showTimeBucket "+Inf" (subsCount deliveredSubs) + <> "\n\ + \# HELP simplex_smp_delivery_ack_time_max Max time to confirm message delivery\n\ + \# TYPE simplex_smp_delivery_ack_time_max gauge\n\ + \simplex_smp_delivery_ack_time_max " <> mshow (maxTime deliveredTimes) <> "\n# delivered.maxTime\n\ + \\n\ \# HELP simplex_smp_subscribtion_total Total SMP subscriptions\n\ \# TYPE simplex_smp_subscribtion_total gauge\n\ \simplex_smp_subscribtion_total " <> mshow (subsCount smpSubs) <> "\n# smp.subsCount\n\ @@ -480,6 +501,10 @@ prometheusMetrics sm rtm ts = \# TYPE simplex_smp_loaded_queues_ntf_lock_count gauge\n\ \simplex_smp_loaded_queues_ntf_lock_count " <> mshow (notifierLockCount loadedCounts) <> "\n# loadedCounts.notifierLockCount\n" + showTimeBuckets :: IM.IntMap Int -> Text + showTimeBuckets = T.concat . snd . mapAccumL (\total (sec, cnt) -> (total + cnt, showTimeBucket (tshow sec) (total + cnt))) 0 . IM.assocs + showTimeBucket :: Text -> Int -> Text + showTimeBucket sec count = "simplex_smp_delivery_ack_time_bucket{le=\"" <> sec <> "\"} " <> mshow count <> "\n# delivered.timeBuckets\n" socketsMetric :: (SocketStats -> Int) -> Text -> Text -> Text socketsMetric sel metric descr = "# HELP " <> metric <> " " <> descr <> "\n" diff --git a/src/Simplex/Messaging/Server/QueueStore.hs b/src/Simplex/Messaging/Server/QueueStore.hs index cbac2bf08..9395f5bac 100644 --- a/src/Simplex/Messaging/Server/QueueStore.hs +++ b/src/Simplex/Messaging/Server/QueueStore.hs @@ -127,3 +127,6 @@ getRoundedSystemTime prec = (\t -> RoundedSystemTime $ (systemSeconds t `div` pr getSystemDate :: IO RoundedSystemTime getSystemDate = getRoundedSystemTime 86400 + +getSystemSeconds :: IO RoundedSystemTime +getSystemSeconds = RoundedSystemTime . systemSeconds <$> getSystemTime diff --git a/src/Simplex/Messaging/Server/QueueStore/Postgres.hs b/src/Simplex/Messaging/Server/QueueStore/Postgres.hs index b159207de..7258d9148 100644 --- a/src/Simplex/Messaging/Server/QueueStore/Postgres.hs +++ b/src/Simplex/Messaging/Server/QueueStore/Postgres.hs @@ -43,14 +43,13 @@ import qualified Data.ByteString.Builder as BB import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Lazy as LB import Data.Bitraversable (bimapM) -import Data.Either (fromRight, lefts, rights) +import Data.Either (fromRight, lefts) import Data.Functor (($>)) import Data.Int (Int64) import Data.List (foldl', intersperse, partition) import Data.List.NonEmpty (NonEmpty) -import qualified Data.List.NonEmpty as L import qualified Data.Map.Strict as M -import Data.Maybe (catMaybes, fromMaybe, mapMaybe) +import Data.Maybe (catMaybes, fromMaybe) import qualified Data.Set as S import Data.Text (Text) import Data.Time.Clock.System (SystemTime (..), getSystemTime) @@ -64,7 +63,7 @@ import Database.PostgreSQL.Simple.ToField (Action (..), ToField (..)) import Database.PostgreSQL.Simple.Errors (ConstraintViolation (..), constraintViolation) import Database.PostgreSQL.Simple.SqlQQ (sql) import GHC.IO (catchAny) -import Simplex.Messaging.Agent.Client (withLockMap, withLocksMap) +import Simplex.Messaging.Agent.Client (withLockMap) import Simplex.Messaging.Agent.Lock (Lock) import Simplex.Messaging.Agent.Store.AgentStore () import Simplex.Messaging.Agent.Store.Postgres (createDBStore, closeDBStore) @@ -83,7 +82,7 @@ import Simplex.Messaging.Server.StoreLog import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM import Simplex.Messaging.Transport (SMPServiceRole (..)) -import Simplex.Messaging.Util (eitherToMaybe, firstRow, ifM, maybeFirstRow, tshow, (<$$>), ($>>=)) +import Simplex.Messaging.Util (eitherToMaybe, firstRow, ifM, maybeFirstRow, tshow, (<$$>)) import System.Exit (exitFailure) import System.IO (IOMode (..), hFlush, stdout) import UnliftIO.STM @@ -486,11 +485,15 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where let (sNtfs, restNtfs) = partition (\(nId, _) -> S.member nId snIds) ntfs' in ((serviceId, sNtfs) : ssNtfs, restNtfs) - getNtfServiceQueueCount :: PostgresQueueStore q -> ServiceId -> IO (Either ErrorType Int64) - getNtfServiceQueueCount st serviceId = - E.uninterruptibleMask_ $ runExceptT $ withDB' "getNtfServiceQueueCount" st $ \db -> + getServiceQueueCount :: (PartyI p, ServiceParty p) => PostgresQueueStore q -> SParty p -> ServiceId -> IO (Either ErrorType Int64) + getServiceQueueCount st party serviceId = + E.uninterruptibleMask_ $ runExceptT $ withDB' "getServiceQueueCount" st $ \db -> fmap (fromMaybe 0) $ maybeFirstRow fromOnly $ - DB.query db "SELECT count(1) FROM msg_queues WHERE ntf_service_id = ? AND deleted_at IS NULL" (Only serviceId) + DB.query db query (Only serviceId) + where + query = case party of + SRecipientService -> "SELECT count(1) FROM msg_queues WHERE rcv_service_id = ? AND deleted_at IS NULL" + SNotifierService -> "SELECT count(1) FROM msg_queues WHERE ntf_service_id = ? AND deleted_at IS NULL" batchInsertServices :: [STMService] -> PostgresQueueStore q -> IO Int64 batchInsertServices services' toStore = diff --git a/src/Simplex/Messaging/Server/QueueStore/STM.hs b/src/Simplex/Messaging/Server/QueueStore/STM.hs index 5c16825d2..4dd6240a8 100644 --- a/src/Simplex/Messaging/Server/QueueStore/STM.hs +++ b/src/Simplex/Messaging/Server/QueueStore/STM.hs @@ -346,10 +346,15 @@ instance StoreQueueClass q => QueueStoreClass q (STMQueueStore q) where let (sNtfs, restNtfs) = partition (\(nId, _) -> S.member nId snIds) ntfs' pure ((Just serviceId, sNtfs) : ssNtfs, restNtfs) - getNtfServiceQueueCount :: STMQueueStore q -> ServiceId -> IO (Either ErrorType Int64) - getNtfServiceQueueCount st serviceId = + getServiceQueueCount :: (PartyI p, ServiceParty p) => STMQueueStore q -> SParty p -> ServiceId -> IO (Either ErrorType Int64) + getServiceQueueCount st party serviceId = TM.lookupIO serviceId (services st) >>= - maybe (pure $ Left AUTH) (fmap (Right . fromIntegral . S.size) . readTVarIO . serviceNtfQueues) + maybe (pure $ Left AUTH) (fmap (Right . fromIntegral . S.size) . readTVarIO . serviceSel) + where + serviceSel :: STMService -> TVar (Set QueueId) + serviceSel = case party of + SRecipientService -> serviceRcvQueues + SNotifierService -> serviceNtfQueues withQueueRec :: TVar (Maybe QueueRec) -> (QueueRec -> STM a) -> IO (Either ErrorType a) withQueueRec qr a = atomically $ readQueueRec qr >>= mapM a diff --git a/src/Simplex/Messaging/Server/QueueStore/Types.hs b/src/Simplex/Messaging/Server/QueueStore/Types.hs index 104d62267..55be4d21d 100644 --- a/src/Simplex/Messaging/Server/QueueStore/Types.hs +++ b/src/Simplex/Messaging/Server/QueueStore/Types.hs @@ -48,7 +48,7 @@ class StoreQueueClass q => QueueStoreClass q s where getCreateService :: s -> ServiceRec -> IO (Either ErrorType ServiceId) setQueueService :: (PartyI p, ServiceParty p) => s -> q -> SParty p -> Maybe ServiceId -> IO (Either ErrorType ()) getQueueNtfServices :: s -> [(NotifierId, a)] -> IO (Either ErrorType ([(Maybe ServiceId, [(NotifierId, a)])], [(NotifierId, a)])) - getNtfServiceQueueCount :: s -> ServiceId -> IO (Either ErrorType Int64) + getServiceQueueCount :: (PartyI p, ServiceParty p) => s -> SParty p -> ServiceId -> IO (Either ErrorType Int64) data EntityCounts = EntityCounts { queueCount :: Int, diff --git a/tests/CoreTests/BatchingTests.hs b/tests/CoreTests/BatchingTests.hs index 9069cfc89..d013c0db4 100644 --- a/tests/CoreTests/BatchingTests.hs +++ b/tests/CoreTests/BatchingTests.hs @@ -47,6 +47,7 @@ batchingTests = do it "should batch with 135 subscriptions per batch" testClientBatchSubscriptions it "should batch with 255 ENDs per batch" testClientBatchENDs it "should batch with 80 NMSGs per batch" testClientBatchNMSGs + it "should batch subscription responses with message" testBatchSubResponses it "should break on message that does not fit" testClientBatchWithMessage it "should break on large message" testClientBatchWithLargeMessage @@ -207,6 +208,20 @@ testClientBatchNMSGs = do (length rs1, length rs2, length rs3) `shouldBe` (40, 80, 80) all lenOk [s1, s2, s3] `shouldBe` True +-- 4 responses are used in Simplex.Messaging.Server / `send` +testBatchSubResponses :: IO () +testBatchSubResponses = do + client <- testClientStub + soks <- replicateM 4 $ randomSOK + msg <- randomMSG + let msgs = map (\t -> Right (Nothing, encodeTransmission (thParams client) t)) (soks <> [msg]) + batches = batchTransmissions (thParams client) $ L.fromList msgs + length batches `shouldBe` 1 + soks' <- replicateM 5 $ randomSOK + let msgs' = map (\t -> Right (Nothing, encodeTransmission (thParams client) t)) (soks' <> [msg]) + batches' = batchTransmissions (thParams client) $ L.fromList msgs' + length batches' `shouldBe` 2 + testClientBatchWithMessageV6 :: IO () testClientBatchWithMessageV6 = do client <- testClientStubV6 @@ -361,6 +376,22 @@ randomNMSGCmd ts = do Right encNMsgMeta <- pure $ C.cbEncrypt (C.dh' k pk) nonce (smpEncode msgMeta) 128 pure (CorrId "", EntityId nId, NMSG nonce encNMsgMeta) +randomSOK :: IO (Transmission BrokerMsg) +randomSOK = do + g <- C.newRandom + corrId <- atomically $ C.randomBytes 24 g + rId <- atomically $ C.randomBytes 24 g + pure (CorrId corrId, EntityId rId, SOK Nothing) + +randomMSG :: IO (Transmission BrokerMsg) +randomMSG = do + g <- C.newRandom + corrId <- atomically $ C.randomBytes 24 g + rId <- atomically $ C.randomBytes 24 g + msgId <- atomically $ C.randomBytes 24 g + msg <- atomically $ C.randomBytes (maxMessageLength currentClientSMPRelayVersion) g + pure (CorrId corrId, EntityId rId, MSG RcvMessage {msgId, msgBody = EncRcvMsgBody msg}) + randomSENDv6 :: ByteString -> Int -> IO (Either TransportError (Maybe TAuthorizations, ByteString)) randomSENDv6 = randomSEND_ C.SEd25519 minServerSMPRelayVersion diff --git a/tests/ServerTests.hs b/tests/ServerTests.hs index 354573852..70fdf5983 100644 --- a/tests/ServerTests.hs +++ b/tests/ServerTests.hs @@ -18,6 +18,7 @@ module ServerTests where import Control.Concurrent (ThreadId, killThread, threadDelay) +import Control.Concurrent.Async (concurrently_) import Control.Concurrent.STM import Control.Exception (SomeException, throwIO, try) import Control.Monad @@ -29,6 +30,7 @@ import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B import Data.Hashable (hash) import qualified Data.IntSet as IS +import Data.List.NonEmpty (NonEmpty) import Data.String (IsString (..)) import Data.Type.Equality import qualified Data.X509.Validation as XV @@ -75,6 +77,7 @@ serverTests = do describe "GET command" testGetCommand describe "GET & SUB commands" testGetSubCommands describe "Exceeding queue quota" testExceedQueueQuota + describe "Concurrent sending and delivery" testConcurrentSendDelivery describe "Store log" testWithStoreLog describe "Restore messages" testRestoreMessages describe "Restore messages (old / v2)" testRestoreExpireMessages @@ -111,16 +114,25 @@ sendRecv h@THandle {params} (sgn, corrId, qId, cmd) = do tGet1 h signSendRecv :: forall c p. (Transport c, PartyI p) => THandleSMP c 'TClient -> C.APrivateAuthKey -> (ByteString, EntityId, Command p) -> IO (Transmission (Either ErrorType BrokerMsg)) -signSendRecv h pk = signSendRecv_ h pk Nothing +signSendRecv h pk t = do + [r] <- signSendRecv_ h pk Nothing t + pure r + +signSendRecv2 :: forall c p. (Transport c, PartyI p) => THandleSMP c 'TClient -> C.APrivateAuthKey -> (ByteString, EntityId, Command p) -> IO (Transmission (Either ErrorType BrokerMsg), Transmission (Either ErrorType BrokerMsg)) +signSendRecv2 h pk t = do + [r1, r2] <- signSendRecv_ h pk Nothing t + pure (r1, r2) serviceSignSendRecv :: forall c p. (Transport c, PartyI p) => THandleSMP c 'TClient -> C.APrivateAuthKey -> C.PrivateKeyEd25519 -> (ByteString, EntityId, Command p) -> IO (Transmission (Either ErrorType BrokerMsg)) -serviceSignSendRecv h pk = signSendRecv_ h pk . Just +serviceSignSendRecv h pk serviceKey t = do + [r] <- signSendRecv_ h pk (Just serviceKey) t + pure r -signSendRecv_ :: forall c p. (Transport c, PartyI p) => THandleSMP c 'TClient -> C.APrivateAuthKey -> Maybe C.PrivateKeyEd25519 -> (ByteString, EntityId, Command p) -> IO (Transmission (Either ErrorType BrokerMsg)) +signSendRecv_ :: forall c p. (Transport c, PartyI p) => THandleSMP c 'TClient -> C.APrivateAuthKey -> Maybe C.PrivateKeyEd25519 -> (ByteString, EntityId, Command p) -> IO (NonEmpty (Transmission (Either ErrorType BrokerMsg))) signSendRecv_ h@THandle {params} (C.APrivateAuthKey a pk) serviceKey_ (corrId, qId, cmd) = do let TransmissionForAuth {tForAuth, tToSend} = encodeTransmissionForAuth params (CorrId corrId, qId, cmd) Right () <- tPut1 h (authorize tForAuth, tToSend) - tGet1 h + liftIO $ tGetClient h where authorize t = (,(`C.sign'` t) <$> serviceKey_) <$> case a of C.SEd25519 -> Just . TASignature . C.ASignature C.SEd25519 $ C.sign' pk t' @@ -365,7 +377,7 @@ testCreateDelete = Resp "bcda" _ ok4 <- signSendRecv rh rKey ("bcda", rId, OFF) (ok4, OK) #== "accepts OFF when suspended" - Resp "cdab" _ (Msg mId2 msg2) <- signSendRecv rh rKey ("cdab", rId, SUB) + (Resp "cdab" _ (SOK Nothing), Resp "" _ (Msg mId2 msg2)) <- signSendRecv2 rh rKey ("cdab", rId, SUB) (dec mId2 msg2, Right "hello") #== "accepts SUB when suspended and delivers the message again (because was not ACKed)" Resp "dabc" _ err5 <- sendRecv rh (sampleSig, "dabc", rId, DEL) @@ -404,7 +416,7 @@ stressTest = Resp "" NoEntity (Ids rId _ _) <- signSendRecv h1 rKey ("", NoEntity, New rPub dhPub) pure rId let subscribeQueues h = forM_ rIds $ \rId -> do - Resp "" rId' OK <- signSendRecv h rKey ("", rId, SUB) + Resp "" rId' (SOK Nothing) <- signSendRecv h rKey ("", rId, SUB) rId' `shouldBe` rId closeConnection $ connection h1 subscribeQueues h2 @@ -497,7 +509,7 @@ testSwitchSub = Resp "abcd" _ (Msg mId2 msg2) <- signSendRecv rh1 rKey ("abcd", rId, ACK mId1) (dec mId2 msg2, Right "test2, no ACK") #== "test message 2 delivered, no ACK" - Resp "bcda" _ (Msg mId2' msg2') <- signSendRecv rh2 rKey ("bcda", rId, SUB) + (Resp "bcda" _ (SOK Nothing), Resp "" _ (Msg mId2' msg2')) <- signSendRecv2 rh2 rKey ("bcda", rId, SUB) (dec mId2' msg2', Right "test2, no ACK") #== "same simplex queue via another TCP connection, tes2 delivered again (no ACK in 1st queue)" Resp "cdab" _ OK <- signSendRecv rh2 rKey ("cdab", rId, ACK mId2') @@ -620,6 +632,27 @@ testExceedQueueQuota = Resp "10" _ OK <- signSendRecv rh rKey ("10", rId, ACK mId4) pure () +testConcurrentSendDelivery :: SpecWith (ASrvTransport, AStoreType) +testConcurrentSendDelivery = + it "should continue delivering messages if message is sent before it is acknowledged" $ \(ATransport t, msType) -> do + g <- C.newRandom + smpTest3 t msType $ \rh sh1 sh2 -> do + (sPub, sKey) <- atomically $ C.generateAuthKeyPair C.SEd25519 g + (sId, rId, rKey, dhShared) <- createAndSecureQueue rh sPub + let dec = decryptMsgV3 dhShared + sndMsg sh n = do + Resp (CorrId n') _ OK <- signSendRecv sh sKey (n, sId, _SEND ("msg " <> n)) + n' `shouldBe` n + isMsg1or2 mId msg = dec mId msg == Right "msg 1" || dec mId msg == Right "msg 2" `shouldBe` True + replicateM_ 50 $ do + concurrently_ (sndMsg sh1 "1") (sndMsg sh2 "2") + Resp "" _ (Msg mId1 msg1) <- tGet1 rh + isMsg1or2 mId1 msg1 + Resp "3" _ (Msg mId2 msg2) <- signSendRecv rh rKey ("3", rId, ACK mId1) + isMsg1or2 mId2 msg2 + Resp "4" _ OK <- signSendRecv rh rKey ("4", rId, ACK mId2) + pure () + testWithStoreLog :: SpecWith (ASrvTransport, AStoreType) testWithStoreLog = it "should store simplex queues to log and restore them after server restart" $ \(at@(ATransport t), msType) -> do @@ -684,7 +717,7 @@ testWithStoreLog = nId <- readTVarIO notifierId Resp "dabc" _ (SOK Nothing) <- signSendRecv h1 nKey ("dabc", nId, NSUB) Resp "bcda" _ OK <- signSendRecv h sKey1 ("bcda", sId1, _SEND' "hello") - Resp "cdab" _ (Msg mId3 msg3) <- signSendRecv h rKey1 ("cdab", rId1, SUB) + (Resp "cdab" _ (SOK Nothing), Resp "" _ (Msg mId3 msg3)) <- signSendRecv2 h rKey1 ("cdab", rId1, SUB) (decryptMsgV3 dh1 mId3 msg3, Right "hello") #== "delivered from restored queue" Resp "" _ (NMSG _ _) <- tGet1 h1 -- this queue is removed - not restored @@ -769,7 +802,7 @@ testRestoreMessages = Just rKey <- readTVarIO recipientKey Just dh <- readTVarIO dhShared let dec = decryptMsgV3 dh - Resp "2" _ (Msg mId2 msg2) <- signSendRecv h rKey ("2", rId, SUB) + (Resp "2" _ (SOK Nothing), Resp "" _ (Msg mId2 msg2)) <- signSendRecv2 h rKey ("2", rId, SUB) (dec mId2 msg2, Right "hello 2") #== "restored message delivered" Resp "3" _ (Msg mId3 msg3) <- signSendRecv h rKey ("3", rId, ACK mId2) (dec mId3 msg3, Right "hello 3") #== "restored message delivered" @@ -786,7 +819,7 @@ testRestoreMessages = Just rKey <- readTVarIO recipientKey Just dh <- readTVarIO dhShared let dec = decryptMsgV3 dh - Resp "4" _ (Msg mId4 msg4) <- signSendRecv h rKey ("4", rId, SUB) + (Resp "4" _ (SOK Nothing), Resp "" _ (Msg mId4 msg4)) <- signSendRecv2 h rKey ("4", rId, SUB) (dec mId4 msg4, Right "hello 4") #== "restored message delivered" Resp "5" _ (Msg mId5 msg5) <- signSendRecv h rKey ("5", rId, ACK mId4) (dec mId5 msg5, Right "hello 5") #== "restored message delivered" @@ -1131,7 +1164,7 @@ testMsgExpireOnSend = threadDelay 2500000 Resp "2" _ OK <- signSendRecv sh sKey ("2", sId, _SEND "hello (should NOT expire)") testSMPClient @c $ \rh -> do - Resp "3" _ (Msg mId msg) <- signSendRecv rh rKey ("3", rId, SUB) + (Resp "3" _ (SOK Nothing), Resp "" _ (Msg mId msg)) <- signSendRecv2 rh rKey ("3", rId, SUB) (dec mId msg, Right "hello (should NOT expire)") #== "delivered" 1000 `timeout` tGetClient @SMPVersion @ErrorType @BrokerMsg rh >>= \case Nothing -> return () @@ -1139,8 +1172,7 @@ testMsgExpireOnSend = testMsgExpireOnInterval :: SpecWith (ASrvTransport, AStoreType) testMsgExpireOnInterval = - -- fails on ubuntu - xit' "should expire messages that are not received before messageTTL after expiry interval" $ \(ATransport (t :: TProxy c 'TServer), msType) -> do + it "should expire messages that are not received before messageTTL after expiry interval" $ \(ATransport (t :: TProxy c 'TServer), msType) -> do g <- C.newRandom (sPub, sKey) <- atomically $ C.generateAuthKeyPair C.SEd25519 g let cfg' = updateCfg (cfgMS msType) $ \cfg_ -> cfg_ {messageExpiration = Just ExpirationConfig {ttl = 1, checkInterval = 1}, idleQueueInterval = 1} @@ -1151,7 +1183,7 @@ testMsgExpireOnInterval = threadDelay 3000000 testSMPClient @c $ \rh -> do signSendRecv rh rKey ("2", rId, SUB) >>= \case - Resp "2" _ OK -> pure () + Resp "2" _ (SOK Nothing) -> pure () r -> unexpected r 1000 `timeout` tGetClient @SMPVersion @ErrorType @BrokerMsg rh >>= \case Nothing -> return () @@ -1170,7 +1202,7 @@ testMsgNOTExpireOnInterval = Resp "1" _ OK <- signSendRecv sh sKey ("1", sId, _SEND "hello (should NOT expire)") threadDelay 2500000 testSMPClient @c $ \rh -> do - Resp "2" _ (Msg mId msg) <- signSendRecv rh rKey ("2", rId, SUB) + (Resp "2" _ (SOK Nothing), Resp "" _ (Msg mId msg)) <- signSendRecv2 rh rKey ("2", rId, SUB) (dec mId msg, Right "hello (should NOT expire)") #== "delivered" 1000 `timeout` tGetClient @SMPVersion @ErrorType @BrokerMsg rh >>= \case Nothing -> return ()