Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added mcp_examples/__init__.py
Empty file.
15 changes: 15 additions & 0 deletions mcp_examples/workflow_nexus_transport/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
This sample demonstrates how to make MCP calls from a workflow, using the MCP Python SDK. The MCP
server can be a standard `stdio` server, or it can be implemented as a Temporal workflow, giving
rise to durable client sessions. The example uses an MCP server with stateful sessions: the
[sequentialthinking](https://github.com/modelcontextprotocol/servers/tree/main/src/sequentialthinking)
reference MCP server, optionally translated as a Python Temporal workflow.

```
temporal operator nexus endpoint create \
--target-namespace default \
--name mcp-sequential-thinking-nexus-endpoint \
--target-task-queue mcp-sequential-thinking-task-queue

uv sync --group=mcp
uv run mcp_examples/workflow_nexus_transport/app.py
```
121 changes: 121 additions & 0 deletions mcp_examples/workflow_nexus_transport/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
import asyncio
import json
import uuid
from enum import Enum
from typing import cast

import typer
from mcp import ClientSession, StdioServerParameters
from mcp.types import TextContent
from temporalio import workflow
from temporalio.client import Client
from temporalio.contrib.pydantic import pydantic_data_converter
from temporalio.worker import UnsandboxedWorkflowRunner, Worker

from mcp_examples.workflow_nexus_transport.mcp_server_nexus_service import (
MCPServerInput,
MCPServerNexusServiceHandler,
MCPServiceWorkflowBase,
)
from mcp_examples.workflow_nexus_transport.stdio_mcp_server.activity import (
run_stdio_mcp_server,
)
from mcp_examples.workflow_nexus_transport.stdio_mcp_server.workflow import (
MCPStdioClientSessionWorkflow,
)
from mcp_examples.workflow_nexus_transport.workflow_mcp_server.workflow import (
SequentialThinkingMCPServerWorkflow,
)
from mcp_examples.workflow_nexus_transport.workflow_transport import WorkflowTransport

app = typer.Typer()


@workflow.defn
class MCPCallerWorkflow:
@workflow.run
async def run(self, input: MCPServerInput):
transport = WorkflowTransport(
endpoint="mcp-sequential-thinking-nexus-endpoint", input=input
)

async with transport.connect() as (read_stream, write_stream):
async with ClientSession(read_stream, write_stream) as session:
await session.initialize()
tools = await session.list_tools()
print(f"Available tools: {[tool.name for tool in tools.tools]}")

tool_input = {
"thought": "To solve a complex problem, I need to break it down into steps",
"thoughtNumber": 1,
"totalThoughts": 3,
"nextThoughtNeeded": True,
}

print(f"\nInput: {json.dumps(tool_input, indent=2)}")

result = await session.call_tool("sequentialthinking", tool_input)
content = cast(list[TextContent], result.content)

print(f"\nOutput: {content[0]}")


class MCPServerType(str, Enum):
stdio = "stdio"
workflow = "workflow"


async def run_caller_workflow(
mcp_server_workflow_cls: type[MCPServiceWorkflowBase],
mcp_server_params: StdioServerParameters | None,
):
mcp_server_input = MCPServerInput(
workflow_name=mcp_server_workflow_cls.__name__,
stdio_server_params=mcp_server_params,
)

client = await Client.connect(
"localhost:7233",
data_converter=pydantic_data_converter,
)
async with Worker(
client,
task_queue="mcp-sequential-thinking-task-queue",
workflows=[MCPCallerWorkflow, mcp_server_workflow_cls],
activities=[run_stdio_mcp_server],
nexus_service_handlers=[MCPServerNexusServiceHandler()],
workflow_runner=UnsandboxedWorkflowRunner(),
) as worker:
await client.execute_workflow(
MCPCallerWorkflow.run,
mcp_server_input,
id=str(uuid.uuid4()),
task_queue=worker.task_queue,
)


@app.command()
def main(
mcp_server_type: MCPServerType = typer.Option(
MCPServerType.stdio,
"--mcp-server-type",
help="MCP server type to use: 'stdio' for the official stdio server or 'workflow' for the MCP server implemented as a Temporal workflow",
),
):
match mcp_server_type:
case MCPServerType.stdio:
workflow_cls = MCPStdioClientSessionWorkflow
server_params = StdioServerParameters(
command="npx",
args=["-y", "@modelcontextprotocol/server-sequential-thinking"],
)

case MCPServerType.workflow:
workflow_cls = SequentialThinkingMCPServerWorkflow
server_params = None

asyncio.run(run_caller_workflow(workflow_cls, server_params))


if __name__ == "__main__":
app()
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
"""
A Nexus service that presents the interface of an MCP server.
It is backed by a Temporal workflow.
"""

import uuid
from abc import ABC, abstractmethod
from dataclasses import dataclass

import nexusrpc
from mcp import StdioServerParameters
from mcp.types import (
CallToolRequest,
CallToolResult,
ListToolsRequest,
ListToolsResult,
)
from temporalio import nexus, workflow


class MCPServiceWorkflowBase(ABC):
@workflow.update
@abstractmethod
async def list_tools(self, request: ListToolsRequest) -> ListToolsResult: ...

@workflow.update
@abstractmethod
async def call_tool(self, request: CallToolRequest) -> CallToolResult: ...


@dataclass
class MCPServerInput:
workflow_name: str
stdio_server_params: StdioServerParameters | None


@dataclass
class ListToolsInput:
session_token: str
request: ListToolsRequest


@dataclass
class CallToolInput:
session_token: str
request: CallToolRequest


@nexusrpc.service
class MCPServerNexusService:
start: nexusrpc.Operation[MCPServerInput, None]
list_tools: nexusrpc.Operation[ListToolsInput, ListToolsResult]
call_tool: nexusrpc.Operation[CallToolInput, CallToolResult]


@nexusrpc.handler.service_handler(service=MCPServerNexusService)
class MCPServerNexusServiceHandler:
@nexus.workflow_run_operation
async def start(
self, ctx: nexus.WorkflowRunOperationContext, input: MCPServerInput
) -> nexus.WorkflowHandle[None]:
return await ctx.start_workflow(
input.workflow_name,
input, # TODO: workflow shouldn't be passed its own name
id=str(uuid.uuid4()),
task_queue="mcp-sequential-thinking-task-queue",
)

@nexusrpc.handler.sync_operation
async def call_tool(
self, ctx: nexusrpc.handler.StartOperationContext, input: CallToolInput
) -> CallToolResult:
workflow_handle = nexus.WorkflowHandle.from_token(
input.session_token
)._to_client_workflow_handle(nexus.client())
return await workflow_handle.execute_update("call_tool", input.request)

@nexusrpc.handler.sync_operation
async def list_tools(
self, ctx: nexusrpc.handler.StartOperationContext, input: ListToolsInput
) -> ListToolsResult:
workflow_handle = nexus.WorkflowHandle.from_token(
input.session_token
)._to_client_workflow_handle(nexus.client())
return await workflow_handle.execute_update("list_tools", input.request)
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from mcp import ClientSession, ListToolsResult, StdioServerParameters
from mcp.client.stdio import stdio_client
from mcp.types import CallToolRequest, CallToolResult, ListToolsRequest
from temporalio import activity
from temporalio.worker import Worker


@activity.defn
async def run_stdio_mcp_server(params: StdioServerParameters) -> None:
async with stdio_client(params) as (read_stream, write_stream):
async with ClientSession(read_stream, write_stream) as session:
await session.initialize()

@activity.defn(name="list-tools")
async def list_tools(request: ListToolsRequest) -> ListToolsResult:
print(f"🟢 list_tools({request})")
return await session.list_tools()

@activity.defn(name="call-tool")
async def call_tool(request: CallToolRequest) -> CallToolResult:
print(f"🟢 call_tool({request})")
return await session.call_tool(
request.params.name, request.params.arguments
)

worker = Worker(
activity.client(),
task_queue="activity-specific-task-queue",
activities=[list_tools, call_tool],
)
await worker.run()
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
from datetime import timedelta

from mcp.types import (
CallToolRequest,
CallToolResult,
ListToolsRequest,
ListToolsResult,
)
from temporalio import workflow

from mcp_examples.workflow_nexus_transport.mcp_server_nexus_service import (
MCPServerInput,
MCPServiceWorkflowBase,
)

with workflow.unsafe.imports_passed_through():
from mcp_examples.workflow_nexus_transport.stdio_mcp_server.activity import (
run_stdio_mcp_server,
)


@workflow.defn
class MCPStdioClientSessionWorkflow(MCPServiceWorkflowBase):
"""A workflow that acts as an MCP client session, handling tool listing and execution."""

@workflow.run
async def run(self, input: MCPServerInput) -> None:
assert input.stdio_server_params
await workflow.execute_activity(
run_stdio_mcp_server,
input.stdio_server_params,
start_to_close_timeout=timedelta(days=999),
)

@workflow.update
async def list_tools(self, request: ListToolsRequest) -> ListToolsResult:
return await workflow.execute_activity(
"list-tools",
args=[request],
result_type=ListToolsResult,
task_queue="activity-specific-task-queue",
schedule_to_close_timeout=timedelta(seconds=10),
)

@workflow.update
async def call_tool(self, request: CallToolRequest) -> CallToolResult:
return await workflow.execute_activity(
"call-tool",
args=[request],
result_type=CallToolResult,
task_queue="activity-specific-task-queue",
schedule_to_close_timeout=timedelta(seconds=10),
)
Loading