A FastAPI-based load-balancing worker for RunPod Serverless. Traffic is routed directly to available workers — no queuing — for low-latency HTTP and WebSocket access.
| Method | Path | Description |
|---|---|---|
| GET | `/ping` | Health check (required by RunPod) |
| POST | `/generate` | Text generation (swap mock for real model) |
| GET | `/stats` | Request and connection stats |
| WS | `/ws` | Streaming WebSocket generation |
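For orientation, the HTTP routes boil down to handlers along these lines. This is a condensed sketch that mimics the documented response shapes, not the full `app.py`; the request schema and counter are illustrative:

```python
from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()
total_requests = 0  # naive in-process counter, matching the mock responses above


class GenerateRequest(BaseModel):
    prompt: str
    max_tokens: int = 50
    temperature: float = 0.8


@app.get("/ping")
async def ping():
    # RunPod's load balancer polls this route to decide worker health
    return {"status": "healthy"}


@app.post("/generate")
async def generate(req: GenerateRequest):
    global total_requests
    total_requests += 1
    # Mock response; swap this body for real model inference
    return {
        "generated_text": (
            f"Response to: '{req.prompt}' "
            f"(tokens={req.max_tokens}, temp={req.temperature}, request #{total_requests})"
        )
    }
```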
```
.
├── app.py            # FastAPI application
├── Dockerfile        # Container definition
├── requirements.txt  # Python dependencies
└── README.md
```
```sh
pip install -r requirements.txt
python3 app.py
# Server starts on http://localhost:80
```

Build and push the image:

```sh
docker build --platform linux/amd64 -t YOUR_DOCKER_USERNAME/worker-lb-websocket:latest .
docker push YOUR_DOCKER_USERNAME/worker-lb-websocket:latest
```

- Go to the Serverless page in the RunPod console.
- Click New Endpoint → Import from Docker Registry.
- Enter your image URL: `YOUR_DOCKER_USERNAME/worker-lb-websocket:latest`
- Click Next and give your endpoint a name.
- Under Endpoint Type, select Load Balancer.
- Choose at least one GPU type (16 GB+ recommended).
- Click Deploy Endpoint.
Once deployed, all endpoints are accessible at:
```
https://<ENDPOINT_ID>.api.runpod.ai/<PATH>
```

Replace `ENDPOINT_ID` and `RUNPOD_API_KEY` in every command below.
```sh
curl -X GET "https://ENDPOINT_ID.api.runpod.ai/ping" \
  -H "Authorization: Bearer RUNPOD_API_KEY"
```

Expected response:

```json
{"status": "healthy"}
```

```sh
curl -X POST "https://ENDPOINT_ID.api.runpod.ai/generate" \
  -H "Authorization: Bearer RUNPOD_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{"prompt": "Hello, world!", "max_tokens": 50, "temperature": 0.8}'
```

Expected response:

```json
{"generated_text": "Response to: 'Hello, world!' (tokens=50, temp=0.8, request #1)"}
```

```sh
curl -X GET "https://ENDPOINT_ID.api.runpod.ai/stats" \
  -H "Authorization: Bearer RUNPOD_API_KEY"
```

Expected response:

```json
{"total_requests": 1, "active_websocket_connections": 0}
```

The WebSocket endpoint is at:

```
wss://ENDPOINT_ID.api.runpod.ai/ws
```
Authentication uses the same `Authorization: Bearer` header.
Install:

```sh
npm install -g wscat
```

Connect and send a message:

```sh
wscat \
  --connect "wss://ENDPOINT_ID.api.runpod.ai/ws" \
  --header "Authorization: Bearer RUNPOD_API_KEY"
```

Once connected, type a JSON payload and press Enter:

```json
{"prompt": "Tell me about load balancing", "max_tokens": 20}
```

You will receive a stream of token frames followed by a done frame:

```json
{"token": "Streaming", "index": 0}
{"token": "response", "index": 1}
{"token": "for:", "index": 2}
...
{"done": true, "total_tokens": 7}
```

```python
import asyncio
import json

import websockets

ENDPOINT_ID = "YOUR_ENDPOINT_ID"
RUNPOD_API_KEY = "YOUR_RUNPOD_API_KEY"


async def test_websocket():
    url = f"wss://{ENDPOINT_ID}.api.runpod.ai/ws"
    headers = {"Authorization": f"Bearer {RUNPOD_API_KEY}"}
    async with websockets.connect(url, additional_headers=headers) as ws:
        payload = {"prompt": "Hello from Python!", "max_tokens": 30, "temperature": 0.7}
        await ws.send(json.dumps(payload))
        # Read token frames until the server sends a done frame
        while True:
            message = await ws.recv()
            data = json.loads(message)
            print(data)
            if data.get("done"):
                break


asyncio.run(test_websocket())
```

Run it:

```sh
pip install websockets
python3 test_ws.py
```

Start the server locally:

```sh
python3 app.py
```

Then connect without auth headers:

```sh
wscat --connect "ws://localhost:80/ws"
```

Or with the Python script, replace the URL with `ws://localhost:80/ws` and remove the `additional_headers` argument.
| Variable | Default | Description |
|---|---|---|
| `PORT` | `80` | Main application port (HTTP + WebSocket) |
| `PORT_HEALTH` | same as `PORT` | Health check port (set to a different port if needed) |

Note: RunPod load-balancing workers share a single port for both HTTP and WebSocket traffic, so `PORT` covers both.
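As a rough sketch of how the worker can honor these variables (assuming `app.py` exposes a FastAPI instance named `app`; the repo's actual startup code may differ):

```python
import os

import uvicorn

# A sketch, not necessarily the repo's exact startup code:
# read the port from the environment, defaulting to 80 as documented above.
port = int(os.environ.get("PORT", "80"))

if __name__ == "__main__":
    # "app:app" assumes app.py defines a FastAPI instance named `app`
    uvicorn.run("app:app", host="0.0.0.0", port=port)
```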
Workers may be initialising when your first request arrives. Use retry logic:
```python
import time

import requests


def wait_for_worker(base_url, api_key, retries=5, delay=5):
    """Poll /ping until the worker responds, retrying on failure."""
    headers = {"Authorization": f"Bearer {api_key}"}
    for _ in range(retries):
        try:
            r = requests.get(f"{base_url}/ping", headers=headers, timeout=10)
            if r.status_code == 200:
                return True
        except Exception:
            pass
        time.sleep(delay)
    return False
```

`test_scaling.py` reproduces the following topology and probes whether RunPod's load balancer counts WebSocket connections as active work for scaling purposes:
```
Client ──WS (token in URL)──> Traefik / reverse proxy ──> FastAPI/uvicorn worker
```
It runs three tests:
| # | Test | What it checks |
|---|---|---|
| 1 | HTTP `/ping` polling | Whether repeated health checks alone trigger scaling |
| 2 | Concurrent WS connections | Whether WS-only load causes workers to scale up |
| 3 | Mixed HTTP + WS | Whether HTTP POST `/generate` alongside a WS session scales normally |
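For reference, the WS-only portion (test 2) amounts to something like the sketch below; the connection count, duration, and URL are placeholders, and the real `test_scaling.py` adds timing and result reporting:

```python
import asyncio
import json

import websockets


# A sketch of the WS-only load test (test 2), not the full test_scaling.py:
# open N concurrent WebSocket connections and hold each one open for
# `duration` seconds without reading the streamed replies.
async def hold_connection(url: str, duration: float) -> None:
    async with websockets.connect(url) as ws:
        await ws.send(json.dumps({"prompt": "ping", "max_tokens": 1}))
        await asyncio.sleep(duration)


async def main() -> None:
    url = "ws://localhost:80/ws"  # placeholder; append ?token=... for remote runs
    await asyncio.gather(*(hold_connection(url, 30) for _ in range(10)))


asyncio.run(main())
```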
Install the test dependencies:

```sh
pip install httpx websockets
```

```sh
# Terminal 1: start the worker
python3 app.py

# Terminal 2: run the tests
python3 test_scaling.py --url http://localhost:80
```

Against a deployed endpoint:

```sh
python3 test_scaling.py \
  --url https://ENDPOINT_ID.api.runpod.ai \
  --token YOUR_SESSION_TOKEN \
  --connections 10 \
  --duration 30 \
  --health-polls 30 \
  --health-interval 1
```

The `--token` value is appended to the WS URL as `?token=TOKEN`, matching the ephemeral-token-in-URL pattern used by a Traefik reverse proxy (no custom headers required on the client side).
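On the worker side, that pattern can be validated with a query-parameter check before accepting the socket. A minimal sketch, assuming a shared secret in `EXPECTED_TOKEN` (this repo's `app.py` may handle auth differently):

```python
from fastapi import FastAPI, WebSocket, status

app = FastAPI()
EXPECTED_TOKEN = "YOUR_SESSION_TOKEN"  # placeholder shared secret


@app.websocket("/ws")
async def ws_endpoint(websocket: WebSocket):
    # Read the ephemeral token from the ?token=... query parameter
    token = websocket.query_params.get("token")
    if token != EXPECTED_TOKEN:
        # Reject before accepting; 1008 = policy violation
        await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
        return
    await websocket.accept()
    # ... streaming generation loop goes here ...
```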
- WS-only test FAILS, HTTP test PASSES → RunPod is not counting open WS connections as active work. Workers scale to 0 between WS messages. Workaround: keep at least one worker warm (`min_workers = 1`), or send a lightweight HTTP keepalive (e.g. `GET /ping` every 30 s) from your proxy.
- All tests PASS → Scaling is working correctly. If you still see issues, check your Traefik configuration — it may be closing idle WS connections before the RunPod worker sees them.
- Connection errors on WS → Verify `ENDPOINT_ID`, `PORT`, and that the endpoint type is Load Balancer (not Queue-based). Queue-based endpoints do not support WebSocket.
- Replace the mock body in `generate()` and the WebSocket handler with real model inference (e.g. a local vLLM instance or any Hugging Face model; a sketch follows this list).
- Add authentication middleware using `Depends()` if you need per-user API keys beyond the RunPod bearer token.
- Add more routes following the same pattern — all routes are automatically available under the same endpoint URL.
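As a starting point for the first item, here is a hedged sketch of swapping the mock for a Hugging Face `transformers` pipeline. The model name, request schema, and response shape are illustrative; adapt them to the actual `app.py`:

```python
from fastapi import FastAPI
from pydantic import BaseModel
from transformers import pipeline

app = FastAPI()
# Example model only; substitute whatever model you deploy
generator = pipeline("text-generation", model="gpt2")


class GenerateRequest(BaseModel):
    prompt: str
    max_tokens: int = 50
    temperature: float = 0.8


@app.post("/generate")
async def generate(req: GenerateRequest):
    # Blocking call; for production, consider offloading to a thread/executor
    out = generator(
        req.prompt,
        max_new_tokens=req.max_tokens,
        temperature=req.temperature,
        do_sample=True,
    )
    return {"generated_text": out[0]["generated_text"]}
```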