| import gradio as gr | |
| import google.generativeai as genai | |
| import os | |
| from dotenv import load_dotenv | |
| from github import Github, RateLimitExceededException, GithubException | |
| import json | |
| from pathlib import Path | |
| from datetime import datetime, timedelta | |
| from collections import defaultdict | |
| import base64 | |
| from typing import Dict, List, Any, Optional, Tuple | |
| import tempfile | |
| from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type | |
| import asyncio | |
| import aiohttp | |
| import re | |
| import ast | |
| from concurrent.futures import ThreadPoolExecutor | |
| import pandas as pd | |
| import matplotlib.pyplot as plt | |
| import seaborn as sns | |
| from packaging import version | |
| import requests | |
| from bs4 import BeautifulSoup | |
| import networkx as nx | |
| import math | |
| import logging | |
| import time | |
| # Configure logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| # Load environment variables (consider handling missing .env) | |
| load_dotenv() | |
| # --- Constants and Global Variables --- | |
| # Store API tokens globally | |
| GITHUB_TOKEN = os.getenv("GITHUB_TOKEN") # GitHub token from the environment | |
| GEMINI_API_KEY = os.getenv("GEMINI_API_KEY") # Gemini API key from the environment | |
| # Constants for rate limiting - make them configurable if needed | |
| MIN_RATE_LIMIT_BUFFER = 50 # Keep a buffer to avoid hitting the limit | |
| INITIAL_BACKOFF = 60 # Initial backoff time in seconds | |
| # Enhanced relevant file extensions | |
| RELEVANT_EXTENSIONS = { | |
| ".py": "Python", | |
| ".js": "JavaScript", | |
| ".ts": "TypeScript", | |
| ".jsx": "React", | |
| ".tsx": "React TypeScript", | |
| ".java": "Java", | |
| ".cpp": "C++", | |
| ".c": "C", | |
| ".h": "C Header", | |
| ".hpp": "C++ Header", | |
| ".rb": "Ruby", | |
| ".php": "PHP", | |
| ".go": "Go", | |
| ".rs": "Rust", | |
| ".swift": "Swift", | |
| ".kt": "Kotlin", | |
| ".cs": "C#", | |
| ".scala": "Scala", | |
| ".r": "R", | |
| ".dart": "Dart", | |
| ".lua": "Lua", | |
| ".sql": "SQL", | |
| ".sh": "Shell", | |
| ".md": "Markdown", # Include Markdown for documentation analysis | |
| ".txt": "Text", | |
| ".json": "JSON", | |
| ".yml": "YAML", | |
| ".yaml": "YAML", | |
| ".xml": "XML", | |
| ".html": "HTML", | |
| ".css": "CSS" | |
| } | |
| # --- Initialization and Validation --- | |
| def validate_github_token(token: str) -> Tuple[bool, str]: | |
| """ | |
| Validate GitHub token before proceeding with analysis. | |
| Returns (is_valid: bool, message: str) | |
| """ | |
| if not token: | |
| return False, "GitHub token is missing." # Check for missing | |
| try: | |
| gh = Github(token) | |
| user = gh.get_user() | |
| username = user.login # Accessing a property forces the authentication to actually be exercised | |
| rate_limit = gh.get_rate_limit() | |
| remaining = rate_limit.core.remaining | |
| if remaining == 0: # No requests left in the current window | |
| reset_time = rate_limit.core.reset.strftime("%Y-%m-%d %H:%M:%S UTC") | |
| return False, f"Rate limit exceeded. Resets at {reset_time}" | |
| return True, f"Token validated successfully (authenticated as {username})" | |
| except GithubException as e: | |
| if e.status == 401: | |
| return False, "Invalid token - authentication failed" | |
| elif e.status == 403: | |
| return False, "Token lacks required permissions or rate limit exceeded" #more specific 403 message | |
| elif e.status == 404: | |
| return False, "Invalid token or API endpoint not found" # More specific 404 message | |
| else: | |
| return False, f"GitHub error (status {e.status}): {e.data.get('message', str(e))}" | |
| except Exception as e: # General exception handling as a fallback. | |
| return False, f"Error validating token: {str(e)}" | |
| def initialize_tokens(github_token: str, gemini_key: str) -> str: | |
| """Initialize API tokens globally with enhanced validation (using env vars now).""" | |
| global GITHUB_TOKEN, GEMINI_API_KEY | |
| if not github_token or not gemini_key: | |
| return "β Both GitHub and Gemini API keys are required." | |
| is_valid, message = validate_github_token(github_token) | |
| if not is_valid: | |
| return f"β GitHub token validation failed: {message}" | |
| try: | |
| genai.configure(api_key=gemini_key) | |
| model = genai.GenerativeModel('gemini-1.0-pro') | |
| response = model.generate_content("Test") | |
| if response.text is None: # No text back means the key did not produce a usable response | |
| return "❌ Invalid Gemini API key (no response)" | |
| except Exception as e: | |
| return f"❌ Gemini API key validation failed: {str(e)}" | |
| GITHUB_TOKEN = github_token # Overwrite with validated tokens | |
| GEMINI_API_KEY = gemini_key | |
| return "✅ All tokens validated and initialized successfully!" | |
| # --- Classes --- | |
| class GitHubAPIHandler: | |
| """Enhanced GitHub API handler with minimal authentication checks and robust error handling.""" | |
| def __init__(self, token: Optional[str] = None): | |
| self.logger = logging.getLogger(__name__) | |
| self.token = token | |
| self._min_rate_limit_buffer = MIN_RATE_LIMIT_BUFFER | |
| self._initial_backoff = INITIAL_BACKOFF | |
| if not self.token: | |
| raise ValueError("GitHub token not provided") | |
| # Create the GitHub client *within* the class | |
| self.gh = self._create_github_client() | |
| def _create_github_client(self) -> Github: | |
| """Create GitHub client with enhanced error handling""" | |
| try: | |
| # Create Github instance with basic configuration | |
| gh = Github( | |
| self.token, | |
| retry=3, # Number of retries for failed requests | |
| timeout=30, # Timeout in seconds | |
| per_page=100 # Maximum items per page | |
| ) | |
| # Verify authentication | |
| try: | |
| user = gh.get_user() | |
| self.logger.info(f"Authenticated as: {user.login}") | |
| except GithubException as e: | |
| if e.status == 401: | |
| raise ValueError("Invalid GitHub token - authentication failed") | |
| elif e.status == 403: | |
| raise ValueError("GitHub token lacks required permissions or rate limit exceeded") | |
| else: | |
| raise ValueError(f"GitHub initialization failed: {str(e)}") | |
| return gh # Return the authenticated client | |
| except Exception as e: | |
| raise ValueError(f"Failed to initialize GitHub client: {str(e)}") # More informative error | |
| def get_repository(self, repo_url: str) -> Any: | |
| """Get repository object using PyGithub, with error handling and validation.""" | |
| try: | |
| parts = repo_url.rstrip('/').split('/') | |
| if len(parts) < 2: | |
| raise ValueError(f"Invalid repository URL format: {repo_url}") | |
| owner = parts[-2] | |
| repo_name = parts[-1] | |
| # Using PyGithub's get_repo method | |
| repo = self.gh.get_repo(f"{owner}/{repo_name}") | |
| return repo # Return the repo object | |
| except GithubException as e: # Specifically handle Github exceptions | |
| if e.status == 404: | |
| raise ValueError(f"Repository not found: {owner}/{repo_name}") | |
| elif e.status == 403: | |
| self._handle_forbidden_error() # Handle forbidden access (rate limits, etc.) | |
| raise #Re raise the exception so program doesn't continue | |
| else: | |
| raise ValueError(f"Failed to access repository: {str(e)}") | |
| except Exception as e: #catch all other exception. | |
| raise ValueError(f"Failed to access repository(An unexpected error occurred):{str(e)}") | |
| def _check_rate_limits(self): | |
| """Enhanced rate limit checking with predictive waiting.""" | |
| try: | |
| rate_limit = self.gh.get_rate_limit() | |
| remaining = rate_limit.core.remaining | |
| reset_time = rate_limit.core.reset.timestamp() | |
| self.logger.info(f"Rate limit - Remaining: {remaining}, Reset: {datetime.fromtimestamp(reset_time)}") | |
| if remaining < self._min_rate_limit_buffer: | |
| wait_time = self._get_rate_limit_wait_time() | |
| if wait_time > 0: # Only log if there's a wait. | |
| self.logger.warning(f"Approaching rate limit. Waiting {wait_time:.2f} seconds.") | |
| time.sleep(wait_time) # Wait before hitting the limit | |
| except GithubException as e: # Be specific about the exceptions you handle | |
| self.logger.error(f"Error checking rate limits: {str(e)}") | |
| time.sleep(60) # Wait a reasonable amount of time even if you cannot check | |
| except Exception as e: # Always have general exception to handle | |
| self.logger.error(f"Unexpected Error: {str(e)}") #General unexpected Error handle. | |
| time.sleep(60) | |
| def _get_rate_limit_wait_time(self) -> float: | |
| """Calculate the time to wait until the rate limit resets.""" | |
| try: | |
| rate_limit = self.gh.get_rate_limit() | |
| reset_time = rate_limit.core.reset.timestamp() | |
| current_time = time.time() | |
| return max(0, reset_time - current_time + 1) # Add 1 second buffer | |
| except Exception: | |
| return self._initial_backoff # Fallback on any error in getting rate limits | |
| def _handle_forbidden_error(self): | |
| """Handle a 403 Forbidden error from the GitHub API.""" | |
| try: | |
| # Check if it's a rate limit issue. | |
| rate_limit = self.gh.get_rate_limit() | |
| if rate_limit.core.remaining == 0: | |
| wait_time = self._get_rate_limit_wait_time() | |
| self.logger.warning(f"Rate limit exceeded. Waiting {wait_time:.2f} seconds.") | |
| time.sleep(wait_time) | |
| else: | |
| # If not rate limited, then likely a permissions issue | |
| self.logger.error("Access forbidden. Token may lack required permissions.") | |
| except Exception as e: #handling other errors. | |
| self.logger.error(f"Error handling forbidden response: {str(e)}") | |
| def get_file_content(self, repo: Any, path: str) -> Optional[Any]: | |
| """Get a file's ContentFile object, with rate limit checks and error handling.""" | |
| try: | |
| self._check_rate_limits() # Check rate limits *before* each attempt. | |
| content = repo.get_contents(path) | |
| return content | |
| except GithubException as e: | |
| if e.status == 404: | |
| self.logger.warning(f"File not found: {path}") # 404 is not critical. | |
| return None # explicitly return None | |
| elif e.status == 403: # Explicitly handle forbidden | |
| self._handle_forbidden_error() # Rate limiting or other access problem | |
| raise # Raise after handling (waiting, logging). | |
| # Any other GitHub error is an issue - log and re-raise | |
| self.logger.error(f"Error getting file content: {str(e)}") | |
| raise # Re-raise after logging | |
| except Exception as e: # General exception for unexpected issues | |
| self.logger.error(f"Unexpected error: {str(e)}") | |
| raise | |
| class CodeMetricsAnalyzer: | |
| """Handles detailed code metrics analysis with proper error handling.""" | |
| def __init__(self): | |
| self.logger = logging.getLogger(__name__) | |
| self.size_metrics_cache = {} # Consider if needed with parallelization | |
| def calculate_halstead_metrics(self, content: str, language: str = "Unknown") -> Dict[str, float]: | |
| """ | |
| Calculate Halstead complexity metrics for code. | |
| """ | |
| try: | |
| # Define language-specific operators (more comprehensive) | |
| operators = { | |
| "Python": set([ | |
| '+', '-', '*', '/', '//', '**', '%', '==', '!=', '>', '<', '>=', '<=', | |
| 'and', 'or', 'not', 'is', 'in', '+=', '-=', '*=', '/=', '=', | |
| 'if', 'elif', 'else', 'for', 'while', 'def', 'class', 'return', | |
| 'yield', 'raise', 'break', 'continue', 'pass', 'assert', | |
| 'import', 'from', 'as', 'try', 'except', 'finally', 'with', 'async', 'await' | |
| ]), | |
| "JavaScript": set([ | |
| '+', '-', '*', '/', '%', '**', '==', '===', '!=', '!==', '>', '<', | |
| '>=', '<=', '&&', '||', '!', '=', '+=', '-=', '*=', '/=', | |
| 'if', 'else', 'for', 'while', 'function', 'return', 'class', | |
| 'new', 'delete', 'typeof', 'instanceof', 'void', 'try', 'catch', | |
| 'finally', 'throw', 'break', 'continue', 'default', 'case', 'async', 'await' | |
| ]), | |
| "Java": set([ # Added Java operators | |
| '+', '-', '*', '/', '%', '++', '--', '==', '!=', '>', '<', '>=', '<=', | |
| '&&', '||', '!', '=', '+=', '-=', '*=', '/=', '%=', | |
| 'if', 'else', 'for', 'while', 'do', 'switch', 'case', 'default', | |
| 'break', 'continue', 'return', 'try', 'catch', 'finally', 'throw', 'throws', | |
| 'class', 'interface', 'extends', 'implements', 'new', 'instanceof', 'this', 'super' | |
| ]), | |
| }.get(language, set(['+', '-', '*', '/', '=', '==', '>', '<', '>=', '<='])) | |
| unique_operators = set() | |
| unique_operands = set() | |
| total_operators = 0 | |
| total_operands = 0 | |
| lines = content.splitlines() | |
| for line in lines: | |
| line = line.strip() | |
| if line.startswith(('#', '//', '/*', '*')): # Handle comments | |
| continue | |
| for operator in operators: | |
| if operator in line: | |
| unique_operators.add(operator) | |
| total_operators += line.count(operator) | |
| # Improved operand counting (numbers, strings, identifiers) | |
| numbers = re.findall(r'\b\d+(?:\.\d+)?\b', line) | |
| unique_operands.update(numbers) | |
| total_operands += len(numbers) | |
| strings = re.findall(r'["\'][^"\']*["\']', line) | |
| unique_operands.update(strings) | |
| total_operands += len(strings) | |
| identifiers = re.findall(r'\b[a-zA-Z_]\w*\b', line) | |
| for ident in identifiers: | |
| if ident not in operators: | |
| unique_operands.add(ident) | |
| total_operands += 1 | |
| n1 = len(unique_operators) | |
| n2 = len(unique_operands) | |
| N1 = total_operators | |
| N2 = total_operands | |
| # Handle edge cases to avoid division by zero | |
| if n1 > 0 and n2 > 0: | |
| program_length = N1 + N2 | |
| vocabulary = n1 + n2 | |
| volume = program_length * (math.log2(vocabulary) if vocabulary > 0 else 0) | |
| difficulty = (n1 * N2) / (2 * n2) if n2 > 0 else 0 | |
| effort = volume * difficulty | |
| time = effort / 18 # Standard Halstead time estimation | |
| else: | |
| program_length = vocabulary = volume = difficulty = effort = time = 0 | |
| return { | |
| "halstead_unique_operators": n1, | |
| "halstead_unique_operands": n2, | |
| "halstead_total_operators": N1, | |
| "halstead_total_operands": N2, | |
| "halstead_program_length": program_length, | |
| "halstead_vocabulary": vocabulary, | |
| "halstead_volume": volume, | |
| "halstead_difficulty": difficulty, | |
| "halstead_effort": effort, | |
| "halstead_time": time | |
| } | |
| except Exception as e: | |
| self.logger.error(f"Error calculating Halstead metrics: {str(e)}") | |
| # Return default 0 values for all metrics on error | |
| return {metric: 0 for metric in [ | |
| "halstead_unique_operators", "halstead_unique_operands", | |
| "halstead_total_operators", "halstead_total_operands", | |
| "halstead_program_length", "halstead_vocabulary", | |
| "halstead_volume", "halstead_difficulty", "halstead_effort", "halstead_time" | |
| ]} | |
| def calculate_comment_density(self, content: str, language: str = "Unknown") -> Dict[str, Any]: | |
| try: | |
| metrics = { | |
| "comment_lines": 0, | |
| "code_lines": 0, | |
| "blank_lines": 0, | |
| "comment_density": 0.0, | |
| "docstring_lines": 0, # Docstrings (Python) | |
| "total_lines": 0, #Total no of line. | |
| "inline_comments": 0 | |
| } | |
| patterns = { | |
| "Python": { | |
| "single_line": ["#"], | |
| "multi_start": ['"""', "'''"], | |
| "multi_end": ['"""', "'''"], | |
| "inline_start": "#" | |
| }, | |
| "JavaScript": { | |
| "single_line": ["//"], | |
| "multi_start": ["/*"], | |
| "multi_end": ["*/"], | |
| "inline_start": "//" | |
| }, | |
| "Java": { # Added Java comment patterns | |
| "single_line": ["//"], | |
| "multi_start": ["/*"], | |
| "multi_end": ["*/"], | |
| "inline_start": "//" | |
| } | |
| }.get(language, { | |
| "single_line": ["//", "#"], | |
| "multi_start": ["/*", '"""', "'''"], | |
| "multi_end": ["*/", '"""', "'''"], | |
| "inline_start": ["//", "#"] | |
| }) | |
| lines = content.splitlines() | |
| in_multiline_comment = False | |
| current_multiline_delimiter = None | |
| for line in lines: | |
| stripped = line.strip() | |
| metrics["total_lines"] += 1 | |
| if not stripped: | |
| metrics["blank_lines"] += 1 | |
| continue | |
| if not in_multiline_comment: | |
| for delimiter in patterns["multi_start"]: | |
| if stripped.startswith(delimiter): | |
| in_multiline_comment = True | |
| current_multiline_delimiter = delimiter | |
| metrics["comment_lines"] += 1 | |
| if delimiter in ['"""', "'''"]: | |
| metrics["docstring_lines"] += 1 | |
| break | |
| elif delimiter in stripped: # Handle same-line multi-line comments | |
| end_delimiter = "*/" if delimiter == "/*" else delimiter | |
| if end_delimiter in stripped[stripped.index(delimiter) + len(delimiter):]: | |
| metrics["comment_lines"] += 1 | |
| if delimiter in ['"""', "'''"]: | |
| metrics["docstring_lines"] += 1 | |
| break | |
| if not in_multiline_comment: | |
| is_comment = False | |
| for prefix in patterns["single_line"]: | |
| if stripped.startswith(prefix): | |
| metrics["comment_lines"] += 1 | |
| is_comment = True | |
| break | |
| elif prefix in stripped: # Count inline comments | |
| metrics["inline_comments"] += 1 | |
| break | |
| if not is_comment: | |
| metrics["code_lines"] += 1 | |
| else: | |
| metrics["comment_lines"] += 1 | |
| if current_multiline_delimiter in ['"""', "'''"]: | |
| metrics["docstring_lines"] += 1 | |
| # Check whether the current multi-line delimiter closes on this line | |
| if current_multiline_delimiter in stripped: | |
| # Handle triple quotes properly | |
| if current_multiline_delimiter in ['"""', "'''"] and \ | |
| stripped.count(current_multiline_delimiter) == 1: | |
| continue # | |
| in_multiline_comment = False | |
| current_multiline_delimiter = None | |
| non_blank_lines = metrics["total_lines"] - metrics["blank_lines"] # Lines that are not blank | |
| if non_blank_lines > 0: | |
| metrics["comment_density"] = (metrics["comment_lines"] + metrics["inline_comments"]) / non_blank_lines * 100 | |
| metrics["docstring_density"] = metrics["docstring_lines"] / non_blank_lines * 100 | |
| if language == "Python": | |
| # Check for module-level docstring | |
| if len(lines) > 0 and (lines[0].strip().startswith('"""') or lines[0].strip().startswith("'''")): | |
| metrics["has_module_docstring"] = True | |
| metrics["module_docstring_lines"] = sum(1 for line in lines | |
| if '"""' not in line and "'''" not in line | |
| and bool(line.strip()))#counts the number of lines within a module-level docstring that are not the delimiters themselves and contain actual text | |
| else: | |
| metrics["has_module_docstring"] = False | |
| metrics["module_docstring_lines"] = 0 | |
| return metrics | |
| except Exception as e: | |
| self.logger.error(f"Error calculating comment density: {str(e)}") | |
| # Return 0s for all density metrics on error | |
| return { | |
| "comment_lines": 0, | |
| "code_lines": 0, | |
| "blank_lines": 0, | |
| "comment_density": 0.0, | |
| "docstring_lines": 0, | |
| "total_lines": 0, | |
| "inline_comments": 0, | |
| "error": str(e) # Include the error message | |
| } | |
| def calculate_cyclomatic_complexity(self, content: str, language: str = "Unknown") -> Dict[str, Any]: | |
| """Calculate cyclomatic complexity metrics for code with language-specific handling.""" | |
| metrics = { | |
| "complexity": 1, # Base complexity (always start at 1) | |
| "cognitive_complexity": 0, | |
| "max_nesting_depth": 0 | |
| } | |
| try: | |
| lines = content.splitlines() | |
| current_depth = 0 | |
| # Language-specific complexity indicators (expanded) | |
| complexity_keywords = { | |
| "Python": { | |
| "if", "else", "elif", "for", "while", "try", "except", "with", | |
| "async for", "async with", "break", "continue" | |
| }, | |
| "JavaScript": { | |
| "if", "else", "for", "while", "try", "catch", "switch", "case", | |
| "break", "continue", "&&", "||", "?", "async", "await" # Add async/await | |
| }, | |
| "Java": { # Added Java keywords | |
| "if", "else", "for", "while", "do", "switch", "case", "default", | |
| "break", "continue", "try", "catch", "finally" | |
| } | |
| # Add more language-specific keywords as needed | |
| }.get(language, { | |
| # Default keywords for unknown languages | |
| "if", "else", "elif", "for", "while", "try", "catch", "case", "switch", | |
| "&&", "||", "?", "except", "finally", "with" | |
| }) | |
| for line in lines: | |
| # Calculate nesting depth | |
| opens = line.count('{') - line.count('}') | |
| current_depth += opens | |
| metrics["max_nesting_depth"] = max(metrics["max_nesting_depth"], current_depth) | |
| # Increment complexity for control structures | |
| stripped_line = line.strip() | |
| for keyword in complexity_keywords: | |
| if keyword in stripped_line and not stripped_line.startswith(("//", "#", "/*", "*")): # Exclude comments | |
| metrics["complexity"] += 1 | |
| metrics["cognitive_complexity"] += (1 + current_depth) # Cognitive complexity increase | |
| if language == "Python": | |
| # Add complexity for list/dict comprehensions | |
| if "for" in stripped_line and ("[" in stripped_line or "{" in stripped_line): | |
| metrics["complexity"] += 1 | |
| metrics["cognitive_complexity"] += 1 # Also add to cognitive | |
| return metrics | |
| except Exception as e: | |
| self.logger.error(f"Error calculating complexity: {str(e)}") | |
| # Return defaults, not just an error string, but also include 1 as base. | |
| return { | |
| "complexity": 1, # Ensure baseline complexity | |
| "cognitive_complexity": 0, | |
| "max_nesting_depth": 0 | |
| } | |
| def detect_code_duplication(self, content: str, min_lines: int = 6) -> Dict[str, Any]: | |
| """Detect code duplication within the content""" | |
| try: | |
| metrics = { | |
| "duplicate_blocks": 0, | |
| "duplicate_lines": 0, | |
| "duplication_percentage": 0.0 | |
| } | |
| lines = content.splitlines() | |
| total_lines = len(lines) | |
| # Return early if there are not enough lines | |
| if total_lines < min_lines: | |
| return metrics | |
| blocks = {} | |
| for i in range(total_lines - min_lines + 1): | |
| block = '\n'.join(lines[i:i + min_lines]) | |
| normalized_block = self._normalize_code_block(block) | |
| if normalized_block.strip(): # Ignore all-whitespace blocks | |
| if normalized_block in blocks: | |
| blocks[normalized_block].append(i) | |
| else: | |
| blocks[normalized_block] = [i] | |
| duplicate_line_set = set() # Track duplicate line indices using a *set* | |
| for block, positions in blocks.items(): | |
| if len(positions) > 1: | |
| metrics["duplicate_blocks"] += 1 # Count duplicate blocks | |
| for pos in positions: | |
| for i in range(pos, pos + min_lines): # Add all lines in duplicate block | |
| duplicate_line_set.add(i) | |
| metrics["duplicate_lines"] = len(duplicate_line_set) # Total count of duplicated lines | |
| if total_lines > 0: | |
| metrics["duplication_percentage"] = (metrics["duplicate_lines"] / total_lines) * 100 # Duplication metrics calcutation. | |
| return metrics | |
| except Exception as e: | |
| self.logger.error(f"Error detecting code duplication: {str(e)}") | |
| # Return 0 for all duplication metrics in case of error | |
| return { | |
| "duplicate_blocks": 0, | |
| "duplicate_lines": 0, | |
| "duplication_percentage": 0.0 | |
| } | |
| def _normalize_code_block(self, block: str) -> str: | |
| """Normalize a block of code for comparison by removing comments, whitespace, etc.""" | |
| lines = [] | |
| for line in block.splitlines(): | |
| # Remove comments (handle both Python and JavaScript/Java comments) | |
| line = re.sub(r'#.*$', '', line) # Python comments | |
| line = re.sub(r'//.*$', '', line) # JavaScript comments | |
| line = re.sub(r'/\*.*?\*/', '', line) # Multi-line comments | |
| # Normalize whitespace | |
| line = re.sub(r'\s+', ' ', line.strip()) | |
| if line: # Add non-empty lines | |
| lines.append(line) | |
| return '\n'.join(lines) | |
| def calculate_size_metrics(self, content: str, language: str = "Unknown") -> Dict[str, Any]: | |
| try: | |
| metrics = { | |
| "size_bytes": len(content), | |
| "total_lines": 0, | |
| "code_lines": 0, | |
| "blank_lines": 0, | |
| "comment_lines": 0, | |
| "avg_line_length": 0, | |
| "max_line_length": 0, | |
| "file_entropy": 0, # Added file entropy. | |
| } | |
| comments = { # handling diff comments. | |
| "Python": { | |
| "line_comment": "#", | |
| "block_start": ['"""', "'''"], | |
| "block_end": ['"""', "'''"] | |
| }, | |
| "JavaScript": { | |
| "line_comment": "//", | |
| "block_start": ["/*"], | |
| "block_end": ["*/"] | |
| }, | |
| "Java": { # Added Java comment definitions | |
| "line_comment": "//", | |
| "block_start": ["/*"], | |
| "block_end": ["*/"] | |
| } | |
| }.get(language, { | |
| "line_comment": "#", | |
| "block_start": ["/*", '"""', "'''"], | |
| "block_end": ["*/", '"""', "'''"] | |
| }) | |
| lines = content.splitlines() | |
| total_length = 0 # Track the total character count of all lines | |
| char_counts = {} # Character frequencies, used for the entropy calculation | |
| in_block_comment = False | |
| for line in lines: | |
| metrics["total_lines"] += 1 | |
| line_length = len(line) #length of lines | |
| total_length += line_length | |
| metrics["max_line_length"] = max(metrics["max_line_length"], line_length) | |
| for char in line: | |
| char_counts[char] = char_counts.get(char, 0) + 1 | |
| stripped = line.strip() | |
| if not stripped: | |
| metrics["blank_lines"] += 1 | |
| continue | |
| if not in_block_comment: | |
| is_comment = False | |
| for start in comments["block_start"]: | |
| if stripped.startswith(start): # Use startswith on the stripped line. | |
| in_block_comment = True | |
| metrics["comment_lines"] += 1 | |
| is_comment = True | |
| break # Stop at the first matching block delimiter | |
| if not is_comment: # Not the start of a block comment | |
| if stripped.startswith(comments["line_comment"]): # Line comment vs. code | |
| metrics["comment_lines"] += 1 | |
| else: | |
| metrics["code_lines"] += 1 | |
| else: | |
| metrics["comment_lines"] += 1 #comment lines | |
| for end in comments["block_end"]: # Block end condition. | |
| if end in stripped: # check comment block ends | |
| in_block_comment = False # | |
| break # | |
| if metrics["total_lines"] > 0: | |
| metrics["avg_line_length"] = total_length / metrics["total_lines"] | |
| # Calculate entropy. | |
| total_chars = sum(char_counts.values()) | |
| if total_chars > 0: | |
| entropy = 0 | |
| for count in char_counts.values(): | |
| prob = count / total_chars | |
| entropy -= prob * math.log2(prob) | |
| metrics["file_entropy"] = entropy | |
| # These aren't always in 'comment_density', so calculate here. | |
| metrics["source_lines"] = metrics["code_lines"] + metrics["comment_lines"] | |
| metrics["comment_ratio"] = (metrics["comment_lines"] / metrics["source_lines"] * 100 | |
| if metrics["source_lines"] > 0 else 0) # Handle potential division by zero. | |
| return metrics | |
| except Exception as e: | |
| self.logger.error(f"Error calculating size metrics: {str(e)}") | |
| # Return 0s and basic size info on error. Still provide content length | |
| return { | |
| "size_bytes": len(content) if content else 0, # File Size is valuable,even in error. | |
| "total_lines": 0, | |
| "code_lines": 0, | |
| "blank_lines": 0, | |
| "comment_lines": 0, | |
| "avg_line_length": 0, | |
| "max_line_length": 0, | |
| "file_entropy": 0, # file_entropy added to default values. | |
| "source_lines": 0, # return metrics initialized 0 for other metrices. | |
| "comment_ratio": 0 #Return default values on errors | |
| } | |
| def analyze_function_metrics(self, content: str, language: str = "Unknown") -> Dict[str, Any]: | |
| try: | |
| metrics = { | |
| "total_functions": 0, | |
| "avg_function_length": 0, | |
| "max_function_length": 0, | |
| "avg_function_complexity": 0, | |
| "max_function_complexity": 0, | |
| "documented_functions": 0, | |
| "function_lengths": [], # Collect all lengths | |
| "function_complexities": [], # Collect all complexities | |
| "function_details": [] # Store details of each function | |
| } | |
| # Language-specific function patterns | |
| patterns = { | |
| "Python": r"(?:async\s+)?def\s+(\w+)\s*\([^)]*\)\s*(?:->.*?)?:", | |
| "JavaScript": r"(?:async\s+)?function\s+(\w+)\s*\([^)]*\)|(?:const|let|var)\s+(\w+)\s*=\s*(?:async\s+)?\([^)]*\)\s*=>", | |
| "TypeScript": r"(?:async\s+)?function\s+(\w+)\s*\([^)]*\)|(?:const|let|var)\s+(\w+)\s*=\s*(?:async\s+)?\([^)]*\)\s*=>", | |
| "Java": r"(?:public|private|protected|\s)\s+(?:static\s+)?[a-zA-Z_<>[\]]+\s+(\w+)\s*\([^)]*\)\s*(?:throws\s+[^{]+)?\s*\{", | |
| "C#": r"(?:public|private|protected|\s)\s+(?:static\s+)?[a-zA-Z_<>[\]]+\s+(\w+)\s*\([^)]*\)\s*(?:where\s+[^{]+)?\s*\{", | |
| }.get(language, r"function\s+(\w+)\s*\([^)]*\)") | |
| lines = content.splitlines() | |
| current_function = None | |
| function_start = 0 | |
| in_function = False | |
| function_content = [] | |
| brace_count = 0 #for count braces. | |
| for i, line in enumerate(lines): | |
| stripped = line.strip() | |
| if not stripped or stripped.startswith(('/', '#')): # Skip blank lines and comment lines | |
| continue | |
| if re.search(patterns, line): | |
| current_function = { | |
| "name": re.search(patterns, line).group(1), # Extract function name | |
| "start_line": i + 1, # 1-based line numbers | |
| "has_docstring": False, | |
| "complexity": 1, #base complexity is one. | |
| "nested_depth": 0, | |
| "parameters": len(re.findall(r',', line)) + 1 if '(' in line else 0 # Count parameters | |
| } | |
| function_start = i #starting function line number. | |
| in_function = True | |
| function_content = [line] # Start collecting content | |
| continue | |
| if in_function: | |
| function_content.append(line) #add the functions to function content. | |
| brace_count += line.count('{') - line.count('}') | |
| if language == "Python" and i == function_start + 1: # Check for docstring right after def | |
| if stripped.startswith('"""') or stripped.startswith("'''"): | |
| current_function["has_docstring"] = True | |
| # More robust function end detection | |
| if (language in ["Python"] and brace_count == 0 and not line.startswith(' ')) or \ | |
| (language not in ["Python"] and brace_count == 0 and line.rstrip().endswith('}')): #Robust function end check | |
| func_content = '\n'.join(function_content) # Join the collected lines for analysis | |
| current_function["length"] = len(function_content) # Function length in lines | |
| complexity_metrics = self.calculate_cyclomatic_complexity(func_content, language) | |
| current_function["complexity"] = complexity_metrics["complexity"] # Cyclomatic complexity | |
| metrics["total_functions"] += 1 | |
| metrics["function_lengths"].append(current_function["length"]) | |
| metrics["function_complexities"].append(current_function["complexity"]) | |
| metrics["max_function_length"] = max(metrics["max_function_length"], current_function["length"]) # Keep the larger value | |
| metrics["max_function_complexity"] = max(metrics["max_function_complexity"], | |
| current_function["complexity"]) | |
| if current_function["has_docstring"]: | |
| metrics["documented_functions"] += 1 # Count documented functions | |
| metrics["function_details"].append(current_function) | |
| in_function = False | |
| current_function = None | |
| function_content = [] # Reset the collected state | |
| if metrics["total_functions"] > 0: | |
| metrics["avg_function_length"] = sum(metrics["function_lengths"]) / metrics["total_functions"] | |
| metrics["avg_function_complexity"] = sum(metrics["function_complexities"]) / metrics["total_functions"] | |
| metrics["documentation_ratio"] = metrics["documented_functions"] / metrics["total_functions"] | |
| return metrics | |
| except Exception as e: | |
| self.logger.error(f"Error analyzing function metrics: {str(e)}") | |
| # Return default values for all metrics in case of error. | |
| return { | |
| "total_functions": 0, | |
| "avg_function_length": 0, | |
| "max_function_length": 0, | |
| "avg_function_complexity": 0, | |
| "max_function_complexity": 0, | |
| "documented_functions": 0, | |
| "function_lengths": [], | |
| "function_complexities": [], | |
| "function_details": [], | |
| "error": str(e) # Include the error for debugging. | |
| } | |
| def _analyze_file_metrics(self, file_content) -> Optional[Dict[str, Any]]: | |
| """Analyze metrics for a single file with proper error handling.""" | |
| try: | |
| # Decode the file content (assuming it's base64 encoded) | |
| content = base64.b64decode(file_content.content).decode('utf-8') | |
| language = RELEVANT_EXTENSIONS.get(Path(file_content.path).suffix.lower(), "Unknown") | |
| metrics = { | |
| "path": file_content.path, | |
| "metrics": {} | |
| } | |
| # Size metrics (always calculated) | |
| try: | |
| size_metrics = self.calculate_size_metrics(content, language) | |
| metrics["metrics"].update(size_metrics) # Store results, handling None. | |
| except Exception as e: | |
| self.logger.error(f"Error calculating size metrics for {file_content.path}: {str(e)}") | |
| # Provide default values even if there is error | |
| metrics["metrics"].update({ | |
| "size_bytes": len(content), #we have this data even in errors. | |
| "total_lines": len(content.splitlines()), | |
| "code_lines": 0, | |
| "blank_lines": 0, | |
| "comment_lines": 0 | |
| }) | |
| # Complexity metrics (only for supported languages) | |
| if language != "Unknown": | |
| try: | |
| complexity = self.calculate_cyclomatic_complexity(content, language) | |
| metrics["metrics"]["complexity"] = complexity.get("complexity", 0) | |
| metrics["metrics"]["cognitive_complexity"] = complexity.get("cognitive_complexity", 0) # Store cognitive. | |
| except Exception as e: | |
| self.logger.error(f"Error calculating complexity for {file_content.path}: {str(e)}") | |
| metrics["metrics"].update({ | |
| "complexity": 0, | |
| "cognitive_complexity": 0 # Default to 0 if error. | |
| }) | |
| # Halstead metrics (for supported languages) | |
| if language in ["Python", "JavaScript", "Java"]: # Check if language is supported | |
| try: | |
| halstead = self.calculate_halstead_metrics(content, language) | |
| metrics["metrics"].update(halstead) # Add the results to file data. | |
| except Exception as e: | |
| self.logger.error(f"Error calculating Halstead metrics for {file_content.path}: {str(e)}") | |
| # No defaults needed, halstead already returns 0s. | |
| # Duplication metrics (always calculate) | |
| try: | |
| duplication = self.detect_code_duplication(content) | |
| # detect_code_duplication returns counts, so store the duplicate block count directly | |
| metrics["metrics"]["duplicate_segments"] = duplication.get("duplicate_blocks", 0) | |
| except Exception as e: | |
| self.logger.error(f"Error detecting duplication for {file_content.path}: {str(e)}") | |
| metrics["metrics"]["duplicate_segments"] = 0 # Set to 0 on error | |
| # Function-level metrics (for supported languages). | |
| if language != "Unknown": | |
| try: | |
| function_metrics = self.analyze_function_metrics(content, language) | |
| if function_metrics and "error" not in function_metrics: # Check for None AND no error | |
| metrics["metrics"].update(function_metrics) # | |
| except Exception as e: | |
| self.logger.error(f"Error analyzing functions for {file_content.path}: {str(e)}") | |
| # no default to add as function metrics handles defaults. | |
| # Comment density (always calculated). | |
| try: | |
| comment_metrics = self.calculate_comment_density(content, language) | |
| metrics["metrics"].update(comment_metrics) # Merge | |
| except Exception as e: | |
| self.logger.error(f"Error calculating comment density for {file_content.path}: {str(e)}") | |
| metrics["metrics"].update({ | |
| "comment_density": 0, # Defaults on error | |
| "docstring_lines": 0 # Add other relevant metrics | |
| }) | |
| return metrics #Returns calculated data | |
| except Exception as e: # General Exception to prevent crash. | |
| self.logger.error(f"Error analyzing file {file_content.path}: {str(e)}") | |
| # Return minimal error metrics (important) | |
| return { | |
| "path": file_content.path, | |
| "metrics": { | |
| "size_bytes": 0, # Important basic metric, try to preserve. | |
| "total_lines": 0, # and total lines | |
| "error": str(e) | |
| } | |
| } | |
| class DependencyAnalyzer: | |
| """Handles dependency analysis with improved error handling.""" | |
| def __init__(self, repo): | |
| self.repo = repo | |
| self.logger = logging.getLogger(__name__) | |
| self.dependency_files = { | |
| "python": ["requirements.txt", "setup.py", "Pipfile", "pyproject.toml"], | |
| "javascript": ["package.json", "yarn.lock", "package-lock.json"], | |
| "java": ["pom.xml", "build.gradle"], | |
| "ruby": ["Gemfile"], | |
| "php": ["composer.json"], | |
| "go": ["go.mod"], | |
| "rust": ["Cargo.toml"], | |
| "dotnet": ["*.csproj", "*.fsproj", "*.vbproj"] # .NET project files | |
| } | |
| async def analyze_dependencies(self) -> Dict[str, Any]: | |
| """Analyze project dependencies (async for aiohttp).""" | |
| results = { | |
| "dependency_files": [], # Files that specify the dependencies. | |
| "dependencies": defaultdict(list), # Parsed dependencies. | |
| "dependency_graph": defaultdict(list), # Relationship b/w Dependencies. | |
| "outdated_dependencies": [], # | |
| "security_alerts": [] # Placeholder for future security checks | |
| } | |
| try: | |
| contents = self.repo.get_contents("") | |
| while contents: | |
| file_content = contents.pop(0) | |
| if file_content.type == "dir": | |
| contents.extend(self.repo.get_contents(file_content.path)) | |
| else: | |
| for lang, patterns in self.dependency_files.items(): | |
| if any(self._matches_pattern(file_content.path, pattern) for pattern in patterns): | |
| try: | |
| file_text = base64.b64decode(file_content.content).decode('utf-8') | |
| deps = await self._parse_dependency_file(file_content.path, file_text) # Parse the file for its dependencies | |
| if deps: # Only record files that yielded dependencies | |
| results["dependencies"][file_content.path] = deps | |
| results["dependency_files"].append(file_content.path) # add current file to list of dependency files. | |
| except Exception as e: | |
| self.logger.error(f"Error parsing {file_content.path}: {str(e)}") | |
| results["outdated_dependencies"] = await self._check_outdated_dependencies(results["dependencies"])# | |
| results["dependency_graph"] = self._build_dependency_graph(results["dependencies"]) | |
| except Exception as e: | |
| self.logger.error(f"Error analyzing dependencies: {str(e)}") | |
| # No need to return default values here, as the initialized 'results' dict is sufficient | |
| return results | |
| def _matches_pattern(self, filename: str, pattern: str) -> bool: | |
| """Check if a filename matches a given pattern (supports wildcards).""" | |
| if pattern.startswith("*"): | |
| return filename.endswith(pattern[1:]) # Simple wildcard match | |
| return filename.endswith(pattern) | |
| async def _parse_dependency_file(self, filepath: str, content: str) -> List[Dict[str, str]]: | |
| """Parse different dependency file formats and extract dependencies.""" | |
| deps = [] # Initialize an empty list to hold dependencies | |
| try: | |
| if filepath.endswith(('requirements.txt', 'Pipfile')): # requirements.txt or Pipfile pins | |
| for line in content.split('\n'): | |
| if '==' in line: | |
| name, ver = line.strip().split('==', 1) # Avoid shadowing the imported packaging 'version' module | |
| deps.append({"name": name.strip(), "version": ver.strip(), "type": "python"}) | |
| elif filepath.endswith('package.json'): #package.json | |
| data = json.loads(content) | |
| for dep_type in ['dependencies', 'devDependencies']: # Check both dependencies and devDependencies | |
| if dep_type in data: | |
| for name, version in data[dep_type].items(): | |
| # Remove semver characters like ^ and ~ for accurate comparisons | |
| deps.append({ | |
| "name": name, | |
| "version": version.replace('^', '').replace('~', ''), # Remove ^ and ~ | |
| "type": "npm" | |
| }) | |
| # Add more file type parsing as needed (e.g., pom.xml for Java, Gemfile for Ruby) | |
| except Exception as e: | |
| self.logger.error(f"Error parsing {filepath}: {str(e)}") | |
| # Don't add any dependencies if parsing fails | |
| return deps # Always return the list, even if empty | |
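| # Illustrative example of the parser output above: a requirements.txt line | |
| #   requests==2.31.0 | |
| # becomes {"name": "requests", "version": "2.31.0", "type": "python"}, and a | |
| # package.json entry "lodash": "^4.17.21" becomes | |
| #   {"name": "lodash", "version": "4.17.21", "type": "npm"} | |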
| async def _check_outdated_dependencies(self, dependencies: Dict[str, List[Dict[str, str]]]) -> List[Dict[str, Any]]: | |
| """Check for outdated dependencies using respective package registries (async).""" | |
| outdated = [] | |
| async with aiohttp.ClientSession() as session: # Use aiohttp for concurrent HTTP requests | |
| for filepath, deps in dependencies.items(): | |
| for dep in deps: | |
| try: | |
| if dep["type"] == "python": | |
| async with session.get(f"https://pypi.org/pypi/{dep['name']}/json") as response: | |
| if response.status == 200: | |
| data = await response.json() | |
| latest_version = data["info"]["version"] | |
| # Use packaging.version for robust version comparison | |
| if version.parse(latest_version) > version.parse(dep["version"]): | |
| outdated.append({ | |
| "name": dep["name"], | |
| "current_version": dep["version"], | |
| "latest_version": latest_version, | |
| "type": "python" | |
| }) | |
| elif dep["type"] == "npm": | |
| # Use npm registry API | |
| async with session.get(f"https://registry.npmjs.org/{dep['name']}") as response: | |
| if response.status == 200: | |
| data = await response.json() | |
| latest_version = data["dist-tags"]["latest"] | |
| if version.parse(latest_version) > version.parse(dep['version']): | |
| outdated.append({ | |
| "name": dep['name'], | |
| "current_version": dep["version"], | |
| "latest_version": latest_version, | |
| "type": "npm" | |
| }) | |
| # Add checks for other package types (Java, Ruby, etc.) | |
| except Exception as e: | |
| self.logger.error(f"Error checking version for {dep['name']}: {str(e)}") | |
| # Continue checking other dependencies even if one fails | |
| return outdated # Return the list, even if empty | |
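| # Illustrative note on why packaging.version is used above: plain string comparison | |
| # would rank "2.9.1" above "2.10.0", whereas | |
| #   version.parse("2.10.0") > version.parse("2.9.1")   # True | |
| # compares release segments numerically, so 2.10.0 is correctly treated as newer. | |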
| def _build_dependency_graph(self, dependencies: Dict[str, List[Dict[str, str]]]) -> Dict[str, List[str]]: | |
| """Build a dependency graph to visualize relationships (using networkx).""" | |
| graph = nx.DiGraph() # directed graph. | |
| try: | |
| for dep_file, deps in dependencies.items(): | |
| for dep in deps: | |
| # Add edges to represent dependencies | |
| graph.add_edge(dep_file, dep["name"]) # Dep file depends on individual libraries. | |
| # Convert to a dictionary of lists for easier handling | |
| return nx.to_dict_of_lists(graph) | |
| except Exception as e: | |
| self.logger.error(f"Error building dependency graph: {str(e)}") | |
| return defaultdict(list) # Return an empty graph in case of error | |
| class TestAnalyzer: | |
| """Handles test analysis.""" | |
| def __init__(self, repo): | |
| self.repo = repo | |
| self.logger = logging.getLogger(__name__) # Add logger | |
| self.test_patterns = { | |
| "python": ["test_*.py", "*_test.py", "tests/*.py"], | |
| "javascript": ["*.test.js", "*.spec.js", "__tests__/*.js"], | |
| "java": ["*Test.java", "*Tests.java"], | |
| "ruby": ["*_test.rb", "*_spec.rb"], | |
| "go": ["*_test.go"] | |
| } | |
| def analyze_tests(self) -> Dict[str, Any]: | |
| """Analyze test files, test counts, and (if possible) coverage information.""" | |
| results = { | |
| "test_files": [], | |
| "test_count": 0, | |
| "coverage_data": {}, # Dictionary to hold any parsed coverage information. | |
| "test_patterns": defaultdict(list) # Store the information about diff. testing pattern. | |
| } | |
| try: | |
| contents = self.repo.get_contents("") | |
| while contents: | |
| content = contents.pop(0) | |
| if content.type == "dir": | |
| contents.extend(self.repo.get_contents(content.path)) | |
| elif self._is_test_file(content.path): | |
| results["test_files"].append(content.path) | |
| test_metrics = self._analyze_test_file(content) #metrics of single files. | |
| results["test_patterns"][content.path] = test_metrics # Store results. | |
| results["test_count"] += test_metrics.get("test_count", 0) # Safely get test_count | |
| results["coverage_data"] = self._find_coverage_data() # Get any coverage. | |
| except Exception as e: | |
| self.logger.error(f"Error analyzing tests: {str(e)}") # Use logger | |
| return results # Always return results | |
| def _is_test_file(self, filepath: str) -> bool: | |
| """Check if a file is likely to be a test file, based on common patterns.""" | |
| for patterns in self.test_patterns.values(): | |
| for pattern in patterns: | |
| if Path(filepath).match(pattern): # Use Path.match for wildcard matching | |
| return True | |
| return False | |
| def _analyze_test_file(self, file_content) -> Dict[str, Any]: | |
| """Analyze an individual test file to count tests, assertions, etc.""" | |
| try: | |
| content = base64.b64decode(file_content.content).decode('utf-8') | |
| metrics = { | |
| "test_count": 0, | |
| "assertions": 0, | |
| "test_classes": 0 # If using class-based tests | |
| } | |
| # Count test cases (using regex for common patterns) | |
| metrics["test_count"] += len(re.findall(r'def test_', content)) # Python | |
| metrics["test_count"] += len(re.findall(r'it\s*\([\'""]', content)) # JavaScript (Jest/Mocha) | |
| metrics["assertions"] += len(re.findall(r'assert', content)) # General assertions | |
| metrics["test_classes"] += len(re.findall(r'class\s+\w+Test', content)) # test class patterns. | |
| return metrics | |
| except Exception as e: | |
| self.logger.error(f"Error analyzing test file: {str(e)}") # Use logger | |
| return {} # Return empty dict on error | |
| def _find_coverage_data(self) -> Dict[str, Any]: | |
| """Try to find coverage information (if available, e.g., from coverage reports).""" | |
| coverage_data = { | |
| "total_coverage": None, | |
| "file_coverage": {}, # If file-level data available. | |
| "coverage_report_found": False # for indicating we find coverage files. | |
| } | |
| try: | |
| # Look for common coverage report files | |
| coverage_files = [ | |
| ".coverage", # Python coverage.py | |
| "coverage.xml", # Cobertura (Python, Java) | |
| "coverage.json", # Jest, other JavaScript | |
| "coverage/lcov.info", # LCOV (C/C++, others) | |
| "coverage/coverage-final.json" # Istanbul (JavaScript) | |
| ] | |
| contents = self.repo.get_contents("") | |
| while contents: | |
| content = contents.pop(0) | |
| if content.type == "dir": | |
| contents.extend(self.repo.get_contents(content.path)) | |
| elif any(content.path.endswith(f) for f in coverage_files): | |
| coverage_data["coverage_report_found"] = True # set covarage to True, Indicate report present. | |
| parsed_coverage = self._parse_coverage_file(content) # Try to parse. | |
| if parsed_coverage: #check parse_coverage is present | |
| coverage_data.update(parsed_coverage) # Merge into result | |
| except Exception as e: | |
| self.logger.error(f"Error finding coverage data: {str(e)}") | |
| return coverage_data | |
| def _parse_coverage_file(self, file_content) -> Dict[str, Any]: | |
| """Parse a coverage report file (handles multiple formats).""" | |
| try: | |
| content = base64.b64decode(file_content.content).decode('utf-8') | |
| if file_content.path.endswith('.json'): | |
| data = json.loads(content) | |
| # Handle different JSON formats (e.g., coverage.py, Istanbul) | |
| if 'total' in data: # coverage.py format | |
| return { | |
| 'total_coverage': data['total'].get('lines', {}).get('percent', 0), | |
| 'file_coverage': { | |
| file: stats.get('lines', {}).get('percent', 0) | |
| for file, stats in data.get('files', {}).items() | |
| } | |
| } | |
| # Add handling for other JSON formats (e.g., Istanbul) as needed | |
| elif file_content.path.endswith('.xml'): | |
| # Parse XML (Cobertura format) | |
| from xml.etree import ElementTree #for parse XML format | |
| root = ElementTree.fromstring(content) | |
| total = float(root.get('line-rate', 0)) * 100 # Overall coverage | |
| file_coverage = {} | |
| # Extract coverage per class/file | |
| for class_elem in root.findall('.//class'): | |
| filename = class_elem.get('filename', '') | |
| line_rate = float(class_elem.get('line-rate', 0)) * 100 | |
| file_coverage[filename] = line_rate | |
| return { | |
| 'total_coverage': total, | |
| 'file_coverage': file_coverage | |
| } | |
| elif file_content.path.endswith('lcov.info'): | |
| # Parse LCOV format | |
| total_lines = 0 | |
| covered_lines = 0 | |
| current_file = None | |
| file_coverage = {} | |
| for line in content.split('\n'): | |
| if line.startswith('SF:'): # Source file | |
| current_file = line[3:].strip() | |
| elif line.startswith('LH:'): # Lines hit | |
| covered = int(line[3:]) | |
| covered_lines += covered | |
| elif line.startswith('LF:'): # Lines found | |
| total = int(line[3:]) | |
| total_lines += total | |
| if current_file and total > 0: # calculate coverage. | |
| file_coverage[current_file] = (covered / total) * 100 | |
| return { | |
| 'total_coverage': (covered_lines / total_lines * 100) if total_lines > 0 else 0, # handle Total lines may be 0 | |
| 'file_coverage': file_coverage | |
| } | |
| except Exception as e: | |
| self.logger.error(f"Error parsing coverage file: {str(e)}") | |
| return {} # Return empty dict on error | |
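| # Illustrative fragment of the LCOV format handled above (only SF/LF/LH are used here): | |
| #   SF:src/app.py     <- source file path | |
| #   LF:120            <- lines found (instrumented) | |
| #   LH:96             <- lines hit | |
| # giving 96 / 120 * 100 = 80.0 percent coverage for that file. | |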
| def analyze_test_quality(self, content: str) -> Dict[str, Any]: | |
| """ | |
| Analyze the quality of the tests themselves. | |
| """ | |
| try: | |
| metrics = { | |
| "assertion_density": 0, # Assertions per line of test code | |
| "test_setup_complexity": 0, # How complex is the test setup? | |
| "mock_usage": 0, # How frequently are mocks used? | |
| "test_patterns": [], # List of identified test patterns and best practices. | |
| "anti_patterns": [] # list of identified Anti patterns | |
| } | |
| lines = content.splitlines() | |
| assertion_count = sum(1 for line in lines if 'assert' in line) # check assertion present. | |
| metrics["assertion_density"] = assertion_count / len(lines) if lines else 0 | |
| setup_lines = [] | |
| in_setup = False | |
| for line in lines: | |
| if 'def setUp' in line or 'def setup' in line: | |
| in_setup = True | |
| elif in_setup and line.strip() and not line.startswith(' '): # A non-indented, non-blank line ends the setUp block | |
| in_setup = False | |
| if in_setup: | |
| setup_lines.append(line) | |
| metrics["test_setup_complexity"] = len(setup_lines) | |
| mock_count = sum(1 for line in lines if 'mock' in line.lower()) # count mock if present | |
| metrics["mock_usage"] = mock_count | |
| # Detect common test patterns | |
| if any('parameterized' in line for line in lines): | |
| metrics["test_patterns"].append("parameterized_tests") | |
| if any('fixture' in line for line in lines): | |
| metrics["test_patterns"].append("fixture_usage") | |
| # Identify potential anti-patterns | |
| if any('time.sleep' in line for line in lines): | |
| metrics["anti_patterns"].append("sleep_in_tests") | |
| if any('test' not in line.lower() for line in lines if line.strip().startswith('def')): # Flag def statements that do not look like tests | |
| metrics["anti_patterns"].append("non_test_methods") | |
| return metrics | |
| except Exception as e: | |
| self.logger.error(f"Error analyzing test quality: {str(e)}") | |
| return { # Return default 0 values on error. | |
| "assertion_density": 0, | |
| "test_setup_complexity": 0, | |
| "mock_usage": 0, | |
| "test_patterns": [], | |
| "anti_patterns": [] | |
| } | |
| class DocumentationAnalyzer: | |
| """Handles documentation analysis.""" | |
| def __init__(self, repo): | |
| self.repo = repo | |
| self.logger = logging.getLogger(__name__) # Add logger | |
| self.doc_patterns = [ | |
| "README.md", | |
| "CONTRIBUTING.md", | |
| "CHANGELOG.md", | |
| "LICENSE", | |
| "docs/", # Common documentation directories | |
| "documentation/", | |
| "wiki/" # Consider wiki as documentation | |
| ] | |
| def analyze_documentation(self) -> Dict[str, Any]: | |
| """Analyze repository documentation (README, CONTRIBUTING, API docs, etc.).""" | |
| results = { | |
| "readme_analysis": None, | |
| "contributing_guidelines": None, | |
| "api_documentation": None, # Placeholder - can be expanded | |
| "documentation_files": [], # All documantation. | |
| "wiki_pages": [], # If the repo has a wiki | |
| "documentation_coverage": 0.0 # Overall score | |
| } | |
| try: | |
| # Analyze README | |
| readme = self._get_file_content("README.md") | |
| if readme: | |
| results["readme_analysis"] = self._analyze_readme(readme) | |
| # Check contributing guidelines | |
| contributing = self._get_file_content("CONTRIBUTING.md") | |
| if contributing: | |
| results["contributing_guidelines"] = self._analyze_contributing(contributing) | |
| contents = self.repo.get_contents("") | |
| while contents: | |
| content = contents.pop(0) | |
| if content.type == "dir": | |
| # Check for dedicated documentation directories | |
| if content.path.lower() in ["docs", "documentation"]: | |
| results["documentation_files"].extend(self._analyze_doc_directory(content.path)) | |
| contents.extend(self.repo.get_contents(content.path)) | |
| # Check for specific documentation files | |
| elif any(content.path.endswith(pattern) for pattern in self.doc_patterns): | |
| results["documentation_files"].append(content.path) | |
| results["documentation_coverage"] = self._calculate_doc_coverage() | |
| # Get wiki pages if available | |
| try: | |
| wiki_pages = self.repo.get_wiki_pages() # Requires PyGithub 2.x | |
| results["wiki_pages"] = [page.title for page in wiki_pages] | |
| except: # GitHub API might raise an exception if no wiki | |
| pass | |
| except Exception as e: | |
| self.logger.error(f"Error analyzing documentation: {str(e)}") # Use logger | |
| return results # Always return results | |
| def _get_file_content(self, filepath: str) -> Optional[str]: | |
| """Helper to get the content of a specific file (handles not found).""" | |
| try: | |
| content = self.repo.get_contents(filepath) | |
| return base64.b64decode(content.content).decode('utf-8') | |
| except Exception: | |
| return None # File not found or unreadable | |
| def _analyze_readme(self, content: str) -> Dict[str, Any]: | |
| """Analyze the README content for completeness and key information.""" | |
| analysis = { | |
| "sections": [], # List of identified sections (e.g., from headings) | |
| "has_quickstart": False, # Quick start guide | |
| "has_installation": False, # Installation instructions | |
| "has_usage": False, # Basic usage examples | |
| "has_api_docs": False, # Link to API docs? | |
| "has_examples": False, # Code examples | |
| "word_count": len(content.split()), | |
| "completeness_score": 0.0 | |
| } | |
| # Extract sections (using regex for headings) | |
| sections = re.findall(r'^#+\s+(.+)$', content, re.MULTILINE) # match and return the content. | |
| analysis["sections"] = sections | |
| # Check for key components (using regex for robustness) | |
| analysis["has_quickstart"] = bool(re.search(r'quick\s*start', content, re.I)) # Case-insensitive | |
| analysis["has_installation"] = bool(re.search(r'install|setup', content, re.I)) | |
| analysis["has_usage"] = bool(re.search(r'usage|how\s+to\s+use', content, re.I)) # More flexible matching. | |
| analysis["has_api_docs"] = bool(re.search(r'api|documentation', content, re.I)) | |
| analysis["has_examples"] = bool(re.search(r'example|demo', content, re.I)) # Broader example terms | |
| # Calculate a simple completeness score | |
| key_elements = [ | |
| analysis["has_quickstart"], | |
| analysis["has_installation"], | |
| analysis["has_usage"], | |
| analysis["has_api_docs"], | |
| analysis["has_examples"] | |
| ] | |
| analysis["completeness_score"] = sum(key_elements) / len(key_elements) * 100 | |
| return analysis | |
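| # Illustrative example of the completeness score above: a README with installation, | |
| # usage and examples sections but no quick start or API-docs reference scores | |
| #   3 / 5 * 100 = 60.0 | |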
| def _analyze_contributing(self, content: str) -> Dict[str, Any]: | |
| """Analyze CONTRIBUTING.md for guidelines.""" | |
| analysis = { | |
| "has_code_style": False, # Code Style Guide | |
| "has_pr_process": False, # How to make PR | |
| "has_issue_guidelines": False, #Guidelines for reporting issue. | |
| "has_setup_instructions": False, # setup environment Instructions. | |
| "completeness_score": 0.0 | |
| } | |
| analysis["has_code_style"] = bool(re.search(r'code\s+style|coding\s+standards', content, re.I)) | |
| analysis["has_pr_process"] = bool(re.search(r'pull\s+request|PR', content, re.I)) # checking pull request | |
| analysis["has_issue_guidelines"] = bool(re.search(r'issue|bug\s+report', content, re.I)) #issue and bug report. | |
| analysis["has_setup_instructions"] = bool(re.search(r'setup|getting\s+started', content, re.I))# Setup. | |
| key_elements = [ #key components present or not. | |
| analysis["has_code_style"], | |
| analysis["has_pr_process"], | |
| analysis["has_issue_guidelines"], | |
| analysis["has_setup_instructions"] | |
| ] | |
| analysis["completeness_score"] = sum(key_elements) / len(key_elements) * 100 # calculate | |
| return analysis | |
| def _analyze_doc_directory(self, directory: str) -> List[str]: | |
| """Analyze a dedicated documentation directory (if present).""" | |
| doc_files = [] | |
| try: | |
| contents = self.repo.get_contents(directory) | |
| for content in contents: | |
| if content.type == "file": | |
| doc_files.append(content.path) | |
| except Exception as e: | |
| self.logger.error(f"Error analyzing doc directory: {str(e)}") # Use logger | |
| return doc_files | |
| def _calculate_doc_coverage(self, doc_files: List[str]) -> float: | |
| """Calculate an overall documentation coverage score (heuristic).""" | |
| # This is a simplified scoring system and should be customized | |
| score = 0.0 | |
| total_points = 0 | |
| # Check README presence and quality | |
| readme = self._get_file_content("README.md") | |
| if readme: | |
| readme_analysis = self._analyze_readme(readme) | |
| score += readme_analysis["completeness_score"] / 100 * 40 # README is worth 40% | |
| total_points += 40 | |
| # Check contributing guidelines | |
| contributing = self._get_file_content("CONTRIBUTING.md") | |
| if contributing: | |
| contributing_analysis = self._analyze_contributing(contributing) | |
| score += contributing_analysis["completeness_score"] / 100 * 20 # Contributing is worth 20% | |
| total_points += 20 | |
| # Check API documentation (basic presence check) | |
| if any(f.endswith(('.md', '.rst')) for f in doc_files): | |
| score += 20 # API docs are worth 20% | |
| total_points += 20 | |
| # Check for examples (this is simplified - could be improved) | |
| if any('example' in f.lower() for f in doc_files): # Case-insensitive check | |
| score += 20 # Examples are worth 20% | |
| total_points += 20 | |
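| # Illustrative weighting (assumed numbers): README at 75% -> 30/40 pts, CONTRIBUTING at 50% -> 10/20 pts, | |
| # API docs present -> 20/20 pts, no examples found -> that section is skipped; score = (30 + 10 + 20) / (40 + 20 + 20) * 100 = 75.0 | |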
| return (score / total_points * 100) if total_points > 0 else 0.0 # Avoid division by 0 | |
| class CommunityAnalyzer: | |
| """Handles community metrics analysis.""" | |
| def __init__(self, repo): | |
| self.repo = repo | |
| self.logger = logging.getLogger(__name__) # Add logger | |
| async def analyze_community(self) -> Dict[str, Any]: | |
| """Analyze community engagement, health, and contribution patterns.""" | |
| results = { | |
| "engagement_metrics": await self._get_engagement_metrics(), # Await async calls | |
| "issue_metrics": await self._analyze_issues(), # Await for analysis | |
| "pr_metrics": await self._analyze_pull_requests(), # Await for PR | |
| "contributor_metrics": self._analyze_contributors(), | |
| "discussion_metrics": await self._analyze_discussions() # If discussions are enabled | |
| } | |
| return results # Returns Calculated community metrics. | |
| async def _get_engagement_metrics(self) -> Dict[str, Any]: | |
| """Get basic repository engagement metrics (stars, forks, watchers).""" | |
| metrics = { | |
| "stars": self.repo.stargazers_count, | |
| "forks": self.repo.forks_count, | |
| "watchers": self.repo.subscribers_count, | |
| "star_history": [], # Historical star data | |
| "fork_history": [] # Historical fork data | |
| } | |
| try: | |
| # Get star history (iterates all stargazers with dates; can be slow for heavily starred repos) | |
| stargazers = self.repo.get_stargazers_with_dates() | |
| metrics["star_history"] = [ | |
| {"date": star.starred_at.isoformat(), "count": i + 1} # count: i+1 to show progression. | |
| for i, star in enumerate(stargazers) | |
| ] | |
| # Get fork history | |
| forks = self.repo.get_forks() # get_forks() has no dated variant; fork.created_at is used below | |
| metrics["fork_history"] = [ | |
| {"date": fork.created_at.isoformat(), "count": i + 1} | |
| for i, fork in enumerate(forks) | |
| ] | |
| except Exception as e: | |
| self.logger.error(f"Error getting engagement metrics: {str(e)}") # Use logger | |
| return metrics # Return calculated metrics data. | |
| async def _analyze_issues(self) -> Dict[str, Any]: | |
| """Analyze repository issues (open, closed, response times, labels).""" | |
| metrics = { | |
| "total_issues": 0, | |
| "open_issues": 0, | |
| "closed_issues": 0, | |
| "avg_time_to_close": None, # Average time to close an issue | |
| "issue_categories": defaultdict(int), # Categorize issues by label | |
| "response_times": [] # List of response times | |
| } | |
| try: | |
| issues = self.repo.get_issues(state='all') # Get all issues (open and closed) | |
| for issue in issues: | |
| metrics["total_issues"] += 1 | |
| if issue.state == 'open': | |
| metrics["open_issues"] += 1 | |
| else: | |
| metrics["closed_issues"] += 1 | |
| # Calculate time to close (if closed_at is available) | |
| if issue.closed_at and issue.created_at: # Time to close, only for closed issues | |
| time_to_close = (issue.closed_at - issue.created_at).total_seconds() | |
| metrics["response_times"].append(time_to_close) | |
| # Categorize issues by labels | |
| for label in issue.labels: | |
| metrics["issue_categories"][label.name] += 1 | |
| # Calculate average response time | |
| if metrics["response_times"]: # Calculate Avg_response only if any time available. | |
| metrics["avg_time_to_close"] = sum(metrics["response_times"]) / len(metrics["response_times"]) #avg = tot / no. | |
| except Exception as e: | |
| self.logger.error(f"Error analyzing issues: {str(e)}") # Use logger | |
| return metrics | |
| async def _analyze_pull_requests(self) -> Dict[str, Any]: | |
| """Analyze pull requests (open, closed, merged, review times, sizes).""" | |
| metrics = { | |
| "total_prs": 0, | |
| "open_prs": 0, | |
| "merged_prs": 0, | |
| "closed_prs": 0, | |
| "avg_time_to_merge": None, # Average time to merge a PR | |
| "pr_sizes": defaultdict(int), # Categorize PRs by size (lines of code) | |
| "review_times": [] # List of review times | |
| } | |
| try: | |
| pulls = self.repo.get_pulls(state='all') # Get all PRs (open, closed, merged) | |
| for pr in pulls: | |
| metrics["total_prs"] += 1 | |
| if pr.state == 'open': | |
| metrics["open_prs"] += 1 | |
| elif pr.merged: | |
| metrics["merged_prs"] += 1 | |
| # Calculate time to merge | |
| if pr.merged_at and pr.created_at: | |
| time_to_merge = (pr.merged_at - pr.created_at).total_seconds() | |
| metrics["review_times"].append(time_to_merge) #store calculated value | |
| else: | |
| metrics["closed_prs"] += 1 # | |
| # Categorize PR sizes (simplified, based on additions + deletions) | |
| if pr.additions + pr.deletions < 10: | |
| metrics["pr_sizes"]["xs"] += 1 # Extra small | |
| elif pr.additions + pr.deletions < 50: | |
| metrics["pr_sizes"]["s"] += 1 # Small | |
| elif pr.additions + pr.deletions < 250: | |
| metrics["pr_sizes"]["m"] += 1 # Medium | |
| elif pr.additions + pr.deletions < 1000: | |
| metrics["pr_sizes"]["l"] += 1 # Large | |
| else: | |
| metrics["pr_sizes"]["xl"] += 1 # Extra large | |
| # Calculate average review time | |
| if metrics["review_times"]: #calculate Avg_time to merge if review times available. | |
| metrics["avg_time_to_merge"] = sum(metrics["review_times"]) / len(metrics["review_times"]) #calculate Average. | |
| except Exception as e: | |
| self.logger.error(f"Error analyzing pull requests: {str(e)}") # Use logger | |
| return metrics # Return calculated PR metrics | |
| def _analyze_contributors(self) -> Dict[str, Any]: | |
| """Analyze contributor patterns and engagement.""" | |
| metrics = { | |
| "total_contributors": 0, | |
| "active_contributors": 0, # Contributors active in the last 90 days | |
| "contributor_types": defaultdict(int), # User, Organization, Bot | |
| "contribution_frequency": defaultdict(int), # High, medium, low | |
| "core_contributors": [] # List of core contributors (e.g., top 10%) | |
| } | |
| try: | |
| contributors = self.repo.get_contributors() | |
| for contributor in contributors: | |
| metrics["total_contributors"] += 1 | |
| # Check for recent activity (last 90 days) | |
| recent_commits = self.repo.get_commits(author=contributor.login, since=datetime.now() - timedelta(days=90)) # Commits by this contributor in the last 90 days | |
| if recent_commits.totalCount > 0: | |
| metrics["active_contributors"] += 1 | |
| # Categorize contributor types | |
| metrics["contributor_types"][contributor.type] += 1 # increment by type. | |
| # Analyze contribution frequency (simplified) | |
| if contributor.contributions > 100: # More than 100 contributions -> high frequency | |
| metrics["contribution_frequency"]["high"] += 1 | |
| # Consider contributors with >100 contributions as "core" | |
| metrics["core_contributors"].append({ | |
| "login": contributor.login, | |
| "contributions": contributor.contributions, # store | |
| "type": contributor.type #Store. | |
| }) | |
| elif contributor.contributions > 20: | |
| metrics["contribution_frequency"]["medium"] += 1 # store in medium if condition satisfy. | |
| else: | |
| metrics["contribution_frequency"]["low"] += 1# | |
| except Exception as e: | |
| self.logger.error(f"Error analyzing contributors: {str(e)}") # Use logger | |
| return metrics # Return calculated contributor metrics | |
| async def _analyze_discussions(self) -> Dict[str, Any]: | |
| """Analyze repository discussions (if enabled).""" | |
| metrics = { | |
| "total_discussions": 0, | |
| "active_discussions": 0, # Discussions with recent activity | |
| "categories": defaultdict(int), # Discussion categories | |
| "avg_responses": 0, # Average number of responses per discussion | |
| "response_times": [] # List of response times | |
| } | |
| try: | |
| # Check if discussions are enabled | |
| if self.repo.has_discussions: # Only proceed if discussions are enabled | |
| discussions = self.repo.get_discussions() # May require GraphQL / a recent PyGithub; failures are caught by the except above | |
| total_responses = 0 | |
| for discussion in discussions: | |
| metrics["total_discussions"] += 1 | |
| # Check for active discussions (simplified: any comments = active) | |
| if discussion.comments > 0: | |
| metrics["active_discussions"] += 1 | |
| total_responses += discussion.comments # Running total of comments across discussions | |
| # Categorize discussions | |
| metrics["categories"][discussion.category.name] += 1 | |
| # Calculate response times (time to first response) | |
| if discussion.comments > 0: | |
| first_response = discussion.get_comments()[0] # Earliest comment | |
| response_time = (first_response.created_at - discussion.created_at).total_seconds() # Time to first response, in seconds | |
| metrics["response_times"].append(response_time) | |
| # Calculate average responses per discussion | |
| if metrics["active_discussions"] > 0: # Calculate only if value present. | |
| metrics["avg_responses"] = total_responses / metrics["active_discussions"] | |
| except Exception as e: | |
| self.logger.error(f"Error analyzing discussions: {str(e)}") # Use logger | |
| return metrics | |
| class RepositoryAnalyzer: | |
| """Main class to analyze a GitHub repository.""" | |
| def __init__(self, repo_url: str, github_token: str): | |
| self.logger = logging.getLogger(__name__) | |
| self.gh = Github(github_token) # Keep for some top-level calls | |
| self.gh_handler = GitHubAPIHandler(github_token) # Use the handler | |
| self.code_metrics = CodeMetricsAnalyzer() | |
| parts = repo_url.rstrip('/').split('/') | |
| if len(parts) < 2: | |
| raise ValueError("Invalid repository URL format") | |
| self.repo_name = parts[-1] | |
| self.owner = parts[-2] | |
| self.analysis_data = { # Initialize data here | |
| "basic_info": {}, | |
| "structure": {}, | |
| "code_metrics": {}, | |
| "dependencies": {}, | |
| "tests": {}, | |
| "documentation": {}, | |
| "community": {}, | |
| "visualizations": {} | |
| } | |
| try: | |
| self.repo = self.gh_handler.get_repository(repo_url) # Use handler | |
| # Initialize other analyzers *after* successfully getting the repo | |
| self.dependency_analyzer = DependencyAnalyzer(self.repo) | |
| self.test_analyzer = TestAnalyzer(self.repo) | |
| self.doc_analyzer = DocumentationAnalyzer(self.repo) | |
| self.community_analyzer = CommunityAnalyzer(self.repo) | |
| except Exception as e: | |
| self.logger.error(f"Failed to initialize repository analyzer: {str(e)}") | |
| raise | |
| async def analyze(self) -> Dict[str, Any]: | |
| """Perform the full repository analysis.""" | |
| try: | |
| # Basic repository information | |
| self.analysis_data["basic_info"] = { | |
| "name": self.repo.name, | |
| "owner": self.repo.owner.login, | |
| "description": self.repo.description or "No description available", # Handle None | |
| "stars": self.repo.stargazers_count, | |
| "forks": self.repo.forks_count, | |
| "created_at": self.repo.created_at.isoformat(), # Use isoformat() | |
| "last_updated": self.repo.updated_at.isoformat(), | |
| "primary_language": self.repo.language or "Not specified", | |
| } | |
| # Analyze repository structure with sampling | |
| self.analysis_data["structure"] = await self._analyze_structure() | |
| # Analyze code patterns and metrics | |
| self.analysis_data["code_metrics"] = await self._analyze_code_metrics() | |
| # Analyze dependencies | |
| self.analysis_data["dependencies"] = await self.dependency_analyzer.analyze_dependencies() | |
| # Analyze tests and coverage | |
| self.analysis_data["tests"] = self.test_analyzer.analyze_tests() | |
| # Analyze documentation | |
| self.analysis_data["documentation"] = self.doc_analyzer.analyze_documentation() | |
| # Analyze community health | |
| self.analysis_data["community"] = await self.community_analyzer.analyze_community() | |
| # Generate visualizations | |
| self.analysis_data["visualizations"] = await self._generate_visualizations() | |
| return self.analysis_data # Return the populated dict | |
| except Exception as e: | |
| self.logger.error(f"Error during analysis: {str(e)}") | |
| raise | |
| async def _analyze_structure(self) -> Dict[str, Any]: | |
| """Analyze the repository's file and directory structure, with sampling.""" | |
| structure = { | |
| "files": defaultdict(int), # File type counts (e.g., .py, .js) | |
| "directories": set(), # Unique directory paths | |
| "total_size": 0, # Total size in bytes | |
| "directory_tree": defaultdict(list), # Parent -> [children] | |
| "file_samples": [] # Sample files for detailed analysis | |
| } | |
| try: | |
| all_files = [] # Store all relevant files first | |
| contents = self.repo.get_contents("") | |
| while contents: | |
| content = contents.pop(0) | |
| if content.type == "dir": | |
| structure["directories"].add(content.path) | |
| # Build directory tree structure | |
| structure["directory_tree"][os.path.dirname(content.path)].append(content.path) #correct way | |
| contents.extend(self.repo.get_contents(content.path)) | |
| else: | |
| ext = Path(content.path).suffix.lower() # Get lowercase extension | |
| # Only consider relevant files | |
| if ext in RELEVANT_EXTENSIONS: | |
| structure["files"][ext] += 1 # Increment count for the file type | |
| structure["total_size"] += content.size | |
| all_files.append(content) | |
| # Smart sampling of files | |
| if all_files: | |
| # Stratified sampling based on file types | |
| samples_per_type = min(5, max(1, len(all_files) // len(structure["files"]) if structure["files"] else 1)) # At least one sample | |
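| # Illustrative arithmetic (assumed numbers): 60 relevant files across 4 extensions -> min(5, max(1, 60 // 4)) = 5 samples per extension | |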
| for ext in structure["files"].keys(): | |
| ext_files = [f for f in all_files if f.path.endswith(ext)] # All files with this extension | |
| if ext_files: | |
| # Sort by size, and select a diverse sample | |
| ext_files.sort(key=lambda x: x.size) | |
| total_samples = min(samples_per_type, len(ext_files)) | |
| # Take samples evenly across the size range | |
| step = max(1, len(ext_files) // total_samples) | |
| for i in range(0, len(ext_files), step)[:total_samples]:# Select diverse files from list. | |
| structure["file_samples"].append({ | |
| "path": ext_files[i].path, | |
| "size": ext_files[i].size, | |
| "type": RELEVANT_EXTENSIONS.get(ext, "Unknown") # Get language | |
| }) | |
| except Exception as e: | |
| self.logger.error(f"Error analyzing structure: {str(e)}") | |
| # Don't need to return defaults if 'structure' dict is initialized. | |
| return { | |
| "file_types": dict(structure["files"]), # Convert defaultdict to dict | |
| "directory_count": len(structure["directories"]), | |
| "total_size": structure["total_size"], | |
| "file_count": sum(structure["files"].values()), # Total relevant files | |
| "directory_tree": dict(structure["directory_tree"]), # convert | |
| "file_samples": structure["file_samples"] | |
| } | |
| async def _analyze_code_metrics(self) -> Dict[str, Any]: | |
| """Analyze code metrics for a sample of files, with parallel processing.""" | |
| metrics = { | |
| "complexity_metrics": defaultdict(list), # Cyclomatic/cognitive, nesting | |
| "duplication_metrics": defaultdict(list), | |
| "function_metrics": defaultdict(list), # From function analysis | |
| "comment_metrics": defaultdict(list), # Comment density | |
| "language_metrics": defaultdict(dict) # Aggregate by language | |
| } | |
| try: | |
| # Get all relevant files | |
| contents = self.repo.get_contents("") | |
| files_to_analyze = [] | |
| while contents: | |
| content = contents.pop(0) | |
| if content.type == "dir": | |
| contents.extend(self.repo.get_contents(content.path)) | |
| elif Path(content.path).suffix.lower() in RELEVANT_EXTENSIONS: # Check file. | |
| files_to_analyze.append(content) | |
| # Use parallel processing for file analysis | |
| with ThreadPoolExecutor(max_workers=min(10, max(1, len(files_to_analyze)))) as executor: # Cap workers at 10, use at least 1 | |
| futures = [] | |
| for file_content in files_to_analyze: | |
| futures.append(executor.submit(self.code_metrics._analyze_file_metrics, file_content)) # Analyze each file in a worker thread | |
| for future in futures: | |
| try: | |
| file_metrics = future.result() # Collect the results from the File Analysis | |
| if file_metrics: | |
| language = RELEVANT_EXTENSIONS.get(Path(file_metrics["path"]).suffix.lower(), "Unknown") | |
| # Aggregate metrics (by language, for example) | |
| # Correctly handle string keys for metrics | |
| for metric_type, value in file_metrics["metrics"].items(): | |
| if isinstance(value, (int, float)): | |
| metrics.setdefault(f"{metric_type}_metrics", defaultdict(list))[language].append(value) # store | |
| # Update language-specific metrics | |
| if language not in metrics["language_metrics"]: | |
| metrics["language_metrics"][language] = { | |
| "file_count": 0, | |
| "total_lines": 0, | |
| "total_complexity": 0 | |
| } | |
| lang_metrics = metrics["language_metrics"][language] #get value based on language. | |
| lang_metrics["file_count"] += 1 | |
| lang_metrics["total_lines"] += file_metrics["metrics"].get("total_lines", 0) # Total lines addition. | |
| lang_metrics["total_complexity"] += file_metrics["metrics"].get("complexity", 0) #complexity count | |
| except Exception as e: | |
| self.logger.error(f"Error processing file metrics: {str(e)}") | |
| return metrics # return aggregated | |
| except Exception as e: | |
| self.logger.error(f"Error analyzing code metrics: {str(e)}") | |
| return metrics # Return the initialized dict (possibly empty) | |
| async def _generate_visualizations(self) -> Dict[str, Any]: | |
| """Generate visualizations from the analyzed data (using matplotlib, seaborn, etc.).""" | |
| visualizations = {} | |
| from io import BytesIO # Needed by every chart branch below, so import once here | |
| try: | |
| # Language distribution pie chart | |
| if self.analysis_data.get("structure", {}).get("file_types"): | |
| fig, ax = plt.subplots() | |
| languages = self.analysis_data["structure"]["file_types"] | |
| plt.pie(languages.values(), labels=languages.keys(), autopct='%1.1f%%') | |
| plt.title("Language Distribution") | |
| buffer = BytesIO() # In-memory buffer for the rendered chart | |
| plt.savefig(buffer, format='png') | |
| visualizations["language_distribution"] = base64.b64encode(buffer.getvalue()).decode() | |
| plt.close() | |
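| # The base64-encoded PNG can be consumed downstream, e.g. (illustrative) as an HTML data URI: | |
| # f'<img src="data:image/png;base64,{visualizations["language_distribution"]}">' | |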
| # Code complexity heatmap (example using average complexity) | |
| if self.analysis_data.get("code_metrics", {}).get("complexity_metrics"): | |
| complexity_data = [] | |
| for lang, values in self.analysis_data["code_metrics"]["complexity_metrics"].items(): | |
| if values: # Ensure there are values to average | |
| complexity_data.append({ | |
| "language": lang, | |
| "avg_complexity": sum(values) / len(values) | |
| }) | |
| if complexity_data: # If Data present generate graph. | |
| df = pd.DataFrame(complexity_data) | |
| plt.figure(figsize=(10, 6)) | |
| sns.barplot(data=df, x="language", y="avg_complexity") | |
| plt.title("Average Code Complexity by Language") | |
| plt.xticks(rotation=45) # Rotate x-axis labels | |
| buffer = BytesIO() | |
| plt.savefig(buffer, format='png', bbox_inches='tight') # Improve layout | |
| visualizations["complexity_distribution"] = base64.b64encode(buffer.getvalue()).decode() | |
| plt.close() | |
| # Commit activity heatmap (example) | |
| if self.analysis_data.get("community", {}).get("commit_history"): #check whether community & commit-history metrics | |
| commit_data = self.analysis_data["community"]["commit_history"] | |
| df = pd.DataFrame(commit_data) | |
| df['date'] = pd.to_datetime(df['date']) # Parse dates for time-based grouping | |
| df = df.set_index('date') | |
| # Resample to daily commit counts | |
| daily = df.resample('D').size().rename('count').to_frame() | |
| plt.figure(figsize=(12, 4)) | |
| sns.heatmap(daily.pivot_table(index=daily.index.dayofweek, columns=daily.index.month, values='count', aggfunc='sum')) # Day-of-week vs. month heatmap | |
| plt.title("Commit Activity Heatmap") | |
| buffer = BytesIO() | |
| plt.savefig(buffer, format='png', bbox_inches='tight') | |
| visualizations["commit_heatmap"] = base64.b64encode(buffer.getvalue()).decode() | |
| plt.close() | |
| # Add more visualizations as needed (e.g., dependency graph, test coverage) | |
| except Exception as e: | |
| self.logger.error(f"Error generating visualizations: {str(e)}") | |
| return visualizations # Even if empty | |
| # --- Prompt Creation and LLM Interaction --- | |
| def create_enhanced_analysis_prompt(analysis_data: Dict[str, Any]) -> str: | |
| """Create an enhanced prompt for the LLM analysis.""" | |
| return f"""You are an expert code analyst with deep experience in software architecture, development practices, and team dynamics. | |
| Analyze the provided repository data and create a detailed, insightful analysis using the following sections: | |
| # Repository Analysis for {analysis_data['basic_info']['name']} | |
| ## 📊 Project Overview | |
| [Analyze the basic repository information, including: | |
| - Project purpose and description | |
| - Repository age and activity level | |
| - Key metrics (stars, forks, contributors) | |
| - Primary technologies used | |
| - Overall project health indicators] | |
| ## 🏗️ Architecture and Code Organization | |
| [Analyze the repository structure and code organization: | |
| - Directory structure and organization patterns | |
| - Code distribution across languages | |
| - File organization and modularity | |
| - Architectural patterns | |
| - Development standards and practices | |
| - Code complexity distribution | |
| - Potential architectural improvements] | |
| ## 💻 Code Quality and Metrics | |
| [Provide detailed analysis of code quality metrics: | |
| - Cyclomatic complexity trends | |
| - Code duplication patterns | |
| - Function length and complexity | |
| - Comment density and documentation quality | |
| - Test coverage and quality | |
| - Areas for potential improvement] | |
| ## 📦 Dependencies and Security | |
| [Analyze the project's dependencies: | |
| - Major dependencies and their versions | |
| - Outdated dependencies | |
| - Security vulnerabilities | |
| - Dependency graph complexity | |
| - Licensing considerations] | |
| ## 📚 Documentation Assessment | |
| [Evaluate the project's documentation: | |
| - README completeness and quality | |
| - API documentation coverage | |
| - Contributing guidelines | |
| - Code comments and inline documentation | |
| - Examples and tutorials | |
| - Documentation maintenance status] | |
| ## 🧪 Testing and Quality Assurance | |
| [Analyze testing practices: | |
| - Test coverage metrics | |
| - Testing patterns and approaches | |
| - CI/CD implementation | |
| - Quality assurance processes | |
| - Areas needing additional testing] | |
| ## 👥 Community Health and Engagement | |
| [Evaluate community aspects: | |
| - Contributor demographics and activity | |
| - Issue and PR response times | |
| - Community engagement metrics | |
| - Communication patterns | |
| - Governance model] | |
| ## 📈 Development Trends | |
| [Analyze development patterns: | |
| - Commit frequency and distribution | |
| - Code change patterns | |
| - Release cycle analysis | |
| - Development velocity | |
| - Team collaboration patterns] | |
| ## 🚀 Performance and Scalability | |
| [Assess technical characteristics: | |
| - Code performance indicators | |
| - Scalability considerations | |
| - Resource usage patterns | |
| - Technical debt indicators | |
| - Optimization opportunities] | |
| ## 💡 Key Insights | |
| [Summarize the most important findings: | |
| - Top 3 strengths | |
| - Top 3 areas for improvement | |
| - Unique characteristics | |
| - Notable patterns or practices | |
| - Risk factors] | |
| ## 🎯 Recommendations | |
| [Provide actionable recommendations: | |
| - Immediate improvement opportunities | |
| - Long-term strategic suggestions | |
| - Specific tools or practices to consider | |
| - Priority areas for focus | |
| - Resource allocation suggestions] | |
| Please analyze the following repository data thoroughly and provide detailed insights for each section: | |
| {json.dumps(analysis_data, indent=2)} | |
| """ | |
| async def analyze_repository(repo_url: str, github_token: str, gemini_key: str, progress=gr.Progress()) -> Tuple[str, str, str]: | |
| """Analyze repository and generate LLM summary (async, with progress).""" | |
| try: | |
| # Re-initialize tokens each time | |
| initialize_tokens(github_token, gemini_key) # Ensure fresh tokens | |
| progress(0, desc="Initializing repository analysis...") | |
| analyzer = RepositoryAnalyzer(repo_url, github_token) | |
| progress(0.3, desc="Analyzing repository structure and patterns...") | |
| analysis_data = await analyzer.analyze() # Await the analysis | |
| progress(0.7, desc="Generating comprehensive analysis...") | |
| # Use the more powerful Gemini 1.5 Pro model | |
| model = genai.GenerativeModel( | |
| model_name="gemini-1.5-pro", # Use 1.5 Pro | |
| generation_config={ | |
| "temperature": 0.7, | |
| "top_p": 0.95, # Use nucleus sampling | |
| "top_k": 40, | |
| "max_output_tokens": 8192, # Increased token limit | |
| } | |
| ) | |
| prompt = create_enhanced_analysis_prompt(analysis_data) # Use a better, sectioned prompt. | |
| # Send the prompt in a single request (no streaming) | |
| chat = model.start_chat(history=[]) # Start a fresh chat session | |
| response = chat.send_message(prompt) | |
| progress(0.9, desc="Saving analysis results...") | |
| # Save analysis data to a temporary file (for follow-up Q&A) | |
| with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.json') as f: | |
| json.dump(analysis_data, f, indent=2) | |
| analysis_file = f.name | |
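| # This temp file path is stored in the Gradio State below and re-read by ask_question for follow-up Q&A | |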
| progress(1.0, desc="Analysis complete!") | |
| return response.text, analysis_file, "✅ Analysis completed successfully!" | |
| except Exception as e: | |
| error_message = f"β Error analyzing repository: {str(e)}" | |
| return "", "", error_message # Return empty strings for Markdown and file | |
| async def ask_question(question: str, analysis_file: str, chat_history: List[Tuple[str, str]]) -> List[Tuple[str, str]]: | |
| """Process a follow-up question about the analysis with enhanced context.""" | |
| if not analysis_file: | |
| return chat_history + [(question, "Please analyze a repository first before asking questions.")] | |
| try: | |
| with open(analysis_file, 'r') as f: | |
| analysis_data = json.load(f) | |
| # Initialize chat with system prompt and history | |
| model = genai.GenerativeModel( | |
| model_name="gemini-1.5-pro", # Use 1.5 Pro | |
| generation_config={ | |
| "temperature": 0.7, | |
| "top_p": 0.8, # More focused sampling | |
| "top_k": 40, | |
| "max_output_tokens": 4096, # Increased token limit | |
| } | |
| ) | |
| # Build the context | |
| context = """You are an expert code analyst helping users understand repository analysis results. | |
| Provide detailed, technical, and actionable insights based on the analysis data. When appropriate, | |
| reference specific metrics and patterns from the analysis. If making recommendations, be specific | |
| and explain the reasoning behind them. | |
| Repository Analysis Data: | |
| """ | |
| context += json.dumps(analysis_data, indent=2) + "\n\n" | |
| if chat_history: # Include recent conversation context, if any | |
| context += "Previous conversation:\n" | |
| for user_msg, assistant_msg in chat_history[-3:]: # Only include last 3 exchanges for relevance. | |
| context += f"User: {user_msg}\nAssistant: {assistant_msg}\n" | |
| prompt = f"""{context} | |
| User's Question: {question} | |
| Please provide a detailed analysis that: | |
| 1. Directly addresses the user's question | |
| 2. References relevant metrics and data from the analysis | |
| 3. Provides context and explanations for technical concepts | |
| 4. Suggests actionable next steps or recommendations when appropriate | |
| 5. Maintains technical accuracy while being clear and understandable | |
| Your response:""" | |
| chat = model.start_chat(history=[]) # Start a new chat | |
| response = chat.send_message(prompt) | |
| return chat_history + [(question, response.text)] # Store new | |
| except Exception as e: | |
| error_message = f"Error processing question: {str(e)}" | |
| return chat_history + [(question, error_message)] | |
| # --- Gradio Interface --- | |
| def create_interface(): | |
| with gr.Blocks(theme=gr.themes.Soft()) as app: # Use a theme | |
| gr.Markdown(""" | |
| # 🔍 GitHub Repository Analyzer (Colab Version) | |
| Analyze any public GitHub repository using AI. | |
| """) | |
| # API tokens | |
| with gr.Row(): | |
| github_token = gr.Textbox( | |
| label="GitHub Token", | |
| type="password", | |
| placeholder="Enter your GitHub token" | |
| ) | |
| gemini_key = gr.Textbox( | |
| label="Gemini API Key", | |
| type="password", | |
| placeholder="Enter your Gemini API key" | |
| ) | |
| init_btn = gr.Button("Initialize Tokens", variant="secondary") | |
| # Repository URL and analysis button | |
| with gr.Row(): | |
| repo_url = gr.Textbox( | |
| label="GitHub Repository URL", | |
| placeholder="https://github.com/owner/repo", | |
| scale=4 # Larger input box | |
| ) | |
| analyze_btn = gr.Button("🔍 Analyze", variant="primary", scale=1) | |
| # Status message | |
| status_msg = gr.Markdown("") # Display Error Status. | |
| # Analysis results | |
| with gr.Tabs(): | |
| with gr.Tab("π Analysis Report"): # report Analysis. | |
| summary = gr.Markdown("") # output report. | |
| with gr.Tab("π Q&A"): # Improved label | |
| chatbot = gr.Chatbot( | |
| [], | |
| label="Ask questions about the analysis", | |
| height=400 | |
| ) | |
| with gr.Row(): | |
| question = gr.Textbox( | |
| label="Your Question", | |
| placeholder="Ask about specific aspects of the analysis...", | |
| scale=4 | |
| ) | |
| ask_btn = gr.Button("Ask", scale=1) | |
| clear_btn = gr.Button("Clear", scale=1) | |
| # Hidden state to store the analysis data file path | |
| analysis_file = gr.State("") | |
| async def safe_analyze(repo_url: str, github_token: str, gemini_key: str): | |
| """Wrapper function to handle analysis and errors gracefully.""" | |
| try: | |
| if not repo_url: | |
| return None, None, "β Please enter a GitHub repository URL" | |
| if not github_token or not gemini_key: | |
| return None, None, "β Please initialize tokens first" | |
| if not re.match(r'https?://github\.com/[\w.-]+/[\w.-]+/?$', repo_url): # Allow dots in owner/repo names | |
| return None, None, "❌ Invalid GitHub repository URL format" | |
| summary, analysis_file, status = await analyze_repository(repo_url, github_token, gemini_key) | |
| return summary, analysis_file, status | |
| except Exception as e: | |
| return None, None, f"β Analysis failed: {str(e)}" | |
| # Event handlers | |
| init_btn.click( | |
| initialize_tokens, | |
| inputs=[github_token, gemini_key], | |
| outputs=status_msg | |
| ) | |
| analyze_btn.click( | |
| fn=lambda: "β³ Analysis in progress...", # Immediate feedback | |
| inputs=None, | |
| outputs=status_msg, | |
| queue=False # Don't queue this click | |
| ).then( | |
| safe_analyze, # Call the wrapper | |
| inputs=[repo_url, github_token, gemini_key], | |
| outputs=[summary, analysis_file, status_msg] | |
| ) | |
| ask_btn.click( | |
| ask_question, | |
| inputs=[question, analysis_file, chatbot], # Include chatbot history | |
| outputs=[chatbot] | |
| ).then( | |
| lambda: "", # Clear the question box after asking | |
| None, | |
| question, | |
| queue=False | |
| ) | |
| clear_btn.click( | |
| lambda: ([], ""), # Clear chatbot and question | |
| outputs=[chatbot, question] | |
| ) | |
| return app | |
| # Run the interface | |
| if __name__ == "__main__": | |
| app = create_interface() | |
| app.launch(debug=True, share=True) |