# audit_assistant/src/reporting/snowflake_connector.py
# Last commit: f5df983 "add src" (Ara Yeroyan)
"""
Snowflake Connector for Feedback System
This module handles inserting user feedback into Snowflake.
"""
import os
import json
import logging
from typing import Dict, Any, Optional
from src.reporting.feedback_schema import UserFeedback
# Configure logging before anything below emits a record: a module-level
# logging.* call on an unconfigured root logger implicitly runs
# logging.basicConfig() with defaults (level WARNING), which would turn this
# INFO-level configuration into a no-op.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# The Snowflake driver is optional at import time; SnowflakeFeedbackConnector.connect()
# checks SNOWFLAKE_AVAILABLE and raises ImportError when it is missing.
try:
    import snowflake.connector
    SNOWFLAKE_AVAILABLE = True
except ImportError:
    SNOWFLAKE_AVAILABLE = False
    logger.warning("⚠️ snowflake-connector-python not installed. Install with: pip install snowflake-connector-python")
class SnowflakeFeedbackConnector:
    """Connector for inserting feedback into Snowflake.

    Stores credentials plus the target database/schema, opens one
    ``snowflake.connector`` connection in :meth:`connect`, and writes
    ``UserFeedback`` records into the ``user_feedback`` table via
    :meth:`insert_feedback`. Also usable as a context manager: ``__enter__``
    calls :meth:`connect` and ``__exit__`` calls :meth:`disconnect`.
    """

    # Parameterized INSERT using pyformat placeholders bound by the driver;
    # retrieved_data is passed as a JSON string for a TEXT column.
    _INSERT_SQL = """INSERT INTO user_feedback (
        feedback_id,
        open_ended_feedback,
        score,
        is_feedback_about_last_retrieval,
        conversation_id,
        timestamp,
        message_count,
        has_retrievals,
        retrieval_count,
        user_query,
        bot_response,
        created_at,
        retrieved_data
    ) VALUES (
        %(feedback_id)s, %(open_ended_feedback)s, %(score)s, %(is_feedback_about_last_retrieval)s,
        %(conversation_id)s, %(timestamp)s, %(message_count)s, %(has_retrievals)s,
        %(retrieval_count)s, %(user_query)s, %(bot_response)s, %(created_at)s,
        %(retrieved_data)s
    )"""

    def __init__(
        self,
        user: str,
        password: str,
        account: str,
        warehouse: str,
        database: str = "SNOWFLAKE_LEARNING",
        schema: str = "PUBLIC"
    ):
        """Store connection settings; no network I/O happens here.

        Args:
            user: Snowflake login name.
            password: Password for ``user``.
            account: Snowflake account identifier passed to the driver.
            warehouse: Warehouse passed to the driver at connect time.
            database: Database selected with ``USE DATABASE`` before each insert.
            schema: Schema selected with ``USE SCHEMA`` before each insert.
        """
        self.user = user
        self.password = password
        self.account = account
        self.warehouse = warehouse
        self.database = database
        self.schema = schema
        self._connection = None

    def connect(self):
        """Establish the Snowflake connection.

        Raises:
            ImportError: If snowflake-connector-python is not installed.
            Exception: Whatever ``snowflake.connector.connect`` raises; it is
                logged and re-raised.
        """
        if not SNOWFLAKE_AVAILABLE:
            raise ImportError("snowflake-connector-python is not installed. Install with: pip install snowflake-connector-python")
        logger.info("=" * 80)
        logger.info("πŸ”Œ SNOWFLAKE CONNECTION: Attempting to connect...")
        logger.info(f" - Account: {self.account}")
        logger.info(f" - Warehouse: {self.warehouse}")
        logger.info(f" - Database: {self.database}")
        logger.info(f" - Schema: {self.schema}")
        logger.info(f" - User: {self.user}")
        try:
            self._connection = snowflake.connector.connect(
                user=self.user,
                password=self.password,
                account=self.account,
                warehouse=self.warehouse
                # Database/schema are not set here; insert_feedback selects
                # them with USE statements on each call.
            )
            logger.info("βœ… SNOWFLAKE CONNECTION: Successfully connected")
            logger.info("=" * 80)
            print(f"βœ… Connected to Snowflake: {self.database}.{self.schema}")
        except Exception as e:
            logger.error(f"❌ SNOWFLAKE CONNECTION FAILED: {e}")
            logger.error("=" * 80)
            print(f"❌ Failed to connect to Snowflake: {e}")
            raise

    def disconnect(self):
        """Close the Snowflake connection if one is open.

        The cached connection is reset to ``None`` even if ``close()`` fails,
        so a later :meth:`insert_feedback` raises "Not connected" instead of
        using a closed connection, and calling this twice is harmless.
        """
        if self._connection:
            try:
                self._connection.close()
            finally:
                self._connection = None
            print("βœ… Disconnected from Snowflake")

    def insert_feedback(self, feedback: UserFeedback) -> bool:
        """Insert a single feedback record into the ``user_feedback`` table.

        Args:
            feedback: The feedback record to persist.

        Returns:
            True if the INSERT executed; False if required fields are missing
            or any error occurred while preparing or executing the insert
            (such errors are logged, not raised).

        Raises:
            RuntimeError: If :meth:`connect` has not been called.
        """
        logger.info("=" * 80)
        logger.info("πŸ”„ SNOWFLAKE INSERT: Starting feedback insertion process")
        logger.info(f"πŸ“ Feedback ID: {feedback.feedback_id}")
        if not self._connection:
            logger.error("❌ Not connected to Snowflake. Call connect() first.")
            raise RuntimeError("Not connected to Snowflake. Call connect() first.")
        cursor = None
        try:
            if not self._validate_feedback(feedback):
                return False
            cursor = self._connection.cursor()
            logger.info("βœ… SNOWFLAKE CONNECTION: Cursor created")
            self._set_context(cursor)
            retrieved_data_for_db = self._prepare_retrieved_data(feedback)

            logger.info("πŸ“ SQL PREPARATION: Building INSERT statement...")
            logger.info(" - Target table: user_feedback")
            logger.info(f" - Database: {self.database}")
            logger.info(f" - Schema: {self.schema}")
            params = self._build_params(feedback, retrieved_data_for_db)

            logger.info("πŸš€ SQL EXECUTION: Executing INSERT query...")
            cursor.execute(self._INSERT_SQL, params)
            logger.info("βœ… SQL EXECUTION: Query executed successfully")
            # Report what the driver says rather than assuming a single row.
            logger.info(f" - Rows affected: {cursor.rowcount}")
            logger.info(" - Status: SUCCESS")
            logger.info("βœ… SNOWFLAKE INSERT: Feedback inserted successfully")
            logger.info(f"πŸ“ Inserted feedback: {feedback.feedback_id}")
            logger.info("=" * 80)
            return True
        except Exception as e:
            self._log_insert_error(e)
            return False
        finally:
            # Runs on success, validation failure and error alike, so the
            # cursor is never leaked.
            self._close_cursor(cursor)

    @staticmethod
    def _validate_feedback(feedback: UserFeedback) -> bool:
        """Check required fields and log a summary; return True when all are present."""
        logger.info("πŸ“Š VALIDATION: Validating feedback data structure...")
        validation_errors = []
        if not feedback.feedback_id:
            validation_errors.append("Missing feedback_id")
        if feedback.score is None:
            validation_errors.append("Missing score")
        if feedback.timestamp is None:
            validation_errors.append("Missing timestamp")
        if validation_errors:
            logger.error(f"❌ VALIDATION FAILED: {validation_errors}")
            return False
        logger.info("βœ… VALIDATION PASSED: All required fields present")
        logger.info("πŸ“‹ Data Summary:")
        logger.info(f" - Feedback ID: {feedback.feedback_id}")
        logger.info(f" - Score: {feedback.score}")
        logger.info(f" - Conversation ID: {feedback.conversation_id}")
        logger.info(f" - Has Retrievals: {feedback.has_retrievals}")
        logger.info(f" - Retrieval Count: {feedback.retrieval_count}")
        logger.info(f" - Message Count: {feedback.message_count}")
        logger.info(f" - Timestamp: {feedback.timestamp}")
        return True

    def _set_context(self, cursor) -> None:
        """Select the configured database/schema on *cursor* and log the verified context.

        Raises:
            Exception: Any driver error from the USE/SELECT statements (logged, re-raised).
        """
        logger.info(f"πŸ”§ SETTING CONTEXT: Database={self.database}, Schema={self.schema}")
        try:
            # Double-quoted identifiers, so the names are matched case-sensitively.
            cursor.execute(f'USE DATABASE "{self.database}"')
            cursor.execute(f'USE SCHEMA "{self.schema}"')
            cursor.execute("SELECT CURRENT_DATABASE(), CURRENT_SCHEMA()")
            current_db, current_schema = cursor.fetchone()
            logger.info(f"βœ… Current context verified: Database={current_db}, Schema={current_schema}")
        except Exception as e:
            logger.error(f"❌ Could not set context: {e}")
            raise

    @staticmethod
    def _prepare_retrieved_data(feedback: UserFeedback) -> Optional[str]:
        """Return ``retrieved_data`` as a JSON string for the TEXT column, or None.

        A string value is first parsed with ``json.loads`` (invalid JSON
        raises), then re-serialized. Falsy values (None, empty list/dict)
        become NULL.
        """
        logger.info("πŸ”§ DATA PREPARATION: Preparing retrieved_data...")
        # NOTE(review): assumes UserFeedback.to_dict() always contains a
        # 'retrieved_data' key; a KeyError here aborts the insert.
        retrieved_data_raw = feedback.to_dict()['retrieved_data']
        logger.info(f" - Retrieved data type (raw): {type(retrieved_data_raw).__name__}")
        logger.info(f" - Retrieved data: {repr(retrieved_data_raw)[:200]}")
        if isinstance(retrieved_data_raw, str):
            # Already serialized (e.g. coming from the UI): parse it back.
            logger.info(" - Parsing string to Python object")
            retrieved_data = json.loads(retrieved_data_raw)
        elif retrieved_data_raw is None:
            retrieved_data = None
        else:
            logger.info(" - Data is already a Python object")
            retrieved_data = retrieved_data_raw
        logger.info(f" - Retrieved data size: {len(str(retrieved_data)) if retrieved_data else 0} characters")
        logger.info(f" - Retrieved data type: {type(retrieved_data).__name__}")
        if retrieved_data:
            retrieved_data_for_db = json.dumps(retrieved_data)
            logger.info(" - Converting to JSON string for TEXT column")
            logger.info(f" - JSON string length: {len(retrieved_data_for_db)}")
            return retrieved_data_for_db
        logger.info(" - Retrieved data is None, using NULL")
        return None

    @staticmethod
    def _build_params(feedback: UserFeedback, retrieved_data_for_db: Optional[str]) -> Dict[str, Any]:
        """Map *feedback* fields onto the named placeholders of ``_INSERT_SQL``."""
        return {
            'feedback_id': feedback.feedback_id,
            'open_ended_feedback': feedback.open_ended_feedback,
            'score': feedback.score,
            'is_feedback_about_last_retrieval': feedback.is_feedback_about_last_retrieval,
            'conversation_id': feedback.conversation_id,
            'timestamp': int(feedback.timestamp),
            'message_count': feedback.message_count,
            'has_retrievals': feedback.has_retrievals,
            'retrieval_count': feedback.retrieval_count,
            'user_query': feedback.user_query,
            'bot_response': feedback.bot_response,
            'created_at': feedback.created_at,
            'retrieved_data': retrieved_data_for_db
        }

    @staticmethod
    def _log_insert_error(e: Exception) -> None:
        """Log an insert failure, including errno/sqlstate for driver ProgrammingErrors."""
        # SNOWFLAKE_AVAILABLE short-circuits before `snowflake` is referenced,
        # so this is safe when the driver is not installed.
        if SNOWFLAKE_AVAILABLE and isinstance(e, snowflake.connector.ProgrammingError):
            logger.error(f"❌ SQL EXECUTION ERROR: {e}")
            logger.error(f" - Error code: {getattr(e, 'errno', 'Unknown')}")
            logger.error(f" - SQL state: {getattr(e, 'sqlstate', 'Unknown')}")
        else:
            logger.error(f"❌ SNOWFLAKE INSERT FAILED: {type(e).__name__}")
            logger.error(f" - Error: {e}")
        logger.error("=" * 80)

    @staticmethod
    def _close_cursor(cursor) -> None:
        """Close *cursor* if one was opened; a close failure is logged, not raised."""
        if cursor is None:
            return
        try:
            cursor.close()
        except Exception as e:
            logger.warning(f"⚠️ Could not close Snowflake cursor: {e}")

    def __enter__(self):
        """Context manager entry: connect and return this connector."""
        self.connect()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit: disconnect; exceptions are not suppressed."""
        self.disconnect()
def get_snowflake_connector_from_env() -> Optional[SnowflakeFeedbackConnector]:
    """Build a SnowflakeFeedbackConnector from ``SNOWFLAKE_*`` environment variables.

    Required: SNOWFLAKE_USER, SNOWFLAKE_PASSWORD, SNOWFLAKE_ACCOUNT,
    SNOWFLAKE_WAREHOUSE. Optional: SNOWFLAKE_DATABASE (default
    "SNOWFLAKE_LEARN") and SNOWFLAKE_SCHEMA (default "PUBLIC").
    No connection is opened here.

    Returns:
        A configured connector, or None (after printing a hint to stdout)
        when any required variable is unset or empty.
    """
    credentials = {
        "user": os.getenv("SNOWFLAKE_USER"),
        "password": os.getenv("SNOWFLAKE_PASSWORD"),
        "account": os.getenv("SNOWFLAKE_ACCOUNT"),
        "warehouse": os.getenv("SNOWFLAKE_WAREHOUSE"),
    }
    if any(not value for value in credentials.values()):
        print("⚠️ Snowflake credentials not found in environment variables")
        print("Required variables: SNOWFLAKE_USER, SNOWFLAKE_PASSWORD, SNOWFLAKE_ACCOUNT, SNOWFLAKE_WAREHOUSE")
        return None

    # NOTE(review): this fallback ("SNOWFLAKE_LEARN") differs from the
    # SnowflakeFeedbackConnector default ("SNOWFLAKE_LEARNING"); confirm which
    # database name is intended.
    target_database = os.getenv("SNOWFLAKE_DATABASE", "SNOWFLAKE_LEARN")
    target_schema = os.getenv("SNOWFLAKE_SCHEMA", "PUBLIC")
    return SnowflakeFeedbackConnector(
        database=target_database,
        schema=target_schema,
        **credentials,
    )
def save_to_snowflake(feedback: UserFeedback) -> bool:
    """Insert *feedback* into Snowflake using credentials from the environment.

    Builds a connector with get_snowflake_connector_from_env(), connects,
    inserts the record, and always disconnects once connected, even if the
    insert raises.

    Args:
        feedback: The feedback record to persist.

    Returns:
        True if the record was inserted. False if credentials are missing,
        the connection fails, or the insert fails. Errors are logged, not raised.
    """
    logger.info("=" * 80)
    logger.info("πŸ”΅ SNOWFLAKE SAVE: Starting save process")
    logger.info(f"πŸ“ Feedback ID: {feedback.feedback_id}")
    connector = get_snowflake_connector_from_env()
    if not connector:
        logger.warning("⚠️ SNOWFLAKE SAVE: Skipping insertion (credentials not configured)")
        logger.warning(" Required variables: SNOWFLAKE_USER, SNOWFLAKE_PASSWORD, SNOWFLAKE_ACCOUNT, SNOWFLAKE_WAREHOUSE")
        logger.info("=" * 80)
        return False
    try:
        logger.info("πŸ“‘ SNOWFLAKE SAVE: Establishing connection...")
        connector.connect()
        logger.info("βœ… SNOWFLAKE SAVE: Connection established")
        try:
            logger.info("πŸ“₯ SNOWFLAKE SAVE: Attempting to insert feedback...")
            success = connector.insert_feedback(feedback)
        finally:
            # Disconnect even when insert_feedback raises, so the connection
            # is never leaked; any exception still reaches the handler below.
            logger.info("πŸ”Œ SNOWFLAKE SAVE: Disconnecting...")
            connector.disconnect()
        if success:
            logger.info("βœ… SNOWFLAKE SAVE: Successfully saved feedback")
        else:
            logger.error("❌ SNOWFLAKE SAVE: Failed to save feedback")
        logger.info("=" * 80)
        return success
    except Exception as e:
        logger.error(f"❌ SNOWFLAKE SAVE ERROR: {type(e).__name__}")
        logger.error(f" - Error: {e}")
        logger.info("=" * 80)
        return False