Skip to content

Two on_pipeline_started event getting triggered on starting pipeline #3341

@piyushjain0106

Description

@piyushjain0106

pipecat version

0.0.96

Python version

3.11

Operating System

macOS

Issue description

Two on_pipeline_started events are coming on starting pipeline

@task.event_handler("on_pipeline_started") async def on_pipeline_started(task, frame): try: if self.websocket.client_state.value == 1: await self.websocket.send_json({ "type": "state", "state": "ready" }) self.log.info("Pipeline started") except Exception as e: self.log.error(f"Failed to send pipeline ready state event, error: {e}")

Reproduction steps

`async def run(self):
"""Run the bot session"""

    # check for doro device or parent type
    is_welcome_msg, welcome_msg = welcome_msg_required(self.device_type, self.user_type, self.user_name, self.language)
    # Setup transport
    transport = self._setup_transport()
    
    # Create services
    services = ServiceManager.create_services(
        pipeline_config, self.stt_service, self.llm_service, self.tts_service,
        self.llm_model, self.metadata, self.log
    )
    
    # Setup context aggregator
    context_aggregator, system_prompt = self._setup_context_aggregator(services['llm'])
    
    # setup welcome canceller
    welcome_canceller = None
    send_welcome_after_timeout = None
    if pipeline_config.welcome_detection_enabled and not is_welcome_msg:
        async def send_welcome_after_timeout():
            try:
                await asyncio.sleep(self.welcome_timeout_secs)
                if not self.welcome_state.user_has_spoken and not self.welcome_state.welcome_sent:
                    self.log.info(f"silent for {self.welcome_timeout_secs}s, sending welcome message")
                    self.welcome_state.welcome_sent = True

                    if context_aggregator:
                        await task.queue_frames([
                            context_aggregator.user().get_context_frame()
                        ])
            except asyncio.CancelledError:
                self.log.debug("Welcome timeout cancelled - user spoke first")

        welcome_canceller = FirstSpeechDetection(self.welcome_state, metadata=self.metadata)
    else:
        self.log.warning("Welcome detection disabled - no automatic greeting")
    
    
    # Create processors
    recorder = CallRecorder(self.call_id, self.user_id)
    processors = ProcessorManager.create_processors(
        pipeline_config, self.metadata, recorder, self.websocket, self.queue_service,
        self.call_id, self.user_id, self.stt_enable_vad, self.stt_enable_partials, 
        self.welcome_state, context_aggregator, system_prompt
    )
    
    # Create additional components
    user_transcript_recorder = CustomTranscriptProcessor()
    text_cleaner = TextCleanerProcessor(metadata=self.metadata)
    emotion_filler = EmotionFillerProcessor(
        sample_rate=16000, min_audio_duration=0.5, confidence_threshold=0.1,
        language=self.language.lower(), metadata=self.metadata,
        notifier=EventNotifier(), recorder=recorder
    )
    
    exit_intent_processor = maybe_create(
        context_aggregator is not None,
        lambda: ExitIntentProcessor(
            transport=transport, context_aggregator=context_aggregator,
            websocket=self.websocket, metadata=self.metadata
        )
    )
    self.exit_intent_processor = exit_intent_processor
    
    # Build pipeline
    pipeline_components = PipelineBuilder.build_component_chain(
        transport, services, processors, context_aggregator,
        user_transcript_recorder, text_cleaner, emotion_filler,
        exit_intent_processor, self.filler_mode
    )
    
    pipeline = PipelineBuilder.create_pipeline(pipeline_components)
    
    # Setup event handlers
    self._setup_event_handlers(transport, user_transcript_recorder, recorder, send_welcome_after_timeout, is_welcome_msg)
    
    # Create and run task
    task = PipelineTask(
        pipeline,
        params=PipelineParams(
            allow_interruptions=True,
            enable_metrics=True,
            interruption_strategies=[MinWordsInterruptionStrategy(min_words=1)],
        )
    )
    self.task = task

    @task.event_handler("on_pipeline_started")
    async def on_pipeline_started(task, frame):
        try:
            if self.websocket.client_state.value == 1:
                await self.websocket.send_json({
                    "type": "state",
                    "state": "ready"
                })
            self.log.info("Pipeline started")
        except Exception as e:
            self.log.error(f"Failed to send pipeline ready state event, error: {e}")
    
    start_frame = StartFrame(allow_interruptions=True)
    if is_welcome_msg:
        await task.queue_frames([start_frame, TTSSpeakFrame(welcome_msg)])
    else:
        await task.queue_frame(start_frame)
    
    runner = PipelineRunner()
    self.runner = runner
    try:
        await runner.run(task)
    finally:
        # Always attempt cleanup even if the pipeline exits unexpectedly.
        await self._finalize_session(recorder, reason="session_finished")`

Expected behavior

Ideally only one should be created and one more question only one startFrame is passed right during the whole pipeline run session? Correct?

Actual behavior

Getting two events instead of one. And In case if user starts speaking then the second one is passed after a while

Logs

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions