import gradio as gr
import google.generativeai as genai
import os
from dotenv import load_dotenv
from github import Github, RateLimitExceededException, GithubException
import json
from pathlib import Path
from datetime import datetime, timedelta
from collections import defaultdict
import base64
from typing import Dict, List, Any, Optional, Tuple
import tempfile
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
import asyncio
import aiohttp
import re
import ast
from concurrent.futures import ThreadPoolExecutor
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from packaging import version
import requests
from bs4 import BeautifulSoup
import networkx as nx
import math
import logging
import time
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Load environment variables (consider handling missing .env)
load_dotenv()
# --- Constants and Global Variables ---
# Store API tokens globally
GITHUB_TOKEN = os.getenv("GITHUB_TOKEN")  # GitHub token read from the environment
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")  # Gemini API key read from the environment
# Constants for rate limiting - make them configurable if needed
MIN_RATE_LIMIT_BUFFER = 50 # Keep a buffer to avoid hitting the limit
INITIAL_BACKOFF = 60 # Initial backoff time in seconds
# Enhanced relevant file extensions
RELEVANT_EXTENSIONS = {
".py": "Python",
".js": "JavaScript",
".ts": "TypeScript",
".jsx": "React",
".tsx": "React TypeScript",
".java": "Java",
".cpp": "C++",
".c": "C",
".h": "C Header",
".hpp": "C++ Header",
".rb": "Ruby",
".php": "PHP",
".go": "Go",
".rs": "Rust",
".swift": "Swift",
".kt": "Kotlin",
".cs": "C#",
".scala": "Scala",
".r": "R",
".dart": "Dart",
".lua": "Lua",
".sql": "SQL",
".sh": "Shell",
".md": "Markdown", # Include Markdown for documentation analysis
".txt": "Text",
".json": "JSON",
".yml": "YAML",
".yaml": "YAML",
".xml": "XML",
".html": "HTML",
".css": "CSS"
}
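# Note (illustrative): the analyzers below map a file's suffix to a language through this
# table, e.g. RELEVANT_EXTENSIONS.get(Path("src/main.py").suffix.lower(), "Unknown") == "Python";
# unmapped suffixes fall back to "Unknown" and skip the language-specific metrics.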
# --- Initialization and Validation ---
def validate_github_token(token: str) -> Tuple[bool, str]:
"""
Validate GitHub token before proceeding with analysis.
Returns (is_valid: bool, message: str)
"""
if not token:
return False, "GitHub token is missing." # Check for missing
try:
gh = Github(token)
user = gh.get_user()
username = user.login #important: accessing properties for validation
rate_limit = gh.get_rate_limit()
remaining = rate_limit.core.remaining
if remaining == 0: #using remaining
reset_time = rate_limit.core.reset.strftime("%Y-%m-%d %H:%M:%S UTC")
return False, f"Rate limit exceeded. Resets at {reset_time}"
return True, f"Token validated successfully (authenticated as {username})"
except GithubException as e:
if e.status == 401:
return False, "Invalid token - authentication failed"
elif e.status == 403:
return False, "Token lacks required permissions or rate limit exceeded" #more specific 403 message
elif e.status == 404:
return False, "Invalid token or API endpoint not found" # More specific 404 message
else:
return False, f"GitHub error (status {e.status}): {e.data.get('message', str(e))}"
except Exception as e: # General exception handling as a fallback.
return False, f"Error validating token: {str(e)}"
def initialize_tokens(github_token: str, gemini_key: str) -> str:
"""Initialize API tokens globally with enhanced validation (using env vars now)."""
global GITHUB_TOKEN, GEMINI_API_KEY
if not github_token or not gemini_key:
return "❌ Both GitHub and Gemini API keys are required."
is_valid, message = validate_github_token(github_token)
if not is_valid:
return f"❌ GitHub token validation failed: {message}"
try:
genai.configure(api_key=gemini_key)
model = genai.GenerativeModel('gemini-1.0-pro')
response = model.generate_content("Test")
        if response.text is None:  # Important check: no response text indicates an invalid key
            return "❌ Invalid Gemini API key (no response)"
except Exception as e:
return f"❌ Gemini API key validation failed: {str(e)}"
GITHUB_TOKEN = github_token # Overwrite with validated tokens
GEMINI_API_KEY = gemini_key
return "βœ… All tokens validated and initialized successfully!"
# --- Classes ---
class GitHubAPIHandler:
"""Enhanced GitHub API handler with minimal authentication checks and robust error handling."""
def __init__(self, token: Optional[str] = None):
self.logger = logging.getLogger(__name__)
self.token = token
self._min_rate_limit_buffer = MIN_RATE_LIMIT_BUFFER
self._initial_backoff = INITIAL_BACKOFF
if not self.token:
raise ValueError("GitHub token not provided")
# Create the GitHub client *within* the class
self.gh = self._create_github_client()
def _create_github_client(self) -> Github:
"""Create GitHub client with enhanced error handling"""
try:
# Create Github instance with basic configuration
gh = Github(
self.token,
retry=3, # Number of retries for failed requests
timeout=30, # Timeout in seconds
per_page=100 # Maximum items per page
)
# Verify authentication
try:
user = gh.get_user()
self.logger.info(f"Authenticated as: {user.login}")
except GithubException as e:
if e.status == 401:
raise ValueError("Invalid GitHub token - authentication failed")
elif e.status == 403:
raise ValueError("GitHub token lacks required permissions or rate limit exceeded")
else:
raise ValueError(f"GitHub initialization failed: {str(e)}")
return gh # Return the authenticated client
except Exception as e:
raise ValueError(f"Failed to initialize GitHub client: {str(e)}") # More informative error
@retry(
stop=stop_after_attempt(5),
wait=wait_exponential(multiplier=1, min=4, max=60),
retry=retry_if_exception_type((RateLimitExceededException, GithubException)),
before_sleep=lambda retry_state: logging.info(
f"Rate limited, retrying in {retry_state.next_action.sleep} seconds..."),
)
def get_repository(self, repo_url: str) -> Any:
"""Get repository object using PyGithub, with error handling and validation."""
try:
parts = repo_url.rstrip('/').split('/')
if len(parts) < 2:
raise ValueError(f"Invalid repository URL format: {repo_url}")
owner = parts[-2]
repo_name = parts[-1]
# Using PyGithub's get_repo method
repo = self.gh.get_repo(f"{owner}/{repo_name}")
return repo # Return the repo object
except GithubException as e: # Specifically handle Github exceptions
if e.status == 404:
raise ValueError(f"Repository not found: {owner}/{repo_name}")
elif e.status == 403:
self._handle_forbidden_error() # Handle forbidden access (rate limits, etc.)
raise #Re raise the exception so program doesn't continue
else:
raise ValueError(f"Failed to access repository: {str(e)}")
except Exception as e: #catch all other exception.
raise ValueError(f"Failed to access repository(An unexpected error occurred):{str(e)}")
def _check_rate_limits(self):
"""Enhanced rate limit checking with predictive waiting."""
try:
rate_limit = self.gh.get_rate_limit()
remaining = rate_limit.core.remaining
reset_time = rate_limit.core.reset.timestamp()
self.logger.info(f"Rate limit - Remaining: {remaining}, Reset: {datetime.fromtimestamp(reset_time)}")
if remaining < self._min_rate_limit_buffer:
wait_time = self._get_rate_limit_wait_time()
if wait_time > 0: # Only log if there's a wait.
self.logger.warning(f"Approaching rate limit. Waiting {wait_time:.2f} seconds.")
time.sleep(wait_time) # Wait before hitting the limit
except GithubException as e: # Be specific about the exceptions you handle
self.logger.error(f"Error checking rate limits: {str(e)}")
time.sleep(60) # Wait a reasonable amount of time even if you cannot check
except Exception as e: # Always have general exception to handle
self.logger.error(f"Unexpected Error: {str(e)}") #General unexpected Error handle.
time.sleep(60)
def _get_rate_limit_wait_time(self) -> float:
"""Calculate the time to wait until the rate limit resets."""
try:
rate_limit = self.gh.get_rate_limit()
reset_time = rate_limit.core.reset.timestamp()
current_time = time.time()
return max(0, reset_time - current_time + 1) # Add 1 second buffer
except Exception:
return self._initial_backoff # Fallback on any error in getting rate limits
def _handle_forbidden_error(self):
"""Handle a 403 Forbidden error from the GitHub API."""
try:
# Check if it's a rate limit issue.
rate_limit = self.gh.get_rate_limit()
if rate_limit.core.remaining == 0:
wait_time = self._get_rate_limit_wait_time()
self.logger.warning(f"Rate limit exceeded. Waiting {wait_time:.2f} seconds.")
time.sleep(wait_time)
else:
# If not rate limited, then likely a permissions issue
self.logger.error("Access forbidden. Token may lack required permissions.")
except Exception as e: #handling other errors.
self.logger.error(f"Error handling forbidden response: {str(e)}")
@retry(
stop=stop_after_attempt(3), # Maximum 3 retries
wait=wait_exponential(multiplier=1, min=4, max=10), #exponential backoff.
reraise=True # Reraise exception after retries.
)
def get_file_content(self, repo: Any, path: str) -> Optional[str]:
"""Get content of a file, with retries, rate limit check and error handling."""
try:
self._check_rate_limits() # Check rate limits *before* each attempt.
            content_file = repo.get_contents(path)
            return base64.b64decode(content_file.content).decode('utf-8')  # Decode to text to match the Optional[str] return type
except GithubException as e:
if e.status == 404:
self.logger.warning(f"File not found: {path}") # 404 is not critical.
return None # explicitly return None
elif e.status == 403: # Explicitly handle forbidden
self._handle_forbidden_error() # Rate limiting or other access problem
raise # Raise after handling (waiting, logging).
# Any other GitHub error is an issue - log and re-raise
self.logger.error(f"Error getting file content: {str(e)}") #handle
raise #re-raise after loggng
except Exception as e: # General exception for unexpected issue.
self.logger.error(f"Unexpected Error : {str(e)}") #General exception handelling
raise
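# Illustrative usage sketch for GitHubAPIHandler (hypothetical repository URL; not
# executed by the app):
#
#   handler = GitHubAPIHandler(GITHUB_TOKEN)
#   repo = handler.get_repository("https://github.com/owner/repo")
#   readme_text = handler.get_file_content(repo, "README.md")  # decoded text, or None if missing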
class CodeMetricsAnalyzer:
"""Handles detailed code metrics analysis with proper error handling."""
def __init__(self):
self.logger = logging.getLogger(__name__)
self.size_metrics_cache = {} # Consider if needed with parallelization
def calculate_halstead_metrics(self, content: str, language: str = "Unknown") -> Dict[str, float]:
"""
Calculate Halstead complexity metrics for code.
"""
try:
# Define language-specific operators (more comprehensive)
operators = {
"Python": set([
'+', '-', '*', '/', '//', '**', '%', '==', '!=', '>', '<', '>=', '<=',
'and', 'or', 'not', 'is', 'in', '+=', '-=', '*=', '/=', '=',
'if', 'elif', 'else', 'for', 'while', 'def', 'class', 'return',
'yield', 'raise', 'break', 'continue', 'pass', 'assert',
'import', 'from', 'as', 'try', 'except', 'finally', 'with', 'async', 'await'
]),
"JavaScript": set([
'+', '-', '*', '/', '%', '**', '==', '===', '!=', '!==', '>', '<',
'>=', '<=', '&&', '||', '!', '=', '+=', '-=', '*=', '/=',
'if', 'else', 'for', 'while', 'function', 'return', 'class',
'new', 'delete', 'typeof', 'instanceof', 'void', 'try', 'catch',
'finally', 'throw', 'break', 'continue', 'default', 'case', 'async', 'await'
]),
"Java": set([ # Added Java operators
'+', '-', '*', '/', '%', '++', '--', '==', '!=', '>', '<', '>=', '<=',
'&&', '||', '!', '=', '+=', '-=', '*=', '/=', '%=',
'if', 'else', 'for', 'while', 'do', 'switch', 'case', 'default',
'break', 'continue', 'return', 'try', 'catch', 'finally', 'throw', 'throws',
'class', 'interface', 'extends', 'implements', 'new', 'instanceof', 'this', 'super'
]),
}.get(language, set(['+', '-', '*', '/', '=', '==', '>', '<', '>=', '<=']))
unique_operators = set()
unique_operands = set()
total_operators = 0
total_operands = 0
lines = content.splitlines()
for line in lines:
line = line.strip()
if line.startswith(('#', '//', '/*', '*')): # Handle comments
continue
for operator in operators:
if operator in line:
unique_operators.add(operator)
total_operators += line.count(operator)
# Improved operand counting (numbers, strings, identifiers)
numbers = re.findall(r'\b\d+(?:\.\d+)?\b', line)
unique_operands.update(numbers)
total_operands += len(numbers)
strings = re.findall(r'["\'][^"\']*["\']', line)
unique_operands.update(strings)
total_operands += len(strings)
identifiers = re.findall(r'\b[a-zA-Z_]\w*\b', line)
for ident in identifiers:
if ident not in operators:
unique_operands.add(ident)
total_operands += 1
n1 = len(unique_operators)
n2 = len(unique_operands)
N1 = total_operators
N2 = total_operands
# Handle edge cases to avoid division by zero
if n1 > 0 and n2 > 0:
program_length = N1 + N2
vocabulary = n1 + n2
volume = program_length * (math.log2(vocabulary) if vocabulary > 0 else 0)
difficulty = (n1 * N2) / (2 * n2) if n2 > 0 else 0
effort = volume * difficulty
time = effort / 18 # Standard Halstead time estimation
else:
program_length = vocabulary = volume = difficulty = effort = time = 0
return {
"halstead_unique_operators": n1,
"halstead_unique_operands": n2,
"halstead_total_operators": N1,
"halstead_total_operands": N2,
"halstead_program_length": program_length,
"halstead_vocabulary": vocabulary,
"halstead_volume": volume,
"halstead_difficulty": difficulty,
"halstead_effort": effort,
"halstead_time": time
}
except Exception as e:
self.logger.error(f"Error calculating Halstead metrics: {str(e)}")
# Return default 0 values for all metrics on error
return {metric: 0 for metric in [
"halstead_unique_operators", "halstead_unique_operands",
"halstead_total_operators", "halstead_total_operands",
"halstead_program_length", "halstead_vocabulary",
"halstead_volume", "halstead_difficulty", "halstead_effort", "halstead_time"
]}
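    # Worked example of the Halstead formulas above (illustrative numbers): with
    # n1 = 4 unique operators, n2 = 6 unique operands, N1 = 10, N2 = 12:
    #   program_length = N1 + N2 = 22
    #   vocabulary     = n1 + n2 = 10
    #   volume         = 22 * log2(10) ≈ 73.08
    #   difficulty     = (n1 * N2) / (2 * n2) = 48 / 12 = 4.0
    #   effort         = volume * difficulty ≈ 292.3
    #   time           = effort / 18 ≈ 16.2 (the standard Halstead estimate, in seconds)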
    def calculate_comment_density(self, content: str, language: str = "Unknown") -> Dict[str, Any]:
        """Calculate comment, docstring, and blank-line metrics for the given source content."""
try:
metrics = {
"comment_lines": 0,
"code_lines": 0,
"blank_lines": 0,
"comment_density": 0.0,
"docstring_lines": 0, # Docstrings (Python)
"total_lines": 0, #Total no of line.
"inline_comments": 0
}
patterns = {
"Python": {
"single_line": ["#"],
"multi_start": ['"""', "'''"],
"multi_end": ['"""', "'''"],
"inline_start": "#"
},
"JavaScript": {
"single_line": ["//"],
"multi_start": ["/*"],
"multi_end": ["*/"],
"inline_start": "//"
},
"Java": { # Added Java comment patterns
"single_line": ["//"],
"multi_start": ["/*"],
"multi_end": ["*/"],
"inline_start": "//"
}
}.get(language, {
"single_line": ["//", "#"],
"multi_start": ["/*", '"""', "'''"],
"multi_end": ["*/", '"""', "'''"],
"inline_start": ["//", "#"]
})
lines = content.splitlines()
in_multiline_comment = False
current_multiline_delimiter = None
for line in lines:
stripped = line.strip()
metrics["total_lines"] += 1
if not stripped:
metrics["blank_lines"] += 1
continue
if not in_multiline_comment:
for delimiter in patterns["multi_start"]:
if stripped.startswith(delimiter):
in_multiline_comment = True
current_multiline_delimiter = delimiter
metrics["comment_lines"] += 1
if delimiter in ['"""', "'''"]:
metrics["docstring_lines"] += 1
break
elif delimiter in stripped: # Handle same-line multi-line comments
end_delimiter = "*/" if delimiter == "/*" else delimiter
if end_delimiter in stripped[stripped.index(delimiter) + len(delimiter):]:
metrics["comment_lines"] += 1
if delimiter in ['"""', "'''"]:
metrics["docstring_lines"] += 1
break
if not in_multiline_comment:
is_comment = False
for prefix in patterns["single_line"]:
if stripped.startswith(prefix):
metrics["comment_lines"] += 1
is_comment = True
break
elif prefix in stripped: # Count inline comments
metrics["inline_comments"] += 1
break
if not is_comment:
metrics["code_lines"] += 1
else:
metrics["comment_lines"] += 1
if current_multiline_delimiter in ['"""', "'''"]:
metrics["docstring_lines"] += 1
                    # Check whether the current multi-line delimiter closes on this line
                    if current_multiline_delimiter in stripped:
# Handle triple quotes properly
if current_multiline_delimiter in ['"""', "'''"] and \
stripped.count(current_multiline_delimiter) == 1:
continue #
in_multiline_comment = False
current_multiline_delimiter = None
non_blank_lines = metrics["total_lines"] - metrics["blank_lines"] #non blank lines calculating.
if non_blank_lines > 0:
metrics["comment_density"] = (metrics["comment_lines"] + metrics["inline_comments"]) / non_blank_lines * 100
metrics["docstring_density"] = metrics["docstring_lines"] / non_blank_lines * 100
if language == "Python":
# Check for module-level docstring
if len(lines) > 0 and (lines[0].strip().startswith('"""') or lines[0].strip().startswith("'''")):
metrics["has_module_docstring"] = True
metrics["module_docstring_lines"] = sum(1 for line in lines
if '"""' not in line and "'''" not in line
and bool(line.strip()))#counts the number of lines within a module-level docstring that are not the delimiters themselves and contain actual text
else:
metrics["has_module_docstring"] = False
metrics["module_docstring_lines"] = 0
return metrics
except Exception as e:
self.logger.error(f"Error calculating comment density: {str(e)}")
# Return 0s for all density metrics on error
return {
"comment_lines": 0,
"code_lines": 0,
"blank_lines": 0,
"comment_density": 0.0,
"docstring_lines": 0,
"total_lines": 0,
"inline_comments": 0,
"error": str(e) # Include the error message
}
def calculate_cyclomatic_complexity(self, content: str, language: str = "Unknown") -> Dict[str, Any]:
"""Calculate cyclomatic complexity metrics for code with language-specific handling."""
metrics = {
"complexity": 1, # Base complexity (always start at 1)
"cognitive_complexity": 0,
"max_nesting_depth": 0
}
try:
lines = content.splitlines()
current_depth = 0
# Language-specific complexity indicators (expanded)
complexity_keywords = {
"Python": {
"if", "else", "elif", "for", "while", "try", "except", "with",
"async for", "async with", "break", "continue"
},
"JavaScript": {
"if", "else", "for", "while", "try", "catch", "switch", "case",
"break", "continue", "&&", "||", "?", "async", "await" # Add async/await
},
"Java": { # Added Java keywords
"if", "else", "for", "while", "do", "switch", "case", "default",
"break", "continue", "try", "catch", "finally"
}
# Add more language-specific keywords as needed
}.get(language, {
# Default keywords for unknown languages
"if", "else", "elif", "for", "while", "try", "catch", "case", "switch",
"&&", "||", "?", "except", "finally", "with"
})
for line in lines:
# Calculate nesting depth
opens = line.count('{') - line.count('}')
current_depth += opens
metrics["max_nesting_depth"] = max(metrics["max_nesting_depth"], current_depth)
# Increment complexity for control structures
stripped_line = line.strip()
for keyword in complexity_keywords:
if keyword in stripped_line and not stripped_line.startswith(("//", "#", "/*", "*")): # Exclude comments
metrics["complexity"] += 1
metrics["cognitive_complexity"] += (1 + current_depth) # Cognitive complexity increase
if language == "Python":
# Add complexity for list/dict comprehensions
if "for" in stripped_line and ("[" in stripped_line or "{" in stripped_line):
metrics["complexity"] += 1
metrics["cognitive_complexity"] += 1 # Also add to cognitive
return metrics
except Exception as e:
self.logger.error(f"Error calculating complexity: {str(e)}")
# Return defaults, not just an error string, but also include 1 as base.
return {
"complexity": 1, # Ensure baseline complexity
"cognitive_complexity": 0,
"max_nesting_depth": 0
}
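    # Illustrative example of the counting scheme above (Python source): the lines
    # "if x:", "for item in items:", "while not done:" each contain one keyword, so
    # complexity = 1 (base) + 3 = 4 and cognitive_complexity = 3 (1 + nesting depth 0
    # for each). Matching is substring-based, so this is a heuristic rather than an
    # AST-accurate measure.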
def detect_code_duplication(self, content: str, min_lines: int = 6) -> Dict[str, Any]:
"""Detect code duplication within the content"""
try:
metrics = {
"duplicate_blocks": 0,
"duplicate_lines": 0,
"duplication_percentage": 0.0
}
lines = content.splitlines()
total_lines = len(lines)
# Return early if there are not enough lines
if total_lines < min_lines:
return metrics
blocks = {}
for i in range(total_lines - min_lines + 1):
block = '\n'.join(lines[i:i + min_lines])
normalized_block = self._normalize_code_block(block)
if normalized_block.strip(): # Ignore all-whitespace blocks
if normalized_block in blocks:
blocks[normalized_block].append(i)
else:
blocks[normalized_block] = [i]
duplicate_line_set = set() # Track duplicate line indices using a *set*
for block, positions in blocks.items():
if len(positions) > 1:
metrics["duplicate_blocks"] += 1 # Count duplicate blocks
for pos in positions:
for i in range(pos, pos + min_lines): # Add all lines in duplicate block
duplicate_line_set.add(i)
metrics["duplicate_lines"] = len(duplicate_line_set) # Total count of duplicated lines
if total_lines > 0:
metrics["duplication_percentage"] = (metrics["duplicate_lines"] / total_lines) * 100 # Duplication metrics calcutation.
return metrics
except Exception as e:
self.logger.error(f"Error detecting code duplication: {str(e)}")
# Return 0 for all duplication metrics in case of error
return {
"duplicate_blocks": 0,
"duplicate_lines": 0,
"duplication_percentage": 0.0
}
def _normalize_code_block(self, block: str) -> str:
"""Normalize a block of code for comparison by removing comments, whitespace, etc."""
lines = []
for line in block.splitlines():
# Remove comments (handle both Python and JavaScript/Java comments)
line = re.sub(r'#.*$', '', line) # Python comments
line = re.sub(r'//.*$', '', line) # JavaScript comments
line = re.sub(r'/\*.*?\*/', '', line) # Multi-line comments
# Normalize whitespace
line = re.sub(r'\s+', ' ', line.strip())
if line: # Add non-empty lines
lines.append(line)
return '\n'.join(lines)
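    # Illustrative example: _normalize_code_block("x = 1   # init\n\n  y=2  // temp")
    # strips the comments, drops the blank line, and collapses whitespace to
    # "x = 1\ny=2", so blocks that differ only in comments or indentation are
    # treated as duplicates by detect_code_duplication.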
    def calculate_size_metrics(self, content: str, language: str = "Unknown") -> Dict[str, Any]:
        """Calculate size metrics (line counts, lengths, comment ratio, entropy) for a file."""
try:
metrics = {
"size_bytes": len(content),
"total_lines": 0,
"code_lines": 0,
"blank_lines": 0,
"comment_lines": 0,
"avg_line_length": 0,
"max_line_length": 0,
"file_entropy": 0, # Added file entropy.
}
            comments = {  # Language-specific comment syntax
"Python": {
"line_comment": "#",
"block_start": ['"""', "'''"],
"block_end": ['"""', "'''"]
},
"JavaScript": {
"line_comment": "//",
"block_start": ["/*"],
"block_end": ["*/"]
},
"Java": { # Added Java comment definitions
"line_comment": "//",
"block_start": ["/*"],
"block_end": ["*/"]
}
}.get(language, {
"line_comment": "#",
"block_start": ["/*", '"""', "'''"],
"block_end": ["*/", '"""', "'''"]
})
lines = content.splitlines()
total_length = 0 # Track the total character count of all lines
            char_counts = {}  # Character occurrence counts, used for the entropy calculation
in_block_comment = False
for line in lines:
metrics["total_lines"] += 1
line_length = len(line) #length of lines
total_length += line_length
metrics["max_line_length"] = max(metrics["max_line_length"], line_length)
for char in line:
char_counts[char] = char_counts.get(char, 0) + 1
                stripped = line.strip()
if not stripped:
metrics["blank_lines"] += 1
continue
if not in_block_comment:
is_comment = False
for start in comments["block_start"]:
if stripped.startswith(start): # Use startswith on the stripped line.
in_block_comment = True
metrics["comment_lines"] += 1
                            is_comment = True
                            break  # Stop after the first matching block delimiter
                    if not is_comment:  # Not the start of a block comment
if stripped.startswith(comments["line_comment"]): # check if line is comment or code.
metrics["comment_lines"] += 1
else:
metrics["code_lines"] += 1
else:
metrics["comment_lines"] += 1 #comment lines
for end in comments["block_end"]: # Block end condition.
if end in stripped: # check comment block ends
in_block_comment = False #
break #
if metrics["total_lines"] > 0:
metrics["avg_line_length"] = total_length / metrics["total_lines"]
# Calculate entropy.
total_chars = sum(char_counts.values())
if total_chars > 0:
entropy = 0
for count in char_counts.values():
prob = count / total_chars
entropy -= prob * math.log2(prob)
metrics["file_entropy"] = entropy
# These aren't always in 'comment_density', so calculate here.
metrics["source_lines"] = metrics["code_lines"] + metrics["comment_lines"]
metrics["comment_ratio"] = (metrics["comment_lines"] / metrics["source_lines"] * 100
if metrics["source_lines"] > 0 else 0) # Handle potential division by zero.
return metrics
except Exception as e:
self.logger.error(f"Error calculating size metrics: {str(e)}")
# Return 0s and basic size info on error. Still provide content length
return {
"size_bytes": len(content) if content else 0, # File Size is valuable,even in error.
"total_lines": 0,
"code_lines": 0,
"blank_lines": 0,
"comment_lines": 0,
"avg_line_length": 0,
"max_line_length": 0,
"file_entropy": 0, # file_entropy added to default values.
"source_lines": 0, # return metrics initialized 0 for other metrices.
"comment_ratio": 0 #Return default values on errors
}
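    # Worked example of the entropy term above: for content "aabb" the character
    # probabilities are {a: 0.5, b: 0.5}, so file_entropy = -(0.5*log2(0.5) +
    # 0.5*log2(0.5)) = 1.0 bit per character; repetitive files score lower and
    # varied/random-looking files score higher.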
    def analyze_function_metrics(self, content: str, language: str = "Unknown") -> Dict[str, Any]:
        """Analyze per-function metrics (length, complexity, documentation) using regex heuristics."""
try:
metrics = {
"total_functions": 0,
"avg_function_length": 0,
"max_function_length": 0,
"avg_function_complexity": 0,
"max_function_complexity": 0,
"documented_functions": 0,
"function_lengths": [], # Collect all lengths
"function_complexities": [], # Collect all complexities
"function_details": [] # Store details of each function
}
# Language-specific function patterns
patterns = {
"Python": r"(?:async\s+)?def\s+(\w+)\s*\([^)]*\)\s*(?:->.*?)?:",
"JavaScript": r"(?:async\s+)?function\s+(\w+)\s*\([^)]*\)|(?:const|let|var)\s+(\w+)\s*=\s*(?:async\s+)?\([^)]*\)\s*=>",
"TypeScript": r"(?:async\s+)?function\s+(\w+)\s*\([^)]*\)|(?:const|let|var)\s+(\w+)\s*=\s*(?:async\s+)?\([^)]*\)\s*=>",
"Java": r"(?:public|private|protected|\s)\s+(?:static\s+)?[a-zA-Z_<>[\]]+\s+(\w+)\s*\([^)]*\)\s*(?:throws\s+[^{]+)?\s*\{",
"C#": r"(?:public|private|protected|\s)\s+(?:static\s+)?[a-zA-Z_<>[\]]+\s+(\w+)\s*\([^)]*\)\s*(?:where\s+[^{]+)?\s*\{",
}.get(language, r"function\s+(\w+)\s*\([^)]*\)")
lines = content.splitlines()
current_function = None
function_start = 0
in_function = False
function_content = []
            brace_count = 0  # Track brace nesting to detect where a function ends
for i, line in enumerate(lines):
stripped = line.strip()
                if not stripped or stripped.startswith(('/', '#')):  # Skip blank lines and comments
continue
if re.search(patterns, line):
current_function = {
"name": re.search(patterns, line).group(1), # Extract function name
"start_line": i + 1, # 1-based line numbers
"has_docstring": False,
"complexity": 1, #base complexity is one.
"nested_depth": 0,
"parameters": len(re.findall(r',', line)) + 1 if '(' in line else 0 # Count parameters
}
function_start = i #starting function line number.
in_function = True
function_content = [line] # Start collecting content
continue
if in_function:
function_content.append(line) #add the functions to function content.
brace_count += line.count('{') - line.count('}')
if language == "Python" and i == function_start + 1: # Check for docstring right after def
if stripped.startswith('"""') or stripped.startswith("'''"):
current_function["has_docstring"] = True
# More robust function end detection
if (language in ["Python"] and brace_count == 0 and not line.startswith(' ')) or \
(language not in ["Python"] and brace_count == 0 and line.rstrip().endswith('}')): #Robust function end check
func_content = '\n'.join(function_content) #join content function for metrics
current_function["length"] = len(function_content) # lines of function
complexity_metrics = self.calculate_cyclomatic_complexity(func_content, language)
current_function["complexity"] = complexity_metrics["complexity"] # Cyclomatic complexity
metrics["total_functions"] += 1 # Total Number of functions count.
metrics["function_lengths"].append(current_function["length"])
metrics["function_complexities"].append(current_function["complexity"])
metrics["max_function_length"] = max(metrics["max_function_length"],current_function["length"])# Compare current max value and store greater one.
metrics["max_function_complexity"] = max(metrics["max_function_complexity"],
current_function["complexity"]) # compare and find the max
if current_function["has_docstring"]:
metrics["documented_functions"] += 1 # count Document function
metrics["function_details"].append(current_function)
in_function = False
current_function = None
                        function_content = []  # Reset collected lines for the next function
if metrics["total_functions"] > 0:
metrics["avg_function_length"] = sum(metrics["function_lengths"]) / metrics["total_functions"]
metrics["avg_function_complexity"] = sum(metrics["function_complexities"]) / metrics["total_functions"]
metrics["documentation_ratio"] = metrics["documented_functions"] / metrics["total_functions"]
return metrics
except Exception as e:
self.logger.error(f"Error analyzing function metrics: {str(e)}")
# Return default values for all metrics in case of error.
return {
"total_functions": 0,
"avg_function_length": 0,
"max_function_length": 0,
"avg_function_complexity": 0,
"max_function_complexity": 0,
"documented_functions": 0,
"function_lengths": [],
"function_complexities": [],
"function_details": [],
"error": str(e) # Include the error for debugging.
}
def _analyze_file_metrics(self, file_content) -> Optional[Dict[str, Any]]:
"""Analyze metrics for a single file with proper error handling."""
try:
# Decode the file content (assuming it's base64 encoded)
content = base64.b64decode(file_content.content).decode('utf-8')
language = RELEVANT_EXTENSIONS.get(Path(file_content.path).suffix.lower(), "Unknown")
metrics = {
"path": file_content.path,
"metrics": {}
}
# Size metrics (always calculated)
try:
size_metrics = self.calculate_size_metrics(content, language)
metrics["metrics"].update(size_metrics) # Store results, handling None.
except Exception as e:
self.logger.error(f"Error calculating size metrics for {file_content.path}: {str(e)}")
# Provide default values even if there is error
metrics["metrics"].update({
"size_bytes": len(content), #we have this data even in errors.
"total_lines": len(content.splitlines()),
"code_lines": 0,
"blank_lines": 0,
"comment_lines": 0
})
# Complexity metrics (only for supported languages)
if language != "Unknown":
try:
complexity = self.calculate_cyclomatic_complexity(content, language)
metrics["metrics"]["complexity"] = complexity.get("complexity", 0)
metrics["metrics"]["cognitive_complexity"] = complexity.get("cognitive_complexity", 0) # Store cognitive.
except Exception as e:
self.logger.error(f"Error calculating complexity for {file_content.path}: {str(e)}")
metrics["metrics"].update({
"complexity": 0,
"cognitive_complexity": 0 # Default to 0 if error.
})
# Halstead metrics (for supported languages)
if language in ["Python", "JavaScript", "Java"]: # Check if language is supported
try:
halstead = self.calculate_halstead_metrics(content, language)
metrics["metrics"].update(halstead) # Add the results to file data.
except Exception as e:
self.logger.error(f"Error calculating Halstead metrics for {file_content.path}: {str(e)}")
# No defaults needed, halstead already returns 0s.
            # Duplication metrics (always calculated)
            try:
                duplication = self.detect_code_duplication(content)
                metrics["metrics"]["duplicate_blocks"] = duplication.get("duplicate_blocks", 0)
                metrics["metrics"]["duplication_percentage"] = duplication.get("duplication_percentage", 0.0)
            except Exception as e:
                self.logger.error(f"Error detecting duplication for {file_content.path}: {str(e)}")
                metrics["metrics"]["duplicate_blocks"] = 0  # Default to 0 on error
# Function-level metrics (for supported languages).
if language != "Unknown":
try:
function_metrics = self.analyze_function_metrics(content, language)
if function_metrics and "error" not in function_metrics: # Check for None AND no error
metrics["metrics"].update(function_metrics) #
except Exception as e:
self.logger.error(f"Error analyzing functions for {file_content.path}: {str(e)}")
# no default to add as function metrics handles defaults.
# Comment density (always calculated).
try:
comment_metrics = self.calculate_comment_density(content, language)
metrics["metrics"].update(comment_metrics) # Merge
except Exception as e:
self.logger.error(f"Error calculating comment density for {file_content.path}: {str(e)}")
metrics["metrics"].update({
"comment_density": 0, # Defaults on error
"docstring_lines": 0 # Add other relevant metrics
})
return metrics #Returns calculated data
except Exception as e: # General Exception to prevent crash.
self.logger.error(f"Error analyzing file {file_content.path}: {str(e)}")
# Return minimal error metrics (important)
return {
"path": file_content.path,
"metrics": {
"size_bytes": 0, # Important basic metric, try to preserve.
"total_lines": 0, # and total lines
"error": str(e)
}
}
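# Illustrative usage sketch for CodeMetricsAnalyzer on an in-memory snippet (pure
# computation, no GitHub access; not executed by the app):
#
#   analyzer = CodeMetricsAnalyzer()
#   snippet = "def add(a, b):\n    return a + b\n"
#   size = analyzer.calculate_size_metrics(snippet, "Python")
#   complexity = analyzer.calculate_cyclomatic_complexity(snippet, "Python")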
class DependencyAnalyzer:
"""Handles dependency analysis with improved error handling."""
def __init__(self, repo):
self.repo = repo
self.logger = logging.getLogger(__name__)
self.dependency_files = {
"python": ["requirements.txt", "setup.py", "Pipfile", "pyproject.toml"],
"javascript": ["package.json", "yarn.lock", "package-lock.json"],
"java": ["pom.xml", "build.gradle"],
"ruby": ["Gemfile"],
"php": ["composer.json"],
"go": ["go.mod"],
"rust": ["Cargo.toml"],
"dotnet": ["*.csproj", "*.fsproj", "*.vbproj"] # .NET project files
}
async def analyze_dependencies(self) -> Dict[str, Any]:
"""Analyze project dependencies (async for aiohttp)."""
results = {
"dependency_files": [], # Files that specify the dependencies.
"dependencies": defaultdict(list), # Parsed dependencies.
"dependency_graph": defaultdict(list), # Relationship b/w Dependencies.
"outdated_dependencies": [], #
"security_alerts": [] # Placeholder for future security checks
}
try:
contents = self.repo.get_contents("")
while contents:
file_content = contents.pop(0)
if file_content.type == "dir":
contents.extend(self.repo.get_contents(file_content.path))
else:
for lang, patterns in self.dependency_files.items():
if any(self._matches_pattern(file_content.path, pattern) for pattern in patterns): #
try:
file_text = base64.b64decode(file_content.content).decode('utf-8') #
deps = await self._parse_dependency_file(file_content.path, file_text) #parsing the files to find dependency.
if deps: #check deps is not none.
results["dependencies"][file_content.path] = deps
results["dependency_files"].append(file_content.path) # add current file to list of dependency files.
except Exception as e:
self.logger.error(f"Error parsing {file_content.path}: {str(e)}")
results["outdated_dependencies"] = await self._check_outdated_dependencies(results["dependencies"])#
results["dependency_graph"] = self._build_dependency_graph(results["dependencies"])
except Exception as e:
self.logger.error(f"Error analyzing dependencies: {str(e)}")
# No need to return default values here, as the initialized 'results' dict is sufficient
return results
def _matches_pattern(self, filename: str, pattern: str) -> bool:
"""Check if a filename matches a given pattern (supports wildcards)."""
if pattern.startswith("*"):
return filename.endswith(pattern[1:]) # Simple wildcard match
return filename.endswith(pattern)
async def _parse_dependency_file(self, filepath: str, content: str) -> List[Dict[str, str]]:
"""Parse different dependency file formats and extract dependencies."""
deps = [] # Initialize an empty list to hold dependencies
try:
if filepath.endswith(('requirements.txt', 'Pipfile')): #requirements.txt or pipfile
for line in content.split('\n'):
if '==' in line:
name, version = line.strip().split('==')
deps.append({"name": name, "version": version, "type": "python"})
elif filepath.endswith('package.json'): #package.json
data = json.loads(content)
for dep_type in ['dependencies', 'devDependencies']: # Check both dependencies and devDependencies
if dep_type in data:
for name, version in data[dep_type].items():
# Remove semver characters like ^ and ~ for accurate comparisons
deps.append({
"name": name,
"version": version.replace('^', '').replace('~', ''), # Remove ^ and ~
"type": "npm"
})
# Add more file type parsing as needed (e.g., pom.xml for Java, Gemfile for Ruby)
except Exception as e:
self.logger.error(f"Error parsing {filepath}: {str(e)}")
# Don't add any dependencies if parsing fails
return deps # Always return the list, even if empty
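    # Illustrative example: for a requirements.txt line "requests==2.31.0",
    # _parse_dependency_file returns [{"name": "requests", "version": "2.31.0",
    # "type": "python"}]; lines without an exact "==" pin are currently skipped.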
async def _check_outdated_dependencies(self, dependencies: Dict[str, List[Dict[str, str]]]) -> List[Dict[str, Any]]:
"""Check for outdated dependencies using respective package registries (async)."""
outdated = []
        async with aiohttp.ClientSession() as session:  # Use aiohttp for concurrent HTTP requests
for filepath, deps in dependencies.items():
for dep in deps:
try:
if dep["type"] == "python":
async with session.get(f"https://pypi.org/pypi/{dep['name']}/json") as response:
if response.status == 200:
data = await response.json()
latest_version = data["info"]["version"]
# Use packaging.version for robust version comparison
if version.parse(latest_version) > version.parse(dep["version"]):
outdated.append({
"name": dep["name"],
"current_version": dep["version"],
"latest_version": latest_version,
"type": "python"
})
elif dep["type"] == "npm":
# Use npm registry API
async with session.get(f"https://registry.npmjs.org/{dep['name']}") as response:
if response.status == 200:
data = await response.json()
latest_version = data["dist-tags"]["latest"]
if version.parse(latest_version) > version.parse(dep['version']):
outdated.append({
"name": dep['name'],
"current_version": dep["version"],
"latest_version": latest_version,
"type": "npm"
})
# Add checks for other package types (Java, Ruby, etc.)
except Exception as e:
self.logger.error(f"Error checking version for {dep['name']}: {str(e)}")
# Continue checking other dependencies even if one fails
return outdated # Return the list, even if empty
def _build_dependency_graph(self, dependencies: Dict[str, List[Dict[str, str]]]) -> Dict[str, List[str]]:
"""Build a dependency graph to visualize relationships (using networkx)."""
graph = nx.DiGraph() # directed graph.
try:
for dep_file, deps in dependencies.items():
for dep in deps:
# Add edges to represent dependencies
graph.add_edge(dep_file, dep["name"]) # Dep file depends on individual libraries.
# Convert to a dictionary of lists for easier handling
return nx.to_dict_of_lists(graph)
except Exception as e:
self.logger.error(f"Error building dependency graph: {str(e)}")
return defaultdict(list) # Return an empty graph in case of error
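# Illustrative usage sketch (assumes `repo` is a PyGithub Repository object; not
# executed by the app):
#
#   dep_analyzer = DependencyAnalyzer(repo)
#   dep_results = asyncio.run(dep_analyzer.analyze_dependencies())
#   print(dep_results["outdated_dependencies"])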
class TestAnalyzer:
"""Handles test analysis."""
def __init__(self, repo):
self.repo = repo
self.logger = logging.getLogger(__name__) # Add logger
self.test_patterns = {
"python": ["test_*.py", "*_test.py", "tests/*.py"],
"javascript": ["*.test.js", "*.spec.js", "__tests__/*.js"],
"java": ["*Test.java", "*Tests.java"],
"ruby": ["*_test.rb", "*_spec.rb"],
"go": ["*_test.go"]
}
def analyze_tests(self) -> Dict[str, Any]:
"""Analyze test files, test counts, and (if possible) coverage information."""
results = {
"test_files": [],
"test_count": 0,
"coverage_data": {}, # Dictionary to hold any parsed coverage information.
"test_patterns": defaultdict(list) # Store the information about diff. testing pattern.
}
try:
contents = self.repo.get_contents("")
while contents:
content = contents.pop(0)
if content.type == "dir":
contents.extend(self.repo.get_contents(content.path))
elif self._is_test_file(content.path):
results["test_files"].append(content.path)
test_metrics = self._analyze_test_file(content) #metrics of single files.
results["test_patterns"][content.path] = test_metrics # Store results.
results["test_count"] += test_metrics.get("test_count", 0) # Safely get test_count
results["coverage_data"] = self._find_coverage_data() # Get any coverage.
except Exception as e:
self.logger.error(f"Error analyzing tests: {str(e)}") # Use logger
return results # Always return results
def _is_test_file(self, filepath: str) -> bool:
"""Check if a file is likely to be a test file, based on common patterns."""
for patterns in self.test_patterns.values():
for pattern in patterns:
if Path(filepath).match(pattern): # Use Path.match for wildcard matching
return True
return False
def _analyze_test_file(self, file_content) -> Dict[str, Any]:
"""Analyze an individual test file to count tests, assertions, etc."""
try:
content = base64.b64decode(file_content.content).decode('utf-8')
metrics = {
"test_count": 0,
"assertions": 0,
"test_classes": 0 # If using class-based tests
}
# Count test cases (using regex for common patterns)
metrics["test_count"] += len(re.findall(r'def test_', content)) # Python
metrics["test_count"] += len(re.findall(r'it\s*\([\'""]', content)) # JavaScript (Jest/Mocha)
metrics["assertions"] += len(re.findall(r'assert', content)) # General assertions
metrics["test_classes"] += len(re.findall(r'class\s+\w+Test', content)) # test class patterns.
return metrics
except Exception as e:
self.logger.error(f"Error analyzing test file: {str(e)}") # Use logger
return {} # Return empty dict on error
def _find_coverage_data(self) -> Dict[str, Any]:
"""Try to find coverage information (if available, e.g., from coverage reports)."""
coverage_data = {
"total_coverage": None,
"file_coverage": {}, # If file-level data available.
"coverage_report_found": False # for indicating we find coverage files.
}
try:
# Look for common coverage report files
coverage_files = [
".coverage", # Python coverage.py
"coverage.xml", # Cobertura (Python, Java)
"coverage.json", # Jest, other JavaScript
"coverage/lcov.info", # LCOV (C/C++, others)
"coverage/coverage-final.json" # Istanbul (JavaScript)
]
contents = self.repo.get_contents("")
while contents:
content = contents.pop(0)
if content.type == "dir":
contents.extend(self.repo.get_contents(content.path))
elif any(content.path.endswith(f) for f in coverage_files):
coverage_data["coverage_report_found"] = True # set covarage to True, Indicate report present.
parsed_coverage = self._parse_coverage_file(content) # Try to parse.
if parsed_coverage: #check parse_coverage is present
coverage_data.update(parsed_coverage) # Merge into result
except Exception as e:
self.logger.error(f"Error finding coverage data: {str(e)}")
return coverage_data
def _parse_coverage_file(self, file_content) -> Dict[str, Any]:
"""Parse a coverage report file (handles multiple formats)."""
try:
content = base64.b64decode(file_content.content).decode('utf-8')
if file_content.path.endswith('.json'):
data = json.loads(content)
# Handle different JSON formats (e.g., coverage.py, Istanbul)
if 'total' in data: # coverage.py format
return {
'total_coverage': data['total'].get('lines', {}).get('percent', 0),
'file_coverage': {
file: stats.get('lines', {}).get('percent', 0)
for file, stats in data.get('files', {}).items()
}
}
# Add handling for other JSON formats (e.g., Istanbul) as needed
elif file_content.path.endswith('.xml'):
# Parse XML (Cobertura format)
from xml.etree import ElementTree #for parse XML format
root = ElementTree.fromstring(content)
total = float(root.get('line-rate', 0)) * 100 # Overall coverage
file_coverage = {}
# Extract coverage per class/file
for class_elem in root.findall('.//class'):
filename = class_elem.get('filename', '')
line_rate = float(class_elem.get('line-rate', 0)) * 100
file_coverage[filename] = line_rate
return {
'total_coverage': total,
'file_coverage': file_coverage
}
elif file_content.path.endswith('lcov.info'):
# Parse LCOV format
total_lines = 0
covered_lines = 0
current_file = None
file_coverage = {}
for line in content.split('\n'):
if line.startswith('SF:'): # Source file
current_file = line[3:].strip()
elif line.startswith('LH:'): # Lines hit
covered = int(line[3:])
covered_lines += covered
elif line.startswith('LF:'): # Lines found
total = int(line[3:])
total_lines += total
if current_file and total > 0: # calculate coverage.
file_coverage[current_file] = (covered / total) * 100
return {
'total_coverage': (covered_lines / total_lines * 100) if total_lines > 0 else 0, # handle Total lines may be 0
'file_coverage': file_coverage
}
except Exception as e:
self.logger.error(f"Error parsing coverage file: {str(e)}")
return {} # Return empty dict on error
def analyze_test_quality(self, content: str) -> Dict[str, Any]:
"""
Analyze the quality of the tests themselves.
"""
try:
metrics = {
"assertion_density": 0, # Assertions per line of test code
"test_setup_complexity": 0, # How complex is the test setup?
"mock_usage": 0, # How frequently are mocks used?
"test_patterns": [], # List of identified test patterns and best practices.
"anti_patterns": [] # list of identified Anti patterns
}
lines = content.splitlines()
assertion_count = sum(1 for line in lines if 'assert' in line) # check assertion present.
metrics["assertion_density"] = assertion_count / len(lines) if lines else 0
setup_lines = []
in_setup = False
for line in lines:
if 'def setUp' in line or 'def setup' in line:
in_setup = True
                elif in_setup and line.strip() and not line.startswith(' '):  # A non-indented, non-blank line ends the setup block
in_setup = False
if in_setup:
setup_lines.append(line)
metrics["test_setup_complexity"] = len(setup_lines)
mock_count = sum(1 for line in lines if 'mock' in line.lower()) # count mock if present
metrics["mock_usage"] = mock_count
#detect patterns.
if any('parameterized' in line for line in lines):
metrics["test_patterns"].append("parameterized_tests") #
if any('fixture' in line for line in lines):
metrics["test_patterns"].append("fixture_usage")#
# Identify potential anti-patterns
if any('time.sleep' in line for line in lines):
metrics["anti_patterns"].append("sleep_in_tests")
            if any('test' not in line.lower() for line in lines if line.strip().startswith('def')):  # Are any defined methods not tests?
                metrics["anti_patterns"].append("non_test_methods")  # Helper/non-test methods mixed into the test file
return metrics
except Exception as e:
self.logger.error(f"Error analyzing test quality: {str(e)}")
return { # Return default 0 values on error.
"assertion_density": 0,
"test_setup_complexity": 0,
"mock_usage": 0,
"test_patterns": [],
"anti_patterns": []
}
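# Illustrative example: a test file containing "def test_login():" and two
# "assert" statements yields test_count=1 and assertions=2 from _analyze_test_file,
# and analyze_test_quality reports assertion_density = assertions / total lines
# (the 'assert' match is substring-based, so assertEqual and friends also count).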
class DocumentationAnalyzer:
"""Handles documentation analysis."""
def __init__(self, repo):
self.repo = repo
self.logger = logging.getLogger(__name__) # Add logger
self.doc_patterns = [
"README.md",
"CONTRIBUTING.md",
"CHANGELOG.md",
"LICENSE",
"docs/", # Common documentation directories
"documentation/",
"wiki/" # Consider wiki as documentation
]
def analyze_documentation(self) -> Dict[str, Any]:
"""Analyze repository documentation (README, CONTRIBUTING, API docs, etc.)."""
results = {
"readme_analysis": None,
"contributing_guidelines": None,
"api_documentation": None, # Placeholder - can be expanded
"documentation_files": [], # All documantation.
"wiki_pages": [], # If the repo has a wiki
"documentation_coverage": 0.0 # Overall score
}
try:
# Analyze README
readme = self._get_file_content("README.md")
if readme:
results["readme_analysis"] = self._analyze_readme(readme)
# Check contributing guidelines
contributing = self._get_file_content("CONTRIBUTING.md")
if contributing:
results["contributing_guidelines"] = self._analyze_contributing(contributing)
contents = self.repo.get_contents("")
while contents:
content = contents.pop(0)
if content.type == "dir":
# Check for dedicated documentation directories
if content.path.lower() in ["docs", "documentation"]:
results["documentation_files"].extend(self._analyze_doc_directory(content.path))
contents.extend(self.repo.get_contents(content.path))
# Check for specific documentation files
elif any(content.path.endswith(pattern) for pattern in self.doc_patterns):
results["documentation_files"].append(content.path)
results["documentation_coverage"] = self._calculate_doc_coverage()
# Get wiki pages if available
            try:
                wiki_pages = self.repo.get_wiki_pages()  # May not be available in all PyGithub versions
                results["wiki_pages"] = [page.title for page in wiki_pages]
            except Exception:  # The API raises if the repository has no wiki (or the method is unavailable)
                pass
except Exception as e:
self.logger.error(f"Error analyzing documentation: {str(e)}") # Use logger
return results # Always return results
def _get_file_content(self, filepath: str) -> Optional[str]:
"""Helper to get the content of a specific file (handles not found)."""
try:
content = self.repo.get_contents(filepath)
return base64.b64decode(content.content).decode('utf-8')
        except Exception:
            return None  # File not found or unreadable
def _analyze_readme(self, content: str) -> Dict[str, Any]:
"""Analyze the README content for completeness and key information."""
analysis = {
"sections": [], # List of identified sections (e.g., from headings)
"has_quickstart": False, # Quick start guide
"has_installation": False, # Installation instructions
"has_usage": False, # Basic usage examples
"has_api_docs": False, # Link to API docs?
"has_examples": False, # Code examples
"word_count": len(content.split()),
"completeness_score": 0.0
}
# Extract sections (using regex for headings)
sections = re.findall(r'^#+\s+(.+)$', content, re.MULTILINE) # match and return the content.
analysis["sections"] = sections
# Check for key components (using regex for robustness)
analysis["has_quickstart"] = bool(re.search(r'quick\s*start', content, re.I)) # Case-insensitive
analysis["has_installation"] = bool(re.search(r'install|setup', content, re.I))
analysis["has_usage"] = bool(re.search(r'usage|how\s+to\s+use', content, re.I)) # More flexible matching.
analysis["has_api_docs"] = bool(re.search(r'api|documentation', content, re.I))
analysis["has_examples"] = bool(re.search(r'example|demo', content, re.I)) # Broader example terms
# Calculate a simple completeness score
key_elements = [
analysis["has_quickstart"],
analysis["has_installation"],
analysis["has_usage"],
analysis["has_api_docs"],
analysis["has_examples"]
]
analysis["completeness_score"] = sum(key_elements) / len(key_elements) * 100
return analysis
def _analyze_contributing(self, content: str) -> Dict[str, Any]:
"""Analyze CONTRIBUTING.md for guidelines."""
analysis = {
"has_code_style": False, # Code Style Guide
"has_pr_process": False, # How to make PR
"has_issue_guidelines": False, #Guidelines for reporting issue.
"has_setup_instructions": False, # setup environment Instructions.
"completeness_score": 0.0
}
analysis["has_code_style"] = bool(re.search(r'code\s+style|coding\s+standards', content, re.I))
analysis["has_pr_process"] = bool(re.search(r'pull\s+request|PR', content, re.I)) # checking pull request
analysis["has_issue_guidelines"] = bool(re.search(r'issue|bug\s+report', content, re.I)) #issue and bug report.
analysis["has_setup_instructions"] = bool(re.search(r'setup|getting\s+started', content, re.I))# Setup.
key_elements = [ #key components present or not.
analysis["has_code_style"],
analysis["has_pr_process"],
analysis["has_issue_guidelines"],
analysis["has_setup_instructions"]
]
analysis["completeness_score"] = sum(key_elements) / len(key_elements) * 100 # calculate
return analysis
def _analyze_doc_directory(self, directory: str) -> List[str]:
"""Analyze a dedicated documentation directory (if present)."""
doc_files = []
try:
contents = self.repo.get_contents(directory)
for content in contents:
if content.type == "file":
doc_files.append(content.path)
except Exception as e:
self.logger.error(f"Error analyzing doc directory: {str(e)}") # Use logger
return doc_files
def _calculate_doc_coverage(self) -> float:
"""Calculate an overall documentation coverage score (heuristic)."""
# This is a simplified scoring system and should be customized
score = 0.0
total_points = 0
# Check README presence and quality
readme = self._get_file_content("README.md")
if readme:
readme_analysis = self._analyze_readme(readme)
score += readme_analysis["completeness_score"] / 100 * 40 # README is worth 40%
total_points += 40
# Check contributing guidelines
contributing = self._get_file_content("CONTRIBUTING.md")
if contributing:
contributing_analysis = self._analyze_contributing(contributing)
score += contributing_analysis["completeness_score"] / 100 * 20 # Contributing is worth 20%
total_points += 20
# Check API documentation (basic presence check)
if any(f.endswith(('.md', '.rst')) for f in self.doc_patterns):
score += 20 # API docs are worth 20%
total_points += 20
# Check for examples (this is simplified - could be improved)
if any('example' in f.lower() for f in self.doc_patterns): # Case-insensitive check
score += 20 # Examples are worth 20%
total_points += 20
return (score / total_points * 100) if total_points > 0 else 0.0 # Avoid division by 0
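# Illustrative example of the weighting above: if only the README (40-point) and
# API-docs (20-point) buckets apply and the README scores 80% completeness, the
# coverage score is (0.8 * 40 + 20) / 60 * 100 ≈ 86.7.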
class CommunityAnalyzer:
"""Handles community metrics analysis."""
def __init__(self, repo):
self.repo = repo
self.logger = logging.getLogger(__name__) # Add logger
async def analyze_community(self) -> Dict[str, Any]:
"""Analyze community engagement, health, and contribution patterns."""
results = {
"engagement_metrics": await self._get_engagement_metrics(), # Await async calls
"issue_metrics": await self._analyze_issues(), # Await for analysis
"pr_metrics": await self._analyze_pull_requests(), # Await for PR
"contributor_metrics": self._analyze_contributors(),
"discussion_metrics": await self._analyze_discussions() # If discussions are enabled
}
return results # Returns Calculated community metrics.
async def _get_engagement_metrics(self) -> Dict[str, Any]:
"""Get basic repository engagement metrics (stars, forks, watchers)."""
metrics = {
"stars": self.repo.stargazers_count,
"forks": self.repo.forks_count,
"watchers": self.repo.subscribers_count,
"star_history": [], # Historical star data
"fork_history": [] # Historical fork data
}
try:
# Get star history (last 100 stars for efficiency)
stargazers = self.repo.get_stargazers_with_dates()
metrics["star_history"] = [
{"date": star.starred_at.isoformat(), "count": i + 1} # count: i+1 to show progression.
for i, star in enumerate(stargazers)
]
# Get fork history
forks = self.repo.get_forks() # No need for with_date.
metrics["fork_history"] = [
{"date": fork.created_at.isoformat(), "count": i + 1}
for i, fork in enumerate(forks)
]
except Exception as e:
self.logger.error(f"Error getting engagement metrics: {str(e)}") # Use logger
return metrics # Return calculated metrics data.
async def _analyze_issues(self) -> Dict[str, Any]:
"""Analyze repository issues (open, closed, response times, labels)."""
metrics = {
"total_issues": 0,
"open_issues": 0,
"closed_issues": 0,
"avg_time_to_close": None, # Average time to close an issue
"issue_categories": defaultdict(int), # Categorize issues by label
"response_times": [] # List of response times
}
try:
issues = self.repo.get_issues(state='all') # Get all issues (open and closed)
for issue in issues:
metrics["total_issues"] += 1
if issue.state == 'open':
metrics["open_issues"] += 1
else:
metrics["closed_issues"] += 1
# Calculate time to close (if closed_at is available)
if issue.closed_at and issue.created_at: #Calculate time,if issue closed.
time_to_close = (issue.closed_at - issue.created_at).total_seconds()
metrics["response_times"].append(time_to_close)
# Categorize issues by labels
for label in issue.labels:
metrics["issue_categories"][label.name] += 1
# Calculate average response time
if metrics["response_times"]: # Calculate Avg_response only if any time available.
metrics["avg_time_to_close"] = sum(metrics["response_times"]) / len(metrics["response_times"]) #avg = tot / no.
except Exception as e:
self.logger.error(f"Error analyzing issues: {str(e)}") # Use logger
return metrics
async def _analyze_pull_requests(self) -> Dict[str, Any]:
"""Analyze pull requests (open, closed, merged, review times, sizes)."""
metrics = {
"total_prs": 0,
"open_prs": 0,
"merged_prs": 0,
"closed_prs": 0,
"avg_time_to_merge": None, # Average time to merge a PR
"pr_sizes": defaultdict(int), # Categorize PRs by size (lines of code)
"review_times": [] # List of review times
}
try:
pulls = self.repo.get_pulls(state='all') # Get all PRs (open, closed, merged)
for pr in pulls:
metrics["total_prs"] += 1
if pr.state == 'open':
metrics["open_prs"] += 1
elif pr.merged:
metrics["merged_prs"] += 1
# Calculate time to merge
if pr.merged_at and pr.created_at:
time_to_merge = (pr.merged_at - pr.created_at).total_seconds()
metrics["review_times"].append(time_to_merge) #store calculated value
else:
metrics["closed_prs"] += 1 #
# Categorize PR sizes (simplified, based on additions + deletions)
if pr.additions + pr.deletions < 10:
metrics["pr_sizes"]["xs"] += 1 # Extra small
elif pr.additions + pr.deletions < 50:
metrics["pr_sizes"]["s"] += 1 # Small
elif pr.additions + pr.deletions < 250:
metrics["pr_sizes"]["m"] += 1 # Medium
elif pr.additions + pr.deletions < 1000:
metrics["pr_sizes"]["l"] += 1 # Large
else:
metrics["pr_sizes"]["xl"] += 1 # Extra large
# Calculate average review time
if metrics["review_times"]: #calculate Avg_time to merge if review times available.
metrics["avg_time_to_merge"] = sum(metrics["review_times"]) / len(metrics["review_times"]) #calculate Average.
except Exception as e:
self.logger.error(f"Error analyzing pull requests: {str(e)}") # Use logger
        return metrics  # Return the calculated PR metrics
def _analyze_contributors(self) -> Dict[str, Any]:
"""Analyze contributor patterns and engagement."""
metrics = {
"total_contributors": 0,
"active_contributors": 0, # Contributors active in the last 90 days
"contributor_types": defaultdict(int), # User, Organization, Bot
"contribution_frequency": defaultdict(int), # High, medium, low
"core_contributors": [] # List of core contributors (e.g., top 10%)
}
try:
contributors = self.repo.get_contributors()
for contributor in contributors:
metrics["total_contributors"] += 1
                # Check for recent activity (commits authored in the last 90 days)
                recent_commits = self.repo.get_commits(author=contributor.login,
                                                       since=datetime.now() - timedelta(days=90))
if recent_commits.totalCount > 0:
metrics["active_contributors"] += 1
# Categorize contributor types
metrics["contributor_types"][contributor.type] += 1 # increment by type.
# Analyze contribution frequency (simplified)
if contributor.contributions > 100:  # Bucket contributors by contribution count
metrics["contribution_frequency"]["high"] += 1
# Consider contributors with >100 contributions as "core"
metrics["core_contributors"].append({
"login": contributor.login,
"contributions": contributor.contributions, # store
"type": contributor.type #Store.
})
elif contributor.contributions > 20:
metrics["contribution_frequency"]["medium"] += 1 # store in medium if condition satisfy.
else:
metrics["contribution_frequency"]["low"] += 1#
except Exception as e:
self.logger.error(f"Error analyzing contributors: {str(e)}") # Use logger
return metrics  # Return the calculated contributor metrics
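# The >100 / >20 contribution thresholds above are heuristics of this analyzer, not GitHub
# tiers. A hypothetical consumer could derive the share of core contributors as:
#   core_share = len(metrics["core_contributors"]) / max(1, metrics["total_contributors"])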
async def _analyze_discussions(self) -> Dict[str, Any]:
"""Analyze repository discussions (if enabled)."""
metrics = {
"total_discussions": 0,
"active_discussions": 0, # Discussions with recent activity
"categories": defaultdict(int), # Discussion categories
"avg_responses": 0, # Average number of responses per discussion
"response_times": [] # List of response times
}
try:
# Check if discussions are enabled
if self.repo.has_discussions:  # Only proceed if discussions are enabled for the repo
discussions = self.repo.get_discussions()  # Retrieve all discussions
total_responses = 0
for discussion in discussions:
metrics["total_discussions"] += 1
# Check for active discussions (simplified: any comments = active)
if discussion.comments > 0:
metrics["active_discussions"] += 1
total_responses += discussion.comments  # Accumulate total comment count
# Categorize discussions
metrics["categories"][discussion.category.name] += 1
# Calculate response times (time to first response)
if discussion.comments > 0:
first_response = discussion.get_comments()[0]  # Earliest comment, i.e. the first response
response_time = (first_response.created_at - discussion.created_at).total_seconds()
metrics["response_times"].append(response_time)
# Calculate average responses per discussion
if metrics["active_discussions"] > 0: # Calculate only if value present.
metrics["avg_responses"] = total_responses / metrics["active_discussions"]
except Exception as e:
self.logger.error(f"Error analyzing discussions: {str(e)}") # Use logger
return metrics
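# Note (assumption): GitHub Discussions are primarily exposed through the GraphQL API, so
# depending on the installed PyGithub version, has_discussions / get_discussions may be
# unavailable; in that case the except block above simply returns the zeroed metrics dict.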
class RepositoryAnalyzer:
"""Main class to analyze a GitHub repository."""
def __init__(self, repo_url: str, github_token: str):
self.logger = logging.getLogger(__name__)
self.gh = Github(github_token) # Keep for some top-level calls
self.gh_handler = GitHubAPIHandler(github_token) # Use the handler
self.code_metrics = CodeMetricsAnalyzer()
parts = repo_url.rstrip('/').split('/')
if len(parts) < 2:
raise ValueError("Invalid repository URL format")
self.repo_name = parts[-1]
self.owner = parts[-2]
self.analysis_data = { # Initialize data here
"basic_info": {},
"structure": {},
"code_metrics": {},
"dependencies": {},
"tests": {},
"documentation": {},
"community": {},
"visualizations": {}
}
try:
self.repo = self.gh_handler.get_repository(repo_url) # Use handler
# Initialize other analyzers *after* successfully getting the repo
self.dependency_analyzer = DependencyAnalyzer(self.repo)
self.test_analyzer = TestAnalyzer(self.repo)
self.doc_analyzer = DocumentationAnalyzer(self.repo)
self.community_analyzer = CommunityAnalyzer(self.repo)
except Exception as e:
self.logger.error(f"Failed to initialize repository analyzer: {str(e)}")
raise
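# Example of the URL parsing above (hypothetical input):
#   "https://github.com/octocat/Hello-World" -> owner = "octocat", repo_name = "Hello-World"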
async def analyze(self) -> Dict[str, Any]:
"""Perform the full repository analysis."""
try:
# Basic repository information
self.analysis_data["basic_info"] = {
"name": self.repo.name,
"owner": self.repo.owner.login,
"description": self.repo.description or "No description available", # Handle None
"stars": self.repo.stargazers_count,
"forks": self.repo.forks_count,
"created_at": self.repo.created_at.isoformat(), # Use isoformat()
"last_updated": self.repo.updated_at.isoformat(),
"primary_language": self.repo.language or "Not specified",
}
# Analyze repository structure with sampling
self.analysis_data["structure"] = await self._analyze_structure()
# Analyze code patterns and metrics
self.analysis_data["code_metrics"] = await self._analyze_code_metrics()
# Analyze dependencies
self.analysis_data["dependencies"] = await self.dependency_analyzer.analyze_dependencies()
# Analyze tests and coverage
self.analysis_data["tests"] = self.test_analyzer.analyze_tests()
# Analyze documentation
self.analysis_data["documentation"] = self.doc_analyzer.analyze_documentation()
# Analyze community health
self.analysis_data["community"] = await self.community_analyzer.analyze_community()
# Generate visualizations
self.analysis_data["visualizations"] = await self._generate_visualizations()
return self.analysis_data # Return the populated dict
except Exception as e:
self.logger.error(f"Error during analysis: {str(e)}")
raise
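# Minimal usage sketch (illustrative; assumes valid tokens are configured):
#   analyzer = RepositoryAnalyzer("https://github.com/octocat/Hello-World", GITHUB_TOKEN)
#   data = asyncio.run(analyzer.analyze())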
async def _analyze_structure(self) -> Dict[str, Any]:
"""Analyze the repository's file and directory structure, with sampling."""
structure = {
"files": defaultdict(int), # File type counts (e.g., .py, .js)
"directories": set(), # Unique directory paths
"total_size": 0, # Total size in bytes
"directory_tree": defaultdict(list), # Parent -> [children]
"file_samples": [] # Sample files for detailed analysis
}
try:
all_files = [] # Store all relevant files first
contents = self.repo.get_contents("")
while contents:
content = contents.pop(0)
if content.type == "dir":
structure["directories"].add(content.path)
# Build directory tree structure
structure["directory_tree"][os.path.dirname(content.path)].append(content.path) #correct way
contents.extend(self.repo.get_contents(content.path))
else:
ext = Path(content.path).suffix.lower() # Get lowercase extension
# Only consider relevant files
if ext in RELEVANT_EXTENSIONS:
structure["files"][ext] += 1 # Increment count for the file type
structure["total_size"] += content.size
all_files.append(content)
# Smart sampling of files
if all_files:
# Stratified sampling based on file types
samples_per_type = min(5, max(1, len(all_files) // len(structure["files"]) if structure["files"] else 1)) # At least one sample
for ext in structure["files"].keys():
ext_files = [f for f in all_files if f.path.endswith(ext)]  # All candidate files with this extension
if ext_files:
# Sort by size, and select a diverse sample
ext_files.sort(key=lambda x: x.size)
total_samples = min(samples_per_type, len(ext_files))
# Take samples evenly across the size range
step = max(1, len(ext_files) // total_samples)
for i in range(0, len(ext_files), step)[:total_samples]:  # Evenly spaced indices across the size-sorted list
structure["file_samples"].append({
"path": ext_files[i].path,
"size": ext_files[i].size,
"type": RELEVANT_EXTENSIONS.get(ext, "Unknown") # Get language
})
except Exception as e:
self.logger.error(f"Error analyzing structure: {str(e)}")
# No fallback needed here; 'structure' was initialized with safe defaults above.
return {
"file_types": dict(structure["files"]), # Convert defaultdict to dict
"directory_count": len(structure["directories"]),
"total_size": structure["total_size"],
"file_count": sum(structure["files"].values()), # Total relevant files
"directory_tree": dict(structure["directory_tree"]), # convert
"file_samples": structure["file_samples"]
}
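# Worked example of the sampling above (hypothetical numbers): with 120 relevant files across
# 6 extensions, samples_per_type = min(5, max(1, 120 // 6)) = 5; for an extension with 40 files,
# step = max(1, 40 // 5) = 8, so the files at indices 0, 8, 16, 24 and 32 (sorted by size) are sampled.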
async def _analyze_code_metrics(self) -> Dict[str, Any]:
"""Analyze code metrics for a sample of files, with parallel processing."""
metrics = {
"complexity_metrics": defaultdict(list), # Cyclomatic/cognitive, nesting
"duplication_metrics": defaultdict(list),
"function_metrics": defaultdict(list), # From function analysis
"comment_metrics": defaultdict(list), # Comment density
"language_metrics": defaultdict(dict) # Aggregate by language
}
try:
# Get all relevant files
contents = self.repo.get_contents("")
files_to_analyze = []
while contents:
content = contents.pop(0)
if content.type == "dir":
contents.extend(self.repo.get_contents(content.path))
elif Path(content.path).suffix.lower() in RELEVANT_EXTENSIONS:  # Only analyze relevant file types
files_to_analyze.append(content)
# Use parallel processing for file analysis
with ThreadPoolExecutor(max_workers=max(1, min(10, len(files_to_analyze)))) as executor:  # Cap at 10 workers; at least 1 so an empty repo does not raise ValueError
futures = []
for file_content in files_to_analyze:
futures.append(executor.submit(self.code_metrics._analyze_file_metrics, file_content))  # Analyze each file in a worker thread
for future in futures:
try:
file_metrics = future.result() # Collect the results from the File Analysis
if file_metrics:
language = RELEVANT_EXTENSIONS.get(Path(file_metrics["path"]).suffix.lower(), "Unknown")
# Aggregate metrics (by language, for example)
# Correctly handle string keys for metrics
for metric_type, value in file_metrics["metrics"].items():
if isinstance(value, (int, float)):
metrics.setdefault(f"{metric_type}_metrics", defaultdict(list))[language].append(value) # store
# Update language-specific metrics
if language not in metrics["language_metrics"]:
metrics["language_metrics"][language] = {
"file_count": 0,
"total_lines": 0,
"total_complexity": 0
}
lang_metrics = metrics["language_metrics"][language]  # Per-language aggregate bucket
lang_metrics["file_count"] += 1
lang_metrics["total_lines"] += file_metrics["metrics"].get("total_lines", 0)
lang_metrics["total_complexity"] += file_metrics["metrics"].get("complexity", 0)
except Exception as e:
self.logger.error(f"Error processing file metrics: {str(e)}")
return metrics # return aggregated
except Exception as e:
self.logger.error(f"Error analyzing code metrics: {str(e)}")
return metrics # Return the initialized dict (possibly empty)
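# Downstream use (sketch, not part of the original flow): per-language average complexity can be
# derived from the aggregates above as
#   avg = lang["total_complexity"] / max(1, lang["file_count"])
# for each lang in metrics["language_metrics"].values().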
async def _generate_visualizations(self) -> Dict[str, Any]:
"""Generate visualizations from the analyzed data (using matplotlib, seaborn, etc.)."""
visualizations = {}
try:
from io import BytesIO  # Buffer for every chart generated below
# Language distribution pie chart
if self.analysis_data.get("structure", {}).get("file_types"):
fig, ax = plt.subplots()
languages = self.analysis_data["structure"]["file_types"]
plt.pie(languages.values(), labels=languages.keys(), autopct='%1.1f%%')
plt.title("Language Distribution")
buffer = BytesIO()  # In-memory PNG buffer
plt.savefig(buffer, format='png')
visualizations["language_distribution"] = base64.b64encode(buffer.getvalue()).decode()
plt.close()
# Code complexity heatmap (example using average complexity)
if self.analysis_data.get("code_metrics", {}).get("complexity_metrics"):
complexity_data = []
for lang, values in self.analysis_data["code_metrics"]["complexity_metrics"].items():
if values: # Ensure there are values to average
complexity_data.append({
"language": lang,
"avg_complexity": sum(values) / len(values)
})
if complexity_data: # If Data present generate graph.
df = pd.DataFrame(complexity_data)
plt.figure(figsize=(10, 6))
sns.barplot(data=df, x="language", y="avg_complexity")
plt.title("Average Code Complexity by Language")
plt.xticks(rotation=45) # Rotate x-axis labels
buffer = BytesIO()
plt.savefig(buffer, format='png', bbox_inches='tight') # Improve layout
visualizations["complexity_distribution"] = base64.b64encode(buffer.getvalue()).decode()
plt.close()
# Commit activity heatmap (example)
if self.analysis_data.get("community", {}).get("commit_history"): #check whether community & commit-history metrics
commit_data = self.analysis_data["community"]["commit_history"]
df = pd.DataFrame(commit_data)
df['date'] = pd.to_datetime(df['date'])  # Parse dates for time-based resampling
df = df.set_index('date')
# Resample to daily commit counts (column named 'count' so the pivot below can reference it)
df = df.resample('D').size().to_frame('count')
plt.figure(figsize=(12, 4))
sns.heatmap(df.pivot_table(index=df.index.dayofweek, columns=df.index.month, values='count', aggfunc='sum'))  # Day-of-week vs. month heatmap
plt.title("Commit Activity Heatmap")
buffer = BytesIO()
plt.savefig(buffer, format='png', bbox_inches='tight')
visualizations["commit_heatmap"] = base64.b64encode(buffer.getvalue()).decode()
plt.close()
# Add more visualizations as needed (e.g., dependency graph, test coverage)
except Exception as e:
self.logger.error(f"Error generating visualizations: {str(e)}")
return visualizations # Even if empty
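# Each entry in 'visualizations' is a base64-encoded PNG; a hypothetical consumer could persist
# one to disk like so:
#   with open("language_distribution.png", "wb") as img:
#       img.write(base64.b64decode(visualizations["language_distribution"]))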
# --- Prompt Creation and LLM Interaction ---
def create_enhanced_analysis_prompt(analysis_data: Dict[str, Any]) -> str:
"""Create an enhanced prompt for the LLM analysis."""
return f"""You are an expert code analyst with deep experience in software architecture, development practices, and team dynamics.
Analyze the provided repository data and create a detailed, insightful analysis using the following sections:
# Repository Analysis for {analysis_data['basic_info']['name']}
## 📊 Project Overview
[Analyze the basic repository information, including:
- Project purpose and description
- Repository age and activity level
- Key metrics (stars, forks, contributors)
- Primary technologies used
- Overall project health indicators]
## 🏗️ Architecture and Code Organization
[Analyze the repository structure and code organization:
- Directory structure and organization patterns
- Code distribution across languages
- File organization and modularity
- Architectural patterns
- Development standards and practices
- Code complexity distribution
- Potential architectural improvements]
## 💻 Code Quality and Metrics
[Provide detailed analysis of code quality metrics:
- Cyclomatic complexity trends
- Code duplication patterns
- Function length and complexity
- Comment density and documentation quality
- Test coverage and quality
- Areas for potential improvement]
## 📦 Dependencies and Security
[Analyze the project's dependencies:
- Major dependencies and their versions
- Outdated dependencies
- Security vulnerabilities
- Dependency graph complexity
- Licensing considerations]
## 📚 Documentation Assessment
[Evaluate the project's documentation:
- README completeness and quality
- API documentation coverage
- Contributing guidelines
- Code comments and inline documentation
- Examples and tutorials
- Documentation maintenance status]
## 🧪 Testing and Quality Assurance
[Analyze testing practices:
- Test coverage metrics
- Testing patterns and approaches
- CI/CD implementation
- Quality assurance processes
- Areas needing additional testing]
## 👥 Community Health and Engagement
[Evaluate community aspects:
- Contributor demographics and activity
- Issue and PR response times
- Community engagement metrics
- Communication patterns
- Governance model]
## 📈 Development Trends
[Analyze development patterns:
- Commit frequency and distribution
- Code change patterns
- Release cycle analysis
- Development velocity
- Team collaboration patterns]
## 🚀 Performance and Scalability
[Assess technical characteristics:
- Code performance indicators
- Scalability considerations
- Resource usage patterns
- Technical debt indicators
- Optimization opportunities]
## 💡 Key Insights
[Summarize the most important findings:
- Top 3 strengths
- Top 3 areas for improvement
- Unique characteristics
- Notable patterns or practices
- Risk factors]
## 📋 Recommendations
[Provide actionable recommendations:
- Immediate improvement opportunities
- Long-term strategic suggestions
- Specific tools or practices to consider
- Priority areas for focus
- Resource allocation suggestions]
Please analyze the following repository data thoroughly and provide detailed insights for each section:
{json.dumps(analysis_data, indent=2)}
"""
async def analyze_repository(repo_url: str, github_token: str, gemini_key: str, progress=gr.Progress()) -> Tuple[str, str, str]:
"""Analyze repository and generate LLM summary (async, with progress)."""
try:
# Re-initialize tokens each time
initialize_tokens(github_token, gemini_key) # Ensure fresh tokens
progress(0, desc="Initializing repository analysis...")
analyzer = RepositoryAnalyzer(repo_url, github_token)
progress(0.3, desc="Analyzing repository structure and patterns...")
analysis_data = await analyzer.analyze() # Await the analysis
progress(0.7, desc="Generating comprehensive analysis...")
# Use the more powerful Gemini 1.5 Pro model
model = genai.GenerativeModel(
model_name="gemini-1.5-pro", # Use 1.5 Pro
generation_config={
"temperature": 0.7,
"top_p": 0.95, # Use nucleus sampling
"top_k": 40,
"max_output_tokens": 8192, # Increased token limit
}
)
prompt = create_enhanced_analysis_prompt(analysis_data) # Use a better, sectioned prompt.
# Generate the full report in a single (non-streaming) call
chat = model.start_chat(history=[])  # Start a fresh chat session
response = chat.send_message(prompt)
progress(0.9, desc="Saving analysis results...")
# Save analysis data to a temporary file (for follow-up Q&A)
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.json') as f:
json.dump(analysis_data, f, indent=2)
analysis_file = f.name
progress(1.0, desc="Analysis complete!")
return response.text, analysis_file, "✅ Analysis completed successfully!"
except Exception as e:
error_message = f"❌ Error analyzing repository: {str(e)}"
return "", "", error_message # Return empty strings for Markdown and file
async def ask_question(question: str, analysis_file: str, chat_history: List[Tuple[str, str]]) -> List[Tuple[str, str]]:
"""Process a follow-up question about the analysis with enhanced context."""
if not analysis_file:
return chat_history + [(question, "Please analyze a repository first before asking questions.")]
try:
with open(analysis_file, 'r') as f:
analysis_data = json.load(f)
# Initialize chat with system prompt and history
model = genai.GenerativeModel(
model_name="gemini-1.5-pro", # Use 1.5 Pro
generation_config={
"temperature": 0.7,
"top_p": 0.8, # More focused sampling
"top_k": 40,
"max_output_tokens": 4096, # Increased token limit
}
)
# Build the context
context = """You are an expert code analyst helping users understand repository analysis results.
Provide detailed, technical, and actionable insights based on the analysis data. When appropriate,
reference specific metrics and patterns from the analysis. If making recommendations, be specific
and explain the reasoning behind them.
Repository Analysis Data:
"""
context += json.dumps(analysis_data, indent=2) + "\n\n"
if chat_history:  # Include prior conversation context, if any
context += "Previous conversation:\n"
for user_msg, assistant_msg in chat_history[-3:]: # Only include last 3 exchanges for relevance.
context += f"User: {user_msg}\nAssistant: {assistant_msg}\n"
prompt = f"""{context}
User's Question: {question}
Please provide a detailed analysis that:
1. Directly addresses the user's question
2. References relevant metrics and data from the analysis
3. Provides context and explanations for technical concepts
4. Suggests actionable next steps or recommendations when appropriate
5. Maintains technical accuracy while being clear and understandable
Your response:"""
chat = model.start_chat(history=[]) # Start a new chat
response = chat.send_message(prompt)
return chat_history + [(question, response.text)]  # Append the new exchange
except Exception as e:
error_message = f"Error processing question: {str(e)}"
return chat_history + [(question, error_message)]
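# chat_history uses the (user_message, assistant_message) tuple format consumed by the gr.Chatbot
# component below; only the last three exchanges are replayed into the prompt to keep it compact.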
# --- Gradio Interface ---
def create_interface():
with gr.Blocks(theme=gr.themes.Soft()) as app: # Use a theme
gr.Markdown("""
# 🔍 GitHub Repository Analyzer (Colab Version)
Analyze any public GitHub repository using AI.
""")
# API tokens
with gr.Row():
github_token = gr.Textbox(
label="GitHub Token",
type="password",
placeholder="Enter your GitHub token"
)
gemini_key = gr.Textbox(
label="Gemini API Key",
type="password",
placeholder="Enter your Gemini API key"
)
init_btn = gr.Button("Initialize Tokens", variant="secondary")
# Repository URL and analysis button
with gr.Row():
repo_url = gr.Textbox(
label="GitHub Repository URL",
placeholder="https://github.com/owner/repo",
scale=4 # Larger input box
)
analyze_btn = gr.Button("🔍 Analyze", variant="primary", scale=1)
# Status message
status_msg = gr.Markdown("")  # Displays progress and error status
# Analysis results
with gr.Tabs():
with gr.Tab("πŸ“ Analysis Report"): # report Analysis.
summary = gr.Markdown("") # output report.
with gr.Tab("πŸ’­ Q&A"): # Improved label
chatbot = gr.Chatbot(
[],
label="Ask questions about the analysis",
height=400
)
with gr.Row():
question = gr.Textbox(
label="Your Question",
placeholder="Ask about specific aspects of the analysis...",
scale=4
)
ask_btn = gr.Button("Ask", scale=1)
clear_btn = gr.Button("Clear", scale=1)
# Hidden state to store the analysis data file path
analysis_file = gr.State("")
async def safe_analyze(repo_url: str, github_token: str, gemini_key: str):
"""Wrapper function to handle analysis and errors gracefully."""
try:
if not repo_url:
return None, None, "❌ Please enter a GitHub repository URL"
if not github_token or not gemini_key:
return None, None, "❌ Please initialize tokens first"
if not re.match(r'https?://github\.com/[\w.-]+/[\w.-]+/?$', repo_url):  # Allow dots and hyphens in owner/repo names
return None, None, "❌ Invalid GitHub repository URL format"
summary, analysis_file, status = await analyze_repository(repo_url, github_token, gemini_key)
return summary, analysis_file, status
except Exception as e:
return None, None, f"❌ Analysis failed: {str(e)}"
# Event handlers
init_btn.click(
initialize_tokens,
inputs=[github_token, gemini_key],
outputs=status_msg
)
analyze_btn.click(
fn=lambda: "⏳ Analysis in progress...", # Immediate feedback
inputs=None,
outputs=status_msg,
queue=False # Don't queue this click
).then(
safe_analyze, # Call the wrapper
inputs=[repo_url, github_token, gemini_key],
outputs=[summary, analysis_file, status_msg]
)
ask_btn.click(
ask_question,
inputs=[question, analysis_file, chatbot], # Include chatbot history
outputs=[chatbot]
).then(
lambda: "", # Clear the question box after asking
None,
question,
queue=False
)
clear_btn.click(
lambda: ([], ""), # Clear chatbot and question
outputs=[chatbot, question]
)
return app
# Run the interface
if __name__ == "__main__":
app = create_interface()
app.launch(debug=True, share=True)
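# Note: share=True asks Gradio to create a temporary public link in addition to the local server;
# set share=False if the app should only be reachable locally.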