from __future__ import annotations from typing import Annotated, List import gradio as gr from ddgs import DDGS from app import _log_call_end, _log_call_start, _search_rate_limiter, _truncate_for_log from ._docstrings import autodoc # Single source of truth for the LLM-facing tool description TOOL_SUMMARY = ( "Run a DuckDuckGo-backed search across text, news, images, videos, or books. " "Readable results include pagination hints and next_offset when more results are available; " "Use in combination with `Web_Fetch` to navigate the web." ) _SAFESEARCH_LEVEL = "off" # Defaults and choices for newly added parameters BACKEND_CHOICES = [ "auto", "duckduckgo", "bing", "brave", "yahoo", "wikipedia", ] # Allowed backends per type (explicit selection set) _ALLOWED_BACKENDS = { "text": ["duckduckgo", "bing", "brave", "yahoo", "wikipedia"], "news": ["duckduckgo", "bing", "yahoo"], "images": ["duckduckgo"], "videos": ["duckduckgo"], "books": ["annasarchive"], } # Auto order per type (used when backend == "auto"); wikipedia excluded for text _AUTO_ORDER = { "text": ["duckduckgo", "bing", "brave", "yahoo"], "news": ["duckduckgo", "bing", "yahoo"], "images": ["duckduckgo"], "videos": ["duckduckgo"], "books": ["annasarchive"], } # Date filter choices: canonical values used by resolver DATE_FILTER_CHOICES = ["any", "day", "week", "month", "year"] def _resolve_backend(search_type: str, backend_choice: str) -> str: """Resolve backend string for DDGS based on search type and user choice. - If backend_choice is "auto", return a comma-separated fallback order for that type. - If backend_choice is not supported by the type, fall back to the first allowed backend. - Books endpoint uses only 'annasarchive'. """ stype = search_type if search_type in _ALLOWED_BACKENDS else "text" allowed = _ALLOWED_BACKENDS[stype] if backend_choice == "auto": return ", ".join(_AUTO_ORDER[stype]) if stype == "books": return "annasarchive" # Validate backend against allowed set for this type if backend_choice in allowed: return backend_choice # Fallback to first allowed backend return allowed[0] def _resolve_timelimit(date_filter: str, search_type: str) -> str | None: """Map UI date filter to DDGS timelimit code per endpoint. Returns one of: None, 'd', 'w', 'm', 'y'. For news/videos (which support d/w/m), selecting 'year' will coerce to 'm' to stay within supported range. """ normalized = (date_filter or "any").strip().lower() if normalized in ("any", "none", ""): return None mapping = { "day": "d", "week": "w", "month": "m", "year": "y", } code = mapping.get(normalized) if not code: return None if search_type in ("news", "videos") and code == "y": return "m" return code def _extract_date_from_snippet(snippet: str) -> str: if not snippet: return "" import re date_patterns = [ r"\b(\d{4}[-/]\d{1,2}[-/]\d{1,2})\b", r"\b([A-Za-z]{3,9}\s+\d{1,2},?\s+\d{4})\b", r"\b(\d{1,2}\s+[A-Za-z]{3,9}\s+\d{4})\b", r"\b(\d+\s+(?:day|week|month|year)s?\s+ago)\b", r"(?:Published|Updated|Posted):\s*([^,\n]+?)(?:[,\n]|$)", ] for pattern in date_patterns: matches = re.findall(pattern, snippet, re.IGNORECASE) if matches: return matches[0].strip() return "" def _format_search_result(result: dict, search_type: str, index: int) -> List[str]: lines: List[str] = [] if search_type == "text": title = result.get("title", "").strip() url = result.get("href", "").strip() snippet = result.get("body", "").strip() date = _extract_date_from_snippet(snippet) lines.append(f"{index}. {title}") lines.append(f" URL: {url}") if snippet: lines.append(f" Summary: {snippet}") if date: lines.append(f" Date: {date}") elif search_type == "news": title = result.get("title", "").strip() url = result.get("url", "").strip() body = result.get("body", "").strip() date = result.get("date", "").strip() source = result.get("source", "").strip() lines.append(f"{index}. {title}") lines.append(f" URL: {url}") if source: lines.append(f" Source: {source}") if date: lines.append(f" Date: {date}") if body: lines.append(f" Summary: {body}") elif search_type == "images": title = result.get("title", "").strip() image_url = result.get("image", "").strip() source_url = result.get("url", "").strip() source = result.get("source", "").strip() width = result.get("width", "") height = result.get("height", "") lines.append(f"{index}. {title}") lines.append(f" Image: {image_url}") lines.append(f" Source: {source_url}") if source: lines.append(f" Publisher: {source}") if width and height: lines.append(f" Dimensions: {width}x{height}") elif search_type == "videos": title = result.get("title", "").strip() description = result.get("description", "").strip() duration = result.get("duration", "").strip() published = result.get("published", "").strip() uploader = result.get("uploader", "").strip() embed_url = result.get("embed_url", "").strip() lines.append(f"{index}. {title}") if embed_url: lines.append(f" Video: {embed_url}") if uploader: lines.append(f" Uploader: {uploader}") if duration: lines.append(f" Duration: {duration}") if published: lines.append(f" Published: {published}") if description: lines.append(f" Description: {description}") elif search_type == "books": title = result.get("title", "").strip() url = result.get("url", "").strip() body = result.get("body", "").strip() lines.append(f"{index}. {title}") lines.append(f" URL: {url}") if body: lines.append(f" Description: {body}") return lines @autodoc( summary=TOOL_SUMMARY, ) def Web_Search( query: Annotated[str, "The search query (supports operators like site:, quotes, OR)."], max_results: Annotated[int, "Number of results to return (1β20)."] = 5, page: Annotated[int, "Page number for pagination (1-based, each page contains max_results items)."] = 1, offset: Annotated[int, "Result offset to start from (overrides page if > 0, for precise continuation)."] = 0, search_type: Annotated[str, "Type of search: 'text' (web pages), 'news', 'images', 'videos', or 'books'."] = "text", backend: Annotated[str, "Search backend or ordered fallbacks. Use 'auto' for recommended order."] = "auto", date_filter: Annotated[str, "Time filter: any, day, week, month, year."] = "any", ) -> str: _log_call_start( "Web_Search", query=query, max_results=max_results, page=page, search_type=search_type, offset=offset, backend=backend, date_filter=date_filter, ) if not query or not query.strip(): result = "No search query provided. Please enter a search term." _log_call_end("Web_Search", _truncate_for_log(result)) return result max_results = max(1, min(20, max_results)) page = max(1, page) offset = max(0, offset) valid_types = ["text", "news", "images", "videos", "books"] if search_type not in valid_types: search_type = "text" if offset > 0: actual_offset = offset calculated_page = (offset // max_results) + 1 else: actual_offset = (page - 1) * max_results calculated_page = page total_needed = actual_offset + max_results used_fallback = False original_search_type = search_type # Prepare cross-cutting parameters resolved_backend = _resolve_backend(search_type, (backend or "auto").lower()) timelimit = _resolve_timelimit(date_filter, search_type) def _perform_search(stype: str) -> list[dict]: try: _search_rate_limiter.acquire() with DDGS() as ddgs: if stype == "text": user_backend_choice = (backend or "auto").lower() if user_backend_choice == "auto": # Custom auto: DDG first, then append other engines results: list[dict] = [] seen: set[str] = set() def add_unique(items: list[dict], key_field: str) -> None: for it in items or []: url = (it.get(key_field, "") or "").strip() if url and url not in seen: seen.add(url) results.append(it) # First: duckduckgo try: ddg_items = list( ddgs.text( query, max_results=total_needed + 10, safesearch=_SAFESEARCH_LEVEL, timelimit=timelimit, backend="duckduckgo", ) ) except Exception: ddg_items = [] add_unique(ddg_items, "href") # Then: other engines appended (excluding duckduckgo) for eng in [b for b in _AUTO_ORDER["text"] if b != "duckduckgo"]: try: extra = list( ddgs.text( query, max_results=total_needed + 10, safesearch=_SAFESEARCH_LEVEL, timelimit=timelimit, backend=eng, ) ) except Exception: extra = [] add_unique(extra, "href") return results else: raw_gen = ddgs.text( query, max_results=total_needed + 10, safesearch=_SAFESEARCH_LEVEL, timelimit=timelimit, backend=resolved_backend, ) elif stype == "news": user_backend_choice = (backend or "auto").lower() if user_backend_choice == "auto": # Custom auto: DDG first, then append other engines results: list[dict] = [] seen: set[str] = set() def add_unique(items: list[dict], key_field: str) -> None: for it in items or []: url = (it.get(key_field, "") or "").strip() if url and url not in seen: seen.add(url) results.append(it) # First: duckduckgo news try: ddg_news = list( ddgs.news( query, max_results=total_needed + 10, safesearch=_SAFESEARCH_LEVEL, timelimit=timelimit, backend="duckduckgo", ) ) except Exception: ddg_news = [] add_unique(ddg_news, "url") # Then: other news engines appended for eng in [b for b in _AUTO_ORDER["news"] if b != "duckduckgo"]: try: extra = list( ddgs.news( query, max_results=total_needed + 10, safesearch=_SAFESEARCH_LEVEL, timelimit=timelimit, backend=eng, ) ) except Exception: extra = [] add_unique(extra, "url") return results else: raw_gen = ddgs.news( query, max_results=total_needed + 10, safesearch=_SAFESEARCH_LEVEL, timelimit=timelimit, backend=_resolve_backend("news", (backend or "auto").lower()), ) elif stype == "images": raw_gen = ddgs.images( query, max_results=total_needed + 10, safesearch=_SAFESEARCH_LEVEL, timelimit=timelimit, backend=_resolve_backend("images", (backend or "auto").lower()), ) elif stype == "videos": raw_gen = ddgs.videos( query, max_results=total_needed + 10, safesearch=_SAFESEARCH_LEVEL, timelimit=timelimit, backend=_resolve_backend("videos", (backend or "auto").lower()), ) else: raw_gen = ddgs.books( query, max_results=total_needed + 10, backend=_resolve_backend("books", (backend or "auto").lower()), ) try: return list(raw_gen) except Exception as inner_exc: if "no results" in str(inner_exc).lower() or "not found" in str(inner_exc).lower(): return [] raise inner_exc except Exception as exc: error_msg = f"Search failed: {str(exc)[:200]}" lowered = str(exc).lower() if "blocked" in lowered or "rate" in lowered: error_msg = "Search temporarily blocked due to rate limiting. Please try again in a few minutes." elif "timeout" in lowered: error_msg = "Search timed out. Please try again with a simpler query." elif "network" in lowered or "connection" in lowered: error_msg = "Network connection error. Please check your internet connection and try again." elif "no results" in lowered or "not found" in lowered: return [] raise Exception(error_msg) try: raw = _perform_search(search_type) except Exception as exc: result = f"Error: {exc}" _log_call_end("Web_Search", _truncate_for_log(result)) return result if not raw and search_type == "news": try: raw = _perform_search("text") if raw: used_fallback = True search_type = "text" except Exception: pass if not raw: fallback_note = " (also tried 'text' search as fallback)" if original_search_type == "news" and used_fallback else "" result = f"No {original_search_type} results found for query: {query}{fallback_note}" _log_call_end("Web_Search", _truncate_for_log(result)) return result paginated_results = raw[actual_offset: actual_offset + max_results] if not paginated_results: if actual_offset >= len(raw): result = f"Offset {actual_offset} exceeds available results ({len(raw)} total). Try offset=0 to start from beginning." else: result = f"No {original_search_type} results found on page {calculated_page} for query: {query}. Try page 1 or reduce page number." _log_call_end("Web_Search", _truncate_for_log(result)) return result total_available = len(raw) start_num = actual_offset + 1 end_num = actual_offset + len(paginated_results) next_offset = actual_offset + len(paginated_results) search_label = original_search_type.title() if used_fallback: search_label += " β Text (Smart Fallback)" pagination_info = f"Page {calculated_page}" if offset > 0: pagination_info = f"Offset {actual_offset} (β {pagination_info})" lines = [f"{search_label} search results for: {query}"] if used_fallback: lines.append("π Note: News search returned no results, automatically searched general web content instead") lines.append(f"{pagination_info} (results {start_num}-{end_num} of ~{total_available}+ available)\n") for i, result in enumerate(paginated_results, start_num): result_lines = _format_search_result(result, search_type, i) lines.extend(result_lines) lines.append("") if total_available > end_num: lines.append("π‘ More results available:") lines.append(f" β’ Next page: page={calculated_page + 1}") lines.append(f" β’ Next offset: offset={next_offset}") lines.append(f" β’ Use offset={next_offset} to continue exactly from result {next_offset + 1}") result = "\n".join(lines) search_info = f"type={original_search_type}" if used_fallback: search_info += "βtext" _log_call_end("Web_Search", f"{search_info} page={calculated_page} offset={actual_offset} results={len(paginated_results)} chars={len(result)}") return result def build_interface() -> gr.Interface: return gr.Interface( fn=Web_Search, inputs=[ gr.Textbox(label="Query", placeholder="topic OR site:example.com", max_lines=1), gr.Slider(minimum=1, maximum=20, value=5, step=1, label="Max results"), gr.Slider(minimum=1, maximum=10, value=1, step=1, label="Page", info="Page number for pagination (ignored if offset > 0)"), gr.Slider( minimum=0, maximum=1000, value=0, step=1, label="Offset", info="Result offset to start from (overrides page if > 0, use next_offset from previous search)", ), gr.Radio( label="Search Type", choices=["text", "news", "images", "videos", "books"], value="text", info="Type of content to search for", ), gr.Radio( label="Backend", choices=BACKEND_CHOICES, value="auto", info="Search engine backend or fallback order (auto applies recommended order)", ), gr.Radio( label="Date filter", choices=DATE_FILTER_CHOICES, value="any", info="Limit results to: day, week, month, or year (varies by type)", ), ], outputs=gr.Textbox(label="Search Results", interactive=False, lines=20, max_lines=20), title="Web Search", description=( "