diff --git a/app/ai-assistant/package-lock.json b/app/ai-assistant/package-lock.json index 2117c62..fada7da 100644 --- a/app/ai-assistant/package-lock.json +++ b/app/ai-assistant/package-lock.json @@ -9,15 +9,15 @@ "version": "1.0.0", "license": "MIT", "dependencies": { - "@ai-sdk/openai": "^2.0.71", + "@ai-sdk/openai": "2.0.89", "@fastify/cors": "^10.1.0", "@google-cloud/kms": "5.2.1", "@types/pg": "^8.15.6", - "ai": "^5.0.60", + "ai": "6.0.78", "dotenv": "^16.4.7", "fastify": "5.5.0", "ioredis": "^5.4.1", - "openai": "^6.0.0", + "openai": "6.19.0", "pg": "^8.16.3", "pino": "^9.8.0", "pino-pretty": "^13.1.1", @@ -30,14 +30,14 @@ } }, "node_modules/@ai-sdk/gateway": { - "version": "1.0.33", - "resolved": "https://registry.npmjs.org/@ai-sdk/gateway/-/gateway-1.0.33.tgz", - "integrity": "sha512-v9i3GPEo4t3fGcSkQkc07xM6KJN75VUv7C1Mqmmsu2xD8lQwnQfsrgAXyNuWe20yGY0eHuheSPDZhiqsGKtH1g==", + "version": "3.0.39", + "resolved": "https://registry.npmjs.org/@ai-sdk/gateway/-/gateway-3.0.39.tgz", + "integrity": "sha512-SeCZBAdDNbWpVUXiYgOAqis22p5MEYfrjRw0hiBa5hM+7sDGYQpMinUjkM8kbPXMkY+AhKLrHleBl+SuqpzlgA==", "license": "Apache-2.0", "dependencies": { - "@ai-sdk/provider": "2.0.0", - "@ai-sdk/provider-utils": "3.0.10", - "@vercel/oidc": "^3.0.1" + "@ai-sdk/provider": "3.0.8", + "@ai-sdk/provider-utils": "4.0.14", + "@vercel/oidc": "3.1.0" }, "engines": { "node": ">=18" @@ -46,14 +46,27 @@ "zod": "^3.25.76 || ^4.1.8" } }, - "node_modules/@ai-sdk/openai": { - "version": "2.0.71", - "resolved": "https://registry.npmjs.org/@ai-sdk/openai/-/openai-2.0.71.tgz", - "integrity": "sha512-tg+gj+R0z/On9P4V7hy7/7o04cQPjKGayMCL3gzWD/aNGjAKkhEnaocuNDidSnghizt8g2zJn16cAuAolnW+qQ==", + "node_modules/@ai-sdk/gateway/node_modules/@ai-sdk/provider": { + "version": "3.0.8", + "resolved": "https://registry.npmjs.org/@ai-sdk/provider/-/provider-3.0.8.tgz", + "integrity": "sha512-oGMAgGoQdBXbZqNG0Ze56CHjDZ1IDYOwGYxYjO5KLSlz5HiNQ9udIXsPZ61VWaHGZ5XW/jyjmr6t2xz2jGVwbQ==", + "license": "Apache-2.0", + "dependencies": { + "json-schema": "^0.4.0" + }, + "engines": { + "node": ">=18" + } + }, + "node_modules/@ai-sdk/gateway/node_modules/@ai-sdk/provider-utils": { + "version": "4.0.14", + "resolved": "https://registry.npmjs.org/@ai-sdk/provider-utils/-/provider-utils-4.0.14.tgz", + "integrity": "sha512-7bzKd9lgiDeXM7O4U4nQ8iTxguAOkg8LZGD9AfDVZYjO5cKYRwBPwVjboFcVrxncRHu0tYxZtXZtiLKpG4pEng==", "license": "Apache-2.0", "dependencies": { - "@ai-sdk/provider": "2.0.0", - "@ai-sdk/provider-utils": "3.0.17" + "@ai-sdk/provider": "3.0.8", + "@standard-schema/spec": "^1.1.0", + "eventsource-parser": "^3.0.6" }, "engines": { "node": ">=18" @@ -62,15 +75,14 @@ "zod": "^3.25.76 || ^4.1.8" } }, - "node_modules/@ai-sdk/openai/node_modules/@ai-sdk/provider-utils": { - "version": "3.0.17", - "resolved": "https://registry.npmjs.org/@ai-sdk/provider-utils/-/provider-utils-3.0.17.tgz", - "integrity": "sha512-TR3Gs4I3Tym4Ll+EPdzRdvo/rc8Js6c4nVhFLuvGLX/Y4V9ZcQMa/HTiYsHEgmYrf1zVi6Q145UEZUfleOwOjw==", + "node_modules/@ai-sdk/openai": { + "version": "2.0.89", + "resolved": "https://registry.npmjs.org/@ai-sdk/openai/-/openai-2.0.89.tgz", + "integrity": "sha512-4+qWkBCbL9HPKbgrUO/F2uXZ8GqrYxHa8SWEYIzxEJ9zvWw3ISr3t1/27O1i8MGSym+PzEyHBT48EV4LAwWaEw==", "license": "Apache-2.0", "dependencies": { - "@ai-sdk/provider": "2.0.0", - "@standard-schema/spec": "^1.0.0", - "eventsource-parser": "^3.0.6" + "@ai-sdk/provider": "2.0.1", + "@ai-sdk/provider-utils": "3.0.20" }, "engines": { "node": ">=18" @@ -80,9 +92,9 @@ } }, "node_modules/@ai-sdk/provider": { - "version": "2.0.0", - "resolved": "https://registry.npmjs.org/@ai-sdk/provider/-/provider-2.0.0.tgz", - "integrity": "sha512-6o7Y2SeO9vFKB8lArHXehNuusnpddKPk7xqL7T2/b+OvXMRIXUO1rR4wcv1hAFUAT9avGZshty3Wlua/XA7TvA==", + "version": "2.0.1", + "resolved": "https://registry.npmjs.org/@ai-sdk/provider/-/provider-2.0.1.tgz", + "integrity": "sha512-KCUwswvsC5VsW2PWFqF8eJgSCu5Ysj7m1TxiHTVA6g7k360bk0RNQENT8KTMAYEs+8fWPD3Uu4dEmzGHc+jGng==", "license": "Apache-2.0", "dependencies": { "json-schema": "^0.4.0" @@ -92,14 +104,14 @@ } }, "node_modules/@ai-sdk/provider-utils": { - "version": "3.0.10", - "resolved": "https://registry.npmjs.org/@ai-sdk/provider-utils/-/provider-utils-3.0.10.tgz", - "integrity": "sha512-T1gZ76gEIwffep6MWI0QNy9jgoybUHE7TRaHB5k54K8mF91ciGFlbtCGxDYhMH3nCRergKwYFIDeFF0hJSIQHQ==", + "version": "3.0.20", + "resolved": "https://registry.npmjs.org/@ai-sdk/provider-utils/-/provider-utils-3.0.20.tgz", + "integrity": "sha512-iXHVe0apM2zUEzauqJwqmpC37A5rihrStAih5Ks+JE32iTe4LZ58y17UGBjpQQTCRw9YxMeo2UFLxLpBluyvLQ==", "license": "Apache-2.0", "dependencies": { - "@ai-sdk/provider": "2.0.0", + "@ai-sdk/provider": "2.0.1", "@standard-schema/spec": "^1.0.0", - "eventsource-parser": "^3.0.5" + "eventsource-parser": "^3.0.6" }, "engines": { "node": ">=18" @@ -794,9 +806,9 @@ "license": "BSD-3-Clause" }, "node_modules/@standard-schema/spec": { - "version": "1.0.0", - "resolved": "https://registry.npmjs.org/@standard-schema/spec/-/spec-1.0.0.tgz", - "integrity": "sha512-m2bOd0f2RT9k8QJx1JN85cZYyH1RqFBdlwtkSlf4tBDYLCiiZnv1fIIwacK6cqwXavOydf0NPToMQgpKq+dVlA==", + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/@standard-schema/spec/-/spec-1.1.0.tgz", + "integrity": "sha512-l2aFy5jALhniG5HgqrD6jXLi/rUWrKvqN/qJx6yoJsgKhblVd+iqqU4RCXavm/jPityDo5TCvKMnpjKnOriy0w==", "license": "MIT" }, "node_modules/@tootallnate/once": { @@ -829,9 +841,9 @@ } }, "node_modules/@vercel/oidc": { - "version": "3.0.1", - "resolved": "https://registry.npmjs.org/@vercel/oidc/-/oidc-3.0.1.tgz", - "integrity": "sha512-V/YRVrJDqM6VaMBjRUrd6qRMrTKvZjHdVdEmdXsOZMulTa3iK98ijKTc3wldBmst6W5rHpqMoKllKcBAHgN7GQ==", + "version": "3.1.0", + "resolved": "https://registry.npmjs.org/@vercel/oidc/-/oidc-3.1.0.tgz", + "integrity": "sha512-Fw28YZpRnA3cAHHDlkt7xQHiJ0fcL+NRcIqsocZQUSmbzeIKRpwttJjik5ZGanXP+vlA4SbTg+AbA3bP363l+w==", "license": "Apache-2.0", "engines": { "node": ">= 20" @@ -853,14 +865,14 @@ } }, "node_modules/ai": { - "version": "5.0.60", - "resolved": "https://registry.npmjs.org/ai/-/ai-5.0.60.tgz", - "integrity": "sha512-80U/3kmdBW6g+JkLXpz/P2EwkyEaWlPlYtuLUpx/JYK9F7WZh9NnkYoh1KvUi1Sbpo0NyurBTvX0a2AG9mmbDA==", + "version": "6.0.78", + "resolved": "https://registry.npmjs.org/ai/-/ai-6.0.78.tgz", + "integrity": "sha512-eriIX/NLWfWNDeE/OJy8wmIp9fyaH7gnxTOCPT5bp0MNkvORstp1TwRUql9au8XjXzH7o2WApqbwgxJDDV0Rbw==", "license": "Apache-2.0", "dependencies": { - "@ai-sdk/gateway": "1.0.33", - "@ai-sdk/provider": "2.0.0", - "@ai-sdk/provider-utils": "3.0.10", + "@ai-sdk/gateway": "3.0.39", + "@ai-sdk/provider": "3.0.8", + "@ai-sdk/provider-utils": "4.0.14", "@opentelemetry/api": "1.9.0" }, "engines": { @@ -870,6 +882,35 @@ "zod": "^3.25.76 || ^4.1.8" } }, + "node_modules/ai/node_modules/@ai-sdk/provider": { + "version": "3.0.8", + "resolved": "https://registry.npmjs.org/@ai-sdk/provider/-/provider-3.0.8.tgz", + "integrity": "sha512-oGMAgGoQdBXbZqNG0Ze56CHjDZ1IDYOwGYxYjO5KLSlz5HiNQ9udIXsPZ61VWaHGZ5XW/jyjmr6t2xz2jGVwbQ==", + "license": "Apache-2.0", + "dependencies": { + "json-schema": "^0.4.0" + }, + "engines": { + "node": ">=18" + } + }, + "node_modules/ai/node_modules/@ai-sdk/provider-utils": { + "version": "4.0.14", + "resolved": "https://registry.npmjs.org/@ai-sdk/provider-utils/-/provider-utils-4.0.14.tgz", + "integrity": "sha512-7bzKd9lgiDeXM7O4U4nQ8iTxguAOkg8LZGD9AfDVZYjO5cKYRwBPwVjboFcVrxncRHu0tYxZtXZtiLKpG4pEng==", + "license": "Apache-2.0", + "dependencies": { + "@ai-sdk/provider": "3.0.8", + "@standard-schema/spec": "^1.1.0", + "eventsource-parser": "^3.0.6" + }, + "engines": { + "node": ">=18" + }, + "peerDependencies": { + "zod": "^3.25.76 || ^4.1.8" + } + }, "node_modules/ajv": { "version": "8.17.1", "resolved": "https://registry.npmjs.org/ajv/-/ajv-8.17.1.tgz", @@ -1829,9 +1870,9 @@ } }, "node_modules/openai": { - "version": "6.0.0", - "resolved": "https://registry.npmjs.org/openai/-/openai-6.0.0.tgz", - "integrity": "sha512-J7LEmTn3WLZnbyEmMYcMPyT5A0fGzhPwSvVUcNRKy6j2hJIbqSFrJERnUHYNkcoCCalRumypnj9AVoe5bVHd3Q==", + "version": "6.19.0", + "resolved": "https://registry.npmjs.org/openai/-/openai-6.19.0.tgz", + "integrity": "sha512-5uGrF82Ql7TKgIWUnuxh+OyzYbPRPwYDSgGc05JowbXRFsOkuj0dJuCdPCTBZT4mcmp2NEvj/URwDzW+lYgmVw==", "license": "Apache-2.0", "bin": { "openai": "bin/cli" diff --git a/app/ai-assistant/package.json b/app/ai-assistant/package.json index 8f9fb0a..b4f53c2 100644 --- a/app/ai-assistant/package.json +++ b/app/ai-assistant/package.json @@ -23,15 +23,15 @@ "author": "", "license": "MIT", "dependencies": { - "@ai-sdk/openai": "^2.0.71", + "@ai-sdk/openai": "2.0.89", "@fastify/cors": "^10.1.0", "@google-cloud/kms": "5.2.1", "@types/pg": "^8.15.6", - "ai": "^5.0.60", + "ai": "6.0.78", "dotenv": "^16.4.7", "fastify": "5.5.0", "ioredis": "^5.4.1", - "openai": "^6.0.0", + "openai": "6.19.0", "pg": "^8.16.3", "pino": "^9.8.0", "pino-pretty": "^13.1.1", diff --git a/app/ai-assistant/src/application/controllers/OpenAIProxyController.ts b/app/ai-assistant/src/application/controllers/OpenAIProxyController.ts new file mode 100644 index 0000000..944a4e4 --- /dev/null +++ b/app/ai-assistant/src/application/controllers/OpenAIProxyController.ts @@ -0,0 +1,3 @@ +export default interface IOpenAIProxyController { + registerRoutes(): Promise; +} diff --git a/app/ai-assistant/src/application/controllers/impl/ChatControllerImpl.ts b/app/ai-assistant/src/application/controllers/impl/ChatControllerImpl.ts index 305c929..ec31c49 100644 --- a/app/ai-assistant/src/application/controllers/impl/ChatControllerImpl.ts +++ b/app/ai-assistant/src/application/controllers/impl/ChatControllerImpl.ts @@ -15,14 +15,6 @@ export class ChatControllerImpl { } async registerRoutes(): Promise { - if (isLocal) { - // Register CORS for dev environment - await this.fastify.register((await import('@fastify/cors')).default, { - origin: true, - credentials: true - }); - } - this.fastify.post(`${this.basePath}/chat`, this.startConversation.bind(this)); this.fastify.get(`${this.basePath}/chat/history/:conversationId`, this.getConversationHistory.bind(this)); } diff --git a/app/ai-assistant/src/application/controllers/impl/OpenAIProxyControllerImpl.ts b/app/ai-assistant/src/application/controllers/impl/OpenAIProxyControllerImpl.ts new file mode 100644 index 0000000..95729b6 --- /dev/null +++ b/app/ai-assistant/src/application/controllers/impl/OpenAIProxyControllerImpl.ts @@ -0,0 +1,288 @@ +import { FastifyInstance, FastifyReply, FastifyRequest } from 'fastify'; +import type { + ChatCompletionCreateParamsNonStreaming, + ChatCompletionCreateParamsStreaming +} from 'openai/resources/chat/completions'; +import type { + ResponseCreateParamsNonStreaming, + ResponseCreateParamsStreaming +} from 'openai/resources/responses/responses'; +import { createLogger } from '../../../utils/logger.js'; +import { AuthenticationError, ErrorReason } from '../../../utils/errors.js'; +import { OpenAIProxyService } from '../../../domain/services/OpenAIProxyService.js'; + +import IOpenAIProxyController from '../OpenAIProxyController.js'; + +/** + * Controller implementation that acts as a proxy for OpenAI API requests. + * Handles chat completion and responses endpoints with support for: + * - User-specific API keys (BYOK - Bring Your Own Key) + * - Token usage tracking and limits + * - Streaming and non-streaming responses + */ +export class OpenAIProxyControllerImpl implements IOpenAIProxyController { + /** Base path for all OpenAI proxy endpoints */ + private basePath = '/api/playground/assistant/openai/v1'; + /** Logger instance for tracking operations */ + private logger = createLogger(undefined, 'OpenAIProxyController'); + + /** + * Initializes the OpenAI proxy controller + * @param fastify - Fastify server instance for route registration + * @param openAIProxyService - Service for OpenAI interactions + */ + constructor( + private fastify: FastifyInstance, + private openAIProxyService: OpenAIProxyService + ) { } + + /** + * Registers API routes for OpenAI proxy endpoints + * Routes: + * - POST /api/openai/v1/chat/completions - Proxies chat completion requests (used by Langchain based agents) + * - POST /api/openai/v1/responses - Proxies response API requests (used by Vercel AI-SDK based agents) + */ + async registerRoutes(): Promise { + this.fastify.post<{ Body: ChatCompletionCreateParamsStreaming | ChatCompletionCreateParamsNonStreaming }>( + `${this.basePath}/chat/completions`, + this.handleChatCompletion.bind(this) + ); + this.fastify.post<{ Body: ResponseCreateParamsStreaming | ResponseCreateParamsNonStreaming }>( + `${this.basePath}/responses`, + this.handleResponses.bind(this) + ); + } + + /** + * Configures HTTP headers required for Server-Sent Events (SSE) streaming + * @param reply - Fastify reply object to configure headers on + */ + private setupStreamHeaders(reply: FastifyReply): void { + reply.raw.setHeader('Content-Type', 'text/event-stream'); + reply.raw.setHeader('Cache-Control', 'no-cache'); + reply.raw.setHeader('Connection', 'keep-alive'); + // Prevent buffering in proxies + reply.raw.setHeader('X-Accel-Buffering', 'no'); + } + + /** + * Handles streaming responses from OpenAI API + * Streams chunks to the client as SSE and tracks token usage + * @param stream - Async iterable stream from OpenAI + * @param reply - Fastify reply object for writing response + * @param userId - User identifier for usage tracking + * @param model - Model name for usage tracking + */ + private async handleStreamingRequest( + stream: AsyncIterable, + reply: FastifyReply, + userId: string, + model: string + ): Promise { + // Manually hijack the response to handle streaming directly + reply.hijack(); + this.setupStreamHeaders(reply); + + // Handle client disconnect + // We abort the loop if the client closes the connection + let isClientConnected = true; + + const closeHandler = () => { + isClientConnected = false; + this.logger.debug('Client disconnected during stream', { userId }); + }; + + reply.raw.on('close', closeHandler); + // 'aborted' is also possible on some platforms/versions + reply.raw.on('error', closeHandler); + + let accumulatedUsage: any = null; + + try { + // Stream each chunk to the client + for await (const chunk of stream) { + if (!isClientConnected) { + break; + } + + // Extract usage data if present + const usage = this.openAIProxyService.extractUsage(chunk); + if (usage) { + accumulatedUsage = usage; + } + + // Send chunk as the SSE data event + const jsonString = JSON.stringify(chunk); + reply.raw.write(`data: ${jsonString}\n\n`); + + // Attempt to flush if method exists (e.g. compression middleware might add it) + if (typeof (reply.raw as any).flush === 'function') { + (reply.raw as any).flush(); + } + } + + if (isClientConnected) { + // Send completion signal + reply.raw.write('data: [DONE]\n\n'); + reply.raw.end(); + } + } catch (error) { + this.logger.error('Error during streaming', error, { userId }); + if (isClientConnected) { + // Try to write a specific error event if possible, or just end + reply.raw.write(`event: error\ndata: ${JSON.stringify({ message: 'Stream error' })}\n\n`); + reply.raw.end(); + } + } finally { + reply.raw.removeListener('close', closeHandler); + reply.raw.removeListener('error', closeHandler); + + // Track token usage after stream completes + if (accumulatedUsage) { + // Determine model from final chunk or fallback to request model + await this.openAIProxyService.trackUsage(userId, model, accumulatedUsage); + } + } + } + + /** + * Centralized error handler for OpenAI API errors + * Logs errors and returns appropriate HTTP responses + * @param error - Error object from OpenAI or internal error + * @param reply - Fastify reply object for sending error response + * @param apiType - Type of API call (e.g., 'Chat Completion', 'Responses') + * @returns Error response sent to a client + */ + private handleOpenAIError(error: any, reply: FastifyReply, apiType: string): any { + this.logger.error(`Error in OpenAI proxy (${apiType})`, error); + + // Re-throw authentication errors to be handled by auth middleware + if (error instanceof AuthenticationError) { + throw new AuthenticationError('OpenAI API Error', ErrorReason.EXTERNAL_SERVICE_ERROR); + } + + // If headers are already sent, we can't send a JSON response + if (reply.raw.headersSent) { + return; + } + + // Extract error details with fallbacks + const status = error.status || 500; + const message = error.message || 'Error communicating with OpenAI'; + + return reply.status(status).send({ + error: { + message: message, + type: error.type || 'internal_server_error', + code: error.code || null + } + }); + } + + /** + * Handles OpenAI Responses API requests + * Supports both streaming and non-streaming responses + * @param request - Fastify request object + * @param reply - Fastify reply object + * @returns Response from OpenAI or streams response to a client + */ + private async handleResponses(request: FastifyRequest, reply: FastifyReply) { + // Extract user ID from the header, fallback to 'unknown' + const userId = (request.headers['x-user-id'] as string) || 'unknown'; + const body = request.body as ResponseCreateParamsStreaming | ResponseCreateParamsNonStreaming; + + this.logger.info('Processing OpenAI proxy request (Responses API)', { userId, model: body.model }); + + // Create AbortController to handle client disconnects + const abortController = new AbortController(); + const signal = abortController.signal; + + // Abort upstream request if client disconnects + const onClientDisconnect = () => { + this.logger.debug('Client disconnected, aborting upstream request', { userId }); + abortController.abort(); + }; + + reply.raw.on('close', onClientDisconnect); + reply.raw.on('error', onClientDisconnect); + + try { + const isStreaming = body.stream === true; + + if (isStreaming) { + // Handle streaming response + const stream = await this.openAIProxyService.createResponseStream( + userId, + body as ResponseCreateParamsStreaming, + { signal } + ); + return await this.handleStreamingRequest(stream, reply, userId, body.model as string); + } else { + // Handle non-streaming response + return await this.openAIProxyService.createResponse( + userId, + body as ResponseCreateParamsNonStreaming, + { signal } + ); + } + } catch (error: any) { + return this.handleOpenAIError(error, reply, 'Responses'); + } finally { + reply.raw.removeListener('close', onClientDisconnect); + reply.raw.removeListener('error', onClientDisconnect); + } + } + + /** + * Handles OpenAI Chat Completion API requests + * Supports both streaming and non-streaming chat completions + * @param request - Fastify request object + * @param reply - Fastify reply object + * @returns Chat completion response from OpenAI or streams response to a client + */ + private async handleChatCompletion(request: FastifyRequest, reply: FastifyReply) { + // Extract user ID from the header, fallback to 'unknown' + const userId = (request.headers['x-user-id'] as string) || 'unknown'; + const body = request.body as ChatCompletionCreateParamsStreaming | ChatCompletionCreateParamsNonStreaming; + + this.logger.info('Processing OpenAI proxy request', { userId, model: body.model }); + + // Create AbortController to handle client disconnects + const abortController = new AbortController(); + const signal = abortController.signal; + + // Abort upstream request if client disconnects + const onClientDisconnect = () => { + this.logger.debug('Client disconnected, aborting upstream request', { userId }); + abortController.abort(); + }; + + reply.raw.on('close', onClientDisconnect); + reply.raw.on('error', onClientDisconnect); + + try { + const isStreaming = body.stream === true; + + if (isStreaming) { + const stream = await this.openAIProxyService.createChatCompletionStream( + userId, + body as ChatCompletionCreateParamsStreaming, + { signal } + ); + await this.handleStreamingRequest(stream, reply, userId, body.model); + } else { + // Handle non-streaming response + return await this.openAIProxyService.createChatCompletion( + userId, + body as ChatCompletionCreateParamsNonStreaming, + { signal } + ); + } + } catch (error: any) { + return this.handleOpenAIError(error, reply, 'Chat Completion'); + } finally { + reply.raw.removeListener('close', onClientDisconnect); + reply.raw.removeListener('error', onClientDisconnect); + } + } +} diff --git a/app/ai-assistant/src/domain/services/ChatService.ts b/app/ai-assistant/src/domain/services/ChatService.ts index 743aaeb..b706fa7 100644 --- a/app/ai-assistant/src/domain/services/ChatService.ts +++ b/app/ai-assistant/src/domain/services/ChatService.ts @@ -3,18 +3,16 @@ import { createLogger, AppLogger } from '../../utils/logger.js'; import { UserMetadata, UserMetadataType, ExecutionContext } from '../../types.js'; import { CodeReviewAgent, GeneralAssistantAgent, ExecutionAnalyzerAgent, IMockAgent } from '../agents/index.js'; import { MockAgent } from '../agents/implementations/MockAgent.js'; -import { CacheClient } from '../../infrastructure/persistence/RedisConnector.js'; -import { - AuthenticationError, - UsageLimitError, - ValidationError, - ErrorReason +import { + AuthenticationError, + ValidationError, + ErrorReason } from '../../utils/errors.js'; import { UserAIKeyService } from './UserAIKeyService.js'; import { UserAIKeyRepositoryImpl } from '../../infrastructure/repositories/impl/UserAIKeyRepositoryImpl.js'; +import { TokenUsageService } from './TokenUsageService.js'; const VECTOR_STORE_ID = process.env.VECTOR_STORE_ID || 'vs_688ceeab314c8191a557a849b28cf815'; -const TOKENS_LIMIT_PER_MONTH = Number(process.env.TOKENS_LIMIT_PER_MONTH) || 100000; const CODE_REVIEW_MODEL = process.env.CODE_REVIEW_MODEL || 'gpt-4o-mini'; const CODE_INTEGRATION_MODEL = process.env.CODE_INTEGRATION_MODEL || 'gpt-4o-mini'; const GENERAL_ASSISTANT_MODEL = process.env.GENERAL_ASSISTANT_MODEL || 'gpt-4o-mini'; @@ -29,6 +27,7 @@ export class ChatService { private executionAnalyzerAgent: ExecutionAnalyzerAgent; private mockAgent: IMockAgent | null = null; private userAIKeyService: UserAIKeyService; + private tokenUsageService: TokenUsageService; constructor() { this.mockMode = process.env.ENABLE_MOCK_MODE === 'true'; @@ -38,6 +37,7 @@ export class ChatService { this.generalAssistantAgent = new GeneralAssistantAgent(GENERAL_ASSISTANT_MODEL); this.executionAnalyzerAgent = new ExecutionAnalyzerAgent(EXECUTION_ANALYZER_MODEL); this.userAIKeyService = new UserAIKeyService(new UserAIKeyRepositoryImpl()); + this.tokenUsageService = new TokenUsageService(); if (this.mockMode) { this.mockAgent = new MockAgent(); @@ -65,27 +65,16 @@ export class ChatService { throw new AuthenticationError('Custom key not found for user', ErrorReason.CUSTOM_KEY_NOT_FOUND); } } else { - const tokenUsed = await CacheClient.getNumber(userId); - requestLogger.debug('Token used', { tokenUsed: tokenUsed, tokenLimit: TOKENS_LIMIT_PER_MONTH }); - if (Number(tokenUsed) > Number(TOKENS_LIMIT_PER_MONTH)) { - requestLogger.error('Token limit exceeded', undefined, { - tokenUsed, - limit: TOKENS_LIMIT_PER_MONTH - }); - throw new UsageLimitError('Token limit exceeded', ErrorReason.TOKEN_LIMIT_EXCEEDED, { - tokenUsed, - limit: TOKENS_LIMIT_PER_MONTH - }); - } + await this.tokenUsageService.checkUsageLimit(userId); } // Extract user input for logging const lastMessage = userMessages[userMessages.length - 1]; const userInput = lastMessage ? lastMessage.parts - .filter((part) => part.type === 'text') - .map((part) => (part as any).text) - .join(' ') || 'No text content' + .filter((part) => part.type === 'text') + .map((part) => (part as any).text) + .join(' ') || 'No text content' : 'No content'; requestLogger.info('Processing chat request', { @@ -100,7 +89,7 @@ export class ChatService { return this.mockAgent?.streamMockResponse(); } - const userModelMessages = convertToModelMessages(userMessages); + const userModelMessages = await convertToModelMessages(userMessages); // Create execution context const context: ExecutionContext = { userId, diff --git a/app/ai-assistant/src/domain/services/OpenAIProxyService.ts b/app/ai-assistant/src/domain/services/OpenAIProxyService.ts new file mode 100644 index 0000000..d86ecd7 --- /dev/null +++ b/app/ai-assistant/src/domain/services/OpenAIProxyService.ts @@ -0,0 +1,180 @@ +import { OpenAI } from 'openai'; +import type { ChatCompletionCreateParamsStreaming, ChatCompletionCreateParamsNonStreaming } from 'openai/resources/chat/completions'; +import type { ResponseCreateParamsStreaming, ResponseCreateParamsNonStreaming } from 'openai/resources/responses/responses'; +import { TokenUsageService } from './TokenUsageService.js'; +import { UserAIKeyService } from './UserAIKeyService.js'; +import { createLogger, AppLogger } from '../../utils/logger.js'; + +export class OpenAIProxyService { + private logger: AppLogger = createLogger(undefined, 'OpenAIProxyService'); + private readonly systemOpenAI: OpenAI; + + constructor( + private tokenUsageService: TokenUsageService, + private userAIKeyService: UserAIKeyService + ) { + this.systemOpenAI = new OpenAI({ + apiKey: process.env.OPENAI_API_KEY, + }); + } + + /** + * Resolves which OpenAI client to use for a request + * @param userId - The user ID to check for a custom API key + * @returns OpenAI client instance (user's BYOK or system key) + */ + protected async resolveOpenAIClient(userId: string): Promise { + let openaiClient = this.systemOpenAI; + + try { + const hasKey = await this.userAIKeyService.hasKey(userId); + if (hasKey) { + const keyData = await this.userAIKeyService.retrieveKey(userId); + openaiClient = new OpenAI({ apiKey: keyData.apiKey }); + this.logger.debug('Using user BYOK API key for proxy'); + } + } catch (error) { + this.logger.warn('Failed to check/retrieve user key, falling back to system key', { + userId, + error: (error as Error).message + }); + } + + return openaiClient; + } + + /** + * Validates if the user is allowed to make a request + * @param userId - The user ID to validate against usage limits + * @throws Error if user has exceeded their usage limit + */ + async validateRequest(userId: string): Promise { + await this.tokenUsageService.checkUsageLimit(userId); + } + + /** + * Creates a stream for chat completions + * @param userId - The user ID making the request + * @param body - Chat completion parameters with streaming enabled + * @param options - Optional request configuration (e.g. AbortSignal) + * @returns Async iterable stream of chat completion chunks + */ + async createChatCompletionStream( + userId: string, + body: ChatCompletionCreateParamsStreaming, + options?: { signal?: AbortSignal } + ): Promise> { + await this.validateRequest(userId); + const openaiClient = await this.resolveOpenAIClient(userId); + + // Ensure usage tracking is enabled for compatible models + if (!body.stream_options) { + body.stream_options = { include_usage: true }; + } + + + return await openaiClient.chat.completions.create(body, options) as AsyncIterable; + } + + /** + * Creates a non-streaming chat completion + * @param userId - The user ID making the request + * @param body - Chat completion parameters without streaming + * @param options - Optional request configuration (e.g. AbortSignal) + * @returns Complete chat completion response with usage data + */ + async createChatCompletion( + userId: string, + body: ChatCompletionCreateParamsNonStreaming, + options?: { signal?: AbortSignal } + ): Promise { + await this.validateRequest(userId); + const openaiClient = await this.resolveOpenAIClient(userId); + const completion = await openaiClient.chat.completions.create(body, options); + + if (completion.usage) { + await this.trackUsage(userId, body.model, completion.usage); + } + + return completion; + } + + /** + * Creates a stream for responses API + * @param userId - The user ID making the request + * @param body - Response creation parameters with streaming enabled + * @param options - Optional request configuration (e.g. AbortSignal) + * @returns Async iterable stream of response chunks + */ + async createResponseStream( + userId: string, + body: ResponseCreateParamsStreaming, + options?: { signal?: AbortSignal } + ): Promise> { + await this.validateRequest(userId); + const openaiClient = await this.resolveOpenAIClient(userId); + + return await openaiClient.responses.create(body, options) as AsyncIterable; + } + + /** + * Creates a non-streaming response for responses API + * @param userId - The user ID making the request + * @param body - Response creation parameters without streaming + * @param options - Optional request configuration (e.g. AbortSignal) + * @returns Complete response with usage data + */ + async createResponse( + userId: string, + body: ResponseCreateParamsNonStreaming, + options?: { signal?: AbortSignal } + ): Promise { + await this.validateRequest(userId); + const openaiClient = await this.resolveOpenAIClient(userId); + const response = await openaiClient.responses.create(body, options); + + if (response.usage) { + await this.trackUsage(userId, (response.model || body.model) as string, response.usage); + } + + return response; + } + + /** + * Safely tracks token usage + * @param userId - The user ID to track usage for + * @param model - The AI model used for the request + * @param usage - Usage data containing token counts + */ + async trackUsage(userId: string, model: string, usage: any): Promise { + if (!usage) { + this.logger.warn('No usage data to track', { userId, model }); + return; + } + + try { + const promptTokens = usage.prompt_tokens || usage.input_tokens || 0; + const completionTokens = usage.completion_tokens || usage.output_tokens || 0; + + await this.tokenUsageService.incrementUsage( + userId, + promptTokens, + completionTokens, + model + ); + } catch (error) { + // Log but don't fail the request + this.logger.error('Failed to track token usage', error, { userId, model }); + } + } + + /** + * Extracts usage data from a chunk + * Handles nested usage objects (e.g. Responses API) + * @param chunk - Stream chunk that may contain usage data + * @returns Usage object or null if not found + */ + extractUsage(chunk: any): any { + return chunk.usage || chunk.response?.usage || (chunk as any).usage || null; + } +} diff --git a/app/ai-assistant/src/domain/services/TokenUsageService.ts b/app/ai-assistant/src/domain/services/TokenUsageService.ts new file mode 100644 index 0000000..52e8c7a --- /dev/null +++ b/app/ai-assistant/src/domain/services/TokenUsageService.ts @@ -0,0 +1,50 @@ +import { CacheClient } from '../../infrastructure/persistence/RedisConnector.js'; +import { UsageLimitError, ErrorReason } from '../../utils/errors.js'; +import { createLogger } from '../../utils/logger.js'; + +const TOKENS_LIMIT_PER_MONTH = Number(process.env.TOKENS_LIMIT_PER_MONTH) || 100000; + +export class TokenUsageService { + private logger = createLogger(undefined, 'TokenUsageService'); + + /** + * Checks if the user has exceeded their monthly token limit. + * Throws UsageLimitError if limit is reached. + * @param userId The user ID to check + */ + async checkUsageLimit(userId: string): Promise { + const tokenUsed = await CacheClient.getNumber(userId); + this.logger.debug('Token used', { userId, tokenUsed, tokenLimit: TOKENS_LIMIT_PER_MONTH }); + + if (Number(tokenUsed) > TOKENS_LIMIT_PER_MONTH) { + this.logger.error('Token limit exceeded', undefined, { + userId, + tokenUsed, + limit: TOKENS_LIMIT_PER_MONTH + }); + throw new UsageLimitError('Token limit exceeded', ErrorReason.TOKEN_LIMIT_EXCEEDED, { + tokenUsed, + limit: TOKENS_LIMIT_PER_MONTH + }); + } + } + + /** + * Increments the token usage for a user. + * @param userId The user ID + * @param inputTokens Number of input tokens + * @param outputTokens Number of output tokens + * @param model Optional model name for tracking + */ + async incrementUsage(userId: string, inputTokens: number, outputTokens: number, model?: string): Promise { + const totalTokens = inputTokens + outputTokens; + + await CacheClient.incrementNumberUntilEndOfMonth(userId, totalTokens); + + this.logger.info('Token usage updated', { + userId, + added: totalTokens, + model + }); + } +} diff --git a/app/ai-assistant/src/index.ts b/app/ai-assistant/src/index.ts index 0a7d1d3..ff45104 100644 --- a/app/ai-assistant/src/index.ts +++ b/app/ai-assistant/src/index.ts @@ -1,7 +1,8 @@ +import 'dotenv/config'; // immediate load of environment variables import Fastify, { FastifyBaseLogger, FastifyInstance } from 'fastify'; -import { config } from 'dotenv'; import { ZodError } from 'zod'; import { ChatControllerImpl } from './application/controllers/impl/ChatControllerImpl.js'; +import { OpenAIProxyControllerImpl } from './application/controllers/impl/OpenAIProxyControllerImpl.js'; import HealthControllerImpl from './application/controllers/impl/HealthControllerImpl.js'; import { UserAIKeyControllerImpl } from './application/controllers/impl/UserAIKeyControllerImpl.js'; import { logger } from './utils/logger.js'; @@ -10,10 +11,10 @@ import { PgClient } from './infrastructure/persistence/PgConnector.js'; import { initializeKmsService } from './infrastructure/kms/KmsService.js'; import { UserAIKeyService } from './domain/services/UserAIKeyService.js'; import { UserAIKeyRepositoryImpl } from './infrastructure/repositories/impl/UserAIKeyRepositoryImpl.js'; +import { TokenUsageService } from './domain/services/TokenUsageService.js'; +import { OpenAIProxyService } from './domain/services/OpenAIProxyService.js'; import { APIError, ErrorReason } from './utils/errors.js'; - -// Load environment variables -config(); +import { isLocal } from "./utils/environment.js"; const PORT = parseInt(process.env.PORT || '3001', 10); const HOST = process.env.HOST || '0.0.0.0'; @@ -23,6 +24,18 @@ const fastify: FastifyInstance = Fastify({ disableRequestLogging: true // Disable automatic request logging }); +// Register CORS for local development environment +if (isLocal) { + console.log('Registering CORS for dev environment'); + await fastify.register((await import('@fastify/cors')).default, { + origin: [ + /^http:\/\/localhost:\d+$/ + ], + credentials: true, + methods: ['GET', 'POST', 'PUT', 'DELETE', 'OPTIONS'], + }); +} + // Custom request logging with healthcheck filter fastify.addHook('onRequest', async (request, reply) => { // Skip logging for healthcheck endpoint @@ -64,10 +77,10 @@ fastify.setErrorHandler((error, request, reply) => { message: 'Validation error', statusCode: 400, details: { - errors: error.issues.map((issue) => ({ - path: issue.path.join('.'), - message: issue.message, - code: issue.code + errors: error.issues.map((issue) => ({ + path: issue.path.join('.'), + message: issue.message, + code: issue.code })) }, timestamp: new Date().toISOString() @@ -78,10 +91,10 @@ fastify.setErrorHandler((error, request, reply) => { if (error instanceof APIError) { // Log based on severity if (error.statusCode >= 500) { - fastify.log.error(error); - } else { - fastify.log.warn(error); - } + fastify.log.error(error); + } else { + fastify.log.warn(error); + } return reply.status(error.statusCode).send(error.toJSON()); } @@ -89,12 +102,12 @@ fastify.setErrorHandler((error, request, reply) => { // Handle OpenAI API errors from AI SDK // Thanks to our onError callback in agents, we now receive the original AI_APICallError const openaiError = error as any; - + if (openaiError.statusCode && openaiError.data) { const errorData = openaiError.data?.error || {}; - - fastify.log.warn({ - statusCode: openaiError.statusCode, + + fastify.log.warn({ + statusCode: openaiError.statusCode, type: errorData.type, code: errorData.code, message: openaiError.message, @@ -143,7 +156,7 @@ fastify.setNotFoundHandler((request, reply) => { message: 'Route not found', statusCode: 404, details: { - path: request.url, + path: request.url, method: request.method }, timestamp: new Date().toISOString() @@ -177,11 +190,24 @@ async function start() { logger.warn(error, 'BYOK feature disabled: KMS initialization failed'); } + // Initialize services + const tokenUsageService = new TokenUsageService(); + const proxyUserAIKeyRepository = new UserAIKeyRepositoryImpl(); + const proxyUserAIKeyService = new UserAIKeyService(proxyUserAIKeyRepository); + + const openAIProxyService = new OpenAIProxyService(tokenUsageService, proxyUserAIKeyService); + + // Initialize controllers const healthController = new HealthControllerImpl(fastify); const chatController = new ChatControllerImpl(fastify); + const openAIProxyController = new OpenAIProxyControllerImpl( + fastify, + openAIProxyService + ); await healthController.registerRoutes(); await chatController.registerRoutes(); + await openAIProxyController.registerRoutes(); // Register User Key routes if BYOK is enabled if (userAIKeyController) { @@ -210,6 +236,8 @@ async function start() { endpoints: { chat: `/api/playground/assistant/chat`, history: `/api/playground/assistant/chat/history/:conversationId`, + openaiChatCompletions: `/api/playground/assistant/openai/v1/chat/completions`, + openaiResponses: `/api/playground/assistant/openai/v1/responses`, health: `/api/playground/assistant/health`, stats: `/api/stats`, userKeys: userAIKeyController ? `/api/playground/assistant/user-ai-key` : 'disabled'