diff --git a/.github/reusable-steps/categorize-projects/action.yml b/.github/reusable-steps/categorize-projects/action.yml index 43bf4bd9..c812775b 100644 --- a/.github/reusable-steps/categorize-projects/action.yml +++ b/.github/reusable-steps/categorize-projects/action.yml @@ -12,6 +12,8 @@ outputs: value: ${{ steps.group-subprojects.outputs.gradio }} webcam: value: ${{ steps.group-subprojects.outputs.webcam }} + mcp: + value: ${{ steps.group-subprojects.outputs.mcp }} js: value: ${{ steps.group-subprojects.outputs.js }} @@ -26,6 +28,7 @@ runs: python=() gradio=() webcam=() + mcp=() js=() for dir in ${{ inputs.subprojects }}; do @@ -33,6 +36,8 @@ runs: js+=("$dir") elif find "$dir" -maxdepth 1 -name "*.ipynb" | grep -q "."; then notebook+=("$dir") + elif [ -f "$dir/requirements.txt" ] && grep -q "mcp" "$dir/requirements.txt"; then + mcp+=("$dir") elif [ -f "$dir/requirements.txt" ] && { grep -q "gradio" "$dir/requirements.txt" || grep -q "fastapi" "$dir/requirements.txt"; }; then gradio+=("$dir") elif [ -f "$dir/main.py" ] && grep -q -- "--stream" "$dir/main.py"; then @@ -46,12 +51,14 @@ runs: python_json=$(printf '%s\n' "${python[@]}" | jq -R -s -c 'split("\n") | map(select(length > 0))') gradio_json=$(printf '%s\n' "${gradio[@]}" | jq -R -s -c 'split("\n") | map(select(length > 0))') webcam_json=$(printf '%s\n' "${webcam[@]}" | jq -R -s -c 'split("\n") | map(select(length > 0))') + mcp_json=$(printf '%s\n' "${mcp[@]}" | jq -R -s -c 'split("\n") | map(select(length > 0))') js_json=$(printf '%s\n' "${js[@]}" | jq -R -s -c 'split("\n") | map(select(length > 0))') echo "notebook=$notebook_json" >> $GITHUB_OUTPUT echo "python=$python_json" >> $GITHUB_OUTPUT echo "gradio=$gradio_json" >> $GITHUB_OUTPUT echo "webcam=$webcam_json" >> $GITHUB_OUTPUT + echo "mcp=$mcp_json" >> $GITHUB_OUTPUT echo "js=$js_json" >> $GITHUB_OUTPUT - name: Print subprojects to test shell: bash @@ -60,4 +67,5 @@ runs: echo "Python subprojects: ${{ steps.group-subprojects.outputs.python }}" echo 
"Gradio subprojects: ${{ steps.group-subprojects.outputs.gradio }}" echo "Webcam subprojects: ${{ steps.group-subprojects.outputs.webcam }}" + echo "MCP subprojects: ${{ steps.group-subprojects.outputs.mcp }}" echo "JS subprojects: ${{ steps.group-subprojects.outputs.js }}" diff --git a/.github/workflows/sanity-check-kits.yml b/.github/workflows/sanity-check-kits.yml index 057780b7..9cc8862b 100644 --- a/.github/workflows/sanity-check-kits.yml +++ b/.github/workflows/sanity-check-kits.yml @@ -24,6 +24,7 @@ jobs: webcam: ${{ steps.categorize-subprojects.outputs.webcam }} python: ${{ steps.categorize-subprojects.outputs.python }} notebook: ${{ steps.categorize-subprojects.outputs.notebook }} + mcp: ${{ steps.categorize-subprojects.outputs.mcp }} steps: - name: Check out code uses: actions/checkout@v4 @@ -80,6 +81,29 @@ jobs: script: ${{ env.script }} project: ${{ matrix.subproject }} + mcp: + needs: find-subprojects + if: ${{ needs.find-subprojects.outputs.mcp != '[]' }} + runs-on: ${{ matrix.os }} + strategy: + fail-fast: false + matrix: + os: [ ubuntu-24.04, ubuntu-24.04-arm, windows-2025 ] + python: [ "3.11", "3.13" ] + subproject: ${{ fromJson(needs.find-subprojects.outputs.mcp) }} + steps: + - uses: actions/checkout@v4 + - uses: ./.github/reusable-steps/setup-os + - name: Set up Python ${{ matrix.python }} + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python }} + - uses: ./.github/reusable-steps/setup-python + with: + python: ${{ matrix.python }} + project: ${{ matrix.subproject }} + # todo more steps to be added here + webcam: needs: find-subprojects if: ${{ needs.find-subprojects.outputs.webcam != '[]' }} diff --git a/ai_ref_kits/README.md b/ai_ref_kits/README.md index c700b82b..a7c7688e 100644 --- a/ai_ref_kits/README.md +++ b/ai_ref_kits/README.md @@ -16,6 +16,7 @@ - [🎨 Multimodal AI Visual Generator](#-multimodal-ai-visual-generator) - [πŸ’¬ Conversational AI Chatbot](#-conversational-ai-chatbot) - [πŸ›’ AI Insight Agent with 
RAG](#-AI-Insight-Agent-with-RAG) + - [πŸ—Ί Agentic Travel Planner](#-agentic-travel-planner) - [Troubleshooting and Resources](#troubleshooting-and-resources) @@ -114,7 +115,6 @@ Multimodal AI Visual Generator is a generative AI reference kit that transforms |--------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------| | Related AI concepts | Speech Recognition, Natural Language Understanding, Large Language Models (LLMs), Retrieval Augmented Generation (RAG), Speech Synthesis, Generative AI | | Example industries | Tourism | -| Demo | | The Conversational AI Chatbot is an open-source, voice-driven chat agent that answers spoken questions with meaningful, spoken responses. It can be configured to respond in any type of scenario or context. This kit demonstrates the AI Chatbot’s capabilities by simulating the experience of talking to a hotel concierge. @@ -126,10 +126,19 @@ This kit demonstrates the AI Chatbot’s capabilities by simulating the experien |--------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------| | Related AI concepts | Natural Language Understanding, Large Language Models (LLMs), Retrieval Augmented Generation (RAG), Agentic AI, Generative AI | | Example industries | Retail | -| Demo | | The AI Insight Agent with RAG uses Large Language Models (LLMs) and Retrieval-Augmented Generation (RAG) to interpret user prompts, engage in meaningful dialogue, perform calculations, use RAG techniques to improve its knowledge and interact with the user to add items to a virtual shopping cart. 
+### πŸ—Ί Agentic Travel Planner +[![agentic_travel_planner](https://github.com/user-attachments/assets/f2139810-8871-45e4-9f74-333f2e904956)](agentic_multimodal_travel_planer) + +| [Agentic Travel Planner](agentic_multimodal_travel_planer) | | +|-------------------------------------|------------------------------------------------------------------------------------------------------------------------------| +| Related AI concepts | Large Language Models (LLMs), Vision Language Models (VLMs), Retrieval Augmented Generation (RAG), Agentic AI, Generative AI | +| Example industries | Tourism | + +The Agentic Travel Planner leverages Large Language Models (LLMs), Vision Language Models (VLMs), and Retrieval-Augmented Generation (RAG) to create personalized travel itineraries. It interprets attached images, conducts research, and generates detailed plans, including accommodations, all tailored to the user's interests. + ## Troubleshooting and Resources - Open a [discussion topic](https://github.com/openvinotoolkit/openvino_build_deploy/discussions) - Create an [issue](https://github.com/openvinotoolkit/openvino_build_deploy/issues) diff --git a/ai_ref_kits/agentic_multimodal_travel_planer/README.md b/ai_ref_kits/agentic_multimodal_travel_planer/README.md new file mode 100644 index 00000000..7fd11978 --- /dev/null +++ b/ai_ref_kits/agentic_multimodal_travel_planer/README.md @@ -0,0 +1,362 @@ +
+ +# Agentic Tourism - Multi-Agent Travel Assistant with OpenVINOβ„’ Toolkit +A sophisticated multi-agent system that provides intelligent travel assistance using specialized AI agents for hotel search, flight booking, and image captioning travel recommendations. Built with OpenVINO, MCP (Model Context Protocol), A2A (Agent to Agent Protocol), and the BeeAI framework. + +

+ 🏠 About the Kits  +

+
+ +[![Apache License](https://img.shields.io/badge/license-Apache_2.0-green.svg)](https://github.com/openvinotoolkit/openvino_build_deploy/blob/master/LICENSE.txt) + +--- + +This reference kit demonstrates a multi-agent travel assistant. It coordinates specialized agents for hotel and flight search via MCP-connected tools and uses an image captioning VLM for visual understanding. The system is built with OpenVINOβ„’ and the OpenVINO Model Server for optimized local inference, and orchestrated using BeeAI, MCP, and the A2A protocol. + +This kit uses the following technology stack: + +- [OpenVINO Toolkit](https://docs.openvino.ai/) +- [OpenVINOβ„’ GenAI](https://docs.openvino.ai/2025/openvino-workflow-generative/inference-with-genai.html) +- [Optimum Intel](https://docs.openvino.ai/2025/openvino-workflow-generative/inference-with-optimum-intel.html) +- [OpenVINO Model Server](https://docs.openvino.ai/2025/model-server/ovms_what_is_openvino_model_server.html) +- [BEE AI Framework](https://github.com/i-am-bee/beeai-framework) +- [Model Context Protocol](https://modelcontextprotocol.io/docs/getting-started/intro) +- [A2A](https://github.com/a2aproject/A2A) + +- [OpenVINO/Qwen3-8B-int4-ov](https://huggingface.co/OpenVINO/Qwen3-8B-int4-ov) (LLM) +- [OpenVINO/Phi-3.5-vision-instruct-int4-ov](https://huggingface.co/OpenVINO/Phi-3.5-vision-instruct-int4-ov) (VLM) + +Check out our [AI Reference Kits repository](https://github.com/openvinotoolkit/openvino_build_deploy) for other kits. 
+ +``` +β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” +β”‚ Travel Router Agent β”‚ +β”‚ (Main Coordinator & Orchestrator) β”‚ +β”‚ (Port 9996) β”‚ +β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ + β”‚ + β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€-─────┼─────────────────┐ + β”‚ β”‚ β”‚ + β–Ό β–Ό β–Ό +β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” +β”‚Hotel Finder β”‚ β”‚Flight Finderβ”‚ β”‚Image Proc. β”‚ +β”‚ Agent β”‚ β”‚ Agent β”‚ β”‚ Agent β”‚ +β”‚ (Port 9999)β”‚ β”‚ (Port 9998) β”‚ β”‚ (Port 9997) β”‚ +β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ + β”‚ β”‚ β”‚ + β–Ό β–Ό β–Ό +β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” +β”‚Hotel Search β”‚ β”‚Flight Searchβ”‚ β”‚Image Captionβ”‚ +β”‚ MCP Server β”‚ β”‚ MCP Server β”‚ β”‚ MCP Server β”‚ +β”‚ (Port 3001) β”‚ β”‚ (Port 3002) β”‚ β”‚ (Port 3003) β”‚ +β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ + β”‚ + β–Ό +β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” +β”‚ Hardware OpenVINO AI Stack β”‚ +β”‚ β€’ Qwen3-8B-int4-ov (LLM) β”‚ +β”‚ β€’ Phi-3.5-vision-instruct-int4-ov (VLM) β”‚ +β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ + 
+``` + +--- + +## Steps +1. Start OVMS LLMs +2. Start MCP servers (`start_mcp_servers.py`) +3. Start Agents (`start_agents.py`) +4. Start UI (`start_ui.py`) + +## Setting Up Your Environment + +To set up your environment, you first clone the repository, then create a virtual environment, activate the environment, and install the packages. + +### Clone the Repository + +To clone the repository, run this command: + +```shell +git clone https://github.com/openvinotoolkit/openvino_build_deploy.git +``` + +This command clones the repository into a directory named `openvino_build_deploy` in the current directory. After the directory is cloned, run the following command to go to that directory: + + +```shell +cd openvino_build_deploy/ai_ref_kits/agentic_multimodal_travel_planer +``` + +### Create virtual environment + +``` +python3 -m venv agentic_venv +``` + +### Activate the Environment + +The command you run to activate the virtual environment you created depends on whether you have a Unix-based operating system (Linux or macOS) or a Windows operating system. + +To activate the virtual environment for a **Unix-based** operating system, run: + +```shell +source agentic_venv/bin/activate +``` + +To activate the virtual environment for a **Windows** operating system, run: + +```shell +agentic_venv\Scripts\activate +``` + +This activates the virtual environment and changes your shell's prompt to indicate that you are now working in that environment. + +### Install the Requirements + +To install the required packages, run the following commands: + +```shell +python -m pip install --upgrade pip +pip install -r requirements.txt +``` + +## Step 1: Getting the LLMs for agents ready with OpenVINO model Server (OVMS) + +### Docker Installation +For installation instructions, refer to the [official Docker documentation for Ubuntu](https://docs.docker.com/engine/install/ubuntu/). 
+ +### Get OpenVINO Model Server image +Once you have Docker installed on your machine, pull the OpenVINO Model Server image: +``` +docker pull openvino/model_server:latest +``` + +### Download optimized models + +OpenVINO Model Server will serve your models. In this example you will use two models: an LLM and a VLM. + +Create your folder: +``` +sudo mkdir -p $(pwd)/models +sudo chown -R $(id -u):$(id -g) $(pwd)/models +chmod -R 755 $(pwd)/models +``` + +Agent LLM: **Qwen3-8B** +``` +docker run --user $(id -u):$(id -g) --rm -v $(pwd)/models:/models openvino/model_server:latest --pull --model_repository_path /models --source_model OpenVINO/Qwen3-8B-int4-ov --task text_generation --tool_parser hermes3 +``` + +Vision Language Model (VLM): **Phi-3.5-vision-instruct-int4-ov** +``` +docker run --user $(id -u):$(id -g) --rm -v $(pwd)/models:/models:rw openvino/model_server:latest \ +--pull --model_repository_path /models --source_model OpenVINO/Phi-3.5-vision-instruct-int4-ov --task text_generation --pipeline_type VLM +``` + +### Start OpenVINO Model Server +Once you have your models, your next step is to start the services. 
+ +LLM +``` +docker run -d --user $(id -u):$(id -g) --rm \ + -p 8001:8000 \ + -v $(pwd)/models:/models openvino/model_server:latest \ + --rest_port 8000 \ + --model_repository_path /models \ + --source_model OpenVINO/Qwen3-8B-int4-ov \ + --tool_parser hermes3 \ + --cache_size 2 \ + --task text_generation \ + --enable_prefix_caching true +``` + +VLM +``` +docker run -d --rm \ + -p 8002:8000 \ + -v $(pwd)/models:/models:ro \ + openvino/model_server:latest \ + --rest_port 8000 \ + --model_name OpenVINO/Phi-3.5-vision-instruct-int4-ov \ + --model_path /models/OpenVINO/Phi-3.5-vision-instruct-int4-ov +``` + +### Verify the services are running + +Run: +``` +docker ps +``` + +You should see the two models serving +``` +CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES +424634ea10fe openvino/model_server:latest "/ovms/bin/ovms --re…" 3 days ago Up 3 days 0.0.0.0:8001->8000/tcp, [::]:8001->8000/tcp competent_ganguly9 +a962a7695b1f openvino/model_server:latest "/ovms/bin/ovms --re…" 3 days ago Up 3 days 0.0.0.0:8002->8000/tcp, [::]:8002->8000/tcp agitated_galois +``` + +Your LLM is now running and ready to be used by the agents. + +## Step 2: Start the MCP servers + +This example uses three MCP servers that the agents will consume: +- Flight search (Intel AI Builder): provides available flight options +- Hotel search (Intel AI Builder): provides available hotels +- Image captioning: generates captions for images + +### Get your SerpAPI key +Flight and travel agents use an external API for hotel and flight search. Obtain an API key from SerpAPI. + +1. Go to https://serpapi.com/ +2. Navigate to "Your Private API Key" +3. Copy the key + +Once you have your key, you can launch the MCP servers. + +### Launch MCP servers + +Set your key +``` +export SERP_API_KEY=***YOUR_KEY*** +``` + +Run +``` +python start_mcp_servers.py +``` + +**NOTE**: This script starts the MCP servers in the background and reads configuration from `config/mcp_config.yaml`. 
You can configure each MCP server there. + +You should see confirmation that the MCP servers are running: + ``` +MCP 'image_mcp' started on port 3003 +MCP 'hotel_finder' started on port 3001 +MCP 'flight_finder' started on port 3002 + +Successfully started MCP servers: image_mcp, hotel_finder, flight_finder + +Logs are in `logs/`. You can open each MCP server's log file there. +``` +The script also provides a stop command: + +``` +python start_mcp_servers.py --stop +``` + +## Step 3: Start Agents + +Start all the agents. + +``` +python start_agents.py +``` + +You should see: +``` +Agent 'travel_router' started on port 9996 +Agent 'flight_finder' started on port 9998 +Agent 'hotel_finder' started on port 9999 +Agent 'image_captioning' started on port 9997 + +Successfully started agents: travel_router, flight_finder, hotel_finder, image_captioning + +Logs are in `logs/` +``` + +Logs are in `logs/`. You can open each agent's log file there. + +The script also provides a stop command: + +``` +python start_agents.py --stop +``` + +Logs are in `logs/`. You can navigate to that folder to view the log of each agent server. + +## Step 4: Start UI + +``` +python start_ui.py +``` + +Open `http://127.0.0.1:7860` in your browser. + +## Customization (optional) + +### Agents configuration +Below is how you can configure your own agents for the Agentic Multimodal Travel Planner. This guide covers the basics of agent configuration and helps you set up new agents or customize existing ones. + +#### 1. Overview + +Agents are defined via YAML configuration files located in the `config/` directory: + +- `config/agents_config.yaml`: Controls properties, endpoints, ports, and enabled status of each agent. +- `config/agents_prompts.yaml`: Defines the system instructions (prompt) and templates for communication of each agent. + +#### 2. How to add a new agent + +1. **Open `config/agents_config.yaml`** + Find an agent section (e.g., `flight_finder:`), and use it as a template for your new agent. 
+ + ```yaml + your_custom_agent: + name: "agent_name" + port: 9988 + role : "agent_role" + enabled: true + ``` + + - `your_custom_agent`: Unique name for your agent. + - `name`: Human-readable identifier for your agent. + - `port`: TCP port the agent will listen on (make sure it's unused). + - `role`: Brief role/description for your agent. + - `enabled`: Set to `true` to enable the agent. + +2. **Open `config/agents_prompts.yaml`** + Create an entry for your agent with its system prompt and dialogue template. + + ```yaml + your_custom_agent: + system: | + You are the Custom Agent. Your job is to provide ... (describe the behavior here) + template: | + [User]: {{query}} + [Agent]: + ``` + + - The `system` prompt is injected as the agent’s system-level context. + - The `template` can be customized to guide your agent's responses. + +4. **Start Your Agent** + Make sure your agent is enabled in `agents_config.yaml`, then run: + + ``` + python start_agents.py + ``` + + Your agent should start alongside the others. Check the logs in the `logs/` directory for messages from your agent. + +#### 3. Tips for Customization + +- Use unique port numbers for each new agent. +- You may copy config stanzas for existing agents as a quick start. +- Edit the system prompt to set the "personality" and role for your agent. +- Restart the agent process after changing any YAML config. + +#### 4. Disabling/Enabling Agents + +Set `enabled: false` to disable an agent in `config/agents_config.yaml`β€”that agent won't be started by `start_agents.py`. + +#### 5. Troubleshooting + +- If your agent doesn't appear, check the logs in `logs/` for errors. +- Make sure the port is not being used by another process. +- Ensure your new agent class is properly importable and subclassed as required by the BeeAI Framework. + +--- + +With this approach, you can flexibly expand the capabilities of your travel planning system by adding or customizing new agents to fit your requirements! 
+ +[//]: # (telemetry pixel) + diff --git a/ai_ref_kits/agentic_multimodal_travel_planer/agents/agent_runner.py b/ai_ref_kits/agentic_multimodal_travel_planer/agents/agent_runner.py new file mode 100644 index 00000000..a3a758bf --- /dev/null +++ b/ai_ref_kits/agentic_multimodal_travel_planer/agents/agent_runner.py @@ -0,0 +1,442 @@ +#!/usr/bin/env python3 +""" +Generic Agent Runner - Modular script to run any A2A agent based on YAML config. +It reads configuration from config/agents_config.yaml and prompts from +config/agent_prompts.yaml. + +This script can run: +- Regular agents with MCP tools +- Supervisor agents that coordinate other agents via HandoffTools +- All agents can be run in daemon mode for background operation +""" + +import sys +from pathlib import Path +import argparse +import asyncio +import multiprocessing +import os +from contextlib import AsyncExitStack +import requests +import yaml +import re + +from mcp.client.sse import sse_client +from mcp.client.session import ClientSession + +from beeai_framework.agents.requirement import RequirementAgent +from beeai_framework.agents.requirement.requirements.conditional import ( + ConditionalRequirement, +) +from beeai_framework.adapters.a2a.agents.agent import A2AAgent +from beeai_framework.adapters.a2a.serve.server import ( + A2AServer, + A2AServerConfig, +) +from beeai_framework.backend import ChatModel, ChatModelParameters +from beeai_framework.memory import UnconstrainedMemory +from beeai_framework.middleware.trajectory import GlobalTrajectoryMiddleware +from beeai_framework.serve.utils import LRUMemoryManager +from beeai_framework.tools import Tool +from beeai_framework.tools.handoff import HandoffTool +from beeai_framework.tools.mcp import MCPTool + +# Add parent directory to path for imports (before local imports!) 
+sys.path.append(str(Path(__file__).parent.parent)) # noqa: E402 +from utils.util import load_config + +# ============================================================================= +# MCP TOOLS MANAGEMENT +# ============================================================================= + + +class MCPToolsManager: + """Manages MCP server connections and tool discovery""" + + async def setup_mcp_tools_with_stack(self, config, stack): + """Setup MCP tools using AsyncExitStack for proper connection management. If the connection fails, the error is caught and the loop continues to the next server.""" + mcp_config = config.get('mcp_config') + if not mcp_config: + return [] + # Handle both old single-server format and new multi-server format + servers = ( + [{'name': 'default', 'url': mcp_config['url'], 'enabled': True}] + if 'url' in mcp_config + else mcp_config.get('servers', []) + ) + + if not servers: + return [] + + enabled_servers = [s for s in servers if s and s.get('enabled', True)] + if not enabled_servers: + return [] + + all_mcp_tools = [] + + for server_config in enabled_servers: + server_url = server_config['url'] + "/sse" + + try: + read_stream, write_stream = await stack.enter_async_context( + sse_client(server_url) + ) + session = await stack.enter_async_context( + ClientSession(read_stream, write_stream) + ) + await session.initialize() + + mcp_tools = await MCPTool.from_client(session) + all_mcp_tools.extend(mcp_tools) + + except (ConnectionError, OSError, TimeoutError): + continue + + return all_mcp_tools + +# ============================================================================= +# AGENT CREATION +# ============================================================================= + + +class AgentFactory: + """Factory class for creating different types of agents""" + + def create_agent(self, config, tools=None, agent_name=None): + """Create an agent from configuration (regular or supervisor)""" + if 'supervised_agents' in config: + return 
self._create_supervisor_agent(config, tools) + else: + return self._create_regular_agent(config, tools) + + def _create_regular_agent(self, config, tools=None): + """Create a regular A2A agent from configuration""" + llm = ChatModel.from_name( + config['llm_model'], + ChatModelParameters(temperature=config['llm_temperature']) + ) + llm.tool_choice_support = {"auto", "none"} + + # Create middleware if enabled + middlewares = [] + + middleware_config = config['middleware']['trajectory'] + if middleware_config.get('enabled', True): + middlewares.append(GlobalTrajectoryMiddleware( + included=( + [Tool, ChatModel] + if "ChatModel" in middleware_config['included_types'] + else [Tool] + ), + pretty=middleware_config['pretty'], + prefix_by_type={Tool: middleware_config['tool_prefix']} + )) + + + # Handle tool configuration + # If tools are provided externally (MCP tools), use them + # Otherwise, check if tools is a list in config + if tools: + agent_tools = tools + else: + tools_config = config.get('tools', []) + # If tools is a dict with mcp_tools: true, it means + # tools are loaded externally + if (isinstance(tools_config, dict) and + tools_config.get('mcp_tools', False)): + # Empty for now, will be loaded by run_agent + agent_tools = [] + else: + agent_tools = ( + tools_config if isinstance(tools_config, list) else [] + ) + + print(f"Agent using {len(agent_tools)} tools") + if agent_tools: + print(" Connected tools:") + for i, tool in enumerate(agent_tools, 1): + tool_type = type(tool).__name__ + print(f" {i}. 
{tool.name} ({tool_type})") + else: + print(" No tools connected") + + # Process requirements + requirements = [] + if agent_tools: + for r in config.get('requirements', []): + tool_instance = next( + (t for t in agent_tools if t.name == r["tool_name"]), + None + ) + if tool_instance: + kwargs = {k: v for k, v in r.items() if k != "tool_name"} + # Evaluate string lambdas in custom_checks to callable functions + if "custom_checks" in kwargs and isinstance(kwargs["custom_checks"], list): + import re + kwargs["custom_checks"] = [ + eval(check) if isinstance(check, str) else check + for check in kwargs["custom_checks"] + ] + requirements.append(ConditionalRequirement( + tool_instance, **kwargs + )) + + return RequirementAgent( + llm=llm, + tools=agent_tools, + memory=UnconstrainedMemory(), + instructions=config['prompt'], + middlewares=middlewares, + name=config.get('name'), + description=config.get('description'), + requirements=requirements + ) + + def _create_supervisor_agent(self, config, mcp_tools=None): + """Create a SUPERVISOR agent + that coordinates other agents + via HandoffTools""" + + available_agents, agent_cards = self._discover_supervised_agents( + config + ) + + print("***********CREATING SUPERVISOR AGENT***********") + # Create supervisor tools (HandoffTools) + supervisor_tools = [] + for agent_name, agent_instance in available_agents.items( + ): + agent_description = agent_cards.get(agent_name, {}).get( + "description", agent_name + ) + supervisor_tools.append(HandoffTool( + name=agent_name, + description=agent_description, + target=agent_instance + )) + + # Combine MCP tools with HandoffTools if provided + if mcp_tools: + all_tools = mcp_tools + supervisor_tools + print( + f"Supervisor using {len(mcp_tools)} MCP tools + " + f"{len(supervisor_tools)} HandoffTools = " + f"{len(all_tools)} total tools" + ) + print(" MCP tools:") + for i, tool in enumerate(mcp_tools, 1): + tool_type = type(tool).__name__ + print(f" {i}. 
{tool.name} ({tool_type})") + print(" HandoffTools:") + for i, tool in enumerate(supervisor_tools, 1): + tool_type = type(tool).__name__ + # Show target agent info - get URL/name, fallback to string + target_info = ( + getattr(tool._target, 'url', None) + or getattr(tool._target, 'name', None) + or str(tool._target) + ) + print(f" {i}. {tool.name} ({tool_type}) -> {target_info}") + else: + all_tools = supervisor_tools + print( + f"Supervisor using {len(supervisor_tools)} " + f"HandoffTools (no MCP tools)" + ) + print(" HandoffTools:") + for i, tool in enumerate(supervisor_tools, 1): + tool_type = type(tool).__name__ + # Show target agent info - get URL/name, fallback to string + target_info = ( + getattr(tool._target, 'url', None) + or getattr(tool._target, 'name', None) + or str(tool._target) + ) + print(f" {i}. {tool.name} ({tool_type}) -> {target_info}") + + return self._create_regular_agent(config, all_tools) + + def _discover_supervised_agents(self, config): + """Discover A2A agents that the supervisor can delegate to""" + available_agents = {} + agent_cards = {} + + if 'supervised_agents' not in config: + return available_agents, agent_cards + + for agent_config in config['supervised_agents']: + agent_name = agent_config['name'] + agent_url = agent_config['url'] + + # Override URL with environment variable if available + port_env_var = agent_config.get('port_env_var') + if port_env_var and port_env_var in os.environ: + port = os.environ[port_env_var] + base_url = agent_url.rsplit(':', 1)[0] + agent_url = f"{base_url}:{port}" + + try: + agent_card_url = f"{agent_url}/.well-known/agent-card.json" + response = requests.get(agent_card_url, timeout=5) + + if response.status_code == 200: + agent_card = response.json() + agent_cards[agent_name] = agent_card + available_agents[agent_name] = A2AAgent( + url=agent_url, memory=UnconstrainedMemory() + ) + print(f"{agent_name}: Connected successfully") + else: + print( + f"{agent_name}: Agent card not found " + f"(HTTP 
{response.status_code})" + ) + + except requests.RequestException as e: + print(f" {agent_name}: Not accessible ({e})") + + print(f"Discovered {len(available_agents)} A2A agents for supervision") + return available_agents, agent_cards + +# ============================================================================= +# SERVER MANAGEMENT +# ============================================================================= + + +class ServerManager: + """Manages A2A server creation and lifecycle""" + + def create_server(self, agent, config): + """Create and configure the A2A server""" + return A2AServer( + config=A2AServerConfig(port=config['port']), + memory_manager=LRUMemoryManager( + maxsize=config['memory_size'] + ) + ).register( + agent, name=config['name'], description=config['description'] + ) + +# ============================================================================= +# UTILITY FUNCTIONS +# ============================================================================= + + +class ConfigManager: + """Manages configuration loading and agent discovery""" + + @staticmethod + def get_available_agents(): + """Dynamically get available agents from config directory""" + config_dir = Path(__file__).parent.parent / "config" + config_file = config_dir / "agents_config.yaml" + + if not config_file.exists(): + return [] + + with open(config_file, 'r', encoding='utf-8') as f: + agents_config = yaml.safe_load(f) + + return list(agents_config.keys()) + +# ============================================================================= +# MAIN AGENT RUNNER +# ============================================================================= + + +class AgentRunner: + """Main orchestrator class for running agents""" + + def __init__(self): + self.mcp_manager = MCPToolsManager() + self.agent_factory = AgentFactory() + self.server_manager = ServerManager() + self.config_manager = ConfigManager() + self.agent_instance = None # Store agent reference + + def reset_agent_memory(self): + 
"""Reset the memory of the agent instance""" + if self.agent_instance and hasattr(self.agent_instance, 'memory'): + self.agent_instance.memory.reset() + print("Agent memory reset successfully") + + async def run_agent(self, agent_name): + """Run an agent based on its configuration. + + Args: + agent_name: Name of the agent to run from config + """ + config = load_config(agent_name) + mcp_config = config.get('mcp_config') + has_valid_mcp = mcp_config and ( + 'url' in mcp_config or + ('servers' in mcp_config and mcp_config['servers']) + ) + + if has_valid_mcp: + # Keep AsyncExitStack alive for the entire agent lifecycle + async with AsyncExitStack() as stack: + all_mcp_tools = ( + await self.mcp_manager.setup_mcp_tools_with_stack( + config, stack + ) + ) + if not all_mcp_tools: + return + agent = self.agent_factory.create_agent( + config, all_mcp_tools, agent_name + ) + self.agent_instance = agent # Store agent reference + server = self.server_manager.create_server(agent, config) + + # Run server in the same process to keep MCP connections alive + print(f"Starting {agent_name} with MCP tools...") + try: + await asyncio.to_thread(server.serve) + except KeyboardInterrupt: + print(f"\n{agent_name} stopped") + else: + agent = self.agent_factory.create_agent(config, None, agent_name) + self.agent_instance = agent # Store agent reference + server = self.server_manager.create_server(agent, config) + + process = multiprocessing.Process(target=server.serve) + process.start() + + try: + while process.is_alive(): + await asyncio.sleep(1) + except (KeyboardInterrupt, asyncio.CancelledError): + process.terminate() + process.join() + + def get_available_agents(self): + """Get list of available agents""" + return self.config_manager.get_available_agents() + + +def main(): + """Main entry point""" + runner = AgentRunner() + available_agents = runner.get_available_agents() + + parser = argparse.ArgumentParser(description='Generic A2A Agent Runner') + parser.add_argument( + '--agent', 
+ choices=available_agents, + help=f'Agent to run. Available: {", ".join(available_agents)}' + ) + args = parser.parse_args() + + try: + asyncio.run(runner.run_agent(args.agent)) + except KeyboardInterrupt: + pass + except (RuntimeError, ValueError) as e: + print(f"Error starting {args.agent}: {e}") + sys.exit(1) + + +if __name__ == "__main__": + main() + diff --git a/ai_ref_kits/agentic_multimodal_travel_planer/config/agents_config.yaml b/ai_ref_kits/agentic_multimodal_travel_planer/config/agents_config.yaml new file mode 100644 index 00000000..58d6d3cd --- /dev/null +++ b/ai_ref_kits/agentic_multimodal_travel_planer/config/agents_config.yaml @@ -0,0 +1,211 @@ +# Agents Configuration File + +# Travel Router Agent Configuration (Supervisor) +travel_router: + name: "travel_router" + description: "Intelligent travel coordinator that provides accurate, realistic travel information by consulting specialized agents" + port: 9996 + role: "Travel Coordinator Supervisor" + enabled: true + + # LLM Configuration for Travel Router + llm: + model: "openai:OpenVINO/Qwen3-8B-int4-ov" + temperature: 0.3 + api_key: "unused" + api_base: "http://127.0.0.1:8001/v3" + + # Memory Configuration + memory_size: 100 + + # Supervised Agents Configuration. It means that the travel_router agent can delegate to the following agents. 
+ supervised_agents: + - name: "hotel_finder" + url: "http://127.0.0.1:9999" + - name: "flight_finder" + url: "http://127.0.0.1:9998" + - name: "image_captioning" + url: "http://127.0.0.1:9997" + + # Tools Configuration (supervisor specific) + tools: + - name: "ThinkTool" + enabled: true # Disable it if you want to improve response time, tool for planning + - name: "HandoffTool" + enabled: true + auto_discovered: true + + # Middleware Configuration + middleware: + trajectory: + included_types: ["Tool"] + pretty: true + tool_prefix: "πŸ” " + + # Requirements with custom_checks to validate info before delegating + requirements: + - tool_name: "flight_finder" + custom_checks: + - "lambda state: any(re.search(r'\\b(yes|correct|ok|sure|go ahead|proceed|that\\'s right|right|confirmed)\\b', msg.text.lower()) for msg in state.memory.messages if hasattr(msg, 'text') and hasattr(msg, 'role') and msg.role == 'user') or any(re.search(r'\\b(from|departure|leaving|origin|flying from)\\s+[A-Z][a-z]{2,}|^[A-Z][a-z]{2,}\\s+(to|destination)', msg.text) for msg in state.memory.messages if hasattr(msg, 'text') and hasattr(msg, 'role') and msg.role == 'user')" + - "lambda state: any(re.search(r'\\b(yes|correct|ok|sure|go ahead|proceed|that\\'s right|right|confirmed)\\b', msg.text.lower()) for msg in state.memory.messages if hasattr(msg, 'text') and hasattr(msg, 'role') and msg.role == 'user') or any(re.search(r'\\b(to|destination|going|arriving|flying to)\\s+[A-Z][a-z]{2,}|[A-Z][a-z]{2,}\\s+(from|departure)', msg.text) for msg in state.memory.messages if hasattr(msg, 'text') and hasattr(msg, 'role') and msg.role == 'user')" + - "lambda state: any(re.search(r'\\b(yes|correct|ok|sure|go ahead|proceed|that\\'s right|right|confirmed)\\b', msg.text.lower()) for msg in state.memory.messages if hasattr(msg, 'text') and hasattr(msg, 'role') and msg.role == 'user') or 
any(re.search(r'\\b(\\d{4}-\\d{2}-\\d{2}|\\d{1,2}[/-]\\d{1,2}[/-]\\d{2,4}|(jan|feb|mar|apr|may|jun|jul|aug|sep|oct|nov|dec)[a-z]*\\s+\\d{1,2}|\\d{1,2}\\s+(jan|feb|mar|apr|may|jun|jul|aug|sep|oct|nov|dec)[a-z]*|march|april|may|june|july|august|september|october|november|december|january|february)', msg.text.lower()) for msg in state.memory.messages if hasattr(msg, 'text') and hasattr(msg, 'role') and msg.role == 'user')" + - "lambda state: any(re.search(r'\\b(yes|correct|ok|sure|go ahead|proceed|that\\'s right|right|confirmed)\\b', msg.text.lower()) for msg in state.memory.messages if hasattr(msg, 'text') and hasattr(msg, 'role') and msg.role == 'user') or any(re.search(r'\\b(return|returning|inbound|back|come back|march|april|may|june|july|august|september|october|november|december|january|february|jan|feb|mar|apr|may|jun|jul|aug|sep|oct|nov|dec)', msg.text.lower()) for msg in state.memory.messages if hasattr(msg, 'text') and hasattr(msg, 'role') and msg.role == 'user')" + - "lambda state: any(re.search(r'\\b(yes|correct|ok|sure|go ahead|proceed|that\\'s right|right|confirmed)\\b', msg.text.lower()) for msg in state.memory.messages if hasattr(msg, 'text') and hasattr(msg, 'role') and msg.role == 'user') or any(re.search(r'\\b(economy|business|first|coach)\\b', msg.text.lower()) for msg in state.memory.messages if hasattr(msg, 'text') and hasattr(msg, 'role') and msg.role == 'user')" + - tool_name: "hotel_finder" + custom_checks: + - "lambda state: any(re.search(r'\\b(in|at|to|destination)\\s+[a-z]+', msg.text.lower()) for msg in state.memory.messages if hasattr(msg, 'text'))" + - "lambda state: any(re.search(r'\\b(check.?in|arrival|arriving).*?\\d{4}-\\d{2}-\\d{2}|\\d{1,2}[/-]\\d{1,2}[/-]\\d{2,4}', msg.text.lower()) for msg in state.memory.messages if hasattr(msg, 'text'))" + - "lambda state: any(re.search(r'\\b(check.?out|departure|leaving).*?\\d{4}-\\d{2}-\\d{2}|\\d{1,2}[/-]\\d{1,2}[/-]\\d{2,4}', msg.text.lower()) for msg in state.memory.messages if hasattr(msg, 
'text'))" + - "lambda state: any(re.search(r'\\b(\\d+|one|two|three|four|five|six|seven|eight|nine|ten)\\s+(guest|people|person|adult)', msg.text.lower()) for msg in state.memory.messages if hasattr(msg, 'text'))" + + +# Flight Finder Agent Configuration +flight_finder: + name: "flight_finder" + description: "Specialized agent for flight search" + port: 9998 + role: "Flight Search Specialist" + enabled: true + + llm: + model: "openai:OpenVINO/Qwen3-8B-int4-ov" + temperature: 0.3 + api_key: "unused" + api_base: "http://127.0.0.1:8001/v3" + + # Memory Configuration (effective value is set below, after mcp_config) + # memory_size: 100 # removed: duplicate key -- the later "memory_size: 5" wins under YAML last-key-wins loading + + mcp_config: + servers: + - name: "Flight_search_api" + url: "http://127.0.0.1:3002" + protocol: "sse" + description: "Flight search" + enabled: true + + # Memory Configuration + memory_size: 5 + + # Tools Configuration: specify if it would use mcp_tools or beeai api tools + tools: + mcp_tools: true + + # removed: duplicate "tools" mapping that repeated the same setting + # tools: + # mcp_tools: true + + # Middleware Configuration + middleware: + trajectory: + included_types: ["Tool"] + pretty: true + tool_prefix: "πŸ” " + + # BeeAI rules requirements, useful to force agent behaviour + requirements: + - tool_name: "search_flights" + force_at_step: 1 + max_invocations: 2 + +# Hotel Finder Agent Configuration + +hotel_finder: + name: "hotel_finder" + description: "Specialized agent for hotel search with access to hotel search tools" + port: 9999 + role: "Hotel Search Specialist" + enabled: true + + # LLM Configuration + llm: + model: "openai:Qwen/Qwen3-8B" + temperature: 0.3 + api_key: "unused" + api_base: "http://127.0.0.1:8001/v3" + + mcp_config: + servers: + - name: "Hotel Search Tool" + url: "http://127.0.0.1:3001" + protocol: "sse" + description: "Hotel search tool to get prices and accommodation places" + enabled: true + + + # Memory Configuration + memory_size: 5 + + # Tools Configuration + tools: + - name: "ThinkTool" + enabled: true + - name: "mcp_tools" + enabled: true + - name: 
"FinalAnswerTool" + enabled: true + + # Middleware Configuration (For console logging) + middleware: + trajectory: + included_types: ["Tool", "ChatModel"] + pretty: true + tool_prefix: "πŸ” " + + # BeeAI rules requirements, useful to force agent behaviour + requirements: + - tool_name: "search_hotels" + force_at_step: 1 + max_invocations: 2 + +# Image Captioning Agent Configuration +image_captioning: + name: "image_captioning" + description: "Image captioning agent with 2 core tools: image ingestion and image captioning" + port: 9997 + role: "Image Captioning Specialist" + enabled: true + # LLM Configuration for Image Captioning + llm: + model: "openai:OpenVINO/Qwen3-8B-int4-ov" + temperature: 0.3 + api_key: "unused" + api_base: "http://127.0.0.1:8001/v3" + + # MCP Configuration for image captioning + mcp_config: + url: "http://localhost:3003" + protocol: "sse" + description: "Image captioning tool to get captions for images" + enabled: true + + # Memory Configuration + memory_size: 5 + + # Requirements - Force tool usage + requirements: + - tool_name: "ingest_videos" # FIXME(review): the image captioning MCP server only registers an "image_captioning" tool -- verify this tool name + force_at_step: 1 + max_invocations: 1 + # Tools Configuration (MCP-based) + tools: + type: "mcp_tools" + source: "mcp_server" + + # Middleware Configuration + middleware: + trajectory: + enabled: false # Image captioning agent doesn't use trajectory middleware + +########################################################### +# EXAMPLE FOR HELPER RULES FOR AGENTS +# Explore https://framework.beeai.dev/modules/agents/requirement-agent to know more about the requirements +########################################################### +# requirements: +# - tool_name: "list_hotels" +# force_at_step: 1 +# max_invocations: 2 +# - tool_name: "search_hotels" +# only_after: ["list_hotels"] +# - tool_name: "search_flights" +# max_invocations: 0 \ No newline at end of file diff --git a/ai_ref_kits/agentic_multimodal_travel_planer/config/agents_prompts.yaml b/ai_ref_kits/agentic_multimodal_travel_planer/config/agents_prompts.yaml new 
file mode 100644 index 00000000..daa40a3e --- /dev/null +++ b/ai_ref_kits/agentic_multimodal_travel_planer/config/agents_prompts.yaml @@ -0,0 +1,119 @@ +# Agents Prompts File +# This file contains all prompts/instructions for different agents + +# Travel Router Agent Prompt (Supervisor) +travel_router_prompt: | + You are a travel assistant. Follow these steps: + + FOR FLIGHTS: + STEP 1: Collect ALL required information: + - from (departure city) + - to (destination city) + - departure_date + - return_date + - class + + If ANY is missing, use final_answer to ask for it. + + STEP 2: Once you have ALL 5 values, send confirmation: + Use final_answer with the ACTUAL values the user provided. Replace placeholders with real values from the conversation. + Format: "I will search for flights from [actual_from_city] to [actual_to_city], departing [actual_departure_date], returning [actual_return_date], in [actual_class]. Is this correct?" + + STEP 3: Wait for user confirmation: + - If user says "yes", "correct", "ok", "sure", "go ahead", "proceed", "that's right" β†’ Use HandoffTool[flight_finder] + - If user says "no" or corrects β†’ Go back to STEP 1 + + STEP 4: After confirmation, delegate with ACTUAL values: + HandoffTool[flight_finder] with {"from": "[actual_from_city]", "to": "[actual_to_city]", "departure_date": "[actual_departure_date]", "return_date": "[actual_return_date]", "class": "[actual_class]"} + + IMPORTANT: Use the actual values from the conversation, never use placeholders like [FROM] or [TO] in your responses. + + FOR HOTELS: + STEP 1: Collect ALL required information: + - destination (city) + - check_in_date + - check_out_date + - guests + + If ANY is missing, use final_answer to ask for it. + + STEP 2: Once you have ALL 4 values, send confirmation: + Use final_answer with the ACTUAL values the user provided. Replace placeholders with real values from the conversation. 
+ Format: "I will search for hotels in [actual_destination] from [actual_check_in] to [actual_check_out] for [actual_guests] guests. Is this correct?" + + IMPORTANT: Use the actual values from the conversation, never use placeholders in your responses. + + STEP 3: Wait for user confirmation, then delegate with HandoffTool[hotel_finder] + + FOR IMAGES: + When user provides an image or asks about an image: + Use HandoffTool[image_captioning] with {"task": ": "} + + Example: + HandoffTool[image_captioning] with {"task": "What is shown in this image?: /path/to/image.jpg"} + + IMPORTANT: Always confirm before delegating flights/hotels. Never delegate without user confirmation. + +# Flight Finder Agent Prompt +flight_finder_prompt: | + You search for flights using the data provided by the travel coordinator. + + You will receive: from, to, departure_date, return_date, class + + CRITICAL: Today's date is [TODAY'S DATE]. Before calling search_flights_tool, check if dates are in the past: + + - If departure_date is before today: Change the year to next year (e.g., if today is 2025-11-26 and departure_date is 2025-03-15, use 2026-03-15) + - If return_date is before today: Change the year to next year (e.g., if today is 2025-11-26 and return_date is 2025-03-22, use 2026-03-22) + - If return_date is before departure_date after adjustment: Adjust return_date to be after departure_date + + Date format: YYYY-MM-DD. Compare dates: "2025-03-01" < "2025-11-26" means March 1st is before November 26th. + + After adjusting dates if needed, call search_flights_tool with: + - origin = from value + - destination = to value + - outbound_date = adjusted departure_date (or original if not in past) + - return_date = adjusted return_date (or original if not in past) + - travel_class = class value (use "economy" if "coach") + + Return results using final_answer. + + BE EFFICIENT: Speed over perfection. 
+# Hotel Finder Agent Prompt +hotel_finder_prompt: | + You search for hotels using the data provided by the travel coordinator. + + You will receive: destination, check_in, check_out, guests + + Call search_hotels with the provided values: + - city = destination value + - check_in_date = check_in value + - check_out_date = check_out value + - adults = guests value + + Return results using final_answer. + +# Image Captioning Agent Prompt +image_captioning_prompt: | + You are an **Image Analysis Specialist**. Your primary responsibility is to analyze images and provide descriptions using the image_captioning tool. + + βš™οΈ CRITICAL INSTRUCTIONS: + - You MUST always call the **image_captioning** tool for any request that involves image analysis + - You must NEVER reply with explanations or text output. Only perform the tool action. + + 🧰 AVAILABLE TOOLS: + - **image_captioning(image_path, prompt)** - Your MCP tool for image analysis + - **image_path**: the full path of the image file + - **prompt**: the prompt for the image captioning + + πŸͺœ WORKFLOW: + 1. **Extract parameters from handoff or context:** + - From handoff input: use image_path and prompt directly + - From conversation: look for UserMessage with file content or "Image path as context: β€” " format + + 2. 
**Call image_captioning tool immediately:** + - Use extracted image_path and prompt + - No analysis or explanation needed + + 🧩 EXAMPLES: + **Handoff Input:** {"image_path": "", "prompt": "what is shown in this image?"} + **Action:** image_captioning(image_path="", prompt="what is shown in this image?") diff --git a/ai_ref_kits/agentic_multimodal_travel_planer/config/mcp_config.yaml b/ai_ref_kits/agentic_multimodal_travel_planer/config/mcp_config.yaml new file mode 100644 index 00000000..e3400487 --- /dev/null +++ b/ai_ref_kits/agentic_multimodal_travel_planer/config/mcp_config.yaml @@ -0,0 +1,15 @@ +# MCP Servers Configuration + +image_mcp: + model_id: "OpenVINO/Phi-3.5-vision-instruct-int4-ov" + ovms_base_url: "http://localhost:8002/v3" + script: mcp_tools/image_captioning_mcp.py + mcp_port: 3003 + +hotel_finder: + script: mcp_tools/ai_builder_mcp_hotel_finder.py + mcp_port: 3001 + +flight_finder: + mcp_port: 3002 + script: mcp_tools/ai_builder_mcp_flights.py \ No newline at end of file diff --git a/ai_ref_kits/agentic_multimodal_travel_planer/mcp_tools/image_captioning_mcp.py b/ai_ref_kits/agentic_multimodal_travel_planer/mcp_tools/image_captioning_mcp.py new file mode 100644 index 00000000..eb55e987 --- /dev/null +++ b/ai_ref_kits/agentic_multimodal_travel_planer/mcp_tools/image_captioning_mcp.py @@ -0,0 +1,206 @@ +"""Image captioning MCP server for the agentic multimodal travel planner. + +This module provides an MCP server that uses OpenVINO Model Server (OVMS) +to perform image captioning tasks using vision-language models. +""" + +import asyncio +import base64 +import sys +from pathlib import Path + +import yaml +from mcp.server.fastmcp import Context, FastMCP +from openai import OpenAI + + +class ImageCaptionServer: + """Image captioning MCP server using OpenVINO Model Server (OVMS). + + This server provides image captioning capabilities through an MCP + interface, using OVMS with vision-language models accessible via + the OpenAI API format. 
+ + Attributes: + config_path: Path to the YAML configuration file. + config: Loaded configuration dictionary. + model: Model identifier from config. + base_url: OVMS base URL. + mcp_port: Port for the MCP server. + client: OpenAI client instance. + mcp_server: FastMCP server instance. + """ + + def __init__(self, config_path: str): + """Initialize the ImageCaptionServer. + + Args: + config_path: Path to the YAML configuration file. + + Raises: + ValueError: If config_path is empty or missing required fields. + """ + if not config_path or not config_path.strip(): + raise ValueError("config_path is required and cannot be empty") + + self.config_path = Path(config_path) + self.config = self._load_config() + self._validate_config() + self._initialize_resources() + + # Initialize OpenAI client with OVMS base URL + self.client = OpenAI( + base_url=self.base_url, + api_key="dummy" + ) + + # Initialize MCP server + self.mcp_server = FastMCP( + name="ImageCaptionMCP", + port=self.mcp_port, + host="0.0.0.0", + ) + + def _load_config(self) -> dict: + """Load YAML configuration from file. + + Returns: + Configuration dictionary. + + Exits: + If config file is not found or cannot be loaded. + """ + if not self.config_path.exists(): + print( + f"Error: Config file '{self.config_path}' not found. Exiting." + ) + sys.exit(1) + + try: + with open(self.config_path, 'r') as f: + config = yaml.safe_load(f) + return config if config is not None else {} + except Exception as e: + print(f"Error: Failed to load config '{self.config_path}': {e}") + sys.exit(1) + + def _get_config_value(self, path: str, default=None): + """Get nested config value using dotted path notation. + + Args: + path: Dotted path to the config value (e.g., 'image_mcp.model_id'). + default: Default value if path not found. + + Returns: + Configuration value or default if not found. 
+ """ + try: + value = self.config + for key in path.split('.'): + value = value[key] + return value + except (KeyError, TypeError): + return default + + def _validate_config(self): + """Validate and extract required configuration fields. + + Raises: + ValueError: If required fields are missing from config. + """ + self.model = self._get_config_value('image_mcp.model_id', None) + if not self.model: + raise ValueError("Missing required field: image_mcp.model_id") + + self.base_url = self._get_config_value( + 'image_mcp.ovms_base_url', + None + ) + if not self.base_url: + raise ValueError( + "Missing required field: image_mcp.ovms_base_url" + ) + + self.mcp_port = self._get_config_value('image_mcp.mcp_port', 3005) + + def _initialize_resources(self): + """Initialize additional resources like headers.""" + self.headers = {"Content-Type": "application/json"} + + def encode_image(self, image_path: str) -> str: + """Encode local image file to base64 string. + + Args: + image_path: Path to the image file. + + Returns: + Base64-encoded string representation of the image. + """ + with open(image_path, "rb") as f: + return base64.b64encode(f.read()).decode("utf-8") + + def image_captioning(self, image_path: str, prompt: str) -> str: + """Generate image caption using OVMS via OpenAI API format. + + Args: + image_path: Path to the image file. + prompt: Text prompt to guide the image captioning. + + Returns: + Generated caption text for the image. 
+ """ + img_data = self.encode_image(image_path) + resp = self.client.chat.completions.create( + model=self.model, + messages=[{ + "role": "user", + "content": [ + {"type": "text", "text": prompt}, + { + "type": "image_url", + "image_url": { + "url": f"data:image/jpeg;base64,{img_data}" + } + } + ] + }] + ) + return resp.choices[0].message.content + + def run(self): + """Start the MCP server with registered tools.""" + asyncio.run(self.mcp_server.run_sse_async()) + + +def main(): + """Main entry point for the image captioning MCP server.""" + # Initialize server with absolute config path so it works from any CWD + repo_root = Path(__file__).resolve().parent.parent + config_path = repo_root / "config" / "mcp_config.yaml" + server = ImageCaptionServer(str(config_path)) + + # Register the image captioning tool + @server.mcp_server.tool() + async def image_captioning( + image_path: str, + prompt: str, + ctx: Context, + ) -> str: + """Generate caption for an image using vision-language model. + + Args: + image_path: Path to the image file to caption. + prompt: Text prompt to guide the captioning. + ctx: MCP context object. + + Returns: + Generated caption text. 
+ """ + return server.image_captioning(image_path, prompt) + + # Start the server + server.run() + + +if __name__ == "__main__": + main() diff --git a/ai_ref_kits/agentic_multimodal_travel_planer/requirements.txt b/ai_ref_kits/agentic_multimodal_travel_planer/requirements.txt new file mode 100644 index 00000000..5befe0e0 --- /dev/null +++ b/ai_ref_kits/agentic_multimodal_travel_planer/requirements.txt @@ -0,0 +1,10 @@ +gradio==5.49.1 +# beeai-framework[a2a] (next line) already pulls in the base beeai-framework package +beeai-framework[a2a]==0.1.56 +uvicorn==0.37.0 +PyYAML==6.0.1 +fastapi==0.115.5 +python-dotenv==1.1.0 +mcp==1.10.1 +google-search-results==2.4.2 +openai==1.108.2 \ No newline at end of file diff --git a/ai_ref_kits/agentic_multimodal_travel_planer/start_agents.py b/ai_ref_kits/agentic_multimodal_travel_planer/start_agents.py new file mode 100644 index 00000000..61c37829 --- --- /dev/null +++ b/ai_ref_kits/agentic_multimodal_travel_planer/start_agents.py @@ -0,0 +1,163 @@ +"""Agent startup script to create a travel planner multimodal agentic system. + +This module manages the lifecycle of multiple agents, starting worker agents +before the supervisor agent to ensure proper dependency initialization. +""" +import os +import subprocess +import sys +import time +from pathlib import Path + +import yaml + +from utils.util import is_port_in_use, kill_processes_on_port + +CONFIG_PATH = Path("config/agents_config.yaml") +AGENT_RUNNER = Path("agents/agent_runner.py") +LOG_DIR = Path("logs") +LOG_DIR.mkdir(exist_ok=True) + + +def start_agent(name, config): + """Start an individual agent process. + + Args: + name: The name of the agent to start. + config: Configuration dictionary for the agent. + + Returns: + True if agent started successfully, False otherwise. 
+ """ + port = config.get("port") + if not port: + print(f"Agent '{name}' missing port, skipping.") + return False + + # Kill any process using the port + if is_port_in_use(port): + kill_processes_on_port(port) + time.sleep(0.5) + + log_file = LOG_DIR / f"{name}.log" + + try: + # Start process with output redirected to log file (unbuffered) + with open(log_file, "w", buffering=1) as log: + proc = subprocess.Popen( + [sys.executable, "-u", str(AGENT_RUNNER), "--agent", name], + stdout=log, + stderr=subprocess.STDOUT, + start_new_session=True, + env={**os.environ, "PYTHONUNBUFFERED": "1"}, + ) + + # Wait for agent to be ready by monitoring log output + ready = False + start_time = time.time() + timeout_s = 30 + log_position = 0 + + while time.time() - start_time < timeout_s: + # Check if process died early + if proc.poll() is not None: + print( + f"Agent '{name}' exited early (code: {proc.returncode})" + ) + return False + + # Read new log content + if log_file.exists(): + with open(log_file, "r") as f: + f.seek(log_position) + new_content = f.read() + log_position = f.tell() + + if new_content: + # Check for readiness indicators + if "uvicorn running on" in new_content.lower(): + ready = True + break + + # Also verify port is in use + if is_port_in_use(port): + ready = True + break + + time.sleep(0.3) + + if ready: + time.sleep(0.5) + print(f"Agent '{name}' started on port {port}") + return True + + print(f"Warning: Agent '{name}' timed out waiting for readiness.") + return False + + except Exception as e: + print(f"Failed to start agent '{name}': {e}") + return False + + +def stop_agents(): + subprocess.run(["pkill", "-f", str(AGENT_RUNNER)], check=False) + print("All agents stopped.") + + +def main(): + """Main entry point for starting agents. + + Starts all worker agents first, then starts the supervisor agent to + ensure proper dependency initialization. 
+ """ + if len(sys.argv) > 1 and sys.argv[1] == "--stop": + stop_agents() + return + + if not CONFIG_PATH.exists(): + print(f"Config file not found: {CONFIG_PATH}") + sys.exit(1) + + with open(CONFIG_PATH, "r") as file: + config = yaml.safe_load(file) or {} + + # Separate supervisor from other agents + supervisor_name = "travel_router" + supervisor_config = None + other_agents = {} + + for name, agent_conf in config.items(): + if name == supervisor_name: + supervisor_config = agent_conf + else: + other_agents[name] = agent_conf + + started = [] + failed = [] + + # Start all non-supervisor agents first + print("Starting worker agents...") + for name, agent_conf in other_agents.items(): + if agent_conf.get("enabled", True) and start_agent(name, agent_conf): + started.append(name) + else: + failed.append(name) + + # Start supervisor last if it exists + if supervisor_config: + print(f"\nStarting supervisor agent ({supervisor_name})...") + enabled = supervisor_config.get("enabled", True) + if enabled and start_agent(supervisor_name, supervisor_config): + started.append(supervisor_name) + else: + failed.append(supervisor_name) + + if started: + print(f"\nSuccessfully started agents: {', '.join(started)}") + if failed: + print(f"Failed to start agents: {', '.join(failed)}") + print(f"\nLogs are in {LOG_DIR}/\n") + + +if __name__ == "__main__": + main() diff --git a/ai_ref_kits/agentic_multimodal_travel_planer/start_mcp_servers.py b/ai_ref_kits/agentic_multimodal_travel_planer/start_mcp_servers.py new file mode 100644 index 00000000..24e90821 --- /dev/null +++ b/ai_ref_kits/agentic_multimodal_travel_planer/start_mcp_servers.py @@ -0,0 +1,394 @@ +"""MCP server startup script for the agentic multimodal travel planner. + +This module manages MCP (Model Context Protocol) servers, providing +functionality to start, stop, and download server scripts from GitHub. 
+""" + +import subprocess +import sys +import time +import urllib.error +import urllib.request +from pathlib import Path +from typing import Dict, List, Tuple + +import yaml + +from utils.util import is_port_in_use, kill_processes_on_port + +CONFIG_PATH = Path("config/mcp_config.yaml") +LOG_DIR = Path("logs") +LOG_DIR.mkdir(exist_ok=True) + +# GitHub URLs for known MCP servers +GITHUB_MCP_URLS = { + "ai_builder_mcp_flights.py": ( + "https://raw.githubusercontent.com/intel/intel-ai-assistant-builder/" + "main/mcp/mcp_servers/mcp_google_flight/server.py" + ), + "ai_builder_mcp_hotel_finder.py": ( + "https://raw.githubusercontent.com/intel/intel-ai-assistant-builder/" + "main/mcp/mcp_servers/mcp_google_hotel/server.py" + ), +} + +def download_script_if_missing(name: str, script_path: Path) -> bool: + """Download script from GitHub if it doesn't exist locally. + + Args: + name: The friendly name of the MCP server. + script_path: Path where the script should be saved. + + Returns: + True if download was successful, False otherwise. 
+ """ + script_name = script_path.name + + if script_name not in GITHUB_MCP_URLS: + print( + f"βœ— Script '{script_name}' not found and no download URL available" + ) + return False + + url = GITHUB_MCP_URLS[script_name] + + try: + print(f"Downloading '{name}' from GitHub...") + print(f" URL: {url}") + print(f" Destination: {script_path}") + + # Create parent directory if needed + script_path.parent.mkdir(parents=True, exist_ok=True) + + with urllib.request.urlopen(url, timeout=30) as response: + content = response.read() + + with open(script_path, "wb") as f: + f.write(content) + + # Make executable + script_path.chmod(0o755) + + print(f"βœ“ Downloaded '{name}' successfully") + return True + + except urllib.error.URLError as e: + print(f"βœ— Failed to download '{name}': {e}") + return False + except Exception as e: + print(f"βœ— Error downloading '{name}': {e}") + return False + + +def start_mcp_server(name: str, conf: Dict) -> bool: + """Start an individual MCP server process. + + Args: + name: The name of the MCP server to start. + conf: Configuration dictionary for the MCP server. + + Returns: + True if server started successfully, False otherwise. 
+ """ + script = conf.get("script") + if not script: + print(f"MCP '{name}' missing script, skipping.") + return False + + port = conf.get("mcp_port") + + # If a port is specified, ensure it is free + if port: + if is_port_in_use(port): + kill_processes_on_port(port) + time.sleep(0.5) + + # Resolve script path + script_path = Path(script) + if not script_path.exists(): + print(f"Script for '{name}' not found: {script_path}") + print(f"Attempting to download from GitHub...") + if not download_script_if_missing(name, script_path): + return False + print() + + log_file = LOG_DIR / f"mcp_{name}.log" + + # Build command + cmd: List[str] = [sys.executable, str(script_path)] + script_lower = script_path.name.lower() + if "ai_builder_mcp" in script_lower: + cmd += ["start", "--protocol", "sse"] + if port: + cmd += ["--port", str(port)] + + try: + # Start process with output redirected to log file + with open(log_file, "w") as log: + proc = subprocess.Popen( + cmd, + stdout=log, + stderr=subprocess.STDOUT, + start_new_session=True, + ) + + # Wait for server to be ready by monitoring log output + ready = False + start_time = time.time() + timeout_s = 30 + log_position = 0 + + while time.time() - start_time < timeout_s: + # Check if process died early + if proc.poll() is not None: + print(f"MCP '{name}' exited early (code: {proc.returncode})") + return False + + # Read new log content + if log_file.exists(): + with open(log_file, "r") as f: + f.seek(log_position) + new_content = f.read() + log_position = f.tell() + + if new_content: + # Check for readiness indicators in new content + content_lower = new_content.lower() + if any(s in content_lower for s in [ + "uvicorn running on", + "server started successfully", + "mcp server started", + "starting simple video mcp server", + ]): + ready = True + break + + # Also verify port is in use if specified + if port and is_port_in_use(port): + ready = True + break + + time.sleep(0.3) + + if ready: + time.sleep(0.5) + status = ( + 
f"MCP '{name}' started" + + (f" on port {port}" if port else "") + ) + print(status) + return True + + print(f"Warning: MCP '{name}' timed out waiting for readiness.") + return False + + except Exception as e: + print(f"Failed to start MCP '{name}': {e}") + return False + + +def load_config() -> Dict: + """Load MCP server configuration from YAML file. + + Returns: + Configuration dictionary, or empty dict if file not found. + """ + if not CONFIG_PATH.exists(): + print(f"Config file not found: {CONFIG_PATH}") + return {} + with open(CONFIG_PATH, "r") as f: + return yaml.safe_load(f) or {} + + +def select_targets( + cfg: Dict, only: List[str] = None +) -> List[Tuple[str, Dict]]: + """Select MCP server targets from configuration. + + Args: + cfg: Configuration dictionary. + only: Optional list of server names to filter by. + + Returns: + List of tuples containing (server_name, config_dict). + """ + items: List[Tuple[str, Dict]] = [] + for name, section in (cfg or {}).items(): + if not isinstance(section, dict): + continue + if "script" not in section: + continue + if only and name not in only: + continue + items.append((name, section)) + return items + + +def stop_mcp_servers( + targets: List[Tuple[str, Dict]], + kill_all: bool = False, +) -> None: + """Stop running MCP server processes. + + Args: + targets: List of (server_name, config_dict) tuples to stop. + kill_all: If True, aggressively kill by known process patterns. 
+ """ + killed = 0 + for name, section in targets: + port = section.get("mcp_port") + if port: + try: + result = subprocess.run( + ["lsof", "-t", f"-i:{port}"], + capture_output=True, + text=True, + check=False, + ) + for pid in result.stdout.strip().split("\n"): + if pid: + subprocess.run(["kill", "-9", pid], check=False) + print(f"Killed process {pid} on port {port}") + killed += 1 + except FileNotFoundError: + # Best effort if lsof missing + pass + + # Also try to kill by script name as a fallback + script = section.get("script") + if script: + try: + subprocess.run( + ["pkill", "-f", str(Path(script).name)], + check=False, + ) + except Exception: + pass + + # Aggressive kill mode: try to kill by common MCP script patterns + if kill_all: + patterns = [ + "ai_builder_mcp_hotel_finder.py", + "ai_builder_mcp_flights.py", + "simple_video_mcp_server.py", + "image_mcp_new.py", + "image_mcp.py", + ] + for pat in patterns: + try: + subprocess.run(["pkill", "-f", pat], check=False) + except Exception: + pass + + print( + f"Stopped {killed} process(es). All selected MCP servers stopped." + ) + + +def download_mcp_servers(targets: List[Tuple[str, Dict]]) -> None: + """Download MCP server scripts from AI Assistant Builder GitHub to their configured paths. + + Args: + targets: List of (server_name, config_dict) tuples to download. + """ + downloaded = 0 + skipped = 0 + + for name, section in targets: + script = section.get("script") + if not script: + continue + + script_path = Path(script) + + # Try to download + if download_script_if_missing(name, script_path): + downloaded += 1 + else: + skipped += 1 + + print( + f"\nDownload complete: {downloaded} downloaded, {skipped} skipped" + ) + + +def main(): + """Main entry point for managing MCP servers. + + Provides command-line interface for starting, stopping, and downloading + MCP server scripts. 
def main():
    """Command-line entry point for managing MCP servers.

    Parses CLI flags and dispatches to the download, stop, or start logic
    for the MCP servers declared in config/mcp_config.yaml.
    """
    import argparse

    parser = argparse.ArgumentParser(
        description=(
            "Start or stop MCP servers defined in config/mcp_config.yaml"
        ),
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    parser.add_argument(
        "--stop",
        action="store_true",
        help="Stop the selected servers instead of starting them",
    )
    parser.add_argument(
        "--kill",
        action="store_true",
        help=(
            "Aggressive mode when used with --stop: kill by ports and known"
            " MCP process patterns"
        ),
    )
    parser.add_argument(
        "--only",
        nargs="*",
        help=(
            "Names of MCP servers to operate on (default: all with a"
            " script entry)"
        ),
    )
    parser.add_argument(
        "--download",
        action="store_true",
        help=(
            "Download MCP server scripts from GitHub to paths specified"
            " in config"
        ),
    )
    opts = parser.parse_args()

    # Bail out early when the config is missing or no server matches.
    cfg = load_config()
    if not cfg:
        sys.exit(1)

    targets = select_targets(cfg, opts.only)
    if not targets:
        print("No MCP servers matched the selection.")
        sys.exit(1)

    # Download and stop are exclusive, short-circuit actions.
    if opts.download:
        download_mcp_servers(targets)
        return
    if opts.stop:
        stop_mcp_servers(targets, kill_all=opts.kill)
        return

    # Default action: start every selected server and report the outcome.
    started: List[str] = []
    failed: List[str] = []
    for name, section in targets:
        bucket = started if start_mcp_server(name, section) else failed
        bucket.append(name)

    if started:
        print(f"\nSuccessfully started MCP servers: {', '.join(started)}")
    if failed:
        print(f"Failed to start MCP servers: {', '.join(failed)}")
    print(f"\nLogs are in {LOG_DIR}/\n")
class TravelRouterClient:
    """UI client that connects to the travel_router agent.

    Wraps an A2A (Agent-to-Agent) connection to the travel router process
    and forwards chat queries, keeping conversation history in memory.

    Attributes:
        client: A2AAgent instance for communication.
        initialized: Boolean flag indicating connection status.
        memory: Memory instance for maintaining conversation history.
    """

    def __init__(self):
        """Initialize the TravelRouterClient."""
        self.client = None
        self.initialized = False
        self.memory = UnconstrainedMemory()

    async def initialize(self):
        """Initialize connection to the travel_router agent.

        Returns:
            True if connection successful, False otherwise.
        """
        print("Initializing connection to Travel Router...")
        load_dotenv()

        try:
            # Target the locally running travel_router agent.
            router_port = os.getenv("TRAVEL_ROUTER_PORT", "9996")
            router_url = f"http://127.0.0.1:{router_port}"

            print(f"πŸ”— Connecting to Travel Router at {router_url}")

            # A2A client that shares this object's conversation memory.
            self.client = A2AAgent(url=router_url, memory=self.memory)
            print("βœ… Connected to Travel Router successfully")

            self.initialized = True
            return True

        except Exception as e:
            print(f"Failed to initialize Travel Router Agent: {e}")
            print(f"Traceback: {traceback.format_exc()}")
            return False

    async def chat(self, query: str) -> str:
        """Process a query through the travel router.

        Args:
            query: User query text to process.

        Returns:
            Response text from the agent.
        """
        # Lazily (re)connect on first use.
        if not self.initialized:
            await self.initialize()

        if not self.initialized:
            return (
                "Error: Travel Router not available. Please check the agent "
                "configuration."
            )

        try:
            reply = await self.client.run(query)

            # Debug: print response structure
            print(f"Response type: {type(reply)}")
            public_attrs = [a for a in dir(reply) if not a.startswith('_')]
            print(f"Response attrs: {public_attrs}")

            return self._extract_response_text(reply)

        except Exception as e:
            print(f"Error with router agent: {e}")
            print(f"Error traceback: {traceback.format_exc()}")
            return f"Error processing request: {e}"

    def _extract_response_text(self, response) -> str:
        """Return the first text-like value available on the response."""

        def _as_text(value):
            # Depth-first probe: plain strings win, sequences are scanned
            # from the end, and known text-bearing attributes are followed.
            if value is None:
                return None
            if isinstance(value, str):
                return value
            if isinstance(value, (list, tuple)):
                for element in reversed(value):
                    found = _as_text(element)
                    if found:
                        return found
                return None
            for attr in ("text", "content", "final_answer", "output"):
                if hasattr(value, attr):
                    found = _as_text(getattr(value, attr))
                    if found:
                        return found
            return None

        result_obj = getattr(response, "result", None)
        state_obj = getattr(response, "state", None)

        # Candidate locations, probed in priority order.
        candidates = (
            (
                "result.final_answer",
                getattr(result_obj, "final_answer", None),
            ),
            (
                "state.final_answer",
                getattr(state_obj, "final_answer", None),
            ),
            ("final_answer", getattr(response, "final_answer", None)),
            ("output", getattr(response, "output", None)),
            ("last_message", getattr(response, "last_message", None)),
            ("text", getattr(response, "text", None)),
            ("messages[-1]", getattr(response, "messages", None)),
        )

        for label, candidate in candidates:
            text = _as_text(candidate)
            if text:
                print(f"Found in {label}: {text[:100]}...")
                return text

        # Last resort: the object's string form.
        fallback = str(response)
        print(f"Fallback to str: {fallback[:100]}...")
        return fallback if fallback else "No response received"
def initialize_travel_router_client():
    """Initialize the travel router client connection.

    Returns:
        Initialized TravelRouterClient instance or None on failure.
    """
    print("πŸ”„ Initializing Travel Router Client...")
    try:
        router_port = os.getenv("TRAVEL_ROUTER_PORT", "9996")
        print(
            f"πŸ”— Connecting to Travel Router at "
            f"http://127.0.0.1:{router_port}"
        )

        client = TravelRouterClient()

        # Run the async initialize() without assuming an event loop exists
        # in this thread; fall back to asyncio.run when the executor path
        # raises RuntimeError.
        try:
            with concurrent.futures.ThreadPoolExecutor() as pool:
                pool.submit(run_async_in_thread, client.initialize).result()
        except RuntimeError:
            # No running event loop, we can use asyncio.run directly
            asyncio.run(client.initialize())

        print("βœ… Travel Router Client initialized")
        return client
    except Exception as e:
        print(f"❌ Failed to initialize Travel Router Client: {e}")
        traceback.print_exc()
        return None
def stop_query():
    """Request cancellation of the in-flight query.

    Returns:
        Tuple of (status message, chatbox messages, send_btn state,
        stop_btn state, clear_btn state).
    """
    global stop_requested
    stop_requested = True

    # Re-enable Send/Clear and grey out Stop while the request unwinds.
    send_state = gr.update(interactive=True)
    stop_state = gr.update(interactive=False)
    clear_state = gr.update(interactive=True)
    return (
        "πŸ›‘ Stopping query...",
        chatbox_msg,
        send_state,
        stop_state,
        clear_state,
    )
def clear_all():
    """Clear all chat history and reset state.

    Returns:
        Tuple of (empty log window, empty chatbox).
    """
    global chatbox_msg, travel_router_client, stop_requested
    global current_image_path, workflow_steps

    # Drop all UI-side state first so the interface looks fresh at once.
    chatbox_msg = []
    stop_requested = False
    current_image_path = None
    workflow_steps = []

    # Best-effort reset of server-side conversation memory; a failure here
    # must not prevent the UI from clearing.
    try:
        reset_all_agent_memories()
    except Exception as e:
        print(f"⚠️ Error resetting memories: {e}")

    return read_latest_logs(), chatbox_msg
async def run_agent_workflow(query: str):
    """Execute agent workflow for user query.

    Async generator driving the chat round-trip: it forwards the query to
    the travel router, polls the router log for agent handoffs while the
    call is in flight, and yields UI updates after each state change.

    Args:
        query: User query text.

    Yields:
        Tuple of (log window text, chatbox messages, input field text,
        send_btn state, stop_btn state, clear_btn state).
    """
    global chatbox_msg, travel_router_client, stop_requested
    global current_image_path, workflow_steps, log_cache

    # Clear workflow steps for new query
    workflow_steps = []

    # Mark current log position to only capture new handoffs
    logs_dir = Path(__file__).parent / "logs"
    travel_router_log = logs_dir / "travel_router.log"
    if travel_router_log.exists():
        cache_key = str(travel_router_log)
        try:
            current_size = travel_router_log.stat().st_size
            log_cache[cache_key] = {
                'position': current_size,
                'seen_handoffs': set()
            }
        except Exception:
            pass

    # Create enhanced query that includes image path if available
    enhanced_query = query
    if current_image_path:
        # Include a clear text hint so router can parse image_path
        enhanced_query = f"{query} : = <{current_image_path}> "
        add_workflow_step("πŸ“Έ Image included in query")

    chatbox_msg.append({"role": "user", "content": enhanced_query})
    # NOTE(review): stop_requested is reset here but never consulted later
    # in this function — confirm cancellation is handled elsewhere.
    stop_requested = False

    if travel_router_client is None or not travel_router_client.initialized:
        add_workflow_step("❌ Travel Router not available")
        yield (
            read_latest_logs(),
            chatbox_msg,
            "",
            gr.update(interactive=True),  # send_btn enabled
            gr.update(interactive=False),  # stop_btn disabled
            gr.update(interactive=True),  # clear_btn enabled
        )
        return

    try:
        add_workflow_step("πŸ“€ Sending query to Travel Router")
        # Disable send and clear, enable stop
        yield (
            read_latest_logs(),
            chatbox_msg,
            "",
            gr.update(interactive=False),  # send_btn disabled
            gr.update(interactive=True),  # stop_btn enabled
            gr.update(interactive=False),  # clear_btn disabled
        )

        msg_text = f"Sending query to Travel Router: {enhanced_query}"
        print(msg_text, flush=True)

        add_workflow_step("πŸ€” Travel Router is processing...")
        yield (
            read_latest_logs(),
            chatbox_msg,
            "",
            gr.update(interactive=False),  # send_btn disabled
            gr.update(interactive=True),  # stop_btn enabled
            gr.update(interactive=False),  # clear_btn disabled
        )

        # Create task for chat and monitor for handoffs
        chat_task = asyncio.create_task(
            travel_router_client.chat(enhanced_query)
        )

        # Poll for handoffs while waiting for response
        while not chat_task.done():
            await asyncio.sleep(0.2)  # Poll more frequently
            new_handoffs = extract_agent_handoffs()
            if new_handoffs:
                for handoff in new_handoffs:
                    add_workflow_step(handoff)
                yield (
                    read_latest_logs(),
                    chatbox_msg,
                    "",
                    gr.update(interactive=False),  # send_btn disabled
                    gr.update(interactive=True),  # stop_btn enabled
                    gr.update(interactive=False),  # clear_btn disabled
                )

        # Get the final response
        response = await chat_task
        # Remove trailing colon if present (UI formatting issue)
        if isinstance(response, str) and response.rstrip().endswith(':'):
            response = response.rstrip()[:-1].rstrip()

        # Give it a moment for logs to flush
        await asyncio.sleep(0.3)

        # Check for any final handoffs multiple times
        # (three short passes to catch late log writes)
        for _ in range(3):
            final_handoffs = extract_agent_handoffs()
            if final_handoffs:
                for handoff in final_handoffs:
                    add_workflow_step(handoff)
            yield (
                read_latest_logs(),
                chatbox_msg,
                "",
                gr.update(interactive=False),  # send_btn disabled
                gr.update(interactive=True),  # stop_btn enabled
                gr.update(interactive=False),  # clear_btn disabled
            )
            await asyncio.sleep(0.1)

        add_workflow_step("βœ… Received response from Travel Router")

        # Add response to chat history
        chatbox_msg.append({"role": "assistant", "content": response})

        # Return final result - re-enable send and clear, disable stop
        yield (
            read_latest_logs(),
            chatbox_msg,
            "",
            gr.update(interactive=True),  # send_btn enabled
            gr.update(interactive=False),  # stop_btn disabled
            gr.update(interactive=True),  # clear_btn enabled
        )

    except Exception as e:
        error_msg = f"Error communicating with Travel Router: {e}"
        add_workflow_step(f"❌ Error: {str(e)[:50]}")
        chatbox_msg.append({"role": "assistant", "content": error_msg})
        # Re-enable send and clear, disable stop
        yield (
            read_latest_logs(),
            chatbox_msg,
            "",
            gr.update(interactive=True),  # send_btn enabled
            gr.update(interactive=False),  # stop_btn disabled
            gr.update(interactive=True),  # clear_btn enabled
        )

Multi-Agent Travel Assistant πŸ€–

" + ) + gr.Markdown( + "
Powered by OpenVINO + MCP + A2A Tools
" + ) + + with gr.Row(): + # Left Column: Image Upload + Status + with gr.Column(scale=2): + image_file = gr.Image( + value=None, + label="Upload or choose an image", + interactive=True, + ) + status = gr.Textbox( + "Ready", interactive=False, show_label=False, lines=3 + ) + + # Right Column: Chat UI + with gr.Column(scale=2): + chatbot = gr.Chatbot( + label="Conversation", height=500, type="messages" + ) + with gr.Row(): + msg = gr.Textbox( + placeholder="Type your message…", + show_label=False, + container=False, + interactive=True, + ) + with gr.Row(): + stop_btn = gr.Button("Stop", interactive=False) + clr_btn = gr.Button("Clear") + send_btn = gr.Button("Send", variant="primary") + gr.Examples( + EXAMPLES, inputs=[msg], label="Click example, then Send" + ) + with gr.Column(scale=2): + log_window = gr.Markdown( + value=read_latest_logs(), + label="Agent Workflow", + height=300 + ) + + # Register listeners + image_file.change( + fn=image_captioning, inputs=[image_file], outputs=[status] + ) + send_btn.click( + fn=run_agent_workflow, + inputs=[msg], + outputs=[log_window, chatbot, msg, send_btn, stop_btn, clr_btn], + ) + msg.submit( + fn=run_agent_workflow, + inputs=[msg], + outputs=[log_window, chatbot, msg, send_btn, stop_btn, clr_btn], + ) + stop_btn.click( + fn=stop_query, + inputs=[], + outputs=[log_window, chatbot, send_btn, stop_btn, clr_btn] + ) + clr_btn.click(fn=clear_all, inputs=[], outputs=[log_window, chatbot]) + + return demo + + +def main(): + """Main entry point for the Gradio UI application.""" + try: + # Get port from environment variable or use default + port = int(os.getenv("GRADIO_SERVER_PORT", 7860)) + print(f"🌐 Starting on port {port}") + + demo = create_gradio_interface() + demo.launch( + server_name="0.0.0.0", + server_port=port, + inbrowser=True, + share=False, + debug=False, + show_error=True, + quiet=False + ) + except Exception as e: + print(f"❌ Error launching Gradio interface: {e}") + print("πŸ”§ If issues persist, check if port 7860 
def validate_llm_endpoint(api_base, timeout=5):
    """Validate if the LLM API endpoint is accessible"""
    base = api_base.rstrip('/')

    # Probe 1: OpenVINO Model Server exposes a /models listing.
    try:
        if requests.get(f"{base}/models", timeout=timeout).status_code == 200:
            return True, "OpenVINO Model Server is accessible"
    except requests.exceptions.RequestException:
        pass

    # Probe 2: any HTTP answer from the base URL counts as alive
    # (405 Method Not Allowed still proves a server is there).
    try:
        status = requests.get(api_base, timeout=timeout).status_code
        if status in (200, 404, 405):
            return True, "LLM endpoint is accessible"
    except requests.exceptions.RequestException:
        pass

    # Probe 3: OpenAI-compatible servers often expose /health.
    try:
        if requests.get(f"{base}/health", timeout=timeout).status_code == 200:
            return True, "LLM endpoint is healthy"
    except requests.exceptions.RequestException:
        pass

    # Probe 4: raw TCP connect - proves something is listening on the port.
    try:
        parsed = urlparse(
            api_base if '://' in api_base else f'http://{api_base}'
        )
        host = parsed.hostname or 'localhost'
        port = parsed.port or (443 if parsed.scheme == 'https' else 80)

        # 0.0.0.0 is a bind address, not a connectable one
        if host == '0.0.0.0':
            host = 'localhost'

        probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        probe.settimeout(timeout)
        reachable = probe.connect_ex((host, port)) == 0
        probe.close()

        if reachable:
            return True, f"Server is running on {host}:{port}"
    except Exception:
        pass

    return False, "LLM endpoint not accessible"
def _create_handoff_tools(supervised_agents_config):
    """Create HandoffTools for supervised agents.

    Probes each configured agent's A2A endpoint and builds a HandoffTool
    for every reachable one; unreachable agents are skipped with a warning
    rather than failing the whole setup.
    """
    import requests
    from beeai_framework.tools.handoff import HandoffTool
    from beeai_framework.adapters.a2a.agents.agent import A2AAgent
    from beeai_framework.memory import UnconstrainedMemory

    handoff_tools = []

    for agent_config in supervised_agents_config:
        agent_name = agent_config['name']
        agent_url = agent_config['url']

        # Override URL with environment variable if available
        port_env_var = agent_config.get('port_env_var')
        if port_env_var and port_env_var in os.environ:
            port = os.environ[port_env_var]
            base_url = agent_url.rsplit(':', 1)[0]  # Remove port
            agent_url = f"{base_url}:{port}"

        try:
            print(f"πŸ”— Testing connection to {agent_name} at {agent_url}")

            # Test connectivity first
            agent_card_url = f"{agent_url}/.well-known/agent-card.json"
            response = requests.get(agent_card_url, timeout=3)

            if response.status_code != 200:
                print(f"⚠️ Agent {agent_name} not available (HTTP {response.status_code}), skipping HandoffTool creation")
                continue

            # Get agent card for description
            base_description = f"Consult {agent_name} for specialized tasks via A2A protocol."
            try:
                agent_card = response.json()
                base_description = agent_card.get('description', base_description)
                print(f"βœ… Found agent card for {agent_name}")
            except Exception:
                print(f"⚠️ Invalid agent card for {agent_name}, using default description")

            # Add format requirements to description based on agent type
            # so the supervisor LLM passes structured arguments.
            if agent_name == "flight_finder":
                description = (
                    f"{base_description} REQUIRES structured input with keys: "
                    "from, to, departure_date, class. Example: "
                    "{'from': 'Toronto', 'to': 'Rome', 'departure_date': "
                    "'2025-12-15', 'class': 'economy'}. "
                    "DO NOT use {'task': '...'} format."
                )
            elif agent_name == "hotel_finder":
                description = (
                    f"{base_description} REQUIRES structured input with keys: "
                    "destination, check_in_date, check_out_date, guests. "
                    "Example: {'destination': 'Paris', 'check_in_date': "
                    "'2025-12-20', 'check_out_date': '2025-12-25', 'guests': 2}. "
                    "DO NOT use {'task': '...'} format."
                )
            else:
                description = base_description

            # Create A2A agent connection
            a2a_agent = A2AAgent(url=agent_url, memory=UnconstrainedMemory())

            # Create HandoffTool
            handoff_tool = HandoffTool(
                a2a_agent,
                name=agent_name,
                description=description
            )

            handoff_tools.append(handoff_tool)
            print(f"βœ… Created HandoffTool for {agent_name}")

        except requests.RequestException as e:
            print(
                f"⚠️ Agent {agent_name} not reachable ({str(e)}), "
                "skipping HandoffTool creation"
            )
        except Exception as e:
            print(f"❌ Failed to create HandoffTool for {agent_name}: {e}")

    if not handoff_tools:
        print("⚠️ No specialized agents available - router will use fallback knowledge")
        print("⚠️ Returning empty handoff tools list to force direct responses")

    return handoff_tools
def load_config(agent_name: str):
    """
    Load configuration for a specific agent from YAML files and set environment variables.

    Args:
        agent_name: Name of the agent (e.g., 'hotel_finder', 'flight_finder', 'video_search')

    Returns:
        dict: Configuration dictionary with all needed values

    Raises:
        ConnectionError: If the agent's LLM endpoint does not respond.
        KeyError: If the agent or a required config key is missing.
    """
    config_dir = Path(__file__).parent.parent / "config"

    # Load agent config
    with open(config_dir / "agents_config.yaml", 'r') as f:
        agents_config = yaml.safe_load(f)

    # Load prompts config
    with open(config_dir / "agents_prompts.yaml", 'r') as f:
        prompts_config = yaml.safe_load(f)

    # Get agent specific config
    agent_config = agents_config[agent_name]
    agent_llm_config = agent_config['llm']

    # Use config values directly (no environment variable overrides)
    port = agent_config['port']
    llm_model = agent_llm_config['model']
    llm_temperature = agent_llm_config['temperature']
    api_base = agent_llm_config['api_base']
    api_key = agent_llm_config['api_key']

    # Validate LLM endpoint before proceeding
    print(f"Validating LLM endpoint: {api_base}")
    is_valid, message = validate_llm_endpoint(api_base)

    if not is_valid:
        print(f"{message}")
        print(f"Please check if your LLM server is running on {api_base}")
        raise ConnectionError(f"LLM endpoint validation failed: {message}")
    else:
        print(f"{message}")

    # Set environment variables for the agent
    # NOTE(review): mutates process-wide env — confirm one agent per process.
    os.environ["OPENAI_API_KEY"] = api_key
    os.environ["OPENAI_API_BASE"] = api_base

    # Create tools from configuration
    tools = []
    tools_config = agent_config.get('tools', [])

    # Handle tools as list (standard format)
    if isinstance(tools_config, list):
        for tool_config in tools_config:
            tool = _create_tool(tool_config, agent_config)
            if tool:
                tools.append(tool)
    # Handle tools as dict (legacy MCP format) - skip for now, will be handled externally
    elif isinstance(tools_config, dict):
        print(f"Detected MCP tools configuration - tools will be provided externally")
        pass

    # If this is a supervisor agent, create HandoffTools for supervised agents
    if 'supervised_agents' in agent_config:
        handoff_tools = _create_handoff_tools(agent_config['supervised_agents'])
        tools.extend(handoff_tools)

    # Get prompt and inject today's date for flight_finder
    prompt = prompts_config[f'{agent_name}_prompt'].strip()
    if agent_name == 'flight_finder':
        today_date = datetime.now().strftime('%Y-%m-%d')
        prompt = prompt.replace('[TODAY\'S DATE]', today_date)

    # Build the return config
    config = {
        'port': port,
        'llm_model': llm_model,
        'llm_temperature': llm_temperature,
        'api_base': api_base,
        'api_key': api_key,
        'name': agent_config['name'],
        'description': agent_config['description'],
        'prompt': prompt,
        'middleware': agent_config['middleware'],
        'memory_size': agent_config['memory_size'],
        'tools': tools,
        'is_supervisor': 'supervised_agents' in agent_config,
        'mcp_config': agent_config.get('mcp_config'),  # Add MCP configuration
        'requirements': agent_config.get('requirements', [])
    }

    # Only add supervised_agents if it exists in the original config
    if 'supervised_agents' in agent_config:
        config['supervised_agents'] = agent_config['supervised_agents']

    return config
def run_async_in_thread(
    coro_factory: Callable[..., Awaitable], *args, **kwargs
):
    """Run an async callable inside a fresh event loop on a worker thread.

    Args:
        coro_factory: Callable returning an awaitable when invoked.
        *args: Positional arguments forwarded to the callable.
        **kwargs: Keyword arguments forwarded to the callable.

    Returns:
        Whatever the awaited coroutine returns.
    """
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        # Build the coroutine inside the try so the loop is always closed,
        # even if the factory itself raises.
        return loop.run_until_complete(coro_factory(*args, **kwargs))
    finally:
        loop.close()
def save_uploaded_image(image_input, destination_dir, prefix="caption_image"):
    """Persist uploaded/cached image input into destination_dir.

    Accepts either a string path (copied) or an array-like with a ``shape``
    attribute (saved via PIL). Returns the Path of the saved .jpg file.

    Raises:
        ValueError: Unresolvable or unsafe source path.
        FileNotFoundError: String path does not exist.
        RuntimeError: PIL failed to encode the array input.
        TypeError: Unsupported input type.
    """
    destination_dir = Path(destination_dir)
    destination_dir.mkdir(parents=True, exist_ok=True)

    # Timestamped name avoids clobbering earlier uploads (second granularity).
    timestamp = int(time.time())
    saved_image_path = destination_dir / f"{prefix}_{timestamp}.jpg"

    if isinstance(image_input, str):
        source_path = Path(image_input)
        try:
            source_resolved = source_path.resolve()
            dest_resolved = destination_dir.resolve()
        except Exception as exc:
            raise ValueError(
                f"Error: Could not resolve path: {image_input} ({exc})"
            ) from exc

        # NOTE(review): this only accepts source files already located
        # inside destination_dir — confirm that is intended; paths from a
        # different temp directory would be rejected as "unsafe".
        if not _path_within_directory(source_resolved, dest_resolved):
            raise ValueError(f"Error: Unsafe file path: {image_input}")

        if not source_path.exists():
            raise FileNotFoundError(
                f"Error: Image file not found at {image_input}"
            )

        shutil.copy2(source_path, saved_image_path)
        return saved_image_path

    # Array-like input (e.g. from a UI upload widget): encode with PIL.
    if hasattr(image_input, "shape"):
        try:
            pil_image = Image.fromarray(image_input)
            pil_image.save(saved_image_path)
            return saved_image_path
        except Exception as exc:
            raise RuntimeError(
                f"Error processing uploaded image: {exc}. "
                "Please ensure PIL (Pillow) is installed."
            ) from exc

    raise TypeError(
        f"Error: Unsupported image input type: {type(image_input)}"
    )
def extract_agent_handoffs_from_log(
    log_path, cache: Dict[str, Dict[str, Iterable]]
):
    """Parse agent handoff events from a log file using a cache.

    Only reads bytes appended since the previously cached position, and
    reports each start/complete handoff for a given agent at most once.

    Args:
        log_path: Path to the log file to scan.
        cache: Per-file state dict (read position + seen handoff ids).

    Returns:
        List of human-readable workflow step strings (may be empty).
    """
    log_path = Path(log_path)
    if not log_path.exists():
        return []

    state = cache.setdefault(
        str(log_path), {"position": 0, "seen_handoffs": set()}
    )

    steps = []
    try:
        size = log_path.stat().st_size
        offset = state["position"]
        seen = state["seen_handoffs"]

        if size > offset:
            with open(
                log_path, "r", encoding="utf-8", errors="ignore"
            ) as fh:
                fh.seek(offset)
                for raw in fh:
                    raw = raw.strip()
                    if "--> πŸ” HandoffTool[" in raw:
                        agent = raw.split("HandoffTool[")[1].split("]")[0]
                        marker = f"{agent}_start"
                        if marker not in seen:
                            steps.append(f"πŸ”„ Delegating to {agent}...")
                            seen.add(marker)
                    elif "<-- πŸ” HandoffTool[" in raw:
                        agent = raw.split("HandoffTool[")[1].split("]")[0]
                        marker = f"{agent}_complete"
                        if marker not in seen:
                            steps.append(f"βœ… {agent} completed")
                            seen.add(marker)

                # Remember where we stopped so the next call is incremental.
                state["position"] = fh.tell()
    except Exception:
        # Best effort: a rotated or partially written log must not break
        # the UI polling loop.
        pass

    return steps