import gradio as gr
import json
import requests
import urllib.request
import os
import ssl
import base64
from PIL import Image
import soundfile as sf
import mimetypes
import logging
from io import BytesIO
import tempfile

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Azure ML endpoint configuration
url = os.getenv("AZURE_ENDPOINT")
api_key = os.getenv("AZURE_API_KEY")

# Default parameter values
default_max_tokens = 4096
default_temperature = 0.0
default_top_p = 1.0
default_presence_penalty = 0.0
default_frequency_penalty = 0.0

# Initialize MIME types
mimetypes.init()

def call_aml_endpoint(payload, url, api_key, params=None):
    """Call Azure ML endpoint with the given payload."""
    # Allow self-signed HTTPS certificates
    def allow_self_signed_https(allowed):
        if allowed and not os.environ.get('PYTHONHTTPSVERIFY', '') and getattr(ssl, '_create_unverified_context', None):
            ssl._create_default_https_context = ssl._create_unverified_context

    allow_self_signed_https(True)

    # Set parameters from the UI inputs or use defaults
    if params is None:
        params = {
            "max_tokens": default_max_tokens,
            "temperature": default_temperature,
            "top_p": default_top_p,
            "presence_penalty": default_presence_penalty,
            "frequency_penalty": default_frequency_penalty
        }

    parameters = {
        "max_tokens": int(params["max_tokens"]),
        "temperature": float(params["temperature"]),
        "top_p": float(params["top_p"]),
        "presence_penalty": float(params["presence_penalty"]),
        "frequency_penalty": float(params["frequency_penalty"]),
        "stream": True
    }

    if "parameters" not in payload["input_data"]:
        payload["input_data"]["parameters"] = parameters

    # Encode the request body
    body = str.encode(json.dumps(payload))

    if not api_key:
        raise Exception("A key should be provided to invoke the endpoint")

    # Set up headers
    headers = {'Content-Type': 'application/json', 'Authorization': ('Bearer ' + api_key)}

    # Create and send the request
    req = urllib.request.Request(url, body, headers)
    try:
        logger.info(f"Sending request to {url}")
        logger.info(f"Using parameters: {parameters}")
        response = urllib.request.urlopen(req)
        result = response.read().decode('utf-8')
        logger.info("Received response successfully")
        return json.loads(result)
    except urllib.error.HTTPError as error:
        logger.error(f"Request failed with status code: {error.code}")
        logger.error(f"Headers: {error.info()}")
        error_message = error.read().decode("utf8", 'ignore')
        logger.error(f"Error message: {error_message}")
        return {"error": error_message}

def improved_fetch_audio_from_url(url):
    """Improved function to fetch audio data from a URL and convert it to base64.

    Args:
        url (str): URL of the audio file

    Returns:
        tuple: (mime_type, base64_encoded_data) if successful, (None, None) otherwise
    """
    try:
        # Get the audio file from the URL
        logger.info(f"Fetching audio from URL: {url}")

        # Use a session with increased timeout
        session = requests.Session()
        response = session.get(url, timeout=30)
        response.raise_for_status()

        # Determine MIME type based on URL
        file_extension = os.path.splitext(url)[1].lower()
        mime_type = None
        if file_extension == '.wav':
            mime_type = "audio/wav"
        elif file_extension == '.mp3':
            mime_type = "audio/mpeg"
        elif file_extension == '.flac':
            mime_type = "audio/flac"
        elif file_extension in ['.m4a', '.aac']:
            mime_type = "audio/aac"
        elif file_extension == '.ogg':
            mime_type = "audio/ogg"
        else:
            # Try to detect the MIME type from headers
            content_type = response.headers.get('Content-Type', '')
            if content_type.startswith('audio/'):
                mime_type = content_type
            else:
                mime_type = "audio/wav"  # Default to WAV

        logger.info(f"Detected MIME type: {mime_type}")

        # Save content to a temporary file for debugging
        temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=file_extension)
        temp_file.write(response.content)
        temp_file.close()
        logger.info(f"Saved audio to temporary file: {temp_file.name}")

        # Read the file to verify it's valid
        try:
            # For WAV files, try to read with soundfile to verify
            if mime_type == "audio/wav":
                data, samplerate = sf.read(temp_file.name)
                logger.info(f"Successfully read audio file: {len(data)} samples, {samplerate}Hz")
        except Exception as e:
            logger.warning(f"Could not verify audio with soundfile: {e}")
            # Continue anyway, the file might still be valid

        # Convert to base64
        with open(temp_file.name, "rb") as f:
            audio_content = f.read()
        base64_audio = base64.b64encode(audio_content).decode('utf-8')
        logger.info(f"Successfully encoded audio to base64, length: {len(base64_audio)}")

        # Clean up temporary file
        try:
            os.unlink(temp_file.name)
        except:
            pass

        return mime_type, base64_audio
    except Exception as e:
        logger.error(f"Error fetching audio from URL: {e}", exc_info=True)
        return None, None

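# Example (illustrative; uses the same sample clip wired to the "Audio & Text" tab below,
# and assumes outbound network access is available):
#   mime_type, b64 = improved_fetch_audio_from_url(
#       "https://diamondfan.github.io/audio_files/english.weekend.plan.wav")
#   # mime_type -> "audio/wav", b64 -> base64 text ready to embed in a data: URL
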
def fetch_image_from_url(url):
    """Fetch image data from a URL and convert it to base64.

    Args:
        url (str): URL of the image file

    Returns:
        tuple: (mime_type, base64_encoded_data) if successful, (None, None) otherwise
    """
    try:
        # Get the image file from the URL
        logger.info(f"Fetching image from URL: {url}")
        response = requests.get(url)
        response.raise_for_status()

        # Determine MIME type based on URL
        file_extension = os.path.splitext(url)[1].lower()
        if file_extension in ['.jpg', '.jpeg']:
            mime_type = "image/jpeg"
        elif file_extension == '.png':
            mime_type = "image/png"
        elif file_extension == '.gif':
            mime_type = "image/gif"
        elif file_extension in ['.bmp', '.tiff', '.webp']:
            mime_type = f"image/{file_extension[1:]}"
        else:
            mime_type = "image/jpeg"  # Default to JPEG

        # Convert to base64
        base64_image = base64.b64encode(response.content).decode('utf-8')
        logger.info(f"Successfully fetched and encoded image, mime type: {mime_type}")
        return mime_type, base64_image
    except Exception as e:
        logger.error(f"Error fetching image from URL: {e}")
        return None, None

def encode_base64_from_file(file_path):
    """Encode file content to a base64 string and determine its MIME type."""
    file_extension = os.path.splitext(file_path)[1].lower()

    # Map file extensions to MIME types
    if file_extension in ['.jpg', '.jpeg']:
        mime_type = "image/jpeg"
    elif file_extension == '.png':
        mime_type = "image/png"
    elif file_extension == '.gif':
        mime_type = "image/gif"
    elif file_extension in ['.bmp', '.tiff', '.webp']:
        mime_type = f"image/{file_extension[1:]}"
    elif file_extension == '.flac':
        mime_type = "audio/flac"
    elif file_extension == '.wav':
        mime_type = "audio/wav"
    elif file_extension == '.mp3':
        mime_type = "audio/mpeg"
    elif file_extension in ['.m4a', '.aac']:
        mime_type = "audio/aac"
    elif file_extension == '.ogg':
        mime_type = "audio/ogg"
    else:
        mime_type = "application/octet-stream"

    # Read and encode file content
    with open(file_path, "rb") as file:
        encoded_string = base64.b64encode(file.read()).decode('utf-8')

    return encoded_string, mime_type

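# Downstream, the encoded bytes are wrapped into an RFC 2397 data URL before being sent to
# the endpoint, e.g. (illustrative, with a hypothetical local file):
#   b64, mime = encode_base64_from_file("sample.png")
#   data_url = f"data:{mime};base64,{b64}"   # "data:image/png;base64,iVBORw0KG..."
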
def process_message(history, message, conversation_state):
    """Process a user message and update both the display history and the internal state."""
    # Extract text and files
    text_content = message["text"] if message["text"] else ""
    image_files = []
    audio_files = []

    # Create content array for internal state
    content_items = []

    # Add text if available
    if text_content:
        content_items.append({"type": "text", "text": text_content})

    # Check if we need to clear history when uploading a second image or audio
    should_clear_history = False

    # Count existing images and audio in history
    existing_images = 0
    existing_audio = 0
    for msg in conversation_state:
        if msg["role"] == "user" and "content" in msg:
            for content_item in msg["content"]:
                if isinstance(content_item, dict):
                    if content_item.get("type") == "image_url":
                        existing_images += 1
                    elif content_item.get("type") == "audio_url":
                        existing_audio += 1

    # Process and immediately convert files to base64
    if message["files"] and len(message["files"]) > 0:
        for file_path in message["files"]:
            file_extension = os.path.splitext(file_path)[1].lower()
            file_name = os.path.basename(file_path)

            # Convert the file to base64 immediately
            base64_content, mime_type = encode_base64_from_file(file_path)

            # Add to content items for the API
            if mime_type.startswith("image/"):
                content_items.append({
                    "type": "image_url",
                    "image_url": {
                        "url": f"data:{mime_type};base64,{base64_content}"
                    }
                })
                image_files.append(file_path)
                # Check if this is a second image
                if existing_images > 0:
                    should_clear_history = True
                    logger.info("Detected second image upload - clearing history")
            elif mime_type.startswith("audio/"):
                content_items.append({
                    "type": "audio_url",
                    "audio_url": {
                        "url": f"data:{mime_type};base64,{base64_content}"
                    }
                })
                audio_files.append(file_path)
                # Check if this is a second audio
                if existing_audio > 0:
                    should_clear_history = True
                    logger.info("Detected second audio upload - clearing history")

    # Only proceed if we have content
    if content_items:
        # Clear history if we're uploading a second image or audio
        if should_clear_history:
            history = []
            conversation_state = []
            logger.info("History cleared due to second image/audio upload")

        # Add to Gradio chatbot history (for display)
        history.append({"role": "user", "content": text_content})

        # Add file messages if present
        for file_path in image_files + audio_files:
            history.append({"role": "user", "content": {"path": file_path}})

        logger.info(f"Updated history with user message. Current conversation has {existing_images + len(image_files)} images and {existing_audio + len(audio_files)} audio files")

        # Add to internal conversation state (with base64 data)
        conversation_state.append({
            "role": "user",
            "content": content_items
        })

    return history, gr.MultimodalTextbox(value=None, interactive=False), conversation_state

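# For reference, the two parallel structures built above look roughly like this
# (illustrative values; the upload path is hypothetical):
#   history (what the Chatbot displays):
#       {"role": "user", "content": "Describe this image"}
#       {"role": "user", "content": {"path": "/tmp/gradio/cat.png"}}
#   conversation_state (what is sent to the endpoint):
#       {"role": "user", "content": [
#           {"type": "text", "text": "Describe this image"},
#           {"type": "image_url", "image_url": {"url": "data:image/png;base64,..."}}]}
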
def process_text_example(example_text, history, conversation_state):
    """Process a text example directly."""
    try:
        # Initialize history and conversation_state if they're None
        if history is None:
            history = []
        if conversation_state is None:
            conversation_state = []

        # Add text message to history for display
        history.append({"role": "user", "content": example_text})

        # Add to conversation state
        content_items = [
            {"type": "text", "text": example_text}
        ]
        conversation_state.append({
            "role": "user",
            "content": content_items
        })

        # Generate bot response
        return bot_response(history, conversation_state)
    except Exception as e:
        logger.error(f"Error processing text example: {e}", exc_info=True)
        if history is None:
            history = []
        history.append({"role": "user", "content": example_text})
        history.append({"role": "assistant", "content": f"Error: {str(e)}"})
        return history, conversation_state

def process_audio_example_direct(example_text, example_audio_url, history, conversation_state):
    """Process an audio example directly from a URL."""
    try:
        logger.info(f"Processing audio example with text: {example_text}, URL: {example_audio_url}")

        # Initialize history and conversation_state if they're None
        if history is None:
            history = []
        if conversation_state is None:
            conversation_state = []

        # Check if we need to clear history (if there's already an audio in the conversation)
        should_clear_history = False
        for msg in conversation_state:
            if msg["role"] == "user" and "content" in msg:
                for content_item in msg["content"]:
                    if isinstance(content_item, dict) and content_item.get("type") == "audio_url":
                        should_clear_history = True
                        break

        if should_clear_history:
            history = []
            conversation_state = []
            logger.info("History cleared due to example with second audio")

        # Fetch audio and convert to base64 directly using improved function
        mime_type, base64_audio = improved_fetch_audio_from_url(example_audio_url)
        if not mime_type or not base64_audio:
            error_msg = f"Failed to load audio from {example_audio_url}"
            logger.error(error_msg)
            history.append({"role": "user", "content": f"{example_text} (Audio URL: {example_audio_url})"})
            history.append({"role": "assistant", "content": f"Error: {error_msg}"})
            return history, conversation_state

        logger.info(f"Successfully loaded audio, mime type: {mime_type}, base64 length: {len(base64_audio)}")

        # Add text message to history for display
        history.append({"role": "user", "content": example_text})

        # Add to conversation state
        content_items = [
            {"type": "text", "text": example_text},
            {"type": "audio_url", "audio_url": {"url": f"data:{mime_type};base64,{base64_audio}"}}
        ]
        conversation_state.append({
            "role": "user",
            "content": content_items
        })

        logger.info("Successfully prepared conversation state, now generating response")

        # Generate bot response
        return bot_response(history, conversation_state)
    except Exception as e:
        logger.error(f"Error processing audio example: {e}", exc_info=True)
        if history is None:
            history = []
        history.append({"role": "user", "content": f"{example_text} (Audio URL: {example_audio_url})"})
        history.append({"role": "assistant", "content": f"Error: {str(e)}"})
        return history, conversation_state

def process_image_example_direct(example_text, example_image_url, history, conversation_state):
    """Process an image example directly from a URL."""
    try:
        # Initialize history and conversation_state if they're None
        if history is None:
            history = []
        if conversation_state is None:
            conversation_state = []

        # Check if we need to clear history (if there's already an image in the conversation)
        should_clear_history = False
        for msg in conversation_state:
            if msg["role"] == "user" and "content" in msg:
                for content_item in msg["content"]:
                    if isinstance(content_item, dict) and content_item.get("type") == "image_url":
                        should_clear_history = True
                        break

        if should_clear_history:
            history = []
            conversation_state = []
            logger.info("History cleared due to example with second image")

        # Fetch image and convert to base64 directly
        mime_type, base64_image = fetch_image_from_url(example_image_url)
        if not mime_type or not base64_image:
            error_msg = f"Failed to load image from {example_image_url}"
            logger.error(error_msg)
            history.append({"role": "user", "content": f"{example_text} (Image URL: {example_image_url})"})
            history.append({"role": "assistant", "content": f"Error: {error_msg}"})
            return history, conversation_state

        # Add text message to history for display
        history.append({"role": "user", "content": example_text})

        # Add to conversation state
        content_items = [
            {"type": "text", "text": example_text},
            {"type": "image_url", "image_url": {"url": f"data:{mime_type};base64,{base64_image}"}}
        ]
        conversation_state.append({
            "role": "user",
            "content": content_items
        })

        # Generate bot response
        return bot_response(history, conversation_state)
    except Exception as e:
        logger.error(f"Error processing image example: {e}", exc_info=True)
        if history is None:
            history = []
        history.append({"role": "user", "content": f"{example_text} (Image URL: {example_image_url})"})
        history.append({"role": "assistant", "content": f"Error: {str(e)}"})
        return history, conversation_state

def bot_response(history, conversation_state):
    """Generate the bot response based on the conversation state."""
    if not conversation_state:
        return history, conversation_state

    # Create the payload
    payload = {
        "input_data": {
            "input_string": conversation_state
        }
    }

    # Log the payload for debugging (without base64 data)
    debug_payload = json.loads(json.dumps(payload))
    for item in debug_payload["input_data"]["input_string"]:
        if "content" in item and isinstance(item["content"], list):
            for content_item in item["content"]:
                if "image_url" in content_item:
                    parts = content_item["image_url"]["url"].split(",")
                    if len(parts) > 1:
                        content_item["image_url"]["url"] = parts[0] + ",[BASE64_DATA_REMOVED]"
                if "audio_url" in content_item:
                    parts = content_item["audio_url"]["url"].split(",")
                    if len(parts) > 1:
                        content_item["audio_url"]["url"] = parts[0] + ",[BASE64_DATA_REMOVED]"
    logger.info(f"Sending payload: {json.dumps(debug_payload, indent=2)}")

    # Call Azure ML endpoint
    response = call_aml_endpoint(payload, url, api_key)

    # Extract text response from the Azure ML endpoint response
    try:
        if isinstance(response, dict):
            if "result" in response:
                result = response["result"]
            elif "output" in response:
                # Depending on your API's response format
                if isinstance(response["output"], list) and len(response["output"]) > 0:
                    result = response["output"][0]
                else:
                    result = str(response["output"])
            elif "error" in response:
                result = f"Error: {response['error']}"
            else:
                # Just return the whole response as a string if we can't parse it
                result = f"Received response: {json.dumps(response)}"
        else:
            result = str(response)
    except Exception as e:
        result = f"Error processing response: {str(e)}"

    # Add bot response to history
    if result == "None":
        result = "This demo does not support text + audio + image inputs in the same conversation. Please click the Clear conversation button."
    history.append({"role": "assistant", "content": result})

    # Add to conversation state
    conversation_state.append({
        "role": "assistant",
        "content": [{"type": "text", "text": result}]
    })

    return history, conversation_state

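# The parsing above expects the endpoint to return the generated text under "result" or
# "output". Illustrative shapes (not guaranteed by the service):
#   {"result": "Here is a transcription of the clip: ..."}
#   {"output": ["Here is a transcription of the clip: ..."]}
# HTTP failures surface as {"error": "<raw response body>"} via call_aml_endpoint.
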
def enable_input():
    """Re-enable the input box after the bot responds."""
    return gr.MultimodalTextbox(interactive=True)

def update_debug(conversation_state):
    """Update debug output with the last payload that would be sent."""
    if not conversation_state:
        return {}

    # Create a payload from the conversation
    payload = {
        "input_data": {
            "input_string": conversation_state
        }
    }

    # Remove base64 data to avoid cluttering the UI
    sanitized_payload = json.loads(json.dumps(payload))
    for item in sanitized_payload["input_data"]["input_string"]:
        if "content" in item and isinstance(item["content"], list):
            for content_item in item["content"]:
                if "image_url" in content_item:
                    parts = content_item["image_url"]["url"].split(",")
                    if len(parts) > 1:
                        content_item["image_url"]["url"] = parts[0] + ",[BASE64_DATA_REMOVED]"
                if "audio_url" in content_item:
                    parts = content_item["audio_url"]["url"].split(",")
                    if len(parts) > 1:
                        content_item["audio_url"]["url"] = parts[0] + ",[BASE64_DATA_REMOVED]"
    return sanitized_payload

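# Illustrative value shown in the "Debug Info" accordion after one image turn
# (base64 payload elided by the sanitizer above):
#   {"input_data": {"input_string": [
#       {"role": "user", "content": [
#           {"type": "text", "text": "Write a limerick about this image"},
#           {"type": "image_url", "image_url": {"url": "data:image/jpeg;base64,[BASE64_DATA_REMOVED]"}}]}]}
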
# CSS used by the Blocks definition below; it shrinks the example audio players
css = """
#small-audio audio {
    height: 20px !important;
    width: 100px !important;
}
#small-audio .wrap {
    max-width: 220px !important;
}
#small-audio .audio-container {
    min-height: 0px !important;
}
"""

# Create Gradio demo
with gr.Blocks(theme=gr.themes.Soft(), css=css) as demo:
    title = gr.Markdown("# Phi-4-Multimodal Playground")
    description = gr.Markdown("""
This demo allows you to interact with the [Phi-4-Multimodal AI model](https://aka.ms/phi-4-multimodal/techreport).
You can type messages, upload images, or record audio to communicate with the AI.
Other demos include the [Phi-4-Mini playground](https://huggingface.co/spaces/microsoft/phi-4-mini), [Thoughts Organizer](https://huggingface.co/spaces/microsoft/ThoughtsOrganizer),
[Stories Come Alive](https://huggingface.co/spaces/microsoft/StoriesComeAlive), and [Phine Speech Translator](https://huggingface.co/spaces/microsoft/PhineSpeechTranslator).
""")

    # Store the conversation state with base64 data
    conversation_state = gr.State([])

    with gr.Row():
        with gr.Column(scale=2):
            chatbot = gr.Chatbot(
                type="messages",
                avatar_images=(None, "https://upload.wikimedia.org/wikipedia/commons/d/d3/Phi-integrated-information-symbol.png"),
                height=600
            )
            # The chatbot's trash icon also clears the stored conversation state
            chatbot.clear(lambda: [], None, conversation_state)

            with gr.Row():
                chat_input = gr.MultimodalTextbox(
                    interactive=True,
                    file_count="multiple",
                    placeholder="Enter a message or upload files (images, audio)...",
                    show_label=False,
                    sources=["microphone", "upload"],
                )
            with gr.Row():
                clear_btn = gr.ClearButton([chatbot, chat_input], value="Clear conversation")
                clear_btn.click(lambda: [], None, conversation_state)  # Also clear the conversation state
            gr.HTML("<div style='text-align: right; margin-top: 5px;'><small>Powered by the Microsoft <a href=\"https://aka.ms/phi-4-multimodal/azure\">Phi-4-multimodal</a> model on Azure AI. ©2025</small></div>")

        with gr.Column(scale=1):
            with gr.Tab("Audio & Text"):
                # Example 1
                gr.Audio("https://diamondfan.github.io/audio_files/english.weekend.plan.wav",
                         label="Preview", elem_id="small-audio")
                example1_btn = gr.Button("Transcribe this audio clip")
                gr.Markdown("-----")

                # Example 2
                gr.Audio("https://diamondfan.github.io/audio_files/japanese.seattle.trip.report.wav",
                         label="Preview", elem_id="small-audio")
                example2_btn = gr.Button("Translate audio transcription to English")

                # Define handlers for audio examples
                def run_audio_example1():
                    return process_audio_example_direct(
                        "Transcribe this audio clip",
                        "https://diamondfan.github.io/audio_files/english.weekend.plan.wav",
                        [], []
                    )

                def run_audio_example2():
                    return process_audio_example_direct(
                        "Translate audio transcription to English",
                        "https://diamondfan.github.io/audio_files/japanese.seattle.trip.report.wav",
                        [], []
                    )

                # Connect buttons to handlers
                example1_btn.click(
                    run_audio_example1,
                    inputs=[],
                    outputs=[chatbot, conversation_state]
                )
                example2_btn.click(
                    run_audio_example2,
                    inputs=[],
                    outputs=[chatbot, conversation_state]
                )

            with gr.Tab("Image & Text"):
                # Example 1
                gr.Image("https://huggingface.co/spaces/microsoft/phi-4-multimodal/resolve/main/Hanoi_Temple_of_Literature.jpg", label="Preview")
                img_example1_btn = gr.Button("Write a limerick about this image")

                # Example 2
                gr.Image("https://pub-c2c1d9230f0b4abb9b0d2d95e06fd4ef.r2.dev/sites/566/2024/09/Screenshot-2024-09-16-115417.png", label="Preview")
                img_example2_btn = gr.Button("Convert the chart to a markdown table")

                # Define handlers for image examples
                def run_image_example1():
                    return process_image_example_direct(
                        "Write a limerick about this image",
                        "https://huggingface.co/spaces/microsoft/phi-4-multimodal/resolve/main/Hanoi_Temple_of_Literature.jpg",
                        [], []
                    )

                def run_image_example2():
                    return process_image_example_direct(
                        "Convert the chart to a markdown table",
                        "https://pub-c2c1d9230f0b4abb9b0d2d95e06fd4ef.r2.dev/sites/566/2024/09/Screenshot-2024-09-16-115417.png",
                        [], []
                    )

                # Connect buttons to handlers
                img_example1_btn.click(
                    run_image_example1,
                    inputs=[],
                    outputs=[chatbot, conversation_state]
                )
                img_example2_btn.click(
                    run_image_example2,
                    inputs=[],
                    outputs=[chatbot, conversation_state]
                )

            with gr.Tab("Text Only"):
                # Create a list of example texts
                text_example_list = [
                    "I'd like to buy a new car. Start by asking me about my budget and which features I care most about, then provide a recommendation.",
                    "Coffee shops have been slimming down their menus lately. Is less choice making our coffee runs better, or do we miss the variety?",
                    "Explain the Transformer model to a medieval knight"
                ]

                # Create a button for each example
                for i, example_text in enumerate(text_example_list):
                    with gr.Row():
                        # gr.Markdown(f"Example {i+1}: **{example_text}**")
                        text_example_btn = gr.Button(f"{example_text}")
                        # Connect the button to the handler with the specific example text
                        text_example_btn.click(
                            fn=lambda text=example_text: process_text_example(text, [], []),
                            inputs=[],
                            outputs=[chatbot, conversation_state]
                        )

            gr.Markdown("### Instructions")
            gr.Markdown("""
- Type a question or statement
- Upload images or audio files
- You can combine text with media files
- Up to two modalities (text + image or text + audio) are supported in the same conversation
- The model can analyze images and transcribe audio
- For best results with images, use JPG or PNG files
- For audio, use WAV, MP3, or FLAC files
""")

            gr.Markdown("### Capabilities")
            gr.Markdown("""
This chatbot can:
- Answer questions and provide explanations
- Describe and analyze images
- Transcribe, translate, summarize, and analyze audio content
- Process multiple inputs in the same message
- Maintain context throughout the conversation
""")

    with gr.Accordion("Debug Info", open=False):
        debug_output = gr.JSON(
            label="Last API Request",
            value={}
        )

    # Set up event handlers
    msg_submit = chat_input.submit(
        process_message, [chatbot, chat_input, conversation_state], [chatbot, chat_input, conversation_state], queue=False
    )
    msg_response = msg_submit.then(
        bot_response, [chatbot, conversation_state], [chatbot, conversation_state], api_name="bot_response"
    )
    msg_response.then(enable_input, None, chat_input)

    # Update debug info
    msg_response.then(update_debug, conversation_state, debug_output)

demo.launch(share=True, debug=True)