import os
import gradio as gr
from os import getenv
import base64
from io import BytesIO
from dotenv import load_dotenv
import requests
import socket
import logging
import json
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
from langchain_core.callbacks import StreamingStdOutCallbackHandler
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Load environment variables from the .env file next to this script
dotenv_path = os.path.join(os.path.dirname(__file__), '.env')
load_dotenv(dotenv_path=dotenv_path)

# Debug env
logger.info(f"OPENROUTER_BASE_URL: {getenv('OPENROUTER_BASE_URL')}")
logger.info(f"OPENROUTER_API_KEY: {'Found' if getenv('OPENROUTER_API_KEY') else 'Missing'}")
# Connectivity test
def test_connectivity(url="https://openrouter.helicone.ai/api/v1"):
    try:
        return requests.get(url, timeout=5).status_code == 200
    except (requests.RequestException, socket.error) as e:
        logger.error(f"Connectivity test failed: {e}")
        return False

if not test_connectivity():
    logger.warning("No network to OpenRouter; responses may fail.")
# Helper to make direct API calls to OpenRouter when LangChain fails
def direct_api_call(messages, api_key, base_url):
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {api_key}",
        "HTTP-Referer": "https://your-app-domain.com",  # Add your domain
        "X-Title": "Image Analysis App",
    }
    if getenv("HELICONE_API_KEY"):
        headers["Helicone-Auth"] = f"Bearer {getenv('HELICONE_API_KEY')}"
    payload = {
        "model": "google/gemini-flash-1.5",
        "messages": messages,
        "stream": False,
    }
    try:
        response = requests.post(
            f"{base_url}/chat/completions",
            headers=headers,
            json=payload,
            timeout=30,
        )
        response.raise_for_status()
        return response.json()["choices"][0]["message"]["content"]
    except Exception as e:
        logger.error(f"Direct API call failed: {e}")
        return f"Error: {str(e)}"
# Initialize the LLM with streaming enabled
def init_llm():
    if not test_connectivity():
        raise RuntimeError("No hay conexión a OpenRouter. Verifica red y claves.")
    return ChatOpenAI(
        openai_api_key=getenv("OPENROUTER_API_KEY"),
        openai_api_base=getenv("OPENROUTER_BASE_URL"),
        model_name="google/gemini-flash-1.5",
        streaming=True,
        callbacks=[StreamingStdOutCallbackHandler()],
        model_kwargs={
            "extra_headers": {"Helicone-Auth": f"Bearer {getenv('HELICONE_API_KEY')}"}
        },
    )
# Try to initialize the LLM but handle failures gracefully
try:
    llm = init_llm()
except Exception as e:
    logger.error(f"Failed to initialize LLM: {e}")
    llm = None
# Helpers
def encode_image_to_base64(pil_image):
    """Encode a PIL image as a base64 PNG string, used to build data: URLs."""
    buffer = BytesIO()
    pil_image.save(buffer, format="PNG")
    return base64.b64encode(buffer.getvalue()).decode()
# Core logic
def generate_response(message, chat_history, image):
    # Convert prior chat history to the OpenAI message format
    formatted_history = []
    for msg in chat_history:
        role = msg.get('role')
        content = msg.get('content')
        if role == 'user':
            formatted_history.append({"role": "user", "content": content})
        else:
            formatted_history.append({"role": "assistant", "content": content})

    # Prepare system message
    system_msg = {"role": "system", "content": "You are an expert image analysis assistant. Answer succinctly."}

    # Prepare the latest message, attaching the image if provided
    if image is not None:
        base64_image = encode_image_to_base64(image)
        # Format for the direct API call (OpenRouter/OpenAI multimodal format)
        api_messages = [system_msg] + formatted_history + [{
            "role": "user",
            "content": [
                {"type": "text", "text": message},
                {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{base64_image}"}}
            ]
        }]
        # Same content in the shape LangChain expects
        content_for_langchain = [
            {"type": "text", "text": message},
            {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{base64_image}"}}
        ]
    else:
        api_messages = [system_msg] + formatted_history + [{"role": "user", "content": message}]
        content_for_langchain = message

    # Build LangChain messages
    lc_messages = [SystemMessage(content="You are an expert image analysis assistant. Answer succinctly.")]
    for msg in chat_history:
        role = msg.get('role')
        content = msg.get('content')
        if role == 'user':
            lc_messages.append(HumanMessage(content=content))
        else:
            lc_messages.append(AIMessage(content=content))
    lc_messages.append(HumanMessage(content=content_for_langchain))
    try:
        # First try with LangChain
        if llm:
            try:
                # Try streaming first
                try:
                    stream_iter = llm.stream(lc_messages)
                    partial = ""
                    for chunk in stream_iter:
                        if chunk is None:
                            continue
                        content = getattr(chunk, 'content', None)
                        if content is None:
                            continue
                        partial += content
                        yield partial
                    # If we got this far, streaming worked
                    return
                except Exception as e:
                    logger.warning(f"Streaming failed: {e}. Falling back to non-streaming mode")
                # Try non-streaming
                try:
                    response = llm.invoke(lc_messages)
                    yield response.content
                    return
                except Exception as e:
                    logger.warning(f"Non-streaming LangChain invoke failed: {e}")
                    raise
            except Exception as e:
                logger.warning(f"LangChain approach failed: {e}. Trying direct API call")

        # Fallback to direct API call (also used when the LLM never initialized)
        logger.info("Using direct API call as fallback")
        response_text = direct_api_call(
            api_messages,
            getenv("OPENROUTER_API_KEY"),
            getenv("OPENROUTER_BASE_URL"),
        )
        yield response_text
    except Exception as e:
        import traceback
        error_trace = traceback.format_exc()
        logger.exception(f"All approaches failed during response generation: {e}")
        logger.error(f"Full traceback: {error_trace}")
        yield f"⚠️ Error al generar respuesta: {str(e)}. Intenta más tarde."
# Gradio interface
def process_message(message, chat_history, image):
    if chat_history is None:
        chat_history = []
    if image is None:
        chat_history.append({'role': 'assistant', 'content': 'Por favor sube una imagen.'})
        yield "", chat_history
        return
    chat_history.append({'role': 'user', 'content': message})
    chat_history.append({'role': 'assistant', 'content': '⏳ Procesando...'})
    yield "", chat_history
    # Exclude the just-added user message and placeholder from the history we pass on
    for chunk in generate_response(message, chat_history[:-2], image):
        chat_history[-1]['content'] = chunk
        yield "", chat_history
with gr.Blocks() as demo:
    with gr.Row():
        with gr.Column(scale=2):
            chatbot = gr.Chatbot(type='messages', height=600)
            msg = gr.Textbox(label="Mensaje", placeholder="Escribe tu pregunta...")
            clear = gr.ClearButton([msg, chatbot])
        with gr.Column(scale=1):
            image_input = gr.Image(type="pil", label="Sube Imagen")
            info = gr.Textbox(label="Info Imagen", interactive=False)
    msg.submit(process_message, [msg, chatbot, image_input], [msg, chatbot])
    image_input.change(lambda img: f"Tamaño: {img.size}" if img else "Sin imagen.", [image_input], [info])

demo.launch()
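# To run locally: `python app.py` (filename assumed) with a .env containing the variables shown above.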