Spaces:
Running
Running
| import copy | |
| import json | |
| from datetime import datetime | |
| from decimal import Decimal | |
| from typing import cast | |
| from uuid import UUID | |
| from langchain_core.documents import Document | |
| from langchain_core.messages import AIMessage, BaseMessage, HumanMessage | |
| from loguru import logger | |
| from pydantic import BaseModel, model_serializer, model_validator | |
| from langflow.utils.constants import MESSAGE_SENDER_AI, MESSAGE_SENDER_USER | |
| from langflow.utils.image import create_data_url | |
class Data(BaseModel):
    """Represents a record with text and optional data.

    Keyword arguments that are not declared fields are folded into ``data`` by
    ``validate_data``; entries of ``data`` can be read, written and deleted with
    attribute syntax (see ``__getattr__``, ``__setattr__``, ``__delattr__``).

    Attributes:
        text_key (str): Key in ``data`` under which the record's text is stored.
        data (dict, optional): Additional data associated with the record.
        default_value (str | None): Value returned by ``get_text`` when
            ``text_key`` is not present in ``data``.
    """

    text_key: str = "text"
    data: dict = {}
    default_value: str | None = ""

    @model_validator(mode="before")
    @classmethod
    def validate_data(cls, values):
        """Normalize raw constructor input before field validation.

        Ensures ``values["data"]`` exists and copies every key that is not a
        declared field into it, unless ``data`` already has that key.

        Args:
            values: The raw input handed to the model; must be a dict.

        Returns:
            dict: The same ``values`` dict, updated in place.

        Raises:
            ValueError: If ``values`` is not a dictionary.
        """
        if not isinstance(values, dict):
            msg = "Data must be a dictionary"
            raise ValueError(msg)  # noqa: TRY004
        if not values.get("data"):
            values["data"] = {}
        # Any other keyword should be added to the data dictionary
        for key in values:
            if key not in values["data"] and key not in {"text_key", "data", "default_value"}:
                values["data"][key] = values[key]
        return values

    # NOTE(review): the decorator line was missing from this copy of the file; the
    # arguments below are reconstructed (plain serializer, JSON mode only) - confirm.
    @model_serializer(mode="plain", when_used="json")
    def serialize_model(self):
        """Return the ``data`` dictionary, calling ``to_json()`` on values that provide it."""
        return {k: v.to_json() if hasattr(v, "to_json") else v for k, v in self.data.items()}

    def get_text(self):
        """Retrieves the text value from the data dictionary.

        If the text key is present in the data dictionary, the corresponding value is returned.
        Otherwise, the default value is returned.

        Returns:
            The text value from the data dictionary or the default value.
        """
        return self.data.get(self.text_key, self.default_value)

    def set_text(self, text: str | None) -> str:
        r"""Sets the text value in the data dictionary.

        The value stored under ``text_key`` is the `text` parameter as given, with
        the following modifications:
        - `text` value of `None` is converted to an empty string.
        - `text` value is converted to `str` type.

        Args:
            text (str): The text to be set in the data dictionary.

        Returns:
            str: The text value that was set in the data dictionary.
        """
        new_text = "" if text is None else str(text)
        self.data[self.text_key] = new_text
        return new_text

    @classmethod
    def from_document(cls, document: Document) -> "Data":
        """Converts a Document to a Data.

        The document's metadata is copied (shallowly) so that adding the
        ``"text"`` entry does not modify the caller's ``document.metadata``.

        Args:
            document (Document): The Document to convert.

        Returns:
            Data: The converted Data.
        """
        data = {**document.metadata, "text": document.page_content}
        return cls(data=data, text_key="text")

    @classmethod
    def from_lc_message(cls, message: BaseMessage) -> "Data":
        """Converts a BaseMessage to a Data.

        The message content is stored under ``"text"`` and the result of
        ``message.to_json()`` under ``"metadata"``.

        Args:
            message (BaseMessage): The BaseMessage to convert.

        Returns:
            Data: The converted Data.
        """
        data: dict = {"text": message.content}
        data["metadata"] = cast("dict", message.to_json())
        return cls(data=data, text_key="text")

    def __add__(self, other: "Data") -> "Data":
        """Combines the data of two data by attempting to add values for overlapping keys.

        Values under keys present in both records are combined with ``+`` when the
        types support it; otherwise the value from ``other`` wins. Keys present in
        only one record are carried over unchanged. Neither operand is modified.

        Args:
            other (Data): The record whose data is merged into a copy of this one.

        Returns:
            Data: A new record holding the combined data.
        """
        combined_data = self.data.copy()
        for key, value in other.data.items():
            if key in combined_data:
                try:
                    # Use binary ``+`` rather than ``+=``: the copy above is shallow, so an
                    # in-place add would mutate mutable values (e.g. lists) still held by self.
                    combined_data[key] = combined_data[key] + value
                except TypeError:
                    # Fallback: Use the value from 'other' record if addition is not supported
                    combined_data[key] = value
            else:
                # If the key is not in the first record, simply add it
                combined_data[key] = value
        return Data(data=combined_data)

    def to_lc_document(self) -> Document:
        """Converts the Data to a Document.

        The value under ``text_key`` (or ``default_value`` when absent) becomes the
        page content, converted with ``str`` if needed; the remaining entries of
        ``data`` become the metadata.

        Returns:
            Document: The converted Document.
        """
        data_copy = self.data.copy()
        text = data_copy.pop(self.text_key, self.default_value)
        page_content = text if isinstance(text, str) else str(text)
        return Document(page_content=page_content, metadata=data_copy)

    def to_lc_message(
        self,
    ) -> BaseMessage:
        """Converts the Data to a BaseMessage.

        The ``"sender"`` entry decides the message type: ``MESSAGE_SENDER_USER``
        yields a ``HumanMessage`` (with one ``image_url`` part per entry in
        ``"files"``, if any); any other sender yields an ``AIMessage``.

        Returns:
            BaseMessage: The converted BaseMessage.

        Raises:
            ValueError: If ``"text"`` or ``"sender"`` is missing from ``data``.
        """
        # Both keys are required; there is no default sender.
        if not all(key in self.data for key in ["text", "sender"]):
            msg = f"Missing required keys ('text', 'sender') in Data: {self.data}"
            raise ValueError(msg)
        sender = self.data["sender"]
        text = self.data["text"]
        if sender != MESSAGE_SENDER_USER:
            return AIMessage(content=text)

        # Human messages always use the list-of-parts content form; attached files
        # are appended as data-URL image parts after the text part.
        files = self.data.get("files") or []
        contents = [{"type": "text", "text": text}]
        for file_path in files:
            image_url = create_data_url(file_path)
            contents.append({"type": "image_url", "image_url": {"url": image_url}})
        return HumanMessage(content=contents)

    def __getattr__(self, key):
        """Allows attribute-like access to the data dictionary.

        Dunder names, ``data``, ``text_key`` and underscore-prefixed names are
        delegated to the regular attribute machinery; everything else is looked
        up in ``data``.

        Raises:
            AttributeError: If ``key`` is not present in ``data``.
        """
        try:
            if key.startswith("__"):
                return self.__getattribute__(key)
            if key in {"data", "text_key"} or key.startswith("_"):
                return super().__getattr__(key)
            return self.data[key]
        except KeyError as e:
            # Fallback to default behavior to raise AttributeError for undefined attributes
            msg = f"'{type(self).__name__}' object has no attribute '{key}'"
            raise AttributeError(msg) from e

    def __setattr__(self, key, value) -> None:
        """Set attribute-like values in the data dictionary.

        ``data``, ``text_key`` and underscore-prefixed names are assigned as normal
        attributes. Other declared model fields are stored both in ``data`` and as
        attributes. Any other name is stored only in ``data``.
        """
        if key in {"data", "text_key"} or key.startswith("_"):
            super().__setattr__(key, value)
        elif key in self.model_fields:
            self.data[key] = value
            super().__setattr__(key, value)
        else:
            self.data[key] = value

    def __delattr__(self, key) -> None:
        """Allows attribute-like deletion from the data dictionary.

        Raises:
            KeyError: If ``key`` is not a reserved name and is not in ``data``.
        """
        if key in {"data", "text_key"} or key.startswith("_"):
            super().__delattr__(key)
        else:
            del self.data[key]

    def __deepcopy__(self, memo):
        """Custom deepcopy implementation to handle copying of the Data object."""
        # Create a new Data object with a deep copy of the data dictionary
        return Data(data=copy.deepcopy(self.data, memo), text_key=self.text_key, default_value=self.default_value)

    def __dir__(self):
        """List the regular attributes plus the keys of the data dictionary."""
        return super().__dir__() + list(self.data.keys())

    def __str__(self) -> str:
        """Return a JSON string of the data, or ``str(self.data)`` if serialization fails."""
        try:
            data = {k: v.to_json() if hasattr(v, "to_json") else v for k, v in self.data.items()}
            return serialize_data(data)  # use the custom serializer
        except Exception:  # noqa: BLE001
            # Best effort: string conversion must never raise, so log and fall back.
            logger.opt(exception=True).debug("Error converting Data to JSON")
            return str(self.data)

    def __contains__(self, key) -> bool:
        """Return whether ``key`` is present in the data dictionary."""
        return key in self.data

    def __eq__(self, /, other):
        """Two records are equal when both are ``Data`` and their ``data`` dicts are equal."""
        return isinstance(other, Data) and self.data == other.data
def custom_serializer(obj):
    """Convert a value that ``json.dumps`` cannot encode into a JSON-friendly one.

    Supported types, checked in this order:
    - ``datetime``: ISO-8601 string after ``astimezone()``.
    - ``Decimal``: ``float``.
    - ``UUID``: its string form.
    - ``BaseModel``: the result of ``model_dump()``.

    Raises:
        TypeError: If ``obj`` is none of the supported types.
    """
    if isinstance(obj, datetime):
        converted = obj.astimezone().isoformat()
    elif isinstance(obj, Decimal):
        converted = float(obj)
    elif isinstance(obj, UUID):
        converted = str(obj)
    elif isinstance(obj, BaseModel):
        converted = obj.model_dump()
    else:
        # Add more branches above as further types need custom serialization
        msg = f"Type {type(obj)} not serializable"
        raise TypeError(msg)
    return converted
def serialize_data(data):
    """Dump ``data`` to a JSON string indented by 4 spaces.

    Values the standard encoder cannot handle are passed to ``custom_serializer``.
    """
    dump_options = {"indent": 4, "default": custom_serializer}
    return json.dumps(data, **dump_options)