-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathmcp.py
More file actions
234 lines (201 loc) · 8.5 KB
/
mcp.py
File metadata and controls
234 lines (201 loc) · 8.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
#!/usr/bin/env python3
"""
MCP API module for GNN pipeline job management.
This module integrates the GNN pipeline API capabilities with the MCP
(Multi-Agent Communication Protocol) framework, allowing AI assistants
to interact with the GNN pipeline for submitting, monitoring, and
managing processing jobs.
"""
import json
import logging
from pathlib import Path
from typing import Any, Dict
from .processor import PIPELINE_STEPS, cancel_job, create_job, get_job, list_jobs
logger = logging.getLogger(__name__)
# Basic module metadata
__version__ = "1.0.0"
__description__ = "API module MCP integration for GNN pipeline job management."
__dependencies__ = []
def gnn_submit_job_mcp(target_dir: str, steps: list = None, skip_steps: list = None, verbose: bool = False, strict: bool = False) -> Dict[str, Any]:
"""Submit a GNN pipeline processing job via MCP."""
try:
# Enforce path boundary: target_dir must exist and stay within repo root
target_path = Path(target_dir)
if not target_path.exists():
repo_root = Path(__file__).parent.parent.parent
target_path = repo_root / target_dir
if not target_path.exists():
return {"status": "error", "message": f"Target directory not found: {target_dir}"}
repo_root = Path(__file__).parent.parent.parent.resolve()
try:
target_path.resolve().relative_to(repo_root)
except ValueError:
return {"status": "error", "message": f"Target directory must be within the repository root: {target_dir}"}
job_id = create_job(target_dir=str(target_path), steps=steps, skip_steps=skip_steps, verbose=verbose, strict=strict)
# We need to trigger async execution somehow, but we are in a sync wrapper.
# Since we use an external process invocation in create_job_async,
# we can use subprocess directly here to initiate it optionally, or
# rely on the API server running. We will return the job_id and instructions.
# Alternatively, we just return the job_id. The user can start the server.
return {"status": "success", "job_id": job_id, "message": "Job created. Note: async execution requires the API server to be running."}
except Exception as e:
logger.error(f"Failed to submit job via MCP: {e}")
return {"status": "error", "message": str(e)}
def gnn_get_job_status_mcp(job_id: str) -> Dict[str, Any]:
"""Retrieve the status of a GNN pipeline job via MCP."""
try:
job = get_job(job_id)
if job:
return {"status": "success", "job": job}
return {"status": "error", "message": f"Job {job_id} not found."}
except Exception as e:
logger.error(f"Failed to get job status via MCP: {e}")
return {"status": "error", "message": str(e)}
def gnn_cancel_job_mcp(job_id: str) -> Dict[str, Any]:
"""Cancel a GNN pipeline job via MCP."""
try:
success = cancel_job(job_id)
if success:
return {"status": "success", "message": f"Job {job_id} cancelled successfully."}
return {"status": "error", "message": f"Failed to cancel job {job_id}. It may not exist or is already terminal."}
except Exception as e:
logger.error(f"Failed to cancel job via MCP: {e}")
return {"status": "error", "message": str(e)}
def gnn_list_jobs_mcp(limit: int = 50) -> Dict[str, Any]:
"""List recent GNN pipeline jobs via MCP."""
try:
jobs = list_jobs(limit=limit)
return {"status": "success", "jobs": jobs, "total": len(jobs)}
except Exception as e:
logger.error(f"Failed to list jobs via MCP: {e}")
return {"status": "error", "message": str(e)}
def gnn_get_pipeline_tools_mcp() -> Dict[str, Any]:
"""List available pipeline steps via MCP."""
try:
tools = [{"step_number": step, "name": name, "description": desc} for step, (name, desc) in PIPELINE_STEPS.items()]
return {"status": "success", "tools": tools}
except Exception as e:
logger.error(f"Failed to list pipeline tools via MCP: {e}")
return {"status": "error", "message": str(e)}
def register_tools(mcp_instance) -> None:
"""Register API domain tools with the MCP server."""
mcp_instance.register_tool(
"gnn_submit_job",
gnn_submit_job_mcp,
{
"type": "object",
"properties": {
"target_dir": {"type": "string", "description": "Target directory containing GNN files"},
"steps": {"type": "array", "items": {"type": "integer"}, "description": "Specific steps to run (optional)"},
"skip_steps": {"type": "array", "items": {"type": "integer"}, "description": "Steps to skip (optional)"},
"verbose": {"type": "boolean", "default": False},
"strict": {"type": "boolean", "default": False}
},
"required": ["target_dir"]
},
"Submit a GNN pipeline processing job.",
module=__package__, category="api",
)
mcp_instance.register_tool(
"gnn_get_job_status",
gnn_get_job_status_mcp,
{
"type": "object",
"properties": {
"job_id": {"type": "string", "description": "The ID of the job to query"}
},
"required": ["job_id"]
},
"Retrieve the status of a GNN pipeline job.",
module=__package__, category="api",
)
mcp_instance.register_tool(
"gnn_cancel_job",
gnn_cancel_job_mcp,
{
"type": "object",
"properties": {
"job_id": {"type": "string", "description": "The ID of the job to cancel"}
},
"required": ["job_id"]
},
"Cancel a GNN pipeline job.",
module=__package__, category="api",
)
mcp_instance.register_tool(
"gnn_list_jobs",
gnn_list_jobs_mcp,
{
"type": "object",
"properties": {
"limit": {"type": "integer", "default": 50, "description": "Maximum number of jobs to return"}
}
},
"List recent GNN pipeline jobs.",
module=__package__, category="api",
)
mcp_instance.register_tool(
"gnn_get_pipeline_tools",
gnn_get_pipeline_tools_mcp,
{
"type": "object",
"properties": {}
},
"List available pipeline steps.",
module=__package__, category="api",
)
logger.info("api module MCP tools registered.")
# We don't delete save_mcp_manifest and register_mcp_tools completely in case
# other scripts rely on them.
MCP_TOOLS = [
{
"name": "gnn_submit_job",
"description": "Submit a GNN pipeline processing job. Accepts a target directory and optional step selection. Returns a job ID for status polling.",
"inputSchema": {
"type": "object",
"properties": {
"target_dir": {"type": "string", "description": "Directory containing GNN files", "default": "input/gnn_files"},
"steps": {"type": "array", "items": {"type": "integer"}, "description": "Steps to run (omit for all steps)"},
"verbose": {"type": "boolean", "default": False}
},
"required": []
}
},
{
"name": "gnn_job_status",
"description": "Check the status of a submitted GNN pipeline job.",
"inputSchema": {
"type": "object",
"properties": {
"job_id": {"type": "string", "description": "Job ID from gnn_submit_job"}
},
"required": ["job_id"]
}
},
{
"name": "gnn_list_tools",
"description": "List all available GNN pipeline steps and their descriptions.",
"inputSchema": {"type": "object", "properties": {}}
}
]
def register_mcp_tools() -> Dict[str, Any]:
"""Return MCP tool registration manifest."""
return {
"module": "api",
"tools": MCP_TOOLS,
"endpoint": "http://localhost:8000/api/v1",
"version": "1.0.0"
}
def save_mcp_manifest(output_dir: Path) -> bool:
"""Save MCP tool manifest to output directory."""
try:
output_dir.mkdir(parents=True, exist_ok=True)
manifest = register_mcp_tools()
manifest_path = output_dir / "api_mcp_manifest.json"
with open(manifest_path, 'w') as f:
json.dump(manifest, f, indent=2)
logger.info(f"MCP manifest saved to {manifest_path}")
return True
except Exception as e:
logger.error(f"Failed to save MCP manifest: {e}")
return False