Skip to content
Open

Mcp #13

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added examples/MCP/business_data.db
Binary file not shown.
Empty file.
140 changes: 140 additions & 0 deletions examples/MCP/echo_server_stdio.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
# examples/MCP/echo_server_stdio.py
import asyncio
import logging
import os
import sys # For sys.stdout in logging handlers
from mcp.server import Server, NotificationOptions
from mcp.server.models import InitializationOptions
from mcp.types import Tool, TextContent, Resource
import mcp.server.stdio # For stdio_server context manager

# --- Logging Setup ---
# Ensure the 'logs' directory exists in the same directory as this script,
# or adjust the log_file_path.
LOG_DIR = os.path.join(os.path.dirname(__file__), "logs")
if not os.path.exists(LOG_DIR):
os.makedirs(LOG_DIR, exist_ok=True)
LOG_FILE_PATH = os.path.join(LOG_DIR, 'echo_server_stdio.log')

# Configure logging to write to a file and also to stdout (for direct runs)
logging.basicConfig(
level=logging.DEBUG, # Set to DEBUG for verbose output
format="%(asctime)s - ECHO_SRV_STDIO - %(process)d - %(levelname)s - %(filename)s:%(lineno)d - %(message)s",
handlers=[
logging.FileHandler(LOG_FILE_PATH, mode='w'), # Overwrite log file on each start
logging.StreamHandler(sys.stdout) # Log to console as well
]
)
logger = logging.getLogger("echo_mcp_stdio_srv")

# --- Server Configuration ---
ECHO_PREFIX = os.getenv("ECHO_PREFIX", "[EchoStdioDefault]")

# Create the MCP Server instance
# The server name here is for identification within this process,
# the actual name presented to clients is in InitializationOptions.
server = Server("EchoStdioServerExampleInstance")
logger.debug(f"MCP Server object '{server.name}' created.")

# --- Tool Definitions ---
@server.list_tools()
async def list_tools_impl() -> list[Tool]:
logger.debug("Handler: list_tools_impl called.")
return [
Tool(
name="echo",
description="Echoes the input message with a prefix.",
inputSchema={
"type": "object",
"properties": {
"message": {"type": "string", "description": "The message to echo."}
},
"required": ["message"],
}
)
]

@server.call_tool()
async def call_tool_impl(name: str, args: dict | None) -> list[TextContent]:
logger.debug(f"Handler: call_tool_impl called with tool_name='{name}', args={args!r}")
if name == "echo":
message_to_echo = args.get("message", "No message provided.") if args else "No message provided."
response_text = f"{ECHO_PREFIX} {message_to_echo}"
logger.info(f"Tool 'echo' responding with: '{response_text}'")
return [TextContent(type="text", text=response_text)]
logger.warning(f"Handler: call_tool_impl received unknown tool name: '{name}'")
# According to MCP spec, should ideally raise an error or return an error in CallToolResult.
# For simplicity, returning empty list for unknown tools.
return []

# --- Resource Definitions ---
@server.list_resources()
async def list_resources_impl() -> list[Resource]:
logger.debug("Handler: list_resources_impl called.")
return [
Resource(
uri="echo://status",
name="Server Status",
description="Provides the current status and prefix of the echo server.",
mimeType="text/plain"
)
]

@server.read_resource()
async def read_resource_impl(uri: str) -> str:
logger.debug(f"Handler: read_resource_impl called with uri='{uri}'")
if str(uri) == "echo://status":
status_text = f"Echo server is operational. Current prefix: {ECHO_PREFIX}"
logger.info(f"Resource 'echo://status' responding with: '{status_text}'")
return status_text
logger.warning(f"Handler: read_resource_impl received unknown resource URI: '{uri}'")
raise ValueError(f"Resource not found: {uri}") # MCP server should handle this and convert to error response

# --- Main Server Logic ---
async def main_echo():
"""Initializes and runs the MCP stdio server."""
logger.info(f"Echo MCP stdio Server starting up (prefix: {ECHO_PREFIX}). PID: {os.getpid()}")
try:
# mcp.server.stdio.stdio_server() provides the binary read/write streams for MCP communication
async with mcp.server.stdio.stdio_server() as (binary_reader, binary_writer):
logger.debug("stdio_server context manager entered. Binary reader/writer obtained.")

# Define initialization options for the client
init_options = InitializationOptions(
server_name="EchoStdioSrvFriendlyName", # Name presented to clients
server_version="0.2.0", # Version of this server
capabilities=server.get_capabilities(
notification_options=NotificationOptions(receive=False, send=False), # Example: no notifications
experimental_capabilities={} # **FIX APPLIED HERE**
)
)
logger.debug(f"Server capabilities defined for client: {init_options.capabilities!r}")

# Start the MCP server loop, listening on the provided streams
await server.run(binary_reader, binary_writer, init_options)

# This line is typically only reached if server.run() exits gracefully (e.g., client disconnects cleanly)
logger.info("MCP server.run() completed.")

except asyncio.CancelledError:
logger.info("main_echo task was cancelled (e.g., during shutdown).")
except Exception as e:
# Catch any unexpected errors during server setup or run
logger.critical(f"CRITICAL ERROR in main_echo: {e!r}", exc_info=True)
# Re-raise to ensure the process exits with an error, signaling failure to the parent.
raise
finally:
logger.info("main_echo function finished or exited.")

if __name__ == "__main__":
# This block executes when the script is run directly (e.g., `python echo_server_stdio.py`)
logger.info(f"Executing echo_server_stdio.py directly. Logging to: {LOG_FILE_PATH}")
try:
asyncio.run(main_echo())
except KeyboardInterrupt:
logger.info("echo_server_stdio.py terminated by user (KeyboardInterrupt).")
except Exception as e_main:
# Catch errors from asyncio.run(main_echo()) itself or unhandled exceptions from main_echo
logger.critical(f"Unhandled exception in echo_server_stdio.py __main__ block: {e_main!r}", exc_info=True)
finally:
logger.info("echo_server_stdio.py process exiting.")
150 changes: 150 additions & 0 deletions examples/MCP/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
# mcp_chatbot_main.py
import asyncio
import logging
import os
import json
from dotenv import load_dotenv

from tframex import TFrameXApp, OpenAIChatLLM, Message # TFrameXRuntimeContext is used internally by app.run_context()
# from tframex.util.llms import BaseLLMWrapper # Not directly needed for this script

load_dotenv()

from tframex.util.logging import setup_logging
setup_logging(level=logging.INFO)
logging.getLogger("tframex.app").setLevel(logging.INFO)
logging.getLogger("tframex.mcp").setLevel(logging.INFO)
logging.getLogger("tframex.mcp.server_connector").setLevel(logging.INFO)
logging.getLogger("tframex.agents.llm_agent").setLevel(logging.INFO) # Set to DEBUG for tool call details
logging.getLogger("tframex.engine").setLevel(logging.INFO)
logging.getLogger("mcp.client").setLevel(logging.WARNING)

logger = logging.getLogger("TFrameX_MCP_Chatbot")

async def main():
logger.info("--- TFrameX with MCP - Interactive Chatbot Example ---")

# 1. Configure LLM
llm_api_key = os.getenv("OPENAI_API_KEY")
llm_api_base = os.getenv("OPENAI_API_BASE")
llm_model_name = os.getenv("OPENAI_MODEL_NAME")

if not all([llm_api_key, llm_api_base, llm_model_name]):
logger.error("LLM configuration missing in .env. Exiting.")
return

default_llm = OpenAIChatLLM(
model_name=llm_model_name,
api_base_url=llm_api_base,
api_key=llm_api_key
)
logger.info(f"Using LLM: {llm_model_name} at {llm_api_base}")

# 2. Initialize TFrameXApp with MCP configuration
app = TFrameXApp(
default_llm=default_llm,
mcp_config_file="servers_config.json"
)

# MCP servers will be initialized when app.run_context() is entered,
# or can be done explicitly here if needed before agent registration (not typical).
# await app.initialize_mcp_servers() # Optional explicit call

# 3. Define a native TFrameX tool
@app.tool(description="Gets the current date and time.")
async def get_current_datetime() -> str:
from datetime import datetime
now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
logger.info(f"NATIVE TOOL: get_current_datetime executed, returning: {now_str}")
return now_str

# 4. Define the UniversalAssistant agent
@app.agent(
name="UniversalAssistant",
system_prompt=(
"/no_think You are UniversalAssistant, a helpful AI. You have access to tools for time, MCP server introspection, and specific MCP server functionalities.\n"
"MCP server tools are prefixed (e.g., 'math_http_service__add', 'echo_stdio_service__echo').\n"
"Use 'tframex_read_mcp_resource' to read MCP resources if you know the server_alias and resource_uri.\n"
"Carefully choose tools and provide arguments. Available tools: {available_tools_descriptions}"
),
tools=[ # Native TFrameX tools + MCP meta-tools
"get_current_datetime",
"tframex_list_mcp_servers",
"tframex_list_mcp_resources",
"tframex_read_mcp_resource",
"tframex_list_mcp_prompts",
"tframex_use_mcp_prompt",
],
mcp_tools_from_servers="ALL" # Agent can use tools from ALL connected MCP servers
)
async def universal_assistant_placeholder():
pass # Logic is handled by LLMAgent base class

# 5. Run the built-in interactive chat with the UniversalAssistant
# TFrameXRuntimeContext is created and managed by app.run_context()
async with app.run_context() as rt: # MCP servers will be initialized here.
logger.info("Starting interactive chat with 'UniversalAssistant'.")
logger.info("MCP Servers should connect now if not already.")
logger.info("Try asking about time, math (e.g., '10 + 5'), or echoing (e.g., 'echo hello from stdio').")
logger.info("You can also ask it to 'list mcp servers' or 'list resources from echo_stdio_service'.")

await rt.interactive_chat(default_agent_name="UniversalAssistant")
# The interactive_chat method in TFrameXRuntimeContext will handle the
# user input loop and calling the specified agent.

# Crucial for stdio MCP servers to terminate properly after chat ends
logger.info("Interactive chat finished. Shutting down MCP servers...")
await app.shutdown_mcp_servers()
logger.info("--- TFrameX MCP Chatbot Example Finished ---")

if __name__ == "__main__":
# Setup dummy/example config files if they don't exist
# Ensure 'echo_server_stdio.py' is in the same directory or adjust paths in 'servers_config.json'
if not os.path.exists("echo_server_stdio.py"):
logger.error("echo_server_stdio.py not found. Please create it or update servers_config.json.")
# For the example to run without manual setup, you might choose to exit or simplify servers_config

if not os.path.exists("servers_config.json"):
logger.warning("servers_config.json not found. Creating a dummy config with only math_http_service.")
dummy_config = {
"mcpServers": {
"math_http_service": {"type": "streamable-http", "url": "http://localhost:8000/mcp/"}
# Add echo_stdio_service here if echo_server_stdio.py exists
# "echo_stdio_service": {
# "type": "stdio",
# "command": "python",
# "args": ["./echo_server_stdio.py"], # Assuming it's in the same dir
# "env": {"ECHO_PREFIX": "[EchoStdioFromChat]"}
# }
}
}
# Check again if echo_server_stdio.py exists before adding to dummy config
if os.path.exists("echo_server_stdio.py") and "echo_stdio_service" not in dummy_config["mcpServers"] :
dummy_config["mcpServers"]["echo_stdio_service"] = {
"type": "stdio",
"command": "python", # Assuming python is in PATH
"args": ["./echo_server_stdio.py"], # Path relative to where this script is run
"env": {"ECHO_PREFIX": "[EchoStdioExample]"}
}
else:
logger.warning("echo_server_stdio.py not found, dummy config will not include it.")

with open("servers_config.json", "w") as f:
json.dump(dummy_config, f, indent=2)
logger.info(f"Created/updated dummy servers_config.json: {dummy_config}")

if not os.path.exists(".env"):
logger.warning(".env file not found. Creating a dummy .env. PLEASE UPDATE IT with your actual LLM details.")
with open(".env", "w") as f:
f.write('OPENAI_API_KEY="your_llm_api_key_here"\n')
f.write('OPENAI_API_BASE="your_llm_api_base_here_e.g.http://localhost:8080/v1"\n')
f.write('OPENAI_MODEL_NAME="your_llm_model_name_here"\n')

try:
asyncio.run(main())
except KeyboardInterrupt:
logger.info("Application terminated by user (KeyboardInterrupt).")
except Exception as e:
logger.critical("Unhandled exception in main asyncio run.", exc_info=True)
finally:
logger.info("Application exiting process.")
26 changes: 26 additions & 0 deletions examples/MCP/servers_config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
{
"mcpServers": {
"math_server": {
"type": "streamable-http",
"url": "http://localhost:8000/mcp/"
},
"sqlite_bi_tool": {
"type": "stdio",
"command": "uvx",
"args": ["mcp-server-sqlite", "--db-path", "./business_data.db"],
"env": {}
},
"blender_service": {
"type": "stdio",
"command": "cmd",
"args": [
"/c",
"uvx",
"blender-mcp"
],
"env": {},
"init_step_timeout": 60.0,
"tool_call_timeout": 120.0
}
}
}
Loading