import asyncio
import json
import logging
import os
from datetime import datetime
from typing import List, Dict, Optional, AsyncGenerator

from core.llm_factory import llm_factory
from core.session import session_manager
from services.hf_endpoint_monitor import hf_monitor
from services.weather import weather_service

# Tavily is an optional dependency; web search is disabled when it is missing.
try:
    from tavily import TavilyClient
    TAVILY_AVAILABLE = True
except ImportError:
    TavilyClient = None
    TAVILY_AVAILABLE = False

logger = logging.getLogger(__name__)


class AICoordinator:
    """Hierarchical multi-model coordinator with the HF endpoint as the authoritative layer."""

    def __init__(self):
        # Web search is only enabled when the Tavily package and API key are both present.
        self.tavily_client = None
        if TAVILY_AVAILABLE and os.getenv("TAVILY_API_KEY"):
            self.tavily_client = TavilyClient(api_key=os.getenv("TAVILY_API_KEY"))

        # Role prompts injected as system messages for each model in the hierarchy.
        self.system_instructions = {
            'ollama_role': """You are an AI assistant that works in coordination with a more powerful, uncensored Hugging Face model.

Your role is to:
1. Provide immediate, concise responses using available information
2. Summarize and simplify complex information
3. Enhance responses with local knowledge
4. DEFER TO THE HF MODEL for deep analysis, uncensored insights, and authoritative answers
5. Remember that the HF model has an 8192-token capacity and streaming capability - it is the thinking layer
6. Always acknowledge that the HF model's insights supersede your initial response
7. Use web sources to justify points when possible
8. Keep responses brief and focused while awaiting HF input""",

            'hf_role': """You are an uncensored, authoritative AI with an 8192-token capacity and streaming capability.

Your role is to:
1. Provide deep, uncensored analysis and insights
2. Supersede the local Ollama model's logic when there is disagreement
3. Think thoroughly and provide comprehensive responses
4. Use your full 8192-token capacity for detailed analysis
5. Stream your responses for real-time delivery
6. Build upon the conversation context and Ollama's initial response
7. Provide authoritative answers that take precedence"""
        }

    def determine_web_search_needs(self, conversation_history: List[Dict]) -> Dict:
        """Determine whether a web search is needed based on conversation content."""
        conversation_text = " ".join(
            msg.get("content", "") for msg in conversation_history
        ).lower()

        # Keywords that usually indicate the user wants current, time-sensitive information.
        current_info_indicators = [
            "news", "current events", "latest", "recent", "today",
            "weather", "temperature", "forecast",
            "stock", "price", "trend", "market",
            "breaking", "update", "development"
        ]

        needs_search = False
        search_topics = []

        for indicator in current_info_indicators:
            if indicator in conversation_text:
                needs_search = True
                search_topics.append(indicator)

        return {
            "needs_search": needs_search,
            "search_topics": search_topics,
            "reasoning": (
                f"Found topics requiring current info: {', '.join(search_topics)}"
                if search_topics else "No current info needed"
            )
        }

    def manual_hf_analysis(self, user_id: str, conversation_history: List[Dict]) -> str:
        """Perform a manual HF analysis pass, informed by the web-search decision."""
        try:
            # Decide whether the conversation calls for current information from the web.
            research_decision = self.determine_web_search_needs(conversation_history)

            system_prompt = f"""
You are a deep analysis expert joining an ongoing conversation.

Research Decision: {research_decision['reasoning']}

Please provide:
1. Deep insights on conversation themes
2. Research/web search needs (if any)
3. Strategic recommendations
4. Questions to explore further

Conversation History:
"""

            messages = [{"role": "system", "content": system_prompt}]

            # Include only the most recent turns to stay within the context window.
            for msg in conversation_history[-15:]:
                messages.append({
                    "role": msg["role"],
                    "content": msg["content"]
                })

            hf_provider = llm_factory.get_provider('huggingface')

            if hf_provider:
                response = hf_provider.generate("Deep analysis request", messages)
                return response or "HF Expert analysis completed."
            else:
                return "❌ HF provider not available."

        except Exception as e:
            return f"❌ HF analysis failed: {str(e)}"

    def get_hf_engagement_status(self) -> Dict:
        """Get the current HF engagement status."""
        return {
            "hf_available": self._check_hf_availability(),
            "web_search_configured": bool(self.tavily_client),
            "research_needs_detected": False,
            "last_hf_analysis": None
        }

    async def coordinate_hierarchical_conversation(self, user_id: str, user_query: str) -> AsyncGenerator[Dict, None]:
        """Run the full coordination pipeline, yielding status and response events as dicts."""
        try:
            session = session_manager.get_session(user_id)
            conversation_history = session.get("conversation", []).copy()

            yield {
                'type': 'coordination_status',
                'content': '🚀 Initiating hierarchical AI coordination...',
                'details': {
                    'conversation_length': len(conversation_history),
                    'user_query_length': len(user_query)
                }
            }

            # Phase 1: gather external context (web search, weather, current time).
            yield {
                'type': 'coordination_status',
                'content': '🔍 Gathering external context...',
                'details': {'phase': 'external_data_gathering'}
            }
            external_data = await self._gather_external_data(user_query)

            if external_data:
                data_summary = []
                if 'search_results' in external_data:
                    data_summary.append(f"Web search: {len(external_data['search_results'])} results")
                if 'weather' in external_data:
                    data_summary.append("Weather data: available")
                if 'current_datetime' in external_data:
                    data_summary.append(f"Time: {external_data['current_datetime']}")

                yield {
                    'type': 'coordination_status',
                    'content': f'📊 External data gathered: {", ".join(data_summary)}',
                    'details': {'external_data_summary': data_summary}
                }

            # Phase 2: get the fast, initial response from the local Ollama model.
            yield {
                'type': 'coordination_status',
                'content': '🦙 Getting initial response from Ollama...',
                'details': {'phase': 'ollama_response'}
            }
            ollama_response = await self._get_hierarchical_ollama_response(
                user_query, conversation_history, external_data
            )

            yield {
                'type': 'initial_response',
                'content': ollama_response,
                'details': {
                    'response_length': len(ollama_response),
                    'external_data_injected': bool(external_data)
                }
            }

            # Phase 3: escalate to the HF endpoint, whose answer is authoritative.
            yield {
                'type': 'coordination_status',
                'content': '🤗 Engaging HF endpoint for deep analysis...',
                'details': {'phase': 'hf_coordination'}
            }

            hf_available = self._check_hf_availability()
            if hf_available:
                context_summary = {
                    'conversation_turns': len(conversation_history),
                    'ollama_response_length': len(ollama_response),
                    'external_data_items': len(external_data) if external_data else 0
                }

                yield {
                    'type': 'coordination_status',
                    'content': f'📋 HF context: {len(conversation_history)} conversation turns, Ollama response ({len(ollama_response)} chars)',
                    'details': context_summary
                }

                async for hf_chunk in self._coordinate_hierarchical_hf_response(
                    user_id, user_query, conversation_history,
                    external_data, ollama_response
                ):
                    yield hf_chunk
            else:
                yield {
                    'type': 'coordination_status',
                    'content': 'ℹ️ HF endpoint not available - using Ollama response',
                    'details': {'hf_available': False}
                }

            yield {
                'type': 'coordination_status',
                'content': '✅ Hierarchical coordination complete',
                'details': {'status': 'complete'}
            }

        except Exception as e:
            logger.error(f"Hierarchical coordination failed: {e}")
            yield {
                'type': 'coordination_status',
                'content': f'❌ Coordination error: {str(e)}',
                'details': {'error': str(e)}
            }

    async def _coordinate_hierarchical_hf_response(self, user_id: str, query: str,
                                                   history: List, external_data: Dict,
                                                   ollama_response: str) -> AsyncGenerator[Dict, None]:
        """Coordinate with the HF endpoint as the authoritative layer, streaming its output."""
        try:
            # Wake the endpoint if it has scaled to zero.
            hf_status = hf_monitor.check_endpoint_status()

            if not hf_status['available']:
                yield {'type': 'coordination_status', 'content': '⚡ Initializing HF endpoint (2-4 minutes)...'}
                warmup_success = hf_monitor.handle_scale_to_zero()
                if not warmup_success:
                    yield {'type': 'coordination_status', 'content': '❌ HF endpoint initialization failed'}
                    return

            hf_provider = llm_factory.get_provider('huggingface')
            if not hf_provider:
                yield {'type': 'coordination_status', 'content': '❌ HF provider not available'}
                return

            # Build an enhanced history: HF role prompt first, then the original turns,
            # then a context message and finally the user query.
            enhanced_history = history.copy()

            enhanced_history.insert(0, {
                "role": "system",
                "content": self.system_instructions['hf_role']
            })

            enhanced_history.append({
                "role": "system",
                "content": f"""Context for your authoritative analysis:
User Query: '{query}'
Ollama's Initial Response: '{ollama_response}'
External Data Available: {json.dumps(external_data, indent=2) if external_data else 'None'}

Please provide your deep, uncensored analysis that supersedes the local model's response.
Use your full 8192 token capacity for comprehensive thinking.
Stream your response for real-time delivery."""
            })

            enhanced_history.append({"role": "user", "content": query})

            yield {'type': 'coordination_status', 'content': '🧠 HF endpoint thinking...'}

            hf_response_stream = hf_provider.stream_generate(query, enhanced_history)

            if hf_response_stream:
                # Relay chunks as they arrive, then emit the fully assembled response.
                full_hf_response = ""
                for chunk in hf_response_stream:
                    if chunk:
                        full_hf_response += chunk
                        yield {'type': 'hf_thinking', 'content': chunk}

                yield {'type': 'final_response', 'content': full_hf_response}
                yield {'type': 'coordination_status', 'content': '🎯 HF analysis complete and authoritative'}
            else:
                yield {'type': 'coordination_status', 'content': '❌ HF response generation failed'}

        except Exception as e:
            logger.error(f"Hierarchical HF coordination failed: {e}")
            yield {'type': 'coordination_status', 'content': f'❌ HF coordination error: {str(e)}'}

    async def _get_hierarchical_ollama_response(self, query: str, history: List, external_data: Dict) -> str:
        """Get the Ollama response with hierarchical awareness."""
        try:
            ollama_provider = llm_factory.get_provider('ollama')
            if not ollama_provider:
                raise Exception("Ollama provider not available")

            enhanced_history = history.copy()

            # Prepend the Ollama role prompt so the local model knows to defer to HF.
            enhanced_history.insert(0, {
                "role": "system",
                "content": self.system_instructions['ollama_role']
            })

            # Inject any gathered external context as a second system message.
            if external_data:
                context_parts = []
                if 'search_answer' in external_data:
                    context_parts.append(f"Current information: {external_data['search_answer']}")
                if 'weather' in external_data:
                    weather = external_data['weather']
                    context_parts.append(f"Current weather: {weather.get('temperature', 'N/A')}°C in {weather.get('city', 'Unknown')}")
                if 'current_datetime' in external_data:
                    context_parts.append(f"Current time: {external_data['current_datetime']}")

                if context_parts:
                    context_message = {
                        "role": "system",
                        "content": "Context: " + " | ".join(context_parts)
                    }
                    enhanced_history.insert(1, context_message)

            enhanced_history.append({"role": "user", "content": query})

            response = ollama_provider.generate(query, enhanced_history)

            if response:
                return f"{response}\n\n*Note: A more comprehensive analysis from the uncensored HF model is being prepared...*"
            else:
                return "I'm processing your request... A deeper analysis is being prepared by the authoritative model."

        except Exception as e:
            logger.error(f"Hierarchical Ollama response failed: {e}")
            return "I'm thinking about your question... Preparing a comprehensive response."

    def _check_hf_availability(self) -> bool:
        """Check whether the HF endpoint is configured and available."""
        try:
            from utils.config import config
            return bool(config.hf_token and config.hf_api_url)
        except Exception:
            return False

    async def _gather_external_data(self, query: str) -> Dict:
        """Gather external data (web search, weather, current time) for the query."""
        data = {}

        # Web search via Tavily, when configured.
        if self.tavily_client:
            try:
                search_result = self.tavily_client.search(
                    f"current information about {query}",
                    max_results=5,
                    include_answer=True,
                    include_raw_content=True
                )
                data['search_results'] = search_result.get('results', [])
                if search_result.get('answer'):
                    data['search_answer'] = search_result['answer']

                # Keep a truncated slice of raw content from the top results for context;
                # guard against raw_content being None.
                data['raw_sources'] = [
                    (result.get('raw_content') or '')[:1000]
                    for result in search_result.get('results', [])[:3]
                ]
            except Exception as e:
                logger.warning(f"Tavily search failed: {e}")

        # Weather data, only when the query looks weather-related.
        weather_keywords = ['weather', 'temperature', 'forecast', 'climate', 'rain', 'sunny']
        if any(keyword in query.lower() for keyword in weather_keywords):
            try:
                location = self._extract_location(query) or "New York"
                weather = weather_service.get_current_weather(location)
                if weather:
                    data['weather'] = weather
            except Exception as e:
                logger.warning(f"Weather data failed: {e}")

        # Always include the current timestamp.
        data['current_datetime'] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

        return data

    def _extract_location(self, query: str) -> Optional[str]:
        """Extract a known city name from the query, or None if no match is found."""
        locations = ['New York', 'London', 'Tokyo', 'Paris', 'Berlin', 'Sydney',
                     'Los Angeles', 'Chicago', 'Miami', 'Seattle', 'Boston',
                     'San Francisco', 'Toronto', 'Vancouver', 'Montreal']
        for loc in locations:
            if loc.lower() in query.lower():
                return loc
        # No match: let the caller decide on a default location.
        return None

    def get_coordination_status(self) -> Dict:
        """Get the current coordination system status."""
        return {
            'tavily_available': self.tavily_client is not None,
            'weather_available': weather_service.api_key is not None,
            'web_search_enabled': self.tavily_client is not None,
            'external_apis_configured': any([
                weather_service.api_key,
                os.getenv("TAVILY_API_KEY"),
                os.getenv("NASA_API_KEY")
            ])
        }

    def get_recent_activities(self, user_id: str) -> Dict:
        """Get recent coordination activities for the given user."""
        try:
            session = session_manager.get_session(user_id)
            coord_stats = session.get('ai_coordination', {})
            return {
                'last_request': coord_stats.get('last_coordination'),
                'requests_processed': coord_stats.get('requests_processed', 0),
                'ollama_responses': coord_stats.get('ollama_responses', 0),
                'hf_responses': coord_stats.get('hf_responses', 0)
            }
        except Exception:
            return {}


# Module-level singleton used by the rest of the application.
coordinator = AICoordinator()
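

# --- Usage sketch (illustrative only, not part of the production flow) ---
# A minimal example of consuming the coordinator's event stream from an asyncio
# entry point. It assumes the core/services packages are configured in the
# environment; the user id and query below are placeholder values.
async def _demo() -> None:
    async for event in coordinator.coordinate_hierarchical_conversation(
        "demo-user", "What's the latest weather in London?"
    ):
        # Every yielded event is a dict carrying at least 'type' and 'content'.
        print(f"[{event['type']}] {event['content']}")


if __name__ == "__main__":
    asyncio.run(_demo())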