Spaces:
Runtime error
Runtime error
| # Prompts_DB.py | |
| # Description: Functions to manage the prompts database. | |
| # | |
| # Imports | |
| import sqlite3 | |
| import logging | |
| # | |
| # External Imports | |
| import re | |
| from typing import Tuple | |
| # | |
| # Local Imports | |
| from App_Function_Libraries.Utils.Utils import get_database_path | |
| # | |
| ####################################################################################################################### | |
| # | |
| # Functions to manage prompts DB | |
def create_prompts_db():
    """Create the prompts database schema if it is not already present.

    Executes a single SQL script against ``prompts.db`` that defines the
    ``Prompts``, ``Keywords`` and ``PromptKeywords`` tables and three lookup
    indexes. Every statement uses ``IF NOT EXISTS``, so calling this on an
    already-initialised database changes nothing.
    """
    logging.debug("create_prompts_db: Creating prompts database.")
    schema_script = '''
        CREATE TABLE IF NOT EXISTS Prompts (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL UNIQUE,
            author TEXT,
            details TEXT,
            system TEXT,
            user TEXT
        );
        CREATE TABLE IF NOT EXISTS Keywords (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            keyword TEXT NOT NULL UNIQUE COLLATE NOCASE
        );
        CREATE TABLE IF NOT EXISTS PromptKeywords (
            prompt_id INTEGER,
            keyword_id INTEGER,
            FOREIGN KEY (prompt_id) REFERENCES Prompts (id),
            FOREIGN KEY (keyword_id) REFERENCES Keywords (id),
            PRIMARY KEY (prompt_id, keyword_id)
        );
        CREATE INDEX IF NOT EXISTS idx_keywords_keyword ON Keywords(keyword);
        CREATE INDEX IF NOT EXISTS idx_promptkeywords_prompt_id ON PromptKeywords(prompt_id);
        CREATE INDEX IF NOT EXISTS idx_promptkeywords_keyword_id ON PromptKeywords(keyword_id);
    '''
    db_file = get_database_path('prompts.db')
    with sqlite3.connect(db_file) as db:
        db.cursor().executescript(schema_script)
| # FIXME - dirty hack that should be removed later... | |
| # Migration function to add the 'author' column to the Prompts table | |
def add_author_column_to_prompts():
    """Add the ``author`` column to an existing ``Prompts`table`` if it is missing.

    This is a one-off migration for databases created before the ``author``
    column was part of the schema. It inspects the table with
    ``PRAGMA table_info`` and issues ``ALTER TABLE`` only when the column is
    absent. If the ``Prompts`` table does not exist yet the function does
    nothing, because the schema created by ``create_prompts_db`` already
    contains the column.
    """
    with sqlite3.connect(get_database_path('prompts.db')) as conn:
        cursor = conn.cursor()
        # Each PRAGMA table_info row describes one column; index 1 is its name.
        cursor.execute("PRAGMA table_info(Prompts)")
        columns = [col[1] for col in cursor.fetchall()]
        if not columns:
            # No rows means the table itself is missing (fresh database).
            # ALTER TABLE would fail with "no such table", so skip the migration.
            logging.debug("add_author_column_to_prompts: Prompts table not found, nothing to migrate.")
            return
        if 'author' not in columns:
            # Add the 'author' column
            cursor.execute('ALTER TABLE Prompts ADD COLUMN author TEXT')
            print("Author column added to Prompts table.")
        else:
            print("Author column already exists in Prompts table.")


# Run the migration as soon as the module is imported.
add_author_column_to_prompts()
def normalize_keyword(keyword):
    """Return ``keyword`` trimmed, lower-cased and with whitespace runs collapsed.

    Leading and trailing whitespace is removed, the text is lower-cased, and
    every run of whitespace inside it becomes a single space.
    """
    trimmed = keyword.strip()
    lowered = trimmed.lower()
    return re.sub(r'\s+', ' ', lowered)
| # FIXME - update calls to this function to use the new args | |
def add_prompt(name, author, details, system=None, user=None, keywords=None):
    """Insert a new prompt and link it to its keywords.

    Args:
        name: Prompt name; must be truthy.
        author: Author text stored with the prompt.
        details: Description text stored with the prompt.
        system: Optional system prompt text.
        user: Optional user prompt text.
        keywords: Optional iterable of keyword strings. Blank entries are
            skipped; the rest are normalised and de-duplicated before being
            linked to the prompt.

    Returns:
        str: A status message. Failures are reported in the message (missing
        name, integrity violation, other database error) rather than raised.
    """
    logging.debug(f"add_prompt: Adding prompt with name: {name}, author: {author}, system: {system}, user: {user}, keywords: {keywords}")
    if not name:
        logging.error("add_prompt: A name is required.")
        return "A name is required."

    insert_prompt_sql = '''
        INSERT INTO Prompts (name, author, details, system, user)
        VALUES (?, ?, ?, ?, ?)
    '''
    insert_keyword_sql = '''
        INSERT OR IGNORE INTO Keywords (keyword) VALUES (?)
    '''
    link_sql = '''
        INSERT OR IGNORE INTO PromptKeywords (prompt_id, keyword_id) VALUES (?, ?)
    '''
    try:
        with sqlite3.connect(get_database_path('prompts.db')) as db:
            cur = db.cursor()
            cur.execute(insert_prompt_sql, (name, author, details, system, user))
            new_prompt_id = cur.lastrowid

            if keywords:
                cleaned = [normalize_keyword(raw) for raw in keywords if raw.strip()]
                # A set drops duplicates that only differed before normalisation.
                for term in set(cleaned):
                    cur.execute(insert_keyword_sql, (term,))
                    # Look the id up again: the insert is ignored when the keyword already exists.
                    cur.execute('SELECT id FROM Keywords WHERE keyword = ?', (term,))
                    term_id = cur.fetchone()[0]
                    cur.execute(link_sql, (new_prompt_id, term_id))
            return "Prompt added successfully."
    except sqlite3.IntegrityError:
        return "Prompt with this name already exists."
    except sqlite3.Error as e:
        return f"Database error: {e}"
def fetch_prompt_details(name):
    """Fetch one prompt, with its keywords, by exact name.

    Args:
        name: Prompt name to look up.

    Returns:
        The row ``(name, author, details, system, user, keywords)`` as returned
        by ``cursor.fetchone()``, where ``keywords`` is the ``', '``-joined
        keyword list built by ``GROUP_CONCAT``, or ``None`` when no prompt has
        that name.
    """
    logging.debug(f"fetch_prompt_details: Fetching details for prompt: {name}")
    details_sql = '''
        SELECT p.name, p.author, p.details, p.system, p.user, GROUP_CONCAT(k.keyword, ', ') as keywords
        FROM Prompts p
        LEFT JOIN PromptKeywords pk ON p.id = pk.prompt_id
        LEFT JOIN Keywords k ON pk.keyword_id = k.id
        WHERE p.name = ?
        GROUP BY p.id
    '''
    with sqlite3.connect(get_database_path('prompts.db')) as db:
        cur = db.cursor()
        cur.execute(details_sql, (name,))
        row = cur.fetchone()
        return row
def list_prompts(page=1, per_page=10):
    """Return one page of prompt names plus paging information.

    Args:
        page: 1-based page number. Values below 1 are treated as 1.
        per_page: Number of names per page. Values below 1 are treated as 1,
            which also avoids a division by zero when counting pages.

    Returns:
        tuple: ``(prompts, total_pages, page)`` where ``prompts`` is the list
        of names on the page, ``total_pages`` is the page count for the whole
        table, and ``page`` is the page number actually used.
    """
    logging.debug(f"list_prompts: Listing prompts for page {page} with {per_page} prompts per page.")
    page = max(1, page)
    per_page = max(1, per_page)
    offset = (page - 1) * per_page
    with sqlite3.connect(get_database_path('prompts.db')) as conn:
        cursor = conn.cursor()
        # LIMIT/OFFSET only yields stable pages when the row order is fixed.
        cursor.execute('SELECT name FROM Prompts ORDER BY name LIMIT ? OFFSET ?', (per_page, offset))
        prompts = [row[0] for row in cursor.fetchall()]
        # Get total count of prompts
        cursor.execute('SELECT COUNT(*) FROM Prompts')
        total_count = cursor.fetchone()[0]
    # Ceiling division: a partially filled last page still counts as a page.
    total_pages = (total_count + per_page - 1) // per_page
    return prompts, total_pages, page
def insert_prompt_to_db(title, author, description, system_prompt, user_prompt, keywords=None):
    """Insert a new prompt by delegating to ``add_prompt``.

    Maps this function's argument names onto ``add_prompt``'s parameters and
    returns its status message unchanged.
    """
    return add_prompt(
        name=title,
        author=author,
        details=description,
        system=system_prompt,
        user=user_prompt,
        keywords=keywords,
    )
def get_prompt_db_connection():
    """Open and return a new ``sqlite3`` connection to ``prompts.db``."""
    return sqlite3.connect(get_database_path('prompts.db'))
def search_prompts(query):
    """Search prompts by substring across name, details, system, user and keywords.

    Args:
        query: Text to look for. It is wrapped in ``%`` wildcards and matched
            with ``LIKE``.

    Returns:
        list: Rows ``(name, details, system, user, keywords)`` ordered by name,
        where ``keywords`` is the prompt's ``', '``-joined keyword list.
        Returns an empty list if a database error occurs (the error is logged).
    """
    logging.debug(f"search_prompts: Searching prompts with query: {query}")
    pattern = f'%{query}%'
    try:
        with get_prompt_db_connection() as conn:
            cursor = conn.cursor()
            # The keyword match runs in a subquery. Filtering on k.keyword in the
            # outer WHERE would drop the non-matching keyword rows before
            # GROUP_CONCAT, so a prompt found only through a keyword would be
            # returned with a truncated keyword list.
            cursor.execute("""
                SELECT p.name, p.details, p.system, p.user, GROUP_CONCAT(k.keyword, ', ') as keywords
                FROM Prompts p
                LEFT JOIN PromptKeywords pk ON p.id = pk.prompt_id
                LEFT JOIN Keywords k ON pk.keyword_id = k.id
                WHERE p.name LIKE ? OR p.details LIKE ? OR p.system LIKE ? OR p.user LIKE ?
                   OR p.id IN (
                        SELECT mpk.prompt_id
                        FROM PromptKeywords mpk
                        JOIN Keywords mk ON mpk.keyword_id = mk.id
                        WHERE mk.keyword LIKE ?
                   )
                GROUP BY p.id
                ORDER BY p.name
            """, (pattern,) * 5)
            return cursor.fetchall()
    except sqlite3.Error as e:
        logging.error(f"Error searching prompts: {e}")
        return []
def search_prompts_by_keyword(keyword, page=1, per_page=10):
    """Return one page of prompt names whose keywords contain ``keyword``.

    The keyword is normalised with ``normalize_keyword`` and matched as a
    substring (``LIKE '%…%'``) against the stored keywords.

    Args:
        keyword: Keyword text to search for.
        page: 1-based page number. Values below 1 are treated as 1.
        per_page: Number of names per page. Values below 1 are treated as 1,
            which also avoids a division by zero when counting pages.

    Returns:
        tuple: ``(prompts, total_pages, page)`` where ``prompts`` is the list
        of matching names on the page, ``total_pages`` is the page count over
        all matches, and ``page`` is the page number actually used.
    """
    logging.debug(f"search_prompts_by_keyword: Searching prompts by keyword: {keyword}")
    normalized_keyword = normalize_keyword(keyword)
    pattern = '%' + normalized_keyword + '%'
    page = max(1, page)
    per_page = max(1, per_page)
    offset = (page - 1) * per_page
    with sqlite3.connect(get_database_path('prompts.db')) as conn:
        cursor = conn.cursor()
        # ORDER BY fixes the row order so consecutive pages neither repeat nor skip names.
        cursor.execute('''
            SELECT DISTINCT p.name
            FROM Prompts p
            JOIN PromptKeywords pk ON p.id = pk.prompt_id
            JOIN Keywords k ON pk.keyword_id = k.id
            WHERE k.keyword LIKE ?
            ORDER BY p.name
            LIMIT ? OFFSET ?
        ''', (pattern, per_page, offset))
        prompts = [row[0] for row in cursor.fetchall()]
        # Get total count of matching prompts
        cursor.execute('''
            SELECT COUNT(DISTINCT p.id)
            FROM Prompts p
            JOIN PromptKeywords pk ON p.id = pk.prompt_id
            JOIN Keywords k ON pk.keyword_id = k.id
            WHERE k.keyword LIKE ?
        ''', (pattern,))
        total_count = cursor.fetchone()[0]
    # Ceiling division: a partially filled last page still counts as a page.
    total_pages = (total_count + per_page - 1) // per_page
    return prompts, total_pages, page
def update_prompt_keywords(prompt_name, new_keywords):
    """Replace the keyword set of an existing prompt.

    All existing links for the prompt are removed, the new keywords are
    normalised, de-duplicated and linked, and finally any keyword that is no
    longer linked to any prompt is deleted from ``Keywords``.

    Args:
        prompt_name: Name of the prompt to update.
        new_keywords: Iterable of keyword strings; blank entries are skipped.
            ``None`` is treated as an empty list, which clears the prompt's
            keywords.

    Returns:
        str: A status message: success, "Prompt not found.", or a database
        error description.
    """
    logging.debug(f"update_prompt_keywords: Updating keywords for prompt: {prompt_name}")
    # Without this, iterating None raises TypeError, which the sqlite3.Error
    # handler below does not catch.
    new_keywords = new_keywords or []
    try:
        with sqlite3.connect(get_database_path('prompts.db')) as conn:
            cursor = conn.cursor()
            cursor.execute('SELECT id FROM Prompts WHERE name = ?', (prompt_name,))
            prompt_row = cursor.fetchone()
            if not prompt_row:
                return "Prompt not found."
            prompt_id = prompt_row[0]

            cursor.execute('DELETE FROM PromptKeywords WHERE prompt_id = ?', (prompt_id,))

            normalized_keywords = [normalize_keyword(k) for k in new_keywords if k.strip()]
            for keyword in set(normalized_keywords):  # Use set to remove duplicates
                cursor.execute('INSERT OR IGNORE INTO Keywords (keyword) VALUES (?)', (keyword,))
                # Look the id up again: the insert is ignored when the keyword already exists.
                cursor.execute('SELECT id FROM Keywords WHERE keyword = ?', (keyword,))
                keyword_id = cursor.fetchone()[0]
                cursor.execute('INSERT INTO PromptKeywords (prompt_id, keyword_id) VALUES (?, ?)',
                               (prompt_id, keyword_id))

            # Remove unused keywords
            cursor.execute('''
                DELETE FROM Keywords
                WHERE id NOT IN (SELECT DISTINCT keyword_id FROM PromptKeywords)
            ''')
            return "Keywords updated successfully."
    except sqlite3.Error as e:
        return f"Database error: {e}"
def add_or_update_prompt(title, author, description, system_prompt, user_prompt, keywords=None):
    """Create the prompt if its title is new, otherwise update it in place.

    An existing prompt has its fields updated first; its keywords are replaced
    only if that update reports success, and the keyword status message is
    appended to the update message. A new title is inserted together with its
    keywords.

    Returns:
        str: The combined status message, or an error message when ``title``
        is empty.
    """
    logging.debug(f"add_or_update_prompt: Adding or updating prompt: {title}")
    if not title:
        return "Error: Title is required."

    if not fetch_prompt_details(title):
        # Unknown title: insert a new prompt.
        return insert_prompt_to_db(title, author, description, system_prompt, user_prompt, keywords)

    # Known title: update the fields, then the keywords if that succeeded.
    outcome = update_prompt_in_db(title, author, description, system_prompt, user_prompt)
    if "successfully" in outcome:
        keyword_outcome = update_prompt_keywords(title, keywords or [])
        outcome += f" {keyword_outcome}"
    return outcome
def load_prompt_details(selected_prompt):
    """Return the six detail fields of a prompt, or six empty strings.

    Args:
        selected_prompt: Name of the prompt to load; may be empty or ``None``.

    Returns:
        tuple: The first six columns of the row from ``fetch_prompt_details``
        (name, author, details, system, user, keywords). Six empty strings are
        returned when no name is given or no such prompt exists.
    """
    logging.debug(f"load_prompt_details: Loading prompt details for {selected_prompt}")
    blank = ("", "", "", "", "", "")
    if not selected_prompt:
        return blank
    details = fetch_prompt_details(selected_prompt)
    if not details:
        return blank
    return tuple(details[position] for position in range(6))
def update_prompt_in_db(title, author, description, system_prompt, user_prompt):
    """Overwrite author, details, system and user text of the prompt named ``title``.

    Returns:
        str: A success message, a "not found" message when no row matched the
        title, or an error message describing the database failure.
    """
    logging.debug(f"update_prompt_in_db: Updating prompt: {title}")
    statement = "UPDATE Prompts SET author = ?, details = ?, system = ?, user = ? WHERE name = ?"
    values = (author, description, system_prompt, user_prompt, title)
    try:
        with sqlite3.connect(get_database_path('prompts.db')) as db:
            cur = db.cursor()
            cur.execute(statement, values)
            # rowcount is 0 when the WHERE clause matched no prompt.
            if cur.rowcount != 0:
                return "Prompt updated successfully!"
            return "No prompt found with the given title."
    except sqlite3.Error as e:
        return f"Error updating prompt: {e}"
def delete_prompt(prompt_id):
    """Delete a prompt and its keyword links by numeric id.

    The ``PromptKeywords`` links are removed first, then the ``Prompts`` row.

    Returns:
        str: A success message, a "not found" message when no prompt row had
        that id, or an error message describing the database failure.
    """
    logging.debug(f"delete_prompt: Deleting prompt with ID: {prompt_id}")
    try:
        with sqlite3.connect(get_database_path('prompts.db')) as db:
            cur = db.cursor()
            # Links first, then the prompt row itself.
            cur.execute("DELETE FROM PromptKeywords WHERE prompt_id = ?", (prompt_id,))
            cur.execute("DELETE FROM Prompts WHERE id = ?", (prompt_id,))
            prompt_rows_removed = cur.rowcount
            if prompt_rows_removed == 0:
                return f"No prompt found with ID {prompt_id}"
            db.commit()
            return f"Prompt with ID {prompt_id} has been successfully deleted"
    except sqlite3.Error as e:
        return f"An error occurred: {e}"
def delete_prompt_keyword(keyword: str) -> str:
    """
    Delete a keyword and its associations from the prompts database.

    The keyword is normalised with ``normalize_keyword`` before lookup. Its
    links in ``PromptKeywords`` are removed first, then the keyword row.

    Args:
        keyword (str): The keyword to delete

    Returns:
        str: Success/failure message. The success message reports how many
        prompt links were removed.
    """
    logging.debug(f"delete_prompt_keyword: Deleting keyword: {keyword}")
    try:
        with sqlite3.connect(get_database_path('prompts.db')) as conn:
            cursor = conn.cursor()

            # First normalize the keyword
            normalized_keyword = normalize_keyword(keyword)

            # Get the keyword ID
            cursor.execute("SELECT id FROM Keywords WHERE keyword = ?", (normalized_keyword,))
            result = cursor.fetchone()
            if not result:
                return f"Keyword '{keyword}' not found."
            keyword_id = result[0]

            # Delete keyword associations from PromptKeywords
            cursor.execute("DELETE FROM PromptKeywords WHERE keyword_id = ?", (keyword_id,))
            # Read the count now: rowcount reflects only the most recent
            # statement, and the next DELETE removes just the keyword row.
            affected_prompts = cursor.rowcount

            # Delete the keyword itself
            cursor.execute("DELETE FROM Keywords WHERE id = ?", (keyword_id,))

            conn.commit()
            logging.info(f"Keyword '{keyword}' deleted successfully")
            return f"Successfully deleted keyword '{keyword}' and removed it from {affected_prompts} prompts."
    except sqlite3.Error as e:
        error_msg = f"Database error deleting keyword: {str(e)}"
        logging.error(error_msg)
        return error_msg
    except Exception as e:
        error_msg = f"Error deleting keyword: {str(e)}"
        logging.error(error_msg)
        return error_msg
def export_prompt_keywords_to_csv() -> Tuple[str, str]:
    """
    Export all prompt keywords to a CSV file with associated metadata.

    One CSV row is written per keyword with the columns: keyword, the names of
    the prompts that use it joined by ``' | '``, the number of those prompts,
    and their distinct authors joined by ``' | '``. The file is created in the
    system temp directory with a timestamped name.

    Returns:
        Tuple[str, str]: (status_message, file_path). On failure the message
        describes the error and ``file_path`` is the string ``"None"``.
    """
    import csv
    import tempfile
    import os
    from datetime import datetime

    logging.debug("export_prompt_keywords_to_csv: Starting export")
    try:
        # Create a temporary file with a specific name in the system's temp directory
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        temp_dir = tempfile.gettempdir()
        file_path = os.path.join(temp_dir, f'prompt_keywords_export_{timestamp}.csv')

        with sqlite3.connect(get_database_path('prompts.db')) as conn:
            cursor = conn.cursor()
            # Fetch flat keyword/prompt rows and aggregate below in Python.
            # GROUP_CONCAT(DISTINCT p.author, ' | ') cannot be used: SQLite
            # rejects DISTINCT aggregates that take more than one argument.
            cursor.execute('''
                SELECT k.id, k.keyword, p.id, p.name, p.author
                FROM Keywords k
                LEFT JOIN PromptKeywords pk ON k.id = pk.keyword_id
                LEFT JOIN Prompts p ON pk.prompt_id = p.id
                ORDER BY k.keyword, k.id, p.name
            ''')
            rows = cursor.fetchall()

        # Group by keyword id; dict insertion order keeps the SQL ordering.
        per_keyword = {}
        for keyword_id, keyword, prompt_id, prompt_name, author in rows:
            entry = per_keyword.setdefault(
                keyword_id,
                {'keyword': keyword, 'prompt_ids': set(), 'names': [], 'authors': []},
            )
            if prompt_id is None:
                # The LEFT JOIN produced a keyword that no prompt uses.
                continue
            if prompt_id not in entry['prompt_ids']:
                entry['prompt_ids'].add(prompt_id)
                entry['names'].append(str(prompt_name))
            if author is not None and str(author) not in entry['authors']:
                entry['authors'].append(str(author))

        # Write to CSV
        with open(file_path, 'w', newline='', encoding='utf-8') as csvfile:
            writer = csv.writer(csvfile)
            writer.writerow([
                'Keyword',
                'Associated Prompts',
                'Number of Prompts',
                'Authors'
            ])
            for entry in per_keyword.values():
                writer.writerow([
                    entry['keyword'],
                    ' | '.join(entry['names']),
                    len(entry['prompt_ids']),
                    ' | '.join(entry['authors']),
                ])

        status_msg = f"Successfully exported {len(per_keyword)} prompt keywords to CSV."
        logging.info(status_msg)
        return status_msg, file_path
    except sqlite3.Error as e:
        error_msg = f"Database error exporting keywords: {str(e)}"
        logging.error(error_msg)
        return error_msg, "None"
    except Exception as e:
        error_msg = f"Error exporting keywords: {str(e)}"
        logging.error(error_msg)
        return error_msg, "None"
def view_prompt_keywords() -> str:
    """
    List every keyword in the prompts database with its usage count.

    Returns:
        str: A Markdown bullet list headed "### Current Prompt Keywords:", one
        line per keyword with the number of prompts linked to it;
        "No keywords found." when the table is empty; or an error message if
        the lookup fails.
    """
    logging.debug("view_prompt_keywords: Retrieving all keywords")
    listing_sql = """
        SELECT k.keyword, COUNT(DISTINCT pk.prompt_id) as prompt_count
        FROM Keywords k
        LEFT JOIN PromptKeywords pk ON k.id = pk.keyword_id
        GROUP BY k.id, k.keyword
        ORDER BY k.keyword
    """
    try:
        with sqlite3.connect(get_database_path('prompts.db')) as db:
            cur = db.cursor()
            cur.execute(listing_sql)
            rows = cur.fetchall()
            if not rows:
                return "No keywords found."
            bullets = "\n".join(f"- {word} ({usage} prompts)" for word, usage in rows)
            return "### Current Prompt Keywords:\n" + bullets
    except Exception as e:
        error_msg = f"Error retrieving keywords: {str(e)}"
        logging.error(error_msg)
        return error_msg
def export_prompts(
        export_format='csv',
        filter_keywords=None,
        include_system=True,
        include_user=True,
        include_details=True,
        include_author=True,
        include_keywords=True,
        markdown_template=None
) -> Tuple[str, str]:
    """
    Export prompts to CSV or Markdown with configurable options.

    For ``'csv'`` a single CSV file is written. For any other format value one
    Markdown document per prompt is written into a ZIP archive. Both files are
    created in the system temp directory with a timestamped name.

    Args:
        export_format (str): 'csv' or 'markdown'
        filter_keywords (List[str], optional): Keywords to filter prompts by
        include_system (bool): Include system prompts in export
        include_user (bool): Include user prompts in export
        include_details (bool): Include prompt details/descriptions
        include_author (bool): Include author information
        include_keywords (bool): Include associated keywords
        markdown_template (str, optional): Name of a built-in template
            ("Basic Template" / "Detailed Template") or a custom
            ``str.format`` template using the placeholders ``{title}``,
            ``{author_section}``, ``{details_section}``, ``{system_section}``,
            ``{user_section}`` and ``{keywords_section}``.

    Returns:
        Tuple[str, str]: (status_message, file_path). On failure the message
        describes the error and ``file_path`` is the string ``"None"``.
    """
    import csv
    import tempfile
    import os
    import zipfile
    from datetime import datetime

    # Optional columns in output order: (column name, CSV header, requested?).
    optional_columns = [
        ('author', 'Author', include_author),
        ('details', 'Details', include_details),
        ('system', 'System Prompt', include_system),
        ('user', 'User Prompt', include_user),
    ]
    selected = [(column, header) for column, header, wanted in optional_columns if wanted]
    # Column names in the exact order they appear in each fetched row.
    column_keys = ['name'] + [column for column, _ in selected]

    try:
        # Get prompts data
        with get_prompt_db_connection() as conn:
            cursor = conn.cursor()

            # Build query based on included fields
            select_fields = ', '.join(f'p.{column}' for column in column_keys)
            query = f"""
                SELECT DISTINCT {select_fields}
                FROM Prompts p
            """
            # Add keyword filtering if specified
            params = list(filter_keywords) if filter_keywords else []
            if params:
                placeholders = ','.join('?' for _ in params)
                query += f"""
                    JOIN PromptKeywords pk ON p.id = pk.prompt_id
                    JOIN Keywords k ON pk.keyword_id = k.id
                    WHERE k.keyword IN ({placeholders})
                """
            cursor.execute(query, params)
            prompts = cursor.fetchall()

            # Get keywords for each prompt if needed
            prompt_keywords = {}
            if include_keywords:
                for prompt in prompts:
                    cursor.execute("""
                        SELECT k.keyword
                        FROM Keywords k
                        JOIN PromptKeywords pk ON k.id = pk.keyword_id
                        JOIN Prompts p ON pk.prompt_id = p.id
                        WHERE p.name = ?
                    """, (prompt[0],))
                    prompt_keywords[prompt[0]] = [row[0] for row in cursor.fetchall()]

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

        if export_format == 'csv':
            # Export as CSV
            temp_file = os.path.join(tempfile.gettempdir(), f'prompts_export_{timestamp}.csv')
            with open(temp_file, 'w', newline='', encoding='utf-8') as csvfile:
                writer = csv.writer(csvfile)

                # Write header
                header = ['Name'] + [title for _, title in selected]
                if include_keywords:
                    header.append('Keywords')
                writer.writerow(header)

                # Write data
                for prompt in prompts:
                    row = list(prompt)
                    if include_keywords:
                        row.append(', '.join(prompt_keywords.get(prompt[0], [])))
                    writer.writerow(row)

            return f"Successfully exported {len(prompts)} prompts to CSV.", temp_file

        # Export as Markdown files in ZIP
        zip_path = os.path.join(tempfile.gettempdir(), f'prompts_export_{timestamp}.zip')

        # Define markdown templates
        templates = {
            "Basic Template": """# {title}
{author_section}
{details_section}
{system_section}
{user_section}
{keywords_section}
""",
            "Detailed Template": """# {title}
## Author
{author_section}
## Description
{details_section}
## System Prompt
{system_section}
## User Prompt
{user_section}
## Keywords
{keywords_section}
"""
        }
        template = templates.get(markdown_template, markdown_template or templates["Basic Template"])

        with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
            used_filenames = set()
            for prompt in prompts:
                # Look values up by column name. Row positions shift when an
                # include_* flag is off, so fixed indexes would read the wrong
                # field or run past the end of the row.
                fields = {
                    column: ('' if value is None else value)
                    for column, value in zip(column_keys, prompt)
                }

                # Create markdown content
                md_content = template.format(
                    title=fields['name'],
                    author_section=f"Author: {fields['author']}" if include_author else "",
                    details_section=fields.get('details', ''),
                    system_section=fields.get('system', ''),
                    user_section=fields.get('user', ''),
                    keywords_section=', '.join(prompt_keywords.get(prompt[0], [])) if include_keywords else ""
                )

                # Create safe filename
                safe_filename = re.sub(r'[^\w\-_\. ]', '_', prompt[0])
                # Different names can sanitise to the same text; add a numeric
                # suffix so the archive never holds duplicate entries.
                unique_filename = safe_filename
                suffix = 2
                while unique_filename in used_filenames:
                    unique_filename = f"{safe_filename}_{suffix}"
                    suffix += 1
                used_filenames.add(unique_filename)

                # Write straight into the archive (str data is stored as UTF-8),
                # so no temporary directory is left behind.
                zipf.writestr(f"{unique_filename}.md", md_content)

        return f"Successfully exported {len(prompts)} prompts to Markdown files.", zip_path
    except Exception as e:
        error_msg = f"Error exporting prompts: {str(e)}"
        logging.error(error_msg)
        return error_msg, "None"
| # Create the prompts tables and indexes (if missing) whenever this module is imported. | |
| create_prompts_db() | |
| # | |
| # End of Prompts_DB.py | |
| ####################################################################################################################### | |