Spaces:
Running
Running
| import base64 | |
| import contextlib | |
| import hashlib | |
| import tempfile | |
| from pathlib import Path | |
| from typing import TYPE_CHECKING, Any | |
| from fastapi import UploadFile | |
| from platformdirs import user_cache_dir | |
| if TYPE_CHECKING: | |
| from langflow.api.v1.schemas import BuildStatus | |
| CACHE: dict[str, Any] = {} | |
| CACHE_DIR = user_cache_dir("langflow", "langflow") | |
| PREFIX = "langflow_cache" | |
| class CacheMiss: | |
| def __repr__(self) -> str: | |
| return "<CACHE_MISS>" | |
| def __bool__(self) -> bool: | |
| return False | |
| def create_cache_folder(func): | |
| def wrapper(*args, **kwargs): | |
| # Get the destination folder | |
| cache_path = Path(CACHE_DIR) / PREFIX | |
| # Create the destination folder if it doesn't exist | |
| cache_path.mkdir(parents=True, exist_ok=True) | |
| return func(*args, **kwargs) | |
| return wrapper | |
| def clear_old_cache_files(max_cache_size: int = 3) -> None: | |
| cache_dir = Path(tempfile.gettempdir()) / PREFIX | |
| cache_files = list(cache_dir.glob("*.dill")) | |
| if len(cache_files) > max_cache_size: | |
| cache_files_sorted_by_mtime = sorted(cache_files, key=lambda x: x.stat().st_mtime, reverse=True) | |
| for cache_file in cache_files_sorted_by_mtime[max_cache_size:]: | |
| with contextlib.suppress(OSError): | |
| cache_file.unlink() | |
| def filter_json(json_data): | |
| filtered_data = json_data.copy() | |
| # Remove 'viewport' and 'chatHistory' keys | |
| if "viewport" in filtered_data: | |
| del filtered_data["viewport"] | |
| if "chatHistory" in filtered_data: | |
| del filtered_data["chatHistory"] | |
| # Filter nodes | |
| if "nodes" in filtered_data: | |
| for node in filtered_data["nodes"]: | |
| if "position" in node: | |
| del node["position"] | |
| if "positionAbsolute" in node: | |
| del node["positionAbsolute"] | |
| if "selected" in node: | |
| del node["selected"] | |
| if "dragging" in node: | |
| del node["dragging"] | |
| return filtered_data | |
| def save_binary_file(content: str, file_name: str, accepted_types: list[str]) -> str: | |
| """Save a binary file to the specified folder. | |
| Args: | |
| content: The content of the file as a bytes object. | |
| file_name: The name of the file, including its extension. | |
| accepted_types: A list of accepted file types. | |
| Returns: | |
| The path to the saved file. | |
| """ | |
| if not any(file_name.endswith(suffix) for suffix in accepted_types): | |
| msg = f"File {file_name} is not accepted" | |
| raise ValueError(msg) | |
| # Get the destination folder | |
| cache_path = Path(CACHE_DIR) / PREFIX | |
| if not content: | |
| msg = "Please, reload the file in the loader." | |
| raise ValueError(msg) | |
| data = content.split(",")[1] | |
| decoded_bytes = base64.b64decode(data) | |
| # Create the full file path | |
| file_path = cache_path / file_name | |
| # Save the binary content to the file | |
| file_path.write_bytes(decoded_bytes) | |
| return str(file_path) | |
| def save_uploaded_file(file: UploadFile, folder_name): | |
| """Save an uploaded file to the specified folder with a hash of its content as the file name. | |
| Args: | |
| file: The uploaded file object. | |
| folder_name: The name of the folder to save the file in. | |
| Returns: | |
| The path to the saved file. | |
| """ | |
| cache_path = Path(CACHE_DIR) | |
| folder_path = cache_path / folder_name | |
| filename = file.filename | |
| file_extension = Path(filename).suffix if isinstance(filename, str | Path) else "" | |
| file_object = file.file | |
| # Create the folder if it doesn't exist | |
| if not folder_path.exists(): | |
| folder_path.mkdir() | |
| # Create a hash of the file content | |
| sha256_hash = hashlib.sha256() | |
| # Reset the file cursor to the beginning of the file | |
| file_object.seek(0) | |
| # Iterate over the uploaded file in small chunks to conserve memory | |
| while chunk := file_object.read(8192): # Read 8KB at a time (adjust as needed) | |
| sha256_hash.update(chunk) | |
| # Use the hex digest of the hash as the file name | |
| hex_dig = sha256_hash.hexdigest() | |
| file_name = f"{hex_dig}{file_extension}" | |
| # Reset the file cursor to the beginning of the file | |
| file_object.seek(0) | |
| # Save the file with the hash as its name | |
| file_path = folder_path / file_name | |
| with file_path.open("wb") as new_file: | |
| while chunk := file_object.read(8192): | |
| new_file.write(chunk) | |
| return file_path | |
| def update_build_status(cache_service, flow_id: str, status: "BuildStatus") -> None: | |
| cached_flow = cache_service[flow_id] | |
| if cached_flow is None: | |
| msg = f"Flow {flow_id} not found in cache" | |
| raise ValueError(msg) | |
| cached_flow["status"] = status | |
| cache_service[flow_id] = cached_flow | |
| cached_flow["status"] = status | |
| cache_service[flow_id] = cached_flow | |
| CACHE_MISS = CacheMiss() | |