Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import os | |
| import shutil | |
| import fitz # PyMuPDF | |
| from simple_rag import generate_data_store, ask_question, DATA_PATH, CHROMA_PATH # <- Added CHROMA_PATH | |
| # --- Page Configuration --- | |
| st.set_page_config(page_title="SEALION v3.5 8B RAG", layout="wide") | |
| # --- DARK MODE STYLING --- | |
| st.markdown(""" | |
| <style> | |
| body { background-color: var(--background-color); } | |
| .stChatBubble { | |
| padding: 0.8rem 1rem; | |
| border-radius: 0.6rem; | |
| margin-bottom: 1rem; | |
| } | |
| .user { | |
| background-color: var(--primary-color-bg); | |
| border-left: 4px solid #4285F4; | |
| } | |
| .bot { | |
| background-color: var(--secondary-color-bg); | |
| border-left: 4px solid #34A853; | |
| } | |
| .context { | |
| font-size: 0.9rem; | |
| color: var(--text-color); | |
| } | |
| </style> | |
| """, unsafe_allow_html=True) | |
| # --- Set up state --- | |
| if "chat_history" not in st.session_state: | |
| st.session_state.chat_history = [] | |
| if "dark_mode" not in st.session_state: | |
| st.session_state.dark_mode = False | |
| # --- Sidebar: Settings + Clear Vector Store --- | |
| with st.sidebar: | |
| st.title("βοΈ Settings") | |
| # --- Clear Chroma Vector Store Option --- | |
| if st.button("ποΈ Clear Vector Store"): | |
| try: | |
| if os.path.exists(CHROMA_PATH): | |
| shutil.rmtree(CHROMA_PATH) | |
| st.success("β Vector store cleared.") | |
| else: | |
| st.info("βΉοΈ No vector store found.") | |
| except Exception as e: | |
| st.error(f"β Failed to clear vector store: {e}") | |
| # --- Title --- | |
| st.title("π¬ DocChat β Ask Anything From Your PDFs") | |
| # --- File Upload Sidebar --- | |
| with st.sidebar: | |
| st.subheader("π Uploaded PDFs") | |
| uploaded_files = st.file_uploader( | |
| "Upload PDF files", type=["pdf"], accept_multiple_files=True | |
| ) | |
| def render_pdf_preview(pdf_path, highlights=None, max_pages=3): | |
| doc = fitz.open(pdf_path) | |
| images = [] | |
| for page_num in range(min(len(doc), max_pages)): | |
| page = doc.load_page(page_num) | |
| if highlights: | |
| for h in highlights: | |
| if h["filename"] == os.path.basename(pdf_path) and h["page"] == page_num + 1: | |
| areas = page.search_for(h["text"], hit_max=5) | |
| for rect in areas: | |
| page.draw_rect(rect, color=(1, 1, 0), fill=(1, 1, 0), overlay=True) | |
| pix = page.get_pixmap(matrix=fitz.Matrix(2, 2)) | |
| img_bytes = pix.tobytes("png") | |
| images.append(img_bytes) | |
| doc.close() | |
| return images | |
| if uploaded_files: | |
| if os.path.exists(DATA_PATH): | |
| shutil.rmtree(DATA_PATH) | |
| os.makedirs(DATA_PATH, exist_ok=True) | |
| file_names = [] | |
| for uploaded_file in uploaded_files: | |
| file_path = os.path.join(DATA_PATH, uploaded_file.name) | |
| file_names.append(uploaded_file.name) | |
| with open(file_path, "wb") as f: | |
| f.write(uploaded_file.read()) | |
| st.success(f"Uploaded {len(uploaded_files)} file(s).") | |
| if st.button("β‘ Generate Vector Store"): | |
| with st.spinner("Processing and indexing..."): | |
| generate_data_store() | |
| st.success("β Knowledge base ready!") | |
| st.markdown("### π§Ύ File Manager") | |
| for name in file_names: | |
| st.markdown(f"- π `{name}`") | |
| with st.expander("πΌ Preview Pages"): | |
| file_path = os.path.join(DATA_PATH, name) | |
| file_highlights = [] | |
| for chat in st.session_state.chat_history: | |
| for chunk in chat["context"]: | |
| if chunk["filename"] == name: | |
| file_highlights.append(chunk) | |
| images = render_pdf_preview(file_path, highlights=file_highlights) | |
| for img in images: | |
| st.image(img, use_container_width=True) | |
| # --- Main Chat Section --- | |
| st.markdown("### π§ Ask Your Question") | |
| query = st.text_input("Type your question here...", placeholder="e.g. What is the conclusion of the document?") | |
| col1, col2 = st.columns([1, 1]) | |
| with col1: | |
| if st.button("π Get Answer"): | |
| if not query.strip(): | |
| st.warning("Please enter a question.") | |
| else: | |
| with st.spinner("Thinking..."): | |
| answer, context_chunks = ask_question(query) | |
| st.session_state.chat_history.append({ | |
| "question": query, | |
| "answer": answer, | |
| "context": context_chunks | |
| }) | |
| with col2: | |
| if st.button("ποΈ Clear Chat"): | |
| st.session_state.chat_history = [] | |
| # --- Display Chat History --- | |
| if st.session_state.chat_history: | |
| st.markdown("### ποΈ Chat History") | |
| for chat in reversed(st.session_state.chat_history): | |
| st.markdown(f'<div class="stChatBubble user"><b>Q:</b> {chat["question"]}</div>', unsafe_allow_html=True) | |
| st.markdown(f'<div class="stChatBubble bot"><b>A:</b> {chat["answer"]}</div>', unsafe_allow_html=True) | |
| with st.expander("π§Ύ View Context Used"): | |
| for chunk in chat["context"]: | |
| st.markdown(f""" | |
| <div style="background-color: #fef9c3; padding: 10px; border-radius: 6px; margin-bottom: 10px;"> | |
| <b>π {chunk['filename']} β Page {chunk['page']}</b> | |
| <pre style="white-space: pre-wrap;">{chunk['text']}</pre> | |
| </div> | |
| """, unsafe_allow_html=True) |