import requests
import json
import os
import base64
import re
import ast
import networkx as nx
import radon.metrics as metrics
import radon.complexity as complexity
from datetime import datetime, timedelta
from collections import defaultdict, Counter
import pandas as pd
import numpy as np
from github import Github, GithubException
import time
from dotenv import load_dotenv

# Visualization imports
import vizro.plotly.express as px
import vizro
import vizro.models as vzm
import plotly.graph_objects as go
import gradio as gr
from pyvis.network import Network

# Google Gemini AI (optional)
try:
    import google.generativeai as genai
    GEMINI_AVAILABLE = True
except ImportError:
    GEMINI_AVAILABLE = False
    print("Google Generative AI package not found. PR summarization feature will be disabled.")
class GitHubRepoInfo:
    """Enhanced class to get comprehensive information about a GitHub repository."""

    def __init__(self, token=None):
        """Initialize with an optional GitHub API token."""
        self.base_url = "https://api.github.com"
        self.headers = {"Accept": "application/vnd.github.v3+json"}
        self.token = token or os.environ.get("GITHUB_TOKEN")
        self.github = None  # Initialize PyGithub client

        # Set up authentication (fall back to unauthenticated access on failure)
        if self.token:
            self.headers["Authorization"] = f"token {self.token}"
            try:
                self.github = Github(self.token)
                self.github.get_user().login  # Test connection
            except Exception as e:
                print(f"Warning: Failed to initialize PyGithub with token: {e}")
                self.github = Github()  # Fall back to unauthenticated
        else:
            self.github = Github()  # Unauthenticated

        # Configure rate limit handling
        self.rate_limit_remaining = 5000  # Assume the higher limit if authenticated
        self.rate_limit_reset = datetime.now()

        # Initialize rate limit info if possible
        if self.github:
            try:
                rate_limit = self.github.get_rate_limit()
                self.rate_limit_remaining = rate_limit.core.remaining
                self.rate_limit_reset = datetime.fromtimestamp(rate_limit.core.reset)
            except Exception as e:
                print(f"Warning: Could not get initial rate limit from PyGithub: {e}")
    def _check_rate_limit(self):
        """Check the API rate limit and wait if it is nearly exhausted."""
        if self.rate_limit_remaining <= 10:
            reset_time = self.rate_limit_reset
            current_time = datetime.now()
            if reset_time > current_time:
                wait_time = (reset_time - current_time).total_seconds() + 10  # Add buffer
                print(f"Rate limit nearly exhausted. Waiting {wait_time:.0f} seconds for reset.")
                time.sleep(wait_time)

        # Refresh the cached rate limit info on every check
        response = requests.get(f"{self.base_url}/rate_limit", headers=self.headers)
        if response.status_code == 200:
            rate_data = response.json()
            self.rate_limit_remaining = rate_data["resources"]["core"]["remaining"]
            self.rate_limit_reset = datetime.fromtimestamp(rate_data["resources"]["core"]["reset"])
    def _paginated_get(self, url, params=None, max_items=None):
        """Handle paginated API responses with rate limit awareness."""
        if params is None:
            params = {}
        items = []
        page = 1
        per_page = min(100, params.get("per_page", 30))
        params["per_page"] = per_page
        while True:
            self._check_rate_limit()
            params["page"] = page
            response = requests.get(url, headers=self.headers, params=params)
            if response.status_code == 200:
                page_items = response.json()
                if not page_items:
                    break
                items.extend(page_items)
                page += 1
                # Stop if we've reached the requested limit
                if max_items and len(items) >= max_items:
                    return items[:max_items]
                # Stop if we've reached the last page (GitHub returns fewer items than requested)
                if len(page_items) < per_page:
                    break
            else:
                print(f"Error {response.status_code}: {response.text}")
                break
        return items
| def get_repo_info(self, owner, repo): | |
| """Get basic repository information.""" | |
| self._check_rate_limit() | |
| url = f"{self.base_url}/repos/{owner}/{repo}" | |
| response = requests.get(url, headers=self.headers) | |
| if response.status_code == 200: | |
| return response.json() | |
| else: | |
| print(f"Error {response.status_code}: {response.text}") | |
| return None | |
| def get_contributors(self, owner, repo, max_contributors=None): | |
| """Get repository contributors with pagination support.""" | |
| url = f"{self.base_url}/repos/{owner}/{repo}/contributors" | |
| return self._paginated_get(url, max_items=max_contributors) | |
| def get_languages(self, owner, repo): | |
| """Get languages used in the repository.""" | |
| self._check_rate_limit() | |
| url = f"{self.base_url}/repos/{owner}/{repo}/languages" | |
| response = requests.get(url, headers=self.headers) | |
| if response.status_code == 200: | |
| return response.json() | |
| else: | |
| print(f"Error getting languages: {response.status_code}") | |
| return {} | |
| def get_commits(self, owner, repo, params=None, max_commits=None): | |
| """Get commits with enhanced filtering and pagination.""" | |
| url = f"{self.base_url}/repos/{owner}/{repo}/commits" | |
| return self._paginated_get(url, params=params, max_items=max_commits) | |
    def get_commit_activity(self, owner, repo, retries=5):
        """Get commit activity stats for the past year."""
        self._check_rate_limit()
        url = f"{self.base_url}/repos/{owner}/{repo}/stats/commit_activity"
        response = requests.get(url, headers=self.headers)
        if response.status_code == 200:
            return response.json()
        elif response.status_code == 202 and retries > 0:
            # GitHub is computing the statistics; wait and retry a bounded number of times
            print("GitHub is computing statistics, waiting and retrying...")
            time.sleep(2)
            return self.get_commit_activity(owner, repo, retries - 1)
        else:
            print(f"Error getting commit activity: {response.status_code}")
            return []

    def get_code_frequency(self, owner, repo, retries=5):
        """Get weekly code addition and deletion statistics."""
        self._check_rate_limit()
        url = f"{self.base_url}/repos/{owner}/{repo}/stats/code_frequency"
        response = requests.get(url, headers=self.headers)
        if response.status_code == 200:
            return response.json()
        elif response.status_code == 202 and retries > 0:
            # GitHub is computing the statistics; wait and retry a bounded number of times
            print("GitHub is computing statistics, waiting and retrying...")
            time.sleep(2)
            return self.get_code_frequency(owner, repo, retries - 1)
        else:
            print(f"Error getting code frequency: {response.status_code}")
            return []

    def get_contributor_activity(self, owner, repo, retries=5):
        """Get contributor commit activity over time."""
        self._check_rate_limit()
        url = f"{self.base_url}/repos/{owner}/{repo}/stats/contributors"
        response = requests.get(url, headers=self.headers)
        if response.status_code == 200:
            return response.json()
        elif response.status_code == 202 and retries > 0:
            # GitHub is computing the statistics; wait and retry a bounded number of times
            print("GitHub is computing statistics, waiting and retrying...")
            time.sleep(2)
            return self.get_contributor_activity(owner, repo, retries - 1)
        else:
            print(f"Error getting contributor activity: {response.status_code}")
            return []
| def get_branches(self, owner, repo): | |
| """Get repository branches.""" | |
| url = f"{self.base_url}/repos/{owner}/{repo}/branches" | |
| return self._paginated_get(url) | |
| def get_releases(self, owner, repo, max_releases=None): | |
| """Get repository releases with pagination support.""" | |
| url = f"{self.base_url}/repos/{owner}/{repo}/releases" | |
| return self._paginated_get(url, max_items=max_releases) | |
| def get_issues(self, owner, repo, state="all", max_issues=None, params=None): | |
| """Get repository issues with enhanced filtering.""" | |
| url = f"{self.base_url}/repos/{owner}/{repo}/issues" | |
| if params is None: | |
| params = {} | |
| params["state"] = state | |
| return self._paginated_get(url, params=params, max_items=max_issues) | |
    def get_issue_timeline(self, owner, repo, days_back=180):
        """Analyze issue creation and closing over time."""
        # Get issues including closed ones. The issues endpoint also returns
        # pull requests, so filter those out to keep the counts issue-only.
        issues = self.get_issues(owner, repo, state="all")
        issues = [issue for issue in issues if "pull_request" not in issue]

        # Prepare timeline data
        end_date = datetime.now()
        start_date = end_date - timedelta(days=days_back)

        # Initialize daily counters
        date_range = pd.date_range(start=start_date, end=end_date)
        created_counts = {d.strftime('%Y-%m-%d'): 0 for d in date_range}
        closed_counts = {d.strftime('%Y-%m-%d'): 0 for d in date_range}

        # Collect issue creation and closing dates
        for issue in issues:
            created_at = datetime.strptime(issue['created_at'], '%Y-%m-%dT%H:%M:%SZ')
            if created_at >= start_date:
                created_counts[created_at.strftime('%Y-%m-%d')] += 1
            if issue['state'] == 'closed' and issue.get('closed_at'):
                closed_at = datetime.strptime(issue['closed_at'], '%Y-%m-%dT%H:%M:%SZ')
                if closed_at >= start_date:
                    closed_counts[closed_at.strftime('%Y-%m-%d')] += 1

        # Calculate resolution times (in hours) for closed issues
        resolution_times = []
        for issue in issues:
            if issue['state'] == 'closed' and issue.get('closed_at'):
                created_at = datetime.strptime(issue['created_at'], '%Y-%m-%dT%H:%M:%SZ')
                closed_at = datetime.strptime(issue['closed_at'], '%Y-%m-%dT%H:%M:%SZ')
                resolution_times.append((closed_at - created_at).total_seconds() / 3600)

        # Calculate issue label distribution
        label_counts = defaultdict(int)
        for issue in issues:
            for label in issue.get('labels', []):
                label_counts[label['name']] += 1

        return {
            'created': created_counts,
            'closed': closed_counts,
            'resolution_times': resolution_times,
            'labels': dict(label_counts)
        }
| def get_pull_requests(self, owner, repo, state="all", max_prs=None, params=None): | |
| """Get repository pull requests with enhanced filtering.""" | |
| url = f"{self.base_url}/repos/{owner}/{repo}/pulls" | |
| if params is None: | |
| params = {} | |
| params["state"] = state | |
| return self._paginated_get(url, params=params, max_items=max_prs) | |
| def get_pr_timeline(self, owner, repo, days_back=180): | |
| """Analyze PR creation, closing, and metrics over time.""" | |
| # Get PRs including closed and merged ones | |
| prs = self.get_pull_requests(owner, repo, state="all") | |
| # Prepare timeline data | |
| end_date = datetime.now() | |
| start_date = end_date - timedelta(days=days_back) | |
| # Initialize daily counters | |
| date_range = pd.date_range(start=start_date, end=end_date) | |
| created_counts = {d.strftime('%Y-%m-%d'): 0 for d in date_range} | |
| closed_counts = {d.strftime('%Y-%m-%d'): 0 for d in date_range} | |
| merged_counts = {d.strftime('%Y-%m-%d'): 0 for d in date_range} | |
| # Track metrics | |
| merge_times = [] | |
| pr_sizes = [] | |
| # Collect PR data | |
| for pr in prs: | |
| created_at = datetime.strptime(pr['created_at'], '%Y-%m-%dT%H:%M:%SZ') | |
| if created_at >= start_date: | |
| created_counts[created_at.strftime('%Y-%m-%d')] += 1 | |
| # Get PR size (additions + deletions) | |
| if pr.get('additions') is not None and pr.get('deletions') is not None: | |
| pr_sizes.append({ | |
| 'additions': pr['additions'], | |
| 'deletions': pr['deletions'], | |
| 'total': pr['additions'] + pr['deletions'], | |
| 'files_changed': pr.get('changed_files', 0) | |
| }) | |
| # Check if PR is closed | |
| if pr['state'] == 'closed': | |
| closed_at = datetime.strptime(pr['closed_at'], '%Y-%m-%dT%H:%M:%SZ') | |
| if closed_at >= start_date: | |
| closed_counts[closed_at.strftime('%Y-%m-%d')] += 1 | |
| # Check if PR was merged | |
| if pr['merged_at']: | |
| merged_at = datetime.strptime(pr['merged_at'], '%Y-%m-%dT%H:%M:%SZ') | |
| if merged_at >= start_date: | |
| merged_counts[merged_at.strftime('%Y-%m-%d')] += 1 | |
| # Calculate time to merge | |
| merge_time = (merged_at - created_at).total_seconds() / 3600 # hours | |
| merge_times.append(merge_time) | |
| # Calculate acceptance rate | |
| total_closed = sum(closed_counts.values()) | |
| total_merged = sum(merged_counts.values()) | |
| acceptance_rate = (total_merged / total_closed) * 100 if total_closed > 0 else 0 | |
| return { | |
| 'created': created_counts, | |
| 'closed': closed_counts, | |
| 'merged': merged_counts, | |
| 'merge_times': merge_times, | |
| 'pr_sizes': pr_sizes, | |
| 'acceptance_rate': acceptance_rate | |
| } | |
| def get_contents(self, owner, repo, path="", ref=None): | |
| """Get repository contents at the specified path.""" | |
| self._check_rate_limit() | |
| url = f"{self.base_url}/repos/{owner}/{repo}/contents/{path}" | |
| params = {} | |
| if ref: | |
| params["ref"] = ref | |
| response = requests.get(url, headers=self.headers, params=params) | |
| if response.status_code == 200: | |
| return response.json() | |
| else: | |
| print(f"Error getting contents: {response.status_code}") | |
| return [] | |
| def get_readme(self, owner, repo, ref=None): | |
| """Get repository README file.""" | |
| self._check_rate_limit() | |
| url = f"{self.base_url}/repos/{owner}/{repo}/readme" | |
| params = {} | |
| if ref: | |
| params["ref"] = ref | |
| response = requests.get(url, headers=self.headers, params=params) | |
| if response.status_code == 200: | |
| data = response.json() | |
| if data.get("content"): | |
| content = base64.b64decode(data["content"]).decode("utf-8") | |
| return { | |
| "name": data["name"], | |
| "path": data["path"], | |
| "content": content | |
| } | |
| return data | |
| else: | |
| print(f"README not found or error: {response.status_code}") | |
| return None | |
| def get_file_content(self, owner, repo, path, ref=None): | |
| """Get the content of a specific file in the repository.""" | |
| self._check_rate_limit() | |
| url = f"{self.base_url}/repos/{owner}/{repo}/contents/{path}" | |
| params = {} | |
| if ref: | |
| params["ref"] = ref | |
| response = requests.get(url, headers=self.headers, params=params) | |
| if response.status_code == 200: | |
| data = response.json() | |
| if data.get("content"): | |
| try: | |
| content = base64.b64decode(data["content"]).decode("utf-8") | |
| return content | |
| except UnicodeDecodeError: | |
| return "[Binary file content not displayed]" | |
| return None | |
| else: | |
| print(f"Error getting file content: {response.status_code}") | |
| return None | |
| def is_text_file(self, file_path): | |
| """Determine if a file is likely a text file based on extension.""" | |
| text_extensions = [ | |
| '.txt', '.md', '.rst', '.py', '.js', '.html', '.css', '.java', '.c', | |
| '.cpp', '.h', '.hpp', '.json', '.xml', '.yaml', '.yml', '.toml', | |
| '.ini', '.cfg', '.conf', '.sh', '.bat', '.ps1', '.rb', '.pl', '.php', | |
| '.go', '.rs', '.ts', '.jsx', '.tsx', '.vue', '.swift', '.kt', '.scala', | |
| '.groovy', '.lua', '.r', '.dart', '.ex', '.exs', '.erl', '.hrl', | |
| '.clj', '.hs', '.elm', '.f90', '.f95', '.f03', '.sql', '.gitignore', | |
| '.dockerignore', '.env', '.editorconfig', '.htaccess', '.cs', '.ipynb', | |
| '.R', '.Rmd', '.jl', '.fs', '.ml', '.mli', '.d', '.scm', '.lisp', | |
| '.el', '.m', '.mm', '.vb', '.asm', '.s', '.Dockerfile', '.gradle' | |
| ] | |
| extension = os.path.splitext(file_path)[1].lower() | |
| return extension in text_extensions | |
| def get_recursive_contents(self, owner, repo, path="", max_depth=3, current_depth=0, max_files=1000, ref=None): | |
| """Recursively get repository contents with a depth limit and file count limit.""" | |
| if current_depth >= max_depth: | |
| return [] | |
| contents = self.get_contents(owner, repo, path, ref) | |
| results = [] | |
| file_count = 0 | |
| for item in contents: | |
| if file_count >= max_files: | |
| break | |
| if item["type"] == "dir": | |
| # For directories, add the directory itself and recursively get contents | |
| dir_item = { | |
| "type": "dir", | |
| "name": item["name"], | |
| "path": item["path"], | |
| "contents": self.get_recursive_contents( | |
| owner, repo, item["path"], max_depth, current_depth + 1, | |
| max_files - file_count, ref | |
| ) | |
| } | |
| results.append(dir_item) | |
| else: | |
| # For files, add the file info | |
| results.append({ | |
| "type": "file", | |
| "name": item["name"], | |
| "path": item["path"], | |
| "size": item["size"], | |
| "url": item["html_url"] | |
| }) | |
| file_count += 1 | |
| return results | |
| def get_all_text_files(self, owner, repo, path="", max_files=50, ref=None): | |
| """Get content of all text files in the repository (with limit).""" | |
| contents = self.get_contents(owner, repo, path, ref) | |
| text_files = [] | |
| file_count = 0 | |
| # Process current directory | |
| for item in contents: | |
| if file_count >= max_files: | |
| break | |
| if item["type"] == "file" and self.is_text_file(item["name"]): | |
| content = self.get_file_content(owner, repo, item["path"], ref) | |
| if content and content != "[Binary file content not displayed]": | |
| text_files.append({ | |
| "name": item["name"], | |
| "path": item["path"], | |
| "content": content | |
| }) | |
| file_count += 1 | |
| elif item["type"] == "dir": | |
| # Recursively get text files from subdirectories | |
| subdir_files = self.get_all_text_files( | |
| owner, repo, item["path"], max_files - file_count, ref | |
| ) | |
| text_files.extend(subdir_files) | |
| file_count += len(subdir_files) | |
| return text_files | |
| def get_documentation_files(self, owner, repo, ref=None): | |
| """Get documentation files from the repository.""" | |
| # Common documentation file paths and directories | |
| doc_paths = [ | |
| "docs", "doc", "documentation", "wiki", "CONTRIBUTING.md", | |
| "CONTRIBUTORS.md", "CODE_OF_CONDUCT.md", "SECURITY.md", | |
| "SUPPORT.md", "docs/index.md", "docs/README.md", "docs/getting-started.md", | |
| ".github/ISSUE_TEMPLATE", ".github/PULL_REQUEST_TEMPLATE.md" | |
| ] | |
| doc_files = [] | |
| # Try to get each documentation file/directory | |
| for path in doc_paths: | |
| try: | |
| contents = self.get_contents(owner, repo, path, ref) | |
| # If it's a directory, get all markdown files in it | |
| if isinstance(contents, list): | |
| for item in contents: | |
| if item["type"] == "file" and item["name"].lower().endswith((".md", ".rst", ".txt")): | |
| content = self.get_file_content(owner, repo, item["path"], ref) | |
| if content: | |
| doc_files.append({ | |
| "name": item["name"], | |
| "path": item["path"], | |
| "content": content | |
| }) | |
| # If it's a file, get its content | |
| elif isinstance(contents, dict) and contents.get("type") == "file": | |
| content = self.get_file_content(owner, repo, path, ref) | |
| if content: | |
| doc_files.append({ | |
| "name": contents["name"], | |
| "path": contents["path"], | |
| "content": content | |
| }) | |
| except: | |
| # Path doesn't exist or access issues | |
| continue | |
| return doc_files | |
    def analyze_ast(self, code, file_path):
        """Analyze Python code using AST (Abstract Syntax Tree)."""
        if not file_path.endswith('.py'):
            return None

        def _node_complexity(node):
            # radon's cc_visit expects source text; cc_visit_ast works on AST nodes
            # and returns a list of blocks, so take the highest block complexity.
            try:
                blocks = complexity.cc_visit_ast(node)
                return max((block.complexity for block in blocks), default=1)
            except Exception:
                return None

        try:
            tree = ast.parse(code)

            # Extract detailed information from the AST
            functions = []
            classes = []
            imports = []
            function_complexities = {}

            for node in ast.walk(tree):
                # Function definitions with their arguments
                if isinstance(node, ast.FunctionDef):
                    defaults = len(node.args.defaults)
                    args_count = len(node.args.args) - defaults

                    # Positional args first, then args with default values
                    args = [arg.arg for arg in node.args.args[:args_count]]
                    args += [f"{arg.arg}=..." for arg in node.args.args[args_count:]]

                    # Function complexity and docstring
                    func_complexity = _node_complexity(node)
                    function_complexities[node.name] = func_complexity
                    docstring = ast.get_docstring(node)

                    functions.append({
                        'name': node.name,
                        'args': args,
                        'complexity': func_complexity,
                        'docstring': docstring
                    })

                # Class definitions with their methods
                elif isinstance(node, ast.ClassDef):
                    methods = []
                    class_docstring = ast.get_docstring(node)
                    for child in node.body:
                        if isinstance(child, ast.FunctionDef):
                            methods.append({
                                'name': child.name,
                                'complexity': _node_complexity(child),
                                'docstring': ast.get_docstring(child)
                            })
                    classes.append({
                        'name': node.name,
                        'methods': methods,
                        'docstring': class_docstring
                    })

                # Imports
                elif isinstance(node, ast.Import):
                    for name in node.names:
                        imports.append(name.name)
                elif isinstance(node, ast.ImportFrom):
                    module = node.module or ""
                    for name in node.names:
                        imports.append(f"{module}.{name.name}")

            # Overall code complexity: average block complexity across the module
            try:
                all_blocks = complexity.cc_visit_ast(tree)
                code_complexity = (sum(b.complexity for b in all_blocks) / len(all_blocks)) if all_blocks else 0
            except Exception:
                code_complexity = None

            # Maintainability index
            try:
                mi_score = metrics.mi_visit(code, True)
            except Exception:
                mi_score = None

            return {
                'functions': functions,
                'classes': classes,
                'imports': imports,
                'complexity': {
                    'overall': code_complexity,
                    'functions': function_complexities,
                    'maintainability_index': mi_score
                }
            }
        except SyntaxError:
            print(f"Syntax error in Python file: {file_path}")
            return None
        except Exception as e:
            print(f"Error analyzing {file_path}: {str(e)}")
            return None
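    # Illustrative sketch (not executed): what analyze_ast returns for a small snippet.
    # Assumes radon is installed; exact complexity values depend on the radon version.
    #
    #   snippet = "def add(a, b):\n    return a + b\n"
    #   GitHubRepoInfo().analyze_ast(snippet, "add.py")
    #   # -> {'functions': [{'name': 'add', ...}], 'classes': [], 'imports': [], 'complexity': {...}}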
| def analyze_js_ts(self, code, file_path): | |
| """Analyze JavaScript/TypeScript code using regex with improved patterns.""" | |
| if not file_path.endswith(('.js', '.ts', '.jsx', '.tsx')): | |
| return None | |
| # More sophisticated regex patterns for JS/TS analysis | |
| results = { | |
| 'functions': [], | |
| 'classes': [], | |
| 'imports': [], | |
| 'exports': [], | |
| 'hooks': [] # For React hooks | |
| } | |
| # Function patterns (covering various declaration styles) | |
| function_patterns = [ | |
| # Regular functions | |
| r'function\s+(\w+)\s*\(([^)]*)\)', | |
| # Arrow functions assigned to variables | |
| r'(?:const|let|var)\s+(\w+)\s*=\s*(?:\([^)]*\)|[^=]*)\s*=>\s*{', | |
| # Class methods | |
| r'(?:async\s+)?(\w+)\s*\(([^)]*)\)\s*{', | |
| # Object methods | |
| r'(\w+)\s*:\s*function\s*\(([^)]*)\)' | |
| ] | |
| for pattern in function_patterns: | |
| for match in re.finditer(pattern, code): | |
| func_name = match.group(1) | |
| args = match.group(2).strip() if len(match.groups()) > 1 else "" | |
| results['functions'].append({ | |
| 'name': func_name, | |
| 'args': args | |
| }) | |
| # Class pattern | |
| class_pattern = r'class\s+(\w+)(?:\s+extends\s+(\w+))?\s*{([^}]*)}' | |
| for match in re.finditer(class_pattern, code, re.DOTALL): | |
| class_name = match.group(1) | |
| parent_class = match.group(2) if match.group(2) else None | |
| class_body = match.group(3) | |
| # Find methods in class | |
| methods = [] | |
| method_pattern = r'(?:async\s+)?(\w+)\s*\(([^)]*)\)\s*{([^}]*)}' | |
| for method_match in re.finditer(method_pattern, class_body): | |
| method_name = method_match.group(1) | |
| methods.append(method_name) | |
| results['classes'].append({ | |
| 'name': class_name, | |
| 'extends': parent_class, | |
| 'methods': methods | |
| }) | |
| # Import patterns | |
| import_patterns = [ | |
| # ES6 imports | |
| r'import\s+(?:{([^}]*)}|\*\s+as\s+(\w+)|(\w+))\s+from\s+[\'"]([^\'"]+)[\'"]', | |
| # CommonJS requires | |
| r'(?:const|let|var)\s+(?:{([^}]*)}|(\w+))\s*=\s*require\([\'"]([^\'"]+)[\'"]\)' | |
| ] | |
| for pattern in import_patterns: | |
| for match in re.finditer(pattern, code): | |
| groups = match.groups() | |
| if groups[0]: # Destructured import | |
| imports = [name.strip() for name in groups[0].split(',')] | |
| for imp in imports: | |
| results['imports'].append(imp) | |
| elif groups[1]: # Namespace import (import * as X) | |
| results['imports'].append(groups[1]) | |
| elif groups[2]: # Default import | |
| results['imports'].append(groups[2]) | |
| elif groups[3]: # Module name | |
| results['imports'].append(groups[3]) | |
| # React hooks detection (for React files) | |
| if file_path.endswith(('.jsx', '.tsx')): | |
| hook_pattern = r'use([A-Z]\w+)\s*\(' | |
| for match in re.finditer(hook_pattern, code): | |
| hook_name = 'use' + match.group(1) | |
| results['hooks'].append(hook_name) | |
| # Export patterns | |
| export_patterns = [ | |
| # Named exports | |
| r'export\s+(?:const|let|var|function|class)\s+(\w+)', | |
| # Default exports | |
| r'export\s+default\s+(?:function|class)?\s*(\w+)?' | |
| ] | |
| for pattern in export_patterns: | |
| for match in re.finditer(pattern, code): | |
| if match.group(1): | |
| results['exports'].append(match.group(1)) | |
| return results | |
| def extract_code_summary(self, file_content, file_path): | |
| """Extract comprehensive summary information from code files.""" | |
| extension = os.path.splitext(file_path)[1].lower() | |
| # Initialize summary | |
| summary = { | |
| "functions": [], | |
| "classes": [], | |
| "imports": [], | |
| "description": "", | |
| "complexity": None | |
| } | |
| # Extract Python definitions with AST | |
| if extension == '.py': | |
| ast_result = self.analyze_ast(file_content, file_path) | |
| if ast_result: | |
| summary["functions"] = [f["name"] for f in ast_result["functions"]] | |
| summary["classes"] = [c["name"] for c in ast_result["classes"]] | |
| summary["imports"] = ast_result["imports"] | |
| summary["complexity"] = ast_result["complexity"] | |
| # Try to extract module docstring | |
| try: | |
| tree = ast.parse(file_content) | |
| module_docstring = ast.get_docstring(tree) | |
| if module_docstring: | |
| summary["description"] = module_docstring | |
| except: | |
| pass | |
| # Add detailed function and class info | |
| summary["detailed_functions"] = ast_result["functions"] | |
| summary["detailed_classes"] = ast_result["classes"] | |
| # Extract JavaScript/TypeScript definitions | |
| elif extension in ['.js', '.ts', '.jsx', '.tsx']: | |
| js_result = self.analyze_js_ts(file_content, file_path) | |
| if js_result: | |
| summary["functions"] = [f["name"] for f in js_result["functions"]] | |
| summary["classes"] = [c["name"] for c in js_result["classes"]] | |
| summary["imports"] = js_result["imports"] | |
| # Add detailed function and class info | |
| summary["detailed_functions"] = js_result["functions"] | |
| summary["detailed_classes"] = js_result["classes"] | |
| summary["hooks"] = js_result.get("hooks", []) | |
| summary["exports"] = js_result.get("exports", []) | |
| # Calculate basic code metrics for any text file | |
| if file_content: | |
| lines = file_content.split('\n') | |
| code_lines = 0 | |
| comment_lines = 0 | |
| blank_lines = 0 | |
| comment_prefixes = ['#', '//', '/*', '*', '<!--'] | |
| for line in lines: | |
| line = line.strip() | |
| if not line: | |
| blank_lines += 1 | |
| elif any(line.startswith(prefix) for prefix in comment_prefixes): | |
| comment_lines += 1 | |
| else: | |
| code_lines += 1 | |
| summary["metrics"] = { | |
| "total_lines": len(lines), | |
| "code_lines": code_lines, | |
| "comment_lines": comment_lines, | |
| "blank_lines": blank_lines, | |
| "comment_ratio": comment_lines / max(1, code_lines + comment_lines) | |
| } | |
| return summary | |
| def analyze_dependencies(self, owner, repo, max_files=100): | |
| """Analyze code dependencies across the repository.""" | |
| # Get Python and JavaScript files | |
| text_files = self.get_all_text_files(owner, repo, max_files=max_files) | |
| # Filter for Python and JS/TS files | |
| code_files = [f for f in text_files if f["name"].endswith(('.py', '.js', '.ts', '.jsx', '.tsx'))] | |
| # Track dependencies | |
| dependencies = { | |
| 'internal': defaultdict(set), # File to file dependencies | |
| 'external': defaultdict(set), # External package dependencies by file | |
| 'modules': defaultdict(set) # Defined modules/components by file | |
| } | |
| # Extract module names from file paths | |
| file_to_module = {} | |
| for file in code_files: | |
| # Convert file path to potential module name | |
| module_path = os.path.splitext(file["path"])[0].replace('/', '.') | |
| file_to_module[file["path"]] = module_path | |
| # Track what each file defines | |
| summary = self.extract_code_summary(file["content"], file["path"]) | |
| if file["name"].endswith('.py'): | |
| for function in summary.get("functions", []): | |
| dependencies['modules'][file["path"]].add(f"{module_path}.{function}") | |
| for class_name in summary.get("classes", []): | |
| dependencies['modules'][file["path"]].add(f"{module_path}.{class_name}") | |
| else: # JS/TS files | |
| for export in summary.get("exports", []): | |
| dependencies['modules'][file["path"]].add(export) | |
| # Analyze imports/dependencies | |
| for file in code_files: | |
| summary = self.extract_code_summary(file["content"], file["path"]) | |
| for imp in summary.get("imports", []): | |
| # Check if this is an internal import | |
| is_internal = False | |
| if file["name"].endswith('.py'): | |
| # For Python, check if the import matches any module path | |
| for module_path in file_to_module.values(): | |
| if imp == module_path or imp.startswith(f"{module_path}."): | |
| is_internal = True | |
| # Find the file that defines this module | |
| for f_path, m_path in file_to_module.items(): | |
| if m_path == imp.split('.')[0]: | |
| dependencies['internal'][file["path"]].add(f_path) | |
| break | |
| else: | |
| # For JS/TS, check relative imports | |
| if imp.startswith('./') or imp.startswith('../'): | |
| is_internal = True | |
| # Try to resolve the relative import | |
| src_dir = os.path.dirname(file["path"]) | |
| target_path = os.path.normpath(os.path.join(src_dir, imp)) | |
| # Add known extensions if not specified | |
| if '.' not in os.path.basename(target_path): | |
| for ext in ['.js', '.ts', '.jsx', '.tsx']: | |
| test_path = f"{target_path}{ext}" | |
| if test_path in file_to_module: | |
| dependencies['internal'][file["path"]].add(test_path) | |
| break | |
| # If not internal, consider it external | |
| if not is_internal: | |
| # Clean up the import name (remove relative path parts) | |
| if not file["name"].endswith('.py'): | |
| imp = imp.split('/')[0] # Take the package name part | |
| dependencies['external'][file["path"]].add(imp) | |
| return dependencies | |
| def create_dependency_graph(self, dependencies): | |
| """Create a NetworkX graph from dependencies for visualization.""" | |
| G = nx.DiGraph() | |
| # Add nodes for files | |
| for file_path in dependencies['internal'].keys(): | |
| G.add_node(file_path, type='file') | |
| # Add edges for internal dependencies | |
| for file_path, deps in dependencies['internal'].items(): | |
| for dep in deps: | |
| G.add_edge(file_path, dep) | |
| # Add nodes and edges for external dependencies | |
| external_nodes = set() | |
| for file_path, deps in dependencies['external'].items(): | |
| for dep in deps: | |
| external_node = f"ext:{dep}" | |
| if external_node not in external_nodes: | |
| G.add_node(external_node, type='external') | |
| external_nodes.add(external_node) | |
| G.add_edge(file_path, external_node) | |
| return G | |
| def get_repo_text_summary(self, owner, repo, max_files=25): | |
| """Extract and summarize text content from the repository with improved metrics.""" | |
| # Get README | |
| readme = self.get_readme(owner, repo) | |
| # Get documentation | |
| docs = self.get_documentation_files(owner, repo) | |
| # Get key code files (limit to avoid API rate limits) | |
| text_files = self.get_all_text_files(owner, repo, max_files=max_files) | |
| # Analyze code files | |
| code_summary = {} | |
| complexity_metrics = { | |
| 'cyclomatic_complexity': [], | |
| 'maintainability_index': [], | |
| 'comment_ratios': [] | |
| } | |
| for file in text_files: | |
| ext = os.path.splitext(file["name"])[1].lower() | |
| if ext in ['.py', '.js', '.ts', '.jsx', '.tsx']: | |
| file_summary = self.extract_code_summary(file["content"], file["path"]) | |
| code_summary[file["path"]] = file_summary | |
| # Collect complexity metrics | |
| if file_summary.get('complexity'): | |
| cc = file_summary['complexity'].get('overall') | |
| if cc is not None: | |
| complexity_metrics['cyclomatic_complexity'].append((file["path"], cc)) | |
| mi = file_summary['complexity'].get('maintainability_index') | |
| if mi is not None: | |
| complexity_metrics['maintainability_index'].append((file["path"], mi)) | |
| if file_summary.get('metrics'): | |
| comment_ratio = file_summary['metrics'].get('comment_ratio', 0) | |
| complexity_metrics['comment_ratios'].append((file["path"], comment_ratio)) | |
| # Analyze dependencies | |
| dependencies = self.analyze_dependencies(owner, repo, max_files=max_files) | |
| # Summarize repository content by file type | |
| file_types = defaultdict(int) | |
| for file in text_files: | |
| ext = os.path.splitext(file["name"])[1].lower() | |
| file_types[ext] += 1 | |
| # Calculate aggregate code metrics | |
| total_code_lines = sum(summary.get('metrics', {}).get('code_lines', 0) | |
| for summary in code_summary.values()) | |
| total_comment_lines = sum(summary.get('metrics', {}).get('comment_lines', 0) | |
| for summary in code_summary.values()) | |
| aggregate_metrics = { | |
| 'total_files': len(text_files), | |
| 'total_code_lines': total_code_lines, | |
| 'total_comment_lines': total_comment_lines, | |
| 'average_comment_ratio': (total_comment_lines / total_code_lines) if total_code_lines > 0 else 0 | |
| } | |
| return { | |
| "readme": readme, | |
| "documentation": docs, | |
| "code_summary": code_summary, | |
| "complexity_metrics": complexity_metrics, | |
| "dependencies": dependencies, | |
| "file_type_counts": dict(file_types), | |
| "aggregate_metrics": aggregate_metrics, | |
| "text_files": text_files # Include the actual text file contents | |
| } | |
| def get_temporal_analysis(self, owner, repo): | |
| """Perform temporal analysis of repository activity.""" | |
| # Get commit activity over time | |
| commit_activity = self.get_commit_activity(owner, repo) | |
| # Get code frequency (additions/deletions over time) | |
| code_frequency = self.get_code_frequency(owner, repo) | |
| # Get contributor activity | |
| contributor_activity = self.get_contributor_activity(owner, repo) | |
| # Get issue and PR timelines | |
| issue_timeline = self.get_issue_timeline(owner, repo) | |
| pr_timeline = self.get_pr_timeline(owner, repo) | |
| # Process data for visualization | |
| # - Weekly commit counts | |
| weekly_commits = [] | |
| if commit_activity: | |
| for week in commit_activity: | |
| date = datetime.fromtimestamp(week['week']) | |
| weekly_commits.append({ | |
| 'date': date.strftime('%Y-%m-%d'), | |
| 'total': week['total'], | |
| 'days': week['days'] # Daily breakdown within the week | |
| }) | |
| # - Weekly code changes | |
| weekly_code_changes = [] | |
| if code_frequency: | |
| for item in code_frequency: | |
| date = datetime.fromtimestamp(item[0]) | |
| weekly_code_changes.append({ | |
| 'date': date.strftime('%Y-%m-%d'), | |
| 'additions': item[1], | |
| 'deletions': -item[2] # Convert to positive for visualization | |
| }) | |
| # - Contributor timeline | |
| contributor_timeline = {} | |
| if contributor_activity: | |
| for contributor in contributor_activity: | |
| author = contributor['author']['login'] | |
| weeks = contributor['weeks'] | |
| if author not in contributor_timeline: | |
| contributor_timeline[author] = [] | |
| for week in weeks: | |
| if week['c'] > 0: # Only include weeks with commits | |
| date = datetime.fromtimestamp(week['w']) | |
| contributor_timeline[author].append({ | |
| 'date': date.strftime('%Y-%m-%d'), | |
| 'commits': week['c'], | |
| 'additions': week['a'], | |
| 'deletions': week['d'] | |
| }) | |
| return { | |
| 'weekly_commits': weekly_commits, | |
| 'weekly_code_changes': weekly_code_changes, | |
| 'contributor_timeline': contributor_timeline, | |
| 'issue_timeline': issue_timeline, | |
| 'pr_timeline': pr_timeline | |
| } | |
| def get_all_info(self, owner, repo): | |
| """Get comprehensive information about a repository with enhanced metrics.""" | |
| result = { | |
| "timestamp": datetime.now().isoformat(), | |
| "basic_info": self.get_repo_info(owner, repo) | |
| } | |
| if not result["basic_info"]: | |
| print(f"Could not retrieve repository information for {owner}/{repo}") | |
| return None | |
| print("Getting repository statistics...") | |
| # Get additional information | |
| result["languages"] = self.get_languages(owner, repo) | |
| result["contributors"] = self.get_contributors(owner, repo, max_contributors=30) | |
| result["recent_commits"] = self.get_commits(owner, repo, max_commits=30) | |
| result["branches"] = self.get_branches(owner, repo) | |
| result["releases"] = self.get_releases(owner, repo, max_releases=10) | |
| result["open_issues"] = self.get_issues(owner, repo, state="open", max_issues=50) | |
| result["open_pull_requests"] = self.get_pull_requests(owner, repo, state="open", max_prs=50) | |
| result["root_contents"] = self.get_contents(owner, repo) | |
| print("Analyzing repository content...") | |
| # Get text content and documentation | |
| result["text_content"] = self.get_repo_text_summary(owner, repo, max_files=30) | |
| print("Analyzing repository activity over time...") | |
| # Get temporal analysis | |
| result["temporal_analysis"] = self.get_temporal_analysis(owner, repo) | |
| return result | |
| def get_pull_request_details(self, owner, repo, pr_number): | |
| """Get detailed information for a specific Pull Request using PyGithub.""" | |
| if not self.github: | |
| print("PyGithub client not initialized. Cannot fetch PR details.") | |
| return None | |
| try: | |
| repo_obj = self.github.get_repo(f"{owner}/{repo}") | |
| pr = repo_obj.get_pull(pr_number) | |
| # Extract relevant information into a dictionary | |
| details = { | |
| "number": pr.number, | |
| "title": pr.title, | |
| "state": pr.state, # 'open', 'closed' | |
| "merged": pr.merged, | |
| "body": pr.body or "", # Ensure body is string | |
| "url": pr.html_url, | |
| "created_at": pr.created_at.isoformat() if pr.created_at else None, | |
| "updated_at": pr.updated_at.isoformat() if pr.updated_at else None, | |
| "closed_at": pr.closed_at.isoformat() if pr.closed_at else None, | |
| "merged_at": pr.merged_at.isoformat() if pr.merged_at else None, | |
| "author": pr.user.login if pr.user else "N/A", | |
| "commits_count": pr.commits, | |
| "additions": pr.additions, | |
| "deletions": pr.deletions, | |
| "changed_files_count": pr.changed_files, | |
| "labels": [label.name for label in pr.labels], | |
| "assignees": [assignee.login for assignee in pr.assignees], | |
| "milestone": pr.milestone.title if pr.milestone else None, | |
| "repo_full_name": f"{owner}/{repo}", # Add repo context | |
| } | |
| return details | |
| except GithubException as e: | |
| if e.status == 404: | |
| print(f"Error: Pull Request #{pr_number} not found in {owner}/{repo}.") | |
| else: | |
| print(f"Error fetching PR #{pr_number} details: {e}") | |
| return None | |
| except Exception as e: | |
| print(f"An unexpected error occurred fetching PR details: {e}") | |
| return None | |
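# Illustrative usage sketch (not executed): fetching repository data directly with
# GitHubRepoInfo. Assumes a valid GITHUB_TOKEN in the environment and network access;
# "octocat/Hello-World" is just a public example repository.
#
#   info = GitHubRepoInfo(token=os.getenv("GITHUB_TOKEN"))
#   data = info.get_all_info("octocat", "Hello-World")
#   if data:
#       print(data["basic_info"]["full_name"], list(data["languages"].keys()))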
| class RepoAnalyzer: | |
| """Streamlined class to analyze GitHub repositories.""" | |
| def __init__(self, github_token=None, gemini_api_key=None): | |
| """Initialize with GitHub and optional Gemini credentials.""" | |
| load_dotenv() # Load .env file if it exists | |
| self.github_token = github_token or os.getenv("GITHUB_TOKEN") | |
| self.gemini_api_key = gemini_api_key or os.getenv("GOOGLE_API_KEY") | |
| # Initialize GitHub analyzer | |
| self.github_analyzer = GitHubRepoInfo(token=self.github_token) | |
| # Initialize Gemini model if API key is provided | |
| self.gemini_model = None | |
| if self.gemini_api_key and GEMINI_AVAILABLE: | |
| try: | |
| genai.configure(api_key=self.gemini_api_key) | |
| self.gemini_model = genai.GenerativeModel('gemini-1.5-pro-latest') | |
| print("Gemini model initialized for PR summarization.") | |
| except Exception as e: | |
| print(f"Error initializing Gemini: {e}") | |
| self.repo_data = None | |
| self.owner = None | |
| self.repo = None | |
| self.repo_full_name = None | |
| def analyze_repo(self, owner, repo): | |
| """Analyze a GitHub repository and store the data.""" | |
| self.owner = owner | |
| self.repo = repo | |
| self.repo_full_name = f"{owner}/{repo}" | |
| print(f"\nFetching repository information for {self.repo_full_name}...") | |
| self.repo_data = self.github_analyzer.get_all_info(owner, repo) | |
| if not self.repo_data: | |
| print(f"Failed to get repository information for {self.repo_full_name}") | |
| return False | |
| print(f"Successfully analyzed repository: {self.repo_full_name}") | |
| return True | |
| def _get_pr_summary_prompt(self, pr_details, role): | |
| """Generate a prompt for Gemini to summarize PR based on role.""" | |
| # Extract key details safely | |
| title = pr_details.get('title', 'N/A') | |
| body = pr_details.get('body', 'No description provided.') | |
| pr_number = pr_details.get('number', 'N/A') | |
| repo_name = pr_details.get('repo_full_name', 'N/A') | |
| author = pr_details.get('author', 'N/A') | |
| state = pr_details.get('state', 'N/A') | |
| merged_status = 'Merged' if pr_details.get('merged') else ('Closed' if state == 'closed' else 'Open') | |
| created_at = pr_details.get('created_at', 'N/A') | |
| commits_count = pr_details.get('commits_count', 'N/A') | |
| changed_files = pr_details.get('changed_files_count', 'N/A') | |
| additions = pr_details.get('additions', 'N/A') | |
| deletions = pr_details.get('deletions', 'N/A') | |
| labels = ', '.join(pr_details.get('labels', [])) or 'None' | |
| # Truncate long body | |
| max_body_len = 1500 | |
| truncated_body = body[:max_body_len] + ('...' if len(body) > max_body_len else '') | |
| base_prompt = f""" | |
| You are an AI assistant specializing in summarizing GitHub Pull Requests. | |
| Analyze the following Pull Request details from repository '{repo_name}' and provide a summary tailored for a '{role}'. | |
| **Pull Request #{pr_number}: {title}** | |
| * **Author:** {author} | |
| * **Status:** {state.capitalize()} ({merged_status}) | |
| * **Created:** {created_at} | |
| * **Commits:** {commits_count} | |
| * **Changed Files:** {changed_files} | |
| * **Code Churn:** +{additions} / -{deletions} lines | |
| * **Labels:** {labels} | |
| * **Description/Body:** | |
| {truncated_body} | |
| --- | |
| """ | |
| role_instructions = "" | |
| # Define role-specific instructions | |
| if role == 'Developer': | |
| role_instructions = """ | |
| **Summary Focus (Developer):** | |
| * Summarize the core technical changes and their purpose. | |
| * Identify key files, modules, or functions affected. | |
| * Mention any potential technical complexities, risks, or areas needing careful code review (based *only* on the description and metadata). | |
| * Note any mention of tests added or modified. | |
| * Be concise and focus on technical aspects relevant for peer review or understanding the change. | |
| """ | |
| elif role == 'Manager' or role == 'Team Lead': | |
| role_instructions = """ | |
| **Summary Focus (Manager/Team Lead):** | |
| * Explain the high-level purpose and business value (what problem does this PR solve or what feature does it add?). | |
| * Summarize the overall status (e.g., Ready for Review, Needs Work, Merged, Blocked?). | |
| * Give a sense of the PR's size/complexity (e.g., Small/Medium/Large based on file/line changes and description). | |
| * Highlight any mentioned risks, blockers, or dependencies on other work. | |
| * Include the author and key dates (created, merged/closed). | |
| * Focus on information needed for tracking progress and impact. | |
| """ | |
| elif role == 'Program Manager' or role == 'Product Owner': | |
| role_instructions = """ | |
| **Summary Focus (Program/Product Manager):** | |
| * Describe the user-facing impact or the feature/bug fix being addressed. | |
| * Relate the PR to product goals or requirements if possible (based on title/body/labels). | |
| * Note the status (especially if merged or closed). | |
| * Mention associated issues or tickets if referenced in the body (though not explicitly provided here, look for patterns like '#123'). | |
| * Focus on 'what' and 'why' from a product perspective. | |
| """ | |
| else: # Default/General | |
| role_instructions = """ | |
| **Summary Focus (General):** | |
| * State the main goal or purpose of the PR clearly. | |
| * Identify the author and the current status (Open/Closed/Merged). | |
| * Provide a brief, balanced overview of the key changes made. | |
| * Keep the summary accessible to a wider audience. | |
| """ | |
| return base_prompt + role_instructions + "\n**Summary:**" | |
| def summarize_pull_request(self, pr_number, role='Developer'): | |
| """Summarize a pull request using Gemini AI.""" | |
| if not self.gemini_model: | |
| return "Gemini model not initialized. Cannot generate summary." | |
| if not self.owner or not self.repo: | |
| return "Repository owner and name not set. Analyze a repository first." | |
| print(f"\nFetching details for PR #{pr_number} in {self.repo_full_name}...") | |
| pr_details = self.github_analyzer.get_pull_request_details(self.owner, self.repo, pr_number) | |
| if not pr_details: | |
| return f"Could not retrieve details for PR #{pr_number}." | |
| print(f"Generating summary for role: {role}...") | |
| # Generate the role-specific prompt | |
| prompt = self._get_pr_summary_prompt(pr_details, role) | |
| try: | |
| response = self.gemini_model.generate_content(prompt) | |
| summary_text = response.text | |
| return summary_text | |
| except Exception as e: | |
| print(f"Error communicating with Gemini for PR summary: {e}") | |
| return f"Error generating PR summary: {e}" | |
| def create_dependency_network_html(self, output_file="dependency_network.html"): | |
| """Create an interactive network visualization of dependencies using PyVis.""" | |
| if not self.repo_data: | |
| print("No repository data available.") | |
| return None | |
| # Get the dependencies | |
| dependencies = self.repo_data.get("text_content", {}).get("dependencies", {}) | |
| if not dependencies: | |
| print("No dependency data available.") | |
| return None | |
| internal_deps = dependencies.get('internal', {}) | |
| external_deps = dependencies.get('external', {}) | |
| # Create NetworkX graph first | |
| G = nx.DiGraph() | |
| # Add file nodes and internal dependencies | |
| for file_path, deps in internal_deps.items(): | |
| file_name = os.path.basename(file_path) | |
| G.add_node(file_path, label=file_name, title=file_path, group="file") | |
| for dep in deps: | |
| dep_name = os.path.basename(dep) | |
| G.add_node(dep, label=dep_name, title=dep, group="file") | |
| G.add_edge(file_path, dep) | |
| # Add external dependencies | |
| for file_path, deps in external_deps.items(): | |
| if file_path not in G.nodes: | |
| file_name = os.path.basename(file_path) | |
| G.add_node(file_path, label=file_name, title=file_path, group="file") | |
| for dep in deps: | |
| ext_node = f"ext:{dep}" | |
| G.add_node(ext_node, label=dep, title=dep, group="external") | |
| G.add_edge(file_path, ext_node) | |
| # Create PyVis network from NetworkX graph | |
| net = Network(height="750px", width="100%", directed=True, notebook=False) | |
| # Set network options for better visualization | |
| net.set_options(""" | |
| { | |
| "physics": { | |
| "hierarchicalRepulsion": { | |
| "centralGravity": 0.0, | |
| "springLength": 100, | |
| "springConstant": 0.01, | |
| "nodeDistance": 120 | |
| }, | |
| "maxVelocity": 50, | |
| "minVelocity": 0.1, | |
| "solver": "hierarchicalRepulsion" | |
| }, | |
| "layout": { | |
| "improvedLayout": true | |
| } | |
| } | |
| """) | |
| # Add nodes with properties from NetworkX graph | |
| for node, node_attrs in G.nodes(data=True): | |
| group = node_attrs.get('group', 'file') | |
| # Set colors based on node type | |
| color = "#97c2fc" if group == "file" else "#fb7e81" # blue for files, red for external | |
| net.add_node( | |
| node, | |
| label=node_attrs.get('label', str(node)), | |
| title=node_attrs.get('title', str(node)), | |
| color=color | |
| ) | |
| # Add edges | |
| for source, target in G.edges(): | |
| net.add_edge(source, target) | |
| # Generate and save the HTML file | |
| net.save_graph(output_file) | |
| print(f"Dependency network visualization saved to {output_file}") | |
| return output_file | |
| def create_vizro_dashboard(self, output_dir='./vizro_dashboard'): | |
| """Create a Vizro dashboard from repository data.""" | |
| if not self.repo_data: | |
| print("No repository data available. Run analyze_repo() first.") | |
| return None | |
| # Create output directory if it doesn't exist | |
| if not os.path.exists(output_dir): | |
| os.makedirs(output_dir) | |
| # Extract repository data | |
| repo_name = self.repo_data["basic_info"]["full_name"] | |
| basic_info = self.repo_data["basic_info"] | |
| # Create dashboard pages | |
| pages = [] | |
| # 1. Overview Page | |
| overview_components = [] | |
| # Basic repository info as a card | |
| repo_info_md = f""" | |
| # {basic_info['full_name']} | |
| **Description:** {basic_info.get('description', 'No description')} | |
| **Stars:** {basic_info['stargazers_count']} | | |
| **Forks:** {basic_info['forks_count']} | | |
| **Open Issues:** {basic_info['open_issues_count']} | |
| **Created:** {basic_info['created_at']} | | |
| **Last Updated:** {basic_info['updated_at']} | |
| **Default Branch:** {basic_info['default_branch']} | |
| **License:** {basic_info['license']['name'] if basic_info.get('license') else 'Not specified'} | |
| """ | |
| overview_components.append(vzm.Card(text=repo_info_md)) | |
| # Languages pie chart | |
| if self.repo_data.get("languages"): | |
| langs_data = [] | |
| total = sum(self.repo_data["languages"].values()) | |
| for lang, bytes_count in self.repo_data["languages"].items(): | |
| percentage = (bytes_count / total) * 100 | |
| langs_data.append({ | |
| "Language": lang, | |
| "Bytes": bytes_count, | |
| "Percentage": percentage | |
| }) | |
| langs_df = pd.DataFrame(langs_data) | |
| lang_pie = vzm.Graph( | |
| figure=px.pie( | |
| langs_df, | |
| values="Percentage", | |
| names="Language", | |
| title="Language Distribution" | |
| ) | |
| ) | |
            overview_components.append(lang_pie)  # vzm.Graph components are added to the page directly; vzm.Card only takes markdown text
| # Contributors bar chart | |
| if self.repo_data.get("contributors"): | |
| contrib_data = [] | |
| for contributor in self.repo_data["contributors"][:15]: | |
| contrib_data.append({ | |
| "Username": contributor['login'], | |
| "Contributions": contributor['contributions'] | |
| }) | |
| contrib_df = pd.DataFrame(contrib_data) | |
| contrib_bar = vzm.Graph( | |
| figure=px.bar( | |
| contrib_df, | |
| x="Username", | |
| y="Contributions", | |
| title="Top Contributors" | |
| ) | |
| ) | |
            overview_components.append(contrib_bar)
| # Add overview page | |
| pages.append( | |
| vzm.Page( | |
| title="Overview", | |
| components=overview_components | |
| ) | |
| ) | |
| # 2. Activity Page | |
| activity_components = [] | |
| # Commit activity over time | |
| weekly_commits = self.repo_data.get("temporal_analysis", {}).get("weekly_commits", []) | |
| if weekly_commits: | |
| commits_df = pd.DataFrame([ | |
| {"Date": week['date'], "Commits": week['total']} | |
| for week in weekly_commits | |
| ]) | |
| commits_line = vzm.Graph( | |
| figure=px.line( | |
| commits_df, | |
| x="Date", | |
| y="Commits", | |
| title="Weekly Commit Activity" | |
| ) | |
| ) | |
            activity_components.append(commits_line)
| # Code changes over time | |
| weekly_code_changes = self.repo_data.get("temporal_analysis", {}).get("weekly_code_changes", []) | |
| if weekly_code_changes: | |
| changes_data = [] | |
| for week in weekly_code_changes: | |
| changes_data.append({ | |
| "Date": week['date'], | |
| "Additions": week['additions'], | |
| "Deletions": -abs(week['deletions']) # Make negative for visualization | |
| }) | |
| changes_df = pd.DataFrame(changes_data) | |
| # Create a stacked bar chart | |
| changes_fig = go.Figure() | |
| changes_fig.add_trace(go.Bar( | |
| x=changes_df["Date"], | |
| y=changes_df["Additions"], | |
| name="Additions", | |
| marker_color="green" | |
| )) | |
| changes_fig.add_trace(go.Bar( | |
| x=changes_df["Date"], | |
| y=changes_df["Deletions"], | |
| name="Deletions", | |
| marker_color="red" | |
| )) | |
| changes_fig.update_layout( | |
| title="Weekly Code Changes", | |
| barmode="relative" | |
| ) | |
            changes_chart = vzm.Graph(figure=changes_fig)
            activity_components.append(changes_chart)
| # Issue resolution times | |
| issue_timeline = self.repo_data.get("temporal_analysis", {}).get("issue_timeline", {}) | |
| if issue_timeline and issue_timeline.get('resolution_times'): | |
| resolution_times = issue_timeline['resolution_times'] | |
| # Convert to hours for better visualization (cap at one week) | |
| rt_hours = [min(rt, 168) for rt in resolution_times if rt is not None] | |
| # Create histogram | |
| issue_resolution_fig = px.histogram( | |
| x=rt_hours, | |
| title="Issue Resolution Times (Capped at 1 Week)", | |
| labels={"x": "Hours to Resolution", "y": "Number of Issues"} | |
| ) | |
| # Add mean and median lines | |
| if rt_hours: | |
| mean_rt = np.mean(rt_hours) | |
| median_rt = np.median(rt_hours) | |
| issue_resolution_fig.add_vline( | |
| x=mean_rt, | |
| line_dash="dash", | |
| line_color="red", | |
| annotation_text=f"Mean: {mean_rt:.2f} hours" | |
| ) | |
| issue_resolution_fig.add_vline( | |
| x=median_rt, | |
| line_dash="dash", | |
| line_color="green", | |
| annotation_text=f"Median: {median_rt:.2f} hours" | |
| ) | |
            resolution_hist = vzm.Graph(figure=issue_resolution_fig)
            activity_components.append(resolution_hist)
| # Add activity page | |
| pages.append( | |
| vzm.Page( | |
| title="Activity", | |
| components=activity_components | |
| ) | |
| ) | |
| # 3. Code Quality Page | |
| code_components = [] | |
| # Code complexity metrics | |
| complexity_metrics = self.repo_data.get("text_content", {}).get("complexity_metrics", {}) | |
| cyclomatic_complexity = complexity_metrics.get("cyclomatic_complexity", []) | |
| if cyclomatic_complexity: | |
| # Prepare data for top complex files | |
| complexity_data = [] | |
| for path, cc in cyclomatic_complexity: | |
| # Ensure cc is numeric | |
| if isinstance(cc, (int, float)): | |
| complexity_data.append({ | |
| "File": os.path.basename(path), | |
| "Path": path, | |
| "Complexity": cc | |
| }) | |
| if complexity_data: | |
| # Sort by complexity | |
| complexity_data.sort(key=lambda x: x["Complexity"], reverse=True) | |
| # Take top 10 | |
| top_complex_files = complexity_data[:10] | |
| complex_df = pd.DataFrame(top_complex_files) | |
| complex_bar = vzm.Graph( | |
| figure=px.bar( | |
| complex_df, | |
| x="File", | |
| y="Complexity", | |
| title="Most Complex Files", | |
| hover_data=["Path"] | |
| ) | |
| ) | |
                code_components.append(complex_bar)
| # Complexity histogram | |
| cc_values = [d["Complexity"] for d in complexity_data] | |
| cc_hist = vzm.Graph( | |
| figure=px.histogram( | |
| x=cc_values, | |
| title="Cyclomatic Complexity Distribution", | |
| labels={"x": "Complexity", "y": "Number of Files"} | |
| ) | |
| ) | |
                code_components.append(cc_hist)
| # Comment ratio by file | |
| comment_ratios = complexity_metrics.get("comment_ratios", []) | |
| if comment_ratios: | |
| comment_data = [] | |
| for path, ratio in comment_ratios: | |
| comment_data.append({ | |
| "File": os.path.basename(path), | |
| "Path": path, | |
| "Comment Ratio": ratio | |
| }) | |
| # Sort by ratio | |
| comment_data.sort(key=lambda x: x["Comment Ratio"], reverse=True) | |
| # Take top 10 | |
| top_commented_files = comment_data[:10] | |
| comment_df = pd.DataFrame(top_commented_files) | |
| comment_bar = vzm.Graph( | |
| figure=px.bar( | |
| comment_df, | |
| x="File", | |
| y="Comment Ratio", | |
| title="Most Commented Files", | |
| hover_data=["Path"] | |
| ) | |
| ) | |
| code_components.append(comment_bar) | |
| # Add code quality page | |
| pages.append( | |
| vzm.Page( | |
| title="Code Quality", | |
| components=code_components | |
| ) | |
| ) | |
| # 4. Dependencies Page | |
| dependencies = self.repo_data.get("text_content", {}).get("dependencies", {}) | |
| if dependencies: | |
| dependencies_components = [] | |
| # External dependencies | |
| external_deps = dependencies.get("external", {}) | |
| if external_deps: | |
| # Count packages | |
| ext_counts = Counter() | |
| for file_deps in external_deps.values(): | |
| ext_counts.update(dep for dep in file_deps if isinstance(dep, str)) | |
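| # Dependencies are tallied per importing file, so a package pulled in by many files ranks higher | |
| # than one imported repeatedly within a single module | |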
| # Get top dependencies | |
| top_deps = ext_counts.most_common(10) | |
| deps_data = [] | |
| for pkg, count in top_deps: | |
| deps_data.append({ | |
| "Package": pkg, | |
| "Count": count | |
| }) | |
| deps_df = pd.DataFrame(deps_data) | |
| deps_bar = vzm.Graph( | |
| figure=px.bar( | |
| deps_df, | |
| x="Package", | |
| y="Count", | |
| title="Most Used External Dependencies" | |
| ) | |
| ) | |
| dependencies_components.append(deps_bar) | |
| # Create dependency network visualization with PyVis in a separate HTML file | |
| # and embed a note about it in the dashboard | |
| try: | |
| network_file = self.create_dependency_network_html( | |
| output_file=os.path.join(output_dir, "dependency_network.html") | |
| ) | |
| if network_file: | |
| network_note = f""" | |
| ## Code Dependency Network | |
| An interactive visualization of code dependencies has been created as a separate file: | |
| `{os.path.basename(network_file)}` | |
| Open this file in a web browser to explore the code dependency network. | |
| """ | |
| dependencies_components.append(vzm.Card(text=network_note)) | |
| except Exception as e: | |
| print(f"Error creating dependency network: {e}") | |
| # Add dependencies page if we have components | |
| if dependencies_components: | |
| pages.append( | |
| vzm.Page( | |
| title="Dependencies", | |
| components=dependencies_components | |
| ) | |
| ) | |
| # Create the dashboard | |
| dashboard = vzm.Dashboard( | |
| title=f"GitHub Repository Analysis: {repo_name}", | |
| pages=pages | |
| ) | |
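| # Vizro dashboards are normally served as an app via Vizro().build(dashboard).run(); static HTML | |
| # export depends on the installed Vizro version, hence the try/except fallback below | |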
| # Export dashboard | |
| dashboard_path = os.path.join(output_dir, "dashboard.html") | |
| try: | |
| dashboard.save(dashboard_path) | |
| print(f"Vizro dashboard saved to {dashboard_path}") | |
| return dashboard_path | |
| except Exception as e: | |
| print(f"Error saving dashboard: {e}") | |
| return None | |
| # Create Gradio interface | |
| def create_gradio_interface(): | |
| """Create a Gradio interface for the GitHub repository analyzer.""" | |
| def analyze_repository(owner, repo, github_token=None, gemini_api_key=None): | |
| """Function to analyze a repository and return a Vizro dashboard.""" | |
| try: | |
| analyzer = RepoAnalyzer( | |
| github_token=github_token if github_token else None, | |
| gemini_api_key=gemini_api_key if gemini_api_key else None | |
| ) | |
| # Analyze repository | |
| success = analyzer.analyze_repo(owner, repo) | |
| if not success: | |
| return None, None, f"Failed to analyze repository: {owner}/{repo}. Check the repository name and your GitHub token." | |
| # Create Vizro dashboard | |
| dashboard_path = analyzer.create_vizro_dashboard(output_dir='./vizro_dashboard') | |
| # Create dependency network visualization | |
| network_path = analyzer.create_dependency_network_html(output_file='./vizro_dashboard/dependency_network.html') | |
| # Generate a simple report | |
| basic_info = analyzer.repo_data["basic_info"] | |
| report = f""" | |
| ### Repository Analysis: {basic_info['full_name']} | |
| **Description:** {basic_info.get('description') or 'No description'} | |
| **Statistics:** | |
| - Stars: {basic_info['stargazers_count']} | |
| - Forks: {basic_info['forks_count']} | |
| - Open Issues: {basic_info['open_issues_count']} | |
| **Interactive Dashboard:** | |
| The full interactive Vizro dashboard has been created at: `{dashboard_path}` | |
| **Dependency Network:** | |
| The interactive dependency network visualization has been created at: `{network_path}` | |
| **Language Summary:** | |
| """ | |
| # Add language info | |
| if analyzer.repo_data.get("languages"): | |
| langs = analyzer.repo_data["languages"] | |
| total = sum(langs.values()) | |
| for lang, bytes_count in sorted(langs.items(), key=lambda x: x[1], reverse=True): | |
| percentage = (bytes_count / total) * 100 | |
| report += f"- {lang}: {percentage:.1f}%\n" | |
| # Add code metrics if available | |
| if analyzer.repo_data.get("text_content", {}).get("aggregate_metrics"): | |
| metrics = analyzer.repo_data["text_content"]["aggregate_metrics"] | |
| report += f""" | |
| **Code Metrics:** | |
| - Total Files Analyzed: {metrics.get('total_files', 'N/A')} | |
| - Total Code Lines: {metrics.get('total_code_lines', 'N/A')} | |
| - Comment Ratio: {metrics.get('average_comment_ratio', 'N/A'):.2f} | |
| """ | |
| return dashboard_path, network_path, report | |
| except Exception as e: | |
| return None, None, f"Error analyzing repository: {str(e)}" | |
| def summarize_pr(owner, repo, pr_number, role, github_token=None, gemini_api_key=None): | |
| """Function to summarize a PR for Gradio.""" | |
| try: | |
| analyzer = RepoAnalyzer( | |
| github_token=github_token if github_token else None, | |
| gemini_api_key=gemini_api_key if gemini_api_key else None | |
| ) | |
| # Run the repository analysis first so the PR can be summarized with repository context | |
| success = analyzer.analyze_repo(owner, repo) | |
| if not success: | |
| return f"Failed to analyze repository: {owner}/{repo}. Check the repository name and your GitHub token." | |
| # Summarize the PR | |
| summary = analyzer.summarize_pull_request(int(pr_number), role) | |
| return summary | |
| except Exception as e: | |
| return f"Error summarizing PR: {str(e)}" | |
| def view_dashboard(dashboard_path): | |
| """Load dashboard content for the iframe.""" | |
| try: | |
| if not dashboard_path or not os.path.exists(dashboard_path): | |
| return "Dashboard file not found" | |
| with open(dashboard_path, 'r', encoding='utf-8') as f: | |
| html_content = f.read() | |
| return html_content | |
| except Exception as e: | |
| return f"Error loading dashboard: {str(e)}" | |
| def view_network(network_path): | |
| """Load network visualization content for the iframe.""" | |
| try: | |
| if not network_path or not os.path.exists(network_path): | |
| return "Network visualization file not found" | |
| with open(network_path, 'r', encoding='utf-8') as f: | |
| html_content = f.read() | |
| return html_content | |
| except Exception as e: | |
| return f"Error loading network visualization: {str(e)}" | |
| # UI Components | |
| with gr.Blocks(title="GitHub Repository Analyzer") as app: | |
| gr.Markdown("# GitHub Repository Analyzer with Vizro and PyVis") | |
| gr.Markdown("Analyze GitHub repositories, visualize code dependencies, and summarize pull requests") | |
| with gr.Tab("Repository Analysis"): | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| owner_input = gr.Textbox(label="Repository Owner (Username/Organization)") | |
| repo_input = gr.Textbox(label="Repository Name") | |
| github_token = gr.Textbox(label="GitHub Token (Optional)", type="password") | |
| gemini_api_key = gr.Textbox(label="Google API Key (Optional, for PR Summaries)", type="password") | |
| analyze_btn = gr.Button("Analyze Repository") | |
| with gr.Column(scale=2): | |
| report_output = gr.Markdown(label="Analysis Report") | |
| # Store paths but don't display them | |
| dashboard_path_state = gr.State() | |
| network_path_state = gr.State() | |
| with gr.Tabs(): | |
| with gr.TabItem("Dashboard"): | |
| # gr.HTML takes no height parameter here; the embedded page controls its own height | |
| dashboard_frame = gr.HTML(label="Dashboard Preview") | |
| with gr.TabItem("Dependency Network"): | |
| # Same as above: no height parameter on gr.HTML | |
| network_frame = gr.HTML(label="Dependency Network") | |
| # Chain the preview updates after the analysis so the freshly stored paths are loaded into the HTML tabs | |
| analyze_btn.click( | |
| analyze_repository, | |
| inputs=[ | |
| owner_input, repo_input, github_token, gemini_api_key | |
| ], | |
| outputs=[dashboard_path_state, network_path_state, report_output] | |
| ).then( | |
| view_dashboard, | |
| inputs=[dashboard_path_state], | |
| outputs=[dashboard_frame] | |
| ).then( | |
| view_network, | |
| inputs=[network_path_state], | |
| outputs=[network_frame] | |
| ) | |
| with gr.Tab("PR Summarizer"): | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| pr_owner_input = gr.Textbox(label="Repository Owner") | |
| pr_repo_input = gr.Textbox(label="Repository Name") | |
| pr_number_input = gr.Number(label="PR Number", precision=0) | |
| pr_role_input = gr.Dropdown( | |
| choices=["Developer", "Manager", "Team Lead", "Product Owner", "Program Manager", "General"], | |
| label="Your Role", | |
| value="Developer" | |
| ) | |
| pr_github_token = gr.Textbox(label="GitHub Token (Optional)", type="password") | |
| pr_gemini_api_key = gr.Textbox(label="Google API Key (Required for Gemini)", type="password") | |
| summarize_btn = gr.Button("Summarize PR") | |
| with gr.Column(scale=2): | |
| pr_summary_output = gr.Markdown(label="PR Summary") | |
| summarize_btn.click( | |
| summarize_pr, | |
| inputs=[ | |
| pr_owner_input, pr_repo_input, pr_number_input, | |
| pr_role_input, pr_github_token, pr_gemini_api_key | |
| ], | |
| outputs=pr_summary_output | |
| ) | |
| return app | |
| # Main function to run the app | |
| def main(): | |
| """Run the GitHub Repository Analyzer with Gradio interface.""" | |
| # Load environment variables | |
| load_dotenv() | |
| # Create and launch the Gradio interface | |
| app = create_gradio_interface() | |
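| # share=True requests a temporary public Gradio link; it can be dropped for purely local runs | |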
| app.launch(share=True) | |
| if __name__ == "__main__": | |
| main() |