File size: 7,779 Bytes
fe5243a
 
9f203f4
fe5243a
9f203f4
 
fe5243a
 
9f203f4
a72fec7
fe5243a
 
 
 
 
 
 
 
9f203f4
fe5243a
 
 
 
 
 
9f203f4
 
 
 
 
 
 
 
 
fe5243a
 
 
 
f80e242
fe5243a
 
 
 
 
 
 
 
 
 
 
 
9f203f4
 
fe5243a
 
 
 
 
f80e242
 
 
 
 
 
 
 
 
 
 
fe5243a
9f203f4
fe5243a
9f203f4
fe5243a
 
 
 
 
 
9f203f4
fe5243a
9f203f4
fe5243a
9f203f4
fe5243a
 
 
9f203f4
fe5243a
9f203f4
fe5243a
 
 
 
9f203f4
 
fe5243a
9f203f4
 
fe5243a
9f203f4
7196ae9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
fe5243a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6f12b05
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
# ────────────────────────────── memo/conversation.py ──────────────────────────────
"""
Conversation Management Orchestrator

Main conversation manager that coordinates session management,
context retrieval, and memory consolidation for natural conversation flow.
"""

from typing import List, Dict, Any, Tuple, Optional
import os

from utils.logger import get_logger
from utils.rag.embeddings import EmbeddingClient

logger = get_logger("CONVERSATION_MANAGER", __name__)

class ConversationManager:
    """
    Main conversation manager that orchestrates all conversation-related functionality.
    """
    
    def __init__(self, memory_system, embedder: EmbeddingClient):
        self.memory_system = memory_system
        self.embedder = embedder
        
        # Initialize sub-managers
        from memo.sessions import get_session_manager
        from memo.retrieval import get_retrieval_manager
        from memo.consolidation import get_consolidation_manager
        
        self.session_manager = get_session_manager()
        self.retrieval_manager = get_retrieval_manager(memory_system, embedder)
        self.consolidation_manager = get_consolidation_manager(memory_system, embedder)
    
    async def get_smart_context(self, user_id: str, question: str, 
                              nvidia_rotator=None, project_id: Optional[str] = None,
                              conversation_mode: str = "chat") -> Tuple[str, str, Dict[str, Any]]:
        """
        Get intelligent context for conversation with enhanced memory planning.
        
        Args:
            user_id: User identifier
            question: Current question/instruction
            nvidia_rotator: NVIDIA API rotator for AI enhancement
            project_id: Project context
            conversation_mode: "chat" or "report"
            
        Returns:
            Tuple of (recent_context, semantic_context, metadata)
        """
        try:
            return await self.retrieval_manager.get_smart_context(
                user_id, question, nvidia_rotator, project_id, conversation_mode
            )
        except Exception as e:
            logger.error(f"[CONVERSATION_MANAGER] Smart context failed: {e}")
            return "", "", {"error": str(e)}
    
    async def get_enhancement_context(self, user_id: str, question: str, 
                                    nvidia_rotator=None, project_id: Optional[str] = None) -> Tuple[str, str, Dict[str, Any]]:
        """Get context specifically optimized for enhancement requests"""
        try:
            return await self.retrieval_manager.get_enhancement_context(
                user_id, question, nvidia_rotator, project_id
            )
        except Exception as e:
            logger.error(f"[CONVERSATION_MANAGER] Enhancement context failed: {e}")
            return "", "", {"error": str(e)}
    
    async def consolidate_memories(self, user_id: str, nvidia_rotator=None) -> Dict[str, Any]:
        """Consolidate and prune memories to prevent information overload"""
        try:
            return await self.consolidation_manager.consolidate_memories(user_id, nvidia_rotator)
        except Exception as e:
            logger.error(f"[CONVERSATION_MANAGER] Memory consolidation failed: {e}")
            return {"consolidated": 0, "pruned": 0, "error": str(e)}
    
    async def handle_context_switch(self, user_id: str, new_question: str, 
                                  nvidia_rotator=None) -> Dict[str, Any]:
        """Handle context switching when user changes topics"""
        try:
            return await self.session_manager.detect_context_switch(user_id, new_question, nvidia_rotator)
        except Exception as e:
            logger.error(f"[CONVERSATION_MANAGER] Context switch handling failed: {e}")
            return {"is_context_switch": False, "confidence": 0.0, "error": str(e)}
    
    def get_conversation_insights(self, user_id: str) -> Dict[str, Any]:
        """Get insights about the user's conversation patterns"""
        try:
            return self.session_manager.get_conversation_insights(user_id)
        except Exception as e:
            logger.error(f"[CONVERSATION_MANAGER] Failed to get conversation insights: {e}")
            return {"error": str(e)}
    
    def clear_session(self, user_id: str):
        """Clear conversation session for user"""
        try:
            self.session_manager.clear_session(user_id)
            logger.info(f"[CONVERSATION_MANAGER] Cleared session for user {user_id}")
        except Exception as e:
            logger.error(f"[CONVERSATION_MANAGER] Failed to clear session: {e}")
    
    def reset_all(self, user_id: str, project_id: str = None) -> Dict[str, Any]:
        """Reset all conversation-related components for a user"""
        try:
            results = {
                "session_cleared": False,
                "memory_cleared": False,
                "errors": []
            }
            
            # Clear session
            try:
                self.session_manager.clear_session(user_id)
                results["session_cleared"] = True
                logger.info(f"[CONVERSATION_MANAGER] Cleared session for user {user_id}")
            except Exception as e:
                error_msg = f"Failed to clear session: {e}"
                results["errors"].append(error_msg)
                logger.warning(f"[CONVERSATION_MANAGER] {error_msg}")
            
            # Clear memory using core memory system
            try:
                clear_results = self.memory_system.clear_all_memory(user_id, project_id)
                results["memory_cleared"] = clear_results.get("legacy_cleared", False) and clear_results.get("session_cleared", False)
                if clear_results.get("errors"):
                    results["errors"].extend(clear_results["errors"])
                logger.info(f"[CONVERSATION_MANAGER] Cleared memory for user {user_id}, project {project_id}")
            except Exception as e:
                error_msg = f"Failed to clear memory: {e}"
                results["errors"].append(error_msg)
                logger.warning(f"[CONVERSATION_MANAGER] {error_msg}")
            
            return results
            
        except Exception as e:
            logger.error(f"[CONVERSATION_MANAGER] Failed to reset all for user {user_id}: {e}")
            return {
                "session_cleared": False,
                "memory_cleared": False,
                "errors": [f"Critical error: {e}"]
            }


# ────────────────────────────── Global Instance ──────────────────────────────

_conversation_manager: Optional[ConversationManager] = None

def get_conversation_manager(memory_system=None, embedder: EmbeddingClient = None) -> ConversationManager:
    """Get the global conversation manager instance"""
    global _conversation_manager
    
    if _conversation_manager is None:
        if not memory_system:
            from memo.core import get_memory_system
            memory_system = get_memory_system()
        if not embedder:
            from utils.rag.embeddings import EmbeddingClient
            embedder = EmbeddingClient()
        
        _conversation_manager = ConversationManager(memory_system, embedder)
        logger.info("[CONVERSATION_MANAGER] Global conversation manager initialized")
    
    return _conversation_manager

# def reset_conversation_manager():
#     """Reset the global conversation manager (for testing)"""
#     global _conversation_manager
#     _conversation_manager = None