| """ | |
| Robots.txt handler for web crawler | |
| """ | |
import time
import logging
import requests
from urllib.parse import urlparse, urljoin
from typing import Dict, Optional, Tuple
import tldextract
from datetime import datetime, timedelta
from cachetools import TTLCache
import robotexclusionrulesparser
from models import RobotsInfo
import config
# Import local configuration if available
try:
    import local_config

    # Override config settings with local settings
    for key in dir(local_config):
        if key.isupper():
            setattr(config, key, getattr(local_config, key))
    logging.info("Loaded local configuration")
except ImportError:
    pass
# Configure logging
logging.basicConfig(
    level=getattr(logging, config.LOG_LEVEL),
    format=config.LOG_FORMAT
)
logger = logging.getLogger(__name__)
class RobotsHandler:
    """Handles robots.txt fetching and parsing"""

    def __init__(self, user_agent: Optional[str] = None, cache_size: int = 1000, cache_ttl: int = 3600):
        """
        Initialize robots handler

        Args:
            user_agent: User agent to use when fetching robots.txt
            cache_size: Maximum number of robots.txt rules to cache
            cache_ttl: Time to live for cache entries in seconds
        """
        self.user_agent = user_agent or config.USER_AGENT

        # Caches of robots.txt info and of parsed rule sets, keyed by domain.
        # A parser is kept per domain so that rules fetched for one site are
        # never applied to another.
        self.robots_cache = TTLCache(maxsize=cache_size, ttl=cache_ttl)
        self.parser_cache = TTLCache(maxsize=cache_size, ttl=cache_ttl)

        # Create request session
        self.session = requests.Session()
        self.session.headers.update({'User-Agent': self.user_agent})
    def can_fetch(self, url: str) -> Tuple[bool, Optional[float]]:
        """
        Check if URL can be fetched according to robots.txt

        Args:
            url: URL to check

        Returns:
            Tuple of (can_fetch, crawl_delay), where crawl_delay is in seconds
        """
        try:
            parsed = urlparse(url)
            base_url = f"{parsed.scheme}://{parsed.netloc}"
            domain = self._get_domain(url)

            # Get robots info (from cache if available) and the matching parser
            robots_info = self._get_robots_info(base_url, domain)
            parser = self.parser_cache.get(domain)

            # Check whether the path is allowed for our user agent
            path = parsed.path or "/"
            allowed = robots_info.allowed
            if allowed and parser is not None:
                allowed = parser.is_allowed(self.user_agent, path)

            # Get crawl delay, falling back to the parser if not cached
            crawl_delay = robots_info.crawl_delay
            if not crawl_delay and parser is not None and hasattr(parser, 'get_crawl_delay'):
                try:
                    crawl_delay = float(parser.get_crawl_delay(self.user_agent) or 0)
                except (TypeError, ValueError):
                    crawl_delay = 0

            return allowed, crawl_delay
        except Exception as e:
            logger.warning(f"Error checking robots.txt for {url}: {e}")
            # In case of error, assume allowed
            return True, None
    def _get_robots_info(self, base_url: str, domain: str) -> RobotsInfo:
        """
        Get robots.txt info for a domain

        Args:
            base_url: Base URL of the domain
            domain: Domain name

        Returns:
            RobotsInfo object
        """
        # Return cached info if available
        if domain in self.robots_cache:
            return self.robots_cache[domain]

        # Fetch robots.txt with a fresh parser for this domain
        robots_url = urljoin(base_url, "/robots.txt")
        parser = robotexclusionrulesparser.RobotExclusionRulesParser()
        try:
            response = self.session.get(
                robots_url,
                timeout=config.CRAWL_TIMEOUT,
                allow_redirects=True
            )
            status_code = response.status_code

            if status_code == 200:
                # Parse robots.txt
                parser.parse(response.text)

                # Store info only for our own user agent
                crawl_delay = None
                if hasattr(parser, 'get_crawl_delay'):
                    try:
                        crawl_delay = parser.get_crawl_delay(self.user_agent)
                    except Exception:
                        crawl_delay = None
                user_agents = {
                    self.user_agent: {
                        'crawl_delay': crawl_delay
                    }
                }

                robots_info = RobotsInfo(
                    domain=domain,
                    allowed=True,
                    crawl_delay=crawl_delay,
                    last_fetched=datetime.now(),
                    user_agents=user_agents,
                    status_code=status_code
                )
            else:
                # No robots.txt (or an error status): assume everything is allowed
                parser.parse("")  # Empty rule set allows all paths
                robots_info = RobotsInfo(
                    domain=domain,
                    allowed=True,
                    crawl_delay=None,
                    last_fetched=datetime.now(),
                    user_agents={},
                    status_code=status_code
                )

            # Cache parsed rules and robots info
            self.parser_cache[domain] = parser
            self.robots_cache[domain] = robots_info
            return robots_info
        except requests.RequestException as e:
            logger.warning(f"Error fetching robots.txt from {robots_url}: {e}")

            # In case of error, assume allowed
            parser.parse("")  # Empty rule set allows all paths
            robots_info = RobotsInfo(
                domain=domain,
                allowed=True,
                crawl_delay=None,
                last_fetched=datetime.now(),
                user_agents={},
                status_code=None
            )

            # Cache parsed rules and robots info
            self.parser_cache[domain] = parser
            self.robots_cache[domain] = robots_info
            return robots_info
    def _get_domain(self, url: str) -> str:
        """Extract domain from URL"""
        parsed = tldextract.extract(url)
        return f"{parsed.domain}.{parsed.suffix}" if parsed.suffix else parsed.domain
    def clear_cache(self) -> None:
        """Clear the robots.txt caches"""
        self.robots_cache.clear()
        self.parser_cache.clear()
    def update_cache(self, domain: str) -> None:
        """
        Invalidate a domain's cached robots.txt so it is re-fetched on next access

        Args:
            domain: Domain to update
        """
        if domain in self.robots_cache:
            del self.robots_cache[domain]
        if domain in self.parser_cache:
            del self.parser_cache[domain]
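
# Usage sketch (not part of the original module): a minimal, hedged example of
# how the handler might be exercised. It assumes config provides USER_AGENT,
# CRAWL_TIMEOUT, LOG_LEVEL and LOG_FORMAT, that models.RobotsInfo accepts the
# fields used above, and that the target URL is reachable from this environment.
if __name__ == "__main__":
    handler = RobotsHandler(cache_ttl=1800)
    example_url = "https://www.python.org/about/"  # arbitrary example URL
    allowed, delay = handler.can_fetch(example_url)
    print(f"Allowed: {allowed}, crawl delay: {delay}")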