-
-
Notifications
You must be signed in to change notification settings - Fork 2.5k
feat: Optimize Offline Node Processing with Batching and Event Loop Yielding #2138
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
feat: Optimize Offline Node Processing with Batching and Event Loop Yielding #2138
Conversation
…e IDs" This reverts commit c03f9d8.
|
Thanks for opening this pull request and contributing to the project! The next step is for the maintainers to review your changes. If everything looks good, it will be approved and merged into the main branch. In the meantime, anyone in the community is encouraged to test this pull request and provide feedback. ✅ How to confirm it worksIf you’ve tested this PR, please comment below with: This helps us speed up the review and merge process. 📦 To test this PR locally:If you encounter any issues or have feedback, feel free to comment as well. |
|
@jlucaso1 @purpshell please review it and let me know what you think |
|
To make it easier, i created this mocked test example. Notice that the second example will execute other tasks while the first one finished EVERYTHING before finally releasing the event loop. const yieldToEventLoop = () => {
return new Promise((resolve) => setImmediate(resolve));
};
const BATCH_SIZE = 10;
function makeOfflineNodeProcessorOld() {
const nodes = [];
let isProcessing = false;
let processingPromise = null;
const enqueue = (type, node) => {
nodes.push({ type, node });
if (isProcessing) {
return;
}
isProcessing = true;
processingPromise = (async () => {
while (nodes.length) {
const { node } = nodes.shift();
if (node.id % 100 === 0) {
console.log(` Processed node ${node.id}...`);
}
await fakeNodeProcessor(node);
}
isProcessing = false;
})();
processingPromise.catch((error) =>
console.error("Unexpected error processing offline nodes:", error)
);
};
const waitForCompletion = () => processingPromise || Promise.resolve();
return { enqueue, waitForCompletion };
}
const fakeNodeProcessor = async (node) => {
return new Promise((resolve) => {
let soma = 0;
for (let j = 0; j < 100000; j++) {
soma += Math.sqrt(j);
}
resolve();
});
};
function makeOfflineNodeProcessorNew() {
const nodes = [];
let isProcessing = false;
let processingPromise = null;
const enqueue = (type, node) => {
nodes.push({ type, node });
if (isProcessing) {
return;
}
isProcessing = true;
processingPromise = (async () => {
let processedInBatch = 0;
while (nodes.length) {
const { node } = nodes.shift();
if (node.id % 100 === 0) {
console.log(` Processed node ${node.id}...`);
}
await fakeNodeProcessor(node);
processedInBatch += 1;
if (processedInBatch >= BATCH_SIZE) {
processedInBatch = 0;
await yieldToEventLoop();
}
}
isProcessing = false;
})();
processingPromise.catch((error) =>
console.error("Unexpected error processing offline nodes:", error)
);
};
const waitForCompletion = () => processingPromise || Promise.resolve();
return { enqueue, waitForCompletion };
}
function runParallelTasks() {
let timerCount = 0;
const timer = setInterval(() => {
timerCount++;
console.log(` ⏰ Timer ${timerCount} executed! (should be every 500ms)`);
}, 500);
fetch("https://example.com").then(() => {
console.log(" 🌐 Fetched from network! (I/O operation)");
});
// Timeout that should execute after 2 seconds
setTimeout(() => {
console.log(" ⏰ Timeout of 2s executed!");
}, 2000);
// Simulate I/O (file reading)
const fs = require("fs");
fs.readFile(__filename, "utf8", (err, data) => {
console.log(" 📁 File read! (I/O operation)");
});
return timer;
}
async function processWithImplementation(useOldOrNew = "new") {
const offlineNodeProcessor = useOldOrNew === "new" ? makeOfflineNodeProcessorNew() : makeOfflineNodeProcessorOld();
offlineNodeProcessor.enqueue("type", { id: 1 });
for (let i = 1; i < 10000; i++) {
offlineNodeProcessor.enqueue("type", { id: i });
}
await offlineNodeProcessor.waitForCompletion();
}
async function menu() {
console.log("╔════════════════════════════════════════════╗");
console.log("║ BAILEYS OFFLINE NODE PROCESSOR ║");
console.log("║ EVENT LOOP TEST NODE.JS ║");
console.log("╚════════════════════════════════════════════╝");
console.log("Choose an option:");
console.log("1 - Old implementation (will freeze)");
console.log("2 - Implementation with breaks (won't freeze)");
const readline = require("readline").createInterface({
input: process.stdin,
output: process.stdout,
});
readline.question("\nType 1 or 2: ", async (resposta) => {
readline.close();
if (resposta === "1") {
const timer = runParallelTasks();
await processWithImplementation("old");
clearInterval(timer);
} else if (resposta === "2") {
const timer = runParallelTasks();
await processWithImplementation();
clearInterval(timer);
} else {
console.log("Invalid option!");
}
console.log("\n✅ Program finished!");
// process.exit(0);
});
}
menu();
|
|
Thank you for your work. 🫡👍 I think they're just going to ask you to fix the lint. |
This pull request optimizes the processing of offline message nodes in the
makeMessagesRecvSocketfunction by introducing batching and yielding to the event loop. This prevents the event loop from being blocked when processing a large number of nodes, improving responsiveness and stability.Event loop optimization:
yieldToEventLoophelper function that yields control to the event loop usingsetImmediate, preventing long blocks during node processing.BATCH_SIZEconstant to process nodes in batches and yield after every batch, reducing the risk of blocking the event loop when many offline nodes are present. [1] [2] [3]