import base64
import json
import mimetypes
import os
import re
import time

import gradio as gr
import requests

import modelscope_studio.components.antd as antd
import modelscope_studio.components.antdx as antdx
import modelscope_studio.components.base as ms
import modelscope_studio.components.legacy as legacy
import modelscope_studio.components.pro as pro
from modelscope_studio.components.pro.chatbot import (
    ChatbotActionConfig, ChatbotBotConfig, ChatbotMarkdownConfig,
    ChatbotPromptsConfig, ChatbotUserConfig, ChatbotWelcomeConfig)

from config import DEFAULT_PROMPTS, EXAMPLES, SystemPrompt
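
# Runtime configuration, read from environment variables. MODEL_VERSION,
# API_URL, API_KEY and MODEL_CONTROL_DEFAULTS are required; the remaining
# variables are optional.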
MODEL_VERSION = os.environ['MODEL_VERSION']
API_URL = os.environ['API_URL']
API_KEY = os.environ['API_KEY']
SYSTEM_PROMPT = os.environ.get('SYSTEM_PROMPT')
MULTIMODAL_FLAG = os.environ.get('MULTIMODAL')
MODEL_CONTROL_DEFAULTS = json.loads(os.environ['MODEL_CONTROL_DEFAULTS'])
NAME_MAP = {
    'system': os.environ.get('SYSTEM_NAME'),
    'user': os.environ.get('USER_NAME'),
}
MODEL_NAME = 'MiniMax-M1'


def prompt_select(e: gr.EventData):
    return gr.update(value=e._data["payload"][0]["value"]["description"])


def clear():
    return gr.update(value=None)


def retry(chatbot_value, e: gr.EventData):
    index = e._data["payload"][0]["index"]
    chatbot_value = chatbot_value[:index]
    yield gr.update(loading=True), gr.update(value=chatbot_value), gr.update(disabled=True)
    for chunk in submit(None, chatbot_value):
        yield chunk


def cancel(chatbot_value):
    chatbot_value[-1]["loading"] = False
    chatbot_value[-1]["status"] = "done"
    chatbot_value[-1]["footer"] = "Chat completion paused"
    return gr.update(value=chatbot_value), gr.update(loading=False), gr.update(disabled=False)


def add_name_for_message(message):
    name = NAME_MAP.get(message['role'])
    if name is not None:
        message['name'] = name


def convert_content(content):
    if isinstance(content, str):
        return content
    if isinstance(content, tuple):
        return [{
            'type': 'image_url',
            'image_url': {
                'url': encode_base64(content[0]),
            },
        }]
    content_list = []
    for key, val in content.items():
        if key == 'text':
            content_list.append({
                'type': 'text',
                'text': val,
            })
        elif key == 'files':
            for f in val:
                content_list.append({
                    'type': 'image_url',
                    'image_url': {
                        'url': encode_base64(f),
                    },
                })
    return content_list


def encode_base64(path):
    guess_type = mimetypes.guess_type(path)[0]
    if guess_type is None or not guess_type.startswith('image/'):
        raise gr.Error('not an image ({}): {}'.format(guess_type, path))
    with open(path, 'rb') as handle:
        data = handle.read()
    return 'data:{};base64,{}'.format(
        guess_type,
        base64.b64encode(data).decode(),
    )


def format_history(history):
    """Convert chatbot history format to API call format"""
    messages = []
    if SYSTEM_PROMPT is not None:
        messages.append({
            'role': 'system',
            'content': SYSTEM_PROMPT,
        })
    for item in history:
        if item["role"] == "user":
            messages.append({
                'role': 'user',
                'content': convert_content(item["content"]),
            })
        elif item["role"] == "assistant":
            # Extract reasoning content and main content
            reasoning_content = ""
            main_content = ""
            if isinstance(item["content"], list):
                for content_item in item["content"]:
                    if content_item.get("type") == "tool":
                        reasoning_content = content_item.get("content", "")
                    elif content_item.get("type") == "text":
                        main_content = content_item.get("content", "")
            else:
                main_content = item["content"]
            messages.append({
                'role': 'assistant',
                'content': convert_content(main_content),
                'reasoning_content': convert_content(reasoning_content),
            })
    return messages
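

# Streaming chat handler for the Chatbot tab: sends the conversation to the
# OpenAI-compatible chat completions endpoint and incrementally fills the last
# chatbot message with reasoning ("tool") and answer ("text") segments.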
def submit(sender_value, chatbot_value):
    if sender_value is not None:
        chatbot_value.append({
            "role": "user",
            "content": sender_value,
        })
    api_messages = format_history(chatbot_value)
    for message in api_messages:
        add_name_for_message(message)
    chatbot_value.append({
        "role": "assistant",
        "content": [],
        "loading": True,
        "status": "pending"
    })
    yield {
        sender: gr.update(value=None, loading=True),
        clear_btn: gr.update(disabled=True),
        chatbot: gr.update(value=chatbot_value)
    }
    try:
        data = {
            'model': MODEL_VERSION,
            'messages': api_messages,
            'stream': True,
            'max_tokens': MODEL_CONTROL_DEFAULTS['tokens_to_generate'],
            'temperature': MODEL_CONTROL_DEFAULTS['temperature'],
            'top_p': MODEL_CONTROL_DEFAULTS['top_p'],
        }
        r = requests.post(
            API_URL,
            headers={
                'Content-Type': 'application/json',
                'Authorization': 'Bearer {}'.format(API_KEY),
            },
            data=json.dumps(data),
            stream=True,
        )
        thought_done = False
        start_time = time.time()
        message_content = chatbot_value[-1]["content"]
        # Reasoning content (tool type)
        message_content.append({
            "type": "tool",
            "content": "",
            "options": {
                "title": "🤔 Thinking..."
            }
        })
        # Main content (text type)
        message_content.append({
            "type": "text",
            "content": "",
        })
        reasoning_start_time = None
        reasoning_duration = None
        for row in r.iter_lines():
            if row.startswith(b'data:'):
                data = json.loads(row[5:])
                if 'choices' not in data:
                    raise gr.Error('request failed')
                choice = data['choices'][0]
                if 'delta' in choice:
                    delta = choice['delta']
                    reasoning_content = delta.get('reasoning_content', '')
                    content = delta.get('content', '')
                    chatbot_value[-1]["loading"] = False
                    # Handle reasoning content
                    if reasoning_content:
                        if reasoning_start_time is None:
                            reasoning_start_time = time.time()
                        message_content[-2]["content"] += reasoning_content
                    # Handle main content
                    if content:
                        message_content[-1]["content"] += content
                        if not thought_done:
                            thought_done = True
                            if reasoning_start_time is not None:
                                reasoning_duration = time.time() - reasoning_start_time
                                thought_cost_time = "{:.2f}".format(reasoning_duration)
                            else:
                                reasoning_duration = 0.0
                                thought_cost_time = "0.00"
                            message_content[-2]["options"] = {"title": f"End of Thought ({thought_cost_time}s)"}
                    yield {chatbot: gr.update(value=chatbot_value)}
                elif 'message' in choice:
                    message_data = choice['message']
                    reasoning_content = message_data.get('reasoning_content', '')
                    main_content = message_data.get('content', '')
                    message_content[-2]["content"] = reasoning_content
                    message_content[-1]["content"] = main_content
                    if reasoning_content and main_content:
                        if reasoning_duration is None:
                            if reasoning_start_time is not None:
                                reasoning_duration = time.time() - reasoning_start_time
                                thought_cost_time = "{:.2f}".format(reasoning_duration)
                            else:
                                reasoning_duration = 0.0
                                thought_cost_time = "0.00"
                        else:
                            thought_cost_time = "{:.2f}".format(reasoning_duration)
                        message_content[-2]["options"] = {"title": f"End of Thought ({thought_cost_time}s)"}
                    chatbot_value[-1]["loading"] = False
                    yield {chatbot: gr.update(value=chatbot_value)}
        chatbot_value[-1]["footer"] = "{:.2f}s".format(time.time() - start_time)
        chatbot_value[-1]["status"] = "done"
        yield {
            clear_btn: gr.update(disabled=False),
            sender: gr.update(loading=False),
            chatbot: gr.update(value=chatbot_value),
        }
    except Exception as e:
        chatbot_value[-1]["loading"] = False
        chatbot_value[-1]["status"] = "done"
        chatbot_value[-1]["content"] = "Request failed, please try again."
        yield {
            clear_btn: gr.update(disabled=False),
            sender: gr.update(loading=False),
            chatbot: gr.update(value=chatbot_value),
        }
        raise e
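

# Helpers for the Code Playground tab: remove_code_block extracts an HTML
# document from the model reply, and send_to_sandbox renders it inside a
# sandboxed iframe via a base64 data: URI.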
def remove_code_block(text):
    # Try to match code blocks with language markers
    patterns = [
        r'```(?:html|HTML)\n([\s\S]+?)\n```',  # Match ```html or ```HTML
        r'```\n([\s\S]+?)\n```',  # Match code blocks without language markers
        r'```([\s\S]+?)```'  # Match code blocks without line breaks
    ]
    for pattern in patterns:
        match = re.search(pattern, text, re.DOTALL)
        if match:
            extracted = match.group(1).strip()
            print("Successfully extracted code block:", extracted)
            return extracted
    # If no code block is found, check if the entire text is HTML
    if text.strip().startswith('<!DOCTYPE html>') or text.strip().startswith('<html'):
        print("Text appears to be raw HTML, using as is")
        return text.strip()
    print("No code block found in text:", text)
    return text.strip()


def send_to_sandbox(code):
    # Add a wrapper to inject necessary permissions
    wrapped_code = f"""
<!DOCTYPE html>
<html>
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <script>
        // Create a safe storage alternative
        const safeStorage = {{
            _data: {{}},
            getItem: function(key) {{
                return this._data[key] || null;
            }},
            setItem: function(key, value) {{
                this._data[key] = value;
            }},
            removeItem: function(key) {{
                delete this._data[key];
            }},
            clear: function() {{
                this._data = {{}};
            }}
        }};
        // Replace native localStorage
        Object.defineProperty(window, 'localStorage', {{
            value: safeStorage,
            writable: false
        }});
        // Add error handling without using alert
        window.onerror = function(message, source, lineno, colno, error) {{
            console.error('Error:', message);
        }};
    </script>
</head>
<body>
    {code}
</body>
</html>
"""
    encoded_html = base64.b64encode(wrapped_code.encode('utf-8')).decode('utf-8')
    data_uri = f"data:text/html;charset=utf-8;base64,{encoded_html}"
    iframe = f'<iframe src="{data_uri}" width="100%" height="920px" sandbox="allow-scripts allow-same-origin allow-forms allow-popups allow-modals allow-presentation" allow="display-capture"></iframe>'
    print("Generated iframe:", iframe)
    return iframe


def select_example(example: dict):
    return lambda: gr.update(value=example["description"])
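

# Streams a code-generation request for the playground: reasoning tokens go to
# the "Thinking Process" tab, the final HTML is extracted and rendered in the
# sandbox, and transient network errors are retried up to max_retries times.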
def generate_code(query: str):
    if not query:
        # generate_code is a generator, so the empty-input state must be
        # yielded (a plain return of values here would be ignored by Gradio).
        yield None, None, None, gr.update(active_key="empty"), gr.update(active_key="reasoning", visible=False)
        return
| print("Starting code generation with query:", query) | |
| messages = [{ | |
| 'role': 'system', | |
| 'content': SystemPrompt | |
| }, { | |
| 'role': 'user', | |
| 'content': query | |
| }] | |
| max_retries = 3 | |
| retry_count = 0 | |
| while retry_count < max_retries: | |
| try: | |
| data = { | |
| 'model': MODEL_VERSION, | |
| 'messages': messages, | |
| 'stream': True, | |
| 'max_tokens': MODEL_CONTROL_DEFAULTS['tokens_to_generate'], | |
| 'temperature': MODEL_CONTROL_DEFAULTS['temperature'], | |
| 'top_p': MODEL_CONTROL_DEFAULTS['top_p'], | |
| } | |
| print(f"Attempt {retry_count + 1}: Sending request to API with data:", json.dumps(data, indent=2)) | |
| r = requests.post( | |
| API_URL, | |
| headers={ | |
| 'Content-Type': 'application/json', | |
| 'Authorization': 'Bearer {}'.format(API_KEY), | |
| }, | |
| data=json.dumps(data), | |
| stream=True, | |
| timeout=60 # Set 60 seconds timeout | |
| ) | |
| content = "" | |
| reasoning_content = "" | |
| for row in r.iter_lines(): | |
| if row.startswith(b'data:'): | |
| data = json.loads(row[5:]) | |
| print("Received data from API:", json.dumps(data, indent=2)) | |
| if 'choices' not in data: | |
| raise gr.Error('request failed') | |
| choice = data['choices'][0] | |
| if 'delta' in choice: | |
| delta = choice['delta'] | |
| content += delta.get('content', '') | |
| reasoning_content += delta.get('reasoning_content', '') | |
| print("Current content:", content) | |
| print("Current reasoning:", reasoning_content) | |
| yield { | |
| code_output: content, | |
| reasoning_output: reasoning_content + "\n", | |
| state_tab: gr.update(active_key="loading"), | |
| output_tabs: gr.update(active_key="reasoning", visible=True) | |
| } | |
| elif 'message' in choice: | |
| message_data = choice['message'] | |
| content = message_data.get('content', '') | |
| reasoning_content = message_data.get('reasoning_content', '') | |
| print("Final content:", content) | |
| print("Final reasoning:", reasoning_content) | |
| html_content = remove_code_block(content) | |
| print("Extracted HTML:", html_content) | |
| yield { | |
| code_output: content, | |
| reasoning_output: reasoning_content + "\n", | |
| sandbox: send_to_sandbox(html_content), | |
| state_tab: gr.update(active_key="render"), | |
| output_tabs: gr.update(active_key="code", visible=True) # Switch to code tab when complete | |
| } | |
| # If successful, break out of retry loop | |
| break | |
| except (requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e: | |
| retry_count += 1 | |
| if retry_count == max_retries: | |
| print(f"Failed after {max_retries} attempts:", str(e)) | |
| raise gr.Error(f"Request failed after {max_retries} attempts: {str(e)}") | |
| print(f"Attempt {retry_count} failed, retrying...") | |
| time.sleep(1) # Wait 1 second before retrying | |
| except Exception as e: | |
| print("Error occurred:", str(e)) | |
| raise gr.Error(str(e)) | |
| css = """ | |
| .output-loading { | |
| display: flex; | |
| flex-direction: column; | |
| align-items: center; | |
| justify-content: center; | |
| width: 100%; | |
| min-height: 680px; | |
| height: calc(100vh - 200px); | |
| } | |
| .output-html { | |
| display: flex; | |
| flex-direction: column; | |
| width: 100%; | |
| min-height: 680px; | |
| } | |
| .output-html > iframe { | |
| flex: 1; | |
| } | |
| .reasoning-box { | |
| height: 300px; | |
| overflow-y: auto; | |
| background-color: #f5f5f5; | |
| border-radius: 4px; | |
| margin-bottom: 12px; | |
| font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, "Helvetica Neue", Arial, sans-serif; | |
| font-size: 14px; | |
| line-height: 1.2; | |
| white-space: pre-wrap; | |
| word-break: break-word; | |
| width: 100%; | |
| box-sizing: border-box; | |
| scroll-behavior: smooth; | |
| } | |
| .reasoning-box .ms-markdown { | |
| padding: 0 12px; | |
| } | |
| .reasoning-box::-webkit-scrollbar { | |
| width: 6px; | |
| } | |
| .reasoning-box::-webkit-scrollbar-track { | |
| background: #f1f1f1; | |
| border-radius: 3px; | |
| } | |
| .reasoning-box::-webkit-scrollbar-thumb { | |
| background: #888; | |
| border-radius: 3px; | |
| } | |
| .reasoning-box::-webkit-scrollbar-thumb:hover { | |
| background: #555; | |
| } | |
| .markdown-container { | |
| height: 300px; | |
| overflow-y: auto; | |
| background-color: #f5f5f5; | |
| border-radius: 4px; | |
| margin-bottom: 12px; | |
| font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, "Helvetica Neue", Arial, sans-serif; | |
| font-size: 14px; | |
| line-height: 1.6; | |
| white-space: pre-wrap; | |
| word-break: break-word; | |
| width: 100%; | |
| box-sizing: border-box; | |
| scroll-behavior: smooth; | |
| } | |
| """ | |


def scroll_to_bottom():
    return """
function() {
    setTimeout(() => {
        const reasoningBox = document.querySelector('.reasoning-box');
        if (reasoningBox) {
            reasoningBox.scrollTop = reasoningBox.scrollHeight;
        }
        const markdownContainer = document.querySelector('.markdown-container');
        if (markdownContainer) {
            markdownContainer.scrollTop = markdownContainer.scrollHeight;
        }
    }, 100);
}
"""
with gr.Blocks(css=css) as demo, ms.Application(), antdx.XProvider():
    with antd.Tabs() as tabs:
        with antd.Tabs.Item(key="chat", label="Chatbot"):
            with antd.Flex(vertical=True, gap="middle"):
                chatbot = pro.Chatbot(
                    height="calc(100vh - 200px)",
                    markdown_config=ChatbotMarkdownConfig(allow_tags=["think"]),
                    welcome_config=ChatbotWelcomeConfig(
                        variant="borderless",
                        icon="./assets/minimax-logo.png",
                        title="Hello, I'm MiniMax-M1",
                        description="You can input text to get started.",
                        prompts=ChatbotPromptsConfig(
                            title="How can I help you today?",
                            styles={
                                "list": {
                                    "width": '100%',
                                },
                                "item": {
                                    "flex": 1,
                                },
                            },
                            items=DEFAULT_PROMPTS
                        )
                    ),
                    user_config=ChatbotUserConfig(actions=["copy", "edit"]),
                    bot_config=ChatbotBotConfig(
                        header=MODEL_NAME,
                        avatar="./assets/minimax-logo.png",
                        actions=["copy", "retry"]
                    )
                )
                with antdx.Sender() as sender:
                    with ms.Slot("prefix"):
                        with antd.Button(value=None, color="default", variant="text") as clear_btn:
                            with ms.Slot("icon"):
                                antd.Icon("ClearOutlined")
                clear_btn.click(fn=clear, outputs=[chatbot])
                submit_event = sender.submit(
                    fn=submit,
                    inputs=[sender, chatbot],
                    outputs=[sender, chatbot, clear_btn]
                )
                sender.cancel(
                    fn=cancel,
                    inputs=[chatbot],
                    outputs=[chatbot, sender, clear_btn],
                    cancels=[submit_event],
                    queue=False
                )
                chatbot.retry(
                    fn=retry,
                    inputs=[chatbot],
                    outputs=[sender, chatbot, clear_btn]
                )
                chatbot.welcome_prompt_select(fn=prompt_select, outputs=[sender])
        with antd.Tabs.Item(key="code", label="Code Playground (WebDev)"):
            with antd.Row(gutter=[32, 12]):
                with antd.Col(span=12):
                    with antd.Flex(vertical=True, gap="middle"):
                        code_input = antd.InputTextarea(
                            size="large",
                            allow_clear=True,
                            placeholder="Please enter what kind of application you want or choose an example below and click the button"
                        )
                        code_btn = antd.Button("Generate Code", type="primary", size="large")
                        with antd.Tabs(active_key="reasoning", visible=False) as output_tabs:
| with antd.Tabs.Item(key="reasoning", label="π€ Thinking Process"): | |
| reasoning_output = legacy.Markdown( | |
| elem_classes="reasoning-box" | |
| ) | |
| with antd.Tabs.Item(key="code", label="π» Generated Code"): | |
| code_output = legacy.Markdown(elem_classes="markdown-container") | |
                        antd.Divider("Examples")
                        # Examples
                        with antd.Flex(gap="small", wrap=True):
                            for example in EXAMPLES:
                                with antd.Card(
                                        elem_style=dict(flex="1 1 fit-content"),
                                        hoverable=True) as example_card:
                                    antd.Card.Meta(
                                        title=example['title'],
                                        description=example['description'])
                                example_card.click(
                                    fn=select_example(example),
                                    outputs=[code_input])
                with antd.Col(span=12):
                    with ms.Div(elem_classes="right_panel"):
                        gr.HTML('<div class="render_header"><span class="header_btn"></span><span class="header_btn"></span><span class="header_btn"></span></div>')
                        with ms.Slot("extra"):
                            with ms.Div(elem_id="output-container-extra"):
                                with antd.Button(
                                        "Download HTML",
                                        type="link",
                                        href_target="_blank",
                                        disabled=True,
                                ) as download_btn:
                                    with ms.Slot("icon"):
                                        antd.Icon("DownloadOutlined")
                                download_content = gr.Text(visible=False)
                                view_code_btn = antd.Button(
                                    "🧑‍💻 View Code", type="primary")
                        with antd.Tabs(active_key="empty", render_tab_bar="() => null") as state_tab:
                            with antd.Tabs.Item(key="empty"):
                                empty = antd.Empty(description="empty input", elem_classes="right_content")
                            with antd.Tabs.Item(key="loading"):
                                loading = antd.Spin(True, tip="coding...", size="large", elem_classes="output-loading")
                            with antd.Tabs.Item(key="render"):
                                sandbox = gr.HTML(elem_classes="output-html")

    code_btn.click(
        generate_code,
        inputs=[code_input],
        outputs=[code_output, reasoning_output, sandbox, state_tab, output_tabs]
    )
    # Add auto-scroll functionality: scroll_to_bottom() returns a JS snippet,
    # so it is wired as client-side js rather than as a Python callback.
    reasoning_output.change(
        fn=None,
        js=scroll_to_bottom(),
        inputs=[],
        outputs=[],
    )
    code_output.change(
        fn=None,
        js=scroll_to_bottom(),
        inputs=[],
        outputs=[],
    )

    def on_tab_change(tab_key):
        return gr.update(active_key=tab_key, visible=True)

    output_tabs.change(
        fn=on_tab_change,
        inputs=[output_tabs],
        outputs=[output_tabs],
    )


if __name__ == '__main__':
    demo.queue(default_concurrency_limit=50).launch(ssr_mode=False)