Spaces:
Paused
Paused
| import logging | |
| from typing import Optional | |
| import requests | |
| from open_webui.retrieval.web.main import SearchResult, get_filtered_results | |
| from open_webui.env import SRC_LOG_LEVELS | |
| log = logging.getLogger(__name__) | |
| log.setLevel(SRC_LOG_LEVELS["RAG"]) | |
| def search_kagi( | |
| api_key: str, query: str, count: int, filter_list: Optional[list[str]] = None | |
| ) -> list[SearchResult]: | |
| """Search using Kagi's Search API and return the results as a list of SearchResult objects. | |
| The Search API will inherit the settings in your account, including results personalization and snippet length. | |
| Args: | |
| api_key (str): A Kagi Search API key | |
| query (str): The query to search for | |
| count (int): The number of results to return | |
| """ | |
| url = "https://kagi.com/api/v0/search" | |
| headers = { | |
| "Authorization": f"Bot {api_key}", | |
| } | |
| params = {"q": query, "limit": count} | |
| response = requests.get(url, headers=headers, params=params) | |
| response.raise_for_status() | |
| json_response = response.json() | |
| search_results = json_response.get("data", []) | |
| results = [ | |
| SearchResult( | |
| link=result["url"], title=result["title"], snippet=result.get("snippet") | |
| ) | |
| for result in search_results | |
| if result["t"] == 0 | |
| ] | |
| print(results) | |
| if filter_list: | |
| results = get_filtered_results(results, filter_list) | |
| return results | |