|
|
""" |
|
|
Utility functions for Universal MCP Client |
|
|
""" |
|
|
import re |
|
|
import logging |
|
|
from typing import List, Dict, Any, Optional |
|
|
from pathlib import Path |
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
def validate_huggingface_space_name(space_name: str) -> bool:
    """
    Validate HuggingFace space name format.

    Expected format: ``username/space-name`` -- exactly one ``/`` separating
    two non-empty parts, each consisting only of ASCII letters, digits,
    ``-`` and ``_``.

    Args:
        space_name: Candidate space identifier.

    Returns:
        True when ``space_name`` is a string in the expected format; False
        otherwise (including ``None``, empty and non-string input).
    """
    if not space_name or not isinstance(space_name, str):
        return False

    # A string without "/" splits into a single part, so the length check
    # below also rejects names that have no separator at all.
    parts = space_name.split("/")
    if len(parts) != 2:
        return False

    # fullmatch is used instead of match with a trailing ``$`` because ``$``
    # also matches just before a final newline, which let "user/space\n"
    # through as a valid name.
    part_pattern = r'[a-zA-Z0-9\-_]+'
    return all(re.fullmatch(part_pattern, part) for part in parts)
|
|
|
|
|
def sanitize_server_name(name: str) -> str:
    """
    Turn an arbitrary name into a string usable as an MCP server identifier.

    The name is lower-cased, every character other than an ASCII letter,
    digit or underscore is replaced with ``_``, runs of underscores are
    collapsed to one, and leading/trailing underscores are stripped.

    Returns:
        The sanitized identifier, or ``"unnamed_server"`` when ``name`` is
        empty or nothing survives sanitizing.
    """
    fallback = "unnamed_server"
    if not name:
        return fallback

    identifier = name.lower()
    # First pass maps disallowed characters to "_"; second pass collapses
    # the resulting runs of underscores into a single one.
    for pattern in (r'[^a-zA-Z0-9_]', r'_+'):
        identifier = re.sub(pattern, '_', identifier)

    identifier = identifier.strip('_')
    return identifier if identifier else fallback
|
|
|
|
|
def format_file_size(size_bytes: int) -> str:
    """
    Render a byte count as a human readable string.

    The value is divided by 1024 until it drops below 1024 or the largest
    unit (TB) is reached, then printed with one decimal place followed by
    the unit. A size of exactly zero is rendered as ``"0 B"``.
    """
    if size_bytes == 0:
        return "0 B"

    units = ("B", "KB", "MB", "GB", "TB")
    value = size_bytes

    # Walk every unit except the last; the last one absorbs whatever is left.
    for unit in units[:-1]:
        if value >= 1024:
            value /= 1024.0
        else:
            return f"{value:.1f} {unit}"

    return f"{value:.1f} {units[-1]}"
|
|
|
|
|
def get_file_info(file_path: str) -> Dict[str, Any]:
    """
    Collect basic information about the file at ``file_path``.

    Returns:
        On success, a dict with the keys ``name``, ``size`` (bytes),
        ``size_formatted`` (via ``format_file_size``), ``extension``
        (lower-cased suffix) and ``exists`` (always True).
        If the path does not exist, ``{"error": "File not found"}``.
        If any exception is raised, it is logged and returned as
        ``{"error": <exception text>}``.
    """
    try:
        target = Path(file_path)
        if target.exists():
            byte_count = target.stat().st_size
            info: Dict[str, Any] = {
                "name": target.name,
                "size": byte_count,
                "size_formatted": format_file_size(byte_count),
                "extension": target.suffix.lower(),
                "exists": True,
            }
            return info
        return {"error": "File not found"}
    except Exception as e:
        # Best-effort helper: failures are reported in the result instead of
        # propagating to the caller.
        logger.error(f"Error getting file info for {file_path}: {e}")
        return {"error": str(e)}
|
|
|
|
|
def truncate_text(text: str, max_length: int = 100, suffix: str = "...") -> str:
    """
    Truncate text to a maximum length, ending it with ``suffix``.

    Args:
        text: Text to shorten. Falsy values are returned unchanged.
        max_length: Maximum length of the returned string, suffix included.
        suffix: Marker appended when the text is cut.

    Returns:
        ``text`` unchanged when it already fits; otherwise a string of at
        most ``max_length`` characters. When ``max_length`` is too small to
        hold the suffix, the text is hard-cut to ``max_length`` characters
        with no suffix (empty string for a non-positive ``max_length``).
    """
    if not text or len(text) <= max_length:
        return text

    keep = max_length - len(suffix)
    if keep < 0:
        # No room for the suffix. A negative slice bound would count from the
        # end of the string and produce a result longer than max_length, so
        # fall back to a plain cut instead.
        return text[:max(max_length, 0)]

    return text[:keep] + suffix
|
|
|
|
|
def format_tool_description(tool_name: str, description: str, max_desc_length: int = 150) -> str:
    """
    Build a one-line Markdown summary of a tool for display.

    Underscores in the tool name become spaces and the name is title-cased
    and rendered in bold; the description is shortened with
    ``truncate_text`` to at most ``max_desc_length`` characters.
    """
    # title() treats "_" and " " alike (both are uncased separators), so the
    # replacement can happen after title-casing with the same result.
    display_name = tool_name.title().replace("_", " ")
    short_description = truncate_text(description, max_length=max_desc_length)
    return f"**{display_name}**: {short_description}"
|
|
|
|
|
def extract_media_type_from_url(url: str) -> Optional[str]:
    """
    Classify a URL as ``'image'``, ``'audio'`` or ``'video'``.

    For ``data:`` URLs the declared MIME type (the part before the first
    ``;`` or ``,``) decides the result. For every other URL the file
    extension at the end of the URL path is checked first; if that is not a
    known media extension, the function falls back to looking for a known
    extension anywhere in the URL (e.g. inside a query string).

    Args:
        url: URL or path to classify.

    Returns:
        ``'image'``, ``'audio'``, ``'video'``, or ``None`` when the URL is
        empty or no media type can be determined.
    """
    from urllib.parse import urlparse

    if not url:
        return None

    # Order matters only for the substring fallback below, where the first
    # media type with a matching extension wins.
    media_extensions = (
        ("image", (".png", ".jpg", ".jpeg", ".gif", ".webp", ".bmp", ".svg")),
        ("audio", (".mp3", ".wav", ".ogg", ".m4a", ".flac", ".aac")),
        ("video", (".mp4", ".avi", ".mov", ".mkv", ".webm")),
    )

    if url.startswith("data:"):
        # Layout is data:[<mediatype>][;base64],<data>. Only the media type
        # is inspected so that payload text such as "image/" cannot cause a
        # false match.
        header = url[len("data:"):].split(",", 1)[0]
        mime_type = header.split(";", 1)[0].strip().lower()
        for media_type, _ in media_extensions:
            if mime_type.startswith(media_type + "/"):
                return media_type
        return None

    url_lower = url.lower()

    # Preferred signal: the extension that actually ends the URL path.
    try:
        path = urlparse(url_lower).path
    except ValueError:
        # urlparse rejects some malformed URLs; rely on the fallback below.
        path = ""
    for media_type, extensions in media_extensions:
        if path.endswith(extensions):
            return media_type

    # Fallback: a known extension anywhere in the URL, e.g. "?file=a.png".
    for media_type, extensions in media_extensions:
        if any(ext in url_lower for ext in extensions):
            return media_type

    return None
|
|
|
|
|
def clean_html_for_display(html_text: str) -> str:
    """
    Clean HTML text for safer display in Gradio.

    Two things are removed:
      * ``<script>...</script>`` elements, including their content;
      * inline event-handler attributes (``onclick=...``, ``onerror=...``),
        whether the value is double-quoted, single-quoted or unquoted.

    NOTE(review): this is a regex-based, best-effort filter, not a complete
    HTML sanitizer (for example it does not touch ``javascript:`` URLs).
    Use a dedicated sanitizer if the input is fully untrusted.

    Args:
        html_text: HTML fragment to clean.

    Returns:
        The cleaned HTML, or ``""`` when ``html_text`` is empty or ``None``.
    """
    if not html_text:
        return ""

    # Drop script elements; DOTALL lets the body span several lines, and
    # "</script >" with whitespace before ">" is accepted as the end tag.
    html_text = re.sub(
        r'<script\b[^>]*>.*?</script\s*>',
        '',
        html_text,
        flags=re.IGNORECASE | re.DOTALL,
    )

    # Drop on* event-handler attributes.
    #  - The lookbehind stops "on" from matching inside a longer name such as
    #    content="..." or data-onfoo="...".
    #  - Each quote style is matched with its own closing quote, so a value
    #    like "alert('x')" is removed whole instead of being cut at the first
    #    inner quote.
    html_text = re.sub(
        r'''(?<![\w-])on\w+\s*=\s*(?:"[^"]*"|'[^']*'|[^\s>]+)''',
        '',
        html_text,
        flags=re.IGNORECASE,
    )

    return html_text
|
|
|
|
|
def generate_accordion_html(title: str, content: str, is_open: bool = False) -> str:
    """
    Build the HTML for a collapsible ``<details>`` accordion section.

    ``title`` is placed in bold inside the ``<summary>`` and ``content``
    inside the body ``<div>``; both are interpolated as given, without
    escaping. When ``is_open`` is true the ``open`` attribute is set so the
    section starts expanded.
    """
    if is_open:
        details_state = "open"
    else:
        details_state = ""

    return f"""
    <details {details_state} style="margin-bottom: 10px;">
        <summary style="cursor: pointer; padding: 8px; background: #e9ecef; border-radius: 4px;">
            <strong>{title}</strong>
        </summary>
        <div style="padding: 10px; border-left: 3px solid #007bff; margin-left: 10px; margin-top: 5px;">
            {content}
        </div>
    </details>
    """
|
|
|
|
|
class EventTracker:
    """Simple in-memory event tracking for debugging and monitoring.

    Events are kept in ``self.events`` in insertion order (oldest first);
    only the most recent ``max_events`` entries are retained.
    """

    def __init__(self):
        # Each event is a dict with "timestamp", "type" and "data" keys.
        self.events: List[Dict[str, Any]] = []
        # Upper bound on how many events are retained.
        self.max_events = 100

    def track_event(self, event_type: str, data: Optional[Dict[str, Any]] = None):
        """Record an event with the current local time as its timestamp.

        Args:
            event_type: Label stored under the ``"type"`` key.
            data: Optional payload; an empty dict is stored when omitted.
        """
        import datetime

        event = {
            "timestamp": datetime.datetime.now().isoformat(),
            "type": event_type,
            "data": data or {},
        }
        self.events.append(event)

        # Drop the oldest entries once the cap is exceeded. Counting the
        # overflow explicitly (rather than slicing with -max_events) keeps
        # this correct when max_events is 0, where [-0:] would keep it all.
        overflow = len(self.events) - self.max_events
        if overflow > 0:
            del self.events[:overflow]

        logger.debug(f"Event tracked: {event_type}")

    def get_recent_events(self, count: int = 10) -> List[Dict[str, Any]]:
        """Return up to ``count`` of the most recent events, oldest first.

        A non-positive ``count`` yields an empty list. The returned list is
        a new list, not ``self.events`` itself.
        """
        # Guard needed because events[-0:] is events[0:], i.e. every event.
        if count <= 0:
            return []
        return self.events[-count:]

    def clear_events(self):
        """Remove all tracked events (the list is emptied in place)."""
        self.events.clear()


# Shared module-level tracker instance.
event_tracker = EventTracker()
|
|
|
|
|
|