Spaces:
Sleeping
Sleeping
| # save as app.py | |
| """ | |
| Gradio streaming chat where: | |
| - user messages are visible in the UI, | |
| - system messages are hidden (kept for context), | |
| - assistant output is streamed and updates in-place. | |
| Requirements: | |
| - transformers | |
| - gradio | |
| - torch | |
| """ | |
| import threading | |
| import gradio as gr | |
| import torch | |
| from transformers import AutoTokenizer, AutoModelForCausalLM, TextIteratorStreamer | |
| MODEL_ID = "EpistemeAI/gpt-oss-20b-RL" | |
| print("Loading tokenizer and model (this may take a while)...") | |
| tokenizer = AutoTokenizer.from_pretrained(MODEL_ID) | |
| # Use auto dtype & device mapping as requested | |
| model = AutoModelForCausalLM.from_pretrained( | |
| MODEL_ID, | |
| torch_dtype="auto", | |
| device_map="auto", | |
| ) | |
| model.eval() | |
| print("Model loaded. Example param device:", next(model.parameters()).device) | |
| # Thread-safe global history | |
| GLOBAL_HISTORY = [] # list of {"role": "system"|"user"|"assistant", "content": "..."} | |
| HISTORY_LOCK = threading.Lock() | |
| def build_prompt(system_message: str, history: list, user_message: str) -> str: | |
| """ | |
| Build prompt in the model's expected format. Adjust as needed. | |
| """ | |
| pieces = [] | |
| if system_message: | |
| pieces.append(f"<|system|>\n{system_message}\n") | |
| for turn in history: | |
| role = turn.get("role", "user") | |
| content = turn.get("content", "") | |
| pieces.append(f"<|{role}|>\n{content}\n") | |
| pieces.append(f"<|user|>\n{user_message}\n<|assistant|>\n") | |
| return "\n".join(pieces) | |
| def generate_stream(prompt: str, max_tokens: int, temperature: float, top_p: float): | |
| """ | |
| Stream partial strings via TextIteratorStreamer. | |
| """ | |
| inputs = tokenizer(prompt, return_tensors="pt") | |
| # Move input ids to model param device where possible (works with many accelerate setups) | |
| try: | |
| input_ids = inputs["input_ids"].to(next(model.parameters()).device) | |
| except Exception: | |
| input_ids = inputs["input_ids"] | |
| streamer = TextIteratorStreamer(tokenizer, skip_prompt=True, skip_special_tokens=True) | |
| gen_kwargs = dict( | |
| input_ids=input_ids, | |
| max_new_tokens=int(max_tokens), | |
| do_sample=True, | |
| temperature=float(temperature), | |
| top_p=float(top_p), | |
| streamer=streamer, | |
| ) | |
| thread = threading.Thread(target=model.generate, kwargs=gen_kwargs) | |
| thread.start() | |
| partial = "" | |
| for token_str in streamer: | |
| partial += token_str | |
| yield partial | |
| def visible_messages_from_history(real_history: list, streaming_partial: str | None): | |
| """ | |
| Convert internal history into the list of OpenAI-style messages for Gradio UI. | |
| - Show user messages verbatim (visible). | |
| - Show assistant messages (streamed or final). | |
| - Omit system messages (kept only for model context). | |
| """ | |
| msgs = [] | |
| for entry in real_history: | |
| role = entry.get("role") | |
| content = entry.get("content", "") | |
| if role == "system": | |
| # hide system from UI | |
| continue | |
| # For assistant messages, we'll use content (may be empty) | |
| msgs.append({"role": role, "content": content or ("thinking..." if role == "assistant" else "")}) | |
| # If we're currently streaming an assistant response, ensure it's reflected as the last assistant msg | |
| if streaming_partial is not None: | |
| # If last message is assistant, replace its content, otherwise append a new (user, assistant) pair | |
| if msgs and msgs[-1]["role"] == "assistant": | |
| msgs[-1]["content"] = streaming_partial | |
| else: | |
| # The user message that started this assistant reply should already be in history and visible. | |
| # Append assistant partial as the reply | |
| msgs.append({"role": "assistant", "content": streaming_partial}) | |
| return msgs | |
| def respond_stream(user_message, system_message, max_tokens, temperature, top_p): | |
| """ | |
| Gradio streaming handler: | |
| - Append real user message + assistant placeholder to GLOBAL_HISTORY | |
| - Yield visible message lists as the assistant generates tokens | |
| """ | |
| # Add the user message and an assistant placeholder into the real history | |
| with HISTORY_LOCK: | |
| if system_message: | |
| # include system message in real history for model context (but it won't be shown) | |
| GLOBAL_HISTORY.append({"role": "system", "content": system_message}) | |
| GLOBAL_HISTORY.append({"role": "user", "content": user_message}) | |
| GLOBAL_HISTORY.append({"role": "assistant", "content": ""}) # placeholder | |
| snapshot = list(GLOBAL_HISTORY) | |
| # Immediately show user message and assistant placeholder ("thinking...") | |
| initial_display = visible_messages_from_history(snapshot, streaming_partial="thinking...") | |
| yield initial_display | |
| # Build prompt using the real history but exclude the last assistant placeholder's empty content | |
| with HISTORY_LOCK: | |
| prompt_history = [h for h in GLOBAL_HISTORY[:-1]] # all except the placeholder assistant | |
| prompt = build_prompt(system_message or "", prompt_history, user_message or "") | |
| # Stream generation and update the last assistant entry | |
| for partial in generate_stream(prompt, max_tokens, temperature, top_p): | |
| with HISTORY_LOCK: | |
| # update global last assistant content | |
| if GLOBAL_HISTORY and GLOBAL_HISTORY[-1]["role"] == "assistant": | |
| GLOBAL_HISTORY[-1]["content"] = partial | |
| snapshot = list(GLOBAL_HISTORY) | |
| display = visible_messages_from_history(snapshot, streaming_partial=partial) | |
| yield display | |
| # Finalize: ensure assistant final content is shown | |
| with HISTORY_LOCK: | |
| final_snapshot = list(GLOBAL_HISTORY) | |
| final_display = visible_messages_from_history(final_snapshot, streaming_partial=final_snapshot[-1].get("content", "")) | |
| yield final_display | |
| # --- Gradio UI --- | |
| with gr.Blocks() as demo: | |
| gr.Markdown(f"**Model:** {MODEL_ID} — (system messages hidden; user visible)") | |
| chatbot = gr.Chatbot(elem_id="chatbot", label="Chat", type="messages", height=560) | |
| with gr.Row(): | |
| with gr.Column(scale=4): | |
| user_input = gr.Textbox(placeholder="Type a message and press Send", label="Your message") | |
| with gr.Column(scale=2): | |
| system_input = gr.Textbox(value="You are a Vibe Coder assistant.", label="System message (hidden from UI)") | |
| max_tokens = gr.Slider(minimum=1, maximum=4000, value=800, step=1, label="Max new tokens") | |
| temperature = gr.Slider(minimum=0.1, maximum=1.0, value=0.9, step=0.01, label="Temperature") | |
| top_p = gr.Slider(minimum=0.1, maximum=1.0, value=0.9, step=0.05, label="Top-p (nucleus sampling)") | |
| send_btn = gr.Button("Send") | |
| send_btn.click( | |
| fn=respond_stream, | |
| inputs=[user_input, system_input, max_tokens, temperature, top_p], | |
| outputs=[chatbot], | |
| queue=True, | |
| ) | |
| clear_btn = gr.Button("Reset conversation") | |
| def reset_all(): | |
| with HISTORY_LOCK: | |
| GLOBAL_HISTORY.clear() | |
| return [] | |
| clear_btn.click(fn=reset_all, inputs=None, outputs=[chatbot]) | |
| gr.Markdown("Notes: model loading uses `device_map='auto'` and `torch_dtype='auto'`. " | |
| "If running multi-worker (gunicorn) you will need an external history store (Redis/DB).") | |
| if __name__ == "__main__": | |
| demo.launch() | |