# githubVoiceBot / app.py
# nihalaninihal's picture
# Update app.py
# de661bd verified
import asyncio
import base64
import json
import os
import pathlib
from typing import AsyncGenerator, Dict, List, Any, Tuple, Optional, Set, Literal
import gradio as gr
import numpy as np
from dotenv import load_dotenv
from fastapi import FastAPI
from fastapi.responses import HTMLResponse
from fastrtc import (
AsyncStreamHandler,
Stream,
get_twilio_turn_credentials,
wait_for_item,
)
from github import Github
import google.generativeai as genai
from google.genai.types import (
LiveConnectConfig,
PrebuiltVoiceConfig,
SpeechConfig,
VoiceConfig,
)
from gradio.utils import get_space
from pydantic import BaseModel
from collections import defaultdict
import base64
from pathlib import Path
import tempfile
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
import re
import requests
from datetime import datetime
# Set up paths
current_dir = pathlib.Path(__file__).parent
index_html_path = current_dir / "index.html"

# Load environment variables (also picks up values from a local .env file)
load_dotenv()

# Configure API keys.
# SECURITY: credentials are read from the environment only and must never be
# written into this file. Any key that was ever committed to source control
# should be treated as leaked and rotated with the provider.
GITHUB_TOKEN = os.getenv("GITHUB_API_TOKEN")
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
TWILIO_ACCOUNT_SID = os.getenv("TWILIO_ACCOUNT_SID")
TWILIO_AUTH_TOKEN = os.getenv("TWILIO_AUTH_TOKEN")

if not GITHUB_TOKEN:
    GITHUB_TOKEN = "YOUR_GITHUB_TOKEN"  # Will be replaced by user input
if not GEMINI_API_KEY:
    GEMINI_API_KEY = "YOUR_GEMINI_API_KEY"  # Will be replaced by user input

# Initialize GitHub API
gh = None
# Configure Gemini model
def configure_gemini(api_key):
    """Register *api_key* with the Gemini SDK and return the text model.

    The returned ``GenerativeModel`` uses "gemini-1.5-pro-latest" with the
    sampling settings below and blocks medium-and-above content in each of
    the four listed harm categories.
    """
    genai.configure(api_key=api_key)

    generation_config = {
        "temperature": 0.7,
        "top_p": 0.95,
        "top_k": 40,
        "max_output_tokens": 8192,
    }

    # Every category shares the same blocking threshold.
    harm_categories = (
        "HARM_CATEGORY_HARASSMENT",
        "HARM_CATEGORY_HATE_SPEECH",
        "HARM_CATEGORY_SEXUALLY_EXPLICIT",
        "HARM_CATEGORY_DANGEROUS_CONTENT",
    )
    safety_settings = [
        {"category": category, "threshold": "BLOCK_MEDIUM_AND_ABOVE"}
        for category in harm_categories
    ]

    return genai.GenerativeModel(
        model_name="gemini-1.5-pro-latest",
        generation_config=generation_config,
        safety_settings=safety_settings,
    )
# Configure Gemini client for voice
def create_gemini_client(api_key):
    """Return a ``google.genai`` client (v1alpha API) for the live voice API.

    The module-level ``genai`` name is bound to ``google.generativeai``,
    which does not provide ``Client``; the client class lives in the
    separate ``google.genai`` package, so it is imported here under its own
    name to avoid clashing with the module-level alias.
    """
    from google import genai as live_genai

    return live_genai.Client(
        api_key=api_key,
        http_options={"api_version": "v1alpha"},
    )
# Audio encoding function
def encode_audio(data: np.ndarray) -> str:
    """Return the raw bytes of *data* as a base64 text string."""
    raw_bytes = data.tobytes()
    encoded = base64.b64encode(raw_bytes)
    return str(encoded, "UTF-8")
# Code file extensions to analyze
RELEVANT_EXTENSIONS = {
    # Programming languages
    ".py", ".js", ".ts", ".jsx", ".tsx",
    ".java", ".cpp", ".c", ".h", ".hpp",
    ".rb", ".php", ".go", ".rs", ".swift", ".kt", ".cs",
    # Markup, styling and data
    ".css", ".html", ".xml", ".json", ".yaml", ".yml", ".md",
    # Scripts
    ".sh", ".bat",
}
# Repository analysis class
class RepositoryAnalyzer:
    """Collects metadata and activity statistics for one GitHub repository.

    All data comes from the GitHub API via PyGithub. ``analyze()`` runs every
    collector and returns a single dictionary of results; ``get_file_content``,
    ``search_code`` and ``get_file_list`` serve follow-up questions about the
    same repository. Individual collectors are best-effort: they print the
    error and return whatever they gathered instead of raising.
    """

    # Manifest file name -> package manager label reported for it.
    _DEPENDENCY_FILES = {
        "requirements.txt": "pip",
        "setup.py": "pip",
        "pyproject.toml": "poetry/pip",
        "Pipfile": "pipenv",
        "package.json": "npm",
        "pom.xml": "maven",
        "build.gradle": "gradle",
        "Gemfile": "bundler",
        "Cargo.toml": "cargo",
        "go.mod": "go",
        "composer.json": "composer",
    }

    # Files whose presence in the repository root counts as "has a lockfile".
    _LOCKFILES = (
        "package-lock.json", "yarn.lock", "Pipfile.lock", "poetry.lock",
        "Gemfile.lock", "Cargo.lock", "composer.lock", "go.sum",
    )

    # Line prefixes treated as comments, per file extension. Only a prefix
    # check is done, so lines inside a multi-line block comment count as code.
    _HASH = ("#",)
    _SLASHES = ("//", "/*")
    _COMMENT_MARKERS = {
        ".py": _HASH,
        ".js": _SLASHES,
        ".ts": _SLASHES,
        ".jsx": _SLASHES,
        ".tsx": _SLASHES,
        ".java": _SLASHES,
        ".cpp": _SLASHES,
        ".c": _SLASHES,
        ".h": _SLASHES,
        ".hpp": _SLASHES,
        ".rb": _HASH,
        ".php": ("//", "/*", "#"),
        ".go": _SLASHES,
        ".rs": _SLASHES,
        ".swift": _SLASHES,
        ".kt": _SLASHES,
        ".cs": _SLASHES,
    }

    def __init__(self, repo_url: str, github_token: str):
        """Connect to GitHub and resolve the repository named by *repo_url*.

        Raises:
            ValueError: if owner and repository name cannot be extracted.
        """
        self.owner, self.repo_name = self._split_repo_url(repo_url)
        self.repo_url = repo_url
        self.github_token = github_token
        # Initialize GitHub API
        self.gh = Github(github_token)
        self.repo = self.gh.get_repo(f"{self.owner}/{self.repo_name}")
        self.analysis_data: Dict[str, Any] = {}
        # Decoded file contents keyed by repository path
        self.file_content_cache: Dict[str, str] = {}

    @staticmethod
    def _split_repo_url(repo_url: str) -> Tuple[str, str]:
        """Return ``(owner, repo_name)`` taken from the last two URL segments.

        Accepts ``https://host/owner/repo``, a trailing slash, a ``.git``
        suffix, ``owner/repo`` and the SSH form ``git@host:owner/repo.git``.
        """
        parts = repo_url.strip().rstrip("/").split("/")
        if len(parts) < 2:
            raise ValueError("Invalid repository URL format")
        owner, name = parts[-2], parts[-1]
        # SSH remotes put "host:" in front of the owner segment.
        owner = owner.rsplit(":", 1)[-1]
        if name.endswith(".git"):
            name = name[: -len(".git")]
        if not owner or not name:
            raise ValueError("Invalid repository URL format")
        return owner, name

    @staticmethod
    def _requirement_names(text: str) -> List[str]:
        """Extract bare package names from the text of a requirements file.

        Blank lines, comments and option lines (``-r``, ``-e`` ...) are
        skipped; version specifiers, extras and environment markers are cut.
        """
        names = []
        for raw in text.split("\n"):
            line = raw.strip()
            if not line or line.startswith(("#", "-")):
                continue
            name = re.split(r"[\s;\[<>=!~]", line, maxsplit=1)[0]
            if name:
                names.append(name)
        return names

    def analyze(self, progress_callback=None) -> Dict[str, Any]:
        """Run every collector and return the combined analysis dictionary.

        Args:
            progress_callback: optional callable invoked as
                ``progress_callback(fraction, message)`` before each stage.

        Raises:
            Exception: wrapping (and chained to) any error from a stage.
        """
        def report(fraction, message):
            if progress_callback:
                progress_callback(fraction, message)

        # (progress fraction, message, {result key: collector}) in run order.
        stages = [
            (0.1, "Fetching basic repository information...",
             {"basic_info": self._get_basic_info}),
            (0.2, "Analyzing repository structure...",
             {"structure": self._analyze_structure}),
            (0.3, "Analyzing repository dependencies...",
             {"dependencies": self._analyze_dependencies}),
            (0.4, "Analyzing code patterns...",
             {"code_patterns": self._analyze_code_patterns}),
            (0.6, "Analyzing commit history...",
             {"commit_history": self._analyze_commits}),
            (0.8, "Analyzing contributors...",
             {"contributors": self._analyze_contributors}),
            (0.9, "Analyzing pull requests and issues...",
             {"pull_requests": self._analyze_pull_requests,
              "issues": self._analyze_issues}),
        ]
        try:
            for fraction, message, collectors in stages:
                report(fraction, message)
                for key, collect in collectors.items():
                    self.analysis_data[key] = collect()
            report(1.0, "Analysis complete!")
            return self.analysis_data
        except Exception as e:
            # Keep the original traceback attached for debugging.
            raise Exception(f"Error analyzing repository: {str(e)}") from e

    def _get_basic_info(self) -> Dict[str, Any]:
        """Get basic repository information"""
        repo = self.repo
        return {
            "name": repo.name,
            "owner": repo.owner.login,
            "description": repo.description or "No description available",
            "stars": repo.stargazers_count,
            "forks": repo.forks_count,
            "watchers": repo.watchers_count,
            "created_at": repo.created_at.isoformat(),
            "last_updated": repo.updated_at.isoformat(),
            "primary_language": repo.language or "Not specified",
            "license": repo.license.name if repo.license else "No license specified",
            "open_issues_count": repo.open_issues_count,
            "is_archived": repo.archived,
            "is_fork": repo.fork,
            "homepage": repo.homepage,
            "url": repo.html_url,
            "size": repo.size,
            "topics": repo.get_topics(),
        }

    def _analyze_structure(self) -> Dict[str, Any]:
        """Walk the whole tree and summarise files, directories and key files."""
        file_types = defaultdict(int)  # extension -> number of files
        directories = set()
        total_size = 0
        readme = None
        license_info = None
        gitignore = None
        workflow_files = []
        test_directories = []
        docs_directories = []
        try:
            # Root-level special files; their absence is not an error.
            try:
                readme_content = self.repo.get_readme()
                readme = {
                    "path": readme_content.path,
                    "size": readme_content.size,
                }
            except Exception:
                pass
            try:
                license_content = self.repo.get_license()
                license_info = {
                    "path": license_content.path,
                    "size": license_content.size,
                    "name": license_content.license.name if license_content.license else "Unknown",
                }
            except Exception:
                pass

            # Breadth-first walk over the repository contents.
            pending = self.repo.get_contents("")
            while pending:
                entry = pending.pop(0)
                if entry.path.lower() == ".gitignore":
                    gitignore = entry.path
                elif entry.path.startswith(".github/workflows/") and entry.type == "file":
                    workflow_files.append(entry.path)

                if entry.type == "dir":
                    directories.add(entry.path)
                    path_lower = entry.path.lower()
                    if "test" in path_lower:
                        test_directories.append(entry.path)
                    elif "doc" in path_lower:
                        docs_directories.append(entry.path)
                    try:
                        pending.extend(self.repo.get_contents(entry.path))
                    except Exception as e:
                        print(f"Error getting contents of directory {entry.path}: {str(e)}")
                else:
                    file_types[Path(entry.path).suffix.lower()] += 1
                    total_size += entry.size
        except Exception as e:
            print(f"Error analyzing structure: {str(e)}")
        return {
            "file_types": dict(file_types),
            "directory_count": len(directories),
            "total_size": total_size,
            "file_count": sum(file_types.values()),
            "readme": readme,
            "license": license_info,
            "gitignore": gitignore,
            "workflow_files": workflow_files,
            "test_directories": test_directories,
            "docs_directories": docs_directories,
        }

    def _analyze_dependencies(self) -> Dict[str, Any]:
        """Detect package managers, declared dependencies and lockfiles.

        Only root-level manifests are checked. Dependency names are parsed
        for ``requirements.txt`` and ``package.json`` only.
        """
        dependencies = {
            "package_managers": [],
            "dependencies": {},
            "has_lockfiles": False,
        }
        try:
            for file_path, package_manager in self._DEPENDENCY_FILES.items():
                try:
                    content = self.repo.get_contents(file_path)
                    if not content:
                        continue
                    dependencies["package_managers"].append(package_manager)
                    if file_path == "requirements.txt":
                        text = base64.b64decode(content.content).decode('utf-8')
                        dependencies["dependencies"]["pip"] = self._requirement_names(text)
                    elif file_path == "package.json":
                        text = base64.b64decode(content.content).decode('utf-8')
                        pkg_json = json.loads(text)
                        dependencies["dependencies"]["npm"] = {
                            "dependencies": list(pkg_json.get("dependencies", {}).keys()),
                            "devDependencies": list(pkg_json.get("devDependencies", {}).keys()),
                        }
                except Exception:
                    # Manifest missing or unparsable: treat as "not present".
                    pass

            # Check for lock files
            for lockfile in self._LOCKFILES:
                try:
                    if self.repo.get_contents(lockfile):
                        dependencies["has_lockfiles"] = True
                        break
                except Exception:
                    pass
        except Exception as e:
            print(f"Error analyzing dependencies: {str(e)}")
        return dependencies

    def _analyze_code_patterns(self) -> Dict[str, Any]:
        """Sample up to 10 source files and compute simple size/comment metrics.

        Decoded contents of every sampled file are stored in
        ``self.file_content_cache``.
        """
        patterns = {
            "samples": [],
            "languages": defaultdict(int),
            "complexity_metrics": defaultdict(list),
            "documentation_ratio": 0,
            "avg_code_to_comment_ratio": 0,
        }
        try:
            pending = self.repo.get_contents("")
            analyzed = 0
            total_comments = 0
            total_code = 0
            while pending and analyzed < 10:  # Analyze up to 10 files
                entry = pending.pop(0)
                if entry.type == "dir":
                    # One unreadable directory must not abort the whole sample.
                    try:
                        pending.extend(self.repo.get_contents(entry.path))
                    except Exception as e:
                        print(f"Error getting contents of directory {entry.path}: {str(e)}")
                    continue

                suffix = Path(entry.path).suffix
                ext = suffix.lower()
                if ext not in RELEVANT_EXTENSIONS:
                    continue

                try:
                    content = base64.b64decode(entry.content).decode('utf-8')
                    lines = content.splitlines()
                    if not lines:
                        continue

                    code_lines = 0
                    comment_lines = 0
                    empty_lines = 0
                    markers = self._COMMENT_MARKERS.get(ext)
                    if markers is not None:
                        for raw in lines:
                            stripped = raw.strip()
                            if not stripped:
                                empty_lines += 1
                            elif stripped.startswith(markers):
                                comment_lines += 1
                            else:
                                code_lines += 1
                    else:
                        # No comment syntax known: every non-blank line is code.
                        code_lines = sum(1 for raw in lines if raw.strip())

                    total_code += code_lines
                    total_comments += comment_lines

                    loc = sum(1 for raw in lines if raw.strip())
                    non_empty = [raw for raw in lines if raw]
                    avg_line_length = sum(len(raw) for raw in non_empty) / max(1, len(non_empty))
                    comment_ratio = comment_lines / max(1, code_lines + comment_lines)

                    language = suffix[1:]
                    patterns["samples"].append({
                        "path": entry.path,
                        "language": language,
                        "loc": loc,
                        "code_lines": code_lines,
                        "comment_lines": comment_lines,
                        "empty_lines": empty_lines,
                        "comment_ratio": round(comment_ratio, 2),
                        "avg_line_length": round(avg_line_length, 2),
                    })
                    patterns["languages"][language] += loc
                    metrics = patterns["complexity_metrics"]
                    metrics["loc"].append(loc)
                    metrics["avg_line_length"].append(avg_line_length)
                    metrics["comment_ratio"].append(comment_ratio)
                    analyzed += 1
                    # Store file content in cache for later use
                    self.file_content_cache[entry.path] = content
                except Exception as e:
                    print(f"Error analyzing file {entry.path}: {str(e)}")
                    continue

            # Calculate aggregate metrics
            if analyzed > 0:
                patterns["documentation_ratio"] = round(
                    sum(patterns["complexity_metrics"]["comment_ratio"]) / analyzed, 2
                )
                patterns["avg_code_to_comment_ratio"] = round(total_code / max(1, total_comments), 2)
        except Exception as e:
            print(f"Error in code pattern analysis: {str(e)}")
        return patterns

    def _analyze_commits(self) -> Dict[str, Any]:
        """Summarise the latest 100 commits and the latest 10 tags."""
        commit_data = []
        commit_hours = defaultdict(int)      # hour of day -> commits
        commit_weekdays = defaultdict(int)   # weekday (0 = Monday) -> commits
        commit_authors = defaultdict(int)    # login -> commits
        recent_activity = []
        try:
            commits = list(self.repo.get_commits()[:100])  # Get last 100 commits
            for commit in commits:
                try:
                    authored = commit.commit.author.date
                    message = commit.commit.message
                    author = commit.author.login if commit.author else "Unknown"
                    # Build the record first so a failing commit leaves no
                    # partial entries in the counters below.
                    commit_info = {
                        "sha": commit.sha,
                        "author": author,
                        "date": authored.isoformat(),
                        "message": message,
                        "changes": {
                            "additions": commit.stats.additions,
                            "deletions": commit.stats.deletions,
                        },
                    }
                    commit_data.append(commit_info)
                    commit_hours[authored.hour] += 1
                    commit_weekdays[authored.weekday()] += 1
                    commit_authors[author] += 1
                    # Track recent activity (last 10 commits)
                    if len(recent_activity) < 10:
                        recent_activity.append({
                            "author": author,
                            "date": authored.isoformat(),
                            "message": message[:100] + ("..." if len(message) > 100 else ""),
                        })
                except Exception as e:
                    print(f"Error processing commit {commit.sha}: {str(e)}")
                    continue

            # Analyze release patterns (by tag)
            releases = []
            for tag in self.repo.get_tags()[:10]:  # Get last 10 tags
                try:
                    releases.append({
                        "name": tag.name,
                        "commit": tag.commit.sha,
                        "date": tag.commit.commit.author.date.isoformat(),
                    })
                except Exception as e:
                    print(f"Error processing tag {tag.name}: {str(e)}")
                    continue

            total_commits = len(commit_data)
            total_additions = sum(c["changes"]["additions"] for c in commit_data)
            total_deletions = sum(c["changes"]["deletions"] for c in commit_data)
            return {
                "total_commits": total_commits,
                "commit_hours": dict(commit_hours),
                "commit_weekdays": dict(commit_weekdays),
                "avg_additions": total_additions / total_commits if total_commits else 0,
                "avg_deletions": total_deletions / total_commits if total_commits else 0,
                "commit_frequency": defaultdict(int, dict(commit_authors)),
                "recent_activity": recent_activity,
                "releases": releases,
            }
        except Exception as e:
            print(f"Error in commit analysis: {str(e)}")
            return {
                "total_commits": 0,
                "commit_hours": {},
                "commit_weekdays": {},
                "avg_additions": 0,
                "avg_deletions": 0,
                "commit_frequency": {},
                "recent_activity": [],
                "releases": [],
            }

    def _analyze_contributors(self) -> Dict[str, Any]:
        """List all contributors and the five with the most contributions."""
        contributor_data = []
        top_contributors = []
        try:
            for contributor in list(self.repo.get_contributors()):
                contributor_data.append({
                    "login": contributor.login,
                    "contributions": contributor.contributions,
                    "type": contributor.type,
                    "url": contributor.html_url,
                })
            # Sort by contributions and get top 5
            top_contributors = sorted(
                contributor_data,
                key=lambda item: item["contributions"],
                reverse=True,
            )[:5]
        except Exception as e:
            print(f"Error analyzing contributors: {str(e)}")
        return {
            "total_contributors": len(contributor_data),
            "contributors": contributor_data,
            "top_contributors": top_contributors,
        }

    def _analyze_pull_requests(self) -> Dict[str, Any]:
        """Count open/closed pull requests and describe the five latest.

        ``merged_prs`` is counted over those five sampled pull requests only.
        """
        pr_data = {
            "open_prs": 0,
            "closed_prs": 0,
            "merged_prs": 0,
            "recent_prs": [],
        }
        try:
            pr_data["open_prs"] = self.repo.get_pulls(state='open').totalCount
            pr_data["closed_prs"] = self.repo.get_pulls(state='closed').totalCount
            # Get recent PRs (last 5)
            for pr in list(self.repo.get_pulls(state='all')[:5]):
                pr_data["recent_prs"].append({
                    "number": pr.number,
                    "title": pr.title,
                    "state": pr.state,
                    "created_at": pr.created_at.isoformat(),
                    "author": pr.user.login if pr.user else "Unknown",
                    "is_merged": pr.merged,
                    "url": pr.html_url,
                })
                # Count merged PRs from the sample
                if pr.merged:
                    pr_data["merged_prs"] += 1
        except Exception as e:
            print(f"Error analyzing pull requests: {str(e)}")
        return pr_data

    def _analyze_issues(self) -> Dict[str, Any]:
        """Count open/closed issues and describe the latest non-PR issues."""
        issue_data = {
            "open_issues": 0,
            "closed_issues": 0,
            "recent_issues": [],
        }
        try:
            issue_data["open_issues"] = self.repo.get_issues(state='open').totalCount
            issue_data["closed_issues"] = self.repo.get_issues(state='closed').totalCount
            # Get recent issues (last 5)
            for issue in list(self.repo.get_issues(state='all')[:5]):
                # Skip pull requests (which are also returned as issues)
                if issue.pull_request is not None:
                    continue
                issue_data["recent_issues"].append({
                    "number": issue.number,
                    "title": issue.title,
                    "state": issue.state,
                    "created_at": issue.created_at.isoformat(),
                    "author": issue.user.login if issue.user else "Unknown",
                    "labels": [label.name for label in issue.labels],
                    "url": issue.html_url,
                })
        except Exception as e:
            print(f"Error analyzing issues: {str(e)}")
        return issue_data

    def get_file_content(self, file_path: str) -> str:
        """Return the decoded text of *file_path*, using the cache if possible.

        On failure an ``"Error: ..."`` string is returned instead of raising.
        """
        cached = self.file_content_cache.get(file_path)
        if file_path in self.file_content_cache:
            return cached
        try:
            content = self.repo.get_contents(file_path)
            file_content = base64.b64decode(content.content).decode('utf-8')
            self.file_content_cache[file_path] = file_content
            return file_content
        except Exception as e:
            print(f"Error getting file content for {file_path}: {str(e)}")
            return f"Error: Could not retrieve file content: {str(e)}"

    def search_code(self, query: str) -> List[Dict[str, Any]]:
        """Search the repository via GitHub code search.

        Returns at most 10 files, each with up to 3 matching lines (found by
        case-insensitive substring match) and two lines of context either side.
        """
        results = []
        needle = query.lower()
        try:
            # Use GitHub search API
            code_results = self.gh.search_code(f"repo:{self.owner}/{self.repo_name} {query}")
            for item in code_results[:10]:  # Limit to 10 results
                try:
                    lines = self.get_file_content(item.path).splitlines()
                    matching_lines = []
                    for index, line in enumerate(lines):
                        if needle not in line.lower():
                            continue
                        start_line = max(0, index - 2)
                        end_line = min(len(lines), index + 3)
                        matching_lines.append({
                            "line_number": index + 1,
                            "line": line,
                            "context": "\n".join(lines[start_line:end_line]),
                        })
                    results.append({
                        "path": item.path,
                        "url": item.html_url,
                        "matching_lines": matching_lines[:3],  # Limit to 3 matches per file
                    })
                except Exception as e:
                    print(f"Error processing search result {item.path}: {str(e)}")
                    continue
        except Exception as e:
            print(f"Error searching code: {str(e)}")
        return results

    def get_file_list(self, pattern: Optional[str] = None) -> List[str]:
        """Return every file path in the repository, breadth-first.

        Args:
            pattern: optional regular expression; only paths it matches
                (case-insensitively, via ``re.search``) are returned.
        """
        files = []
        try:
            directories = [""]  # repository-relative directory paths to visit
            while directories:
                directory = directories.pop(0)
                try:
                    for content in self.repo.get_contents(directory):
                        path = f"{directory}/{content.name}" if directory else content.name
                        if content.type == "dir":
                            directories.append(path.strip("/"))
                        elif not pattern or re.search(pattern, path, re.IGNORECASE):
                            files.append(path)
                except Exception as e:
                    print(f"Error listing files in {directory}: {str(e)}")
                    continue
        except Exception as e:
            print(f"Error getting file list: {str(e)}")
        return files
# Gemini Voice Handler
class GeminiHandler(AsyncStreamHandler):
    """Handler for the Gemini API voice chat.

    Incoming microphone frames are base64-encoded and queued for the Gemini
    live session; audio returned by the session is queued for playback. When
    repository analysis data and a system prompt have been set, they are sent
    to the model as the session's system instruction.
    """

    def __init__(
        self,
        expected_layout: Literal["mono"] = "mono",
        output_sample_rate: int = 24000,
        output_frame_size: int = 480,
        analysis_data: Optional[Dict[str, Any]] = None,
        system_prompt: Optional[str] = None,
    ) -> None:
        super().__init__(
            expected_layout,
            output_sample_rate,
            output_frame_size,
            input_sample_rate=16000,
        )
        self.input_queue: asyncio.Queue = asyncio.Queue()
        self.output_queue: asyncio.Queue = asyncio.Queue()
        self.quit: asyncio.Event = asyncio.Event()
        self.analysis_data = analysis_data or {}
        self.system_prompt = system_prompt or ""

    def copy(self) -> "GeminiHandler":
        """Return a fresh handler that shares this handler's context."""
        return GeminiHandler(
            expected_layout="mono",
            output_sample_rate=self.output_sample_rate,
            output_frame_size=self.output_frame_size,
            analysis_data=self.analysis_data,
            system_prompt=self.system_prompt,
        )

    def set_context(self, analysis_data: Dict[str, Any], system_prompt: str):
        """Set the repository analysis context for voice chat"""
        self.analysis_data = analysis_data
        self.system_prompt = system_prompt

    async def start_up(self):
        """Open the Gemini live session and pump its audio to the output queue."""
        # The live client lives in ``google.genai``; the module-level ``genai``
        # alias points at ``google.generativeai``, which has no ``Client``.
        from google import genai as live_genai
        from google.genai.types import Content, Part

        if not self.phone_mode:
            await self.wait_for_args()
            api_key, voice_name = self.latest_args[1:]
        else:
            api_key, voice_name = None, "Puck"

        client = live_genai.Client(
            api_key=api_key or os.getenv("GEMINI_API_KEY"),
            http_options={"api_version": "v1alpha"},
        )

        config_kwargs: Dict[str, Any] = {
            "response_modalities": ["AUDIO"],  # type: ignore
            "speech_config": SpeechConfig(
                voice_config=VoiceConfig(
                    prebuilt_voice_config=PrebuiltVoiceConfig(
                        voice_name=voice_name,
                    )
                )
            ),
        }

        # Add repository context if available. LiveConnectConfig has no
        # ``prefix`` field; instructions for the model are supplied through
        # ``system_instruction``.
        if self.analysis_data and self.system_prompt:
            context_prefix = (
                "\n"
                + f"{self.system_prompt}\n"
                + "Repository Analysis Data:\n"
                + f"{json.dumps(self.analysis_data, indent=2)}\n"
                + "Answer questions about this repository analysis. "
                + "You are now in voice-based conversation mode.\n"
            )
            config_kwargs["system_instruction"] = Content(
                parts=[Part(text=context_prefix)]
            )

        config = LiveConnectConfig(**config_kwargs)

        try:
            async with client.aio.live.connect(
                model="gemini-2.0-flash-exp", config=config
            ) as session:
                async for audio in session.start_stream(
                    stream=self.stream(), mime_type="audio/pcm"
                ):
                    if audio.data:
                        array = np.frombuffer(audio.data, dtype=np.int16)
                        self.output_queue.put_nowait((self.output_sample_rate, array))
        except Exception as e:
            print(f"Error in Gemini streaming session: {str(e)}")

    async def stream(self) -> AsyncGenerator[bytes, None]:
        """Yield queued input audio until ``shutdown()`` is called."""
        while not self.quit.is_set():
            try:
                # Short timeout so the quit flag is re-checked regularly.
                audio = await asyncio.wait_for(self.input_queue.get(), 0.1)
                yield audio
            except (asyncio.TimeoutError, TimeoutError):
                pass

    async def receive(self, frame: tuple[int, np.ndarray]) -> None:
        """Encode one incoming audio frame and queue it for the session."""
        _, array = frame
        array = array.squeeze()
        audio_message = encode_audio(array)
        self.input_queue.put_nowait(audio_message)

    async def emit(self) -> tuple[int, np.ndarray] | None:
        """Return the next ``(sample_rate, samples)`` item from the output queue."""
        return await wait_for_item(self.output_queue)

    def shutdown(self) -> None:
        """Signal ``stream()`` to stop."""
        self.quit.set()
# Function to analyze repository and generate summary
@retry(
    retry=retry_if_exception_type(Exception),
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=10)
)
def analyze_repository(repo_url: str, github_token: str, gemini_api_key: str, progress=None) -> Tuple[str, str, Any, str]:
    """Analyze a repository and generate an LLM summary of the results.

    Args:
        repo_url: URL of the GitHub repository.
        github_token: GitHub API token used for all repository requests.
        gemini_api_key: API key for the Gemini text model.
        progress: optional callback, invoked both as
            ``progress(fraction, desc=...)`` and ``progress(fraction, message)``.

    Returns:
        ``(summary_text, analysis_json_path, analyzer, system_prompt)``. The
        JSON file is a temporary file that is intentionally not deleted,
        because its path is handed back to the caller.

    Raises:
        Exception: on any failure; the decorator re-runs the call for up to
            three attempts.
    """
    try:
        # Configure Gemini
        model = configure_gemini(gemini_api_key)
        # Initialize analyzer
        if progress:
            progress(0, desc="Initializing repository analysis...")
        analyzer = RepositoryAnalyzer(repo_url, github_token)
        # Perform analysis
        analysis_data = analyzer.analyze(progress)
        # Generate LLM summary
        if progress:
            progress(0.95, desc="Generating analysis summary...")
        system_prompt = """You are an expert code analyst with deep experience in software architecture, development practices, and team dynamics. Analyze the provided repository data and create a detailed, insightful analysis using the following markdown template:
# Repository Analysis
## 📊 Project Overview
[Provide a comprehensive overview including:
- Project purpose and scope
- Age and maturity of the project
- Current activity level and maintenance status
- Key metrics (stars, forks, etc.)
- Primary technologies and languages used]
## 🏗️ Architecture and Code Organization
[Analyze in detail:
- Repository structure and organization
- Code distribution across different technologies
- File and directory organization patterns
- Project size and complexity metrics
- Code modularity and component structure
- Presence of key architectural patterns]
## 💻 Development Practices & Code Quality
[Evaluate:
- Coding standards and consistency
- Code complexity and maintainability metrics
- Documentation practices
- Testing approach and coverage (if visible)
- Error handling and logging practices
- Use of design patterns and best practices]
## 📈 Development Workflow & History
[Analyze:
- Commit patterns and frequency
- Release cycles and versioning
- Branch management strategy
- Code review practices
- Continuous integration/deployment indicators
- Peak development periods and cycles]
## 👥 Team Dynamics & Collaboration
[Examine:
- Team size and composition
- Contribution patterns
- Core maintainer identification
- Community engagement level
- Communication patterns
- Collaboration efficiency]
## 🔧 Technical Depth & Innovation
[Assess:
- Technical sophistication level
- Innovative approaches or solutions
- Complex problem-solving examples
- Performance optimization efforts
- Security considerations
- Scalability approach]
## 🚀 Project Health & Sustainability
[Evaluate:
- Project momentum and growth trends
- Maintenance patterns
- Community health indicators
- Documentation completeness
- Onboarding friendliness
- Long-term viability indicators]
## 💡 Key Insights & Recommendations
[Provide:
- 3-5 key strengths identified
- 3-5 potential improvement areas
- Notable patterns or practices
- Unique characteristics
- Strategic recommendations]
Please provide detailed analysis for each section while maintaining the formatting and emojis. Support insights with specific metrics and examples from the repository data where possible."""
        chat = model.start_chat(history=[])
        response = chat.send_message(
            f"{system_prompt}\n\nRepository Analysis Data:\n{json.dumps(analysis_data, indent=2)}"
        )
        # Save analysis data
        if progress:
            progress(0.98, desc="Saving analysis results...")
        with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.json') as f:
            json.dump(analysis_data, f, indent=2)
            analysis_file = f.name
        return response.text, analysis_file, analyzer, system_prompt
    except Exception as e:
        error_message = f"Error analyzing repository: {str(e)}"
        # Chain the cause so the original traceback is not lost.
        raise Exception(error_message) from e
# Function to create a chat session and ask questions
def create_chat_session(gemini_api_key: str) -> Any:
    """Configure the Gemini SDK with *gemini_api_key* and return the model used for follow-up questions."""
    genai.configure(api_key=gemini_api_key)
    # Sampling settings for follow-up answers.
    sampling = dict(
        temperature=0.7,
        top_p=0.8,
        top_k=40,
        max_output_tokens=4096,
    )
    model = genai.GenerativeModel(
        model_name="gemini-1.5-pro-latest",
        generation_config=sampling,
    )
    return model
# Request patterns for ask_question. Each is anchored at the end of the
# question so the lazy capture group expands to the whole path / search term;
# without the anchor it would match a single character. The leading \b stops
# verbs from matching inside other words (e.g. "forget the ...").
_FILE_REQUEST_RE = re.compile(
    r"\b(?:show|view|get|display|content of|code for)\s+(?:the\s+)?(?:file\s+)?"
    r"['\"]?([^'\"]+?)['\"]?(?:\s+file)?\s*[.?!]*\s*$",
    re.IGNORECASE,
)
_SEARCH_REQUEST_RE = re.compile(
    r"\b(?:search|find|look for|where is)\s+(?:for\s+)?"
    r"['\"]?([^'\"]+?)['\"]?(?:\s+in the code)?\s*[.?!]*\s*$",
    re.IGNORECASE,
)


@retry(
    retry=retry_if_exception_type(Exception),
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=10)
)
def ask_question(question: str, analysis_file: str, analyzer: RepositoryAnalyzer, gemini_api_key: str,
                 chat_history: List[Tuple[str, str]]) -> List[Tuple[str, str]]:
    """Process a follow-up question about the analysis.

    The question is handled, in order, as (1) a request to show a file,
    (2) a code search, or (3) a general question answered by the Gemini model
    using the saved analysis data and the last five chat turns.

    Args:
        question: the user's question.
        analysis_file: path to the JSON file written by the analysis.
        analyzer: analyzer bound to the repository being discussed.
        gemini_api_key: API key for the Gemini text model.
        chat_history: previous ``(user, assistant)`` turns; not modified.

    Returns:
        A new list: *chat_history* plus one ``(question, answer)`` tuple.
        Errors are reported as the answer text rather than raised.
    """
    if not analysis_file or not analyzer:
        return chat_history + [(question, "Please analyze a repository first before asking questions.")]
    try:
        # Load analysis data
        with open(analysis_file, 'r') as f:
            analysis_data = json.load(f)
        # Initialize chat model
        model = create_chat_session(gemini_api_key)

        # Check if this is a file content request. The original casing of the
        # question is kept because repository paths are case-sensitive.
        file_request_match = _FILE_REQUEST_RE.search(question)
        if file_request_match:
            file_path = file_request_match.group(1).strip()
            all_files = analyzer.get_file_list()
            wanted = file_path.lower()

            # Exact match first, then a unique case-insensitive exact match.
            chosen = None
            if file_path in all_files:
                chosen = file_path
            else:
                same_name = [f for f in all_files if f.lower() == wanted]
                if len(same_name) == 1:
                    chosen = same_name[0]

            # Fall back to a partial (substring) match.
            matching_files = []
            if chosen is None:
                matching_files = [f for f in all_files if wanted in f.lower()]
                if len(matching_files) == 1:
                    chosen = matching_files[0]

            if chosen is not None:
                file_content = analyzer.get_file_content(chosen)
                return chat_history + [(question, f"Here's the content of `{chosen}`:\n\n```\n{file_content}\n```")]
            if matching_files:
                file_list = "\n".join([f"- {f}" for f in matching_files[:10]])
                return chat_history + [(question, f"I found multiple files matching '{file_path}'. Please specify which one you'd like to see:\n\n{file_list}{' and more...' if len(matching_files) > 10 else ''}")]
            # No file matched: fall through to the other handlers.

        # Check if this is a code search request
        search_request_match = _SEARCH_REQUEST_RE.search(question)
        if search_request_match:
            search_query = search_request_match.group(1).strip()
            search_results = analyzer.search_code(search_query)
            if not search_results:
                return chat_history + [(question, f"I couldn't find any code matching '{search_query}' in the repository.")]
            pieces = [f"I found {len(search_results)} files containing '{search_query}':\n\n"]
            for result in search_results:
                pieces.append(f"**File: {result['path']}**\n")
                if result['matching_lines']:
                    for match in result['matching_lines']:
                        pieces.append(f"Line {match['line_number']}: `{match['line'].strip()}`\n")
                    pieces.append("\n")
                else:
                    pieces.append("No specific line matches found.\n\n")
            return chat_history + [(question, "".join(pieces))]

        # For general questions, use the AI
        # Build context from chat history and current question
        context = "You are an expert code analyst helping users understand repository analysis results.\n\n"
        context += f"Repository Analysis Data:\n{json.dumps(analysis_data, indent=2)}\n\n"
        # Add chat history context
        if chat_history:
            context += "Previous conversation:\n"
            for user_msg, assistant_msg in chat_history[-5:]:  # Include last 5 messages only
                context += f"User: {user_msg}\nAssistant: {assistant_msg}\n"
        # Add current question
        prompt = context + f"\nUser: {question}\nPlease provide your analysis based on the repository data:"
        # Get response
        response = model.generate_content(prompt)
        # Return in the correct tuple format for Gradio chatbot
        return chat_history + [(question, response.text)]
    except Exception as e:
        error_message = f"Error processing question: {str(e)}"
        return chat_history + [(question, error_message)]
# Input data models
class InputData(BaseModel):
    # Request body for the /input_hook route.

    # Identifier of the WebRTC connection the inputs belong to.
    webrtc_id: str
    # Name of the prebuilt Gemini voice to use.
    voice_name: str
    # Gemini API key supplied by the client.
    api_key: str
    # Optional repository to analyze for conversation context.
    repo_url: Optional[str] = None
    # Optional GitHub token used when analyzing repo_url.
    github_token: Optional[str] = None
# Create FastAPI app and set up routes
app = FastAPI()

# Create Gemini handler for voice chat
gemini_handler = GeminiHandler()

# Limits and TURN credentials apply only when get_space() reports a Space.
_on_spaces = get_space()

# Extra inputs forwarded to the handler: the Gemini API key and the voice.
_voice_inputs = [
    gr.Textbox(
        label="Gemini API Key",
        type="password",
        value="" if _on_spaces else os.getenv("GEMINI_API_KEY"),
    ),
    gr.Dropdown(
        label="Voice",
        choices=["Puck", "Charon", "Kore", "Fenrir", "Aoede"],
        value="Puck",
    ),
]

# Create voice chat stream
voice_stream = Stream(
    modality="audio",
    mode="send-receive",
    handler=gemini_handler,
    rtc_configuration=get_twilio_turn_credentials() if _on_spaces else None,
    concurrency_limit=5 if _on_spaces else None,
    time_limit=120 if _on_spaces else None,
    additional_inputs=_voice_inputs,
)

# Mount voice stream to app
voice_stream.mount(app)

# Current repository analysis data (shared, process-wide state)
current_analysis = dict.fromkeys(
    ("data", "analyzer", "file", "summary", "system_prompt")
)
# Strong references to in-flight background analysis tasks. The event loop
# only keeps weak references to tasks, so a task whose result is discarded
# can be garbage-collected before it finishes.
_background_analysis_tasks: Set["asyncio.Task"] = set()


@app.post("/input_hook")
async def _(body: InputData):
    """Register voice-chat inputs for a WebRTC connection.

    If a repository URL and GitHub token are supplied and nothing has been
    analyzed yet, the analysis is started in the background so the voice
    connection is not blocked. Any existing analysis is pushed to the handler.
    """
    voice_stream.set_input(body.webrtc_id, body.api_key, body.voice_name)

    # Start at most one background analysis at a time.
    needs_analysis = (
        body.repo_url
        and body.github_token
        and current_analysis["data"] is None
        and not _background_analysis_tasks
    )
    if needs_analysis:
        try:
            task = asyncio.create_task(
                analyze_and_update_context(body.repo_url, body.github_token, body.api_key)
            )
            _background_analysis_tasks.add(task)
            task.add_done_callback(_background_analysis_tasks.discard)
        except Exception as e:
            print(f"Error analyzing repository: {str(e)}")

    # Update handler context if analysis data exists
    if current_analysis["data"] and current_analysis["system_prompt"]:
        gemini_handler.set_context(current_analysis["data"], current_analysis["system_prompt"])
    return {"status": "ok"}
class AnalyzeRepositoryRequest(BaseModel):
    """JSON body posted by the bundled page's "Analyze Repository" button."""

    repo_url: str
    github_token: str
    gemini_api_key: str


@app.post("/analyze_repository")
async def analyze_repo(
    repo_url: Optional[str] = None,
    github_token: Optional[str] = None,
    gemini_api_key: Optional[str] = None,
    body: Optional[AnalyzeRepositoryRequest] = None,
):
    """Analyze a repository and publish the result as the shared chat context.

    The three values may arrive either as a JSON body (what the bundled page
    sends) or as query parameters (the original calling convention). Bare
    ``str`` parameters are treated by FastAPI as query parameters, so without
    the body model the page's JSON request was rejected before reaching here.

    Args:
        repo_url: Repository URL (query parameter form).
        github_token: GitHub API token (query parameter form).
        gemini_api_key: Gemini API key (query parameter form).
        body: Same three values as a JSON document; takes precedence when present.

    Returns:
        ``{"status": "success", "summary": ..., "file_path": ...}`` on success,
        otherwise ``{"status": "error", "message": ...}``.
    """
    if body is not None:
        repo_url = body.repo_url
        github_token = body.github_token
        gemini_api_key = body.gemini_api_key

    if not (repo_url and github_token and gemini_api_key):
        return {
            "status": "error",
            "message": "repo_url, github_token and gemini_api_key are required."
        }

    try:
        # analyze_repository is blocking, so run it in a worker thread.
        summary, file_path, analyzer, system_prompt = await asyncio.to_thread(
            analyze_repository, repo_url, github_token, gemini_api_key
        )
        # Load analysis data from file
        with open(file_path, 'r') as f:
            analysis_data = json.load(f)
        # Update current analysis
        current_analysis["data"] = analysis_data
        current_analysis["analyzer"] = analyzer
        current_analysis["file"] = file_path
        current_analysis["summary"] = summary
        current_analysis["system_prompt"] = system_prompt
        # Update handler context
        gemini_handler.set_context(analysis_data, system_prompt)
        return {
            "status": "success",
            "summary": summary,
            "file_path": file_path
        }
    except Exception as e:
        # Errors are reported in the response body rather than as an HTTP error.
        return {
            "status": "error",
            "message": str(e)
        }
async def analyze_and_update_context(repo_url: str, github_token: str, gemini_api_key: str):
    """Run a repository analysis and store the result as the shared context.

    The blocking ``analyze_repository`` call runs in a worker thread. On
    success the parsed analysis file, analyzer, file path, summary and system
    prompt are written to ``current_analysis`` and the voice handler's context
    is updated. Any exception is printed and swallowed, so this coroutine
    never raises and always returns ``None``.
    """
    try:
        outcome = await asyncio.to_thread(
            analyze_repository, repo_url, github_token, gemini_api_key
        )
        summary, file_path, analyzer, system_prompt = outcome

        # The analysis is persisted as JSON on disk; read it back in.
        with open(file_path, 'r') as handle:
            analysis_data = json.load(handle)

        # Publish everything in one step.
        current_analysis.update(
            {
                "data": analysis_data,
                "analyzer": analyzer,
                "file": file_path,
                "summary": summary,
                "system_prompt": system_prompt,
            }
        )
        gemini_handler.set_context(analysis_data, system_prompt)
    except Exception as exc:
        print(f"Error analyzing repository in background: {str(exc)}")
class AskQuestionRequest(BaseModel):
    """JSON body posted by the bundled page's text chat."""

    question: str


@app.post("/ask_question")
async def ask_repo_question(
    question: Optional[str] = None,
    body: Optional[AskQuestionRequest] = None,
):
    """Answer a text question about the currently analyzed repository.

    The question may arrive as a JSON body (what the bundled page sends) or as
    a query parameter (the original calling convention). A bare ``str``
    parameter is read by FastAPI from the query string only, so without the
    body model the page's JSON request was rejected.

    Args:
        question: Question text (query parameter form).
        body: Question text as a JSON document; takes precedence when present.

    Returns:
        ``{"status": "success", "answer": ...}`` on success, otherwise
        ``{"status": "error", "message": ...}``.
    """
    if not current_analysis["file"] or not current_analysis["analyzer"]:
        return {
            "status": "error",
            "message": "Please analyze a repository first before asking questions."
        }

    if body is not None:
        question = body.question
    if not question:
        return {
            "status": "error",
            "message": "A question is required."
        }

    try:
        # ask_question is blocking; it is called with an empty chat history,
        # so the returned history holds exactly this one exchange.
        response = await asyncio.to_thread(
            ask_question,
            question,
            current_analysis["file"],
            current_analysis["analyzer"],
            GEMINI_API_KEY,
            []
        )
        # Extract just the response text from the latest (question, answer) pair
        _, answer = response[-1]
        return {
            "status": "success",
            "answer": answer
        }
    except Exception as e:
        return {
            "status": "error",
            "message": str(e)
        }
@app.get("/")
async def index():
    """Serve the voice/text chat page.

    Uses ``index.html`` next to this file when it exists, otherwise the
    built-in fallback page below. In both cases the ``__RTC_CONFIGURATION__``
    placeholder is replaced with the JSON-encoded RTC configuration (TURN
    credentials when ``get_space()`` is truthy, otherwise ``null``).

    Returns:
        HTMLResponse containing the rendered page.
    """
    rtc_config = get_twilio_turn_credentials() if get_space() else None
    if index_html_path.exists():
        html_content = index_html_path.read_text(encoding="utf-8")
    else:
        # Raw string on purpose: the embedded JavaScript contains regex
        # literals with "\n" and "\*". In a normal Python string "\n" becomes
        # a real newline, which splits the regex literal across lines and
        # makes the whole <script> fail to parse in the browser.
        html_content = r"""<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>GitHub Repository Analyzer with Voice Chat</title>
<style>
:root {
--color-accent: #6366f1;
--color-background: #0f172a;
--color-surface: #1e293b;
--color-text: #e2e8f0;
--boxSize: 8px;
--gutter: 4px;
}
body {
margin: 0;
padding: 0;
background-color: var(--color-background);
color: var(--color-text);
font-family: system-ui, -apple-system, sans-serif;
min-height: 100vh;
display: flex;
flex-direction: column;
align-items: center;
justify-content: center;
}
.container {
width: 90%;
max-width: 800px;
background-color: var(--color-surface);
padding: 2rem;
border-radius: 1rem;
box-shadow: 0 25px 50px -12px rgba(0, 0, 0, 0.25);
}
.wave-container {
position: relative;
display: flex;
min-height: 100px;
max-height: 128px;
justify-content: center;
align-items: center;
margin: 2rem 0;
}
.box-container {
display: flex;
justify-content: space-between;
height: 64px;
width: 100%;
}
.box {
height: 100%;
width: var(--boxSize);
background: var(--color-accent);
border-radius: 8px;
transition: transform 0.05s ease;
}
.controls {
display: grid;
gap: 1rem;
margin-bottom: 2rem;
}
.input-group {
display: flex;
flex-direction: column;
gap: 0.5rem;
}
label {
font-size: 0.875rem;
font-weight: 500;
}
input,
select {
padding: 0.75rem;
border-radius: 0.5rem;
border: 1px solid rgba(255, 255, 255, 0.1);
background-color: var(--color-background);
color: var(--color-text);
font-size: 1rem;
}
button {
padding: 1rem 2rem;
border-radius: 0.5rem;
border: none;
background-color: var(--color-accent);
color: white;
font-weight: 600;
cursor: pointer;
transition: all 0.2s ease;
}
button:hover {
opacity: 0.9;
transform: translateY(-1px);
}
.icon-with-spinner {
display: flex;
align-items: center;
justify-content: center;
gap: 12px;
min-width: 180px;
}
.spinner {
width: 20px;
height: 20px;
border: 2px solid white;
border-top-color: transparent;
border-radius: 50%;
animation: spin 1s linear infinite;
flex-shrink: 0;
}
@keyframes spin {
to {
transform: rotate(360deg);
}
}
.pulse-container {
display: flex;
align-items: center;
justify-content: center;
gap: 12px;
min-width: 180px;
}
.pulse-circle {
width: 20px;
height: 20px;
border-radius: 50%;
background-color: white;
opacity: 0.2;
flex-shrink: 0;
transform: translateX(-0%) scale(var(--audio-level, 1));
transition: transform 0.1s ease;
}
/* Add styles for toast notifications */
.toast {
position: fixed;
top: 20px;
left: 50%;
transform: translateX(-50%);
padding: 16px 24px;
border-radius: 4px;
font-size: 14px;
z-index: 1000;
display: none;
box-shadow: 0 2px 5px rgba(0, 0, 0, 0.2);
}
.toast.error {
background-color: #f44336;
color: white;
}
.toast.warning {
background-color: #ffd700;
color: black;
}
.tabs {
display: flex;
margin-bottom: 1rem;
}
.tab {
padding: 0.5rem 1rem;
cursor: pointer;
border-bottom: 2px solid transparent;
transition: all 0.2s ease;
}
.tab.active {
border-bottom: 2px solid var(--color-accent);
color: var(--color-accent);
}
.tab-content {
display: none;
}
.tab-content.active {
display: block;
}
.chat-container {
height: 300px;
overflow-y: auto;
border: 1px solid rgba(255, 255, 255, 0.1);
border-radius: 0.5rem;
padding: 1rem;
margin-bottom: 1rem;
background-color: rgba(0, 0, 0, 0.2);
}
.chat-message {
margin-bottom: 1rem;
padding-bottom: 1rem;
border-bottom: 1px solid rgba(255, 255, 255, 0.1);
}
.chat-message:last-child {
border-bottom: none;
}
.chat-message .user {
font-weight: bold;
color: var(--color-accent);
margin-bottom: 0.5rem;
}
.chat-message .bot {
font-weight: bold;
color: #10b981;
margin-bottom: 0.5rem;
}
.chat-input {
display: flex;
gap: 0.5rem;
}
.chat-input input {
flex: 1;
}
</style>
</head>
<body>
<!-- Add toast element after body opening tag -->
<div id="error-toast" class="toast"></div>
<div style="text-align: center">
<h1>GitHub Repository Analyzer with Voice Chat</h1>
<p>Analyze GitHub repositories and chat with the AI using voice or text</p>
</div>
<div class="container">
<div class="tabs">
<div class="tab active" data-tab="repo">Repository Setup</div>
<div class="tab" data-tab="text">Text Chat</div>
<div class="tab" data-tab="voice">Voice Chat</div>
</div>
<div class="tab-content active" id="repo-tab">
<div class="controls">
<div class="input-group">
<label for="github-token">GitHub API Token</label>
<input type="password" id="github-token" placeholder="Enter your GitHub API token">
</div>
<div class="input-group">
<label for="gemini-api-key">Gemini API Key</label>
<input type="password" id="gemini-api-key" placeholder="Enter your Gemini API key">
</div>
<div class="input-group">
<label for="repo-url">GitHub Repository URL</label>
<input type="text" id="repo-url" placeholder="https://github.com/owner/repo">
</div>
<button id="analyze-button">Analyze Repository</button>
</div>
<div id="analysis-result" style="white-space: pre-wrap; max-height: 400px; overflow-y: auto; display: none;"></div>
</div>
<div class="tab-content" id="text-tab">
<div id="chat-container" class="chat-container"></div>
<div class="chat-input">
<input type="text" id="text-question" placeholder="Ask a question about the repository...">
<button id="ask-button">Ask</button>
</div>
</div>
<div class="tab-content" id="voice-tab">
<div class="controls">
<div class="input-group">
<label for="voice">Voice</label>
<select id="voice">
<option value="Puck">Puck</option>
<option value="Charon">Charon</option>
<option value="Kore">Kore</option>
<option value="Fenrir">Fenrir</option>
<option value="Aoede">Aoede</option>
</select>
</div>
</div>
<div class="wave-container">
<div class="box-container">
<!-- Boxes will be dynamically added here -->
</div>
</div>
<button id="start-button">Start Voice Chat</button>
</div>
</div>
<audio id="audio-output"></audio>
<script>
// Global variables
let peerConnection;
let audioContext;
let dataChannel;
let isRecording = false;
let webrtc_id;
let currentTab = 'repo';
let repositoryAnalyzed = false;
// Audio analysis state; declared up front so reading it before the first
// assignment yields undefined instead of throwing a ReferenceError
let analyser;
let analyser_input;
let dataArray;
let dataArray_input;
let animationId;
// DOM elements
const startButton = document.getElementById('start-button');
const analyzeButton = document.getElementById('analyze-button');
const askButton = document.getElementById('ask-button');
const repoUrlInput = document.getElementById('repo-url');
const githubTokenInput = document.getElementById('github-token');
const geminiApiKeyInput = document.getElementById('gemini-api-key');
const voiceSelect = document.getElementById('voice');
const audioOutput = document.getElementById('audio-output');
const boxContainer = document.querySelector('.box-container');
const tabs = document.querySelectorAll('.tab');
const tabContents = document.querySelectorAll('.tab-content');
const chatContainer = document.getElementById('chat-container');
const textQuestionInput = document.getElementById('text-question');
const analysisResult = document.getElementById('analysis-result');
// Initialize audio visualization
const numBars = 32;
for (let i = 0; i < numBars; i++) {
const box = document.createElement('div');
box.className = 'box';
boxContainer.appendChild(box);
}
// Tab switching
tabs.forEach(tab => {
tab.addEventListener('click', () => {
// First check if repository has been analyzed when switching to chat tabs
if ((tab.dataset.tab === 'text' || tab.dataset.tab === 'voice') && !repositoryAnalyzed) {
showError('Please analyze a repository first before chatting.');
return;
}
// Deactivate all tabs
tabs.forEach(t => t.classList.remove('active'));
tabContents.forEach(c => c.classList.remove('active'));
// Activate selected tab
tab.classList.add('active');
document.getElementById(`${tab.dataset.tab}-tab`).classList.add('active');
currentTab = tab.dataset.tab;
});
});
// Error message display
function showError(message) {
const toast = document.getElementById('error-toast');
toast.textContent = message;
toast.className = 'toast error';
toast.style.display = 'block';
// Hide toast after 5 seconds
setTimeout(() => {
toast.style.display = 'none';
}, 5000);
}
// Repository analysis
analyzeButton.addEventListener('click', async () => {
const repoUrl = repoUrlInput.value.trim();
const githubToken = githubTokenInput.value.trim();
const geminiApiKey = geminiApiKeyInput.value.trim();
if (!repoUrl || !githubToken || !geminiApiKey) {
showError('Please fill in all fields');
return;
}
try {
analyzeButton.disabled = true;
analyzeButton.textContent = 'Analyzing...';
// Call the analyze endpoint
const response = await fetch('/analyze_repository', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({
repo_url: repoUrl,
github_token: githubToken,
gemini_api_key: geminiApiKey
})
});
const data = await response.json();
if (data.status === 'success') {
// Display the analysis result
analysisResult.textContent = data.summary;
analysisResult.style.display = 'block';
repositoryAnalyzed = true;
} else {
showError(data.message || 'Error analyzing repository');
}
} catch (err) {
showError('Failed to analyze repository: ' + err.message);
} finally {
analyzeButton.disabled = false;
analyzeButton.textContent = 'Analyze Repository';
}
});
// Text chat
askButton.addEventListener('click', async () => {
const question = textQuestionInput.value.trim();
if (!question) {
return;
}
// Add user message to chat
addMessageToChat('You', question);
textQuestionInput.value = '';
try {
// Call the ask endpoint
const response = await fetch('/ask_question', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({
question: question
})
});
const data = await response.json();
if (data.status === 'success') {
// Add bot response to chat
addMessageToChat('AI', data.answer);
} else {
addMessageToChat('AI', 'Error: ' + (data.message || 'Failed to get response'));
}
} catch (err) {
addMessageToChat('AI', 'Error: ' + err.message);
}
});
// Add message to chat container
function addMessageToChat(sender, message) {
const messageDiv = document.createElement('div');
messageDiv.className = 'chat-message';
const senderDiv = document.createElement('div');
senderDiv.className = sender === 'You' ? 'user' : 'bot';
senderDiv.textContent = sender + ':';
const contentDiv = document.createElement('div');
contentDiv.innerHTML = formatMessage(message);
messageDiv.appendChild(senderDiv);
messageDiv.appendChild(contentDiv);
chatContainer.appendChild(messageDiv);
chatContainer.scrollTop = chatContainer.scrollHeight;
}
// Format message with Markdown-like syntax
function formatMessage(message) {
// Escape HTML first: the result is assigned to innerHTML, and both the
// user's question and the model's answer are untrusted text
message = String(message)
.replace(/&/g, '&amp;')
.replace(/</g, '&lt;')
.replace(/>/g, '&gt;');
// Convert code blocks
message = message.replace(/```([^`]+)```/g, '<pre><code>$1</code></pre>');
// Convert inline code
message = message.replace(/`([^`]+)`/g, '<code>$1</code>');
// Convert bold
message = message.replace(/\*\*([^*]+)\*\*/g, '<strong>$1</strong>');
// Convert bullets
message = message.replace(/- ([^\n]+)/g, '&bull; $1<br>');
// Convert line breaks
message = message.replace(/\n/g, '<br>');
return message;
}
// Voice chat WebRTC setup
async function setupWebRTC() {
const config = __RTC_CONFIGURATION__;
peerConnection = new RTCPeerConnection(config);
webrtc_id = Math.random().toString(36).substring(7);
const timeoutId = setTimeout(() => {
const toast = document.getElementById('error-toast');
toast.textContent = "Connection is taking longer than usual. Are you on a VPN?";
toast.className = 'toast warning';
toast.style.display = 'block';
// Hide warning after 5 seconds
setTimeout(() => {
toast.style.display = 'none';
}, 5000);
}, 5000);
try {
const stream = await navigator.mediaDevices.getUserMedia({ audio: true });
stream.getTracks().forEach(track => peerConnection.addTrack(track, stream));
// Update audio visualization setup
audioContext = new AudioContext();
analyser_input = audioContext.createAnalyser();
const source = audioContext.createMediaStreamSource(stream);
source.connect(analyser_input);
analyser_input.fftSize = 64;
dataArray_input = new Uint8Array(analyser_input.frequencyBinCount);
function updateAudioLevel() {
analyser_input.getByteFrequencyData(dataArray_input);
const average = Array.from(dataArray_input).reduce((a, b) => a + b, 0) / dataArray_input.length;
const audioLevel = average / 255;
const pulseCircle = document.querySelector('.pulse-circle');
if (pulseCircle) {
pulseCircle.style.setProperty('--audio-level', 1 + audioLevel);
}
animationId = requestAnimationFrame(updateAudioLevel);
}
updateAudioLevel();
// Add connection state change listener
peerConnection.addEventListener('connectionstatechange', () => {
console.log('connectionstatechange', peerConnection.connectionState);
if (peerConnection.connectionState === 'connected') {
clearTimeout(timeoutId);
const toast = document.getElementById('error-toast');
toast.style.display = 'none';
}
updateButtonState();
});
// Handle incoming audio
peerConnection.addEventListener('track', (evt) => {
if (audioOutput && audioOutput.srcObject !== evt.streams[0]) {
audioOutput.srcObject = evt.streams[0];
audioOutput.play();
// Set up audio visualization on the output stream
audioContext = new AudioContext();
analyser = audioContext.createAnalyser();
const source = audioContext.createMediaStreamSource(evt.streams[0]);
source.connect(analyser);
analyser.fftSize = 2048;
dataArray = new Uint8Array(analyser.frequencyBinCount);
updateVisualization();
}
});
// Create data channel for messages
dataChannel = peerConnection.createDataChannel('text');
dataChannel.onmessage = (event) => {
const eventJson = JSON.parse(event.data);
if (eventJson.type === "error") {
showError(eventJson.message);
} else if (eventJson.type === "send_input") {
fetch('/input_hook', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({
webrtc_id: webrtc_id,
api_key: geminiApiKeyInput.value,
voice_name: voiceSelect.value,
repo_url: repoUrlInput.value,
github_token: githubTokenInput.value
})
});
}
};
// Create and send offer
const offer = await peerConnection.createOffer();
await peerConnection.setLocalDescription(offer);
await new Promise((resolve) => {
if (peerConnection.iceGatheringState === "complete") {
resolve();
} else {
const checkState = () => {
if (peerConnection.iceGatheringState === "complete") {
peerConnection.removeEventListener("icegatheringstatechange", checkState);
resolve();
}
};
peerConnection.addEventListener("icegatheringstatechange", checkState);
}
});
const response = await fetch('/webrtc/offer', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
sdp: peerConnection.localDescription.sdp,
type: peerConnection.localDescription.type,
webrtc_id: webrtc_id,
})
});
const serverResponse = await response.json();
if (serverResponse.status === 'failed') {
// Cancel the pending "taking longer than usual" warning so it cannot
// overwrite the error shown below
clearTimeout(timeoutId);
showError(serverResponse.meta.error === 'concurrency_limit_reached'
? `Too many connections. Maximum limit is ${serverResponse.meta.limit}`
: serverResponse.meta.error);
abortVoiceChat();
return;
}
await peerConnection.setRemoteDescription(serverResponse);
} catch (err) {
clearTimeout(timeoutId);
console.error('Error setting up WebRTC:', err);
showError('Failed to establish connection. Please try again.');
abortVoiceChat();
}
}
// Tear down a failed connection attempt and return the button to its idle
// state. (A bare stop() would resolve to window.stop, which only halts page
// loading and leaves the connection and recording flag untouched.)
function abortVoiceChat() {
stopWebRTC();
isRecording = false;
startButton.classList.remove('recording');
startButton.textContent = 'Start Voice Chat';
}
function updateButtonState() {
if (peerConnection && (peerConnection.connectionState === 'connecting' || peerConnection.connectionState === 'new')) {
startButton.innerHTML = `
<div class="icon-with-spinner">
<div class="spinner"></div>
<span>Connecting...</span>
</div>
`;
} else if (peerConnection && peerConnection.connectionState === 'connected') {
startButton.innerHTML = `
<div class="pulse-container">
<div class="pulse-circle"></div>
<span>Stop Voice Chat</span>
</div>
`;
} else {
startButton.innerHTML = 'Start Voice Chat';
}
}
function updateVisualization() {
if (!analyser) return;
analyser.getByteFrequencyData(dataArray);
const bars = document.querySelectorAll('.box');
for (let i = 0; i < bars.length; i++) {
const barHeight = (dataArray[i] / 255) * 2;
bars[i].style.transform = `scaleY(${Math.max(0.1, barHeight)})`;
}
animationId = requestAnimationFrame(updateVisualization);
}
function stopWebRTC() {
if (peerConnection) {
// Release the microphone: closing the connection alone does not stop
// the captured tracks
peerConnection.getSenders().forEach(sender => {
if (sender.track) {
sender.track.stop();
}
});
peerConnection.close();
}
if (animationId) {
cancelAnimationFrame(animationId);
}
if (audioContext) {
audioContext.close();
}
updateButtonState();
}
startButton.addEventListener('click', () => {
if (!isRecording) {
setupWebRTC();
startButton.classList.add('recording');
} else {
stopWebRTC();
startButton.classList.remove('recording');
}
isRecording = !isRecording;
});
// Press Enter to send text chat message
textQuestionInput.addEventListener('keypress', (e) => {
if (e.key === 'Enter') {
askButton.click();
}
});
</script>
</body>
</html>
"""
    # Replace RTC configuration
    html_content = html_content.replace("__RTC_CONFIGURATION__", json.dumps(rtc_config))
    return HTMLResponse(content=html_content)
# Gradio interface for web UI
def create_gradio_app():
    """Build the Gradio Blocks interface for repository analysis and text chat.

    The UI has inputs for the repository URL and both API keys, an "Analysis"
    tab showing the summary, a "Text Chat" tab backed by ``ask_question``, and
    a "Voice Chat" tab whose button opens the FastAPI-served page at ``/`` in
    a new browser tab.

    Returns:
        The assembled ``gr.Blocks`` application (not launched).
    """
    # Named ``demo`` so it does not shadow the module-level FastAPI ``app``.
    with gr.Blocks(theme=gr.themes.Soft()) as demo:
        gr.Markdown("""
# 🔍 GitHub Repository Analyzer with Voice Chat
Analyze any public GitHub repository using AI. The tool will:
1. 📊 Analyze repository structure, code, and development patterns
2. 💡 Generate comprehensive insights about the repository
3. 💬 Allow you to chat with AI about the repository using text or voice
4. 📁 Search code and view files in the repository
Enter a GitHub repository URL and your API keys to get started.
""")
        with gr.Row():
            with gr.Column():
                repo_url = gr.Textbox(
                    label="GitHub Repository URL",
                    placeholder="https://github.com/owner/repo",
                    value=""
                )
                # Pre-fill the keys only when real values (not the
                # placeholder defaults) were configured.
                github_token = gr.Textbox(
                    label="GitHub API Token",
                    placeholder="Your GitHub API Token",
                    type="password",
                    value=GITHUB_TOKEN if GITHUB_TOKEN != "YOUR_GITHUB_TOKEN" else ""
                )
                gemini_api_key = gr.Textbox(
                    label="Gemini API Key",
                    placeholder="Your Gemini API Key",
                    type="password",
                    value=GEMINI_API_KEY if GEMINI_API_KEY != "YOUR_GEMINI_API_KEY" else ""
                )
        with gr.Row():
            analyze_btn = gr.Button("🔍 Analyze Repository", variant="primary")
        # Add status message
        status_msg = gr.Markdown("", elem_id="status_message")
        with gr.Tabs():
            with gr.TabItem("Analysis"):
                # Use Markdown for better formatting
                summary = gr.Markdown(
                    label="Analysis Summary",
                    value=""
                )
            with gr.TabItem("Text Chat"):
                repo_chatbot = gr.Chatbot(
                    label="Chat with Repository",
                    height=400,
                    show_label=True
                )
                with gr.Row():
                    repo_question = gr.Textbox(
                        label="Ask about the repository",
                        placeholder="Ask a question, search code, or request a file...",
                        scale=4
                    )
                    repo_ask_btn = gr.Button("💬 Ask", variant="primary", scale=1)
                    repo_clear_btn = gr.Button("🗑️ Clear Chat", variant="secondary", scale=1)
            with gr.TabItem("Voice Chat"):
                gr.Markdown("""
## Voice Chat with Repository
Speak with an AI assistant about the repository using real-time voice chat.
Click the "Open Voice Chat" button to launch the voice interface in a new tab.
Note: Voice chat requires browser permission to access your microphone.
""")
                voice_chat_btn = gr.Button("🎤 Open Voice Chat", variant="primary")
        # Hidden state for analysis file and analyzer
        analysis_file = gr.State("")
        analyzer_state = gr.State(None)
        system_prompt_state = gr.State("")

        def clear_chat():
            """Return an empty chat history."""
            return []

        def open_voice_chat():
            """Return a status message and a no-op update for the button."""
            return "Opening voice chat in a new tab...", gr.update()

        # Set up event handlers
        # Analysis: show progress, run the analysis, then report completion.
        analyze_btn.click(
            fn=lambda: "⏳ Analysis in progress...",
            inputs=None,
            outputs=status_msg,
            queue=False
        ).then(
            analyze_repository,
            inputs=[repo_url, github_token, gemini_api_key],
            outputs=[summary, analysis_file, analyzer_state, system_prompt_state],
        ).then(
            lambda: "✅ Analysis complete! You can now chat with the repository.",
            inputs=None,
            outputs=status_msg,
            queue=False
        )
        # Text chat: append the exchange to the chatbot, then clear the input.
        repo_ask_btn.click(
            ask_question,
            inputs=[repo_question, analysis_file, analyzer_state, gemini_api_key, repo_chatbot],
            outputs=[repo_chatbot],
        ).then(
            lambda: "",  # Clear the question input
            None,
            repo_question,
            queue=False
        )
        repo_clear_btn.click(
            clear_chat,
            inputs=None,
            outputs=[repo_chatbot],
            queue=False
        )
        # Voice chat: the JS snippet opens the FastAPI page in a new tab.
        voice_chat_btn.click(
            open_voice_chat,
            inputs=None,
            outputs=[status_msg, voice_chat_btn],
        ).then(
            lambda: gr.update(value="Voice Chat Opened"),
            None,
            voice_chat_btn,
            js="""() => { window.open('/', '_blank'); return null; }"""
        )
    return demo
# Script entry point: serve the FastAPI app with the Gradio UI mounted on it.
if __name__ == "__main__":
    import uvicorn

    # Build the Gradio Blocks UI and mount it under /gradio on the shared
    # FastAPI app; the name ends up bound to what mount_gradio_app returns.
    gradio_app = gr.mount_gradio_app(app, create_gradio_app(), path="/gradio")

    # Listen on all interfaces on port 7860.
    uvicorn.run(app, host="0.0.0.0", port=7860)