Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| """ | |
| MCP server for Semgrep - a tool for static analysis of code | |
| """ | |
| import gradio as gr | |
| import subprocess | |
| import json | |
| import os | |
| import tempfile | |
| from typing import Dict, List, Optional | |
| from pathlib import Path | |
| def semgrep_scan( | |
| code_input: str, | |
| scan_type: str = "code", | |
| rules: str = "p/default", | |
| output_format: str = "json" | |
| ) -> Dict: | |
| """ | |
| Сканирует код с помощью Semgrep. | |
| Args: | |
| code_input (str): Код для сканирования или путь к файлу/директории | |
| scan_type (str): Тип сканирования - 'code' для прямого кода или 'path' для файла/директории | |
| rules (str): Правила для сканирования (например, 'p/default' или путь к файлу правил) | |
| output_format (str): Формат вывода - 'json' или 'text' | |
| Returns: | |
| Dict: Результаты сканирования | |
| """ | |
| try: | |
| # Создаем временный файл или используем существующий путь | |
| if scan_type == "code": | |
| # Создаем временный файл с кодом | |
| with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as tmp_file: | |
| tmp_file.write(code_input) | |
| target_path = tmp_file.name | |
| else: | |
| # Используем существующий путь | |
| target_path = code_input | |
| if not os.path.exists(target_path): | |
| return { | |
| "error": f"Path not found: {target_path}", | |
| "success": False | |
| } | |
| # Строим команду semgrep | |
| cmd = ["semgrep", "scan"] | |
| # Добавляем правила | |
| cmd.extend(["--config", rules]) | |
| # Добавляем формат вывода | |
| if output_format == "json": | |
| cmd.extend(["--json"]) | |
| # Добавляем путь для сканирования | |
| cmd.append(target_path) | |
| # Выполняем команду | |
| result = subprocess.run(cmd, capture_output=True, text=True) | |
| # Удаляем временный файл, если он был создан | |
| if scan_type == "code": | |
| try: | |
| os.unlink(target_path) | |
| except: | |
| pass | |
| # Обрабатываем результат | |
| if output_format == "json": | |
| try: | |
| output_data = json.loads(result.stdout) if result.stdout else {} | |
| return { | |
| "success": True, | |
| "results": output_data, | |
| "stderr": result.stderr, | |
| "return_code": result.returncode | |
| } | |
| except json.JSONDecodeError: | |
| return { | |
| "success": False, | |
| "error": "JSON parsing error", | |
| "stdout": result.stdout, | |
| "stderr": result.stderr, | |
| "return_code": result.returncode | |
| } | |
| else: | |
| return { | |
| "success": True, | |
| "output": result.stdout, | |
| "stderr": result.stderr, | |
| "return_code": result.returncode | |
| } | |
| except Exception as e: | |
| return { | |
| "success": False, | |
| "error": f"Error executing Semgrep: {str(e)}" | |
| } | |
| def semgrep_list_rules() -> Dict: | |
| """ | |
| Получает список доступных правил Semgrep. | |
| Returns: | |
| Dict: Список правил | |
| """ | |
| try: | |
| cmd = ["semgrep", "list-rules"] | |
| result = subprocess.run(cmd, capture_output=True, text=True) | |
| if result.returncode == 0: | |
| rules = [] | |
| for line in result.stdout.split('\n'): | |
| if line.strip(): | |
| rules.append(line.strip()) | |
| return { | |
| "success": True, | |
| "rules": rules | |
| } | |
| else: | |
| return { | |
| "success": False, | |
| "error": f"Error listing rules: {result.stderr}" | |
| } | |
| except Exception as e: | |
| return { | |
| "success": False, | |
| "error": f"Error executing Semgrep: {str(e)}" | |
| } | |
| # Создаем Gradio интерфейс | |
| with gr.Blocks(title="Semgrep MCP") as demo: | |
| gr.Markdown("# 🔍 Semgrep Scanner") | |
| gr.Markdown("Static analysis tool with MCP support") | |
| with gr.Tab("Basic Scanning"): | |
| with gr.Row(): | |
| with gr.Column(): | |
| scan_type = gr.Radio( | |
| choices=["code", "path"], | |
| value="code", | |
| label="Scan Type" | |
| ) | |
| code_input = gr.Textbox( | |
| lines=10, | |
| placeholder="Enter code or path to scan...", | |
| label="Code or Path" | |
| ) | |
| rules = gr.Textbox( | |
| value="p/default", | |
| label="Rules (e.g., p/default or path to rules file)" | |
| ) | |
| output_format = gr.Dropdown( | |
| choices=["json", "text"], | |
| value="json", | |
| label="Output Format" | |
| ) | |
| scan_btn = gr.Button("🔍 Scan", variant="primary") | |
| with gr.Column(): | |
| scan_output = gr.JSON(label="Scan Results") | |
| scan_btn.click( | |
| fn=semgrep_scan, | |
| inputs=[code_input, scan_type, rules, output_format], | |
| outputs=scan_output | |
| ) | |
| with gr.Tab("Available Rules"): | |
| rules_btn = gr.Button("📋 List Rules", variant="secondary") | |
| rules_output = gr.JSON(label="Available Rules") | |
| rules_btn.click( | |
| fn=semgrep_list_rules, | |
| inputs=[], | |
| outputs=rules_output | |
| ) | |
| with gr.Tab("Examples"): | |
| gr.Markdown(""" | |
| ## 🚨 Examples of code to scan: | |
| ### 1. SQL Injection | |
| ```python | |
| def get_user(user_id): | |
| query = f"SELECT * FROM users WHERE id = {user_id}" | |
| return db.execute(query) | |
| ``` | |
| ### 2. Command Injection | |
| ```python | |
| import subprocess | |
| def run_command(command): | |
| subprocess.call(f"ls {command}", shell=True) | |
| ``` | |
| ### 3. Path Traversal | |
| ```python | |
| def read_file(filename): | |
| with open(f"/home/user/{filename}", "r") as f: | |
| return f.read() | |
| ``` | |
| """) | |
| if __name__ == "__main__": | |
| # Получаем настройки сервера из переменных окружения | |
| server_name = os.getenv("GRADIO_SERVER_NAME", "0.0.0.0") | |
| server_port = int(os.getenv("GRADIO_SERVER_PORT", "7865")) | |
| demo.launch( | |
| mcp_server=True, | |
| server_name=server_name, | |
| server_port=server_port, | |
| share=False | |
| ) |