Spaces:
Sleeping
Sleeping
| import os | |
| import streamlit as st | |
| import tempfile | |
| from pymongo import MongoClient | |
| from datetime import datetime | |
| from pathlib import Path | |
| from document_chunker import DocumentChunker | |
| from urllib.parse import quote_plus | |
# === MongoDB connection via Hugging Face secrets ===
# Page configuration is issued before anything else so that the error path
# below (st.error / st.stop) never runs ahead of it.
st.set_page_config(page_title="Doc Chunker", layout="wide")


def _require_env(name):
    """Return the value of environment variable *name*.

    If the variable is unset or empty, an error is rendered on the page and
    the script run is halted. Without this check, ``quote_plus(None)`` fails
    with an opaque ``TypeError`` further down.
    """
    value = os.getenv(name)
    if not value:
        st.error(f"Missing required environment variable `{name}`.")
        st.stop()
    return value


@st.cache_resource
def _get_mongo_client(uri):
    """Create the MongoClient for *uri* once and reuse it across reruns.

    Streamlit re-executes this script on every interaction; caching the
    client avoids building a new connection pool on each rerun.
    """
    # NOTE(review): tlsAllowInvalidCertificates=True disables server
    # certificate validation. It is kept to preserve existing behavior;
    # confirm whether it is still required for this deployment.
    return MongoClient(
        uri,
        tls=True,
        tlsAllowInvalidCertificates=True,
        serverSelectionTimeoutMS=20000,
    )


# Credentials are URL-escaped because they are embedded in the connection URI.
user = quote_plus(_require_env("MONGO_USER"))
password = quote_plus(_require_env("MONGO_PASS"))
cluster = _require_env("MONGO_CLUSTER")
db_name = os.environ.get("MONGO_DB", "grant_docs")
mongo_uri = (
    f"mongodb+srv://{user}:{password}@{cluster}/{db_name}"
    "?retryWrites=true&w=majority&tls=true&tlsAllowInvalidCertificates=true"
)
client = _get_mongo_client(mongo_uri)
db = client[db_name]
def gate_ui():
    """Render the password gate and report whether the session may proceed.

    The expected password is read from Streamlit secrets (key
    ``APP_PASSWORD``), falling back to the ``APP_PASSWORD`` environment
    variable. When no password is configured the gate is disabled and the
    session is marked as authenticated.

    Returns:
        bool: ``True`` when the session is authenticated (or no password is
        configured); ``False`` while the login form is being shown.
    """
    import hmac  # local import keeps this block self-contained

    try:
        configured = st.secrets.get("APP_PASSWORD", os.getenv("APP_PASSWORD"))
    except FileNotFoundError:
        # NOTE(review): some Streamlit versions raise when no secrets file
        # exists at all; fall back to the environment variable in that case.
        # Confirm against the Streamlit version deployed.
        configured = os.getenv("APP_PASSWORD")

    # The original called .strip() directly on the looked-up value, which
    # crashed with AttributeError when neither source defined a password,
    # i.e. exactly the case the "no password" branch below is meant to handle.
    expected = str(configured).strip() if configured is not None else ""

    if "authed" not in st.session_state:
        st.session_state.authed = False

    if not expected:
        st.session_state.authed = True
        return True

    if st.session_state.authed:
        return True

    st.title("🔒 Document Chunker Login")
    pwd = st.text_input("Enter password", type="password")
    if st.button("Login"):
        # Constant-time comparison; operands are encoded to bytes because
        # compare_digest rejects non-ASCII str arguments.
        if hmac.compare_digest(pwd.encode("utf-8"), expected.encode("utf-8")):
            st.session_state.authed = True
            st.rerun()
        else:
            st.error("Incorrect password.")
    return False
| # === Streamlit UI === | |
def main():
    """Run the uploader page: pick a category, upload a file, chunk and store it.

    Flow:
      1. Stop early unless ``gate_ui()`` reports an authenticated session.
      2. In the sidebar, choose an existing ``collection_category`` (read from
         the ``final_chunks`` collection) or type a new one, and set the
         grant-application flag.
      3. On upload, skip files whose name already exists in the chosen
         category; otherwise run ``DocumentChunker`` on a temporary copy,
         tag every chunk, insert the chunks, and preview the first three.
    """
    if not gate_ui():
        return

    st.title("📄 Document Chunker & Uploader")
    collection = db["final_chunks"]
    new_category_label = "Create New Category"

    # NOTE(review): the original indentation was not recoverable; the toggle
    # is assumed to belong to the sidebar and the uploader to the main area.
    with st.sidebar:
        st.header("Settings")

        # Fetch collection names for dropdown. This stays best-effort (the
        # page still works with an empty list), but the failure is now shown
        # instead of being silently swallowed.
        try:
            found = collection.distinct("collection_category") or []
        except Exception as exc:
            st.warning(f"⚠️ Could not load existing categories: {exc}")
            found = []

        options = sorted(c for c in found if c) + [new_category_label]
        selected_category = st.selectbox(
            "Choose Category (collection_category)",
            options,
            index=options.index(new_category_label),
        )
        if selected_category == new_category_label:
            # Strip so that a whitespace-only entry is treated as empty.
            selected_category = st.sidebar.text_input("Enter Category Name:").strip()
            if not selected_category:
                st.warning("⚠️ Enter a category name to proceed.")
                st.stop()

        is_grant_app = st.toggle("Is this a Grant Application?", value=False)

    uploaded_file = st.file_uploader(
        "Upload a DOCX, TXT, or PDF file", type=["docx", "txt", "pdf"]
    )
    if not uploaded_file:
        return

    st.success(f"Uploaded `{uploaded_file.name}`")

    # Check for duplicates before touching the disk: a skipped file no longer
    # gets written to a temp location at all.
    already = collection.find_one({
        "metadata.title": uploaded_file.name,
        "collection_category": selected_category,
    })
    if already:
        st.warning(
            f"⚠️ `{uploaded_file.name}` already exists in category "
            f"`{selected_category}`. Skipping…"
        )
        return

    st.write("⏳ Processing with DocumentChunker...")
    modified_time = datetime.now().isoformat()

    # A private temporary directory avoids name collisions in the shared temp
    # dir and is removed automatically, so the upload is not left on disk.
    # Only the final path component of the client-supplied name is used.
    safe_name = Path(uploaded_file.name).name or "upload"
    with tempfile.TemporaryDirectory() as work_dir:
        temp_path = Path(work_dir) / safe_name
        temp_path.write_bytes(uploaded_file.getbuffer())
        chunks = DocumentChunker().process_document(str(temp_path))

    if not chunks:
        st.warning("⚠️ No chunks were generated.")
        return

    for chunk in chunks:
        chunk["collection_category"] = selected_category
        chunk["metadata"].update({
            "title": uploaded_file.name,
            "uploaded_at": modified_time,
            "is_grant_app": is_grant_app,
        })
    # One bulk write instead of one round trip per chunk.
    collection.insert_many(chunks)
    st.success(
        f"✅ {len(chunks)} chunks inserted into `final_chunks` "
        f"(category: `{selected_category}`)"
    )

    # Show a few previews.
    # NOTE(review): assumes each chunk carries 'text' and metadata keys
    # 'topics', 'category' and 'confidence_score' — confirm against
    # DocumentChunker's output.
    for i, c in enumerate(chunks[:3]):
        st.subheader(f"Chunk {i+1}: {c['metadata'].get('header') or 'No Header'}")
        st.markdown(c['text'][:400] + "...")
        st.caption(
            f"Topics: {', '.join(c['metadata']['topics'])} | "
            f"Category: {c['metadata']['category']}"
        )
        st.progress(c['metadata']['confidence_score'])

    if len(chunks) > 3:
        st.info(f"... and {len(chunks)-3} more chunks processed.")
# Launch the UI when this file is executed as a script rather than imported.
if __name__ == "__main__":
    main()
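# NOTE: the temp-file cleanup below is disabled. `temp_path` is not defined at
# module scope, so this snippet cannot simply be uncommented where it stands.
# try:
#     os.remove(temp_path)
# except Exception as e:
#     st.warning(f"⚠️ Could not delete temp file: {e}")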