Spaces:
Running
Running
| import streamlit as st | |
| from llama_cpp import Llama | |
| from huggingface_hub import hf_hub_download | |
| import os | |
| import gc | |
| import shutil | |
| import re | |
| # ----- Custom CSS for pretty formatting of internal reasoning ----- | |
| CUSTOM_CSS = """ | |
| <style> | |
| /* Styles for the internal reasoning bullet list */ | |
| ul.think-list { | |
| margin: 0.5em 0 1em 1.5em; | |
| padding: 0; | |
| list-style-type: disc; | |
| } | |
| ul.think-list li { | |
| margin-bottom: 0.5em; | |
| } | |
| /* Container style for the "in progress" internal reasoning */ | |
| .chat-assistant { | |
| background-color: #f9f9f9; | |
| padding: 1em; | |
| border-radius: 5px; | |
| margin-bottom: 1em; | |
| } | |
| </style> | |
| """ | |
| st.markdown(CUSTOM_CSS, unsafe_allow_html=True) | |
| # ----- Set a threshold for required free storage (in bytes) ----- | |
| REQUIRED_SPACE_BYTES = 5 * 1024 ** 3 # 5 GB | |
| # ----- Available models ----- | |
| MODELS = { | |
| "Qwen2.5-7B-Instruct (Q2_K)": { | |
| "repo_id": "Qwen/Qwen2.5-7B-Instruct-GGUF", | |
| "filename": "qwen2.5-7b-instruct-q2_k.gguf", | |
| "description": "Qwen2.5-7B Instruct (Q2_K)" | |
| }, | |
| "Gemma-3-4B-IT (Q4_K_M)": { | |
| "repo_id": "unsloth/gemma-3-4b-it-GGUF", | |
| "filename": "gemma-3-4b-it-Q4_K_M.gguf", | |
| "description": "Gemma 3 4B IT (Q4_K_M)" | |
| }, | |
| "Phi-4-mini-Instruct (Q4_K_M)": { | |
| "repo_id": "unsloth/Phi-4-mini-instruct-GGUF", | |
| "filename": "Phi-4-mini-instruct-Q4_K_M.gguf", | |
| "description": "Phi-4 Mini Instruct (Q4_K_M)" | |
| }, | |
| "Meta-Llama-3.1-8B-Instruct (Q2_K)": { | |
| "repo_id": "MaziyarPanahi/Meta-Llama-3.1-8B-Instruct-GGUF", | |
| "filename": "Meta-Llama-3.1-8B-Instruct.Q2_K.gguf", | |
| "description": "Meta-Llama-3.1-8B-Instruct (Q2_K)" | |
| }, | |
| "DeepSeek-R1-Distill-Llama-8B (Q2_K)": { | |
| "repo_id": "unsloth/DeepSeek-R1-Distill-Llama-8B-GGUF", | |
| "filename": "DeepSeek-R1-Distill-Llama-8B-Q2_K.gguf", | |
| "description": "DeepSeek-R1-Distill-Llama-8B (Q2_K)" | |
| }, | |
| "Mistral-7B-Instruct-v0.3 (IQ3_XS)": { | |
| "repo_id": "MaziyarPanahi/Mistral-7B-Instruct-v0.3-GGUF", | |
| "filename": "Mistral-7B-Instruct-v0.3.IQ3_XS.gguf", | |
| "description": "Mistral-7B-Instruct-v0.3 (IQ3_XS)" | |
| }, | |
| "Qwen2.5-Coder-7B-Instruct (Q2_K)": { | |
| "repo_id": "Qwen/Qwen2.5-Coder-7B-Instruct-GGUF", | |
| "filename": "qwen2.5-coder-7b-instruct-q2_k.gguf", | |
| "description": "Qwen2.5-Coder-7B-Instruct (Q2_K)" | |
| }, | |
| } | |
| # ----- Sidebar settings ----- | |
| with st.sidebar: | |
| st.header("⚙️ Settings") | |
| selected_model_name = st.selectbox("Select Model", list(MODELS.keys())) | |
| system_prompt = st.text_area("System Prompt", value="You are a helpful assistant.", height=80) | |
| max_tokens = st.slider("Max tokens", 64, 2048, 512, step=32) | |
| temperature = st.slider("Temperature", 0.1, 2.0, 0.7) | |
| top_k = st.slider("Top-K", 1, 100, 40) | |
| top_p = st.slider("Top-P", 0.1, 1.0, 0.95) | |
| repeat_penalty = st.slider("Repetition Penalty", 1.0, 2.0, 1.1) | |
| if st.button("🧹 Clear All Cached Models"): | |
| try: | |
| for f in os.listdir("models"): | |
| if f.endswith(".gguf"): | |
| os.remove(os.path.join("models", f)) | |
| st.success("Model cache cleared.") | |
| except Exception as e: | |
| st.error(f"Failed to clear models: {e}") | |
| if st.button("📦 Show Disk Usage"): | |
| try: | |
| usage = shutil.disk_usage(".") | |
| used = usage.used / (1024 ** 3) | |
| free = usage.free / (1024 ** 3) | |
| st.info(f"Disk Used: {used:.2f} GB | Free: {free:.2f} GB") | |
| except Exception as e: | |
| st.error(f"Disk usage error: {e}") | |
| # ----- Model info ----- | |
| selected_model = MODELS[selected_model_name] | |
| model_path = os.path.join("models", selected_model["filename"]) | |
| # ----- Session state initialization ----- | |
| if "model_name" not in st.session_state: | |
| st.session_state.model_name = None | |
| if "llm" not in st.session_state: | |
| st.session_state.llm = None | |
| if "chat_history" not in st.session_state: | |
| st.session_state.chat_history = [] | |
| if "pending_response" not in st.session_state: | |
| st.session_state.pending_response = False | |
| # ----- Ensure model directory exists ----- | |
| os.makedirs("models", exist_ok=True) | |
| # ----- Functions for model management ----- | |
| def cleanup_old_models(): | |
| for f in os.listdir("models"): | |
| if f.endswith(".gguf") and f != selected_model["filename"]: | |
| try: | |
| os.remove(os.path.join("models", f)) | |
| except Exception as e: | |
| st.warning(f"Couldn't delete old model {f}: {e}") | |
| def download_model(): | |
| with st.spinner(f"Downloading {selected_model['filename']}..."): | |
| hf_hub_download( | |
| repo_id=selected_model["repo_id"], | |
| filename=selected_model["filename"], | |
| local_dir="./models", | |
| local_dir_use_symlinks=False, # Deprecated parameter; harmless warning. | |
| ) | |
| def try_load_model(path): | |
| try: | |
| return Llama( | |
| model_path=path, | |
| n_ctx=1024, | |
| n_threads=2, | |
| n_threads_batch=2, | |
| n_batch=4, | |
| n_gpu_layers=0, | |
| use_mlock=False, | |
| use_mmap=True, | |
| verbose=False, | |
| ) | |
| except Exception as e: | |
| return str(e) | |
| def validate_or_download_model(): | |
| # Download model if not present locally. | |
| if not os.path.exists(model_path): | |
| free_space = shutil.disk_usage(".").free | |
| if free_space < REQUIRED_SPACE_BYTES: | |
| st.info("Insufficient storage detected. Cleaning up old models to free up space.") | |
| cleanup_old_models() | |
| download_model() | |
| result = try_load_model(model_path) | |
| if isinstance(result, str): | |
| st.warning(f"Initial load failed: {result}\nAttempting re-download...") | |
| try: | |
| os.remove(model_path) | |
| except Exception: | |
| pass | |
| free_space = shutil.disk_usage(".").free | |
| if free_space < REQUIRED_SPACE_BYTES: | |
| st.info("Insufficient storage detected on re-download attempt. Cleaning up old models to free up space.") | |
| cleanup_old_models() | |
| download_model() | |
| result = try_load_model(model_path) | |
| if isinstance(result, str): | |
| st.error(f"Model still failed after re-download: {result}") | |
| st.stop() | |
| return result | |
| return result | |
| # ----- Load model if changed ----- | |
| if st.session_state.model_name != selected_model_name: | |
| if st.session_state.llm is not None: | |
| del st.session_state.llm | |
| gc.collect() | |
| st.session_state.llm = validate_or_download_model() | |
| st.session_state.model_name = selected_model_name | |
| llm = st.session_state.llm | |
| # ----- Display title and caption ----- | |
| st.title(f"🧠 {selected_model['description']} (Streamlit + GGUF)") | |
| st.caption(f"Powered by `llama.cpp` | Model: {selected_model['filename']}") | |
| # ----- Render full chat history ----- | |
| for chat in st.session_state.chat_history: | |
| with st.chat_message(chat["role"]): | |
| st.markdown(chat["content"]) | |
| # For assistant messages, if there's completed internal reasoning, display it behind an expander. | |
| if chat.get("role") == "assistant" and chat.get("thinking"): | |
| with st.expander("🧠 Model's Internal Reasoning"): | |
| for t in chat["thinking"]: | |
| st.markdown(t.strip()) | |
| # ----- Chat input widget ----- | |
| user_input = st.chat_input("Ask something...") | |
| if user_input: | |
| if st.session_state.pending_response: | |
| st.warning("Please wait for the assistant to finish responding.") | |
| else: | |
| st.session_state.chat_history.append({"role": "user", "content": user_input}) | |
| with st.chat_message("user"): | |
| st.markdown(user_input) | |
| st.session_state.pending_response = True | |
| MAX_TURNS = 8 | |
| trimmed_history = st.session_state.chat_history[-(MAX_TURNS * 2):] | |
| messages = [{"role": "system", "content": system_prompt}] + trimmed_history | |
| # ----- Streaming the assistant response ----- | |
| with st.chat_message("assistant"): | |
| visible_placeholder = st.empty() | |
| thinking_placeholder = st.empty() | |
| full_response = "" | |
| stream = llm.create_chat_completion( | |
| messages=messages, | |
| max_tokens=max_tokens, | |
| temperature=temperature, | |
| top_k=top_k, | |
| top_p=top_p, | |
| repeat_penalty=repeat_penalty, | |
| stream=True, | |
| ) | |
| for chunk in stream: | |
| if "choices" in chunk: | |
| delta = chunk["choices"][0]["delta"].get("content", "") | |
| full_response += delta | |
| # Determine if there is an open (in-progress) <think> block | |
| open_think = re.search(r"<think>([^<]*)$", full_response, flags=re.DOTALL) | |
| in_progress = open_think.group(1).strip() if open_think else "" | |
| # Create the visible response by removing any complete <think>...</think> blocks, | |
| # and also removing any in-progress (unclosed) <think> content. | |
| visible_response = re.sub(r"<think>.*?</think>", "", full_response, flags=re.DOTALL) | |
| visible_response = re.sub(r"<think>.*$", "", visible_response, flags=re.DOTALL) | |
| visible_placeholder.markdown(visible_response) | |
| # If there's an in-progress thinking part, display it in a pretty style | |
| if in_progress: | |
| # You can further format in_progress as you like; here we wrap it in a styled div. | |
| thinking_html = f""" | |
| <div class="chat-assistant"> | |
| <strong>Internal Reasoning (in progress):</strong> | |
| <br>{in_progress} | |
| </div> | |
| """ | |
| thinking_placeholder.markdown(thinking_html, unsafe_allow_html=True) | |
| else: | |
| thinking_placeholder.empty() | |
| # After streaming completes: | |
| # Extract all completed <think> blocks (the final internal reasoning that was closed) | |
| final_thinking = re.findall(r"<think>(.*?)</think>", full_response, flags=re.DOTALL) | |
| # The final visible response: remove any <think> blocks or any in-progress open block. | |
| final_visible = re.sub(r"<think>.*?</think>", "", full_response, flags=re.DOTALL) | |
| final_visible = re.sub(r"<think>.*$", "", final_visible, flags=re.DOTALL) | |
| st.session_state.chat_history.append({ | |
| "role": "assistant", | |
| "content": final_visible, | |
| "thinking": final_thinking | |
| }) | |
| st.session_state.pending_response = False | |