Spaces:
Runtime error
Runtime error
import asyncio
import base64
import json
import os
import pathlib
import re
import tempfile
from collections import Counter, defaultdict, deque
from datetime import datetime
from pathlib import Path
from typing import AsyncGenerator, Dict, List, Any, Tuple, Optional, Set, Literal

import google.generativeai as genai
import gradio as gr
import numpy as np
import requests
from dotenv import load_dotenv
from fastapi import FastAPI
from fastapi.responses import HTMLResponse
from fastrtc import (
    AsyncStreamHandler,
    Stream,
    get_twilio_turn_credentials,
    wait_for_item,
)
from github import Github
from google.genai.types import (
    LiveConnectConfig,
    PrebuiltVoiceConfig,
    SpeechConfig,
    VoiceConfig,
)
from gradio.utils import get_space
from pydantic import BaseModel
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
# Set up paths
current_dir = pathlib.Path(__file__).parent
index_html_path = current_dir / "index.html"

# Load environment variables (values from a local .env file, when present)
load_dotenv()

# Configure API keys.
# SECURITY: credentials are read from the environment only. Earlier revisions
# of this file embedded a Gemini API key and Twilio credentials as string
# literals; anything that was ever committed must be treated as leaked and
# revoked/rotated with the provider.
GITHUB_TOKEN = os.getenv("GITHUB_API_TOKEN")
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
TWILIO_ACCOUNT_SID = os.getenv("TWILIO_ACCOUNT_SID")
TWILIO_AUTH_TOKEN = os.getenv("TWILIO_AUTH_TOKEN")

# Placeholders keep the names truthy strings when nothing is configured.
if not GITHUB_TOKEN:
    GITHUB_TOKEN = "YOUR_GITHUB_TOKEN"  # Will be replaced by user input
if not GEMINI_API_KEY:
    GEMINI_API_KEY = "YOUR_GEMINI_API_KEY"  # Will be replaced by user input

# Initialize GitHub API
gh = None
# Configure Gemini model
def configure_gemini(api_key):
    """Register ``api_key`` with the Gemini SDK and build the analysis model.

    Args:
        api_key: Gemini API key passed to ``genai.configure``.

    Returns:
        A ``genai.GenerativeModel`` for ``gemini-1.5-pro-latest`` with the
        sampling settings and safety thresholds defined below.
    """
    sampling = {
        "temperature": 0.7,
        "top_p": 0.95,
        "top_k": 40,
        "max_output_tokens": 8192,
    }
    # Every harm category uses the same blocking threshold.
    harm_categories = (
        "HARM_CATEGORY_HARASSMENT",
        "HARM_CATEGORY_HATE_SPEECH",
        "HARM_CATEGORY_SEXUALLY_EXPLICIT",
        "HARM_CATEGORY_DANGEROUS_CONTENT",
    )
    safety_rules = [
        {"category": category, "threshold": "BLOCK_MEDIUM_AND_ABOVE"}
        for category in harm_categories
    ]

    genai.configure(api_key=api_key)
    return genai.GenerativeModel(
        model_name="gemini-1.5-pro-latest",
        generation_config=sampling,
        safety_settings=safety_rules,
    )
# Configure Gemini client for voice
def create_gemini_client(api_key):
    """Create a Google Gen AI client for the live (voice) API.

    Args:
        api_key: Gemini API key used to authenticate the client.

    Returns:
        A ``google.genai.Client`` pinned to the ``v1alpha`` API version.
    """
    # The module-level ``genai`` name is the legacy ``google.generativeai``
    # package, which does not provide ``Client``. The client class lives in
    # the separate ``google.genai`` package (the same one this file already
    # imports its live-config types from), so import it locally under a
    # distinct alias.
    from google import genai as google_genai

    return google_genai.Client(
        api_key=api_key,
        http_options={"api_version": "v1alpha"},
    )
# Audio encoding function
def encode_audio(data: np.ndarray) -> str:
    """Return the raw bytes of ``data`` as a base64-encoded text string."""
    raw_bytes = data.tobytes()
    encoded = base64.b64encode(raw_bytes)
    return encoded.decode("UTF-8")
# Code file extensions to analyze (lower-case, including the leading dot)
RELEVANT_EXTENSIONS = {
    # Scripting and web languages
    ".py", ".rb", ".php", ".sh", ".bat",
    ".js", ".jsx", ".ts", ".tsx",
    # Compiled languages
    ".c", ".h", ".cpp", ".hpp", ".cs",
    ".java", ".kt", ".go", ".rs", ".swift",
    # Markup, styling, data and documentation
    ".html", ".css", ".xml", ".json", ".yaml", ".yml", ".md",
}
# Repository analysis class
class RepositoryAnalyzer:
    """Handles GitHub repository analysis.

    An instance wraps a single repository, resolved through the GitHub API
    when the object is constructed. :meth:`analyze` fills ``analysis_data``
    one section at a time; :meth:`get_file_content`, :meth:`search_code` and
    :meth:`get_file_list` serve follow-up questions.

    The ``_analyze_*`` helpers are best-effort: on failure they print the
    error and return whatever was collected up to that point.
    """

    # Manifest file name -> package manager label reported in the analysis.
    _DEPENDENCY_FILES = {
        "requirements.txt": "pip",
        "setup.py": "pip",
        "pyproject.toml": "poetry/pip",
        "Pipfile": "pipenv",
        "package.json": "npm",
        "pom.xml": "maven",
        "build.gradle": "gradle",
        "Gemfile": "bundler",
        "Cargo.toml": "cargo",
        "go.mod": "go",
        "composer.json": "composer",
    }

    # Finding any one of these at the repository root sets ``has_lockfiles``.
    _LOCKFILES = (
        "package-lock.json", "yarn.lock", "Pipfile.lock", "poetry.lock",
        "Gemfile.lock", "Cargo.lock", "composer.lock", "go.sum",
    )

    # Prefixes that mark a (stripped) line as a comment, per file extension.
    # Only the start of the line is inspected, so this is a rough heuristic.
    _COMMENT_MARKERS = {
        ".py": ("#",),
        ".js": ("//", "/*"),
        ".ts": ("//", "/*"),
        ".jsx": ("//", "/*"),
        ".tsx": ("//", "/*"),
        ".java": ("//", "/*"),
        ".cpp": ("//", "/*"),
        ".c": ("//", "/*"),
        ".h": ("//", "/*"),
        ".hpp": ("//", "/*"),
        ".rb": ("#",),
        ".php": ("//", "/*", "#"),
        ".go": ("//", "/*"),
        ".rs": ("//", "/*"),
        ".swift": ("//", "/*"),
        ".kt": ("//", "/*"),
        ".cs": ("//", "/*"),
    }

    # First character that cannot belong to a requirement's project name
    # (version specifier, environment marker, extras, whitespace, direct URL).
    _REQUIREMENT_NAME_END = re.compile(r"[=<>!~;\[\s@]")

    def __init__(self, repo_url: str, github_token: str):
        """Resolve ``repo_url`` and open the repository through the GitHub API.

        Args:
            repo_url: Repository URL or ``owner/name`` path.
            github_token: Token used to authenticate GitHub API calls.

        Raises:
            ValueError: If no owner/name pair can be extracted from the URL.
        """
        # Extract owner and repo name from URL
        self.owner, self.repo_name = self._parse_repo_url(repo_url)
        self.repo_url = repo_url
        self.github_token = github_token
        # Initialize GitHub API
        self.gh = Github(github_token)
        self.repo = self.gh.get_repo(f"{self.owner}/{self.repo_name}")
        self.analysis_data: Dict[str, Any] = {}
        # Decoded file contents keyed by repository path
        self.file_content_cache: Dict[str, str] = {}

    @staticmethod
    def _parse_repo_url(repo_url: str) -> Tuple[str, str]:
        """Split a repository URL into ``(owner, repo_name)``.

        The last two ``/``-separated components are used. A trailing slash
        and a ``.git`` suffix on the repository name are ignored.

        Raises:
            ValueError: If fewer than two components exist or either is empty.
        """
        parts = repo_url.strip().rstrip("/").split("/")
        if len(parts) < 2:
            raise ValueError("Invalid repository URL format")
        owner = parts[-2]
        repo_name = parts[-1].removesuffix(".git")
        if not owner or not repo_name:
            raise ValueError("Invalid repository URL format")
        return owner, repo_name

    @classmethod
    def _parse_requirements(cls, text: str) -> List[str]:
        """Return the project names listed in a ``requirements.txt`` body.

        Blank lines, comment lines and option lines (starting with ``-``)
        are skipped; version specifiers, extras and markers are stripped.
        """
        names = []
        for raw_line in text.split("\n"):
            line = raw_line.strip()
            if not line or line.startswith(("#", "-")):
                continue
            names.append(cls._REQUIREMENT_NAME_END.split(line, maxsplit=1)[0])
        return names

    def analyze(self, progress_callback=None) -> Dict[str, Any]:
        """Perform complete repository analysis.

        Args:
            progress_callback: Optional callable invoked as
                ``progress_callback(fraction, message)`` before each stage.

        Returns:
            ``self.analysis_data`` with one entry per analysis section.

        Raises:
            Exception: Wrapping any error that escapes a stage; the original
                error is attached as ``__cause__``.
        """
        def notify(fraction, message):
            if progress_callback:
                progress_callback(fraction, message)

        # (progress fraction, message, ((result key, collector), ...))
        stages = (
            (0.1, "Fetching basic repository information...",
             (("basic_info", self._get_basic_info),)),
            (0.2, "Analyzing repository structure...",
             (("structure", self._analyze_structure),)),
            (0.3, "Analyzing repository dependencies...",
             (("dependencies", self._analyze_dependencies),)),
            (0.4, "Analyzing code patterns...",
             (("code_patterns", self._analyze_code_patterns),)),
            (0.6, "Analyzing commit history...",
             (("commit_history", self._analyze_commits),)),
            (0.8, "Analyzing contributors...",
             (("contributors", self._analyze_contributors),)),
            (0.9, "Analyzing pull requests and issues...",
             (("pull_requests", self._analyze_pull_requests),
              ("issues", self._analyze_issues))),
        )
        try:
            for fraction, message, sections in stages:
                notify(fraction, message)
                for key, collect in sections:
                    self.analysis_data[key] = collect()
            notify(1.0, "Analysis complete!")
            return self.analysis_data
        except Exception as e:
            raise Exception(f"Error analyzing repository: {str(e)}") from e

    def _get_basic_info(self) -> Dict[str, Any]:
        """Get basic repository information (metadata exposed by the repo object)."""
        repo = self.repo
        return {
            "name": repo.name,
            "owner": repo.owner.login,
            "description": repo.description or "No description available",
            "stars": repo.stargazers_count,
            "forks": repo.forks_count,
            "watchers": repo.watchers_count,
            "created_at": repo.created_at.isoformat(),
            "last_updated": repo.updated_at.isoformat(),
            "primary_language": repo.language or "Not specified",
            "license": repo.license.name if repo.license else "No license specified",
            "open_issues_count": repo.open_issues_count,
            "is_archived": repo.archived,
            "is_fork": repo.fork,
            "homepage": repo.homepage,
            "url": repo.html_url,
            "size": repo.size,
            "topics": repo.get_topics(),
        }

    def _analyze_structure(self) -> Dict[str, Any]:
        """Analyze repository structure and organization.

        Walks the whole tree breadth-first, counting files per extension and
        noting special files and directories. Errors are printed and the
        partial result is returned.
        """
        file_types: Counter = Counter()
        directories = set()
        total_size = 0
        readme = None
        license_info = None
        gitignore = None
        workflow_files = []
        test_directories = []
        docs_directories = []
        try:
            # Check for root-level special files. A lookup failure is treated
            # as "file absent" rather than as an analysis error.
            try:
                readme_content = self.repo.get_readme()
                readme = {
                    "path": readme_content.path,
                    "size": readme_content.size,
                }
            except Exception:
                readme = None
            try:
                license_content = self.repo.get_license()
                license_info = {
                    "path": license_content.path,
                    "size": license_content.size,
                    "name": license_content.license.name if license_content.license else "Unknown",
                }
            except Exception:
                license_info = None

            # Analyze repository structure breadth-first
            pending = deque(self.repo.get_contents(""))
            while pending:
                content = pending.popleft()
                # Identify special files
                if content.path.lower() == ".gitignore":
                    gitignore = content.path
                elif content.path.startswith(".github/workflows/") and content.type == "file":
                    workflow_files.append(content.path)

                if content.type != "dir":
                    # Track files
                    file_types[Path(content.path).suffix.lower()] += 1
                    total_size += content.size
                    continue

                # Track directories
                directories.add(content.path)
                path_lower = content.path.lower()
                # Substring match: "test" also covers "tests", "doc" covers "docs".
                if "test" in path_lower:
                    test_directories.append(content.path)
                elif "doc" in path_lower:
                    docs_directories.append(content.path)
                # Get directory contents
                try:
                    pending.extend(self.repo.get_contents(content.path))
                except Exception as e:
                    print(f"Error getting contents of directory {content.path}: {str(e)}")
        except Exception as e:
            print(f"Error analyzing structure: {str(e)}")
        return {
            "file_types": dict(file_types),
            "directory_count": len(directories),
            "total_size": total_size,
            "file_count": sum(file_types.values()),
            "readme": readme,
            "license": license_info,
            "gitignore": gitignore,
            "workflow_files": workflow_files,
            "test_directories": test_directories,
            "docs_directories": docs_directories,
        }

    def _analyze_dependencies(self) -> Dict[str, Any]:
        """Analyze repository dependencies.

        Looks for known manifest files at the repository root, records the
        matching package managers, and lists the dependencies declared in
        ``requirements.txt`` and ``package.json``.
        """
        dependencies = {
            "package_managers": [],
            "dependencies": {},
            "has_lockfiles": False,
        }
        try:
            for file_path, package_manager in self._DEPENDENCY_FILES.items():
                try:
                    content = self.repo.get_contents(file_path)
                    if not content:
                        continue
                    dependencies["package_managers"].append(package_manager)
                    # Parse dependencies from common files
                    if file_path == "requirements.txt":
                        file_content = base64.b64decode(content.content).decode("utf-8")
                        dependencies["dependencies"]["pip"] = self._parse_requirements(file_content)
                    elif file_path == "package.json":
                        file_content = base64.b64decode(content.content).decode("utf-8")
                        pkg_json = json.loads(file_content)
                        dependencies["dependencies"]["npm"] = {
                            "dependencies": list(pkg_json.get("dependencies", {}).keys()),
                            "devDependencies": list(pkg_json.get("devDependencies", {}).keys()),
                        }
                except Exception:
                    # Missing or unparsable manifest: skip it, keep probing.
                    continue

            # Check for lock files
            for lockfile in self._LOCKFILES:
                try:
                    if self.repo.get_contents(lockfile):
                        dependencies["has_lockfiles"] = True
                        break
                except Exception:
                    # Lock file not present: try the next candidate.
                    continue
        except Exception as e:
            print(f"Error analyzing dependencies: {str(e)}")
        return dependencies

    def _analyze_code_patterns(self) -> Dict[str, Any]:
        """Analyze code patterns and style.

        Samples up to 10 files whose extension is in ``RELEVANT_EXTENSIONS``
        (breadth-first from the root), counts code, comment and empty lines,
        and caches each decoded file in ``file_content_cache``.
        """
        patterns = {
            "samples": [],
            "languages": defaultdict(int),
            "complexity_metrics": defaultdict(list),
            "documentation_ratio": 0,
            "avg_code_to_comment_ratio": 0,
        }
        try:
            pending = deque(self.repo.get_contents(""))
            analyzed = 0
            total_comments = 0
            total_code = 0
            while pending and analyzed < 10:  # Analyze up to 10 files
                file = pending.popleft()
                if file.type == "dir":
                    pending.extend(self.repo.get_contents(file.path))
                    continue
                ext = Path(file.path).suffix.lower()
                if ext not in RELEVANT_EXTENSIONS:
                    continue
                try:
                    content = base64.b64decode(file.content).decode("utf-8")
                    lines = content.splitlines()
                    if not lines:
                        continue

                    # Count code and comment lines
                    code_lines = 0
                    comment_lines = 0
                    empty_lines = 0
                    markers = self._COMMENT_MARKERS.get(ext)
                    if markers:
                        for raw_line in lines:
                            stripped = raw_line.strip()
                            if not stripped:
                                empty_lines += 1
                            elif stripped.startswith(markers):
                                comment_lines += 1
                            else:
                                code_lines += 1
                    else:
                        # No comment syntax known: every non-blank line is code
                        code_lines = sum(1 for raw_line in lines if raw_line.strip())
                    total_code += code_lines
                    total_comments += comment_lines

                    # Calculate metrics
                    loc = sum(1 for raw_line in lines if raw_line.strip())
                    non_empty = [raw_line for raw_line in lines if raw_line]
                    avg_line_length = sum(len(raw_line) for raw_line in non_empty) / max(1, len(non_empty))
                    comment_ratio = comment_lines / max(1, code_lines + comment_lines)
                    language = Path(file.path).suffix[1:]

                    # Store file analysis
                    patterns["samples"].append({
                        "path": file.path,
                        "language": language,
                        "loc": loc,
                        "code_lines": code_lines,
                        "comment_lines": comment_lines,
                        "empty_lines": empty_lines,
                        "comment_ratio": round(comment_ratio, 2),
                        "avg_line_length": round(avg_line_length, 2),
                    })
                    patterns["languages"][language] += loc
                    patterns["complexity_metrics"]["loc"].append(loc)
                    patterns["complexity_metrics"]["avg_line_length"].append(avg_line_length)
                    patterns["complexity_metrics"]["comment_ratio"].append(comment_ratio)
                    analyzed += 1
                    # Store file content in cache for later use
                    self.file_content_cache[file.path] = content
                except Exception as e:
                    print(f"Error analyzing file {file.path}: {str(e)}")
                    continue

            # Calculate aggregate metrics
            if analyzed > 0:
                patterns["documentation_ratio"] = round(
                    sum(patterns["complexity_metrics"]["comment_ratio"]) / analyzed, 2
                )
                patterns["avg_code_to_comment_ratio"] = round(total_code / max(1, total_comments), 2)
        except Exception as e:
            print(f"Error in code pattern analysis: {str(e)}")
        return patterns

    def _analyze_commits(self) -> Dict[str, Any]:
        """Analyze commit history and patterns.

        Inspects up to 100 commits and up to 10 tags. On an unexpected error
        a zeroed result with the same keys is returned.
        """
        commit_data = []
        hour_counts: Counter = Counter()
        weekday_counts: Counter = Counter()
        # Counter reads like the defaultdict(int) returned previously.
        author_counts: Counter = Counter()
        recent_activity = []
        try:
            commits = list(self.repo.get_commits()[:100])  # Get last 100 commits
            for commit in commits:
                try:
                    # Gather everything that may fail before recording
                    # anything, so a bad commit leaves no partial state.
                    authored_at = commit.commit.author.date
                    author = commit.author.login if commit.author else "Unknown"
                    message = commit.commit.message
                    commit_info = {
                        "sha": commit.sha,
                        "author": author,
                        "date": authored_at.isoformat(),
                        "message": message,
                        "changes": {
                            "additions": commit.stats.additions,
                            "deletions": commit.stats.deletions,
                        },
                    }
                except Exception as e:
                    print(f"Error processing commit {commit.sha}: {str(e)}")
                    continue

                commit_data.append(commit_info)
                hour_counts[authored_at.hour] += 1
                weekday_counts[authored_at.weekday()] += 1
                author_counts[author] += 1
                # Track recent activity (last 10 commits)
                if len(recent_activity) < 10:
                    recent_activity.append({
                        "author": author,
                        "date": authored_at.isoformat(),
                        "message": message[:100] + ("..." if len(message) > 100 else ""),
                    })

            # Analyze release patterns (by tag)
            releases = []
            for tag in self.repo.get_tags()[:10]:  # Get last 10 tags
                try:
                    releases.append({
                        "name": tag.name,
                        "commit": tag.commit.sha,
                        "date": tag.commit.commit.author.date.isoformat(),
                    })
                except Exception as e:
                    print(f"Error processing tag {tag.name}: {str(e)}")
                    continue

            total_commits = len(commit_data)
            total_additions = sum(c["changes"]["additions"] for c in commit_data)
            total_deletions = sum(c["changes"]["deletions"] for c in commit_data)
            return {
                "total_commits": total_commits,
                "commit_hours": dict(hour_counts),
                "commit_weekdays": dict(weekday_counts),
                "avg_additions": total_additions / total_commits if total_commits else 0,
                "avg_deletions": total_deletions / total_commits if total_commits else 0,
                "commit_frequency": author_counts,
                "recent_activity": recent_activity,
                "releases": releases,
            }
        except Exception as e:
            print(f"Error in commit analysis: {str(e)}")
            return {
                "total_commits": 0,
                "commit_hours": {},
                "commit_weekdays": {},
                "avg_additions": 0,
                "avg_deletions": 0,
                "commit_frequency": {},
                "recent_activity": [],
                "releases": [],
            }

    def _analyze_contributors(self) -> Dict[str, Any]:
        """Analyze contributor statistics (all contributors plus the top five)."""
        contributor_data = []
        top_contributors = []
        try:
            # Get all contributors
            for contributor in list(self.repo.get_contributors()):
                contributor_data.append({
                    "login": contributor.login,
                    "contributions": contributor.contributions,
                    "type": contributor.type,
                    "url": contributor.html_url,
                })
            # Sort by contributions and get top 5
            top_contributors = sorted(
                contributor_data,
                key=lambda entry: entry["contributions"],
                reverse=True,
            )[:5]
        except Exception as e:
            print(f"Error analyzing contributors: {str(e)}")
        return {
            "total_contributors": len(contributor_data),
            "contributors": contributor_data,
            "top_contributors": top_contributors,
        }

    def _analyze_pull_requests(self) -> Dict[str, Any]:
        """Analyze pull request patterns.

        NOTE: ``merged_prs`` is counted only among the five sampled pull
        requests, not across the whole repository.
        """
        pr_data = {
            "open_prs": 0,
            "closed_prs": 0,
            "merged_prs": 0,
            "recent_prs": [],
        }
        try:
            # Count open and closed PRs
            pr_data["open_prs"] = self.repo.get_pulls(state="open").totalCount
            pr_data["closed_prs"] = self.repo.get_pulls(state="closed").totalCount
            # Get recent PRs (last 5)
            for pr in list(self.repo.get_pulls(state="all")[:5]):
                pr_data["recent_prs"].append({
                    "number": pr.number,
                    "title": pr.title,
                    "state": pr.state,
                    "created_at": pr.created_at.isoformat(),
                    "author": pr.user.login if pr.user else "Unknown",
                    "is_merged": pr.merged,
                    "url": pr.html_url,
                })
                # Count merged PRs from the sample
                if pr.merged:
                    pr_data["merged_prs"] += 1
        except Exception as e:
            print(f"Error analyzing pull requests: {str(e)}")
        return pr_data

    def _analyze_issues(self) -> Dict[str, Any]:
        """Analyze issue patterns.

        Pull requests are filtered out of ``recent_issues`` only; the
        open/closed totals are taken as reported by the issues listing.
        """
        issue_data = {
            "open_issues": 0,
            "closed_issues": 0,
            "recent_issues": [],
        }
        try:
            # Count open and closed issues
            issue_data["open_issues"] = self.repo.get_issues(state="open").totalCount
            issue_data["closed_issues"] = self.repo.get_issues(state="closed").totalCount
            # Get recent issues (last 5)
            for issue in list(self.repo.get_issues(state="all")[:5]):
                # Skip pull requests (which are also returned as issues)
                if issue.pull_request is not None:
                    continue
                issue_data["recent_issues"].append({
                    "number": issue.number,
                    "title": issue.title,
                    "state": issue.state,
                    "created_at": issue.created_at.isoformat(),
                    "author": issue.user.login if issue.user else "Unknown",
                    "labels": [label.name for label in issue.labels],
                    "url": issue.html_url,
                })
        except Exception as e:
            print(f"Error analyzing issues: {str(e)}")
        return issue_data

    def get_file_content(self, file_path: str) -> str:
        """Get content of a specific file, using cache if available.

        Returns:
            The decoded file text. On failure an ``"Error: ..."`` string is
            returned instead of raising, and nothing is cached.
        """
        cached = self.file_content_cache.get(file_path)
        if cached is not None:
            return cached
        try:
            content = self.repo.get_contents(file_path)
            file_content = base64.b64decode(content.content).decode("utf-8")
            self.file_content_cache[file_path] = file_content
            return file_content
        except Exception as e:
            print(f"Error getting file content for {file_path}: {str(e)}")
            return f"Error: Could not retrieve file content: {str(e)}"

    def search_code(self, query: str) -> List[Dict[str, Any]]:
        """Search for code in the repository.

        Uses GitHub code search limited to this repository, then scans each
        of the first 10 hits for lines containing ``query``
        (case-insensitive), keeping at most 3 matches per file, each with
        two lines of context either side.
        """
        results = []
        try:
            # Use GitHub search API
            code_results = self.gh.search_code(f"repo:{self.owner}/{self.repo_name} {query}")
            needle = query.lower()
            for item in code_results[:10]:  # Limit to 10 results
                try:
                    lines = self.get_file_content(item.path).splitlines()
                    # Find matching lines
                    matching_lines = []
                    for index, line in enumerate(lines):
                        if needle not in line.lower():
                            continue
                        start_line = max(0, index - 2)
                        end_line = min(len(lines), index + 3)
                        matching_lines.append({
                            "line_number": index + 1,
                            "line": line,
                            "context": "\n".join(lines[start_line:end_line]),
                        })
                        if len(matching_lines) == 3:  # Limit to 3 matches per file
                            break
                    results.append({
                        "path": item.path,
                        "url": item.html_url,
                        "matching_lines": matching_lines,
                    })
                except Exception as e:
                    print(f"Error processing search result {item.path}: {str(e)}")
                    continue
        except Exception as e:
            print(f"Error searching code: {str(e)}")
        return results

    def get_file_list(self, pattern: Optional[str] = None) -> List[str]:
        """Get list of files in the repository, optionally filtered by pattern.

        Args:
            pattern: Optional regular expression, matched case-insensitively
                with ``search`` against each file path.

        Returns:
            File paths in breadth-first order. An invalid pattern is reported
            and yields an empty list.
        """
        files: List[str] = []
        try:
            # Compile once so an invalid pattern fails before any API call.
            matcher = re.compile(pattern, re.IGNORECASE) if pattern else None
            pending = deque([""])  # directory paths still to list; "" is the root
            while pending:
                directory = pending.popleft()
                try:
                    for content in self.repo.get_contents(directory):
                        entry_path = f"{directory}/{content.name}" if directory else content.name
                        if content.type == "dir":
                            pending.append(entry_path)
                        elif matcher is None or matcher.search(entry_path):
                            files.append(entry_path)
                except Exception as e:
                    print(f"Error listing files in {directory}: {str(e)}")
                    continue
        except Exception as e:
            print(f"Error getting file list: {str(e)}")
        return files
| # Gemini Voice Handler | |
class GeminiHandler(AsyncStreamHandler):
    """Handler for the Gemini API voice chat.

    Incoming audio frames are base64-encoded and queued in ``input_queue``.
    :meth:`start_up` opens a Gemini live session, feeds it from that queue
    and pushes the audio it answers with onto ``output_queue``, which
    :meth:`emit` drains.
    """

    def __init__(
        self,
        expected_layout: Literal["mono"] = "mono",
        output_sample_rate: int = 24000,
        output_frame_size: int = 480,
        analysis_data: Optional[Dict[str, Any]] = None,
        system_prompt: Optional[str] = None,
    ) -> None:
        """Create a handler, optionally pre-loaded with repository context.

        Args:
            expected_layout: Audio channel layout forwarded to the base class.
            output_sample_rate: Sample rate reported with emitted audio.
            output_frame_size: Frame size forwarded to the base class.
            analysis_data: Repository analysis to expose to the model.
            system_prompt: Instructions placed ahead of the analysis data.
        """
        super().__init__(
            expected_layout,
            output_sample_rate,
            output_frame_size,
            input_sample_rate=16000,
        )
        self.input_queue: asyncio.Queue = asyncio.Queue()
        self.output_queue: asyncio.Queue = asyncio.Queue()
        self.quit: asyncio.Event = asyncio.Event()
        self.analysis_data = analysis_data or {}
        self.system_prompt = system_prompt or ""

    def copy(self) -> "GeminiHandler":
        """Return a fresh handler carrying the same audio settings and context."""
        return GeminiHandler(
            expected_layout="mono",
            output_sample_rate=self.output_sample_rate,
            output_frame_size=self.output_frame_size,
            analysis_data=self.analysis_data,
            system_prompt=self.system_prompt,
        )

    def set_context(self, analysis_data: Dict[str, Any], system_prompt: str):
        """Set the repository analysis context for voice chat"""
        self.analysis_data = analysis_data
        self.system_prompt = system_prompt

    def _build_context_prefix(self) -> str:
        """Return the instruction text for the session, or ``""`` without context."""
        if not (self.analysis_data and self.system_prompt):
            return ""
        return (
            f"\n{self.system_prompt}\n"
            "Repository Analysis Data:\n"
            f"{json.dumps(self.analysis_data, indent=2)}\n"
            "Answer questions about this repository analysis. "
            "You are now in voice-based conversation mode.\n"
        )

    async def start_up(self):
        """Open the Gemini live session and relay its audio to ``output_queue``.

        Outside phone mode the API key and voice name come from the stream's
        latest arguments; in phone mode the ``GEMINI_API_KEY`` environment
        variable and the ``"Puck"`` voice are used. Errors raised while
        streaming are printed, not propagated.
        """
        # The module-level ``genai`` name is the legacy ``google.generativeai``
        # package, which has no ``Client``; the live API client lives in
        # ``google.genai``.
        from google import genai as google_genai
        from google.genai.types import Content, Part

        if not self.phone_mode:
            await self.wait_for_args()
            api_key, voice_name = self.latest_args[1:]
        else:
            api_key, voice_name = None, "Puck"

        client = google_genai.Client(
            api_key=api_key or os.getenv("GEMINI_API_KEY"),
            http_options={"api_version": "v1alpha"},
        )

        live_settings: Dict[str, Any] = {
            "response_modalities": ["AUDIO"],
            "speech_config": SpeechConfig(
                voice_config=VoiceConfig(
                    prebuilt_voice_config=PrebuiltVoiceConfig(
                        voice_name=voice_name,
                    )
                )
            ),
        }
        # Add repository context if available. ``LiveConnectConfig`` has no
        # ``prefix`` field; instructions for the model belong in
        # ``system_instruction``.
        context_prefix = self._build_context_prefix()
        if context_prefix:
            live_settings["system_instruction"] = Content(parts=[Part(text=context_prefix)])
        config = LiveConnectConfig(**live_settings)

        try:
            async with client.aio.live.connect(
                model="gemini-2.0-flash-exp", config=config
            ) as session:
                async for audio in session.start_stream(
                    stream=self.stream(), mime_type="audio/pcm"
                ):
                    if audio.data:
                        array = np.frombuffer(audio.data, dtype=np.int16)
                        self.output_queue.put_nowait((self.output_sample_rate, array))
        except Exception as e:
            print(f"Error in Gemini streaming session: {str(e)}")

    async def stream(self) -> AsyncGenerator[bytes, None]:
        """Yield queued input audio until :meth:`shutdown` is called."""
        while not self.quit.is_set():
            try:
                # Short timeout so the quit flag is re-checked regularly.
                audio = await asyncio.wait_for(self.input_queue.get(), 0.1)
                yield audio
            except (asyncio.TimeoutError, TimeoutError):
                pass

    async def receive(self, frame: tuple[int, np.ndarray]) -> None:
        """Encode an incoming ``(sample_rate, samples)`` frame and queue it."""
        _, array = frame
        array = array.squeeze()
        audio_message = encode_audio(array)
        self.input_queue.put_nowait(audio_message)

    async def emit(self) -> tuple[int, np.ndarray] | None:
        """Return the next item from ``output_queue`` via ``wait_for_item``."""
        return await wait_for_item(self.output_queue)

    def shutdown(self) -> None:
        """Signal :meth:`stream` to stop."""
        self.quit.set()
# Function to analyze repository and generate summary
def analyze_repository(repo_url: str, github_token: str, gemini_api_key: str, progress=None) -> Tuple[str, str, Any, str]:
    """Analyze a repository and generate an LLM summary of the results.

    Args:
        repo_url: URL of the GitHub repository to analyze.
        github_token: Token used for GitHub API access.
        gemini_api_key: API key used to configure the Gemini model.
        progress: Optional progress reporter, called as
            ``progress(fraction, desc=...)`` here and passed to the analyzer
            as its progress callback.

    Returns:
        A 4-tuple ``(summary_markdown, analysis_file_path, analyzer,
        system_prompt)``. The analysis file is a JSON temp file that is not
        deleted automatically.

    Raises:
        Exception: Wrapping any failure; the original error is attached as
            ``__cause__``.
    """
    try:
        # Configure Gemini
        model = configure_gemini(gemini_api_key)

        # Initialize analyzer
        if progress:
            progress(0, desc="Initializing repository analysis...")
        analyzer = RepositoryAnalyzer(repo_url, github_token)

        # Perform analysis
        analysis_data = analyzer.analyze(progress)

        # Generate LLM summary
        if progress:
            progress(0.95, desc="Generating analysis summary...")
        # NOTE(review): the section emojis had been corrupted by an encoding
        # round-trip and were restored here; confirm the glyphs for "Project
        # Overview", "Development Workflow" and "Project Health" against the
        # intended template.
        system_prompt = """You are an expert code analyst with deep experience in software architecture, development practices, and team dynamics. Analyze the provided repository data and create a detailed, insightful analysis using the following markdown template:
# Repository Analysis
## 📊 Project Overview
[Provide a comprehensive overview including:
- Project purpose and scope
- Age and maturity of the project
- Current activity level and maintenance status
- Key metrics (stars, forks, etc.)
- Primary technologies and languages used]
## 🏗️ Architecture and Code Organization
[Analyze in detail:
- Repository structure and organization
- Code distribution across different technologies
- File and directory organization patterns
- Project size and complexity metrics
- Code modularity and component structure
- Presence of key architectural patterns]
## 💻 Development Practices & Code Quality
[Evaluate:
- Coding standards and consistency
- Code complexity and maintainability metrics
- Documentation practices
- Testing approach and coverage (if visible)
- Error handling and logging practices
- Use of design patterns and best practices]
## 📈 Development Workflow & History
[Analyze:
- Commit patterns and frequency
- Release cycles and versioning
- Branch management strategy
- Code review practices
- Continuous integration/deployment indicators
- Peak development periods and cycles]
## 👥 Team Dynamics & Collaboration
[Examine:
- Team size and composition
- Contribution patterns
- Core maintainer identification
- Community engagement level
- Communication patterns
- Collaboration efficiency]
## 🔧 Technical Depth & Innovation
[Assess:
- Technical sophistication level
- Innovative approaches or solutions
- Complex problem-solving examples
- Performance optimization efforts
- Security considerations
- Scalability approach]
## 🚀 Project Health & Sustainability
[Evaluate:
- Project momentum and growth trends
- Maintenance patterns
- Community health indicators
- Documentation completeness
- Onboarding friendliness
- Long-term viability indicators]
## 💡 Key Insights & Recommendations
[Provide:
- 3-5 key strengths identified
- 3-5 potential improvement areas
- Notable patterns or practices
- Unique characteristics
- Strategic recommendations]
Please provide detailed analysis for each section while maintaining the formatting and emojis. Support insights with specific metrics and examples from the repository data where possible."""
        chat = model.start_chat(history=[])
        response = chat.send_message(
            f"{system_prompt}\n\nRepository Analysis Data:\n{json.dumps(analysis_data, indent=2)}"
        )

        # Save analysis data
        if progress:
            progress(0.98, desc="Saving analysis results...")
        # delete=False: the path is handed back to the caller for later reads.
        with tempfile.NamedTemporaryFile(
            mode="w", delete=False, suffix=".json", encoding="utf-8"
        ) as f:
            json.dump(analysis_data, f, indent=2)
            analysis_file = f.name
        return response.text, analysis_file, analyzer, system_prompt
    except Exception as e:
        error_message = f"Error analyzing repository: {str(e)}"
        raise Exception(error_message) from e
# Function to create a chat session and ask questions
def create_chat_session(gemini_api_key: str) -> Any:
    """Build the Gemini model used to answer follow-up questions.

    Args:
        gemini_api_key: API key registered with the Gemini SDK before the
            model is created.

    Returns:
        A ``genai.GenerativeModel`` for ``gemini-1.5-pro-latest`` using the
        sampling settings below.
    """
    followup_settings = {
        'temperature': 0.7,
        'top_p': 0.8,
        'top_k': 40,
        'max_output_tokens': 4096,
    }
    genai.configure(api_key=gemini_api_key)
    followup_model = genai.GenerativeModel(
        model_name="gemini-1.5-pro-latest",
        generation_config=followup_settings,
    )
    return followup_model
# Patterns that recognise "show <file>" and "search for <text>" style questions.
# Both are anchored at the end of the question (allowing trailing punctuation):
# without the anchor everything after the lazy capture group is optional, so the
# group is satisfied by a single character and the target is cut to one letter.
_FILE_REQUEST_RE = re.compile(
    r"\b(show|view|get|display|content of|code for)\s+(?:the\s+)?(?:file\s+)?"
    r"['\"]?([^'\"]+?)['\"]?(?:\s+file)?\s*[?.!]*$"
)
_CODE_SEARCH_RE = re.compile(
    r"\b(search|find|look for|where is)\s+(?:for\s+)?"
    r"['\"]?([^'\"]+?)['\"]?(?:\s+in the code)?\s*[?.!]*$"
)


def _extract_request_target(pattern, text):
    """Return the stripped second capture group of ``pattern`` in ``text``, or None."""
    match = pattern.search(text)
    if match is None:
        return None
    # A whitespace-only capture would match every file name, so treat it as no target.
    return match.group(2).strip() or None


def _answer_file_request(analyzer, file_path):
    """Build the reply for a file-content request, or None when no file matches."""
    all_files = analyzer.get_file_list()
    if file_path in all_files:
        candidates = [file_path]
    else:
        needle = file_path.lower()
        candidates = [f for f in all_files if needle in f.lower()]

    if not candidates:
        return None
    if len(candidates) == 1:
        file_content = analyzer.get_file_content(candidates[0])
        return f"Here's the content of `{candidates[0]}`:\n\n```\n{file_content}\n```"

    # Several candidates: list at most ten and let the user pick one.
    file_list = "\n".join(f"- {f}" for f in candidates[:10])
    suffix = ' and more...' if len(candidates) > 10 else ''
    return (
        f"I found multiple files matching '{file_path}'. "
        f"Please specify which one you'd like to see:\n\n{file_list}{suffix}"
    )


def _answer_code_search(analyzer, search_query):
    """Run a code search through ``analyzer`` and format the result as chat text."""
    search_results = analyzer.search_code(search_query)
    if not search_results:
        return f"I couldn't find any code matching '{search_query}' in the repository."

    parts = [f"I found {len(search_results)} files containing '{search_query}':\n\n"]
    for result in search_results:
        parts.append(f"**File: {result['path']}**\n")
        if result['matching_lines']:
            parts.extend(
                f"Line {hit['line_number']}: `{hit['line'].strip()}`\n"
                for hit in result['matching_lines']
            )
            parts.append("\n")
        else:
            parts.append("No specific line matches found.\n\n")
    return "".join(parts)


def ask_question(question: str, analysis_file: str, analyzer: "RepositoryAnalyzer", gemini_api_key: str,
                 chat_history: List[Tuple[str, str]]) -> List[Tuple[str, str]]:
    """Process a follow-up question about the analysis.

    The question is handled by the first rule that applies:

    1. File request ("show main.py"): the file is looked up through
       ``analyzer`` (exact path first, then case-insensitive substring).
    2. Code search ("search for foo"): delegated to ``analyzer.search_code``.
    3. Anything else: the saved analysis JSON plus the last five chat turns
       are sent to the Gemini model.

    Args:
        question: The user's question.
        analysis_file: Path of the JSON file holding the analysis data.
        analyzer: Analyzer for the repository; must provide ``get_file_list``,
            ``get_file_content`` and ``search_code``.
        gemini_api_key: Key used to create the model for general questions.
        chat_history: Previous ``(user, assistant)`` message pairs.

    Returns:
        A new history list: ``chat_history`` followed by ``(question, answer)``.
        Errors are reported as the answer text instead of being raised.
    """
    chat_history = list(chat_history or [])
    if not analysis_file or not analyzer:
        return chat_history + [(question, "Please analyze a repository first before asking questions.")]

    try:
        # Intent matching works on the lower-cased question, so extracted
        # targets are lower-case as well.
        lowered = question.lower()

        file_path = _extract_request_target(_FILE_REQUEST_RE, lowered)
        if file_path:
            reply = _answer_file_request(analyzer, file_path)
            if reply is not None:
                return chat_history + [(question, reply)]

        search_query = _extract_request_target(_CODE_SEARCH_RE, lowered)
        if search_query:
            return chat_history + [(question, _answer_code_search(analyzer, search_query))]

        # General question: only now are the analysis data and the model
        # needed, so neither is loaded for the file/search shortcuts above.
        with open(analysis_file, 'r') as f:
            analysis_data = json.load(f)
        model = create_chat_session(gemini_api_key)

        context_parts = [
            "You are an expert code analyst helping users understand repository analysis results.\n\n",
            f"Repository Analysis Data:\n{json.dumps(analysis_data, indent=2)}\n\n",
        ]
        if chat_history:
            context_parts.append("Previous conversation:\n")
            # Only the last five exchanges are included to bound the prompt size.
            context_parts.extend(
                f"User: {user_msg}\nAssistant: {assistant_msg}\n"
                for user_msg, assistant_msg in chat_history[-5:]
            )
        prompt = (
            "".join(context_parts)
            + f"\nUser: {question}\nPlease provide your analysis based on the repository data:"
        )

        response = model.generate_content(prompt)
        # (question, answer) tuples are the format the Gradio chatbot expects.
        return chat_history + [(question, response.text)]
    except Exception as e:
        error_message = f"Error processing question: {str(e)}"
        return chat_history + [(question, error_message)]
| # Input data models | |
class InputData(BaseModel):
    # Request body model for the voice-chat input hook handler (`_`).
    # webrtc_id, api_key and voice_name are forwarded to voice_stream.set_input.
    webrtc_id: str
    voice_name: str
    api_key: str
    # Optional repository details: when both are present and no analysis has
    # been stored yet, the handler starts a background repository analysis.
    repo_url: Optional[str] = None
    github_token: Optional[str] = None
# FastAPI application that the routes and the voice stream attach to
app = FastAPI()

# Handler instance used by the voice chat stream
gemini_handler = GeminiHandler()

# Hosted-Space settings (TURN credentials, limits) only apply when get_space()
# reports that the app is running inside a Space.
_running_on_space = get_space()
_rtc_configuration = get_twilio_turn_credentials() if _running_on_space else None

# Extra inputs supplied alongside the audio: the API key and the voice name
_api_key_input = gr.Textbox(
    label="Gemini API Key",
    type="password",
    value="" if _running_on_space else os.getenv("GEMINI_API_KEY"),
)
_voice_input = gr.Dropdown(
    label="Voice",
    choices=["Puck", "Charon", "Kore", "Fenrir", "Aoede"],
    value="Puck",
)

# Bidirectional audio stream served by the Gemini handler
voice_stream = Stream(
    modality="audio",
    mode="send-receive",
    handler=gemini_handler,
    rtc_configuration=_rtc_configuration,
    concurrency_limit=5 if _running_on_space else None,
    time_limit=120 if _running_on_space else None,
    additional_inputs=[_api_key_input, _voice_input],
)

# Attach the stream to the FastAPI app
voice_stream.mount(app)

# Most recent repository analysis; every slot starts out empty (None)
current_analysis = dict.fromkeys(
    ("data", "analyzer", "file", "summary", "system_prompt")
)
# Strong references to in-flight background analyses. The event loop only keeps
# weak references to tasks, so a task whose result is discarded can be garbage
# collected before it finishes.
_background_tasks: Set[asyncio.Task] = set()


# The embedded voice-chat page POSTs this payload to /input_hook when the
# stream asks for its inputs.
@app.post("/input_hook")
async def _(body: InputData):
    """Register the voice-chat inputs for a WebRTC session.

    Passes the API key and voice name to the voice stream. If repository
    details were supplied and no analysis is stored yet, an analysis is
    started in the background so the voice connection is not blocked. If an
    analysis is already stored, its context is given to the Gemini handler.

    Returns:
        ``{"status": "ok"}``.
    """
    voice_stream.set_input(body.webrtc_id, body.api_key, body.voice_name)

    if body.repo_url and body.github_token and current_analysis["data"] is None:
        # Failures are handled (printed) inside analyze_and_update_context
        # itself, so scheduling the task needs no try/except here.
        task = asyncio.create_task(
            analyze_and_update_context(body.repo_url, body.github_token, body.api_key)
        )
        _background_tasks.add(task)
        task.add_done_callback(_background_tasks.discard)

    # Reuse the context of an analysis that has already completed.
    if current_analysis["data"] and current_analysis["system_prompt"]:
        gemini_handler.set_context(current_analysis["data"], current_analysis["system_prompt"])

    return {"status": "ok"}
from fastapi import Body  # needed for the JSON-body parameters of analyze_repo


async def _analyze_and_store(repo_url: str, github_token: str, gemini_api_key: str) -> Tuple[str, str]:
    """Analyze a repository and publish the result; return ``(summary, file_path)``.

    ``analyze_repository`` is synchronous, so it runs in a worker thread to
    keep the event loop responsive. The outcome is stored in
    ``current_analysis`` and handed to the Gemini voice handler as context.
    """
    summary, file_path, analyzer, system_prompt = await asyncio.to_thread(
        analyze_repository, repo_url, github_token, gemini_api_key
    )

    # analyze_repository returns the path of the JSON file it saved.
    with open(file_path, 'r') as f:
        analysis_data = json.load(f)

    current_analysis.update(
        data=analysis_data,
        analyzer=analyzer,
        file=file_path,
        summary=summary,
        system_prompt=system_prompt,
    )
    gemini_handler.set_context(analysis_data, system_prompt)
    return summary, file_path


# The embedded page POSTs {repo_url, github_token, gemini_api_key} as JSON;
# Body(...) makes FastAPI read the three values from that JSON object rather
# than from the query string.
@app.post("/analyze_repository")
async def analyze_repo(
    repo_url: str = Body(...),
    github_token: str = Body(...),
    gemini_api_key: str = Body(...),
):
    """Analyze a repository on request and report the outcome.

    Returns:
        ``{"status": "success", "summary": ..., "file_path": ...}`` on
        success, or ``{"status": "error", "message": ...}`` when the
        analysis raised.
    """
    try:
        summary, file_path = await _analyze_and_store(repo_url, github_token, gemini_api_key)
    except Exception as e:
        return {
            "status": "error",
            "message": str(e)
        }
    return {
        "status": "success",
        "summary": summary,
        "file_path": file_path
    }


async def analyze_and_update_context(repo_url: str, github_token: str, gemini_api_key: str):
    """Background variant of the analysis: store the result, print any failure.

    Used when the analysis is started from the voice-chat input hook, where
    there is no response to carry an error back to the caller.
    """
    try:
        await _analyze_and_store(repo_url, github_token, gemini_api_key)
    except Exception as e:
        print(f"Error analyzing repository in background: {str(e)}")
from fastapi import Body  # needed for the JSON-body parameter of ask_repo_question


# The embedded page POSTs {"question": "..."}; embed=True makes FastAPI read
# the value from that key instead of expecting a bare JSON string as the body.
@app.post("/ask_question")
async def ask_repo_question(question: str = Body(..., embed=True)):
    """Answer a text question about the most recently analyzed repository.

    Returns:
        ``{"status": "success", "answer": ...}``, or
        ``{"status": "error", "message": ...}`` when no repository has been
        analyzed yet or answering raised.
    """
    if not current_analysis["file"] or not current_analysis["analyzer"]:
        return {
            "status": "error",
            "message": "Please analyze a repository first before asking questions."
        }

    try:
        # ask_question is synchronous, so run it in a worker thread.
        # NOTE(review): this uses the module-level GEMINI_API_KEY, not the key
        # that was submitted with the analysis request - confirm that is intended.
        history = await asyncio.to_thread(
            ask_question,
            question,
            current_analysis["file"],
            current_analysis["analyzer"],
            GEMINI_API_KEY,
            []
        )
        # The newest (question, answer) pair is always the last history entry.
        _, answer = history[-1]
    except Exception as e:
        return {
            "status": "error",
            "message": str(e)
        }
    return {
        "status": "success",
        "answer": answer
    }
# Page served when no index.html exists next to this file.
# This must be a raw string: the embedded JavaScript contains regex literals
# with "\n" and "\*"; in a normal string "\n" would become a real line break
# inside the regex literal, which is a JavaScript syntax error.
_FALLBACK_INDEX_HTML = r"""<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>GitHub Repository Analyzer with Voice Chat</title>
    <style>
        :root {
            --color-accent: #6366f1;
            --color-background: #0f172a;
            --color-surface: #1e293b;
            --color-text: #e2e8f0;
            --boxSize: 8px;
            --gutter: 4px;
        }
        body {
            margin: 0;
            padding: 0;
            background-color: var(--color-background);
            color: var(--color-text);
            font-family: system-ui, -apple-system, sans-serif;
            min-height: 100vh;
            display: flex;
            flex-direction: column;
            align-items: center;
            justify-content: center;
        }
        .container {
            width: 90%;
            max-width: 800px;
            background-color: var(--color-surface);
            padding: 2rem;
            border-radius: 1rem;
            box-shadow: 0 25px 50px -12px rgba(0, 0, 0, 0.25);
        }
        .wave-container {
            position: relative;
            display: flex;
            min-height: 100px;
            max-height: 128px;
            justify-content: center;
            align-items: center;
            margin: 2rem 0;
        }
        .box-container {
            display: flex;
            justify-content: space-between;
            height: 64px;
            width: 100%;
        }
        .box {
            height: 100%;
            width: var(--boxSize);
            background: var(--color-accent);
            border-radius: 8px;
            transition: transform 0.05s ease;
        }
        .controls {
            display: grid;
            gap: 1rem;
            margin-bottom: 2rem;
        }
        .input-group {
            display: flex;
            flex-direction: column;
            gap: 0.5rem;
        }
        label {
            font-size: 0.875rem;
            font-weight: 500;
        }
        input,
        select {
            padding: 0.75rem;
            border-radius: 0.5rem;
            border: 1px solid rgba(255, 255, 255, 0.1);
            background-color: var(--color-background);
            color: var(--color-text);
            font-size: 1rem;
        }
        button {
            padding: 1rem 2rem;
            border-radius: 0.5rem;
            border: none;
            background-color: var(--color-accent);
            color: white;
            font-weight: 600;
            cursor: pointer;
            transition: all 0.2s ease;
        }
        button:hover {
            opacity: 0.9;
            transform: translateY(-1px);
        }
        .icon-with-spinner {
            display: flex;
            align-items: center;
            justify-content: center;
            gap: 12px;
            min-width: 180px;
        }
        .spinner {
            width: 20px;
            height: 20px;
            border: 2px solid white;
            border-top-color: transparent;
            border-radius: 50%;
            animation: spin 1s linear infinite;
            flex-shrink: 0;
        }
        @keyframes spin {
            to {
                transform: rotate(360deg);
            }
        }
        .pulse-container {
            display: flex;
            align-items: center;
            justify-content: center;
            gap: 12px;
            min-width: 180px;
        }
        .pulse-circle {
            width: 20px;
            height: 20px;
            border-radius: 50%;
            background-color: white;
            opacity: 0.2;
            flex-shrink: 0;
            transform: translateX(-0%) scale(var(--audio-level, 1));
            transition: transform 0.1s ease;
        }
        /* Add styles for toast notifications */
        .toast {
            position: fixed;
            top: 20px;
            left: 50%;
            transform: translateX(-50%);
            padding: 16px 24px;
            border-radius: 4px;
            font-size: 14px;
            z-index: 1000;
            display: none;
            box-shadow: 0 2px 5px rgba(0, 0, 0, 0.2);
        }
        .toast.error {
            background-color: #f44336;
            color: white;
        }
        .toast.warning {
            background-color: #ffd700;
            color: black;
        }
        .tabs {
            display: flex;
            margin-bottom: 1rem;
        }
        .tab {
            padding: 0.5rem 1rem;
            cursor: pointer;
            border-bottom: 2px solid transparent;
            transition: all 0.2s ease;
        }
        .tab.active {
            border-bottom: 2px solid var(--color-accent);
            color: var(--color-accent);
        }
        .tab-content {
            display: none;
        }
        .tab-content.active {
            display: block;
        }
        .chat-container {
            height: 300px;
            overflow-y: auto;
            border: 1px solid rgba(255, 255, 255, 0.1);
            border-radius: 0.5rem;
            padding: 1rem;
            margin-bottom: 1rem;
            background-color: rgba(0, 0, 0, 0.2);
        }
        .chat-message {
            margin-bottom: 1rem;
            padding-bottom: 1rem;
            border-bottom: 1px solid rgba(255, 255, 255, 0.1);
        }
        .chat-message:last-child {
            border-bottom: none;
        }
        .chat-message .user {
            font-weight: bold;
            color: var(--color-accent);
            margin-bottom: 0.5rem;
        }
        .chat-message .bot {
            font-weight: bold;
            color: #10b981;
            margin-bottom: 0.5rem;
        }
        .chat-input {
            display: flex;
            gap: 0.5rem;
        }
        .chat-input input {
            flex: 1;
        }
    </style>
</head>
<body>
    <!-- Add toast element after body opening tag -->
    <div id="error-toast" class="toast"></div>
    <div style="text-align: center">
        <h1>GitHub Repository Analyzer with Voice Chat</h1>
        <p>Analyze GitHub repositories and chat with the AI using voice or text</p>
    </div>
    <div class="container">
        <div class="tabs">
            <div class="tab active" data-tab="repo">Repository Setup</div>
            <div class="tab" data-tab="text">Text Chat</div>
            <div class="tab" data-tab="voice">Voice Chat</div>
        </div>
        <div class="tab-content active" id="repo-tab">
            <div class="controls">
                <div class="input-group">
                    <label for="github-token">GitHub API Token</label>
                    <input type="password" id="github-token" placeholder="Enter your GitHub API token">
                </div>
                <div class="input-group">
                    <label for="gemini-api-key">Gemini API Key</label>
                    <input type="password" id="gemini-api-key" placeholder="Enter your Gemini API key">
                </div>
                <div class="input-group">
                    <label for="repo-url">GitHub Repository URL</label>
                    <input type="text" id="repo-url" placeholder="https://github.com/owner/repo">
                </div>
                <button id="analyze-button">Analyze Repository</button>
            </div>
            <div id="analysis-result" style="white-space: pre-wrap; max-height: 400px; overflow-y: auto; display: none;"></div>
        </div>
        <div class="tab-content" id="text-tab">
            <div id="chat-container" class="chat-container"></div>
            <div class="chat-input">
                <input type="text" id="text-question" placeholder="Ask a question about the repository...">
                <button id="ask-button">Ask</button>
            </div>
        </div>
        <div class="tab-content" id="voice-tab">
            <div class="controls">
                <div class="input-group">
                    <label for="voice">Voice</label>
                    <select id="voice">
                        <option value="Puck">Puck</option>
                        <option value="Charon">Charon</option>
                        <option value="Kore">Kore</option>
                        <option value="Fenrir">Fenrir</option>
                        <option value="Aoede">Aoede</option>
                    </select>
                </div>
            </div>
            <div class="wave-container">
                <div class="box-container">
                    <!-- Boxes will be dynamically added here -->
                </div>
            </div>
            <button id="start-button">Start Voice Chat</button>
        </div>
    </div>
    <audio id="audio-output"></audio>
    <script>
        // Global variables
        let peerConnection;
        let audioContext;
        let dataChannel;
        let isRecording = false;
        let webrtc_id;
        let currentTab = 'repo';
        let repositoryAnalyzed = false;
        // Audio analysis state (input = microphone, output = AI voice)
        let analyser;
        let analyser_input;
        let dataArray;
        let dataArray_input;
        let inputAnimationId;
        let outputAnimationId;
        // DOM elements
        const startButton = document.getElementById('start-button');
        const analyzeButton = document.getElementById('analyze-button');
        const askButton = document.getElementById('ask-button');
        const repoUrlInput = document.getElementById('repo-url');
        const githubTokenInput = document.getElementById('github-token');
        const geminiApiKeyInput = document.getElementById('gemini-api-key');
        const voiceSelect = document.getElementById('voice');
        const audioOutput = document.getElementById('audio-output');
        const boxContainer = document.querySelector('.box-container');
        const tabs = document.querySelectorAll('.tab');
        const tabContents = document.querySelectorAll('.tab-content');
        const chatContainer = document.getElementById('chat-container');
        const textQuestionInput = document.getElementById('text-question');
        const analysisResult = document.getElementById('analysis-result');
        // Initialize audio visualization
        const numBars = 32;
        for (let i = 0; i < numBars; i++) {
            const box = document.createElement('div');
            box.className = 'box';
            boxContainer.appendChild(box);
        }
        // Tab switching
        tabs.forEach(tab => {
            tab.addEventListener('click', () => {
                // First check if repository has been analyzed when switching to chat tabs
                if ((tab.dataset.tab === 'text' || tab.dataset.tab === 'voice') && !repositoryAnalyzed) {
                    showError('Please analyze a repository first before chatting.');
                    return;
                }
                // Deactivate all tabs
                tabs.forEach(t => t.classList.remove('active'));
                tabContents.forEach(c => c.classList.remove('active'));
                // Activate selected tab
                tab.classList.add('active');
                document.getElementById(`${tab.dataset.tab}-tab`).classList.add('active');
                currentTab = tab.dataset.tab;
            });
        });
        // Error message display
        function showError(message) {
            const toast = document.getElementById('error-toast');
            toast.textContent = message;
            toast.className = 'toast error';
            toast.style.display = 'block';
            // Hide toast after 5 seconds
            setTimeout(() => {
                toast.style.display = 'none';
            }, 5000);
        }
        // Repository analysis
        analyzeButton.addEventListener('click', async () => {
            const repoUrl = repoUrlInput.value.trim();
            const githubToken = githubTokenInput.value.trim();
            const geminiApiKey = geminiApiKeyInput.value.trim();
            if (!repoUrl || !githubToken || !geminiApiKey) {
                showError('Please fill in all fields');
                return;
            }
            try {
                analyzeButton.disabled = true;
                analyzeButton.textContent = 'Analyzing...';
                // Call the analyze endpoint
                const response = await fetch('/analyze_repository', {
                    method: 'POST',
                    headers: {
                        'Content-Type': 'application/json',
                    },
                    body: JSON.stringify({
                        repo_url: repoUrl,
                        github_token: githubToken,
                        gemini_api_key: geminiApiKey
                    })
                });
                const data = await response.json();
                if (data.status === 'success') {
                    // Display the analysis result
                    analysisResult.textContent = data.summary;
                    analysisResult.style.display = 'block';
                    repositoryAnalyzed = true;
                } else {
                    showError(data.message || 'Error analyzing repository');
                }
            } catch (err) {
                showError('Failed to analyze repository: ' + err.message);
            } finally {
                analyzeButton.disabled = false;
                analyzeButton.textContent = 'Analyze Repository';
            }
        });
        // Text chat
        askButton.addEventListener('click', async () => {
            const question = textQuestionInput.value.trim();
            if (!question) {
                return;
            }
            // Add user message to chat
            addMessageToChat('You', question);
            textQuestionInput.value = '';
            try {
                // Call the ask endpoint
                const response = await fetch('/ask_question', {
                    method: 'POST',
                    headers: {
                        'Content-Type': 'application/json',
                    },
                    body: JSON.stringify({
                        question: question
                    })
                });
                const data = await response.json();
                if (data.status === 'success') {
                    // Add bot response to chat
                    addMessageToChat('AI', data.answer);
                } else {
                    addMessageToChat('AI', 'Error: ' + (data.message || 'Failed to get response'));
                }
            } catch (err) {
                addMessageToChat('AI', 'Error: ' + err.message);
            }
        });
        // Add message to chat container
        function addMessageToChat(sender, message) {
            const messageDiv = document.createElement('div');
            messageDiv.className = 'chat-message';
            const senderDiv = document.createElement('div');
            senderDiv.className = sender === 'You' ? 'user' : 'bot';
            senderDiv.textContent = sender + ':';
            const contentDiv = document.createElement('div');
            contentDiv.innerHTML = formatMessage(message);
            messageDiv.appendChild(senderDiv);
            messageDiv.appendChild(contentDiv);
            chatContainer.appendChild(messageDiv);
            chatContainer.scrollTop = chatContainer.scrollHeight;
        }
        // Escape HTML special characters so message text is never parsed as markup
        function escapeHtml(text) {
            return String(text)
                .replace(/&/g, '&amp;')
                .replace(/</g, '&lt;')
                .replace(/>/g, '&gt;')
                .replace(/"/g, '&quot;')
                .replace(/'/g, '&#39;');
        }
        // Format message with Markdown-like syntax
        function formatMessage(message) {
            // Messages can contain repository file content, so escape first;
            // only the tags produced below end up in the page.
            message = escapeHtml(message);
            // Convert code blocks
            message = message.replace(/```([^`]+)```/g, '<pre><code>$1</code></pre>');
            // Convert inline code
            message = message.replace(/`([^`]+)`/g, '<code>$1</code>');
            // Convert bold
            message = message.replace(/\*\*([^*]+)\*\*/g, '<strong>$1</strong>');
            // Convert bullets
            message = message.replace(/- ([^\n]+)/g, '• $1<br>');
            // Convert line breaks
            message = message.replace(/\n/g, '<br>');
            return message;
        }
        // Voice chat WebRTC setup
        async function setupWebRTC() {
            const config = __RTC_CONFIGURATION__;
            peerConnection = new RTCPeerConnection(config);
            webrtc_id = Math.random().toString(36).substring(7);
            const timeoutId = setTimeout(() => {
                const toast = document.getElementById('error-toast');
                toast.textContent = "Connection is taking longer than usual. Are you on a VPN?";
                toast.className = 'toast warning';
                toast.style.display = 'block';
                // Hide warning after 5 seconds
                setTimeout(() => {
                    toast.style.display = 'none';
                }, 5000);
            }, 5000);
            try {
                const stream = await navigator.mediaDevices.getUserMedia({ audio: true });
                stream.getTracks().forEach(track => peerConnection.addTrack(track, stream));
                // Update audio visualization setup
                audioContext = new AudioContext();
                analyser_input = audioContext.createAnalyser();
                const source = audioContext.createMediaStreamSource(stream);
                source.connect(analyser_input);
                analyser_input.fftSize = 64;
                dataArray_input = new Uint8Array(analyser_input.frequencyBinCount);
                function updateAudioLevel() {
                    analyser_input.getByteFrequencyData(dataArray_input);
                    const average = Array.from(dataArray_input).reduce((a, b) => a + b, 0) / dataArray_input.length;
                    const audioLevel = average / 255;
                    const pulseCircle = document.querySelector('.pulse-circle');
                    if (pulseCircle) {
                        pulseCircle.style.setProperty('--audio-level', 1 + audioLevel);
                    }
                    inputAnimationId = requestAnimationFrame(updateAudioLevel);
                }
                updateAudioLevel();
                // Add connection state change listener
                peerConnection.addEventListener('connectionstatechange', () => {
                    console.log('connectionstatechange', peerConnection.connectionState);
                    if (peerConnection.connectionState === 'connected') {
                        clearTimeout(timeoutId);
                        const toast = document.getElementById('error-toast');
                        toast.style.display = 'none';
                    }
                    updateButtonState();
                });
                // Handle incoming audio
                peerConnection.addEventListener('track', (evt) => {
                    if (audioOutput && audioOutput.srcObject !== evt.streams[0]) {
                        audioOutput.srcObject = evt.streams[0];
                        audioOutput.play();
                        // Set up audio visualization on the output stream, reusing
                        // the existing AudioContext instead of leaking a second one
                        analyser = audioContext.createAnalyser();
                        const source = audioContext.createMediaStreamSource(evt.streams[0]);
                        source.connect(analyser);
                        analyser.fftSize = 2048;
                        dataArray = new Uint8Array(analyser.frequencyBinCount);
                        updateVisualization();
                    }
                });
                // Create data channel for messages
                dataChannel = peerConnection.createDataChannel('text');
                dataChannel.onmessage = (event) => {
                    const eventJson = JSON.parse(event.data);
                    if (eventJson.type === "error") {
                        showError(eventJson.message);
                    } else if (eventJson.type === "send_input") {
                        fetch('/input_hook', {
                            method: 'POST',
                            headers: {
                                'Content-Type': 'application/json',
                            },
                            body: JSON.stringify({
                                webrtc_id: webrtc_id,
                                api_key: geminiApiKeyInput.value,
                                voice_name: voiceSelect.value,
                                repo_url: repoUrlInput.value,
                                github_token: githubTokenInput.value
                            })
                        });
                    }
                };
                // Create and send offer
                const offer = await peerConnection.createOffer();
                await peerConnection.setLocalDescription(offer);
                await new Promise((resolve) => {
                    if (peerConnection.iceGatheringState === "complete") {
                        resolve();
                    } else {
                        const checkState = () => {
                            if (peerConnection.iceGatheringState === "complete") {
                                peerConnection.removeEventListener("icegatheringstatechange", checkState);
                                resolve();
                            }
                        };
                        peerConnection.addEventListener("icegatheringstatechange", checkState);
                    }
                });
                const response = await fetch('/webrtc/offer', {
                    method: 'POST',
                    headers: { 'Content-Type': 'application/json' },
                    body: JSON.stringify({
                        sdp: peerConnection.localDescription.sdp,
                        type: peerConnection.localDescription.type,
                        webrtc_id: webrtc_id,
                    })
                });
                const serverResponse = await response.json();
                if (serverResponse.status === 'failed') {
                    clearTimeout(timeoutId);
                    showError(serverResponse.meta.error === 'concurrency_limit_reached'
                        ? `Too many connections. Maximum limit is ${serverResponse.meta.limit}`
                        : serverResponse.meta.error);
                    stopWebRTC();
                    startButton.textContent = 'Start Voice Chat';
                    return;
                }
                await peerConnection.setRemoteDescription(serverResponse);
            } catch (err) {
                clearTimeout(timeoutId);
                console.error('Error setting up WebRTC:', err);
                showError('Failed to establish connection. Please try again.');
                stopWebRTC();
                startButton.textContent = 'Start Voice Chat';
            }
        }
        function updateButtonState() {
            if (peerConnection && (peerConnection.connectionState === 'connecting' || peerConnection.connectionState === 'new')) {
                startButton.innerHTML = `
                    <div class="icon-with-spinner">
                        <div class="spinner"></div>
                        <span>Connecting...</span>
                    </div>
                `;
            } else if (peerConnection && peerConnection.connectionState === 'connected') {
                startButton.innerHTML = `
                    <div class="pulse-container">
                        <div class="pulse-circle"></div>
                        <span>Stop Voice Chat</span>
                    </div>
                `;
            } else {
                startButton.innerHTML = 'Start Voice Chat';
            }
        }
        function updateVisualization() {
            if (!analyser) return;
            analyser.getByteFrequencyData(dataArray);
            const bars = document.querySelectorAll('.box');
            for (let i = 0; i < bars.length; i++) {
                const barHeight = (dataArray[i] / 255) * 2;
                bars[i].style.transform = `scaleY(${Math.max(0.1, barHeight)})`;
            }
            outputAnimationId = requestAnimationFrame(updateVisualization);
        }
        function stopWebRTC() {
            if (peerConnection) {
                // Release the microphone before closing the connection
                peerConnection.getSenders().forEach(sender => {
                    if (sender.track) {
                        sender.track.stop();
                    }
                });
                peerConnection.close();
            }
            if (inputAnimationId) {
                cancelAnimationFrame(inputAnimationId);
                inputAnimationId = undefined;
            }
            if (outputAnimationId) {
                cancelAnimationFrame(outputAnimationId);
                outputAnimationId = undefined;
            }
            if (audioContext) {
                audioContext.close();
                audioContext = null;
            }
            analyser = null;
            // Keep the toggle state in sync even when stopping after a failure
            isRecording = false;
            startButton.classList.remove('recording');
            updateButtonState();
        }
        startButton.addEventListener('click', () => {
            if (!isRecording) {
                isRecording = true;
                startButton.classList.add('recording');
                setupWebRTC();
            } else {
                stopWebRTC();
            }
        });
        // Press Enter to send text chat message
        textQuestionInput.addEventListener('keypress', (e) => {
            if (e.key === 'Enter') {
                askButton.click();
            }
        });
    </script>
</body>
</html>
"""


# The Gradio "Open Voice Chat" button opens '/' in a new tab, so the page is
# registered at the application root.
@app.get("/")
async def index():
    """Serve the voice/text chat page.

    Uses ``index.html`` next to this file when it exists, otherwise the
    built-in fallback page. The ``__RTC_CONFIGURATION__`` placeholder is
    replaced with the JSON-encoded RTC configuration (``null`` when not
    running in a Space).
    """
    rtc_config = get_twilio_turn_credentials() if get_space() else None

    if index_html_path.exists():
        html_content = index_html_path.read_text(encoding="utf-8")
    else:
        html_content = _FALLBACK_INDEX_HTML

    # Replace RTC configuration
    html_content = html_content.replace("__RTC_CONFIGURATION__", json.dumps(rtc_config))
    return HTMLResponse(content=html_content)
| # Gradio interface for web UI | |
def create_gradio_app():
    """Build the Gradio Blocks UI for repository analysis and chat.

    The UI has the credential/URL inputs, an "Analysis" tab with the
    generated summary, a "Text Chat" tab backed by ``ask_question`` and a
    "Voice Chat" tab whose button opens the voice page ('/') in a new
    browser tab.

    Returns:
        The assembled ``gr.Blocks`` application.
    """
    # NOTE(review): the emoji in the UI strings were restored from mis-decoded
    # text. The title, list items 1 and 4 and the analyze button had lost all
    # identifying bytes, so those four glyphs are a best guess - confirm.
    with gr.Blocks(theme=gr.themes.Soft()) as demo:
        gr.Markdown("""
        # 🔍 GitHub Repository Analyzer with Voice Chat

        Analyze any public GitHub repository using AI. The tool will:

        1. 📊 Analyze repository structure, code, and development patterns
        2. 💡 Generate comprehensive insights about the repository
        3. 💬 Allow you to chat with AI about the repository using text or voice
        4. 🔎 Search code and view files in the repository

        Enter a GitHub repository URL and your API keys to get started.
        """)

        with gr.Row():
            with gr.Column():
                repo_url = gr.Textbox(
                    label="GitHub Repository URL",
                    placeholder="https://github.com/owner/repo",
                    value=""
                )
                # Pre-fill the keys only when real values (not the
                # placeholders) are configured.
                github_token = gr.Textbox(
                    label="GitHub API Token",
                    placeholder="Your GitHub API Token",
                    type="password",
                    value=GITHUB_TOKEN if GITHUB_TOKEN != "YOUR_GITHUB_TOKEN" else ""
                )
                gemini_api_key = gr.Textbox(
                    label="Gemini API Key",
                    placeholder="Your Gemini API Key",
                    type="password",
                    value=GEMINI_API_KEY if GEMINI_API_KEY != "YOUR_GEMINI_API_KEY" else ""
                )

        with gr.Row():
            analyze_btn = gr.Button("🔍 Analyze Repository", variant="primary")

        # Add status message
        status_msg = gr.Markdown("", elem_id="status_message")

        tabs = gr.Tabs()
        with tabs:
            with gr.TabItem("Analysis"):
                # Use Markdown for better formatting
                summary = gr.Markdown(
                    label="Analysis Summary",
                    value=""
                )
            with gr.TabItem("Text Chat"):
                repo_chatbot = gr.Chatbot(
                    label="Chat with Repository",
                    height=400,
                    show_label=True
                )
                with gr.Row():
                    repo_question = gr.Textbox(
                        label="Ask about the repository",
                        placeholder="Ask a question, search code, or request a file...",
                        scale=4
                    )
                    repo_ask_btn = gr.Button("💬 Ask", variant="primary", scale=1)
                    repo_clear_btn = gr.Button("🗑️ Clear Chat", variant="secondary", scale=1)
            with gr.TabItem("Voice Chat"):
                gr.Markdown("""
                ## Voice Chat with Repository

                Speak with an AI assistant about the repository using real-time voice chat.

                Click the "Open Voice Chat" button to launch the voice interface in a new tab.

                Note: Voice chat requires browser permission to access your microphone.
                """)
                voice_chat_btn = gr.Button("🎤 Open Voice Chat", variant="primary")

        # Hidden state for analysis file and analyzer
        analysis_file = gr.State("")
        analyzer_state = gr.State(None)
        system_prompt_state = gr.State("")

        def clear_chat():
            """Reset the chatbot to an empty conversation."""
            return []

        def open_voice_chat():
            """Return the status text and a no-op update for the button."""
            return "Opening voice chat in a new tab...", gr.update()

        # Set up event handlers
        analyze_btn.click(
            fn=lambda: "⏳ Analysis in progress...",
            inputs=None,
            outputs=status_msg,
            queue=False
        ).then(
            analyze_repository,
            inputs=[repo_url, github_token, gemini_api_key],
            outputs=[summary, analysis_file, analyzer_state, system_prompt_state],
        ).success(
            # .success (not .then) so "complete" is shown only when the
            # analysis step did not raise.
            lambda: "✅ Analysis complete! You can now chat with the repository.",
            inputs=None,
            outputs=status_msg,
            queue=False
        )

        repo_ask_btn.click(
            ask_question,
            inputs=[repo_question, analysis_file, analyzer_state, gemini_api_key, repo_chatbot],
            outputs=[repo_chatbot],
        ).then(
            lambda: "",  # Clear the question input
            None,
            repo_question,
            queue=False
        )

        repo_clear_btn.click(
            clear_chat,
            inputs=None,
            outputs=[repo_chatbot],
            queue=False
        )

        voice_chat_btn.click(
            open_voice_chat,
            inputs=None,
            outputs=[status_msg, voice_chat_btn],
        ).then(
            lambda: gr.update(value="Voice Chat Opened"),
            None,
            voice_chat_btn,
            js="""() => { window.open('/', '_blank'); return null; }"""
        )

    return demo
| # Main execution | |
if __name__ == "__main__":
    import uvicorn

    # Build the Gradio UI and attach it to the FastAPI app under /gradio;
    # mount_gradio_app's return value is kept under the same name as before.
    gradio_app = gr.mount_gradio_app(app, create_gradio_app(), path="/gradio")

    # Serve the FastAPI app on all interfaces, port 7860
    uvicorn.run(app, host="0.0.0.0", port=7860)