rdune71 committed
Commit e900a8d · Parent: 1664e95

Implement coordinated multi-model AI system with HF endpoint monitoring
core/__pycache__/session.cpython-313.pyc CHANGED
Binary files a/core/__pycache__/session.cpython-313.pyc and b/core/__pycache__/session.cpython-313.pyc differ
 
core/coordinator.py ADDED
@@ -0,0 +1,218 @@
+ import asyncio
+ import logging
+ import os
+ from datetime import datetime
+ from typing import List, Dict, Optional
+ from core.llm_factory import llm_factory
+ from core.session import session_manager
+ from services.hf_endpoint_monitor import hf_monitor
+ from services.weather import weather_service
+ 
+ try:
+     from tavily import TavilyClient
+     TAVILY_AVAILABLE = True
+ except ImportError:
+     TavilyClient = None
+     TAVILY_AVAILABLE = False
+ 
+ logger = logging.getLogger(__name__)
+ 
+ class AICoordinator:
+     """Coordinate multiple AI models and external services"""
+ 
+     def __init__(self):
+         self.tavily_client = None
+         if TAVILY_AVAILABLE and os.getenv("TAVILY_API_KEY"):
+             self.tavily_client = TavilyClient(api_key=os.getenv("TAVILY_API_KEY"))
+ 
+     async def coordinate_response(self, user_id: str, user_query: str) -> Dict:
+         """
+         Coordinate Ollama (fast) and HF (deep) responses.
+ 
+         Returns:
+             Dict with 'immediate_response', 'hf_task' and 'external_data'
+         """
+         try:
+             # Get conversation history
+             session = session_manager.get_session(user_id)
+             conversation_history = session.get("conversation", [])
+ 
+             # Step 1: Gather external data (search, weather, current time)
+             logger.info("Step 1: Gathering external data...")
+             external_data = await self._gather_external_data(user_query)
+ 
+             # Step 2: Get immediate Ollama response (fast)
+             logger.info("Step 2: Getting immediate Ollama response...")
+             immediate_response = await self._get_ollama_response(
+                 user_query, conversation_history, external_data
+             )
+ 
+             # Step 3: Start HF deep analysis in the background
+             logger.info("Step 3: Initializing HF endpoint...")
+             hf_task = asyncio.create_task(self._initialize_and_get_hf_response(
+                 user_query, conversation_history, external_data, immediate_response
+             ))
+ 
+             # Return immediate response while HF processes
+             return {
+                 'immediate_response': immediate_response,
+                 'hf_task': hf_task,  # Background task for HF processing
+                 'external_data': external_data
+             }
+ 
+         except Exception as e:
+             logger.error(f"Coordination failed: {e}")
+             # Fall back to a plain Ollama response with an empty history:
+             # conversation_history may be unbound if the session lookup failed
+             immediate_response = await self._get_ollama_response(user_query, [], {})
+             return {
+                 'immediate_response': immediate_response,
+                 'hf_task': None,
+                 'external_data': {}
+             }
+ 
+     async def _gather_external_data(self, query: str) -> Dict:
+         """Gather external data from various sources"""
+         data = {}
+ 
+         # Tavily web search (only when a TAVILY_API_KEY is configured)
+         if self.tavily_client:
+             try:
+                 search_result = self.tavily_client.search(query, max_results=3)
+                 data['search_results'] = search_result.get('results', [])
+             except Exception as e:
+                 logger.warning(f"Tavily search failed: {e}")
+ 
+         # Weather data (if weather-related terms appear in the query)
+         if 'weather' in query.lower() or 'temperature' in query.lower():
+             try:
+                 # Extract location from query or use default
+                 location = self._extract_location(query) or "New York"
+                 weather = weather_service.get_current_weather(location)
+                 if weather:
+                     data['weather'] = weather
+             except Exception as e:
+                 logger.warning(f"Weather data failed: {e}")
+ 
+         # Current date/time
+         data['current_datetime'] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+ 
+         return data
+ 
+     async def _get_ollama_response(self, query: str, history: List, external_data: Dict) -> str:
+         """Get fast response from Ollama"""
+         try:
+             # Enhance query with external data
+             enhanced_query = self._enhance_query_with_data(query, external_data)
+ 
+             # Get Ollama provider
+             ollama_provider = llm_factory.get_provider('ollama')
+             if not ollama_provider:
+                 raise Exception("Ollama provider not available")
+ 
+             # Prepare conversation with external context
+             enhanced_history = history.copy()
+             if external_data:
+                 context_message = {
+                     "role": "system",
+                     "content": f"External context: {str(external_data)}"
+                 }
+                 enhanced_history.insert(0, context_message)
+ 
+             enhanced_history.append({"role": "user", "content": enhanced_query})
+ 
+             # Generate response
+             response = ollama_provider.generate(enhanced_query, enhanced_history)
+             return response or "I'm processing your request..."
+ 
+         except Exception as e:
+             logger.error(f"Ollama response failed: {e}")
+             return "I'm thinking about your question..."
+ 
+     async def _initialize_and_get_hf_response(self, query: str, history: List,
+                                               external_data: Dict, ollama_response: str) -> Optional[str]:
+         """Initialize HF endpoint and get deep analysis"""
+         try:
+             # Check if HF endpoint is available
+             hf_status = hf_monitor.check_endpoint_status()
+ 
+             if not hf_status['available']:
+                 logger.info("HF endpoint not available, attempting to warm up...")
+                 # Try to warm up the endpoint
+                 warmup_success = hf_monitor.warm_up_endpoint()
+                 if not warmup_success:
+                     return None
+ 
+             # Get HF provider
+             hf_provider = llm_factory.get_provider('huggingface')
+             if not hf_provider:
+                 return None
+ 
+             # Prepare enhanced conversation for HF
+             enhanced_history = history.copy()
+ 
+             # Add Ollama's initial response for HF to consider
+             enhanced_history.append({
+                 "role": "assistant",
+                 "content": f"Initial response (for reference): {ollama_response}"
+             })
+ 
+             # Add external data context
+             if external_data:
+                 context_message = {
+                     "role": "system",
+                     "content": f"Additional context data: {str(external_data)}"
+                 }
+                 enhanced_history.insert(0, context_message)
+ 
+             # Add HF's role instruction
+             enhanced_history.append({
+                 "role": "system",
+                 "content": "You are providing deep analysis and second opinions. Consider the initial response and enhance it with deeper insights."
+             })
+ 
+             enhanced_history.append({"role": "user", "content": query})
+ 
+             # Generate deep response
+             deep_response = hf_provider.generate(query, enhanced_history)
+             return deep_response
+ 
+         except Exception as e:
+             logger.error(f"HF response failed: {e}")
+             return None
+ 
+     def _enhance_query_with_data(self, query: str, data: Dict) -> str:
+         """Enhance query with gathered external data"""
+         if not data:
+             return query
+ 
+         context_parts = []
+ 
+         if 'search_results' in data:
+             context_parts.append("Recent information:")
+             for result in data['search_results'][:2]:  # Limit to 2 results
+                 context_parts.append(f"- {result.get('title', 'Result')}: {result.get('content', '')[:100]}...")
+ 
+         if 'weather' in data:
+             weather = data['weather']
+             context_parts.append(f"Current weather: {weather.get('temperature', 'N/A')}°C in {weather.get('city', 'Unknown')}")
+ 
+         if 'current_datetime' in data:
+             context_parts.append(f"Current time: {data['current_datetime']}")
+ 
+         if context_parts:
+             return f"{query}\n\nContext: {' '.join(context_parts)}"
+ 
+         return query
+ 
+     def _extract_location(self, query: str) -> Optional[str]:
+         """Extract location from query (simple implementation)"""
+         # This could be enhanced with NER or more sophisticated parsing
+         locations = ['New York', 'London', 'Tokyo', 'Paris', 'Berlin', 'Sydney']
+         for loc in locations:
+             if loc.lower() in query.lower():
+                 return loc
+         return None
+ 
+ # Global coordinator instance
+ coordinator = AICoordinator()
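
Usage note: the dict returned by coordinate_response carries a live asyncio task, so a caller is expected to surface immediate_response right away and await hf_task separately. A minimal consumption sketch under that assumption; handle_message and the 30-second timeout are illustrative, not part of this commit:

import asyncio
from core.coordinator import coordinator

async def handle_message(user_id: str, query: str) -> str:
    result = await coordinator.coordinate_response(user_id, query)
    reply = result['immediate_response']  # show the fast Ollama answer first
    if result['hf_task']:
        try:
            # Upgrade to the deep HF answer if it arrives within the budget
            deep = await asyncio.wait_for(result['hf_task'], timeout=30)
            if deep:
                reply = deep
        except asyncio.TimeoutError:
            pass  # keep the fast answer; the HF endpoint may still be warming up
    return reply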
core/session.py CHANGED
@@ -3,6 +3,8 @@ import time
  from typing import Dict, Any, Optional
  from core.memory import load_user_state, save_user_state
  import logging
+ import json
+ from datetime import datetime
 
  # Set up logging
  logging.basicConfig(level=logging.INFO)
@@ -91,6 +93,51 @@ class SessionManager:
          logger.error(f"Error updating session for user {user_id}: {e}")
          return False
 
+     def update_session_with_ai_coordination(self, user_id: str, ai_data: Dict) -> bool:
+         """Update session with AI coordination data"""
+         try:
+             # Get existing session
+             session = self.get_session(user_id)
+ 
+             # Add AI coordination tracking
+             if 'ai_coordination' not in session:
+                 session['ai_coordination'] = {
+                     'requests_processed': 0,
+                     'ollama_responses': 0,
+                     'hf_responses': 0,
+                     'last_coordination': None
+                 }
+ 
+             coord_data = session['ai_coordination']
+             coord_data['requests_processed'] += 1
+             coord_data['last_coordination'] = datetime.now().isoformat()
+ 
+             # Track response types
+             if 'immediate_response' in ai_data:
+                 coord_data['ollama_responses'] += 1
+             if ai_data.get('hf_response'):
+                 coord_data['hf_responses'] += 1
+ 
+             # Serialize nested structures to JSON strings for Redis
+             redis_data = {}
+             for key, value in session.items():
+                 if isinstance(value, (dict, list)):
+                     redis_data[key] = json.dumps(value)
+                 else:
+                     redis_data[key] = value
+ 
+             # Save updated session
+             result = save_user_state(user_id, redis_data)
+             if result:
+                 logger.debug(f"Successfully updated coordination session for user {user_id}")
+             else:
+                 logger.warning(f"Failed to save coordination session for user {user_id}")
+ 
+             return result
+         except Exception as e:
+             logger.error(f"Error updating coordination session for user {user_id}: {e}")
+             return False
+ 
      def clear_session(self, user_id: str) -> bool:
          """Clear user session data
          Args:
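
Since update_session_with_ai_coordination JSON-encodes nested dicts and lists before writing to Redis, the read side has to decode them again when the session is loaded. A minimal sketch of that counterpart, assuming load_user_state returns the hash exactly as stored; decode_session is a hypothetical helper, not part of this commit:

import json

def decode_session(raw: dict) -> dict:
    """Reverse the json.dumps encoding applied before save_user_state."""
    session = {}
    for key, value in raw.items():
        if isinstance(value, str) and value[:1] in ('{', '['):
            try:
                session[key] = json.loads(value)  # nested dict/list stored as JSON text
            except ValueError:
                session[key] = value  # plain string that only looks like JSON
        else:
            session[key] = value
    return session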
demo_coordinated_ai.py ADDED
@@ -0,0 +1,84 @@
+ import asyncio
+ import sys
+ from pathlib import Path
+ 
+ # Add project root to path
+ project_root = Path(__file__).parent
+ sys.path.append(str(project_root))
+ 
+ from core.coordinator import coordinator
+ from core.session import session_manager
+ from services.hf_endpoint_monitor import hf_monitor
+ 
+ async def demo_coordinated_response():
+     """Demonstrate the coordinated AI response system"""
+     print("=== AI Life Coach Coordinated Response Demo ===")
+     print()
+ 
+     user_id = "demo_user"
+     user_query = "What's the weather like in New York today and how should I plan my day?"
+ 
+     print(f"User Query: {user_query}")
+     print()
+ 
+     # Show HF endpoint status
+     print("HF Endpoint Status:")
+     print(hf_monitor.get_status_summary())
+     print()
+ 
+     # Coordinate responses
+     print("Coordinating AI responses...")
+     coordination_result = await coordinator.coordinate_response(user_id, user_query)
+ 
+     # Show immediate response
+     print("Immediate Response (Ollama):")
+     print(coordination_result['immediate_response'])
+     print()
+ 
+     # Show external data gathered
+     print("External Data Gathered:")
+     for key, value in coordination_result['external_data'].items():
+         print(f"  {key}: {value}")
+     print()
+ 
+     # Update session with coordination data
+     session_manager.update_session_with_ai_coordination(user_id, {
+         'immediate_response': coordination_result['immediate_response'],
+         'external_data': coordination_result['external_data']
+     })
+ 
+     # If HF task is available, wait for it
+     hf_task = coordination_result.get('hf_task')
+     if hf_task:
+         print("Waiting for deep analysis from HF endpoint...")
+         try:
+             hf_response = await hf_task
+             if hf_response:
+                 print("Deep Analysis Response (HF Endpoint):")
+                 print(hf_response)
+                 print()
+ 
+                 # Update session with HF response
+                 session_manager.update_session_with_ai_coordination(user_id, {
+                     'hf_response': hf_response
+                 })
+             else:
+                 print("HF Endpoint did not provide a response (may still be initializing)")
+         except Exception as e:
+             print(f"Error getting HF response: {e}")
+ 
+     # Show session coordination data
+     session = session_manager.get_session(user_id)
+     if 'ai_coordination' in session:
+         coord_data = session['ai_coordination']
+         print("AI Coordination Statistics:")
+         print(f"  Requests Processed: {coord_data['requests_processed']}")
+         print(f"  Ollama Responses: {coord_data['ollama_responses']}")
+         print(f"  HF Responses: {coord_data['hf_responses']}")
+         print(f"  Last Coordination: {coord_data['last_coordination']}")
+ 
+     print()
+     print("🎉 Demo completed successfully!")
+ 
+ if __name__ == "__main__":
+     asyncio.run(demo_coordinated_response())
services/hf_endpoint_monitor.py ADDED
@@ -0,0 +1,109 @@
+ import requests
+ import time
+ import logging
+ from typing import Dict, Optional
+ from utils.config import config
+ 
+ logger = logging.getLogger(__name__)
+ 
+ class HFEndpointMonitor:
+     """Monitor Hugging Face endpoint status and health"""
+ 
+     def __init__(self):
+         self.endpoint_url = config.hf_api_url
+         self.hf_token = config.hf_token
+         self.is_initialized = False
+         self.last_check = 0
+         self.check_interval = 60  # Check every minute
+ 
+     def check_endpoint_status(self) -> Dict:
+         """Check if HF endpoint is available and initialized"""
+         try:
+             # Check if endpoint exists and is responsive
+             headers = {"Authorization": f"Bearer {self.hf_token}"}
+ 
+             # Simple model list check (doesn't trigger initialization)
+             response = requests.get(
+                 f"{self.endpoint_url}/models",
+                 headers=headers,
+                 timeout=10
+             )
+ 
+             status_info = {
+                 'available': response.status_code == 200,
+                 'status_code': response.status_code,
+                 'initialized': self._is_endpoint_initialized(response),
+                 'timestamp': time.time()
+             }
+ 
+             logger.info(f"HF Endpoint Status: {status_info}")
+             return status_info
+ 
+         except Exception as e:
+             logger.error(f"HF endpoint check failed: {e}")
+             return {
+                 'available': False,
+                 'status_code': None,
+                 'initialized': False,
+                 'error': str(e),
+                 'timestamp': time.time()
+             }
+ 
+     def _is_endpoint_initialized(self, response) -> bool:
+         """Determine if endpoint is fully initialized"""
+         # If we get a model list back, it's likely initialized
+         try:
+             data = response.json()
+             return 'data' in data or 'models' in data
+         except Exception:
+             return False
+ 
+     def warm_up_endpoint(self) -> bool:
+         """Send a warm-up request to initialize the endpoint"""
+         try:
+             logger.info("Warming up HF endpoint...")
+             headers = {
+                 "Authorization": f"Bearer {self.hf_token}",
+                 "Content-Type": "application/json"
+             }
+ 
+             # Simple test request to trigger initialization
+             payload = {
+                 "model": "meta-llama/Llama-2-7b-chat-hf",  # Adjust as needed
+                 "messages": [{"role": "user", "content": "Hello"}],
+                 "max_tokens": 10
+             }
+ 
+             response = requests.post(
+                 f"{self.endpoint_url}/chat/completions",
+                 headers=headers,
+                 json=payload,
+                 timeout=30
+             )
+ 
+             success = response.status_code in [200, 201]
+             if success:
+                 self.is_initialized = True
+                 logger.info("✅ HF endpoint warmed up successfully")
+             else:
+                 logger.warning(f"⚠️ HF endpoint warm-up response: {response.status_code}")
+ 
+             return success
+ 
+         except Exception as e:
+             logger.error(f"HF endpoint warm-up failed: {e}")
+             return False
+ 
+     def get_status_summary(self) -> str:
+         """Get human-readable status summary"""
+         status = self.check_endpoint_status()
+         if status['available']:
+             if status.get('initialized', False):
+                 return "🟢 HF Endpoint: Available and Initialized"
+             else:
+                 return "🟡 HF Endpoint: Available but Initializing"
+         else:
+             return "🔴 HF Endpoint: Unavailable"
+ 
+ # Global instance
+ hf_monitor = HFEndpointMonitor()
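
check_endpoint_status issues a network request on every call, even though the monitor already stores last_check and check_interval. A small throttling wrapper that would honor those fields, written as an assumption about their intended use; cached_status is a hypothetical helper, not part of this commit:

import time

def cached_status(monitor) -> dict:
    """Reuse the previous status for check_interval seconds to avoid polling the endpoint."""
    now = time.time()
    last = getattr(monitor, '_last_status', None)
    if last is not None and now - monitor.last_check < monitor.check_interval:
        return last
    status = monitor.check_endpoint_status()
    monitor.last_check = now
    monitor._last_status = status
    return status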
test_hf_monitor.py ADDED
@@ -0,0 +1,42 @@
+ import sys
+ from pathlib import Path
+ 
+ # Add project root to path
+ project_root = Path(__file__).parent
+ sys.path.append(str(project_root))
+ 
+ from services.hf_endpoint_monitor import hf_monitor
+ 
+ def test_hf_monitor():
+     """Test the HF endpoint monitor"""
+     print("=== HF Endpoint Monitor Test ===")
+     print()
+ 
+     # Show current status
+     print("Current HF Endpoint Status:")
+     status = hf_monitor.check_endpoint_status()
+     print(f"  Available: {status['available']}")
+     print(f"  Status Code: {status['status_code']}")
+     print(f"  Initialized: {status.get('initialized', 'Unknown')}")
+     if 'error' in status:
+         print(f"  Error: {status['error']}")
+     print()
+ 
+     # Show human-readable status
+     print("Human-Readable Status:")
+     print(hf_monitor.get_status_summary())
+     print()
+ 
+     # Try to warm up endpoint if not available
+     if not status['available']:
+         print("Attempting to warm up endpoint...")
+         success = hf_monitor.warm_up_endpoint()
+         print(f"Warm-up result: {'Success' if success else 'Failed'}")
+         print()
+ 
+     # Check status again
+     print("Status after warm-up attempt:")
+     print(hf_monitor.get_status_summary())
+ 
+ if __name__ == "__main__":
+     test_hf_monitor()