|  | import requests | 
					
						
						|  | import io | 
					
						
						|  | import base64 | 
					
						
						|  | import openai | 
					
						
						|  | from openai import OpenAI | 
					
						
						|  | from smolagents import tool | 
					
						
						|  | import os | 
					
						
						|  | import pandas as pd | 
					
						
						|  | import functools | 
					
						
						|  | from typing import List, Optional, Dict, Any | 
					
						
						|  | import sys | 
					
						
						|  |  | 
					
						
						|  | import av | 
					
						
						|  | from yt_dlp import YoutubeDL | 
					
						
						|  |  | 
					
						
						|  | from PIL import Image | 
					
						
						|  | import wikipediaapi | 
					
						
						|  | import tempfile | 
					
						
						|  |  | 
					
						
# Identifier of the OpenAI model that read_image and run_video send their requests to.
model_id = "gpt-4.1"
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | @tool | 
					
						
						|  | def read_image(query: str, img_url: str) -> str: | 
					
						
						|  | """ | 
					
						
						|  | Use a visual question answering (VQA) model to generate a response to a query based on an image. | 
					
						
						|  |  | 
					
						
						|  | Args: | 
					
						
						|  | query (str): A natural language question about the image. | 
					
						
						|  | img_url (str): The URL of the image to analyze. | 
					
						
						|  |  | 
					
						
						|  | Returns: | 
					
						
						|  | str: A response generated by the VQA model based on the provided image and question. | 
					
						
						|  | """ | 
					
						
						|  | client = OpenAI() | 
					
						
						|  | response = client.responses.create( | 
					
						
						|  | model=model_id, | 
					
						
						|  | input=[ | 
					
						
						|  | { | 
					
						
						|  | "role": "user", | 
					
						
						|  | "content": [ | 
					
						
						|  | {"type": "input_text", "text": query}, | 
					
						
						|  | { | 
					
						
						|  | "type": "input_image", | 
					
						
						|  | "image_url": img_url, | 
					
						
						|  | }, | 
					
						
						|  | ], | 
					
						
						|  | } | 
					
						
						|  | ], | 
					
						
						|  | ) | 
					
						
						|  | return response.output_text | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | @tool | 
					
						
						|  | def read_code(file_url: str) -> str: | 
					
						
						|  | """ | 
					
						
						|  | Read the contents of a code file such as py file instead of executing it. Use this tool to analyze a code snippet. | 
					
						
						|  |  | 
					
						
						|  | Args: | 
					
						
						|  | file_url (str): The URL of the code file to retrieve. | 
					
						
						|  |  | 
					
						
						|  | Returns: | 
					
						
						|  | str: The content of the file as a string. | 
					
						
						|  | """ | 
					
						
						|  | response = requests.get(file_url) | 
					
						
						|  | response.raise_for_status() | 
					
						
						|  | return response.text | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | @tool | 
					
						
						|  | def transcribe_audio(file_url: str, file_name: str) -> str: | 
					
						
						|  | """ | 
					
						
						|  | Download and transcribe an audio file using transcription model. | 
					
						
						|  |  | 
					
						
						|  | Args: | 
					
						
						|  | file_url (str): Direct URL to the audio file (e.g., .mp3, .wav). | 
					
						
						|  | file_name (str): Filename including extension, used to determine format. | 
					
						
						|  |  | 
					
						
						|  | Returns: | 
					
						
						|  | str: The transcribed text from the audio file. | 
					
						
						|  | """ | 
					
						
						|  |  | 
					
						
						|  | response = requests.get(file_url) | 
					
						
						|  | response.raise_for_status() | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | extension = file_name.split(".")[-1].lower() or "mp3" | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | audio_file = io.BytesIO(response.content) | 
					
						
						|  | audio_file.name = f"audio.{extension}" | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | client = OpenAI() | 
					
						
						|  | transcription = client.audio.transcriptions.create( | 
					
						
						|  | model="gpt-4o-transcribe", file=audio_file | 
					
						
						|  | ) | 
					
						
						|  |  | 
					
						
						|  | return transcription.text | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | def _pytube_buffer(url: str) -> Optional[io.BytesIO]: | 
					
						
						|  | try: | 
					
						
						|  | from pytube import YouTube | 
					
						
						|  |  | 
					
						
						|  | yt = YouTube(url) | 
					
						
						|  | stream = ( | 
					
						
						|  | yt.streams.filter(progressive=True, file_extension="mp4") | 
					
						
						|  | .order_by("resolution") | 
					
						
						|  | .desc() | 
					
						
						|  | .first() | 
					
						
						|  | ) | 
					
						
						|  | if stream is None: | 
					
						
						|  | raise RuntimeError("No MP4 with audio found") | 
					
						
						|  | buf = io.BytesIO() | 
					
						
						|  | stream.stream_to_buffer(buf) | 
					
						
						|  | buf.seek(0) | 
					
						
						|  | return buf | 
					
						
						|  | except Exception as e: | 
					
						
						|  | print(f"[youtube_to_buffer] PyTube failed → {e}", file=sys.stderr) | 
					
						
						|  | return None | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | def _ytdlp_buffer(url: str) -> io.BytesIO: | 
					
						
						|  | """ | 
					
						
						|  | Return a BytesIO containing some MP4 video stream for `url`. | 
					
						
						|  | Works whether YouTube serves a progressive file or separate A/V. | 
					
						
						|  | """ | 
					
						
						|  | ydl_opts = { | 
					
						
						|  | "quiet": True, | 
					
						
						|  | "skip_download": True, | 
					
						
						|  | "format": "bestvideo[ext=mp4]/best[ext=mp4]/best", | 
					
						
						|  | } | 
					
						
						|  | with YoutubeDL(ydl_opts) as ydl: | 
					
						
						|  | info = ydl.extract_info(url, download=False) | 
					
						
						|  | if "entries" in info: | 
					
						
						|  | info = info["entries"][0] | 
					
						
						|  |  | 
					
						
						|  | if "url" in info: | 
					
						
						|  | video_urls = [info["url"]] | 
					
						
						|  |  | 
					
						
						|  | elif "requested_formats" in info: | 
					
						
						|  | video_urls = [ | 
					
						
						|  | fmt["url"] | 
					
						
						|  | for fmt in info["requested_formats"] | 
					
						
						|  | if fmt.get("vcodec") != "none" | 
					
						
						|  | ] | 
					
						
						|  | if not video_urls: | 
					
						
						|  | raise RuntimeError("yt-dlp returned audio-only formats") | 
					
						
						|  |  | 
					
						
						|  | else: | 
					
						
						|  | raise RuntimeError("yt-dlp could not extract a stream URL") | 
					
						
						|  |  | 
					
						
						|  | buf = io.BytesIO() | 
					
						
						|  | for direct_url in video_urls: | 
					
						
						|  | with requests.get(direct_url, stream=True) as r: | 
					
						
						|  | r.raise_for_status() | 
					
						
						|  | for chunk in r.iter_content(chunk_size=1 << 16): | 
					
						
						|  | buf.write(chunk) | 
					
						
						|  |  | 
					
						
						|  | buf.seek(0) | 
					
						
						|  | return buf | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | @functools.lru_cache(maxsize=8) | 
					
						
						|  | def youtube_to_buffer(url: str) -> io.BytesIO: | 
					
						
						|  | """ | 
					
						
						|  |  | 
					
						
						|  | Return a BytesIO containing a single progressive MP4 | 
					
						
						|  | (H.264 + AAC) – the safest thing PyAV can open everywhere. | 
					
						
						|  | """ | 
					
						
						|  | ydl_opts = { | 
					
						
						|  | "quiet": True, | 
					
						
						|  | "skip_download": True, | 
					
						
						|  |  | 
					
						
						|  | "format": ( | 
					
						
						|  | "best[ext=mp4][vcodec^=avc1][acodec!=none]" | 
					
						
						|  | "/best[ext=mp4][acodec!=none]" | 
					
						
						|  | ), | 
					
						
						|  | } | 
					
						
						|  |  | 
					
						
						|  | with YoutubeDL(ydl_opts) as ydl: | 
					
						
						|  | info = ydl.extract_info(url, download=False) | 
					
						
						|  | if "entries" in info: | 
					
						
						|  | info = info["entries"][0] | 
					
						
						|  |  | 
					
						
						|  | direct_url = info.get("url") | 
					
						
						|  | if not direct_url: | 
					
						
						|  | raise RuntimeError("yt-dlp could not find a progressive MP4 track") | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | buf = io.BytesIO() | 
					
						
						|  | with requests.get(direct_url, stream=True) as r: | 
					
						
						|  | r.raise_for_status() | 
					
						
						|  | for chunk in r.iter_content(chunk_size=1 << 17): | 
					
						
						|  | buf.write(chunk) | 
					
						
						|  |  | 
					
						
						|  | buf.seek(0) | 
					
						
						|  | return buf | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | def sample_frames(video_bytes: io.BytesIO, n_frames: int = 6) -> List[Image.Image]: | 
					
						
						|  | """Decode `n_frames` uniformly spaced RGB frames as PIL images.""" | 
					
						
						|  | container = av.open(video_bytes, metadata_errors="ignore") | 
					
						
						|  | video = container.streams.video[0] | 
					
						
						|  | total = video.frames or 0 | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | step = max(1, total // n_frames) if total else 30 | 
					
						
						|  |  | 
					
						
						|  | frames: list[Image.Image] = [] | 
					
						
						|  | for i, frame in enumerate(container.decode(video=0)): | 
					
						
						|  | if i % step == 0: | 
					
						
						|  | frames.append(frame.to_image()) | 
					
						
						|  | if len(frames) >= n_frames: | 
					
						
						|  | break | 
					
						
						|  | container.close() | 
					
						
						|  | return frames | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | def pil_to_data_url(img: Image.Image, quality: int = 80) -> str: | 
					
						
						|  | buf = io.BytesIO() | 
					
						
						|  | img.save(buf, format="JPEG", quality=quality, optimize=True) | 
					
						
						|  | b64 = base64.b64encode(buf.getvalue()).decode() | 
					
						
						|  | return f"data:image/jpeg;base64,{b64}" | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | def save_audio_stream_to_temp_wav_file(video_bytes: io.BytesIO) -> Optional[str]: | 
					
						
						|  | """ | 
					
						
						|  | Extracts the audio stream from video_bytes, saves it as a temporary WAV file, | 
					
						
						|  | and returns the path to the file. | 
					
						
						|  | Returns None if no audio stream is found or an error occurs. | 
					
						
						|  | """ | 
					
						
						|  | try: | 
					
						
						|  | video_bytes.seek(0) | 
					
						
						|  | input_container = av.open(video_bytes, metadata_errors="ignore") | 
					
						
						|  |  | 
					
						
						|  | if not input_container.streams.audio: | 
					
						
						|  | print("No audio streams found in the video.", file=sys.stderr) | 
					
						
						|  | return None | 
					
						
						|  | input_audio_stream = input_container.streams.audio[0] | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp_file: | 
					
						
						|  | temp_audio_file_path = tmp_file.name | 
					
						
						|  |  | 
					
						
						|  | output_container = av.open(temp_audio_file_path, mode="w", format="wav") | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | channel_layout = "stereo" | 
					
						
						|  | if ( | 
					
						
						|  | hasattr(input_audio_stream.codec_context, "layout") | 
					
						
						|  | and input_audio_stream.codec_context.layout | 
					
						
						|  | ): | 
					
						
						|  | channel_layout = input_audio_stream.codec_context.layout.name | 
					
						
						|  | elif ( | 
					
						
						|  | hasattr(input_audio_stream.codec_context, "channels") | 
					
						
						|  | and input_audio_stream.codec_context.channels == 1 | 
					
						
						|  | ): | 
					
						
						|  | channel_layout = "mono" | 
					
						
						|  |  | 
					
						
						|  | output_audio_stream = output_container.add_stream( | 
					
						
						|  | "pcm_s16le", | 
					
						
						|  | rate=input_audio_stream.codec_context.sample_rate, | 
					
						
						|  | layout=channel_layout, | 
					
						
						|  | ) | 
					
						
						|  |  | 
					
						
						|  | for frame in input_container.decode(input_audio_stream): | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | for packet in output_audio_stream.encode(frame): | 
					
						
						|  | output_container.mux(packet) | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | for packet in output_audio_stream.encode(): | 
					
						
						|  | output_container.mux(packet) | 
					
						
						|  |  | 
					
						
						|  | output_container.close() | 
					
						
						|  | input_container.close() | 
					
						
						|  | return temp_audio_file_path | 
					
						
						|  |  | 
					
						
						|  | except Exception as e: | 
					
						
						|  | print(f"Error extracting audio to temp WAV file: {e}", file=sys.stderr) | 
					
						
						|  |  | 
					
						
						|  | if "temp_audio_file_path" in locals() and os.path.exists(temp_audio_file_path): | 
					
						
						|  | os.remove(temp_audio_file_path) | 
					
						
						|  | return None | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | @tool | 
					
						
						|  | def run_video(query: str, url: str) -> str: | 
					
						
						|  | """ | 
					
						
						|  | Get a YouTube video from url and return an answer to a natural-language query using the video. | 
					
						
						|  |  | 
					
						
						|  | Args: | 
					
						
						|  | query (str):  A natural-language question whose answer is expected to be found in the visual content of the video. | 
					
						
						|  | url (str): Fully qualified URL of the YouTube video to analyze. | 
					
						
						|  |  | 
					
						
						|  | Returns: | 
					
						
						|  | str: A response generated by the VQA model based on the provided video and question. | 
					
						
						|  | """ | 
					
						
						|  | n_frames = 4 | 
					
						
						|  | buff = youtube_to_buffer(url) | 
					
						
						|  | if buff is None: | 
					
						
						|  | return "Error: Could not download or buffer the video." | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | frames = sample_frames(buff, n_frames=n_frames) | 
					
						
						|  | buff.seek(0) | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | transcript = "[Audio could not be processed]" | 
					
						
						|  | audio_file_path = None | 
					
						
						|  | try: | 
					
						
						|  | audio_file_path = save_audio_stream_to_temp_wav_file(buff) | 
					
						
						|  | if audio_file_path: | 
					
						
						|  | with open(audio_file_path, "rb") as audio_data: | 
					
						
						|  |  | 
					
						
						|  | transcription_response = openai.audio.transcriptions.create( | 
					
						
						|  | model="gpt-4o-transcribe", file=audio_data | 
					
						
						|  | ) | 
					
						
						|  | transcript = transcription_response.text | 
					
						
						|  | else: | 
					
						
						|  | transcript = "[No audio stream found or error during extraction]" | 
					
						
						|  | print( | 
					
						
						|  | "No audio file path returned, skipping transcription.", file=sys.stderr | 
					
						
						|  | ) | 
					
						
						|  | except Exception as e: | 
					
						
						|  | print(f"Error during audio transcription: {e}", file=sys.stderr) | 
					
						
						|  | transcript = f"[Error during audio transcription: {e}]" | 
					
						
						|  | finally: | 
					
						
						|  | if audio_file_path and os.path.exists(audio_file_path): | 
					
						
						|  | os.remove(audio_file_path) | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | prompt_text = f"Original Query: {query}\n\nVideo Transcript:\n{transcript}\n\nKey Visual Frames (analyze these along with the transcript to answer the query):" | 
					
						
						|  |  | 
					
						
						|  | content = [{"type": "text", "text": prompt_text}] | 
					
						
						|  |  | 
					
						
						|  | for img in frames: | 
					
						
						|  | content.append( | 
					
						
						|  | { | 
					
						
						|  | "type": "image_url", | 
					
						
						|  | "image_url": {"url": pil_to_data_url(img)}, | 
					
						
						|  | } | 
					
						
						|  | ) | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | try: | 
					
						
						|  | resp = openai.chat.completions.create( | 
					
						
						|  | model=model_id, | 
					
						
						|  | messages=[{"role": "user", "content": content}], | 
					
						
						|  | temperature=0.1, | 
					
						
						|  | ) | 
					
						
						|  | result = resp.choices[0].message.content.strip() | 
					
						
						|  | except Exception as e: | 
					
						
						|  | print(f"Error calling OpenAI API: {e}", file=sys.stderr) | 
					
						
						|  | result = f"[Error processing with AI model: {e}]" | 
					
						
						|  |  | 
					
						
						|  | return result | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | def process_image(response, filename, content_type): | 
					
						
						|  | """Process image files - convert to base64 data URL for vision models""" | 
					
						
						|  | img_data = base64.b64encode(response.content).decode("utf-8") | 
					
						
						|  | data_url = f"data:{content_type};base64,{img_data}" | 
					
						
						|  |  | 
					
						
						|  | return { | 
					
						
						|  | "file_type": "image", | 
					
						
						|  | "filename": filename, | 
					
						
						|  | "content_type": content_type, | 
					
						
						|  | "data_url": data_url, | 
					
						
						|  | } | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | def process_audio(response, filename, content_type): | 
					
						
						|  | """Process audio files - either return data URL or save to temp file for processing""" | 
					
						
						|  | audio_data = base64.b64encode(response.content).decode("utf-8") | 
					
						
						|  | data_url = f"data:{content_type};base64,{audio_data}" | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | audio_file = io.BytesIO(response.content) | 
					
						
						|  | extension = os.path.splitext(filename)[1].lower() or ".mp3" | 
					
						
						|  | audio_file.name = f"audio{extension}" | 
					
						
						|  |  | 
					
						
						|  | return { | 
					
						
						|  | "file_type": "audio", | 
					
						
						|  | "filename": filename, | 
					
						
						|  | "content_type": content_type, | 
					
						
						|  | "data_url": data_url, | 
					
						
						|  | "audio_buffer": audio_file, | 
					
						
						|  | } | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | def process_video(response, filename, content_type): | 
					
						
						|  | """Process video files - save to buffer and extract frames""" | 
					
						
						|  | video_buffer = io.BytesIO(response.content) | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | try: | 
					
						
						|  | frames = sample_frames(video_buffer, n_frames=4) | 
					
						
						|  | frame_urls = [pil_to_data_url(img) for img in frames] | 
					
						
						|  | frame_extraction_success = True | 
					
						
						|  | except Exception: | 
					
						
						|  | frame_urls = [] | 
					
						
						|  | frame_extraction_success = False | 
					
						
						|  |  | 
					
						
						|  | return { | 
					
						
						|  | "file_type": "video", | 
					
						
						|  | "filename": filename, | 
					
						
						|  | "content_type": content_type, | 
					
						
						|  | "video_buffer": video_buffer, | 
					
						
						|  | "frame_urls": frame_urls, | 
					
						
						|  | "frames_extracted": frame_extraction_success, | 
					
						
						|  | } | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | def process_tabular(response, filename, content_type): | 
					
						
						|  | """Process spreadsheet files using pandas""" | 
					
						
						|  | excel_buffer = io.BytesIO(response.content) | 
					
						
						|  |  | 
					
						
						|  | try: | 
					
						
						|  |  | 
					
						
						|  | if filename.lower().endswith(".csv"): | 
					
						
						|  | df = pd.read_csv(excel_buffer) | 
					
						
						|  | else: | 
					
						
						|  | df = pd.read_excel(excel_buffer) | 
					
						
						|  |  | 
					
						
						|  | return { | 
					
						
						|  | "file_type": "tabular", | 
					
						
						|  | "filename": filename, | 
					
						
						|  | "content_type": content_type, | 
					
						
						|  | "data": df.to_dict(orient="records"), | 
					
						
						|  | "columns": df.columns.tolist(), | 
					
						
						|  | "shape": df.shape, | 
					
						
						|  | } | 
					
						
						|  | except Exception as e: | 
					
						
						|  |  | 
					
						
						|  | return { | 
					
						
						|  | "file_type": "tabular", | 
					
						
						|  | "filename": filename, | 
					
						
						|  | "content_type": content_type, | 
					
						
						|  | "error": f"Failed to parse tabular data: {e}", | 
					
						
						|  | "raw_data": base64.b64encode(response.content).decode("utf-8"), | 
					
						
						|  | } | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | def process_text(response, filename, content_type): | 
					
						
						|  | """Process text files (code, plain text, etc.)""" | 
					
						
						|  | try: | 
					
						
						|  | text_content = response.text | 
					
						
						|  | return { | 
					
						
						|  | "file_type": "text", | 
					
						
						|  | "filename": filename, | 
					
						
						|  | "content_type": content_type, | 
					
						
						|  | "content": text_content, | 
					
						
						|  | "extension": os.path.splitext(filename)[ | 
					
						
						|  | 1 | 
					
						
						|  | ], | 
					
						
						|  | } | 
					
						
						|  | except Exception as e: | 
					
						
						|  | return { | 
					
						
						|  | "file_type": "text", | 
					
						
						|  | "filename": filename, | 
					
						
						|  | "content_type": content_type, | 
					
						
						|  | "error": f"Failed to decode text: {e}", | 
					
						
						|  | "raw_data": base64.b64encode(response.content).decode("utf-8"), | 
					
						
						|  | } | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | def process_json(response, filename, content_type): | 
					
						
						|  | """Process JSON data""" | 
					
						
						|  | try: | 
					
						
						|  | json_data = response.json() | 
					
						
						|  | return { | 
					
						
						|  | "file_type": "json", | 
					
						
						|  | "filename": filename, | 
					
						
						|  | "content_type": content_type, | 
					
						
						|  | "data": json_data, | 
					
						
						|  | } | 
					
						
						|  | except Exception: | 
					
						
						|  |  | 
					
						
						|  | return process_text(response, filename, content_type) | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | def process_pdf(response, filename, content_type): | 
					
						
						|  | """Process PDF files - return as binary with metadata""" | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | pdf_data = base64.b64encode(response.content).decode("utf-8") | 
					
						
						|  |  | 
					
						
						|  | return { | 
					
						
						|  | "file_type": "pdf", | 
					
						
						|  | "filename": filename, | 
					
						
						|  | "content_type": content_type, | 
					
						
						|  | "data": pdf_data, | 
					
						
						|  | } | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | def process_binary(response, filename, content_type): | 
					
						
						|  | """Process other binary files (fallback handler)""" | 
					
						
						|  | binary_data = base64.b64encode(response.content).decode("utf-8") | 
					
						
						|  |  | 
					
						
						|  | return { | 
					
						
						|  | "file_type": "binary", | 
					
						
						|  | "filename": filename, | 
					
						
						|  | "content_type": content_type, | 
					
						
						|  | "data": binary_data, | 
					
						
						|  | } | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | @tool | 
					
						
						|  | def fetch_task_files(url: str) -> Dict[str, Any]: | 
					
						
						|  | """ | 
					
						
						|  | Download and process files from a given URL. This tool detects the file type and returns the data in a suitable format for further processing. | 
					
						
						|  |  | 
					
						
						|  | For different file types, this tool returns: | 
					
						
						|  |  | 
					
						
						|  | - Images: Returns a data_url that can be directly used with the read_image tool | 
					
						
						|  | Example: result = fetch_task_files(url); then use read_image(question, result["data_url"]) | 
					
						
						|  |  | 
					
						
						|  | - Audio: Returns audio data that can be used with the transcribe_audio tool | 
					
						
						|  | Example: result = fetch_task_files(url); then use transcribe_audio(result["data_url"], result["filename"]) | 
					
						
						|  |  | 
					
						
						|  | - Video: Returns frame extractions and a video buffer for processing with run_video | 
					
						
						|  | Example: result = fetch_task_files(url); you can access frames via result["frame_urls"] | 
					
						
						|  |  | 
					
						
						|  | - Tabular data (Excel/CSV): Returns parsed data as records, column names, and dimensions | 
					
						
						|  | Example: result = fetch_task_files(url); then analyze data with result["data"] and result["columns"] | 
					
						
						|  |  | 
					
						
						|  | - Text/Code: Returns the content as text for analysis | 
					
						
						|  | Example: result = fetch_task_files(url); then access text via result["content"] | 
					
						
						|  |  | 
					
						
						|  | - PDFs & other files: Returns encoded file data for processing | 
					
						
						|  |  | 
					
						
						|  | All responses include metadata like file_type, filename, and content_type to help determine how to handle the file. | 
					
						
						|  |  | 
					
						
						|  | Args: | 
					
						
						|  | url (str): Direct URL to the file to download. For task files, construct using the API base URL | 
					
						
						|  | and the task ID (e.g., "https://agents-course-unit4-scoring.hf.space/files/{task_id}") | 
					
						
						|  |  | 
					
						
						|  | Returns: | 
					
						
						|  | dict: A dictionary with file data and metadata structured for the specific file type | 
					
						
						|  | """ | 
					
						
						|  | files_url = url | 
					
						
						|  | print(f"Fetching file from: {files_url}") | 
					
						
						|  |  | 
					
						
						|  | try: | 
					
						
						|  | response = requests.get(files_url, timeout=15) | 
					
						
						|  | response.raise_for_status() | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | content_type = response.headers.get("Content-Type", "").lower() | 
					
						
						|  | filename = response.headers.get("content-disposition", "") | 
					
						
						|  | if "filename=" in filename: | 
					
						
						|  | filename = filename.split("filename=")[-1].strip('"') | 
					
						
						|  | else: | 
					
						
						|  | filename = "file.bin" | 
					
						
						|  |  | 
					
						
						|  | print(f"Received file: {filename}, type: {content_type}") | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | if "image/" in content_type or any( | 
					
						
						|  | filename.lower().endswith(ext) for ext in [".png", ".jpg", ".jpeg", ".gif"] | 
					
						
						|  | ): | 
					
						
						|  | return process_image(response, filename, content_type) | 
					
						
						|  |  | 
					
						
						|  | elif "audio/" in content_type or any( | 
					
						
						|  | filename.lower().endswith(ext) for ext in [".mp3", ".wav", ".ogg"] | 
					
						
						|  | ): | 
					
						
						|  | return process_audio(response, filename, content_type) | 
					
						
						|  |  | 
					
						
						|  | elif "video/" in content_type or any( | 
					
						
						|  | filename.lower().endswith(ext) for ext in [".mp4", ".avi", ".mov"] | 
					
						
						|  | ): | 
					
						
						|  | return process_video(response, filename, content_type) | 
					
						
						|  |  | 
					
						
						|  | elif ( | 
					
						
						|  | "spreadsheet" in content_type | 
					
						
						|  | or "excel" in content_type | 
					
						
						|  | or any(filename.lower().endswith(ext) for ext in [".xlsx", ".xls", ".csv"]) | 
					
						
						|  | ): | 
					
						
						|  | return process_tabular(response, filename, content_type) | 
					
						
						|  |  | 
					
						
						|  | elif ( | 
					
						
						|  | "text/" in content_type | 
					
						
						|  | or "code" in content_type | 
					
						
						|  | or any( | 
					
						
						|  | filename.lower().endswith(ext) | 
					
						
						|  | for ext in [".txt", ".py", ".js", ".html", ".md"] | 
					
						
						|  | ) | 
					
						
						|  | ): | 
					
						
						|  | return process_text(response, filename, content_type) | 
					
						
						|  |  | 
					
						
						|  | elif "application/json" in content_type or filename.lower().endswith(".json"): | 
					
						
						|  | return process_json(response, filename, content_type) | 
					
						
						|  |  | 
					
						
						|  | elif "application/pdf" in content_type or filename.lower().endswith(".pdf"): | 
					
						
						|  | return process_pdf(response, filename, content_type) | 
					
						
						|  |  | 
					
						
						|  | else: | 
					
						
						|  |  | 
					
						
						|  | return process_binary(response, filename, content_type) | 
					
						
						|  |  | 
					
						
						|  | except requests.exceptions.RequestException as e: | 
					
						
						|  | print(f"Error fetching url: {files_url} - {e}") | 
					
						
						|  | return {"error": f"Error fetching files: {e}"} | 
					
						
						|  | except Exception as e: | 
					
						
						|  | print(f"An unexpected error occurred fetching files from url: {files_url}- {e}") | 
					
						
						|  | return {"error": f"An unexpected error occurred: {e}"} | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | @tool | 
					
						
						|  | def search_wikipedia(query: str) -> str: | 
					
						
						|  | """ | 
					
						
						|  | get the contents of wikipedia page retrieved by search query. | 
					
						
						|  |  | 
					
						
						|  | Args: | 
					
						
						|  | query (str):  A search term to search within wikipedia. Ideally it should be one word or a group of few words. | 
					
						
						|  |  | 
					
						
						|  | Returns: | 
					
						
						|  | str: The text content of wikipedia page | 
					
						
						|  | """ | 
					
						
						|  | get_wiki = wikipediaapi.Wikipedia( | 
					
						
						|  | language="en", | 
					
						
						|  | user_agent="test_tokki", | 
					
						
						|  | extract_format=wikipediaapi.ExtractFormat.WIKI, | 
					
						
						|  | ) | 
					
						
						|  | page_content = get_wiki.page(query) | 
					
						
						|  | text_content = page_content.text | 
					
						
						|  |  | 
					
						
						|  | cutoff = 25000 | 
					
						
						|  | text_content = " ".join(text_content.split(" ")[:cutoff]) | 
					
						
						|  | return text_content | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | if __name__ == "__main__": | 
					
						
						|  |  | 
					
						
						|  | api_base = "https://agents-course-unit4-scoring.hf.space" | 
					
						
						|  | test_urls = [ | 
					
						
						|  | f"{api_base}/files/cca530fc-4052-43b2-b130-b30968d8aa44", | 
					
						
						|  | f"{api_base}/files/99c9cc74-fdc8-46c6-8f8d-3ce2d3bfeea3", | 
					
						
						|  | f"{api_base}/files/7bd855d8-463d-4ed5-93ca-5fe35145f733", | 
					
						
						|  | ] | 
					
						
						|  |  | 
					
						
						|  | for url in test_urls: | 
					
						
						|  | print( | 
					
						
						|  | "=" * 20 | 
					
						
						|  | + " " | 
					
						
						|  | + f"Testing fetch_task_files with URL: {url}" | 
					
						
						|  | + " " | 
					
						
						|  | + "=" * 20 | 
					
						
						|  | ) | 
					
						
						|  |  | 
					
						
						|  | result = fetch_task_files(url) | 
					
						
						|  | print(f"File type: {result.get('file_type')}") | 
					
						
						|  | print(f"Filename: {result.get('filename')}") | 
					
						
						|  |  |