Spaces:
Sleeping
Sleeping
| from server.db.models.knowledge_base_model import KnowledgeBaseModel | |
| from server.db.models.knowledge_file_model import KnowledgeFileModel, FileDocModel | |
| from server.db.session import with_session | |
| from server.knowledge_base.utils import KnowledgeFile | |
| from typing import List, Dict | |
def list_docs_from_db(session,
                      kb_name: str,
                      file_name: str = None,
                      metadata: Dict = None,
                      ) -> List[Dict]:
    """
    List the Documents recorded for a knowledge base, optionally for one file.

    Args:
        session: database session used to run the query.
        kb_name: knowledge base name, matched against ``FileDocModel.kb_name``
            with ``ilike``.
        file_name: when non-empty, only documents whose ``file_name`` matches
            (``ilike``) are returned.
        metadata: optional key/value pairs. A document is kept only if, for
            every key, the string form of its stored metadata value equals
            ``str(value)``. ``None`` (the default) applies no metadata filter.

    Returns:
        A list of the form ``[{"id": str, "metadata": dict}, ...]``.
    """
    docs = session.query(FileDocModel).filter(FileDocModel.kb_name.ilike(kb_name))
    if file_name:
        docs = docs.filter(FileDocModel.file_name.ilike(file_name))
    # The default is None instead of a shared mutable ``{}``; treat it as "no filter".
    for k, v in (metadata or {}).items():
        docs = docs.filter(FileDocModel.meta_data[k].as_string() == str(v))
    # Read the ``meta_data`` column (the one filtered on above). On a SQLAlchemy
    # declarative model, ``.metadata`` is the table MetaData registry, not the
    # row's JSON -- presumably the reason the column is named ``meta_data``.
    return [{"id": x.doc_id, "metadata": x.meta_data} for x in docs.all()]
def delete_docs_from_db(session,
                        kb_name: str,
                        file_name: str = None,
                        ) -> List[Dict]:
    """
    Delete the Documents of a knowledge base (optionally of one file) and return them.

    Args:
        session: database session used for both the lookup and the delete.
        kb_name: knowledge base name, matched with ``ilike``.
        file_name: when non-empty, only documents of this file are deleted.

    Returns:
        The deleted documents, in the form ``[{"id": str, "metadata": dict}, ...]``.
    """
    query = session.query(FileDocModel).filter(FileDocModel.kb_name.ilike(kb_name))
    if file_name:
        query = query.filter(FileDocModel.file_name.ilike(file_name))
    # Snapshot the rows through the same query and session that performs the
    # delete, so the returned list describes exactly what is being removed.
    # Plain dicts are built before the delete/commit so nothing refers to the
    # removed rows afterwards.
    docs = [{"id": x.doc_id, "metadata": x.meta_data} for x in query.all()]
    query.delete(synchronize_session=False)
    session.commit()
    return docs
def add_docs_to_db(session,
                   kb_name: str,
                   file_name: str,
                   doc_infos: List[Dict]):
    """
    Record the Document entries of one file of a knowledge base.

    ``doc_infos`` has the form ``[{"id": str, "metadata": dict}, ...]``.
    Each entry is added to ``session`` as a ``FileDocModel``; nothing is
    committed here.

    Returns:
        ``False`` when ``doc_infos`` is ``None`` (a message is printed),
        ``True`` otherwise.
    """
    # NOTE: doc_infos has been observed to arrive as None; the root cause
    # still needs to be investigated.
    if doc_infos is None:
        print("输入的server.db.repository.knowledge_file_repository.add_docs_to_db的doc_infos参数为None")
        return False

    for info in doc_infos:
        doc_id = info["id"]
        doc_metadata = info["metadata"]
        record = FileDocModel(
            kb_name=kb_name,
            file_name=file_name,
            doc_id=doc_id,
            meta_data=doc_metadata,
        )
        session.add(record)
    return True
def count_files_from_db(session, kb_name: str) -> int:
    """Return how many ``KnowledgeFileModel`` rows have a ``kb_name`` matching ``kb_name`` (``ilike``)."""
    belongs_to_kb = KnowledgeFileModel.kb_name.ilike(kb_name)
    files_in_kb = session.query(KnowledgeFileModel).filter(belongs_to_kb)
    return files_in_kb.count()
def list_files_from_db(session, kb_name):
    """Return the ``file_name`` of every ``KnowledgeFileModel`` row whose ``kb_name`` matches (``ilike``)."""
    belongs_to_kb = KnowledgeFileModel.kb_name.ilike(kb_name)
    file_names = []
    for record in session.query(KnowledgeFileModel).filter(belongs_to_kb).all():
        file_names.append(record.file_name)
    return file_names
def add_file_to_db(session,
                   kb_file: KnowledgeFile,
                   docs_count: int = 0,
                   custom_docs: bool = False,
                   doc_infos: List[Dict] = None,  # form: [{"id": str, "metadata": dict}, ...]
                   ):
    """
    Register a file (and its Documents) under its knowledge base, or update it.

    If no ``KnowledgeBaseModel`` with ``kb_file.kb_name`` exists, nothing is
    written. Otherwise an existing file row has its mtime, size, docs count and
    custom-docs flag refreshed and its version incremented; a missing one is
    created and the knowledge base's ``file_count`` is incremented. The entries
    of ``doc_infos`` are then added as ``FileDocModel`` rows.

    Args:
        session: database session; this function adds to it but does not commit.
        kb_file: the file being registered.
        docs_count: number of documents to store on the file row.
        custom_docs: value stored in the file row's ``custom_docs`` field.
        doc_infos: document entries ``[{"id": str, "metadata": dict}, ...]``;
            ``None`` or empty adds no document rows.

    Returns:
        Always ``True``, including when the knowledge base was not found.
    """
    kb = session.query(KnowledgeBaseModel).filter_by(kb_name=kb_file.kb_name).first()
    if kb:
        # If the file is already recorded, refresh its info and bump its version.
        existing_file: KnowledgeFileModel = (session.query(KnowledgeFileModel)
                                             .filter(KnowledgeFileModel.kb_name.ilike(kb_file.kb_name),
                                                     KnowledgeFileModel.file_name.ilike(kb_file.filename))
                                             .first())
        mtime = kb_file.get_mtime()
        size = kb_file.get_size()

        if existing_file:
            existing_file.file_mtime = mtime
            existing_file.file_size = size
            existing_file.docs_count = docs_count
            existing_file.custom_docs = custom_docs
            existing_file.file_version += 1
        # Otherwise, register a new file.
        else:
            new_file = KnowledgeFileModel(
                file_name=kb_file.filename,
                file_ext=kb_file.ext,
                kb_name=kb_file.kb_name,
                document_loader_name=kb_file.document_loader_name,
                text_splitter_name=kb_file.text_splitter_name or "SpacyTextSplitter",
                file_mtime=mtime,
                file_size=size,
                docs_count=docs_count,
                custom_docs=custom_docs,
            )
            kb.file_count += 1
            session.add(new_file)

        # Add the document rows on the session already in hand so they are
        # part of the same unit of work as the file row above.
        for doc in doc_infos or []:
            session.add(FileDocModel(
                kb_name=kb_file.kb_name,
                file_name=kb_file.filename,
                doc_id=doc["id"],
                meta_data=doc["metadata"],
            ))
    return True
def delete_file_from_db(session, kb_file: KnowledgeFile):
    """
    Remove a file's row and its Document rows, and decrement the knowledge base's file count.

    Nothing is changed when no ``KnowledgeFileModel`` row matches
    ``kb_file.filename`` / ``kb_file.kb_name`` (both compared with ``ilike``).

    Args:
        session: database session; committed once after all changes are staged.
        kb_file: the file to remove.

    Returns:
        Always ``True``, whether or not a matching file row existed.
    """
    existing_file = (session.query(KnowledgeFileModel)
                     .filter(KnowledgeFileModel.file_name.ilike(kb_file.filename),
                             KnowledgeFileModel.kb_name.ilike(kb_file.kb_name))
                     .first())
    if existing_file:
        session.delete(existing_file)
        # Remove the file's document rows on this same session, with the same
        # kb_name / file_name matching used for the file row above.
        (session.query(FileDocModel)
         .filter(FileDocModel.kb_name.ilike(kb_file.kb_name),
                 FileDocModel.file_name.ilike(kb_file.filename))
         .delete(synchronize_session=False))

        kb = session.query(KnowledgeBaseModel).filter(KnowledgeBaseModel.kb_name.ilike(kb_file.kb_name)).first()
        if kb:
            kb.file_count -= 1
        # Single commit: the row deletions and the counter update succeed or
        # fail together, so file_count cannot drift from the stored rows.
        session.commit()
    return True
def delete_files_from_db(session, knowledge_base_name: str):
    """
    Delete every file row and document row of a knowledge base and zero its file count.

    Names are compared with ``ilike``. The session is committed before returning.
    Always returns ``True``.
    """
    # File rows are removed first, then document rows.
    for model in (KnowledgeFileModel, FileDocModel):
        in_this_kb = model.kb_name.ilike(knowledge_base_name)
        session.query(model).filter(in_this_kb).delete(synchronize_session=False)

    kb_matches = KnowledgeBaseModel.kb_name.ilike(knowledge_base_name)
    knowledge_base = session.query(KnowledgeBaseModel).filter(kb_matches).first()
    if knowledge_base:
        knowledge_base.file_count = 0

    session.commit()
    return True
def file_exists_in_db(session, kb_file: KnowledgeFile):
    """Return ``True`` if a ``KnowledgeFileModel`` row matches ``kb_file``'s filename and kb_name (``ilike``), else ``False``."""
    criteria = (
        KnowledgeFileModel.file_name.ilike(kb_file.filename),
        KnowledgeFileModel.kb_name.ilike(kb_file.kb_name),
    )
    match = session.query(KnowledgeFileModel).filter(*criteria).first()
    return bool(match)
def get_file_detail(session, kb_name: str, filename: str) -> dict:
    """
    Return the stored details of one file of a knowledge base.

    The file is looked up by ``filename`` and ``kb_name`` (both ``ilike``).
    Returns an empty dict when no row matches; otherwise a dict with the keys
    ``kb_name``, ``file_name``, ``file_ext``, ``file_version``,
    ``document_loader``, ``text_splitter``, ``create_time``, ``file_mtime``,
    ``file_size``, ``custom_docs`` and ``docs_count``.
    """
    record: KnowledgeFileModel = (session.query(KnowledgeFileModel)
                                  .filter(KnowledgeFileModel.file_name.ilike(filename),
                                          KnowledgeFileModel.kb_name.ilike(kb_name))
                                  .first())
    if not record:
        return {}

    # (output key, model attribute) pairs, in the order the keys are emitted.
    exported = (
        ("kb_name", "kb_name"),
        ("file_name", "file_name"),
        ("file_ext", "file_ext"),
        ("file_version", "file_version"),
        ("document_loader", "document_loader_name"),
        ("text_splitter", "text_splitter_name"),
        ("create_time", "create_time"),
        ("file_mtime", "file_mtime"),
        ("file_size", "file_size"),
        ("custom_docs", "custom_docs"),
        ("docs_count", "docs_count"),
    )
    return {key: getattr(record, attr) for key, attr in exported}