diff --git a/src/Simplex/Messaging/Server/MsgStore/Journal.hs b/src/Simplex/Messaging/Server/MsgStore/Journal.hs index d28300a75..677d9d5ed 100644 --- a/src/Simplex/Messaging/Server/MsgStore/Journal.hs +++ b/src/Simplex/Messaging/Server/MsgStore/Journal.hs @@ -559,9 +559,9 @@ instance MsgStoreClass (JournalMsgStore s) where getMsg msgs hs = chooseReadJournal q' q drainMsgs hs >>= maybe (pure msgs) readMsg where readMsg (rs, h) = do - (msg, len) <- hGetMsgAt h $ bytePos rs + (msg_, len) <- hGetMsgAt h $ bytePos rs updateReadPos q' q drainMsgs len hs - (msg :) <$> run msgs + maybe id (fmap . (:)) msg_ $ run msgs writeMsg :: JournalMsgStore s -> JournalQueue s -> Bool -> Message -> ExceptT ErrorType IO (Maybe (Message, Bool)) writeMsg ms q' logState msg = isolateQueue q' "writeMsg" $ do @@ -620,15 +620,20 @@ instance MsgStoreClass (JournalMsgStore s) where getQueueSize_ JournalMsgQueue {state} = StoreIO $ size <$> readTVarIO state tryPeekMsg_ :: JournalQueue s -> JournalMsgQueue s -> StoreIO s (Maybe Message) - tryPeekMsg_ q mq@JournalMsgQueue {tipMsg, handles} = - StoreIO $ (readTVarIO handles $>>= chooseReadJournal q mq True $>>= peekMsg) + tryPeekMsg_ q mq@JournalMsgQueue {tipMsg, handles} = StoreIO go where - peekMsg (rs, h) = readTVarIO tipMsg >>= maybe readMsg (pure . fmap fst) + go = readTVarIO handles $>>= \hs -> chooseReadJournal q mq True hs $>>= peekMsg hs + peekMsg hs (rs, h) = readTVarIO tipMsg >>= maybe readMsg (pure . fmap fst) where readMsg = do - ml@(msg, _) <- hGetMsgAt h $ bytePos rs - atomically $ writeTVar tipMsg $ Just (Just ml) - pure $ Just msg + (msg_, len) <- hGetMsgAt h $ bytePos rs + case msg_ of + Just msg -> do + atomically $ writeTVar tipMsg $ Just (Just (msg, len)) + pure $ Just msg + Nothing -> do + updateReadPos q mq True len hs + go tryDeleteMsg_ :: JournalQueue s -> JournalMsgQueue s -> Bool -> StoreIO s () tryDeleteMsg_ q mq@JournalMsgQueue {tipMsg, handles} logState = StoreIO $ (`E.finally` when logState (updateActiveAt q)) $ @@ -984,15 +989,18 @@ hAppend h pos s = do IO.hSeek h SeekFromEnd 0 B.hPutStr h s -hGetMsgAt :: Handle -> Int64 -> IO (Message, Int64) +hGetMsgAt :: Handle -> Int64 -> IO (Maybe Message, Int64) hGetMsgAt h pos = do IO.hSeek h AbsoluteSeek $ fromIntegral pos s <- B.hGetLine h - case strDecode s of - Right !msg -> - let !len = fromIntegral (B.length s) + 1 - in pure (msg, len) - Left e -> E.throwIO $ userError $ "hGetMsgAt invalid message: " <> e + let !len = fromIntegral (B.length s) + 1 + msg_ <- case strDecode s of + Right !msg -> pure $ Just msg + Left e -> do + name <- IO.hShow h + logError $ "STORE: hGetMsgAt, " <> T.pack name <> ", invalid message at pos " <> tshow pos <> ": " <> tshow e + pure Nothing + pure (msg_, len) openFile :: FilePath -> IOMode -> IO Handle openFile f mode = do