Spaces:
Sleeping
Sleeping
"""
Snowflake Connector for Feedback System

This module handles inserting user feedback into Snowflake.
"""
| import os | |
| import json | |
| import logging | |
| from typing import Dict, Any, Optional | |
| from src.reporting.feedback_schema import UserFeedback | |
# Configure logging BEFORE anything is logged. The module-level helpers
# (logging.warning, ...) implicitly call basicConfig() with defaults when the
# root logger has no handler, which would turn the explicit basicConfig() call
# into a no-op and silently drop the INFO level and the custom format.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Try to import snowflake connector; the module stays importable without it
# and SNOWFLAKE_AVAILABLE records whether the driver can be used.
try:
    import snowflake.connector
    SNOWFLAKE_AVAILABLE = True
except ImportError:
    SNOWFLAKE_AVAILABLE = False
    logger.warning("β οΈ snowflake-connector-python not installed. Install with: pip install snowflake-connector-python")
class SnowflakeFeedbackConnector:
    """Connector for inserting feedback into Snowflake.

    Stores the credentials given to the constructor, opens a connection in
    :meth:`connect`, and writes one row per :meth:`insert_feedback` call into
    the ``user_feedback`` table. It also works as a context manager:
    entering connects, leaving disconnects.
    """

    # INSERT with named ``%(name)s`` placeholders; values are bound by the
    # driver from the params dict, never interpolated into the SQL text.
    _INSERT_SQL = """INSERT INTO user_feedback (
        feedback_id,
        open_ended_feedback,
        score,
        is_feedback_about_last_retrieval,
        conversation_id,
        timestamp,
        message_count,
        has_retrievals,
        retrieval_count,
        user_query,
        bot_response,
        created_at,
        retrieved_data
    ) VALUES (
        %(feedback_id)s, %(open_ended_feedback)s, %(score)s, %(is_feedback_about_last_retrieval)s,
        %(conversation_id)s, %(timestamp)s, %(message_count)s, %(has_retrievals)s,
        %(retrieval_count)s, %(user_query)s, %(bot_response)s, %(created_at)s,
        %(retrieved_data)s
    )"""

    def __init__(
        self,
        user: str,
        password: str,
        account: str,
        warehouse: str,
        database: str = "SNOWFLAKE_LEARNING",
        schema: str = "PUBLIC"
    ):
        """Store connection settings; no connection is opened here."""
        self.user = user
        self.password = password
        self.account = account
        self.warehouse = warehouse
        self.database = database
        self.schema = schema
        # Set by connect(), reset to None by disconnect().
        self._connection = None

    @staticmethod
    def _quote_identifier(name: str) -> str:
        """Return *name* as a double-quoted SQL identifier, doubling embedded quotes."""
        return '"' + str(name).replace('"', '""') + '"'

    def connect(self) -> None:
        """Establish the Snowflake connection.

        Raises:
            ImportError: If snowflake-connector-python is not installed.
            Exception: Whatever the driver raises on a failed connect is
                logged, printed and re-raised unchanged.
        """
        if not SNOWFLAKE_AVAILABLE:
            raise ImportError("snowflake-connector-python is not installed. Install with: pip install snowflake-connector-python")

        logger.info("=" * 80)
        logger.info("π SNOWFLAKE CONNECTION: Attempting to connect...")
        logger.info(f" - Account: {self.account}")
        logger.info(f" - Warehouse: {self.warehouse}")
        logger.info(f" - Database: {self.database}")
        logger.info(f" - Schema: {self.schema}")
        logger.info(f" - User: {self.user}")
        try:
            self._connection = snowflake.connector.connect(
                user=self.user,
                password=self.password,
                account=self.account,
                warehouse=self.warehouse
                # Don't set database/schema in connection - we'll do it per query
            )
            logger.info("β SNOWFLAKE CONNECTION: Successfully connected")
            logger.info("=" * 80)
            print(f"β Connected to Snowflake: {self.database}.{self.schema}")
        except Exception as e:
            logger.error(f"β SNOWFLAKE CONNECTION FAILED: {e}")
            logger.error("=" * 80)
            print(f"β Failed to connect to Snowflake: {e}")
            raise

    def disconnect(self) -> None:
        """Close the Snowflake connection if one is open.

        The stored connection is cleared even when ``close()`` raises, so a
        later ``insert_feedback`` reports "not connected" instead of using a
        closed connection. Calling this repeatedly is harmless.
        """
        if self._connection:
            try:
                self._connection.close()
            finally:
                self._connection = None
            print("β Disconnected from Snowflake")

    @staticmethod
    def _validation_errors(feedback: "UserFeedback") -> list:
        """Return a message for each required field missing from *feedback*."""
        problems = []
        if not feedback.feedback_id:
            problems.append("Missing feedback_id")
        if feedback.score is None:
            problems.append("Missing score")
        if feedback.timestamp is None:
            problems.append("Missing timestamp")
        return problems

    def _set_context(self, cursor) -> None:
        """Select the configured database and schema on *cursor* and log the result."""
        logger.info(f"π§ SETTING CONTEXT: Database={self.database}, Schema={self.schema}")
        try:
            cursor.execute(f'USE DATABASE {self._quote_identifier(self.database)}')
            cursor.execute(f'USE SCHEMA {self._quote_identifier(self.schema)}')
            cursor.execute("SELECT CURRENT_DATABASE(), CURRENT_SCHEMA()")
            current_db, current_schema = cursor.fetchone()
            logger.info(f"β Current context verified: Database={current_db}, Schema={current_schema}")
        except Exception as e:
            logger.error(f"β Could not set context: {e}")
            raise

    @staticmethod
    def _serialize_retrieved_data(feedback: "UserFeedback"):
        """Return ``retrieved_data`` as a JSON string, or None when it is empty."""
        logger.info("π§ DATA PREPARATION: Preparing retrieved_data...")
        retrieved_data_raw = feedback.to_dict()['retrieved_data']
        logger.info(f" - Retrieved data type (raw): {type(retrieved_data_raw).__name__}")
        logger.info(f" - Retrieved data: {repr(retrieved_data_raw)[:200]}")

        # If retrieved_data is already a string (from UI), parse it so the
        # value written below is always re-serialized the same way.
        if isinstance(retrieved_data_raw, str):
            logger.info(" - Parsing string to Python object")
            retrieved_data = json.loads(retrieved_data_raw)
        elif retrieved_data_raw is None:
            retrieved_data = None
        else:
            # It's already a Python object (list/dict)
            logger.info(" - Data is already a Python object")
            retrieved_data = retrieved_data_raw

        logger.info(f" - Retrieved data size: {len(str(retrieved_data)) if retrieved_data else 0} characters")
        logger.info(f" - Retrieved data type: {type(retrieved_data).__name__}")

        # Convert to JSON string for TEXT column. Any falsy value (None, but
        # also an empty list/dict) is stored as NULL.
        if retrieved_data:
            retrieved_data_for_db = json.dumps(retrieved_data)
            logger.info(f" - Converting to JSON string for TEXT column")
            logger.info(f" - JSON string length: {len(retrieved_data_for_db)}")
        else:
            logger.info(f" - Retrieved data is None, using NULL")
            retrieved_data_for_db = None
        return retrieved_data_for_db

    def insert_feedback(self, feedback: "UserFeedback") -> bool:
        """Insert a single feedback record into Snowflake.

        Args:
            feedback: Record to store. The method reads its attributes
                directly and takes ``retrieved_data`` from ``to_dict()``.

        Returns:
            True when the INSERT executed; False when validation failed or
            any exception occurred while preparing or running the statement.

        Raises:
            RuntimeError: If there is no open connection (call connect() first).
        """
        logger.info("=" * 80)
        logger.info("π SNOWFLAKE INSERT: Starting feedback insertion process")
        logger.info(f"π Feedback ID: {feedback.feedback_id}")
        if not self._connection:
            logger.error("β Not connected to Snowflake. Call connect() first.")
            raise RuntimeError("Not connected to Snowflake. Call connect() first.")

        cursor = None
        try:
            logger.info("π VALIDATION: Validating feedback data structure...")
            validation_errors = self._validation_errors(feedback)
            if validation_errors:
                logger.error(f"β VALIDATION FAILED: {validation_errors}")
                return False
            logger.info("β VALIDATION PASSED: All required fields present")

            logger.info("π Data Summary:")
            logger.info(f" - Feedback ID: {feedback.feedback_id}")
            logger.info(f" - Score: {feedback.score}")
            logger.info(f" - Conversation ID: {feedback.conversation_id}")
            logger.info(f" - Has Retrievals: {feedback.has_retrievals}")
            logger.info(f" - Retrieval Count: {feedback.retrieval_count}")
            logger.info(f" - Message Count: {feedback.message_count}")
            logger.info(f" - Timestamp: {feedback.timestamp}")

            cursor = self._connection.cursor()
            logger.info("β SNOWFLAKE CONNECTION: Cursor created")

            # Set database and schema context
            self._set_context(cursor)

            # Prepare data
            retrieved_data_for_db = self._serialize_retrieved_data(feedback)

            logger.info("π SQL PREPARATION: Building INSERT statement...")
            logger.info(f" - Target table: user_feedback")
            logger.info(f" - Database: {self.database}")
            logger.info(f" - Schema: {self.schema}")

            # Prepare parameters
            params = {
                'feedback_id': feedback.feedback_id,
                'open_ended_feedback': feedback.open_ended_feedback,
                'score': feedback.score,
                'is_feedback_about_last_retrieval': feedback.is_feedback_about_last_retrieval,
                'conversation_id': feedback.conversation_id,
                'timestamp': int(feedback.timestamp),
                'message_count': feedback.message_count,
                'has_retrievals': feedback.has_retrievals,
                'retrieval_count': feedback.retrieval_count,
                'user_query': feedback.user_query,
                'bot_response': feedback.bot_response,
                'created_at': feedback.created_at,
                'retrieved_data': retrieved_data_for_db
            }

            # Execute insert
            logger.info("π SQL EXECUTION: Executing INSERT query...")
            cursor.execute(self._INSERT_SQL, params)
            logger.info("β SQL EXECUTION: Query executed successfully")
            # Report what the driver says rather than assuming one row.
            logger.info(f" - Rows affected: {cursor.rowcount}")
            logger.info(f" - Status: SUCCESS")

            logger.info("β SNOWFLAKE INSERT: Feedback inserted successfully")
            logger.info(f"π Inserted feedback: {feedback.feedback_id}")
            logger.info("=" * 80)
            return True
        except Exception as e:
            # Snowflake SQL errors carry an error code and SQL state worth logging.
            if SNOWFLAKE_AVAILABLE and isinstance(e, snowflake.connector.errors.ProgrammingError):
                logger.error(f"β SQL EXECUTION ERROR: {e}")
                logger.error(f" - Error code: {getattr(e, 'errno', 'Unknown')}")
                logger.error(f" - SQL state: {getattr(e, 'sqlstate', 'Unknown')}")
            else:
                logger.error(f"β SNOWFLAKE INSERT FAILED: {type(e).__name__}")
                logger.error(f" - Error: {e}")
            logger.error("=" * 80)
            return False
        finally:
            # Always release the cursor, including on failure paths; a close
            # error must not mask the result decided above.
            if cursor is not None:
                try:
                    cursor.close()
                except Exception as close_error:
                    logger.warning(f"Could not close Snowflake cursor: {close_error}")

    def __enter__(self):
        """Context manager entry: connect and return this connector."""
        self.connect()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit: disconnect; exceptions are not suppressed."""
        self.disconnect()
def get_snowflake_connector_from_env() -> Optional[SnowflakeFeedbackConnector]:
    """Build a SnowflakeFeedbackConnector from SNOWFLAKE_* environment variables.

    Returns:
        A connector (not yet connected), or None when any of SNOWFLAKE_USER,
        SNOWFLAKE_PASSWORD, SNOWFLAKE_ACCOUNT or SNOWFLAKE_WAREHOUSE is unset
        or empty. SNOWFLAKE_DATABASE and SNOWFLAKE_SCHEMA are optional.
    """
    credentials = {
        "user": os.getenv("SNOWFLAKE_USER"),
        "password": os.getenv("SNOWFLAKE_PASSWORD"),
        "account": os.getenv("SNOWFLAKE_ACCOUNT"),
        "warehouse": os.getenv("SNOWFLAKE_WAREHOUSE"),
    }

    if any(not value for value in credentials.values()):
        print("β οΈ Snowflake credentials not found in environment variables")
        print("Required variables: SNOWFLAKE_USER, SNOWFLAKE_PASSWORD, SNOWFLAKE_ACCOUNT, SNOWFLAKE_WAREHOUSE")
        return None

    # NOTE(review): this fallback is "SNOWFLAKE_LEARN" while the
    # SnowflakeFeedbackConnector constructor defaults to "SNOWFLAKE_LEARNING";
    # confirm which database name is intended.
    target_database = os.getenv("SNOWFLAKE_DATABASE", "SNOWFLAKE_LEARN")
    target_schema = os.getenv("SNOWFLAKE_SCHEMA", "PUBLIC")

    return SnowflakeFeedbackConnector(
        database=target_database,
        schema=target_schema,
        **credentials,
    )
def save_to_snowflake(feedback: UserFeedback) -> bool:
    """Save one feedback record to Snowflake using credentials from the environment.

    Builds a connector via get_snowflake_connector_from_env(), connects,
    inserts the record and disconnects. Exceptions are logged rather than
    propagated.

    Args:
        feedback: The feedback record to insert.

    Returns:
        True when the record was inserted; False when credentials are not
        configured, the insert reported failure, or any exception occurred.
    """
    logger.info("=" * 80)
    logger.info("π΅ SNOWFLAKE SAVE: Starting save process")
    logger.info(f"π Feedback ID: {feedback.feedback_id}")

    connector = get_snowflake_connector_from_env()
    if not connector:
        logger.warning("β οΈ SNOWFLAKE SAVE: Skipping insertion (credentials not configured)")
        logger.warning(" Required variables: SNOWFLAKE_USER, SNOWFLAKE_PASSWORD, SNOWFLAKE_ACCOUNT, SNOWFLAKE_WAREHOUSE")
        logger.info("=" * 80)
        return False

    try:
        logger.info("π‘ SNOWFLAKE SAVE: Establishing connection...")
        connector.connect()
        logger.info("β SNOWFLAKE SAVE: Connection established")
        try:
            logger.info("π₯ SNOWFLAKE SAVE: Attempting to insert feedback...")
            success = connector.insert_feedback(feedback)
        finally:
            # Release the connection even when the insert raises, so a
            # failed save does not leave a session open.
            logger.info("π SNOWFLAKE SAVE: Disconnecting...")
            connector.disconnect()

        if success:
            logger.info("β SNOWFLAKE SAVE: Successfully saved feedback")
        else:
            logger.error("β SNOWFLAKE SAVE: Failed to save feedback")
        logger.info("=" * 80)
        return success
    except Exception as e:
        logger.error(f"β SNOWFLAKE SAVE ERROR: {type(e).__name__}")
        logger.error(f" - Error: {e}")
        logger.info("=" * 80)
        return False