audit_assistant / src /reporting /feedback_schema.py
Ara Yeroyan
add src
f5df983
raw
history blame
6.99 kB
"""
Feedback Schema for RAG Chatbot
This module defines dataclasses for feedback data structures
and provides Snowflake schema generation.
"""
from dataclasses import dataclass, asdict, field
from typing import List, Optional, Dict, Any, Union
from datetime import datetime
@dataclass
class RetrievedDocument:
    """Metadata record for one document returned by a retrieval step.

    Attributes:
        doc_id: Identifier of the document.
        filename: Name of the file the document came from.
        page: Page number within the file.
        score: Score attached to the document (presumably a retrieval
            relevance score -- confirm against the retriever).
        content: Text content of the document.
        metadata: Free-form additional key/value metadata.
    """

    doc_id: str
    filename: str
    page: int
    score: float
    content: str
    metadata: Dict[str, Any]
@dataclass
class RetrievalEntry:
    """Metadata record describing one retrieval operation.

    Attributes:
        rag_query: Query string used for the retrieval.
        documents_retrieved: Documents returned for that query.
        conversation_length: Number of conversation messages at the time
            of the retrieval.
        filters_applied: Optional mapping of filters used; ``None`` when
            not recorded.
        timestamp: Optional numeric timestamp of the retrieval.
        _raw_data: Optional original dictionary this entry was built
            from, kept so it can be emitted unchanged on serialization.
    """

    # Required fields.
    rag_query: str
    documents_retrieved: List[RetrievedDocument]
    conversation_length: int

    # Optional fields, all defaulting to None.
    filters_applied: Optional[Dict[str, Any]] = None
    timestamp: Optional[float] = None
    _raw_data: Optional[Dict[str, Any]] = None
@dataclass
class UserFeedback:
    """User feedback submission data.

    Holds one feedback record together with the retrieval history it
    refers to, and knows how to serialize itself to a plain dictionary
    and how to describe itself as a Snowflake table.

    ``created_at`` defaults to ``datetime.now().isoformat()`` evaluated
    when the instance is constructed (naive local time).
    """

    feedback_id: str
    open_ended_feedback: Optional[str]
    score: int
    is_feedback_about_last_retrieval: bool
    # Quoted forward reference so this class does not depend on
    # RetrievalEntry being defined before it.
    retrieved_data: List["RetrievalEntry"]
    conversation_id: str
    timestamp: float
    message_count: int
    has_retrievals: bool
    retrieval_count: int
    user_query: Optional[str] = None
    bot_response: Optional[str] = None
    created_at: str = field(default_factory=lambda: datetime.now().isoformat())

    def to_dict(self) -> Dict[str, Any]:
        """Convert to a dictionary with nested data structures.

        Returns:
            A dict of all fields. ``retrieved_data`` is a list with one
            dict per retrieval entry, produced by
            ``_serialize_retrieval_entry``.
        """
        result = asdict(self)
        # Replace the generic asdict() output so that entries carrying
        # their original raw payload are emitted in that original shape.
        if self.retrieved_data:
            result["retrieved_data"] = [
                self._serialize_retrieval_entry(entry)
                for entry in self.retrieved_data
            ]
        return result

    def _serialize_retrieval_entry(self, entry: "RetrievalEntry") -> Dict[str, Any]:
        """Serialize one retrieval entry to a dict.

        Args:
            entry: The retrieval entry to serialize.

        Returns:
            ``entry._raw_data`` itself (not a copy) when it is set and
            non-empty; otherwise the entry's dataclass fields as a dict,
            without the internal ``_raw_data`` key.
        """
        raw_data = getattr(entry, "_raw_data", None)
        if raw_data:
            # The raw payload is already in the stored format.
            return raw_data

        # asdict() recurses into nested dataclasses, lists and dicts, so
        # documents_retrieved needs no second pass (a second asdict() on
        # plain-dict documents would raise TypeError).
        result = asdict(entry)
        # _raw_data is internal bookkeeping, not part of the entry schema.
        result.pop("_raw_data", None)
        return result

    def to_snowflake_schema(self) -> Dict[str, Any]:
        """Return the Snowflake column types for this dataclass.

        Does not read instance state.

        Returns:
            Mapping of column name to Snowflake type, in column order.
        """
        schema = {
            "feedback_id": "VARCHAR(255)",
            "open_ended_feedback": "VARCHAR(16777216)",  # Large text
            "score": "INTEGER",
            "is_feedback_about_last_retrieval": "BOOLEAN",
            "conversation_id": "VARCHAR(255)",
            # NOTE(review): scale 0 cannot hold the fractional part of the
            # float ``timestamp`` field -- confirm this is intended.
            "timestamp": "NUMBER(20, 0)",
            "message_count": "INTEGER",
            "has_retrievals": "BOOLEAN",
            "retrieval_count": "INTEGER",
            "user_query": "VARCHAR(16777216)",
            "bot_response": "VARCHAR(16777216)",
            "created_at": "TIMESTAMP_NTZ",
            # Array of retrieval entries. Each element is either the raw
            # entry dict the record was built from, or the serialized
            # dataclass form:
            # [
            #   {
            #     "rag_query": "...",
            #     "conversation_length": 5,
            #     "timestamp": 1234567890,
            #     "documents_retrieved": [
            #       {"filename": "...", "page": 14, "score": 0.95, ...},
            #       ...
            #     ]
            #   },
            #   ...
            # ]
            "retrieved_data": "VARIANT",
        }
        return schema

    @classmethod
    def get_snowflake_create_table_sql(cls, table_name: str = "user_feedback") -> str:
        """Generate the CREATE TABLE statement for Snowflake.

        Args:
            table_name: Name of the table to create. It is interpolated
                into the SQL verbatim, so pass only trusted identifiers.

        Returns:
            A single ``CREATE TABLE IF NOT EXISTS`` statement.
            ``feedback_id``, ``score`` and ``timestamp`` are ``NOT NULL``;
            every other column is nullable.
        """
        # to_snowflake_schema() ignores ``self``, so None stands in for
        # an instance here; calling through ``cls`` honors overrides.
        schema = cls.to_snowflake_schema(None)
        required = {"feedback_id", "score", "timestamp"}
        columns = [
            f"    {col_name} {col_type} {'NOT NULL' if col_name in required else 'NULL'}"
            for col_name, col_type in schema.items()
        ]
        columns_str = ",\n".join(columns)
        # No CREATE INDEX statements: Snowflake accepts them only on
        # hybrid tables, and this is a standard table, so they would make
        # the script fail.
        return (
            f"CREATE TABLE IF NOT EXISTS {table_name} (\n"
            f"{columns_str},\n"
            "    PRIMARY KEY (feedback_id)\n"
            ");\n"
        )
# Snowflake types of the keys inside each element of the retrieved_data
# VARIANT array.
RETRIEVAL_ENTRY_SCHEMA = dict(
    rag_query="VARCHAR",
    documents_retrieved="ARRAY",  # Array of document objects
    conversation_length="INTEGER",
    filters_applied="OBJECT",
    timestamp="NUMBER",
)

# Snowflake types of the keys inside each document object of
# documents_retrieved.
DOCUMENT_SCHEMA = dict(
    doc_id="VARCHAR",
    filename="VARCHAR",
    page="INTEGER",
    score="DOUBLE",
    content="VARCHAR(16777216)",
    metadata="OBJECT",
)
def generate_snowflake_schema_sql() -> str:
    """Return the Snowflake DDL for the feedback system.

    Returns:
        The SQL produced by ``UserFeedback.get_snowflake_create_table_sql``
        for the table named ``user_feedback``.
    """
    ddl = UserFeedback.get_snowflake_create_table_sql(table_name="user_feedback")
    return ddl
def create_feedback_from_dict(data: Dict[str, Any]) -> UserFeedback:
    """Create a UserFeedback instance from a dictionary.

    Each element of ``data["retrieved_data"]`` is mapped onto a
    ``RetrievalEntry``: ``rag_query_expansion`` becomes ``rag_query`` and
    the length of ``conversation_up_to`` becomes ``conversation_length``.
    ``documents_retrieved`` is left empty; the original element is kept
    whole in ``_raw_data`` so nothing is lost on serialization.

    Args:
        data: Feedback payload. ``score``,
            ``is_feedback_about_last_retrieval``, ``conversation_id``,
            ``timestamp``, ``message_count``, ``has_retrievals`` and
            ``retrieval_count`` are required. ``feedback_id`` defaults to
            ``"feedback_<timestamp>"``; ``open_ended_feedback``,
            ``user_query`` and ``bot_response`` default to ``None``.

    Returns:
        The populated ``UserFeedback``.

    Raises:
        KeyError: If a required key is missing from ``data``.
    """
    # Function-scope import keeps this block self-contained.
    import logging

    logger = logging.getLogger(__name__)

    retrieved_data: List[RetrievalEntry] = []
    for index, entry_dict in enumerate(data.get("retrieved_data") or []):
        try:
            entry = RetrievalEntry(
                rag_query=entry_dict.get("rag_query_expansion", ""),
                documents_retrieved=[],  # Documents stay in the raw payload
                conversation_length=len(entry_dict.get("conversation_up_to", [])),
                filters_applied=None,
                timestamp=entry_dict.get("timestamp"),
                _raw_data=entry_dict,  # Original kept for preservation
            )
        except (AttributeError, TypeError) as exc:
            # Best effort: an element that is not dict-like, or whose
            # conversation has no length, is skipped -- but visibly, so
            # dropped history can be diagnosed.
            logger.warning(
                "Skipping malformed retrieved_data entry %d (%s): %s",
                index,
                type(entry_dict).__name__,
                exc,
            )
            continue
        retrieved_data.append(entry)

    return UserFeedback(
        feedback_id=data.get("feedback_id", f"feedback_{data.get('timestamp', 'unknown')}"),
        open_ended_feedback=data.get("open_ended_feedback"),
        score=data["score"],
        is_feedback_about_last_retrieval=data["is_feedback_about_last_retrieval"],
        retrieved_data=retrieved_data,
        conversation_id=data["conversation_id"],
        timestamp=data["timestamp"],
        message_count=data["message_count"],
        has_retrievals=data["has_retrievals"],
        retrieval_count=data["retrieval_count"],
        user_query=data.get("user_query"),
        bot_response=data.get("bot_response"),
    )