Skip to content

fix putObject never resolves when the input stream gets destroyed o…#1480

Open
yucao2521 wants to merge 1 commit into
minio:masterfrom
yucao2521:fix/put-stream
Open

fix putObject never resolves when the input stream gets destroyed o…#1480
yucao2521 wants to merge 1 commit into
minio:masterfrom
yucao2521:fix/put-stream

Conversation

@yucao2521

@yucao2521 yucao2521 commented Jun 13, 2026

Copy link
Copy Markdown

fixes: #1479

Summary by CodeRabbit

  • Bug Fixes

    • Improved object upload error handling for destroyed/failed input streams, ensuring uploads surface the correct underlying error and clearer “premature close” messaging.
  • Tests

    • Updated and expanded functional and unit tests to validate promise-based rejection behavior and exact error messages for both generic premature close and custom stream errors.
  • Refactor

    • Improved multipart upload flow to handle chunk processing and part completion more reliably, including proper abort behavior on failures.

@coderabbitai

coderabbitai Bot commented Jun 13, 2026

Copy link
Copy Markdown

Review Change Stack

📝 Walkthrough

Walkthrough

This PR fixes a critical issue where putObject() with a destroyed input stream would hang indefinitely. The fix introduces stream error detection, replaces .pipe() with stream.pipeline() in multipart uploads for proper error propagation, and adds comprehensive test coverage for stream error scenarios.

Changes

Stream Error Handling for putObject

Layer / File(s) Summary
Stream error detection infrastructure
src/internal/helper.ts
isReadableStream now detects streams via _read function presence, and new getReadableStreamError exports an async helper that returns stream errors by awaiting streamPromise.finished on non-active streams.
putObject stream validation
src/internal/client.ts
Import and use getReadableStreamError to detect input stream errors in putObject, replacing generic type error with actual stream error when present.
Multipart upload coordination with pipeline
src/internal/client.ts
Refactor uploadStream to use streamPromise.pipeline(body, chunkier) for proper error forwarding, add async part-upload loop that iterates chunks and uploads missing parts with MD5 validation, accumulate part ETags, and abort multipart upload on failure.
Test coverage for stream errors
tests/unit/test.js, tests/functional/functional-tests.js
Unit tests updated to expect "Premature close" and add case for custom stream errors; functional tests refactored to async assertions covering immediate destruction and multipart upload destruction scenarios.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Poem

A rabbit hops through streaming trails,
Where promises once broke and failed,
Now pipeline flows with graceful care,
And errors caught beyond compare! 🐰

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title directly addresses the main bug being fixed: the issue where putObject never resolves when the input stream gets destroyed during upload.
Linked Issues check ✅ Passed The code changes comprehensively address issue #1479 by implementing stream.pipeline() for error handling, adding getReadableStreamError() helper, and adding comprehensive tests for destroyed stream scenarios.
Out of Scope Changes check ✅ Passed All changes are directly aligned with fixing the putObject stream destruction bug; no out-of-scope modifications detected.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests

Comment @coderabbitai help to get the list of available commands and usage tips.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@src/internal/client.ts`:
- Around line 1802-1849: The async uploader awaits the pipeline and the consumer
separately which can deadlock if makeRequestAsyncOmit rejects inside the
consumer (o); change the try block to await both the pipeline and the consumer
together (e.g., await Promise.all([streamPromise.pipeline(body, chunkier), o]))
so the pipeline and the async iterator (o) are observed as one unit and ensure
abortMultipartUpload is still called on any rejection from either side; keep
references to chunkier, o, makeRequestAsyncOmit, streamPromise.pipeline and
abortMultipartUpload when making the change.
- Around line 1808-1840: The bug is that partNumber is incremented before
uploading the current chunk, causing chunk N to be uploaded as part N+1; fix by
keeping the current chunk on the current partNumber: do not increment partNumber
until after you have either skipped a matching oldPart (the existing branch
where you compare oldPart.etag === md5.toString('hex') should increment and
continue) or after a successful upload and pushing to eTags; specifically,
remove the premature partNumber++ that appears immediately before the upload
RequestOption, and instead increment partNumber only after eTags.push({ part:
partNumber, etag }) (and keep the existing increment where you skip matched
oldPart).

In `@tests/unit/test.js`:
- Around line 670-674: The test creates a Readable stream `s` and calls
`s.destroy(new Error('stream error'))` without an 'error' listener which can
cause an unhandled exception; before calling `s.destroy(...)` add an empty error
handler like `s.on('error', () => {})` (i.e., update the test around the
`it('should fail when stream is destroyed with an error', ...)` block so the
`Stream.Readable` instance `s` has `s.on('error', () => {})` attached prior to
calling `client.putObject('bucket', 'object', s)` and `s.destroy(...)`).
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro Plus

Run ID: cbae5fd8-c249-4ba8-910e-9df28e9a463a

📥 Commits

Reviewing files that changed from the base of the PR and between f871afd and 9f01f3e.

📒 Files selected for processing (4)
  • src/internal/client.ts
  • src/internal/helper.ts
  • tests/functional/functional-tests.js
  • tests/unit/test.js

Comment thread src/internal/client.ts Outdated
Comment thread src/internal/client.ts
Comment on lines +1808 to +1840
const oldPart = oldParts[partNumber]
if (oldPart) {
if (oldPart.etag === md5.toString('hex')) {
eTags.push({ part: partNumber, etag: oldPart.etag })
partNumber++
continue
}
}

const response = await this.makeRequestAsyncOmit(options, chunk)
partNumber++

let etag = response.headers.etag
if (etag) {
etag = etag.replace(/^"/, '').replace(/"$/, '')
} else {
etag = ''
}
// now start to upload missing part
const options: RequestOption = {
method: 'PUT',
query: qs.stringify({ partNumber, uploadId }),
headers: {
'Content-Length': chunk.length,
'Content-MD5': md5.toString('base64'),
},
bucketName,
objectName,
}

const response = await this.makeRequestAsyncOmit(options, chunk)

eTags.push({ part: partNumber, etag })
let etag = response.headers.etag
if (etag) {
etag = etag.replace(/^"/, '').replace(/"$/, '')
} else {
etag = ''
}

return await this.completeMultipartUpload(bucketName, objectName, uploadId, eTags)
})(),
])
eTags.push({ part: partNumber, etag })

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical | ⚡ Quick win

Keep the current chunk on the current part number.

partNumber is incremented before uploading the current chunk. After the first mismatch, chunk N is sent as part N+1, which can overwrite a later existing part and produce duplicate/out-of-order entries in eTags during resume.

💡 Suggested fix
     const o = (async () => {
       let partNumber = 1

       for await (const chunk of chunkier) {
+        const currentPartNumber = partNumber
         const md5 = crypto.createHash('md5').update(chunk).digest()

-        const oldPart = oldParts[partNumber]
+        const oldPart = oldParts[currentPartNumber]
         if (oldPart) {
           if (oldPart.etag === md5.toString('hex')) {
-            eTags.push({ part: partNumber, etag: oldPart.etag })
+            eTags.push({ part: currentPartNumber, etag: oldPart.etag })
             partNumber++
             continue
           }
         }

-        partNumber--
-
         // now start to upload missing part
         const options: RequestOption = {
           method: 'PUT',
-          query: qs.stringify({ partNumber, uploadId }),
+          query: qs.stringify({ partNumber: currentPartNumber, uploadId }),
           headers: {
             'Content-Length': chunk.length,
             'Content-MD5': md5.toString('base64'),
           },
           bucketName,
           objectName,
         }

         const response = await this.makeRequestAsyncOmit(options, chunk)
@@
-        eTags.push({ part: partNumber, etag })
+        eTags.push({ part: currentPartNumber, etag })
+        partNumber++
       }
     })()
🧰 Tools
🪛 ast-grep (0.43.0)

[warning] 1809-1809: Do not use weak hash functions
Context: md5.toString('hex')
Note: [CWE-328].

(insecure-hash-typescript)


[warning] 1824-1824: Do not use weak hash functions
Context: 'Content-MD5'
Note: [CWE-328].

(insecure-hash-typescript)


[warning] 1824-1824: Do not use weak hash functions
Context: md5.toString('base64')
Note: [CWE-328].

(insecure-hash-typescript)


[warning] 1809-1809: Avoid weak hash algorithm from CryptoJS
Context: md5.toString('hex')
Note: Security best practice.

(crypto-avoid-weak-hash-typescript)


[warning] 1824-1824: Avoid weak hash algorithm from CryptoJS
Context: 'Content-MD5'
Note: Security best practice.

(crypto-avoid-weak-hash-typescript)


[warning] 1824-1824: Avoid weak hash algorithm from CryptoJS
Context: md5.toString('base64')
Note: Security best practice.

(crypto-avoid-weak-hash-typescript)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/internal/client.ts` around lines 1808 - 1840, The bug is that partNumber
is incremented before uploading the current chunk, causing chunk N to be
uploaded as part N+1; fix by keeping the current chunk on the current
partNumber: do not increment partNumber until after you have either skipped a
matching oldPart (the existing branch where you compare oldPart.etag ===
md5.toString('hex') should increment and continue) or after a successful upload
and pushing to eTags; specifically, remove the premature partNumber++ that
appears immediately before the upload RequestOption, and instead increment
partNumber only after eTags.push({ part: partNumber, etag }) (and keep the
existing increment where you skip matched oldPart).

Comment thread tests/unit/test.js

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@src/internal/client.ts`:
- Around line 1688-1692: The code currently only checks for destroy errors via
getReadableStreamError(stream) but accepts streams that have already been
consumed without error. Since getReadableStreamError() returns undefined for
both "still open" and "finished cleanly" states, you need to add an additional
check after the error probe to reject exhausted streams. After checking if an
error exists from getReadableStreamError(), also verify that the stream is not
already finished or exhausted. If the stream is in a finished/exhausted state
(meaning it was already consumed and returned no error), throw an error to
prevent uploading empty data when the caller provided a positive size. This
ensures that only viable streams proceed past this validation point.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro Plus

Run ID: e4f2da02-ff3a-4313-a8fe-d8fdc0d8fb5a

📥 Commits

Reviewing files that changed from the base of the PR and between 9f01f3e and 55172c4.

📒 Files selected for processing (4)
  • src/internal/client.ts
  • src/internal/helper.ts
  • tests/functional/functional-tests.js
  • tests/unit/test.js
🚧 Files skipped from review as they are similar to previous changes (2)
  • tests/unit/test.js
  • tests/functional/functional-tests.js

Comment thread src/internal/client.ts
Comment on lines +1688 to +1692
} else {
const error = await getReadableStreamError(stream)
if (error) {
throw error
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Reject exhausted streams after probing for a destroy error.

getReadableStreamError() returns undefined for both “still open” and “finished cleanly”. In this branch that means an already-consumed stream is accepted and can be uploaded as empty data on the single-part path, even when the caller provided a positive size.

💡 Suggested fix
     } else {
       const error = await getReadableStreamError(stream)
       if (error) {
         throw error
       }
+      if (!stream.readable) {
+        throw new TypeError('third argument should be of type "stream.Readable" or "Buffer" or "string"')
+      }
     }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/internal/client.ts` around lines 1688 - 1692, The code currently only
checks for destroy errors via getReadableStreamError(stream) but accepts streams
that have already been consumed without error. Since getReadableStreamError()
returns undefined for both "still open" and "finished cleanly" states, you need
to add an additional check after the error probe to reject exhausted streams.
After checking if an error exists from getReadableStreamError(), also verify
that the stream is not already finished or exhausted. If the stream is in a
finished/exhausted state (meaning it was already consumed and returned no
error), throw an error to prevent uploading empty data when the caller provided
a positive size. This ensures that only viable streams proceed past this
validation point.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

putObject() never resolves or rejects if the input stream gets destroyed on upload

1 participant