from __future__ import annotations

from typing import Annotated, List

import gradio as gr
from ddgs import DDGS

from app import _log_call_end, _log_call_start, _search_rate_limiter, _truncate_for_log
from ._docstrings import autodoc


TOOL_SUMMARY = (
    "Run a DuckDuckGo-backed search across text, news, images, videos, or books. "
    "Readable results include pagination hints and next_offset when more results are available. "
    "Use in combination with `Web_Fetch` to navigate the web."
)


_SAFESEARCH_LEVEL = "off"


BACKEND_CHOICES = [
    "auto",
    "duckduckgo",
    "bing",
    "brave",
    "yahoo",
    "wikipedia",
]


_ALLOWED_BACKENDS = {
    "text": ["duckduckgo", "bing", "brave", "yahoo", "wikipedia"],
    "news": ["duckduckgo", "bing", "yahoo"],
    "images": ["duckduckgo"],
    "videos": ["duckduckgo"],
    "books": ["annasarchive"],
}


_AUTO_ORDER = {
    "text": ["duckduckgo", "bing", "brave", "yahoo"],
    "news": ["duckduckgo", "bing", "yahoo"],
    "images": ["duckduckgo"],
    "videos": ["duckduckgo"],
    "books": ["annasarchive"],
}


DATE_FILTER_CHOICES = ["any", "day", "week", "month", "year"]


def _resolve_backend(search_type: str, backend_choice: str) -> str:
    """Resolve backend string for DDGS based on search type and user choice.

    - If backend_choice is "auto", return a comma-separated fallback order for that type.
    - If backend_choice is not supported by the type, fall back to the first allowed backend.
    - Books endpoint uses only 'annasarchive'.
    """
    stype = search_type if search_type in _ALLOWED_BACKENDS else "text"
    allowed = _ALLOWED_BACKENDS[stype]
    if backend_choice == "auto":
        return ", ".join(_AUTO_ORDER[stype])
    if stype == "books":
        return "annasarchive"

    if backend_choice in allowed:
        return backend_choice

    return allowed[0]
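
# Illustrative resolutions (comments only, not executed; values follow from the
# _ALLOWED_BACKENDS and _AUTO_ORDER tables above):
#   _resolve_backend("text", "auto")  -> "duckduckgo, bing, brave, yahoo"
#   _resolve_backend("news", "brave") -> "duckduckgo"   (unsupported engine falls back)
#   _resolve_backend("books", "bing") -> "annasarchive"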


def _resolve_timelimit(date_filter: str, search_type: str) -> str | None:
    """Map UI date filter to DDGS timelimit code per endpoint.

    Returns one of: None, 'd', 'w', 'm', 'y'. For news/videos (which support d/w/m),
    selecting 'year' will coerce to 'm' to stay within the supported range.
    """
    normalized = (date_filter or "any").strip().lower()
    if normalized in ("any", "none", ""):
        return None
    mapping = {
        "day": "d",
        "week": "w",
        "month": "m",
        "year": "y",
    }
    code = mapping.get(normalized)
    if not code:
        return None
    if search_type in ("news", "videos") and code == "y":
        return "m"
    return code
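
# Illustrative mappings (comments only, not executed):
#   _resolve_timelimit("week", "text")  -> "w"
#   _resolve_timelimit("year", "news")  -> "m"   (coerced; news/videos support d/w/m only)
#   _resolve_timelimit("any", "images") -> None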


def _extract_date_from_snippet(snippet: str) -> str:
    if not snippet:
        return ""
    import re

    date_patterns = [
        r"\b(\d{4}[-/]\d{1,2}[-/]\d{1,2})\b",  # ISO-style dates, e.g. 2024-01-05
        r"\b([A-Za-z]{3,9}\s+\d{1,2},?\s+\d{4})\b",  # "January 5, 2024"
        r"\b(\d{1,2}\s+[A-Za-z]{3,9}\s+\d{4})\b",  # "5 January 2024"
        r"\b(\d+\s+(?:day|week|month|year)s?\s+ago)\b",  # relative dates, "3 days ago"
        r"(?:Published|Updated|Posted):\s*([^,\n]+?)(?:[,\n]|$)",  # labelled dates
    ]
    for pattern in date_patterns:
        matches = re.findall(pattern, snippet, re.IGNORECASE)
        if matches:
            return matches[0].strip()
    return ""


def _format_search_result(result: dict, search_type: str, index: int) -> List[str]:
    lines: List[str] = []
    if search_type == "text":
        title = result.get("title", "").strip()
        url = result.get("href", "").strip()
        snippet = result.get("body", "").strip()
        date = _extract_date_from_snippet(snippet)
        lines.append(f"{index}. {title}")
        lines.append(f" URL: {url}")
        if snippet:
            lines.append(f" Summary: {snippet}")
        if date:
            lines.append(f" Date: {date}")
    elif search_type == "news":
        title = result.get("title", "").strip()
        url = result.get("url", "").strip()
        body = result.get("body", "").strip()
        date = result.get("date", "").strip()
        source = result.get("source", "").strip()
        lines.append(f"{index}. {title}")
        lines.append(f" URL: {url}")
        if source:
            lines.append(f" Source: {source}")
        if date:
            lines.append(f" Date: {date}")
        if body:
            lines.append(f" Summary: {body}")
    elif search_type == "images":
        title = result.get("title", "").strip()
        image_url = result.get("image", "").strip()
        source_url = result.get("url", "").strip()
        source = result.get("source", "").strip()
        width = result.get("width", "")
        height = result.get("height", "")
        lines.append(f"{index}. {title}")
        lines.append(f" Image: {image_url}")
        lines.append(f" Source: {source_url}")
        if source:
            lines.append(f" Publisher: {source}")
        if width and height:
            lines.append(f" Dimensions: {width}x{height}")
    elif search_type == "videos":
        title = result.get("title", "").strip()
        description = result.get("description", "").strip()
        duration = result.get("duration", "").strip()
        published = result.get("published", "").strip()
        uploader = result.get("uploader", "").strip()
        embed_url = result.get("embed_url", "").strip()
        lines.append(f"{index}. {title}")
        if embed_url:
            lines.append(f" Video: {embed_url}")
        if uploader:
            lines.append(f" Uploader: {uploader}")
        if duration:
            lines.append(f" Duration: {duration}")
        if published:
            lines.append(f" Published: {published}")
        if description:
            lines.append(f" Description: {description}")
    elif search_type == "books":
        title = result.get("title", "").strip()
        url = result.get("url", "").strip()
        body = result.get("body", "").strip()
        lines.append(f"{index}. {title}")
        lines.append(f" URL: {url}")
        if body:
            lines.append(f" Description: {body}")
    return lines
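
# Illustrative shape of a rendered text hit (placeholder values):
#   "1. Example page title"
#   " URL: https://example.com/page"
#   " Summary: First lines of the page body..."
#   " Date: 2024-01-05"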


@autodoc(
    summary=TOOL_SUMMARY,
)
def Web_Search(
    query: Annotated[str, "The search query (supports operators like site:, quotes, OR)."],
    max_results: Annotated[int, "Number of results to return (1–20)."] = 5,
    page: Annotated[int, "Page number for pagination (1-based, each page contains max_results items)."] = 1,
    offset: Annotated[int, "Result offset to start from (overrides page if > 0, for precise continuation)."] = 0,
    search_type: Annotated[str, "Type of search: 'text' (web pages), 'news', 'images', 'videos', or 'books'."] = "text",
    backend: Annotated[str, "Search backend or ordered fallbacks. Use 'auto' for recommended order."] = "auto",
    date_filter: Annotated[str, "Time filter: any, day, week, month, year."] = "any",
) -> str:
    _log_call_start(
        "Web_Search",
        query=query,
        max_results=max_results,
        page=page,
        search_type=search_type,
        offset=offset,
        backend=backend,
        date_filter=date_filter,
    )
    if not query or not query.strip():
        result = "No search query provided. Please enter a search term."
        _log_call_end("Web_Search", _truncate_for_log(result))
        return result
    max_results = max(1, min(20, max_results))
    page = max(1, page)
    offset = max(0, offset)
    valid_types = ["text", "news", "images", "videos", "books"]
    if search_type not in valid_types:
        search_type = "text"
    # An explicit offset overrides page-based pagination.
    if offset > 0:
        actual_offset = offset
        calculated_page = (offset // max_results) + 1
    else:
        actual_offset = (page - 1) * max_results
        calculated_page = page
    total_needed = actual_offset + max_results
    used_fallback = False
    original_search_type = search_type

    resolved_backend = _resolve_backend(search_type, (backend or "auto").lower())
    timelimit = _resolve_timelimit(date_filter, search_type)

    def _perform_search(stype: str) -> list[dict]:
        try:
            _search_rate_limiter.acquire()
            with DDGS() as ddgs:
                if stype == "text":
                    user_backend_choice = (backend or "auto").lower()
                    if user_backend_choice == "auto":
                        # Aggregate results across the auto fallback order, de-duplicating by URL.
                        results: list[dict] = []
                        seen: set[str] = set()

                        def add_unique(items: list[dict], key_field: str) -> None:
                            for it in items or []:
                                url = (it.get(key_field, "") or "").strip()
                                if url and url not in seen:
                                    seen.add(url)
                                    results.append(it)

                        # Query DuckDuckGo first.
                        try:
                            ddg_items = list(
                                ddgs.text(
                                    query,
                                    max_results=total_needed + 10,
                                    safesearch=_SAFESEARCH_LEVEL,
                                    timelimit=timelimit,
                                    backend="duckduckgo",
                                )
                            )
                        except Exception:
                            ddg_items = []
                        add_unique(ddg_items, "href")

                        # Then the remaining engines in the auto order.
                        for eng in [b for b in _AUTO_ORDER["text"] if b != "duckduckgo"]:
                            try:
                                extra = list(
                                    ddgs.text(
                                        query,
                                        max_results=total_needed + 10,
                                        safesearch=_SAFESEARCH_LEVEL,
                                        timelimit=timelimit,
                                        backend=eng,
                                    )
                                )
                            except Exception:
                                extra = []
                            add_unique(extra, "href")

                        return results
                    else:
                        raw_gen = ddgs.text(
                            query,
                            max_results=total_needed + 10,
                            safesearch=_SAFESEARCH_LEVEL,
                            timelimit=timelimit,
                            backend=resolved_backend,
                        )
                elif stype == "news":
                    user_backend_choice = (backend or "auto").lower()
                    if user_backend_choice == "auto":
                        # Same de-duplicating aggregation across the news backends.
                        results: list[dict] = []
                        seen: set[str] = set()

                        def add_unique(items: list[dict], key_field: str) -> None:
                            for it in items or []:
                                url = (it.get(key_field, "") or "").strip()
                                if url and url not in seen:
                                    seen.add(url)
                                    results.append(it)

                        # Query DuckDuckGo first.
                        try:
                            ddg_news = list(
                                ddgs.news(
                                    query,
                                    max_results=total_needed + 10,
                                    safesearch=_SAFESEARCH_LEVEL,
                                    timelimit=timelimit,
                                    backend="duckduckgo",
                                )
                            )
                        except Exception:
                            ddg_news = []
                        add_unique(ddg_news, "url")

                        # Then the remaining engines in the auto order.
                        for eng in [b for b in _AUTO_ORDER["news"] if b != "duckduckgo"]:
                            try:
                                extra = list(
                                    ddgs.news(
                                        query,
                                        max_results=total_needed + 10,
                                        safesearch=_SAFESEARCH_LEVEL,
                                        timelimit=timelimit,
                                        backend=eng,
                                    )
                                )
                            except Exception:
                                extra = []
                            add_unique(extra, "url")

                        return results
                    else:
                        raw_gen = ddgs.news(
                            query,
                            max_results=total_needed + 10,
                            safesearch=_SAFESEARCH_LEVEL,
                            timelimit=timelimit,
                            backend=_resolve_backend("news", (backend or "auto").lower()),
                        )
                elif stype == "images":
                    raw_gen = ddgs.images(
                        query,
                        max_results=total_needed + 10,
                        safesearch=_SAFESEARCH_LEVEL,
                        timelimit=timelimit,
                        backend=_resolve_backend("images", (backend or "auto").lower()),
                    )
                elif stype == "videos":
                    raw_gen = ddgs.videos(
                        query,
                        max_results=total_needed + 10,
                        safesearch=_SAFESEARCH_LEVEL,
                        timelimit=timelimit,
                        backend=_resolve_backend("videos", (backend or "auto").lower()),
                    )
                else:
                    raw_gen = ddgs.books(
                        query,
                        max_results=total_needed + 10,
                        backend=_resolve_backend("books", (backend or "auto").lower()),
                    )
                try:
                    return list(raw_gen)
                except Exception as inner_exc:
                    if "no results" in str(inner_exc).lower() or "not found" in str(inner_exc).lower():
                        return []
                    raise inner_exc
        except Exception as exc:
            error_msg = f"Search failed: {str(exc)[:200]}"
            lowered = str(exc).lower()
            if "blocked" in lowered or "rate" in lowered:
                error_msg = "Search temporarily blocked due to rate limiting. Please try again in a few minutes."
            elif "timeout" in lowered:
                error_msg = "Search timed out. Please try again with a simpler query."
            elif "network" in lowered or "connection" in lowered:
                error_msg = "Network connection error. Please check your internet connection and try again."
            elif "no results" in lowered or "not found" in lowered:
                return []
            raise Exception(error_msg)

    try:
        raw = _perform_search(search_type)
    except Exception as exc:
        result = f"Error: {exc}"
        _log_call_end("Web_Search", _truncate_for_log(result))
        return result

    if not raw and search_type == "news":
        # Smart fallback: retry an empty news search as a general text search.
        try:
            raw = _perform_search("text")
            if raw:
                used_fallback = True
                search_type = "text"
        except Exception:
            pass

    if not raw:
        fallback_note = " (also tried 'text' search as fallback)" if original_search_type == "news" and used_fallback else ""
        result = f"No {original_search_type} results found for query: {query}{fallback_note}"
        _log_call_end("Web_Search", _truncate_for_log(result))
        return result

    paginated_results = raw[actual_offset: actual_offset + max_results]
    if not paginated_results:
        if actual_offset >= len(raw):
            result = f"Offset {actual_offset} exceeds available results ({len(raw)} total). Try offset=0 to start from the beginning."
        else:
            result = f"No {original_search_type} results found on page {calculated_page} for query: {query}. Try page 1 or a lower page number."
        _log_call_end("Web_Search", _truncate_for_log(result))
        return result

    total_available = len(raw)
    start_num = actual_offset + 1
    end_num = actual_offset + len(paginated_results)
    next_offset = actual_offset + len(paginated_results)
    search_label = original_search_type.title()
    if used_fallback:
        search_label += " → Text (Smart Fallback)"
    pagination_info = f"Page {calculated_page}"
    if offset > 0:
        pagination_info = f"Offset {actual_offset} (≈ {pagination_info})"
    lines = [f"{search_label} search results for: {query}"]
    if used_fallback:
        lines.append("📍 Note: News search returned no results, so general web content was searched automatically instead")
    lines.append(f"{pagination_info} (results {start_num}-{end_num} of ~{total_available}+ available)\n")
    for i, result in enumerate(paginated_results, start_num):
        result_lines = _format_search_result(result, search_type, i)
        lines.extend(result_lines)
        lines.append("")
    if total_available > end_num:
        lines.append("💡 More results available:")
        lines.append(f" • Next page: page={calculated_page + 1}")
        lines.append(f" • Next offset: offset={next_offset}")
        lines.append(f" • Use offset={next_offset} to continue exactly from result {next_offset + 1}")
    result = "\n".join(lines)
    search_info = f"type={original_search_type}"
    if used_fallback:
        search_info += "→text"
    _log_call_end("Web_Search", f"{search_info} page={calculated_page} offset={actual_offset} results={len(paginated_results)} chars={len(result)}")
    return result
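
# Illustrative call pattern for continuing a search (comments only; assumes the
# backend returns enough hits for a second page):
#   Web_Search("rust async runtime", max_results=5)            -> results 1-5, next_offset=5
#   Web_Search("rust async runtime", max_results=5, offset=5)  -> results 6-10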


def build_interface() -> gr.Interface:
    return gr.Interface(
        fn=Web_Search,
        inputs=[
            gr.Textbox(label="Query", placeholder="topic OR site:example.com", max_lines=1),
            gr.Slider(minimum=1, maximum=20, value=5, step=1, label="Max results"),
            gr.Slider(minimum=1, maximum=10, value=1, step=1, label="Page", info="Page number for pagination (ignored if offset > 0)"),
            gr.Slider(
                minimum=0,
                maximum=1000,
                value=0,
                step=1,
                label="Offset",
                info="Result offset to start from (overrides page if > 0, use next_offset from previous search)",
            ),
            gr.Radio(
                label="Search Type",
                choices=["text", "news", "images", "videos", "books"],
                value="text",
                info="Type of content to search for",
            ),
            gr.Radio(
                label="Backend",
                choices=BACKEND_CHOICES,
                value="auto",
                info="Search engine backend or fallback order (auto applies recommended order)",
            ),
            gr.Radio(
                label="Date filter",
                choices=DATE_FILTER_CHOICES,
                value="any",
                info="Limit results to: day, week, month, or year (varies by type)",
            ),
        ],
        outputs=gr.Textbox(label="Search Results", interactive=False, lines=20, max_lines=20),
        title="Web Search",
        description=(
            "<div style=\"text-align:center\">Multi-type web search with readable output format, date detection, and flexible pagination. "
            "Supports text, news, images, videos, and books. Features smart fallback for news searches and precise offset control.</div>"
        ),
        api_description=TOOL_SUMMARY,
        flagging_mode="never",
        submit_btn="Search",
    )


__all__ = ["Web_Search", "build_interface"]
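

if __name__ == "__main__":
    # Minimal standalone sketch (assumption: in the deployed app this interface is
    # mounted by the main app module; launching here is only for local inspection).
    build_interface().launch()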