Spaces:
Running
Running
| import gradio as gr | |
| import os | |
| from typing import List | |
| import logging | |
| import urllib.request | |
| from utils import model_name_mapping, urial_template, openai_base_request, chat_template, openai_chat_request | |
| from constant import js_code_label, my_css, HEADER_MD, BASE_TO_ALIGNED, MODELS | |
| from openai import OpenAI | |
| import datetime | |
| # add logging info to console | |
| logging.basicConfig(level=logging.INFO) | |
| URIAL_VERSION = "inst_1k_v4.help" | |
| URIAL_URL = f"https://raw.githubusercontent.com/Re-Align/URIAL/main/urial_prompts/{URIAL_VERSION}.txt" | |
| urial_prompt = urllib.request.urlopen(URIAL_URL).read().decode('utf-8') | |
| urial_prompt = urial_prompt.replace("```", '"""') # new version of URIAL uses """ instead of ``` | |
| STOP_STRS = ['"""', '# Query:', '# Answer:'] | |
| addr_limit_counter = {} | |
| LAST_UPDATE_TIME = datetime.datetime.now() | |
| models = MODELS | |
| # mega_hist = { | |
| # "base": [], | |
| # "aligned": [] | |
| # } | |
| def respond( | |
| message, | |
| history: list[tuple[str, str]], | |
| max_tokens, | |
| temperature, | |
| top_p, | |
| rp, | |
| model_name, | |
| model_type, | |
| api_key, | |
| request:gr.Request | |
| ): | |
| global STOP_STRS, urial_prompt, LAST_UPDATE_TIME, addr_limit_counter | |
| assert model_type in ["base", "aligned"] | |
| # if history: | |
| # if model_type == "base": | |
| # mega_hist["base"] = history | |
| # else: | |
| # mega_hist["aligned"] = history | |
| if model_type == "base": | |
| prompt = urial_template(urial_prompt, history, message) | |
| else: | |
| messages = chat_template(history, message) | |
| # _model_name = "meta-llama/Llama-3-8b-hf" | |
| _model_name = model_name_mapping(model_name) | |
| if api_key and len(api_key) == 64: | |
| api_key = api_key | |
| else: | |
| api_key = None | |
| # headers = request.headers | |
| # if already 24 hours passed, reset the counter | |
| if datetime.datetime.now() - LAST_UPDATE_TIME > datetime.timedelta(days=1): | |
| addr_limit_counter = {} | |
| LAST_UPDATE_TIME = datetime.datetime.now() | |
| host_addr = request.client.host | |
| if host_addr not in addr_limit_counter: | |
| addr_limit_counter[host_addr] = 0 | |
| if addr_limit_counter[host_addr] > 100: | |
| return "You have reached the limit of 100 requests for today. Please use your own API key." | |
| if model_type == "base": | |
| infer_request = openai_base_request(prompt=prompt, model=_model_name, | |
| temperature=temperature, | |
| max_tokens=max_tokens, | |
| top_p=top_p, | |
| repetition_penalty=rp, | |
| stop=STOP_STRS, api_key=api_key) | |
| else: | |
| infer_request = openai_chat_request(messages=messages, model=_model_name, | |
| temperature=temperature, | |
| max_tokens=max_tokens, | |
| top_p=top_p, | |
| repetition_penalty=rp, | |
| stop=STOP_STRS, api_key=api_key) | |
| addr_limit_counter[host_addr] += 1 | |
| logging.info(f"Requesting chat completion from OpenAI API with model {_model_name}") | |
| logging.info(f"addr_limit_counter: {addr_limit_counter}; Last update time: {LAST_UPDATE_TIME};") | |
| response = "" | |
| for msg in infer_request: | |
| # print(msg.choices[0].delta.keys()) | |
| if hasattr(msg.choices[0], "delta"): | |
| # Note: 'ChoiceDelta' object may or may not be not subscriptable | |
| if "content" in msg.choices[0].delta: | |
| token = msg.choices[0].delta["content"] | |
| else: | |
| token = msg.choices[0].delta.content | |
| else: | |
| token = msg.choices[0].text | |
| if model_type == "base": | |
| should_stop = False | |
| for _stop in STOP_STRS: | |
| if _stop in response + token: | |
| should_stop = True | |
| break | |
| if should_stop: | |
| break | |
| if token is None: | |
| continue | |
| response += token | |
| if model_type == "base": | |
| if response.endswith('\n"'): | |
| response = response[:-1] | |
| elif response.endswith('\n""'): | |
| response = response[:-2] | |
| yield history + [(message, response)] | |
| # mega_hist[model_type].append((message, response)) | |
| # yield mega_hist[model_type] | |
| def load_models(base_model_name): | |
| print(f"base_model_name={base_model_name}") | |
| out_box = [gr.Chatbot(), gr.Chatbot(), gr.Dropdown()] | |
| out_box[0] = (gr.update(label=f"Chat with Base LLM: {base_model_name}")) | |
| aligned_model_name = BASE_TO_ALIGNED[base_model_name] | |
| out_box[1] = (gr.update(label=f"Chat with Aligned LLM: {aligned_model_name}")) | |
| out_box[2] = (gr.update(value=aligned_model_name, interactive=False)) | |
| return out_box[0], out_box[1], out_box[2] | |
| def clear_fn(): | |
| # mega_hist["base"] = [] | |
| # mega_hist["aligned"] = [] | |
| return None, None, None | |
| with gr.Blocks(gr.themes.Soft(), js=js_code_label, css=my_css) as demo: | |
| api_key = gr.Textbox(label="π APIKey", placeholder="Enter your Together/Hyperbolic API Key. Leave it blank to use our key with limited usage.", type="password", elem_id="api_key", visible=False) | |
| gr.Markdown(HEADER_MD) | |
| with gr.Row(): | |
| chat_a = gr.Chatbot(height=500, label="Chat with Base LLMs via URIAL") | |
| chat_b = gr.Chatbot(height=500, label="Chat with Aligned LLMs") | |
| with gr.Group(): | |
| with gr.Row(): | |
| with gr.Column(scale=1.5): | |
| message = gr.Textbox(label="Prompt", placeholder="Enter your message here") | |
| with gr.Row(): | |
| with gr.Column(scale=2): | |
| with gr.Row(): | |
| left_model_choice = gr.Dropdown(label="Base Model", choices=models, interactive=True) | |
| right_model_choice = gr.Textbox(label="Aligned Model", placeholder="xxx", visible=True) | |
| with gr.Row(): | |
| btn = gr.Button("π Chat") | |
| # gr.Markdown("---") | |
| with gr.Row(): | |
| stop_btn = gr.Button("βΈοΈ Stop") | |
| clear_btn = gr.Button("π Clear") | |
| with gr.Row(): | |
| gr.Markdown(">> - We thank for the support of Llama-3.1-405B from [Hyperbolic AI](https://hyperbolic.xyz/). ") | |
| with gr.Column(scale=1): | |
| with gr.Accordion("βοΈ Params for **Base** LLM", open=True): | |
| with gr.Row(): | |
| max_tokens_1 = gr.Slider(label="Max tokens", value=256, minimum=0, maximum=2048, step=16, interactive=True, visible=True) | |
| temperature_1 = gr.Slider(label="Temperature", step=0.01, minimum=0.01, maximum=1.0, value=0.9) | |
| with gr.Row(): | |
| top_p_1 = gr.Slider(label="Top-P", step=0.01, minimum=0.01, maximum=1.0, value=0.9) | |
| rp_1 = gr.Slider(label="Repetition Penalty", step=0.1, minimum=0.1, maximum=2.0, value=1.1) | |
| with gr.Accordion("βοΈ Params for **Aligned** LLM", open=True): | |
| with gr.Row(): | |
| max_tokens_2 = gr.Slider(label="Max tokens", value=256, minimum=0, maximum=2048, step=16, interactive=True, visible=True) | |
| temperature_2 = gr.Slider(label="Temperature", step=0.01, minimum=0.01, maximum=1.0, value=0.9) | |
| with gr.Row(): | |
| top_p_2 = gr.Slider(label="Top-P", step=0.01, minimum=0.01, maximum=1.0, value=0.9) | |
| rp_2 = gr.Slider(label="Repetition Penalty", step=0.1, minimum=0.1, maximum=2.0, value=1.0) | |
| left_model_choice.value = "Llama-3.1-405B-FP8" | |
| right_model_choice.value = "Llama-3.1-405B-Instruct-BF16" | |
| left_model_choice.change(load_models, [left_model_choice], [chat_a, chat_b, right_model_choice]) | |
| model_type_left = gr.Textbox(visible=False, value="base") | |
| model_type_right = gr.Textbox(visible=False, value="aligned") | |
| go1 = btn.click(respond, [message, chat_a, max_tokens_1, temperature_1, top_p_1, rp_1, left_model_choice, model_type_left, api_key], chat_a) | |
| go2 = btn.click(respond, [message, chat_b, max_tokens_2, temperature_2, top_p_2, rp_2, right_model_choice, model_type_right, api_key], chat_b) | |
| stop_btn.click(None, None, None, cancels=[go1, go2]) | |
| clear_btn.click(clear_fn, None, [message, chat_a, chat_b]) | |
| if __name__ == "__main__": | |
| demo.launch(show_api=False) |