@@ -684,16 +684,24 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt
684
684
let threadsCount = 0
685
685
#endif
686
686
clientsCount <- IM. size <$> getServerClients srv
687
+ deliveredSubs <- getDeliveredMetrics
687
688
smpSubs <- getSubscribersMetrics subscribers
688
689
ntfSubs <- getSubscribersMetrics ntfSubscribers
689
690
loadedCounts <- loadedQueueCounts $ fromMsgStore ms
690
- pure RealTimeMetrics {socketStats, threadsCount, clientsCount, smpSubs, ntfSubs, loadedCounts}
691
+ pure RealTimeMetrics {socketStats, threadsCount, clientsCount, deliveredSubs, smpSubs, ntfSubs, loadedCounts}
691
692
where
692
693
getSubscribersMetrics ServerSubscribers {queueSubscribers, serviceSubscribers, subClients} = do
693
694
subsCount <- M. size <$> getSubscribedClients queueSubscribers
694
695
subClientsCount <- IS. size <$> readTVarIO subClients
695
696
subServicesCount <- M. size <$> getSubscribedClients serviceSubscribers
696
697
pure RTSubscriberMetrics {subsCount, subClientsCount, subServicesCount}
698
+ getDeliveredMetrics = foldM countClnt (RTSubscriberMetrics 0 0 0 ) =<< getServerClients srv
699
+ countClnt metrics Client {subscriptions} = do
700
+ cnt <- foldM countSubs 0 =<< readTVarIO subscriptions
701
+ pure $ if cnt > 0
702
+ then metrics {subsCount = subsCount metrics + cnt, subClientsCount = subClientsCount metrics + 1 }
703
+ else metrics
704
+ countSubs ! cnt Sub {delivered} = (\ empty -> if empty then cnt else cnt + 1 ) <$> atomically (isEmptyTMVar delivered)
697
705
698
706
runClient :: Transport c => X. CertificateChain -> C. APrivateSignKey -> TProxy c 'TServer -> c 'TServer -> M s ()
699
707
runClient srvCert srvSignKey tp h = do
0 commit comments