-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathserver_core.py
More file actions
261 lines (209 loc) · 8.84 KB
/
server_core.py
File metadata and controls
261 lines (209 loc) · 8.84 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
"""
MCP Server implementation sub-module.
Provides MCPServer class for JSON-RPC 2.0 request handling,
and factory functions for server creation and management.
Extracted from mcp.py for maintainability.
"""
import json
import logging
import sys
from typing import Any, Callable, Dict, Optional
logger = logging.getLogger("mcp")
# Import exceptions
from .exceptions import (
MCPError,
MCPInvalidParamsError,
)
# Import core MCP class and helpers
from .mcp import MCP, get_mcp_instance, initialize
class MCPServer:
"""
MCP Server implementation for handling JSON-RPC requests.
This class provides a server implementation that can handle
MCP protocol requests and responses.
"""
def __init__(self, mcp_instance: Optional[MCP] = None):
"""
Initialize the MCP server.
Args:
mcp_instance: MCP instance to use for tool execution
"""
self.mcp = mcp_instance or get_mcp_instance()
self.running = False
self.request_handlers = {
"tools/list": self._handle_tools_list,
"tools/call": self._handle_tools_call,
"resources/list": self._handle_resources_list,
"resources/read": self._handle_resources_read,
"initialize": self._handle_initialize,
"notifications/initialized": self._handle_initialized,
"shutdown": self._handle_shutdown,
"exit": self._handle_exit
}
def start(self) -> bool:
"""Start the MCP server."""
if self.running:
logger.warning("MCP server is already running")
return False
self.running = True
logger.info("MCP server started")
return True
def register_tool(self, name: str, func: Callable, schema: Dict[str, Any], description: str) -> bool:
"""
Register a tool with the server.
Args:
name: Name of the tool
func: Function to execute
schema: JSON schema for arguments
description: Description of the tool
Returns:
True if the tool is present in the registry after the call,
False if registration raised an exception.
"""
try:
self.mcp.register_tool(name, func, schema, description)
except Exception as e:
logger.error(f"register_tool failed for {name}: {e}")
return False
return name in self.mcp.tools
def stop(self) -> bool:
"""Stop the MCP server."""
if not self.running:
logger.warning("MCP server is not running")
return False
self.running = False
logger.info("MCP server stopped")
return True
def handle_request(self, request: Dict[str, Any]) -> Dict[str, Any]:
"""
Handle an incoming JSON-RPC request.
Args:
request: JSON-RPC request dictionary
Returns:
JSON-RPC response dictionary
"""
try:
if not isinstance(request, dict):
return self._create_error_response(-32700, "Parse error", "Invalid JSON")
if "jsonrpc" not in request or request["jsonrpc"] != "2.0":
return self._create_error_response(-32600, "Invalid Request", "Missing or invalid jsonrpc field")
if "method" not in request:
return self._create_error_response(-32600, "Invalid Request", "Missing method field")
method = request["method"]
params = request.get("params", {})
request_id = request.get("id")
if method in self.request_handlers:
result = self.request_handlers[method](params)
return self._create_success_response(result, request_id)
else:
if method in self.mcp.tools:
result = self.mcp.execute_tool(method, params)
return self._create_success_response(result, request_id)
else:
return self._create_error_response(-32601, "Method not found", f"Method '{method}' not found", request_id)
except MCPError as e:
return self._create_error_response(e.code, type(e).__name__, str(e), request.get("id"))
except Exception as e:
logger.error(f"Unexpected error handling request: {e}")
return self._create_error_response(-32603, "Internal error", str(e), request.get("id"))
def _handle_initialize(self, params: Dict[str, Any]) -> Dict[str, Any]:
"""Handle initialize request."""
return {
"protocolVersion": "2024-11-05",
"capabilities": self.mcp.get_capabilities(),
"serverInfo": {
"name": "GNN MCP Server",
"version": "1.0.0"
}
}
def _handle_initialized(self, params: Dict[str, Any]) -> Dict[str, Any]:
"""Handle initialized notification."""
return {}
def _handle_tools_list(self, params: Dict[str, Any]) -> Dict[str, Any]:
"""Handle tools/list request."""
return {"tools": self.mcp.get_capabilities()["tools"]}
def _handle_tools_call(self, params: Dict[str, Any]) -> Dict[str, Any]:
"""Handle tools/call request."""
tool_name = params.get("name")
tool_params = params.get("arguments", {})
if not tool_name:
raise MCPInvalidParamsError("Tool name is required")
result = self.mcp.execute_tool(tool_name, tool_params)
return {"content": [{"type": "text", "text": json.dumps(result, indent=2)}]}
def _handle_resources_list(self, params: Dict[str, Any]) -> Dict[str, Any]:
"""Handle resources/list request."""
return {"resources": self.mcp.get_capabilities()["resources"]}
def _handle_resources_read(self, params: Dict[str, Any]) -> Dict[str, Any]:
"""Handle resources/read request."""
uri = params.get("uri")
if not uri:
raise MCPInvalidParamsError("Resource URI is required")
result = self.mcp.get_resource(uri)
return {"contents": [{"uri": uri, "mimeType": result["mime_type"], "text": json.dumps(result["content"])}]}
def _handle_shutdown(self, params: Dict[str, Any]) -> Dict[str, Any]:
"""Handle shutdown request."""
self.stop()
return {}
def _handle_exit(self, params: Dict[str, Any]) -> Dict[str, Any]:
"""Handle exit request."""
self.stop()
return {}
def _create_success_response(self, result: Any, request_id: Any) -> Dict[str, Any]:
"""Create a successful JSON-RPC response."""
response = {
"jsonrpc": "2.0",
"result": result
}
if request_id is not None:
response["id"] = request_id
return response
def _create_error_response(self, code: int, message: str, data: Any = None, request_id: Any = None) -> Dict[str, Any]:
"""Create an error JSON-RPC response."""
response = {
"jsonrpc": "2.0",
"error": {
"code": code,
"message": message
}
}
if data is not None:
response["error"]["data"] = data
if request_id is not None:
response["id"] = request_id
return response
def create_mcp_server(mcp_instance: Optional[MCP] = None) -> MCPServer:
"""Create a new MCP server instance."""
return MCPServer(mcp_instance)
def start_mcp_server(mcp_instance: Optional[MCP] = None) -> bool:
"""Start the MCP server."""
server = create_mcp_server(mcp_instance)
return server.start()
def register_tools(mcp: Optional[MCP] = None) -> bool:
"""
Discover all pipeline modules' ``mcp.py`` files and call their
``register_tools`` hooks against the supplied MCP instance (or the global
singleton). Returns True only when every discovered module registered
without error.
This is a thin wrapper around :meth:`MCP.discover_modules` that is
convenient for tests and scripts that already hold an MCP instance they
want to populate. Prior to 2026-04 this function was a no-op stub that
silently returned ``True`` regardless of whether any tools were actually
registered.
"""
try:
target = mcp if mcp is not None else get_mcp_instance()
return bool(target.discover_modules())
except Exception as e:
logger.error(f"Failed to register tools: {e}")
return False
if __name__ == "__main__":
try:
mcp, sdk_found, all_modules_loaded = initialize()
capabilities = mcp.get_capabilities()
print(f"Available tools: {len(capabilities['tools'])}")
print(f"Available resources: {len(capabilities['resources'])}")
status = mcp.get_server_status()
print(f"Server uptime: {status['uptime_formatted']}")
except Exception as e:
logger.error(f"Error in MCP initialization: {e}")
sys.exit(1)