import asyncio
import logging
from contextlib import asynccontextmanager
from typing import Optional

import httpx
import uvicorn
from fastapi import APIRouter, FastAPI
from playwright.async_api import async_playwright, Browser, BrowserContext, Page
from pydantic import BaseModel, Field

from scrap import PatentScrapBulkResponse, scrap_patent_async, scrap_patent_bulk_async
from serp import SerpQuery, SerpResults, query_arxiv, query_bing_search, query_brave_search, query_ddg_search, query_google_patents, query_google_scholar
from utils import log_gathered_exceptions
logging.basicConfig(
    level=logging.INFO,
    format='[%(asctime)s][%(levelname)s][%(filename)s:%(lineno)d]: %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
# Playwright global context
playwright = None
pw_browser: Optional[Browser] = None

# httpx client
httpx_client = httpx.AsyncClient(timeout=30, limits=httpx.Limits(
    max_connections=30, max_keepalive_connections=20))
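# A single module-level AsyncClient is shared by all endpoints so HTTP connections
# are pooled and kept alive across requests instead of being re-opened per call.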
@asynccontextmanager
async def api_lifespan(app: FastAPI):
    """Starts Playwright and a headless Chromium browser for the app's lifetime, and tears them down on shutdown."""
    global playwright, pw_browser
    playwright = await async_playwright().start()
    pw_browser = await playwright.chromium.launch(headless=True)

    yield

    await pw_browser.close()
    await playwright.stop()
app = FastAPI(lifespan=api_lifespan, docs_url="/",
              title="SERPent", description=open("docs/docs.md").read())

# Router for scraping-related endpoints
scrap_router = APIRouter(prefix="/scrap", tags=["scrapping"])
# Router for SERP-scraping related endpoints
serp_router = APIRouter(prefix="/serp", tags=["serp scrapping"])

# ===================== Search endpoints =====================
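# The endpoints below all follow the same pattern: fan the request out over
# params.queries concurrently, drop the queries that raised, and flatten the rest.
# `error` is only set when every query failed (it then carries the last exception).
# Route paths mirror the handler function names.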
@serp_router.post("/search_google_scholar")
async def search_google_scholar(params: SerpQuery) -> SerpResults:
    """Queries Google Scholar for the specified queries."""
    logging.info(f"Searching Google Scholar for queries: {params.queries}")
    results = await asyncio.gather(*[query_google_scholar(pw_browser, q, params.n_results) for q in params.queries], return_exceptions=True)
    log_gathered_exceptions(results, "google scholar search", params)

    # Filter out exceptions and flatten the results
    filtered_results = [r for r in results if not isinstance(r, Exception)]
    flattened_results = [
        item for sublist in filtered_results for item in sublist]

    # All queries failed, return the last exception
    if len(filtered_results) == 0:
        return SerpResults(results=[], error=str(results[-1]))

    return SerpResults(results=flattened_results, error=None)
@serp_router.post("/search_arxiv")
async def search_arxiv(params: SerpQuery) -> SerpResults:
    """Searches arXiv for the specified queries and returns the found documents."""
    logging.info(f"Searching Arxiv for queries: {params.queries}")
    results = await asyncio.gather(*[query_arxiv(httpx_client, q, params.n_results) for q in params.queries], return_exceptions=True)
    log_gathered_exceptions(results, "arxiv search", params)

    # Filter out exceptions and flatten the results
    filtered_results = [r for r in results if not isinstance(r, Exception)]
    flattened_results = [
        item for sublist in filtered_results for item in sublist]

    # All queries failed, return the last exception
    if len(filtered_results) == 0:
        return SerpResults(results=[], error=str(results[-1]))

    return SerpResults(results=flattened_results, error=None)
@serp_router.post("/search_patents")
async def search_patents(params: SerpQuery) -> SerpResults:
    """Searches Google Patents for the specified queries and returns the found documents."""
    logging.info(f"Searching Google Patents for queries: {params.queries}")
    results = await asyncio.gather(*[query_google_patents(pw_browser, q, params.n_results) for q in params.queries], return_exceptions=True)
    log_gathered_exceptions(results, "google patent search", params)

    # Filter out exceptions and flatten the results
    filtered_results = [r for r in results if not isinstance(r, Exception)]
    flattened_results = [
        item for sublist in filtered_results for item in sublist]

    # All queries failed, return the last exception
    if len(filtered_results) == 0:
        return SerpResults(results=[], error=str(results[-1]))

    return SerpResults(results=flattened_results, error=None)
@serp_router.post("/search_brave")
async def search_brave(params: SerpQuery) -> SerpResults:
    """Searches Brave Search for the specified queries and returns the found documents."""
    logging.info(f"Searching Brave Search for queries: {params.queries}")
    results = await asyncio.gather(*[query_brave_search(pw_browser, q, params.n_results) for q in params.queries], return_exceptions=True)
    log_gathered_exceptions(results, "brave search", params)

    # Filter out exceptions and flatten the results
    filtered_results = [r for r in results if not isinstance(r, Exception)]
    flattened_results = [
        item for sublist in filtered_results for item in sublist]

    # All queries failed, return the last exception
    if len(filtered_results) == 0:
        return SerpResults(results=[], error=str(results[-1]))

    return SerpResults(results=flattened_results, error=None)
@serp_router.post("/search_bing")
async def search_bing(params: SerpQuery) -> SerpResults:
    """Searches Bing for the specified queries and returns the found documents."""
    logging.info(f"Searching Bing Search for queries: {params.queries}")
    results = await asyncio.gather(*[query_bing_search(pw_browser, q, params.n_results) for q in params.queries], return_exceptions=True)
    log_gathered_exceptions(results, "bing search", params)

    # Filter out exceptions and flatten the results
    filtered_results = [r for r in results if not isinstance(r, Exception)]
    flattened_results = [
        item for sublist in filtered_results for item in sublist]

    # All queries failed, return the last exception
    if len(filtered_results) == 0:
        return SerpResults(results=[], error=str(results[-1]))

    return SerpResults(results=flattened_results, error=None)
@serp_router.post("/search_duck")
async def search_duck(params: SerpQuery) -> SerpResults:
    """Searches DuckDuckGo for the specified queries and returns the found documents."""
    logging.info(f"Searching DuckDuckGo for queries: {params.queries}")
    results = await asyncio.gather(*[query_ddg_search(q, params.n_results) for q in params.queries], return_exceptions=True)
    log_gathered_exceptions(results, "duckduckgo search", params)

    # Filter out exceptions and flatten the results
    filtered_results = [r for r in results if not isinstance(r, Exception)]
    flattened_results = [
        item for sublist in filtered_results for item in sublist]

    # All queries failed, return the last exception
    if len(filtered_results) == 0:
        return SerpResults(results=[], error=str(results[-1]))

    return SerpResults(results=flattened_results, error=None)
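# Combined search: for each query, DuckDuckGo is tried first, then Brave, then Bing,
# falling through to the next backend whenever one raises (e.g. when rate-limited).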
@serp_router.post("/search")
async def search(params: SerpQuery) -> SerpResults:
    """Attempts to search the specified queries using ALL backends."""
    results = []

    for q in params.queries:
        try:
            logging.info(f"Querying DDG with query: `{q}`")
            res = await query_ddg_search(q, params.n_results)
            results.extend(res)
            continue
        except Exception as e:
            logging.error(f"Failed to query DDG with query `{q}`: {e}")
            logging.info("Trying with next browser backend.")

        try:
            logging.info(f"Querying Brave Search with query: `{q}`")
            res = await query_brave_search(pw_browser, q, params.n_results)
            results.extend(res)
            continue
        except Exception as e:
            logging.error(
                f"Failed to query Brave Search with query `{q}`: {e}")
            logging.info("Trying with next browser backend.")

        try:
            logging.info(f"Querying Bing with query: `{q}`")
            res = await query_bing_search(pw_browser, q, params.n_results)
            results.extend(res)
            continue
        except Exception as e:
            logging.error(f"Failed to query Bing search with query `{q}`: {e}")
            logging.info("No more backends to try for this query.")

    if len(results) == 0:
        return SerpResults(results=[], error="All backends are rate-limited.")

    return SerpResults(results=results, error=None)
# =========================== Scraping endpoints ===========================

# TODO: return a proper error response if the patent is not found or scraping fails
@scrap_router.get("/scrap_patent")
async def scrap_patent(patent_id: str):
    """Scrapes the specified patent from Google Patents."""
    try:
        patent = await scrap_patent_async(httpx_client, f"https://patents.google.com/patent/{patent_id}/en")
        return patent
    except Exception as e:
        logging.warning(f"Failed to scrap patent {patent_id}: {e}")
        return None
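# NOTE: on failure, scrap_patent currently returns None (an HTTP 200 with a null
# body) instead of an error status code; see the TODO above.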
class ScrapPatentsRequest(BaseModel):
    """Request model for scraping multiple patents."""
    patent_ids: list[str] = Field(...,
                                  description="List of patent IDs to scrap")
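# Illustrative request body: {"patent_ids": ["US9876543B2"]}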
@scrap_router.post("/scrap_patents")
async def scrap_patents(params: ScrapPatentsRequest) -> PatentScrapBulkResponse:
    """Scrapes multiple patents from Google Patents."""
    patents = await scrap_patent_bulk_async(httpx_client, params.patent_ids)
    return patents
# ===============================================================================

app.include_router(serp_router)
app.include_router(scrap_router)

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=7860)
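
# Example requests (illustrative; paths assume the routes defined above), once the
# server is running locally on port 7860:
#
#   curl -X POST http://localhost:7860/serp/search \
#        -H "Content-Type: application/json" \
#        -d '{"queries": ["solid state battery electrolyte"], "n_results": 5}'
#
#   curl -X POST http://localhost:7860/scrap/scrap_patents \
#        -H "Content-Type: application/json" \
#        -d '{"patent_ids": ["US9876543B2"]}'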