|
|
import os |
|
|
import json |
|
|
import time |
|
|
import requests |
|
|
from typing import Dict, List, Any, Optional, Union |
|
|
from pathlib import Path |
|
|
|
|
|
class RequestsMCPClient:
    """
    A requests-based implementation of an MCP client with configurable request timeout.

    The client lists tools via ``GET {url}/tools`` and invokes them via
    ``POST {url}/run``, either as a plain JSON request ("http" transport) or
    as a Server-Sent Events stream ("sse" transport).
    """

    # Timeout (seconds) used when listing tools. Discovery deliberately keeps
    # this fixed value instead of the (possibly much longer) per-call timeout.
    TOOLS_TIMEOUT = 60

    class Tool:
        """Callable handle for one tool advertised by the MCP server."""

        def __init__(self, tool_data, parent=None):
            """
            Args:
                tool_data: Mapping describing the tool; the keys 'name',
                    'description' and 'parameters' are read, all optional.
                parent: Client used to execute the tool when it is called.
            """
            self.name = tool_data.get("name", "")
            self.description = tool_data.get("description", "")
            self.parameters = tool_data.get("parameters", {})
            self.parent = parent

        def __call__(self, **kwargs):
            """Run the tool on the parent client with ``kwargs`` as arguments."""
            return self.parent._call_tool(self.name, kwargs)

        def __repr__(self):
            return f"Tool(name='{self.name}')"

    def __init__(self, config: Dict[str, Any]):
        """
        Initialize the MCP client.

        Args:
            config: Dictionary with configuration parameters.
                'url': The URL of the MCP server (default: http://localhost:11434).
                'transport': The transport method ('sse' or 'http', default: 'sse').
                'timeout': Request timeout in seconds for tool calls (default: 60).
        """
        self.url = config.get("url", "http://localhost:11434")
        self.transport = config.get("transport", "sse")
        self.timeout = config.get("timeout", 60)
        self.session = requests.Session()
        self.tools = None

    def get_tools(self) -> List[Any]:
        """
        Get available tools from the MCP server.

        Returns:
            A list of callable ``Tool`` objects, also stored on ``self.tools``.
            On a timeout, request failure, undecodable body or unexpected
            payload shape, a message is printed and an empty list is returned
            (``self.tools`` is left untouched in that case).
        """
        try:
            response = self.session.get(
                f"{self.url}/tools", timeout=self.TOOLS_TIMEOUT
            )
            response.raise_for_status()
            tools_data = response.json()
        except requests.exceptions.Timeout:
            print(f"Timeout when fetching tools from {self.url}")
            return []
        except requests.RequestException as e:
            print(f"Error getting tools: {str(e)}")
            return []
        except ValueError as e:
            # Older requests releases raise a plain ValueError for a body that
            # is not valid JSON; newer ones raise a RequestException subclass.
            print(f"Error getting tools: {str(e)}")
            return []

        if not isinstance(tools_data, list):
            print(
                "Error getting tools: expected a JSON list, got "
                f"{type(tools_data).__name__}"
            )
            return []

        tools = []
        for entry in tools_data:
            if not isinstance(entry, dict):
                # A non-object entry has no name/description to read.
                print(f"Skipping malformed tool entry: {entry!r}")
                continue
            tools.append(self.Tool(entry, parent=self))
        self.tools = tools
        return self.tools

    def _call_tool(self, tool_name: str, arguments: Dict[str, Any]) -> Any:
        """
        Call a tool on the MCP server using the configured transport.

        Args:
            tool_name: Name of the tool to run.
            arguments: Keyword arguments forwarded to the tool.

        Returns:
            The tool result, or a string starting with "Error:" on failure.
        """
        payload = {
            "name": tool_name,
            "arguments": arguments,
        }
        # Any transport value other than "sse" (case-insensitive) uses HTTP.
        if self.transport.lower() == "sse":
            return self._call_tool_sse(payload)
        return self._call_tool_http(payload)

    def _call_tool_http(self, payload: Dict[str, Any]) -> Any:
        """
        Call a tool using HTTP transport.

        Returns:
            The "result" field of the JSON response (None when absent), or a
            string starting with "Error:" on timeout, request failure or an
            undecodable response body.
        """
        try:
            print(f"Making HTTP request to {self.url}/run with timeout {self.timeout}s")
            response = self.session.post(
                f"{self.url}/run",
                json=payload,
                timeout=self.timeout,
            )
            response.raise_for_status()
            return response.json().get("result")
        except requests.exceptions.Timeout:
            return f"Error: Request timed out after {self.timeout} seconds"
        except requests.RequestException as e:
            print(f"Error calling tool via HTTP: {str(e)}")
            return f"Error: {str(e)}"
        except ValueError as e:
            # Plain ValueError from response.json() on older requests releases.
            print(f"Error calling tool via HTTP: {str(e)}")
            return f"Error: {str(e)}"

    @staticmethod
    def _parse_sse_lines(lines) -> str:
        """
        Fold an iterable of decoded SSE lines into the final result text.

        Rules, applied line by line:
          * empty lines are ignored;
          * ``data:`` lines holding a JSON object contribute their "result"
            field, or end the stream with ``"Error: <error>"`` when an
            "error" field is present;
          * ``data:`` lines that are not a JSON object (plain text, or JSON
            scalars/arrays such as ``42``) are appended verbatim;
          * an ``event:`` line containing "done" stops processing.

        Args:
            lines: Iterable of ``str`` lines from the event stream.

        Returns:
            The concatenated result text, or an "Error: ..." string.
        """
        chunks = []
        for line in lines:
            if not line:
                continue
            if line.startswith("data:"):
                data = line[5:].strip()
                try:
                    message = json.loads(data)
                except json.JSONDecodeError:
                    chunks.append(data)
                    continue
                if not isinstance(message, dict):
                    # Text that merely happens to be valid JSON ("42", "true")
                    # is a plain chunk, not a structured message.
                    chunks.append(data)
                    continue
                if "error" in message:
                    return f"Error: {message['error']}"
                if "result" in message:
                    value = message["result"]
                    if isinstance(value, str):
                        chunks.append(value)
                    elif value is not None:
                        # Non-string results are serialized so they can be
                        # joined with the other text chunks.
                        chunks.append(json.dumps(value))
            elif line.startswith("event:") and "done" in line:
                break
        return "".join(chunks)

    def _call_tool_sse(self, payload: Dict[str, Any]) -> Any:
        """
        Call a tool using Server-Sent Events (SSE) transport.

        Returns:
            The concatenated result text of the stream, or a string starting
            with "Error:" on a server-reported error, timeout or request
            failure.
        """
        try:
            print(f"Making SSE request to {self.url}/run with timeout {self.timeout}s")
            response = self.session.post(
                f"{self.url}/run",
                json=payload,
                stream=True,
                headers={"Accept": "text/event-stream"},
                timeout=self.timeout,
            )
            try:
                response.raise_for_status()
                # Event streams are UTF-8; forcing it also guarantees that
                # iter_lines yields str rather than bytes.
                response.encoding = "utf-8"
                return self._parse_sse_lines(
                    response.iter_lines(decode_unicode=True)
                )
            finally:
                # A streamed response is only released back to the pool once
                # it is fully consumed or closed; we may stop reading early.
                response.close()
        except requests.exceptions.Timeout:
            return f"Error: Request timed out after {self.timeout} seconds"
        except requests.RequestException as e:
            print(f"Error calling tool via SSE: {str(e)}")
            return f"Error: {str(e)}"

    def disconnect(self):
        """Close the session."""
        if self.session:
            self.session.close()
            print("MCP client session closed.")
|
|
|
|
|
def query_mcp(prompt, mcp_server_url="http://localhost:11434", model="codellama",
              transport="sse", timeout=36):
    """
    Send a prompt to a model through an MCP server and return its answer.

    The function lists the server's tools, picks the first one whose name
    contains "predict", else "generate", else "complete" (falling back to the
    first listed tool), and calls it with the model name and the prompt.

    Args:
        prompt (str): The text prompt to send to the model
        mcp_server_url (str): URL of the MCP server
        model (str): Model name to use
        transport (str): Transport method ('http' or 'sse')
        timeout (int): Request timeout in seconds (default: 36)

    Returns:
        str: The model's response, or an error message when no tools are
        available or any exception is raised along the way.
    """
    mcp_client = None
    try:
        client_config = {
            "url": mcp_server_url,
            "transport": transport,
            "timeout": timeout,
        }
        mcp_client = RequestsMCPClient(client_config)
        print(f"Connecting to MCP server at {mcp_server_url} with {timeout}s timeout")

        tools = mcp_client.get_tools()
        if not tools:
            return f"Error: No tools available from the MCP server at {mcp_server_url}"

        def first_named(keyword):
            # First tool whose lower-cased name contains the keyword, if any.
            for candidate in tools:
                if keyword in candidate.name.lower():
                    return candidate
            return None

        # All three lookups run up front; the list order is the preference order.
        preferred = [first_named(word) for word in ("predict", "generate", "complete")]
        tool = next((match for match in preferred if match), tools[0])
        print(f"Using MCP tool: {tool.name}")

        start_time = time.time()
        print(f"Starting tool call at {time.strftime('%H:%M:%S')}")
        result = tool(model=model, prompt=prompt)
        elapsed = time.time() - start_time
        print(f"Tool call completed in {elapsed:.2f} seconds")
        return result
    except Exception as exc:
        # Top-level boundary: report the failure as a string instead of raising.
        error_message = f"Error using MCP client: {str(exc)}"
        print(error_message)
        import traceback
        traceback.print_exc()
        return error_message
    finally:
        # Always release the session, even when returning early or on error.
        if mcp_client:
            try:
                mcp_client.disconnect()
                print("MCP client disconnected")
            except Exception as close_error:
                print(f"Error disconnecting MCP client: {close_error}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|