Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,15 @@ The following types can be passed over RPC (in arguments or return values), and
* `ReadableStream` and `WritableStream`, with automatic flow control.
* `Headers`, `Request`, and `Response` from the Fetch API.

When a returned `Response` has a Cloudflare Workers-style `.webSocket` property, Cap'n Web treats
that socket as the continuation of an HTTP/WebSocket upgrade. The receiver gets a `Response` with
a `.webSocket` property whose value supports `send()`, `close()`, `accept()`, `readyState`, and
`message` / `close` / `error` event listeners, suitable for passing to
`newWebSocketRpcSession()`. Bare `WebSocket` objects are not otherwise serializable values; this
support is scoped to fetch upgrade responses. Close or dispose the upgraded socket when done so
the remote connection can be released. Upgrade tunneling carries data frames over the RPC session
and does not add `ReadableStream`-style flow control or expose ping/pong control frames.

The following types are not supported as of this writing, but may be added in the future:
* `Map` and `Set`
* `ArrayBuffer` and typed arrays other than `Uint8Array`
Expand Down
5 changes: 4 additions & 1 deletion __tests__/test-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,10 @@ export async function setup(project: TestProject) {
})

// Listen on an ephemeral port for testing purposes.
httpServer.listen(0);
await new Promise<void>((resolve, reject) => {
httpServer!.once("error", reject);
httpServer!.listen(0, "127.0.0.1", resolve);
});
let addr = httpServer.address() as AddressInfo;

// Provide the server address to tests.
Expand Down
189 changes: 189 additions & 0 deletions __tests__/websocket-serialization.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
// Copyright (c) 2025 Cloudflare, Inc.
// Licensed under the MIT license found in the LICENSE.txt file or at:
// https://opensource.org/license/mit

import { expect, it, describe } from "vitest";
import type { AddressInfo } from "node:net";
import { WebSocket as WsWebSocket, WebSocketServer } from "ws";
import { newWebSocketRpcSession, RpcTarget } from "../src/index.js";

function listenWebSocket(server: WebSocketServer): Promise<number> {
return new Promise((resolve, reject) => {
server.on("error", reject);
server.on("listening", () => {
resolve((server.address() as AddressInfo).port);
});
});
}

function closeWebSocket(server: WebSocketServer): Promise<void> {
return new Promise(resolve => {
for (let client of server.clients) client.terminate();
server.close(() => resolve());
});
}

function waitOpen(socket: WsWebSocket): Promise<void> {
if (socket.readyState === WsWebSocket.OPEN) return Promise.resolve();

return new Promise((resolve, reject) => {
socket.addEventListener("open", () => resolve(), { once: true });
socket.addEventListener("error", () => reject(new Error("WebSocket failed to open")), {
once: true,
});
});
}

function withTimeout<T>(promise: Promise<T>, ms = 5000): Promise<T> {
let timer: ReturnType<typeof setTimeout>;
let timeout = new Promise<never>((_, reject) => {
timer = setTimeout(() => reject(new Error("Timed out waiting for WebSocket message")), ms);
});

return Promise.race([promise, timeout]).finally(() => clearTimeout(timer));
}

function nextMessage(socket: { addEventListener(type: "message", listener: (event: any) => void, options?: any): void })
: Promise<string | Uint8Array> {
return new Promise(resolve => {
socket.addEventListener("message", event => resolve(event.data), { once: true });
});
}

function nextClose(socket: { addEventListener(type: "close", listener: (event: any) => void, options?: any): void })
: Promise<{ code: number, reason: string }> {
return new Promise(resolve => {
socket.addEventListener("close", event => {
resolve({ code: event.code, reason: event.reason });
}, { once: true });
});
}

function responseWithWebSocket(socket: WebSocket): Response {
let response = new Response(null);
Object.defineProperty(response, "webSocket", { value: socket });
return response;
}

describe("WebSocket upgrade response serialization", () => {
it("rejects bare WebSockets but can pass Response.webSocket as an upgrade continuation",
async () => {
let echoServer = new WebSocketServer({ host: "127.0.0.1", port: 0 });
echoServer.on("connection", socket => {
socket.on("message", (data, isBinary) => socket.send(data, { binary: isBinary }));
});
let echoPort = await listenWebSocket(echoServer);

class Api extends RpcTarget {
openedSocket?: WsWebSocket;

async open() {
let socket = new WsWebSocket(`ws://127.0.0.1:${echoPort}`);
await waitOpen(socket);
this.openedSocket = socket;
return socket;
}

async openResponse() {
let socket = new WsWebSocket(`ws://127.0.0.1:${echoPort}`);
await waitOpen(socket);
return responseWithWebSocket(socket as any);
}
}

let apiImpl = new Api();
let rpcServer = new WebSocketServer({ host: "127.0.0.1", port: 0 });
rpcServer.on("connection", socket => {
newWebSocketRpcSession(socket as any, apiImpl);
});
let rpcPort = await listenWebSocket(rpcServer);

let api: any = newWebSocketRpcSession(new WsWebSocket(`ws://127.0.0.1:${rpcPort}`) as any);
try {
await expect(api.open()).rejects.toThrow();

let response: any = await api.openResponse();
expect(response.status).toBe(200);
expect(response.webSocket).toBeTruthy();

let socket = response.webSocket;
let message = nextMessage(socket);
socket.send("hello through Response.webSocket");
expect(await withTimeout(message)).toBe("hello through Response.webSocket");

let onceCount = 0;
socket.addEventListener("message", () => { ++onceCount; }, { once: true });
let onceMessage = nextMessage(socket);
socket.send("listener once");
expect(await withTimeout(onceMessage)).toBe("listener once");
let afterOnceMessage = nextMessage(socket);
socket.send("listener after once");
expect(await withTimeout(afterOnceMessage)).toBe("listener after once");
expect(onceCount).toBe(1);

let binaryMessage = nextMessage(socket);
socket.send(new Uint8Array([1, 2, 3]).buffer);
let bytes = await withTimeout(binaryMessage);
expect(typeof bytes).not.toBe("string");
expect(Array.from(bytes as Uint8Array)).toEqual([1, 2, 3]);

let close = nextClose(socket);
socket.close(1000, "done");
expect(await withTimeout(close)).toEqual({ code: 1000, reason: "done" });
socket.close();
} finally {
apiImpl.openedSocket?.close();
await closeWebSocket(rpcServer);
await closeWebSocket(echoServer);
}
}, 10_000);

it("can run Cap'n Web over a WebSocket tunneled through Cap'n Web", async () => {
class Processor extends RpcTarget {
square(value: number) { return value * value; }
greet(name: string) { return `hello ${name}`; }
}

let processorServer = new WebSocketServer({ host: "127.0.0.1", port: 0 });
processorServer.on("connection", socket => {
newWebSocketRpcSession(socket as any, new Processor());
});
let processorPort = await listenWebSocket(processorServer);

class TunnelTarget extends RpcTarget {
async fetch(request: Request): Promise<Response> {
if (request.headers.get("Upgrade")?.toLowerCase() !== "websocket") {
return new Response("Expected a WebSocket upgrade.", { status: 426 });
}

let socket = new WsWebSocket(`ws://127.0.0.1:${processorPort}`);
await waitOpen(socket);

let response = new Response(null);
Object.defineProperty(response, "webSocket", { value: socket });
return response;
}
}

let tunnelServer = new WebSocketServer({ host: "127.0.0.1", port: 0 });
tunnelServer.on("connection", socket => {
newWebSocketRpcSession(socket as any, new TunnelTarget());
});
let tunnelPort = await listenWebSocket(tunnelServer);

let tunnelApi: any =
newWebSocketRpcSession(new WsWebSocket(`ws://127.0.0.1:${tunnelPort}`) as any);
let response: any = await tunnelApi.fetch(new Request("https://captun.test/processor", {
headers: { Upgrade: "websocket" },
}));

let processor: any = newWebSocketRpcSession(response.webSocket);
try {
expect(await processor.square(7)).toBe(49);
expect(await processor.greet("captun")).toBe("hello captun");
} finally {
await closeWebSocket(tunnelServer);
await closeWebSocket(processorServer);
}
}, 10_000);
});
35 changes: 35 additions & 0 deletions __tests__/workerd.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,41 @@ interface WorkerdTestTarget extends TestTarget {
}

describe("workerd RPC server", () => {
it("can return a WebSocket-bearing Response over RPC", async () => {
class WebSocketResponseTarget extends RpcTarget {
openSocket() {
let pair = new WebSocketPair();
let client = pair[0];
let server = pair[1];
server.accept();
server.addEventListener("message", event => server.send(event.data));
return new Response(null, { status: 101, webSocket: client });
}
}

let pair = new WebSocketPair();
let client = pair[0];
let server = pair[1];
client.accept();
server.accept();

let api: any = newWebSocketRpcSession(client);
newWebSocketRpcSession(server, new WebSocketResponseTarget());

let response: any = await api.openSocket();
let socket = response.webSocket;
expect(socket).toBeTruthy();

socket.accept();
let message = new Promise(resolve => {
socket.addEventListener("message", event => resolve(event.data), { once: true });
});
socket.send("hello over Response.webSocket");

expect(await message).toBe("hello over Response.webSocket");
socket.close();
});

it("can accept WebSocket RPC connections", async () => {
let resp = await (<Env>env).testServer.fetch("http://foo", {headers: {Upgrade: "websocket"}});
let ws = resp.webSocket;
Expand Down
3 changes: 2 additions & 1 deletion protocol.md
Original file line number Diff line number Diff line change
Expand Up @@ -203,10 +203,11 @@ A `Request` object from the Fetch API. `url` and `init` are the parameters to pa
At this time, `init.signal` is not supported and must not be sent, though that will change when `AbortSignal` gains support for serialization.

`["response", body, init]`
`["response", body, init, ["upgrade", sessionExpression]]`

A `Response` object from the Fetch API. `body` and `init` are the parameters to pass to `Response`'s constructor to create the desired `Response` instance. `body` is an expression which must evaluate to `null`, a string, `UInt8Array`, or `ReadableStream`. `init.headers`, if present, must contain an array of pairs, suitable to pass to the constructor of `Headers`. Other properties of `init` must be plain values; they will not be evaluated as expressions before passing to the `Response` constructor.

At this time, `init.webSocket` (a Cloudflare Workers extension) is not supported and must not be sent, though that may change if `WebSocket` gains support for serialization.
The 4-element form represents an HTTP/WebSocket upgrade continuation, as in the Cloudflare Workers `Response.webSocket` extension. It is only produced when serializing a `Response` with an attached `.webSocket`; bare WebSocket objects are not otherwise serializable values. `body` must be `null` for an upgrade response. `sessionExpression` evaluates to an exported upgrade session that forwards `send()`, `close()`, and socket events. The receiver reconstructs a `Response` with a non-standard `.webSocket` property; the presence of that property is the upgrade signal. Since standard `Response` constructors cannot represent 1xx statuses, senders omit `status` and `statusText` from `init` when an upgrade response has a 1xx status. A runtime that requires a native upgrade socket may create a local WebSocket pair and bridge it to `sessionExpression`.

`["import", importId, propertyPath, callArguments]`
`["pipeline", importId, propertyPath, callArguments]`
Expand Down
31 changes: 24 additions & 7 deletions src/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -805,11 +805,11 @@ export class RpcPayload {
// return, so that we can make sure they are not disposed before the pipeline ends.
//
// This is initialized on first use.
private rpcTargets?: Map<RpcTarget | Function | WritableStream | ReadableStream, StubHook>;
private rpcTargets?: Map<object, StubHook>;

// Get the StubHook representing the given RpcTarget found inside this payload.
public getHookForRpcTarget(target: RpcTarget | Function, parent: object | undefined,
dupStubs: boolean = true): StubHook {
dupStubs: boolean = true, mapKey: object = target): StubHook {
if (this.source === "params") {
if (dupStubs) {
// We aren't supposed to take ownership of stubs appearing in params -- we're supposed to
Expand Down Expand Up @@ -846,12 +846,12 @@ export class RpcPayload {
// * If the target is not in the map, we just create it, but don't populate the map.
// * If the target *is* in the map, we *remove* the hook from the map, and return it.

let hook = this.rpcTargets?.get(target);
let hook = this.rpcTargets?.get(mapKey);
if (hook) {
if (dupStubs) {
return hook.dup();
} else {
this.rpcTargets?.delete(target);
this.rpcTargets?.delete(mapKey);
return hook;
}
} else {
Expand All @@ -860,7 +860,7 @@ export class RpcPayload {
if (!this.rpcTargets) {
this.rpcTargets = new Map;
}
this.rpcTargets.set(target, hook);
this.rpcTargets.set(mapKey, hook);
return hook.dup();
} else {
return hook;
Expand Down Expand Up @@ -1078,7 +1078,15 @@ export class RpcPayload {
// Make an actual copy of the object, e.g. so the headers are copied.
// Note that it would be incorrect to use clone() here since that would tee() the body
// stream.
return new Response(resp.body, resp);
let result: any = new Response(resp.body, resp);
let webSocket = (<any>resp).webSocket;
if (webSocket) {
Object.defineProperty(result, "webSocket", {
value: webSocket,
configurable: true,
});
}
return result;
}

default:
Expand Down Expand Up @@ -1348,7 +1356,16 @@ export class RpcPayload {
// The body may be a ReadableStream that has an associated hook in rpcTargets.
let resp = <Response>value;
if (resp.body) this.disposeImpl(resp.body, resp);
// TODO: When we support WebSocket, we may need to dispose response.webSocket here?
let cfResp = resp as any;
if (cfResp.webSocket) {
let hook = this.rpcTargets?.get(cfResp.webSocket);
if (hook) {
hook.dispose();
this.rpcTargets!.delete(cfResp.webSocket);
} else {
try { cfResp.webSocket.close(); } catch {}
}
}
return;
}

Expand Down
Loading