|
|
import json |
|
|
from concurrent.futures import ThreadPoolExecutor |
|
|
from typing import List, Union |
|
|
import requests |
|
|
from qwen_agent.tools.base import BaseTool, register_tool |
|
|
import asyncio |
|
|
from typing import Dict, List, Optional, Union |
|
|
import uuid |
|
|
import http.client |
|
|
import json |
|
|
|
|
|
import os |
|
|
|
|
|
|
|
|
# Serper API key, read once at import time from the SERPER_KEY_ID environment
# variable. It is None when the variable is unset.
SERPER_KEY = os.environ.get('SERPER_KEY_ID')
|
|
|
|
|
|
|
|
@register_tool("search", allow_overwrite=True)
class Search(BaseTool):
    """Web search tool that queries the Serper endpoint (google.serper.dev).

    ``call`` accepts one query string or a list of query strings and returns
    a single text report; reports for multiple queries are separated by a
    ``=======`` line.
    """

    name = "search"
    description = "Performs batched web searches: supply an array 'query'; the tool retrieves the top 10 results for each query in one call."
    parameters = {
        "type": "object",
        "properties": {
            "query": {
                "type": "array",
                "items": {
                    "type": "string"
                },
                "description": "Array of query strings. Include multiple complementary search queries in a single call."
            },
        },
        "required": ["query"],
    }

    # How many times a failing request is attempted before giving up.
    _MAX_ATTEMPTS = 5
    # Socket timeout in seconds, so a stalled server cannot block a call forever.
    _TIMEOUT_SECONDS = 30

    def __init__(self, cfg: Optional[dict] = None):
        super().__init__(cfg)

    def google_search_with_serp(self, query: str) -> str:
        """Run one search against Serper and format the organic results.

        Args:
            query: The search string. If it contains a character in the
                U+4E00..U+9FFF range the request uses the China / zh-cn
                locale, otherwise the United States / en locale.

        Returns:
            A text listing of the organic results, or a human-readable
            failure message when every request attempt failed or the
            response held no usable results. This method does not raise for
            network or parsing problems; they are printed and reported in
            the returned string.
        """

        def contains_chinese_basic(text: str) -> bool:
            return any('\u4E00' <= char <= '\u9FFF' for char in text)

        if contains_chinese_basic(query):
            locale = {"location": "China", "gl": "cn", "hl": "zh-cn"}
        else:
            locale = {"location": "United States", "gl": "us", "hl": "en"}
        payload = json.dumps({"q": query, **locale})
        headers = {
            'X-API-KEY': SERPER_KEY,
            'Content-Type': 'application/json'
        }

        for _ in range(self._MAX_ATTEMPTS):
            # A fresh connection per attempt: a connection that failed
            # mid-request cannot be relied on for the retry.
            conn = http.client.HTTPSConnection(
                "google.serper.dev", timeout=self._TIMEOUT_SECONDS
            )
            try:
                conn.request("POST", "/search", payload, headers)
                data = conn.getresponse().read()
                break
            except Exception as e:
                print(e)
            finally:
                # Always release the socket, on success and on failure.
                conn.close()
        else:
            # Every attempt raised.
            return "Google search Timeout, return None, Please try again later."

        no_results = f"No results found for '{query}'. Try with a more general query."
        try:
            # Decoding sits inside the try so a non-JSON body is reported as
            # "no results" instead of raising out of the tool.
            results = json.loads(data.decode("utf-8"))
            print(results)

            if "organic" not in results:
                print(f"No results found for query: '{query}'. Use a less specific query.")
                return no_results

            web_snippets = []
            for idx, page in enumerate(results["organic"], start=1):
                date_published = f"\nDate published: {page['date']}" if "date" in page else ""
                source = f"\nSource: {page['source']}" if "source" in page else ""
                snippet = f"\n{page['snippet']}" if "snippet" in page else ""

                entry = f"{idx}. [{page['title']}]({page['link']}){date_published}{source}\n{snippet}"
                entry = entry.replace("Your browser can't play this video.", "")
                web_snippets.append(entry)

            return (
                f"A Google search for '{query}' found {len(web_snippets)} results:\n\n## Web Results\n"
                + "\n\n".join(web_snippets)
            )
        except Exception as e:
            # Tool boundary: a malformed response (missing title/link,
            # unexpected types, bad JSON) becomes a message, not a crash.
            print(e)
            return no_results

    def search_with_serp(self, query: str) -> str:
        """Search for a single query; thin wrapper over ``google_search_with_serp``."""
        return self.google_search_with_serp(query)

    def call(self, params: Union[str, dict], **kwargs) -> str:
        """Tool entry point: run the search(es) described by ``params``.

        Args:
            params: Either a JSON document or an already-parsed dict. It must
                contain a ``query`` entry holding a string or a list of
                strings.
            **kwargs: Accepted for interface compatibility; unused here.

        Returns:
            The search report for a single query, the reports for each query
            joined by ``"\\n=======\\n"`` for a list, or an invalid-request
            message when ``params`` cannot be interpreted.
        """
        invalid_request = "[Search] Invalid request format: Input must be a JSON object containing 'query' field"
        try:
            print(params)
            # Parse only when needed: json.loads() rejects a dict, which
            # would make already-parsed parameters look invalid.
            if not isinstance(params, dict):
                params = json.loads(params)
            print(params)
            query = params["query"]
            print("query:\n", query)
        except (ValueError, KeyError, TypeError):
            return invalid_request

        if isinstance(query, str):
            return self.search_with_serp(query)

        # Validate explicitly instead of asserting: asserts vanish under -O
        # and would otherwise raise out of the tool.
        if not isinstance(query, list) or not all(isinstance(q, str) for q in query):
            return invalid_request

        responses = [self.search_with_serp(q) for q in query]
        return "\n=======\n".join(responses)
|
|
|