from __future__ import annotations

import re
from typing import Annotated, Dict, List, Tuple
from urllib.parse import urlparse, urljoin

import gradio as gr
import requests
from bs4 import BeautifulSoup
from markdownify import markdownify as md
from readability import Document

from app import _fetch_rate_limiter, _log_call_end, _log_call_start, _truncate_for_log


def _http_get_enhanced(url: str, timeout: int | float = 30, *, skip_rate_limit: bool = False) -> requests.Response:
    """Fetch ``url`` with browser-like headers and translate failures into friendly errors."""
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
        "Accept-Language": "en-US,en;q=0.9",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
        "Accept-Encoding": "gzip, deflate, br",
        "DNT": "1",
        "Connection": "keep-alive",
        "Upgrade-Insecure-Requests": "1",
    }
    if not skip_rate_limit:
        _fetch_rate_limiter.acquire()
    try:
        response = requests.get(
            url,
            headers=headers,
            timeout=timeout,
            allow_redirects=True,
            stream=False,
        )
        response.raise_for_status()
        return response
    except requests.exceptions.Timeout as exc:
        raise requests.exceptions.RequestException("Request timed out. The webpage took too long to respond.") from exc
    except requests.exceptions.ConnectionError as exc:
        raise requests.exceptions.RequestException("Connection error. Please check the URL and your internet connection.") from exc
    except requests.exceptions.HTTPError as exc:
        # raise_for_status() only fires after ``response`` is bound, so it is safe to read here.
        if response.status_code == 403:
            raise requests.exceptions.RequestException("Access forbidden. The website may be blocking automated requests.") from exc
        if response.status_code == 404:
            raise requests.exceptions.RequestException("Page not found. Please check the URL.") from exc
        if response.status_code == 429:
            raise requests.exceptions.RequestException("Rate limited. Please try again in a few minutes.") from exc
        raise requests.exceptions.RequestException(f"HTTP error {response.status_code}: {exc}") from exc
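

# Illustrative usage sketch (not executed on import): every failure mode above surfaces
# as a requests.exceptions.RequestException with a human-readable message, so callers
# only need a single except clause.
#
#     try:
#         resp = _http_get_enhanced("https://example.com", timeout=10)
#     except requests.exceptions.RequestException as exc:
#         print(f"Fetch failed: {exc}")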


def _normalize_whitespace(text: str) -> str:
    text = re.sub(r"[ \t\u00A0]+", " ", text)
    text = re.sub(r"\n\s*\n\s*\n+", "\n\n", text.strip())
    return text.strip()


def _truncate(text: str, max_chars: int) -> Tuple[str, bool]:
    if max_chars is None or max_chars <= 0 or len(text) <= max_chars:
        return text, False
    return text[:max_chars].rstrip() + " …", True


def _shorten(text: str, limit: int) -> str:
    if limit <= 0 or len(text) <= limit:
        return text
    return text[: max(0, limit - 1)].rstrip() + "…"


def _domain_of(url: str) -> str:
    try:
        return urlparse(url).netloc or ""
    except Exception:
        return ""


def _extract_links_from_soup(soup: BeautifulSoup, base_url: str) -> str:
    """Return a Markdown bullet list of the hyperlinks found on the page."""
    links = []
    for link in soup.find_all("a", href=True):
        href = link.get("href")
        text = link.get_text(strip=True)
        if href.startswith("http"):
            full_url = href
        elif href.startswith("//"):
            full_url = "https:" + href
        else:
            # urljoin resolves both root-relative ("/path") and relative ("path") hrefs.
            full_url = urljoin(base_url, href)
        if text and href not in ("#", "javascript:void(0)"):
            links.append(f"- [{text}]({full_url})")
    if not links:
        return "No links found on this page."
    title = soup.find("title")
    title_text = title.get_text(strip=True) if title else "Links from webpage"
    return f"# {title_text}\n\n" + "\n".join(links)


def _fullpage_markdown_from_soup(full_soup: BeautifulSoup, base_url: str, strip_selectors: str = "") -> str:
    """Convert a parsed page to Markdown, keeping only the main content area."""
    # Remove any user-supplied selectors first, skipping selectors that fail to parse.
    if strip_selectors:
        selectors = [s.strip() for s in strip_selectors.split(",") if s.strip()]
        for selector in selectors:
            try:
                for element in full_soup.select(selector):
                    element.decompose()
            except Exception:
                continue
    # Then drop boilerplate elements that rarely carry article content.
    for element in full_soup.select("script, style, nav, footer, header, aside"):
        element.decompose()
    main = (
        full_soup.find("main")
        or full_soup.find("article")
        or full_soup.find("div", class_=re.compile(r"content|main|post|article", re.I))
        or full_soup.find("body")
    )
    if not main:
        return "No main content found on the webpage."
    markdown_text = md(str(main), heading_style="ATX")
    markdown_text = re.sub(r"\n{3,}", "\n\n", markdown_text)
    markdown_text = re.sub(r"\[\s*\]\([^)]*\)", "", markdown_text)  # drop links with empty text
    markdown_text = re.sub(r"[ \t]+", " ", markdown_text)
    markdown_text = markdown_text.strip()
    title = full_soup.find("title")
    if title and title.get_text(strip=True):
        markdown_text = f"# {title.get_text(strip=True)}\n\n{markdown_text}"
    return markdown_text or "No content could be extracted."
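

# Illustrative sketch: running a small HTML fragment through the same conversion that
# Web_Fetch uses. The exact spacing of the output depends on the installed markdownify
# version, so treat the expected value as approximate.
#
#     soup = BeautifulSoup(
#         "<html><head><title>Demo</title></head>"
#         "<body><main><h1>Hello</h1><p>World</p></main></body></html>",
#         "lxml",
#     )
#     print(_fullpage_markdown_from_soup(soup, "https://example.com"))
#     # -> "# Demo" followed by "# Hello" and "World" as Markdown paragraphs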


def _truncate_markdown(markdown: str, max_chars: int, offset: int = 0) -> Tuple[str, Dict[str, object]]:
    """Truncate Markdown at a natural boundary and append a pagination notice.

    ``offset`` is the absolute position of ``markdown[0]`` within the full document, so
    the reported ``next_cursor`` can be passed straight back as the ``offset`` argument
    of a follow-up call.
    """
    total_chars = offset + len(markdown)
    if len(markdown) <= max_chars:
        return markdown, {
            "truncated": False,
            "returned_chars": len(markdown),
            "total_chars_estimate": total_chars,
            "next_cursor": None,
        }
    truncated = markdown[:max_chars]
    # Prefer to cut at a paragraph break, then at a sentence boundary, before falling
    # back to a hard cut at max_chars.
    last_paragraph = truncated.rfind("\n\n")
    if last_paragraph > max_chars * 0.7:
        truncated = truncated[:last_paragraph]
        cursor_pos = last_paragraph
    elif "." in truncated[-100:]:
        last_period = truncated.rfind(".")
        if last_period > max_chars * 0.8:
            truncated = truncated[: last_period + 1]
            cursor_pos = last_period + 1
        else:
            cursor_pos = len(truncated)
    else:
        cursor_pos = len(truncated)
    metadata = {
        "truncated": True,
        "returned_chars": len(truncated),
        "total_chars_estimate": total_chars,
        "next_cursor": offset + cursor_pos,
    }
    truncated = truncated.rstrip()
    truncation_notice = (
        "\n\n---\n"
        f"**Content Truncated:** Showing {metadata['returned_chars']:,} of {metadata['total_chars_estimate']:,} characters "
        f"({metadata['returned_chars'] / metadata['total_chars_estimate'] * 100:.1f}%)\n"
        f"**Next cursor:** {metadata['next_cursor']} (use this value as the offset parameter to continue)\n"
        "---"
    )
    return truncated + truncation_notice, metadata
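

# Illustrative pagination sketch (not executed on import): feed each call's
# ``next_cursor`` back in as ``offset`` to walk through a long document. The names here
# are purely local to the example.
#
#     text = "Lorem ipsum. " * 1000                      # roughly 13,000 characters
#     page1, meta1 = _truncate_markdown(text, 3000)
#     if meta1["truncated"]:
#         start = meta1["next_cursor"]
#         page2, meta2 = _truncate_markdown(text[start:], 3000, offset=start)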


def Web_Fetch(
    url: Annotated[str, "The absolute URL to fetch (must return HTML)."],
    max_chars: Annotated[int, "Maximum characters to return (0 = no limit, full page content)."] = 3000,
    strip_selectors: Annotated[str, "CSS selectors to remove (comma-separated, e.g., '.header, .footer, nav')."] = "",
    url_scraper: Annotated[bool, "Extract only links from the page instead of content."] = False,
    offset: Annotated[int, "Character offset to start from (for pagination, use next_cursor from previous call)."] = 0,
) -> str:
    """Fetch a web page and return it as Markdown, or as a list of links when ``url_scraper`` is True."""
    _log_call_start(
        "Web_Fetch",
        url=url,
        max_chars=max_chars,
        strip_selectors=strip_selectors,
        url_scraper=url_scraper,
        offset=offset,
    )
    if not url or not url.strip():
        result = "Please enter a valid URL."
        _log_call_end("Web_Fetch", _truncate_for_log(result))
        return result
    try:
        resp = _http_get_enhanced(url)
    except requests.exceptions.RequestException as exc:
        result = f"An error occurred: {exc}"
        _log_call_end("Web_Fetch", _truncate_for_log(result))
        return result
    final_url = str(resp.url)
    ctype = resp.headers.get("Content-Type", "")
    if "html" not in ctype.lower():
        result = f"Unsupported content type for extraction: {ctype or 'unknown'}"
        _log_call_end("Web_Fetch", _truncate_for_log(result))
        return result
    resp.encoding = resp.encoding or resp.apparent_encoding
    html = resp.text
    full_soup = BeautifulSoup(html, "lxml")
    if url_scraper:
        result = _extract_links_from_soup(full_soup, final_url)
        if offset > 0:
            result = result[offset:]
        if max_chars > 0 and len(result) > max_chars:
            result, _ = _truncate_markdown(result, max_chars, offset)
    else:
        full_result = _fullpage_markdown_from_soup(full_soup, final_url, strip_selectors)
        if offset > 0:
            if offset >= len(full_result):
                result = (
                    f"Offset {offset} exceeds content length ({len(full_result)} characters). "
                    f"Content ends at position {len(full_result)}."
                )
                _log_call_end("Web_Fetch", _truncate_for_log(result))
                return result
            result = full_result[offset:]
        else:
            result = full_result
        if max_chars > 0 and len(result) > max_chars:
            # Passing offset makes the reported next_cursor an absolute position, so it can
            # be used directly as the offset argument of the next call.
            result, _ = _truncate_markdown(result, max_chars, offset)
    _log_call_end("Web_Fetch", f"chars={len(result)}, url_scraper={url_scraper}, offset={offset}")
    return result


def build_interface() -> gr.Interface:
    return gr.Interface(
        fn=Web_Fetch,
        inputs=[
            gr.Textbox(label="URL", placeholder="https://example.com/article", max_lines=1),
            gr.Slider(minimum=0, maximum=20000, value=3000, step=100, label="Max Characters", info="0 = no limit (full page), default 3000"),
            gr.Textbox(
                label="Strip Selectors",
                placeholder=".header, .footer, nav, .sidebar",
                value="",
                max_lines=1,
                info="CSS selectors to remove (comma-separated)",
            ),
            gr.Checkbox(label="URL Scraper", value=False, info="Extract only links instead of content"),
            gr.Slider(
                minimum=0,
                maximum=100000,
                value=0,
                step=100,
                label="Offset",
                info="Character offset to start from (use next_cursor from previous call for pagination)",
            ),
        ],
        outputs=gr.Markdown(label="Extracted Content"),
        title="Web Fetch",
        description=(
            "<div style=\"text-align:center\">Convert any webpage to clean Markdown format with precision controls, "
            "or extract all links. Supports custom element removal, length limits, and pagination with offset.</div>"
        ),
        api_description=(
            "Fetch a web page and return it converted to Markdown format or extract links with configurable options. "
            "Includes enhanced truncation with detailed metadata and pagination support via offset parameter. "
            "Parameters: url (str - absolute URL), max_chars (int - 0=no limit, default 3000), "
            "strip_selectors (str - CSS selectors to remove, comma-separated), "
            "url_scraper (bool - extract only links instead of content, default False), "
            "offset (int - character offset for pagination, use next_cursor from previous call). "
            "When content is truncated, returns detailed metadata including truncated status, character counts, "
            "and next_cursor for continuation. When url_scraper=True, returns formatted list of all links found on the page."
        ),
        flagging_mode="never",
    )


__all__ = [
    "Web_Fetch",
    "build_interface",
    "_http_get_enhanced",
    "_fullpage_markdown_from_soup",
]
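

# Minimal local launch sketch (an assumption, not part of the deployed app): running this
# module directly requires the ``app`` module above to be importable, since the rate
# limiter and logging helpers come from it.
if __name__ == "__main__":
    build_interface().launch()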