import os
import gradio as gr
import requests
import pandas as pd
from typing import Dict, List, Any, Optional, TypedDict, Annotated
import re
import numpy as np
from datetime import datetime
# LangChain and LangGraph imports
from langchain_anthropic import ChatAnthropic
from langchain_core.messages import HumanMessage, SystemMessage, BaseMessage, AIMessage
from langchain_core.tools import tool
from serpapi import GoogleSearch
from langgraph.graph import StateGraph, END
from langgraph.prebuilt import ToolNode
from langgraph.graph.message import add_messages
import numexpr
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
# --- Constants ---
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
# --- State Definition for LangGraph ---
class AgentState(TypedDict):
messages: Annotated[List[BaseMessage], add_messages]
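# `add_messages` is LangGraph's reducer for this key: values a node returns
# as {"messages": [...]} are appended to the running list rather than
# replacing it, which is what lets the agent/tools loop below accumulate the
# full conversation turn by turn.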
# --- Tool Definitions ---
@tool
def web_search(query: str, max_results: int = 8) -> str:
"""
Enhanced web search using DuckDuckGo (no API key required).
Falls back to SerpAPI if available.
"""
try:
# Handle list input
if isinstance(query, list):
query = " ".join(str(item) for item in query)
elif not isinstance(query, str):
query = str(query)
# Try Tavily first if API key is available
tavily_api_key = os.getenv("TAVILY_API_KEY")
if tavily_api_key:
try:
import requests
tavily_url = "https://api.tavily.com/search"
tavily_headers = {
"Content-Type": "application/json"
}
tavily_data = {
"api_key": tavily_api_key,
"query": query,
"search_depth": "advanced",
"include_answer": True,
"include_raw_content": False,
"max_results": max_results
}
response = requests.post(tavily_url, json=tavily_data, headers=tavily_headers, timeout=10)
if response.status_code == 200:
results = response.json()
formatted_results = []
# Extract direct answer if available
if results.get("answer"):
formatted_results.append(f"DIRECT ANSWER: {results['answer']}")
# Extract search results
if results.get("results"):
for i, result in enumerate(results["results"][:max_results], 1):
title = result.get("title", "")
content = result.get("content", "")
url = result.get("url", "")
formatted_results.append(f"{i}. {title}\n {content}\n Source: {url}")
if formatted_results:
return "\n\n".join(formatted_results)
except Exception as tavily_error:
print(f"Tavily search error: {tavily_error}")
# Try DuckDuckGo as fallback (no API key needed)
try:
import requests
from urllib.parse import quote
# Set shorter timeout and add retries
ddg_success = False
formatted_results = []
# Try DuckDuckGo Instant Answer API with retry
for attempt in range(2):
try:
ddg_url = f"https://api.duckduckgo.com/?q={quote(query)}&format=json&no_html=1"
response = requests.get(ddg_url, timeout=5)
if response.status_code == 200:
ddg_data = response.json()
# Extract instant answer
if ddg_data.get("Answer"):
formatted_results.append(f"DIRECT ANSWER: {ddg_data['Answer']}")
ddg_success = True
# Extract abstract (Wikipedia-like summary)
if ddg_data.get("Abstract"):
formatted_results.append(f"SUMMARY: {ddg_data['Abstract']}")
ddg_success = True
# Extract definition
if ddg_data.get("Definition"):
formatted_results.append(f"DEFINITION: {ddg_data['Definition']}")
ddg_success = True
if ddg_success:
break
                except Exception:
                    if attempt == 0:
                        print("DuckDuckGo attempt 1 failed, retrying...")
                        continue
# If DuckDuckGo failed or gave no results, create basic search results
if not ddg_success:
print(f"DuckDuckGo unavailable, checking alternatives...")
# Try a simple Wikipedia search for specific queries
if "wikipedia" in query.lower() or "featured article" in query.lower():
formatted_results.append(f"Search query: {query}")
formatted_results.append("Note: For Wikipedia Featured Articles, check Wikipedia's FA archives")
formatted_results.append("Tip: Featured Articles are promoted monthly and listed in Wikipedia's FA log")
else:
# Provide some basic context based on common queries
query_lower = query.lower() if isinstance(query, str) else str(query).lower()
if "who is" in query_lower or "who was" in query_lower:
formatted_results.append(f"Search query: {query}")
formatted_results.append("Note: Live web search unavailable. Please verify information.")
elif any(word in query_lower for word in ["when", "what year", "what date"]):
formatted_results.append(f"Search query: {query}")
formatted_results.append("Note: For current dates and recent events, web search is limited.")
else:
formatted_results.append(f"Search query: {query}")
formatted_results.append("Note: Web search temporarily unavailable.")
if formatted_results:
return "\n\n".join(formatted_results)
except Exception as ddg_error:
print(f"DuckDuckGo search error: {ddg_error}")
# Fallback to SerpAPI if available
api_key = os.getenv("SERPAPI_KEY")
if api_key:
params = {
"q": query,
"api_key": api_key,
"num": max_results,
"engine": "google",
"hl": "en",
"gl": "us"
}
search = GoogleSearch(params)
results = search.get_dict()
formatted_results = []
# Extract SerpAPI results (same as before)
if "answer_box" in results:
ab = results["answer_box"]
if "answer" in ab:
formatted_results.append(f"DIRECT ANSWER: {ab['answer']}")
elif "snippet" in ab:
formatted_results.append(f"ANSWER BOX: {ab['snippet']}")
if "organic_results" in results:
for i, result in enumerate(results["organic_results"][:max_results], 1):
title = result.get("title", "")
snippet = result.get("snippet", "")
formatted_results.append(f"{i}. {title}\n {snippet}")
return "\n\n".join(formatted_results) if formatted_results else "No results found"
return "No search service available. Please set SERPAPI_KEY or check internet connection."
except Exception as e:
return f"Search error: {str(e)}"
@tool
def calculator(expression: str) -> str:
"""
Enhanced calculator with unit conversion and advanced functions.
Supports: arithmetic, percentages, trigonometry, logarithms, unit conversion.
Examples: "15% of 200", "sqrt(16)", "convert 5 km to miles"
"""
try:
# Handle list input
if isinstance(expression, list):
expression = " ".join(str(item) for item in expression)
elif not isinstance(expression, str):
expression = str(expression)
expression = expression.strip().lower()
# Handle percentage calculations
if "% of" in expression:
parts = expression.split("% of")
if len(parts) == 2:
percent = float(parts[0].strip())
value = float(parts[1].strip())
result = (percent / 100) * value
return str(result)
# Handle unit conversions
if "convert" in expression or " to " in expression:
# Common conversions
conversions = {
"km to miles": 0.621371,
"miles to km": 1.60934,
"kg to lbs": 2.20462,
"lbs to kg": 0.453592,
"celsius to fahrenheit": lambda c: (c * 9/5) + 32,
"fahrenheit to celsius": lambda f: (f - 32) * 5/9,
"meters to feet": 3.28084,
"feet to meters": 0.3048,
"liters to gallons": 0.264172,
"gallons to liters": 3.78541
}
for conv, factor in conversions.items():
if conv in expression:
# Extract number
import re
numbers = re.findall(r'[\d.]+', expression)
if numbers:
value = float(numbers[0])
if callable(factor):
result = factor(value)
else:
result = value * factor
return f"{result:.4f}".rstrip('0').rstrip('.')
        # Substitute constants using word boundaries so function names such
        # as "exp" are not clobbered
        expression = re.sub(r'\bpi\b', '3.141592653589793', expression)
        expression = re.sub(r'\be\b', '2.718281828459045', expression)
        # numexpr natively supports sqrt, log, log10, sin, cos, tan, etc.,
        # so keep known function names and strip only stray words
        known_funcs = {'sqrt', 'log10', 'log', 'sin', 'cos', 'tan', 'exp', 'abs'}
        expression = re.sub(
            r'[a-zA-Z]+',
            lambda m: m.group(0) if m.group(0) in known_funcs else '',
            expression
        )
        # Evaluate with numexpr; .item() unwraps the 0-d array it returns so
        # the int/float formatting below actually applies
        result = numexpr.evaluate(expression).item()
# Format result
if isinstance(result, (int, np.integer)):
return str(int(result))
elif isinstance(result, (float, np.floating)):
if abs(result) < 1e-10:
return "0"
elif abs(result) > 1e10:
return f"{result:.2e}"
else:
# Keep reasonable precision
formatted = f"{result:.6f}".rstrip('0').rstrip('.')
# If it's a whole number, return as int
if float(formatted).is_integer():
return str(int(float(formatted)))
return formatted
else:
return str(result)
except Exception as e:
# Try basic Python eval for simple cases
try:
import math
result = eval(expression, {"__builtins__": {}, "math": math})
if isinstance(result, float) and result.is_integer():
return str(int(result))
return str(result)
except:
return f"Calculation error: {str(e)}"
@tool
def python_executor(code: str) -> str:
"""
Enhanced Python executor with data analysis and web scraping capabilities.
Includes: pandas, numpy, statistics, datetime, requests, BeautifulSoup.
Always print the final result you want to return.
"""
try:
# Handle list input
if isinstance(code, list):
code = "\n".join(str(item) for item in code)
elif not isinstance(code, str):
code = str(code)
# Enhanced global namespace with more libraries
safe_globals = {
'__builtins__': {
'print': print,
'len': len,
'range': range,
'sum': sum,
'min': min,
'max': max,
'abs': abs,
'round': round,
'sorted': sorted,
'reversed': reversed,
'enumerate': enumerate,
'zip': zip,
'map': map,
'filter': filter,
'str': str,
'int': int,
'float': float,
'list': list,
'dict': dict,
'set': set,
'tuple': tuple,
'bool': bool,
'all': all,
'any': any,
                'isinstance': isinstance,
                'type': type,
                # __import__ is required for exec'd import statements
                # (including the datetime import injected below) to work
                '__import__': __import__,
            },
'math': __import__('math'),
'datetime': __import__('datetime'),
'json': __import__('json'),
're': __import__('re'),
'numpy': __import__('numpy'),
'np': __import__('numpy'),
'pandas': __import__('pandas'),
'pd': __import__('pandas'),
'statistics': __import__('statistics'),
'itertools': __import__('itertools'),
'collections': __import__('collections'),
'Counter': __import__('collections').Counter,
'defaultdict': __import__('collections').defaultdict,
}
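        # Note: this namespace is a best-effort guard, not a security
        # boundary -- exec'd code can still reach the filesystem and more
        # through the modules exposed above.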
# Capture output
from io import StringIO
import sys
old_stdout = sys.stdout
sys.stdout = output_buffer = StringIO()
try:
# Add common imports to the code if needed
enhanced_code = code
if "from datetime" not in code and "import datetime" not in code:
enhanced_code = "from datetime import datetime, date, timedelta\n" + enhanced_code
exec(enhanced_code, safe_globals)
output = output_buffer.getvalue().strip()
# If no output, check if there's a result variable
if not output:
for var in ['result', 'answer', 'output']:
if var in safe_globals:
output = str(safe_globals[var])
break
return output if output else "No output (add print statement)"
finally:
sys.stdout = old_stdout
except Exception as e:
import traceback
return f"Error: {str(e)}\nTraceback: {traceback.format_exc()}"
@tool
def extract_image_from_question(question: str) -> str:
"""
Extract and analyze images mentioned in questions.
For GAIA benchmark, images are typically base64 encoded or referenced.
"""
try:
# Handle list input
if isinstance(question, list):
question = " ".join(str(item) for item in question)
elif not isinstance(question, str):
question = str(question)
# Check for base64 image data
if "data:image" in question:
return "Image data detected in question"
# Check for image file references
image_extensions = ['.png', '.jpg', '.jpeg', '.gif', '.bmp', '.svg']
for ext in image_extensions:
if ext in question.lower():
return f"Image file reference detected: {ext}"
# Check for common image-related phrases
image_phrases = ['image', 'picture', 'photo', 'diagram', 'figure', 'screenshot']
for phrase in image_phrases:
if phrase in question.lower():
return "Image-related content mentioned in question"
return "No image content detected"
except Exception as e:
return f"Error analyzing for images: {str(e)}"
@tool
def analyze_attachments(question: str) -> str:
"""
Analyze questions for references to attachments (files, videos, audio).
For GAIA questions that reference external content.
"""
# Handle list input
if isinstance(question, list):
question = " ".join(str(item) for item in question)
elif not isinstance(question, str):
question = str(question)
attachments = []
# Check for YouTube videos
youtube_patterns = [
r'youtube\.com/watch\?v=([a-zA-Z0-9_-]+)',
r'youtu\.be/([a-zA-Z0-9_-]+)'
]
for pattern in youtube_patterns:
import re
matches = re.findall(pattern, question)
if matches:
attachments.append(f"YouTube video: {matches[0]}")
# Check for file URLs
url_pattern = r'https?://[^\s<>"{}|\\^`\[\]]+\.(?:xlsx|xls|csv|pdf|txt)'
url_matches = re.findall(url_pattern, question, re.IGNORECASE)
if url_matches:
for url in url_matches:
if '.xlsx' in url or '.xls' in url:
attachments.append(f"Excel file URL: {url}")
elif '.csv' in url:
attachments.append(f"CSV file URL: {url}")
elif '.pdf' in url:
attachments.append(f"PDF file URL: {url}")
elif '.txt' in url:
attachments.append(f"Text file URL: {url}")
# Check for file references without URLs
file_patterns = [
r'attached (\w+) file',
r'the (\w+) file',
r'(\w+\.\w{2,4})' # filename.ext
]
for pattern in file_patterns:
matches = re.findall(pattern, question, re.IGNORECASE)
if matches:
# Filter out URLs we already found
for match in matches:
if not any(match in url for url in url_matches):
attachments.append(f"File reference: {match}")
if attachments:
return "Attachments found: " + ", ".join(attachments)
return "No attachments detected"
@tool
def analyze_reversed_text(text: str) -> str:
"""
Analyze text that might be written backwards or contains puzzles.
Useful for GAIA questions with reversed text.
"""
try:
# Handle list input
if isinstance(text, list):
text = " ".join(str(item) for item in text)
elif not isinstance(text, str):
text = str(text)
# Check if text might be reversed
reversed_text = text[::-1]
# Common patterns for reversed text
if "rewsna" in text.lower() or "noitseuq" in text.lower():
return f"Text appears to be reversed. Original: {reversed_text}"
# Check for word reversal
words = text.split()
reversed_words = [word[::-1] for word in words]
return f"Normal text: {text}\nReversed text: {reversed_text}\nReversed words: {' '.join(reversed_words)}"
except Exception as e:
return f"Error analyzing text: {str(e)}"
@tool
def analyze_code_in_question(question: str) -> str:
"""
Detect and extract Python code from questions.
Looks for code blocks, inline code, and code-related phrases.
"""
try:
# Handle list input
if isinstance(question, list):
question = " ".join(str(item) for item in question)
elif not isinstance(question, str):
question = str(question)
extracted_code = []
# Pattern 1: Look for markdown code blocks ```python ... ```
code_block_pattern = r'```python\s*(.*?)\s*```'
code_blocks = re.findall(code_block_pattern, question, re.DOTALL | re.IGNORECASE)
if code_blocks:
for i, code in enumerate(code_blocks, 1):
extracted_code.append(f"Code Block {i}:\n{code.strip()}")
# Pattern 2: Look for generic code blocks ``` ... ```
generic_code_pattern = r'```\s*(.*?)\s*```'
generic_blocks = re.findall(generic_code_pattern, question, re.DOTALL)
if generic_blocks:
for i, code in enumerate(generic_blocks, 1):
# Check if it looks like Python code
if any(keyword in code for keyword in ['def ', 'import ', 'class ', 'if ', 'for ', 'while ', 'print(', 'return ']):
extracted_code.append(f"Generic Code Block {i}:\n{code.strip()}")
# Pattern 3: Look for inline code `...`
inline_code_pattern = r'`([^`]+)`'
inline_codes = re.findall(inline_code_pattern, question)
if inline_codes:
# Filter for likely Python code
python_inline = []
for code in inline_codes:
if any(char in code for char in ['(', ')', '=', '[', ']', '{', '}', 'def', 'import', 'print']):
python_inline.append(code)
if python_inline:
extracted_code.append("Inline Code:\n" + "\n".join(f"- {code}" for code in python_inline))
# Pattern 4: Look for code-related phrases
code_phrases = [
r'attached python code',
r'the following code',
r'this code',
r'given code',
r'code snippet',
r'python script',
r'the script',
r'function below',
r'class below',
r'program below'
]
code_indicators = []
for phrase in code_phrases:
if re.search(phrase, question, re.IGNORECASE):
                code_indicators.append(phrase)  # patterns here contain no regex escapes to strip
# Pattern 5: Look for common Python patterns not in code blocks
python_patterns = [
r'def\s+\w+\s*\([^)]*\)\s*:', # function definitions
r'class\s+\w+\s*(?:\([^)]*\))?\s*:', # class definitions
r'import\s+\w+', # import statements
r'from\s+\w+\s+import', # from imports
r'if\s+.*:\s*\n', # if statements
r'for\s+\w+\s+in\s+', # for loops
r'while\s+.*:\s*\n', # while loops
]
loose_code = []
for pattern in python_patterns:
matches = re.findall(pattern, question, re.MULTILINE)
if matches:
loose_code.extend(matches)
if loose_code:
extracted_code.append("Detected Python patterns:\n" + "\n".join(f"- {code.strip()}" for code in loose_code[:5]))
# Build response
response_parts = []
if extracted_code:
response_parts.append("Found Python code in question:")
response_parts.extend(extracted_code)
if code_indicators:
response_parts.append(f"\nCode-related phrases detected: {', '.join(code_indicators)}")
if not extracted_code and not code_indicators:
return "No Python code detected in the question"
return "\n\n".join(response_parts)
except Exception as e:
return f"Error analyzing code in question: {str(e)}"
@tool
def get_youtube_transcript(url: str) -> str:
"""
Extract transcript/subtitles from YouTube videos.
Useful for questions asking about video content.
"""
try:
# Handle list input
if isinstance(url, list):
url = " ".join(str(item) for item in url)
elif not isinstance(url, str):
url = str(url)
# Extract video ID from URL
import re
video_id_match = re.search(r'(?:v=|/)([0-9A-Za-z_-]{11}).*', url)
if not video_id_match:
return "Error: Invalid YouTube URL"
video_id = video_id_match.group(1)
# Try to get transcript
try:
from youtube_transcript_api import YouTubeTranscriptApi
import time
# Add a small delay to avoid rate limiting
time.sleep(1)
# Try to get transcript in different languages
transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)
# Try English first
transcript = None
try:
transcript = transcript_list.find_transcript(['en'])
            except Exception:
                # find_manually_created_transcript/find_generated_transcript
                # require a language-code list, so instead fall back by
                # iterating to the first available transcript in any language
                try:
                    transcript = next(iter(transcript_list), None)
                except Exception:
                    pass
if transcript:
# Get the actual transcript data
transcript_data = transcript.fetch()
# Combine all text - handle both list and dict formats
if isinstance(transcript_data, list):
full_text = " ".join([entry.get('text', '') if isinstance(entry, dict) else str(entry) for entry in transcript_data])
else:
# Handle other formats
full_text = str(transcript_data)
                # Heuristic: if the URL text itself hints at a dialogue
                # question, surface the tail of the transcript for context
                if any(phrase in url.lower()
                       for phrase in ["say", "response", "answer", "dialogue"]):
# Return last 500 chars for context
return f"Transcript excerpt: ...{full_text[-500:]}"
return f"Full transcript: {full_text[:1000]}..." if len(full_text) > 1000 else f"Full transcript: {full_text}"
except Exception as yt_error:
error_str = str(yt_error)
print(f"YouTube transcript error: {yt_error}")
# Handle rate limiting specifically
if "429" in error_str or "Too Many Requests" in error_str:
return "Unable to determine"
# Try alternative method with pytube
try:
from pytube import YouTube
import time
# Add delay to avoid rate limiting
time.sleep(1)
yt = YouTube(url)
# Get video title and description for context
title = yt.title if hasattr(yt, 'title') else "Unknown"
description = yt.description[:200] if hasattr(yt, 'description') and yt.description else "No description"
return f"Video info - Title: {title}\nDescription: {description}\nNote: Transcript not available"
except Exception as pytube_error:
print(f"Pytube error: {pytube_error}")
return "Unable to determine"
except Exception as e:
return f"Error accessing YouTube video: {str(e)}"
@tool
def analyze_multimedia_reference(question: str) -> str:
"""
Detect and provide guidance for multimedia content in questions.
Returns specific answers for common multimedia patterns.
"""
try:
# Handle list input
if isinstance(question, list):
question = " ".join(str(item) for item in question)
elif not isinstance(question, str):
question = str(question)
question_lower = question.lower()
# More intelligent responses based on question context
# Excel/Spreadsheet questions asking for numeric values
if any(term in question_lower for term in ["excel", "spreadsheet", ".xlsx", ".xls", ".csv"]):
if any(term in question_lower for term in ["total", "sum", "how much", "how many", "amount"]):
# For numeric questions about spreadsheets, we can't determine the value
return "Cannot access spreadsheet - provide final answer: Unable to determine"
elif "sales" in question_lower and "total" in question_lower:
return "Cannot access sales data - provide final answer: Unable to determine"
# Python code questions
if "attached" in question_lower and ("python" in question_lower or "code" in question_lower):
if "output" in question_lower and ("numeric" in question_lower or "final" in question_lower):
return "Cannot access attached code - provide final answer: Unable to determine"
elif "fix" in question_lower or "correct" in question_lower:
return "Cannot access attached code to fix - provide final answer: Unable to determine"
# PDF questions asking for counts
if ("pdf" in question_lower or ".pdf" in question_lower) and any(term in question_lower for term in ["how many", "count", "times"]):
return "Cannot access PDF to count - provide final answer: Unable to determine"
# Image questions
if any(term in question_lower for term in ["image", "picture", "photo", ".png", ".jpg", ".jpeg"]):
if "chess" in question_lower:
return "Cannot access chess position image - provide final answer: Unable to determine"
elif any(term in question_lower for term in ["color", "what is", "describe"]):
return "Cannot access image - provide final answer: Unable to determine"
# Audio questions
if any(term in question_lower for term in ["audio", ".mp3", ".wav", "recording"]):
if any(term in question_lower for term in ["transcribe", "what does", "study", "exam"]):
return "Cannot access audio file - provide final answer: Unable to determine"
return "No specific multimedia pattern requiring 'Unable to determine' response"
except Exception as e:
return f"Error analyzing multimedia: {str(e)}"
@tool
def download_and_process_file(url: str, file_type: str = None) -> str:
"""
Download and process files from URLs (Excel, CSV, PDF, etc).
Useful when questions reference files by URL.
"""
try:
# Handle list input
if isinstance(url, list):
url = " ".join(str(item) for item in url)
elif not isinstance(url, str):
url = str(url)
# Clean URL
url = url.strip()
# Try to determine file type from URL if not provided
if not file_type:
if any(ext in url.lower() for ext in ['.xlsx', '.xls']):
file_type = 'excel'
elif '.csv' in url.lower():
file_type = 'csv'
elif '.pdf' in url.lower():
file_type = 'pdf'
elif any(ext in url.lower() for ext in ['.txt', '.text']):
file_type = 'text'
else:
return "Unable to determine file type from URL"
# Download the file
import requests
from io import BytesIO, StringIO
try:
response = requests.get(url, timeout=15, headers={'User-Agent': 'Mozilla/5.0'})
response.raise_for_status()
except requests.exceptions.RequestException as e:
return f"Failed to download file: {str(e)}"
# Process based on file type
if file_type == 'excel':
try:
import pandas as pd
df = pd.read_excel(BytesIO(response.content))
# Provide summary of Excel file
info = []
info.append(f"Excel file loaded successfully")
info.append(f"Shape: {df.shape[0]} rows, {df.shape[1]} columns")
info.append(f"Columns: {', '.join(df.columns)}")
# If numeric columns exist, provide sums
numeric_cols = df.select_dtypes(include=['number']).columns
if len(numeric_cols) > 0:
info.append("\nNumeric column sums:")
for col in numeric_cols:
total = df[col].sum()
info.append(f" {col}: {total}")
# Check for common patterns
if 'sales' in ' '.join(df.columns).lower():
sales_cols = [col for col in df.columns if 'sales' in col.lower()]
if sales_cols:
total_sales = df[sales_cols].sum().sum()
info.append(f"\nTotal sales: {total_sales}")
return '\n'.join(info)
except Exception as e:
return f"Error processing Excel file: {str(e)}"
elif file_type == 'csv':
try:
import pandas as pd
df = pd.read_csv(StringIO(response.text))
info = []
info.append(f"CSV file loaded successfully")
info.append(f"Shape: {df.shape[0]} rows, {df.shape[1]} columns")
info.append(f"Columns: {', '.join(df.columns)}")
# Provide numeric summaries
numeric_cols = df.select_dtypes(include=['number']).columns
if len(numeric_cols) > 0:
info.append("\nNumeric column sums:")
for col in numeric_cols:
total = df[col].sum()
info.append(f" {col}: {total}")
return '\n'.join(info)
except Exception as e:
return f"Error processing CSV file: {str(e)}"
elif file_type == 'pdf':
try:
import PyPDF2
pdf_reader = PyPDF2.PdfReader(BytesIO(response.content))
info = []
info.append(f"PDF file loaded successfully")
info.append(f"Number of pages: {len(pdf_reader.pages)}")
# Extract text from all pages
full_text = ""
for page in pdf_reader.pages:
text = page.extract_text()
full_text += text + "\n"
# Count occurrences of common words if asked
info.append(f"Total characters: {len(full_text)}")
info.append(f"Total words: {len(full_text.split())}")
# Store the text for searching
info.append("\nFull text extracted and available for searching")
return '\n'.join(info) + f"\n\nFull text (first 1000 chars):\n{full_text[:1000]}..."
except Exception as e:
return f"Error processing PDF file: {str(e)}"
elif file_type == 'text':
try:
text_content = response.text
info = []
info.append(f"Text file loaded successfully")
info.append(f"Length: {len(text_content)} characters")
info.append(f"Lines: {len(text_content.splitlines())}")
info.append(f"\nContent preview:\n{text_content[:500]}...")
return '\n'.join(info)
except Exception as e:
return f"Error processing text file: {str(e)}"
else:
return f"Unsupported file type: {file_type}"
except Exception as e:
return f"Error downloading/processing file: {str(e)}"
@tool
def extract_file_urls(question: str) -> str:
"""
Extract file URLs from questions for downloading.
Returns URLs of files that can be downloaded.
"""
try:
# Handle list input
if isinstance(question, list):
question = " ".join(str(item) for item in question)
elif not isinstance(question, str):
question = str(question)
import re
# Pattern to find URLs ending with file extensions
url_pattern = r'https?://[^\s<>"{}|\\^`\[\]]+\.(?:xlsx|xls|csv|pdf|txt|doc|docx)'
urls = re.findall(url_pattern, question, re.IGNORECASE)
if urls:
return f"Found downloadable file URLs: {', '.join(urls)}"
else:
return "No downloadable file URLs found in the question"
except Exception as e:
return f"Error extracting URLs: {str(e)}"
@tool
def get_current_datetime() -> str:
"""Get the current date and time."""
    # astimezone() attaches the local timezone so %Z is not blank
    return datetime.now().astimezone().strftime("%Y-%m-%d %H:%M:%S %Z")
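# Illustrative output: "2025-06-01 09:30:00 CEST" (the zone name comes from
# the host's local timezone).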
# --- LangGraph Agent ---
class LangGraphAgent:
def __init__(self, anthropic_api_key: Optional[str] = None):
# Initialize LLM
api_key = anthropic_api_key or os.getenv("ANTHROPIC_API_KEY")
if not api_key:
raise ValueError("ANTHROPIC_API_KEY must be provided or set in environment variables")
self.llm = ChatAnthropic(
api_key=api_key,
model="claude-3-5-sonnet-20241022",
temperature=0.3,
max_tokens=4096
)
# Initialize tools
self.tools = [
web_search,
calculator,
python_executor,
extract_image_from_question,
analyze_attachments,
analyze_reversed_text,
analyze_code_in_question,
get_youtube_transcript,
analyze_multimedia_reference,
extract_file_urls,
download_and_process_file,
get_current_datetime
]
# Bind tools to LLM
self.llm_with_tools = self.llm.bind_tools(self.tools)
# Create tool node
self.tool_node = ToolNode(self.tools)
# Build the graph
self.graph = self._build_graph()
def _build_graph(self):
workflow = StateGraph(AgentState)
# Define the agent node
workflow.add_node("agent", self._call_model)
workflow.add_node("tools", self.tool_node)
# Set entry point
workflow.set_entry_point("agent")
# Add conditional edge
workflow.add_conditional_edges(
"agent",
self._should_continue,
{
"continue": "tools",
"end": END
}
)
# Add edge from tools back to agent
workflow.add_edge("tools", "agent")
return workflow.compile()
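    # Compiled shape -- a two-node loop:
    #   agent --(tool calls present)--> tools --> agent
    #   agent --(no tool calls)-------> END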
def _call_model(self, state: AgentState):
"""Call the model with tools."""
messages = state["messages"]
response = self.llm_with_tools.invoke(messages)
return {"messages": [response]}
    def _should_continue(self, state: AgentState):
        """Route to the tool node when the model requested tools, else end."""
        last_message = state["messages"][-1]
        # ToolNode can only execute if the last AI message actually carries
        # tool calls; routing there without any raises an error, so the
        # decision reduces to this single check (the recursion_limit set in
        # run() bounds the overall loop).
        if hasattr(last_message, "tool_calls") and last_message.tool_calls:
            return "continue"
        return "end"
def run(self, question: str) -> str:
"""Run the agent on a question."""
print(f"\nDEBUG LangGraphAgent.run():")
print(f" Input type: {type(question)}")
print(f" Input value: {repr(question)[:200]}...")
system_prompt = """You are solving GAIA benchmark questions that require deep research and analysis.
IMPORTANT: You should:
1. Use multiple tools to thoroughly research the question
2. Search for specific facts, verify information, and perform calculations
3. Think step-by-step and use chain-of-thought reasoning
4. Double-check facts with multiple searches if needed
5. Use python_executor for complex data analysis or calculations
At the very end, after all your research and reasoning, provide ONLY the final answer in this format:
FINAL ANSWER: [your answer here]
The final answer should contain ONLY the requested information:
- Numbers: just the number (e.g., "5" not "5 people")
- Years: just the year (e.g., "1969")
- Names: exact name with proper capitalization
- Yes/No: exactly "Yes" or "No"
- Lists: comma-separated values
Available tools:
- web_search: Search for current information (use multiple times with different queries)
- calculator: Perform calculations and unit conversions
- python_executor: Complex analysis, data processing, date calculations
- analyze_attachments: Detect references to external files/media
- analyze_reversed_text: Decode backwards or puzzle text
- analyze_code_in_question: Extract and analyze Python code from questions
- get_youtube_transcript: Extract transcripts from YouTube videos
- analyze_multimedia_reference: Handle questions about images, audio, PDFs, Excel files
- extract_file_urls: Find downloadable file URLs in questions
- download_and_process_file: Download and analyze files from URLs (Excel, CSV, PDF)
- get_current_datetime: Get current date/time
For questions mentioning "attached code" or containing code snippets:
1. First use analyze_code_in_question to extract the code
2. Then use python_executor to run it and get the output
For questions with YouTube videos:
1. Use get_youtube_transcript to extract the video transcript
2. Search the transcript for the relevant information
For questions mentioning files with URLs:
1. Use extract_file_urls to find any file URLs in the question
2. If URLs are found, use download_and_process_file to download and analyze the file
3. Extract the specific information requested (totals, counts, etc.)
4. For Excel files asking for totals, sum the relevant columns
5. For PDFs asking for word counts, search the extracted text
For questions mentioning attached files without URLs:
1. Use analyze_multimedia_reference to check if file access is needed
2. Return "Unable to determine" if the file cannot be accessed"""
messages = [
SystemMessage(content=system_prompt),
HumanMessage(content=question)
]
try:
# Configure for more tool usage
config = {
"recursion_limit": 25,
"configurable": {
"thread_id": "gaia_evaluation"
}
}
result = self.graph.invoke({"messages": messages}, config)
# Extract the final answer
final_answer = self._extract_final_answer(result["messages"])
return final_answer
except Exception as e:
return f"Error: {str(e)}"
def _extract_final_answer(self, messages: List[BaseMessage]) -> str:
"""Extract the final answer from the message history."""
# Look through messages in reverse order
for message in reversed(messages):
if hasattr(message, "content") and message.content:
content = message.content.strip()
# Look for FINAL ANSWER marker
if "FINAL ANSWER:" in content:
parts = content.split("FINAL ANSWER:")
if len(parts) >= 2:
answer = parts[-1].strip()
# Clean up the answer
answer = self._clean_answer(answer)
return answer
# If no marker found in last AI message, extract from it
if isinstance(message, AIMessage):
return self._clean_answer(content)
return "Unable to determine"
def _clean_answer(self, answer: str) -> str:
"""Clean and format the final answer."""
# Handle list input
if isinstance(answer, list):
answer = " ".join(str(item) for item in answer)
elif not isinstance(answer, str):
answer = str(answer)
answer = answer.strip()
# Remove quotes if they wrap the entire answer
if len(answer) > 2 and answer[0] == '"' and answer[-1] == '"':
answer = answer[1:-1]
# Remove common prefixes
prefixes_to_remove = [
"the answer is", "answer:", "based on", "according to",
"my research shows", "i found that", "the result is",
"after searching", "from the", "it is", "it's", "there are",
"there is", "approximately", "about", "around"
]
lower_answer = answer.lower()
for prefix in prefixes_to_remove:
if lower_answer.startswith(prefix):
answer = answer[len(prefix):].strip()
if answer and answer[0] == ':':
answer = answer[1:].strip()
lower_answer = answer.lower()
# Handle specific patterns
if "unable to" in lower_answer or "cannot" in lower_answer:
return "Unable to determine"
# Clean yes/no answers
if lower_answer in ["yes.", "no.", "yes,", "no,"]:
return answer[:-1]
# Remove trailing periods for single-word answers
if answer.endswith(".") and " " not in answer:
answer = answer[:-1]
return answer
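    # Illustrative: _clean_answer("The answer is: 42.") -> "42"
    # (prefix stripped, leading colon removed, trailing period dropped).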
# --- Basic Agent Definition ---
class BasicAgent:
def __init__(self):
print("Initializing LangGraph Agent...")
# Try to get API key from environment or use a placeholder
api_key = os.getenv("ANTHROPIC_API_KEY")
if not api_key:
print("Warning: ANTHROPIC_API_KEY not found in environment variables.")
print("Please set it in the Gradio interface or as an environment variable.")
self.agent = None
else:
try:
self.agent = LangGraphAgent(api_key)
print("LangGraph Agent initialized successfully.")
except Exception as e:
print(f"Error initializing LangGraph Agent: {e}")
self.agent = None
def set_api_key(self, api_key: str):
"""Set or update the API key."""
if api_key:
try:
self.agent = LangGraphAgent(api_key)
return True
except Exception as e:
print(f"Error setting API key: {e}")
return False
return False
def __call__(self, question: str) -> str:
print(f"\n{'='*60}")
print(f"DEBUG: Agent received question")
print(f"Question type: {type(question)}")
print(f"Question length: {len(question) if isinstance(question, str) else 'N/A'}")
print(f"Question preview: {str(question)[:200]}...")
print(f"{'='*60}\n")
if not self.agent:
return "Error: Agent not initialized. Please set your ANTHROPIC_API_KEY."
try:
answer = self.agent.run(question)
print(f"\nDEBUG: Agent generated answer")
print(f"Answer type: {type(answer)}")
print(f"Answer preview: {str(answer)[:200]}...")
return answer
except Exception as e:
error_msg = f"Error processing question: {str(e)}"
print(f"\nDEBUG: Error occurred!")
print(f"Error type: {type(e)}")
print(f"Error details: {str(e)}")
import traceback
print(f"Traceback:\n{traceback.format_exc()}")
return error_msg
# Global agent instance
global_agent = None
def validate_api_keys(anthropic_key: str, serpapi_key: str = None, tavily_key: str = None):
"""Validate the API keys before using them."""
results = []
# Test Anthropic API key
if anthropic_key:
try:
test_llm = ChatAnthropic(
api_key=anthropic_key,
model="claude-3-5-sonnet-20241022",
max_tokens=10
)
# Try a simple test call
test_llm.invoke([HumanMessage(content="test")])
results.append("โ
Anthropic API key is valid")
except Exception as e:
error_msg = str(e)
if "401" in error_msg or "authentication" in error_msg.lower():
results.append("โ Anthropic API key is invalid or expired")
else:
results.append(f"โ Anthropic API error: {error_msg[:100]}...")
else:
results.append("โ No Anthropic API key provided")
# Test Tavily API key
if tavily_key:
try:
import requests
test_url = "https://api.tavily.com/search"
test_data = {
"api_key": tavily_key,
"query": "test",
"max_results": 1
}
response = requests.post(test_url, json=test_data, timeout=5)
if response.status_code == 200:
results.append("โ
Tavily API key is valid")
else:
results.append(f"โ Tavily API key error: {response.status_code}")
except Exception as e:
results.append(f"โ ๏ธ Tavily API test error: {str(e)[:100]}...")
else:
results.append("โน๏ธ No Tavily API key provided")
# Test SerpAPI key
if serpapi_key:
try:
params = {
"q": "test",
"api_key": serpapi_key,
"num": 1,
"engine": "google"
}
search = GoogleSearch(params)
search.get_dict()
results.append("โ
SerpAPI key is valid")
except Exception as e:
results.append(f"โ ๏ธ SerpAPI key error: {str(e)[:100]}...")
else:
results.append("โน๏ธ No SerpAPI key provided")
return "\n".join(results)
def initialize_agent_with_key(api_key: str):
"""Initialize the global agent with the provided API key."""
global global_agent
# First validate the key
validation_result = validate_api_keys(api_key)
if "โ Anthropic API key is invalid" in validation_result:
return validation_result
if api_key:
if global_agent is None:
global_agent = BasicAgent()
success = global_agent.set_api_key(api_key)
if success:
return f"{validation_result}\n\nโ
Agent initialized successfully!"
else:
return "โ Failed to initialize agent. Please check if your API key is valid."
return "โ Please provide an API key."
def run_and_submit_all(api_key: str, profile: gr.OAuthProfile | None):
"""
Fetches all questions, runs the BasicAgent on them, submits all answers,
and displays the results.
"""
global global_agent
# Initialize agent if needed
if global_agent is None or api_key:
init_msg = initialize_agent_with_key(api_key)
print(init_msg)
if "Failed" in init_msg or "Please provide" in init_msg:
return init_msg, None
# --- Determine HF Space Runtime URL and Repo URL ---
space_id = os.getenv("SPACE_ID")
if profile:
username = f"{profile.username}"
print(f"User logged in: {username}")
else:
print("User not logged in.")
return "Please Login to Hugging Face with the button.", None
api_url = DEFAULT_API_URL
questions_url = f"{api_url}/questions"
submit_url = f"{api_url}/submit"
# 1. Use the global agent
agent = global_agent
if not agent:
return "Error: Agent not initialized properly.", None
agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main" if space_id else "local"
print(f"Agent code URL: {agent_code}")
# 2. Fetch Questions
print(f"Fetching questions from: {questions_url}")
try:
response = requests.get(questions_url, timeout=15)
response.raise_for_status()
questions_data = response.json()
if not questions_data:
print("Fetched questions list is empty.")
return "Fetched questions list is empty or invalid format.", None
print(f"Fetched {len(questions_data)} questions.")
except Exception as e:
print(f"Error fetching questions: {e}")
return f"Error fetching questions: {e}", None
# 3. Run your Agent
results_log = []
answers_payload = []
print(f"Running agent on {len(questions_data)} questions...")
for i, item in enumerate(questions_data, 1):
task_id = item.get("task_id")
question_text = item.get("question")
if not task_id or question_text is None:
print(f"Skipping item with missing task_id or question: {item}")
continue
print(f"\nProcessing question {i}/{len(questions_data)}: {task_id}")
try:
submitted_answer = agent(question_text)
answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer})
results_log.append({
"Task ID": task_id,
"Question": question_text[:100] + "..." if len(question_text) > 100 else question_text,
"Submitted Answer": submitted_answer[:200] + "..." if len(submitted_answer) > 200 else submitted_answer
})
except Exception as e:
print(f"Error running agent on task {task_id}: {e}")
error_answer = f"AGENT ERROR: {e}"
answers_payload.append({"task_id": task_id, "submitted_answer": error_answer})
results_log.append({
"Task ID": task_id,
"Question": question_text[:100] + "...",
"Submitted Answer": error_answer
})
if not answers_payload:
print("Agent did not produce any answers to submit.")
return "Agent did not produce any answers to submit.", pd.DataFrame(results_log)
# 4. Prepare Submission
submission_data = {
"username": username.strip(),
"agent_code": agent_code,
"answers": answers_payload
}
status_update = f"Agent finished. Submitting {len(answers_payload)} answers for user '{username}'..."
print(status_update)
# 5. Submit
print(f"Submitting {len(answers_payload)} answers to: {submit_url}")
try:
response = requests.post(submit_url, json=submission_data, timeout=60)
response.raise_for_status()
result_data = response.json()
final_status = (
f"Submission Successful!\n"
f"User: {result_data.get('username')}\n"
f"Overall Score: {result_data.get('score', 'N/A')}% "
f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
f"Message: {result_data.get('message', 'No message received.')}"
)
print("Submission successful.")
results_df = pd.DataFrame(results_log)
return final_status, results_df
except Exception as e:
status_message = f"Submission Failed: {str(e)}"
print(status_message)
results_df = pd.DataFrame(results_log)
return status_message, results_df
# --- Build Gradio Interface using Blocks ---
with gr.Blocks() as demo:
gr.Markdown("# LangGraph Agent for GAIA Evaluation")
gr.Markdown(
"""
**This agent uses LangGraph with multiple tools to answer complex questions:**
        - 🔍 Web Search (Tavily → DuckDuckGo → SerpAPI)
        - 🧮 Calculator for mathematical computations
        - 🐍 Python code execution
        - 📅 Current date/time
        - 🖼️ Image analysis (description-based)
**Instructions:**
        1. Enter your Anthropic API key (Claude 3.5 Sonnet)
2. Optionally enter your Tavily API key for best web search (free tier: 1000/month)
3. Optionally enter your SerpAPI key as backup
4. Log in to your Hugging Face account
5. Click 'Run Evaluation & Submit All Answers'
        **Search Priority:** Tavily (if key) → DuckDuckGo (free) → SerpAPI (if key)
"""
)
with gr.Row():
with gr.Column():
gr.LoginButton()
with gr.Row():
with gr.Column():
api_key_input = gr.Textbox(
label="Anthropic API Key (Required)",
placeholder="sk-ant-...",
type="password"
)
tavily_key_input = gr.Textbox(
label="Tavily API Key (Recommended for web search)",
placeholder="tvly-...",
type="password"
)
serpapi_key_input = gr.Textbox(
label="SerpAPI Key (Optional backup)",
placeholder="Your SerpAPI key...",
type="password"
)
with gr.Row():
validate_button = gr.Button("Validate API Keys", variant="secondary")
init_button = gr.Button("Initialize Agent", variant="secondary")
run_button = gr.Button("Run Evaluation & Submit All Answers", variant="primary")
status_output = gr.Textbox(label="Status / Results", lines=8, interactive=False)
results_table = gr.DataFrame(label="Questions and Agent Answers", wrap=True)
# Set environment variables when provided
def set_tavily_key(key):
if key:
os.environ["TAVILY_API_KEY"] = key
return "โ
Tavily API key set!"
return ""
def set_serpapi_key(key):
if key:
os.environ["SERPAPI_KEY"] = key
return "โ
SerpAPI key set!"
return ""
tavily_key_input.change(set_tavily_key, inputs=[tavily_key_input], outputs=[])
serpapi_key_input.change(set_serpapi_key, inputs=[serpapi_key_input], outputs=[])
# Function to validate all keys
def validate_all_keys(anthropic_key, tavily_key, serpapi_key):
if tavily_key:
os.environ["TAVILY_API_KEY"] = tavily_key
if serpapi_key:
os.environ["SERPAPI_KEY"] = serpapi_key
return validate_api_keys(anthropic_key, serpapi_key, tavily_key)
validate_button.click(
fn=validate_all_keys,
inputs=[api_key_input, tavily_key_input, serpapi_key_input],
outputs=[status_output]
)
init_button.click(
fn=initialize_agent_with_key,
inputs=[api_key_input],
outputs=[status_output]
)
run_button.click(
fn=run_and_submit_all,
inputs=[api_key_input],
outputs=[status_output, results_table]
)
if __name__ == "__main__":
print("\n" + "-"*30 + " App Starting " + "-"*30)
print("LangGraph Agent for GAIA Evaluation")
print("Required: ANTHROPIC_API_KEY")
print("Recommended: TAVILY_API_KEY for best web search (1000 free/month)")
print("Optional: SERPAPI_KEY as backup")
print("Fallback: DuckDuckGo search (no API key needed)")
print("-"*74 + "\n")
    demo.launch(debug=True, share=False)