Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import os | |
| import pyperclip | |
| from pathlib import Path | |
| from src.fabrics_processor.database import initialize_qdrant_database | |
| from src.fabrics_processor.database_updater import update_qdrant_database | |
| from src.fabrics_processor.file_change_detector import detect_file_changes | |
| from src.search_qdrant.database_query import query_qdrant_database | |
| from src.fabrics_processor.obsidian2fabric import sync_folders | |
| from src.fabrics_processor.logger import setup_logger | |
| import logging | |
| import atexit | |
| from src.fabrics_processor.config import config | |
| from src.fabrics_processor.deduplicator import remove_duplicates | |
| # Configure logging | |
| logger = setup_logger() | |
| def init_session_state(): | |
| """Initialize session state variables.""" | |
| if 'client' not in st.session_state: | |
| client = initialize_qdrant_database(api_key=st.secrets["api_key"]) | |
| st.session_state.client = client | |
| # Register cleanup function | |
| atexit.register(lambda: client.close() if hasattr(client, '_transport') else None) | |
| if 'selected_prompts' not in st.session_state: | |
| st.session_state.selected_prompts = [] | |
| if 'comparing' not in st.session_state: | |
| st.session_state.comparing = False | |
| if 'comparison_selected' not in st.session_state: | |
| st.session_state.comparison_selected = None | |
| def show_comparison_view(prompts): | |
| """Show a full-width comparison view of the selected prompts.""" | |
| st.write("## Compare Selected Prompts") | |
| # Add the back button at the top | |
| if st.button("Back to search"): | |
| st.session_state.comparing = False | |
| st.rerun() | |
| # Create columns for each prompt | |
| cols = st.columns(len(prompts)) | |
| # Track which prompt is selected for copying | |
| selected_idx = None | |
| for idx, (col, prompt) in enumerate(zip(cols, prompts)): | |
| with col: | |
| st.markdown(f"### {prompt.metadata['filename']}") | |
| # Create two columns for trigger and button | |
| trigger_col, button_col = st.columns([0.7, 0.3]) | |
| with trigger_col: | |
| # Add trigger field | |
| current_trigger = prompt.metadata.get('trigger', '') | |
| new_trigger = st.text_input("Trigger", | |
| value=current_trigger, | |
| key=f"trigger_{idx}") | |
| # Update trigger if changed | |
| if new_trigger != current_trigger: | |
| try: | |
| st.session_state.client.set_payload( | |
| collection_name=config.embedding.collection_name, | |
| payload={"trigger": new_trigger}, | |
| points=[prompt.id] | |
| ) | |
| st.success(f"Updated trigger to: {new_trigger}") | |
| except Exception as e: | |
| st.error(f"Failed to update trigger: {str(e)}") | |
| with button_col: | |
| # Align button with text input using empty space | |
| st.write("") # This creates some vertical space | |
| if st.button(f"Use this prompt", key=f"compare_use_{idx}"): | |
| selected_idx = idx | |
| # Display content as markdown | |
| st.markdown("### Content") | |
| st.markdown(prompt.metadata["content"]) | |
| # Handle selection | |
| if selected_idx is not None: | |
| pyperclip.copy(prompts[selected_idx].metadata['content']) | |
| st.success(f"Copied {prompts[selected_idx].metadata['filename']} to clipboard!") | |
| # Clear comparison view | |
| st.session_state.comparing = False | |
| st.rerun() | |
| def search_interface(): | |
| """Show the search interface.""" | |
| if st.session_state.comparing: | |
| show_comparison_view(st.session_state.selected_prompts) | |
| return | |
| st.subheader("Search for prompts") | |
| query = st.text_area("What are you trying to accomplish? I will then search for good prompts to give you a good start.") | |
| if query: | |
| try: | |
| results = query_qdrant_database( | |
| query=query, | |
| client=st.session_state.client, | |
| num_results=5, | |
| collection_name=config.embedding.collection_name | |
| ) | |
| if results: | |
| st.write("Which prompts would you like to investigate? Max 3.") | |
| # Create checkboxes for selection | |
| selected = [] | |
| for r in results: | |
| if st.checkbox(f"{r.metadata['filename']}", key=f"select_{r.id}"): | |
| selected.append(r) | |
| st.session_state.selected_prompts = selected | |
| if selected: | |
| col1, col2 = st.columns(2) | |
| with col1: | |
| if st.button("Use: copy to clipboard"): | |
| if len(selected) == 1: | |
| pyperclip.copy(selected[0].metadata['content']) | |
| st.success("Copied to clipboard!") | |
| with col2: | |
| if len(selected) > 1 and st.button("Compare"): | |
| st.session_state.comparing = True | |
| st.rerun() | |
| except Exception as e: | |
| logger.error(f"Error in search_interface: {e}", exc_info=True) | |
| st.error(f"Error searching database: {e}") | |
| def update_database(): | |
| """Update the markdown folder with prompt files from Obsidian. | |
| Then update the Qdrant database. | |
| Finally based on the Qdrant database create a new espanso YAML file and | |
| the Obsidian Textgenerator markdown files.""" | |
| try: | |
| with st.spinner("Processing markdown files..."): | |
| # First check if there are any changes in the prompt files in Obsidian. | |
| # If so, add them to the markdown folder before updating the database. | |
| sync_folders(source_dir=Path(config.obsidian_input_folder), target_dir=Path(config.fabric_patterns_folder)) | |
| # Get current collection info | |
| collection_info = st.session_state.client.get_collection(config.embedding.collection_name) | |
| initial_points = collection_info.points_count | |
| # Detect file changes | |
| new_files, modified_files, deleted_files = detect_file_changes( | |
| client=st.session_state.client, | |
| fabric_patterns_folder=config.fabric_patterns_folder | |
| ) | |
| # Update the database if there are changes | |
| if any([new_files, modified_files, deleted_files]): | |
| st.info("Changes detected. Updating database...") | |
| update_qdrant_database( | |
| client=st.session_state.client, | |
| collection_name=config.embedding.collection_name, | |
| new_files=new_files, | |
| modified_files=modified_files, | |
| deleted_files=deleted_files | |
| ) | |
| else: | |
| st.info("No changes detected in input folders.") | |
| # Create a separate section for deduplication - ALWAYS run this regardless of file changes | |
| st.subheader("Deduplication Process") | |
| with st.spinner("Checking for and removing duplicate entries..."): | |
| # Run the deduplication process | |
| duplicates_removed = remove_duplicates( | |
| client=st.session_state.client, | |
| collection_name=config.embedding.collection_name | |
| ) | |
| if duplicates_removed > 0: | |
| st.success(f"Successfully removed {duplicates_removed} duplicate entries from the database") | |
| else: | |
| st.info("No duplicate entries found in the database") | |
| # Get updated collection info | |
| collection_info = st.session_state.client.get_collection(config.embedding.collection_name) | |
| final_points = collection_info.points_count | |
| # Show summary | |
| st.success(f""" | |
| Database update completed successfully! | |
| Changes detected: | |
| - {len(new_files)} new files | |
| - {len(modified_files)} modified files | |
| - {len(deleted_files)} deleted files | |
| - {duplicates_removed} duplicate entries removed | |
| Database entries: | |
| - Initial: {initial_points} | |
| - Final: {final_points} | |
| """) | |
| except Exception as e: | |
| logger.error(f"Error updating database: {e}", exc_info=True) | |
| st.error(f"Error updating database: {e}") | |
| def display_trigger_table(): | |
| """Display the trigger table in the sidebar.""" | |
| with st.sidebar: | |
| # Add some space to push the table to the bottom | |
| st.markdown("<br>" * 10, unsafe_allow_html=True) | |
| # Create the table | |
| st.markdown(""" | |
| | trigger | description | | |
| |---------|-------------| | |
| | ;;c | code | | |
| | ;;s | summarize and extract | | |
| | ;;t | think | | |
| """) | |
| def main(): | |
| st.set_page_config( | |
| page_title="Fabric to Espanso Prompt Manager", | |
| layout="wide") | |
| init_session_state() | |
| # Sidebar | |
| with st.sidebar: | |
| # Add logo to sidebar | |
| image_path = Path(__file__).parent.parent.parent / "data" / "Fab2Esp_transparent.png" | |
| st.image(str(image_path), width=200, use_container_width=False) | |
| st.title("Prompt Manager") | |
| page = st.radio("Select Option:", ["Search for prompts", "Update database and prompt files"]) | |
| if st.button("Quit"): | |
| if hasattr(st.session_state.client, '_transport'): | |
| st.session_state.client.close() | |
| st.success("Database connection closed.") | |
| st.stop() | |
| # Main content | |
| if page == "Search for prompts": | |
| search_interface() | |
| else: | |
| st.subheader("Update Database") | |
| if st.button("Start Update"): | |
| update_database() | |
| # Add the trigger table at the end | |
| display_trigger_table() | |
| # Add credits at the bottom left | |
| st.markdown(""" | |
| <style> | |
| .credits { | |
| position: fixed; | |
| left: 1rem; | |
| bottom: 1rem; | |
| font-size: 0.8rem; | |
| color: #666; | |
| max-width: 600px; | |
| } | |
| </style> | |
| <div class="credits"> | |
| This tool searches the great list of prompts available at <a href="https://github.com/danielmiessler/fabric">https://github.com/danielmiessler/fabric</a>. | |
| A great commandline utilty build by Daniel Miessler to make the use of LLM more frictionless.<br> | |
| All credits to him and his fellow fabric builders. | |
| </div> | |
| """, unsafe_allow_html=True) | |
| if __name__ == "__main__": | |
| main() | |