
[ResponsesAPI] Implement streaming mode #1582

Merged · 4 commits · Jul 2, 2025

17 changes: 17 additions & 0 deletions packages/responses-server/examples/streaming.js
@@ -0,0 +1,17 @@
import { OpenAI } from "openai";
const openai = new OpenAI({ baseURL: "http://localhost:3000/v1", apiKey: process.env.HF_TOKEN });

const stream = await openai.responses.create({
model: "Qwen/Qwen2.5-VL-7B-Instruct",
input: [
{
role: "user",
content: "Say 'double bubble bath' ten times fast.",
},
],
stream: true,
});

for await (const event of stream) {
console.log(event);
}
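
Run against a local responses-server, this prints every raw stream event. Below is a minimal consumer sketch that reassembles the streamed text instead; it relies only on the `response.output_text.delta` and `response.completed` event types emitted by the route in this PR, and ignores everything else.

import { OpenAI } from "openai";

const openai = new OpenAI({ baseURL: "http://localhost:3000/v1", apiKey: process.env.HF_TOKEN });

const stream = await openai.responses.create({
	model: "Qwen/Qwen2.5-VL-7B-Instruct",
	input: "Say 'double bubble bath' ten times fast.",
	stream: true,
});

let text = "";
for await (const event of stream) {
	if (event.type === "response.output_text.delta") {
		text += event.delta; // one upstream chat-completion chunk per event
	} else if (event.type === "response.completed") {
		console.log(text); // the fully accumulated output text
	}
}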
248 changes: 201 additions & 47 deletions packages/responses-server/src/routes/responses.ts
@@ -5,7 +5,12 @@ import { generateUniqueId } from "../lib/generateUniqueId.js";
import { InferenceClient } from "@huggingface/inference";
import type { ChatCompletionInputMessage, ChatCompletionInputMessageChunkType } from "@huggingface/tasks";

import { type Response as OpenAIResponse } from "openai/resources/responses/responses";
import type {
Response,
ResponseStreamEvent,
ResponseOutputItem,
ResponseContentPartAddedEvent,
} from "openai/resources/responses/responses";

export const postCreateResponse = async (
req: ValidatedRequest<CreateResponseParams>,
@@ -33,27 +38,189 @@ export const postCreateResponse = async (
content:
typeof item.content === "string"
? item.content
: item.content.map((content) => {
if (content.type === "input_image") {
return {
type: "image_url" as ChatCompletionInputMessageChunkType,
image_url: {
url: content.image_url,
},
};
}
// content.type must be "input_text" at this point
return {
type: "text" as ChatCompletionInputMessageChunkType,
text: content.text,
};
}),
: item.content
.map((content) => {
switch (content.type) {
case "input_image":
return {
type: "image_url" as ChatCompletionInputMessageChunkType,
image_url: {
url: content.image_url,
},
};
case "output_text":
return {
type: "text" as ChatCompletionInputMessageChunkType,
text: content.text,
};
case "refusal":
return undefined;
case "input_text":
return {
type: "text" as ChatCompletionInputMessageChunkType,
text: content.text,
};
}
})
.filter((item) => item !== undefined),
}))
);
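// Mapping summary: "input_image" becomes { type: "image_url" }, while "input_text" and
// "output_text" both become { type: "text" }; "refusal" parts have no chat-completion
// equivalent and are dropped by the .filter() above.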
} else {
messages.push({ role: "user", content: req.body.input });
}

const payload = {
model: req.body.model,
messages: messages,
temperature: req.body.temperature,
top_p: req.body.top_p,
stream: req.body.stream,
};

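// Base response object shared by the streaming and non-streaming paths. Fields the
// server does not support yet (tools, tool_choice, metadata, ...) are omitted from the type.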
const responseObject: Omit<
Response,
"incomplete_details" | "metadata" | "output_text" | "parallel_tool_calls" | "tool_choice" | "tools"
> = {
object: "response",
id: generateUniqueId("resp"),
status: "in_progress",
error: null,
instructions: req.body.instructions,
model: req.body.model,
temperature: req.body.temperature,
top_p: req.body.top_p,
created_at: new Date().getTime(),
output: [],
};

if (req.body.stream) {
res.setHeader("Content-Type", "text/event-stream");
res.setHeader("Connection", "keep-alive");
let sequenceNumber = 0;

// Write each event as one server-sent-events frame: a `data:` line holding the JSON payload, followed by a blank line
const emitEvent = (event: ResponseStreamEvent) => {
res.write(`data: ${JSON.stringify(event)}\n\n`);
};

try {
// Response created event
emitEvent({
type: "response.created",
response: responseObject as Response,
sequence_number: sequenceNumber++,
});

// Response in progress event
emitEvent({
type: "response.in_progress",
response: responseObject as Response,
sequence_number: sequenceNumber++,
});

const stream = client.chatCompletionStream(payload);

const outputObject: ResponseOutputItem = {
id: generateUniqueId("msg"),
type: "message",
role: "assistant",
status: "in_progress",
content: [],
};
responseObject.output = [outputObject];

// Response output item added event
emitEvent({
type: "response.output_item.added",
output_index: 0,
item: outputObject,
sequence_number: sequenceNumber++,
});

// Response content part added event
const contentPart: ResponseContentPartAddedEvent["part"] = {
type: "output_text",
text: "",
annotations: [],
};
outputObject.content.push(contentPart);
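// contentPart is mutated in place as deltas arrive, so the `*.done` events below
// re-send the same part object with the fully accumulated text.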

emitEvent({
type: "response.content_part.added",
item_id: outputObject.id,
output_index: 0,
content_index: 0,
part: contentPart,
sequence_number: sequenceNumber++,
});

for await (const chunk of stream) {
if (chunk.choices[0].delta.content) {
contentPart.text += chunk.choices[0].delta.content;

// Response output text delta event
emitEvent({
type: "response.output_text.delta",
item_id: outputObject.id,
output_index: 0,
content_index: 0,
delta: chunk.choices[0].delta.content,
sequence_number: sequenceNumber++,
});
}
}

// Response output text done event
emitEvent({
type: "response.output_text.done",
item_id: outputObject.id,
output_index: 0,
content_index: 0,
text: contentPart.text,
sequence_number: sequenceNumber++,
});

// Response content part done event
emitEvent({
type: "response.content_part.done",
item_id: outputObject.id,
output_index: 0,
content_index: 0,
part: contentPart,
sequence_number: sequenceNumber++,
});

// Response output item done event
outputObject.status = "completed";
emitEvent({
type: "response.output_item.done",
output_index: 0,
item: outputObject,
sequence_number: sequenceNumber++,
});

// Response completed event
responseObject.status = "completed";
emitEvent({
type: "response.completed",
response: responseObject as Response,
sequence_number: sequenceNumber++,
});
} catch (streamError: any) {
console.error("Error in streaming chat completion:", streamError);

emitEvent({
type: "error",
code: null,
message: streamError.message || "An error occurred while streaming from inference server.",
param: null,
sequence_number: sequenceNumber++,
});
}
res.end();
return;
}

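// Non-streaming path: a single blocking chat completion, returned as one complete response object.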
try {
const chatCompletionResponse = await client.chatCompletion({
model: req.body.model,
@@ -62,37 +229,24 @@ export const postCreateResponse = async (
top_p: req.body.top_p,
});

const responseObject: Omit<
OpenAIResponse,
"incomplete_details" | "metadata" | "output_text" | "parallel_tool_calls" | "tool_choice" | "tools"
> = {
object: "response",
id: generateUniqueId("resp"),
status: "completed",
error: null,
instructions: req.body.instructions,
model: req.body.model,
temperature: req.body.temperature,
top_p: req.body.top_p,
created_at: chatCompletionResponse.created,
output: chatCompletionResponse.choices[0].message.content
? [
{
id: generateUniqueId("msg"),
type: "message",
role: "assistant",
status: "completed",
content: [
{
type: "output_text",
text: chatCompletionResponse.choices[0].message.content,
annotations: [],
},
],
},
]
: [],
};
responseObject.status = "completed";
responseObject.output = chatCompletionResponse.choices[0].message.content
? [
{
id: generateUniqueId("msg"),
type: "message",
role: "assistant",
status: "completed",
content: [
{
type: "output_text",
text: chatCompletionResponse.choices[0].message.content,
annotations: [],
},
],
},
]
: [];

res.json(responseObject);
} catch (error) {
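
For reference, a successful streaming request from this route emits events in a fixed order, each stamped with an incrementing sequence_number: response.created, response.in_progress, response.output_item.added, response.content_part.added, one response.output_text.delta per upstream chunk, then response.output_text.done, response.content_part.done, response.output_item.done, and finally response.completed. If the upstream stream throws, a single error event replaces the remaining lifecycle events; the SSE connection is closed either way.

Because each event is written as a `data: <json>` frame followed by a blank line, the stream can also be consumed without the OpenAI SDK. A rough sketch using fetch (this assumes the route is mounted at /v1/responses and that the HF token is accepted as a Bearer header, both of which mirror the SDK example above):

const res = await fetch("http://localhost:3000/v1/responses", {
	method: "POST",
	headers: { "Content-Type": "application/json", Authorization: `Bearer ${process.env.HF_TOKEN}` },
	body: JSON.stringify({ model: "Qwen/Qwen2.5-VL-7B-Instruct", input: "Say hello", stream: true }),
});

// res.body is a web ReadableStream, which Node exposes as an async iterable of Uint8Array chunks.
const decoder = new TextDecoder();
let buffer = "";
for await (const chunk of res.body) {
	buffer += decoder.decode(chunk, { stream: true });
	let end;
	while ((end = buffer.indexOf("\n\n")) !== -1) {
		const frame = buffer.slice(0, end);
		buffer = buffer.slice(end + 2);
		if (frame.startsWith("data: ")) console.log(JSON.parse(frame.slice(6)).type);
	}
}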