Spaces:
Runtime error
Runtime error
| from fastapi import FastAPI, APIRouter, UploadFile, HTTPException | |
| from fastapi import File, UploadFile | |
| from fastapi.staticfiles import StaticFiles | |
| from fastapi.responses import JSONResponse | |
| from pathlib import Path | |
| from typing import List | |
| import os | |
| import shutil | |
| import logging | |
| from app.data_pipeline.data_loader import DocumentLoader | |
| from app.data_pipeline.embedding_manager import split_text,initialize_embedding_model,create_and_store_embeddings | |
| import warnings | |
| warnings.filterwarnings("ignore") | |
| logger = logging.getLogger(__name__) | |
| from app.settings import Config | |
# Project settings object; the constants below are read from it once at import time.
conf = Config()
# Router object for this module (presumably included by the main app — verify).
upload_router = APIRouter()
# Directory that upload_files empties and then fills with the uploaded files.
UPLOAD_DIR = conf.UPLOAD_DIR
# Collection name passed to create_and_store_embeddings.
COLLECTION_NAME = conf.COLLECTION_NAME
# Persistence directory passed to create_and_store_embeddings.
PERSIST_DIRECTORY = conf.PERSIST_DIRECTORY
| # Type of files allowed to be uploaded | |
def is_allowed_file(filename) -> bool:
    """Return True when ``filename`` has an extension accepted for upload.

    Only the text after the last dot is inspected, case-insensitively.

    Args:
        filename: Name supplied by the client. May be ``None`` or another
            non-string value when the upload carries no filename.

    Returns:
        ``True`` if the extension is one of the allowed ones, else ``False``.
        Non-string input is rejected with ``False`` rather than raising
        ``TypeError``.
    """
    if not isinstance(filename, str):
        return False
    allowed_extensions = {"pdf", "csv", "doc", "docx", "txt", "xlsx", "xls"}
    # rpartition yields an empty separator when the name contains no dot.
    _, dot, extension = filename.rpartition(".")
    return bool(dot) and extension.lower() in allowed_extensions
def empty_folder(folder_path):
    """Remove everything inside ``folder_path`` while keeping the folder itself.

    Regular files and symbolic links are unlinked; sub-directories are removed
    recursively. A message describing the outcome is printed. When the path
    does not exist nothing is deleted.
    """
    if not os.path.exists(folder_path):
        print(f"The folder '{folder_path}' does not exist.")
        return

    for entry_name in os.listdir(folder_path):
        entry_path = os.path.join(folder_path, entry_name)
        # Links are unlinked rather than followed, even when they point at a directory.
        unlink_directly = os.path.isfile(entry_path) or os.path.islink(entry_path)
        if unlink_directly:
            os.remove(entry_path)
            continue
        if os.path.isdir(entry_path):
            shutil.rmtree(entry_path)

    print(f"The folder '{folder_path}' has been emptied.")
# NOTE(review): no ``@upload_router.post(...)`` decorator is visible on this
# function in this file (only on the commented-out older version below);
# confirm the route is registered somewhere, otherwise the endpoint is unreachable.
async def upload_files(files: List[UploadFile] = File(...)):
    """Replace the upload directory's contents with ``files`` and embed them.

    Steps, in order:
      1. Reject the request if any file has a missing name or a disallowed
         extension (checked before anything on disk is touched).
      2. Empty ``UPLOAD_DIR`` and save each upload into it.
      3. Load the saved documents with ``DocumentLoader``, split them with
         ``split_text``, initialise the embedding model and call
         ``create_and_store_embeddings``.

    Args:
        files: Uploaded files from the multipart request body.

    Returns:
        JSONResponse: a success message (200), or an ``{"error": ...}`` body
        with status 400 (file type not allowed), 404 (``UPLOAD_DIR`` missing)
        or 500 (a processing stage failed).

    Raises:
        HTTPException: status 500 for any error not handled by a stage above.
    """
    try:
        # Validate everything first so that a rejected request neither wipes
        # the previously uploaded documents nor leaves a partial set behind.
        for uploaded_file in files:
            if not uploaded_file.filename or not is_allowed_file(uploaded_file.filename):
                logger.error(f"File type of '{uploaded_file.filename}' not allowed.")
                return JSONResponse(content={"error": "File type not allowed"}, status_code=400)

        # Empty the upload directory
        empty_folder(UPLOAD_DIR)
        logger.info(f"{UPLOAD_DIR} is now empty.")

        # Check if UPLOAD_DIR exists
        if not os.path.exists(UPLOAD_DIR):
            logger.error(f"Upload directory '{UPLOAD_DIR}' does not exist.")
            return JSONResponse(content={"error": f"Folder '{UPLOAD_DIR}' does not exist"}, status_code=404)

        # Save uploaded files
        for uploaded_file in files:
            # The filename is client-controlled: keep only its last path
            # component (treating "\" as a separator too) so that names such
            # as "../../x.txt" cannot escape UPLOAD_DIR.
            safe_name = os.path.basename(uploaded_file.filename.replace("\\", "/"))
            file_path = os.path.join(UPLOAD_DIR, safe_name)
            with open(file_path, "wb") as buffer:
                # Stream in chunks instead of loading the whole upload into memory.
                shutil.copyfileobj(uploaded_file.file, buffer)
            logger.info(f"File '{uploaded_file.filename}' uploaded successfully.")

        # Load documents from the upload directory
        try:
            document_loader = DocumentLoader(UPLOAD_DIR)
            documents = document_loader.load_all_documents()
            logger.info(f"Loaded {len(documents)} documents.")
        except Exception as e:
            logger.exception(f"Error loading documents: {e}")
            return JSONResponse(content={"error": "Failed to load documents"}, status_code=500)

        # Process documents into chunks for embedding
        try:
            chunks = split_text(documents)
            logger.info(f"Processed {len(chunks)} chunks for embedding.")
        except Exception as e:
            logger.exception(f"Error processing documents: {e}")
            return JSONResponse(content={"error": "Failed to process documents"}, status_code=500)

        # Initialize the embedding model
        try:
            embedding_function = initialize_embedding_model()
        except Exception as e:
            logger.exception(f"Error initializing embedding model: {e}")
            return JSONResponse(content={"error": "Failed to initialize embedding model"}, status_code=500)

        # Create and store embeddings
        try:
            create_and_store_embeddings(chunks, COLLECTION_NAME, embedding_function, PERSIST_DIRECTORY)
            logger.info("Embeddings created and stored successfully.")
        except Exception as e:
            logger.exception(f"Error creating or storing embeddings: {e}")
            return JSONResponse(content={"error": "Failed to create and store embeddings"}, status_code=500)

        # Return success message if everything is successful
        return JSONResponse(content={"message": "Documents successfully loaded and processed."})
    except Exception as e:
        # Last-resort boundary: log with traceback and hide details from the client.
        logger.exception(f"Unexpected error in upload_files endpoint: {e}")
        raise HTTPException(status_code=500, detail="Internal server error.") from e
| # @upload_router.post("/upload") | |
| # async def upload_files(files: List[UploadFile] = File(...)): | |
| # empty_folder(UPLOAD_DIR) | |
| # logger.info(f" {UPLOAD_DIR} is empty Now") | |
| # if not os.path.exists(UPLOAD_DIR): | |
| # logger.error(f"{UPLOAD_DIR}' does not exist") | |
| # return JSONResponse(content={"error": f"Folder '{UPLOAD_DIR}' does not exist"}, status_code=404) | |
| # for uploaded_file in files: | |
| # if not is_allowed_file(uploaded_file.filename): | |
| # logger.error(f"File type not allowed") | |
| # return JSONResponse(content={"error": "File type not allowed"}, status_code=400) | |
| # file_path = os.path.join(UPLOAD_DIR, uploaded_file.filename) | |
| # with open(file_path, "wb") as buffer: | |
| # buffer.write(uploaded_file.file.read()) | |
| # logger.info(f"Files uploaded successfully") | |
| # try: | |
| # document_loader = DocumentLoader(UPLOAD_DIR) | |
| # documents = document_loader.load_all_documents() | |
| # logger.info(f"Loaded {len(documents)} documents.") | |
| # except Exception as e: | |
| # logger.error(f"Error loading documents: {e}") | |
| # return | |
| # try: | |
| # chunks = split_text(documents) | |
| # logger.info(f"Processed {len(chunks)} chunks for embedding.", ) | |
| # except Exception as e: | |
| # logger.error(f"Error processing documents: {e}") | |
| # return | |
| # try: | |
| # embedding_function = initialize_embedding_model() | |
| # except Exception: | |
| # return # Stop execution if embedding model fails | |
| # create_and_store_embeddings(chunks, COLLECTION_NAME, embedding_function, PERSIST_DIRECTORY) | |
| # logger.info(f'Documents Successfully loades') | |
| # return JSONResponse(content={"message": "Documents Successfully loades"}) | |