Skip to content
18 changes: 18 additions & 0 deletions src/Socket/messages-recv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@

await delay(5000)

if (!placeholderResendCache.get(messageKey?.id!)) {

Check failure on line 154 in src/Socket/messages-recv.ts

View workflow job for this annotation

GitHub Actions / check-lint

Replace `await·placeholderResendCache.get(messageKey?.id!` with `(await·placeholderResendCache.get(messageKey?.id!)`
logger.debug({ messageKey }, 'message received while resend requested')
return 'RESOLVED'
}
Expand Down Expand Up @@ -183,7 +183,7 @@
return
}

let data: any

Check warning on line 186 in src/Socket/messages-recv.ts

View workflow job for this annotation

GitHub Actions / check-lint

Unexpected any. Specify a different type
try {
data = JSON.parse(mexNode.content.toString())
} catch (error) {
Expand Down Expand Up @@ -279,7 +279,7 @@
case 'update':
const settingsNode = getBinaryNodeChild(child, 'settings')
if (settingsNode) {
const update: Record<string, any> = {}

Check warning on line 282 in src/Socket/messages-recv.ts

View workflow job for this annotation

GitHub Actions / check-lint

Unexpected any. Specify a different type
const nameNode = getBinaryNodeChild(settingsNode, 'name')
if (nameNode?.content) update.name = nameNode.content.toString()

Expand Down Expand Up @@ -1433,6 +1433,11 @@
node: BinaryNode
}

/** Yields control to the event loop to prevent blocking */
const yieldToEventLoop = (): Promise<void> => {
return new Promise(resolve => setImmediate(resolve))
}

const makeOfflineNodeProcessor = () => {
const nodeProcessorMap: Map<MessageType, (node: BinaryNode) => Promise<void>> = new Map([
['message', handleMessage],
Expand All @@ -1443,6 +1448,9 @@
const nodes: OfflineNode[] = []
let isProcessing = false

// Number of nodes to process before yielding to event loop
const BATCH_SIZE = 10

const enqueue = (type: MessageType, node: BinaryNode) => {
nodes.push({ type, node })

Expand All @@ -1453,6 +1461,8 @@
isProcessing = true

const promise = async () => {
let processedInBatch = 0

while (nodes.length && ws.isOpen) {
const { type, node } = nodes.shift()!

Expand All @@ -1464,6 +1474,14 @@
}

await nodeProcessor(node)
processedInBatch++

// Yield to event loop after processing a batch
// This prevents blocking the event loop for too long when there are many offline nodes
if (processedInBatch >= BATCH_SIZE) {
processedInBatch = 0
await yieldToEventLoop()
}
}

isProcessing = false
Expand Down
Loading