Spaces:
Running
Running
| from collections.abc import Generator | |
| from enum import Enum | |
| from typing import Literal | |
| from pydantic import BaseModel | |
| from typing_extensions import TypedDict | |
| from langflow.schema.data import Data | |
| from langflow.schema.dataframe import DataFrame | |
| from langflow.schema.message import Message | |
| from langflow.schema.serialize import recursive_serialize_or_str | |
# Field name "input_value"; presumably identifies a component's main input
# field -- its usage is not visible in this section, confirm against callers.
INPUT_FIELD_NAME = "input_value"
# Literal values accepted as an input type.
InputType = Literal["chat", "text", "any"]
# Literal values accepted as an output type: the input types plus "debug".
OutputType = Literal["chat", "text", "any", "debug"]
class LogType(str, Enum):
    """String-valued categories that ``get_type`` assigns to an output payload."""

    # Payload is a Message instance.
    MESSAGE = "message"
    # Payload is a Data instance.
    DATA = "data"
    # Payload is a generator.
    STREAM = "stream"
    # Payload is a dict.
    OBJECT = "object"
    # Payload is a list or a DataFrame.
    ARRAY = "array"
    # Payload is a str.
    TEXT = "text"
    # Payload matched none of the categories above.
    UNKNOWN = "unknown"
class StreamURL(TypedDict):
    """Typed dict with a single ``location`` string.

    ``build_output_logs`` fills ``location`` from a payload's ``"stream_url"``
    entry.
    """

    location: str
class ErrorLog(TypedDict):
    """Typed dict with two string entries, ``errorMessage`` and ``stackTrace``.

    It is one of the shapes accepted by ``OutputValue.message``.
    """

    # Presumably the human-readable error text -- producers are not visible here.
    errorMessage: str
    # Presumably the formatted traceback -- producers are not visible here.
    stackTrace: str
class OutputValue(BaseModel):
    """Pydantic model for one output's log entry: a message and its type.

    ``build_output_logs`` creates one instance per output and stores its
    ``model_dump()``.
    """

    # Log content: an error record, a stream location, or plain dict/list/str.
    message: ErrorLog | StreamURL | dict | list | str
    # Category of the message; build_output_logs passes a LogType member here.
    type: str
| def get_type(payload): | |
| result = LogType.UNKNOWN | |
| match payload: | |
| case Message(): | |
| result = LogType.MESSAGE | |
| case Data(): | |
| result = LogType.DATA | |
| case dict(): | |
| result = LogType.OBJECT | |
| case list() | DataFrame(): | |
| result = LogType.ARRAY | |
| case str(): | |
| result = LogType.TEXT | |
| if result == LogType.UNKNOWN and ( | |
| (payload and isinstance(payload, Generator)) | |
| or (isinstance(payload, Message) and isinstance(payload.text, Generator)) | |
| ): | |
| result = LogType.STREAM | |
| return result | |
def get_message(payload):
    """Extract the loggable content from an output payload.

    Args:
        payload: Any output value. Objects exposing a ``data`` attribute or a
            ``model_dump()`` method are unwrapped; everything else is passed
            through.

    Returns:
        ``payload.data`` when the payload has a ``data`` attribute, otherwise
        the result of ``payload.model_dump()`` when that method exists.
        If the extracted value is falsy (``None``, an empty dict, an empty
        string, ...) or nothing could be extracted, the payload itself is
        returned unchanged.
    """
    message = None
    if hasattr(payload, "data"):
        # ``data`` takes precedence over ``model_dump`` when both exist.
        message = payload.data
    elif hasattr(payload, "model_dump"):
        message = payload.model_dump()
    # Plain dict/str payloads (and anything without extractable content) fall
    # back to the payload itself; no separate type check is needed for that.
    return message or payload
| def build_output_logs(vertex, result) -> dict: | |
| outputs: dict[str, OutputValue] = {} | |
| component_instance = result[0] | |
| for index, output in enumerate(vertex.outputs): | |
| if component_instance.status is None: | |
| payload = component_instance._results | |
| output_result = payload.get(output["name"]) | |
| else: | |
| payload = component_instance._artifacts | |
| output_result = payload.get(output["name"], {}).get("raw") | |
| message = get_message(output_result) | |
| type_ = get_type(output_result) | |
| match type_: | |
| case LogType.STREAM if "stream_url" in message: | |
| message = StreamURL(location=message["stream_url"]) | |
| case LogType.STREAM: | |
| message = "" | |
| case LogType.MESSAGE if hasattr(message, "message"): | |
| message = message.message | |
| case LogType.UNKNOWN: | |
| message = "" | |
| case LogType.ARRAY: | |
| if isinstance(message, DataFrame): | |
| message = message.to_dict(orient="records") | |
| message = [recursive_serialize_or_str(item) for item in message] | |
| name = output.get("name", f"output_{index}") | |
| outputs |= {name: OutputValue(message=message, type=type_).model_dump()} | |
| return outputs | |