diff --git a/vonage-chatbot/inbound/Dockerfile b/vonage-chatbot/inbound/Dockerfile new file mode 100644 index 0000000..c6349f1 --- /dev/null +++ b/vonage-chatbot/inbound/Dockerfile @@ -0,0 +1,16 @@ +FROM dailyco/pipecat-base:latest + +# Enable bytecode compilation +ENV UV_COMPILE_BYTECODE=1 + +# Copy from the cache instead of linking since it's a mounted volume +ENV UV_LINK_MODE=copy + +# Install the project's dependencies using the lockfile and settings +RUN --mount=type=cache,target=/root/.cache/uv \ + --mount=type=bind,source=uv.lock,target=uv.lock \ + --mount=type=bind,source=pyproject.toml,target=pyproject.toml \ + uv sync --locked --no-install-project --no-dev + +# Copy the application code +COPY ./bot.py bot.py \ No newline at end of file diff --git a/vonage-chatbot/inbound/README.md b/vonage-chatbot/inbound/README.md new file mode 100644 index 0000000..c796874 --- /dev/null +++ b/vonage-chatbot/inbound/README.md @@ -0,0 +1,174 @@ +# Vonage Chatbot: Inbound + +This project is a Pipecat-based chatbot that integrates with Vonage Voice API to handle inbound phone calls via WebSocket connections and provide real-time voice conversations. + +> ⚠️ Important: Vonage WebSocket support (`/ws/vonage`) is not yet available on Pipecat Cloud. + +## Table of Contents + +- [How It Works](#how-it-works) +- [Prerequisites](#prerequisites) +- [Setup](#setup) +- [Environment Configuration](#environment-configuration) +- [Local Development](#local-development) +- [Production Deployment](#production-deployment) +- [Customizing your Bot](#customizing-your-bot) +- [Server Guide](#server-guide) + +## How It Works + +When someone calls your Vonage number: + +1. **Vonage requests NCCO**: Vonage sends a webhook request (GET or POST) to your configured Answer URL +2. **Server returns NCCO**: Your FastAPI server (`server.py`) responds with an NCCO (Nexmo Call Control Object) containing WebSocket connection details +3. 
**WebSocket connection established**: Vonage establishes a WebSocket connection to the URI specified in the NCCO +4. **Connection event**: Vonage sends a `websocket:connected` JSON event with audio format details (e.g., `audio/l16;rate=16000`) +5. **Audio streaming begins**: + - Vonage sends **binary messages** containing the caller's voice (16-bit linear PCM audio) + - Your bot processes the audio through the Pipecat pipeline (STT → LLM → TTS) + - Your bot sends **binary messages** back with synthesized speech +6. **Control commands**: Your bot can send **text messages** (JSON) for control: + - `{"action": "clear"}` - Stop audio playback immediately + - `{"action": "notify", "payload": {...}}` - Request notification when audio finishes +7. **Call ends**: When the call ends, the WebSocket connection closes + +### Protocol Details + +The Vonage Voice API uses a **mixed-mode WebSocket protocol**: + +- **Binary messages**: Raw 16-bit linear PCM audio data (no base64 encoding) +- **Text messages**: JSON for control commands and events +- **Sample rates**: 8kHz, 16kHz (recommended), or 24kHz +- **Channels**: Mono (1 channel) + +### Flow Diagram + +``` +┌─────────┐ ┌─────────────┐ ┌──────────────┐ ┌─────────┐ +│ Caller │────────>│ Vonage │────────>│ Your Server │────────>│ Pipecat │ +│ │ Dials │ Number │ Webhook │ (server.py) │ WebSocket│ Bot │ +└─────────┘ └─────────────┘ └──────────────┘ └─────────┘ + │ │ │ + │ 1. POST /answer │ │ + │───────────────────────>│ │ + │ │ │ + │ 2. NCCO (JSON) │ │ + │<───────────────────────│ │ + │ │ │ + │ 3. WebSocket Connect (wss://...) │ + │────────────────────────────────────────────────>│ + │ │ │ + │ 4. {"event": "websocket:connected"} │ + │────────────────────────────────────────────────>│ + │ │ │ + │ 5. Binary Audio (caller voice) │ + │────────────────────────────────────────────────>│ + │ │ │ + │ 6. 
Binary Audio (bot response) │ + │<────────────────────────────────────────────────│ + │ │ │ +``` + +## Prerequisites + +### Vonage + +- A Vonage account with: + - A purchased phone number that supports voice calls + +### AI Services + +- Google API key for the LLM inference +- Deepgram API key for speech-to-text +- Cartesia API key for text-to-speech + +### System + +- Python 3.10+ +- `uv` package manager +- ngrok (for local development) +- Docker (for production deployment) + +## Setup + +1. Set up a virtual environment and install dependencies: + + ```sh + cd inbound + uv sync + ``` + +2. Create an .env file and add API keys: + + ```sh + cp env.example .env + ``` + +## Environment Configuration + +The bot supports two deployment modes controlled by the `ENV` variable: + +### Local Development (`ENV=local`) + +- Uses your local server or ngrok URL for WebSocket connections +- Default configuration for development and testing +- WebSocket connections go directly to your running server + +### Production (`ENV=production`) + +- Uses Pipecat Cloud WebSocket URLs automatically +- Requires the agent name and organization name from your Pipecat Cloud deployment +- Set these when deploying to production environments +- WebSocket connections route through Pipecat Cloud infrastructure + +## Local Development + +### Configure Vonage + +1. Start ngrok: + In a new terminal, start ngrok to tunnel the local server: + + ```sh + ngrok http 7860 + ``` + + > Tip: Use the `--subdomain` flag for a reusable ngrok URL. + +2. Configure your Vonage number: + + - Go to your Vonage Dashboard: https://dashboard.nexmo.com/ + - Navigate to Numbers > Your numbers + - Click on your phone number + - In the "Voice" section: + - Set "Answer URL" to: `https://your-url.ngrok.io/answer` + - Set HTTP method to **POST** + - Click "Save" + + > **Note**: This example uses 16kHz audio for better AI/speech recognition quality. 
#
# Copyright (c) 2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""Vonage inbound voice bot: STT -> LLM -> TTS over a telephony WebSocket."""

import os

from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import parse_telephony_websocket
from pipecat.serializers.vonage import VonageFrameSerializer
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.google.llm import GoogleLLMService
from pipecat.transports.base_transport import BaseTransport
from pipecat.transports.websocket.fastapi import (
    FastAPIWebsocketParams,
    FastAPIWebsocketTransport,
)

load_dotenv(override=True)


async def run_bot(transport: BaseTransport, handle_sigint: bool) -> None:
    """Build and run the voice pipeline over an already-connected transport.

    Args:
        transport: WebSocket transport bound to the live Vonage call.
        handle_sigint: Whether the pipeline runner installs its own SIGINT
            handler (False when this runs inside another server process).
    """
    stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))

    llm = GoogleLLMService(api_key=os.getenv("GOOGLE_API_KEY"))

    tts = CartesiaTTSService(
        api_key=os.getenv("CARTESIA_API_KEY"),
        voice_id="71a7ad14-091c-4e8e-a314-022ece01c121",  # British Reading Lady
    )

    messages = [
        {
            "role": "system",
            # Keep the prompt plain-text: the LLM output is synthesized to audio.
            # (Fixed duplicated word: "short short sentence" -> "short sentence".)
            "content": "You are an elementary teacher in an audio call. Your output will be converted to audio so don't include special characters in your answers. Respond to what the student said in a short sentence.",
        },
    ]

    context = LLMContext(messages)
    context_aggregator = LLMContextAggregatorPair(context)

    pipeline = Pipeline(
        [
            transport.input(),  # Websocket input from client
            stt,  # Speech-To-Text
            context_aggregator.user(),
            llm,  # LLM
            tts,  # Text-To-Speech
            transport.output(),  # Websocket output to client
            context_aggregator.assistant(),
        ]
    )

    task = PipelineTask(
        pipeline,
        params=PipelineParams(
            audio_in_sample_rate=16000,  # Vonage default is 16kHz
            audio_out_sample_rate=16000,
            enable_metrics=True,
            enable_usage_metrics=True,
        ),
    )

    @transport.event_handler("on_client_connected")
    async def on_client_connected(transport, client):
        # Kick off the conversation as soon as the caller's socket is up.
        messages.append({"role": "system", "content": "Please introduce yourself to the user."})
        await task.queue_frames([LLMRunFrame()])

    @transport.event_handler("on_client_disconnected")
    async def on_client_disconnected(transport, client):
        # Caller hung up: tear the pipeline down.
        await task.cancel()

    runner = PipelineRunner(handle_sigint=handle_sigint)

    await runner.run(task)


async def bot(runner_args: RunnerArguments) -> None:
    """Main bot entry point compatible with Pipecat Cloud."""

    transport_type, call_data = await parse_telephony_websocket(runner_args.websocket)
    logger.info(f"Auto-detected transport: {transport_type}")

    # Extract phone numbers from call_data (parsed by runner utils).
    # Use this information to personalize the bot's response.
    from_number = call_data.get("from")
    to_number = call_data.get("to")

    if from_number and to_number:
        logger.info(f"Call from {from_number} to {to_number}")

    serializer = VonageFrameSerializer()

    transport = FastAPIWebsocketTransport(
        websocket=runner_args.websocket,
        params=FastAPIWebsocketParams(
            audio_in_enabled=True,
            audio_out_enabled=True,
            add_wav_header=False,  # Vonage streams raw linear PCM, no WAV header
            vad_analyzer=SileroVADAnalyzer(),
            serializer=serializer,
        ),
    )

    await run_bot(transport, runner_args.handle_sigint)


if __name__ == "__main__":
    from pipecat.runner.run import main

    main()
#
# Copyright (c) 2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""An example server for Vonage to start WebSocket streaming to Pipecat Cloud."""

import base64
import json
import os
from urllib.parse import quote

import uvicorn
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, Query, Request, WebSocket
from starlette.responses import JSONResponse

# Load environment variables from .env file
load_dotenv()

app = FastAPI(title="Vonage NCCO Server", description="Serves NCCO for Vonage WebSocket streaming")


def get_websocket_url(host: str, body_data: dict | None = None) -> str:
    """Construct the WebSocket URL for the NCCO, based on environment variables.

    Args:
        host: Host header of the incoming webhook request (used in local mode).
        body_data: Optional call metadata. It is JSON-serialized, base64-encoded
            and appended as a ``body`` query parameter so the WebSocket handler
            can recover it (Vonage does not forward call metadata over the
            WebSocket itself).

    Returns:
        The ``wss://`` URL Vonage should connect to.

    Raises:
        ValueError: In production mode when AGENT_NAME or ORGANIZATION_NAME
            is not configured.
    """
    env = os.getenv("ENV", "local").lower()

    # Build query parameters
    query_params = []

    if env == "production":
        agent_name = os.getenv("AGENT_NAME")
        org_name = os.getenv("ORGANIZATION_NAME")

        if not agent_name or not org_name:
            raise ValueError(
                "AGENT_NAME and ORGANIZATION_NAME must be set in environment variables for production"
            )

        service_host = f"{agent_name}.{org_name}"
        query_params.append(f"serviceHost={service_host}")
        base_url = "wss://api.pipecat.daily.co/ws/vonage"
    else:
        base_url = f"wss://{host}/ws"

    # Add body data as a query parameter. The base64 alphabet includes '+',
    # which a query-string parser decodes as a space and thereby corrupts the
    # payload — percent-encode the value so it round-trips intact.
    if body_data:
        body_json = json.dumps(body_data)
        body_encoded = base64.b64encode(body_json.encode("utf-8")).decode("utf-8")
        query_params.append(f"body={quote(body_encoded)}")

    # Construct final URL
    if query_params:
        return f"{base_url}?{'&'.join(query_params)}"
    return base_url


@app.post("/answer")
async def answer_call(request: Request):
    """
    Returns NCCO (JSON) for Vonage to start WebSocket streaming with call information

    Agent and organization names are configured via environment variables:
    - AGENT_NAME: Your deployed agent name
    - ORGANIZATION_NAME: Your Pipecat Cloud organization

    For local development, set ENV=local in your .env file.
    For production, set ENV=production with AGENT_NAME and ORGANIZATION_NAME.

    Vonage sends call parameters as JSON or form data:
    - conversation_uuid, uuid, from, to

    Example webhook URL: https://your-domain.com/answer
    """
    # Vonage webhooks may arrive as JSON or as form data; support both.
    content_type = request.headers.get("content-type", "")
    if "application/json" in content_type:
        form_data = await request.json()
    else:
        form_data = await request.form()

    # Extract parameters from the payload. The local name `call_uuid` avoids
    # shadowing the stdlib `uuid` module.
    conversation_uuid = form_data.get("conversation_uuid")
    call_uuid = form_data.get("uuid")
    from_number = form_data.get("from")
    to = form_data.get("to")

    # Create body data to forward the call metadata to the bot.
    body_data = {}
    if from_number:
        body_data["from"] = from_number
    if to:
        body_data["to"] = to
    if call_uuid:
        body_data["call_uuid"] = call_uuid
    if conversation_uuid:
        body_data["conversation_uuid"] = conversation_uuid

    # Log call details
    if call_uuid:
        print(f"Vonage inbound call: {from_number} → {to}, UUID: {call_uuid}")

    # Validate environment configuration before building the NCCO.
    env = os.getenv("ENV", "local").lower()
    if env == "production":
        if not os.getenv("AGENT_NAME") or not os.getenv("ORGANIZATION_NAME"):
            raise HTTPException(
                status_code=500,
                detail="AGENT_NAME and ORGANIZATION_NAME must be set for production deployment",
            )

    # Get request host and construct WebSocket URL with body data
    host = request.headers.get("host")
    if not host:
        raise HTTPException(status_code=400, detail="Unable to determine server host")

    websocket_url = get_websocket_url(host, body_data if body_data else None)

    # Use 16kHz for better AI/speech recognition quality
    sample_rate = 16000

    # Build NCCO with headers for production or query params for local
    if env == "production":
        # For production (Pipecat Cloud), use headers
        ncco = [
            {
                "action": "connect",
                "endpoint": [
                    {
                        "type": "websocket",
                        "uri": websocket_url,
                        "content-type": f"audio/l16;rate={sample_rate}",
                        "headers": {
                            "_pipecatCloudServiceHost": f"{os.getenv('AGENT_NAME')}.{os.getenv('ORGANIZATION_NAME')}"
                        },
                    }
                ],
            }
        ]
    else:
        # For local development, use query parameters
        ncco = [
            {
                "action": "connect",
                "endpoint": [
                    {
                        "type": "websocket",
                        "uri": websocket_url,
                        "content-type": f"audio/l16;rate={sample_rate}",
                    }
                ],
            }
        ]

    print(f"Generated NCCO: {json.dumps(ncco, indent=2)}")
    return JSONResponse(content=ncco)


@app.websocket("/ws")
async def websocket_endpoint(
    websocket: WebSocket,
    body: str = Query(None),
    serviceHost: str = Query(None),
):
    """Handle WebSocket connections for inbound calls."""
    await websocket.accept()
    print("WebSocket connection accepted for inbound call")

    # Decode the body parameter if provided (base64-encoded JSON produced by
    # get_websocket_url in the webhook handler).
    body_data = {}
    if body:
        try:
            decoded_json = base64.b64decode(body).decode("utf-8")
            body_data = json.loads(decoded_json)
            print(f"Decoded body data: {body_data}")
        except Exception as e:
            print(f"Error decoding body parameter: {e}")
    else:
        print("No body parameter received")

    try:
        # Import the bot function from the bot module
        from bot import bot
        from pipecat.runner.types import WebSocketRunnerArguments

        # Store body_data in websocket state so parse_telephony_websocket can access it
        # Note: Vonage doesn't include phone numbers in WebSocket messages, only in the
        # initial webhook. We pass this data through websocket state to make it available
        # to the bot via parse_telephony_websocket().
        websocket.state.vonage_call_data = body_data

        # Create runner arguments and run the bot
        runner_args = WebSocketRunnerArguments(websocket=websocket)
        runner_args.handle_sigint = False

        await bot(runner_args)

    except Exception as e:
        print(f"Error in WebSocket endpoint: {e}")
        try:
            await websocket.close()
        except RuntimeError:
            # The socket may already be closed (e.g. the client hung up).
            pass


if __name__ == "__main__":
    # Run the server on port 7860
    # Use with ngrok: ngrok http 7860
    uvicorn.run(app, host="0.0.0.0", port=7860)