# NOTE: page-scrape residue (file-size banner, commit-hash column, line-number gutter) preceded the code here; it was not part of the program.
import os
import gradio as gr
from os import getenv
import base64
from io import BytesIO
from dotenv import load_dotenv
import requests
import socket
import logging
import json
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage
from langchain_core.callbacks import StreamingStdOutCallbackHandler
# Configure logging: INFO level for the whole process, one logger for this module
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Load environment variables from the .env file located next to this script
dotenv_path = os.path.join(os.path.dirname(__file__), '.env')
load_dotenv(dotenv_path=dotenv_path)

# Debug env: log the base URL, and only whether the API key is set (never its value)
_base_url_seen = getenv('OPENROUTER_BASE_URL')
_api_key_state = 'Found' if getenv('OPENROUTER_API_KEY') else 'Missing'
logger.info(f"OPENROUTER_BASE_URL: {_base_url_seen}")
logger.info(f"OPENROUTER_API_KEY: {_api_key_state}")
# Connectivity test
def test_connectivity(url="https://openrouter.helicone.ai/api/v1", timeout=5):
    """Report whether *url* can be reached over HTTP.

    Any response with a status below 500 counts as reachable. Requiring
    exactly 200 would report "no network" for an endpoint that is up but
    answers its root path with a client-error status such as 401 or 404.

    Args:
        url: Address to probe with a GET request.
        timeout: Seconds to wait for the probe before giving up.

    Returns:
        True if the server answered with a non-5xx status, False if the
        request failed or the server reported a 5xx error.
    """
    try:
        response = requests.get(url, timeout=timeout)
    except (requests.RequestException, socket.error) as e:
        logger.error(f"Connectivity test failed: {e}")
        return False

    if response.status_code >= 500:
        logger.error(
            f"Connectivity test failed: HTTP {response.status_code} from {url}"
        )
        return False
    return True
# Probe the endpoint once at start-up so a network problem is visible in the
# logs before the first user request; this only warns, it never aborts.
_startup_reachable = test_connectivity()
if not _startup_reachable:
    logger.warning("No network to OpenRouter; responses may fail.")
# Helper to make direct API calls to OpenRouter when LangChain fails
def direct_api_call(messages, api_key, base_url,
                    model="google/gemini-flash-1.5", timeout=30):
    """Send one non-streaming chat-completion request over plain HTTP.

    Args:
        messages: Chat messages in the OpenAI/OpenRouter wire format
            (dicts with ``role`` and ``content``).
        api_key: Bearer token placed in the ``Authorization`` header.
        base_url: API root; ``/chat/completions`` is appended to it.
        model: Model identifier sent in the payload.
        timeout: Seconds to wait for the HTTP response.

    Returns:
        The reply text from the first choice, or a string starting with
        ``"Error: "`` when configuration is missing, the request fails, or
        the response does not have the expected shape.
    """
    # Fail early with a clear message instead of sending "Bearer None" or
    # posting to the literal URL "None/chat/completions".
    if not api_key or not base_url:
        missing = "OPENROUTER_API_KEY" if not api_key else "OPENROUTER_BASE_URL"
        logger.error(f"Direct API call failed: {missing} is not configured")
        return f"Error: {missing} is not configured"

    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {api_key}",
        "HTTP-Referer": "https://your-app-domain.com",  # Add your domain
        "X-Title": "Image Analysis App",
    }
    helicone_key = getenv("HELICONE_API_KEY")
    if helicone_key:
        headers["Helicone-Auth"] = f"Bearer {helicone_key}"

    payload = {
        "model": model,
        "messages": messages,
        "stream": False,
    }

    # Tolerate a trailing slash in the configured base URL.
    endpoint = f"{base_url.rstrip('/')}/chat/completions"

    try:
        response = requests.post(
            endpoint,
            headers=headers,
            json=payload,
            timeout=timeout,
        )
        response.raise_for_status()
        return response.json()["choices"][0]["message"]["content"]
    except requests.RequestException as e:
        # Network problems, timeouts and non-2xx statuses.
        logger.error(f"Direct API call failed: {e}")
        return f"Error: {str(e)}"
    except (KeyError, IndexError, TypeError, ValueError) as e:
        # The body was not JSON or lacked choices[0].message.content.
        logger.error(f"Direct API call failed: unexpected response format: {e!r}")
        return f"Error: unexpected response format: {e!r}"
# Initialize LLM with streaming
def init_llm():
    """Build the LangChain chat client used for the primary request path.

    Returns:
        A ``ChatOpenAI`` instance configured from ``OPENROUTER_API_KEY`` and
        ``OPENROUTER_BASE_URL``, with streaming enabled and tokens echoed to
        stdout through ``StreamingStdOutCallbackHandler``.

    Raises:
        RuntimeError: If ``test_connectivity()`` reports the endpoint as
            unreachable.
    """
    if not test_connectivity():
        raise RuntimeError("No hay conexión a OpenRouter. Verifica red y claves.")

    # Only attach the Helicone header when a key is configured, matching
    # direct_api_call; otherwise the literal "Bearer None" would be sent.
    model_kwargs = {}
    helicone_key = getenv("HELICONE_API_KEY")
    if helicone_key:
        model_kwargs["extra_headers"] = {"Helicone-Auth": f"Bearer {helicone_key}"}

    return ChatOpenAI(
        openai_api_key=getenv("OPENROUTER_API_KEY"),
        openai_api_base=getenv("OPENROUTER_BASE_URL"),
        model_name="google/gemini-flash-1.5",
        streaming=True,
        callbacks=[StreamingStdOutCallbackHandler()],
        model_kwargs=model_kwargs,
    )
# Build the LLM once at import time. Start from "no LLM" and only replace it
# on success, so a failed initialisation leaves llm as None.
llm = None
try:
    llm = init_llm()
except Exception as init_error:
    logger.error(f"Failed to initialize LLM: {init_error}")
# Helpers
def encode_image_to_base64(pil_image):
    """Serialize *pil_image* as PNG in memory and return it as base64 text.

    Args:
        pil_image: Object exposing ``save(file_obj, format=...)``.

    Returns:
        The PNG bytes encoded as a base64 ``str`` (no data-URL prefix).
    """
    png_stream = BytesIO()
    pil_image.save(png_stream, format="PNG")
    png_bytes = png_stream.getvalue()
    return str(base64.b64encode(png_bytes), "utf-8")
# Core logic
def generate_response(message, chat_history, image):
    """Yield the assistant's reply to *message*, growing as it streams in.

    Three strategies are tried in order, each only if the previous one fails
    or produces nothing: LangChain streaming, LangChain non-streaming
    ``invoke``, then a direct HTTP request via ``direct_api_call``.

    Args:
        message: The user's current question.
        chat_history: Earlier turns as dicts with ``role`` and ``content``
            keys. Any role other than ``"user"`` is treated as assistant.
            ``None`` is treated as an empty history.
        image: Image to attach (encoded with ``encode_image_to_base64``), or
            ``None`` for a text-only request.

    Yields:
        While streaming, the reply text accumulated so far; otherwise the
        complete reply, or a user-facing error message, as a single item.
    """
    system_prompt = "You are an expert image analysis assistant. Answer succinctly."

    # Normalise the history once; both request formats are derived from it.
    history = [
        {
            "role": "user" if turn.get("role") == "user" else "assistant",
            "content": turn.get("content"),
        }
        for turn in (chat_history or [])
    ]

    # Latest user turn: multimodal content when an image is supplied.
    if image is not None:
        data_url = f"data:image/png;base64,{encode_image_to_base64(image)}"
        user_content = [
            {"type": "text", "text": message},
            {"type": "image_url", "image_url": {"url": data_url}},
        ]
    else:
        user_content = message

    # OpenRouter/OpenAI wire format, used by the direct API fallback.
    api_messages = [
        {"role": "system", "content": system_prompt},
        *history,
        {"role": "user", "content": user_content},
    ]

    try:
        if llm is not None:
            try:
                # NOTE(review): the system prompt is sent as a HumanMessage
                # here while the direct API path uses role "system" --
                # confirm whether that difference is intentional.
                lc_messages = [HumanMessage(content=system_prompt)]
                for turn in history:
                    if turn["role"] == "user":
                        lc_messages.append(HumanMessage(content=turn["content"]))
                    else:
                        lc_messages.append(AIMessage(content=turn["content"]))
                lc_messages.append(HumanMessage(content=user_content))

                # Try streaming first
                try:
                    streamed = ""
                    for chunk in llm.stream(lc_messages):
                        piece = getattr(chunk, "content", None)
                        if not piece:
                            continue
                        streamed += piece
                        yield streamed
                    if streamed:
                        return
                    # An empty stream would leave the caller with nothing to
                    # display, so treat it like a failure and keep going.
                    logger.warning(
                        "Streaming returned no content. Falling back to non-streaming mode"
                    )
                except Exception as e:
                    logger.warning(
                        f"Streaming failed: {e}. Falling back to non-streaming mode"
                    )

                # Try non-streaming
                try:
                    response = llm.invoke(lc_messages)
                except Exception as e:
                    logger.warning(f"Non-streaming LangChain invoke failed: {e}")
                    raise
                yield response.content
                return
            except Exception as e:
                logger.warning(f"LangChain approach failed: {e}. Trying direct API call")

        # Fallback to direct API call
        logger.info("Using direct API call as fallback")
        yield direct_api_call(
            api_messages,
            getenv("OPENROUTER_API_KEY"),
            getenv("OPENROUTER_BASE_URL"),
        )
    except Exception as e:
        # logger.exception already records the full traceback.
        logger.exception(f"All approaches failed during response generation: {e}")
        yield f"⚠️ Error al generar respuesta: {str(e)}. Intenta más tarde."
# Gradio interface
def process_message(message, chat_history, image):
    """Handle a submitted message and stream the reply into the chat.

    This is a generator: every yielded ``(textbox_value, chat_history)``
    pair is one update for the ``[msg, chatbot]`` outputs. The textbox value
    is always ``""`` so the input is cleared.

    Args:
        message: Text the user submitted.
        chat_history: Current chat turns as ``role``/``content`` dicts, or
            ``None``; the list is extended in place.
        image: The uploaded image, or ``None`` when nothing was uploaded.

    Yields:
        ``("", chat_history)`` after each change to the conversation.
    """
    if chat_history is None:
        chat_history = []

    if image is None:
        chat_history.append({'role': 'assistant', 'content': 'Por favor sube una imagen.'})
        # This must be yielded: the value of a `return` inside a generator is
        # never delivered to whoever iterates it, so the prompt would be lost.
        yield "", chat_history
        return

    # Snapshot the earlier turns before adding the current ones, so the model
    # is not sent the current question twice plus the "processing" placeholder.
    prior_turns = list(chat_history)

    chat_history.append({'role': 'user', 'content': message})
    chat_history.append({'role': 'assistant', 'content': '⏳ Procesando...'})
    yield "", chat_history

    for chunk in generate_response(message, prior_turns, image):
        # Overwrite the placeholder with the latest reply text.
        chat_history[-1]['content'] = chunk
        yield "", chat_history
def _describe_image(img):
    """Return the text for the "Info Imagen" box for the current image."""
    return f"Tamaño: {img.size}" if img is not None else "Sin imagen."


# Layout: chat on the left (wider column), image upload and its info on the right.
with gr.Blocks() as demo:
    with gr.Row():
        with gr.Column(scale=2):
            chatbot = gr.Chatbot(type='messages', height=600)
            msg = gr.Textbox(label="Mensaje", placeholder="Escribe tu pregunta...")
            clear = gr.ClearButton([msg, chatbot])
        with gr.Column(scale=1):
            image_input = gr.Image(type="pil", label="Sube Imagen")
            info = gr.Textbox(label="Info Imagen", interactive=False)

    # Submitting the textbox runs the handler; its outputs clear the textbox
    # and refresh the chat.
    msg.submit(process_message, [msg, chatbot, image_input], [msg, chatbot])
    # Refresh the info box whenever the image changes or is removed.
    image_input.change(_describe_image, [image_input], [info])

demo.launch()