Spaces:
Sleeping
Sleeping
| from pathlib import Path | |
| from pprint import pprint | |
| import os | |
| import shutil | |
| import sys | |
| root_path = Path(__file__).parent.parent | |
| sys.path.append(str(root_path)) | |
| from server.knowledge_base.kb_service.base import KBServiceFactory | |
| from server.knowledge_base.utils import get_kb_path, get_doc_path, KnowledgeFile | |
| from server.knowledge_base.migrate import folder2db, prune_db_docs, prune_folder_files | |
# Set up the knowledge base fixture shared by every test in this module.
kb_name = "test_kb_for_migrate"

# Maps the file name used inside the knowledge base to the source file that
# gets copied into the knowledge base's docs folder.
test_files = {
    "readme.md": str(root_path / "readme.md"),
}

kb_path = get_kb_path(kb_name)
doc_path = get_doc_path(kb_name)

# exist_ok=True replaces a separate isdir() check, so there is no window
# between testing for the folder and creating it.
os.makedirs(doc_path, exist_ok=True)

for file_name, source_path in test_files.items():
    shutil.copy(source_path, os.path.join(doc_path, file_name))
def test_recreate_vs():
    """Rebuild the knowledge base with ``folder2db`` in "recreate_vs" mode.

    After the rebuild the knowledge base service must exist, every fixture
    file must be listed, and each file's documents must be retrievable both
    by file name and by a ``source`` metadata filter.
    """
    folder2db([kb_name], "recreate_vs")

    kb = KBServiceFactory.get_service_by_name(kb_name)
    assert kb and kb.exists()

    files = kb.list_files()
    print(files)
    for name in test_files:
        assert name in files

        # List docs based on file name.
        docs = kb.list_docs(file_name=name)
        assert len(docs) > 0
        pprint(docs[0])
        for doc in docs:
            assert doc.metadata["source"] == name

        # List docs based on metadata.
        docs = kb.list_docs(metadata={"source": name})
        assert len(docs) > 0
        for doc in docs:
            assert doc.metadata["source"] == name
def test_increment():
    """Clear the vector store, then re-import with ``folder2db`` "increment".

    The knowledge base must report no files and no docs right after
    ``clear_vs``; after the incremental import every fixture file must be
    listed again with at least one document whose ``source`` metadata equals
    the file name.
    """
    service = KBServiceFactory.get_service_by_name(kb_name)

    # Start from an empty vector store.
    service.clear_vs()
    assert service.list_files() == []
    assert service.list_docs() == []

    folder2db([kb_name], "increment")

    listed_files = service.list_files()
    print(listed_files)

    for file_name in test_files:
        assert file_name in listed_files

        file_docs = service.list_docs(file_name=file_name)
        assert len(file_docs) > 0
        pprint(file_docs[0])

        assert all(
            entry.metadata["source"] == file_name for entry in file_docs
        )
def test_prune_db():
    """Check ``prune_db_docs`` after a fixture file is removed from disk.

    The first fixture file is deleted from the docs folder and
    ``prune_db_docs`` is run; the file must then be absent from the knowledge
    base's file list and have no documents. When a second fixture file is
    configured, it must still be listed and still have documents. The deleted
    file is copied back afterwards, even if an assertion fails, so later
    tests see a complete docs folder.
    """
    # ``test_files`` may contain a single entry; unpacking two names
    # unconditionally would raise ValueError before anything is checked.
    names = list(test_files)
    assert names, "test_files must contain at least one file"
    del_file = names[0]
    keep_file = names[1] if len(names) > 1 else None

    os.remove(os.path.join(doc_path, del_file))
    try:
        prune_db_docs([kb_name])

        kb = KBServiceFactory.get_service_by_name(kb_name)
        files = kb.list_files()
        print(files)

        assert del_file not in files
        docs = kb.list_docs(file_name=del_file)
        assert len(docs) == 0

        if keep_file is not None:
            assert keep_file in files
            docs = kb.list_docs(file_name=keep_file)
            assert len(docs) > 0
            pprint(docs[0])
    finally:
        # Restore the fixture regardless of the outcome above.
        shutil.copy(test_files[del_file], os.path.join(doc_path, del_file))
def test_prune_folder():
    """Check ``prune_folder_files`` after a file's docs are deleted.

    The first fixture file's documents are deleted through the knowledge base
    service. The test expects the file to be gone from the file list while
    still present on disk. After ``prune_folder_files`` runs, the file must
    no longer exist in the docs folder. When a second fixture file is
    configured, it must remain listed, keep its documents, and stay on disk.
    """
    # ``test_files`` may contain a single entry; unpacking two names
    # unconditionally would raise ValueError before anything is checked.
    names = list(test_files)
    assert names, "test_files must contain at least one file"
    del_file = names[0]
    keep_file = names[1] if len(names) > 1 else None

    kb = KBServiceFactory.get_service_by_name(kb_name)

    # Delete docs for the file.
    kb.delete_doc(KnowledgeFile(del_file, kb_name))
    files = kb.list_files()
    print(files)

    assert del_file not in files
    docs = kb.list_docs(file_name=del_file)
    assert len(docs) == 0
    # The file itself must still be on disk before the folder is pruned.
    assert os.path.isfile(os.path.join(doc_path, del_file))

    if keep_file is not None:
        assert keep_file in files
        docs = kb.list_docs(file_name=keep_file)
        assert len(docs) > 0

    # Prune the folder.
    prune_folder_files([kb_name])

    # Check the result.
    assert not os.path.isfile(os.path.join(doc_path, del_file))
    if keep_file is not None:
        assert os.path.isfile(os.path.join(doc_path, keep_file))
def test_drop_kb():
    """Drop the test knowledge base and confirm nothing is left behind.

    After ``drop_kb`` the service must report that it no longer exists, the
    knowledge base directory must be gone, and a fresh factory lookup by name
    must return ``None``.
    """
    service = KBServiceFactory.get_service_by_name(kb_name)
    service.drop_kb()

    still_exists = service.exists()
    assert not still_exists

    directory_present = os.path.isdir(kb_path)
    assert not directory_present

    # A new lookup must not find the dropped knowledge base.
    assert KBServiceFactory.get_service_by_name(kb_name) is None