Spaces:
Running
Running
| from __future__ import annotations | |
| import re | |
| import warnings | |
| from typing import Dict | |
| from langchain.callbacks.manager import ( | |
| AsyncCallbackManagerForChainRun, | |
| CallbackManagerForChainRun, | |
| ) | |
| from langchain.chains.llm import LLMChain | |
| from langchain.pydantic_v1 import Extra, root_validator | |
| from langchain.schema import BasePromptTemplate | |
| from langchain.schema.language_model import BaseLanguageModel | |
| from typing import List, Any, Optional | |
| from langchain.prompts import PromptTemplate | |
| import sys | |
| import os | |
| import json | |
| sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) | |
| from server.chat.knowledge_base_chat import knowledge_base_chat | |
| from configs import VECTOR_SEARCH_TOP_K, SCORE_THRESHOLD, MAX_TOKENS | |
| import asyncio | |
| from server.agent import model_container | |
| from pydantic import BaseModel, Field | |
async def search_knowledge_base_iter(database: str, query: str) -> str:
    """Query one knowledge base and return the concatenated answer text.

    Calls ``knowledge_base_chat`` in non-streaming mode (``stream=False``)
    with an empty history, using the model name from
    ``model_container.MODEL`` and the configured top-k, score threshold and
    max-token limits. Every item produced by the response's
    ``body_iterator`` is parsed as JSON and its ``"answer"`` field is
    appended to the result.

    Args:
        database: Name of the knowledge base to search.
        query: The question to ask.

    Returns:
        The ``"answer"`` fields of all response chunks joined together; an
        empty string if the response body yields nothing.
    """
    response = await knowledge_base_chat(
        query=query,
        knowledge_base_name=database,
        model_name=model_container.MODEL.model_name,
        temperature=0.01,
        history=[],
        top_k=VECTOR_SEARCH_TOP_K,
        max_tokens=MAX_TOKENS,
        prompt_name="knowledge_base_chat",
        score_threshold=SCORE_THRESHOLD,
        stream=False,
    )

    answer_parts = []
    # Each item yielded by the body iterator is a JSON string.
    async for chunk in response.body_iterator:
        answer_parts.append(json.loads(chunk)["answer"])
    return "".join(answer_parts)
# Prompt shown to the LLM. It lists the available knowledge bases and asks for
# the chosen knowledge base name inside a ```text fenced block, which is what
# the chain's regular expression extracts. Doubled braces are literal braces;
# only {database_names} and {question} are substituted.
_PROMPT_TEMPLATE = """
用户会提出一个需要你查询知识库的问题,你应该按照我提供的思想进行思考
Question: ${{用户的问题}}
这些数据库是你能访问的,冒号之前是他们的名字,冒号之后是他们的功能:
{database_names}
你的回答格式应该按照下面的内容,请注意,格式内的```text 等标记都必须输出,这是我用来提取答案的标记。
```text
${{知识库的名称}}
```
```output
数据库查询的结果
```
答案: ${{答案}}
现在,这是我的问题:
问题: {question}
"""

PROMPT = PromptTemplate(
    template=_PROMPT_TEMPLATE,
    input_variables=["question", "database_names"],
)
class LLMKnowledgeChain(LLMChain):
    """Chain that lets an LLM pick a knowledge base and then queries it.

    The LLM is shown the available knowledge bases together with the user's
    question and is expected to reply with a ```text fenced block containing
    the name of the knowledge base to use. That name and the original
    question are passed to ``search_knowledge_base_iter``; the search result
    is returned under ``output_key`` prefixed with ``"Answer: "``.
    """

    llm_chain: LLMChain
    llm: Optional[BaseLanguageModel] = None
    """[Deprecated] LLM wrapper to use."""
    prompt: BasePromptTemplate = PROMPT
    """[Deprecated] Prompt used to ask the LLM which knowledge base to query."""
    # Knowledge base name -> description; rendered into the prompt.
    database_names: Dict[str, str] = model_container.DATABASE
    input_key: str = "question"  #: :meta private:
    output_key: str = "answer"  #: :meta private:

    class Config:
        """Configuration for this pydantic object."""

        extra = Extra.forbid
        arbitrary_types_allowed = True

    @root_validator(pre=True)
    def raise_deprecation(cls, values: Dict) -> Dict:
        """Warn about the deprecated ``llm`` argument and build ``llm_chain`` from it.

        Runs before field validation so that an instance created with only
        ``llm`` still ends up with the required ``llm_chain`` field.
        """
        if "llm" in values:
            warnings.warn(
                "Directly instantiating an LLMKnowledgeChain with an llm is deprecated. "
                "Please instantiate with llm_chain argument or using the from_llm "
                "class method."
            )
            if "llm_chain" not in values and values["llm"] is not None:
                prompt = values.get("prompt", PROMPT)
                values["llm_chain"] = LLMChain(llm=values["llm"], prompt=prompt)
        return values

    @property
    def input_keys(self) -> List[str]:
        """Expect input key.

        :meta private:
        """
        return [self.input_key]

    @property
    def output_keys(self) -> List[str]:
        """Expect output key.

        :meta private:
        """
        return [self.output_key]

    def _format_database_names(self) -> str:
        """Render ``database_names`` as one ``"name":"description"`` entry per line."""
        return ',\n'.join([f' "{k}":"{v}"' for k, v in self.database_names.items()])

    def _evaluate_expression(self, dataset, query) -> str:
        """Search knowledge base ``dataset`` for ``query`` from synchronous code.

        Drives the async search with ``asyncio.run``, so it must not be called
        from code that is already running inside an event loop; async callers
        use ``_aevaluate_expression`` instead.

        Returns:
            The search answer, or a fixed error message if the search fails.
        """
        try:
            return asyncio.run(search_knowledge_base_iter(dataset, query))
        except Exception:
            # Deliberately best-effort: any failure is reported as answer text.
            return "输入的信息有误或不存在知识库"

    async def _aevaluate_expression(self, dataset, query) -> str:
        """Async counterpart of ``_evaluate_expression``; awaits the search directly.

        Returns:
            The search answer, or a fixed error message if the search fails.
        """
        try:
            return await search_knowledge_base_iter(dataset, query)
        except Exception:
            # Same best-effort contract as the synchronous variant.
            return "输入的信息有误或不存在知识库"

    def _process_llm_result(
        self,
        llm_output: str,
        llm_input: str,
        run_manager: CallbackManagerForChainRun,
    ) -> Dict[str, str]:
        """Turn the raw LLM reply into the chain output.

        Args:
            llm_output: Text produced by the LLM.
            llm_input: The user's original question, used as the search query.
            run_manager: Callback manager that receives progress text.

        Returns:
            ``{output_key: answer}``. If the reply starts with a ```text
            block, the answer is the knowledge base search result; if it
            contains ``"Answer:"``, the text after it is used; otherwise an
            error message that includes the raw reply is returned.
        """
        run_manager.on_text(llm_output, color="green", verbose=self.verbose)
        llm_output = llm_output.strip()
        text_match = re.search(r"^```text(.*?)```", llm_output, re.DOTALL)
        if text_match:
            database = text_match.group(1).strip()
            output = self._evaluate_expression(database, llm_input)
            run_manager.on_text("\nAnswer: ", verbose=self.verbose)
            run_manager.on_text(output, color="yellow", verbose=self.verbose)
            answer = "Answer: " + output
        elif llm_output.startswith("Answer:"):
            answer = llm_output
        elif "Answer:" in llm_output:
            answer = "Answer: " + llm_output.split("Answer:")[-1]
        else:
            return {self.output_key: f"输入的格式不对: {llm_output}"}
        return {self.output_key: answer}

    async def _aprocess_llm_result(
        self,
        llm_output: str,
        llm_input: str,
        run_manager: AsyncCallbackManagerForChainRun,
    ) -> Dict[str, str]:
        """Async variant of ``_process_llm_result``.

        Args:
            llm_output: Text produced by the LLM.
            llm_input: The user's original question, used as the search query.
            run_manager: Async callback manager that receives progress text.

        Returns:
            ``{output_key: answer}``, built the same way as in the
            synchronous variant.

        Raises:
            ValueError: If the reply has neither a leading ```text block nor
                an ``"Answer:"`` marker (the synchronous variant returns an
                error message instead).
        """
        await run_manager.on_text(llm_output, color="green", verbose=self.verbose)
        llm_output = llm_output.strip()
        text_match = re.search(r"^```text(.*?)```", llm_output, re.DOTALL)
        if text_match:
            database = text_match.group(1).strip()
            # Already inside an event loop here, so await the search rather
            # than going through the asyncio.run-based synchronous helper.
            output = await self._aevaluate_expression(database, llm_input)
            await run_manager.on_text("\nAnswer: ", verbose=self.verbose)
            await run_manager.on_text(output, color="yellow", verbose=self.verbose)
            answer = "Answer: " + output
        elif llm_output.startswith("Answer:"):
            answer = llm_output
        elif "Answer:" in llm_output:
            answer = "Answer: " + llm_output.split("Answer:")[-1]
        else:
            raise ValueError(f"unknown format from LLM: {llm_output}")
        return {self.output_key: answer}

    def _call(
        self,
        inputs: Dict[str, str],
        run_manager: Optional[CallbackManagerForChainRun] = None,
    ) -> Dict[str, str]:
        """Run the chain synchronously for ``inputs[input_key]``.

        Generation is stopped at the ```output marker so that the LLM only
        names the knowledge base; the search result is filled in afterwards.
        """
        _run_manager = run_manager or CallbackManagerForChainRun.get_noop_manager()
        question = inputs[self.input_key]
        _run_manager.on_text(question)
        llm_output = self.llm_chain.predict(
            database_names=self._format_database_names(),
            question=question,
            stop=["```output"],
            callbacks=_run_manager.get_child(),
        )
        return self._process_llm_result(llm_output, question, _run_manager)

    async def _acall(
        self,
        inputs: Dict[str, str],
        run_manager: Optional[AsyncCallbackManagerForChainRun] = None,
    ) -> Dict[str, str]:
        """Run the chain asynchronously for ``inputs[input_key]``.

        Generation is stopped at the ```output marker so that the LLM only
        names the knowledge base; the search result is filled in afterwards.
        """
        _run_manager = run_manager or AsyncCallbackManagerForChainRun.get_noop_manager()
        question = inputs[self.input_key]
        await _run_manager.on_text(question)
        llm_output = await self.llm_chain.apredict(
            database_names=self._format_database_names(),
            question=question,
            stop=["```output"],
            callbacks=_run_manager.get_child(),
        )
        return await self._aprocess_llm_result(llm_output, question, _run_manager)

    @property
    def _chain_type(self) -> str:
        """Identifier of this chain type."""
        return "llm_knowledge_chain"

    @classmethod
    def from_llm(
        cls,
        llm: BaseLanguageModel,
        prompt: BasePromptTemplate = PROMPT,
        **kwargs: Any,
    ) -> LLMKnowledgeChain:
        """Build the chain from an LLM by wrapping it in an ``LLMChain``.

        Args:
            llm: Language model used to choose the knowledge base.
            prompt: Prompt given to the inner ``LLMChain``.
            **kwargs: Extra fields forwarded to the chain constructor.
        """
        llm_chain = LLMChain(llm=llm, prompt=prompt)
        return cls(llm_chain=llm_chain, **kwargs)
def search_knowledgebase_once(query: str):
    """Answer ``query`` with a verbose ``LLMKnowledgeChain`` built on ``model_container.MODEL``."""
    chain = LLMKnowledgeChain.from_llm(
        model_container.MODEL,
        verbose=True,
        prompt=PROMPT,
    )
    return chain.run(query)
class KnowledgeSearchInput(BaseModel):
    """Input schema with a single string field that carries the search query."""

    # NOTE(review): the field is named "location" although its description
    # says it holds the query; confirm against callers before renaming.
    location: str = Field(description="The query to be searched")
if __name__ == "__main__":
    # Manual smoke test: run one sample question and print the answer.
    print(search_knowledgebase_once("大数据的男女比例"))