diff --git a/.changeset/disable-sentry-by-default.md b/.changeset/disable-sentry-by-default.md new file mode 100644 index 0000000000..a1946ffcff --- /dev/null +++ b/.changeset/disable-sentry-by-default.md @@ -0,0 +1,7 @@ +--- +"wrangler": patch +--- + +Disable Sentry error reporting by default + +`WRANGLER_SEND_ERROR_REPORTS` now defaults to `false` instead of prompting on every error. The current prompt produces too many false-positive reports. Users can still opt in explicitly by setting `WRANGLER_SEND_ERROR_REPORTS=true`. diff --git a/.changeset/pipeline-stream-rename.md b/.changeset/pipeline-stream-rename.md new file mode 100644 index 0000000000..8f564ed6e5 --- /dev/null +++ b/.changeset/pipeline-stream-rename.md @@ -0,0 +1,37 @@ +--- +"wrangler": minor +"@cloudflare/workers-utils": minor +"miniflare": minor +--- + +Rename `pipeline` field to `stream` in pipeline bindings configuration + +The `pipeline` field inside `pipelines` bindings has been renamed to `stream` to align with the updated API wire format. The old `pipeline` field is still accepted but deprecated and will emit a warning. + +Before: + +```jsonc +// wrangler.json +{ + "pipelines": [ + { + "binding": "MY_PIPELINE", + "pipeline": "my-stream-name", + }, + ], +} +``` + +After: + +```jsonc +// wrangler.json +{ + "pipelines": [ + { + "binding": "MY_PIPELINE", + "stream": "my-stream-name", + }, + ], +} +``` diff --git a/fixtures/vitest-pool-workers-examples/pipelines/wrangler.jsonc b/fixtures/vitest-pool-workers-examples/pipelines/wrangler.jsonc index 56dd6652c4..8eccd5a0a6 100644 --- a/fixtures/vitest-pool-workers-examples/pipelines/wrangler.jsonc +++ b/fixtures/vitest-pool-workers-examples/pipelines/wrangler.jsonc @@ -5,7 +5,7 @@ "pipelines": [ { "binding": "PIPELINE", - "pipeline": "my-pipeline", + "stream": "my-pipeline", }, ], } diff --git a/packages/miniflare/src/plugins/pipelines/index.ts b/packages/miniflare/src/plugins/pipelines/index.ts index 088b4ced5c..941b0c8ec0 100644 --- a/packages/miniflare/src/plugins/pipelines/index.ts +++ b/packages/miniflare/src/plugins/pipelines/index.ts @@ -15,6 +15,13 @@ export const PipelineOptionsSchema = z.object({ z.union([ z.string(), z.object({ + stream: z.string(), + remoteProxyConnectionString: z + .custom() + .optional(), + }), + z.object({ + /** @deprecated Use `stream` instead. */ pipeline: z.string(), remoteProxyConnectionString: z .custom() @@ -79,7 +86,9 @@ function bindingEntries( string, | string | { - pipeline: string; + stream?: string; + /** @deprecated Use `stream` instead. */ + pipeline?: string; remoteProxyConnectionString?: RemoteProxyConnectionString; } > @@ -97,7 +106,8 @@ function bindingEntries( ( | string | { - pipeline: string; + stream?: string; + pipeline?: string; remoteProxyConnectionString?: RemoteProxyConnectionString; } ), @@ -107,7 +117,7 @@ function bindingEntries( typeof opts === "string" ? { id: opts } : { - id: opts.pipeline, + id: opts.stream ?? opts.pipeline ?? "", remoteProxyConnectionString: opts.remoteProxyConnectionString, }, ]); diff --git a/packages/workers-utils/src/config/environment.ts b/packages/workers-utils/src/config/environment.ts index 6604be73fe..b8f16229f9 100644 --- a/packages/workers-utils/src/config/environment.ts +++ b/packages/workers-utils/src/config/environment.ts @@ -1346,8 +1346,13 @@ export interface EnvironmentNonInheritable { pipelines: { /** The binding name used to refer to the bound service. */ binding: string; - /** Name of the Pipeline to bind */ - pipeline: string; + /** Id of the Stream to bind */ + stream?: string; + /** + * Id of the Stream to bind + * @deprecated Use `stream` instead. + */ + pipeline?: string; /** Whether the pipeline should be remote or not in local development */ remote?: boolean; }[]; diff --git a/packages/workers-utils/src/config/validation.ts b/packages/workers-utils/src/config/validation.ts index 1709d1ed09..5b1108362b 100644 --- a/packages/workers-utils/src/config/validation.ts +++ b/packages/workers-utils/src/config/validation.ts @@ -4841,7 +4841,7 @@ const validatePipelineBinding: ValidatorFn = (diagnostics, field, value) => { return false; } let isValid = true; - // Pipeline bindings must have a binding and a pipeline. + // Pipeline bindings must have a binding and a stream (or deprecated pipeline). if (!isRequiredProperty(value, "binding", "string")) { diagnostics.errors.push( `"${field}" bindings must have a string "binding" field but got ${JSON.stringify( @@ -4850,9 +4850,21 @@ const validatePipelineBinding: ValidatorFn = (diagnostics, field, value) => { ); isValid = false; } - if (!isRequiredProperty(value, "pipeline", "string")) { + + const hasStream = isOptionalProperty(value, "stream", "string"); + const hasPipeline = isOptionalProperty(value, "pipeline", "string"); + const v = value as Record; + + if (hasStream && v.stream) { + // "stream" is the primary field — use it as-is + } else if (hasPipeline && v.pipeline) { + // Deprecated "pipeline" field — normalize to "stream" + diagnostics.warnings.push( + `The "pipeline" field in "${field}" bindings is deprecated. Use "stream" instead.` + ); + } else { diagnostics.errors.push( - `"${field}" bindings must have a string "pipeline" field but got ${JSON.stringify( + `"${field}" bindings must have a string "stream" field but got ${JSON.stringify( value )}.` ); @@ -4865,6 +4877,7 @@ const validatePipelineBinding: ValidatorFn = (diagnostics, field, value) => { validateAdditionalProperties(diagnostics, field, Object.keys(value), [ "binding", + "stream", "pipeline", "remote", ]); diff --git a/packages/workers-utils/src/environment-variables/misc-variables.ts b/packages/workers-utils/src/environment-variables/misc-variables.ts index 35973b60d7..d3b47c1781 100644 --- a/packages/workers-utils/src/environment-variables/misc-variables.ts +++ b/packages/workers-utils/src/environment-variables/misc-variables.ts @@ -46,11 +46,15 @@ export const getWranglerSendMetricsFromEnv = }); /** - * `WRANGLER_SEND_ERROR_REPORTS` can override whether we attempt to send error reports to Sentry. + * `WRANGLER_SEND_ERROR_REPORTS` controls whether we attempt to send error reports to Sentry. + * + * Defaults to `false` to avoid noisy false-positive reports. Users can opt in + * by setting `WRANGLER_SEND_ERROR_REPORTS=true`. */ export const getWranglerSendErrorReportsFromEnv = getBooleanEnvironmentVariableFactory({ variableName: "WRANGLER_SEND_ERROR_REPORTS", + defaultValue: false, }); /** diff --git a/packages/workers-utils/src/map-worker-metadata-bindings.ts b/packages/workers-utils/src/map-worker-metadata-bindings.ts index b8b2776122..32fa7c3ed5 100644 --- a/packages/workers-utils/src/map-worker-metadata-bindings.ts +++ b/packages/workers-utils/src/map-worker-metadata-bindings.ts @@ -322,7 +322,10 @@ export function mapWorkerMetadataBindings( ...(configObj.pipelines ?? []), { binding: binding.name, - pipeline: binding.pipeline, + // NOTE: stream is the primary field, but we also support pipeline for backward compatibility + ...(binding.stream && { stream: binding.stream }), + + ...(binding.pipeline && { pipeline: binding.pipeline }), }, ]; break; diff --git a/packages/workers-utils/src/types.ts b/packages/workers-utils/src/types.ts index 9e80ac9235..1920170fef 100644 --- a/packages/workers-utils/src/types.ts +++ b/packages/workers-utils/src/types.ts @@ -150,7 +150,7 @@ export type WorkerMetadataBinding = }; } | { type: "mtls_certificate"; name: string; certificate_id: string } - | { type: "pipelines"; name: string; pipeline: string } + | { type: "pipelines"; name: string; stream?: string; pipeline?: string } | { type: "secrets_store_secret"; name: string; diff --git a/packages/workers-utils/src/worker.ts b/packages/workers-utils/src/worker.ts index ea98756dbf..be00c23977 100644 --- a/packages/workers-utils/src/worker.ts +++ b/packages/workers-utils/src/worker.ts @@ -345,7 +345,8 @@ export interface CfAssetsBinding { export interface CfPipeline { binding: string; - pipeline: string; + stream?: string; + pipeline?: string; remote?: boolean; } diff --git a/packages/workers-utils/tests/config/validation/normalize-and-validate-config.test.ts b/packages/workers-utils/tests/config/validation/normalize-and-validate-config.test.ts index ad18169be9..e904cdc2b7 100644 --- a/packages/workers-utils/tests/config/validation/normalize-and-validate-config.test.ts +++ b/packages/workers-utils/tests/config/validation/normalize-and-validate-config.test.ts @@ -4499,7 +4499,28 @@ describe("normalizeAndValidateConfig()", () => { `); }); - it("should accept valid bindings", ({ expect }) => { + it("should accept valid bindings with stream field", ({ expect }) => { + const { diagnostics } = normalizeAndValidateConfig( + { + pipelines: [ + { + binding: "VALID", + stream: "343cd4f1d58c42fbb5bd082592fd7143", + }, + ], + } as unknown as RawConfig, + undefined, + undefined, + { env: undefined } + ); + + expect(diagnostics.hasErrors()).toBe(false); + expect(diagnostics.hasWarnings()).toBe(false); + }); + + it("should accept deprecated pipeline field with warning", ({ + expect, + }) => { const { diagnostics } = normalizeAndValidateConfig( { pipelines: [ @@ -4515,6 +4536,11 @@ describe("normalizeAndValidateConfig()", () => { ); expect(diagnostics.hasErrors()).toBe(false); + expect(diagnostics.hasWarnings()).toBe(true); + expect(diagnostics.renderWarnings()).toMatchInlineSnapshot(` + "Processing wrangler configuration: + - The "pipeline" field in "pipelines[0]" bindings is deprecated. Use "stream" instead." + `); }); it("should error if pipelines.bindings are not valid", ({ expect }) => { @@ -4524,7 +4550,7 @@ describe("normalizeAndValidateConfig()", () => { {}, { binding: "VALID", - pipeline: "343cd4f1d58c42fbb5bd082592fd7143", + stream: "343cd4f1d58c42fbb5bd082592fd7143", }, { binding: 2000, project: 2111 }, ], @@ -4541,9 +4567,9 @@ describe("normalizeAndValidateConfig()", () => { expect(diagnostics.renderErrors()).toMatchInlineSnapshot(` "Processing wrangler configuration: - "pipelines[0]" bindings must have a string "binding" field but got {}. - - "pipelines[0]" bindings must have a string "pipeline" field but got {}. + - "pipelines[0]" bindings must have a string "stream" field but got {}. - "pipelines[2]" bindings must have a string "binding" field but got {"binding":2000,"project":2111}. - - "pipelines[2]" bindings must have a string "pipeline" field but got {"binding":2000,"project":2111}." + - "pipelines[2]" bindings must have a string "stream" field but got {"binding":2000,"project":2111}." `); }); }); diff --git a/packages/wrangler/src/__tests__/create-worker-upload-form/bindings.test.ts b/packages/wrangler/src/__tests__/create-worker-upload-form/bindings.test.ts index 31bc6eb965..3d387557a5 100644 --- a/packages/wrangler/src/__tests__/create-worker-upload-form/bindings.test.ts +++ b/packages/wrangler/src/__tests__/create-worker-upload-form/bindings.test.ts @@ -376,13 +376,13 @@ describe("createWorkerUploadForm — bindings", () => { describe("pipeline bindings", () => { it("should transform type from pipeline to pipelines", ({ expect }) => { const bindings: StartDevWorkerInput["bindings"] = { - MY_PIPELINE: { type: "pipeline", pipeline: "my-pipeline" }, + MY_PIPELINE: { type: "pipeline", stream: "my-pipeline" }, }; const form = createWorkerUploadForm(createEsmWorker(), bindings); expect(getBindings(form)).toContainEqual({ name: "MY_PIPELINE", type: "pipelines", - pipeline: "my-pipeline", + stream: "my-pipeline", }); }); }); diff --git a/packages/wrangler/src/__tests__/deploy/durable-objects.test.ts b/packages/wrangler/src/__tests__/deploy/durable-objects.test.ts index bb77db1c79..935bad9cca 100644 --- a/packages/wrangler/src/__tests__/deploy/durable-objects.test.ts +++ b/packages/wrangler/src/__tests__/deploy/durable-objects.test.ts @@ -1386,7 +1386,7 @@ describe("deploy", () => { pipelines: [ { binding: "MY_PIPELINE", - pipeline: "my-pipeline", + stream: "my-pipeline", }, ], }); @@ -1397,7 +1397,7 @@ describe("deploy", () => { { type: "pipelines", name: "MY_PIPELINE", - pipeline: "my-pipeline", + stream: "my-pipeline", }, ], }); diff --git a/packages/wrangler/src/__tests__/deploy/get-remote-config-diff.test.ts b/packages/wrangler/src/__tests__/deploy/get-remote-config-diff.test.ts index df74e34fd4..8d97a73279 100644 --- a/packages/wrangler/src/__tests__/deploy/get-remote-config-diff.test.ts +++ b/packages/wrangler/src/__tests__/deploy/get-remote-config-diff.test.ts @@ -356,7 +356,7 @@ describe("getRemoteConfigsDiff", () => { pipelines: [ { binding: "MY_PIPELINE", - pipeline: "my-pipeline", + stream: "my-pipeline", }, ], vectorize: [ @@ -482,7 +482,7 @@ describe("getRemoteConfigsDiff", () => { pipelines: [ { binding: "MY_PIPELINE", - pipeline: "my-pipeline", + stream: "my-pipeline", remote: true, }, ], diff --git a/packages/wrangler/src/__tests__/init.test.ts b/packages/wrangler/src/__tests__/init.test.ts index edc96f7c82..c1d215f472 100644 --- a/packages/wrangler/src/__tests__/init.test.ts +++ b/packages/wrangler/src/__tests__/init.test.ts @@ -331,7 +331,7 @@ describe("init", () => { { type: "pipelines", name: "PIPELINE_BINDING", - pipeline: "some-name", + stream: "some-name", }, { type: "mtls_certificate", @@ -564,7 +564,7 @@ describe("init", () => { pipelines: [ { binding: "PIPELINE_BINDING", - pipeline: "some-name", + stream: "some-name", }, ], queues: { @@ -1097,7 +1097,7 @@ describe("init", () => { "pipelines": [ { "binding": "PIPELINE_BINDING", - "pipeline": "some-name" + "stream": "some-name" } ], "mtls_certificates": [ diff --git a/packages/wrangler/src/__tests__/sentry.test.ts b/packages/wrangler/src/__tests__/sentry.test.ts index 9cd00e97c4..b9a21d53a2 100644 --- a/packages/wrangler/src/__tests__/sentry.test.ts +++ b/packages/wrangler/src/__tests__/sentry.test.ts @@ -5,7 +5,7 @@ import { http, HttpResponse } from "msw"; import { afterEach, assert, beforeEach, describe, it } from "vitest"; import { mockAccountId, mockApiToken } from "./helpers/mock-account-id"; import { mockConsoleMethods } from "./helpers/mock-console"; -import { clearDialogs, mockConfirm } from "./helpers/mock-dialogs"; +import { clearDialogs } from "./helpers/mock-dialogs"; import { useMockIsTTY } from "./helpers/mock-istty"; import { createFetchResult, msw } from "./helpers/msw"; import { runWrangler } from "./helpers/run-wrangler"; @@ -69,9 +69,7 @@ describe("sentry", () => { ────────────────── Getting User settings... - If you think this is a bug then please create an issue at https://github.com/cloudflare/workers-sdk/issues/new/choose - ? Would you like to report this error to Cloudflare? Wrangler's output and the error details will be shared with the Wrangler team to help us diagnose and fix the issue. - 🤖 Using fallback value in non-interactive context: no" + If you think this is a bug then please create an issue at https://github.com/cloudflare/workers-sdk/issues/new/choose" `); expect(sentryRequests?.length).toEqual(0); }); @@ -102,40 +100,6 @@ describe("sentry", () => { expect(sentryRequests?.length).toEqual(0); }); - it("should not hit sentry after reportable error when permission denied", async ({ - expect, - }) => { - // Trigger an API error - msw.use( - http.get( - `https://api.cloudflare.com/client/v4/user`, - async () => { - return HttpResponse.error(); - }, - { once: true } - ), - http.get("*/user/tokens/verify", () => { - return HttpResponse.json(createFetchResult([])); - }) - ); - mockConfirm({ - text: "Would you like to report this error to Cloudflare? Wrangler's output and the error details will be shared with the Wrangler team to help us diagnose and fix the issue.", - result: false, - }); - await expect(runWrangler("whoami")).rejects.toMatchInlineSnapshot( - `[TypeError: Failed to fetch]` - ); - expect(std.out).toMatchInlineSnapshot(` - " - ⛅️ wrangler x.x.x - ────────────────── - Getting User settings... - - If you think this is a bug then please create an issue at https://github.com/cloudflare/workers-sdk/issues/new/choose" - `); - expect(sentryRequests?.length).toEqual(0); - }); - it("should not hit sentry (or even ask) after reportable error if WRANGLER_SEND_ERROR_REPORTS is explicitly false", async ({ expect, }) => { @@ -166,317 +130,6 @@ describe("sentry", () => { expect(sentryRequests?.length).toEqual(0); }); - it("should hit sentry after reportable error when permission provided", async ({ - expect, - }) => { - // Trigger an API error - msw.use( - http.get( - `https://api.cloudflare.com/client/v4/user`, - async () => { - return HttpResponse.error(); - }, - { once: true } - ), - http.get("*/user/tokens/verify", () => { - return HttpResponse.json(createFetchResult([])); - }) - ); - mockConfirm({ - text: "Would you like to report this error to Cloudflare? Wrangler's output and the error details will be shared with the Wrangler team to help us diagnose and fix the issue.", - result: true, - }); - await expect(runWrangler("whoami")).rejects.toMatchInlineSnapshot( - `[TypeError: Failed to fetch]` - ); - expect(std.out).toMatchInlineSnapshot(` - " - ⛅️ wrangler x.x.x - ────────────────── - Getting User settings... - - If you think this is a bug then please create an issue at https://github.com/cloudflare/workers-sdk/issues/new/choose" - `); - - // Sentry sends multiple HTTP requests to capture breadcrumbs - assert(sentryRequests); - expect(sentryRequests.length).toBeGreaterThan(0); - - // Check requests don't include PII - const envelopes = sentryRequests.map(({ envelope }) => { - const parts = envelope.split("\n").map((line) => JSON.parse(line)); - expect(parts).toHaveLength(3); - return { header: parts[0], type: parts[1], data: parts[2] }; - }); - const event = envelopes.find(({ type }) => type.type === "event"); - assert(event); - - // Redact fields with random contents we know don't contain PII - event.header.event_id = ""; - event.header.sent_at = ""; - event.header.trace.trace_id = ""; - event.header.trace.release = ""; - for (const exception of event.data.exception.values) { - for (const frame of exception.stacktrace.frames) { - if ( - frame.filename.startsWith("C:\\Project\\") || - frame.filename.startsWith("/project/") - ) { - frame.filename = "/project/..."; - } - frame.function = ""; - frame.lineno = 0; - frame.colno = 0; - frame.in_app = false; - frame.pre_context = []; - frame.context_line = ""; - frame.post_context = []; - } - } - event.data.event_id = ""; - event.data.contexts.trace.trace_id = ""; - event.data.contexts.trace.span_id = ""; - event.data.contexts.runtime.version = ""; - event.data.contexts.app.app_start_time = ""; - event.data.contexts.app.app_memory = 0; - event.data.contexts.os = {}; - event.data.contexts.device = {}; - event.data.timestamp = 0; - event.data.release = ""; - for (const breadcrumb of event.data.breadcrumbs) { - breadcrumb.timestamp = 0; - } - - const fakeInstallPath = "/wrangler/"; - for (const exception of event.data.exception?.values ?? []) { - for (const frame of exception.stacktrace?.frames ?? []) { - if (frame.module.startsWith("@mswjs")) { - frame.module = - "@mswjs.interceptors.src.interceptors.fetch:index.ts"; - } - if (frame.filename === undefined) { - continue; - } - - const wranglerPackageIndex = frame.filename.indexOf( - path.join("packages", "wrangler", "src") - ); - if (wranglerPackageIndex === -1) { - continue; - } - frame.filename = - fakeInstallPath + - frame.filename - .substring(wranglerPackageIndex) - .replaceAll("\\", "/"); - continue; - } - } - - // If more data is included in the Sentry request, we'll need to verify it - // couldn't contain PII and update this snapshot - expect(event).toStrictEqual({ - data: { - breadcrumbs: [ - { - level: "log", - message: "wrangler whoami", - timestamp: 0, - }, - ], - contexts: { - app: { - app_memory: 0, - app_start_time: "", - }, - cloud_resource: {}, - device: {}, - os: {}, - runtime: { - name: "node", - version: "", - }, - trace: { - span_id: "", - trace_id: "", - }, - }, - environment: "production", - event_id: "", - exception: { - values: [ - { - mechanism: { - handled: true, - type: "generic", - }, - stacktrace: { - frames: [ - { - colno: 0, - context_line: "", - filename: expect.any(String), - function: "", - in_app: false, - lineno: 0, - module: expect.any(String), - post_context: [], - pre_context: [], - }, - { - colno: 0, - context_line: "", - filename: expect.any(String), - function: "", - in_app: false, - lineno: 0, - module: expect.any(String), - post_context: [], - pre_context: [], - }, - { - colno: 0, - context_line: "", - filename: expect.any(String), - function: "", - in_app: false, - lineno: 0, - module: expect.any(String), - post_context: [], - pre_context: [], - }, - { - colno: 0, - context_line: "", - filename: expect.any(String), - function: "", - in_app: false, - lineno: 0, - module: expect.any(String), - post_context: [], - pre_context: [], - }, - { - colno: 0, - context_line: "", - filename: expect.any(String), - function: "", - in_app: false, - lineno: 0, - module: expect.any(String), - post_context: [], - pre_context: [], - }, - { - colno: 0, - context_line: "", - filename: expect.any(String), - function: "", - in_app: false, - lineno: 0, - module: expect.any(String), - post_context: [], - pre_context: [], - }, - { - colno: 0, - context_line: "", - filename: expect.any(String), - function: "", - in_app: false, - lineno: 0, - module: expect.any(String), - post_context: [], - pre_context: [], - }, - { - colno: 0, - context_line: "", - filename: expect.any(String), - function: "", - in_app: false, - lineno: 0, - module: expect.any(String), - post_context: [], - pre_context: [], - }, - { - colno: 0, - context_line: "", - filename: "/project/...", - function: "", - in_app: false, - lineno: 0, - module: - "@mswjs.interceptors.src.interceptors.fetch:index.ts", - post_context: [], - pre_context: [], - }, - { - colno: 0, - context_line: "", - filename: "/project/...", - function: "", - in_app: false, - lineno: 0, - module: - "@mswjs.interceptors.src.interceptors.fetch:index.ts", - post_context: [], - pre_context: [], - }, - ], - }, - type: "TypeError", - value: "Failed to fetch", - }, - ], - }, - modules: {}, - platform: "node", - release: "", - sdk: { - integrations: [ - "InboundFilters", - "FunctionToString", - "LinkedErrors", - "Console", - "OnUncaughtException", - "OnUnhandledRejection", - "ContextLines", - "Context", - "Modules", - ], - name: "sentry.javascript.node", - packages: [ - { - name: "npm:@sentry/node", - version: "7.87.0", - }, - ], - version: "7.87.0", - }, - timestamp: 0, - }, - header: { - event_id: "", - sdk: { - name: "sentry.javascript.node", - version: "7.87.0", - }, - sent_at: "", - trace: { - environment: "production", - public_key: "9edbb8417b284aa2bbead9b4c318918b", - release: "", - trace_id: "", - }, - }, - type: { - type: "event", - }, - }); - }); - it("should hit sentry after reportable error (without confirmation) if WRANGLER_SEND_ERROR_REPORTS is explicitly true", async ({ expect, }) => { diff --git a/packages/wrangler/src/__tests__/type-generation.test.ts b/packages/wrangler/src/__tests__/type-generation.test.ts index b88473b097..88b75ccd22 100644 --- a/packages/wrangler/src/__tests__/type-generation.test.ts +++ b/packages/wrangler/src/__tests__/type-generation.test.ts @@ -521,7 +521,7 @@ const bindingsConfigMock: Omit< }, { type: "CompiledWasm", globs: ["**/*.wasm"], fallthrough: true }, ], - pipelines: [{ binding: "PIPELINE", pipeline: "my-pipeline" }], + pipelines: [{ binding: "PIPELINE", stream: "my-pipeline" }], assets: { binding: "ASSETS_BINDING", directory: "/assets", @@ -4011,7 +4011,7 @@ describe("pipeline schema type generation", () => { name: "test-worker", main: "./index.ts", compatibility_date: "2024-01-01", - pipelines: [{ binding: "ANALYTICS", pipeline: "analytics-stream-id" }], + pipelines: [{ binding: "ANALYTICS", stream: "analytics-stream-id" }], }) ); fs.writeFileSync("./index.ts", "export default { fetch() {} }"); @@ -4069,7 +4069,7 @@ describe("pipeline schema type generation", () => { name: "test-worker", main: "./index.ts", compatibility_date: "2024-01-01", - pipelines: [{ binding: "LOGS", pipeline: "unstructured-stream-id" }], + pipelines: [{ binding: "LOGS", stream: "unstructured-stream-id" }], }) ); fs.writeFileSync("./index.ts", "export default { fetch() {} }"); @@ -4124,7 +4124,7 @@ describe("pipeline schema type generation", () => { name: "test-worker", main: "./index.ts", compatibility_date: "2024-01-01", - pipelines: [{ binding: "MISSING", pipeline: "non-existent-stream" }], + pipelines: [{ binding: "MISSING", stream: "non-existent-stream" }], }) ); fs.writeFileSync("./index.ts", "export default { fetch() {} }"); @@ -4219,8 +4219,8 @@ describe("pipeline schema type generation", () => { main: "./index.ts", compatibility_date: "2024-01-01", pipelines: [ - { binding: "EVENTS", pipeline: "events-stream" }, - { binding: "METRICS", pipeline: "metrics-stream" }, + { binding: "EVENTS", stream: "events-stream" }, + { binding: "METRICS", stream: "metrics-stream" }, ], }) ); @@ -4291,7 +4291,7 @@ describe("pipeline schema type generation", () => { name: "test-worker", main: "./index.ts", compatibility_date: "2024-01-01", - pipelines: [{ binding: "NESTED", pipeline: "nested-stream" }], + pipelines: [{ binding: "NESTED", stream: "nested-stream" }], }) ); fs.writeFileSync("./index.ts", "export default { fetch() {} }"); @@ -4358,7 +4358,7 @@ describe("pipeline schema type generation", () => { name: "test-worker", main: "./index.js", compatibility_date: "2024-01-01", - pipelines: [{ binding: "EVENTS", pipeline: "events-stream" }], + pipelines: [{ binding: "EVENTS", stream: "events-stream" }], }) ); diff --git a/packages/wrangler/src/api/startDevWorker/utils.ts b/packages/wrangler/src/api/startDevWorker/utils.ts index 55c49b62d8..da4e8f60ad 100644 --- a/packages/wrangler/src/api/startDevWorker/utils.ts +++ b/packages/wrangler/src/api/startDevWorker/utils.ts @@ -709,7 +709,11 @@ export function convertWorkerMetadataBindingsToFlatBindings( WorkerMetadataBinding, { type: "pipelines" } >; - output[name] = { type: "pipeline", pipeline: b.pipeline }; + output[name] = { + type: "pipeline", + stream: b.stream, + pipeline: b.pipeline, + }; break; } case "browser": diff --git a/packages/wrangler/src/deployment-bundle/create-worker-upload-form.ts b/packages/wrangler/src/deployment-bundle/create-worker-upload-form.ts index 2e7c6fae39..aaa7a33ac6 100644 --- a/packages/wrangler/src/deployment-bundle/create-worker-upload-form.ts +++ b/packages/wrangler/src/deployment-bundle/create-worker-upload-form.ts @@ -487,12 +487,22 @@ export function createWorkerUploadForm( }); }); - pipelines.forEach(({ binding, pipeline }) => { - metadataBindings.push({ - name: binding, - type: "pipelines", - pipeline: pipeline, - }); + pipelines.forEach(({ binding, stream: pipelineStream, pipeline }) => { + if (pipelineStream) { + metadataBindings.push({ + name: binding, + type: "pipelines", + stream: pipelineStream, + }); + } else if (pipeline) { + metadataBindings.push({ + name: binding, + type: "pipelines", + pipeline, + }); + } else { + throw new Error("Pipeline binding must specify a stream or pipeline"); + } }); worker_loaders.forEach(({ binding }) => { diff --git a/packages/wrangler/src/dev/miniflare/index.ts b/packages/wrangler/src/dev/miniflare/index.ts index e4483a2e25..e0f17e2a3e 100644 --- a/packages/wrangler/src/dev/miniflare/index.ts +++ b/packages/wrangler/src/dev/miniflare/index.ts @@ -283,22 +283,42 @@ function queueProducerEntry( return [binding, { queueName, deliveryDelay, remoteProxyConnectionString }]; } function pipelineEntry( - pipeline: CfPipeline, + { binding, stream, pipeline, remote }: CfPipeline, remoteProxyConnectionString?: RemoteProxyConnectionString ): [ string, - { - pipeline: string; - remoteProxyConnectionString?: RemoteProxyConnectionString; - }, + ( + | { + stream: string; + remoteProxyConnectionString?: RemoteProxyConnectionString; + } + | { + pipeline: string; + remoteProxyConnectionString?: RemoteProxyConnectionString; + } + ), ] { - if (!remoteProxyConnectionString || !pipeline.remote) { - return [pipeline.binding, { pipeline: pipeline.pipeline }]; + if (stream) { + return [ + binding, + { + stream, + ...(remoteProxyConnectionString && + remote && { remoteProxyConnectionString }), + }, + ]; + } else if (pipeline) { + return [ + binding, + { + pipeline, + ...(remoteProxyConnectionString && + remote && { remoteProxyConnectionString }), + }, + ]; + } else { + throw new Error("Pipeline must have either a stream"); } - return [ - pipeline.binding, - { pipeline: pipeline.pipeline, remoteProxyConnectionString }, - ]; } function hyperdriveEntry(hyperdrive: CfHyperdrive): [string, string] { return [hyperdrive.binding, hyperdrive.localConnectionString ?? ""]; diff --git a/packages/wrangler/src/preview/api.ts b/packages/wrangler/src/preview/api.ts index 95bc59e2c7..2134dafa3e 100644 --- a/packages/wrangler/src/preview/api.ts +++ b/packages/wrangler/src/preview/api.ts @@ -36,6 +36,7 @@ export interface Binding { }; certificate_id?: string; pipeline?: string; + stream?: string; store_id?: string; secret_name?: string; simple?: { diff --git a/packages/wrangler/src/preview/shared.ts b/packages/wrangler/src/preview/shared.ts index dcd700ef11..9bcbabb692 100644 --- a/packages/wrangler/src/preview/shared.ts +++ b/packages/wrangler/src/preview/shared.ts @@ -113,7 +113,7 @@ export function getBindingValue(binding: Binding): string { case "mtls_certificate": return String(binding.certificate_id ?? ""); case "pipelines": - return String(binding.pipeline ?? ""); + return String(binding.stream ?? binding.pipeline ?? ""); case "secrets_store_secret": return binding.secret_name ? `${binding.store_id}/${binding.secret_name}` @@ -261,8 +261,12 @@ export function extractConfigBindings(config: Config): EnvBindings { }; } - for (const pipeline of previews?.pipelines ?? []) { - env[pipeline.binding] = { type: "pipelines", pipeline: pipeline.pipeline }; + for (const { binding, stream, pipeline } of previews?.pipelines ?? []) { + env[binding] = { + type: "pipelines", + ...(stream && { stream }), + ...(pipeline && { pipeline }), + }; } for (const secret of previews?.secrets_store_secrets ?? []) { diff --git a/packages/wrangler/src/type-generation/index.ts b/packages/wrangler/src/type-generation/index.ts index c371d77c1e..de48af42bd 100644 --- a/packages/wrangler/src/type-generation/index.ts +++ b/packages/wrangler/src/type-generation/index.ts @@ -2747,19 +2747,21 @@ function collectAllUnsafeBindings( * * @param args - All the CLI arguments passed to the `types` command * - * @returns An array of collected pipeline bindings with their names and pipeline IDs. + * @returns An array of collected pipeline bindings with their names and stream IDs. */ function collectAllPipelines( args: Partial<(typeof typesCommand)["args"]> ): Array<{ binding: string; - pipeline: string; + stream?: string; + pipeline?: string; }> { const pipelinesMap = new Map< string, { binding: string; - pipeline: string; + stream?: string; + pipeline?: string; } >(); @@ -2783,13 +2785,13 @@ function collectAllPipelines( }); } - if (!pipeline.pipeline) { + if (!pipeline.stream && !pipeline.pipeline) { throwMissingBindingError({ binding: pipeline, bindingType: "pipelines", configPath: args.config, envName, - fieldName: "pipeline", + fieldName: "stream", index, }); } @@ -2800,6 +2802,7 @@ function collectAllPipelines( pipelinesMap.set(pipeline.binding, { binding: pipeline.binding, + stream: pipeline.stream, pipeline: pipeline.pipeline, }); } @@ -3892,30 +3895,54 @@ function collectPipelinesPerEnvironment( args: Partial<(typeof typesCommand)["args"]> ): Map< string, - Array<{ - binding: string; - pipeline: string; - }> + Array< + | { + binding: string; + stream: string; + } + | { + binding: string; + pipeline: string; + } + > > { const result = new Map< string, - Array<{ - binding: string; - pipeline: string; - }> + Array< + | { + binding: string; + stream: string; + } + | { + binding: string; + pipeline: string; + } + > >(); function collectEnvironmentPipelines( env: RawEnvironment | undefined, envName: string - ): Array<{ - binding: string; - pipeline: string; - }> { - const pipelines = new Array<{ - binding: string; - pipeline: string; - }>(); + ): Array< + | { + binding: string; + stream: string; + } + | { + binding: string; + pipeline: string; + } + > { + const pipelines = new Array< + | { + binding: string; + stream: string; + } + | { + binding: string; + pipeline: string; + } + >(); if (!env?.pipelines) { return pipelines; @@ -3933,21 +3960,26 @@ function collectPipelinesPerEnvironment( }); } - if (!pipeline.pipeline) { + if (pipeline.stream) { + pipelines.push({ + binding: pipeline.binding, + stream: pipeline.stream, + }); + } else if (pipeline.pipeline) { + pipelines.push({ + binding: pipeline.binding, + pipeline: pipeline.pipeline, + }); + } else { throwMissingBindingError({ binding: pipeline, bindingType: "pipelines", configPath: args.config, envName, - fieldName: "pipeline", + fieldName: "stream", index, }); } - - pipelines.push({ - binding: pipeline.binding, - pipeline: pipeline.pipeline, - }); } return pipelines; diff --git a/packages/wrangler/src/type-generation/pipeline-schema.ts b/packages/wrangler/src/type-generation/pipeline-schema.ts index 93dbd8ca77..33eeb19adb 100644 --- a/packages/wrangler/src/type-generation/pipeline-schema.ts +++ b/packages/wrangler/src/type-generation/pipeline-schema.ts @@ -213,7 +213,7 @@ export interface PipelineTypeResult { */ export async function fetchPipelineTypes( config: Config, - pipelines: Array<{ binding: string; pipeline: string }> + pipelines: Array<{ binding: string; stream?: string; pipeline?: string }> ): Promise { if (pipelines.length === 0) { return []; @@ -233,7 +233,15 @@ export async function fetchPipelineTypes( // Fetch all streams in parallel for better performance const streams = await Promise.all( - pipelines.map((p) => fetchStream(config, p.pipeline)) + pipelines.map((p) => { + const streamID = p.stream || p.pipeline; + if (!streamID) { + throw new Error( + `Pipeline binding ${p.binding} is missing the stream ID` + ); + } + return fetchStream(config, streamID); + }) ); const results = pipelines.map((pipeline, i) => { diff --git a/packages/wrangler/src/utils/print-bindings.ts b/packages/wrangler/src/utils/print-bindings.ts index 150f66d4f8..7894a38bc5 100644 --- a/packages/wrangler/src/utils/print-bindings.ts +++ b/packages/wrangler/src/utils/print-bindings.ts @@ -639,14 +639,16 @@ export function printBindings( if (pipelines.length > 0) { output.push( - ...pipelines.map(({ binding, pipeline, remote }) => ({ - name: binding, - type: getBindingTypeFriendlyName("pipeline"), - value: pipeline, - mode: getMode({ - isSimulatedLocally: context.remoteBindingsDisabled || !remote, - }), - })) + ...pipelines.map( + ({ binding, stream: pipelineStream, pipeline, remote }) => ({ + name: binding, + type: getBindingTypeFriendlyName("pipeline"), + value: pipelineStream || pipeline, + mode: getMode({ + isSimulatedLocally: context.remoteBindingsDisabled || !remote, + }), + }) + ) ); }