| """ | |
| Data models for the web crawler | |
| """ | |
| import time | |
| import hashlib | |
| import tldextract | |
| from urllib.parse import urlparse, urljoin, urlunparse | |
| from datetime import datetime | |
| from typing import Dict, List, Any, Optional, Set, Tuple | |
| from pydantic import BaseModel, Field, HttpUrl, field_validator | |
| from enum import Enum | |
| import logging | |
| logger = logging.getLogger(__name__) | |


class URLStatus(str, Enum):
    """Status of a URL in the crawl process"""
    PENDING = "pending"  # Not yet processed
    IN_PROGRESS = "in_progress"  # Currently being processed
    COMPLETED = "completed"  # Successfully processed
    FAILED = "failed"  # Failed to process
    FILTERED = "filtered"  # Filtered out based on rules
    ROBOTSTXT_EXCLUDED = "robotstxt_excluded"  # Excluded by robots.txt


class Priority(int, Enum):
    """Priority levels for URLs"""
    VERY_HIGH = 1
    HIGH = 2
    MEDIUM = 3
    LOW = 4
    VERY_LOW = 5
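

# Illustrative sketch (not part of the original module): because Priority is an
# int-valued Enum, lower values can sort first in a min-heap crawl frontier. The
# helper below is hypothetical and only shows how the enum might drive ordering.
def _example_priority_frontier() -> None:
    """Push (priority, depth, url) tuples onto a heap and pop in priority order."""
    import heapq

    frontier: list = []
    heapq.heappush(frontier, (Priority.LOW.value, 2, "http://example.com/low"))
    heapq.heappush(frontier, (Priority.VERY_HIGH.value, 0, "http://example.com/seed"))
    heapq.heappush(frontier, (Priority.MEDIUM.value, 1, "http://example.com/medium"))

    while frontier:
        priority, depth, url = heapq.heappop(frontier)
        print(Priority(priority).name, depth, url)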


class URL(BaseModel):
    """URL model with metadata for crawling"""
    url: str
    # validate_default=True makes the validators below run for the empty defaults
    normalized_url: str = Field(default="", validate_default=True)  # Normalized version of the URL
    domain: str = Field(default="", validate_default=True)  # Domain extracted from the URL
    depth: int = 0  # Depth from seed URL
    discovered_at: datetime = Field(default_factory=datetime.now)
    last_crawled: Optional[datetime] = None
    completed_at: Optional[datetime] = None  # When the URL was completed/failed
    status: URLStatus = URLStatus.PENDING
    priority: Priority = Priority.MEDIUM
    parent_url: Optional[str] = None  # URL that led to this URL
    retries: int = 0  # Number of times retried
    error: Optional[str] = None  # Error message if failed
    metadata: Dict[str, Any] = Field(default_factory=dict)  # Additional metadata

    @field_validator("normalized_url", mode="before")
    @classmethod
    def set_normalized_url(cls, v, info: ValidationInfo):
        """Normalize the URL if not already set"""
        if not v and "url" in info.data:
            return normalize_url(info.data["url"])
        return v

    @field_validator("domain", mode="before")
    @classmethod
    def set_domain(cls, v, info: ValidationInfo):
        """Extract domain from URL if not already set"""
        if not v and "url" in info.data:
            parsed = tldextract.extract(info.data["url"])
            return f"{parsed.domain}.{parsed.suffix}" if parsed.suffix else parsed.domain
        return v

    class Config:
        arbitrary_types_allowed = True
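

# Illustrative sketch (not part of the original module): constructing a URL
# record and reading the derived fields. The concrete address is hypothetical.
def _example_url_record() -> None:
    """Show that normalized_url and domain are derived from url when omitted."""
    record = URL(
        url="HTTP://example.com:80/Docs/?b=2&a=1#intro",
        depth=1,
        parent_url="http://example.com/",
        priority=Priority.HIGH,
    )
    print(record.normalized_url)  # e.g. http://example.com/Docs?a=1&b=2
    print(record.domain)          # e.g. example.com
    print(record.status)          # URLStatus.PENDING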


class RobotsInfo(BaseModel):
    """Information from robots.txt for a domain"""
    domain: str
    allowed: bool = True  # Whether crawling is allowed
    crawl_delay: Optional[float] = None  # Crawl delay in seconds
    last_fetched: datetime = Field(default_factory=datetime.now)
    user_agents: Dict[str, Dict[str, Any]] = Field(default_factory=dict)  # Info per user agent
    status_code: Optional[int] = None  # HTTP status code when fetching robots.txt

    class Config:
        arbitrary_types_allowed = True
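

# Illustrative sketch (not part of the original module): filling a RobotsInfo
# record from an already-fetched robots.txt body with the standard-library
# parser. The user agent name and the assumed status code are hypothetical.
def _example_robots_info(domain: str, robots_txt: str, user_agent: str = "ExampleBot") -> RobotsInfo:
    """Parse a robots.txt body and summarise it for one user agent."""
    from urllib import robotparser

    parser = robotparser.RobotFileParser()
    parser.parse(robots_txt.splitlines())

    root = f"http://{domain}/"
    return RobotsInfo(
        domain=domain,
        allowed=parser.can_fetch(user_agent, root),
        crawl_delay=parser.crawl_delay(user_agent),
        user_agents={user_agent: {"can_fetch_root": parser.can_fetch(user_agent, root)}},
        status_code=200,  # assumed: the caller fetched robots.txt successfully
    )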


class Page(BaseModel):
    """Web page model with content and metadata"""
    url: str
    status_code: int
    content: str  # HTML content
    content_type: str
    content_length: int
    content_hash: str  # Hash of the content for duplicate detection
    headers: Dict[str, str] = Field(default_factory=dict)
    links: List[str] = Field(default_factory=list)  # Links extracted from the page
    crawled_at: datetime = Field(default_factory=datetime.now)
    redirect_url: Optional[str] = None  # URL after redirects
    elapsed_time: float = 0.0  # Time taken to fetch the page
    is_duplicate: bool = False  # Whether this is duplicate content
    metadata: Dict[str, Any] = Field(default_factory=dict)  # Additional metadata

    class Config:
        arbitrary_types_allowed = True
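

# Illustrative sketch (not part of the original module): building a Page from an
# already-fetched response. The header values here are hypothetical; the hash
# reuses calculate_content_hash defined at the bottom of this file.
def _example_page(url: str, html: str, elapsed: float) -> Page:
    """Wrap a fetched HTML body in a Page record."""
    return Page(
        url=url,
        status_code=200,
        content=html,
        content_type="text/html",
        content_length=len(html.encode("utf-8")),
        content_hash=calculate_content_hash(html),
        headers={"Content-Type": "text/html; charset=utf-8"},
        elapsed_time=elapsed,
    )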


class DomainStats(BaseModel):
    """Statistics for a domain"""
    domain: str
    pages_crawled: int = 0
    successful_crawls: int = 0
    failed_crawls: int = 0
    last_crawled: Optional[datetime] = None
    robots_info: Optional[RobotsInfo] = None
    crawl_times: List[float] = Field(default_factory=list)  # Recent crawl times
    errors: Dict[int, int] = Field(default_factory=dict)  # Status code counts for errors

    class Config:
        arbitrary_types_allowed = True
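

# Illustrative sketch (not part of the original module): one way a fetch result
# could be folded into DomainStats. The 500-sample cap on crawl_times is an
# assumption, not something the original models prescribe.
def _example_record_fetch(stats: DomainStats, status_code: int, elapsed: float) -> None:
    """Update per-domain counters after a single fetch."""
    stats.pages_crawled += 1
    stats.last_crawled = datetime.now()
    stats.crawl_times.append(elapsed)
    stats.crawl_times = stats.crawl_times[-500:]  # keep only recent samples

    if 200 <= status_code < 400:
        stats.successful_crawls += 1
    else:
        stats.failed_crawls += 1
        stats.errors[status_code] = stats.errors.get(status_code, 0) + 1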


def normalize_url(url: str) -> str:
    """
    Normalize a URL by:
    1. Lowercasing the host
    2. Removing fragments
    3. Removing default ports (80 for http, 443 for https)
    4. Sorting query parameters
    5. Removing trailing slashes from the path
    6. Adding a scheme if missing
    """
    try:
        # Parse URL
        parsed = urlparse(url)

        # Add scheme if missing
        if not parsed.scheme:
            url = 'http://' + url
            parsed = urlparse(url)

        # Get domain and path
        domain = parsed.netloc.lower()
        path = parsed.path

        # Remove default ports
        if ':' in domain:
            domain_parts = domain.split(':')
            if (parsed.scheme == 'http' and domain_parts[1] == '80') or \
               (parsed.scheme == 'https' and domain_parts[1] == '443'):
                domain = domain_parts[0]

        # Sort query parameters
        query = parsed.query
        if query:
            query_params = sorted(query.split('&'))
            query = '&'.join(query_params)

        # Remove trailing slashes from path
        while path.endswith('/') and len(path) > 1:
            path = path[:-1]

        # Use '/' when the path is empty
        if not path:
            path = '/'

        # Reconstruct URL (fragments are dropped here)
        normalized = f"{parsed.scheme}://{domain}{path}"
        if query:
            normalized += f"?{query}"

        logger.debug(f"Normalized URL: {url} -> {normalized}")
        return normalized
    except Exception as e:
        logger.error(f"Error normalizing URL {url}: {e}")
        return url
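

# Illustrative sketch (not part of the original module): a few hypothetical
# inputs and the outputs normalize_url produces for them, as a quick sanity check.
def _example_normalize_url() -> None:
    """Print normalizations for a handful of sample URLs."""
    samples = [
        "EXAMPLE.com/About/",               # no scheme, trailing slash
        "http://example.com:80/a?b=2&a=1",  # default port, unsorted query
        "https://example.com/page#section", # fragment
    ]
    for sample in samples:
        print(sample, "->", normalize_url(sample))
    # Expected, given the rules above:
    #   http://example.com/About
    #   http://example.com/a?a=1&b=2
    #   https://example.com/page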


def calculate_content_hash(content: str) -> str:
    """Calculate hash of content for duplicate detection"""
    return hashlib.md5(content.encode('utf-8')).hexdigest()
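

# Illustrative sketch (not part of the original module): detecting duplicate
# pages by remembering content hashes that have already been seen.
def _example_mark_duplicates(pages: List[Page]) -> None:
    """Flag pages whose content hash was already seen earlier in the list."""
    seen_hashes: Set[str] = set()
    for page in pages:
        if page.content_hash in seen_hashes:
            page.is_duplicate = True
        else:
            seen_hashes.add(page.content_hash)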