Spaces:
Sleeping
Sleeping
| """ | |
| Feedback Schema for RAG Chatbot | |
| This module defines dataclasses for feedback data structures | |
| and provides Snowflake schema generation. | |
| """ | |
| from dataclasses import dataclass, asdict, field | |
| from typing import List, Optional, Dict, Any, Union | |
| from datetime import datetime | |
@dataclass
class RetrievedDocument:
    """Metadata and content of a single retrieved document.

    This must be a dataclass: instances are built from their fields and are
    serialized with ``dataclasses.asdict``, which rejects plain classes.
    """

    doc_id: str
    filename: str
    page: int
    score: float  # presumably the retrieval relevance score; confirm with the producer
    content: str
    metadata: Dict[str, Any]
@dataclass
class RetrievalEntry:
    """Metadata describing a single retrieval operation.

    This must be a dataclass: entries are constructed with keyword arguments
    and serialized with ``dataclasses.asdict``.

    ``_raw_data`` optionally holds the original, unmapped dictionary the
    entry was built from; when it is set, serialization returns it in place
    of the typed fields.
    """

    rag_query: str
    # Forward reference so this class does not depend on definition order.
    documents_retrieved: List["RetrievedDocument"]
    conversation_length: int
    filters_applied: Optional[Dict[str, Any]] = None
    timestamp: Optional[float] = None
    _raw_data: Optional[Dict[str, Any]] = None
@dataclass
class UserFeedback:
    """A single user feedback submission and its conversation context.

    This must be a dataclass: instances are built with keyword arguments,
    ``created_at`` relies on ``field(default_factory=...)``, and ``to_dict``
    uses ``dataclasses.asdict``.
    """

    feedback_id: str
    open_ended_feedback: Optional[str]
    score: int
    is_feedback_about_last_retrieval: bool
    # Forward reference so this class does not depend on definition order.
    retrieved_data: List["RetrievalEntry"]
    conversation_id: str
    timestamp: float
    message_count: int
    has_retrievals: bool
    retrieval_count: int
    user_query: Optional[str] = None
    bot_response: Optional[str] = None
    # ISO-8601 string of the naive local time at which the instance is created.
    created_at: str = field(default_factory=lambda: datetime.now().isoformat())

    def to_dict(self) -> Dict[str, Any]:
        """Return the feedback as a plain dictionary.

        Returns:
            All fields as produced by ``dataclasses.asdict``. When
            ``retrieved_data`` is non-empty, each entry is replaced by the
            output of ``_serialize_retrieval_entry`` so that preserved raw
            payloads take precedence over the typed fields.
        """
        result = asdict(self)
        if self.retrieved_data:
            result['retrieved_data'] = [
                self._serialize_retrieval_entry(entry)
                for entry in self.retrieved_data
            ]
        return result

    def _serialize_retrieval_entry(self, entry: "RetrievalEntry") -> Dict[str, Any]:
        """Serialize one retrieval entry to a dictionary.

        Args:
            entry: The retrieval entry to serialize.

        Returns:
            ``entry._raw_data`` itself (not a copy) when it is set and
            non-empty; otherwise the ``dataclasses.asdict`` form of the entry.
        """
        raw = getattr(entry, '_raw_data', None)
        if raw:
            return raw
        # asdict() already recurses into the dataclasses held in
        # documents_retrieved, so no second pass over the documents is needed.
        return asdict(entry)

    def to_snowflake_schema(self) -> Dict[str, Any]:
        """Return the Snowflake column types for the feedback table.

        The method does not read instance state, which is why
        ``get_snowflake_create_table_sql`` can call it with ``None`` as self.

        Returns:
            Mapping of column name to Snowflake type, in table column order.
        """
        schema = {
            "feedback_id": "VARCHAR(255)",
            "open_ended_feedback": "VARCHAR(16777216)",  # Large text
            "score": "INTEGER",
            "is_feedback_about_last_retrieval": "BOOLEAN",
            "conversation_id": "VARCHAR(255)",
            "timestamp": "NUMBER(20, 0)",
            "message_count": "INTEGER",
            "has_retrievals": "BOOLEAN",
            "retrieval_count": "INTEGER",
            "user_query": "VARCHAR(16777216)",
            "bot_response": "VARCHAR(16777216)",
            "created_at": "TIMESTAMP_NTZ",
            "retrieved_data": "VARIANT",  # Array of retrieval entries
            # retrieved_data structure:
            # [
            #   {
            #     "rag_query": "...",
            #     "conversation_length": 5,
            #     "timestamp": 1234567890,
            #     "docs_retrieved": [
            #       {"filename": "...", "page": 14, "score": 0.95, ...},
            #       ...
            #     ]
            #   },
            #   ...
            # ]
        }
        return schema

    @classmethod
    def get_snowflake_create_table_sql(cls, table_name: str = "user_feedback") -> str:
        """Generate the CREATE TABLE (and index) SQL for Snowflake.

        Args:
            table_name: Name of the table to create. It is interpolated into
                the SQL text unescaped, so it must come from trusted code.

        Returns:
            A SQL script containing the CREATE TABLE statement followed by
            three CREATE INDEX statements.
        """
        # NOTE(review): Snowflake documents CREATE INDEX for hybrid tables
        # only; confirm the target table type before running the index
        # statements below.
        not_null_columns = ("feedback_id", "score", "timestamp")
        schema = cls.to_snowflake_schema(None)
        columns = []
        for col_name, col_type in schema.items():
            nullable = "NOT NULL" if col_name in not_null_columns else "NULL"
            columns.append(f" {col_name} {col_type} {nullable}")
        columns_str = ",\n".join(columns)
        sql = f"""CREATE TABLE IF NOT EXISTS {table_name} (
{columns_str},
PRIMARY KEY (feedback_id)
);
-- Create index on timestamp for querying by time
CREATE INDEX IF NOT EXISTS idx_feedback_timestamp ON {table_name} (timestamp);
-- Create index on conversation_id for querying by conversation
CREATE INDEX IF NOT EXISTS idx_feedback_conversation ON {table_name} (conversation_id);
-- Create index on score for feedback analysis
CREATE INDEX IF NOT EXISTS idx_feedback_score ON {table_name} (score);
"""
        return sql
# Snowflake types of the keys inside each element of the retrieved_data
# VARIANT array.
RETRIEVAL_ENTRY_SCHEMA = dict(
    rag_query="VARCHAR",
    documents_retrieved="ARRAY",  # Array of document objects
    conversation_length="INTEGER",
    filters_applied="OBJECT",
    timestamp="NUMBER",
)

# Snowflake types of the keys inside each document object.
DOCUMENT_SCHEMA = dict(
    doc_id="VARCHAR",
    filename="VARCHAR",
    page="INTEGER",
    score="DOUBLE",
    content="VARCHAR(16777216)",
    metadata="OBJECT",
)
def generate_snowflake_schema_sql() -> str:
    """Return the Snowflake SQL script that creates the feedback table.

    Delegates to ``UserFeedback.get_snowflake_create_table_sql`` with the
    table name ``user_feedback``.
    """
    feedback_table = "user_feedback"
    return UserFeedback.get_snowflake_create_table_sql(feedback_table)
def create_feedback_from_dict(data: Dict[str, Any]) -> UserFeedback:
    """Create a UserFeedback instance from a dictionary.

    Each dict item of ``data["retrieved_data"]`` is mapped from the
    rag_retrieval_history layout (``conversation_up_to``,
    ``rag_query_expansion``, ``docs_retrieved``) onto a ``RetrievalEntry``.
    The original dictionary is kept on the entry as ``_raw_data`` so nothing
    is lost on serialization.

    Args:
        data: Feedback payload. ``score``, ``is_feedback_about_last_retrieval``,
            ``conversation_id``, ``timestamp``, ``message_count``,
            ``has_retrievals`` and ``retrieval_count`` are required; the
            remaining keys are optional.

    Returns:
        The populated UserFeedback.

    Raises:
        KeyError: If a required key is missing from ``data``.
    """
    retrieved_data = []
    # `or []` covers both a missing key and an explicit None/empty value.
    for entry_dict in data.get("retrieved_data") or []:
        if not isinstance(entry_dict, dict):
            # A non-dict entry has no fields to map, so it is skipped
            # explicitly rather than via a swallowed exception.
            continue

        # A present-but-None or unsized conversation must not cause the whole
        # entry (and its raw payload) to be dropped.
        conversation = entry_dict.get("conversation_up_to") or []
        try:
            conversation_length = len(conversation)
        except TypeError:
            conversation_length = 0

        entry = RetrievalEntry(
            rag_query=entry_dict.get("rag_query_expansion", ""),
            documents_retrieved=[],  # Documents are preserved via the raw data
            conversation_length=conversation_length,
            filters_applied=None,
            timestamp=entry_dict.get("timestamp"),
        )
        entry._raw_data = entry_dict  # Keep the original for serialization
        retrieved_data.append(entry)

    return UserFeedback(
        feedback_id=data.get("feedback_id", f"feedback_{data.get('timestamp', 'unknown')}"),
        open_ended_feedback=data.get("open_ended_feedback"),
        score=data["score"],
        is_feedback_about_last_retrieval=data["is_feedback_about_last_retrieval"],
        retrieved_data=retrieved_data,
        conversation_id=data["conversation_id"],
        timestamp=data["timestamp"],
        message_count=data["message_count"],
        has_retrievals=data["has_retrievals"],
        retrieval_count=data["retrieval_count"],
        user_query=data.get("user_query"),
        bot_response=data.get("bot_response"),
    )