# Hugging Face file-page residue from the original upload, kept as a comment
# so the module parses: "callanwu's picture / Update visit.py / 7504804 verified"
import json
import os
import signal
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Union
import requests
from qwen_agent.tools.base import BaseTool, register_tool
from prompt import EXTRACTOR_PROMPT
from openai import OpenAI
import random
from urllib.parse import urlparse, unquote
import time
import tiktoken
# Timeout budget in seconds, env-overridable; not referenced in this file's
# visible code — presumably consumed elsewhere in the project (TODO confirm).
VISIT_SERVER_TIMEOUT = int(os.getenv("VISIT_SERVER_TIMEOUT", 200))
# Character cap for raw webpage content, env-overridable; also not referenced
# in the visible code — verify against the rest of the project.
WEBCONTENT_MAXLENGTH = int(os.getenv("WEBCONTENT_MAXLENGTH", 150000))
# Bearer token for the Jina reader service (r.jina.ai); empty string when unset.
JINA_API_KEYS = os.getenv("JINA_KEY", "")
# Fix: the original carried a stray `@staticmethod` decorator on this
# module-level function; on Python < 3.10 a staticmethod object is not
# callable, so readpage_jina's `truncate_to_tokens(...)` call would crash.
def truncate_to_tokens(text: str, max_tokens: int = 95000) -> str:
    """Bound *text* to at most ``max_tokens`` cl100k_base tokens.

    Args:
        text: Text to truncate.
        max_tokens: Maximum number of tokens to keep.

    Returns:
        ``text`` unchanged when it already fits, otherwise the decoded
        ``max_tokens``-token prefix.
    """
    encoding = tiktoken.get_encoding("cl100k_base")
    tokens = encoding.encode(text)
    if len(tokens) <= max_tokens:
        return text
    return encoding.decode(tokens[:max_tokens])
# JSON response schema shown to the model. Fix: in the original, the
# "summary" property was accidentally nested inside "evidence"'s description
# object (the evidence object was never closed), producing a malformed schema;
# "summary" is now a proper sibling property of "rational" and "evidence".
OSS_JSON_FORMAT = """# Response Formats
## visit_content
{"properties":{"rational":{"type":"string","description":"Locate the **specific sections/data** directly related to the user's goal within the webpage content"},"evidence":{"type":"string","description":"Identify and extract the **most relevant information** from the content, never miss any important information, output the **full original context** of the content as far as possible, it can be more than three paragraphs."},"summary":{"type":"string","description":"Organize into a concise paragraph with logical flow, prioritizing clarity and judge the contribution of the information to the goal."}}}"""
@register_tool('visit', allow_overwrite=True)
class Visit(BaseTool):
    """Agent tool: fetch webpage(s) through the Jina reader (r.jina.ai) and
    return a goal-directed extraction/summary produced by an LLM."""

    # The `description` tells the agent the functionality of this tool.
    name = 'visit'
    description = 'Visit webpage(s) and return the summary of the content.'
    # The `parameters` tell the agent what input parameters the tool has.
    parameters = {
        "type": "object",
        "properties": {
            "url": {
                "type": ["string", "array"],
                "items": {
                    "type": "string"
                },
                "minItems": 1,
                "description": "The URL(s) of the webpage(s) to visit. Can be a single URL or an array of URLs."
            },
            "goal": {
                "type": "string",
                "description": "The goal of the visit for webpage(s)."
            }
        },
        "required": ["url", "goal"]
    }

    # The `call` method is the main function of the tool.
    def call(self, params: Union[str, dict], **kwargs) -> str:
        """Visit one or more URLs and return goal-directed summaries.

        Args:
            params: JSON string or already-parsed dict containing 'url'
                (str or list of str) and 'goal' (str).

        Returns:
            Summary text; summaries for multiple URLs are joined with a
            '=======' separator line.
        """
        try:
            # Fix: the signature accepts a dict, but the original always ran
            # json.loads(params); a dict input raised TypeError and was
            # swallowed by a bare `except` as an "invalid request".
            if isinstance(params, str):
                params = json.loads(params)
            url = params["url"]
            goal = params["goal"]
        except (json.JSONDecodeError, TypeError, KeyError):
            return "[Visit] Invalid request format: Input must be a JSON object containing 'url' and 'goal' fields"

        # Create log folder if it doesn't exist (kept for parity; nothing in
        # this file writes to it — presumably used elsewhere, TODO confirm).
        os.makedirs("log", exist_ok=True)

        if isinstance(url, str):
            response = self.readpage_jina(url, goal)
        else:
            assert isinstance(url, list)
            results = []
            start_time = time.time()  # wall-clock budget for the whole batch
            for u in url:
                if time.time() - start_time > 900:
                    # 15-minute budget exhausted: emit a placeholder for the
                    # remaining URLs instead of fetching them.
                    # Fix: interpolate the current URL `u` (the original
                    # formatted the entire list `url` into the message).
                    cur_response = "The useful information in {url} for user goal {goal} as follows: \n\n".format(url=u, goal=goal)
                    cur_response += "Evidence in page: \n" + "The provided webpage content could not be accessed. Please check the URL or file format." + "\n\n"
                    cur_response += "Summary: \n" + "The webpage content could not be processed, and therefore, no information is available." + "\n\n"
                else:
                    try:
                        cur_response = self.readpage_jina(u, goal)
                    except Exception as e:
                        cur_response = f"Error fetching {u}: {str(e)}"
                results.append(cur_response)
            response = "\n=======\n".join(results)
        print(f'Summary Length {len(response)}; Summary Content {response}')
        return response.strip()

    def call_server(self, msgs, max_retries=2):
        """Query the extractor LLM and return its (best-effort JSON) reply.

        Args:
            msgs: Chat messages in OpenAI format.
            max_retries: Number of attempts against the API.

        Returns:
            The response text (trimmed to the outermost ``{...}`` span when it
            is not valid JSON as-is), or ``""`` when every attempt fails.
        """
        client = OpenAI(
            api_key=os.environ.get("API_KEY"),
            base_url=os.environ.get("API_BASE"),
        )
        model_name = "qwen/qwen3-30b-a3b-instruct-2507"
        for attempt in range(max_retries):
            try:
                chat_response = client.chat.completions.create(
                    model=model_name,
                    messages=msgs,
                    temperature=0.7
                )
                content = chat_response.choices[0].message.content
                if content:
                    try:
                        json.loads(content)
                    except json.JSONDecodeError:
                        # Model wrapped JSON in prose/fences: keep the
                        # outermost brace-delimited span only.
                        left = content.find('{')
                        right = content.rfind('}')
                        if left != -1 and right != -1 and left <= right:
                            content = content[left:right + 1]
                    return content
            except Exception as e:
                print(e)
                if attempt == (max_retries - 1):
                    return ""
        # Fix: the original fell off the loop and returned None when the model
        # produced empty content on every attempt; callers do len(raw).
        return ""

    def jina_readpage(self, url: str) -> str:
        """
        Read webpage content using the Jina reader service.

        Args:
            url: The URL to read

        Returns:
            str: The webpage content, or "[visit] Failed to read page." after
            all retries fail.
        """
        max_retries = 3
        timeout = 50
        # Loop-invariant header, hoisted out of the retry loop.
        headers = {
            "Authorization": f"Bearer {JINA_API_KEYS}",
        }
        for attempt in range(max_retries):
            try:
                response = requests.get(
                    f"https://r.jina.ai/{url}",
                    headers=headers,
                    timeout=timeout
                )
                if response.status_code == 200:
                    return response.text
                # Non-200: surface the body for debugging, then retry.
                print(response.text)
                raise ValueError("jina readpage error")
            except Exception:
                time.sleep(0.5)  # brief back-off before the next attempt
                if attempt == max_retries - 1:
                    return "[visit] Failed to read page."
        return "[visit] Failed to read page."

    def html_readpage_jina(self, url: str) -> str:
        """Repeatedly query the Jina reader until real content comes back.

        Returns the page text, or "[visit] Failed to read page." after
        ``max_attempts`` unsuccessful rounds.
        """
        max_attempts = 8
        for _ in range(max_attempts):
            content = self.jina_readpage(url)
            print("jina")  # which backend served this attempt
            if content and not content.startswith("[visit] Failed to read page.") and content != "[visit] Empty content." and not content.startswith("[document_parser]"):
                return content
        return "[visit] Failed to read page."

    def readpage_jina(self, url: str, goal: str) -> str:
        """
        Fetch a page through Jina and distill it with the extractor LLM.

        Args:
            url: The URL to read
            goal: The goal/purpose of reading the page

        Returns:
            str: Formatted "useful information" text, or a failure notice when
            the page could not be read or summarized.
        """
        summary_page_func = self.call_server
        max_retries = int(os.getenv('VISIT_SERVER_MAX_RETRIES', 1))
        content = self.html_readpage_jina(url)
        if content and not content.startswith("[visit] Failed to read page.") and content != "[visit] Empty content." and not content.startswith("[document_parser]"):
            content = truncate_to_tokens(content, max_tokens=95000)
            messages = [{"role": "user", "content": EXTRACTOR_PROMPT.format(webpage_content=content, goal=goal)}]
            raw = summary_page_func(messages, max_retries=max_retries)
            # A very short reply means extraction failed: progressively shrink
            # the page and ask again (final round truncates hard to 25k chars).
            summary_retries = 3
            while len(raw) < 10 and summary_retries >= 0:
                truncate_length = int(0.7 * len(content)) if summary_retries > 0 else 25000
                status_msg = (
                    f"[visit] Summary url[{url}] "
                    f"attempt {3 - summary_retries + 1}/3, "
                    f"content length: {len(content)}, "
                    f"truncating to {truncate_length} chars"
                ) if summary_retries > 0 else (
                    f"[visit] Summary url[{url}] failed after 3 attempts, "
                    f"final truncation to 25000 chars"
                )
                print(status_msg)
                content = content[:truncate_length]
                extraction_prompt = EXTRACTOR_PROMPT.format(
                    webpage_content=content,
                    goal=goal
                )
                messages = [{"role": "user", "content": extraction_prompt}]
                raw = summary_page_func(messages, max_retries=max_retries)
                summary_retries -= 1

            # Parse the model output as JSON, re-querying the model on bad
            # JSON. Fix: the original started this counter at 2, so after one
            # failed parse it re-queried the model but never parsed the fresh
            # response — a guaranteed wasted call and failure path.
            parsed = None
            parse_retry_times = 0
            while parse_retry_times < 3:
                if isinstance(raw, str):
                    raw = raw.replace("```json", "").replace("```", "").strip()
                try:
                    parsed = json.loads(raw)
                    break
                except (json.JSONDecodeError, TypeError):
                    raw = summary_page_func(messages, max_retries=max_retries)
                    parse_retry_times += 1

            if parsed is None:
                useful_information = "The useful information in {url} for user goal {goal} as follows: \n\n".format(url=url, goal=goal)
                useful_information += "Evidence in page: \n" + "The provided webpage content could not be accessed. Please check the URL or file format." + "\n\n"
                useful_information += "Summary: \n" + "The webpage content could not be processed, and therefore, no information is available." + "\n\n"
            else:
                useful_information = "The useful information in {url} for user goal {goal} as follows: \n\n".format(url=url, goal=goal)
                # .get with defaults: a well-formed-but-incomplete reply should
                # not raise KeyError out of the tool.
                useful_information += "Evidence in page: \n" + str(parsed.get("evidence", "")) + "\n\n"
                useful_information += "Summary: \n" + str(parsed.get("summary", "")) + "\n\n"

            if len(useful_information) < 10 and summary_retries < 0:
                print("[visit] Could not generate valid summary after maximum retries")
                useful_information = "[visit] Failed to read page"
            return useful_information

        # If no valid content was obtained after all retries
        useful_information = "The useful information in {url} for user goal {goal} as follows: \n\n".format(url=url, goal=goal)
        useful_information += "Evidence in page: \n" + "The provided webpage content could not be accessed. Please check the URL or file format." + "\n\n"
        useful_information += "Summary: \n" + "The webpage content could not be processed, and therefore, no information is available." + "\n\n"
        return useful_information
if __name__ == "__main__":
    # Manual smoke test: visit the ACL 2025 site with a concrete goal.
    tool = Visit()
    request = '{"url": ["https://2025.aclweb.org/"], "goal": "Find the important dates page and locate the Industry Track paper submission deadline"}'
    print(tool.call(request))