|
|
|
|
|
|
|
|
import time |
|
|
import functools |
|
|
import logging |
|
|
from typing import List, Dict, Callable, Any |
|
|
from datetime import datetime |
|
|
|
|
|
|
|
|
# Module-wide logging setup: INFO and above go to the root handler.
logging.basicConfig(level=logging.INFO)


# Module-level logger named after this module (standard logging convention).
logger = logging.getLogger(__name__)
|
|
|
|
|
def timing_decorator(func: Callable) -> Callable:
    """Decorator that logs the wall-clock execution time of *func*.

    The wrapped function's return value is passed through unchanged.

    Fixes:
    - Uses ``time.perf_counter()`` instead of ``time.time()``: it is
      monotonic and high-resolution, so the measurement cannot go
      negative or jump if the system clock is adjusted mid-call.
    - Uses lazy %-style logging args instead of an f-string, so the
      message is only formatted when INFO logging is enabled.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - start_time

        logger.info("%s ejecutado en %.2fs", func.__name__, elapsed)
        return result
    return wrapper
|
|
|
|
|
def sanitize_input(text: str, max_length: int = 2000) -> str:
    """Sanitize user-provided text.

    Non-string input yields "". Otherwise the text is truncated to
    *max_length* characters, NUL bytes are removed, and surrounding
    whitespace is stripped.
    """
    if not isinstance(text, str):
        return ""

    # Truncate first, then drop NUL bytes and trim whitespace.
    cleaned = text[:max_length].replace('\x00', '')
    return cleaned.strip()
|
|
|
|
|
def format_history(history: List[List[str]]) -> List[List[str]]:
    """Validate a chat history and return its 10 most recent valid pairs.

    Only two-element lists whose sanitized user and assistant messages
    are both non-empty are kept; everything else is silently dropped.
    """
    if not history:
        return []

    valid_pairs = []
    for entry in history:
        # Skip anything that is not an exact [user, assistant] pair.
        if not (isinstance(entry, list) and len(entry) == 2):
            continue

        user_text = sanitize_input(str(entry[0]))
        assistant_text = sanitize_input(str(entry[1]))

        if user_text and assistant_text:
            valid_pairs.append([user_text, assistant_text])

    # Keep only the most recent 10 exchanges.
    return valid_pairs[-10:]
|
|
|
|
|
def estimate_tokens(text: str) -> int:
    """Rough token estimate, assuming ~4 characters per token."""
    char_count = len(text)
    return char_count // 4
|
|
|
|
|
def validate_parameters(max_tokens: int, temperature: float) -> Dict[str, Any]:
    """Validate generation parameters against the configured bounds.

    Invalid or out-of-range values are replaced with safe defaults or
    clamped to the nearest limit; a message describing each correction
    is collected in ``errors``.
    """
    from config import Config

    errors: List[str] = []

    # --- max_tokens: must be an int within [MIN_TOKENS_LIMIT, MAX_TOKENS_LIMIT]
    if not isinstance(max_tokens, int):
        errors.append("max_tokens debe ser un entero")
        max_tokens = Config.DEFAULT_MAX_TOKENS
    elif max_tokens < Config.MIN_TOKENS_LIMIT:
        errors.append(f"max_tokens mínimo es {Config.MIN_TOKENS_LIMIT}")
        max_tokens = Config.MIN_TOKENS_LIMIT
    elif max_tokens > Config.MAX_TOKENS_LIMIT:
        errors.append(f"max_tokens máximo es {Config.MAX_TOKENS_LIMIT}")
        max_tokens = Config.MAX_TOKENS_LIMIT

    # --- temperature: must be numeric within [MIN_TEMPERATURE, MAX_TEMPERATURE]
    if not isinstance(temperature, (int, float)):
        errors.append("temperature debe ser un número")
        temperature = Config.DEFAULT_TEMPERATURE
    elif temperature < Config.MIN_TEMPERATURE:
        errors.append(f"temperature mínima es {Config.MIN_TEMPERATURE}")
        temperature = Config.MIN_TEMPERATURE
    elif temperature > Config.MAX_TEMPERATURE:
        errors.append(f"temperature máxima es {Config.MAX_TEMPERATURE}")
        temperature = Config.MAX_TEMPERATURE

    return {
        "max_tokens": max_tokens,
        "temperature": float(temperature),
        "errors": errors,
    }
|
|
|
|
|
def create_error_response(error_msg: str) -> Dict[str, Any]:
    """Build the standardized error payload returned to the caller."""
    queue_status = {
        "queue_size": 0,
        "is_processing": False,
        "timestamp": datetime.now().isoformat(),
        "error": True,
    }
    return {
        "response": f"Error: {error_msg}",
        "queue_status": queue_status,
    }
|
|
|
|
|
def truncate_context(text: str, max_length: int = 1800) -> str:
    """Truncate *text* to at most *max_length* characters, preferring
    natural boundaries.

    Strategy, in order:
    1. Accumulate whole paragraphs ('\\n\\n'-separated) while they fit.
    2. If no paragraph fits, accumulate whole sentences ('. '-separated).
    3. If nothing fits, fall back to a hard character slice.

    Bug fixed: the sentence pass previously checked
    ``len(truncated + sentence) <= max_length`` but then re-appended
    '. '; after the final ``strip()`` the surviving '.' pushed the
    result to ``max_length + 1`` characters. The separator that survives
    the strip is now budgeted, so the result never exceeds *max_length*.
    """
    if len(text) <= max_length:
        return text

    # Pass 1: whole paragraphs. The trailing '\n\n' separator is fully
    # removed by the final strip(), so it needs no length budget.
    truncated = ""
    for paragraph in text.split('\n\n'):
        if len(truncated) + len(paragraph) > max_length:
            break
        truncated += paragraph + '\n\n'

    # Pass 2: no paragraph fit — try whole sentences instead.
    if not truncated:
        for sentence in text.split('. '):
            # +1 budgets the '.' that survives the final strip().
            if len(truncated) + len(sentence) + 1 > max_length:
                break
            truncated += sentence + '. '

    # Pass 3: nothing fit on a natural boundary — hard cut.
    if not truncated:
        truncated = text[:max_length]

    return truncated.strip()
|
|
|
|
|
class PerformanceMonitor:
    """Simple in-memory performance monitor.

    Tracks request counts, tokens generated and a running average of
    response times. State is process-local and not thread-safe.
    """

    def __init__(self):
        # Count of requests that actually reported a response time.
        # Kept outside `stats` so get_stats()'s returned keys are unchanged.
        self._timed_requests = 0
        self.stats = {
            "total_requests": 0,
            "successful_requests": 0,
            "failed_requests": 0,
            "total_tokens_generated": 0,
            "average_response_time": 0,
            "start_time": datetime.now()
        }

    def record_request(self, success: bool, tokens_generated: int = 0, response_time: float = 0):
        """Record the outcome of one request.

        Args:
            success: whether the request completed successfully.
            tokens_generated: tokens produced (counted only on success).
            response_time: wall-clock seconds; 0 means "not measured" and
                is excluded from the running average.
        """
        self.stats["total_requests"] += 1

        if success:
            self.stats["successful_requests"] += 1
            self.stats["total_tokens_generated"] += tokens_generated
        else:
            self.stats["failed_requests"] += 1

        if response_time > 0:
            # Bug fix: the running average previously divided by
            # total_requests, which includes untimed requests
            # (response_time == 0) and therefore skewed the mean low.
            # Average over timed requests only.
            self._timed_requests += 1
            n = self._timed_requests
            current_avg = self.stats["average_response_time"]

            self.stats["average_response_time"] = (
                (current_avg * (n - 1) + response_time) / n
            )

    def get_stats(self) -> Dict[str, Any]:
        """Return a snapshot of the raw stats plus derived metrics
        (uptime, success rate, tokens per minute)."""
        uptime = datetime.now() - self.stats["start_time"]
        uptime_seconds = uptime.total_seconds()

        return {
            **self.stats,
            "uptime_seconds": uptime_seconds,
            # max(..., 1) guards against division by zero before any request.
            "success_rate": (
                self.stats["successful_requests"] / max(self.stats["total_requests"], 1)
            ) * 100,
            "tokens_per_minute": (
                self.stats["total_tokens_generated"] / max(uptime_seconds / 60, 1)
            )
        }
|
|
|
|
|
|
|
|
# Shared module-level monitor instance used across the application.
performance_monitor = PerformanceMonitor()
|
|
|