import asyncio
import base64
import json
import os
import pathlib
import re
import tempfile
from collections import defaultdict
from datetime import datetime
from pathlib import Path
from typing import AsyncGenerator, Dict, List, Any, Tuple, Optional, Set, Literal

import gradio as gr
import numpy as np
import requests
from dotenv import load_dotenv
from fastapi import FastAPI
from fastapi.responses import HTMLResponse
from fastrtc import (
    AsyncStreamHandler,
    Stream,
    get_twilio_turn_credentials,
    wait_for_item,
)
from github import Github
import google.generativeai as genai
from google import genai as google_genai  # google-genai client, used for the live voice API
from google.genai.types import (
    LiveConnectConfig,
    PrebuiltVoiceConfig,
    SpeechConfig,
    VoiceConfig,
)
from gradio.utils import get_space
from pydantic import BaseModel
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type

# Set up paths
current_dir = pathlib.Path(__file__).parent
index_html_path = current_dir / "index.html"

# Load environment variables
load_dotenv()

# Configure API keys (never hardcode secrets; read them from the environment)
GITHUB_TOKEN = os.getenv("GITHUB_API_TOKEN")
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
TWILIO_ACCOUNT_SID = os.getenv("TWILIO_ACCOUNT_SID")
TWILIO_AUTH_TOKEN = os.getenv("TWILIO_AUTH_TOKEN")

if not GITHUB_TOKEN:
    GITHUB_TOKEN = "YOUR_GITHUB_TOKEN"  # Will be replaced by user input
if not GEMINI_API_KEY:
    GEMINI_API_KEY = "YOUR_GEMINI_API_KEY"  # Will be replaced by user input

# Initialize GitHub API
gh = None


# Configure Gemini model
def configure_gemini(api_key):
    genai.configure(api_key=api_key)
    return genai.GenerativeModel(
        model_name="gemini-1.5-pro-latest",
        generation_config={
            "temperature": 0.7,
            "top_p": 0.95,
            "top_k": 40,
            "max_output_tokens": 8192,
        },
        safety_settings=[
            {"category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_MEDIUM_AND_ABOVE"},
            {"category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "BLOCK_MEDIUM_AND_ABOVE"},
            {"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "BLOCK_MEDIUM_AND_ABOVE"},
            {"category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "BLOCK_MEDIUM_AND_ABOVE"},
        ],
    )


# Configure Gemini client for voice
def create_gemini_client(api_key):
    return google_genai.Client(
        api_key=api_key,
        http_options={"api_version": "v1alpha"},
    )


# Audio encoding function
def encode_audio(data: np.ndarray) -> str:
    """Encode audio data to send to the server"""
    return base64.b64encode(data.tobytes()).decode("UTF-8")


# Code file extensions to analyze
RELEVANT_EXTENSIONS = {
    ".py", ".js", ".ts", ".jsx", ".tsx", ".java", ".cpp", ".c", ".h", ".hpp",
    ".rb", ".php", ".go", ".rs", ".swift", ".kt", ".cs", ".css", ".html",
    ".xml", ".json", ".yaml", ".yml", ".md", ".sh", ".bat",
}


# Repository analysis class
class RepositoryAnalyzer:
    """Handles GitHub repository analysis"""

    def __init__(self, repo_url: str, github_token: str):
        # Extract owner and repo name from URL
        parts = repo_url.rstrip('/').split('/')
        if len(parts) < 2:
            raise ValueError("Invalid repository URL format")
        self.repo_name = parts[-1]
        self.owner = parts[-2]
        self.repo_url = repo_url
        self.github_token = github_token

        # Initialize GitHub API
        self.gh = Github(github_token)
        self.repo = self.gh.get_repo(f"{self.owner}/{self.repo_name}")
        self.analysis_data: Dict[str, Any] = {}

        # Store repository content cache
        self.file_content_cache = {}

    def analyze(self, progress_callback=None) -> Dict[str, Any]:
        """Perform complete repository analysis"""
        try:
            if progress_callback:
                progress_callback(0.1, "Fetching basic repository information...")

            # Basic repository information
            self.analysis_data["basic_info"] = self._get_basic_info()
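            # Each analysis step below fills its own key in self.analysis_data.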
            if progress_callback:
                progress_callback(0.2, "Analyzing repository structure...")

            # Analyze repository structure
            self.analysis_data["structure"] = self._analyze_structure()

            if progress_callback:
                progress_callback(0.3, "Analyzing repository dependencies...")

            # Analyze dependencies
            self.analysis_data["dependencies"] = self._analyze_dependencies()

            if progress_callback:
                progress_callback(0.4, "Analyzing code patterns...")

            # Analyze code patterns
            self.analysis_data["code_patterns"] = self._analyze_code_patterns()

            if progress_callback:
                progress_callback(0.6, "Analyzing commit history...")

            # Analyze commit history
            self.analysis_data["commit_history"] = self._analyze_commits()

            if progress_callback:
                progress_callback(0.8, "Analyzing contributors...")

            # Get contributor statistics
            self.analysis_data["contributors"] = self._analyze_contributors()

            if progress_callback:
                progress_callback(0.9, "Analyzing pull requests and issues...")

            # Analyze pull requests and issues
            self.analysis_data["pull_requests"] = self._analyze_pull_requests()
            self.analysis_data["issues"] = self._analyze_issues()

            if progress_callback:
                progress_callback(1.0, "Analysis complete!")

            return self.analysis_data
        except Exception as e:
            raise Exception(f"Error analyzing repository: {str(e)}")

    def _get_basic_info(self) -> Dict[str, Any]:
        """Get basic repository information"""
        return {
            "name": self.repo.name,
            "owner": self.repo.owner.login,
            "description": self.repo.description or "No description available",
            "stars": self.repo.stargazers_count,
            "forks": self.repo.forks_count,
            "watchers": self.repo.watchers_count,
            "created_at": self.repo.created_at.isoformat(),
            "last_updated": self.repo.updated_at.isoformat(),
            "primary_language": self.repo.language or "Not specified",
            "license": self.repo.license.name if self.repo.license else "No license specified",
            "open_issues_count": self.repo.open_issues_count,
            "is_archived": self.repo.archived,
            "is_fork": self.repo.fork,
            "homepage": self.repo.homepage,
            "url": self.repo.html_url,
            "size": self.repo.size,
            "topics": self.repo.get_topics(),
        }

    def _analyze_structure(self) -> Dict[str, Any]:
        """Analyze repository structure and organization"""
        structure = {
            "files": defaultdict(int),
            "directories": set(),
            "total_size": 0,
            "readme": None,
            "license": None,
            "gitignore": None,
            "workflow_files": [],
            "test_directories": [],
            "docs_directories": [],
        }
        try:
            # Check for root-level special files
            try:
                readme_content = self.repo.get_readme()
                structure["readme"] = {
                    "path": readme_content.path,
                    "size": readme_content.size,
                }
            except:
                pass

            try:
                license_content = self.repo.get_license()
                structure["license"] = {
                    "path": license_content.path,
                    "size": license_content.size,
                    "name": license_content.license.name if license_content.license else "Unknown",
                }
            except:
                pass

            # Analyze repository structure recursively
            contents = self.repo.get_contents("")
            while contents:
                content = contents.pop(0)

                # Identify special files
                if content.path.lower() == ".gitignore":
                    structure["gitignore"] = content.path
                elif content.path.startswith(".github/workflows/") and content.type == "file":
                    structure["workflow_files"].append(content.path)

                # Track directories
                if content.type == "dir":
                    structure["directories"].add(content.path)

                    # Identify special directories
                    path_lower = content.path.lower()
                    if "test" in path_lower or path_lower.endswith("tests"):
                        structure["test_directories"].append(content.path)
                    elif "doc" in path_lower or path_lower.endswith("docs"):
                        structure["docs_directories"].append(content.path)

                    # Get directory contents
                    try:
                        contents.extend(self.repo.get_contents(content.path))
                    except Exception as e:
                        print(f"Error getting contents of directory {content.path}: {str(e)}")

                # Track files
                else:
                    ext = Path(content.path).suffix.lower()
                    structure["files"][ext] += 1
                    structure["total_size"] += content.size
        except Exception as e:
            print(f"Error analyzing structure: {str(e)}")

        return {
            "file_types": dict(structure["files"]),
            "directory_count": len(structure["directories"]),
            "total_size": structure["total_size"],
            "file_count": sum(structure["files"].values()),
            "readme": structure["readme"],
            "license": structure["license"],
            "gitignore": structure["gitignore"],
            "workflow_files": structure["workflow_files"],
            "test_directories": structure["test_directories"],
            "docs_directories": structure["docs_directories"],
        }

    def _analyze_dependencies(self) -> Dict[str, Any]:
        """Analyze repository dependencies"""
        dependencies = {
            "package_managers": [],
            "dependencies": {},
            "has_lockfiles": False,
        }
        dependency_files = {
            "requirements.txt": "pip",
            "setup.py": "pip",
            "pyproject.toml": "poetry/pip",
            "Pipfile": "pipenv",
            "package.json": "npm",
            "pom.xml": "maven",
            "build.gradle": "gradle",
            "Gemfile": "bundler",
            "Cargo.toml": "cargo",
            "go.mod": "go",
            "composer.json": "composer",
        }
        lockfiles = [
            "package-lock.json", "yarn.lock", "Pipfile.lock", "poetry.lock",
            "Gemfile.lock", "Cargo.lock", "composer.lock", "go.sum",
        ]
        try:
            for file_path, package_manager in dependency_files.items():
                try:
                    content = self.repo.get_contents(file_path)
                    if content:
                        dependencies["package_managers"].append(package_manager)

                        # Parse dependencies from common files
                        if file_path == "requirements.txt":
                            file_content = base64.b64decode(content.content).decode('utf-8')
                            deps = [
                                line.strip().split('==')[0]
                                for line in file_content.split('\n')
                                if line.strip() and not line.strip().startswith('#')
                            ]
                            dependencies["dependencies"]["pip"] = deps
                        elif file_path == "package.json":
                            file_content = base64.b64decode(content.content).decode('utf-8')
                            pkg_json = json.loads(file_content)
                            deps = list(pkg_json.get("dependencies", {}).keys())
                            dev_deps = list(pkg_json.get("devDependencies", {}).keys())
                            dependencies["dependencies"]["npm"] = {
                                "dependencies": deps,
                                "devDependencies": dev_deps,
                            }
                except:
                    pass

            # Check for lock files
            for lockfile in lockfiles:
                try:
                    if self.repo.get_contents(lockfile):
                        dependencies["has_lockfiles"] = True
                        break
                except:
                    pass
        except Exception as e:
            print(f"Error analyzing dependencies: {str(e)}")

        return dependencies

    def _analyze_code_patterns(self) -> Dict[str, Any]:
        """Analyze code patterns and style"""
        patterns = {
            "samples": [],
            "languages": defaultdict(int),
            "complexity_metrics": defaultdict(list),
            "documentation_ratio": 0,
            "avg_code_to_comment_ratio": 0,
        }
        try:
            files = self.repo.get_contents("")
            analyzed = 0
            total_comments = 0
            total_code = 0

            while files and analyzed < 10:  # Analyze up to 10 files
                file = files.pop(0)
                if file.type == "dir":
                    files.extend(self.repo.get_contents(file.path))
                elif Path(file.path).suffix.lower() in RELEVANT_EXTENSIONS:
                    try:
                        content = base64.b64decode(file.content).decode('utf-8')
                        lines = content.splitlines()
                        if not lines:
                            continue

                        # Count code and comment lines
                        code_lines = 0
                        comment_lines = 0
                        empty_lines = 0
                        ext = Path(file.path).suffix.lower()

                        # Simple comment detection based on file type
                        comment_markers = {
                            ".py": ["#"],
                            ".js": ["//", "/*"],
                            ".ts": ["//", "/*"],
                            ".jsx": ["//", "/*"],
                            ".tsx": ["//", "/*"],
                            ".java": ["//", "/*"],
                            ".cpp": ["//", "/*"],
                            ".c": ["//", "/*"],
                            ".h": ["//", "/*"],
                            ".hpp": ["//", "/*"],
                            ".rb": ["#"],
                            ".php": ["//", "/*", "#"],
".go": ["//", "/*"], ".rs": ["//", "/*"], ".swift": ["//", "/*"], ".kt": ["//", "/*"], ".cs": ["//", "/*"], } if ext in comment_markers: for line in lines: line = line.strip() if not line: empty_lines += 1 elif any(line.startswith(marker) for marker in comment_markers[ext]): comment_lines += 1 else: code_lines += 1 else: # Default counting for unknown file types code_lines = len([line for line in lines if line.strip()]) total_code += code_lines total_comments += comment_lines # Calculate metrics loc = len([line for line in lines if line.strip()]) avg_line_length = sum(len(line) for line in lines if line) / max(1, len([line for line in lines if line])) comment_ratio = comment_lines / max(1, code_lines + comment_lines) # Store file analysis patterns["samples"].append({ "path": file.path, "language": Path(file.path).suffix[1:], "loc": loc, "code_lines": code_lines, "comment_lines": comment_lines, "empty_lines": empty_lines, "comment_ratio": round(comment_ratio, 2), "avg_line_length": round(avg_line_length, 2) }) patterns["languages"][Path(file.path).suffix[1:]] += loc patterns["complexity_metrics"]["loc"].append(loc) patterns["complexity_metrics"]["avg_line_length"].append(avg_line_length) patterns["complexity_metrics"]["comment_ratio"].append(comment_ratio) analyzed += 1 # Store file content in cache for later use self.file_content_cache[file.path] = content except Exception as e: print(f"Error analyzing file {file.path}: {str(e)}") continue # Calculate aggregate metrics if analyzed > 0: patterns["documentation_ratio"] = round(sum(patterns["complexity_metrics"]["comment_ratio"]) / analyzed, 2) patterns["avg_code_to_comment_ratio"] = round(total_code / max(1, total_comments), 2) except Exception as e: print(f"Error in code pattern analysis: {str(e)}") return patterns def _analyze_commits(self) -> Dict[str, Any]: """Analyze commit history and patterns""" commit_data = [] commit_times = [] commit_days = [] commit_authors = defaultdict(int) commit_messages = [] recent_activity = [] try: commits = list(self.repo.get_commits()[:100]) # Get last 100 commits for commit in commits: try: # Get commit details commit_info = { "sha": commit.sha, "author": commit.author.login if commit.author else "Unknown", "date": commit.commit.author.date.isoformat(), "message": commit.commit.message, "changes": { "additions": commit.stats.additions, "deletions": commit.stats.deletions, } } # Track commit data commit_data.append(commit_info) commit_times.append(commit.commit.author.date.hour) commit_days.append(commit.commit.author.date.weekday()) # Track author statistics author = commit.author.login if commit.author else "Unknown" commit_authors[author] += 1 # Track commit messages commit_messages.append(commit.commit.message) # Track recent activity (last 10 commits) if len(recent_activity) < 10: recent_activity.append({ "author": author, "date": commit.commit.author.date.isoformat(), "message": commit.commit.message[:100] + ("..." 
                except Exception as e:
                    print(f"Error processing commit {commit.sha}: {str(e)}")
                    continue

            # Analyze commit patterns
            commit_hours = defaultdict(int)
            for hour in commit_times:
                commit_hours[hour] += 1

            commit_weekdays = defaultdict(int)
            for day in commit_days:
                commit_weekdays[day] += 1

            # Analyze release patterns (by tag)
            releases = []
            for tag in self.repo.get_tags()[:10]:  # Get last 10 tags
                try:
                    releases.append({
                        "name": tag.name,
                        "commit": tag.commit.sha,
                        "date": tag.commit.commit.author.date.isoformat(),
                    })
                except Exception as e:
                    print(f"Error processing tag {tag.name}: {str(e)}")
                    continue

            total_commits = len(commit_data)
            return {
                "total_commits": total_commits,
                "commit_hours": dict(commit_hours),
                "commit_weekdays": dict(commit_weekdays),
                "avg_additions": sum(c["changes"]["additions"] for c in commit_data) / total_commits if total_commits else 0,
                "avg_deletions": sum(c["changes"]["deletions"] for c in commit_data) / total_commits if total_commits else 0,
                "commit_frequency": defaultdict(int, dict(commit_authors)),
                "recent_activity": recent_activity,
                "releases": releases,
            }
        except Exception as e:
            print(f"Error in commit analysis: {str(e)}")
            return {
                "total_commits": 0,
                "commit_hours": {},
                "commit_weekdays": {},
                "avg_additions": 0,
                "avg_deletions": 0,
                "commit_frequency": {},
                "recent_activity": [],
                "releases": [],
            }

    def _analyze_contributors(self) -> Dict[str, Any]:
        """Analyze contributor statistics"""
        contributor_data = []
        top_contributors = []
        try:
            contributors = list(self.repo.get_contributors())  # Get all contributors
            for contributor in contributors:
                contributor_data.append({
                    "login": contributor.login,
                    "contributions": contributor.contributions,
                    "type": contributor.type,
                    "url": contributor.html_url,
                })

            # Sort by contributions and get top 5
            top_contributors = sorted(
                contributor_data,
                key=lambda x: x["contributions"],
                reverse=True,
            )[:5]
        except Exception as e:
            print(f"Error analyzing contributors: {str(e)}")

        return {
            "total_contributors": len(contributor_data),
            "contributors": contributor_data,
            "top_contributors": top_contributors,
        }

    def _analyze_pull_requests(self) -> Dict[str, Any]:
        """Analyze pull request patterns"""
        pr_data = {
            "open_prs": 0,
            "closed_prs": 0,
            "merged_prs": 0,
            "recent_prs": [],
        }
        try:
            # Count open PRs
            open_prs = self.repo.get_pulls(state='open')
            pr_data["open_prs"] = open_prs.totalCount

            # Count closed PRs
            closed_prs = self.repo.get_pulls(state='closed')
            pr_data["closed_prs"] = closed_prs.totalCount

            # Get recent PRs (last 5)
            recent_prs = list(self.repo.get_pulls(state='all')[:5])
            for pr in recent_prs:
                pr_data["recent_prs"].append({
                    "number": pr.number,
                    "title": pr.title,
                    "state": pr.state,
                    "created_at": pr.created_at.isoformat(),
                    "author": pr.user.login if pr.user else "Unknown",
                    "is_merged": pr.merged,
                    "url": pr.html_url,
                })

                # Count merged PRs from the sample
                if pr.merged:
                    pr_data["merged_prs"] += 1
        except Exception as e:
            print(f"Error analyzing pull requests: {str(e)}")

        return pr_data

    def _analyze_issues(self) -> Dict[str, Any]:
        """Analyze issue patterns"""
        issue_data = {
            "open_issues": 0,
            "closed_issues": 0,
            "recent_issues": [],
        }
        try:
            # Count open issues
            open_issues = self.repo.get_issues(state='open')
            issue_data["open_issues"] = open_issues.totalCount

            # Count closed issues
            closed_issues = self.repo.get_issues(state='closed')
            issue_data["closed_issues"] = closed_issues.totalCount

            # Get recent issues (last 5)
            recent_issues = list(self.repo.get_issues(state='all')[:5])
            for issue in recent_issues:
                # Skip pull requests (which are also returned as issues)
                if issue.pull_request is not None:
                    continue

                issue_data["recent_issues"].append({
                    "number": issue.number,
                    "title": issue.title,
                    "state": issue.state,
                    "created_at": issue.created_at.isoformat(),
                    "author": issue.user.login if issue.user else "Unknown",
                    "labels": [label.name for label in issue.labels],
                    "url": issue.html_url,
                })
        except Exception as e:
            print(f"Error analyzing issues: {str(e)}")

        return issue_data

    def get_file_content(self, file_path: str) -> str:
        """Get content of a specific file, using cache if available"""
        if file_path in self.file_content_cache:
            return self.file_content_cache[file_path]

        try:
            content = self.repo.get_contents(file_path)
            file_content = base64.b64decode(content.content).decode('utf-8')
            self.file_content_cache[file_path] = file_content
            return file_content
        except Exception as e:
            print(f"Error getting file content for {file_path}: {str(e)}")
            return f"Error: Could not retrieve file content: {str(e)}"

    def search_code(self, query: str) -> List[Dict[str, Any]]:
        """Search for code in the repository"""
        results = []
        try:
            # Use GitHub search API
            code_results = self.gh.search_code(f"repo:{self.owner}/{self.repo_name} {query}")
            for item in code_results[:10]:  # Limit to 10 results
                try:
                    file_content = self.get_file_content(item.path)

                    # Find matching lines
                    lines = file_content.splitlines()
                    matching_lines = []
                    for i, line in enumerate(lines):
                        if query.lower() in line.lower():
                            start_line = max(0, i - 2)
                            end_line = min(len(lines), i + 3)
                            context = "\n".join(lines[start_line:end_line])
                            matching_lines.append({
                                "line_number": i + 1,
                                "line": line,
                                "context": context,
                            })

                    results.append({
                        "path": item.path,
                        "url": item.html_url,
                        "matching_lines": matching_lines[:3],  # Limit to 3 matches per file
                    })
                except Exception as e:
                    print(f"Error processing search result {item.path}: {str(e)}")
                    continue
        except Exception as e:
            print(f"Error searching code: {str(e)}")

        return results

    def get_file_list(self, pattern: Optional[str] = None) -> List[str]:
        """Get list of files in the repository, optionally filtered by pattern"""
        files = []
        try:
            queue = [("", "")]  # (path, directory)
            while queue:
                base_path, dir_path = queue.pop(0)
                full_path = f"{base_path}/{dir_path}".strip("/")
                try:
                    contents = self.repo.get_contents(full_path or "")
                    for content in contents:
                        if content.type == "dir":
                            new_base = full_path
                            queue.append((new_base, content.name))
                        else:
                            file_path = f"{full_path}/{content.name}" if full_path else content.name
                            # Filter by pattern if provided
                            if not pattern or re.search(pattern, file_path, re.IGNORECASE):
                                files.append(file_path)
                except Exception as e:
                    print(f"Error listing files in {full_path}: {str(e)}")
                    continue
        except Exception as e:
            print(f"Error getting file list: {str(e)}")

        return files


# Gemini Voice Handler
class GeminiHandler(AsyncStreamHandler):
    """Handler for the Gemini API voice chat"""

    def __init__(
        self,
        expected_layout: Literal["mono"] = "mono",
        output_sample_rate: int = 24000,
        output_frame_size: int = 480,
        analysis_data: Optional[Dict[str, Any]] = None,
        system_prompt: Optional[str] = None,
    ) -> None:
        super().__init__(
            expected_layout,
            output_sample_rate,
            output_frame_size,
            input_sample_rate=16000,
        )
        self.input_queue: asyncio.Queue = asyncio.Queue()
        self.output_queue: asyncio.Queue = asyncio.Queue()
        self.quit: asyncio.Event = asyncio.Event()
        self.analysis_data = analysis_data or {}
        self.system_prompt = system_prompt or ""

    def copy(self) -> "GeminiHandler":
        return GeminiHandler(
            expected_layout="mono",
            output_sample_rate=self.output_sample_rate,
            output_frame_size=self.output_frame_size,
            analysis_data=self.analysis_data,
            system_prompt=self.system_prompt,
        )

    def set_context(self, analysis_data: Dict[str, Any], system_prompt: str):
        """Set the repository analysis context for voice chat"""
        self.analysis_data = analysis_data
        self.system_prompt = system_prompt

    async def start_up(self):
        if not self.phone_mode:
            await self.wait_for_args()
            api_key, voice_name = self.latest_args[1:]
        else:
            api_key, voice_name = None, "Puck"

        client = google_genai.Client(
            api_key=api_key or os.getenv("GEMINI_API_KEY"),
            http_options={"api_version": "v1alpha"},
        )

        # Add repository context as the session's system instruction, if available
        context_prefix = ""
        if self.analysis_data and self.system_prompt:
            context_prefix = f"""
{self.system_prompt}

Repository Analysis Data:
{json.dumps(self.analysis_data, indent=2)}

Answer questions about this repository analysis. You are now in voice-based conversation mode.
"""

        config = LiveConnectConfig(
            response_modalities=["AUDIO"],  # type: ignore
            speech_config=SpeechConfig(
                voice_config=VoiceConfig(
                    prebuilt_voice_config=PrebuiltVoiceConfig(
                        voice_name=voice_name,
                    )
                )
            ),
            system_instruction=context_prefix,
        )
        try:
            async with client.aio.live.connect(
                model="gemini-2.0-flash-exp", config=config
            ) as session:
                async for audio in session.start_stream(
                    stream=self.stream(), mime_type="audio/pcm"
                ):
                    if audio.data:
                        array = np.frombuffer(audio.data, dtype=np.int16)
                        self.output_queue.put_nowait((self.output_sample_rate, array))
        except Exception as e:
            print(f"Error in Gemini streaming session: {str(e)}")

    async def stream(self) -> AsyncGenerator[bytes, None]:
        while not self.quit.is_set():
            try:
                audio = await asyncio.wait_for(self.input_queue.get(), 0.1)
                yield audio
            except (asyncio.TimeoutError, TimeoutError):
                pass

    async def receive(self, frame: tuple[int, np.ndarray]) -> None:
        _, array = frame
        array = array.squeeze()
        audio_message = encode_audio(array)
        self.input_queue.put_nowait(audio_message)

    async def emit(self) -> tuple[int, np.ndarray] | None:
        return await wait_for_item(self.output_queue)

    def shutdown(self) -> None:
        self.quit.set()


# Function to analyze repository and generate summary
@retry(
    retry=retry_if_exception_type(Exception),
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=10),
)
def analyze_repository(repo_url: str, github_token: str, gemini_api_key: str, progress=None) -> Tuple[str, str, Any, str]:
    """Analyze repository and generate LLM summary with rate limit handling"""
    try:
        # Configure Gemini
        model = configure_gemini(gemini_api_key)

        # Initialize analyzer
        if progress:
            progress(0, desc="Initializing repository analysis...")
        analyzer = RepositoryAnalyzer(repo_url, github_token)

        # Perform analysis
        analysis_data = analyzer.analyze(progress)

        # Generate LLM summary
        if progress:
            progress(0.95, desc="Generating analysis summary...")

        system_prompt = """You are an expert code analyst with deep experience in software architecture, development practices, and team dynamics.
Analyze the provided repository data and create a detailed, insightful analysis using the following markdown template:

# Repository Analysis

## 📊 Project Overview
[Provide a comprehensive overview including:
- Project purpose and scope
- Age and maturity of the project
- Current activity level and maintenance status
- Key metrics (stars, forks, etc.)
- Primary technologies and languages used]

## 🏗️ Architecture and Code Organization
[Analyze in detail:
- Repository structure and organization
- Code distribution across different technologies
- File and directory organization patterns
- Project size and complexity metrics
- Code modularity and component structure
- Presence of key architectural patterns]

## 💻 Development Practices & Code Quality
[Evaluate:
- Coding standards and consistency
- Code complexity and maintainability metrics
- Documentation practices
- Testing approach and coverage (if visible)
- Error handling and logging practices
- Use of design patterns and best practices]

## 📈 Development Workflow & History
[Analyze:
- Commit patterns and frequency
- Release cycles and versioning
- Branch management strategy
- Code review practices
- Continuous integration/deployment indicators
- Peak development periods and cycles]

## 👥 Team Dynamics & Collaboration
[Examine:
- Team size and composition
- Contribution patterns
- Core maintainer identification
- Community engagement level
- Communication patterns
- Collaboration efficiency]

## 🔧 Technical Depth & Innovation
[Assess:
- Technical sophistication level
- Innovative approaches or solutions
- Complex problem-solving examples
- Performance optimization efforts
- Security considerations
- Scalability approach]

## 🚀 Project Health & Sustainability
[Evaluate:
- Project momentum and growth trends
- Maintenance patterns
- Community health indicators
- Documentation completeness
- Onboarding friendliness
- Long-term viability indicators]

## 💡 Key Insights & Recommendations
[Provide:
- 3-5 key strengths identified
- 3-5 potential improvement areas
- Notable patterns or practices
- Unique characteristics
- Strategic recommendations]

Please provide detailed analysis for each section while maintaining the formatting and emojis.
Support insights with specific metrics and examples from the repository data where possible."""

        chat = model.start_chat(history=[])
        response = chat.send_message(f"{system_prompt}\n\nRepository Analysis Data:\n{json.dumps(analysis_data, indent=2)}")

        # Save analysis data
        if progress:
            progress(0.98, desc="Saving analysis results...")
        with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.json') as f:
            json.dump(analysis_data, f, indent=2)
            analysis_file = f.name

        return response.text, analysis_file, analyzer, system_prompt
    except Exception as e:
        error_message = f"Error analyzing repository: {str(e)}"
        raise Exception(error_message)


# Function to create a chat session and ask questions
def create_chat_session(gemini_api_key: str) -> Any:
    """Create a new chat session for follow-up questions"""
    genai.configure(api_key=gemini_api_key)
    return genai.GenerativeModel(
        model_name="gemini-1.5-pro-latest",
        generation_config={
            'temperature': 0.7,
            'top_p': 0.8,
            'top_k': 40,
            'max_output_tokens': 4096,
        },
    )


@retry(
    retry=retry_if_exception_type(Exception),
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=10),
)
def ask_question(question: str, analysis_file: str, analyzer: RepositoryAnalyzer, gemini_api_key: str, chat_history: List[Tuple[str, str]]) -> List[Tuple[str, str]]:
    """Process a follow-up question about the analysis"""
    if not analysis_file or not analyzer:
        return chat_history + [(question, "Please analyze a repository first before asking questions.")]

    try:
        # Load analysis data
        with open(analysis_file, 'r') as f:
            analysis_data = json.load(f)

        # Initialize chat model
        model = create_chat_session(gemini_api_key)

        # Check if this is a file content request
        file_request_match = re.search(r"(show|view|get|display|content of|code for)\s+(?:the\s+)?(?:file\s+)?['\"]?([^'\"]+?)['\"]?(?:\s+file)?", question.lower())
        if file_request_match:
            file_path = file_request_match.group(2).strip()

            # Try to find the exact file
            all_files = analyzer.get_file_list()

            # Check for exact match
            if file_path in all_files:
                file_content = analyzer.get_file_content(file_path)
                return chat_history + [(question, f"Here's the content of `{file_path}`:\n\n```\n{file_content}\n```")]

            # Check for partial match
            matching_files = [f for f in all_files if file_path.lower() in f.lower()]
            if matching_files:
                if len(matching_files) == 1:
                    file_content = analyzer.get_file_content(matching_files[0])
                    return chat_history + [(question, f"Here's the content of `{matching_files[0]}`:\n\n```\n{file_content}\n```")]
                else:
                    file_list = "\n".join([f"- {f}" for f in matching_files[:10]])
                    return chat_history + [(question, f"I found multiple files matching '{file_path}'. Please specify which one you'd like to see:\n\n{file_list}{' and more...' if len(matching_files) > 10 else ''}")]
        # Check if this is a code search request
        search_request_match = re.search(r"(search|find|look for|where is)\s+(?:for\s+)?['\"]?([^'\"]+?)['\"]?(?:\s+in the code)?", question.lower())
        if search_request_match:
            search_query = search_request_match.group(2).strip()
            search_results = analyzer.search_code(search_query)

            if search_results:
                result_text = f"I found {len(search_results)} files containing '{search_query}':\n\n"
                for result in search_results:
                    result_text += f"**File: {result['path']}**\n"
                    if result['matching_lines']:
                        for match in result['matching_lines']:
                            result_text += f"Line {match['line_number']}: `{match['line'].strip()}`\n"
                        result_text += "\n"
                    else:
                        result_text += "No specific line matches found.\n\n"
                return chat_history + [(question, result_text)]
            else:
                return chat_history + [(question, f"I couldn't find any code matching '{search_query}' in the repository.")]

        # For general questions, use the AI
        # Build context from chat history and current question
        context = "You are an expert code analyst helping users understand repository analysis results.\n\n"
        context += f"Repository Analysis Data:\n{json.dumps(analysis_data, indent=2)}\n\n"

        # Add chat history context
        if chat_history:
            context += "Previous conversation:\n"
            for user_msg, assistant_msg in chat_history[-5:]:  # Include last 5 messages only
                context += f"User: {user_msg}\nAssistant: {assistant_msg}\n"

        # Add current question
        prompt = context + f"\nUser: {question}\nPlease provide your analysis based on the repository data:"

        # Get response
        response = model.generate_content(prompt)

        # Return in the correct tuple format for Gradio chatbot
        return chat_history + [(question, response.text)]
    except Exception as e:
        error_message = f"Error processing question: {str(e)}"
        return chat_history + [(question, error_message)]


# Input data models
class InputData(BaseModel):
    webrtc_id: str
    voice_name: str
    api_key: str
    repo_url: Optional[str] = None
    github_token: Optional[str] = None


# Create FastAPI app and set up routes
app = FastAPI()

# Create Gemini handler for voice chat
gemini_handler = GeminiHandler()

# Create voice chat stream
voice_stream = Stream(
    modality="audio",
    mode="send-receive",
    handler=gemini_handler,
    rtc_configuration=get_twilio_turn_credentials() if get_space() else None,
    concurrency_limit=5 if get_space() else None,
    time_limit=120 if get_space() else None,
    additional_inputs=[
        gr.Textbox(
            label="Gemini API Key",
            type="password",
            value=os.getenv("GEMINI_API_KEY") if not get_space() else "",
        ),
        gr.Dropdown(
            label="Voice",
            choices=[
                "Puck",
                "Charon",
                "Kore",
                "Fenrir",
                "Aoede",
            ],
            value="Puck",
        ),
    ],
)

# Mount voice stream to app
voice_stream.mount(app)

# Current repository analysis data
current_analysis = {
    "data": None,
    "analyzer": None,
    "file": None,
    "summary": None,
    "system_prompt": None,
}


@app.post("/input_hook")
async def _(body: InputData):
    voice_stream.set_input(body.webrtc_id, body.api_key, body.voice_name)

    # If repo data is provided, analyze it and update the context
    if body.repo_url and body.github_token and current_analysis["data"] is None:
        try:
            # Analyze the repository in a background task to not block the voice connection
            asyncio.create_task(analyze_and_update_context(body.repo_url, body.github_token, body.api_key))
        except Exception as e:
            print(f"Error analyzing repository: {str(e)}")

    # Update handler context if analysis data exists
    if current_analysis["data"] and current_analysis["system_prompt"]:
        gemini_handler.set_context(current_analysis["data"], current_analysis["system_prompt"])
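    # Return immediately; the background task updates current_analysis and the
    # handler context once the repository analysis finishes.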
return {"status": "ok"} @app.post("/analyze_repository") async def analyze_repo(repo_url: str, github_token: str, gemini_api_key: str): try: summary, file_path, analyzer, system_prompt = await asyncio.to_thread( analyze_repository, repo_url, github_token, gemini_api_key ) # Load analysis data from file with open(file_path, 'r') as f: analysis_data = json.load(f) # Update current analysis current_analysis["data"] = analysis_data current_analysis["analyzer"] = analyzer current_analysis["file"] = file_path current_analysis["summary"] = summary current_analysis["system_prompt"] = system_prompt # Update handler context gemini_handler.set_context(analysis_data, system_prompt) return { "status": "success", "summary": summary, "file_path": file_path } except Exception as e: return { "status": "error", "message": str(e) } async def analyze_and_update_context(repo_url: str, github_token: str, gemini_api_key: str): try: summary, file_path, analyzer, system_prompt = await asyncio.to_thread( analyze_repository, repo_url, github_token, gemini_api_key ) # Load analysis data from file with open(file_path, 'r') as f: analysis_data = json.load(f) # Update current analysis current_analysis["data"] = analysis_data current_analysis["analyzer"] = analyzer current_analysis["file"] = file_path current_analysis["summary"] = summary current_analysis["system_prompt"] = system_prompt # Update handler context gemini_handler.set_context(analysis_data, system_prompt) except Exception as e: print(f"Error analyzing repository in background: {str(e)}") @app.post("/ask_question") async def ask_repo_question(question: str): if not current_analysis["file"] or not current_analysis["analyzer"]: return { "status": "error", "message": "Please analyze a repository first before asking questions." } try: response = await asyncio.to_thread( ask_question, question, current_analysis["file"], current_analysis["analyzer"], GEMINI_API_KEY, [] ) # Extract just the response text _, answer = response[0] return { "status": "success", "answer": answer } except Exception as e: return { "status": "error", "message": str(e) } @app.get("/") async def index(): rtc_config = get_twilio_turn_credentials() if get_space() else None # Check if index.html exists if not index_html_path.exists(): # Create basic HTML if not exists html_content = """
Analyze GitHub repositories and chat with the AI using voice or text