diff --git a/vonage-ac-chatbot/README.md b/vonage-ac-chatbot/README.md new file mode 100644 index 0000000..a099371 --- /dev/null +++ b/vonage-ac-chatbot/README.md @@ -0,0 +1,144 @@ +# Vonage Chatbot (Pipecat) + +A real-time voice chatbot built using **Pipecat AI** with **Vonage Audio Connector** over **WebSocket**. +This project streams caller audio to **OpenAI STT**, processes the conversation using an LLM, converts the AI's response to speech via **OpenAI TTS**, and streams it back to the caller in real time. The server exposes a WebSocket endpoint (via **VonageAudioConnectorTransport**) that the Vonage **/connect API** connects to, bridging a live session into the **OpenAI STT → LLM → TTS** pipeline. + + +## Table of Contents + +- [How It Works](#how-it-works) +- [Features](#features) +- [Prerequisites](#prerequisites) +- [Setup](#setup) +- [Environment Configuration](#environment-configuration) +- [Run your Server Application](#run-your-server-application) +- [Testing the Chatbot](#testing-the-chatbot) + +## How It Works + +1. **Vonage connects to your Pipecat WebSocket server -** The /connect API creates a virtual participant and starts streaming audio frames. +2. **Parse the WebSocket messages -** Your Pipecat server reads incoming audio packets from Vonage and sets up a transport. +3. **Run the Pipecat pipeline -** Incoming speech is transcribed by OpenAI STT. +4. **Generate and synthesize a response -** The LLM generates a reply, and OpenAI TTS converts the text to audio. +5. **Return speech back to Vonage -** Pipecat sends audio frames back through the WebSocket, and Vonage injects them into the session in real time.
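The handoff in step 1 comes down to a small JSON body sent to the /connect endpoint. As a minimal sketch (the field names mirror the `websocket` options shown later in this README; the helper function itself is illustrative, not part of this example's code), the body can be assembled like this:

```python
import json


def build_connect_body(session_id: str, token: str, ws_uri: str, audio_rate: int = 16000) -> dict:
    """Assemble the JSON body for POST /v2/project/{apiKey}/connect (illustrative helper)."""
    return {
        "sessionId": session_id,
        "token": token,
        "websocket": {
            "uri": ws_uri,          # your Pipecat server's wss:// endpoint
            "audioRate": audio_rate,
            "bidirectional": True,  # stream audio both ways
        },
    }


# Placeholder values shown for illustration only
print(json.dumps(build_connect_body("1_MX4...", "T1==...", "wss://example.ngrok-free.app"), indent=2))
```

The client scripts in this example build the same structure through the OpenTok/Vonage SDKs rather than raw HTTP.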
+ +## Features + +- **Real-time, bidirectional audio** using WebSockets via Vonage Audio Connector +- **OpenAI-powered pipeline:** STT → LLM → TTS +- **Silero VAD** for accurate speech-pause detection +- **Docker support** for simple deployment and isolation + +## Prerequisites + +- A **Vonage (OpenTok) account** +- An **OpenAI API Key** +- Python **3.10+** +- `uv` package manager +- **ngrok** (or any WS tunnel) for local testing +- Docker (optional) + +## Setup + +1. **Set up a virtual environment and install dependencies**: + + ```sh + uv sync + ``` + +2. **Create your .env file**: + + ```sh + cp env.example .env + ``` + Update `.env` with your credentials and session ID as described in the section below. + +## Environment Configuration + +1. **Create an OpenTok/Vonage Session and Publish a Stream** + A Session ID is required for the Audio Connector. + Note: You can use either the OpenTok or the Vonage platform to create the session. Open the Playground (or your own app) to create a session and publish a stream. + Copy the Session ID and set it in your `.env` file: + ```sh + VONAGE_SESSION_ID= + ``` + Always use **credentials from the same project** that created the `sessionId`. + +2. **Set the Keys in `.env`** + If the session was created using OpenTok (API key + secret), set the following in your `.env`: + + ```sh + # OpenAI Key (no quotes) + OPENAI_API_KEY=sk-proj-xxxxxxxxxxxxxxxxxxxxxxxx + + # OpenTok credentials + VONAGE_API_KEY=YOUR_API_KEY + VONAGE_API_SECRET=YOUR_API_SECRET + + # Session ID created in Step 1 + VONAGE_SESSION_ID=1_MX4.... + + # Leave blank; this is auto-filled after `/connect` API call + VONAGE_CONNECTION_ID=... + + ``` + If the session was created using the Vonage platform (App ID + Private Key), set the following in your `.env`: + + ```sh + # Vonage Platform API credentials + VONAGE_APPLICATION_ID=YOUR_APPLICATION_ID + VONAGE_PRIVATE_KEY=YOUR_PRIVATE_KEY_PATH + + # Session ID created in Step 1 + VONAGE_SESSION_ID=1_MX4....
+ + # Leave blank; auto-filled by client.py + VONAGE_CONNECTION_ID=... + + ``` + +3. **Install ngrok**: + + Follow the instructions on the [ngrok website](https://ngrok.com/download) to download and install ngrok. You’ll use this to securely expose your local WebSocket server for testing. + +4. **Start ngrok to expose the local WebSocket server**: + + In a separate terminal, start ngrok to tunnel the local server: + + ```sh + ngrok http 8005 + ``` + + You will see something like: + + ```sh + Forwarding https://a5db22f57efa.ngrok-free.app -> http://localhost:8005 + ``` + + To form the **WSS** URL, replace https:// with wss://. + + For the Forwarding URL above, the resulting wss:// URL is used like this: + + ```sh + "websocket": { + "uri": "wss://a5db22f57efa.ngrok-free.app", + "audioRate": 16000, + "bidirectional": true + } + ``` + +## Run your Server Application + +You can run the server application using the command below: + + ```sh + uv run server.py + ``` + The server will start on port 8005 and wait for incoming Audio Connector connections. + +## Testing the Chatbot + +1. Follow the instructions in: `examples/vonage-ac-chatbot/client/README.md`. +2. Run the client program (`connect_and_stream.py`) to invoke the **/connect API**. +3. Once the connection is established, begin speaking in the Vonage Video session. Your audio will be forwarded through the Audio Connector to the Pipecat pipeline, processed by OpenAI STT → LLM → TTS, and the synthesized response will be sent back into the session. +4. You will hear the AI’s voice reply in real time, played back as audio from the virtual participant created by the `/connect` API.
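The https-to-wss substitution described in the ngrok step above is easy to get wrong by hand. A tiny helper (purely illustrative, not part of this example's code) makes the rule explicit:

```python
def https_to_wss(forwarding_url: str) -> str:
    """Turn an ngrok https:// forwarding URL into the wss:// URI for /connect."""
    prefix = "https://"
    if not forwarding_url.startswith(prefix):
        raise ValueError(f"expected an https:// URL, got: {forwarding_url!r}")
    return "wss://" + forwarding_url[len(prefix):]


# e.g. https_to_wss("https://a5db22f57efa.ngrok-free.app")
# → "wss://a5db22f57efa.ngrok-free.app"
```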
diff --git a/vonage-ac-chatbot/client/README.md b/vonage-ac-chatbot/client/README.md new file mode 100644 index 0000000..388ce04 --- /dev/null +++ b/vonage-ac-chatbot/client/README.md @@ -0,0 +1,121 @@ +# Python Client for Server Testing + +This Python client allows you to test the **Vonage Pipecat WebSocket server** by calling the Vonage **/connect** API. It creates a virtual Audio Connector participant inside your Vonage Video session, streams audio from the session to your Pipecat pipeline, and plays back the generated response in real time. + +## Setup Instructions + +1. **Open the client directory in a separate terminal** + You do **not** need to clone the repository again. + If you already cloned it for the server setup, simply open a new terminal and navigate to: + ```sh + cd vonage-pipecat/examples/vonage-ac-chatbot/client + ``` + +2. **Install dependencies**: + ```sh + uv sync + ``` + +3. **Create .env**: + + ```sh + cp env.example .env + ``` + This `.env` stores all configuration required by the client. + +4. **Use the existing Vonage/OpenTok Session from the server setup** + During the server setup, you already: + 1. Created a Vonage/OpenTok Video Session + 2. Published a stream + 3. Verified audio is flowing inside the session + The client does **not** need a new session. + The `/connect` API will attach to this existing session. + Simply copy the Session ID you used earlier into your `.env` file: + ```sh + VONAGE_SESSION_ID= + ``` + + If you are using the OpenTok platform, set OPENTOK_API_URL in your .env: + ```sh + OPENTOK_API_URL=https://api.opentok.com + ``` + If you are using the Vonage platform, set VONAGE_API_URL in your .env: + ```sh + VONAGE_API_URL=api.vonage.com + ``` + + **Note:** Ensure you use the **credentials** from the **same project** that created this session. + +5.
**Configure credentials and WebSocket settings in `.env`** + If you created the session on the OpenTok platform, set the following in your `.env`: + ```sh + # OpenTok credentials + VONAGE_API_KEY=YOUR_API_KEY + VONAGE_API_SECRET=YOUR_API_SECRET + + # WebSocket URL of your Pipecat server (ngrok or production) + WS_URI=wss:// + + # Session ID from Step 4 + VONAGE_SESSION_ID=1_MX4.... + + # API base + OPENTOK_API_URL=https://api.opentok.com + + # Leave blank — this is auto-filled after `/connect` API call + VONAGE_CONNECTION_ID= + + # Keep the rest unchanged. + ``` + If you created the session on the Vonage platform, set the following in your `.env`: + + ```sh + # Vonage SDK credentials + VONAGE_APPLICATION_ID=YOUR_APPLICATION_ID + VONAGE_PRIVATE_KEY=YOUR_PRIVATE_KEY_PATH + + # WebSocket URL of your Pipecat server (ngrok or production) + WS_URI=wss:// + + # Session ID from Step 4 + VONAGE_SESSION_ID=1_MX4.... + + # API base + VONAGE_API_URL=api.vonage.com + + # Leave blank — this is auto-filled after `/connect` API call + VONAGE_CONNECTION_ID= + + # Keep the rest unchanged. + ``` + +6. **Ensure your Pipecat WebSocket Server is running**: + Before running the client, ensure the WebSocket server is running. The client cannot connect unless the WebSocket endpoint is reachable. + +7. **Run the Client**: + The client triggers the `/connect` API → Vonage creates an Audio Connector → audio begins flowing. + If using the OpenTok API Key + Secret, run: + ```sh + uv run connect_and_stream.py + ``` + + If using the Vonage Application ID + Private Key, run: + ```sh + uv run connect_and_stream_vonage.py + ``` + When successful: + 1) `VONAGE_CONNECTION_ID` is automatically added to `.env`. + 2) The caller's audio is streamed into Pipecat. + 3) The AI-generated TTS response is injected back into the session. + +**Overriding `.env` Values (Optional)** +The script reads everything from `.env` via `os.getenv()`.
+You can still override via flags if you want, e.g.: + + ```sh + # Example + uv run connect_and_stream.py --ws-uri wss://my-ngrok/ws --audio-rate 16000 + + # OR + uv run connect_and_stream_vonage.py --ws-uri wss://my-ngrok/ws --audio-rate 16000 + ``` diff --git a/vonage-ac-chatbot/client/connect_and_stream.py b/vonage-ac-chatbot/client/connect_and_stream.py new file mode 100644 index 0000000..b0e85be --- /dev/null +++ b/vonage-ac-chatbot/client/connect_and_stream.py @@ -0,0 +1,178 @@ +#!/usr/bin/env python3 +""" +Use a Vonage (OpenTok) Video API existing session, generate a token, +and connect its audio to your Pipecat WebSocket endpoint. +""" + +import argparse +import json +import os +from pathlib import Path + +from dotenv import load_dotenv +from opentok import Client # SDK 3.x + +# ---- helpers ---------------------------------------------------------------- + + +def parse_kv_pairs(items: list[str]) -> dict[str, str]: + """ + Parse CLI --header/--param entries like "Key=Value" or "Key:Value". + """ + out: dict[str, str] = {} + for raw in items or []: + sep = "=" if "=" in raw else (":" if ":" in raw else None) + if not sep: + raise ValueError(f"Invalid header/param format: {raw!r}. 
Use Key=Value") + k, v = raw.split(sep, 1) + out[k.strip()] = v.strip() + return out + + +def comma_list(s: str | None) -> list[str]: + return [x.strip() for x in s.split(",")] if s else [] + + +def update_env_var(env_path: str | Path, key: str, value: str) -> None: + """Create or update KEY=VALUE in a .env-style file.""" + path = Path(env_path) + lines: list[str] = [] + + if path.exists(): + lines = path.read_text().splitlines() + + new_lines: list[str] = [] + key_prefix = f"{key}=" + replaced = False + + for line in lines: + if line.strip().startswith("#"): + new_lines.append(line) + continue + if line.lstrip().startswith(key_prefix): + new_lines.append(f"{key}={value}") + replaced = True + else: + new_lines.append(line) + + if not replaced: + new_lines.append(f"{key}={value}") + + path.write_text("\n".join(new_lines) + "\n") + + +# ---- main ------------------------------------------------------------------- + + +def main() -> None: + # Load client-side env (vonage-chatbot/client/.env) + load_dotenv() + + p = argparse.ArgumentParser( + description="Connect an existing session's audio to a WebSocket (Pipecat)." + ) + # Auth + p.add_argument("--api-key", default=os.getenv("VONAGE_API_KEY"), required=False) + p.add_argument("--api-secret", default=os.getenv("VONAGE_API_SECRET"), required=False) + + # Where to connect + p.add_argument("--ws-uri", default=os.getenv("WS_URI"), help="wss://...", required=False) + p.add_argument("--audio-rate", type=int, default=int(os.getenv("VONAGE_AUDIO_RATE", "16000"))) + p.add_argument("--bidirectional", action="store_true", default=True) + + # An existing session which needs to be connected to pipecat-ai + p.add_argument("--session-id", default=os.getenv("VONAGE_SESSION_ID")) + + # Optional streams and headers (to pass to the WS) + p.add_argument( + "--streams", default=os.getenv("VONAGE_STREAMS"), help="Comma-separated stream IDs" + ) + p.add_argument( + "--header", + action="append", + help="Extra header(s) for WS, e.g.
--header X-Foo=bar (repeatable)", + ) + + # Optional: choose API base. If your SDK doesn’t accept api_url, set OPENTOK_API_URL env before run. + p.add_argument("--api-base", default=os.getenv("OPENTOK_API_URL", "https://api.opentok.com")) + + args = p.parse_args() + + # Validate inputs + missing = [ + k + for k, v in { + "api-key": args.api_key, + "api-secret": args.api_secret, + "ws-uri": args.ws_uri, + }.items() + if not v + ] + if missing: + raise SystemExit(f"Missing required args/env: {', '.join(missing)}") + + # Init client (SDK 3.x supports api_url kw; if yours doesn’t, remove it and use OPENTOK_API_URL env) + try: + ot = Client(args.api_key, args.api_secret, api_url=args.api_base) + except TypeError: + # Fallback for older SDKs that don't accept api_url + ot = Client(args.api_key, args.api_secret) + + session_id = args.session_id + print(f"Using existing session: {session_id}") + + # Token: generate a fresh one tied to this session + token = ot.generate_token(session_id) + print(f"Generated token: {token[:32]}...") # don’t print full token in logs + + # Build websocket options (mirrors your Postman body) + ws_opts = { + "uri": args.ws_uri, + "audioRate": args.audio_rate, + "bidirectional": bool(args.bidirectional), + } + + # Optional stream filtering + stream_list = comma_list(args.streams) + if stream_list: + ws_opts["streams"] = stream_list + + # Optional headers passed to your WS server + headers = parse_kv_pairs(args.header or []) + if headers: + ws_opts["headers"] = headers + + print("Connecting audio to WebSocket with options:") + print(json.dumps(ws_opts, indent=2)) + + # Call the Audio Connector (equivalent to POST /v2/project/{apiKey}/connect) + resp = ot.connect_audio_to_websocket(session_id, token, ws_opts) + + # Try to get connectionId + connection_id = None + + # Extract connectionId from WebSocketAudioConnection object + connection_id = getattr(resp, "connectionId", None) + + if connection_id: + print(f"\nAudio Connector connectionId: 
{connection_id}") + + # Write VONAGE_CONNECTION_ID into both client/.env and ../.env (server) + script_dir = Path(__file__).resolve().parent + client_env = script_dir / ".env" + server_env = script_dir.parent / ".env" + + update_env_var(client_env, "VONAGE_CONNECTION_ID", connection_id) + update_env_var(server_env, "VONAGE_CONNECTION_ID", connection_id) + + print("Updated VONAGE_CONNECTION_ID in:") + print(f" {client_env}") + print(f" {server_env}") + else: + print("\nWarning: Could not extract connectionId from Audio Connector response. ") + + print("\nSuccess! Your Video session should now stream audio to/from:", args.ws_uri) + + +if __name__ == "__main__": + main() diff --git a/vonage-ac-chatbot/client/connect_and_stream_vonage.py b/vonage-ac-chatbot/client/connect_and_stream_vonage.py new file mode 100644 index 0000000..f79c63e --- /dev/null +++ b/vonage-ac-chatbot/client/connect_and_stream_vonage.py @@ -0,0 +1,199 @@ +#!/usr/bin/env python3 +""" +Use a Vonage (OpenTok) Video API existing session, generate a token, +and connect its audio to your Pipecat WebSocket endpoint. +""" + +import argparse +import json +import os +from pathlib import Path + +from dotenv import load_dotenv +from vonage import Auth, HttpClientOptions, Vonage +from vonage_video import AudioConnectorOptions, TokenOptions + +# ---- helpers ---------------------------------------------------------------- + + +def parse_kv_pairs(items: list[str]) -> dict[str, str]: + """ + Parse CLI --header/--param entries like "Key=Value" or "Key:Value". + """ + out: dict[str, str] = {} + for raw in items or []: + sep = "=" if "=" in raw else (":" if ":" in raw else None) + if not sep: + raise ValueError(f"Invalid header/param format: {raw!r}. 
Use Key=Value") + k, v = raw.split(sep, 1) + out[k.strip()] = v.strip() + return out + + +def comma_list(s: str | None) -> list[str]: + return [x.strip() for x in s.split(",")] if s else [] + + +def update_env_var(env_path: str | Path, key: str, value: str) -> None: + """Create or update KEY=VALUE in a .env-style file.""" + path = Path(env_path) + lines: list[str] = [] + + if path.exists(): + lines = path.read_text().splitlines() + + new_lines: list[str] = [] + key_prefix = f"{key}=" + replaced = False + + for line in lines: + if line.strip().startswith("#"): + new_lines.append(line) + continue + if line.lstrip().startswith(key_prefix): + new_lines.append(f"{key}={value}") + replaced = True + else: + new_lines.append(line) + + if not replaced: + new_lines.append(f"{key}={value}") + + path.write_text("\n".join(new_lines) + "\n") + + +# ---- main ------------------------------------------------------------------- + + +def main() -> None: + # Load client-side env (vonage-chatbot/client/.env) + load_dotenv() + + p = argparse.ArgumentParser( + description="Connect an existing session's audio to a WebSocket (Pipecat)."
+ ) + # Auth + p.add_argument("--application-id", default=os.getenv("VONAGE_APPLICATION_ID"), required=False) + p.add_argument("--private-key", default=os.getenv("VONAGE_PRIVATE_KEY"), required=False) + + # Where to connect + p.add_argument("--ws-uri", default=os.getenv("WS_URI"), help="wss://...", required=False) + p.add_argument("--audio-rate", type=int, default=int(os.getenv("VONAGE_AUDIO_RATE", "16000"))) + + bidirectional_env = os.getenv("VONAGE_BIDIRECTIONAL") + if bidirectional_env is not None: + if bidirectional_env.lower() not in ("true", "false"): + raise SystemExit("VONAGE_BIDIRECTIONAL must be 'true' or 'false'") + bidirectional_default = bidirectional_env.lower() == "true" + else: + bidirectional_default = True + + p.add_argument("--bidirectional", action="store_true", default=bidirectional_default) + + # An existing session which needs to be connected to pipecat-ai + p.add_argument("--session-id", default=os.getenv("VONAGE_SESSION_ID")) + + # Optional streams and headers (to pass to the WS) + p.add_argument( + "--streams", default=os.getenv("VONAGE_STREAMS"), help="Comma-separated stream IDs" + ) + p.add_argument( + "--header", + action="append", + help="Extra header(s) for WS, e.g. --header X-Foo=bar (repeatable)", + ) + + # Optional: choose API base. If your SDK doesn’t accept api_url, set VONAGE_API_URL env before run. 
+ p.add_argument("--api-base", default=os.getenv("VONAGE_API_URL", "api.vonage.com")) + + args = p.parse_args() + + # Validate inputs + missing = [ + k + for k, v in { + "application-id": args.application_id, + "private-key": args.private_key, + "ws-uri": args.ws_uri, + "session-id": args.session_id, + }.items() + if not v + ] + if missing: + raise SystemExit(f"Missing required args/env: {', '.join(missing)}") + + # Create an Auth instance + auth = Auth( + application_id=args.application_id, + private_key=args.private_key, + ) + + # Create HttpClientOptions instance + # (not required unless you want to change options from the defaults) + options = HttpClientOptions(video_host="video." + args.api_base, timeout=30) + + # Create a Vonage instance + vonage = Vonage(auth=auth, http_client_options=options) + + session_id = args.session_id + print(f"Using existing session: {session_id}") + + # Token: generate a fresh one tied to this session + token_options = TokenOptions(session_id=session_id, role="publisher") + token = vonage.video.generate_client_token(token_options) + print(f"Generated token: {token[:32]}...") # don’t print full token in logs + + # Build websocket options (mirrors your Postman body) + ws_opts = { + "uri": args.ws_uri, + "audioRate": args.audio_rate, + "bidirectional": bool(args.bidirectional), + } + + # Optional stream filtering + stream_list = comma_list(args.streams) + if stream_list: + ws_opts["streams"] = stream_list + + # Optional headers passed to your WS server + headers = parse_kv_pairs(args.header or []) + if headers: + ws_opts["headers"] = headers + + print("Connecting audio to WebSocket with options:") + print(json.dumps(ws_opts, indent=2)) + + # Call the Audio Connector (equivalent to POST /v2/project/{apiKey}/connect) + audio_connector_options = AudioConnectorOptions( + session_id=session_id, token=token, websocket=ws_opts + ) + resp = vonage.video.start_audio_connector(audio_connector_options) + + # Try to get connectionId + 
connection_id = None + + # Extract connectionId from WebSocketAudioConnection object + connection_id = getattr(resp, "connectionId", None) + + if connection_id: + print(f"\nAudio Connector connectionId: {connection_id}") + + # Write VONAGE_CONNECTION_ID into both client/.env and ../.env (server) + script_dir = Path(__file__).resolve().parent + client_env = script_dir / ".env" + server_env = script_dir.parent / ".env" + + update_env_var(client_env, "VONAGE_CONNECTION_ID", connection_id) + update_env_var(server_env, "VONAGE_CONNECTION_ID", connection_id) + + print("Updated VONAGE_CONNECTION_ID in:") + print(f" {client_env}") + print(f" {server_env}") + else: + print("\nWarning: Could not extract connectionId from Audio Connector response.") + + print("\nSuccess! Your Video session should now stream audio to/from:", args.ws_uri) + + +if __name__ == "__main__": + main() diff --git a/vonage-ac-chatbot/client/env.example b/vonage-ac-chatbot/client/env.example new file mode 100644 index 0000000..651ca9f --- /dev/null +++ b/vonage-ac-chatbot/client/env.example @@ -0,0 +1,29 @@ +# OpenTok credentials +VONAGE_API_KEY=YOUR_API_KEY +VONAGE_API_SECRET=YOUR_API_SECRET + +# API base +OPENTOK_API_URL=https://api.opentok.com + +# Or, if you are using the Vonage SDK +# Vonage SDK credentials +VONAGE_APPLICATION_ID=YOUR_APPLICATION_ID +VONAGE_PRIVATE_KEY=YOUR_PRIVATE_KEY_PATH + +# API base +VONAGE_API_URL=api.vonage.com + +# WebSocket URL of your Pipecat server (ngrok or production) +WS_URI=wss:// + +# Existing session (from the Playground or your app) that you want to connect to pipecat-ai +VONAGE_SESSION_ID=1_MX4....
+ +# Audio settings for the Audio Connector +VONAGE_AUDIO_RATE=16000 + +# Optional: override bidirectional (defaults to true in the script) +# VONAGE_BIDIRECTIONAL=true + +# Leave Blank - This will be auto-written by client after /connect API call: +VONAGE_CONNECTION_ID= diff --git a/vonage-ac-chatbot/client/pyproject.toml b/vonage-ac-chatbot/client/pyproject.toml new file mode 100644 index 0000000..8ec5de3 --- /dev/null +++ b/vonage-ac-chatbot/client/pyproject.toml @@ -0,0 +1,14 @@ +[project] +name = "vonage-pipecat-vonage-chatbot-client" +version = "0.1.0" +description = "Python client for testing the Vonage Pipecat WebSocket server via /connect API." +readme = "README.md" +requires-python = ">=3.10" +dependencies = [ + "opentok>=3.0.0", + "vonage>=3.3.1", + "python-dotenv", + "websockets>=12.0.0", + "numpy>=1.26.0", + "sounddevice>=0.4.0", +] diff --git a/vonage-ac-chatbot/env.example b/vonage-ac-chatbot/env.example new file mode 100644 index 0000000..680f8cd --- /dev/null +++ b/vonage-ac-chatbot/env.example @@ -0,0 +1,8 @@ +OPENAI_API_KEY= + +# Vonage / OpenTok credentials (used for hang-up in VonageFrameSerializer) +VONAGE_API_KEY=YOUR_API_KEY +VONAGE_API_SECRET=YOUR_API_SECRET +VONAGE_SESSION_ID=YOUR_SESSION_ID +# Leave Blank - This will be auto-written by client after /connect API call: +VONAGE_CONNECTION_ID=... diff --git a/vonage-ac-chatbot/pyproject.toml b/vonage-ac-chatbot/pyproject.toml new file mode 100644 index 0000000..9bca4d2 --- /dev/null +++ b/vonage-ac-chatbot/pyproject.toml @@ -0,0 +1,13 @@ +[project] +name = "vonage-chatbot" +version = "0.1.0" +description = "Vonage Audio Connector voice chatbot example using Pipecat + OpenAI." 
+readme = "README.md" +requires-python = ">=3.10" +dependencies = [ + "pipecat-ai[openai,websocket,vonage-audio-connector,silero,runner]>=0.0.95", + "python-dotenv", + "loguru", + "pydub>=0.25", + "PyJWT>=2.8.0", +] diff --git a/vonage-ac-chatbot/server.py b/vonage-ac-chatbot/server.py new file mode 100644 index 0000000..c2ede9a --- /dev/null +++ b/vonage-ac-chatbot/server.py @@ -0,0 +1,165 @@ +# SPDX-License-Identifier: BSD-2-Clause +"""Example: Vonage serializer + custom WS transport + OpenAI STT/LLM/TTS.""" + +from __future__ import annotations + +import asyncio +import os +import time + +import jwt +from dotenv import load_dotenv +from loguru import logger +from pipecat.audio.vad.silero import SileroVADAnalyzer +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext +from pipecat.serializers.vonage import VonageFrameSerializer +from pipecat.services.openai import OpenAILLMService, OpenAISTTService, OpenAITTSService +from pipecat.transports.network.websocket_server import WebsocketServerParams +from pipecat.transports.vonage.audio_connector import VonageAudioConnectorTransport + +# ---- Constants --------------------------------------------------------------- + +WS_HOST: str = "0.0.0.0" +WS_PORT: int = 8005 +SESSION_TIMEOUT_SECONDS: int = 60 * 3 # 3 minutes +AUDIO_OUT_SAMPLE_RATE: int = 24_000 + +SYSTEM_INSTRUCTION: str = ( + "You are OpenAI Chatbot, a friendly, helpful robot. " + "Your output will be converted to audio, so avoid special characters. " + "Respond to the user in a creative, helpful way. Keep responses brief—" + "one or two sentences." 
+) + +# Load environment variables from .env +load_dotenv() + + +def generate_opentok_jwt(api_key: str | None, api_secret: str | None) -> str | None: + """Generate OpenTok JWT for X-OPENTOK-AUTH, or None if missing creds.""" + if not api_key or not api_secret: + logger.warning( + "Vonage example: VONAGE_API_KEY or VONAGE_API_SECRET missing; " + "hang-up via OpenTok REST will be disabled." + ) + return None + + now = int(time.time()) + payload = { + "iss": api_key, + "ist": "project", + "iat": now, + "exp": now + 300, # 5 minutes + } + return jwt.encode(payload, api_secret, algorithm="HS256") + + +async def run_bot_websocket_server() -> None: + # Vonage / OpenTok config for hang-up via force-disconnect REST API + project_id = os.getenv("VONAGE_API_KEY") + api_secret = os.getenv("VONAGE_API_SECRET") + session_id = os.getenv("VONAGE_SESSION_ID") + + opentok_jwt = generate_opentok_jwt(project_id, api_secret) + + # The VonageFrameSerializer uses these values to implement hang-up via: + # DELETE /v2/project/{project_id}/session/{session_id}/connection/{connection_id} + # NOTE: connection_id is NOT set here; it will be injected at runtime via set_connection_id(). 
+ serializer = VonageFrameSerializer( + VonageFrameSerializer.InputParams( + auto_hang_up=True, + send_clear_audio_event=True, + project_id=project_id, + session_id=session_id, + jwt=opentok_jwt, + ) + ) + + ws_transport = VonageAudioConnectorTransport( + host=WS_HOST, + port=WS_PORT, + params=WebsocketServerParams( + serializer=serializer, + audio_in_enabled=True, + audio_out_enabled=True, + add_wav_header=True, + vad_analyzer=SileroVADAnalyzer(), + session_timeout=SESSION_TIMEOUT_SECONDS, + ), + ) + + stt = OpenAISTTService( + api_key=os.getenv("OPENAI_API_KEY"), + model="gpt-4o-transcribe", + prompt=("Expect words based on questions across technology, science, and culture."), + ) + + tts = OpenAITTSService( + api_key=os.getenv("OPENAI_API_KEY"), + voice="coral", + instructions="There may be literal '\\n' characters; ignore them when speaking.", + ) + + llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY")) + + messages = [{"role": "system", "content": SYSTEM_INSTRUCTION}] + context = OpenAILLMContext(messages) + context_aggregator = llm.create_context_aggregator(context) + + pipeline = Pipeline( + [ + ws_transport.input(), + stt, + context_aggregator.user(), + llm, + tts, + ws_transport.output(), + ] + ) + + task = PipelineTask( + pipeline, + params=PipelineParams( + audio_out_sample_rate=AUDIO_OUT_SAMPLE_RATE, + enable_metrics=True, + enable_usage_metrics=True, + ), + ) + + @ws_transport.event_handler("on_client_connected") + async def on_client_connected(_transport, _client) -> None: + logger.info("Client connected") + messages.append({"role": "system", "content": "Please introduce yourself to the user."}) + await task.queue_frames([context_aggregator.user().get_context_frame()]) + + @ws_transport.event_handler("on_client_disconnected") + async def on_client_disconnected(_transport, _client) -> None: + logger.info("Client disconnected") + + # Read the latest connectionId written by connect_and_stream.py + load_dotenv(override=True) + conn_id = 
os.getenv("VONAGE_CONNECTION_ID") + if conn_id: + logger.info(f"Setting serializer connection_id from env: {conn_id}") + serializer.set_connection_id(conn_id) + else: + logger.warning( + "VONAGE_CONNECTION_ID is not set in env. " + "Vonage hang-up via force-disconnect API will be skipped." + ) + + await task.cancel() # This will inject a CancelFrame → serializer triggers hang-up. + + @ws_transport.event_handler("on_websocket_ready") + async def on_websocket_ready(_client) -> None: + logger.info("Server WebSocket ready") + + runner = PipelineRunner(handle_sigint=False) + await runner.run(task) + + +if __name__ == "__main__": + asyncio.run(run_bot_websocket_server())