Spaces:
Running
Running
| from collections.abc import Generator | |
| from enum import Enum | |
| from fastapi.encoders import jsonable_encoder | |
| from loguru import logger | |
| from pydantic import BaseModel | |
| from langflow.schema.data import Data | |
| from langflow.schema.dataframe import DataFrame | |
| from langflow.schema.encoders import CUSTOM_ENCODERS | |
| from langflow.schema.message import Message | |
| from langflow.schema.serialize import recursive_serialize_or_str | |
class ArtifactType(str, Enum):
    """Closed set of artifact kinds, each carrying its wire-format string.

    Members are also ``str`` instances, so they compare equal to their
    plain string values.
    """

    # Scalar / structured payloads.
    TEXT = "text"
    DATA = "data"
    OBJECT = "object"
    ARRAY = "array"
    # Generator-backed output whose content is not materialized yet.
    STREAM = "stream"
    # Fallback when no other kind applies.
    UNKNOWN = "unknown"
    MESSAGE = "message"
| def get_artifact_type(value, build_result=None) -> str: | |
| result = ArtifactType.UNKNOWN | |
| match value: | |
| case Message(): | |
| if not isinstance(value.text, str): | |
| enum_value = get_artifact_type(value.text) | |
| result = ArtifactType(enum_value) | |
| else: | |
| result = ArtifactType.MESSAGE | |
| case Data(): | |
| enum_value = get_artifact_type(value.data) | |
| result = ArtifactType(enum_value) | |
| case str(): | |
| result = ArtifactType.TEXT | |
| case dict(): | |
| result = ArtifactType.OBJECT | |
| case list() | DataFrame(): | |
| result = ArtifactType.ARRAY | |
| if result == ArtifactType.UNKNOWN and ( | |
| (build_result and isinstance(build_result, Generator)) | |
| or (isinstance(value, Message) and isinstance(value.text, Generator)) | |
| ): | |
| result = ArtifactType.STREAM | |
| return result.value | |
| def _to_list_of_dicts(raw): | |
| raw_ = [] | |
| for item in raw: | |
| if hasattr(item, "dict") or hasattr(item, "model_dump"): | |
| raw_.append(recursive_serialize_or_str(item)) | |
| else: | |
| raw_.append(str(item)) | |
| return raw_ | |
def post_process_raw(raw, artifact_type: str):
    """Normalize a raw artifact payload according to its artifact type.

    Args:
        raw: The raw payload to normalize.
        artifact_type: An ``ArtifactType`` string value describing ``raw``.

    Returns:
        A ``(raw, artifact_type)`` tuple. Stream payloads become ``""``;
        array payloads become a list (DataFrame records, or per-item
        serialization); unknown non-``None`` payloads are JSON-encoded and
        re-typed as ``"object"`` when they are a ``BaseModel`` or ``dict``,
        otherwise replaced by a placeholder string. Anything else is returned
        unchanged.
    """
    placeholder = "Built Successfully ✨"

    if artifact_type == ArtifactType.STREAM.value:
        return "", artifact_type

    if artifact_type == ArtifactType.ARRAY.value:
        if isinstance(raw, DataFrame):
            return raw.to_dict(orient="records"), artifact_type
        return _to_list_of_dicts(raw), artifact_type

    # Only unknown, non-None payloads get further processing.
    if artifact_type != ArtifactType.UNKNOWN.value or raw is None:
        return raw, artifact_type

    if not isinstance(raw, BaseModel | dict):
        return placeholder, artifact_type

    try:
        encoded = jsonable_encoder(raw, custom_encoder=CUSTOM_ENCODERS)
    except Exception:  # noqa: BLE001
        # Best effort: an encoding failure is logged and the placeholder is
        # returned, keeping the original artifact type.
        logger.opt(exception=True).debug(f"Error converting to json: {raw} ({type(raw)})")
        return placeholder, artifact_type
    return encoded, ArtifactType.OBJECT.value