# modules/analyzer.py
import logging
import time

import requests
from openai import OpenAI

from modules.server_cache import RedisServerStatusCache
from modules.status_logger import log_server_status


class Analyzer:
    """Summarise search results with an OpenAI-compatible chat model.

    Before each request the analyzer probes the server's ``/models``
    endpoint (the result is cached through ``RedisServerStatusCache``) and,
    if the server is not ready, polls it for a bounded amount of time.
    """

    # Model requested when the caller does not pass ``model`` explicitly.
    DEFAULT_MODEL = "DavidAU/OpenAi-GPT-oss-20b-abliterated-uncensored-NEO-Imatrix-gguf"
    SYSTEM_PROMPT = (
        "You are a helpful research assistant that provides structured, "
        "analytical responses."
    )
    # Only the first N search results are included in the prompt.
    MAX_CONTEXT_RESULTS = 5
    # 401/403 mean "auth required", i.e. the endpoint exists and answers.
    READY_STATUS_CODES = (200, 401, 403)
    # Seconds a probe result stays in the cache.
    STATUS_CACHE_TTL = 60
    # Seconds allowed for a single readiness probe.
    PROBE_TIMEOUT = 10

    def __init__(self, base_url, api_key, model=None):
        """Create the API client and the status cache.

        Args:
            base_url: Root URL of the OpenAI-compatible API.
            api_key: Key sent as a bearer token.
            model: Model identifier to request; defaults to ``DEFAULT_MODEL``.
        """
        self.client = OpenAI(
            base_url=base_url,
            api_key=api_key,
        )
        self.model = model or self.DEFAULT_MODEL
        self.base_url = base_url.rstrip('/')
        self.health_check_url = self.base_url + "/health"
        self.models_url = self.base_url + "/models"
        self.headers = {"Authorization": f"Bearer {api_key}"}
        self.cache_key = f"server_status_{base_url}"

        # Connect to Redis cache
        self.cache = RedisServerStatusCache()

    def is_server_ready(self, use_cache=True):
        """Return whether the model server answers on its ``/models`` endpoint.

        Args:
            use_cache: When true (the default) a cached status is returned if
                one exists. Pass ``False`` to force a fresh probe, e.g. while
                polling, so a cached negative result cannot mask a recovery.

        Returns:
            The cached status, or the result of a fresh probe.
        """
        if use_cache:
            cached_status = self.cache.get(self.cache_key)
            if cached_status is not None:
                logging.info(f"Using cached server status: {cached_status}")
                return cached_status

        # Both attempts hit the same /models endpoint (listing models does
        # not consume tokens); the second one acts as a single retry.
        attempts = (
            (
                f"Checking server models at: {self.models_url}",
                "Server models check response",
                "Models endpoint check failed",
            ),
            (
                "Trying lightweight API call to test server availability",
                "Lightweight API call response",
                "Lightweight API call failed",
            ),
        )
        is_ready = False
        for start_msg, response_label, failure_label in attempts:
            logging.info(start_msg)
            try:
                response = requests.get(
                    self.models_url,
                    headers=self.headers,
                    timeout=self.PROBE_TIMEOUT,
                )
            except requests.exceptions.RequestException as e:
                logging.info(f"{failure_label}: {str(e)}")
                continue
            logging.info(f"{response_label}: {response.status_code}")
            if response.status_code in self.READY_STATUS_CODES:
                is_ready = True
                break

        # Cache the result for a shorter time since we're not using a proper
        # health endpoint
        self.cache.set(self.cache_key, is_ready, ttl=self.STATUS_CACHE_TTL)
        log_server_status(self.cache_key, is_ready)
        return is_ready

    def wait_for_server(self, timeout=180, interval=15):
        """Block until the server is ready or ``timeout`` seconds have passed.

        Args:
            timeout: Maximum number of seconds to keep polling.
            interval: Seconds to sleep between polls.

        Returns:
            True as soon as the server is ready, False on timeout.
        """
        if self.is_server_ready():
            logging.info("✅ Server is already ready (from cache or direct check).")
            return True

        logging.info("⏳ Server not ready. Starting polling...")
        # Monotonic clock: unaffected by wall-clock adjustments.
        started = time.monotonic()
        while True:
            elapsed = time.monotonic() - started
            if elapsed >= timeout:
                break
            logging.info(f"Polling server... ({int(elapsed)}s elapsed)")
            # Bypass the cache: the negative result stored by the previous
            # probe would otherwise be returned until its TTL expires.
            if self.is_server_ready(use_cache=False):
                logging.info("✅ Server is now ready!")
                return True
            remaining = timeout - (time.monotonic() - started)
            if remaining <= 0:
                break
            # Never sleep past the deadline.
            time.sleep(min(interval, remaining))

        logging.warning("⏰ Server initialization timeout reached")
        return False

    def _build_prompt(self, query, search_results):
        """Build the analysis prompt from the query and the top search results."""
        # Limit to the top results for context; tolerate None.
        top_results = (search_results or [])[:self.MAX_CONTEXT_RESULTS]
        context = "\n\n".join(
            f"Source: {result.get('url', 'N/A')}\nTitle: {result.get('title', 'N/A')}\nContent: {result.get('content', 'N/A')}"
            for result in top_results
        )
        return "\n".join([
            "",
            "You are an expert research analyst. Analyze the following query and information to provide a comprehensive summary.",
            "",
            f"Query: {query}",
            "",
            "Information:",
            context,
            "",
            "Please provide:",
            "1. A brief overview of the topic",
            "2. Key findings or developments",
            "3. Different perspectives or approaches",
            "4. Potential implications or future directions",
            "5. Any controversies or conflicting viewpoints",
            "",
            "Structure your response clearly with these sections. If there is insufficient information, state that clearly.",
            "",
        ])

    def _create_completion(self, prompt, stream):
        """Send the chat completion request, streaming or not."""
        return self.client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": self.SYSTEM_PROMPT},
                {"role": "user", "content": prompt},
            ],
            temperature=0.7,
            max_tokens=1500,
            stream=stream,
        )

    @staticmethod
    def _friendly_error(exc):
        """Translate an exception into the message shown to the user."""
        error_msg = str(exc)
        lowered = error_msg.lower()
        # Prefer a structured status code when the exception carries one;
        # fall back to matching the message text.
        status_code = getattr(exc, "status_code", None)

        if status_code == 503 or "503" in error_msg or "Service Unavailable" in error_msg:
            return "⚠️ The AI model server is currently unavailable. It may be initializing. Please try again in a few minutes."
        # "timed out" also covers messages such as "Request timed out."
        if isinstance(exc, TimeoutError) or "timeout" in lowered or "timed out" in lowered:
            return "⚠️ The AI model request timed out. The server may be overloaded or initializing. Please try again in a few minutes."
        if status_code == 404 or "404" in error_msg:
            return "⚠️ The AI model endpoint was not found. Please check the configuration."
        return f"Analysis failed: {error_msg}"

    def analyze_stream(self, query, search_results):
        """
        Analyze search results using the custom LLM with streaming output

        Yields chunks of the response as they arrive. On failure a single
        user-facing error message is yielded instead of raising.
        """
        prompt = self._build_prompt(query, search_results)

        try:
            # First check if server is ready
            logging.info("Checking if server is ready for analysis...")
            if not self.wait_for_server(timeout=180):  # 3 minutes timeout
                error_msg = "⚠️ The AI model server is still initializing. Please try again in a few minutes."
                logging.warning(error_msg)
                yield error_msg
                return

            logging.info("Server is ready. Sending streaming request to AI model...")
            response = self._create_completion(prompt, stream=True)

            # Yield chunks as they arrive
            for chunk in response:
                # A chunk may carry no choices (e.g. a usage-only chunk);
                # indexing it would raise IndexError and abort the stream.
                if not chunk.choices:
                    continue
                content = chunk.choices[0].delta.content
                if content:
                    yield content

            logging.info("Analysis streaming completed successfully")

        except Exception as e:
            logging.error(f"Analysis streaming failed: {str(e)}")
            yield self._friendly_error(e)

    def analyze(self, query, search_results):
        """
        Non-streaming version for compatibility

        Returns the model's full answer, or a user-facing error message if
        the server is not ready or the request fails.
        """
        prompt = self._build_prompt(query, search_results)

        try:
            # First check if server is ready
            logging.info("Checking if server is ready for analysis...")
            if not self.wait_for_server(timeout=180):  # 3 minutes timeout
                error_msg = "⚠️ The AI model server is still initializing. Please try again in a few minutes."
                logging.warning(error_msg)
                return error_msg

            logging.info("Server is ready. Sending request to AI model...")
            response = self._create_completion(prompt, stream=False)

            result_content = response.choices[0].message.content
            logging.info("Analysis completed successfully")
            return result_content

        except Exception as e:
            logging.error(f"Analysis failed: {str(e)}")
            return self._friendly_error(e)