diff --git a/.changeset/edge-kv-asset-upload.md b/.changeset/edge-kv-asset-upload.md new file mode 100644 index 0000000000..c4080c7091 --- /dev/null +++ b/.changeset/edge-kv-asset-upload.md @@ -0,0 +1,7 @@ +--- +"wrangler": patch +--- + +Improve asset upload performance with single-file uploads + +Asset uploads now use a more efficient per-file upload path when the platform enables it. This is rolled out server-side and requires no configuration changes. Existing upload behavior is unchanged when the new path is not enabled. diff --git a/packages/deploy-helpers/src/deploy/helpers/assets.ts b/packages/deploy-helpers/src/deploy/helpers/assets.ts index 8224ce2b69..baf8e56ab7 100644 --- a/packages/deploy-helpers/src/deploy/helpers/assets.ts +++ b/packages/deploy-helpers/src/deploy/helpers/assets.ts @@ -1,4 +1,5 @@ import assert from "node:assert"; +import { createReadStream } from "node:fs"; import { readdir, readFile, stat } from "node:fs/promises"; import * as path from "node:path"; import { parseStaticRouting } from "@cloudflare/workers-shared/utils/configuration/parseStaticRouting"; @@ -27,7 +28,7 @@ import prettyBytes from "pretty-bytes"; import { FormData } from "undici"; import { fetchResult, logger } from "../../shared/context"; import { hashFile } from "./hash"; -import { isJwtExpired } from "./jwt"; +import { decodeJwtPayload, isJwtExpired } from "./jwt"; import type { SharedDeployVersionsProps } from "../../shared/types"; import type { AssetConfig, RouterConfig } from "@cloudflare/workers-shared"; import type { @@ -50,6 +51,7 @@ type UploadResponse = { // constants same as Pages for now const BULK_UPLOAD_CONCURRENCY = 3; +const EDGE_KV_UPLOAD_CONCURRENCY = 25; const MAX_UPLOAD_ATTEMPTS = 5; const MAX_UPLOAD_GATEWAY_ERRORS = 5; @@ -140,53 +142,83 @@ export const syncAssets = async ( }); }); - const queue = new PQueue({ concurrency: BULK_UPLOAD_CONCURRENCY }); + const useEdgeKvUpload = isEdgeKvUpload(initializeAssetsResponse.jwt); + const uploadBuckets = useEdgeKvUpload + ? assetBuckets.flat().map((entry) => [entry]) + : assetBuckets; + + const queue = new PQueue({ + concurrency: useEdgeKvUpload + ? getEdgeKvUploadConcurrency(initializeAssetsResponse.jwt) + : BULK_UPLOAD_CONCURRENCY, + }); const queuePromises: Array> = []; - let attempts = 0; const start = Date.now(); let completionJwt = ""; let uploadedAssetsCount = 0; - for (const [bucketIndex, bucket] of assetBuckets.entries()) { - attempts = 0; + for (const [bucketIndex, bucket] of uploadBuckets.entries()) { + let attempts = 0; let gatewayErrors = 0; const doUpload = async (): Promise => { - // Populate the payload only when actually uploading (this is limited to 3 concurrent uploads at 50 MiB per bucket meaning we'd only load in a max of ~150 MiB) - // This is so we don't run out of memory trying to upload the files. - const payload = new FormData(); const uploadedFiles: string[] = []; for (const manifestEntry of bucket) { - const absFilePath = path.join(assetDirectory, manifestEntry[0]); uploadedFiles.push(manifestEntry[0]); - payload.append( - manifestEntry[1].hash, - new File( - [(await readFile(absFilePath)).toString("base64")], - manifestEntry[1].hash, - { - // Most formdata body encoders (incl. undici's) will override with "application/octet-stream" if you use a falsy value here - // Additionally, it appears that undici doesn't support non-standard main types (e.g. "null") - // So, to make it easier for any other clients, we'll just parse "application/null" on the API - // to mean actually null (signal to not send a Content-Type header with the response) - type: getContentType(absFilePath) ?? "application/null", - } - ), - manifestEntry[1].hash - ); } try { - const res = await fetchResult( - complianceConfig, - `/accounts/${accountId}/workers/assets/upload?base64=true`, - { - method: "POST", - headers: { - Authorization: `Bearer ${initializeAssetsResponse.jwt}`, - }, - body: payload, + let res: UploadResponse; + if (useEdgeKvUpload) { + const manifestEntry = bucket[0]; + const absFilePath = path.join(assetDirectory, manifestEntry[0]); + const contentType = getContentType(absFilePath); + res = await fetchResult( + complianceConfig, + `/accounts/${accountId}/workers/assets/upload/${manifestEntry[1].hash}`, + { + method: "POST", + headers: { + Authorization: `Bearer ${initializeAssetsResponse.jwt}`, + "Content-Type": contentType ?? "application/null", + }, + body: createReadStream(absFilePath), + duplex: "half", + } + ); + } else { + // Populate the payload only when actually uploading (this is limited to 3 concurrent uploads at 50 MiB per bucket meaning we'd only load in a max of ~150 MiB) + // This is so we don't run out of memory trying to upload the files. + const payload = new FormData(); + for (const manifestEntry of bucket) { + const absFilePath = path.join(assetDirectory, manifestEntry[0]); + payload.append( + manifestEntry[1].hash, + new File( + [(await readFile(absFilePath)).toString("base64")], + manifestEntry[1].hash, + { + // Most formdata body encoders (incl. undici's) will override with "application/octet-stream" if you use a falsy value here + // Additionally, it appears that undici doesn't support non-standard main types (e.g. "null") + // So, to make it easier for any other clients, we'll just parse "application/null" on the API + // to mean actually null (signal to not send a Content-Type header with the response) + type: getContentType(absFilePath) ?? "application/null", + } + ), + manifestEntry[1].hash + ); } - ); + res = await fetchResult( + complianceConfig, + `/accounts/${accountId}/workers/assets/upload?base64=true`, + { + method: "POST", + headers: { + Authorization: `Bearer ${initializeAssetsResponse.jwt}`, + }, + body: payload, + } + ); + } uploadedAssetsCount += bucket.length; logAssetsUploadStatus( numberFilesToUpload, @@ -225,7 +257,7 @@ export const syncAssets = async ( throw new FatalError( `Upload took too long.\n` + `Asset upload took too long on bucket ${bucketIndex + 1}/${ - initializeAssetsResponse.buckets.length + uploadBuckets.length }. Please try again.\n` + `Assets already uploaded have been saved, so the next attempt will automatically resume from this point.`, { telemetryMessage: "Asset upload took too long" } @@ -274,6 +306,23 @@ export const syncAssets = async ( return completionJwt; }; +function isEdgeKvUpload(jwt: string): boolean { + try { + return decodeJwtPayload(jwt).edge_kv === true; + } catch { + return false; + } +} + +export function getEdgeKvUploadConcurrency(jwt: string): number { + try { + const value = Number(decodeJwtPayload(jwt).edge_kv_upload_concurrency); + return value > 0 ? Math.floor(value) : EDGE_KV_UPLOAD_CONCURRENCY; + } catch { + return EDGE_KV_UPLOAD_CONCURRENCY; + } +} + export const buildAssetManifest = async (dir: string) => { const files = await readdir(dir, { recursive: true }); logReadFilesFromDirectory(dir, files); diff --git a/packages/deploy-helpers/src/deploy/helpers/jwt.ts b/packages/deploy-helpers/src/deploy/helpers/jwt.ts index 93a63391d7..d59c9dac68 100644 --- a/packages/deploy-helpers/src/deploy/helpers/jwt.ts +++ b/packages/deploy-helpers/src/deploy/helpers/jwt.ts @@ -1,3 +1,7 @@ +export const decodeJwtPayload = (token: string) => { + return JSON.parse(Buffer.from(token.split(".")[1], "base64").toString()); +}; + export const isJwtExpired = (token: string): boolean | undefined => { // During testing we don't use valid JWTs, so don't try and parse them. if ( @@ -9,9 +13,7 @@ export const isJwtExpired = (token: string): boolean | undefined => { return false; } try { - const decodedJwt = JSON.parse( - Buffer.from(token.split(".")[1], "base64").toString() - ); + const decodedJwt = decodeJwtPayload(token); const dateNow = new Date().getTime() / 1000; diff --git a/packages/wrangler/src/__tests__/deploy/assets.test.ts b/packages/wrangler/src/__tests__/deploy/assets.test.ts index 529988da52..229b7ac58d 100644 --- a/packages/wrangler/src/__tests__/deploy/assets.test.ts +++ b/packages/wrangler/src/__tests__/deploy/assets.test.ts @@ -1,4 +1,5 @@ import * as fs from "node:fs"; +import { getEdgeKvUploadConcurrency } from "@cloudflare/deploy-helpers"; import { runInTempDir, writeWranglerConfig, @@ -55,6 +56,10 @@ vi.mock("../../autoconfig/run"); vi.mock("../../autoconfig/frameworks/utils/packages"); vi.mock("@cloudflare/cli-shared-helpers/command"); +function createJwt(payload: Record) { + return `header.${Buffer.from(JSON.stringify(payload)).toString("base64")}.signature`; +} + describe("deploy", () => { mockAccountId(); mockApiToken(); @@ -1051,6 +1056,144 @@ describe("deploy", () => { }); }); + for (const fallbackCase of [ + { name: "edge_kv is missing", jwt: createJwt({}) }, + { name: "edge_kv is false", jwt: createJwt({ edge_kv: false }) }, + { name: "the upload token cannot be decoded", jwt: "<>" }, + ]) { + it(`should use legacy base64 bucket upload when ${fallbackCase.name}`, async ({ + expect, + }) => { + const assets = [ + { filePath: "file-1.txt", content: "Content of file-1" }, + ]; + writeAssets(assets); + writeWranglerConfig({ assets: { directory: "assets" } }); + + const mockBuckets = [["0de3dd5df907418e9730fd2bd747bd5e"]]; + await mockAUSRequest([], mockBuckets, fallbackCase.jwt); + const uploadBodies: FormData[] = []; + const uploadContentTypeHeaders: (string | null)[] = []; + const uploadAuthHeaders: (string | null)[] = []; + const uploadUrls: string[] = []; + await mockAssetUploadRequest( + mockBuckets.length, + uploadBodies, + uploadContentTypeHeaders, + uploadAuthHeaders, + uploadUrls + ); + mockSubDomainRequest(); + mockUploadWorkerRequest({ + expectedAssets: { jwt: "<>", config: {} }, + expectedType: "none", + }); + + await runWrangler("deploy"); + + expect(uploadUrls).toHaveLength(1); + expect(new URL(uploadUrls[0]).pathname).toContain( + "/workers/assets/upload" + ); + expect(new URL(uploadUrls[0]).search).toBe("?base64=true"); + }); + } + + it("should upload each asset individually with raw bytes when edge_kv is true", async ({ + expect, + }) => { + const assets = [ + { filePath: "file-1.txt", content: "Content of file-1" }, + { filePath: "foobar.greg", content: "something-binary" }, + ]; + writeAssets(assets); + writeWranglerConfig({ assets: { directory: "assets" } }); + + const file1Hash = "0de3dd5df907418e9730fd2bd747bd5e"; + const unknownHash = "80e40c1f2422528cb2fba3f9389ce315"; + const mockBuckets = [[file1Hash, unknownHash]]; + const edgeKvJwt = createJwt({ edge_kv: true }); + await mockAUSRequest([], mockBuckets, edgeKvJwt); + + const uploadedRequests: { + url: string; + contentType: string | null; + authorization: string | null; + body: ArrayBuffer; + }[] = []; + msw.use( + http.post( + "*/accounts/some-account-id/workers/assets/upload/:hash", + async ({ request }) => { + uploadedRequests.push({ + url: request.url, + contentType: request.headers.get("Content-Type"), + authorization: request.headers.get("Authorization"), + body: await request.arrayBuffer(), + }); + return HttpResponse.json( + { + success: true, + errors: [], + messages: [], + result: + uploadedRequests.length === 2 + ? { jwt: "<>" } + : {}, + }, + { status: uploadedRequests.length === 2 ? 201 : 202 } + ); + } + ) + ); + mockSubDomainRequest(); + mockUploadWorkerRequest({ + expectedAssets: { jwt: "<>", config: {} }, + expectedType: "none", + }); + + await runWrangler("deploy"); + + expect(uploadedRequests).toHaveLength(2); + + // Verify per-hash URLs with no query string + const paths = uploadedRequests.map((r) => new URL(r.url).pathname).sort(); + expect(paths).toEqual([ + expect.stringContaining(`/workers/assets/upload/${file1Hash}`), + expect.stringContaining(`/workers/assets/upload/${unknownHash}`), + ]); + for (const req of uploadedRequests) { + expect(new URL(req.url).search).toBe(""); + } + + // Verify authorization + for (const req of uploadedRequests) { + expect(req.authorization).toBe(`Bearer ${edgeKvJwt}`); + } + + // Verify raw bytes (not base64) for file-1.txt + const file1Req = uploadedRequests.find((r) => r.url.includes(file1Hash)); + if (!file1Req) { + throw new Error("missing upload request for file1Hash"); + } + expect(Buffer.from(file1Req.body).toString("utf-8")).toBe( + "Content of file-1" + ); + expect(file1Req.contentType).toMatch(/text\/plain/); + + // Verify unknown content type gets application/null sentinel + const unknownReq = uploadedRequests.find((r) => + r.url.includes(unknownHash) + ); + if (!unknownReq) { + throw new Error("missing upload request for unknownHash"); + } + expect(Buffer.from(unknownReq.body).toString("utf-8")).toBe( + "something-binary" + ); + expect(unknownReq.contentType).toBe("application/null"); + }); + it("should be able to upload a user worker with ASSETS binding and config", async () => { const assets = [ { filePath: "file-1.txt", content: "Content of file-1" }, @@ -1372,26 +1515,37 @@ describe("deploy", () => { ); }); - it("should retry asset uploads on failure and log a retry message including the attempt count", async ({ - expect, - }) => { - vi.stubEnv("WRANGLER_LOG", "debug"); - - const assets = [{ filePath: "file-1.txt", content: "Content of file-1" }]; - writeAssets(assets); - writeWranglerConfig({ - assets: { directory: "assets" }, - }); + for (const retryCase of [ + { + name: "legacy upload", + jwt: "<>", + urlPattern: "*/accounts/some-account-id/workers/assets/upload", + }, + { + name: "edge KV upload", + jwt: createJwt({ edge_kv: true }), + urlPattern: "*/accounts/some-account-id/workers/assets/upload/:hash", + }, + ]) { + it(`should retry asset uploads on failure for ${retryCase.name}`, async ({ + expect, + }) => { + vi.stubEnv("WRANGLER_LOG", "debug"); + + const assets = [ + { filePath: "file-1.txt", content: "Content of file-1" }, + ]; + writeAssets(assets); + writeWranglerConfig({ + assets: { directory: "assets" }, + }); - const mockBuckets = [["0de3dd5df907418e9730fd2bd747bd5e"]]; - await mockAUSRequest([], mockBuckets, "<>"); + const mockBuckets = [["0de3dd5df907418e9730fd2bd747bd5e"]]; + await mockAUSRequest([], mockBuckets, retryCase.jwt); - // Fail the first upload attempt, succeed on the second. - const uploadAttempts: Request[] = []; - msw.use( - http.post( - "*/accounts/some-account-id/workers/assets/upload", - async ({ request }) => { + const uploadAttempts: Request[] = []; + msw.use( + http.post(retryCase.urlPattern, async ({ request }) => { uploadAttempts.push(request); if (uploadAttempts.length === 1) { return HttpResponse.json( @@ -1413,34 +1567,43 @@ describe("deploy", () => { }, { status: 201 } ); - } - ) - ); - - mockSubDomainRequest(); - mockUploadWorkerRequest({ - expectedAssets: { - jwt: "<>", - config: {}, - }, - expectedType: "none", - }); + }) + ); + + mockSubDomainRequest(); + mockUploadWorkerRequest({ + expectedAssets: { + jwt: "<>", + config: {}, + }, + expectedType: "none", + }); - await runWrangler("deploy"); + await runWrangler("deploy"); - // The upload endpoint was hit twice: once failing, once succeeding. - expect(uploadAttempts.length).toBe(2); + expect(uploadAttempts.length).toBe(2); + expect(std.info).toContain( + "Asset upload failed. Retrying... 1 of 5 attempts." + ); + expect(std.info).not.toContain("upload-boom-from-test"); + expect(std.debug).toContain("upload-boom-from-test"); + }); + } + }); +}); - // The new info message includes the attempt count. - expect(std.info).toContain( - "Asset upload failed. Retrying... 1 of 5 attempts." - ); +describe("getEdgeKvUploadConcurrency", () => { + it("returns value from JWT claim", ({ expect }) => { + const jwt = createJwt({ edge_kv_upload_concurrency: 42 }); + expect(getEdgeKvUploadConcurrency(jwt)).toBe(42); + }); - // The error details no longer leak into the user-facing info log. - expect(std.info).not.toContain("upload-boom-from-test"); + it("returns default when claim is missing", ({ expect }) => { + const jwt = createJwt({}); + expect(getEdgeKvUploadConcurrency(jwt)).toBe(25); + }); - // The error is now logged at debug level instead. - expect(std.debug).toContain("upload-boom-from-test"); - }); + it("returns default when JWT is undecodable", ({ expect }) => { + expect(getEdgeKvUploadConcurrency("garbage")).toBe(25); }); }); diff --git a/packages/wrangler/src/__tests__/deploy/helpers.ts b/packages/wrangler/src/__tests__/deploy/helpers.ts index 0595854feb..e58b95eee4 100644 --- a/packages/wrangler/src/__tests__/deploy/helpers.ts +++ b/packages/wrangler/src/__tests__/deploy/helpers.ts @@ -717,12 +717,14 @@ export const mockAssetUploadRequest = async ( numberOfBuckets: number, bodies: FormData[], uploadContentTypeHeaders: (string | null)[], - uploadAuthHeaders: (string | null)[] + uploadAuthHeaders: (string | null)[], + uploadUrls?: string[] ) => { msw.use( http.post( "*/accounts/some-account-id/workers/assets/upload", async ({ request }) => { + uploadUrls?.push(request.url); uploadContentTypeHeaders.push(request.headers.get("Content-Type")); uploadAuthHeaders.push(request.headers.get("Authorization")); const formData = await request.formData(); diff --git a/packages/wrangler/src/pages/upload.ts b/packages/wrangler/src/pages/upload.ts index 553c8b99f5..ff6bc006cf 100644 --- a/packages/wrangler/src/pages/upload.ts +++ b/packages/wrangler/src/pages/upload.ts @@ -1,7 +1,7 @@ import { mkdir, readFile, writeFile } from "node:fs/promises"; import { dirname } from "node:path"; import { spinner } from "@cloudflare/cli-shared-helpers/interactive"; -import { isJwtExpired } from "@cloudflare/deploy-helpers"; +import { decodeJwtPayload, isJwtExpired } from "@cloudflare/deploy-helpers"; import { APIError, COMPLIANCE_REGION_CONFIG_PUBLIC, @@ -28,7 +28,7 @@ import { validate } from "./validate"; import type { UploadPayloadFile } from "./types"; import type { FileContainer } from "./validate"; -export { isJwtExpired } from "@cloudflare/deploy-helpers"; +export { decodeJwtPayload, isJwtExpired } from "@cloudflare/deploy-helpers"; export const pagesProjectUploadCommand = createCommand({ metadata: { @@ -408,9 +408,7 @@ export const maxFileCountAllowedFromClaims = (token: string): number => { // Not validating the JWT here, which ordinarily would be a big red flag. // However, if the JWT is invalid, no uploads (calls to /pages/assets/upload) // will succeed. - const decodedJwt = JSON.parse( - Buffer.from(token.split(".")[1], "base64").toString() - ); + const decodedJwt = decodeJwtPayload(token); const maxFileCountAllowed = decodedJwt["max_file_count_allowed"]; if (typeof maxFileCountAllowed == "number") {