feat: Integrate OpenAI proxy controller, token usage tracking, and dependency updates #4
Conversation
…pendency updates Signed-off-by: skurzyp-blockydevs <stanislaw.kurzyp@blockydevs.com>
…d usage tracking, and improved error handling Refactored ChatController initialization, added CORS for local environments, and optimized environment variable loading.
Nice structure overall (BYOK fallback, shared streaming helper, centralized error handler, usage tracking abstraction). The implementation is solid directionally, but there are a few must-fix issues before merging — mainly around Fastify streaming lifecycle, Responses API streaming usage, and separation of concerns.
🚨 Must-fix
1. Fastify streaming lifecycle
- You write directly to `reply.raw`. When manually taking over the response stream in Fastify, you should call `reply.hijack()`; otherwise you can hit lifecycle/headers issues depending on environment and plugins.
- Consider also:
  - `flushHeaders()` (if available)
  - `X-Accel-Buffering: no`
  - `Cache-Control: no-transform`

  These help prevent buffering issues when running behind nginx/CDNs.
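A minimal sketch of that takeover, using structural stand-ins for Fastify's types (`Reply`, `RawReply`, `sseHeaders`, and `beginSse` are illustrative names, not the PR's actual code):

```typescript
// Structural stand-ins for Fastify's reply objects (illustrative, not the
// real FastifyReply type from "fastify").
interface RawReply {
  writeHead(status: number, headers: Record<string, string>): void;
  write(chunk: string): boolean;
  end(): void;
}
interface Reply {
  hijack(): void;
  raw: RawReply;
}

// The headers recommended above, collected in one place.
function sseHeaders(): Record<string, string> {
  return {
    "Content-Type": "text/event-stream",
    "Cache-Control": "no-cache, no-transform", // no-transform: keep proxies from re-encoding
    Connection: "keep-alive",
    "X-Accel-Buffering": "no", // tell nginx not to buffer the stream
  };
}

// Take over the response stream: hijack first, then write headers manually.
function beginSse(reply: Reply): void {
  reply.hijack(); // Fastify must not touch the reply after this point
  reply.raw.writeHead(200, sseHeaders());
}
```

In a real handler this would run before the first `reply.raw.write(...)` of an SSE frame.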
2. Client disconnect handling
- If the client closes the connection mid-stream, we should stop consuming the upstream OpenAI stream. Otherwise we may continue generating tokens and incurring cost.
- Add a `request.raw.on('close' | 'aborted')` handler and abort/cancel the OpenAI stream if supported (or break the loop).
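One possible wiring (the `abortOnDisconnect` helper is hypothetical): tie an `AbortController` to the client socket's `close` event, and pass the resulting signal to the upstream SDK call, which the OpenAI Node SDK accepts as a `signal` request option.

```typescript
import { EventEmitter } from "node:events";

// Hypothetical wiring: abort the upstream OpenAI request when the client
// socket closes. `rawRequest` stands in for Fastify's `request.raw`, which
// is a Node EventEmitter.
function abortOnDisconnect(rawRequest: EventEmitter): AbortSignal {
  const controller = new AbortController();
  rawRequest.once("close", () => controller.abort());
  return controller.signal;
}
```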
3. Responses API streaming usage is not captured
- Current `handleStreamingRequest()` checks only `chunk.usage`.
- In Responses streaming, usage typically arrives nested (e.g. under `response.completed` → `response.usage`).
- As implemented, `/responses` streaming will likely never record usage and will log “No usage data…”.

We need a small usage extractor that handles: `chunk.usage ?? chunk.response?.usage`

4. Don’t throw JSON errors after streaming has started
- If an error occurs after we start writing SSE frames, calling `reply.status().send()` in `handleOpenAIError()` can fail because headers/body were already sent.
- For streaming paths:
  - Either emit an SSE error event and close
  - Or just end the socket
- JSON error responses should be reserved for non-streaming paths only.
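A sketch of the in-band error path (the `sseErrorFrame` helper and its payload shape are illustrative, not the PR's actual code):

```typescript
// Illustrative in-band error frame: once SSE streaming has started, errors
// are reported as an SSE `error` event instead of a JSON error response.
function sseErrorFrame(message: string): string {
  return `event: error\ndata: ${JSON.stringify({ error: { message } })}\n\n`;
}
```

The streaming handler would `reply.raw.write(sseErrorFrame(...))` and then `reply.raw.end()`; JSON error responses stay on the non-streaming path.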
5. Usage tracking after response end can break the handler
- `handleStreamingRequest()` ends the response and then awaits `trackUsage()`.
- If `trackUsage()` throws, the outer catch may attempt to send an error response on a closed reply.
- Wrap `trackUsage()` in its own try/catch and log failures instead of propagating.
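The wrapper could be as small as this (`safeTrackUsage` and the `log` callback are illustrative names):

```typescript
// Illustrative wrapper: usage-tracking failures are logged, never thrown,
// so they cannot reach the outer catch after the response has ended.
async function safeTrackUsage(
  track: () => Promise<void>,
  log: (msg: string) => void
): Promise<void> {
  try {
    await track();
  } catch (err) {
    log(`trackUsage failed: ${err instanceof Error ? err.message : String(err)}`);
  }
}
```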
🏗 Architectural Concern – Controller Is Doing Too Much
The controller currently handles:
- Route registration
- HTTP/SSE transport
- BYOK resolution
- Usage limit checks
- OpenAI client instantiation
- Streaming orchestration
- Usage extraction
- Usage accounting
- Error translation
This mixes transport concerns with application/business logic. It’s not unmanageable yet, but it’s already trending toward a “god controller.”
Recommendation (incremental refactor, not a rewrite)
Keep in Controller:
- Route registration
- Extracting authenticated `userId`
- SSE header setup / hijack
- Writing SSE frames
- Mapping service errors → HTTP responses
Extract to an `OpenAIProxyService` (application layer):
- `checkUsageLimit`
- BYOK resolution (`resolveOpenAIClient`)
- OpenAI API invocation
- Streaming vs non-streaming orchestration
- Usage extraction logic
- Calling `tokenUsageService.incrementUsage`
Optionally:
- `OpenAIClientFactory` for BYOK/client creation
- `UsageExtractor` utility to normalize Chat vs Responses usage formats
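The `UsageExtractor` could be a single function. This sketch assumes the snake_case usage fields the OpenAI SDK returns and covers the two chunk shapes described in point 3 (top-level `usage` for Chat Completions, nested `response.usage` for Responses events):

```typescript
// Usage fields follow the OpenAI SDK's snake_case usage objects (assumption).
interface TokenUsage {
  prompt_tokens?: number;
  completion_tokens?: number;
  total_tokens?: number;
}

// A chunk from either stream: Chat Completions puts `usage` at the top
// level; Responses events nest it under `response` (e.g. on
// `response.completed`).
interface StreamChunk {
  usage?: TokenUsage | null;
  response?: { usage?: TokenUsage | null };
}

// Normalize both shapes; returns undefined when the chunk carries no usage.
function extractUsage(chunk: StreamChunk): TokenUsage | undefined {
  return chunk.usage ?? chunk.response?.usage ?? undefined;
}
```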
Why this matters
- Better separation of concerns
- Proper unit testability (without Fastify/SSE)
- Easier evolution (model allowlists, org quotas, pricing tiers, retries, observability)
- Reduced risk of future streaming edge-case bugs
Not blocking for this PR if time is tight, but strongly recommend either addressing now or creating a follow-up refactor task.
⚠️ Strong recommendations
- Do not trust the `x-user-id` header for auth/quota/BYOK resolution. Derive `userId` from auth middleware (JWT/session). Header spoofing can otherwise spend other users’ quota.
- Add request validation/schema for `/responses` instead of `Record<string, any>`.
- Consider model allowlisting and sensible limits (`max_output_tokens`, tools usage, etc.) to prevent abuse/cost spikes.
- Consider whether to append `data: [DONE]` for `/responses`. Responses streaming is JSON event-based; `[DONE]` may break strict SSE JSON consumers. Align with expected client behavior.
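If both endpoints share one streaming helper, the sentinel decision could be isolated in a tiny function (illustrative names; whether `/responses` should get a sentinel at all is the open question above):

```typescript
// Illustrative: only the Chat Completions stream ends with the `data: [DONE]`
// sentinel; Responses streaming is a typed JSON event stream and gets none.
type ProxyEndpoint = "chat" | "responses";

function streamTerminator(endpoint: ProxyEndpoint): string | null {
  return endpoint === "chat" ? "data: [DONE]\n\n" : null;
}
```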
Overall: good foundation, but we need to harden streaming lifecycle handling and clean up separation of concerns before this grows further.
Key Changes & Addressed Feedback
- Fastify Streaming Lifecycle:
  - Implemented `reply.hijack()` for manual socket control.
  - Added `X-Accel-Buffering: no` and `Cache-Control: no-transform` headers.
- Client Disconnect Handling:
  - Added `reply.raw.on('close')` listeners to trigger upstream aborts via `AbortController`.
  - Stops upstream consumption immediately to save tokens.
- Responses API Streaming Usage:
  - Validated and fixed usage extraction for nested `chunk.response.usage` fields.
- Error Handling:
  - Prevented JSON error responses after streaming headers are sent; falls back to SSE error events or stream termination.
- Safe Usage Tracking:
  - Wrapped `trackUsage` in a try/catch block so that usage-tracking failures are logged and never crash the response handler.
- Architectural Separation:
  - Extracted `OpenAIProxyService` to handle business logic (BYOK resolution, validation, API calls).
  - Controller now purely handles transport (HTTP/SSE, headers, error mapping).
Testing & Validation
Validated compatibility with popular AI SDKs using an external project which utilizes the OpenAI Proxy:
- LangChain JS: Verified `ChatOpenAI` works correctly in both streaming and non-streaming modes.
- Vercel AI SDK: Verified `generateText` and `streamText` work correctly in both streaming and non-streaming modes.

In addition to the above, I tested that the connection closes correctly when the client disconnects mid-stream.
Re: Strong recommendations section
- The `x-user-id` header is secure and verified before the request reaches the server. There is an SPOE service earlier in the request routing path.
- Route `body` payloads are now typed with OpenAI types:
    async registerRoutes(): Promise<void> {
      this.fastify.post<{ Body: ChatCompletionCreateParamsStreaming | ChatCompletionCreateParamsNonStreaming }>(
        `${this.basePath}/chat/completions`,
        this.handleChatCompletion.bind(this)
      );
      this.fastify.post<{ Body: ResponseCreateParamsStreaming | ResponseCreateParamsNonStreaming }>(
        `${this.basePath}/responses`,
        this.handleResponses.bind(this)
      );
    }

What do you think about this approach?
2 & 3. Skipped for now
…or handling and usage tracking Signed-off-by: skurzyp-blockydevs <stanislaw.kurzyp@blockydevs.com>
Signed-off-by: skurzyp-blockydevs <stanislaw.kurzyp@blockydevs.com>
…streaming/non-streaming requests Signed-off-by: skurzyp-blockydevs <stanislaw.kurzyp@blockydevs.com>
… handling Signed-off-by: skurzyp-blockydevs <stanislaw.kurzyp@blockydevs.com>
Signed-off-by: skurzyp-blockydevs <stanislaw.kurzyp@blockydevs.com>
@piotrswierzy Thanks. I'll add a placeholder task to the board so we don't forget about further improvements. You can fill it with details later.
…ith regex pattern matching Signed-off-by: skurzyp-blockydevs <stanislaw.kurzyp@blockydevs.com>
pat-rg
left a comment
- Change proxy basePath: `/api/openai/v1` → `/api/playground/assistant/...`
The HAProxy API Gateway uses an ACL to route traffic to the ai-assistant microservice:
    acl allowed_path path_beg /api/playground/assistant
    http-request deny deny_status 404 if !allowed_path
Any path not starting with /api/playground/assistant will be rejected with 404. By changing the basePath, the new proxy routes will automatically inherit authentication (SPOE), rate limiting, and CORS already configured in HAProxy.
- Fix unused `chatService` in `index.ts` - either inject it into `ChatControllerImpl` or remove it (currently dead code)
📝 Note about branches
If it's not an inconvenience, could you create a branch directly in the main repository instead of using a fork? This allows the CI workflow (ai-assistant-build.yaml) to build and deploy a version to the develop environment for testing before merging. For future PRs, please follow this approach as well.
Thank you for your review, @pat-rg. I’ll implement changes 1 and 2. Regarding point 2, thanks for catching that - I’ll remove the variable. Unfortunately, I’m not able to create a branch directly in your repository since I don’t have write access.
…initialization Signed-off-by: skurzyp-blockydevs <stanislaw.kurzyp@blockydevs.com>
@pat-rg I have pushed the changes. Please re-review.
pat-rg
left a comment
Thanks for the quick fix!
I pushed your changes to a branch in the main repository to deploy them to the develop environment. During deployment, this ESM import error came up:
Error [ERR_MODULE_NOT_FOUND]: Cannot find module '/app/dist/utils/environment'
In index.ts line 17, the import is missing the .js extension:
    // Current
    import { isLocal } from "./utils/environment";
    // Should be
    import { isLocal } from "./utils/environment.js";
This error only shows up when running the compiled code (npm run build && npm start), not during development with ts-node. For future changes, please test the production build locally before pushing.
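For context, this requirement comes from Node's ESM resolution: with `NodeNext` module settings (an assumption about this project's tsconfig), TypeScript emits relative import specifiers verbatim, so they must already carry the `.js` extension the compiled file will have:

```json
{
  "compilerOptions": {
    "module": "NodeNext",
    "moduleResolution": "NodeNext"
  }
}
```

ts-node resolves the extensionless specifier against the `.ts` source, which is why the error only appears in the compiled build.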
Could you fix this and push to your fork? Once updated, I'll pull the changes and redeploy.
Also, I'm looking into getting you write access for future PRs.
Signed-off-by: skurzyp-blockydevs <stanislaw.kurzyp@blockydevs.com>
@pat-rg I have fixed it, but there is another issue now. I'll let you know when it's ready.
Signed-off-by: skurzyp-blockydevs <stanislaw.kurzyp@blockydevs.com>
@pat-rg it should be all good now. Could you please try to redeploy?
@skurzyp-blockydevs The updated version is now deployed to the develop environment. The API is available at:
Thank you. We are still waiting on deployment of the hedera portal frontend. Our PR is stuck due to the Snyk check failing, and I don't have access to the logs. Can you by any chance help me out with it? It would be great if you could provide them. Also, is this check strictly necessary for the development environment? I assume there is some problem in a sub-dependency of our Agentic SDK, which means we will need to make a new release. That might take a couple of days, and if possible we would like to be able to test the FE in the meantime.
Changes:
- `/chat/completions` and `/responses`
- `TokenUsageService` - it is reused in both `AiAssistant` and the `OpenAiProxy`

Todo:
- `AiAssistant` - Blocked by not having access to required services