Commit 6a8393e

Merge pull request #586 from devchat-ai/fix_llm_stream_error
fix: Improve LLM stream handling in devchatComplete function
2 parents c0fe7f1 + fa57ebe commit 6a8393e
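
The one-line summary above is terse; the issue visible in the diff below is that the completion endpoint streams SSE-style 'data: {...}' events, and a single event can arrive split across several network chunks, so splitting each chunk on "data: " and parsing the pieces in isolation drops partial events. A minimal sketch of that failure mode (invented example data and simplified handling, not code from the repo):

// Invented example data: two network chunks that together carry two complete
// events, with the first event split across the chunk boundary.
const chunks = [
    'data: {"choices": [{"text": "hel',
    'lo"}]}\ndata: {"choices": [{"text": " world"}]}\n',
];

// Parsing each chunk in isolation, roughly the pre-fix behaviour, fails on
// the truncated first event and its text is lost.
for (const chunk of chunks) {
    for (const piece of chunk.split('data: ')) {
        if (piece.trim().length === 0) { continue; }
        try {
            console.log('text:', JSON.parse(piece.trim()).choices[0].text);
        } catch {
            console.log('dropped incomplete event:', piece.trim());
        }
    }
}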

File tree

1 file changed: +57 -38 lines changed
  • src/contributes/codecomplete


src/contributes/codecomplete/llm.ts

Lines changed: 57 additions & 38 deletions
@@ -184,6 +184,7 @@ export async function * ollamaDeepseekComplete(prompt: string) : AsyncGenerator<
 }
 
 
+
 export async function * devchatComplete(prompt: string) : AsyncGenerator<CodeCompletionChunk> {
     const devchatEndpoint = DevChatConfig.getInstance().get("providers.devchat.api_base");
     const llmApiBase = DevChatConfig.getInstance().get("complete_api_base");
@@ -199,24 +200,26 @@ export async function * devchatComplete(prompt: string) : AsyncGenerator<CodeCom
 
     const startTimeLLM = process.hrtime();
 
-    const headers = {
-        'Content-Type': 'application/json'
-    };
-    const payload = {
-        model: model,
-        prompt: prompt,
-        stream: true,
-        stop: ["<|endoftext|>", "<|EOT|>", "<file_sep>", "\n\n"],
-        temperature: 0.2
-    };
+    const headers = {
+        'Content-Type': 'application/json'
+    };
+    const payload = {
+        model: model,
+        prompt: prompt,
+        stream: true,
+        stop: ["<|endoftext|>", "<|EOT|>", "<file_sep>", "\n\n"],
+        temperature: 0.2,
+        max_tokens: 200
+    };
 
-    let idResponse = undefined;
+    // locally implemented sleep helper
+    const sleep = (ms: number) => new Promise(resolve => setTimeout(resolve, ms));
 
     try {
         const response = await fetch(completionApiBase, {
             method: 'POST',
-            headers,
-            body: JSON.stringify(payload),
+            headers,
+            body: JSON.stringify(payload),
         });
 
         if (response.ok && response.body) {
@@ -227,42 +230,53 @@ export async function * devchatComplete(prompt: string) : AsyncGenerator<CodeCom
             const durationLLM = endTimeLLM[0] + endTimeLLM[1] / 1e9;
             logger.channel()?.debug(`LLM api post took ${durationLLM} seconds`);
 
-            let hasFirstLine = false;
             let hasFirstChunk = false;
+            let hasFirstLine = false;
+            let buffer = '';
+            const dataRegex = /^data: /m; // match "data: " at the start of a line
+
+            // helper that simulates receiving the stream in smaller pieces
+            async function* simulateStreamReceive(stream: any): AsyncGenerator<Uint8Array> {
+                for await (const chunk of stream) {
+                    const chunkSize = chunk.length;
+                    const numParts = Math.ceil(Math.random() * 3) + 1; // randomly split the chunk into 1-4 parts
+                    const partSize = Math.ceil(chunkSize / numParts);
+
+                    for (let i = 0; i < chunkSize; i += partSize) {
+                        const part = chunk.slice(i, Math.min(i + partSize, chunkSize));
+                        logger.channel()?.debug(`Simulated receiving part ${i / partSize + 1}/${numParts} of chunk, size: ${part.length} bytes`);
+                        yield part;
+                        await sleep(Math.random() * 100); // simulate network latency, 0-100ms
+                    }
+                }
+            }
+
             for await (const chunk of stream) {
                 if (!hasFirstChunk) {
                     hasFirstChunk = true;
                     const endTimeFirstChunk = process.hrtime(startTimeLLM);
                     const durationFirstChunk = endTimeFirstChunk[0] + endTimeFirstChunk[1] / 1e9;
                     logger.channel()?.debug(`LLM first chunk took ${durationFirstChunk} seconds`);
                 }
-                const chunkDataText = decoder.decode(chunk).trim();
-                // split chunkText by "data: ", for example:
-                // data: 123 data: 456 will split to ["", "data: 123 ", "data: 456"]
-                const chunkTexts = chunkDataText.split("data: ");
-                for (const chunkTextSplit of chunkTexts) {
-                    if (chunkTextSplit.trim().length === 0) {
-                        continue;
-                    }
-
-                    const chunkText = "data: " + chunkTextSplit.trim();
-
-                    // logger.channel()?.info("receve chunk:", chunkText);
-                    // data: {"id": "cmpl-1713846153", "created": 1713846160.292709, "object": "completion.chunk", "model": "ollama/starcoder2:7b", "choices": [{"index": 0, "finish_reason": "stop", "text": "\n});"}]}
-                    // data: {"id": "cmpl-1713846153", "created": 1713846160.366049, "object": "completion.chunk", "model": "ollama/starcoder2:7b", "choices": [{"index": 0, "finish_reason": "stop", "text": ""}], "usage": {"prompt_tokens": 413, "completion_tokens": 16}}
-                    if (!chunkText.startsWith("data:")) {
-                        // log unexpected data
-                        logger.channel()?.warn("Unexpected data: " + chunkText);
-                        return;
-                    }
 
-                    const jsonData = chunkText.substring(5).trim();
+                const chunkDataText = decoder.decode(chunk);
+                buffer += chunkDataText;
+
+                while (true) {
+                    const match = dataRegex.exec(buffer);
+                    if (!match) break;
+
+                    const dataStart = match.index;
+                    const nextDataStart = buffer.slice(dataStart + 5).search(dataRegex);
+                    const jsonEnd = nextDataStart !== -1 ? dataStart + 5 + nextDataStart : buffer.length;
+                    const jsonData = buffer.substring(dataStart + 5, jsonEnd).trim();
+
                     if (jsonData === "[DONE]") {
                         return;
                     }
 
                     try {
-                        const data = JSON.parse(chunkText.substring(5).trim());
+                        const data = JSON.parse(jsonData);
                         if (!hasFirstLine && data.choices[0].text.indexOf("\n") !== -1) {
                             hasFirstLine = true;
                             const endTimeLine = process.hrtime(startTimeLLM);
@@ -273,9 +287,14 @@ export async function * devchatComplete(prompt: string) : AsyncGenerator<CodeCom
                             text: data.choices[0].text,
                             id: data.id
                         };
-                    } catch (e: any) {
-                        logger.channel()?.info("receve:", chunkText);
-                        logger.channel()?.warn("JSON Parsing Error:", e.message);
+                        buffer = buffer.slice(jsonEnd);
+                    } catch (e) {
+                        // if parsing fails, the data is probably incomplete; continue with the next iteration
+                        if (nextDataStart === -1) {
+                            // no next 'data:' yet, keep the remaining buffer
+                            break;
+                        }
+                        buffer = buffer.slice(dataStart + 5 + nextDataStart);
                     }
                 }
             }
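
In the hunks shown above, simulateStreamReceive is defined but never called, so it reads as a debugging aid for exercising the new buffering logic against artificially fragmented input. Under that assumption, here is a standalone sketch of such a test: the parser mirrors the buffer-and-regex strategy the patch introduces, and randomSplit plays the role of simulateStreamReceive. The helper names parseDataEvents and randomSplit are hypothetical, not part of the repo.

const dataRegex = /^data: /m; // matches "data: " at the start of a line

// Same buffering strategy as the patched devchatComplete: accumulate chunks
// and only emit an event once its JSON parses as a whole.
async function* parseDataEvents(stream: AsyncIterable<string>): AsyncGenerator<string> {
    let buffer = '';
    for await (const chunk of stream) {
        buffer += chunk;
        while (true) {
            const match = dataRegex.exec(buffer);
            if (!match) { break; }

            const dataStart = match.index;
            const nextDataStart = buffer.slice(dataStart + 5).search(dataRegex);
            const jsonEnd = nextDataStart !== -1 ? dataStart + 5 + nextDataStart : buffer.length;
            const jsonData = buffer.substring(dataStart + 5, jsonEnd).trim();

            try {
                JSON.parse(jsonData);                 // only emit complete events
                yield jsonData;
                buffer = buffer.slice(jsonEnd);
            } catch {
                if (nextDataStart === -1) { break; }  // incomplete tail, wait for more data
                buffer = buffer.slice(dataStart + 5 + nextDataStart);
            }
        }
    }
}

// Fragment the input into random 1-8 character pieces, mimicking what
// simulateStreamReceive does with network chunks.
async function* randomSplit(text: string): AsyncGenerator<string> {
    let i = 0;
    while (i < text.length) {
        const step = 1 + Math.floor(Math.random() * 8);
        yield text.slice(i, i + step);
        i += step;
    }
}

(async () => {
    const sse = 'data: {"choices": [{"text": "foo"}]}\n\ndata: {"choices": [{"text": "bar"}]}\n\n';
    for await (const event of parseDataEvents(randomSplit(sse))) {
        console.log('complete event:', event);
    }
})();

Because an event is only consumed once the next 'data:' marker is seen, or once the tail of the buffer parses as valid JSON, the split points chosen by randomSplit should not change which events come out.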
