Skip to content

smp server: skip invalid message(s) when reading from journal #1522

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 22 additions & 14 deletions src/Simplex/Messaging/Server/MsgStore/Journal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -559,9 +559,9 @@ instance MsgStoreClass (JournalMsgStore s) where
getMsg msgs hs = chooseReadJournal q' q drainMsgs hs >>= maybe (pure msgs) readMsg
where
readMsg (rs, h) = do
(msg, len) <- hGetMsgAt h $ bytePos rs
(msg_, len) <- hGetMsgAt h $ bytePos rs
updateReadPos q' q drainMsgs len hs
(msg :) <$> run msgs
maybe id (fmap . (:)) msg_ $ run msgs

writeMsg :: JournalMsgStore s -> JournalQueue s -> Bool -> Message -> ExceptT ErrorType IO (Maybe (Message, Bool))
writeMsg ms q' logState msg = isolateQueue q' "writeMsg" $ do
Expand Down Expand Up @@ -620,15 +620,20 @@ instance MsgStoreClass (JournalMsgStore s) where
getQueueSize_ JournalMsgQueue {state} = StoreIO $ size <$> readTVarIO state

tryPeekMsg_ :: JournalQueue s -> JournalMsgQueue s -> StoreIO s (Maybe Message)
tryPeekMsg_ q mq@JournalMsgQueue {tipMsg, handles} =
StoreIO $ (readTVarIO handles $>>= chooseReadJournal q mq True $>>= peekMsg)
tryPeekMsg_ q mq@JournalMsgQueue {tipMsg, handles} = StoreIO go
where
peekMsg (rs, h) = readTVarIO tipMsg >>= maybe readMsg (pure . fmap fst)
go = readTVarIO handles $>>= \hs -> chooseReadJournal q mq True hs $>>= peekMsg hs
peekMsg hs (rs, h) = readTVarIO tipMsg >>= maybe readMsg (pure . fmap fst)
where
readMsg = do
ml@(msg, _) <- hGetMsgAt h $ bytePos rs
atomically $ writeTVar tipMsg $ Just (Just ml)
pure $ Just msg
(msg_, len) <- hGetMsgAt h $ bytePos rs
case msg_ of
Just msg -> do
atomically $ writeTVar tipMsg $ Just (Just (msg, len))
pure $ Just msg
Nothing -> do
updateReadPos q mq True len hs
go

tryDeleteMsg_ :: JournalQueue s -> JournalMsgQueue s -> Bool -> StoreIO s ()
tryDeleteMsg_ q mq@JournalMsgQueue {tipMsg, handles} logState = StoreIO $ (`E.finally` when logState (updateActiveAt q)) $
Expand Down Expand Up @@ -984,15 +989,18 @@ hAppend h pos s = do
IO.hSeek h SeekFromEnd 0
B.hPutStr h s

hGetMsgAt :: Handle -> Int64 -> IO (Message, Int64)
hGetMsgAt :: Handle -> Int64 -> IO (Maybe Message, Int64)
hGetMsgAt h pos = do
IO.hSeek h AbsoluteSeek $ fromIntegral pos
s <- B.hGetLine h
case strDecode s of
Right !msg ->
let !len = fromIntegral (B.length s) + 1
in pure (msg, len)
Left e -> E.throwIO $ userError $ "hGetMsgAt invalid message: " <> e
let !len = fromIntegral (B.length s) + 1
msg_ <- case strDecode s of
Right !msg -> pure $ Just msg
Left e -> do
name <- IO.hShow h
logError $ "STORE: hGetMsgAt, " <> T.pack name <> ", invalid message at pos " <> tshow pos <> ": " <> tshow e
pure Nothing
pure (msg_, len)

openFile :: FilePath -> IOMode -> IO Handle
openFile f mode = do
Expand Down
Loading