# legolasyiu's picture
# Update app.py
# bd75abd verified
# save as app.py
"""
Gradio streaming chat where:
- user messages are visible in the UI,
- system messages are hidden (kept for context),
- assistant output is streamed and updates in-place.
Requirements:
- transformers
- gradio
- torch
"""
import threading
import gradio as gr
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, TextIteratorStreamer
# Model identifier passed to both from_pretrained() calls below.
MODEL_ID = "EpistemeAI/gpt-oss-20b-RL"
print("Loading tokenizer and model (this may take a while)...")
# Tokenizer and model are loaded once at import time and shared by every request.
tokenizer = AutoTokenizer.from_pretrained(MODEL_ID)
# Use auto dtype & device mapping as requested
model = AutoModelForCausalLM.from_pretrained(
MODEL_ID,
torch_dtype="auto",
device_map="auto",
)
# Put the model in evaluation mode; this script only calls generate() on it.
model.eval()
# Only the first parameter's device is reported here.
# NOTE(review): with device_map="auto" other parameters may be placed elsewhere — confirm.
print("Model loaded. Example param device:", next(model.parameters()).device)
# Conversation history shared by every request handled in this process.
# It is a plain list; the handlers in this file read and mutate it only while
# holding HISTORY_LOCK.
GLOBAL_HISTORY = [] # list of {"role": "system"|"user"|"assistant", "content": "..."}
# Guards every read/write of GLOBAL_HISTORY.
HISTORY_LOCK = threading.Lock()
def build_prompt(system_message: str, history: list, user_message: str) -> str:
    """
    Assemble the flat text prompt handed to the tokenizer.

    Layout, in order:
    - an optional ``<|system|>`` section (skipped when ``system_message`` is empty),
    - one ``<|role|>`` section per entry of ``history`` (a missing role
      defaults to "user", missing content to ""),
    - the new ``<|user|>`` message followed by an open ``<|assistant|>`` tag.

    Sections are joined with a single newline. Adjust the tags if the model
    expects a different chat format.
    """

    def _section(tag, body) -> str:
        # One "<|tag|>" header line followed by the body and a newline.
        return f"<|{tag}|>\n{body}\n"

    sections = [_section("system", system_message)] if system_message else []
    sections.extend(
        _section(item.get("role", "user"), item.get("content", ""))
        for item in history
    )
    # The trailing assistant tag is left open so the model continues from it.
    sections.append(_section("user", user_message) + "<|assistant|>\n")
    return "\n".join(sections)
def generate_stream(prompt: str, max_tokens: int, temperature: float, top_p: float):
    """
    Stream the model's reply to ``prompt`` as a growing string.

    ``model.generate`` runs in a background thread and pushes decoded text
    into a ``TextIteratorStreamer``; this generator consumes the streamer and
    yields the cumulative text after every chunk.

    Args:
        prompt: Full text prompt to tokenize and feed to the model.
        max_tokens: Upper bound on newly generated tokens (coerced to int).
        temperature: Sampling temperature (coerced to float).
        top_p: Nucleus-sampling probability mass (coerced to float).

    Yields:
        The text generated so far (prompt excluded, special tokens stripped).

    Raises:
        Exception: whatever ``model.generate`` raised in the worker thread,
            re-raised here after the stream has been drained.
    """
    inputs = tokenizer(prompt, return_tensors="pt")
    input_ids = inputs["input_ids"]
    # Forward the tokenizer's mask instead of letting generate() infer it.
    attention_mask = inputs.get("attention_mask")

    # Best effort: move the inputs to the device of the first model parameter
    # (works with many accelerate setups). On failure keep the tensors exactly
    # as tokenized, which is what the original fallback did.
    try:
        device = next(model.parameters()).device
        moved_ids = input_ids.to(device)
        moved_mask = attention_mask.to(device) if attention_mask is not None else None
    except Exception:
        pass
    else:
        input_ids, attention_mask = moved_ids, moved_mask

    streamer = TextIteratorStreamer(tokenizer, skip_prompt=True, skip_special_tokens=True)
    gen_kwargs = dict(
        input_ids=input_ids,
        max_new_tokens=int(max_tokens),
        do_sample=True,
        temperature=float(temperature),
        top_p=float(top_p),
        streamer=streamer,
    )
    if attention_mask is not None:
        gen_kwargs["attention_mask"] = attention_mask

    failures = []

    def _run_generation():
        # If generate() raises, the streamer never receives its stop signal and
        # the consumer loop below would block forever. Record the error and end
        # the stream explicitly so the consumer wakes up and can re-raise it.
        try:
            model.generate(**gen_kwargs)
        except Exception as exc:
            failures.append(exc)
            streamer.end()

    thread = threading.Thread(target=_run_generation)
    thread.start()

    partial = ""
    for token_str in streamer:
        partial += token_str
        yield partial

    # The stream is exhausted, so the worker is finished or about to finish.
    thread.join()
    if failures:
        raise failures[0]
def visible_messages_from_history(real_history: list, streaming_partial: str | None):
"""
Convert internal history into the list of OpenAI-style messages for Gradio UI.
- Show user messages verbatim (visible).
- Show assistant messages (streamed or final).
- Omit system messages (kept only for model context).
"""
msgs = []
for entry in real_history:
role = entry.get("role")
content = entry.get("content", "")
if role == "system":
# hide system from UI
continue
# For assistant messages, we'll use content (may be empty)
msgs.append({"role": role, "content": content or ("thinking..." if role == "assistant" else "")})
# If we're currently streaming an assistant response, ensure it's reflected as the last assistant msg
if streaming_partial is not None:
# If last message is assistant, replace its content, otherwise append a new (user, assistant) pair
if msgs and msgs[-1]["role"] == "assistant":
msgs[-1]["content"] = streaming_partial
else:
# The user message that started this assistant reply should already be in history and visible.
# Append assistant partial as the reply
msgs.append({"role": "assistant", "content": streaming_partial})
return msgs
def respond_stream(user_message, system_message, max_tokens, temperature, top_p):
    """
    Gradio streaming handler for one chat turn.

    Records the user message and an empty assistant placeholder in
    GLOBAL_HISTORY, then yields the visible message list repeatedly while the
    assistant reply is generated, finishing with the final state.

    Args:
        user_message: Text typed by the user (None is treated as "").
        system_message: Hidden system prompt (None is treated as "").
        max_tokens: Maximum number of new tokens to generate.
        temperature: Sampling temperature.
        top_p: Nucleus-sampling probability mass.

    Yields:
        Lists of ``{"role", "content"}`` dicts for the chatbot component;
        system entries are never included.
    """
    user_message = user_message or ""
    system_message = system_message or ""

    # Keep a direct reference to this turn's reply entry so later updates hit
    # the right dict even if the history is reset or extended while streaming.
    reply = {"role": "assistant", "content": ""}

    with HISTORY_LOCK:
        # Prompt context = earlier turns only. The new user message and the
        # current system message are added by build_prompt itself, so including
        # them here would place them in the prompt twice.
        prompt_history = [
            dict(turn) for turn in GLOBAL_HISTORY if turn.get("role") != "system"
        ]

        if system_message:
            # Record the system message only when it changed, rather than
            # appending an identical hidden entry on every turn.
            last_system = next(
                (
                    turn.get("content")
                    for turn in reversed(GLOBAL_HISTORY)
                    if turn.get("role") == "system"
                ),
                None,
            )
            if last_system != system_message:
                GLOBAL_HISTORY.append({"role": "system", "content": system_message})

        GLOBAL_HISTORY.append({"role": "user", "content": user_message})
        GLOBAL_HISTORY.append(reply)
        snapshot = list(GLOBAL_HISTORY)

    # Immediately show the user message and the assistant placeholder.
    yield visible_messages_from_history(snapshot, streaming_partial="thinking...")

    prompt = build_prompt(system_message, prompt_history, user_message)

    # Stream generation, updating this turn's reply entry as text arrives.
    for partial in generate_stream(prompt, max_tokens, temperature, top_p):
        with HISTORY_LOCK:
            reply["content"] = partial
            snapshot = list(GLOBAL_HISTORY)
        yield visible_messages_from_history(snapshot, streaming_partial=partial)

    # Finalize from the reply entry itself; indexing the last history element
    # would raise IndexError if the conversation was reset during streaming.
    with HISTORY_LOCK:
        final_snapshot = list(GLOBAL_HISTORY)
        final_text = reply["content"]
    yield visible_messages_from_history(final_snapshot, streaming_partial=final_text)
# --- Gradio UI ---
# Builds the chat page: a chatbot panel, message and system-prompt text boxes,
# three sampling sliders, and Send / Reset buttons.
# NOTE(review): leading indentation is missing in this copy of the file, so which
# widgets belong inside the Row/Column containers cannot be determined from here
# — confirm the intended nesting against the original layout.
with gr.Blocks() as demo:
gr.Markdown(f"**Model:** {MODEL_ID} — (system messages hidden; user visible)")
# type="messages": the chatbot is fed lists of {"role", "content"} dicts,
# which is the shape respond_stream yields.
chatbot = gr.Chatbot(elem_id="chatbot", label="Chat", type="messages", height=560)
with gr.Row():
with gr.Column(scale=4):
user_input = gr.Textbox(placeholder="Type a message and press Send", label="Your message")
with gr.Column(scale=2):
system_input = gr.Textbox(value="You are a Vibe Coder assistant.", label="System message (hidden from UI)")
# Generation controls; their values are passed to respond_stream on every Send.
max_tokens = gr.Slider(minimum=1, maximum=4000, value=800, step=1, label="Max new tokens")
temperature = gr.Slider(minimum=0.1, maximum=1.0, value=0.9, step=0.01, label="Temperature")
top_p = gr.Slider(minimum=0.1, maximum=1.0, value=0.9, step=0.05, label="Top-p (nucleus sampling)")
send_btn = gr.Button("Send")
# respond_stream is a generator; each message list it yields is sent to the chatbot output.
send_btn.click(
fn=respond_stream,
inputs=[user_input, system_input, max_tokens, temperature, top_p],
outputs=[chatbot],
queue=True,
)
clear_btn = gr.Button("Reset conversation")
# Empties the shared history under the lock and returns an empty message list
# for the chatbot output.
def reset_all():
with HISTORY_LOCK:
GLOBAL_HISTORY.clear()
return []
clear_btn.click(fn=reset_all, inputs=None, outputs=[chatbot])
gr.Markdown("Notes: model loading uses `device_map='auto'` and `torch_dtype='auto'`. "
"If running multi-worker (gunicorn) you will need an external history store (Redis/DB).")
# Start the Gradio app only when this file is executed as a script.
if __name__ == "__main__":
demo.launch()