from __future__ import annotations

import re
from typing import Annotated, Dict, List, Tuple
from urllib.parse import urlparse, urljoin

import gradio as gr
import requests
from bs4 import BeautifulSoup
from markdownify import markdownify as md
from readability import Document

from app import _fetch_rate_limiter, _log_call_end, _log_call_start, _truncate_for_log
from ._docstrings import autodoc

TOOL_SUMMARY = (
    "Fetch a webpage and return clean Markdown or a list of links, with max length and pagination via offset; "
    "if truncated, the output includes a notice with next_cursor for exact continuation."
)


def _http_get_enhanced(url: str, timeout: int | float = 30, *, skip_rate_limit: bool = False) -> requests.Response:
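    """GET *url* with browser-like headers and return the response.

    Honors the shared fetch rate limiter unless *skip_rate_limit* is set, and converts
    timeouts, connection failures, and common HTTP errors (403/404/429) into
    RequestException with a human-readable message.
    """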
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
        "Accept-Language": "en-US,en;q=0.9",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
        "Accept-Encoding": "gzip, deflate, br",
        "DNT": "1",
        "Connection": "keep-alive",
        "Upgrade-Insecure-Requests": "1",
    }
    if not skip_rate_limit:
        _fetch_rate_limiter.acquire()
    try:
        response = requests.get(
            url,
            headers=headers,
            timeout=timeout,
            allow_redirects=True,
            stream=False,
        )
        response.raise_for_status()
        return response
    except requests.exceptions.Timeout as exc:
        raise requests.exceptions.RequestException("Request timed out. The webpage took too long to respond.") from exc
    except requests.exceptions.ConnectionError as exc:
        raise requests.exceptions.RequestException("Connection error. Please check the URL and your internet connection.") from exc
    except requests.exceptions.HTTPError as exc:
        if response.status_code == 403:
            raise requests.exceptions.RequestException("Access forbidden. The website may be blocking automated requests.") from exc
        if response.status_code == 404:
            raise requests.exceptions.RequestException("Page not found. Please check the URL.") from exc
        if response.status_code == 429:
            raise requests.exceptions.RequestException("Rate limited. Please try again in a few minutes.") from exc
        raise requests.exceptions.RequestException(f"HTTP error {response.status_code}: {exc}") from exc


def _normalize_whitespace(text: str) -> str:
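    """Collapse runs of spaces/tabs/non-breaking spaces into one space and squeeze repeated blank lines down to a single blank line."""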
    text = re.sub(r"[ \t\u00A0]+", " ", text)
    text = re.sub(r"\n\s*\n\s*\n+", "\n\n", text.strip())
    return text.strip()


def _truncate(text: str, max_chars: int) -> Tuple[str, bool]:
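    """Return (text, was_truncated); cut at *max_chars* and append an ellipsis, or pass through when *max_chars* is unset or non-positive."""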
    if max_chars is None or max_chars <= 0 or len(text) <= max_chars:
        return text, False
    return text[:max_chars].rstrip() + " …", True


def _shorten(text: str, limit: int) -> str:
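    """Hard-cap *text* at *limit* characters, ending with an ellipsis; a non-positive *limit* disables shortening."""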
    if limit <= 0 or len(text) <= limit:
        return text
    return text[: max(0, limit - 1)].rstrip() + "…"


def _domain_of(url: str) -> str:
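    """Return the network location (domain) of *url*, or an empty string if it cannot be parsed."""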
    try:
        return urlparse(url).netloc or ""
    except Exception:
        return ""


def _extract_links_from_soup(soup: BeautifulSoup, base_url: str) -> str:
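    """Collect anchors from *soup*, resolve hrefs against *base_url*, and return a Markdown bullet list of links headed by the page title."""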
    links: List[str] = []
    for link in soup.find_all("a", href=True):
        href = link.get("href")
        text = link.get_text(strip=True)
        if href.startswith("http"):
            full_url = href
        elif href.startswith("//"):
            full_url = "https:" + href
        else:
            full_url = urljoin(base_url, href)
        if text and href not in ["#", "javascript:void(0)"]:
            links.append(f"- [{text}]({full_url})")
    if not links:
        return "No links found on this page."
    title = soup.find("title")
    title_text = title.get_text(strip=True) if title else "Links from webpage"
    return f"# {title_text}\n\n" + "\n".join(links)


def _fullpage_markdown_from_soup(full_soup: BeautifulSoup, base_url: str, strip_selectors: str = "") -> str:
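    """Convert the page to Markdown.

    Removes any *strip_selectors* plus boilerplate tags (script, style, nav, footer, header,
    aside), picks the most likely main-content container, converts it with markdownify, and
    prepends the page title as an H1 heading.
    """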
    if strip_selectors:
        selectors = [s.strip() for s in strip_selectors.split(",") if s.strip()]
        for selector in selectors:
            try:
                for element in full_soup.select(selector):
                    element.decompose()
            except Exception:
                continue
    for element in full_soup.select("script, style, nav, footer, header, aside"):
        element.decompose()
    main = (
        full_soup.find("main")
        or full_soup.find("article")
        or full_soup.find("div", class_=re.compile(r"content|main|post|article", re.I))
        or full_soup.find("body")
    )
    if not main:
        return "No main content found on the webpage."
    markdown_text = md(str(main), heading_style="ATX")
    markdown_text = re.sub(r"\n{3,}", "\n\n", markdown_text)
    markdown_text = re.sub(r"\[\s*\]\([^)]*\)", "", markdown_text)
    markdown_text = re.sub(r"[ \t]+", " ", markdown_text)
    markdown_text = markdown_text.strip()
    title = full_soup.find("title")
    if title and title.get_text(strip=True):
        markdown_text = f"# {title.get_text(strip=True)}\n\n{markdown_text}"
    return markdown_text or "No content could be extracted."


def _truncate_markdown(markdown: str, max_chars: int) -> Tuple[str, Dict[str, object]]:
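    """Truncate Markdown near a paragraph or sentence boundary and append a pagination notice.

    Returns (truncated_markdown, metadata); metadata carries truncated, returned_chars,
    total_chars_estimate, and next_cursor (the character offset at which to resume).
    """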
    total_chars = len(markdown)
    if total_chars <= max_chars:
        return markdown, {
            "truncated": False,
            "returned_chars": total_chars,
            "total_chars_estimate": total_chars,
            "next_cursor": None,
        }
    truncated = markdown[:max_chars]
    last_paragraph = truncated.rfind("\n\n")
    if last_paragraph > max_chars * 0.7:
        truncated = truncated[:last_paragraph]
        cursor_pos = last_paragraph
    elif "." in truncated[-100:]:
        last_period = truncated.rfind(".")
        if last_period > max_chars * 0.8:
            truncated = truncated[: last_period + 1]
            cursor_pos = last_period + 1
        else:
            cursor_pos = len(truncated)
    else:
        cursor_pos = len(truncated)
    metadata = {
        "truncated": True,
        "returned_chars": len(truncated),
        "total_chars_estimate": total_chars,
        "next_cursor": cursor_pos,
    }
    truncated = truncated.rstrip()
    truncation_notice = (
        "\n\n---\n"
        f"**Content Truncated:** Showing {metadata['returned_chars']:,} of {metadata['total_chars_estimate']:,} characters "
        f"({(metadata['returned_chars'] / metadata['total_chars_estimate'] * 100):.1f}%)\n"
        f"**Next cursor:** {metadata['next_cursor']} (use this value with offset parameter for continuation)\n"
        "---"
    )
    return truncated + truncation_notice, metadata


@autodoc(summary=TOOL_SUMMARY)
def Web_Fetch(
    url: Annotated[str, "The absolute URL to fetch (must return HTML)."],
    max_chars: Annotated[int, "Maximum characters to return (0 = no limit, full page content)."] = 3000,
    strip_selectors: Annotated[str, "CSS selectors to remove (comma-separated, e.g., '.header, .footer, nav')."] = "",
    url_scraper: Annotated[bool, "Extract only links from the page instead of content."] = False,
    offset: Annotated[int, "Character offset to start from (for pagination, use next_cursor from previous call)."] = 0,
) -> str:
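    # Flow: log call -> validate URL -> fetch -> require HTML -> parse -> links or Markdown -> apply offset/max_chars.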
    _log_call_start(
        "Web_Fetch",
        url=url,
        max_chars=max_chars,
        strip_selectors=strip_selectors,
        url_scraper=url_scraper,
        offset=offset,
    )
    if not url or not url.strip():
        result = "Please enter a valid URL."
        _log_call_end("Web_Fetch", _truncate_for_log(result))
        return result
    try:
        resp = _http_get_enhanced(url)
        resp.raise_for_status()
    except requests.exceptions.RequestException as exc:
        result = f"An error occurred: {exc}"
        _log_call_end("Web_Fetch", _truncate_for_log(result))
        return result
    final_url = str(resp.url)
    ctype = resp.headers.get("Content-Type", "")
    if "html" not in ctype.lower():
        result = f"Unsupported content type for extraction: {ctype or 'unknown'}"
        _log_call_end("Web_Fetch", _truncate_for_log(result))
        return result
    resp.encoding = resp.encoding or resp.apparent_encoding
    html = resp.text
    full_soup = BeautifulSoup(html, "lxml")
    if url_scraper:
        result = _extract_links_from_soup(full_soup, final_url)
        if offset > 0:
            result = result[offset:]
        if max_chars > 0 and len(result) > max_chars:
            result, _ = _truncate_markdown(result, max_chars)
    else:
        full_result = _fullpage_markdown_from_soup(full_soup, final_url, strip_selectors)
        if offset > 0:
            if offset >= len(full_result):
                result = (
                    f"Offset {offset} exceeds content length ({len(full_result)} characters). "
                    f"Content ends at position {len(full_result)}."
                )
                _log_call_end("Web_Fetch", _truncate_for_log(result))
                return result
            result = full_result[offset:]
        else:
            result = full_result
        if max_chars > 0 and len(result) > max_chars:
            result, metadata = _truncate_markdown(result, max_chars)
            if offset > 0:
                metadata["total_chars_estimate"] = len(full_result)
                metadata["next_cursor"] = offset + metadata["next_cursor"] if metadata["next_cursor"] else None
    _log_call_end("Web_Fetch", f"chars={len(result)}, url_scraper={url_scraper}, offset={offset}")
    return result


def build_interface() -> gr.Interface:
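    """Build the Gradio Interface that exposes Web_Fetch with URL, length, selector, mode, and offset controls."""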
    return gr.Interface(
        fn=Web_Fetch,
        inputs=[
            gr.Textbox(label="URL", placeholder="https://example.com/article", max_lines=1),
            gr.Slider(minimum=0, maximum=20000, value=3000, step=100, label="Max Characters", info="0 = no limit (full page), default 3000"),
            gr.Textbox(
                label="Strip Selectors",
                placeholder=".header, .footer, nav, .sidebar",
                value="",
                max_lines=1,
                info="CSS selectors to remove (comma-separated)",
            ),
            gr.Checkbox(label="URL Scraper", value=False, info="Extract only links instead of content"),
            gr.Slider(
                minimum=0,
                maximum=100000,
                value=0,
                step=100,
                label="Offset",
                info="Character offset to start from (use next_cursor from previous call for pagination)",
            ),
        ],
        outputs=gr.Markdown(label="Extracted Content"),
        title="Web Fetch",
        description=(
            "<div style=\"text-align:center\">Convert any webpage to clean Markdown format with precision controls, "
            "or extract all links. Supports custom element removal, length limits, and pagination with offset.</div>"
        ),
        api_description=TOOL_SUMMARY,
        flagging_mode="never",
    )


__all__ = [
    "Web_Fetch",
    "build_interface",
    "_http_get_enhanced",
    "_fullpage_markdown_from_soup",
]
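

# Minimal local-run sketch: assumes this file is executed as a module of its package
# (e.g. `python -m <package>.<this_module>`, placeholders for the real names) so the
# relative `._docstrings` import and the shared `app` helpers resolve; the hosted app
# may mount the interface differently.
if __name__ == "__main__":
    build_interface().launch()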