# worker.py
import os
import asyncio
import traceback
from urllib.parse import urlparse, urlunparse
from typing import List

import psutil
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode
from crawl4ai.markdown_generation_strategy import DefaultMarkdownGenerator
from crawl4ai.content_filter_strategy import PruningContentFilter

# ------------------------------
# File paths & config
# ------------------------------
__location__ = os.path.dirname(os.path.abspath(__file__))
batch = 32              # max concurrent crawls
goto_timeout = 60_000   # page timeout in milliseconds (1 minute)

# ------------------------------
# Utility functions
# ------------------------------
def normalize_url(url: str) -> str:
    """Normalize URL to avoid duplicates."""
    parsed = urlparse(url)
    return urlunparse((parsed.scheme, parsed.netloc, parsed.path, '', '', ''))
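
# Example: normalize_url("https://example.com/docs?page=2#intro")
# -> "https://example.com/docs" (params, query, and fragment are dropped,
#    so variants of the same page collapse to a single queue entry)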

def get_internal_urls(url_set: set, visited: set, result) -> set:
    """Collect unvisited internal links from a crawl result.

    Note: `.links` lives on the crawl result, not the crawler itself; this
    helper mirrors the inline link-gathering in crawl_parallel below.
    """
    internal_urls = result.links.get("internal", [])
    for link in internal_urls:
        href = link.get("href")
        if href and href.startswith("http"):
            normalized_href = normalize_url(href)
            if normalized_href not in visited:
                url_set.add(normalized_href)
    return url_set

# ------------------------------
# Core crawling function
# ------------------------------
async def crawl_parallel(urls: List[str] | str, file_path: str, max_concurrent: int = batch):
    """Crawl multiple URLs asynchronously with retries, save pages, and track failures."""
    text_pages = set()
    not_visited = set(urls if isinstance(urls, list) else [urls])
    visited = set()
    retry = set()
    failed = set()
    was_str = isinstance(urls, str)  # a single seed URL means internal links are followed
    n = 1
    os.makedirs(file_path, exist_ok=True)

    process = psutil.Process()
    peak_memory = 0

    def log_memory(prefix: str = ""):
        nonlocal peak_memory
        current_mem = process.memory_info().rss
        peak_memory = max(peak_memory, current_mem)
        print(f"{prefix} Memory: {current_mem // (1024*1024)} MB | Peak: {peak_memory // (1024*1024)} MB")

    # Browser & crawler config
    browser_config = BrowserConfig(
        headless=True,
        verbose=False,
        extra_args=["--disable-gpu", "--disable-dev-shm-usage", "--no-sandbox"],
        text_mode=True
    )
    crawl_config = CrawlerRunConfig(
        cache_mode=CacheMode.BYPASS,
        markdown_generator=DefaultMarkdownGenerator(
            content_filter=PruningContentFilter(threshold=0.6),
            options={"ignore_links": True}
        ),
        page_timeout=goto_timeout
    )

    crawler = AsyncWebCrawler(config=browser_config)
    await crawler.start()
| print("\n=== Starting robust parallel crawling ===") | |
| async def safe_crawl(url, session_id): | |
| """Crawl a URL safely, return result or None.""" | |
| try: | |
| result = await crawler.arun(url=url, config=crawl_config, session_id=session_id) | |
| return result | |
| except Exception as e: | |
| print(f"[WARN] Failed to crawl {url}: {e}") | |
| return None | |

    try:
        while not_visited:
            urls_batch = list(not_visited)[:max_concurrent]
            tasks = [safe_crawl(url, f"session_{i}") for i, url in enumerate(urls_batch)]

            log_memory(prefix=f"Before batch {n}: ")
            results = await asyncio.gather(*tasks, return_exceptions=True)
            log_memory(prefix=f"After batch {n}: ")

            for url, result in zip(urls_batch, results):
                if isinstance(result, Exception) or result is None or not getattr(result, "success", False):
                    if url not in retry:
                        # First failure: leave the URL in not_visited so the next batch retries it.
                        retry.add(url)
                        print(f"[INFO] Retry scheduled for {url}")
                    else:
                        # Second failure: give up and record the URL as failed.
                        failed.add(url)
                        not_visited.discard(url)
                        visited.add(url)
                        print(f"[ERROR] Crawling failed for {url} after retry")
                else:
                    text_pages.add(result.markdown.fit_markdown)
                    if was_str:
                        # Single-seed mode: queue any newly discovered internal links.
                        internal_urls = result.links.get("internal", [])
                        for link in internal_urls:
                            href = link.get("href")
                            if href and href.startswith("http"):
                                normalized_href = normalize_url(href)
                                if normalized_href not in visited:
                                    not_visited.add(normalized_href)
                    visited.add(url)
                    retry.discard(url)
                    not_visited.discard(url)
            n += 1
    except Exception as e:
        traceback.print_exc()
        print(e)
    finally:
        await crawler.close()

    log_memory(prefix="Final: ")

    # Save pages
    pages = [p for p in text_pages if p.strip()]
    for i, page in enumerate(pages):
        with open(os.path.join(file_path, f"page_{i+1}.txt"), "w", encoding="utf-8") as f:
            f.write(page)

    print("\nSummary:")
    print(f" - Successfully crawled pages: {len(pages)}")
    print(f" - Failed URLs: {len(failed)} -> {failed}")
    print(f"Peak memory usage: {peak_memory // (1024*1024)} MB")

    return {
        "success_count": len(pages),
        "failed_urls": list(failed),
        "peak_memory_MB": peak_memory // (1024*1024)
    }

# ------------------------------
# Public scrape function
# ------------------------------
async def scrape_website(urls: str | list, file_path: str):
    """Wrapper to start crawling and return a summary."""
    summary = await crawl_parallel(urls, file_path, max_concurrent=batch)
    return summary
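
# Minimal usage sketch: the seed URL and output directory below are placeholders,
# not values from the original module.
if __name__ == "__main__":
    summary = asyncio.run(scrape_website("https://example.com", "scraped_pages"))
    print(summary)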