Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/edge-kv-asset-upload.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"wrangler": patch
Comment thread
jbwcloudflare marked this conversation as resolved.
---

Improve asset upload performance with single-file uploads

Asset uploads now use a more efficient per-file upload path when the platform enables it. This is rolled out server-side and requires no configuration changes. Existing upload behavior is unchanged when the new path is not enabled.
119 changes: 84 additions & 35 deletions packages/deploy-helpers/src/deploy/helpers/assets.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import assert from "node:assert";
import { createReadStream } from "node:fs";
import { readdir, readFile, stat } from "node:fs/promises";
import * as path from "node:path";
import { parseStaticRouting } from "@cloudflare/workers-shared/utils/configuration/parseStaticRouting";
Expand Down Expand Up @@ -27,7 +28,7 @@ import prettyBytes from "pretty-bytes";
import { FormData } from "undici";
import { fetchResult, logger } from "../../shared/context";
import { hashFile } from "./hash";
import { isJwtExpired } from "./jwt";
import { decodeJwtPayload, isJwtExpired } from "./jwt";
import type { SharedDeployVersionsProps } from "../../shared/types";
import type { AssetConfig, RouterConfig } from "@cloudflare/workers-shared";
import type {
Expand All @@ -50,6 +51,7 @@ type UploadResponse = {

// constants same as Pages for now
const BULK_UPLOAD_CONCURRENCY = 3;
const EDGE_KV_UPLOAD_CONCURRENCY = 25;
const MAX_UPLOAD_ATTEMPTS = 5;
const MAX_UPLOAD_GATEWAY_ERRORS = 5;

Expand Down Expand Up @@ -140,53 +142,83 @@ export const syncAssets = async (
});
});

const queue = new PQueue({ concurrency: BULK_UPLOAD_CONCURRENCY });
const useEdgeKvUpload = isEdgeKvUpload(initializeAssetsResponse.jwt);
const uploadBuckets = useEdgeKvUpload
? assetBuckets.flat().map((entry) => [entry])
: assetBuckets;

const queue = new PQueue({
concurrency: useEdgeKvUpload
? getEdgeKvUploadConcurrency(initializeAssetsResponse.jwt)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need concurrency limits for single-file uploads? i guess probably if there's any level of buffering, and definitely better start there and increase it.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure i'm following: edgeKvUpload is the single-file mode and this will currently set the concurrency limit to 25 by default (it's actually a setting in the JWT so we can increase it from EWC later if we want)

is that what you meant? or some kind of server side concurrency limit maybe?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i mainly meant that we might not need control concurrency for single-file uploads - we do this for batches uploads because we buffer so much and can easily crush the memory used for the upload, but with a true passthrough single-file upload, maybe we could just never need to limit concurrency? but i know we're not there right now so it's a moot point

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah I see what you mean.

well, the limit now configurable and controllable from EWC.

so, we can iteratively bump it higher and monitor for issues. if we want to remove it entirely later, maybe we can!

: BULK_UPLOAD_CONCURRENCY,
});
const queuePromises: Array<Promise<void>> = [];
let attempts = 0;
const start = Date.now();
let completionJwt = "";
let uploadedAssetsCount = 0;

for (const [bucketIndex, bucket] of assetBuckets.entries()) {
attempts = 0;
for (const [bucketIndex, bucket] of uploadBuckets.entries()) {
let attempts = 0;
let gatewayErrors = 0;
const doUpload = async (): Promise<UploadResponse> => {
// Populate the payload only when actually uploading (this is limited to 3 concurrent uploads at 50 MiB per bucket meaning we'd only load in a max of ~150 MiB)
// This is so we don't run out of memory trying to upload the files.
const payload = new FormData();
const uploadedFiles: string[] = [];
for (const manifestEntry of bucket) {
const absFilePath = path.join(assetDirectory, manifestEntry[0]);
uploadedFiles.push(manifestEntry[0]);
payload.append(
manifestEntry[1].hash,
new File(
[(await readFile(absFilePath)).toString("base64")],
manifestEntry[1].hash,
{
// Most formdata body encoders (incl. undici's) will override with "application/octet-stream" if you use a falsy value here
// Additionally, it appears that undici doesn't support non-standard main types (e.g. "null")
// So, to make it easier for any other clients, we'll just parse "application/null" on the API
// to mean actually null (signal to not send a Content-Type header with the response)
type: getContentType(absFilePath) ?? "application/null",
}
),
manifestEntry[1].hash
);
}

try {
const res = await fetchResult<UploadResponse>(
complianceConfig,
`/accounts/${accountId}/workers/assets/upload?base64=true`,
{
method: "POST",
headers: {
Authorization: `Bearer ${initializeAssetsResponse.jwt}`,
},
body: payload,
let res: UploadResponse;
if (useEdgeKvUpload) {

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is the new codepath: if the gate is on, use the new single-file upload endpoint

const manifestEntry = bucket[0];
const absFilePath = path.join(assetDirectory, manifestEntry[0]);
const contentType = getContentType(absFilePath);
res = await fetchResult<UploadResponse>(
complianceConfig,
`/accounts/${accountId}/workers/assets/upload/${manifestEntry[1].hash}`,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

didn't realize we were using the same endpoint but with different semantics, i kinda figured we'd have a new endpoint? not an issue though

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the old bulk one is:
/accounts/${accountId}/workers/assets/upload,

and the new single file-one is:
/accounts/${accountId}/workers/assets/upload/:hash

so they are different, unless i'm misunderstanding the question?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oooo i just wasn't reading closely enough haha. my bad

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

haha no worries!

{
method: "POST",
headers: {
Authorization: `Bearer ${initializeAssetsResponse.jwt}`,
"Content-Type": contentType ?? "application/null",
},
body: createReadStream(absFilePath),
duplex: "half",
}
);
} else {
// Populate the payload only when actually uploading (this is limited to 3 concurrent uploads at 50 MiB per bucket meaning we'd only load in a max of ~150 MiB)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is the old codepath: just moved down into the else

// This is so we don't run out of memory trying to upload the files.
const payload = new FormData();
for (const manifestEntry of bucket) {
const absFilePath = path.join(assetDirectory, manifestEntry[0]);
payload.append(
manifestEntry[1].hash,
new File(
[(await readFile(absFilePath)).toString("base64")],
manifestEntry[1].hash,
{
// Most formdata body encoders (incl. undici's) will override with "application/octet-stream" if you use a falsy value here
// Additionally, it appears that undici doesn't support non-standard main types (e.g. "null")
// So, to make it easier for any other clients, we'll just parse "application/null" on the API
// to mean actually null (signal to not send a Content-Type header with the response)
type: getContentType(absFilePath) ?? "application/null",
}
),
manifestEntry[1].hash
);
}
);
res = await fetchResult<UploadResponse>(
complianceConfig,
`/accounts/${accountId}/workers/assets/upload?base64=true`,
{
method: "POST",
headers: {
Authorization: `Bearer ${initializeAssetsResponse.jwt}`,
},
body: payload,
}
);
}
uploadedAssetsCount += bucket.length;
logAssetsUploadStatus(
numberFilesToUpload,
Expand Down Expand Up @@ -225,7 +257,7 @@ export const syncAssets = async (
throw new FatalError(
`Upload took too long.\n` +
`Asset upload took too long on bucket ${bucketIndex + 1}/${
initializeAssetsResponse.buckets.length
uploadBuckets.length
}. Please try again.\n` +
`Assets already uploaded have been saved, so the next attempt will automatically resume from this point.`,
{ telemetryMessage: "Asset upload took too long" }
Expand Down Expand Up @@ -274,6 +306,23 @@ export const syncAssets = async (
return completionJwt;
};

function isEdgeKvUpload(jwt: string): boolean {
try {
return decodeJwtPayload(jwt).edge_kv === true;
} catch {
return false;
}
}

export function getEdgeKvUploadConcurrency(jwt: string): number {
try {
const value = Number(decodeJwtPayload(jwt).edge_kv_upload_concurrency);
return value > 0 ? Math.floor(value) : EDGE_KV_UPLOAD_CONCURRENCY;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 getEdgeKvUploadConcurrency can return 0 for fractional JWT values between 0 and 1, breaking PQueue

When edge_kv_upload_concurrency in the JWT is a fractional value in the range (0, 1) — e.g. 0.5 — the check value > 0 passes but Math.floor(value) produces 0. PQueue requires concurrency ≥ 1; setting it to 0 will cause the queue to either throw or hang indefinitely, preventing any uploads from completing. While the value is server-controlled (making this unlikely in practice), a simple guard like Math.max(1, Math.floor(value)) or checking value >= 1 would make this robust.

Suggested change
return value > 0 ? Math.floor(value) : EDGE_KV_UPLOAD_CONCURRENCY;
return value >= 1 ? Math.floor(value) : EDGE_KV_UPLOAD_CONCURRENCY;
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

@jbwcloudflare jbwcloudflare Jun 17, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's fine lol we will not set this to a fractional value between 0 and 1 on the server side

} catch {
return EDGE_KV_UPLOAD_CONCURRENCY;
}
}

export const buildAssetManifest = async (dir: string) => {
const files = await readdir(dir, { recursive: true });
logReadFilesFromDirectory(dir, files);
Expand Down
8 changes: 5 additions & 3 deletions packages/deploy-helpers/src/deploy/helpers/jwt.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
export const decodeJwtPayload = (token: string) => {
return JSON.parse(Buffer.from(token.split(".")[1], "base64").toString());
};

export const isJwtExpired = (token: string): boolean | undefined => {
// During testing we don't use valid JWTs, so don't try and parse them.
if (
Expand All @@ -9,9 +13,7 @@ export const isJwtExpired = (token: string): boolean | undefined => {
return false;
}
try {
const decodedJwt = JSON.parse(
Buffer.from(token.split(".")[1], "base64").toString()
);
const decodedJwt = decodeJwtPayload(token);

const dateNow = new Date().getTime() / 1000;

Expand Down
Loading
Loading