# MiniCPM-4.1-8B-Eagle3

from pathlib import Path
import time
import logging
import gradio as gr
import torch
import spaces
import threading
from transformers import AutoTokenizer, TextIteratorStreamer

# Import model-related modules
from eagle.model.ea_model import EaModel
from utils_chatbot import organize_messages, stream2display_text, mtp_new_tokens

# Logging configuration
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Global model instance
global_model = None
# Global model cache (lives in the GPU process)
_gpu_model_cache = None

# Global model configuration
model_config = dict(
    base_model_path="openbmb/MiniCPM4.1-8B",
    ea_model_path="openbmb/MiniCPM4.1-8B-Eagle3/MiniCPM4_1-8B-Eagle3-bf16",
    total_token=40,
    depth=3,
    top_k=10,
    threshold=1.0,
    use_eagle3=True,
    device_map="cpu",
    trust_remote_code=True,
)

# Load the tokenizer ahead of time
tokenizer = AutoTokenizer.from_pretrained(
    "openbmb/MiniCPM4.1-8B",
    use_fast=False,
    device_map="cpu",
)


def _initialize_gpu_model():
    """Get the model inside the GPU process and move it to the GPU."""
    global _gpu_model_cache
    if _gpu_model_cache is None:
        logger.info("Initializing the model in the GPU process")
        _gpu_model_cache = EaModel.from_pretrained(**model_config)
        logger.info("Model initialized on the CPU")
    return _gpu_model_cache


@spaces.GPU(duration=42)  # default is 60
def gpu_handler(inputs):
    """GPU inference handler (non-streaming)."""
    prompt_text = tokenizer.apply_chat_template(
        inputs,
        tokenize=False,
        add_generation_prompt=True,
    )
    model_inputs = tokenizer([prompt_text], return_tensors="pt")
    inputs = {
        "model_inputs": model_inputs,
        "max_new_tokens": 65536,
        "temperature": 0.6,
        "top_p": 0.95,
        "top_k": 50,
        "max_length": 65536,
    }

    logger.info("Moving global_model to the GPU")
    model = _initialize_gpu_model()
    cuda_inputs = dict(
        input_ids=inputs["model_inputs"].input_ids.to("cuda"),
        # attention_mask=inputs["model_inputs"].attention_mask.to("cuda"),
        max_new_tokens=inputs["max_new_tokens"],
        temperature=inputs["temperature"],
        top_p=inputs["top_p"],
        top_k=inputs["top_k"],
        max_length=inputs["max_length"],
    )
    model.base_model.to("cuda")
    model.ea_layer.to("cuda")
    # Tensor.to() is not in-place, so the moved tensor must be reassigned.
    model.ea_layer.tree_mask_init = model.ea_layer.tree_mask_init.to("cuda")

    logger.info("Passing inputs to global_model")
    output_ids = model.eagenerate(**cuda_inputs)
    logger.info("Got outputs from global_model.eagenerate")
    new_text = tokenizer.decode(
        output_ids[0][model_inputs.input_ids.shape[1]:],
        skip_special_tokens=True,
    )
    return new_text


@spaces.GPU(duration=60)  # default is 60
def gpu_handler_s(
    inputs,
    history,
    temperature,
    top_p,
    use_eagle,
):
    """GPU inference handler (streaming)."""
    prompt_text = tokenizer.apply_chat_template(
        inputs,
        tokenize=False,
        add_generation_prompt=True,
    )
    model_inputs = tokenizer([prompt_text], return_tensors="pt")
    inputs = {
        "model_inputs": model_inputs,
        "max_new_tokens": 4096,
        "temperature": temperature,
        "top_p": top_p,
        "top_k": 50,
        "max_length": 65536,
    }

    logger.info("Moving global_model to the GPU")
    model = _initialize_gpu_model()
    cuda_inputs = dict(
        input_ids=inputs["model_inputs"].input_ids.to("cuda"),
        # attention_mask=inputs["model_inputs"].attention_mask.to("cuda"),
        max_new_tokens=inputs["max_new_tokens"],
        temperature=inputs["temperature"],
        top_p=inputs["top_p"],
        top_k=inputs["top_k"],
        max_length=inputs["max_length"],
    )
    model.base_model.to("cuda")
    model.ea_layer.to("cuda")
    # Tensor.to() is not in-place, so the moved tensor must be reassigned.
    model.ea_layer.tree_mask_init = model.ea_layer.tree_mask_init.to("cuda")

    logger.info("Passing inputs to global_model")
    yield "", history

    stop_token_ids = [
        tokenizer.eos_token_id,
        tokenizer.convert_tokens_to_ids("<|eot_id|>"),
    ]
    gen_tk_count, existing_tk_count = 0, len(inputs["model_inputs"].input_ids[0])
    stream_text, start_time = "", time.time()
    generate_func = model.ea_generate if use_eagle else model.naive_generate
    for output_ids in generate_func(**cuda_inputs):  # for output_ids in model.ea_generate(**cuda_inputs):
        new_tokens, gen_tk_count = mtp_new_tokens(
            output_ids, gen_tk_count, existing_tk_count, stop_token_ids
        )
        new_token_text = tokenizer.decode(new_tokens, skip_special_tokens=False)
        logger.info(f"[MTP]'''{new_token_text}'''")
        stream_text += new_token_text
        token_per_sec = gen_tk_count / (time.time() - start_time)
        display_text = stream2display_text(stream_text, token_per_sec)
        history[-1] = (history[-1][0], display_text)
        yield "", history
    # Replace the history entry with the plain (non-display) text
    history[-1] = (history[-1][0], stream_text.replace("<|im_end|>", ""))


class Model:
    """Model wrapper class; it does not hold the actual model object."""

    def __init__(self):
        logger.info("Creating the model wrapper")

    def handler(self, inputs):
        """Non-streaming inference handler."""
        return gpu_handler(inputs)

    def stream_handler(self, inputs, history, **kwargs):
        """Streaming inference handler."""
        yield from gpu_handler_s(inputs, history, **kwargs)


def initialize_model():
    """Initialize the global model."""
    global global_model, _gpu_model_cache

    # Default configuration
    logger.info("=" * 50)
    logger.info("Starting the MiniCPM-4.1-8B-Eagle3 chatbot service")
    logger.info("=" * 50)

    # Create the model wrapper
    global_model = Model()

    # Preload the model onto the CPU in the main process (for a faster first inference)
    try:
        logger.info("Preloading the model onto the CPU in the main process...")
        _gpu_model_cache = EaModel.from_pretrained(**model_config)
        logger.info("Model preloaded on the main-process CPU")
    except Exception as e:
        logger.warning(f"Failed to preload the model in the main process; it will be loaded in the GPU process: {e}")
        _gpu_model_cache = None

    return global_model


def gen_response(message, history, temperature, top_p):
    chat_msg_ls = organize_messages(message, history)
    new_text = global_model.handler(chat_msg_ls)
    history.append((message, new_text))
    return "", history


def gen_response_stream(
    message,
    history,
    temperature,
    top_p,
    use_eagle,
):
    chat_msg_ls = organize_messages(message, history)
    history.append((message, ""))
    sampling_kwargs = dict(
        temperature=temperature,
        top_p=top_p,
        use_eagle=use_eagle,
    )
    yield from global_model.stream_handler(chat_msg_ls, history, **sampling_kwargs)


def create_app():
    assets_path = Path.cwd().absolute() / "assets"
    gr.set_static_paths(paths=[assets_path])
    logger.info(f"Static resource path: {assets_path}. READY.")

    theme = gr.themes.Soft(
        primary_hue="blue",
        secondary_hue="gray",
        neutral_hue="slate",
        font=[gr.themes.GoogleFont("Inter"), "Arial", "sans-serif"],
    )

    with gr.Blocks(
        theme=theme,
        css="""
        .logo-container {
            text-align: center;
            margin: 0.5rem 0 1rem 0;
        }
        .logo-container img {
            height: 96px;
            width: auto;
            max-width: 200px;
            display: inline-block;
        }
        .input-box {
            border: 1px solid #2f63b8;
            border-radius: 8px;
        }
        """,
    ) as demo:
        with gr.Row():
            with gr.Column(scale=1):
                gr.HTML('
