Spaces:
Paused
Paused
| import logging | |
| import os | |
| import uuid | |
| from fnmatch import fnmatch | |
| from pathlib import Path | |
| from typing import Optional | |
| from urllib.parse import quote | |
| from fastapi import ( | |
| APIRouter, | |
| Depends, | |
| File, | |
| HTTPException, | |
| Request, | |
| UploadFile, | |
| status, | |
| Query, | |
| ) | |
| from fastapi.responses import FileResponse, StreamingResponse | |
| from open_webui.constants import ERROR_MESSAGES | |
| from open_webui.env import SRC_LOG_LEVELS | |
| from open_webui.models.users import Users | |
| from open_webui.models.files import ( | |
| FileForm, | |
| FileModel, | |
| FileModelResponse, | |
| Files, | |
| ) | |
| from open_webui.models.knowledge import Knowledges | |
| from open_webui.routers.knowledge import get_knowledge, get_knowledge_list | |
| from open_webui.routers.retrieval import ProcessFileForm, process_file | |
| from open_webui.routers.audio import transcribe | |
| from open_webui.storage.provider import Storage | |
| from open_webui.utils.auth import get_admin_user, get_verified_user | |
| from pydantic import BaseModel | |
| log = logging.getLogger(__name__) | |
| log.setLevel(SRC_LOG_LEVELS["MODELS"]) | |
| router = APIRouter() | |
| ############################ | |
| # Check if the current user has access to a file through any knowledge bases the user may be in. | |
| ############################ | |
| def has_access_to_file( | |
| file_id: Optional[str], access_type: str, user=Depends(get_verified_user) | |
| ) -> bool: | |
| file = Files.get_file_by_id(file_id) | |
| log.debug(f"Checking if user has {access_type} access to file") | |
| if not file: | |
| raise HTTPException( | |
| status_code=status.HTTP_404_NOT_FOUND, | |
| detail=ERROR_MESSAGES.NOT_FOUND, | |
| ) | |
| has_access = False | |
| knowledge_base_id = file.meta.get("collection_name") if file.meta else None | |
| if knowledge_base_id: | |
| knowledge_bases = Knowledges.get_knowledge_bases_by_user_id( | |
| user.id, access_type | |
| ) | |
| for knowledge_base in knowledge_bases: | |
| if knowledge_base.id == knowledge_base_id: | |
| has_access = True | |
| break | |
| return has_access | |
| ############################ | |
| # Upload File | |
| ############################ | |
| def upload_file( | |
| request: Request, | |
| file: UploadFile = File(...), | |
| user=Depends(get_verified_user), | |
| file_metadata: dict = None, | |
| process: bool = Query(True), | |
| ): | |
| log.info(f"file.content_type: {file.content_type}") | |
| file_metadata = file_metadata if file_metadata else {} | |
| try: | |
| unsanitized_filename = file.filename | |
| filename = os.path.basename(unsanitized_filename) | |
| # replace filename with uuid | |
| id = str(uuid.uuid4()) | |
| name = filename | |
| filename = f"{id}_{filename}" | |
| tags = { | |
| "OpenWebUI-User-Email": user.email, | |
| "OpenWebUI-User-Id": user.id, | |
| "OpenWebUI-User-Name": user.name, | |
| "OpenWebUI-File-Id": id, | |
| } | |
| contents, file_path = Storage.upload_file(file.file, filename, tags) | |
| file_item = Files.insert_new_file( | |
| user.id, | |
| FileForm( | |
| **{ | |
| "id": id, | |
| "filename": name, | |
| "path": file_path, | |
| "meta": { | |
| "name": name, | |
| "content_type": file.content_type, | |
| "size": len(contents), | |
| "data": file_metadata, | |
| }, | |
| } | |
| ), | |
| ) | |
| if process: | |
| try: | |
| if file.content_type.startswith( | |
| ( | |
| "audio/mpeg", | |
| "audio/wav", | |
| "audio/ogg", | |
| "audio/x-m4a", | |
| "audio/webm", | |
| "video/webm", | |
| ) | |
| ): | |
| file_path = Storage.get_file(file_path) | |
| result = transcribe(request, file_path) | |
| process_file( | |
| request, | |
| ProcessFileForm(file_id=id, content=result.get("text", "")), | |
| user=user, | |
| ) | |
| elif file.content_type not in [ | |
| "image/png", | |
| "image/jpeg", | |
| "image/gif", | |
| "video/mp4", | |
| "video/ogg", | |
| "video/quicktime", | |
| ]: | |
| process_file(request, ProcessFileForm(file_id=id), user=user) | |
| file_item = Files.get_file_by_id(id=id) | |
| except Exception as e: | |
| log.exception(e) | |
| log.error(f"Error processing file: {file_item.id}") | |
| file_item = FileModelResponse( | |
| **{ | |
| **file_item.model_dump(), | |
| "error": str(e.detail) if hasattr(e, "detail") else str(e), | |
| } | |
| ) | |
| if file_item: | |
| return file_item | |
| else: | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail=ERROR_MESSAGES.DEFAULT("Error uploading file"), | |
| ) | |
| except Exception as e: | |
| log.exception(e) | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail=ERROR_MESSAGES.DEFAULT(e), | |
| ) | |
| ############################ | |
| # List Files | |
| ############################ | |
| async def list_files(user=Depends(get_verified_user), content: bool = Query(True)): | |
| if user.role == "admin": | |
| files = Files.get_files() | |
| else: | |
| files = Files.get_files_by_user_id(user.id) | |
| if not content: | |
| for file in files: | |
| if "content" in file.data: | |
| del file.data["content"] | |
| return files | |
| ############################ | |
| # Search Files | |
| ############################ | |
| async def search_files( | |
| filename: str = Query( | |
| ..., | |
| description="Filename pattern to search for. Supports wildcards such as '*.txt'", | |
| ), | |
| content: bool = Query(True), | |
| user=Depends(get_verified_user), | |
| ): | |
| """ | |
| Search for files by filename with support for wildcard patterns. | |
| """ | |
| # Get files according to user role | |
| if user.role == "admin": | |
| files = Files.get_files() | |
| else: | |
| files = Files.get_files_by_user_id(user.id) | |
| # Get matching files | |
| matching_files = [ | |
| file for file in files if fnmatch(file.filename.lower(), filename.lower()) | |
| ] | |
| if not matching_files: | |
| raise HTTPException( | |
| status_code=status.HTTP_404_NOT_FOUND, | |
| detail="No files found matching the pattern.", | |
| ) | |
| if not content: | |
| for file in matching_files: | |
| if "content" in file.data: | |
| del file.data["content"] | |
| return matching_files | |
| ############################ | |
| # Delete All Files | |
| ############################ | |
| async def delete_all_files(user=Depends(get_admin_user)): | |
| result = Files.delete_all_files() | |
| if result: | |
| try: | |
| Storage.delete_all_files() | |
| except Exception as e: | |
| log.exception(e) | |
| log.error("Error deleting files") | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail=ERROR_MESSAGES.DEFAULT("Error deleting files"), | |
| ) | |
| return {"message": "All files deleted successfully"} | |
| else: | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail=ERROR_MESSAGES.DEFAULT("Error deleting files"), | |
| ) | |
| ############################ | |
| # Get File By Id | |
| ############################ | |
| async def get_file_by_id(id: str, user=Depends(get_verified_user)): | |
| file = Files.get_file_by_id(id) | |
| if not file: | |
| raise HTTPException( | |
| status_code=status.HTTP_404_NOT_FOUND, | |
| detail=ERROR_MESSAGES.NOT_FOUND, | |
| ) | |
| if ( | |
| file.user_id == user.id | |
| or user.role == "admin" | |
| or has_access_to_file(id, "read", user) | |
| ): | |
| return file | |
| else: | |
| raise HTTPException( | |
| status_code=status.HTTP_404_NOT_FOUND, | |
| detail=ERROR_MESSAGES.NOT_FOUND, | |
| ) | |
| ############################ | |
| # Get File Data Content By Id | |
| ############################ | |
| async def get_file_data_content_by_id(id: str, user=Depends(get_verified_user)): | |
| file = Files.get_file_by_id(id) | |
| if not file: | |
| raise HTTPException( | |
| status_code=status.HTTP_404_NOT_FOUND, | |
| detail=ERROR_MESSAGES.NOT_FOUND, | |
| ) | |
| if ( | |
| file.user_id == user.id | |
| or user.role == "admin" | |
| or has_access_to_file(id, "read", user) | |
| ): | |
| return {"content": file.data.get("content", "")} | |
| else: | |
| raise HTTPException( | |
| status_code=status.HTTP_404_NOT_FOUND, | |
| detail=ERROR_MESSAGES.NOT_FOUND, | |
| ) | |
| ############################ | |
| # Update File Data Content By Id | |
| ############################ | |
| class ContentForm(BaseModel): | |
| content: str | |
| async def update_file_data_content_by_id( | |
| request: Request, id: str, form_data: ContentForm, user=Depends(get_verified_user) | |
| ): | |
| file = Files.get_file_by_id(id) | |
| if not file: | |
| raise HTTPException( | |
| status_code=status.HTTP_404_NOT_FOUND, | |
| detail=ERROR_MESSAGES.NOT_FOUND, | |
| ) | |
| if ( | |
| file.user_id == user.id | |
| or user.role == "admin" | |
| or has_access_to_file(id, "write", user) | |
| ): | |
| try: | |
| process_file( | |
| request, | |
| ProcessFileForm(file_id=id, content=form_data.content), | |
| user=user, | |
| ) | |
| file = Files.get_file_by_id(id=id) | |
| except Exception as e: | |
| log.exception(e) | |
| log.error(f"Error processing file: {file.id}") | |
| return {"content": file.data.get("content", "")} | |
| else: | |
| raise HTTPException( | |
| status_code=status.HTTP_404_NOT_FOUND, | |
| detail=ERROR_MESSAGES.NOT_FOUND, | |
| ) | |
| ############################ | |
| # Get File Content By Id | |
| ############################ | |
| async def get_file_content_by_id( | |
| id: str, user=Depends(get_verified_user), attachment: bool = Query(False) | |
| ): | |
| file = Files.get_file_by_id(id) | |
| if not file: | |
| raise HTTPException( | |
| status_code=status.HTTP_404_NOT_FOUND, | |
| detail=ERROR_MESSAGES.NOT_FOUND, | |
| ) | |
| if ( | |
| file.user_id == user.id | |
| or user.role == "admin" | |
| or has_access_to_file(id, "read", user) | |
| ): | |
| try: | |
| file_path = Storage.get_file(file.path) | |
| file_path = Path(file_path) | |
| # Check if the file already exists in the cache | |
| if file_path.is_file(): | |
| # Handle Unicode filenames | |
| filename = file.meta.get("name", file.filename) | |
| encoded_filename = quote(filename) # RFC5987 encoding | |
| content_type = file.meta.get("content_type") | |
| filename = file.meta.get("name", file.filename) | |
| encoded_filename = quote(filename) | |
| headers = {} | |
| if attachment: | |
| headers["Content-Disposition"] = ( | |
| f"attachment; filename*=UTF-8''{encoded_filename}" | |
| ) | |
| else: | |
| if content_type == "application/pdf" or filename.lower().endswith( | |
| ".pdf" | |
| ): | |
| headers["Content-Disposition"] = ( | |
| f"inline; filename*=UTF-8''{encoded_filename}" | |
| ) | |
| content_type = "application/pdf" | |
| elif content_type != "text/plain": | |
| headers["Content-Disposition"] = ( | |
| f"attachment; filename*=UTF-8''{encoded_filename}" | |
| ) | |
| return FileResponse(file_path, headers=headers, media_type=content_type) | |
| else: | |
| raise HTTPException( | |
| status_code=status.HTTP_404_NOT_FOUND, | |
| detail=ERROR_MESSAGES.NOT_FOUND, | |
| ) | |
| except Exception as e: | |
| log.exception(e) | |
| log.error("Error getting file content") | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail=ERROR_MESSAGES.DEFAULT("Error getting file content"), | |
| ) | |
| else: | |
| raise HTTPException( | |
| status_code=status.HTTP_404_NOT_FOUND, | |
| detail=ERROR_MESSAGES.NOT_FOUND, | |
| ) | |
| async def get_html_file_content_by_id(id: str, user=Depends(get_verified_user)): | |
| file = Files.get_file_by_id(id) | |
| if not file: | |
| raise HTTPException( | |
| status_code=status.HTTP_404_NOT_FOUND, | |
| detail=ERROR_MESSAGES.NOT_FOUND, | |
| ) | |
| file_user = Users.get_user_by_id(file.user_id) | |
| if not file_user.role == "admin": | |
| raise HTTPException( | |
| status_code=status.HTTP_404_NOT_FOUND, | |
| detail=ERROR_MESSAGES.NOT_FOUND, | |
| ) | |
| if ( | |
| file.user_id == user.id | |
| or user.role == "admin" | |
| or has_access_to_file(id, "read", user) | |
| ): | |
| try: | |
| file_path = Storage.get_file(file.path) | |
| file_path = Path(file_path) | |
| # Check if the file already exists in the cache | |
| if file_path.is_file(): | |
| log.info(f"file_path: {file_path}") | |
| return FileResponse(file_path) | |
| else: | |
| raise HTTPException( | |
| status_code=status.HTTP_404_NOT_FOUND, | |
| detail=ERROR_MESSAGES.NOT_FOUND, | |
| ) | |
| except Exception as e: | |
| log.exception(e) | |
| log.error("Error getting file content") | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail=ERROR_MESSAGES.DEFAULT("Error getting file content"), | |
| ) | |
| else: | |
| raise HTTPException( | |
| status_code=status.HTTP_404_NOT_FOUND, | |
| detail=ERROR_MESSAGES.NOT_FOUND, | |
| ) | |
| async def get_file_content_by_id(id: str, user=Depends(get_verified_user)): | |
| file = Files.get_file_by_id(id) | |
| if not file: | |
| raise HTTPException( | |
| status_code=status.HTTP_404_NOT_FOUND, | |
| detail=ERROR_MESSAGES.NOT_FOUND, | |
| ) | |
| if ( | |
| file.user_id == user.id | |
| or user.role == "admin" | |
| or has_access_to_file(id, "read", user) | |
| ): | |
| file_path = file.path | |
| # Handle Unicode filenames | |
| filename = file.meta.get("name", file.filename) | |
| encoded_filename = quote(filename) # RFC5987 encoding | |
| headers = { | |
| "Content-Disposition": f"attachment; filename*=UTF-8''{encoded_filename}" | |
| } | |
| if file_path: | |
| file_path = Storage.get_file(file_path) | |
| file_path = Path(file_path) | |
| # Check if the file already exists in the cache | |
| if file_path.is_file(): | |
| return FileResponse(file_path, headers=headers) | |
| else: | |
| raise HTTPException( | |
| status_code=status.HTTP_404_NOT_FOUND, | |
| detail=ERROR_MESSAGES.NOT_FOUND, | |
| ) | |
| else: | |
| # File path doesn’t exist, return the content as .txt if possible | |
| file_content = file.content.get("content", "") | |
| file_name = file.filename | |
| # Create a generator that encodes the file content | |
| def generator(): | |
| yield file_content.encode("utf-8") | |
| return StreamingResponse( | |
| generator(), | |
| media_type="text/plain", | |
| headers=headers, | |
| ) | |
| else: | |
| raise HTTPException( | |
| status_code=status.HTTP_404_NOT_FOUND, | |
| detail=ERROR_MESSAGES.NOT_FOUND, | |
| ) | |
| ############################ | |
| # Delete File By Id | |
| ############################ | |
| async def delete_file_by_id(id: str, user=Depends(get_verified_user)): | |
| file = Files.get_file_by_id(id) | |
| if not file: | |
| raise HTTPException( | |
| status_code=status.HTTP_404_NOT_FOUND, | |
| detail=ERROR_MESSAGES.NOT_FOUND, | |
| ) | |
| if ( | |
| file.user_id == user.id | |
| or user.role == "admin" | |
| or has_access_to_file(id, "write", user) | |
| ): | |
| # We should add Chroma cleanup here | |
| result = Files.delete_file_by_id(id) | |
| if result: | |
| try: | |
| Storage.delete_file(file.path) | |
| except Exception as e: | |
| log.exception(e) | |
| log.error("Error deleting files") | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail=ERROR_MESSAGES.DEFAULT("Error deleting files"), | |
| ) | |
| return {"message": "File deleted successfully"} | |
| else: | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail=ERROR_MESSAGES.DEFAULT("Error deleting file"), | |
| ) | |
| else: | |
| raise HTTPException( | |
| status_code=status.HTTP_404_NOT_FOUND, | |
| detail=ERROR_MESSAGES.NOT_FOUND, | |
| ) | |