""" Feedback Schema for RAG Chatbot This module defines dataclasses for feedback data structures and provides Snowflake schema generation. """ from dataclasses import dataclass, asdict, field from typing import List, Optional, Dict, Any, Union from datetime import datetime @dataclass class RetrievedDocument: """Single retrieved document metadata""" doc_id: str filename: str page: int score: float content: str metadata: Dict[str, Any] @dataclass class RetrievalEntry: """Single retrieval operation metadata""" rag_query: str documents_retrieved: List[RetrievedDocument] conversation_length: int filters_applied: Optional[Dict[str, Any]] = None timestamp: Optional[float] = None _raw_data: Optional[Dict[str, Any]] = None @dataclass class UserFeedback: """User feedback submission data""" feedback_id: str open_ended_feedback: Optional[str] score: int is_feedback_about_last_retrieval: bool retrieved_data: List[RetrievalEntry] conversation_id: str timestamp: float message_count: int has_retrievals: bool retrieval_count: int user_query: Optional[str] = None bot_response: Optional[str] = None created_at: str = field(default_factory=lambda: datetime.now().isoformat()) def to_dict(self) -> Dict[str, Any]: """Convert to dictionary with nested data structures""" result = asdict(self) # Handle nested objects if self.retrieved_data: result['retrieved_data'] = [self._serialize_retrieval_entry(entry) for entry in self.retrieved_data] return result def _serialize_retrieval_entry(self, entry: RetrievalEntry) -> Dict[str, Any]: """Serialize retrieval entry to dict""" # If raw data exists, use it (it's already properly formatted) if hasattr(entry, '_raw_data') and entry._raw_data: return entry._raw_data # Otherwise, serialize the dataclass result = asdict(entry) if entry.documents_retrieved: result['documents_retrieved'] = [asdict(doc) for doc in entry.documents_retrieved] return result def to_snowflake_schema(self) -> Dict[str, Any]: """Generate Snowflake schema for this dataclass""" schema = { "feedback_id": "VARCHAR(255)", "open_ended_feedback": "VARCHAR(16777216)", # Large text "score": "INTEGER", "is_feedback_about_last_retrieval": "BOOLEAN", "conversation_id": "VARCHAR(255)", "timestamp": "NUMBER(20, 0)", "message_count": "INTEGER", "has_retrievals": "BOOLEAN", "retrieval_count": "INTEGER", "user_query": "VARCHAR(16777216)", "bot_response": "VARCHAR(16777216)", "created_at": "TIMESTAMP_NTZ", "retrieved_data": "VARIANT", # Array of retrieval entries # retrieved_data structure: # [ # { # "rag_query": "...", # "conversation_length": 5, # "timestamp": 1234567890, # "docs_retrieved": [ # {"filename": "...", "page": 14, "score": 0.95, ...}, # ... # ] # }, # ... # ] } return schema @classmethod def get_snowflake_create_table_sql(cls, table_name: str = "user_feedback") -> str: """Generate CREATE TABLE SQL for Snowflake""" schema = cls.to_snowflake_schema(None) columns = [] for col_name, col_type in schema.items(): nullable = "NULL" if col_name not in ["feedback_id", "score", "timestamp"] else "NOT NULL" columns.append(f" {col_name} {col_type} {nullable}") # Build SQL string properly columns_str = ",\n".join(columns) sql = f"""CREATE TABLE IF NOT EXISTS {table_name} ( {columns_str}, PRIMARY KEY (feedback_id) ); -- Create index on timestamp for querying by time CREATE INDEX IF NOT EXISTS idx_feedback_timestamp ON {table_name} (timestamp); -- Create index on conversation_id for querying by conversation CREATE INDEX IF NOT EXISTS idx_feedback_conversation ON {table_name} (conversation_id); -- Create index on score for feedback analysis CREATE INDEX IF NOT EXISTS idx_feedback_score ON {table_name} (score); """ return sql # Snowflake variant schema for retrieved_data array RETRIEVAL_ENTRY_SCHEMA = { "rag_query": "VARCHAR", "documents_retrieved": "ARRAY", # Array of document objects "conversation_length": "INTEGER", "filters_applied": "OBJECT", "timestamp": "NUMBER" } DOCUMENT_SCHEMA = { "doc_id": "VARCHAR", "filename": "VARCHAR", "page": "INTEGER", "score": "DOUBLE", "content": "VARCHAR(16777216)", "metadata": "OBJECT" } def generate_snowflake_schema_sql() -> str: """Generate complete Snowflake schema SQL for feedback system""" return UserFeedback.get_snowflake_create_table_sql("user_feedback") def create_feedback_from_dict(data: Dict[str, Any]) -> UserFeedback: """Create UserFeedback instance from dictionary""" # Parse retrieved_data if present retrieved_data = [] if "retrieved_data" in data and data["retrieved_data"]: for entry_dict in data.get("retrieved_data", []): # Map the actual structure from rag_retrieval_history # Entry has: conversation_up_to, rag_query_expansion, docs_retrieved try: # Try to map to expected structure entry = RetrievalEntry( rag_query=entry_dict.get("rag_query_expansion", ""), documents_retrieved=[], # Empty for now, will store as raw data conversation_length=len(entry_dict.get("conversation_up_to", [])), filters_applied=None, timestamp=entry_dict.get("timestamp", None) ) # Store raw data in the entry entry._raw_data = entry_dict # Store original for preservation retrieved_data.append(entry) except Exception as e: # If mapping fails, store as-is without strict typing pass return UserFeedback( feedback_id=data.get("feedback_id", f"feedback_{data.get('timestamp', 'unknown')}"), open_ended_feedback=data.get("open_ended_feedback"), score=data["score"], is_feedback_about_last_retrieval=data["is_feedback_about_last_retrieval"], retrieved_data=retrieved_data, conversation_id=data["conversation_id"], timestamp=data["timestamp"], message_count=data["message_count"], has_retrievals=data["has_retrievals"], retrieval_count=data["retrieval_count"], user_query=data.get("user_query"), bot_response=data.get("bot_response") )