Skip to content

Commit 47cc842

Browse files
committed
smp server: refactor subscriptions and delivery
1 parent b47d28a commit 47cc842

File tree

11 files changed

+248
-173
lines changed

11 files changed

+248
-173
lines changed

src/Simplex/Messaging/Client.hs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1179,8 +1179,8 @@ okSMPCommands cmd c qs = L.map process <$> sendProtocolCommands c cs
11791179
Left e -> Left e
11801180

11811181
-- | Send SMP command
1182-
sendSMPCommand :: PartyI p => SMPClient -> Maybe C.APrivateAuthKey -> QueueId -> Command p -> ExceptT SMPClientError IO BrokerMsg
1183-
sendSMPCommand c pKey qId cmd = sendProtocolCommand c pKey qId (Cmd sParty cmd)
1182+
sendSMPCommand :: PartyI p => SMPClient -> Maybe C.APrivateAuthKey -> EntityId -> Command p -> ExceptT SMPClientError IO BrokerMsg
1183+
sendSMPCommand c pKey entId cmd = sendProtocolCommand c pKey entId (Cmd sParty cmd)
11841184
{-# INLINE sendSMPCommand #-}
11851185

11861186
type PCTransmission err msg = (Either TransportError SentRawTransmission, Request err msg)

src/Simplex/Messaging/Protocol.hs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ module Simplex.Messaging.Protocol
114114
BasicAuth (..),
115115
SrvLoc (..),
116116
CorrId (..),
117+
pattern NoCorrId,
117118
EntityId (..),
118119
pattern NoEntity,
119120
QueueId,
@@ -1370,6 +1371,9 @@ newtype CorrId = CorrId {bs :: ByteString}
13701371
deriving (Eq, Ord, Show)
13711372
deriving newtype (Encoding)
13721373

1374+
pattern NoCorrId :: CorrId
1375+
pattern NoCorrId = CorrId ""
1376+
13731377
instance IsString CorrId where
13741378
fromString = CorrId . fromString
13751379
{-# INLINE fromString #-}

src/Simplex/Messaging/Server.hs

Lines changed: 137 additions & 141 deletions
Large diffs are not rendered by default.

src/Simplex/Messaging/Server/Env/STM.hs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ module Simplex.Messaging.Server.Env.STM
4040
MsgStore (..),
4141
AStoreType (..),
4242
VerifiedTransmission,
43+
ResponseAndMessage,
4344
newEnv,
4445
mkJournalStoreConfig,
4546
msgStore,
@@ -377,7 +378,7 @@ sameClient c cv = maybe False (sameClientId c) <$> readTVar cv
377378
data ClientSub
378379
= CSClient QueueId (Maybe ServiceId) (Maybe ServiceId) -- includes previous and new associated service IDs
379380
| CSDeleted QueueId (Maybe ServiceId) -- includes previously associated service IDs
380-
| CSService ServiceId -- only send END to idividual client subs on message delivery, not of SSUB/NSSUB
381+
| CSService ServiceId Int64 -- only send END to idividual client subs on message delivery, not of SSUB/NSSUB
381382

382383
newtype ProxyAgent = ProxyAgent
383384
{ smpAgent :: SMPClientAgent 'Sender
@@ -392,7 +393,7 @@ data Client s = Client
392393
serviceSubsCount :: TVar Int64, -- only one service can be subscribed, based on its certificate, this is subscription count
393394
ntfServiceSubsCount :: TVar Int64, -- only one service can be subscribed, based on its certificate, this is subscription count
394395
rcvQ :: TBQueue (NonEmpty (VerifiedTransmission s)),
395-
sndQ :: TBQueue (NonEmpty (Transmission BrokerMsg)),
396+
sndQ :: TBQueue (NonEmpty (Transmission BrokerMsg), [Transmission BrokerMsg]),
396397
msgQ :: TBQueue (NonEmpty (Transmission BrokerMsg)),
397398
procThreads :: TVar Int,
398399
endThreads :: TVar (IntMap (Weak ThreadId)),
@@ -406,6 +407,8 @@ data Client s = Client
406407

407408
type VerifiedTransmission s = (Maybe (StoreQueue s, QueueRec), Transmission Cmd)
408409

410+
type ResponseAndMessage = (Transmission BrokerMsg, Maybe (Transmission BrokerMsg))
411+
409412
data ServerSub = ServerSub (TVar SubscriptionThread) | ProhibitSub
410413

411414
data SubscriptionThread = NoSub | SubPending | SubThread (Weak ThreadId)

src/Simplex/Messaging/Server/MsgStore/Journal.hs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -356,8 +356,8 @@ instance QueueStoreClass (JournalQueue s) (QStore s) where
356356
{-# INLINE setQueueService #-}
357357
getQueueNtfServices = withQS (getQueueNtfServices @(JournalQueue s))
358358
{-# INLINE getQueueNtfServices #-}
359-
getNtfServiceQueueCount = withQS (getNtfServiceQueueCount @(JournalQueue s))
360-
{-# INLINE getNtfServiceQueueCount #-}
359+
getServiceQueueCount = withQS (getServiceQueueCount @(JournalQueue s))
360+
{-# INLINE getServiceQueueCount #-}
361361

362362
makeQueue_ :: JournalMsgStore s -> RecipientId -> QueueRec -> Lock -> IO (JournalQueue s)
363363
makeQueue_ JournalMsgStore {sharedLock} rId qr queueLock = do

src/Simplex/Messaging/Server/QueueStore/Postgres.hs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -486,11 +486,15 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where
486486
let (sNtfs, restNtfs) = partition (\(nId, _) -> S.member nId snIds) ntfs'
487487
in ((serviceId, sNtfs) : ssNtfs, restNtfs)
488488

489-
getNtfServiceQueueCount :: PostgresQueueStore q -> ServiceId -> IO (Either ErrorType Int64)
490-
getNtfServiceQueueCount st serviceId =
491-
E.uninterruptibleMask_ $ runExceptT $ withDB' "getNtfServiceQueueCount" st $ \db ->
489+
getServiceQueueCount :: (PartyI p, ServiceParty p) => PostgresQueueStore q -> SParty p -> ServiceId -> IO (Either ErrorType Int64)
490+
getServiceQueueCount st party serviceId =
491+
E.uninterruptibleMask_ $ runExceptT $ withDB' "getServiceQueueCount" st $ \db ->
492492
fmap (fromMaybe 0) $ maybeFirstRow fromOnly $
493-
DB.query db "SELECT count(1) FROM msg_queues WHERE ntf_service_id = ? AND deleted_at IS NULL" (Only serviceId)
493+
DB.query db query (Only serviceId)
494+
where
495+
query = case party of
496+
SRecipientService -> "SELECT count(1) FROM msg_queues WHERE rcv_service_id = ? AND deleted_at IS NULL"
497+
SNotifierService -> "SELECT count(1) FROM msg_queues WHERE ntf_service_id = ? AND deleted_at IS NULL"
494498

495499
batchInsertServices :: [STMService] -> PostgresQueueStore q -> IO Int64
496500
batchInsertServices services' toStore =

src/Simplex/Messaging/Server/QueueStore/STM.hs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -346,10 +346,15 @@ instance StoreQueueClass q => QueueStoreClass q (STMQueueStore q) where
346346
let (sNtfs, restNtfs) = partition (\(nId, _) -> S.member nId snIds) ntfs'
347347
pure ((Just serviceId, sNtfs) : ssNtfs, restNtfs)
348348

349-
getNtfServiceQueueCount :: STMQueueStore q -> ServiceId -> IO (Either ErrorType Int64)
350-
getNtfServiceQueueCount st serviceId =
349+
getServiceQueueCount :: (PartyI p, ServiceParty p) => STMQueueStore q -> SParty p -> ServiceId -> IO (Either ErrorType Int64)
350+
getServiceQueueCount st party serviceId =
351351
TM.lookupIO serviceId (services st) >>=
352-
maybe (pure $ Left AUTH) (fmap (Right . fromIntegral . S.size) . readTVarIO . serviceNtfQueues)
352+
maybe (pure $ Left AUTH) (fmap (Right . fromIntegral . S.size) . readTVarIO . serviceSel)
353+
where
354+
serviceSel :: STMService -> TVar (Set QueueId)
355+
serviceSel = case party of
356+
SRecipientService -> serviceRcvQueues
357+
SNotifierService -> serviceNtfQueues
353358

354359
withQueueRec :: TVar (Maybe QueueRec) -> (QueueRec -> STM a) -> IO (Either ErrorType a)
355360
withQueueRec qr a = atomically $ readQueueRec qr >>= mapM a

src/Simplex/Messaging/Server/QueueStore/Types.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ class StoreQueueClass q => QueueStoreClass q s where
4848
getCreateService :: s -> ServiceRec -> IO (Either ErrorType ServiceId)
4949
setQueueService :: (PartyI p, ServiceParty p) => s -> q -> SParty p -> Maybe ServiceId -> IO (Either ErrorType ())
5050
getQueueNtfServices :: s -> [(NotifierId, a)] -> IO (Either ErrorType ([(Maybe ServiceId, [(NotifierId, a)])], [(NotifierId, a)]))
51-
getNtfServiceQueueCount :: s -> ServiceId -> IO (Either ErrorType Int64)
51+
getServiceQueueCount :: (PartyI p, ServiceParty p) => s -> SParty p -> ServiceId -> IO (Either ErrorType Int64)
5252

5353
data EntityCounts = EntityCounts
5454
{ queueCount :: Int,

tests/CoreTests/BatchingTests.hs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ batchingTests = do
4747
it "should batch with 135 subscriptions per batch" testClientBatchSubscriptions
4848
it "should batch with 255 ENDs per batch" testClientBatchENDs
4949
it "should batch with 80 NMSGs per batch" testClientBatchNMSGs
50+
it "should batch subscription responses with message" testBatchSubResponses
5051
it "should break on message that does not fit" testClientBatchWithMessage
5152
it "should break on large message" testClientBatchWithLargeMessage
5253

@@ -207,6 +208,20 @@ testClientBatchNMSGs = do
207208
(length rs1, length rs2, length rs3) `shouldBe` (40, 80, 80)
208209
all lenOk [s1, s2, s3] `shouldBe` True
209210

211+
-- 4 responses are used in Simplex.Messaging.Server / `send`
212+
testBatchSubResponses :: IO ()
213+
testBatchSubResponses = do
214+
client <- testClientStub
215+
soks <- replicateM 4 $ randomSOK
216+
msg <- randomMSG
217+
let msgs = map (\t -> Right (Nothing, encodeTransmission (thParams client) t)) (soks <> [msg])
218+
batches = batchTransmissions (thParams client) $ L.fromList msgs
219+
length batches `shouldBe` 1
220+
soks' <- replicateM 5 $ randomSOK
221+
let msgs' = map (\t -> Right (Nothing, encodeTransmission (thParams client) t)) (soks' <> [msg])
222+
batches' = batchTransmissions (thParams client) $ L.fromList msgs'
223+
length batches' `shouldBe` 2
224+
210225
testClientBatchWithMessageV6 :: IO ()
211226
testClientBatchWithMessageV6 = do
212227
client <- testClientStubV6
@@ -361,6 +376,22 @@ randomNMSGCmd ts = do
361376
Right encNMsgMeta <- pure $ C.cbEncrypt (C.dh' k pk) nonce (smpEncode msgMeta) 128
362377
pure (CorrId "", EntityId nId, NMSG nonce encNMsgMeta)
363378

379+
randomSOK :: IO (Transmission BrokerMsg)
380+
randomSOK = do
381+
g <- C.newRandom
382+
corrId <- atomically $ C.randomBytes 24 g
383+
rId <- atomically $ C.randomBytes 24 g
384+
pure (CorrId corrId, EntityId rId, SOK Nothing)
385+
386+
randomMSG :: IO (Transmission BrokerMsg)
387+
randomMSG = do
388+
g <- C.newRandom
389+
corrId <- atomically $ C.randomBytes 24 g
390+
rId <- atomically $ C.randomBytes 24 g
391+
msgId <- atomically $ C.randomBytes 24 g
392+
msg <- atomically $ C.randomBytes (maxMessageLength currentClientSMPRelayVersion) g
393+
pure (CorrId corrId, EntityId rId, MSG RcvMessage {msgId, msgBody = EncRcvMsgBody msg})
394+
364395
randomSENDv6 :: ByteString -> Int -> IO (Either TransportError (Maybe TAuthorizations, ByteString))
365396
randomSENDv6 = randomSEND_ C.SEd25519 minServerSMPRelayVersion
366397

tests/ServerTests.hs

Lines changed: 47 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
module ServerTests where
1919

2020
import Control.Concurrent (ThreadId, killThread, threadDelay)
21+
import Control.Concurrent.Async (concurrently_)
2122
import Control.Concurrent.STM
2223
import Control.Exception (SomeException, throwIO, try)
2324
import Control.Monad
@@ -29,6 +30,7 @@ import Data.ByteString.Char8 (ByteString)
2930
import qualified Data.ByteString.Char8 as B
3031
import Data.Hashable (hash)
3132
import qualified Data.IntSet as IS
33+
import Data.List.NonEmpty (NonEmpty)
3234
import Data.String (IsString (..))
3335
import Data.Type.Equality
3436
import qualified Data.X509.Validation as XV
@@ -75,6 +77,7 @@ serverTests = do
7577
describe "GET command" testGetCommand
7678
describe "GET & SUB commands" testGetSubCommands
7779
describe "Exceeding queue quota" testExceedQueueQuota
80+
describe "Concurrent sending and delivery" testConcurrentSendDelivery
7881
describe "Store log" testWithStoreLog
7982
describe "Restore messages" testRestoreMessages
8083
describe "Restore messages (old / v2)" testRestoreExpireMessages
@@ -111,16 +114,25 @@ sendRecv h@THandle {params} (sgn, corrId, qId, cmd) = do
111114
tGet1 h
112115

113116
signSendRecv :: forall c p. (Transport c, PartyI p) => THandleSMP c 'TClient -> C.APrivateAuthKey -> (ByteString, EntityId, Command p) -> IO (Transmission (Either ErrorType BrokerMsg))
114-
signSendRecv h pk = signSendRecv_ h pk Nothing
117+
signSendRecv h pk t = do
118+
[r] <- signSendRecv_ h pk Nothing t
119+
pure r
120+
121+
signSendRecv2 :: forall c p. (Transport c, PartyI p) => THandleSMP c 'TClient -> C.APrivateAuthKey -> (ByteString, EntityId, Command p) -> IO (Transmission (Either ErrorType BrokerMsg), Transmission (Either ErrorType BrokerMsg))
122+
signSendRecv2 h pk t = do
123+
[r1, r2] <- signSendRecv_ h pk Nothing t
124+
pure (r1, r2)
115125

116126
serviceSignSendRecv :: forall c p. (Transport c, PartyI p) => THandleSMP c 'TClient -> C.APrivateAuthKey -> C.PrivateKeyEd25519 -> (ByteString, EntityId, Command p) -> IO (Transmission (Either ErrorType BrokerMsg))
117-
serviceSignSendRecv h pk = signSendRecv_ h pk . Just
127+
serviceSignSendRecv h pk serviceKey t = do
128+
[r] <- signSendRecv_ h pk (Just serviceKey) t
129+
pure r
118130

119-
signSendRecv_ :: forall c p. (Transport c, PartyI p) => THandleSMP c 'TClient -> C.APrivateAuthKey -> Maybe C.PrivateKeyEd25519 -> (ByteString, EntityId, Command p) -> IO (Transmission (Either ErrorType BrokerMsg))
131+
signSendRecv_ :: forall c p. (Transport c, PartyI p) => THandleSMP c 'TClient -> C.APrivateAuthKey -> Maybe C.PrivateKeyEd25519 -> (ByteString, EntityId, Command p) -> IO (NonEmpty (Transmission (Either ErrorType BrokerMsg)))
120132
signSendRecv_ h@THandle {params} (C.APrivateAuthKey a pk) serviceKey_ (corrId, qId, cmd) = do
121133
let TransmissionForAuth {tForAuth, tToSend} = encodeTransmissionForAuth params (CorrId corrId, qId, cmd)
122134
Right () <- tPut1 h (authorize tForAuth, tToSend)
123-
tGet1 h
135+
liftIO $ tGetClient h
124136
where
125137
authorize t = (,(`C.sign'` t) <$> serviceKey_) <$> case a of
126138
C.SEd25519 -> Just . TASignature . C.ASignature C.SEd25519 $ C.sign' pk t'
@@ -365,7 +377,7 @@ testCreateDelete =
365377
Resp "bcda" _ ok4 <- signSendRecv rh rKey ("bcda", rId, OFF)
366378
(ok4, OK) #== "accepts OFF when suspended"
367379

368-
Resp "cdab" _ (Msg mId2 msg2) <- signSendRecv rh rKey ("cdab", rId, SUB)
380+
(Resp "cdab" _ (SOK Nothing), Resp "" _ (Msg mId2 msg2)) <- signSendRecv2 rh rKey ("cdab", rId, SUB)
369381
(dec mId2 msg2, Right "hello") #== "accepts SUB when suspended and delivers the message again (because was not ACKed)"
370382

371383
Resp "dabc" _ err5 <- sendRecv rh (sampleSig, "dabc", rId, DEL)
@@ -404,7 +416,7 @@ stressTest =
404416
Resp "" NoEntity (Ids rId _ _) <- signSendRecv h1 rKey ("", NoEntity, New rPub dhPub)
405417
pure rId
406418
let subscribeQueues h = forM_ rIds $ \rId -> do
407-
Resp "" rId' OK <- signSendRecv h rKey ("", rId, SUB)
419+
Resp "" rId' (SOK Nothing) <- signSendRecv h rKey ("", rId, SUB)
408420
rId' `shouldBe` rId
409421
closeConnection $ connection h1
410422
subscribeQueues h2
@@ -497,7 +509,7 @@ testSwitchSub =
497509
Resp "abcd" _ (Msg mId2 msg2) <- signSendRecv rh1 rKey ("abcd", rId, ACK mId1)
498510
(dec mId2 msg2, Right "test2, no ACK") #== "test message 2 delivered, no ACK"
499511

500-
Resp "bcda" _ (Msg mId2' msg2') <- signSendRecv rh2 rKey ("bcda", rId, SUB)
512+
(Resp "bcda" _ (SOK Nothing), Resp "" _ (Msg mId2' msg2')) <- signSendRecv2 rh2 rKey ("bcda", rId, SUB)
501513
(dec mId2' msg2', Right "test2, no ACK") #== "same simplex queue via another TCP connection, tes2 delivered again (no ACK in 1st queue)"
502514
Resp "cdab" _ OK <- signSendRecv rh2 rKey ("cdab", rId, ACK mId2')
503515

@@ -620,6 +632,27 @@ testExceedQueueQuota =
620632
Resp "10" _ OK <- signSendRecv rh rKey ("10", rId, ACK mId4)
621633
pure ()
622634

635+
testConcurrentSendDelivery :: SpecWith (ASrvTransport, AStoreType)
636+
testConcurrentSendDelivery =
637+
it "should continue delivering messages if message is sent before it is acknowledged" $ \(ATransport t, msType) -> do
638+
g <- C.newRandom
639+
smpTest3 t msType $ \rh sh1 sh2 -> do
640+
(sPub, sKey) <- atomically $ C.generateAuthKeyPair C.SEd25519 g
641+
(sId, rId, rKey, dhShared) <- createAndSecureQueue rh sPub
642+
let dec = decryptMsgV3 dhShared
643+
sndMsg sh n = do
644+
Resp (CorrId n') _ OK <- signSendRecv sh sKey (n, sId, _SEND ("msg " <> n))
645+
n' `shouldBe` n
646+
isMsg1or2 mId msg = dec mId msg == Right "msg 1" || dec mId msg == Right "msg 2" `shouldBe` True
647+
replicateM_ 50 $ do
648+
concurrently_ (sndMsg sh1 "1") (sndMsg sh2 "2")
649+
Resp "" _ (Msg mId1 msg1) <- tGet1 rh
650+
isMsg1or2 mId1 msg1
651+
Resp "3" _ (Msg mId2 msg2) <- signSendRecv rh rKey ("3", rId, ACK mId1)
652+
isMsg1or2 mId2 msg2
653+
Resp "4" _ OK <- signSendRecv rh rKey ("4", rId, ACK mId2)
654+
pure ()
655+
623656
testWithStoreLog :: SpecWith (ASrvTransport, AStoreType)
624657
testWithStoreLog =
625658
it "should store simplex queues to log and restore them after server restart" $ \(at@(ATransport t), msType) -> do
@@ -684,7 +717,7 @@ testWithStoreLog =
684717
nId <- readTVarIO notifierId
685718
Resp "dabc" _ (SOK Nothing) <- signSendRecv h1 nKey ("dabc", nId, NSUB)
686719
Resp "bcda" _ OK <- signSendRecv h sKey1 ("bcda", sId1, _SEND' "hello")
687-
Resp "cdab" _ (Msg mId3 msg3) <- signSendRecv h rKey1 ("cdab", rId1, SUB)
720+
(Resp "cdab" _ (SOK Nothing), Resp "" _ (Msg mId3 msg3)) <- signSendRecv2 h rKey1 ("cdab", rId1, SUB)
688721
(decryptMsgV3 dh1 mId3 msg3, Right "hello") #== "delivered from restored queue"
689722
Resp "" _ (NMSG _ _) <- tGet1 h1
690723
-- this queue is removed - not restored
@@ -769,7 +802,7 @@ testRestoreMessages =
769802
Just rKey <- readTVarIO recipientKey
770803
Just dh <- readTVarIO dhShared
771804
let dec = decryptMsgV3 dh
772-
Resp "2" _ (Msg mId2 msg2) <- signSendRecv h rKey ("2", rId, SUB)
805+
(Resp "2" _ (SOK Nothing), Resp "" _ (Msg mId2 msg2)) <- signSendRecv2 h rKey ("2", rId, SUB)
773806
(dec mId2 msg2, Right "hello 2") #== "restored message delivered"
774807
Resp "3" _ (Msg mId3 msg3) <- signSendRecv h rKey ("3", rId, ACK mId2)
775808
(dec mId3 msg3, Right "hello 3") #== "restored message delivered"
@@ -786,7 +819,7 @@ testRestoreMessages =
786819
Just rKey <- readTVarIO recipientKey
787820
Just dh <- readTVarIO dhShared
788821
let dec = decryptMsgV3 dh
789-
Resp "4" _ (Msg mId4 msg4) <- signSendRecv h rKey ("4", rId, SUB)
822+
(Resp "4" _ (SOK Nothing), Resp "" _ (Msg mId4 msg4)) <- signSendRecv2 h rKey ("4", rId, SUB)
790823
(dec mId4 msg4, Right "hello 4") #== "restored message delivered"
791824
Resp "5" _ (Msg mId5 msg5) <- signSendRecv h rKey ("5", rId, ACK mId4)
792825
(dec mId5 msg5, Right "hello 5") #== "restored message delivered"
@@ -1131,16 +1164,15 @@ testMsgExpireOnSend =
11311164
threadDelay 2500000
11321165
Resp "2" _ OK <- signSendRecv sh sKey ("2", sId, _SEND "hello (should NOT expire)")
11331166
testSMPClient @c $ \rh -> do
1134-
Resp "3" _ (Msg mId msg) <- signSendRecv rh rKey ("3", rId, SUB)
1167+
(Resp "3" _ (SOK Nothing), Resp "" _ (Msg mId msg)) <- signSendRecv2 rh rKey ("3", rId, SUB)
11351168
(dec mId msg, Right "hello (should NOT expire)") #== "delivered"
11361169
1000 `timeout` tGetClient @SMPVersion @ErrorType @BrokerMsg rh >>= \case
11371170
Nothing -> return ()
11381171
Just _ -> error "nothing else should be delivered"
11391172

11401173
testMsgExpireOnInterval :: SpecWith (ASrvTransport, AStoreType)
11411174
testMsgExpireOnInterval =
1142-
-- fails on ubuntu
1143-
xit' "should expire messages that are not received before messageTTL after expiry interval" $ \(ATransport (t :: TProxy c 'TServer), msType) -> do
1175+
it "should expire messages that are not received before messageTTL after expiry interval" $ \(ATransport (t :: TProxy c 'TServer), msType) -> do
11441176
g <- C.newRandom
11451177
(sPub, sKey) <- atomically $ C.generateAuthKeyPair C.SEd25519 g
11461178
let cfg' = updateCfg (cfgMS msType) $ \cfg_ -> cfg_ {messageExpiration = Just ExpirationConfig {ttl = 1, checkInterval = 1}, idleQueueInterval = 1}
@@ -1151,7 +1183,7 @@ testMsgExpireOnInterval =
11511183
threadDelay 3000000
11521184
testSMPClient @c $ \rh -> do
11531185
signSendRecv rh rKey ("2", rId, SUB) >>= \case
1154-
Resp "2" _ OK -> pure ()
1186+
Resp "2" _ (SOK Nothing) -> pure ()
11551187
r -> unexpected r
11561188
1000 `timeout` tGetClient @SMPVersion @ErrorType @BrokerMsg rh >>= \case
11571189
Nothing -> return ()
@@ -1170,7 +1202,7 @@ testMsgNOTExpireOnInterval =
11701202
Resp "1" _ OK <- signSendRecv sh sKey ("1", sId, _SEND "hello (should NOT expire)")
11711203
threadDelay 2500000
11721204
testSMPClient @c $ \rh -> do
1173-
Resp "2" _ (Msg mId msg) <- signSendRecv rh rKey ("2", rId, SUB)
1205+
(Resp "2" _ (SOK Nothing), Resp "" _ (Msg mId msg)) <- signSendRecv2 rh rKey ("2", rId, SUB)
11741206
(dec mId msg, Right "hello (should NOT expire)") #== "delivered"
11751207
1000 `timeout` tGetClient @SMPVersion @ErrorType @BrokerMsg rh >>= \case
11761208
Nothing -> return ()

tests/Test.hs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -103,13 +103,13 @@ main = do
103103
testStoreDBOpts
104104
"src/Simplex/Messaging/Server/QueueStore/Postgres/server_schema.sql"
105105
aroundAll_ (postgressBracket testServerDBConnectInfo) $
106-
describe "SMP server via TLS, postgres+jornal message store" $
106+
fdescribe "SMP server via TLS, postgres+jornal message store" $
107107
before (pure (transport @TLS, ASType SQSPostgres SMSJournal)) serverTests
108108
#endif
109-
describe "SMP server via TLS, jornal message store" $ do
109+
fdescribe "SMP server via TLS, jornal message store" $ do
110110
describe "SMP syntax" $ serverSyntaxTests (transport @TLS)
111111
before (pure (transport @TLS, ASType SQSMemory SMSJournal)) serverTests
112-
describe "SMP server via TLS, memory message store" $
112+
fdescribe "SMP server via TLS, memory message store" $
113113
before (pure (transport @TLS, ASType SQSMemory SMSMemory)) serverTests
114114
-- xdescribe "SMP server via WebSockets" $ do
115115
-- describe "SMP syntax" $ serverSyntaxTests (transport @WS)

0 commit comments

Comments
 (0)