Spaces:
Build error
Build error
| import re | |
| import requests | |
| import chardet | |
| import config as cfg | |
| from bs4 import BeautifulSoup | |
| from pathlib import Path | |
| from transformers import AutoTokenizer | |
| from duckduckgo_search import DDGS | |
def log_in(uid, state):
    """Reset the session state for ``uid`` and index that user's saved chats.

    Clears the chat and thinking histories held in ``state``, records the
    UID, makes sure the directory ``cfg.USER_DIR/<uid>`` exists and stores
    the stem of every ``*.json`` file found below it.

    Args:
        uid: User identifier; ``0`` selects the "not logged in" message.
        state (dict): Session state, updated in place.

    Returns:
        tuple: ``(response, state)`` - the status message and the same
        ``state`` object that was passed in.
    """
    state['chat_history'] = []
    state['thinking_history'] = ''
    state['uid'] = uid

    response = (
        "You Are Not Logged In Yet, Use Public Directory"
        if uid == 0
        else f"Your Log In UID: {uid}"
    )

    # Every UID (including 0) gets its own directory under cfg.USER_DIR.
    home = Path(cfg.USER_DIR) / str(uid)
    home.mkdir(parents=True, exist_ok=True)
    state['user_dir'] = home

    # Load the saved sessions: one entry per JSON file, extension dropped.
    state['available_history'] = [saved.stem for saved in home.rglob("*.json")]
    return response, state
def clean_response(response_content):
    """Strip Markdown/LaTeX markup from a response and flatten its whitespace.

    The substitutions run in the listed order; the final one turns every
    whitespace run (newlines included) into a single space, so the result
    is always a single line.

    Args:
        response_content (str): Raw response text.

    Returns:
        str: The cleaned text with leading and trailing whitespace removed.
    """
    substitutions = (
        (r'\*\*|__', ''),                  # Markdown bold markers
        (r'\\\(|\\\)|\\\[|\\\]', ''),      # LaTeX math delimiters \( \) \[ \]
        (r'\\boxed\{([^}]*)\}', r'\1'),    # unwrap \boxed{...}, keep the content
        (r'\\\\', ''),                     # double backslashes
        (r'\n\s*\n', '\n\n'),              # squeeze runs of blank lines
        (r'\s+', ' '),                     # collapse all whitespace to one space
    )
    text = response_content
    for pattern, replacement in substitutions:
        text = re.sub(pattern, replacement, text)
    return text.strip()
def parse_output(response_content):
    """Split a model response into its reasoning part and its answer part.

    The response is first normalised with ``clean_response``. The reasoning
    is the text between the first ``<think>`` and the first ``</think>``
    that follows it; the answer is everything after that closing tag.

    The tags are located in order with ``str.partition`` rather than by
    splitting on either tag and indexing the pieces, so a ``</think>`` that
    appears before ``<think>`` is not mistaken for a reasoning block, and
    any further tags after the first block stay in the answer instead of
    truncating it.

    Args:
        response_content (str): Raw model output.

    Returns:
        tuple: ``(thinking, answer)``. ``thinking`` is ``None`` when no
        well-formed ``<think>...</think>`` block exists, in which case
        ``answer`` is the whole cleaned response.
    """
    cleaned_content = clean_response(response_content)

    _, open_tag, after_open = cleaned_content.partition("<think>")
    if not open_tag:
        return None, cleaned_content

    thinking, close_tag, answer = after_open.partition("</think>")
    if not close_tag:
        # Opening tag without a closing tag after it: treat as plain text.
        return None, cleaned_content

    return thinking, answer
def parse_chat_history(chat_history):
    """Convert a saved conversation into the format the chatbot recognises.

    Example ``chat_history``::

        [
            {
                "role": "user",
                "content": "hello"
            },
            {
                "role": "assistant",
                "content": " Hello! How can I assist you today? 😊"
            }
        ]

    Messages are paired by position within their role: the i-th ``user``
    content is paired with the i-th ``assistant`` content. Entries with any
    other role are ignored.

    Args:
        chat_history (list): Saved messages, each a dict with ``role`` and
            ``content`` keys.

    Returns:
        list: ``(user_message, assistant_response)`` tuples, or ``[]`` when
        the history is empty or the user/assistant counts do not match.
    """
    # Imported lazily and under an alias so the builtin ``Warning`` is not
    # shadowed inside this function.
    from gradio import Warning as gradio_warning

    # Explicit check instead of ``assert``: assertions are removed under
    # ``python -O``, which would silently skip the truncation below.
    if len(chat_history) % 2 != 0:
        gradio_warning('历史会话可能有遗失,用户和AI的消息数不匹配,截断最后一条消息...')
        chat_history = chat_history[:-1]

    if len(chat_history) == 0:
        gradio_warning('历史会话为空或无法匹配,加载历史会话失败...')
        return []

    messages = []
    responses = []
    for conversation in chat_history:
        role = conversation['role']
        if role == 'user':
            messages.append(conversation['content'])
        elif role == 'assistant':
            responses.append(conversation['content'])

    if len(messages) != len(responses):
        gradio_warning('用户和AI的消息无法匹配,加载历史会话失败...')
        return []
    return list(zip(messages, responses))
def web_search(query: str, max_results: int = 3):
    """Run a web search and extract the key text of each result page.

    Args:
        query: Search query passed to ``DDGS.text``.
        max_results: Maximum number of search results to request.

    Returns:
        list: One ``{'title', 'content'}`` dict per page that could be
        fetched and parsed, with ``content`` capped at 1000 characters.
        A page that raises is reported with ``print`` and skipped; if the
        search itself raises, an empty list is returned.
    """
    try:
        # Collect the search result links.
        with DDGS() as searcher:
            hits = list(searcher.text(query, max_results=max_results))

        # Extract the body text of each page.
        pages = []
        for hit in hits:
            try:
                page = requests.get(hit['href'], timeout=5)
                # Decode with the encoding chardet detects from the raw bytes.
                detected = chardet.detect(page.content)['encoding']
                if page.encoding != detected:
                    page.encoding = detected
                soup = BeautifulSoup(page.text, 'html.parser')
                # Main text container (adjust to the site structure if needed):
                # <main>, then <article>, then the whole <body>.
                container = soup.find('main') or soup.find('article') or soup.body
                title = hit['title']
                text = container.get_text(separator=' ', strip=True)
                pages.append({'title': title, 'content': text[:1000]})  # length cap
            except Exception as err:
                print('该结果搜索异常...', err)
        return pages
    except Exception as err:
        print('网络搜索异常,返回空...', err)
        return []
def parse_net_search(search_res):
    """Render search results as title/content text snippets.

    Args:
        search_res (list): Dicts with ``title`` and ``content`` keys.

    Returns:
        list: One formatted string per result whose ``content`` is
        non-empty, in the original order.
    """
    return [
        f"标题:\n{entry['title']}\n内容:\n{entry['content']}\n"
        for entry in search_res
        if len(entry['content']) > 0
    ]
def wash_up_content(doc_score):
    """Split a document into its non-empty lines and tag the first one.

    Args:
        doc_score (str | tuple): Either the whole document text (possibly
            several lines separated by ``\\n``) or a ``(document, score)``
            tuple.

    Returns:
        list: The non-empty lines of the document. The first line is
        prefixed with a check-mark marker, which includes the score
        formatted to three decimals when a score was given. An empty list
        is returned when the document has no non-empty line.
    """
    if isinstance(doc_score, tuple):
        doc, score = doc_score
    else:
        doc, score = doc_score, None

    lines = [line for line in doc.split('\n') if line]
    if not lines:
        # Empty or newline-only document: nothing to prefix. Indexing
        # lines[0] here would raise IndexError.
        return []

    # NOTE(review): the no-score prefix ends with an unmatched '"'; kept
    # unchanged because callers may rely on it - confirm it is intentional.
    if score is None:
        prefix = '✅<br>"'
    else:
        prefix = '✅Recall with score:{:.3f}<br>'.format(score)
    lines[0] = prefix + lines[0]
    return lines
# Maps the short model tags used by this application to Hugging Face Hub
# repository ids. Names missing from this table are passed through unchanged
# by load_tokenizer. Entry order is preserved.
MODEL_HF_MAPPING = {
    # Qwen2.5
    "qwen2.5:14b-instruct": "Qwen/Qwen2.5-14B-Instruct",
    "qwen2.5:32b-instruct": "Qwen/Qwen2.5-32B-Instruct",
    "qwen2.5:7b-instruct": "Qwen/Qwen2.5-7B-Instruct",
    "qwen2.5:3b-instruct": "Qwen/Qwen2.5-3B-Instruct",
    "qwen2.5:0.5b-instruct": "Qwen/Qwen2.5-0.5B-Instruct",
    "qwen2.5:0.5b": "Qwen/Qwen2.5-0.5B",
    "qwen2.5:32b": "Qwen/Qwen2.5-32B",

    # Qwen3
    "qwen3:32b": "Qwen/Qwen3-32B",
    "qwen3:14b": "Qwen/Qwen3-14B",
    "qwen3:4b": "Qwen/Qwen3-4B",
    "qwen3:8b": "Qwen/Qwen3-8B",
    "qwen3:30b-a3b": "Qwen/Qwen3-30B-A3B",

    # QwQ
    "qwq": "Qwen/QwQ-32B",

    # DeepSeek-R1 distilled models
    "deepseek-r1:14b": "deepseek-ai/DeepSeek-R1-Distill-Qwen-14B",
    "deepseek-r1:7b": "deepseek-ai/DeepSeek-R1-Distill-Qwen-7B",
    "deepseek-r1:32b": "deepseek-ai/DeepSeek-R1-Distill-Qwen-32B",
}
def load_tokenizer(model_name):
    """Load the fast tokenizer for ``model_name``.

    Args:
        model_name (str): A key of ``MODEL_HF_MAPPING``; any other value is
            handed to ``AutoTokenizer.from_pretrained`` unchanged.

    Returns:
        The object returned by ``AutoTokenizer.from_pretrained``.
    """
    if model_name in MODEL_HF_MAPPING:
        repo_id = MODEL_HF_MAPPING[model_name]
    else:
        repo_id = model_name
    return AutoTokenizer.from_pretrained(repo_id, use_fast=True)
def messages_to_prompt(messages):
    """Flatten chat messages into one ``role: content`` line per message.

    The layout is only an example; adjust it to the format the actual model
    requires.

    Args:
        messages (list): Dicts with ``role`` and ``content`` keys.

    Returns:
        str: The joined lines with surrounding whitespace stripped.
    """
    rendered = "".join(
        f"{entry['role']}: {entry['content']}\n" for entry in messages
    )
    return rendered.strip()
def count_tokens_local(messages, tokenizer):
    """Count the tokens of ``messages`` using a local tokenizer.

    Args:
        messages (list): Chat messages, rendered with ``messages_to_prompt``.
        tokenizer: Callable tokenizer whose result exposes ``"input_ids"``.

    Returns:
        int: Length of the ``input_ids`` produced for the rendered prompt.
    """
    encoded = tokenizer(
        messages_to_prompt(messages),
        return_tensors=None,
        truncation=False,
    )
    return len(encoded["input_ids"])
def concate_metadata(metadata):
    """Join the key/value pairs of a Document's metadata into one string.

    Args:
        metadata (dict): Metadata mapping.

    Returns:
        str: One ``key: value`` line per entry, separated by newlines.
    """
    lines = (f"{key}: {value}" for key, value in metadata.items())
    return '\n'.join(lines)
if __name__ == "__main__":
    # Manual smoke test: run one search, print each raw hit, then print the
    # formatted snippets.
    query = "今天几号"
    ret = web_search(query)
    for item in ret:
        print(item)
    ret = parse_net_search(ret)
    print(ret)