File size: 11,405 Bytes
97c8e77 7504804 97c8e77 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 |
import json
import os
import signal
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Union
import requests
from qwen_agent.tools.base import BaseTool, register_tool
from prompt import EXTRACTOR_PROMPT
from openai import OpenAI
import random
from urllib.parse import urlparse, unquote
import time
import tiktoken
# Settings read once from the process environment at import time.
# NOTE(review): units are not visible in this file -- the timeout is presumably
# seconds and the max length presumably characters; confirm against callers.
VISIT_SERVER_TIMEOUT = int(os.environ.get("VISIT_SERVER_TIMEOUT", 200))
WEBCONTENT_MAXLENGTH = int(os.environ.get("WEBCONTENT_MAXLENGTH", 150000))
# Sent as the bearer token on Jina Reader requests; empty string when unset.
JINA_API_KEYS = os.environ.get("JINA_KEY", "")
def truncate_to_tokens(text: str, max_tokens: int = 95000) -> str:
    """Return *text* limited to at most *max_tokens* ``cl100k_base`` tokens.

    Args:
        text: The text to truncate.
        max_tokens: Maximum number of tokens to keep. Negative values are
            treated as zero.

    Returns:
        ``text`` unchanged when it already fits, otherwise the decoded prefix
        made of the first ``max_tokens`` tokens.
    """
    # NOTE: this is a module-level function, so it must not be wrapped in
    # ``@staticmethod``; before Python 3.10 a bare staticmethod object is not
    # callable and every call to this helper would raise TypeError.
    encoding = tiktoken.get_encoding("cl100k_base")
    # Scraped pages may contain literal special-token text such as
    # "<|endoftext|>". By default tiktoken raises ValueError on those;
    # disabling the check encodes them as ordinary text instead.
    tokens = encoding.encode(text, disallowed_special=())
    limit = max(max_tokens, 0)
    if len(tokens) <= limit:
        return text
    return encoding.decode(tokens[:limit])
# Prompt fragment describing the JSON object the summarizer is asked to return.
# "rational", "evidence" and "summary" are sibling properties of the schema.
OSS_JSON_FORMAT = """# Response Formats
## visit_content
{"properties":{"rational":{"type":"string","description":"Locate the **specific sections/data** directly related to the user's goal within the webpage content"},"evidence":{"type":"string","description":"Identify and extract the **most relevant information** from the content, never miss any important information, output the **full original context** of the content as far as possible, it can be more than three paragraphs."},"summary":{"type":"string","description":"Organize into a concise paragraph with logical flow, prioritizing clarity and judge the contribution of the information to the goal."}}}"""
@register_tool('visit', allow_overwrite=True)
class Visit(BaseTool):
    """Agent tool that fetches webpage(s) via Jina Reader and summarizes them.

    Each page is downloaded through ``https://r.jina.ai/``, truncated to a
    token budget, and passed with the user's goal to an OpenAI-compatible chat
    endpoint (configured by the ``API_KEY`` / ``API_BASE`` environment
    variables) which is asked to return JSON with ``evidence`` and ``summary``.
    """

    # The `description` tells the agent the functionality of this tool.
    name = 'visit'
    description = 'Visit webpage(s) and return the summary of the content.'
    # The `parameters` tell the agent what input parameters the tool has.
    parameters = {
        "type": "object",
        "properties": {
            "url": {
                "type": ["string", "array"],
                "items": {
                    "type": "string"
                },
                "minItems": 1,
                "description": "The URL(s) of the webpage(s) to visit. Can be a single URL or an array of URLs."
            },
            "goal": {
                "type": "string",
                "description": "The goal of the visit for webpage(s)."
            }
        },
        "required": ["url", "goal"]
    }

    # Message returned when the tool input cannot be interpreted.
    _INVALID_REQUEST = "[Visit] Invalid request format: Input must be a JSON object containing 'url' and 'goal' fields"
    # Sentinel returned by the fetch helpers when a page could not be read.
    _FETCH_FAILED = "[visit] Failed to read page."
    # Wall-clock budget (seconds) for a multi-URL call; URLs reached after the
    # budget is spent are reported as inaccessible instead of being fetched.
    _BATCH_TIME_BUDGET_S = 900

    @staticmethod
    def _unavailable_message(url, goal) -> str:
        """Build the standard report for a page that could not be fetched or summarized."""
        message = "The useful information in {url} for user goal {goal} as follows: \n\n".format(url=url, goal=goal)
        message += "Evidence in page: \n" + "The provided webpage content could not be accessed. Please check the URL or file format." + "\n\n"
        message += "Summary: \n" + "The webpage content could not be processed, and therefore, no information is available." + "\n\n"
        return message

    @staticmethod
    def _is_usable_content(content) -> bool:
        """Return True when *content* is real page text rather than an error marker."""
        if not content:
            return False
        if content.startswith(("[visit] Failed to read page.", "[document_parser]")):
            return False
        return content != "[visit] Empty content."

    @staticmethod
    def _parse_summary(raw):
        """Parse a summarizer reply into a dict with 'evidence' and 'summary'.

        Markdown code fences are stripped before parsing. Returns ``None``
        when the reply is not a string, is not valid JSON, or is missing
        either required key.
        """
        if not isinstance(raw, str):
            return None
        cleaned = raw.replace("```json", "").replace("```", "").strip()
        try:
            parsed = json.loads(cleaned)
        except (ValueError, RecursionError):
            return None
        if isinstance(parsed, dict) and "evidence" in parsed and "summary" in parsed:
            return parsed
        return None

    # The `call` method is the main function of the tool.
    def call(self, params: Union[str, dict], **kwargs) -> str:
        """Visit one or more URLs and return the goal-focused summary text.

        Args:
            params: Either a JSON string or an already-decoded dict holding
                ``url`` (a string or a list of strings) and ``goal``.
            **kwargs: Accepted for interface compatibility; unused here.

        Returns:
            The summary for a single URL, or the per-URL summaries joined by
            a ``=======`` separator line. An invalid request yields an
            explanatory ``[Visit]`` message instead of raising.
        """
        try:
            # The signature allows a dict; only decode when given raw JSON.
            if not isinstance(params, dict):
                params = json.loads(params)
            url = params["url"]
            goal = params["goal"]
        except (TypeError, ValueError, KeyError):
            return self._INVALID_REQUEST

        # Create log folder if it doesn't exist
        os.makedirs("log", exist_ok=True)

        if isinstance(url, str):
            response = self.readpage_jina(url, goal)
        elif isinstance(url, list):
            started = time.time()
            responses = []
            for u in url:
                if time.time() - started > self._BATCH_TIME_BUDGET_S:
                    # Out of time: report this specific URL as unavailable.
                    cur_response = self._unavailable_message(u, goal)
                else:
                    try:
                        cur_response = self.readpage_jina(u, goal)
                    except Exception as e:
                        cur_response = f"Error fetching {u}: {str(e)}"
                responses.append(cur_response)
            response = "\n=======\n".join(responses)
        else:
            # Neither a string nor a list: the request does not match the schema.
            return self._INVALID_REQUEST

        print(f'Summary Length {len(response)}; Summary Content {response}')
        return response.strip()

    def call_server(self, msgs, max_retries=2):
        """Send *msgs* to the summarization model and return its text reply.

        Args:
            msgs: Chat messages in OpenAI format.
            max_retries: Maximum number of request attempts.

        Returns:
            The reply content. When the reply is not valid JSON as-is, the
            span from the first ``{`` to the last ``}`` is returned instead
            (if such a span exists). An empty string is returned when every
            attempt fails or yields no content, so callers always get a str.
        """
        api_key = os.environ.get("API_KEY")
        url_llm = os.environ.get("API_BASE")
        model_name = "qwen/qwen3-30b-a3b-instruct-2507"
        client = OpenAI(
            api_key=api_key,
            base_url=url_llm,
        )
        for _ in range(max_retries):
            try:
                chat_response = client.chat.completions.create(
                    model=model_name,
                    messages=msgs,
                    temperature=0.7
                )
                content = chat_response.choices[0].message.content
            except Exception as e:
                # Best-effort boundary: log and move on to the next attempt.
                print(e)
                continue
            if not content:
                continue
            try:
                json.loads(content)
            except (ValueError, RecursionError):
                # extract json from string
                left = content.find('{')
                right = content.rfind('}')
                if left != -1 and right != -1 and left <= right:
                    content = content[left:right + 1]
            return content
        return ""

    def jina_readpage(self, url: str) -> str:
        """
        Read webpage content using Jina service.

        Args:
            url: The URL to read

        Returns:
            str: The webpage content, or "[visit] Failed to read page." after
            three unsuccessful attempts
        """
        max_retries = 3
        timeout = 50
        # Only send credentials when a key is configured; an empty bearer
        # token is never a valid credential.
        headers = {"Authorization": f"Bearer {JINA_API_KEYS}"} if JINA_API_KEYS else {}
        for attempt in range(max_retries):
            try:
                response = requests.get(
                    f"https://r.jina.ai/{url}",
                    headers=headers,
                    timeout=timeout
                )
                if response.status_code == 200:
                    return response.text
                print(response.text)
            except Exception as e:
                print(e)
            # Brief pause before retrying; pointless after the final attempt.
            if attempt < max_retries - 1:
                time.sleep(0.5)
        return self._FETCH_FAILED

    def html_readpage_jina(self, url: str) -> str:
        """Fetch *url* through Jina, repeating the fetch up to eight times.

        Returns:
            The page content, or "[visit] Failed to read page." when no
            attempt produced usable content.
        """
        max_attempts = 8
        for _ in range(max_attempts):
            content = self.jina_readpage(url)
            service = "jina"
            print(service)
            if self._is_usable_content(content):
                return content
        return self._FETCH_FAILED

    def readpage_jina(self, url: str, goal: str) -> str:
        """
        Fetch a webpage through Jina and summarize it with respect to a goal.

        Args:
            url: The URL to read
            goal: The goal/purpose of reading the page

        Returns:
            str: A report containing the extracted evidence and summary, or a
            standard "could not be accessed" report when fetching or
            summarizing fails
        """
        max_retries = int(os.getenv('VISIT_SERVER_MAX_RETRIES', 1))
        content = self.html_readpage_jina(url)
        if not self._is_usable_content(content):
            # No valid content was obtained after all retries.
            return self._unavailable_message(url, goal)

        content = truncate_to_tokens(content, max_tokens=95000)
        messages = [{"role": "user", "content": EXTRACTOR_PROMPT.format(webpage_content=content, goal=goal)}]
        raw = self.call_server(messages, max_retries=max_retries) or ""

        # A near-empty reply usually means the prompt was too large: shrink
        # the page text and ask again, ending with a hard 25000-char cut.
        summary_retries = 3
        while len(raw) < 10 and summary_retries >= 0:
            if summary_retries > 0:
                truncate_length = int(0.7 * len(content))
                status_msg = (
                    f"[visit] Summary url[{url}] "
                    f"attempt {3 - summary_retries + 1}/3, "
                    f"content length: {len(content)}, "
                    f"truncating to {truncate_length} chars"
                )
            else:
                truncate_length = 25000
                status_msg = (
                    f"[visit] Summary url[{url}] failed after 3 attempts, "
                    f"final truncation to 25000 chars"
                )
            print(status_msg)
            content = content[:truncate_length]
            extraction_prompt = EXTRACTOR_PROMPT.format(
                webpage_content=content,
                goal=goal
            )
            messages = [{"role": "user", "content": extraction_prompt}]
            raw = self.call_server(messages, max_retries=max_retries) or ""
            summary_retries -= 1

        parsed = self._parse_summary(raw)
        if parsed is None:
            # One more request when the reply is not usable JSON; unlike a
            # fire-and-forget retry, the new reply is actually parsed.
            parsed = self._parse_summary(self.call_server(messages, max_retries=max_retries))
        if parsed is None:
            return self._unavailable_message(url, goal)

        useful_information = "The useful information in {url} for user goal {goal} as follows: \n\n".format(url=url, goal=goal)
        useful_information += "Evidence in page: \n" + str(parsed["evidence"]) + "\n\n"
        useful_information += "Summary: \n" + str(parsed["summary"]) + "\n\n"
        return useful_information
if __name__ == "__main__":
    # Manual smoke test: visit one page and print the resulting summary.
    tool = Visit()
    print(tool.call('{"url": ["https://2025.aclweb.org/"], "goal": "Find the important dates page and locate the Industry Track paper submission deadline"}'))