Build a static INFORMATION WEBSITE (documentation site). Do NOT build a dashboard, simulator, or fake metrics.
9f4c671
verified
| // Quantum Code Weaver Engine - Documentation Site JavaScript | |
| // ================================ | |
| // COMPLETE PYTHON PROJECT SOURCE CODE | |
| // ================================ | |
| const FILES = { | |
| "README.md": `# Quantum Code Weaver Engine | |
| A distributed, graph-based coding engine transforming software development from conversation to state-space search. | |
| ## Architecture | |
| - **Orchestrator (The Brain)**: Local LLM orchestrator responsible for verification, scoring, and routing. | |
| - **Workers (The Muscle)**: Cloud LLM workers for high-volume, parallel code generation. | |
| - **Redis Queues (Nervous System)**: Pub/Sub queue implementing the Hot Event Loop. | |
| - **Neo4j Graph (Long-Term Memory)**: Graph database storing the Verification Graph. | |
| ## Key Principles | |
| 1. **Context Economy**: Orchestrator consumes summarized state from candidate payload. | |
| 2. **Verification Graph**: Only successful states (score ≥ threshold) are stored in Neo4j. | |
| 3. **Async Decoupling**: Redis queues separate orchestrator from workers. | |
| ## Quick Start | |
| 1. Install dependencies: \`pip install -r requirements.txt\` | |
| 2. Start Redis and Neo4j. | |
| 3. Run: \`python src/main.py\` | |
| ## License | |
| MIT | |
| `, | |
| "requirements.txt": `redis>=4.5.4 | |
| neo4j>=5.14.0 | |
| pydantic>=2.5.0 | |
| asyncio>=3.4.3 | |
| aioredis>=2.0.1 | |
| python-dotenv>=1.0.0 | |
| `, | |
| "src/orchestrator.py": `""" | |
| Async orchestrator that never blocks waiting for workers. | |
| Consumes summarized_state from candidate payload (context economy). | |
| """ | |
| import asyncio | |
| import json | |
| import logging | |
| from dataclasses import asdict | |
| from typing import AsyncGenerator, Dict, Any, List | |
| import hashlib | |
| from redis.asyncio import Redis | |
| from neo4j import GraphDatabase | |
| from .models import TaskSpec, CandidateResult, SummarizedState | |
| from .config import settings | |
| logger = logging.getLogger(__name__) | |
| class Orchestrator: | |
| """Main orchestrator class. | |
| Responsible for: | |
| 1. Polling queue:results for CandidateResult | |
| 2. Evaluating each candidate (deterministic scoring) | |
| 3. Updating Neo4j graph if score ≥ threshold | |
| """ | |
| def __init__( | |
| self, | |
| redis_host: str = settings.REDIS_HOST, | |
| redis_port: int = settings.REDIS_PORT, | |
| neo4j_uri: str = settings.NEO4J_URI, | |
| neo4j_user: str = settings.NEO4J_USER, | |
| neo4j_password: str = settings.NEO4J_PASSWORD, | |
| threshold_score: float = settings.THRESHOLD_SCORE, | |
| ): | |
| """Initialize orchestrator with Redis and Neo4j connections.""" | |
| self.redis_client = Redis(host=redis_host, port=redis_port) | |
| self.neo4j_driver = GraphDatabase.driver( | |
| neo4j_uri, | |
| auth=(neo4j_user, neo4j_password) | |
| ) | |
| self.threshold_score = threshold_score | |
| self.is_running = False | |
| logger.info(f"Orchestrator initialized with threshold {threshold_score}") | |
| async def poll_results(self) -> AsyncGenerator[CandidateResult, None]: | |
| """Async generator consuming queue:results. | |
| Yields: | |
| CandidateResult: parsed from JSON in Redis list. | |
| Note: | |
| Uses RPOP (right pop) from queue:results. | |
| Never blocks indefinitely; uses short sleep. | |
| """ | |
| while self.is_running: | |
| try: | |
| # RPOP from queue:results | |
| result_bytes = await self.redis_client.rpop("queue:results") | |
| if result_bytes is None: | |
| # No results, sleep briefly | |
| await asyncio.sleep(0.01) | |
| continue | |
| result_dict = json.loads(result_bytes) | |
| candidate = CandidateResult(**result_dict) | |
| logger.debug(f"Received candidate {candidate.candidate_id[:8]}...") | |
| yield candidate | |
| except json.JSONDecodeError as e: | |
| logger.error(f"Failed to decode JSON: {e}") | |
| except Exception as e: | |
| logger.error(f"Error polling results: {e}") | |
| await asyncio.sleep(0.1) | |
| def evaluate_candidate(self, candidate: CandidateResult) -> float: | |
| """Score candidate 0..1. | |
| Deterministic placeholder based on input length and hash. | |
| In production, replace with LLM verification. | |
| Args: | |
| candidate: CandidateResult with code and summarized_state. | |
| Returns: | |
| float: score between 0 and 1. | |
| """ | |
| # Deterministic scoring based on candidate hash and length | |
| content = candidate.code + str(candidate.summarized_state.dict()) | |
| content_hash = hashlib.md5(content.encode()).hexdigest() | |
| # Use first 4 bytes of hash as integer | |
| hash_int = int(content_hash[:8], 16) | |
| # Combine with length factor | |
| length_factor = min(len(candidate.code) / 1000, 1.0) # normalize by 1k chars | |
| # Produce deterministic float 0..1 | |
| score = ((hash_int % 10000) / 10000) * 0.5 + length_factor * 0.5 | |
| score = min(max(score, 0.0), 1.0) | |
| logger.debug(f"Score for {candidate.candidate_id[:8]}: {score:.3f}") | |
| return score | |
| async def update_graph(self, candidate: CandidateResult, score: float) -> None: | |
| """Commit to Neo4j only if score >= threshold. | |
| Creates node for candidate and edges from parent states. | |
| Args: | |
| candidate: CandidateResult. | |
| score: float from evaluate_candidate. | |
| """ | |
| if score < self.threshold_score: | |
| logger.info(f"Candidate {candidate.candidate_id[:8]} score {score:.3f} < threshold {self.threshold_score}, skipping graph update.") | |
| return | |
| try: | |
| async with self.neo4j_driver.session() as session: | |
| # Create candidate node | |
| query = """ | |
| MERGE (c:Candidate {id: $candidate_id}) | |
| SET c.code = $code, | |
| c.score = $score, | |
| c.timestamp = datetime(), | |
| c.parents = $parent_ids | |
| """ | |
| params = { | |
| "candidate_id": candidate.candidate_id, | |
| "code": candidate.code, | |
| "score": score, | |
| "parent_ids": [p.state_id for p in candidate.summarized_state.parent_states] | |
| } | |
| await session.run(query, **params) | |
| # Create edges from parent states | |
| for parent in candidate.summarized_state.parent_states: | |
| edge_query = """ | |
| MATCH (p:State {id: $parent_id}) | |
| MATCH (c:Candidate {id: $candidate_id}) | |
| MERGE (p)-[:DERIVES]->(c) | |
| """ | |
| await session.run(edge_query, parent_id=parent.state_id, candidate_id=candidate.candidate_id) | |
| logger.info(f"Graph updated with candidate {candidate.candidate_id[:8]} (score {score:.3f})") | |
| except Exception as e: | |
| logger.error(f"Failed to update graph for {candidate.candidate_id}: {e}") | |
| async def run(self) -> None: | |
| """Main orchestrator loop.""" | |
| self.is_running = True | |
| logger.info("Orchestrator started.") | |
| try: | |
| async for candidate in self.poll_results(): | |
| score = self.evaluate_candidate(candidate) | |
| await self.update_graph(candidate, score) | |
| except asyncio.CancelledError: | |
| logger.info("Orchestrator shutting down.") | |
| finally: | |
| self.is_running = False | |
| await self.redis_client.close() | |
| await self.neo4j_driver.close() | |
| `, | |
| "src/models.py": `""" | |
| Data models (dataclasses) for Quantum Code Weaver Engine. | |
| """ | |
| from dataclasses import dataclass, field | |
| from typing import List, Optional, Dict, Any | |
| from datetime import datetime | |
| import uuid | |
| @dataclass | |
| class SummarizedState: | |
| """Summarized state passed to orchestrator (context economy). | |
| The orchestrator must not query Neo4j to build context; | |
| it receives this summarized state in the candidate payload. | |
| """ | |
| state_id: str | |
| parent_states: List['StateRef'] = field(default_factory=list) | |
| metadata: Dict[str, Any] = field(default_factory=dict) | |
| def dict(self) -> Dict[str, Any]: | |
| return { | |
| "state_id": self.state_id, | |
| "parent_states": [p.dict() for p in self.parent_states], | |
| "metadata": self.metadata | |
| } | |
| @dataclass | |
| class StateRef: | |
| """Reference to a parent state in the graph.""" | |
| state_id: str | |
| score: float | |
| def dict(self) -> Dict[str, Any]: | |
| return {"state_id": self.state_id, "score": self.score} | |
| @dataclass | |
| class TaskSpec: | |
| """Specification for a worker task. | |
| Published to queue:tasks. | |
| """ | |
| task_id: str = field(default_factory=lambda: str(uuid.uuid4())) | |
| summarized_state: SummarizedState = field(default_factory=lambda: SummarizedState(state_id="root")) | |
| max_tokens: int = 2048 | |
| temperature: float = 0.7 | |
| constraints: List[str] = field(default_factory=list) | |
| def dict(self) -> Dict[str, Any]: | |
| return { | |
| "task_id": self.task_id, | |
| "summarized_state": self.summarized_state.dict(), | |
| "max_tokens": self.max_tokens, | |
| "temperature": self.temperature, | |
| "constraints": self.constraints | |
| } | |
| @classmethod | |
| def from_dict(cls, data: Dict[str, Any]) -> 'TaskSpec': | |
| """Create TaskSpec from dictionary.""" | |
| summarized_state = SummarizedState( | |
| state_id=data["summarized_state"]["state_id"], | |
| parent_states=[ | |
| StateRef(**p) for p in data["summarized_state"].get("parent_states", []) | |
| ], | |
| metadata=data["summarized_state"].get("metadata", {}) | |
| ) | |
| return cls( | |
| task_id=data.get("task_id", str(uuid.uuid4())), | |
| summarized_state=summarized_state, | |
| max_tokens=data.get("max_tokens", 2048), | |
| temperature=data.get("temperature", 0.7), | |
| constraints=data.get("constraints", []) | |
| ) | |
| @dataclass | |
| class CandidateResult: | |
| """Result from a worker (candidate code). | |
| Published to queue:results. | |
| """ | |
| candidate_id: str = field(default_factory=lambda: str(uuid.uuid4())) | |
| task_id: str | |
| code: str | |
| summarized_state: SummarizedState | |
| metadata: Dict[str, Any] = field(default_factory=dict) | |
| timestamp: datetime = field(default_factory=datetime.utcnow) | |
| def dict(self) -> Dict[str, Any]: | |
| return { | |
| "candidate_id": self.candidate_id, | |
| "task_id": self.task_id, | |
| "code": self.code, | |
| "summarized_state": self.summarized_state.dict(), | |
| "metadata": self.metadata, | |
| "timestamp": self.timestamp.isoformat() | |
| } | |
| @classmethod | |
| def from_dict(cls, data: Dict[str, Any]) -> 'CandidateResult': | |
| """Create CandidateResult from dictionary.""" | |
| summarized_state = SummarizedState( | |
| state_id=data["summarized_state"]["state_id"], | |
| parent_states=[ | |
| StateRef(**p) for p in data["summarized_state"].get("parent_states", []) | |
| ], | |
| metadata=data["summarized_state"].get("metadata", {}) | |
| ) | |
| return cls( | |
| candidate_id=data.get("candidate_id", str(uuid.uuid4())), | |
| task_id=data["task_id"], | |
| code=data["code"], | |
| summarized_state=summarized_state, | |
| metadata=data.get("metadata", {}), | |
| timestamp=datetime.fromisoformat(data.get("timestamp", datetime.utcnow().isoformat())) | |
| ) | |
| `, | |
| "src/storage_redis.py": `""" | |
| Redis storage and queue operations. | |
| """ | |
| import json | |
| import logging | |
| from typing import Optional, Dict, Any | |
| from redis.asyncio import Redis | |
| from .models import TaskSpec, CandidateResult | |
| from .config import settings | |
| logger = logging.getLogger(__name__) | |
| class RedisStorage: | |
| """Redis client wrapper for queue operations.""" | |
| def __init__( | |
| self, | |
| host: str = settings.REDIS_HOST, | |
| port: int = settings.REDIS_PORT, | |
| db: int = 0, | |
| ): | |
| self.client = Redis(host=host, port=port, db=db, decode_responses=False) | |
| async def push_task(self, task: TaskSpec) -> None: | |
| """Push TaskSpec to queue:tasks. | |
| Args: | |
| task: TaskSpec to enqueue. | |
| """ | |
| try: | |
| data = json.dumps(task.dict()).encode('utf-8') | |
| await self.client.lpush("queue:tasks", data) | |
| logger.debug(f"Task {task.task_id[:8]} pushed to queue:tasks") | |
| except Exception as e: | |
| logger.error(f"Failed to push task {task.task_id}: {e}") | |
| async def pop_task(self) -> Optional[TaskSpec]: | |
| """Pop TaskSpec from queue:tasks (RPOP). | |
| Returns: | |
| TaskSpec or None if queue empty. | |
| """ | |
| try: | |
| data = await self.client.rpop("queue:tasks") | |
| if data is None: | |
| return None | |
| task_dict = json.loads(data.decode('utf-8')) | |
| return TaskSpec.from_dict(task_dict) | |
| except Exception as e: | |
| logger.error(f"Failed to pop task: {e}") | |
| return None | |
| async def push_result(self, result: CandidateResult) -> None: | |
| """Push CandidateResult to queue:results. | |
| Args: | |
| result: CandidateResult to enqueue. | |
| """ | |
| try: | |
| data = json.dumps(result.dict()).encode('utf-8') | |
| await self.client.lpush("queue:results", data) | |
| logger.debug(f"Result {result.candidate_id[:8]} pushed to queue:results") | |
| except Exception as e: | |
| logger.error(f"Failed to push result {result.candidate_id}: {e}") | |
| async def pop_result(self) -> Optional[CandidateResult]: | |
| """Pop CandidateResult from queue:results (RPOP). | |
| Returns: | |
| CandidateResult or None if queue empty. | |
| """ | |
| try: | |
| data = await self.client.rpop("queue:results") | |
| if data is None: | |
| return None | |
| result_dict = json.loads(data.decode('utf-8')) | |
| return CandidateResult.from_dict(result_dict) | |
| except Exception as e: | |
| logger.error(f"Failed to pop result: {e}") | |
| return None | |
| async def get_queue_lengths(self) -> Dict[str, int]: | |
| """Get lengths of both queues. | |
| Returns: | |
| Dict with keys 'tasks' and 'results'. | |
| """ | |
| try: | |
| tasks_len = await self.client.llen("queue:tasks") | |
| results_len = await self.client.llen("queue:results") | |
| return {"tasks": tasks_len, "results": results_len} | |
| except Exception as e: | |
| logger.error(f"Failed to get queue lengths: {e}") | |
| return {"tasks": 0, "results": 0} | |
| async def close(self) -> None: | |
| """Close Redis connection.""" | |
| await self.client.close() | |
| logger.debug("Redis connection closed") | |
| `, | |
| "src/storage_neo4j.py": `""" | |
| Neo4j graph storage operations. | |
| """ | |
| import logging | |
| from typing import List, Dict, Any, Optional | |
| from neo4j import GraphDatabase, AsyncGraphDatabase | |
| from .models import SummarizedState, CandidateResult | |
| from .config import settings | |
| logger = logging.getLogger(__name__) | |
| class Neo4jStorage: | |
| """Neo4j client wrapper for graph operations.""" | |
| def __init__( | |
| self, | |
| uri: str = settings.NEO4J_URI, | |
| user: str = settings.NEO4J_USER, | |
| password: str = settings.NEO4J_PASSWORD, | |
| ): | |
| self.driver = GraphDatabase.driver(uri, auth=(user, password)) | |
| self.async_driver = AsyncGraphDatabase.driver(uri, auth=(user, password)) | |
| def create_state_node(self, state: SummarizedState, score: float) -> None: | |
| """Create a State node in Neo4j. | |
| Args: | |
| state: SummarizedState. | |
| score: verification score. | |
| """ | |
| with self.driver.session() as session: | |
| query = """ | |
| MERGE (s:State {id: $state_id}) | |
| SET s.score = $score, | |
| s.metadata = $metadata, | |
| s.created = datetime() | |
| """ | |
| session.run( | |
| query, | |
| state_id=state.state_id, | |
| score=score, | |
| metadata=state.metadata | |
| ) | |
| logger.debug(f"State node {state.state_id[:8]} created/updated") | |
| def create_candidate_node(self, candidate: CandidateResult, score: float) -> None: | |
| """Create a Candidate node in Neo4j. | |
| Args: | |
| candidate: CandidateResult. | |
| score: verification score. | |
| """ | |
| with self.driver.session() as session: | |
| query = """ | |
| MERGE (c:Candidate {id: $candidate_id}) | |
| SET c.code = $code, | |
| c.score = $score, | |
| c.task_id = $task_id, | |
| c.timestamp = datetime(), | |
| c.metadata = $metadata | |
| """ | |
| session.run( | |
| query, | |
| candidate_id=candidate.candidate_id, | |
| code=candidate.code, | |
| score=score, | |
| task_id=candidate.task_id, | |
| metadata=candidate.metadata | |
| ) | |
| logger.debug(f"Candidate node {candidate.candidate_id[:8]} created/updated") | |
| def create_derivation_edge( | |
| self, | |
| from_state_id: str, | |
| to_candidate_id: str, | |
| edge_type: str = "DERIVES" | |
| ) -> None: | |
| """Create edge from State to Candidate. | |
| Args: | |
| from_state_id: source State ID. | |
| to_candidate_id: target Candidate ID. | |
| edge_type: relationship type. | |
| """ | |
| with self.driver.session() as session: | |
| query = f""" | |
| MATCH (s:State {{id: $from_state_id}}) | |
| MATCH (c:Candidate {{id: $to_candidate_id}}) | |
| MERGE (s)-[:{edge_type}]->(c) | |
| """ | |
| session.run( | |
| query, | |
| from_state_id=from_state_id, | |
| to_candidate_id=to_candidate_id | |
| ) | |
| logger.debug(f"Edge {from_state_id[:8]} -> {to_candidate_id[:8]} created") | |
| def get_state(self, state_id: str) -> Optional[Dict[str, Any]]: | |
| """Retrieve a State node by ID. | |
| Args: | |
| state_id: State ID. | |
| Returns: | |
| State properties dict or None. | |
| """ | |
| with self.driver.session() as session: | |
| query = """ | |
| MATCH (s:State {id: $state_id}) | |
| RETURN properties(s) as props | |
| """ | |
| result = session.run(query, state_id=state_id) | |
| record = result.single() | |
| return record["props"] if record else None | |
| def get_candidate(self, candidate_id: str) -> Optional[Dict[str, Any]]: | |
| """Retrieve a Candidate node by ID. | |
| Args: | |
| candidate_id: Candidate ID. | |
| Returns: | |
| Candidate properties dict or None. | |
| """ | |
| with self.driver.session() as session: | |
| query = """ | |
| MATCH (c:Candidate {id: $candidate_id}) | |
| RETURN properties(c) as props | |
| """ | |
| result = session.run(query, candidate_id=candidate_id) | |
| record = result.single() | |
| return record["props"] if record else None | |
| def get_verification_graph_stats(self) -> Dict[str, Any]: | |
| """Get statistics about the verification graph. | |
| Returns: | |
| Dict with counts of nodes, edges, etc. | |
| """ | |
| with self.driver.session() as session: | |
| query = """ | |
| MATCH (s:State) | |
| WITH count(s) as state_count | |
| MATCH (c:Candidate) | |
| WITH state_count, count(c) as candidate_count | |
| MATCH ()-[r]->() | |
| RETURN state_count, candidate_count, count(r) as edge_count | |
| """ | |
| result = session.run(query) | |
| record = result.single() | |
| if record: | |
| return { | |
| "state_count": record["state_count"], | |
| "candidate_count": record["candidate_count"], | |
| "edge_count": record["edge_count"] | |
| } | |
| return {"state_count": 0, "candidate_count": 0, "edge_count": 0} | |
| async def close(self) -> None: | |
| """Close Neo4j driver.""" | |
| await self.driver.close() | |
| await self.async_driver.close() | |
| logger.debug("Neo4j connection closed") | |
| `, | |
| "src/config.py": `""" | |
| Configuration settings for Quantum Code Weaver Engine. | |
| """ | |
| import os | |
| from typing import Optional | |
| from pydantic import BaseSettings | |
| class Settings(BaseSettings): | |
| """Application settings loaded from environment or .env file.""" | |
| # Redis | |
| REDIS_HOST: str = "localhost" | |
| REDIS_PORT: int = 6379 | |
| REDIS_DB: int = 0 | |
| # Neo4j | |
| NEO4J_URI: str = "bolt://localhost:7687" | |
| NEO4J_USER: str = "neo4j" | |
| NEO4J_PASSWORD: str = "password" | |
| # Orchestrator | |
| THRESHOLD_SCORE: float = 0.8 | |
| POLL_INTERVAL: float = 0.01 # seconds | |
| # Worker | |
| WORKER_COUNT: int = 5 | |
| MAX_TOKENS: int = 2048 | |
| TEMPERATURE: float = 0.7 | |
| # Logging | |
| LOG_LEVEL: str = "INFO" | |
| LOG_FORMAT: str = "%(asctime)s - %(name)s - %(levelname)s - %(message)s" | |
| class Config: | |
| env_file = ".env" | |
| env_file_encoding = "utf-8" | |
| # Global settings instance | |
| settings = Settings() | |
| `, | |
| "src/main.py": `""" | |
| Main entry point for Quantum Code Weaver Engine. | |
| """ | |
| import asyncio | |
| import logging | |
| import signal | |
| import sys | |
| from contextlib import asynccontextmanager | |
| from .orchestrator import Orchestrator | |
| from .config import settings | |
| # Configure logging | |
| logging.basicConfig( | |
| level=settings.LOG_LEVEL, | |
| format=settings.LOG_FORMAT, | |
| handlers=[ | |
| logging.StreamHandler(sys.stdout), | |
| logging.FileHandler("qcwe.log") | |
| ] | |
| ) | |
| logger = logging.getLogger(__name__) | |
| @asynccontextmanager | |
| async def lifecycle(orchestrator: Orchestrator): | |
| """Context manager for orchestrator lifecycle. | |
| Handles graceful shutdown. | |
| """ | |
| try: | |
| # Start orchestrator task | |
| task = asyncio.create_task(orchestrator.run()) | |
| logger.info("Orchestrator task started.") | |
| yield | |
| finally: | |
| # Cancel task and wait | |
| task.cancel() | |
| try: | |
| await task | |
| except asyncio.CancelledError: | |
| pass | |
| logger.info("Orchestrator shutdown complete.") | |
| def handle_shutdown(signum, frame): | |
| """Signal handler for graceful shutdown.""" | |
| logger.info(f"Received signal {signum}, initiating shutdown...") | |
| # The asyncio event loop will be stopped by the lifecycle context manager | |
| sys.exit(0) | |
| async def main(): | |
| """Main async entry point.""" | |
| logger.info("Starting Quantum Code Weaver Engine") | |
| logger.info(f"Threshold score: {settings.THRESHOLD_SCORE}") | |
| logger.info(f"Redis: {settings.REDIS_HOST}:{settings.REDIS_PORT}") | |
| logger.info(f"Neo4j: {settings.NEO4J_URI}") | |
| # Set up signal handlers | |
| signal.signal(signal.SIGINT, handle_shutdown) | |
| signal.signal(signal.SIGTERM, handle_shutdown) | |
| # Create orchestrator | |
| orchestrator = Orchestrator( | |
| redis_host=settings.REDIS_HOST, | |
| redis_port=settings.REDIS_PORT, | |
| neo4j_uri=settings.NEO4J_URI, | |
| neo4j_user=settings.NEO4J_USER, | |
| neo4j_password=settings.NEO4J_PASSWORD, | |
| threshold_score=settings.THRESHOLD_SCORE | |
| ) | |
| # Run with lifecycle management | |
| async with lifecycle(orchestrator): | |
| # Keep the main loop alive | |
| while True: | |
| await asyncio.sleep(1) | |
| if __name__ == "__main__": | |
| asyncio.run(main()) | |
| `, | |
| "src/__init__.py": `""" | |
| Quantum Code Weaver Engine package. | |
| """ | |
| __version__ = "1.0.0" | |
| ` | |
| }; | |
| // ================================ | |
| // DOCUMENTATION SITE FUNCTIONALITY | |
| // ================================ | |
| // Initialize when DOM is ready | |
| function initDocumentationSite() { | |
| renderFileTree(); | |
| setupRouting(); | |
| setupCopyButton(); | |
| highlightCurrentFile(); | |
| updateFileInfo('README.md'); | |
| // Initialize syntax highlighting | |
| hljs.highlightAll(); | |
| } | |
| // Render the file tree in the sidebar | |
| function renderFileTree() { | |
| const container = document.getElementById('file-tree'); | |
| if (!container) return; | |
| container.innerHTML = ''; | |
| const fileGroups = {}; | |
| // Group files by directory | |
| Object.keys(FILES).forEach(path => { | |
| const parts = path.split('/'); | |
| if (parts.length === 1) { | |
| if (!fileGroups['root']) fileGroups['root'] = []; | |
| fileGroups['root'].push(path); | |
| } else { | |
| const dir = parts[0]; | |
| if (!fileGroups[dir]) fileGroups[dir] = []; | |
| fileGroups[dir].push(path); | |
| } | |
| }); | |
| // Render groups | |
| Object.keys(fileGroups).sort().forEach(dir => { | |
| if (dir !== 'root') { | |
| const folderItem = document.createElement('div'); | |
| folderItem.className = 'file-tree-item mb-1'; | |
| folderItem.innerHTML = ` | |
| <i data-feather="folder" class="file-tree-icon"></i> | |
| <span class="font-medium">${dir}</span> | |
| <span class="ml-auto text-gray-500 text-xs">${fileGroups[dir].length}</span> | |
| `; | |
| container.appendChild(folderItem); | |
| } | |
| fileGroups[dir].sort().forEach(filePath => { | |
| const item = document.createElement('div'); | |
| item.className = 'file-tree-item pl-8'; | |
| item.dataset.path = filePath; | |
| const icon = getFileIcon(filePath); | |
| item.innerHTML = ` | |
| <i data-feather="${icon}" class="file-tree-icon"></i> | |
| <span class="truncate">${filePath.split('/').pop()}</span> | |
| `; | |
| item.addEventListener('click', () => { | |
| openFile(filePath); | |
| }); | |
| container.appendChild(item); | |
| }); | |
| }); | |
| feather.replace(); | |
| } | |
| // Get appropriate Feather icon for file type | |
| function getFileIcon(path) { | |
| if (path.endsWith('.py')) return 'file-text'; | |
| if (path.endsWith('.md')) return 'book'; | |
| if (path.endsWith('.txt')) return 'file-text'; | |
| if (path.includes('src/')) return 'code'; | |
| return 'file'; | |
| } | |
| // Open a file and display its contents | |
| function openFile(path) { | |
| if (!FILES.hasOwnProperty(path)) { | |
| console.error(`File not found: ${path}`); | |
| return; | |
| } | |
| // Update active file in sidebar | |
| document.querySelectorAll('.file-tree-item').forEach(item => { | |
| item.classList.remove('active'); | |
| if (item.dataset.path === path) { | |
| item.classList.add('active'); | |
| } | |
| }); | |
| // Update breadcrumb and title | |
| document.getElementById('breadcrumb-path').textContent = path; | |
| document.getElementById('file-title').textContent = path.split('/').pop(); | |
| document.getElementById('file-path-display').textContent = path; | |
| // Update code content | |
| const codeElement = document.getElementById('code-content'); | |
| codeElement.textContent = FILES[path]; | |
| // Update syntax highlighting | |
| const language = getLanguageClass(path); | |
| codeElement.className = `language-${language} hljs`; | |
| hljs.highlightElement(codeElement); | |
| // Update file info | |
| updateFileInfo(path); | |
| // Update URL hash | |
| window.location.hash = `#/file/${encodeURIComponent(path)}`; | |
| } | |
| // Determine language class for syntax highlighting | |
| function getLanguageClass(path) { | |
| if (path.endsWith('.py')) return 'python'; | |
| if (path.endsWith('.md')) return 'markdown'; | |
| if (path.endsWith('.txt')) return 'plaintext'; | |
| if (path.endsWith('requirements.txt')) return 'bash'; | |
| return 'python'; | |
| } | |
| // Update file info panel (lines, size, etc.) | |
| function updateFileInfo(path) { | |
| const content = FILES[path]; | |
| const lines = content.split('\n').length; | |
| const size = new Blob([content]).size; | |
| document.getElementById('file-type').textContent = getFileType(path); | |
| document.getElementById('file-lines').textContent = lines.toLocaleString(); | |
| document.getElementById('file-size').textContent = formatFileSize(size); | |
| document.getElementById('file-updated').textContent = 'Today'; | |
| } | |
| // Get human-readable file type | |
| function getFileType(path) { | |
| if (path.endsWith('.py')) return 'Python'; | |
| if (path.endsWith('.md')) return 'Markdown'; | |
| if (path.endsWith('.txt')) return 'Text'; | |
| if (path.endsWith('requirements.txt')) return 'Requirements'; | |
| if (path.includes('__init__.py')) return 'Package'; | |
| return 'Text'; | |
| } | |
| // Format file size | |
| function formatFileSize(bytes) { | |
| if (bytes < 1024) return bytes + ' B'; | |
| if (bytes < 1024 * 1024) return (bytes / 1024).toFixed(1) + ' KB'; | |
| return (bytes / (1024 * 1024)).toFixed(1) + ' MB'; | |
| } | |
| // Setup hash-based routing | |
| function setupRouting() { | |
| // Check initial hash | |
| if (window.location.hash) { | |
| const match = window.location.hash.match(/#\/file\/(.+)/); | |
| if (match) { | |
| const path = decodeURIComponent(match[1]); | |
| if (FILES.hasOwnProperty(path)) { | |
| openFile(path); | |
| } | |
| } | |
| } else { | |
| // Default to README.md | |
| openFile('README.md'); | |
| } | |
| // Listen for hash changes | |
| window.addEventListener('hashchange', () => { | |
| const match = window.location.hash.match(/#\/file\/(.+)/); | |
| if (match) { | |
| const path = decodeURIComponent(match[1]); | |
| if (FILES.hasOwnProperty(path)) { | |
| openFile(path); | |
| } | |
| } | |
| }); | |
| } | |
| // Setup copy button functionality | |
| function setupCopyButton() { | |
| const button = document.getElementById('copy-button'); | |
| if (!button) return; | |
| button.addEventListener('click', () => { | |
| const path = document.getElementById('breadcrumb-path').textContent; | |
| const content = FILES[path]; | |
| navigator.clipboard.writeText(content).then(() => { | |
| const originalText = button.innerHTML; | |
| button.innerHTML = '<i data-feather="check" class="w-4 h-4"></i><span>Copied!</span>'; | |
| button.classList.add('copy-success'); | |
| setTimeout(() => { | |
| button.innerHTML = originalText; | |
| button.classList.remove('copy-success'); | |
| feather.replace(); | |
| }, 2000); | |
| }).catch(err => { | |
| console.error('Failed to copy: ', err); | |
| button.innerHTML = '<i data-feather="x" class="w-4 h-4"></i><span>Failed</span>'; | |
| setTimeout(() => { | |
| button.innerHTML = '<i data-feather="copy" class="w-4 h-4"></i><span>Copy File</span>'; | |
| feather.replace(); | |
| }, 2000); | |
| }); | |
| }); | |
| } | |
| // Highlight current file in sidebar | |
| function highlightCurrentFile() { | |
| const path = document.getElementById('breadcrumb-path').textContent; | |
| document.querySelectorAll('.file-tree-item').forEach(item => { | |
| item.classList.remove('active'); | |
| if (item.dataset.path === path) { | |
| item.classList.add('active'); | |
| } | |
| }); | |
| } | |
| // Expose function globally | |
| window.initDocumentationSite = initDocumentationSite; |