Spaces:
Runtime error
Runtime error
| import gradio as gr | |
| from response_parser import * | |
| def initialization(state_dict: Dict) -> None: | |
| if not os.path.exists('cache'): | |
| os.mkdir('cache') | |
| if state_dict["bot_backend"] is None: | |
| state_dict["bot_backend"] = BotBackend() | |
| if 'OPENAI_API_KEY' in os.environ: | |
| del os.environ['OPENAI_API_KEY'] | |
| def get_bot_backend(state_dict: Dict) -> BotBackend: | |
| return state_dict["bot_backend"] | |
| def switch_to_gpt4(state_dict: Dict, whether_switch: bool) -> None: | |
| bot_backend = get_bot_backend(state_dict) | |
| if whether_switch: | |
| bot_backend.update_gpt_model_choice("GPT-4") | |
| else: | |
| bot_backend.update_gpt_model_choice("GPT-3.5") | |
| def add_text(state_dict: Dict, history: List, text: str) -> Tuple[List, Dict]: | |
| bot_backend = get_bot_backend(state_dict) | |
| bot_backend.add_text_message(user_text=text) | |
| history = history + [(text, None)] | |
| return history, gr.update(value="", interactive=False) | |
| def add_file(state_dict: Dict, history: List, files) -> List: | |
| bot_backend = get_bot_backend(state_dict) | |
| for file in files: | |
| path = file.name | |
| filename = os.path.basename(path) | |
| bot_msg = [f'📁[{filename}]', None] | |
| history.append(bot_msg) | |
| bot_backend.add_file_message(path=path, bot_msg=bot_msg) | |
| _, suffix = os.path.splitext(filename) | |
| if suffix in {'.jpg', '.jpeg', '.png', '.bmp', '.webp'}: | |
| copied_file_path = f'{bot_backend.jupyter_work_dir}/{filename}' | |
| width, height = get_image_size(copied_file_path) | |
| bot_msg[0] += \ | |
| f'\n<img src=\"file={copied_file_path}\" style=\'{"" if width < 800 else "width: 800px;"} max-width' \ | |
| f':none; max-height:none\'> ' | |
| return history | |
| def undo_upload_file(state_dict: Dict, history: List) -> Tuple[List, Dict]: | |
| bot_backend = get_bot_backend(state_dict) | |
| bot_msg = bot_backend.revoke_file() | |
| if bot_msg is None: | |
| return history, gr.Button.update(interactive=False) | |
| else: | |
| assert history[-1] == bot_msg | |
| del history[-1] | |
| if bot_backend.revocable_files: | |
| return history, gr.Button.update(interactive=True) | |
| else: | |
| return history, gr.Button.update(interactive=False) | |
| def refresh_file_display(state_dict: Dict) -> List[str]: | |
| bot_backend = get_bot_backend(state_dict) | |
| work_dir = bot_backend.jupyter_work_dir | |
| filenames = os.listdir(work_dir) | |
| paths = [] | |
| for filename in filenames: | |
| path = os.path.join(work_dir, filename) | |
| if not os.path.isdir(path): | |
| paths.append(path) | |
| return paths | |
| def refresh_token_count(state_dict: Dict): | |
| bot_backend = get_bot_backend(state_dict) | |
| model_choice = bot_backend.gpt_model_choice | |
| sliced = bot_backend.sliced | |
| token_count = bot_backend.context_window_tokens | |
| token_limit = config['model_context_window'][config['model'][model_choice]['model_name']] | |
| display_text = f"**Context token:** {token_count}/{token_limit}" | |
| if sliced: | |
| display_text += '\\\nToken limit exceeded, conversion has been sliced.' | |
| return gr.Markdown.update(value=display_text) | |
| def restart_ui(history: List) -> Tuple[List, Dict, Dict, Dict, Dict, Dict, Dict]: | |
| history.clear() | |
| return ( | |
| history, | |
| gr.Textbox.update(value="", interactive=False), | |
| gr.Button.update(interactive=False), | |
| gr.Button.update(interactive=False), | |
| gr.Button.update(interactive=False), | |
| gr.Button.update(interactive=False), | |
| gr.Button.update(visible=False) | |
| ) | |
| def restart_bot_backend(state_dict: Dict) -> None: | |
| bot_backend = get_bot_backend(state_dict) | |
| bot_backend.restart() | |
| def stop_generating(state_dict: Dict) -> None: | |
| bot_backend = get_bot_backend(state_dict) | |
| if bot_backend.code_executing: | |
| bot_backend.send_interrupt_signal() | |
| else: | |
| bot_backend.update_stop_generating_state(stop_generating=True) | |
| def bot(state_dict: Dict, history: List) -> List: | |
| bot_backend = get_bot_backend(state_dict) | |
| while bot_backend.finish_reason in ('new_input', 'function_call'): | |
| if history[-1][1]: | |
| history.append([None, ""]) | |
| else: | |
| history[-1][1] = "" | |
| try: | |
| response = chat_completion(bot_backend=bot_backend) | |
| for chunk in response: | |
| if chunk['choices'] and chunk['choices'][0]['finish_reason'] == 'function_call': | |
| if bot_backend.function_name in bot_backend.jupyter_kernel.available_functions: | |
| yield history, gr.Button.update(value='⏹️ Interrupt execution'), gr.Button.update(visible=False) | |
| else: | |
| yield history, gr.Button.update(interactive=False), gr.Button.update(visible=False) | |
| if bot_backend.stop_generating: | |
| response.close() | |
| if bot_backend.content: | |
| bot_backend.add_gpt_response_content_message() | |
| if bot_backend.display_code_block: | |
| bot_backend.update_display_code_block( | |
| display_code_block="\n⚫Stopped:\n```python\n{}\n```".format(bot_backend.code_str) | |
| ) | |
| history = copy.deepcopy(bot_backend.bot_history) | |
| history[-1][1] += bot_backend.display_code_block | |
| bot_backend.add_function_call_response_message(function_response=None) | |
| bot_backend.reset_gpt_response_log_values() | |
| break | |
| history, weather_exit = parse_response( | |
| chunk=chunk, | |
| history=history, | |
| bot_backend=bot_backend | |
| ) | |
| yield ( | |
| history, | |
| gr.Button.update( | |
| interactive=False if bot_backend.stop_generating else True, | |
| value='⏹️ Stop generating' | |
| ), | |
| gr.Button.update(visible=False) | |
| ) | |
| if weather_exit: | |
| exit(-1) | |
| except openai.OpenAIError as openai_error: | |
| bot_backend.reset_gpt_response_log_values(exclude=['finish_reason']) | |
| yield history, gr.Button.update(interactive=False), gr.Button.update(visible=True) | |
| raise openai_error | |
| yield history, gr.Button.update(interactive=False, value='⏹️ Stop generating'), gr.Button.update(visible=False) | |
| if __name__ == '__main__': | |
| config = get_config() | |
| with gr.Blocks(theme=gr.themes.Base()) as block: | |
| """ | |
| Reference: https://www.gradio.app/guides/creating-a-chatbot-fast | |
| """ | |
| # UI components | |
| state = gr.State(value={"bot_backend": None}) | |
| with gr.Tab("Chat"): | |
| chatbot = gr.Chatbot([], elem_id="chatbot", label="Local Code Interpreter", height=750) | |
| with gr.Row(): | |
| with gr.Column(scale=0.85): | |
| text_box = gr.Textbox( | |
| show_label=False, | |
| placeholder="Enter text and press enter, or upload a file", | |
| container=False | |
| ) | |
| with gr.Column(scale=0.15, min_width=0): | |
| file_upload_button = gr.UploadButton("📁", file_count='multiple', file_types=['file']) | |
| with gr.Row(equal_height=True): | |
| with gr.Column(scale=0.08, min_width=0): | |
| check_box = gr.Checkbox(label="Use GPT-4", interactive=config['model']['GPT-4']['available']) | |
| with gr.Column(scale=0.314, min_width=0): | |
| model_token_limit = config['model_context_window'][config['model']['GPT-3.5']['model_name']] | |
| token_count_display_text = f"**Context token:** 0/{model_token_limit}" | |
| token_monitor = gr.Markdown(value=token_count_display_text) | |
| with gr.Column(scale=0.15, min_width=0): | |
| retry_button = gr.Button(value='🔂OpenAI Error, click here to retry', visible=False) | |
| with gr.Column(scale=0.15, min_width=0): | |
| stop_generation_button = gr.Button(value='⏹️ Stop generating', interactive=False) | |
| with gr.Column(scale=0.15, min_width=0): | |
| restart_button = gr.Button(value='🔄 Restart') | |
| with gr.Column(scale=0.15, min_width=0): | |
| undo_file_button = gr.Button(value="↩️Undo upload file", interactive=False) | |
| with gr.Tab("Files"): | |
| file_output = gr.Files() | |
| # Components function binding | |
| txt_msg = text_box.submit(add_text, [state, chatbot, text_box], [chatbot, text_box], queue=False).then( | |
| lambda: gr.Button.update(interactive=False), None, [undo_file_button], queue=False | |
| ).then( | |
| bot, [state, chatbot], [chatbot, stop_generation_button, retry_button] | |
| ) | |
| txt_msg.then(fn=refresh_file_display, inputs=[state], outputs=[file_output]) | |
| txt_msg.then(lambda: gr.update(interactive=True), None, [text_box], queue=False) | |
| txt_msg.then(fn=refresh_token_count, inputs=[state], outputs=[token_monitor]) | |
| retry_button.click(lambda: gr.Button.update(visible=False), None, [retry_button], queue=False).then( | |
| bot, [state, chatbot], [chatbot, stop_generation_button, retry_button] | |
| ).then( | |
| fn=refresh_file_display, inputs=[state], outputs=[file_output] | |
| ).then( | |
| lambda: gr.update(interactive=True), None, [text_box], queue=False | |
| ).then( | |
| fn=refresh_token_count, inputs=[state], outputs=[token_monitor] | |
| ) | |
| check_box.change(fn=switch_to_gpt4, inputs=[state, check_box]).then( | |
| fn=refresh_token_count, inputs=[state], outputs=[token_monitor] | |
| ) | |
| file_msg = file_upload_button.upload( | |
| add_file, [state, chatbot, file_upload_button], [chatbot], queue=False | |
| ) | |
| file_msg.then(lambda: gr.Button.update(interactive=True), None, [undo_file_button], queue=False) | |
| file_msg.then(fn=refresh_file_display, inputs=[state], outputs=[file_output]) | |
| undo_file_button.click( | |
| fn=undo_upload_file, inputs=[state, chatbot], outputs=[chatbot, undo_file_button] | |
| ).then( | |
| fn=refresh_file_display, inputs=[state], outputs=[file_output] | |
| ) | |
| stop_generation_button.click(fn=stop_generating, inputs=[state], queue=False).then( | |
| fn=lambda: gr.Button.update(interactive=False), inputs=None, outputs=[stop_generation_button], queue=False | |
| ) | |
| restart_button.click( | |
| fn=restart_ui, inputs=[chatbot], | |
| outputs=[ | |
| chatbot, text_box, restart_button, file_upload_button, undo_file_button, stop_generation_button, | |
| retry_button | |
| ] | |
| ).then( | |
| fn=restart_bot_backend, inputs=[state], queue=False | |
| ).then( | |
| fn=refresh_file_display, inputs=[state], outputs=[file_output] | |
| ).then( | |
| fn=lambda: (gr.Textbox.update(interactive=True), gr.Button.update(interactive=True), | |
| gr.Button.update(interactive=True)), | |
| inputs=None, outputs=[text_box, restart_button, file_upload_button], queue=False | |
| ).then( | |
| fn=refresh_token_count, | |
| inputs=[state], outputs=[token_monitor] | |
| ) | |
| block.load(fn=initialization, inputs=[state]) | |
| block.queue() | |
| block.launch(inbrowser=True) | |