Spaces:
Sleeping
Sleeping
| import os | |
| import time | |
| import logging | |
| import re | |
| from functools import lru_cache, wraps | |
| from typing import Optional, Dict | |
| from requests.exceptions import RequestException | |
| import wikipedia | |
| from youtube_transcript_api import YouTubeTranscriptApi, TranscriptsDisabled, NoTranscriptFound | |
| from llama_index.readers.web import BeautifulSoupWebReader | |
| from smolagents import ( | |
| CodeAgent, | |
| InferenceClientModel, | |
| GoogleSearchTool, | |
| tool, | |
| Tool, | |
| ) | |
| # --- Configuration and Setup --- | |
| def configure_logging(): | |
| """Sets up detailed logging configuration.""" | |
| logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s") | |
| def get_api_keys_from_env() -> Dict[str, Optional[str]]: | |
| """Retrieves API keys directly from environment variables.""" | |
| keys = { | |
| 'together': os.getenv('TOGETHER_API_KEY'), | |
| 'serpapi': os.getenv('SERPAPI_API_KEY') | |
| } | |
| if not keys['together']: | |
| raise ValueError("TOGETHER_API_KEY is required but not found in environment variables.") | |
| return keys | |
| # --- Custom Exceptions --- | |
| class SerpApiClientException(Exception): pass | |
| class YouTubeTranscriptApiError(Exception): pass | |
| # --- Decorators --- | |
| def retry(max_retries=3, initial_delay=1, backoff=2): | |
| """A robust retry decorator with exponential backoff.""" | |
| def decorator(func): | |
| def wrapper(*args, **kwargs): | |
| for attempt in range(1, max_retries + 1): | |
| try: | |
| return func(*args, **kwargs) | |
| except (RequestException, SerpApiClientException, YouTubeTranscriptApiError, TranscriptsDisabled, NoTranscriptFound) as e: | |
| if attempt == max_retries: | |
| logging.error(f"{func.__name__} failed after {attempt} attempts: {e}") | |
| return f"Tool Error: {func.__name__} failed after {max_retries} attempts. Details: {e}" | |
| time.sleep(initial_delay * (backoff ** (attempt - 1))) | |
| except Exception as e: | |
| logging.error(f"{func.__name__} failed with a non-retryable error: {e}") | |
| return f"Tool Error: A non-retryable error occurred in {func.__name__}: {e}" | |
| return wrapper | |
| return decorator | |
| # --- Answer Formatting and Extraction --- | |
| def extract_final_answer(response: str) -> str: | |
| """Extracts the final answer from the agent's full response string.""" | |
| if not response: return "" | |
| match = re.search(r'FINAL\s+ANSWER\s*:\s*(.*)', response, re.IGNORECASE | re.DOTALL) | |
| if match: return match.group(1).strip() | |
| lines = response.strip().split('\n') | |
| return lines[-1].strip() if lines else "" | |
| def normalize_answer_format(answer: str) -> str: | |
| """Normalizes the extracted answer to meet strict GAIA formatting requirements.""" | |
| if not answer: return "" | |
| answer = answer.strip().rstrip('.') | |
| is_list = ',' in answer and len(answer.split(',')) > 1 | |
| try: | |
| is_numeric = not is_list and float(answer.replace(',', '')) is not None | |
| except ValueError: | |
| is_numeric = False | |
| if is_numeric: return re.sub(r'[,$%]', '', answer).strip() | |
| if is_list: | |
| elements = [normalize_answer_format(elem.strip()) for elem in answer.split(',')] | |
| return ', '.join(elements) | |
| return answer | |
| # --- Agent Wrapper for GAIA Compliance --- | |
| def create_gaia_agent_wrapper(agent: CodeAgent): | |
| """Creates a callable wrapper around the agent to enforce GAIA answer formatting.""" | |
| def gaia_compliant_agent(question: str) -> str: | |
| logging.info(f"Received question for GAIA compliant agent: '{question}'") | |
| full_response = agent.run(question) | |
| logging.info(f"Agent raw response:\n---\n{full_response}\n---") | |
| final_answer = extract_final_answer(full_response) | |
| normalized_answer = normalize_answer_format(final_answer) | |
| logging.info(f"Normalized answer for submission: '{normalized_answer}'") | |
| return normalized_answer | |
| return gaia_compliant_agent | |
| # --- Tool Implementations (with robustness decorators) --- | |
| def _get_webpage_content_implementation(url: str) -> str: | |
| logging.info(f"π Reading webpage content from: {url}") | |
| loader = BeautifulSoupWebReader() | |
| docs = loader.load_data(urls=[url]) | |
| if not docs or not docs[0].text: | |
| raise ValueError(f"No content could be extracted from {url}") | |
| return docs[0].text[:15000] | |
| def _get_youtube_transcript_implementation(video_url: str) -> str: | |
| logging.info(f"π¬ Fetching YouTube transcript for: {video_url}") | |
| video_id_match = re.search(r'(?:v=|\/)([a-zA-Z0-9_-]{11}).*', video_url) | |
| if not video_id_match: | |
| return "Error: Invalid YouTube URL provided." | |
| video_id = video_id_match.group(1) | |
| try: | |
| transcript_list = YouTubeTranscriptApi.get_transcript(video_id) | |
| transcript_text = ' '.join([t['text'] for t in transcript_list]) | |
| return transcript_text[:15000] | |
| except (TranscriptsDisabled, NoTranscriptFound) as e: | |
| logging.error(f"Could not retrieve transcript for {video_url}: {e}") | |
| raise YouTubeTranscriptApiError(f"Transcript not available for video {video_id}.") from e | |
| def _wikipedia_search_implementation(query: str) -> str: | |
| try: | |
| return wikipedia.summary(query, sentences=5) | |
| except wikipedia.exceptions.PageError: | |
| return f"No Wikipedia page found for '{query}'." | |
| except wikipedia.exceptions.DisambiguationError as e: | |
| return f"Ambiguous query '{query}'. Options: {', '.join(e.options[:3])}" | |
| except Exception as e: | |
| return f"An error occurred during Wikipedia search: {e}" | |
| # --- Tool Interfaces (for the agent) --- | |
| def get_webpage_content(url: str) -> str: | |
| """ | |
| Extracts the text content from a single webpage. | |
| Args: | |
| url (str): The full URL of the webpage to read. | |
| """ | |
| return _get_webpage_content_implementation(url) | |
| def get_youtube_transcript(video_url: str) -> str: | |
| """ | |
| Fetches the full transcript of a YouTube video as a single string. | |
| Args: | |
| video_url (str): The full URL of the YouTube video. | |
| """ | |
| return _get_youtube_transcript_implementation(video_url) | |
| def wikipedia_search(query: str) -> str: | |
| """ | |
| Searches Wikipedia for a given query and returns a summary. | |
| Args: | |
| query (str): The term or question to search for on Wikipedia. | |
| """ | |
| return _wikipedia_search_implementation(query) | |
| def initialize_agent(): | |
| """Initializes the enhanced multi-disciplinary agent for the GAIA benchmark.""" | |
| configure_logging() | |
| logging.info("π Starting GAIA agent initialization...") | |
| try: | |
| api_keys = get_api_keys_from_env() | |
| except ValueError as e: | |
| logging.error(f"FATAL: {e}") | |
| return None | |
| try: | |
| model = InferenceClientModel(model_id="Qwen/Qwen3-235B-A22B-FP8", token=api_keys['together'], provider="together") | |
| logging.info("β Primary model dQwen/Qwen3-235B-A22B-FP8 loaded successfully") | |
| except Exception as e: | |
| logging.warning(f"β οΈ Failed to load primary model, falling back. Error: {e}") | |
| model = InferenceClientModel(model_id="Qwen/Qwen2.5-7B-Instruct", token=api_keys['together'], provider="together") | |
| logging.info("β Fallback model (Qwen 2.5 7B) loaded successfully") | |
| google_search_tool = GoogleSearchTool() if api_keys['serpapi'] else None | |
| tools_list = [ | |
| tool for tool in [ | |
| google_search_tool, | |
| get_webpage_content, | |
| get_youtube_transcript, | |
| wikipedia_search | |
| ] if tool | |
| ] | |
| agent = CodeAgent( | |
| model=model, | |
| tools=tools_list, | |
| instructions="""You are a master AI assistant for the GAIA benchmark. Your goal is to provide a single, precise, and final answer by writing and executing Python code. | |
| **STRATEGY:** | |
| You have a powerful toolkit. You can write and execute any Python code you need. You also have access to pre-defined tools that you can call from within your code to gather information. | |
| 1. **Analyze**: Break down the user's question into logical steps. | |
| 2. **Plan**: Decide if you need to search the web, read a webpage, get a video transcript, or perform a calculation. | |
| 3. **Execute**: Write a Python script to perform the steps. You must always use the `<code>...</code>` format to wrap your code. | |
| **HOW TO USE TOOLS IN YOUR CODE:** | |
| To solve a problem, you will write a Python code block that calls the necessary tools. | |
| *Example 1: Simple Calculation* | |
| Thought: The user wants to know 15! / (12! * 3!). I will use the math library to calculate the factorials and then perform the division. | |
| <code> | |
| import math | |
| result = math.factorial(15) / (math.factorial(12) * math.factorial(3)) | |
| print(int(result)) | |
| </code> | |
| *Example 2: Multi-step question involving web search and reading a page* | |
| Thought: I need to find the name of the journal that published a specific article. First, I will use the Google Search tool to find the webpage for the article. Then, I will use the `get_webpage_content` tool to read the text of that page. Finally, I will analyze the text to find the journal's name and print it. | |
| <code> | |
| # First, find the URL of the paper. | |
| search_results = GoogleSearchTool(query="A Rapid and Sensitive Method for the Quantitation of Microgram Quantities of Protein Utilizing the Principle of Protein-Dye Binding") | |
| # Let's assume the first result has a good URL, like "https://www.sciencedirect.com/science/article/pii/0003269776905271" | |
| # Now, read the content of that page to find the journal name. | |
| page_content = get_webpage_content(url="https://www.sciencedirect.com/science/article/pii/0003269776905271") | |
| # Now I will analyze the text `page_content` in my head to find the journal name. | |
| # After reading the text, I found the journal is "Analytical Biochemistry". | |
| print("Analytical Biochemistry") | |
| </code> | |
| **CRITICAL INSTRUCTION:** You MUST end your entire response with the line `FINAL ANSWER: [Your Final Answer]`. This is the only part of your response that will be graded. Adhere to strict formatting: no extra words, no currency symbols, no commas in numbers. | |
| """ | |
| ) | |
| logging.info("π― GAIA agent with unified CodeAgent architecture initialized successfully!") | |
| return create_gaia_agent_wrapper(agent) | |
| # --- Main Execution Block for Local Testing --- | |
| def main(): | |
| """ | |
| Tests the agent with sample GAIA-style questions. | |
| For local testing, ensure you have set the required environment variables: | |
| export TOGETHER_API_KEY="your_key" | |
| export SERPAPI_API_KEY="your_key" | |
| """ | |
| configure_logging() | |
| logging.info("π§ͺ Starting local agent testing...") | |
| agent = initialize_agent() | |
| if not agent: | |
| logging.critical("π₯ Agent initialization failed. Exiting.") | |
| return | |
| test_questions = [ | |
| "What is 15! / (12! * 3!)?", | |
| "In what year was the Python programming language first released?", | |
| "What is the square root of 2025?", | |
| ] | |
| for i, question in enumerate(test_questions, 1): | |
| logging.info(f"\n{'='*60}\nπ Test Question {i}: {question}\n{'='*60}") | |
| start_time = time.time() | |
| final_answer = agent(question) | |
| elapsed_time = time.time() - start_time | |
| logging.info(f"β Submitted Answer: {final_answer}") | |
| logging.info(f"β±οΈ Execution time: {elapsed_time:.2f} seconds") | |
| time.sleep(1) | |
| logging.info(f"\n{'='*60}\nπ Testing complete!\n{'='*60}") | |
| if __name__ == "__main__": | |
| main() |