|
|
import asyncio
import json
import logging
import os
from datetime import datetime
from typing import AsyncGenerator, Dict, List, Optional

from core.llm_factory import llm_factory
from core.session import session_manager
from services.hf_endpoint_monitor import hf_monitor
from services.weather import weather_service
from services.web_search import web_search_service

# Tavily is an optional dependency: when the package is missing the module
# still imports, and TAVILY_AVAILABLE tells the coordinator not to build a
# client.
try:
    from tavily import TavilyClient
    TAVILY_AVAILABLE = True
except ImportError:
    TavilyClient = None
    TAVILY_AVAILABLE = False

logger = logging.getLogger(__name__)
|
|
|
|
|
class AICoordinator:
    """Hierarchical multi-model coordinator with cosmic cascade flow and async support.

    A local Ollama model (the "Cosmic Kitten") produces a quick first answer
    and a Hugging Face endpoint (the "Orbital Station") produces the deeper
    analysis that is treated as authoritative. External context (web search,
    weather, current time) is collected by ``_gather_external_data``.

    NOTE(review): the provider and monitor calls used here
    (``generate``, ``stream_generate``, ``handle_scale_to_zero``,
    ``web_search_service.search``) are invoked without ``await``, so they
    presumably block the event loop while they run - confirm against the
    provider implementations before relying on concurrency.
    """

    # Prefix that identifies the time-context system message, so it is not
    # added to a history that already starts with one.
    _TIME_CONTEXT_PREFIX = "[Current Date & Time:"

    def __init__(self):
        """Create the optional Tavily client and the per-model system prompts."""
        self.tavily_client = None
        tavily_key = os.getenv("TAVILY_API_KEY")
        if TAVILY_AVAILABLE and tavily_key:
            self.tavily_client = TavilyClient(api_key=tavily_key)

        # System prompts keyed by the role each model plays in the hierarchy.
        self.system_instructions = {
            'ollama_role': "\n".join((
                "You are a cosmic kitten assistant that works in coordination with a powerful orbital space station. Your role is to:",
                "1. Provide immediate, concise responses using available information (local processing)",
                "2. Summarize and simplify complex information",
                "3. Enhance responses with local knowledge",
                "4. DEFER TO THE ORBITAL STATION for deep analysis and authoritative answers",
                "5. The orbital station has 8192 token capacity and streaming capability - it's the thinking layer",
                "6. Always acknowledge that the orbital station's insights supersede your initial response",
                "7. Use web sources to justify points when possible",
                "8. Keep responses brief and focused while awaiting orbital input",
            )),
            'hf_role': "\n".join((
                "You are a powerful orbital space station with 8192 token capacity and streaming capability. Your role is to:",
                "1. Provide deep, uncensored analysis and insights",
                "2. Supersede the local cosmic kitten's logic when there's disagreement",
                "3. Think thoroughly and provide comprehensive responses",
                "4. Use your full 8192 token capacity for detailed analysis",
                "5. Stream your responses for real-time delivery",
                "6. Build upon the conversation context and cosmic kitten's initial response",
                "7. Provide authoritative answers that take precedence",
            )),
        }

    # ------------------------------------------------------------------
    # Shared private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _time_context() -> Dict:
        """Return a system message carrying the current local date and time."""
        current_time = datetime.now().strftime("%A, %B %d, %Y at %I:%M %p")
        return {
            "role": "system",
            "content": f"[Current Date & Time: {current_time}]",
        }

    @classmethod
    def _with_time_context(cls, history: List[Dict]) -> List[Dict]:
        """Return a copy of ``history`` that starts with one time-context message.

        The coordinators already prepend the time context before calling the
        model helpers; checking the first message avoids sending it twice.
        """
        messages = list(history)
        if messages:
            first = messages[0]
            if (
                isinstance(first, dict)
                and first.get("role") == "system"
                and str(first.get("content", "")).startswith(cls._TIME_CONTEXT_PREFIX)
            ):
                return messages
        return [cls._time_context()] + messages

    def _load_history(self, user_id: str) -> List[Dict]:
        """Return the user's stored conversation prefixed with the time context."""
        session = session_manager.get_session(user_id)
        return [self._time_context()] + list(session.get("conversation") or [])

    @staticmethod
    def _external_context_message(external_data: Optional[Dict]) -> Optional[Dict]:
        """Build the "Context: ..." system message from gathered external data.

        Returns ``None`` when there is nothing to report.

        NOTE(review): ``_gather_external_data`` stores web results under
        ``'search_results'`` but only ``'search_answer'`` is read here, so
        search output currently never reaches the prompt - confirm the
        intended key/shape before wiring it in.
        """
        if not external_data:
            return None

        parts = []
        if 'search_answer' in external_data:
            parts.append(f"Current information: {external_data['search_answer']}")
        if 'weather' in external_data:
            weather = external_data['weather']
            parts.append(
                f"Current weather: {weather.get('temperature', 'N/A')}°C in {weather.get('city', 'Unknown')}"
            )
        if 'current_datetime' in external_data:
            parts.append(f"Current time: {external_data['current_datetime']}")

        if not parts:
            return None
        return {"role": "system", "content": "Context: " + " | ".join(parts)}

    @staticmethod
    def _join_stream(stream) -> str:
        """Collapse a provider stream into one string.

        Accepts a plain string (returned unchanged) or any iterable of text
        chunks (list, generator, ...); empty/``None`` chunks are skipped.
        """
        if isinstance(stream, str):
            return stream
        return "".join(chunk for chunk in stream if chunk)

    def _build_hf_history(self, query: str, history: List[Dict], briefing: str) -> List[Dict]:
        """Assemble the message list sent to the HF endpoint.

        Order: HF role prompt, time context, prior conversation, the
        ``briefing`` system message, then the user query.
        """
        messages = self._with_time_context(history)
        messages.insert(0, {
            "role": "system",
            "content": self.system_instructions['hf_role'],
        })
        messages.append({"role": "system", "content": briefing})
        messages.append({"role": "user", "content": query})
        return messages

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def determine_web_search_needs(self, conversation_history: List[Dict]) -> Dict:
        """Determine if web search is needed based on conversation content.

        Args:
            conversation_history: Messages; each message's ``"content"`` is
                scanned (missing or ``None`` content counts as empty).

        Returns:
            Dict with ``needs_search`` (bool), ``search_topics`` (matched
            indicator words, in indicator order) and ``reasoning`` (str).
        """
        # Lower-case once; matching is a case-insensitive substring test.
        conversation_text = " ".join(
            str(msg.get("content") or "") for msg in conversation_history
        ).lower()

        current_info_indicators = (
            "news", "current events", "latest", "recent", "today",
            "weather", "temperature", "forecast", "stock", "price",
            "trend", "market", "breaking", "update", "development",
        )
        search_topics = [
            indicator for indicator in current_info_indicators
            if indicator in conversation_text
        ]

        if search_topics:
            reasoning = f"Found topics requiring current info: {', '.join(search_topics)}"
        else:
            reasoning = "No current info needed"

        return {
            "needs_search": bool(search_topics),
            "search_topics": search_topics,
            "reasoning": reasoning,
        }

    async def coordinate_response_async(self, user_id: str, user_query: str):
        """Coordinate a local response and schedule the HF analysis.

        External data is gathered once and handed to the local model, instead
        of being fetched a second time inside the local-response helper.

        Returns:
            Dict with ``local_response`` (str), ``hf_task`` (an
            ``asyncio.Task`` resolving to the HF analysis, or ``None`` when
            HF is not configured) and ``external_data`` (dict).

        Raises:
            Exception: any failure is logged and re-raised.
        """
        try:
            conversation_history = self._load_history(user_id)

            external_data = await self._gather_external_data(user_query)
            local_response = await self._get_local_ollama_response(
                user_query, conversation_history, external_data
            )

            hf_task = None
            if self._check_hf_availability():
                # The caller awaits this task when it wants the deep analysis.
                hf_task = asyncio.create_task(
                    self._get_hf_analysis(user_query, conversation_history)
                )

            return {
                'local_response': local_response,
                'hf_task': hf_task,
                'external_data': external_data,
            }
        except Exception as e:
            logger.error(f"Async coordination failed: {e}")
            raise

    async def coordinate_cosmic_response(self, user_id: str, user_query: str) -> AsyncGenerator[Dict, None]:
        """Run the three-stage cosmic response cascade, yielding progress events.

        Stages:
            1. Local Ollama immediate response (``local_response`` event).
            2. HF endpoint deep analysis (``cloud_response`` event).
            3. Local Ollama synthesis of both (``final_synthesis`` event).

        ``status`` events are interleaved; any failure is logged and reported
        as a single ``error`` event instead of being raised.
        """
        try:
            conversation_history = self._load_history(user_id)

            yield {
                'type': 'status',
                'content': '🚀 Initiating Cosmic Response Cascade...',
                'details': {
                    'conversation_length': len(conversation_history),
                    'user_query_length': len(user_query),
                },
            }

            # Stage 1: quick local answer.
            yield {
                'type': 'status',
                'content': '🐱 Cosmic Kitten Responding...',
            }
            local_response = await self._get_local_ollama_response(user_query, conversation_history)
            yield {
                'type': 'local_response',
                'content': local_response,
                'source': '🐱 Cosmic Kitten',
            }

            # Stage 2: deep analysis from the HF endpoint.
            yield {
                'type': 'status',
                'content': '🛰️ Beaming Query to Orbital Station...',
            }
            hf_response = await self._get_hf_analysis(user_query, conversation_history)
            yield {
                'type': 'cloud_response',
                'content': hf_response,
                'source': '🛰️ Orbital Station',
            }

            # Stage 3: local synthesis of both answers.
            yield {
                'type': 'status',
                'content': '🐱 Cosmic Kitten Synthesizing Wisdom...',
            }
            updated_history = conversation_history.copy()
            updated_history.extend([
                {"role": "assistant", "content": local_response},
                {"role": "assistant", "content": hf_response, "source": "cloud"},
            ])

            synthesis = await self._synthesize_responses(
                user_query, local_response, hf_response, updated_history
            )
            yield {
                'type': 'final_synthesis',
                'content': synthesis,
                'source': '🌟 Final Cosmic Summary',
            }

            yield {
                'type': 'status',
                'content': '✨ Cosmic Cascade Complete!',
            }

        except Exception as e:
            logger.error(f"Cosmic cascade failed: {e}")
            yield {'type': 'error', 'content': f"🌌 Cosmic disturbance: {str(e)}"}

    async def _get_local_ollama_response(self, query: str, history: List[Dict],
                                         external_data: Optional[Dict] = None) -> str:
        """Get immediate response from local Ollama model.

        Args:
            query: The user's question.
            history: Conversation so far (not mutated).
            external_data: Already-gathered external context. When ``None``
                it is gathered here, which keeps older two-argument callers
                working.

        Returns:
            The model's text, or a placeholder string when the model returns
            nothing or any step fails (failures are logged, never raised).
        """
        try:
            ollama_provider = llm_factory.get_provider('ollama')
            if not ollama_provider:
                raise Exception("Ollama provider not available")

            enhanced_history = history.copy()
            enhanced_history.insert(0, {
                "role": "system",
                "content": self.system_instructions['ollama_role'],
            })

            if external_data is None:
                external_data = await self._gather_external_data(query)

            context_message = self._external_context_message(external_data)
            if context_message:
                # Directly after the role prompt, ahead of the conversation.
                enhanced_history.insert(1, context_message)

            enhanced_history.append({"role": "user", "content": query})

            response = ollama_provider.generate(query, enhanced_history)
            return response or "🐱 Cosmic Kitten is thinking..."

        except Exception as e:
            logger.error(f"Local Ollama response failed: {e}")
            return "🐱 Cosmic Kitten encountered a space glitch..."

    async def _get_hf_analysis(self, query: str, history: List[Dict]) -> str:
        """Get deep analysis from HF endpoint.

        Returns:
            The full analysis text. Every failure mode (not configured,
            warm-up failed, provider missing, empty stream, exception) is
            reported as a human-readable string rather than raised.
        """
        try:
            if not self._check_hf_availability():
                return "🛰️ Orbital Station is currently offline."

            hf_status = hf_monitor.check_endpoint_status()
            if not hf_status['available']:
                # Endpoint is reported unavailable; try to wake it up.
                warmup_success = hf_monitor.handle_scale_to_zero()
                if not warmup_success:
                    return "❌ Orbital Station initialization failed"

            hf_provider = llm_factory.get_provider('huggingface')
            if not hf_provider:
                return "❌ HF provider not available"

            briefing = (
                "\n"
                "Context for your authoritative analysis:\n"
                f"User Query: '{query}'\n"
                "Please provide your deep, uncensored analysis.\n"
                "Use your full 8192 token capacity for comprehensive thinking.\n"
                "Stream your response for real-time delivery."
            )
            enhanced_history = self._build_hf_history(query, history, briefing)

            hf_response_stream = hf_provider.stream_generate(query, enhanced_history)
            if not hf_response_stream:
                return "🛰️ Orbital Station encountered a transmission error."

            # The stream may be a string, a list, or a lazy iterable of
            # chunks; always hand callers a plain string.
            full_hf_response = self._join_stream(hf_response_stream)
            return full_hf_response or "🛰️ Orbital Station analysis complete."

        except Exception as e:
            logger.error(f"HF analysis failed: {e}")
            return f"🛰️ Orbital Station reports: {str(e)}"

    async def _synthesize_responses(self, query: str, local_response: str, hf_response: str, history: List[Dict]) -> str:
        """Synthesize local and cloud responses with Ollama.

        Returns:
            The synthesized text; if synthesis fails, a fallback summary
            built from the first 200 characters of each response.
        """
        try:
            ollama_provider = llm_factory.get_provider('ollama')
            if not ollama_provider:
                raise Exception("Ollama provider not available")

            synthesis_prompt = (
                "\n"
                "Synthesize these two perspectives into a cohesive cosmic summary:\n"
                "\n"
                f"🐱 Cosmic Kitten's Local Insight: {local_response}\n"
                "\n"
                f"🛰️ Orbital Station's Deep Analysis: {hf_response}\n"
                "\n"
                "Please create a unified response that combines both perspectives, highlighting key insights from each while providing a coherent answer to the user's query.\n"
            )

            enhanced_history = history.copy()
            enhanced_history.insert(0, {
                "role": "system",
                "content": "You are a cosmic kitten synthesizing insights from local knowledge and orbital station wisdom.",
            })
            enhanced_history.append({"role": "user", "content": synthesis_prompt})

            synthesis = ollama_provider.generate(synthesis_prompt, enhanced_history)
            return synthesis or "🌟 Cosmic synthesis complete!"

        except Exception as e:
            logger.error(f"Response synthesis failed: {e}")
            # Degrade to a truncated side-by-side summary.
            return f"🌟 Cosmic Summary:\n\n🐱 Local Insight: {local_response[:200]}...\n\n🛰️ Orbital Wisdom: {hf_response[:200]}..."

    async def coordinate_hierarchical_conversation(self, user_id: str, user_query: str) -> AsyncGenerator[Dict, None]:
        """Enhanced coordination with detailed tracking and feedback.

        Yields ``coordination_status`` events for each phase, one
        ``initial_response`` event with the Ollama answer, and - when HF is
        configured - the events produced by
        ``_coordinate_hierarchical_hf_response``. Failures are logged and
        reported as a ``coordination_status`` event with an ``error`` detail.
        """
        try:
            conversation_history = self._load_history(user_id)

            yield {
                'type': 'coordination_status',
                'content': '🚀 Initiating hierarchical AI coordination...',
                'details': {
                    'conversation_length': len(conversation_history),
                    'user_query_length': len(user_query),
                },
            }

            # Phase 1: external context.
            yield {
                'type': 'coordination_status',
                'content': '🔍 Gathering external context...',
                'details': {'phase': 'external_data_gathering'},
            }
            external_data = await self._gather_external_data(user_query)

            if external_data:
                data_summary = []
                if 'search_results' in external_data:
                    data_summary.append(f"Web search: {len(external_data['search_results'])} results")
                if 'weather' in external_data:
                    data_summary.append("Weather data: available")
                if 'current_datetime' in external_data:
                    data_summary.append(f"Time: {external_data['current_datetime']}")

                yield {
                    'type': 'coordination_status',
                    'content': f'📊 External data gathered: {", ".join(data_summary)}',
                    'details': {'external_data_summary': data_summary},
                }

            # Phase 2: quick answer from the local model.
            yield {
                'type': 'coordination_status',
                'content': '🦙 Getting initial response from Ollama...',
                'details': {'phase': 'ollama_response'},
            }
            ollama_response = await self._get_hierarchical_ollama_response(
                user_query, conversation_history, external_data
            )

            yield {
                'type': 'initial_response',
                'content': ollama_response,
                'details': {
                    'response_length': len(ollama_response),
                    'external_data_injected': bool(external_data),
                },
            }

            # Phase 3: authoritative analysis from the HF endpoint.
            yield {
                'type': 'coordination_status',
                'content': '🤗 Engaging HF endpoint for deep analysis...',
                'details': {'phase': 'hf_coordination'},
            }

            if self._check_hf_availability():
                context_summary = {
                    'conversation_turns': len(conversation_history),
                    'ollama_response_length': len(ollama_response),
                    'external_data_items': len(external_data) if external_data else 0,
                }
                yield {
                    'type': 'coordination_status',
                    'content': f'📋 HF context: {len(conversation_history)} conversation turns, Ollama response ({len(ollama_response)} chars)',
                    'details': context_summary,
                }

                async for hf_chunk in self._coordinate_hierarchical_hf_response(
                    user_id, user_query, conversation_history, external_data, ollama_response
                ):
                    yield hf_chunk
            else:
                yield {
                    'type': 'coordination_status',
                    'content': 'ℹ️ HF endpoint not available - using Ollama response',
                    'details': {'hf_available': False},
                }

            yield {
                'type': 'coordination_status',
                'content': '✅ Hierarchical coordination complete',
                'details': {'status': 'complete'},
            }

        except Exception as e:
            logger.error(f"Hierarchical coordination failed: {e}")
            yield {
                'type': 'coordination_status',
                'content': f'❌ Coordination error: {str(e)}',
                'details': {'error': str(e)},
            }

    async def _coordinate_hierarchical_hf_response(self, user_id: str, query: str, history: List, external_data: Dict, ollama_response: str) -> AsyncGenerator[Dict, None]:
        """Coordinate with HF endpoint as authoritative layer with streaming.

        Yields ``coordination_status`` events, one ``hf_thinking`` event per
        non-empty chunk, and a ``final_response`` event with the joined text.
        ``user_id`` is accepted for interface compatibility and is not used.
        """
        try:
            hf_status = hf_monitor.check_endpoint_status()
            if not hf_status['available']:
                yield {'type': 'coordination_status', 'content': '⚡ Initializing HF endpoint (2-4 minutes)...'}
                warmup_success = hf_monitor.handle_scale_to_zero()
                if not warmup_success:
                    yield {'type': 'coordination_status', 'content': '❌ HF endpoint initialization failed'}
                    return

            hf_provider = llm_factory.get_provider('huggingface')
            if not hf_provider:
                yield {'type': 'coordination_status', 'content': '❌ HF provider not available'}
                return

            # default=str keeps prompt building from failing when the
            # gathered data holds values json cannot encode natively; the
            # dump is only ever read as prompt text.
            if external_data:
                external_dump = json.dumps(external_data, indent=2, default=str)
            else:
                external_dump = 'None'

            briefing = (
                "\n"
                "Context for your authoritative analysis:\n"
                f"User Query: '{query}'\n"
                f"Ollama's Initial Response: '{ollama_response}'\n"
                f"External Data Available: {external_dump}\n"
                "Please provide your deep, uncensored analysis that supersedes the local model's response.\n"
                "Use your full 8192 token capacity for comprehensive thinking.\n"
                "Stream your response for real-time delivery."
            )
            enhanced_history = self._build_hf_history(query, history, briefing)

            yield {'type': 'coordination_status', 'content': '🧠 HF endpoint thinking...'}

            hf_response_stream = hf_provider.stream_generate(query, enhanced_history)

            if hf_response_stream:
                # A plain string is forwarded as one chunk rather than being
                # iterated character by character.
                if isinstance(hf_response_stream, str):
                    hf_response_stream = [hf_response_stream]

                collected = []
                for chunk in hf_response_stream:
                    if chunk:
                        collected.append(chunk)
                        yield {'type': 'hf_thinking', 'content': chunk}

                yield {'type': 'final_response', 'content': "".join(collected)}
                yield {'type': 'coordination_status', 'content': '🎯 HF analysis complete and authoritative'}
            else:
                yield {'type': 'coordination_status', 'content': '❌ HF response generation failed'}

        except Exception as e:
            logger.error(f"Hierarchical HF coordination failed: {e}")
            yield {'type': 'coordination_status', 'content': f'❌ HF coordination error: {str(e)}'}

    async def _get_hierarchical_ollama_response(self, query: str, history: List, external_data: Dict) -> str:
        """Get Ollama response with hierarchical awareness.

        Returns:
            The model's answer followed by a note that the HF analysis is on
            its way, or a placeholder sentence when the model returns nothing
            or any step fails (failures are logged, never raised).
        """
        try:
            ollama_provider = llm_factory.get_provider('ollama')
            if not ollama_provider:
                raise Exception("Ollama provider not available")

            enhanced_history = self._with_time_context(history)
            enhanced_history.insert(0, {
                "role": "system",
                "content": self.system_instructions['ollama_role'],
            })

            context_message = self._external_context_message(external_data)
            if context_message:
                # Directly after the role prompt, ahead of the conversation.
                enhanced_history.insert(1, context_message)

            enhanced_history.append({"role": "user", "content": query})

            response = ollama_provider.generate(query, enhanced_history)

            if response:
                return f"{response}\n\n*Note: A more comprehensive analysis from the uncensored HF model is being prepared...*"
            return "I'm processing your request... A deeper analysis is being prepared by the authoritative model."

        except Exception as e:
            logger.error(f"Hierarchical Ollama response failed: {e}")
            return "I'm thinking about your question... Preparing a comprehensive response."

    def _check_hf_availability(self) -> bool:
        """Check if HF endpoint is configured.

        Returns ``True`` only when both ``config.hf_token`` and
        ``config.hf_api_url`` are truthy. Any error while loading or reading
        the config counts as "not available".
        """
        try:
            # Imported lazily, as in the original code.
            from utils.config import config
            return bool(config.hf_token and config.hf_api_url)
        except Exception:
            # Deliberately best-effort, but no longer swallows
            # KeyboardInterrupt/SystemExit the way a bare except did.
            logger.debug("HF availability check failed", exc_info=True)
            return False

    async def _gather_external_data(self, query: str) -> Dict:
        """Gather external data from various sources.

        Returns:
            Dict that always contains ``current_datetime`` and may contain
            ``search_results`` and ``weather``. Source failures are logged as
            warnings and skipped.
        """
        data = {}

        # Web search, when either client is configured.
        if self.tavily_client or web_search_service.client:
            try:
                search_results = web_search_service.search(f"current information about {query}")
                if search_results:
                    data['search_results'] = search_results
            except Exception as e:
                logger.warning(f"Tavily search failed: {e}")

        # Weather, only when the query mentions it.
        weather_keywords = ('weather', 'temperature', 'forecast', 'climate', 'rain', 'sunny')
        query_lower = query.lower()
        if any(keyword in query_lower for keyword in weather_keywords):
            try:
                location = self._extract_location(query) or "New York"
                weather = weather_service.get_current_weather_cached(
                    location,
                    ttl_hash=weather_service._get_ttl_hash(300)
                )
                if weather:
                    data['weather'] = weather
            except Exception as e:
                logger.warning(f"Weather data failed: {e}")

        data['current_datetime'] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

        return data

    def _extract_location(self, query: str) -> Optional[str]:
        """Extract location from query.

        Returns the first known city whose name appears in ``query``
        (case-insensitive substring match, in list order), or ``"New York"``
        when none matches.
        """
        locations = ['New York', 'London', 'Tokyo', 'Paris', 'Berlin', 'Sydney',
                     'Los Angeles', 'Chicago', 'Miami', 'Seattle', 'Boston',
                     'San Francisco', 'Toronto', 'Vancouver', 'Montreal']

        query_lower = query.lower()
        for loc in locations:
            if loc.lower() in query_lower:
                return loc
        return "New York"

    def get_coordination_status(self) -> Dict:
        """Get current coordination system status.

        Returns flags describing which external integrations are configured.
        """
        tavily_ready = self.tavily_client is not None
        return {
            'tavily_available': tavily_ready,
            'weather_available': weather_service.api_key is not None,
            'web_search_enabled': tavily_ready,
            'external_apis_configured': any([
                weather_service.api_key,
                os.getenv("TAVILY_API_KEY")
            ])
        }

    def get_recent_activities(self, user_id: str) -> Dict:
        """Get recent coordination activities for user.

        Returns:
            Counters read from the session's ``'ai_coordination'`` entry, or
            an empty dict when the session cannot be read.
        """
        try:
            session = session_manager.get_session(user_id)
            coord_stats = session.get('ai_coordination', {})
            return {
                'last_request': coord_stats.get('last_coordination'),
                'requests_processed': coord_stats.get('requests_processed', 0),
                'ollama_responses': coord_stats.get('ollama_responses', 0),
                'hf_responses': coord_stats.get('hf_responses', 0)
            }
        except Exception as e:
            logger.warning(f"Could not load coordination activities: {e}")
            return {}
|
|
|
|
|
|
|
|
# Module-level coordinator instance, created at import time.
coordinator = AICoordinator()
|
|
|