Spaces:
Sleeping
Sleeping
| # ────────────────────────────── memo/sessions.py ────────────────────────────── | |
| """ | |
| Conversation Session Management | |
| Handles conversation session tracking, context switching detection, | |
| and conversation insights. | |
| """ | |
| import re, os | |
| import time | |
| from typing import Dict, Any, Tuple, Optional | |
| from utils.logger import get_logger | |
| logger = get_logger("SESSION_MANAGER", __name__) | |
| class SessionManager: | |
| """ | |
| Manages conversation sessions and tracks conversation state. | |
| """ | |
| def __init__(self): | |
| self.conversation_sessions = {} # Track active conversation sessions | |
| self.context_cache = {} # Cache recent context for performance | |
| def get_or_create_session(self, user_id: str, question: str, conversation_mode: str) -> Dict[str, Any]: | |
| """Get or create conversation session for user""" | |
| current_time = time.time() | |
| if user_id not in self.conversation_sessions: | |
| # New session | |
| self.conversation_sessions[user_id] = { | |
| "session_id": f"{user_id}_{int(current_time)}", | |
| "start_time": current_time, | |
| "last_activity": current_time, | |
| "message_count": 0, | |
| "context_switches": 0, | |
| "depth": 0, | |
| "enhancement_rate": 0.0, | |
| "conversation_mode": conversation_mode, | |
| "last_question": "", | |
| "is_continuation": False | |
| } | |
| return self.conversation_sessions[user_id] | |
| session = self.conversation_sessions[user_id] | |
| # Check if this is a continuation (within 30 minutes and same mode) | |
| time_since_last = current_time - session["last_activity"] | |
| is_continuation = (time_since_last < 1800 and # 30 minutes | |
| session["conversation_mode"] == conversation_mode) | |
| session["is_continuation"] = is_continuation | |
| session["last_activity"] = current_time | |
| session["message_count"] += 1 | |
| return session | |
| def update_session(self, user_id: str, original_question: str, | |
| enhanced_input: str, context_used: bool): | |
| """Update session with new information""" | |
| if user_id not in self.conversation_sessions: | |
| return | |
| session = self.conversation_sessions[user_id] | |
| session["last_question"] = original_question | |
| session["depth"] += 1 | |
| # Update enhancement rate | |
| total_enhancements = session.get("total_enhancements", 0) | |
| if context_used: | |
| total_enhancements += 1 | |
| session["total_enhancements"] = total_enhancements | |
| session["enhancement_rate"] = total_enhancements / session["message_count"] | |
| async def detect_context_switch(self, user_id: str, new_question: str, | |
| nvidia_rotator=None) -> Dict[str, Any]: | |
| """Detect if user has switched context/topic""" | |
| try: | |
| session_info = self.conversation_sessions.get(user_id, {}) | |
| if not session_info: | |
| return {"is_context_switch": False, "confidence": 0.0} | |
| # Check if this is a context switch | |
| is_switch, confidence = await self._detect_context_switch( | |
| session_info.get("last_question", ""), new_question, nvidia_rotator, user_id | |
| ) | |
| if is_switch and confidence > 0.7: | |
| # Clear recent context cache for fresh start | |
| self.context_cache.pop(user_id, None) | |
| # Update session to indicate context switch | |
| session_info["context_switches"] = session_info.get("context_switches", 0) + 1 | |
| session_info["last_context_switch"] = time.time() | |
| logger.info(f"[SESSION_MANAGER] Context switch detected for user {user_id} (confidence: {confidence:.2f})") | |
| return { | |
| "is_context_switch": True, | |
| "confidence": confidence, | |
| "switch_count": session_info["context_switches"] | |
| } | |
| return {"is_context_switch": False, "confidence": confidence} | |
| except Exception as e: | |
| logger.error(f"[SESSION_MANAGER] Context switch detection failed: {e}") | |
| return {"is_context_switch": False, "confidence": 0.0, "error": str(e)} | |
| def get_conversation_insights(self, user_id: str) -> Dict[str, Any]: | |
| """Get insights about the user's conversation patterns""" | |
| try: | |
| session_info = self.conversation_sessions.get(user_id, {}) | |
| if not session_info: | |
| return {"status": "no_active_session"} | |
| return { | |
| "session_duration": time.time() - session_info.get("start_time", time.time()), | |
| "message_count": session_info.get("message_count", 0), | |
| "context_switches": session_info.get("context_switches", 0), | |
| "last_activity": session_info.get("last_activity", 0), | |
| "conversation_depth": session_info.get("depth", 0), | |
| "enhancement_rate": session_info.get("enhancement_rate", 0.0) | |
| } | |
| except Exception as e: | |
| logger.error(f"[SESSION_MANAGER] Failed to get conversation insights: {e}") | |
| return {"error": str(e)} | |
| def clear_session(self, user_id: str): | |
| """Clear session for user""" | |
| if user_id in self.conversation_sessions: | |
| del self.conversation_sessions[user_id] | |
| if user_id in self.context_cache: | |
| del self.context_cache[user_id] | |
| # ────────────────────────────── Private Helper Methods ────────────────────────────── | |
| async def _detect_context_switch(self, last_question: str, new_question: str, | |
| nvidia_rotator, user_id: str = "") -> Tuple[bool, float]: | |
| """Detect if user has switched context/topic""" | |
| try: | |
| if not last_question or not new_question: | |
| return False, 0.0 | |
| if nvidia_rotator: | |
| try: | |
| from utils.api.router import generate_answer_with_model | |
| from utils.analytics import get_analytics_tracker | |
| # Track memory agent usage | |
| tracker = get_analytics_tracker() | |
| if tracker and user_id: | |
| await tracker.track_agent_usage( | |
| user_id=user_id, | |
| agent_name="memory", | |
| action="detect", | |
| context="context_switch_detection", | |
| metadata={"last_question": last_question[:100], "new_question": new_question[:100]} | |
| ) | |
| sys_prompt = """You are an expert at detecting context switches in conversations. | |
| Given two consecutive questions, determine if the user has switched to a completely different topic or context. | |
| Consider: | |
| - Different subject matter | |
| - Different intent or goal | |
| - No logical connection between questions | |
| - Change in conversation direction | |
| Respond with a JSON object: {"is_context_switch": true/false, "confidence": 0.0-1.0}""" | |
| user_prompt = f"""PREVIOUS QUESTION: {last_question} | |
| CURRENT QUESTION: {new_question} | |
| Is this a context switch?""" | |
| selection = {"provider": "nvidia", "model": os.getenv("NVIDIA_SMALL", "meta/llama-3.1-8b-instruct")} | |
| response = await generate_answer_with_model( | |
| selection=selection, | |
| system_prompt=sys_prompt, | |
| user_prompt=user_prompt, | |
| gemini_rotator=None, | |
| nvidia_rotator=nvidia_rotator, | |
| user_id=user_id, | |
| context="context_switch_detection" | |
| ) | |
| # Parse JSON response | |
| import json | |
| try: | |
| result = json.loads(response.strip()) | |
| return result.get("is_context_switch", False), result.get("confidence", 0.0) | |
| except: | |
| pass | |
| except Exception as e: | |
| logger.warning(f"[SESSION_MANAGER] Context switch detection failed: {e}") | |
| # Fallback: simple keyword-based detection | |
| return self._simple_context_switch_detection(last_question, new_question) | |
| except Exception as e: | |
| logger.warning(f"[SESSION_MANAGER] Context switch detection failed: {e}") | |
| return False, 0.0 | |
| def _simple_context_switch_detection(self, last_question: str, new_question: str) -> Tuple[bool, float]: | |
| """Simple keyword-based context switch detection""" | |
| try: | |
| # Extract keywords from both questions | |
| last_words = set(re.findall(r'\b\w+\b', last_question.lower())) | |
| new_words = set(re.findall(r'\b\w+\b', new_question.lower())) | |
| # Calculate overlap | |
| overlap = len(last_words.intersection(new_words)) | |
| total_unique = len(last_words.union(new_words)) | |
| if total_unique == 0: | |
| return False, 0.0 | |
| similarity = overlap / total_unique | |
| # Context switch if similarity is very low | |
| is_switch = similarity < 0.1 | |
| confidence = 1.0 - similarity if is_switch else similarity | |
| return is_switch, confidence | |
| except Exception as e: | |
| logger.warning(f"[SESSION_MANAGER] Simple context switch detection failed: {e}") | |
| return False, 0.0 | |
| # ────────────────────────────── Global Instance ────────────────────────────── | |
| _session_manager: Optional[SessionManager] = None | |
| def get_session_manager() -> SessionManager: | |
| """Get the global session manager instance""" | |
| global _session_manager | |
| if _session_manager is None: | |
| _session_manager = SessionManager() | |
| logger.info("[SESSION_MANAGER] Global session manager initialized") | |
| return _session_manager | |
| # def reset_session_manager(): | |
| # """Reset the global session manager (for testing)""" | |
| # global _session_manager | |
| # _session_manager = None | |