Spaces:
Running
Running
from typing import List, Optional

from llama_index.core import StorageContext, VectorStoreIndex
from llama_index.core.schema import Document

from .config import get_vector_store, store_ingested_repo
async def ingest_documents_async(
    documents: List[Document], repo_name: Optional[str] = None
) -> VectorStoreIndex:
    """Ingest documents into the configured vector store and build an index.

    Progress is reported with ``print`` at every step. When ``repo_name`` is
    given, each document that has no ``"repo"`` metadata key gets it set to
    ``repo_name`` (the documents are mutated in place), and after a
    successful ingestion the list of unique file paths is handed to
    ``store_ingested_repo``.

    Args:
        documents: Documents to index.
        repo_name: Optional repository name used for metadata tagging and
            for recording which files were ingested.

    Returns:
        The ``VectorStoreIndex`` built from ``documents``.

    Raises:
        Exception: Any error raised while ingesting is printed together with
            its traceback and then re-raised unchanged.

    NOTE(review): this coroutine contains no ``await``; the call to
    ``VectorStoreIndex.from_documents`` runs synchronously, so the event
    loop is blocked for the duration of the index build.
    NOTE(review): the leading glyphs in the log messages look like
    mis-encoded emoji; they are left untouched here -- confirm against the
    original file.
    """
    print(f"π Starting async ingestion of {len(documents)} documents")
    if repo_name:
        print(f"π Repository: {repo_name}")

    try:
        # Get vector store
        vector_store = get_vector_store()
        print(f"β Vector store retrieved: {type(vector_store)}")

        # Create storage context
        vector_store_context = StorageContext.from_defaults(vector_store=vector_store)
        print(f"β Vector Store context created: {type(vector_store_context)}")

        # Process documents and ensure repo metadata
        print("π Processing documents through pipeline...")
        # The list preserves first-seen order for the stored record; the set
        # gives O(1) duplicate checks instead of rescanning the list per doc.
        ingested_files = []
        seen_paths = set()
        for i, doc in enumerate(documents):
            print(f"π Doc {i + 1}: {doc.doc_id} - {len(doc.text)} chars")
            print(f" Metadata: {doc.metadata}")

            # Ensure repo metadata is properly set (never overwrites an
            # existing "repo" value).
            if repo_name and "repo" not in doc.metadata:
                doc.metadata["repo"] = repo_name
                print(f" β Added repo metadata: {repo_name}")

            # Track ingested file paths, falling back to the document id
            # when no "file_path" metadata is present.
            file_path = doc.metadata.get("file_path", doc.doc_id)
            if file_path not in seen_paths:
                seen_paths.add(file_path)
                ingested_files.append(file_path)

        # Run the ingestion
        print("π Starting vector store ingestion...")
        vc_store_index = VectorStoreIndex.from_documents(
            documents=documents,
            storage_context=vector_store_context,
            show_progress=True,
        )
        print("β Document Ingestion completed Successfully")

        # Store repository metadata if repo_name is provided. A falsy result
        # is only reported; it does not fail the ingestion.
        if repo_name and ingested_files:
            store_success = store_ingested_repo(repo_name, ingested_files)
            if store_success:
                print(f"β Repository metadata stored for {repo_name}")
            else:
                print(f"β οΈ Failed to store repository metadata for {repo_name}")

        return vc_store_index
    except Exception as e:
        print(f"β Error in async ingestion: {str(e)}")
        import traceback

        traceback.print_exc()
        # Bare raise re-raises the active exception with its original
        # traceback, without adding this frame a second time.
        raise