from contextlib import asynccontextmanager
from typing import Optional
from duckduckgo_search import DDGS
import httpx
from pydantic import BaseModel, Field
from playwright.async_api import Browser, BrowserContext, Page, TimeoutError
from urllib.parse import quote_plus
import logging
import re
from lxml import etree
from asyncio import Semaphore

# Concurrency limit for Playwright browser contexts.
# This prevents too many concurrent browser contexts from being created.
PLAYWRIGHT_CONCURRENCY_LIMIT = 10


class SerpQuery(BaseModel):
    queries: list[str] = Field(...,
                               description="The list of queries to search for")
    n_results: int = Field(
        10, description="Number of results to return for each query. Valid values are 10, 25, 50 and 100")


class SerpResults(BaseModel):
    """Model for SERP scraping results."""
    error: Optional[str] = None
    results: Optional[list[dict]] = None
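
# Illustrative shape (hypothetical usage, not from the original module): a
# successful scrape might be wrapped as
#     SerpResults(error=None, results=[{"title": ..., "body": ..., "href": ...}])
# while a failed query would carry the message in `error` and leave `results` as None.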


class BraveSearchBlockedException(Exception):
    """Raised when Brave Search flags the headless browser as suspicious and blocks the request."""

    def __init__(self, *args):
        super().__init__(
            "Brave Search blocked the request, likely due to flagging browser as suspicious")


_PLAYWRIGHT_CONCURRENCY_SEMAPHORE = Semaphore(PLAYWRIGHT_CONCURRENCY_LIMIT)


@asynccontextmanager
async def playwright_open_page(browser: Browser):
    """Async context manager yielding a fresh Playwright page in its own browser context."""
    async with _PLAYWRIGHT_CONCURRENCY_SEMAPHORE:
        context: BrowserContext = await browser.new_context()
        page: Page = await context.new_page()
        try:
            yield page
        finally:
            await page.close()
            await context.close()
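
# Minimal usage sketch (assumes a Chromium `browser` launched elsewhere, e.g.
# through `async_playwright`); each acquisition gets an isolated context:
#
#     async with playwright_open_page(browser) as page:
#         await page.goto("https://example.com")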


async def query_google_scholar(browser: Browser, q: str, n_results: int = 10):
    """Queries Google Scholar for the specified query and number of results. Returns relevant papers."""
    async with playwright_open_page(browser) as page:
        async def _block_resources(route, request):
            if request.resource_type in ["stylesheet", "image"]:
                await route.abort()
            else:
                await route.continue_()

        await page.route("**/*", _block_resources)

        url = f"https://scholar.google.com/scholar?q={quote_plus(q)}&num={n_results}"
        await page.goto(url)
        await page.wait_for_selector("div.gs_ri")

        items = await page.locator("div.gs_ri").all()
        results = []
        for item in items[:n_results]:
            title = await item.locator("h3").inner_text(timeout=1000)
            body = await item.locator("div.gs_rs").inner_text(timeout=1000)
            href = await item.locator("h3 > a").get_attribute("href")
            results.append({
                "title": title,
                "body": body,
                "href": href
            })
        return results


async def query_google_patents(browser: Browser, q: str, n_results: int = 10):
    """Queries Google Patents for the specified query and number of results. Returns relevant patents."""
    # Regex to locate a patent id, e.g. "US1234567A1".
    PATENT_ID_REGEX = r"\b[A-Z]{2}\d{6,}(?:[A-Z]\d?)?\b"

    async with playwright_open_page(browser) as page:
        async def _block_resources(route, request):
            if request.resource_type in ["stylesheet", "image"]:
                await route.abort()
            else:
                await route.continue_()

        await page.route("**/*", _block_resources)

        url = f"https://patents.google.com/?q={quote_plus(q)}&num={n_results}"
        await page.goto(url)

        # Wait for at least one search result item to appear.
        # This ensures the page has loaded enough to start scraping.
        await page.wait_for_function(
            """() => document.querySelectorAll('search-result-item').length >= 1""",
            timeout=30_000
        )

        items = await page.locator("search-result-item").all()
        results = []
        for item in items:
            text = " ".join(await item.locator("span").all_inner_texts())
            match = re.search(PATENT_ID_REGEX, text)
            if not match:
                continue
            patent_id = match.group()

            try:
                title = await item.locator("h3, h4").first.inner_text(timeout=1000)
                body = await item.locator("div.abstract, div.result-snippet, .snippet, .result-text").first.inner_text(timeout=1000)
            except TimeoutError:
                continue  # If we can't get title or body, skip this item.

            results.append({
                "id": patent_id,
                "href": f"https://patents.google.com/patent/{patent_id}/en",
                "title": title,
                "body": body
            })
        return results[:n_results]


async def query_brave_search(browser: Browser, q: str, n_results: int = 10):
    """Queries Brave Search for the specified query."""
    async with playwright_open_page(browser) as page:
        async def _block_resources(route, request):
            if request.resource_type in ["stylesheet", "image"]:
                await route.abort()
            else:
                await route.continue_()

        await page.route("**/*", _block_resources)

        url = f"https://search.brave.com/search?q={quote_plus(q)}"
        await page.goto(url)

        results_cards = await page.locator('.snippet').all()
        if len(results_cards) == 0:
            page_content = await page.content()
            if "suspicious" in page_content:
                raise BraveSearchBlockedException()

        results = []
        for result in results_cards:
            title = await result.locator('.title').all_inner_texts()
            description = await result.locator('.snippet-description').all_inner_texts()
            href = await result.locator('a').nth(0).get_attribute('href')

            # Filter out results with no URL or Brave-internal relative URLs.
            if href is None or href.startswith('/'):
                continue

            results.append({
                "title": title[0] if title else "",
                "body": description[0] if description else "",
                "href": href
            })

            if len(results) >= n_results:
                break
        return results


async def query_bing_search(browser: Browser, q: str, n_results: int = 10):
    """Queries Bing Search for the specified query."""
    async with playwright_open_page(browser) as page:
        async def _block_resources(route, request):
            if request.resource_type in ["stylesheet", "image"]:
                await route.abort()
            else:
                await route.continue_()

        await page.route("**/*", _block_resources)

        url = f"https://www.bing.com/search?q={quote_plus(q)}"
        await page.goto(url)
        await page.wait_for_selector("li.b_algo")

        results = []
        items = await page.query_selector_all("li.b_algo")
        for item in items[:n_results]:
            title_el = await item.query_selector("h2 > a")
            href = await title_el.get_attribute("href") if title_el else None
            title = await title_el.inner_text() if title_el else ""

            snippet = ""
            # Try several fallback selectors, most specific first.
            for selector in [
                "div.b_caption p",   # typical snippet
                "div.b_caption",     # sometimes snippet is here
                "div.b_snippet",     # used in some result types
                "div.b_text",        # used in some panels
                "p"                  # fallback to any paragraph
            ]:
                snippet_el = await item.query_selector(selector)
                if snippet_el:
                    snippet = await snippet_el.inner_text()
                    if snippet.strip():
                        break

            if title and href:
                results.append({
                    "title": title.strip(),
                    "href": href.strip(),
                    "body": snippet.strip()
                })
        return results


async def query_ddg_search(q: str, n_results: int = 10):
    """Queries DuckDuckGo search for the specified query."""
    ddgs = DDGS()
    results = []
    # Note: DDGS.text() is a synchronous call, unlike the Playwright-backed queries above.
    for result in ddgs.text(q, max_results=n_results):
        results.append(
            {"title": result["title"], "body": result["body"], "href": result["href"]})
    return results


async def query_arxiv(client: httpx.AsyncClient, query: str, max_results: int = 3):
    """Searches arXiv for the specified query and returns a list of results with titles and PDF URLs."""
    ATOM_NAMESPACE = {'atom': 'http://www.w3.org/2005/Atom'}
    ARXIV_API_URL = 'https://export.arxiv.org/api/query'

    search_params = {
        'search_query': query,
        'start': 0,
        'max_results': max_results
    }

    response = await client.get(ARXIV_API_URL, params=search_params)
    response.raise_for_status()

    root = etree.fromstring(response.content)
    entries = root.findall('atom:entry', ATOM_NAMESPACE)

    results = []
    for entry in entries:
        title = entry.find(
            'atom:title', ATOM_NAMESPACE).text.strip().replace('\n', ' ')
        entry_id = entry.find('atom:id', ATOM_NAMESPACE).text.strip()
        # arXiv abstract URLs can be rewritten into direct PDF links.
        pdf_url = entry_id.replace('/abs/', '/pdf/')
        summary = entry.find(
            'atom:summary', ATOM_NAMESPACE).text.strip()
        results.append({'title': title, 'href': pdf_url,
                        'body': summary, 'id': entry_id})
    return results
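

# A minimal driver sketch, not part of the original module: it assumes a local
# Chromium install for Playwright and prints results for a hypothetical query.
if __name__ == "__main__":
    import asyncio
    from playwright.async_api import async_playwright

    async def _demo():
        query = "retrieval augmented generation"

        # DuckDuckGo needs no browser.
        print("DuckDuckGo:", await query_ddg_search(query, n_results=3))

        # arXiv goes through a plain httpx client.
        async with httpx.AsyncClient() as client:
            print("arXiv:", await query_arxiv(client, query, max_results=3))

        # The Playwright-backed engines share one launched browser.
        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True)
            print("Bing:", await query_bing_search(browser, query, n_results=3))
            await browser.close()

    asyncio.run(_demo())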