-
-
Notifications
You must be signed in to change notification settings - Fork 2.5k
Fix/images stream writer error - ERR_STREAM_DESTROYED #1969
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Fix/images stream writer error - ERR_STREAM_DESTROYED #1969
Conversation
|
Thanks for opening this pull request and contributing to the project! The next step is for the maintainers to review your changes. If everything looks good, it will be approved and merged into the main branch. In the meantime, anyone in the community is encouraged to test this pull request and provide feedback. ✅ How to confirm it worksIf you’ve tested this PR, please comment below with: This helps us speed up the review and merge process. 📦 To test this PR locally:If you encounter any issues or have feedback, feel free to comment as well. |
…R_STREAM_DESTROYED - Add proper stream cleanup with waitForCloseOrFinishStream - Use Promise.allSettled for safe stream closing - Fix ERR_STREAM_DESTROYED errors in media processing - Improve stream lifecycle management 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
| if (!creds.me) { | ||
| node = generateRegistrationNode(creds, config) | ||
| logger.info({ node }, 'not logged in, attempting registration...') | ||
| //logger.info({ node }, 'not logged in, attempting registration...') |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are you commenting this
|
|
||
| closed = true | ||
| logger.info({ trace: error?.stack }, error ? 'connection errored' : 'connection closed') | ||
| //logger.info({ trace: error?.stack }, error ? 'connection errored' : 'connection closed') |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
?
| let didStartBuffer = false | ||
| process.nextTick(() => { | ||
| if (creds.me?.id) { | ||
| // start buffering important events | ||
| // if we're logged in | ||
| ev.buffer() | ||
| didStartBuffer = true | ||
| } | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why did you remove event buffering?
| if (didStartBuffer) { | ||
| ev.flush() | ||
| logger.trace('flushed events for initial buffer') | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
?
src/Utils/auth-utils.ts
Outdated
| export function makeCacheableSignalKeyStore( | ||
| store: SignalKeyStore, | ||
| logger?: ILogger, | ||
| prefix?: string, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do you need a prefix?
src/Utils/auth-utils.ts
Outdated
|
|
||
| function getUniqueId(type: string, id: string) { | ||
| return `${type}.${id}` | ||
| return prefix ? `${prefix}-${type}.${id}` : `${type}.${id}` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
?
| try { aes.destroy() } catch { } | ||
| try { hmac.destroy() } catch { } | ||
| try { sha256Plain.destroy() } catch { } | ||
| try { sha256Enc.destroy() } catch { } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That doesn't look very safe, we should know if any of these are erroring.
| //logger?.info( | ||
| // { | ||
| // histNotification, | ||
| // process, | ||
| // id: message.key.id, | ||
| // isLatest | ||
| // }, | ||
| // 'got history notification' | ||
| //) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
?
…ets#1969 to latest Baileys Applied PRs: - WhiskeySockets#2067: libsignal wasm - WhiskeySockets#2057: emit setting events - WhiskeySockets#1969: improve retry logic Note: PRs WhiskeySockets#1991, WhiskeySockets#1981, WhiskeySockets#1906, WhiskeySockets#1892 have conflicts with latest Baileys version and were skipped. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
# Conflicts: # src/Socket/messages-recv.ts
Prevent unhandled stream errors on media upload - ERR_STREAM_DESTROYED error
Summary
Under heavy load (100+ concurrent proxy’d connections) media uploads could crash with unhandled stream errors.
When the remote side closed the HTTP response mid-transfer, our readable ended while the file
Writablestill had in-flightfs.writeops. We calleddestroy()on the writable, and those pending writes later completed against a destroyed stream →ERR_STREAM_DESTROYEDsurfaced as an unhandled error.This PR makes streaming robust by:
drain) on every write,finish/closebefore teardown,Context / Symptoms
Happened during bursts of image sends while proxies were flapping.
Representative logs:
Followed by a low-level write error:
Full errores:
"type":"Error","message":"Cannot call write after a stream was destroyed","stack":"Error [ERR_STREAM_DESTROYED]: Cannot call write after a stream was destroyed\n at node:internal/fs/streams:426:23\n at FSReqCallback.wrapper [as oncomplete] (node:fs:824:5)\n at FSReqCallback.callbackTrampoline (node:internal/async_hooks:130:17)","code":"ERR_STREAM_DESTROYED"},"msg":"Cannot call write after a stream was destroyed"Root cause
undiciaborts the fetch when the remote side closes: we correctly hitcatch.fileWriteStream.destroy().fileWriteStream.write()calls are non-blocking: they schedulefs.writevia libuv. Those callbacks can still fire afterdestroy().errorlistener and without awaitingfinish/close, the completion of those writes throws on a destroyed stream → unhandled rejection → process crash.This is a classic race between readable failure and writable pending I/O.
What I changed (high level)
Backpressure-aware writes
async writeChunk(); if.write()returnsfalse, await'drain'before continuing.Reliable stream finalization
waitForCloseOrFinishStream(stream)and await it where we previously did.end()/.destroy().destroy()+ hope withPromise.allSettled([ wait(encFile), wait(origFile) ]).Graceful error path
.destroy()(guarded), then await finish/close on involved streams, then unlink temp files.try/catcharound every step).Hashing & return semantics
finish/close).Why this fixes it
fs.writecompletions now resolve against a stream that we explicitly let finish (or at least reachclose) instead of a just-destroyed instance.Load conditions where it used to fail
Risk & compatibility