| """ | |
| URL Frontier implementation for web crawler | |
| The URL Frontier maintains URLs to be crawled with two main goals: | |
| 1. Prioritization - Important URLs are crawled first | |
| 2. Politeness - Avoid overloading web servers with too many requests | |
| """ | |
| import time | |
| import logging | |
| import heapq | |
| import pickle | |
| import threading | |
| import random | |
| from typing import Dict, List, Tuple, Optional, Any, Set | |
| from collections import deque | |
| import redis | |
| from redis.exceptions import RedisError | |
| import mmh3 | |
| import os | |
| import json | |
| from models import URL, Priority, URLStatus | |
| import config | |
# Import local configuration if available
try:
    import local_config

    # Override config settings with local settings
    for key in dir(local_config):
        if key.isupper():
            setattr(config, key, getattr(local_config, key))
    logging.info("Loaded local configuration")
except ImportError:
    pass

# Configure logging
logging.basicConfig(
    level=getattr(logging, config.LOG_LEVEL),
    format=config.LOG_FORMAT
)
logger = logging.getLogger(__name__)
class URLFrontier:
    """
    URL Frontier implementation with prioritization and politeness

    Architecture:
    - Front queues: priority-based queues
    - Back queues: host-based queues for politeness

    Uses Redis for persistent storage to handle a large number of URLs
    and enable distributed crawling. For lightweight deployments and
    testing it can also use in-memory storage.
    """
    def __init__(self, redis_client: Optional[redis.Redis] = None, use_memory: bool = False):
        """Initialize the URL Frontier"""
        self.use_memory = use_memory

        if use_memory:
            # Initialize in-memory storage
            self.memory_storage = {
                'seen_urls': set(),
                'priority_queues': [[] for _ in range(config.PRIORITY_QUEUE_NUM)],
                'host_queues': [[] for _ in range(config.HOST_QUEUE_NUM)]
            }
        else:
            # Use Redis
            self.redis = redis_client or redis.from_url(config.REDIS_URI)

        self.priority_count = config.PRIORITY_QUEUE_NUM  # Number of priority queues
        self.host_count = config.HOST_QUEUE_NUM          # Number of host queues
        self.url_seen_key = "webcrawler:url_seen"        # Seen-URL set / Bloom filter key
        self.priority_queue_key_prefix = "webcrawler:priority_queue:"
        self.host_queue_key_prefix = "webcrawler:host_queue:"
        self.lock = threading.RLock()  # Thread-safe operations

        # Simple mode uses a Redis set instead of a Bloom filter
        self.use_simple_mode = getattr(config, 'USE_SIMPLE_URL_SEEN', False)
        logger.info(f"URLFrontier using simple mode: {self.use_simple_mode}")

        # Ensure the directory for checkpoints exists
        if not os.path.exists(config.STORAGE_PATH):
            os.makedirs(config.STORAGE_PATH)

        # Initialize URL seen storage
        if not self.use_memory:
            self._init_url_seen()
    def _init_url_seen(self):
        """Initialize URL seen storage based on configuration"""
        try:
            # If using simple mode, just use a Redis set
            if self.use_simple_mode:
                if not self.redis.exists(self.url_seen_key):
                    logger.info("Initializing URL seen set")
                    self.redis.sadd(self.url_seen_key, "initialized")
                return

            # Try to use a Bloom filter (requires the RedisBloom module)
            if not self.redis.exists(self.url_seen_key):
                logger.info("Initializing URL seen bloom filter")
                try:
                    # Reserve a Bloom filter for 100 million items at a 0.01
                    # false-positive rate; this needs roughly 119.5 MB of memory
                    self.redis.execute_command("BF.RESERVE", self.url_seen_key, 0.01, 100000000)
                except RedisError as e:
                    logger.error(f"Failed to initialize bloom filter: {e}")
                    logger.info("Falling back to simple set for URL seen detection")
                    self.use_simple_mode = True
                    # Initialize a set instead
                    if not self.redis.exists(self.url_seen_key):
                        self.redis.sadd(self.url_seen_key, "initialized")
        except RedisError as e:
            logger.error(f"Error initializing URL seen: {e}")
            # Fall back to a set if the Bloom filter is not available
            self.use_simple_mode = True
            if not self.redis.exists(self.url_seen_key):
                self.redis.sadd(self.url_seen_key, "initialized")
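
    # Sizing note for the BF.RESERVE call above: a Bloom filter for n items at
    # false-positive rate p needs m = -n * ln(p) / (ln 2)^2 bits. With
    # n = 100_000_000 and p = 0.01:
    #   m ≈ 1e8 * 4.605 / 0.4805 ≈ 9.59e8 bits ≈ 1.2e8 bytes,
    # which is the ~119.5 MB quoted in the comment above.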
    def add_url(self, url_obj: URL) -> bool:
        """Add a URL to the frontier"""
        with self.lock:
            url = url_obj.url

            # Check if URL has been seen
            if self.use_memory:
                if url in self.memory_storage['seen_urls']:
                    return False
                self.memory_storage['seen_urls'].add(url)
            else:
                if self.use_simple_mode:
                    if self.redis.sismember(self.url_seen_key, url):
                        return False
                    self.redis.sadd(self.url_seen_key, url)
                else:
                    if self._check_url_seen(url):
                        return False
                    self._mark_url_seen(url)

            # Add to priority queue
            priority_index = url_obj.priority.value % self.priority_count
            if self.use_memory:
                self.memory_storage['priority_queues'][priority_index].append(url_obj)
            else:
                priority_key = f"{self.priority_queue_key_prefix}{priority_index}"
                self.redis.rpush(priority_key, url_obj.json())

            return True
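
    # Queue selection is a plain modulus over the Priority enum value. For
    # illustration only (hypothetical values): with PRIORITY_QUEUE_NUM = 4,
    # a priority value of 1 maps to queue 1 % 4 = 1
    # ("webcrawler:priority_queue:1"), and a value of 6 wraps to 6 % 4 = 2.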
    def get_next_url(self) -> Optional[URL]:
        """Get the next URL to crawl"""
        with self.lock:
            # Scan queues from highest priority (index 0) to lowest
            for i in range(self.priority_count):
                if self.use_memory:
                    queue = self.memory_storage['priority_queues'][i]
                    if queue:
                        return queue.pop(0)
                else:
                    priority_key = f"{self.priority_queue_key_prefix}{i}"
                    url_data = self.redis.lpop(priority_key)
                    if url_data:
                        return URL.parse_raw(url_data)
            return None
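
    # Design note: the scan above is strict-priority -- queue 0 is always
    # drained before queue 1 is consulted, so low-priority URLs can starve
    # for as long as higher-priority queues keep receiving work.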
    def _check_url_seen(self, url: str) -> bool:
        """Check if URL has been seen"""
        if self.use_memory:
            return url in self.memory_storage['seen_urls']
        elif self.use_simple_mode:
            return bool(self.redis.sismember(self.url_seen_key, url))
        else:
            # The key was reserved with BF.RESERVE, so it must be probed with
            # the RedisBloom commands (GETBIT/SETBIT fail on a bloom-filter key)
            return bool(self.redis.execute_command("BF.EXISTS", self.url_seen_key, url))

    def _mark_url_seen(self, url: str) -> None:
        """Mark URL as seen"""
        if self.use_memory:
            self.memory_storage['seen_urls'].add(url)
        elif self.use_simple_mode:
            self.redis.sadd(self.url_seen_key, url)
        else:
            # BF.ADD hashes the URL server-side into the reserved Bloom filter;
            # Python's built-in hash() is process-random and must not be persisted
            self.redis.execute_command("BF.ADD", self.url_seen_key, url)
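
    # If the RedisBloom module is not available at all, a client-side Bloom
    # filter over a plain Redis bitmap is one possible substitute. The two
    # methods below are a hypothetical sketch (not wired into the class): k
    # probe positions are derived from k mmh3 seeds over an m-bit bitmap, and
    # the key name "webcrawler:url_seen_bits" is illustrative only.
    _BLOOM_BITS = 1 << 30  # m: bitmap size in bits (2**30 bits = 128 MiB)
    _BLOOM_HASHES = 7      # k: probes per URL (k ≈ -log2(p), so 7 for p ≈ 0.01)

    def _bloom_add(self, url: str) -> None:
        """Set the k probe bits for `url` (hypothetical client-side filter)."""
        pipe = self.redis.pipeline()
        for seed in range(self._BLOOM_HASHES):
            pipe.setbit("webcrawler:url_seen_bits",
                        mmh3.hash(url, seed, signed=False) % self._BLOOM_BITS, 1)
        pipe.execute()

    def _bloom_check(self, url: str) -> bool:
        """True iff all k probe bits are set (false positives are possible)."""
        pipe = self.redis.pipeline()
        for seed in range(self._BLOOM_HASHES):
            pipe.getbit("webcrawler:url_seen_bits",
                        mmh3.hash(url, seed, signed=False) % self._BLOOM_BITS)
        return all(pipe.execute())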
    def size(self) -> int:
        """Get the total size of all queues"""
        if self.use_memory:
            return sum(len(q) for q in self.memory_storage['priority_queues'])
        else:
            total = 0
            for i in range(self.priority_count):
                priority_key = f"{self.priority_queue_key_prefix}{i}"
                total += self.redis.llen(priority_key)
            return total
    def get_stats(self) -> Dict[str, Any]:
        """Get frontier statistics"""
        stats = {
            "size": self.size(),
            "priority_queues": {},
            "host_queues": {},
        }
        if self.use_memory:
            # In-memory mode has no Redis to query
            for i, queue in enumerate(self.memory_storage['priority_queues']):
                stats["priority_queues"][f"priority_{i}"] = len(queue)
            stats["urls_seen"] = len(self.memory_storage['seen_urls'])
            return stats
        try:
            # Get priority queue stats (queues are indexed 0..priority_count-1)
            for priority in range(self.priority_count):
                queue_key = f"{self.priority_queue_key_prefix}{priority}"
                stats["priority_queues"][f"priority_{priority}"] = self.redis.llen(queue_key)

            # Get host queue stats (just count host queues with items)
            host_queue_count = 0
            for host_id in range(self.host_count):
                queue_key = f"{self.host_queue_key_prefix}{host_id}"
                if self.redis.llen(queue_key) > 0:
                    host_queue_count += 1
            stats["host_queues"]["count_with_items"] = host_queue_count

            # Add URLs seen count if using simple mode
            if self.use_simple_mode:
                stats["urls_seen"] = self.redis.scard(self.url_seen_key)

            return stats
        except RedisError as e:
            logger.error(f"Error getting frontier stats: {e}")
            return stats
    def checkpoint(self) -> bool:
        """Save frontier state"""
        if self.use_memory:
            # No need to checkpoint in-memory storage
            return True
        try:
            # Save priority queues. LRANGE copies each queue without draining
            # it, so a crash mid-checkpoint cannot lose queued URLs.
            for i in range(self.priority_count):
                priority_key = f"{self.priority_queue_key_prefix}{i}"
                queue_data = [
                    item.decode('utf-8') if isinstance(item, bytes) else item
                    for item in self.redis.lrange(priority_key, 0, -1)
                ]
                # Save to file (items must be str: json cannot serialize bytes)
                checkpoint_file = os.path.join(config.STORAGE_PATH, f"priority_queue_{i}.json")
                with open(checkpoint_file, 'w') as f:
                    json.dump(queue_data, f)
            return True
        except Exception as e:
            logger.error(f"Error creating frontier checkpoint: {e}")
            return False
    def restore(self) -> bool:
        """Restore frontier state"""
        if self.use_memory:
            # No need to restore in-memory storage
            return True
        try:
            # Restore priority queues
            for i in range(self.priority_count):
                checkpoint_file = os.path.join(config.STORAGE_PATH, f"priority_queue_{i}.json")
                if os.path.exists(checkpoint_file):
                    with open(checkpoint_file, 'r') as f:
                        queue_data = json.load(f)

                    # Clear the existing queue, then push entries back in order
                    priority_key = f"{self.priority_queue_key_prefix}{i}"
                    self.redis.delete(priority_key)
                    for url_data in queue_data:
                        self.redis.rpush(priority_key, url_data)
            return True
        except Exception as e:
            logger.error(f"Error restoring frontier checkpoint: {e}")
            return False
    def clear(self) -> bool:
        """
        Clear all queues in the frontier

        Returns:
            bool: True if successful, False otherwise
        """
        if self.use_memory:
            self.memory_storage['seen_urls'].clear()
            for queue in self.memory_storage['priority_queues']:
                queue.clear()
            for queue in self.memory_storage['host_queues']:
                queue.clear()
            logger.info("Frontier cleared")
            return True
        try:
            # Delete all queue keys (queues are indexed 0..count-1)
            keys = []
            for priority in range(self.priority_count):
                keys.append(f"{self.priority_queue_key_prefix}{priority}")
            for host_id in range(self.host_count):
                keys.append(f"{self.host_queue_key_prefix}{host_id}")
            if keys:
                self.redis.delete(*keys)

            # Reset the URL seen filter as well
            self.redis.delete(self.url_seen_key)

            logger.info("Frontier cleared")
            return True
        except Exception as e:
            logger.error(f"Error clearing frontier: {e}")
            return False
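
# Minimal smoke test. This is a sketch: it assumes `models.URL` is a
# pydantic-style model accepting `url` and `priority` keyword arguments and
# that `Priority` has a HIGH member; in-memory mode avoids needing a live
# Redis instance.
if __name__ == "__main__":
    frontier = URLFrontier(use_memory=True)
    first = frontier.add_url(URL(url="https://example.com/", priority=Priority.HIGH))
    dup = frontier.add_url(URL(url="https://example.com/", priority=Priority.HIGH))
    print(f"first add: {first}, duplicate add: {dup}, size: {frontier.size()}")
    print(f"next URL: {frontier.get_next_url()}")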