diff --git a/agents/src/pipeline/pipeline_agent.ts b/agents/src/pipeline/pipeline_agent.ts index f83f4e05..abe686a4 100644 --- a/agents/src/pipeline/pipeline_agent.ts +++ b/agents/src/pipeline/pipeline_agent.ts @@ -6,6 +6,7 @@ import type { NoiseCancellationOptions, RemoteParticipant, Room, + TranscriptionSegment, } from '@livekit/rtc-node'; import { AudioSource, @@ -275,6 +276,7 @@ export class VoicePipelineAgent extends (EventEmitter as new () => TypedEmitter< #lastSpeechTime?: number; #transcriptionId?: string; #agentTranscribedText = ''; + #agentFinalTranscriptionBuffer: TranscriptionSegment[] = []; constructor( /** Voice Activity Detection instance. */ @@ -748,8 +750,11 @@ export class VoicePipelineAgent extends (EventEmitter as new () => TypedEmitter< if (handle.interrupted) break; } commitUserQuestionIfNeeded(); - - let collectedText = this.#agentTranscribedText; + // wait for the new sentence delay + await new Promise((resolve) => setTimeout(resolve, defaultTextSyncOptions.newSentenceDelay)); + let collectedText = this.#agentFinalTranscriptionBuffer + .map((segment) => segment.text) + .join(' '); const isUsingTools = handle.source instanceof LLMStream && !!handle.source.functionCalls.length; const interrupted = handle.interrupted; @@ -763,6 +768,7 @@ export class VoicePipelineAgent extends (EventEmitter as new () => TypedEmitter< const msg = ChatMessage.create({ text: collectedText, role: ChatRole.ASSISTANT }); this.chatCtx.messages.push(msg); + this.#agentFinalTranscriptionBuffer = []; handle.markSpeechCommitted(); if (interrupted) { @@ -925,8 +931,20 @@ export class VoicePipelineAgent extends (EventEmitter as new () => TypedEmitter< ): SynthesisHandle { const synchronizer = new TextAudioSynchronizer(defaultTextSyncOptions); // TODO: where possible we would want to use deltas instead of full text segments, esp for LLM streams over the streamText API + let lastTranscriptionSegmentId: string | undefined; synchronizer.on('textUpdated', async (text) => { this.#agentTranscribedText = text.text; + if (lastTranscriptionSegmentId !== text.id) { + this.#agentFinalTranscriptionBuffer.push(text); + lastTranscriptionSegmentId = text.id; + } else { + const transcriptionSegment = + this.#agentFinalTranscriptionBuffer[this.#agentFinalTranscriptionBuffer.length - 1]; + if (transcriptionSegment) { + transcriptionSegment.text = text.text; + } + } + await this.#publishTranscription( this.#room!.localParticipant!.identity!, this.#agentPublication?.sid ?? '',