diff --git a/.changeset/asset-worker-cohort-deployments.md b/.changeset/asset-worker-cohort-deployments.md new file mode 100644 index 0000000000..5bd397c711 --- /dev/null +++ b/.changeset/asset-worker-cohort-deployments.md @@ -0,0 +1,5 @@ +--- +"@cloudflare/workers-shared": patch +--- + +During deployment, routes requests to new versions of asset-worker based on customer account plan. diff --git a/packages/workers-shared/asset-worker/src/analytics.ts b/packages/workers-shared/asset-worker/src/analytics.ts index 6e09e2f7e2..6a13355742 100644 --- a/packages/workers-shared/asset-worker/src/analytics.ts +++ b/packages/workers-shared/asset-worker/src/analytics.ts @@ -4,6 +4,13 @@ import type { ReadyAnalytics } from "./types"; // This will allow us to make breaking changes to the analytic schema const VERSION = 1; +// Identifies which entrypoint produced the analytics row. +// Values must stay in sync with the ClickHouse view in cloudflare/ch/ready-analytics. +export enum EntrypointType { + Outer = 0, + Inner = 1, +} + // When adding new columns please update the schema type Data = { // -- Indexes -- @@ -23,6 +30,8 @@ type Data = { status?: number; // double6 - Compatibility flags compatibilityFlags?: string[]; // converted into a bitmask + // double7 - Entrypoint discriminator (see EntrypointType enum) + entrypoint?: EntrypointType; // -- Blobs -- // blob1 - Hostname of the request @@ -41,6 +50,8 @@ type Data = { coloRegion?: string; // blob8 - The cache status of the request cacheStatus?: string; + // blob9 - Account cohort ("ent", "paid", "free", "employee", or "unknown") + cohort?: string; }; const COMPATIBILITY_FLAG_MASKS: Record = @@ -94,6 +105,7 @@ export class Analytics { this.data.coloTier ?? -1, // double4 this.data.status ?? -1, // double5 compatibilityFlagsBitmask, // double6 + this.data.entrypoint ?? -1, // double7 ], blobs: [ this.data.hostname?.substring(0, 256), // blob1 - trim to 256 bytes @@ -104,6 +116,7 @@ export class Analytics { this.data.version, // blob6 this.data.coloRegion, // blob7 this.data.cacheStatus, // blob8 + this.data.cohort, // blob9 ], }); } diff --git a/packages/workers-shared/asset-worker/src/worker.ts b/packages/workers-shared/asset-worker/src/worker.ts index a9b05a0515..76818026b6 100644 --- a/packages/workers-shared/asset-worker/src/worker.ts +++ b/packages/workers-shared/asset-worker/src/worker.ts @@ -2,7 +2,7 @@ import { WorkerEntrypoint } from "cloudflare:workers"; import { PerformanceTimer } from "../../utils/performance"; import { setupSentry } from "../../utils/sentry"; import { mockJaegerBinding } from "../../utils/tracing"; -import { Analytics } from "./analytics"; +import { Analytics, EntrypointType } from "./analytics"; import { AssetsManifest } from "./assets-manifest"; import { normalizeConfiguration } from "./configuration"; import { ExperimentAnalytics } from "./experiment-analytics"; @@ -16,6 +16,7 @@ import type { SpanContext, UnsafePerformanceTimer, } from "../../utils/types"; +import type { AccountCohortQuerierBinding } from "../worker-configuration"; import type { Environment, ReadyAnalytics } from "./types"; // ============================================================ @@ -49,6 +50,7 @@ export type Env = { COLO_METADATA: ColoMetadata; UNSAFE_PERFORMANCE: UnsafePerformanceTimer; VERSION_METADATA: WorkerVersionMetadata; + ACCOUNT_COHORT_QUERIER?: AccountCohortQuerierBinding; }; export type AssetWorkerEntrypointProps = { @@ -92,6 +94,7 @@ type AssetWorkerContext = ExecutionContext & { exports?: { AssetWorkerInner?: (options: { props: AssetWorkerEntrypointProps; + version?: { cohort?: string }; }) => AssetWorkerMethods; }; props?: AssetWorkerEntrypointProps; @@ -209,12 +212,55 @@ async function unstableGetByPathnameImpl( }); } +export const COHORT_LOOKUP_TIMEOUT_MS = 5; + +/** + * Resolves the deployment cohort for a customer account via the + * AccountCohortQuerier RPC binding. Returns null when the binding is + * unavailable, the RPC fails, times out, or the cohort is undetermined + * (cold cache). Intentionally fails open — a null cohort means the + * request runs under default routing. + */ +export async function lookupCohort( + env: Env, + accountId: number | undefined +): Promise { + const querier = env.ACCOUNT_COHORT_QUERIER; + if (!querier || !accountId) { + return null; + } + const ac = new AbortController(); + try { + const rpc = querier.lookupAccountCohort(accountId.toString()); + // Prevent unhandled rejection if timeout wins but RPC later rejects. + void rpc.catch(() => {}); + const timeout = new Promise((_, reject) => { + const id = setTimeout(() => { + reject(new Error("cohort lookup timed out")); + }, COHORT_LOOKUP_TIMEOUT_MS); + ac.signal.addEventListener("abort", () => clearTimeout(id)); + }); + const res = await Promise.race([rpc, timeout]); + if (!res.ok) { + console.error("cohort lookup failed", res.errors); + return null; + } + return res.result ?? null; + } catch (e: unknown) { + console.error("cohort lookup failed", e); + return null; + } finally { + ac.abort(); + } +} + async function runFetchRequest( request: Request, env: Env, ctx: ExecutionContext, exists: ExistsFn, - getByETag: GetByETagFn + getByETag: GetByETagFn, + cohort?: string ): Promise { let sentry: ReturnType | undefined; const analytics = new Analytics(env.ANALYTICS); @@ -262,6 +308,8 @@ async function runFetchRequest( notFoundHandling: config.not_found_handling, compatibilityFlags: config.compatibility_flags, userAgent: userAgent, + entrypoint: EntrypointType.Inner, + cohort: cohort ?? "unknown", }); } @@ -315,10 +363,29 @@ export default class AssetWorkerOuter extends WorkerEntrypoint implements AssetWorkerMethods { + private resolvedCohort: string | null | undefined = undefined; + + /** + * Resolves and caches the cohort for this request. The cohort is + * constant for the lifetime of the outer entrypoint instance, so + * we only pay the RPC cost once even if multiple methods are called. + */ + private async getCohort(): Promise { + if (this.resolvedCohort === undefined) { + this.resolvedCohort = await lookupCohort( + this.env, + this.env.CONFIG?.account_id + ); + } + return this.resolvedCohort; + } + /** * Gets the inner entrypoint from ctx.exports to forward requests to. + * When a cohort is provided, the runtime routes the inner entrypoint + * to the version assigned to that cohort in the current deployment. */ - private getInnerEntrypoint(): AssetWorkerMethods { + private getInnerEntrypoint(cohort?: string | null): AssetWorkerMethods { const loopbackCtx = this.ctx as AssetWorkerContext; const entrypoint = loopbackCtx.exports?.AssetWorkerInner; if (entrypoint === undefined) { @@ -330,6 +397,7 @@ export default class AssetWorkerOuter } return entrypoint({ props: { traceContext: this.env.JAEGER.getSpanContext() }, + ...(cohort ? { version: { cohort } } : {}), }); } @@ -355,6 +423,7 @@ export default class AssetWorkerOuter coloRegion: this.env.COLO_METADATA.coloRegion, hostname: url.hostname, version: this.env.VERSION_METADATA.tag, + entrypoint: EntrypointType.Outer, }); } sentry = setupSentry( @@ -368,17 +437,28 @@ export default class AssetWorkerOuter this.env.CONFIG?.account_id, this.env.CONFIG?.script_id ); - return await this.getInnerEntrypoint().fetch(request); + + const cohort = await this.getCohort(); + analytics.setData({ cohort: cohort ?? "unknown" }); + + const response = await this.getInnerEntrypoint(cohort).fetch(request); + analytics.setData({ status: response.status }); + if (response.status >= 500) { + analytics.setData({ error: "inner entrypoint error" }); + } + return response; } catch (err) { - const response = handleError(sentry, analytics, err); + analytics.setData({ status: 500 }); + return handleError(sentry, analytics, err); + } finally { submitMetrics(analytics, performance, startTimeMs); - return response; } } async unstable_canFetch(request: Request): Promise { this.env.JAEGER ??= mockJaegerBinding(); - return this.getInnerEntrypoint().unstable_canFetch(request); + const cohort = await this.getCohort(); + return this.getInnerEntrypoint(cohort).unstable_canFetch(request); } async unstable_getByETag( @@ -386,7 +466,8 @@ export default class AssetWorkerOuter request?: Request ): Promise { this.env.JAEGER ??= mockJaegerBinding(); - return this.getInnerEntrypoint().unstable_getByETag(eTag, request); + const cohort = await this.getCohort(); + return this.getInnerEntrypoint(cohort).unstable_getByETag(eTag, request); } async unstable_getByPathname( @@ -394,7 +475,11 @@ export default class AssetWorkerOuter request?: Request ): Promise { this.env.JAEGER ??= mockJaegerBinding(); - return this.getInnerEntrypoint().unstable_getByPathname(pathname, request); + const cohort = await this.getCohort(); + return this.getInnerEntrypoint(cohort).unstable_getByPathname( + pathname, + request + ); } async unstable_exists( @@ -402,7 +487,8 @@ export default class AssetWorkerOuter request?: Request ): Promise { this.env.JAEGER ??= mockJaegerBinding(); - return this.getInnerEntrypoint().unstable_exists(pathname, request); + const cohort = await this.getCohort(); + return this.getInnerEntrypoint(cohort).unstable_exists(pathname, request); } } @@ -426,6 +512,7 @@ export class AssetWorkerInner this.env.JAEGER ??= mockJaegerBinding(); const loopbackCtx = this.ctx as AssetWorkerContext; const traceContext = loopbackCtx.props?.traceContext ?? null; + const cohort = this.ctx.version?.cohort; const response = await this.env.JAEGER.runWithSpanContext( traceContext, @@ -435,7 +522,8 @@ export class AssetWorkerInner this.env, this.ctx, this.unstable_exists.bind(this), - this.unstable_getByETag.bind(this) + this.unstable_getByETag.bind(this), + cohort ) ); diff --git a/packages/workers-shared/asset-worker/tests/analytics.test.ts b/packages/workers-shared/asset-worker/tests/analytics.test.ts new file mode 100644 index 0000000000..f8a9e1dbb7 --- /dev/null +++ b/packages/workers-shared/asset-worker/tests/analytics.test.ts @@ -0,0 +1,90 @@ +import { describe, it, vi } from "vitest"; +import { Analytics, EntrypointType } from "../src/analytics"; +import type { ReadyAnalyticsEvent } from "../src/types"; + +describe("[Asset Worker] Analytics", () => { + function captureEvent(): { + analytics: Analytics; + getEvent: () => ReadyAnalyticsEvent | undefined; + } { + let captured: ReadyAnalyticsEvent | undefined; + const mockReadyAnalytics = { + logEvent: vi.fn((event: ReadyAnalyticsEvent) => { + captured = event; + }), + }; + return { + analytics: new Analytics(mockReadyAnalytics), + getEvent: () => captured, + }; + } + + describe("EntrypointType enum", () => { + it("has expected numeric values", ({ expect }) => { + expect(EntrypointType.Outer).toBe(0); + expect(EntrypointType.Inner).toBe(1); + }); + }); + + describe("entrypoint discriminator (double7)", () => { + it("writes -1 when entrypoint is not set", ({ expect }) => { + const { analytics, getEvent } = captureEvent(); + analytics.write(); + + const event = getEvent(); + expect(event?.doubles?.[6]).toBe(-1); + }); + }); + + describe("full event shape", () => { + it("preserves all existing fields alongside new ones", ({ expect }) => { + const { analytics, getEvent } = captureEvent(); + analytics.setData({ + accountId: 123, + scriptId: 456, + requestTime: 50, + coloId: 1, + metalId: 2, + coloTier: 3, + status: 200, + entrypoint: EntrypointType.Inner, + hostname: "example.com", + userAgent: "test-agent", + htmlHandling: "auto-trailing-slash", + notFoundHandling: "none", + error: "", + version: "abc123", + coloRegion: "WEUR", + cacheStatus: "HIT", + cohort: "free", + }); + analytics.write(); + + const event = getEvent(); + // Doubles + expect(event?.doubles?.[0]).toBe(50); // requestTime + expect(event?.doubles?.[1]).toBe(1); // coloId + expect(event?.doubles?.[2]).toBe(2); // metalId + expect(event?.doubles?.[3]).toBe(3); // coloTier + expect(event?.doubles?.[4]).toBe(200); // status + // double6 is compatibilityFlags bitmask, defaults to 0 + expect(event?.doubles?.[5]).toBe(0); + expect(event?.doubles?.[6]).toBe(EntrypointType.Inner); // entrypoint + + // Blobs + expect(event?.blobs?.[0]).toBe("example.com"); // hostname + expect(event?.blobs?.[1]).toBe("test-agent"); // userAgent + expect(event?.blobs?.[2]).toBe("auto-trailing-slash"); // htmlHandling + expect(event?.blobs?.[3]).toBe("none"); // notFoundHandling + expect(event?.blobs?.[4]).toBe(""); // error + expect(event?.blobs?.[5]).toBe("abc123"); // version + expect(event?.blobs?.[6]).toBe("WEUR"); // coloRegion + expect(event?.blobs?.[7]).toBe("HIT"); // cacheStatus + expect(event?.blobs?.[8]).toBe("free"); // cohort + + // Indexes + expect(event?.accountId).toBe(123); + expect(event?.indexId).toBe("456"); + }); + }); +}); diff --git a/packages/workers-shared/asset-worker/tests/cohort.test.ts b/packages/workers-shared/asset-worker/tests/cohort.test.ts new file mode 100644 index 0000000000..56db5d6488 --- /dev/null +++ b/packages/workers-shared/asset-worker/tests/cohort.test.ts @@ -0,0 +1,85 @@ +import { describe, it, vi } from "vitest"; +import { COHORT_LOOKUP_TIMEOUT_MS, lookupCohort } from "../src/worker"; +import type { Env } from "../src/worker"; +import type { AccountCohortQuerierBinding } from "../worker-configuration"; + +function makeEnv( + querier?: Partial +): Pick { + return { + ACCOUNT_COHORT_QUERIER: querier as AccountCohortQuerierBinding | undefined, + }; +} + +describe("[Asset Worker] lookupCohort", () => { + it("calls querier with account ID as string", async ({ expect }) => { + const lookupMock = vi.fn().mockResolvedValue({ + ok: true, + result: "ent", + meta: { workersVersion: "test" }, + }); + + const result = await lookupCohort( + makeEnv({ lookupAccountCohort: lookupMock }) as Env, + 42 + ); + + expect(result).toBe("ent"); + expect(lookupMock).toHaveBeenCalledWith("42"); + }); + + it("returns null when result is ok:false", async ({ expect }) => { + const result = await lookupCohort( + makeEnv({ + lookupAccountCohort: () => + Promise.resolve({ + ok: false as const, + errors: [ + { name: "Error", message: "invalid account", code: "ERR" }, + ], + }), + }) as Env, + 42 + ); + + expect(result).toBeNull(); + }); + + it("returns null when result is null", async ({ expect }) => { + const result = await lookupCohort( + makeEnv({ + lookupAccountCohort: () => + Promise.resolve({ + ok: true as const, + result: null, + meta: { workersVersion: "test" }, + }), + }) as Env, + 42 + ); + + expect(result).toBeNull(); + }); + + it("times out after COHORT_LOOKUP_TIMEOUT_MS", async ({ expect }) => { + const result = await lookupCohort( + makeEnv({ + lookupAccountCohort: () => + new Promise((resolve) => { + setTimeout( + () => + resolve({ + ok: true as const, + result: "ent", + meta: { workersVersion: "test" }, + }), + COHORT_LOOKUP_TIMEOUT_MS * 2 + ); + }), + }) as Env, + 42 + ); + + expect(result).toBeNull(); + }); +}); diff --git a/packages/workers-shared/asset-worker/worker-configuration.d.ts b/packages/workers-shared/asset-worker/worker-configuration.d.ts index 676c3e8e38..02fb110c81 100644 --- a/packages/workers-shared/asset-worker/worker-configuration.d.ts +++ b/packages/workers-shared/asset-worker/worker-configuration.d.ts @@ -16,3 +16,18 @@ interface ExecutionContext { override?: string; }; } + +/** + * Minimal RPC binding type for the AccountCohortQuerier entrypoint + * in the account-services worker. Replace with the published type from + * `@cloudflare/workers-toolbox-types` once available with bundled deps. + */ +export interface AccountCohortQuerierBinding { + lookupAccountCohort(accountID: string): Promise< + | { ok: true; result: string | null; meta: { workersVersion: string } } + | { + ok: false; + errors: Array<{ name: string; message: string; code: string }>; + } + >; +} diff --git a/packages/workers-shared/asset-worker/wrangler.jsonc b/packages/workers-shared/asset-worker/wrangler.jsonc index b43f7cb183..5d9febf6c8 100644 --- a/packages/workers-shared/asset-worker/wrangler.jsonc +++ b/packages/workers-shared/asset-worker/wrangler.jsonc @@ -43,6 +43,13 @@ "name": "workers-asset-worker", "type": "internal_capability_grants", }, + { + "name": "ACCOUNT_COHORT_QUERIER", + "type": "service", + "service": "account-services-production", + "entrypoint": "AccountCohortQuerier", + "cross_account_grant": "workers-asset-worker", + }, ], }, "vars": { @@ -54,6 +61,13 @@ "env": { "staging": { "name": "asset-worker-staging", + // enable_version_api is set per-environment (not top-level) because + // the local workerd in vitest-pool-workers does not support it yet. + "compatibility_flags": [ + "nodejs_compat", + "enable_ctx_exports", + "enable_version_api", + ], "unsafe": { "metadata": { "build_options": { @@ -81,6 +95,13 @@ "name": "workers-asset-worker-staging", "type": "internal_capability_grants", }, + { + "name": "ACCOUNT_COHORT_QUERIER", + "type": "service", + "service": "account-services-staging", + "entrypoint": "AccountCohortQuerier", + "cross_account_grant": "workers-asset-worker-staging", + }, ], }, "vars": { @@ -93,6 +114,11 @@ "fed-prod": { "name": "asset-worker-fed-prod", "account_id": "fca6c231dc1d1780777997c5bce5a5e3", // workers assets + "compatibility_flags": [ + "nodejs_compat", + "enable_ctx_exports", + "enable_version_api", + ], "vars": { "ENVIRONMENT": "fed-prod", }, @@ -127,6 +153,13 @@ "name": "workers-asset-worker", "type": "internal_capability_grants", }, + { + "name": "ACCOUNT_COHORT_QUERIER", + "type": "service", + "service": "account-services-production", + "entrypoint": "AccountCohortQuerier", + "cross_account_grant": "workers-asset-worker", + }, ], }, },