| from __future__ import annotations | |
| from typing import Annotated, List | |
| import gradio as gr | |
| from ddgs import DDGS | |
| from app import _log_call_end, _log_call_start, _search_rate_limiter, _truncate_for_log | |
| def _extract_date_from_snippet(snippet: str) -> str: | |
| if not snippet: | |
| return "" | |
| import re | |
| date_patterns = [ | |
| r"\b(\d{4}[-/]\d{1,2}[-/]\d{1,2})\b", | |
| r"\b([A-Za-z]{3,9}\s+\d{1,2},?\s+\d{4})\b", | |
| r"\b(\d{1,2}\s+[A-Za-z]{3,9}\s+\d{4})\b", | |
| r"\b(\d+\s+(?:day|week|month|year)s?\s+ago)\b", | |
| r"(?:Published|Updated|Posted):\s*([^,\n]+?)(?:[,\n]|$)", | |
| ] | |
| for pattern in date_patterns: | |
| matches = re.findall(pattern, snippet, re.IGNORECASE) | |
| if matches: | |
| return matches[0].strip() | |
| return "" | |
| def _format_search_result(result: dict, search_type: str, index: int) -> List[str]: | |
| lines: List[str] = [] | |
| if search_type == "text": | |
| title = result.get("title", "").strip() | |
| url = result.get("href", "").strip() | |
| snippet = result.get("body", "").strip() | |
| date = _extract_date_from_snippet(snippet) | |
| lines.append(f"{index}. {title}") | |
| lines.append(f" URL: {url}") | |
| if snippet: | |
| lines.append(f" Summary: {snippet}") | |
| if date: | |
| lines.append(f" Date: {date}") | |
| elif search_type == "news": | |
| title = result.get("title", "").strip() | |
| url = result.get("url", "").strip() | |
| body = result.get("body", "").strip() | |
| date = result.get("date", "").strip() | |
| source = result.get("source", "").strip() | |
| lines.append(f"{index}. {title}") | |
| lines.append(f" URL: {url}") | |
| if source: | |
| lines.append(f" Source: {source}") | |
| if date: | |
| lines.append(f" Date: {date}") | |
| if body: | |
| lines.append(f" Summary: {body}") | |
| elif search_type == "images": | |
| title = result.get("title", "").strip() | |
| image_url = result.get("image", "").strip() | |
| source_url = result.get("url", "").strip() | |
| source = result.get("source", "").strip() | |
| width = result.get("width", "") | |
| height = result.get("height", "") | |
| lines.append(f"{index}. {title}") | |
| lines.append(f" Image: {image_url}") | |
| lines.append(f" Source: {source_url}") | |
| if source: | |
| lines.append(f" Publisher: {source}") | |
| if width and height: | |
| lines.append(f" Dimensions: {width}x{height}") | |
| elif search_type == "videos": | |
| title = result.get("title", "").strip() | |
| description = result.get("description", "").strip() | |
| duration = result.get("duration", "").strip() | |
| published = result.get("published", "").strip() | |
| uploader = result.get("uploader", "").strip() | |
| embed_url = result.get("embed_url", "").strip() | |
| lines.append(f"{index}. {title}") | |
| if embed_url: | |
| lines.append(f" Video: {embed_url}") | |
| if uploader: | |
| lines.append(f" Uploader: {uploader}") | |
| if duration: | |
| lines.append(f" Duration: {duration}") | |
| if published: | |
| lines.append(f" Published: {published}") | |
| if description: | |
| lines.append(f" Description: {description}") | |
| elif search_type == "books": | |
| title = result.get("title", "").strip() | |
| url = result.get("url", "").strip() | |
| body = result.get("body", "").strip() | |
| lines.append(f"{index}. {title}") | |
| lines.append(f" URL: {url}") | |
| if body: | |
| lines.append(f" Description: {body}") | |
| return lines | |
| def Web_Search( | |
| query: Annotated[str, "The search query (supports operators like site:, quotes, OR)."], | |
| max_results: Annotated[int, "Number of results to return (1–20)."] = 5, | |
| page: Annotated[int, "Page number for pagination (1-based, each page contains max_results items)."] = 1, | |
| search_type: Annotated[str, "Type of search: 'text' (web pages), 'news', 'images', 'videos', or 'books'."] = "text", | |
| offset: Annotated[int, "Result offset to start from (overrides page if > 0, for precise continuation)."] = 0, | |
| ) -> str: | |
| _log_call_start("Web_Search", query=query, max_results=max_results, page=page, search_type=search_type, offset=offset) | |
| if not query or not query.strip(): | |
| result = "No search query provided. Please enter a search term." | |
| _log_call_end("Web_Search", _truncate_for_log(result)) | |
| return result | |
| max_results = max(1, min(20, max_results)) | |
| page = max(1, page) | |
| offset = max(0, offset) | |
| valid_types = ["text", "news", "images", "videos", "books"] | |
| if search_type not in valid_types: | |
| search_type = "text" | |
| if offset > 0: | |
| actual_offset = offset | |
| calculated_page = (offset // max_results) + 1 | |
| else: | |
| actual_offset = (page - 1) * max_results | |
| calculated_page = page | |
| total_needed = actual_offset + max_results | |
| used_fallback = False | |
| original_search_type = search_type | |
| def _perform_search(stype: str) -> list[dict]: | |
| try: | |
| _search_rate_limiter.acquire() | |
| with DDGS() as ddgs: | |
| if stype == "text": | |
| raw_gen = ddgs.text(query, max_results=total_needed + 10) | |
| elif stype == "news": | |
| raw_gen = ddgs.news(query, max_results=total_needed + 10) | |
| elif stype == "images": | |
| raw_gen = ddgs.images(query, max_results=total_needed + 10) | |
| elif stype == "videos": | |
| raw_gen = ddgs.videos(query, max_results=total_needed + 10) | |
| else: | |
| raw_gen = ddgs.books(query, max_results=total_needed + 10) | |
| try: | |
| return list(raw_gen) | |
| except Exception as inner_exc: | |
| if "no results" in str(inner_exc).lower() or "not found" in str(inner_exc).lower(): | |
| return [] | |
| raise inner_exc | |
| except Exception as exc: | |
| error_msg = f"Search failed: {str(exc)[:200]}" | |
| lowered = str(exc).lower() | |
| if "blocked" in lowered or "rate" in lowered: | |
| error_msg = "Search temporarily blocked due to rate limiting. Please try again in a few minutes." | |
| elif "timeout" in lowered: | |
| error_msg = "Search timed out. Please try again with a simpler query." | |
| elif "network" in lowered or "connection" in lowered: | |
| error_msg = "Network connection error. Please check your internet connection and try again." | |
| elif "no results" in lowered or "not found" in lowered: | |
| return [] | |
| raise Exception(error_msg) | |
| try: | |
| raw = _perform_search(search_type) | |
| except Exception as exc: | |
| result = f"Error: {exc}" | |
| _log_call_end("Web_Search", _truncate_for_log(result)) | |
| return result | |
| if not raw and search_type == "news": | |
| try: | |
| raw = _perform_search("text") | |
| if raw: | |
| used_fallback = True | |
| search_type = "text" | |
| except Exception: | |
| pass | |
| if not raw: | |
| fallback_note = " (also tried 'text' search as fallback)" if original_search_type == "news" and used_fallback else "" | |
| result = f"No {original_search_type} results found for query: {query}{fallback_note}" | |
| _log_call_end("Web_Search", _truncate_for_log(result)) | |
| return result | |
| paginated_results = raw[actual_offset: actual_offset + max_results] | |
| if not paginated_results: | |
| if actual_offset >= len(raw): | |
| result = f"Offset {actual_offset} exceeds available results ({len(raw)} total). Try offset=0 to start from beginning." | |
| else: | |
| result = f"No {original_search_type} results found on page {calculated_page} for query: {query}. Try page 1 or reduce page number." | |
| _log_call_end("Web_Search", _truncate_for_log(result)) | |
| return result | |
| total_available = len(raw) | |
| start_num = actual_offset + 1 | |
| end_num = actual_offset + len(paginated_results) | |
| next_offset = actual_offset + len(paginated_results) | |
| search_label = original_search_type.title() | |
| if used_fallback: | |
| search_label += " → Text (Smart Fallback)" | |
| pagination_info = f"Page {calculated_page}" | |
| if offset > 0: | |
| pagination_info = f"Offset {actual_offset} (≈ {pagination_info})" | |
| lines = [f"{search_label} search results for: {query}"] | |
| if used_fallback: | |
| lines.append("📍 Note: News search returned no results, automatically searched general web content instead") | |
| lines.append(f"{pagination_info} (results {start_num}-{end_num} of ~{total_available}+ available)\n") | |
| for i, result in enumerate(paginated_results, start_num): | |
| result_lines = _format_search_result(result, search_type, i) | |
| lines.extend(result_lines) | |
| lines.append("") | |
| if total_available > end_num: | |
| lines.append("💡 More results available:") | |
| lines.append(f" • Next page: page={calculated_page + 1}") | |
| lines.append(f" • Next offset: offset={next_offset}") | |
| lines.append(f" • Use offset={next_offset} to continue exactly from result {next_offset + 1}") | |
| result = "\n".join(lines) | |
| search_info = f"type={original_search_type}" | |
| if used_fallback: | |
| search_info += "→text" | |
| _log_call_end("Web_Search", f"{search_info} page={calculated_page} offset={actual_offset} results={len(paginated_results)} chars={len(result)}") | |
| return result | |
| def build_interface() -> gr.Interface: | |
| return gr.Interface( | |
| fn=Web_Search, | |
| inputs=[ | |
| gr.Textbox(label="Query", placeholder="topic OR site:example.com", max_lines=1), | |
| gr.Slider(minimum=1, maximum=20, value=5, step=1, label="Max results"), | |
| gr.Slider(minimum=1, maximum=10, value=1, step=1, label="Page", info="Page number for pagination (ignored if offset > 0)"), | |
| gr.Radio( | |
| label="Search Type", | |
| choices=["text", "news", "images", "videos", "books"], | |
| value="text", | |
| info="Type of content to search for", | |
| ), | |
| gr.Slider( | |
| minimum=0, | |
| maximum=1000, | |
| value=0, | |
| step=1, | |
| label="Offset", | |
| info="Result offset to start from (overrides page if > 0, use next_offset from previous search)", | |
| ), | |
| ], | |
| outputs=gr.Textbox(label="Search Results", interactive=False, lines=20, max_lines=20), | |
| title="Web Search", | |
| description=( | |
| "<div style=\"text-align:center\">Multi-type web search with readable output format, date detection, and flexible pagination. " | |
| "Supports text, news, images, videos, and books. Features smart fallback for news searches and precise offset control.</div>" | |
| ), | |
| api_description=( | |
| "Run a web search (DuckDuckGo backend) with support for multiple content types and return formatted results. " | |
| "Features smart fallback: if 'news' search returns no results, automatically retries with 'text' search " | |
| "to catch sources like Hacker News that might not appear in news-specific results. " | |
| "Supports advanced search operators: site: for specific domains, quotes for exact phrases, " | |
| "OR for alternatives, and - to exclude terms. Examples: 'Python programming', 'site:example.com', " | |
| "'\"artificial intelligence\"', 'cats -dogs', 'Python OR JavaScript'. " | |
| "Parameters: query (str), max_results (int, 1-20), page (int, 1-based pagination), " | |
| "search_type (str: text/news/images/videos/books), offset (int, result offset for precise continuation). " | |
| "If offset > 0, it overrides the page parameter. Returns appropriately formatted results with metadata, " | |
| "pagination hints, and next_offset information for each content type." | |
| ), | |
| flagging_mode="never", | |
| submit_btn="Search", | |
| ) | |
| __all__ = ["Web_Search", "build_interface"] | |