# save as app.py """ Gradio streaming chat where: - user messages are visible in the UI, - system messages are hidden (kept for context), - assistant output is streamed and updates in-place. Requirements: - transformers - gradio - torch """ import threading import gradio as gr import torch from transformers import AutoTokenizer, AutoModelForCausalLM, TextIteratorStreamer MODEL_ID = "EpistemeAI/gpt-oss-20b-RL" print("Loading tokenizer and model (this may take a while)...") tokenizer = AutoTokenizer.from_pretrained(MODEL_ID) # Use auto dtype & device mapping as requested model = AutoModelForCausalLM.from_pretrained( MODEL_ID, torch_dtype="auto", device_map="auto", ) model.eval() print("Model loaded. Example param device:", next(model.parameters()).device) # Thread-safe global history GLOBAL_HISTORY = [] # list of {"role": "system"|"user"|"assistant", "content": "..."} HISTORY_LOCK = threading.Lock() def build_prompt(system_message: str, history: list, user_message: str) -> str: """ Build prompt in the model's expected format. Adjust as needed. """ pieces = [] if system_message: pieces.append(f"<|system|>\n{system_message}\n") for turn in history: role = turn.get("role", "user") content = turn.get("content", "") pieces.append(f"<|{role}|>\n{content}\n") pieces.append(f"<|user|>\n{user_message}\n<|assistant|>\n") return "\n".join(pieces) def generate_stream(prompt: str, max_tokens: int, temperature: float, top_p: float): """ Stream partial strings via TextIteratorStreamer. """ inputs = tokenizer(prompt, return_tensors="pt") # Move input ids to model param device where possible (works with many accelerate setups) try: input_ids = inputs["input_ids"].to(next(model.parameters()).device) except Exception: input_ids = inputs["input_ids"] streamer = TextIteratorStreamer(tokenizer, skip_prompt=True, skip_special_tokens=True) gen_kwargs = dict( input_ids=input_ids, max_new_tokens=int(max_tokens), do_sample=True, temperature=float(temperature), top_p=float(top_p), streamer=streamer, ) thread = threading.Thread(target=model.generate, kwargs=gen_kwargs) thread.start() partial = "" for token_str in streamer: partial += token_str yield partial def visible_messages_from_history(real_history: list, streaming_partial: str | None): """ Convert internal history into the list of OpenAI-style messages for Gradio UI. - Show user messages verbatim (visible). - Show assistant messages (streamed or final). - Omit system messages (kept only for model context). """ msgs = [] for entry in real_history: role = entry.get("role") content = entry.get("content", "") if role == "system": # hide system from UI continue # For assistant messages, we'll use content (may be empty) msgs.append({"role": role, "content": content or ("thinking..." if role == "assistant" else "")}) # If we're currently streaming an assistant response, ensure it's reflected as the last assistant msg if streaming_partial is not None: # If last message is assistant, replace its content, otherwise append a new (user, assistant) pair if msgs and msgs[-1]["role"] == "assistant": msgs[-1]["content"] = streaming_partial else: # The user message that started this assistant reply should already be in history and visible. # Append assistant partial as the reply msgs.append({"role": "assistant", "content": streaming_partial}) return msgs def respond_stream(user_message, system_message, max_tokens, temperature, top_p): """ Gradio streaming handler: - Append real user message + assistant placeholder to GLOBAL_HISTORY - Yield visible message lists as the assistant generates tokens """ # Add the user message and an assistant placeholder into the real history with HISTORY_LOCK: if system_message: # include system message in real history for model context (but it won't be shown) GLOBAL_HISTORY.append({"role": "system", "content": system_message}) GLOBAL_HISTORY.append({"role": "user", "content": user_message}) GLOBAL_HISTORY.append({"role": "assistant", "content": ""}) # placeholder snapshot = list(GLOBAL_HISTORY) # Immediately show user message and assistant placeholder ("thinking...") initial_display = visible_messages_from_history(snapshot, streaming_partial="thinking...") yield initial_display # Build prompt using the real history but exclude the last assistant placeholder's empty content with HISTORY_LOCK: prompt_history = [h for h in GLOBAL_HISTORY[:-1]] # all except the placeholder assistant prompt = build_prompt(system_message or "", prompt_history, user_message or "") # Stream generation and update the last assistant entry for partial in generate_stream(prompt, max_tokens, temperature, top_p): with HISTORY_LOCK: # update global last assistant content if GLOBAL_HISTORY and GLOBAL_HISTORY[-1]["role"] == "assistant": GLOBAL_HISTORY[-1]["content"] = partial snapshot = list(GLOBAL_HISTORY) display = visible_messages_from_history(snapshot, streaming_partial=partial) yield display # Finalize: ensure assistant final content is shown with HISTORY_LOCK: final_snapshot = list(GLOBAL_HISTORY) final_display = visible_messages_from_history(final_snapshot, streaming_partial=final_snapshot[-1].get("content", "")) yield final_display # --- Gradio UI --- with gr.Blocks() as demo: gr.Markdown(f"**Model:** {MODEL_ID} — (system messages hidden; user visible)") chatbot = gr.Chatbot(elem_id="chatbot", label="Chat", type="messages", height=560) with gr.Row(): with gr.Column(scale=4): user_input = gr.Textbox(placeholder="Type a message and press Send", label="Your message") with gr.Column(scale=2): system_input = gr.Textbox(value="You are a Vibe Coder assistant.", label="System message (hidden from UI)") max_tokens = gr.Slider(minimum=1, maximum=4000, value=800, step=1, label="Max new tokens") temperature = gr.Slider(minimum=0.1, maximum=1.0, value=0.9, step=0.01, label="Temperature") top_p = gr.Slider(minimum=0.1, maximum=1.0, value=0.9, step=0.05, label="Top-p (nucleus sampling)") send_btn = gr.Button("Send") send_btn.click( fn=respond_stream, inputs=[user_input, system_input, max_tokens, temperature, top_p], outputs=[chatbot], queue=True, ) clear_btn = gr.Button("Reset conversation") def reset_all(): with HISTORY_LOCK: GLOBAL_HISTORY.clear() return [] clear_btn.click(fn=reset_all, inputs=None, outputs=[chatbot]) gr.Markdown("Notes: model loading uses `device_map='auto'` and `torch_dtype='auto'`. " "If running multi-worker (gunicorn) you will need an external history store (Redis/DB).") if __name__ == "__main__": demo.launch()