myspace134v / modules /analyzer.py
rdune71's picture
34534
03df531
raw
history blame
9.83 kB
# modules/analyzer.py
from openai import OpenAI
import requests
import time
import logging
from modules.server_cache import RedisServerStatusCache
from modules.status_logger import log_server_status
class Analyzer:
def __init__(self, base_url, api_key):
self.client = OpenAI(
base_url=base_url,
api_key=api_key
)
self.base_url = base_url.rstrip('/')
self.health_check_url = self.base_url + "/health"
self.models_url = self.base_url + "/models"
self.headers = {"Authorization": f"Bearer {api_key}"}
self.cache_key = f"server_status_{base_url}"
# Connect to Redis cache
self.cache = RedisServerStatusCache()
def is_server_ready(self):
# Check cache first
cached_status = self.cache.get(self.cache_key)
if cached_status is not None:
logging.info(f"Using cached server status: {cached_status}")
return cached_status
# Try multiple approaches to check if server is ready
is_ready = False
# Approach 1: Try /models endpoint (commonly available)
try:
logging.info(f"Checking server models at: {self.models_url}")
response = requests.get(self.models_url, headers=self.headers, timeout=10)
if response.status_code in [200, 401, 403]: # 401/403 means auth required but endpoint exists
is_ready = True
logging.info(f"Server models check response: {response.status_code}")
else:
logging.info(f"Server models check response: {response.status_code}")
except requests.exceptions.RequestException as e:
logging.info(f"Models endpoint check failed: {str(e)}")
# Approach 2: Try a lightweight API call if models endpoint failed
if not is_ready:
try:
logging.info("Trying lightweight API call to test server availability")
# Make a request to list models (doesn't consume tokens)
response = requests.get(f"{self.base_url}/models", headers=self.headers, timeout=10)
if response.status_code in [200, 401, 403]:
is_ready = True
logging.info(f"Lightweight API call response: {response.status_code}")
else:
logging.info(f"Lightweight API call response: {response.status_code}")
except requests.exceptions.RequestException as e:
logging.info(f"Lightweight API call failed: {str(e)}")
# Cache the result for a shorter time since we're not using a proper health endpoint
self.cache.set(self.cache_key, is_ready, ttl=60) # Cache for 1 minute
log_server_status(self.cache_key, is_ready)
return is_ready
def wait_for_server(self, timeout=180, interval=15):
if self.is_server_ready():
logging.info("✅ Server is already ready (from cache or direct check).")
return True
logging.info("⏳ Server not ready. Starting polling...")
start_time = time.time()
while time.time() - start_time < timeout:
logging.info(f"Polling server... ({int(time.time() - start_time)}s elapsed)")
if self.is_server_ready():
logging.info("✅ Server is now ready!")
return True
time.sleep(interval)
logging.warning("⏰ Server initialization timeout reached")
return False
def analyze_stream(self, query, search_results):
"""
Analyze search results using the custom LLM with streaming output
Yields chunks of the response as they arrive
"""
# Prepare context from search results
context = "\n\n".join([
f"Source: {result.get('url', 'N/A')}\nTitle: {result.get('title', 'N/A')}\nContent: {result.get('content', 'N/A')}"
for result in search_results[:5] # Limit to top 5 for context
])
prompt = f"""
You are an expert research analyst. Analyze the following query and information to provide a comprehensive summary.
Query: {query}
Information:
{context}
Please provide:
1. A brief overview of the topic
2. Key findings or developments
3. Different perspectives or approaches
4. Potential implications or future directions
5. Any controversies or conflicting viewpoints
Structure your response clearly with these sections. If there is insufficient information, state that clearly.
"""
try:
# First check if server is ready
logging.info("Checking if server is ready for analysis...")
if not self.wait_for_server(timeout=180): # 3 minutes timeout
error_msg = "⚠️ The AI model server is still initializing. Please try again in a few minutes."
logging.warning(error_msg)
yield error_msg
return
logging.info("Server is ready. Sending streaming request to AI model...")
# Send streaming request
response = self.client.chat.completions.create(
model="DavidAU/OpenAi-GPT-oss-20b-abliterated-uncensored-NEO-Imatrix-gguf",
messages=[
{"role": "system", "content": "You are a helpful research assistant that provides structured, analytical responses."},
{"role": "user", "content": prompt}
],
temperature=0.7,
max_tokens=1500,
stream=True # Enable streaming
)
# Yield chunks as they arrive
for chunk in response:
if chunk.choices[0].delta.content:
yield chunk.choices[0].delta.content
logging.info("Analysis streaming completed successfully")
except Exception as e:
error_msg = str(e)
logging.error(f"Analysis streaming failed: {error_msg}")
if "503" in error_msg or "Service Unavailable" in error_msg:
yield "⚠️ The AI model server is currently unavailable. It may be initializing. Please try again in a few minutes."
elif "timeout" in error_msg.lower() or "read timeout" in error_msg.lower():
yield "⚠️ The AI model request timed out. The server may be overloaded or initializing. Please try again in a few minutes."
elif "404" in error_msg:
yield "⚠️ The AI model endpoint was not found. Please check the configuration."
else:
yield f"Analysis failed: {str(e)}"
def analyze(self, query, search_results):
"""
Non-streaming version for compatibility
"""
# Prepare context from search results
context = "\n\n".join([
f"Source: {result.get('url', 'N/A')}\nTitle: {result.get('title', 'N/A')}\nContent: {result.get('content', 'N/A')}"
for result in search_results[:5] # Limit to top 5 for context
])
prompt = f"""
You are an expert research analyst. Analyze the following query and information to provide a comprehensive summary.
Query: {query}
Information:
{context}
Please provide:
1. A brief overview of the topic
2. Key findings or developments
3. Different perspectives or approaches
4. Potential implications or future directions
5. Any controversies or conflicting viewpoints
Structure your response clearly with these sections. If there is insufficient information, state that clearly.
"""
try:
# First check if server is ready
logging.info("Checking if server is ready for analysis...")
if not self.wait_for_server(timeout=180): # 3 minutes timeout
error_msg = "⚠️ The AI model server is still initializing. Please try again in a few minutes."
logging.warning(error_msg)
return error_msg
logging.info("Server is ready. Sending request to AI model...")
response = self.client.chat.completions.create(
model="DavidAU/OpenAi-GPT-oss-20b-abliterated-uncensored-NEO-Imatrix-gguf",
messages=[
{"role": "system", "content": "You are a helpful research assistant that provides structured, analytical responses."},
{"role": "user", "content": prompt}
],
temperature=0.7,
max_tokens=1500,
stream=False
)
result_content = response.choices[0].message.content
logging.info("Analysis completed successfully")
return result_content
except Exception as e:
error_msg = str(e)
logging.error(f"Analysis failed: {error_msg}")
if "503" in error_msg or "Service Unavailable" in error_msg:
return "⚠️ The AI model server is currently unavailable. It may be initializing. Please try again in a few minutes."
elif "timeout" in error_msg.lower() or "read timeout" in error_msg.lower():
return "⚠️ The AI model request timed out. The server may be overloaded or initializing. Please try again in a few minutes."
elif "404" in error_msg:
return "⚠️ The AI model endpoint was not found. Please check the configuration."
return f"Analysis failed: {str(e)}"