Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/asset-worker-cohort-deployments.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@cloudflare/workers-shared": patch
---

During deployment, routes requests to new versions of asset-worker based on customer account plan.
13 changes: 13 additions & 0 deletions packages/workers-shared/asset-worker/src/analytics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@ import type { ReadyAnalytics } from "./types";
// This will allow us to make breaking changes to the analytic schema
const VERSION = 1;

// Identifies which entrypoint produced the analytics row.
// Values must stay in sync with the ClickHouse view in cloudflare/ch/ready-analytics.
export enum EntrypointType {
Outer = 0,
Inner = 1,
}

// When adding new columns please update the schema
type Data = {
// -- Indexes --
Expand All @@ -23,6 +30,8 @@ type Data = {
status?: number;
// double6 - Compatibility flags
compatibilityFlags?: string[]; // converted into a bitmask
// double7 - Entrypoint discriminator (see EntrypointType enum)
entrypoint?: EntrypointType;

// -- Blobs --
// blob1 - Hostname of the request
Expand All @@ -41,6 +50,8 @@ type Data = {
coloRegion?: string;
// blob8 - The cache status of the request
cacheStatus?: string;
// blob9 - Account cohort ("ent", "paid", "free", "employee", or "unknown")
cohort?: string;
};

const COMPATIBILITY_FLAG_MASKS: Record<ENABLEMENT_COMPATIBILITY_FLAGS, number> =
Expand Down Expand Up @@ -94,6 +105,7 @@ export class Analytics {
this.data.coloTier ?? -1, // double4
this.data.status ?? -1, // double5
compatibilityFlagsBitmask, // double6
this.data.entrypoint ?? -1, // double7
],
blobs: [
this.data.hostname?.substring(0, 256), // blob1 - trim to 256 bytes
Expand All @@ -104,6 +116,7 @@ export class Analytics {
this.data.version, // blob6
this.data.coloRegion, // blob7
this.data.cacheStatus, // blob8
this.data.cohort, // blob9
],
});
}
Expand Down
110 changes: 99 additions & 11 deletions packages/workers-shared/asset-worker/src/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { WorkerEntrypoint } from "cloudflare:workers";
import { PerformanceTimer } from "../../utils/performance";
import { setupSentry } from "../../utils/sentry";
import { mockJaegerBinding } from "../../utils/tracing";
import { Analytics } from "./analytics";
import { Analytics, EntrypointType } from "./analytics";
import { AssetsManifest } from "./assets-manifest";
import { normalizeConfiguration } from "./configuration";
import { ExperimentAnalytics } from "./experiment-analytics";
Expand All @@ -16,6 +16,7 @@ import type {
SpanContext,
UnsafePerformanceTimer,
} from "../../utils/types";
import type { AccountCohortQuerierBinding } from "../worker-configuration";
import type { Environment, ReadyAnalytics } from "./types";

// ============================================================
Expand Down Expand Up @@ -49,6 +50,7 @@ export type Env = {
COLO_METADATA: ColoMetadata;
UNSAFE_PERFORMANCE: UnsafePerformanceTimer;
VERSION_METADATA: WorkerVersionMetadata;
ACCOUNT_COHORT_QUERIER?: AccountCohortQuerierBinding;
};

export type AssetWorkerEntrypointProps = {
Expand Down Expand Up @@ -92,6 +94,7 @@ type AssetWorkerContext = ExecutionContext & {
exports?: {
AssetWorkerInner?: (options: {
props: AssetWorkerEntrypointProps;
version?: { cohort?: string };
}) => AssetWorkerMethods;
};
props?: AssetWorkerEntrypointProps;
Expand Down Expand Up @@ -209,12 +212,55 @@ async function unstableGetByPathnameImpl(
});
}

export const COHORT_LOOKUP_TIMEOUT_MS = 5;

/**
* Resolves the deployment cohort for a customer account via the
* AccountCohortQuerier RPC binding. Returns null when the binding is
* unavailable, the RPC fails, times out, or the cohort is undetermined
* (cold cache). Intentionally fails open — a null cohort means the
* request runs under default routing.
*/
export async function lookupCohort(
env: Env,
accountId: number | undefined
): Promise<string | null> {
const querier = env.ACCOUNT_COHORT_QUERIER;
if (!querier || !accountId) {
return null;
}
const ac = new AbortController();
try {
const rpc = querier.lookupAccountCohort(accountId.toString());
// Prevent unhandled rejection if timeout wins but RPC later rejects.
void rpc.catch(() => {});
const timeout = new Promise<never>((_, reject) => {
const id = setTimeout(() => {
reject(new Error("cohort lookup timed out"));
}, COHORT_LOOKUP_TIMEOUT_MS);
ac.signal.addEventListener("abort", () => clearTimeout(id));
});
const res = await Promise.race([rpc, timeout]);
if (!res.ok) {
console.error("cohort lookup failed", res.errors);
return null;
}
return res.result ?? null;
} catch (e: unknown) {
console.error("cohort lookup failed", e);
return null;
} finally {
ac.abort();
}
}

async function runFetchRequest(
request: Request,
env: Env,
ctx: ExecutionContext,
exists: ExistsFn,
getByETag: GetByETagFn
getByETag: GetByETagFn,
cohort?: string
): Promise<Response> {
let sentry: ReturnType<typeof setupSentry> | undefined;
const analytics = new Analytics(env.ANALYTICS);
Expand Down Expand Up @@ -262,6 +308,8 @@ async function runFetchRequest(
notFoundHandling: config.not_found_handling,
compatibilityFlags: config.compatibility_flags,
userAgent: userAgent,
entrypoint: EntrypointType.Inner,
cohort: cohort ?? "unknown",
});
}

Expand Down Expand Up @@ -315,10 +363,29 @@ export default class AssetWorkerOuter<TEnv extends Env = Env>
extends WorkerEntrypoint<TEnv>
implements AssetWorkerMethods
{
private resolvedCohort: string | null | undefined = undefined;

/**
* Resolves and caches the cohort for this request. The cohort is
* constant for the lifetime of the outer entrypoint instance, so
* we only pay the RPC cost once even if multiple methods are called.
*/
private async getCohort(): Promise<string | null> {
if (this.resolvedCohort === undefined) {
this.resolvedCohort = await lookupCohort(
this.env,
this.env.CONFIG?.account_id
);
}
return this.resolvedCohort;
}

/**
* Gets the inner entrypoint from ctx.exports to forward requests to.
* When a cohort is provided, the runtime routes the inner entrypoint
* to the version assigned to that cohort in the current deployment.
*/
private getInnerEntrypoint(): AssetWorkerMethods {
private getInnerEntrypoint(cohort?: string | null): AssetWorkerMethods {
const loopbackCtx = this.ctx as AssetWorkerContext;
const entrypoint = loopbackCtx.exports?.AssetWorkerInner;
if (entrypoint === undefined) {
Expand All @@ -330,6 +397,7 @@ export default class AssetWorkerOuter<TEnv extends Env = Env>
}
return entrypoint({
props: { traceContext: this.env.JAEGER.getSpanContext() },
...(cohort ? { version: { cohort } } : {}),
});
}

Expand All @@ -355,6 +423,7 @@ export default class AssetWorkerOuter<TEnv extends Env = Env>
coloRegion: this.env.COLO_METADATA.coloRegion,
hostname: url.hostname,
version: this.env.VERSION_METADATA.tag,
entrypoint: EntrypointType.Outer,
});
}
sentry = setupSentry(
Expand All @@ -368,41 +437,58 @@ export default class AssetWorkerOuter<TEnv extends Env = Env>
this.env.CONFIG?.account_id,
this.env.CONFIG?.script_id
);
return await this.getInnerEntrypoint().fetch(request);

const cohort = await this.getCohort();
analytics.setData({ cohort: cohort ?? "unknown" });

const response = await this.getInnerEntrypoint(cohort).fetch(request);
analytics.setData({ status: response.status });
if (response.status >= 500) {
analytics.setData({ error: "inner entrypoint error" });
}
return response;
} catch (err) {
const response = handleError(sentry, analytics, err);
analytics.setData({ status: 500 });
return handleError(sentry, analytics, err);
} finally {
submitMetrics(analytics, performance, startTimeMs);
return response;
}
}

async unstable_canFetch(request: Request): Promise<boolean> {
this.env.JAEGER ??= mockJaegerBinding();
return this.getInnerEntrypoint().unstable_canFetch(request);
const cohort = await this.getCohort();
return this.getInnerEntrypoint(cohort).unstable_canFetch(request);
}

async unstable_getByETag(
eTag: string,
request?: Request
): Promise<GetByETagResult> {
this.env.JAEGER ??= mockJaegerBinding();
return this.getInnerEntrypoint().unstable_getByETag(eTag, request);
const cohort = await this.getCohort();
return this.getInnerEntrypoint(cohort).unstable_getByETag(eTag, request);
}

async unstable_getByPathname(
pathname: string,
request?: Request
): Promise<GetByETagResult | null> {
this.env.JAEGER ??= mockJaegerBinding();
return this.getInnerEntrypoint().unstable_getByPathname(pathname, request);
const cohort = await this.getCohort();
return this.getInnerEntrypoint(cohort).unstable_getByPathname(
pathname,
request
);
}

async unstable_exists(
pathname: string,
request?: Request
): Promise<string | null> {
this.env.JAEGER ??= mockJaegerBinding();
return this.getInnerEntrypoint().unstable_exists(pathname, request);
const cohort = await this.getCohort();
return this.getInnerEntrypoint(cohort).unstable_exists(pathname, request);
}
}

Expand All @@ -426,6 +512,7 @@ export class AssetWorkerInner<TEnv extends Env = Env>
this.env.JAEGER ??= mockJaegerBinding();
const loopbackCtx = this.ctx as AssetWorkerContext;
const traceContext = loopbackCtx.props?.traceContext ?? null;
const cohort = this.ctx.version?.cohort;

const response = await this.env.JAEGER.runWithSpanContext(
traceContext,
Expand All @@ -435,7 +522,8 @@ export class AssetWorkerInner<TEnv extends Env = Env>
this.env,
this.ctx,
this.unstable_exists.bind(this),
this.unstable_getByETag.bind(this)
this.unstable_getByETag.bind(this),
cohort
)
);

Expand Down
90 changes: 90 additions & 0 deletions packages/workers-shared/asset-worker/tests/analytics.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import { describe, it, vi } from "vitest";
import { Analytics, EntrypointType } from "../src/analytics";
import type { ReadyAnalyticsEvent } from "../src/types";

describe("[Asset Worker] Analytics", () => {
function captureEvent(): {
analytics: Analytics;
getEvent: () => ReadyAnalyticsEvent | undefined;
} {
let captured: ReadyAnalyticsEvent | undefined;
const mockReadyAnalytics = {
logEvent: vi.fn((event: ReadyAnalyticsEvent) => {
captured = event;
}),
};
return {
analytics: new Analytics(mockReadyAnalytics),
getEvent: () => captured,
};
}

describe("EntrypointType enum", () => {
it("has expected numeric values", ({ expect }) => {
expect(EntrypointType.Outer).toBe(0);
expect(EntrypointType.Inner).toBe(1);
});
});

describe("entrypoint discriminator (double7)", () => {
it("writes -1 when entrypoint is not set", ({ expect }) => {
const { analytics, getEvent } = captureEvent();
analytics.write();

const event = getEvent();
expect(event?.doubles?.[6]).toBe(-1);
});
});

describe("full event shape", () => {
it("preserves all existing fields alongside new ones", ({ expect }) => {
const { analytics, getEvent } = captureEvent();
analytics.setData({
accountId: 123,
scriptId: 456,
requestTime: 50,
coloId: 1,
metalId: 2,
coloTier: 3,
status: 200,
entrypoint: EntrypointType.Inner,
hostname: "example.com",
userAgent: "test-agent",
htmlHandling: "auto-trailing-slash",
notFoundHandling: "none",
error: "",
version: "abc123",
coloRegion: "WEUR",
cacheStatus: "HIT",
cohort: "free",
});
analytics.write();

const event = getEvent();
// Doubles
expect(event?.doubles?.[0]).toBe(50); // requestTime
expect(event?.doubles?.[1]).toBe(1); // coloId
expect(event?.doubles?.[2]).toBe(2); // metalId
expect(event?.doubles?.[3]).toBe(3); // coloTier
expect(event?.doubles?.[4]).toBe(200); // status
// double6 is compatibilityFlags bitmask, defaults to 0
expect(event?.doubles?.[5]).toBe(0);
expect(event?.doubles?.[6]).toBe(EntrypointType.Inner); // entrypoint

// Blobs
expect(event?.blobs?.[0]).toBe("example.com"); // hostname
expect(event?.blobs?.[1]).toBe("test-agent"); // userAgent
expect(event?.blobs?.[2]).toBe("auto-trailing-slash"); // htmlHandling
expect(event?.blobs?.[3]).toBe("none"); // notFoundHandling
expect(event?.blobs?.[4]).toBe(""); // error
expect(event?.blobs?.[5]).toBe("abc123"); // version
expect(event?.blobs?.[6]).toBe("WEUR"); // coloRegion
expect(event?.blobs?.[7]).toBe("HIT"); // cacheStatus
expect(event?.blobs?.[8]).toBe("free"); // cohort

// Indexes
expect(event?.accountId).toBe(123);
expect(event?.indexId).toBe("456");
});
});
});
Loading
Loading