"""
Snowflake Connector for Feedback System

This module handles inserting user feedback into Snowflake.
"""

import os
import json
import logging
from typing import Dict, Any, List, Optional

from src.reporting.feedback_schema import UserFeedback

# Try to import snowflake connector
try:
    import snowflake.connector
    SNOWFLAKE_AVAILABLE = True
except ImportError:
    SNOWFLAKE_AVAILABLE = False
    logging.warning("⚠️ snowflake-connector-python not installed. Install with: pip install snowflake-connector-python")

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


def _quote_identifier(name: str) -> str:
    """Return *name* wrapped in double quotes for use as a SQL identifier.

    Embedded double quotes are doubled so the name cannot terminate the
    quoted identifier early and smuggle extra SQL into the statement.
    """
    return '"' + str(name).replace('"', '""') + '"'


class SnowflakeFeedbackConnector:
    """Connector for inserting feedback into Snowflake.

    The connection is opened with ``connect()`` and released with
    ``disconnect()``; the class can also be used as a context manager, in
    which case both calls happen automatically.
    """

    # NOTE(review): the default database here is "SNOWFLAKE_LEARNING" while
    # get_snowflake_connector_from_env() falls back to "SNOWFLAKE_LEARN".
    # Both values are preserved as found; confirm which one is intended.
    def __init__(
        self,
        user: str,
        password: str,
        account: str,
        warehouse: str,
        database: str = "SNOWFLAKE_LEARNING",
        schema: str = "PUBLIC"
    ):
        """Store the connection settings; no connection is opened here."""
        self.user = user
        self.password = password
        self.account = account
        self.warehouse = warehouse
        self.database = database
        self.schema = schema
        self._connection = None

    def connect(self) -> None:
        """Establish Snowflake connection.

        Raises:
            ImportError: if snowflake-connector-python is not installed.
            Exception: whatever the connector raises on failure (logged,
                printed and re-raised unchanged).
        """
        if not SNOWFLAKE_AVAILABLE:
            raise ImportError("snowflake-connector-python is not installed. Install with: pip install snowflake-connector-python")

        logger.info("=" * 80)
        logger.info("🔌 SNOWFLAKE CONNECTION: Attempting to connect...")
        logger.info(f" - Account: {self.account}")
        logger.info(f" - Warehouse: {self.warehouse}")
        logger.info(f" - Database: {self.database}")
        logger.info(f" - Schema: {self.schema}")
        logger.info(f" - User: {self.user}")

        try:
            self._connection = snowflake.connector.connect(
                user=self.user,
                password=self.password,
                account=self.account,
                warehouse=self.warehouse
                # Don't set database/schema in connection - we'll do it per query
            )
            logger.info("✅ SNOWFLAKE CONNECTION: Successfully connected")
            logger.info("=" * 80)
            print(f"✅ Connected to Snowflake: {self.database}.{self.schema}")
        except Exception as e:
            logger.error(f"❌ SNOWFLAKE CONNECTION FAILED: {e}")
            logger.error("=" * 80)
            print(f"❌ Failed to connect to Snowflake: {e}")
            raise

    def disconnect(self) -> None:
        """Close Snowflake connection (no-op when not connected)."""
        if self._connection:
            try:
                self._connection.close()
            finally:
                # Drop the handle even if close() fails so a later
                # insert_feedback() reports "not connected" instead of
                # operating on a closed connection.
                self._connection = None
            print("✅ Disconnected from Snowflake")

    @staticmethod
    def _find_validation_errors(feedback: UserFeedback) -> List[str]:
        """Return a message for every required field missing on *feedback*."""
        errors = []
        if not feedback.feedback_id:
            errors.append("Missing feedback_id")
        if feedback.score is None:
            errors.append("Missing score")
        if feedback.timestamp is None:
            errors.append("Missing timestamp")
        return errors

    def _set_context(self, cursor) -> None:
        """Select the configured database/schema on *cursor* and log the result."""
        logger.info(f"🔧 SETTING CONTEXT: Database={self.database}, Schema={self.schema}")
        try:
            cursor.execute(f"USE DATABASE {_quote_identifier(self.database)}")
            cursor.execute(f"USE SCHEMA {_quote_identifier(self.schema)}")
            cursor.execute("SELECT CURRENT_DATABASE(), CURRENT_SCHEMA()")
            current_db, current_schema = cursor.fetchone()
            logger.info(f"✅ Current context verified: Database={current_db}, Schema={current_schema}")
        except Exception as e:
            logger.error(f"❌ Could not set context: {e}")
            raise

    @staticmethod
    def _serialize_retrieved_data(feedback: UserFeedback) -> Optional[str]:
        """Return ``retrieved_data`` as a JSON string for the TEXT column.

        Returns None (stored as NULL) when the value is None or otherwise
        falsy, e.g. an empty list or dict.
        """
        logger.info("🔧 DATA PREPARATION: Preparing retrieved_data...")
        retrieved_data_raw = feedback.to_dict()['retrieved_data']
        logger.info(f" - Retrieved data type (raw): {type(retrieved_data_raw).__name__}")
        logger.info(f" - Retrieved data: {repr(retrieved_data_raw)[:200]}")

        # If retrieved_data is already a string (from UI), parse it
        if isinstance(retrieved_data_raw, str):
            logger.info(" - Parsing string to Python object")
            retrieved_data = json.loads(retrieved_data_raw)
        elif retrieved_data_raw is None:
            retrieved_data = None
        else:
            # It's already a Python object (list/dict)
            logger.info(" - Data is already a Python object")
            retrieved_data = retrieved_data_raw

        logger.info(f" - Retrieved data size: {len(str(retrieved_data)) if retrieved_data else 0} characters")
        logger.info(f" - Retrieved data type: {type(retrieved_data).__name__}")

        # Convert to JSON string for TEXT column
        if retrieved_data:
            retrieved_data_for_db = json.dumps(retrieved_data)
            logger.info(" - Converting to JSON string for TEXT column")
            logger.info(f" - JSON string length: {len(retrieved_data_for_db)}")
        else:
            logger.info(" - Retrieved data is None, using NULL")
            retrieved_data_for_db = None
        return retrieved_data_for_db

    def insert_feedback(self, feedback: UserFeedback) -> bool:
        """Insert a single feedback record into Snowflake.

        Args:
            feedback: the record to store; ``feedback_id``, ``score`` and
                ``timestamp`` are required.

        Returns:
            True when the row was inserted; False when validation fails or
            any error occurs during preparation or execution (the error is
            logged, not raised).

        Raises:
            RuntimeError: if ``connect()`` has not been called.
        """
        logger.info("=" * 80)
        logger.info("🔄 SNOWFLAKE INSERT: Starting feedback insertion process")
        logger.info(f"📝 Feedback ID: {feedback.feedback_id}")

        if not self._connection:
            logger.error("❌ Not connected to Snowflake. Call connect() first.")
            raise RuntimeError("Not connected to Snowflake. Call connect() first.")

        try:
            logger.info("📊 VALIDATION: Validating feedback data structure...")

            # Validate feedback object
            validation_errors = self._find_validation_errors(feedback)
            if validation_errors:
                logger.error(f"❌ VALIDATION FAILED: {validation_errors}")
                return False
            logger.info("✅ VALIDATION PASSED: All required fields present")

            logger.info("📋 Data Summary:")
            logger.info(f" - Feedback ID: {feedback.feedback_id}")
            logger.info(f" - Score: {feedback.score}")
            logger.info(f" - Conversation ID: {feedback.conversation_id}")
            logger.info(f" - Has Retrievals: {feedback.has_retrievals}")
            logger.info(f" - Retrieval Count: {feedback.retrieval_count}")
            logger.info(f" - Message Count: {feedback.message_count}")
            logger.info(f" - Timestamp: {feedback.timestamp}")

            cursor = self._connection.cursor()
            try:
                logger.info("✅ SNOWFLAKE CONNECTION: Cursor created")

                # Set database and schema context
                self._set_context(cursor)

                # Prepare data
                retrieved_data_for_db = self._serialize_retrieved_data(feedback)

                # Build SQL with retrieved_data as a TEXT column parameter
                sql = """INSERT INTO user_feedback (
                    feedback_id, open_ended_feedback, score,
                    is_feedback_about_last_retrieval, conversation_id,
                    timestamp, message_count, has_retrievals,
                    retrieval_count, user_query, bot_response,
                    created_at, retrieved_data
                ) VALUES (
                    %(feedback_id)s, %(open_ended_feedback)s, %(score)s,
                    %(is_feedback_about_last_retrieval)s, %(conversation_id)s,
                    %(timestamp)s, %(message_count)s, %(has_retrievals)s,
                    %(retrieval_count)s, %(user_query)s, %(bot_response)s,
                    %(created_at)s, %(retrieved_data)s
                )"""

                logger.info("📝 SQL PREPARATION: Building INSERT statement...")
                logger.info(" - Target table: user_feedback")
                logger.info(f" - Database: {self.database}")
                logger.info(f" - Schema: {self.schema}")

                # Prepare parameters
                params: Dict[str, Any] = {
                    'feedback_id': feedback.feedback_id,
                    'open_ended_feedback': feedback.open_ended_feedback,
                    'score': feedback.score,
                    'is_feedback_about_last_retrieval': feedback.is_feedback_about_last_retrieval,
                    'conversation_id': feedback.conversation_id,
                    'timestamp': int(feedback.timestamp),
                    'message_count': feedback.message_count,
                    'has_retrievals': feedback.has_retrievals,
                    'retrieval_count': feedback.retrieval_count,
                    'user_query': feedback.user_query,
                    'bot_response': feedback.bot_response,
                    'created_at': feedback.created_at,
                    'retrieved_data': retrieved_data_for_db
                }

                # Execute insert
                logger.info("🚀 SQL EXECUTION: Executing INSERT query...")
                cursor.execute(sql, params)

                logger.info("✅ SQL EXECUTION: Query executed successfully")
                logger.info(" - Rows affected: 1")
                logger.info(" - Status: SUCCESS")
            finally:
                # Release the cursor on failure as well as on success.
                cursor.close()

            logger.info("✅ SNOWFLAKE INSERT: Feedback inserted successfully")
            logger.info(f"📝 Inserted feedback: {feedback.feedback_id}")
            logger.info("=" * 80)

            return True

        except Exception as e:
            # Check if it's a Snowflake error
            if SNOWFLAKE_AVAILABLE and isinstance(e, snowflake.connector.ProgrammingError):
                logger.error(f"❌ SQL EXECUTION ERROR: {e}")
                logger.error(f" - Error code: {getattr(e, 'errno', 'Unknown')}")
                logger.error(f" - SQL state: {getattr(e, 'sqlstate', 'Unknown')}")
            else:
                logger.error(f"❌ SNOWFLAKE INSERT FAILED: {type(e).__name__}")
                logger.error(f" - Error: {e}")
            logger.error("=" * 80)
            return False

    def __enter__(self):
        """Context manager entry: open the connection and return self."""
        self.connect()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit: close the connection."""
        self.disconnect()


def get_snowflake_connector_from_env() -> Optional[SnowflakeFeedbackConnector]:
    """Create Snowflake connector from environment variables.

    Reads SNOWFLAKE_USER, SNOWFLAKE_PASSWORD, SNOWFLAKE_ACCOUNT and
    SNOWFLAKE_WAREHOUSE (all required) plus the optional SNOWFLAKE_DATABASE
    and SNOWFLAKE_SCHEMA.

    Returns:
        A connector that has not been connected yet, or None when any
        required variable is missing or empty.
    """
    user = os.getenv("SNOWFLAKE_USER")
    password = os.getenv("SNOWFLAKE_PASSWORD")
    account = os.getenv("SNOWFLAKE_ACCOUNT")
    warehouse = os.getenv("SNOWFLAKE_WAREHOUSE")
    # NOTE(review): this fallback ("SNOWFLAKE_LEARN") differs from the class
    # default ("SNOWFLAKE_LEARNING"); confirm which one is intended.
    database = os.getenv("SNOWFLAKE_DATABASE", "SNOWFLAKE_LEARN")
    schema = os.getenv("SNOWFLAKE_SCHEMA", "PUBLIC")

    if not all([user, password, account, warehouse]):
        print("⚠️ Snowflake credentials not found in environment variables")
        print("Required variables: SNOWFLAKE_USER, SNOWFLAKE_PASSWORD, SNOWFLAKE_ACCOUNT, SNOWFLAKE_WAREHOUSE")
        return None

    return SnowflakeFeedbackConnector(
        user=user,
        password=password,
        account=account,
        warehouse=warehouse,
        database=database,
        schema=schema
    )


def save_to_snowflake(feedback: UserFeedback) -> bool:
    """Helper function to save feedback to Snowflake.

    Builds a connector from the environment, connects, inserts *feedback*
    and disconnects again.

    Returns:
        True when the record was inserted; False when credentials are not
        configured or any step fails (errors are logged, not raised).
    """
    logger.info("=" * 80)
    logger.info("🔵 SNOWFLAKE SAVE: Starting save process")
    logger.info(f"📝 Feedback ID: {feedback.feedback_id}")

    connector = get_snowflake_connector_from_env()

    if not connector:
        logger.warning("⚠️ SNOWFLAKE SAVE: Skipping insertion (credentials not configured)")
        logger.warning(" Required variables: SNOWFLAKE_USER, SNOWFLAKE_PASSWORD, SNOWFLAKE_ACCOUNT, SNOWFLAKE_WAREHOUSE")
        logger.info("=" * 80)
        return False

    try:
        logger.info("📡 SNOWFLAKE SAVE: Establishing connection...")
        connector.connect()
        logger.info("✅ SNOWFLAKE SAVE: Connection established")

        try:
            logger.info("📥 SNOWFLAKE SAVE: Attempting to insert feedback...")
            success = connector.insert_feedback(feedback)
        finally:
            # Always release the connection, even if the insert raised.
            logger.info("🔌 SNOWFLAKE SAVE: Disconnecting...")
            connector.disconnect()

        if success:
            logger.info("✅ SNOWFLAKE SAVE: Successfully saved feedback")
        else:
            logger.error("❌ SNOWFLAKE SAVE: Failed to save feedback")

        logger.info("=" * 80)
        return success
    except Exception as e:
        logger.error(f"❌ SNOWFLAKE SAVE ERROR: {type(e).__name__}")
        logger.error(f" - Error: {e}")
        logger.info("=" * 80)
        return False