"""`visit` tool: fetch web pages through the Jina Reader service and
summarize them with an LLM against a user-provided goal."""

import json
import os
import random
import signal
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Union
from urllib.parse import urlparse, unquote

import requests
import tiktoken
from openai import OpenAI
from qwen_agent.tools.base import BaseTool, register_tool

from prompt import EXTRACTOR_PROMPT


VISIT_SERVER_TIMEOUT = int(os.getenv("VISIT_SERVER_TIMEOUT", 200))
WEBCONTENT_MAXLENGTH = int(os.getenv("WEBCONTENT_MAXLENGTH", 150000))

JINA_API_KEYS = os.getenv("JINA_KEY", "")

# Sentinel strings used to recognise "no usable page content".
_FAILED_READ = "[visit] Failed to read page."
_EMPTY_CONTENT = "[visit] Empty content."
_DOCUMENT_PARSER_PREFIX = "[document_parser]"

# Wall-clock budget (seconds) for visiting a list of URLs in one call.
_BATCH_TIME_BUDGET_SECONDS = 900

_INVALID_REQUEST = "[Visit] Invalid request format: Input must be a JSON object containing 'url' and 'goal' fields"
_UNAVAILABLE_EVIDENCE = "The provided webpage content could not be accessed. Please check the URL or file format."
_UNAVAILABLE_SUMMARY = "The webpage content could not be processed, and therefore, no information is available."


def truncate_to_tokens(text: str, max_tokens: int = 95000) -> str:
    """Return *text* cut to at most *max_tokens* ``cl100k_base`` tokens.

    Args:
        text: The text to truncate.
        max_tokens: Maximum number of tokens to keep.

    Returns:
        ``text`` unchanged when it already fits, otherwise the decoded prefix
        of ``max_tokens`` tokens.
    """
    encoding = tiktoken.get_encoding("cl100k_base")
    # Web pages may literally contain strings such as "<|endoftext|>"; with the
    # default `disallowed_special="all"` tiktoken raises ValueError on them.
    tokens = encoding.encode(text, disallowed_special=())
    if len(tokens) <= max_tokens:
        return text
    return encoding.decode(tokens[:max_tokens])


def _format_result(url, goal, evidence: str, summary: str) -> str:
    """Build the textual tool result for one URL."""
    return (
        "The useful information in {url} for user goal {goal} as follows: \n\n".format(url=url, goal=goal)
        + "Evidence in page: \n" + evidence + "\n\n"
        + "Summary: \n" + summary + "\n\n"
    )


def _unavailable_result(url, goal) -> str:
    """Build the fallback result used when a page cannot be read or summarized."""
    return _format_result(url, goal, _UNAVAILABLE_EVIDENCE, _UNAVAILABLE_SUMMARY)


def _parse_extraction(raw):
    """Parse an extractor answer; return the dict, or None when unusable.

    Markdown code fences are stripped first. The answer is only accepted when
    it is a JSON object carrying both the ``evidence`` and ``summary`` keys.
    """
    if not isinstance(raw, str):
        return None
    cleaned = raw.replace("```json", "").replace("```", "").strip()
    try:
        parsed = json.loads(cleaned)
    except (ValueError, RecursionError):
        return None
    if isinstance(parsed, dict) and "evidence" in parsed and "summary" in parsed:
        return parsed
    return None


OSS_JSON_FORMAT = """# Response Formats ## visit_content {"properties":{"rational":{"type":"string","description":"Locate the **specific sections/data** directly related to the user's goal within the webpage content"},"evidence":{"type":"string","description":"Identify and extract the **most relevant information** from the content, never miss any important information, output the **full original context** of the content as far as possible, it can be more than three paragraphs.","summary":{"type":"string","description":"Organize into a concise paragraph with logical flow, prioritizing clarity and judge the contribution of the information to the goal."}}}}"""


@register_tool('visit', allow_overwrite=True)
class Visit(BaseTool):
    """Tool that visits one or more URLs and returns goal-focused summaries."""

    # The `description` tells the agent the functionality of this tool.
    name = 'visit'
    description = 'Visit webpage(s) and return the summary of the content.'
    # The `parameters` tell the agent what input parameters the tool has.
    parameters = {
        "type": "object",
        "properties": {
            "url": {
                "type": ["string", "array"],
                "items": {
                    "type": "string"
                },
                "minItems": 1,
                "description": "The URL(s) of the webpage(s) to visit. Can be a single URL or an array of URLs."
            },
            "goal": {
                "type": "string",
                "description": "The goal of the visit for webpage(s)."
            }
        },
        "required": ["url", "goal"]
    }

    # The `call` method is the main function of the tool.
    def call(self, params: Union[str, dict], **kwargs) -> str:
        """Visit the requested URL(s) and return the summarized findings.

        Args:
            params: A dict, or a JSON string encoding an object, with the keys
                ``url`` (a string or a list of strings) and ``goal``.
            **kwargs: Accepted for interface compatibility; unused here.

        Returns:
            The summary text. For a list of URLs the per-URL results are
            joined with ``"\\n=======\\n"``. Malformed input yields an
            "Invalid request format" message instead of raising.
        """
        try:
            # The signature allows a dict; only decode when given a string.
            if isinstance(params, str):
                params = json.loads(params)
            url = params["url"]
            goal = params["goal"]
        except (TypeError, ValueError, KeyError):
            return _INVALID_REQUEST
        if not isinstance(url, (str, list)) or not url:
            return _INVALID_REQUEST

        # Create log folder if it doesn't exist
        os.makedirs("log", exist_ok=True)

        if isinstance(url, str):
            response = self._visit_one(url, goal)
        else:
            start_time = time.time()
            responses = []
            for u in url:
                if time.time() - start_time > _BATCH_TIME_BUDGET_SECONDS:
                    # Budget exhausted: report this URL as unavailable rather
                    # than starting another potentially slow fetch.
                    responses.append(_unavailable_result(u, goal))
                else:
                    responses.append(self._visit_one(u, goal))
            response = "\n=======\n".join(responses)

        print(f'Summary Length {len(response)}; Summary Content {response}')
        return response.strip()

    def _visit_one(self, url, goal) -> str:
        """Summarize one URL, turning any failure into an error string."""
        try:
            return self.readpage_jina(url, goal)
        except Exception as e:  # tool boundary: the agent expects text back
            return f"Error fetching {url}: {str(e)}"

    def call_server(self, msgs, max_retries=2):
        """Send *msgs* to the summarization model and return its answer.

        The endpoint is configured through the ``API_KEY`` and ``API_BASE``
        environment variables.

        Args:
            msgs: Chat messages in the OpenAI ``messages`` format.
            max_retries: Maximum number of request attempts.

        Returns:
            The model's text answer. When the answer is not valid JSON, the
            span from the first ``{`` to the last ``}`` is returned instead
            (if such a span exists). Returns ``""`` when every attempt fails
            or yields empty content.
        """
        api_key = os.environ.get("API_KEY")
        url_llm = os.environ.get("API_BASE")
        model_name = "qwen/qwen3-30b-a3b-instruct-2507"
        client = OpenAI(
            api_key=api_key,
            base_url=url_llm,
        )
        for _ in range(max_retries):
            try:
                chat_response = client.chat.completions.create(
                    model=model_name,
                    messages=msgs,
                    temperature=0.7
                )
                content = chat_response.choices[0].message.content
            except Exception as e:  # best-effort: log and try again
                print(e)
                continue
            if not content:
                continue
            try:
                json.loads(content)
            except (ValueError, RecursionError):
                # extract json from string
                left = content.find('{')
                right = content.rfind('}')
                if left != -1 and right != -1 and left <= right:
                    content = content[left:right + 1]
            return content
        # Always hand back a string so callers can safely take len().
        return ""

    def jina_readpage(self, url: str) -> str:
        """
        Read webpage content using Jina service.

        Args:
            url: The URL to read

        Returns:
            str: The webpage content or error message
        """
        max_retries = 3
        timeout = 50
        # Only send the Authorization header when a key is configured.
        headers = {"Authorization": f"Bearer {JINA_API_KEYS}"} if JINA_API_KEYS else {}
        for attempt in range(max_retries):
            try:
                response = requests.get(
                    f"https://r.jina.ai/{url}",
                    headers=headers,
                    timeout=timeout
                )
                if response.status_code == 200:
                    return response.text
                print(response.text)
            except Exception as e:  # best-effort fetch: log and retry
                print(e)
            # Brief pause before the next attempt; pointless after the last.
            if attempt < max_retries - 1:
                time.sleep(0.5)
        return _FAILED_READ

    @staticmethod
    def _is_usable_content(content) -> bool:
        """Return True when *content* is real page text, not an error marker."""
        return (
            bool(content)
            and content != _EMPTY_CONTENT
            and not content.startswith((_FAILED_READ, _DOCUMENT_PARSER_PREFIX))
        )

    def html_readpage_jina(self, url: str) -> str:
        """Fetch *url* via Jina, retrying until usable content is returned.

        Args:
            url: The URL to read

        Returns:
            str: The webpage content, or "[visit] Failed to read page." when
            all attempts fail.
        """
        max_attempts = 8
        for _ in range(max_attempts):
            content = self.jina_readpage(url)
            print("jina")
            if self._is_usable_content(content):
                return content
        return _FAILED_READ

    def readpage_jina(self, url: str, goal: str) -> str:
        """
        Read a webpage through Jina and summarize it for the given goal.

        The page text is truncated to 95000 tokens and sent to the extractor
        model. While the answer is shorter than 10 characters the content is
        shortened (to 70% three times, then to 25000 characters) and the
        request repeated. An answer that cannot be parsed is re-requested once.

        Args:
            url: The URL to read
            goal: The goal/purpose of reading the page

        Returns:
            str: The formatted evidence and summary, or a fixed "could not be
            accessed" message when the page cannot be read or summarized.
        """
        max_retries = int(os.getenv('VISIT_SERVER_MAX_RETRIES', 1))

        content = self.html_readpage_jina(url)
        # If no valid content was obtained after all retries
        if not self._is_usable_content(content):
            return _unavailable_result(url, goal)

        content = truncate_to_tokens(content, max_tokens=95000)
        messages = [{"role": "user", "content": EXTRACTOR_PROMPT.format(webpage_content=content, goal=goal)}]
        raw = self.call_server(messages, max_retries=max_retries) or ""

        summary_retries = 3
        while len(raw) < 10 and summary_retries >= 0:
            if summary_retries > 0:
                truncate_length = int(0.7 * len(content))
                print(
                    f"[visit] Summary url[{url}] "
                    f"attempt {3 - summary_retries + 1}/3, "
                    f"content length: {len(content)}, "
                    f"truncating to {truncate_length} chars"
                )
            else:
                truncate_length = 25000
                print(
                    f"[visit] Summary url[{url}] failed after 3 attempts, "
                    f"final truncation to 25000 chars"
                )
            content = content[:truncate_length]
            messages = [{"role": "user", "content": EXTRACTOR_PROMPT.format(webpage_content=content, goal=goal)}]
            raw = self.call_server(messages, max_retries=max_retries) or ""
            summary_retries -= 1

        parsed = _parse_extraction(raw)
        if parsed is None:
            # One more request; its answer is parsed as well instead of being
            # discarded.
            raw = self.call_server(messages, max_retries=max_retries) or ""
            parsed = _parse_extraction(raw)
        if parsed is None:
            return _unavailable_result(url, goal)

        return _format_result(url, goal, str(parsed["evidence"]), str(parsed["summary"]))


if __name__ == "__main__":
    a = Visit()
    print(a.call('{"url": ["https://2025.aclweb.org/"], "goal": "Find the important dates page and locate the Industry Track paper submission deadline"}'))