"""
Server management utilities for Universal MCP Client

Version 2.0 - Enhanced with server enabling/disabling support
"""

import asyncio
import html
import logging
import re
import traceback
from typing import Tuple

from config import MCPServerConfig
from mcp_client import UniversalMCPClient

# Module-level logger, named after this module so the application's logging
# configuration can filter or route server-management messages separately.
logger = logging.getLogger(__name__)


class ServerManager:
    """UI-facing wrapper for adding, removing, toggling and listing MCP servers.

    Every public method delegates the real work to the wrapped ``mcp_client``
    and returns display-ready values: a short status message plus, where
    relevant, an HTML fragment built from ``<details>`` accordions.

    All externally supplied text (server names, space names, server messages,
    config fields) is HTML-escaped before being placed into a fragment.

    NOTE(review): several message prefixes reached review as mis-decoded
    emoji. Glyphs whose damaged form was unambiguous were restored
    ("✅", "🟢", "🔧", "ℹ️"). The bare "β" / "π" prefixes could not be decoded
    with certainty and are kept verbatim -- confirm them against the upstream
    file.
    """

    def __init__(self, mcp_client: "UniversalMCPClient"):
        """Store the client that owns the actual server connections.

        Args:
            mcp_client: Client object; this class uses its ``servers`` mapping
                and its ``add_server_async``, ``remove_server``,
                ``remove_all_servers``, ``enable_server`` and
                ``get_server_list_with_status`` methods.
        """
        self.mcp_client = mcp_client

    def convert_hf_space_to_url(self, space_name: str) -> str:
        """Convert a HuggingFace space name to its ``*.hf.space`` URL.

        HuggingFace URL rules applied here:
        - Replace "/" with "-"
        - Convert to lowercase
        - Replace dots and other special chars with "-"
        - Remove consecutive hyphens (and leading/trailing ones)

        Example: ``"User/My.Space_v2"`` -> ``"https://user-my-space-v2.hf.space"``.

        Args:
            space_name: Space identifier in ``username/space-name`` form.

        Returns:
            The ``https://<slug>.hf.space`` base URL.

        Raises:
            ValueError: If there is no "/" or if either side of the first "/"
                contains no letter or digit (which would yield a wrong or
                empty host name).
        """
        owner, _, space = space_name.partition("/")
        # Slugging each side separately gives the same result as slugging the
        # whole string (the "/" boundary collapses to one hyphen either way)
        # while letting us reject an empty owner or space part.
        owner_slug, space_slug = (
            re.sub(r"[^a-z0-9]+", "-", part.lower()).strip("-")
            for part in (owner, space)
        )
        if not owner_slug or not space_slug:
            raise ValueError("Space name should be in format: username/space-name")
        return f"https://{owner_slug}-{space_slug}.hf.space"

    def add_custom_server(self, name: str, space_name: str) -> Tuple[str, str]:
        """Add a custom MCP server from a HuggingFace space name.

        Args:
            name: Display name for the server; surrounding whitespace is ignored.
            space_name: Space identifier in ``username/space-name`` form.

        Returns:
            ``(status_message, details_html)``. ``details_html`` is an empty
            string when the input is rejected or an exception occurs while
            connecting; otherwise it is a ``<details>`` fragment describing
            the successful or failed connection.
        """
        # Normalise before validating so whitespace-only input is rejected
        # and the same cleaned name is used for the config and the HTML.
        name = (name or "").strip()
        space_name = (space_name or "").strip()
        if not name or not space_name:
            return "β Please provide both server name and space name", ""

        logger.info(f"β Adding MCP server: {name} from space: {space_name}")

        try:
            mcp_url = self.convert_hf_space_to_url(space_name)
        except ValueError as e:
            return f"β {str(e)}", ""
        logger.info(f"π Converted {space_name} β {mcp_url}")

        config = MCPServerConfig(
            name=name,
            url=mcp_url,
            description=f"MCP server from HuggingFace space: {space_name}",
            space_id=space_name,
        )

        # Boundary with the UI: any failure while connecting or rendering is
        # reported as a message instead of propagating.
        try:
            success, message = self._run_blocking(
                self.mcp_client.add_server_async(config)
            )
            logger.info(f"Server addition result: {success} - {message}")

            if success:
                details_html = self._render_added_html(
                    name, space_name, mcp_url, message
                )
                return "✅ Server added successfully!", details_html

            error_html = self._render_failed_html(name, message)
            return f"β Failed to add server: {name}", error_html
        except Exception as e:
            error_msg = f"β Failed to add server: {str(e)}"
            # logger.exception records the message together with the traceback.
            logger.exception(error_msg)
            return error_msg, ""

    def get_server_status(self) -> Tuple[str, str]:
        """Get the status of all servers in accordion format.

        Returns:
            ``(summary_markdown, accordion_html)``. The summary carries the
            server count; the HTML holds one ``<details>`` section per server,
            a placeholder paragraph when no server is configured, or a red
            error paragraph if building the status fails.
        """
        try:
            servers = self.mcp_client.servers
            server_count = len(servers)
            logger.info(f"π Getting server status, found {server_count} servers")

            server_status_text = f"**Total MCP Servers**: {server_count}"

            if not servers:
                logger.info("π No servers found")
                return server_status_text, "<p><em>No MCP servers configured yet.</em></p>"

            sections = []
            for name, config in servers.items():
                logger.info(f"π Processing server: {name}")
                sections.append(self._render_status_section(name, config))
            accordion_html = "".join(sections)

            logger.info(f"π Generated accordion HTML for {server_count} servers")
            return server_status_text, accordion_html
        except Exception as e:
            error_msg = f"Error getting server status: {str(e)}"
            logger.exception(f"β {error_msg}")
            return (
                "**Total MCP Servers**: Error",
                f"<p style='color: red;'>β {html.escape(error_msg)}</p>",
            )

    def remove_server(self, server_name: str) -> Tuple[str, str]:
        """Remove a specific server.

        Returns:
            ``(status_message, "")``; the message reflects whether the client
            reported the server as removed.
        """
        if self.mcp_client.remove_server(server_name):
            return f"✅ Removed server: {server_name}", ""
        return f"β Server not found: {server_name}", ""

    def remove_all_servers(self) -> Tuple[str, str]:
        """Remove all servers.

        Returns:
            ``(status_message, "")``; the message includes the count returned
            by the client, or an informational note when that count is zero.
        """
        count = self.mcp_client.remove_all_servers()
        if count > 0:
            return f"✅ Removed all {count} MCP servers", ""
        return "ℹ️ No servers to remove", ""

    def enable_server(self, server_name: str, enabled: bool = True) -> Tuple[str, str]:
        """Enable or disable a server.

        Args:
            server_name: Key of the server in ``mcp_client.servers``.
            enabled: ``True`` to enable (default), ``False`` to disable.

        Returns:
            ``(status_message, "")``; a not-found message is returned, and the
            client is left untouched, when the server is unknown.
        """
        if server_name not in self.mcp_client.servers:
            return f"β Server not found: {server_name}", ""

        self.mcp_client.enable_server(server_name, enabled)
        status = "enabled" if enabled else "disabled"
        return f"✅ Server {server_name} {status}", ""

    def get_server_list_with_status(self) -> list:
        """Get server list with enable/disable status for UI components.

        Returns:
            Whatever ``mcp_client.get_server_list_with_status()`` returns,
            passed through unchanged.
        """
        return self.mcp_client.get_server_list_with_status()

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _run_blocking(coro):
        """Run *coro* to completion on a fresh event loop and return its result."""
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            return loop.run_until_complete(coro)
        finally:
            loop.close()

    @staticmethod
    def _markdown_to_html(text: str) -> str:
        """Escape *text*, turn paired ``**bold**`` into ``<strong>`` and newlines into ``<br>``."""
        escaped = html.escape(str(text))
        # Pair the markers explicitly so every opening tag gets a closing tag;
        # an unpaired "**" is left as literal text.
        bolded = re.sub(r"\*\*(.+?)\*\*", r"<strong>\1</strong>", escaped)
        return bolded.replace("\n", "<br>")

    @staticmethod
    def _render_added_html(name: str, space_name: str, mcp_url: str, message: str) -> str:
        """Build the success accordion shown after a server was added."""
        tools_html = ""
        if "Found" in message and "tools:" in message:
            # Keep everything after the first "Found" so a later occurrence
            # inside a tool description does not truncate the list.
            tools_section = message.partition("Found")[2]
            tools_html = ServerManager._markdown_to_html(
                f"**Available Tools:**\n{tools_section}"
            )

        return f"""
        <details style="margin-top: 10px;">
            <summary style="cursor: pointer; padding: 8px; background: #f0f0f0; border-radius: 4px;"><strong>✅ {html.escape(name)} - Connection Details</strong></summary>
            <div style="padding: 10px; border-left: 3px solid #28a745; margin-left: 10px; margin-top: 5px;">
                <p><strong>Space:</strong> {html.escape(space_name)}</p>
                <p><strong>Base URL:</strong> {html.escape(mcp_url)}</p>
                <p><strong>Status:</strong> Connected successfully!</p>
                <p><strong>Enabled:</strong> Yes (new servers are enabled by default)</p>
                <div style="margin-top: 10px;">
                    {tools_html}
                </div>
            </div>
        </details>
        """

    @staticmethod
    def _render_failed_html(name: str, message: str) -> str:
        """Build the failure accordion shown when the client refused the server."""
        return f"""
        <details style="margin-top: 10px;">
            <summary style="cursor: pointer; padding: 8px; background: #f8d7da; border-radius: 4px;"><strong>β {html.escape(name)} - Connection Failed</strong></summary>
            <div style="padding: 10px; border-left: 3px solid #dc3545; margin-left: 10px; margin-top: 5px;">
                <p>{html.escape(str(message))}</p>
            </div>
        </details>
        """

    @staticmethod
    def _render_status_section(name: str, config) -> str:
        """Build one accordion section for a configured server."""
        base_url = config.url.replace("/gradio_api/mcp/sse", "")
        # Static placeholder: no health probe is performed here.
        health = "🟢 Healthy"
        safe_name = html.escape(str(name))

        space_row = ""
        if config.space_id:
            space_row = (
                f"<p><strong>Space ID:</strong> {html.escape(str(config.space_id))}</p>"
            )

        return f"""
        <details style="margin-bottom: 10px;">
            <summary style="cursor: pointer; padding: 8px; background: #e9ecef; border-radius: 4px;"><strong>🔧 {safe_name}</strong></summary>
            <div style="padding: 10px; border-left: 3px solid #007bff; margin-left: 10px; margin-top: 5px;">
                <p><strong>Title:</strong> {safe_name}</p>
                <p><strong>Status:</strong> Connected (MCP Protocol)</p>
                <p><strong>Health:</strong> {health}</p>
                <p><strong>Base URL:</strong> {html.escape(base_url)}</p>
                <p><strong>Description:</strong> {html.escape(str(config.description))}</p>
                {space_row}
            </div>
        </details>
        """