Spaces:
Runtime error
Runtime error
| # character_chat_db.py | |
| # Database functions for managing character cards and chat histories. | |
| # # | |
| # Imports | |
| import configparser | |
| import logging | |
| import sqlite3 | |
| import json | |
| import os | |
| import sys | |
| from typing import List, Dict, Optional, Tuple, Any, Union | |
| # | |
| from App_Function_Libraries.Utils.Utils import get_database_dir, get_project_relative_path, get_database_path | |
| # | |
| ####################################################################################################################### | |
| # | |
| # | |
| def ensure_database_directory(): | |
| os.makedirs(get_database_dir(), exist_ok=True) | |
| ensure_database_directory() | |
| # Construct the path to the config file | |
| config_path = get_project_relative_path('Config_Files/config.txt') | |
| # Read the config file | |
| config = configparser.ConfigParser() | |
| config.read(config_path) | |
| # Get the chat db path from the config, or use the default if not specified | |
| chat_DB_PATH = config.get('Database', 'chatDB_path', fallback=get_database_path('chatDB.db')) | |
| print(f"Chat Database path: {chat_DB_PATH}") | |
| ######################################################################################################## | |
| # | |
| # Functions | |
| # FIXME - Setup properly and test/add documentation for its existence... | |
| def initialize_database(): | |
| """Initialize the SQLite database with required tables and FTS5 virtual tables.""" | |
| conn = None | |
| try: | |
| conn = sqlite3.connect(chat_DB_PATH) | |
| cursor = conn.cursor() | |
| # Enable foreign key constraints | |
| cursor.execute("PRAGMA foreign_keys = ON;") | |
| # Create CharacterCards table with V2 fields | |
| cursor.execute(""" | |
| CREATE TABLE IF NOT EXISTS CharacterCards ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| name TEXT UNIQUE NOT NULL, | |
| description TEXT, | |
| personality TEXT, | |
| scenario TEXT, | |
| image BLOB, | |
| post_history_instructions TEXT, | |
| first_mes TEXT, | |
| mes_example TEXT, | |
| creator_notes TEXT, | |
| system_prompt TEXT, | |
| alternate_greetings TEXT, | |
| tags TEXT, | |
| creator TEXT, | |
| character_version TEXT, | |
| extensions TEXT, | |
| created_at DATETIME DEFAULT CURRENT_TIMESTAMP | |
| ); | |
| """) | |
| # Create FTS5 virtual table for CharacterCards | |
| cursor.execute(""" | |
| CREATE VIRTUAL TABLE IF NOT EXISTS CharacterCards_fts USING fts5( | |
| name, | |
| description, | |
| personality, | |
| scenario, | |
| system_prompt, | |
| content='CharacterCards', | |
| content_rowid='id' | |
| ); | |
| """) | |
| # Create triggers to keep FTS5 table in sync with CharacterCards | |
| cursor.executescript(""" | |
| CREATE TRIGGER IF NOT EXISTS CharacterCards_ai AFTER INSERT ON CharacterCards BEGIN | |
| INSERT INTO CharacterCards_fts( | |
| rowid, | |
| name, | |
| description, | |
| personality, | |
| scenario, | |
| system_prompt | |
| ) VALUES ( | |
| new.id, | |
| new.name, | |
| new.description, | |
| new.personality, | |
| new.scenario, | |
| new.system_prompt | |
| ); | |
| END; | |
| CREATE TRIGGER IF NOT EXISTS CharacterCards_ad AFTER DELETE ON CharacterCards BEGIN | |
| DELETE FROM CharacterCards_fts WHERE rowid = old.id; | |
| END; | |
| CREATE TRIGGER IF NOT EXISTS CharacterCards_au AFTER UPDATE ON CharacterCards BEGIN | |
| UPDATE CharacterCards_fts SET | |
| name = new.name, | |
| description = new.description, | |
| personality = new.personality, | |
| scenario = new.scenario, | |
| system_prompt = new.system_prompt | |
| WHERE rowid = new.id; | |
| END; | |
| """) | |
| # Create CharacterChats table | |
| cursor.execute(""" | |
| CREATE TABLE IF NOT EXISTS CharacterChats ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| character_id INTEGER NOT NULL, | |
| conversation_name TEXT, | |
| chat_history TEXT, | |
| is_snapshot BOOLEAN DEFAULT FALSE, | |
| created_at DATETIME DEFAULT CURRENT_TIMESTAMP, | |
| FOREIGN KEY (character_id) REFERENCES CharacterCards(id) ON DELETE CASCADE | |
| ); | |
| """) | |
| # Create FTS5 virtual table for CharacterChats | |
| cursor.execute(""" | |
| CREATE VIRTUAL TABLE IF NOT EXISTS CharacterChats_fts USING fts5( | |
| conversation_name, | |
| chat_history, | |
| content='CharacterChats', | |
| content_rowid='id' | |
| ); | |
| """) | |
| # Create triggers to keep FTS5 table in sync with CharacterChats | |
| cursor.executescript(""" | |
| CREATE TRIGGER IF NOT EXISTS CharacterChats_ai AFTER INSERT ON CharacterChats BEGIN | |
| INSERT INTO CharacterChats_fts(rowid, conversation_name, chat_history) | |
| VALUES (new.id, new.conversation_name, new.chat_history); | |
| END; | |
| CREATE TRIGGER IF NOT EXISTS CharacterChats_ad AFTER DELETE ON CharacterChats BEGIN | |
| DELETE FROM CharacterChats_fts WHERE rowid = old.id; | |
| END; | |
| CREATE TRIGGER IF NOT EXISTS CharacterChats_au AFTER UPDATE ON CharacterChats BEGIN | |
| UPDATE CharacterChats_fts SET conversation_name = new.conversation_name, chat_history = new.chat_history | |
| WHERE rowid = new.id; | |
| END; | |
| """) | |
| # Create ChatKeywords table | |
| cursor.execute(""" | |
| CREATE TABLE IF NOT EXISTS ChatKeywords ( | |
| chat_id INTEGER NOT NULL, | |
| keyword TEXT NOT NULL, | |
| FOREIGN KEY (chat_id) REFERENCES CharacterChats(id) ON DELETE CASCADE | |
| ); | |
| """) | |
| # Create indexes for faster searches | |
| cursor.execute(""" | |
| CREATE INDEX IF NOT EXISTS idx_chatkeywords_keyword ON ChatKeywords(keyword); | |
| """) | |
| cursor.execute(""" | |
| CREATE INDEX IF NOT EXISTS idx_chatkeywords_chat_id ON ChatKeywords(chat_id); | |
| """) | |
| conn.commit() | |
| logging.info("Database initialized successfully.") | |
| except sqlite3.Error as e: | |
| logging.error(f"SQLite error occurred during database initialization: {e}") | |
| if conn: | |
| conn.rollback() | |
| raise | |
| except Exception as e: | |
| logging.error(f"Unexpected error occurred during database initialization: {e}") | |
| if conn: | |
| conn.rollback() | |
| raise | |
| finally: | |
| if conn: | |
| conn.close() | |
| # Call initialize_database() at the start of your application | |
| def setup_chat_database(): | |
| try: | |
| initialize_database() | |
| except Exception as e: | |
| logging.critical(f"Failed to initialize database: {e}") | |
| sys.exit(1) | |
| setup_chat_database() | |
| ######################################################################################################## | |
| # | |
| # Character Card handling | |
| def parse_character_card(card_data: Dict[str, Any]) -> Dict[str, Any]: | |
| """Parse and validate a character card according to V2 specification.""" | |
| v2_data = { | |
| 'name': card_data.get('name', ''), | |
| 'description': card_data.get('description', ''), | |
| 'personality': card_data.get('personality', ''), | |
| 'scenario': card_data.get('scenario', ''), | |
| 'first_mes': card_data.get('first_mes', ''), | |
| 'mes_example': card_data.get('mes_example', ''), | |
| 'creator_notes': card_data.get('creator_notes', ''), | |
| 'system_prompt': card_data.get('system_prompt', ''), | |
| 'post_history_instructions': card_data.get('post_history_instructions', ''), | |
| 'alternate_greetings': json.dumps(card_data.get('alternate_greetings', [])), | |
| 'tags': json.dumps(card_data.get('tags', [])), | |
| 'creator': card_data.get('creator', ''), | |
| 'character_version': card_data.get('character_version', ''), | |
| 'extensions': json.dumps(card_data.get('extensions', {})) | |
| } | |
| # Handle 'image' separately as it might be binary data | |
| if 'image' in card_data: | |
| v2_data['image'] = card_data['image'] | |
| return v2_data | |
| def add_character_card(card_data: Dict[str, Any]) -> Optional[int]: | |
| """Add or update a character card in the database.""" | |
| conn = sqlite3.connect(chat_DB_PATH) | |
| cursor = conn.cursor() | |
| try: | |
| parsed_card = parse_character_card(card_data) | |
| # Check if character already exists | |
| cursor.execute("SELECT id FROM CharacterCards WHERE name = ?", (parsed_card['name'],)) | |
| row = cursor.fetchone() | |
| if row: | |
| # Update existing character | |
| character_id = row[0] | |
| update_query = """ | |
| UPDATE CharacterCards | |
| SET description = ?, personality = ?, scenario = ?, image = ?, | |
| post_history_instructions = ?, first_mes = ?, mes_example = ?, | |
| creator_notes = ?, system_prompt = ?, alternate_greetings = ?, | |
| tags = ?, creator = ?, character_version = ?, extensions = ? | |
| WHERE id = ? | |
| """ | |
| cursor.execute(update_query, ( | |
| parsed_card['description'], parsed_card['personality'], parsed_card['scenario'], | |
| parsed_card['image'], parsed_card['post_history_instructions'], parsed_card['first_mes'], | |
| parsed_card['mes_example'], parsed_card['creator_notes'], parsed_card['system_prompt'], | |
| parsed_card['alternate_greetings'], parsed_card['tags'], parsed_card['creator'], | |
| parsed_card['character_version'], parsed_card['extensions'], character_id | |
| )) | |
| else: | |
| # Insert new character | |
| insert_query = """ | |
| INSERT INTO CharacterCards (name, description, personality, scenario, image, | |
| post_history_instructions, first_mes, mes_example, creator_notes, system_prompt, | |
| alternate_greetings, tags, creator, character_version, extensions) | |
| VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) | |
| """ | |
| cursor.execute(insert_query, ( | |
| parsed_card['name'], parsed_card['description'], parsed_card['personality'], | |
| parsed_card['scenario'], parsed_card['image'], parsed_card['post_history_instructions'], | |
| parsed_card['first_mes'], parsed_card['mes_example'], parsed_card['creator_notes'], | |
| parsed_card['system_prompt'], parsed_card['alternate_greetings'], parsed_card['tags'], | |
| parsed_card['creator'], parsed_card['character_version'], parsed_card['extensions'] | |
| )) | |
| character_id = cursor.lastrowid | |
| conn.commit() | |
| return character_id | |
| except sqlite3.IntegrityError as e: | |
| logging.error(f"Error adding character card: {e}") | |
| return None | |
| except Exception as e: | |
| logging.error(f"Unexpected error adding character card: {e}") | |
| return None | |
| finally: | |
| conn.close() | |
| # def add_character_card(card_data: Dict) -> Optional[int]: | |
| # """Add or update a character card in the database. | |
| # | |
| # Returns the ID of the inserted character or None if failed. | |
| # """ | |
| # conn = sqlite3.connect(chat_DB_PATH) | |
| # cursor = conn.cursor() | |
| # try: | |
| # # Ensure all required fields are present | |
| # required_fields = ['name', 'description', 'personality', 'scenario', 'image', 'post_history_instructions', 'first_message'] | |
| # for field in required_fields: | |
| # if field not in card_data: | |
| # card_data[field] = '' # Assign empty string if field is missing | |
| # | |
| # # Check if character already exists | |
| # cursor.execute("SELECT id FROM CharacterCards WHERE name = ?", (card_data['name'],)) | |
| # row = cursor.fetchone() | |
| # | |
| # if row: | |
| # # Update existing character | |
| # character_id = row[0] | |
| # cursor.execute(""" | |
| # UPDATE CharacterCards | |
| # SET description = ?, personality = ?, scenario = ?, image = ?, post_history_instructions = ?, first_message = ? | |
| # WHERE id = ? | |
| # """, ( | |
| # card_data['description'], | |
| # card_data['personality'], | |
| # card_data['scenario'], | |
| # card_data['image'], | |
| # card_data['post_history_instructions'], | |
| # card_data['first_message'], | |
| # character_id | |
| # )) | |
| # else: | |
| # # Insert new character | |
| # cursor.execute(""" | |
| # INSERT INTO CharacterCards (name, description, personality, scenario, image, post_history_instructions, first_message) | |
| # VALUES (?, ?, ?, ?, ?, ?, ?) | |
| # """, ( | |
| # card_data['name'], | |
| # card_data['description'], | |
| # card_data['personality'], | |
| # card_data['scenario'], | |
| # card_data['image'], | |
| # card_data['post_history_instructions'], | |
| # card_data['first_message'] | |
| # )) | |
| # character_id = cursor.lastrowid | |
| # | |
| # conn.commit() | |
| # return cursor.lastrowid | |
| # except sqlite3.IntegrityError as e: | |
| # logging.error(f"Error adding character card: {e}") | |
| # return None | |
| # except Exception as e: | |
| # logging.error(f"Unexpected error adding character card: {e}") | |
| # return None | |
| # finally: | |
| # conn.close() | |
| def get_character_cards() -> List[Dict]: | |
| """Retrieve all character cards from the database.""" | |
| logging.debug(f"Fetching characters from DB: {chat_DB_PATH}") | |
| conn = sqlite3.connect(chat_DB_PATH) | |
| cursor = conn.cursor() | |
| cursor.execute("SELECT * FROM CharacterCards") | |
| rows = cursor.fetchall() | |
| columns = [description[0] for description in cursor.description] | |
| conn.close() | |
| characters = [dict(zip(columns, row)) for row in rows] | |
| #logging.debug(f"Characters fetched from DB: {characters}") | |
| return characters | |
| def get_character_card_by_id(character_id: Union[int, Dict[str, Any]]) -> Optional[Dict[str, Any]]: | |
| """ | |
| Retrieve a single character card by its ID. | |
| Args: | |
| character_id: Can be either an integer ID or a dictionary containing character data. | |
| Returns: | |
| A dictionary containing the character card data, or None if not found. | |
| """ | |
| conn = sqlite3.connect(chat_DB_PATH) | |
| cursor = conn.cursor() | |
| try: | |
| if isinstance(character_id, dict): | |
| # If a dictionary is passed, assume it's already a character card | |
| return character_id | |
| elif isinstance(character_id, int): | |
| # If an integer is passed, fetch the character from the database | |
| cursor.execute("SELECT * FROM CharacterCards WHERE id = ?", (character_id,)) | |
| row = cursor.fetchone() | |
| if row: | |
| columns = [description[0] for description in cursor.description] | |
| return dict(zip(columns, row)) | |
| else: | |
| logging.warning(f"Invalid type for character_id: {type(character_id)}") | |
| return None | |
| except Exception as e: | |
| logging.error(f"Error in get_character_card_by_id: {e}") | |
| return None | |
| finally: | |
| conn.close() | |
| def update_character_card(character_id: int, card_data: Dict) -> bool: | |
| """Update an existing character card.""" | |
| conn = sqlite3.connect(chat_DB_PATH) | |
| cursor = conn.cursor() | |
| try: | |
| cursor.execute(""" | |
| UPDATE CharacterCards | |
| SET name = ?, description = ?, personality = ?, scenario = ?, image = ?, post_history_instructions = ?, first_message = ? | |
| WHERE id = ? | |
| """, ( | |
| card_data.get('name'), | |
| card_data.get('description'), | |
| card_data.get('personality'), | |
| card_data.get('scenario'), | |
| card_data.get('image'), | |
| card_data.get('post_history_instructions', ''), | |
| card_data.get('first_message', "Hello! I'm ready to chat."), | |
| character_id | |
| )) | |
| conn.commit() | |
| return cursor.rowcount > 0 | |
| except sqlite3.IntegrityError as e: | |
| logging.error(f"Error updating character card: {e}") | |
| return False | |
| finally: | |
| conn.close() | |
| def delete_character_card(character_id: int) -> bool: | |
| """Delete a character card and its associated chats.""" | |
| conn = sqlite3.connect(chat_DB_PATH) | |
| cursor = conn.cursor() | |
| try: | |
| # Delete associated chats first due to foreign key constraint | |
| cursor.execute("DELETE FROM CharacterChats WHERE character_id = ?", (character_id,)) | |
| cursor.execute("DELETE FROM CharacterCards WHERE id = ?", (character_id,)) | |
| conn.commit() | |
| return cursor.rowcount > 0 | |
| except sqlite3.Error as e: | |
| logging.error(f"Error deleting character card: {e}") | |
| return False | |
| finally: | |
| conn.close() | |
| def add_character_chat(character_id: int, conversation_name: str, chat_history: List[Tuple[str, str]], keywords: Optional[List[str]] = None, is_snapshot: bool = False) -> Optional[int]: | |
| """ | |
| Add a new chat history for a character, optionally associating keywords. | |
| Args: | |
| character_id (int): The ID of the character. | |
| conversation_name (str): Name of the conversation. | |
| chat_history (List[Tuple[str, str]]): List of (user, bot) message tuples. | |
| keywords (Optional[List[str]]): List of keywords to associate with this chat. | |
| is_snapshot (bool, optional): Whether this chat is a snapshot. | |
| Returns: | |
| Optional[int]: The ID of the inserted chat or None if failed. | |
| """ | |
| conn = sqlite3.connect(chat_DB_PATH) | |
| cursor = conn.cursor() | |
| try: | |
| chat_history_json = json.dumps(chat_history) | |
| cursor.execute(""" | |
| INSERT INTO CharacterChats (character_id, conversation_name, chat_history, is_snapshot) | |
| VALUES (?, ?, ?, ?) | |
| """, ( | |
| character_id, | |
| conversation_name, | |
| chat_history_json, | |
| is_snapshot | |
| )) | |
| chat_id = cursor.lastrowid | |
| if keywords: | |
| # Insert keywords into ChatKeywords table | |
| keyword_records = [(chat_id, keyword.strip().lower()) for keyword in keywords] | |
| cursor.executemany(""" | |
| INSERT INTO ChatKeywords (chat_id, keyword) | |
| VALUES (?, ?) | |
| """, keyword_records) | |
| conn.commit() | |
| return chat_id | |
| except sqlite3.Error as e: | |
| logging.error(f"Error adding character chat: {e}") | |
| return None | |
| finally: | |
| conn.close() | |
| def get_character_chats(character_id: Optional[int] = None) -> List[Dict]: | |
| """Retrieve all chats, or chats for a specific character if character_id is provided.""" | |
| conn = sqlite3.connect(chat_DB_PATH) | |
| cursor = conn.cursor() | |
| if character_id is not None: | |
| cursor.execute("SELECT * FROM CharacterChats WHERE character_id = ?", (character_id,)) | |
| else: | |
| cursor.execute("SELECT * FROM CharacterChats") | |
| rows = cursor.fetchall() | |
| columns = [description[0] for description in cursor.description] | |
| conn.close() | |
| return [dict(zip(columns, row)) for row in rows] | |
| def get_character_chat_by_id(chat_id: int) -> Optional[Dict]: | |
| """Retrieve a single chat by its ID.""" | |
| conn = sqlite3.connect(chat_DB_PATH) | |
| cursor = conn.cursor() | |
| cursor.execute("SELECT * FROM CharacterChats WHERE id = ?", (chat_id,)) | |
| row = cursor.fetchone() | |
| conn.close() | |
| if row: | |
| columns = [description[0] for description in cursor.description] | |
| chat = dict(zip(columns, row)) | |
| chat['chat_history'] = json.loads(chat['chat_history']) | |
| return chat | |
| return None | |
| def search_character_chats(query: str, character_id: Optional[int] = None) -> Tuple[List[Dict], str]: | |
| """ | |
| Search for character chats using FTS5, optionally filtered by character_id. | |
| Args: | |
| query (str): The search query. | |
| character_id (Optional[int]): The ID of the character to filter chats by. | |
| Returns: | |
| Tuple[List[Dict], str]: A list of matching chats and a status message. | |
| """ | |
| if not query.strip(): | |
| return [], "Please enter a search query." | |
| conn = sqlite3.connect(chat_DB_PATH) | |
| cursor = conn.cursor() | |
| try: | |
| if character_id is not None: | |
| # Search with character_id filter | |
| cursor.execute(""" | |
| SELECT CharacterChats.id, CharacterChats.conversation_name, CharacterChats.chat_history | |
| FROM CharacterChats_fts | |
| JOIN CharacterChats ON CharacterChats_fts.rowid = CharacterChats.id | |
| WHERE CharacterChats_fts MATCH ? AND CharacterChats.character_id = ? | |
| ORDER BY rank | |
| """, (query, character_id)) | |
| else: | |
| # Search without character_id filter | |
| cursor.execute(""" | |
| SELECT CharacterChats.id, CharacterChats.conversation_name, CharacterChats.chat_history | |
| FROM CharacterChats_fts | |
| JOIN CharacterChats ON CharacterChats_fts.rowid = CharacterChats.id | |
| WHERE CharacterChats_fts MATCH ? | |
| ORDER BY rank | |
| """, (query,)) | |
| rows = cursor.fetchall() | |
| columns = [description[0] for description in cursor.description] | |
| results = [dict(zip(columns, row)) for row in rows] | |
| if character_id is not None: | |
| status_message = f"Found {len(results)} chat(s) matching '{query}' for the selected character." | |
| else: | |
| status_message = f"Found {len(results)} chat(s) matching '{query}' across all characters." | |
| return results, status_message | |
| except Exception as e: | |
| logging.error(f"Error searching chats with FTS5: {e}") | |
| return [], f"Error occurred during search: {e}" | |
| finally: | |
| conn.close() | |
| def update_character_chat(chat_id: int, chat_history: List[Tuple[str, str]]) -> bool: | |
| """Update an existing chat history.""" | |
| conn = sqlite3.connect(chat_DB_PATH) | |
| cursor = conn.cursor() | |
| try: | |
| chat_history_json = json.dumps(chat_history) | |
| cursor.execute(""" | |
| UPDATE CharacterChats | |
| SET chat_history = ? | |
| WHERE id = ? | |
| """, ( | |
| chat_history_json, | |
| chat_id | |
| )) | |
| conn.commit() | |
| return cursor.rowcount > 0 | |
| except sqlite3.Error as e: | |
| logging.error(f"Error updating character chat: {e}") | |
| return False | |
| finally: | |
| conn.close() | |
| def delete_character_chat(chat_id: int) -> bool: | |
| """Delete a specific chat.""" | |
| conn = sqlite3.connect(chat_DB_PATH) | |
| cursor = conn.cursor() | |
| try: | |
| cursor.execute("DELETE FROM CharacterChats WHERE id = ?", (chat_id,)) | |
| conn.commit() | |
| return cursor.rowcount > 0 | |
| except sqlite3.Error as e: | |
| logging.error(f"Error deleting character chat: {e}") | |
| return False | |
| finally: | |
| conn.close() | |
| def fetch_keywords_for_chats(keywords: List[str]) -> List[int]: | |
| """ | |
| Fetch chat IDs associated with any of the specified keywords. | |
| Args: | |
| keywords (List[str]): List of keywords to search for. | |
| Returns: | |
| List[int]: List of chat IDs associated with the keywords. | |
| """ | |
| if not keywords: | |
| return [] | |
| conn = sqlite3.connect(chat_DB_PATH) | |
| cursor = conn.cursor() | |
| try: | |
| # Construct the WHERE clause to search for each keyword | |
| keyword_clauses = " OR ".join(["keyword = ?"] * len(keywords)) | |
| sql_query = f"SELECT DISTINCT chat_id FROM ChatKeywords WHERE {keyword_clauses}" | |
| cursor.execute(sql_query, keywords) | |
| rows = cursor.fetchall() | |
| chat_ids = [row[0] for row in rows] | |
| return chat_ids | |
| except Exception as e: | |
| logging.error(f"Error in fetch_keywords_for_chats: {e}") | |
| return [] | |
| finally: | |
| conn.close() | |
| def save_chat_history_to_character_db(character_id: int, conversation_name: str, chat_history: List[Tuple[str, str]]) -> Optional[int]: | |
| """Save chat history to the CharacterChats table. | |
| Returns the ID of the inserted chat or None if failed. | |
| """ | |
| return add_character_chat(character_id, conversation_name, chat_history) | |
| def search_db(query: str, fields: List[str], where_clause: str = "", page: int = 1, results_per_page: int = 5) -> List[Dict[str, Any]]: | |
| """ | |
| Perform a full-text search on specified fields with optional filtering and pagination. | |
| Args: | |
| query (str): The search query. | |
| fields (List[str]): List of fields to search in. | |
| where_clause (str, optional): Additional SQL WHERE clause to filter results. | |
| page (int, optional): Page number for pagination. | |
| results_per_page (int, optional): Number of results per page. | |
| Returns: | |
| List[Dict[str, Any]]: List of matching chat records with content and metadata. | |
| """ | |
| if not query.strip(): | |
| return [] | |
| conn = sqlite3.connect(chat_DB_PATH) | |
| cursor = conn.cursor() | |
| try: | |
| # Construct the MATCH query for FTS5 | |
| match_query = " AND ".join(fields) + f" MATCH ?" | |
| # Adjust the query with the fields | |
| fts_query = f""" | |
| SELECT CharacterChats.id, CharacterChats.conversation_name, CharacterChats.chat_history | |
| FROM CharacterChats_fts | |
| JOIN CharacterChats ON CharacterChats_fts.rowid = CharacterChats.id | |
| WHERE {match_query} | |
| """ | |
| if where_clause: | |
| fts_query += f" AND ({where_clause})" | |
| fts_query += " ORDER BY rank LIMIT ? OFFSET ?" | |
| offset = (page - 1) * results_per_page | |
| cursor.execute(fts_query, (query, results_per_page, offset)) | |
| rows = cursor.fetchall() | |
| columns = [description[0] for description in cursor.description] | |
| results = [dict(zip(columns, row)) for row in rows] | |
| return results | |
| except Exception as e: | |
| logging.error(f"Error in search_db: {e}") | |
| return [] | |
| finally: | |
| conn.close() | |
| def perform_full_text_search_chat(query: str, relevant_chat_ids: List[int], page: int = 1, results_per_page: int = 5) -> \ | |
| List[Dict[str, Any]]: | |
| """ | |
| Perform a full-text search within the specified chat IDs using FTS5. | |
| Args: | |
| query (str): The user's query. | |
| relevant_chat_ids (List[int]): List of chat IDs to search within. | |
| page (int): Pagination page number. | |
| results_per_page (int): Number of results per page. | |
| Returns: | |
| List[Dict[str, Any]]: List of search results with content and metadata. | |
| """ | |
| try: | |
| # Construct a WHERE clause to limit the search to relevant chat IDs | |
| where_clause = " OR ".join([f"media_id = {chat_id}" for chat_id in relevant_chat_ids]) | |
| if not where_clause: | |
| where_clause = "1" # No restriction if no chat IDs | |
| # Perform full-text search using FTS5 | |
| fts_results = search_db(query, ["content"], where_clause, page=page, results_per_page=results_per_page) | |
| filtered_fts_results = [ | |
| { | |
| "content": result['content'], | |
| "metadata": {"media_id": result['id']} | |
| } | |
| for result in fts_results | |
| if result['id'] in relevant_chat_ids | |
| ] | |
| return filtered_fts_results | |
| except Exception as e: | |
| logging.error(f"Error in perform_full_text_search_chat: {str(e)}") | |
| return [] | |
| def fetch_all_chats() -> List[Dict[str, Any]]: | |
| """ | |
| Fetch all chat messages from the database. | |
| Returns: | |
| List[Dict[str, Any]]: List of chat messages with relevant metadata. | |
| """ | |
| try: | |
| chats = get_character_chats() # Modify this function to retrieve all chats | |
| return chats | |
| except Exception as e: | |
| logging.error(f"Error fetching all chats: {str(e)}") | |
| return [] | |
| def search_character_chat(query: str, fts_top_k: int = 10, relevant_media_ids: List[str] = None) -> List[Dict[str, Any]]: | |
| """ | |
| Perform a full-text search on the Character Chat database. | |
| Args: | |
| query: Search query string. | |
| fts_top_k: Maximum number of results to return. | |
| relevant_media_ids: Optional list of character IDs to filter results. | |
| Returns: | |
| List of search results with content and metadata. | |
| """ | |
| if not query.strip(): | |
| return [] | |
| try: | |
| # Construct a WHERE clause to limit the search to relevant character IDs | |
| where_clause = "" | |
| if relevant_media_ids: | |
| placeholders = ','.join(['?'] * len(relevant_media_ids)) | |
| where_clause = f"CharacterChats.character_id IN ({placeholders})" | |
| # Perform full-text search using existing search_db function | |
| results = search_db(query, ["conversation_name", "chat_history"], where_clause, results_per_page=fts_top_k) | |
| # Format results | |
| formatted_results = [] | |
| for r in results: | |
| formatted_results.append({ | |
| "content": r['chat_history'], | |
| "metadata": { | |
| "chat_id": r['id'], | |
| "conversation_name": r['conversation_name'], | |
| "character_id": r['character_id'] | |
| } | |
| }) | |
| return formatted_results | |
| except Exception as e: | |
| logging.error(f"Error in search_character_chat: {e}") | |
| return [] | |
| def search_character_cards(query: str, fts_top_k: int = 10, relevant_media_ids: List[str] = None) -> List[Dict[str, Any]]: | |
| """ | |
| Perform a full-text search on the Character Cards database. | |
| Args: | |
| query: Search query string. | |
| fts_top_k: Maximum number of results to return. | |
| relevant_media_ids: Optional list of character IDs to filter results. | |
| Returns: | |
| List of search results with content and metadata. | |
| """ | |
| if not query.strip(): | |
| return [] | |
| try: | |
| conn = sqlite3.connect(chat_DB_PATH) | |
| cursor = conn.cursor() | |
| # Construct the query | |
| sql_query = """ | |
| SELECT CharacterCards.id, CharacterCards.name, CharacterCards.description, CharacterCards.personality, CharacterCards.scenario | |
| FROM CharacterCards_fts | |
| JOIN CharacterCards ON CharacterCards_fts.rowid = CharacterCards.id | |
| WHERE CharacterCards_fts MATCH ? | |
| """ | |
| params = [query] | |
| # Add filtering by character IDs if provided | |
| if relevant_media_ids: | |
| placeholders = ','.join(['?'] * len(relevant_media_ids)) | |
| sql_query += f" AND CharacterCards.id IN ({placeholders})" | |
| params.extend(relevant_media_ids) | |
| sql_query += " LIMIT ?" | |
| params.append(fts_top_k) | |
| cursor.execute(sql_query, params) | |
| rows = cursor.fetchall() | |
| columns = [description[0] for description in cursor.description] | |
| results = [dict(zip(columns, row)) for row in rows] | |
| # Format results | |
| formatted_results = [] | |
| for r in results: | |
| content = f"Name: {r['name']}\nDescription: {r['description']}\nPersonality: {r['personality']}\nScenario: {r['scenario']}" | |
| formatted_results.append({ | |
| "content": content, | |
| "metadata": { | |
| "character_id": r['id'], | |
| "name": r['name'] | |
| } | |
| }) | |
| return formatted_results | |
| except Exception as e: | |
| logging.error(f"Error in search_character_cards: {e}") | |
| return [] | |
| finally: | |
| conn.close() | |
| def fetch_character_ids_by_keywords(keywords: List[str]) -> List[int]: | |
| """ | |
| Fetch character IDs associated with any of the specified keywords. | |
| Args: | |
| keywords (List[str]): List of keywords to search for. | |
| Returns: | |
| List[int]: List of character IDs associated with the keywords. | |
| """ | |
| if not keywords: | |
| return [] | |
| conn = sqlite3.connect(chat_DB_PATH) | |
| cursor = conn.cursor() | |
| try: | |
| # Assuming 'tags' column in CharacterCards table stores tags as JSON array | |
| placeholders = ','.join(['?'] * len(keywords)) | |
| sql_query = f""" | |
| SELECT DISTINCT id FROM CharacterCards | |
| WHERE EXISTS ( | |
| SELECT 1 FROM json_each(tags) | |
| WHERE json_each.value IN ({placeholders}) | |
| ) | |
| """ | |
| cursor.execute(sql_query, keywords) | |
| rows = cursor.fetchall() | |
| character_ids = [row[0] for row in rows] | |
| return character_ids | |
| except Exception as e: | |
| logging.error(f"Error in fetch_character_ids_by_keywords: {e}") | |
| return [] | |
| finally: | |
| conn.close() | |
| ################################################################### | |
| # | |
| # Character Keywords | |
| def view_char_keywords(): | |
| try: | |
| with sqlite3.connect(chat_DB_PATH) as conn: | |
| cursor = conn.cursor() | |
| cursor.execute(""" | |
| SELECT DISTINCT keyword | |
| FROM CharacterCards | |
| CROSS JOIN json_each(tags) | |
| WHERE json_valid(tags) | |
| ORDER BY keyword | |
| """) | |
| keywords = cursor.fetchall() | |
| if keywords: | |
| keyword_list = [k[0] for k in keywords] | |
| return "### Current Character Keywords:\n" + "\n".join( | |
| [f"- {k}" for k in keyword_list]) | |
| return "No keywords found." | |
| except Exception as e: | |
| return f"Error retrieving keywords: {str(e)}" | |
| def add_char_keywords(name: str, keywords: str): | |
| try: | |
| keywords_list = [k.strip() for k in keywords.split(",") if k.strip()] | |
| with sqlite3.connect('character_chat.db') as conn: | |
| cursor = conn.cursor() | |
| cursor.execute( | |
| "SELECT tags FROM CharacterCards WHERE name = ?", | |
| (name,) | |
| ) | |
| result = cursor.fetchone() | |
| if not result: | |
| return "Character not found." | |
| current_tags = result[0] if result[0] else "[]" | |
| current_keywords = set(current_tags[1:-1].split(',')) if current_tags != "[]" else set() | |
| updated_keywords = current_keywords.union(set(keywords_list)) | |
| cursor.execute( | |
| "UPDATE CharacterCards SET tags = ? WHERE name = ?", | |
| (str(list(updated_keywords)), name) | |
| ) | |
| conn.commit() | |
| return f"Successfully added keywords to character {name}" | |
| except Exception as e: | |
| return f"Error adding keywords: {str(e)}" | |
| def delete_char_keyword(char_name: str, keyword: str) -> str: | |
| """ | |
| Delete a keyword from a character's tags. | |
| Args: | |
| char_name (str): The name of the character | |
| keyword (str): The keyword to delete | |
| Returns: | |
| str: Success/failure message | |
| """ | |
| try: | |
| with sqlite3.connect(chat_DB_PATH) as conn: | |
| cursor = conn.cursor() | |
| # First, check if the character exists | |
| cursor.execute("SELECT tags FROM CharacterCards WHERE name = ?", (char_name,)) | |
| result = cursor.fetchone() | |
| if not result: | |
| return f"Character '{char_name}' not found." | |
| # Parse existing tags | |
| current_tags = json.loads(result[0]) if result[0] else [] | |
| if keyword not in current_tags: | |
| return f"Keyword '{keyword}' not found in character '{char_name}' tags." | |
| # Remove the keyword | |
| updated_tags = [tag for tag in current_tags if tag != keyword] | |
| # Update the character's tags | |
| cursor.execute( | |
| "UPDATE CharacterCards SET tags = ? WHERE name = ?", | |
| (json.dumps(updated_tags), char_name) | |
| ) | |
| conn.commit() | |
| logging.info(f"Keyword '{keyword}' deleted from character '{char_name}'") | |
| return f"Successfully deleted keyword '{keyword}' from character '{char_name}'." | |
| except Exception as e: | |
| error_msg = f"Error deleting keyword: {str(e)}" | |
| logging.error(error_msg) | |
| return error_msg | |
| def export_char_keywords_to_csv() -> Tuple[str, str]: | |
| """ | |
| Export all character keywords to a CSV file with associated metadata. | |
| Returns: | |
| Tuple[str, str]: (status_message, file_path) | |
| """ | |
| import csv | |
| from tempfile import NamedTemporaryFile | |
| from datetime import datetime | |
| try: | |
| # Create a temporary CSV file | |
| temp_file = NamedTemporaryFile(mode='w+', delete=False, suffix='.csv', newline='') | |
| with sqlite3.connect(chat_DB_PATH) as conn: | |
| cursor = conn.cursor() | |
| # Get all characters and their tags | |
| cursor.execute(""" | |
| SELECT | |
| name, | |
| tags, | |
| (SELECT COUNT(*) FROM CharacterChats WHERE CharacterChats.character_id = CharacterCards.id) as chat_count | |
| FROM CharacterCards | |
| WHERE json_valid(tags) | |
| ORDER BY name | |
| """) | |
| results = cursor.fetchall() | |
| # Process the results to create rows for the CSV | |
| csv_rows = [] | |
| for name, tags_json, chat_count in results: | |
| tags = json.loads(tags_json) if tags_json else [] | |
| for tag in tags: | |
| csv_rows.append([ | |
| tag, # keyword | |
| name, # character name | |
| chat_count # number of chats | |
| ]) | |
| # Write to CSV | |
| writer = csv.writer(temp_file) | |
| writer.writerow(['Keyword', 'Character Name', 'Number of Chats']) | |
| writer.writerows(csv_rows) | |
| temp_file.close() | |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
| status_msg = f"Successfully exported {len(csv_rows)} character keyword entries to CSV." | |
| logging.info(status_msg) | |
| return status_msg, temp_file.name | |
| except Exception as e: | |
| error_msg = f"Error exporting keywords: {str(e)}" | |
| logging.error(error_msg) | |
| return error_msg, "" | |
| # | |
| # End of Character chat keyword functions | |
| ###################################################### | |
| # | |
| # End of Character_Chat_DB.py | |
| ####################################################################################################################### | |