Spaces:
Running
Running
| from typing import TYPE_CHECKING, Any | |
| from loguru import logger | |
| from langflow.base.flow_processing.utils import build_data_from_result_data | |
| from langflow.custom import CustomComponent | |
| from langflow.graph.graph.base import Graph | |
| from langflow.graph.vertex.base import Vertex | |
| from langflow.helpers.flow import get_flow_inputs | |
| from langflow.schema import Data | |
| from langflow.schema.dotdict import dotdict | |
| from langflow.template.field.base import Input | |
| if TYPE_CHECKING: | |
| from langflow.graph.schema import RunOutputs | |
class SubFlowComponent(CustomComponent):
    """Component that surfaces another flow's inputs as fields and runs that flow.

    Flows are looked up by name via ``self.list_flows()`` and executed via
    ``self.run_flow()``; neither is defined in this class (presumably both are
    inherited from ``CustomComponent``).
    """

    display_name = "Sub Flow"
    description = (
        "Dynamically Generates a Component from a Flow. The output is a list of data with keys 'result' and 'message'."
    )
    beta: bool = True
    field_order = ["flow_name"]
    name = "SubFlow"

    def get_flow_names(self) -> list[str]:
        """Return the ``"name"`` entry of every flow reported by ``list_flows()``."""
        return [entry.data["name"] for entry in self.list_flows()]

    def get_flow(self, flow_name: str) -> Data | None:
        """Return the first listed flow whose ``"name"`` equals ``flow_name``, else ``None``."""
        matches = (entry for entry in self.list_flows() if entry.data["name"] == flow_name)
        return next(matches, None)

    def update_build_config(self, build_config: dotdict, field_value: Any, field_name: str | None = None):
        """Refresh the build config after a field change.

        When ``field_name`` is ``"flow_name"`` the selectable flow names are
        reloaded. Every key other than the ``field_order`` entries, ``"code"``,
        ``"_type"`` and ``"get_final_results_only"`` is then removed. Finally, if
        a flow name was supplied, one input field per input vertex of that flow
        is added.

        Failures while looking up the flow or building its graph are logged and
        the (already cleaned) config is returned; they are not re-raised.
        """
        logger.debug(f"Updating build config with field value {field_value} and field name {field_name}")

        flow_name_changed = field_name == "flow_name"
        if flow_name_changed:
            build_config["flow_name"]["options"] = self.get_flow_names()

        # Drop everything that is not a fixed field, so inputs generated for a
        # previously selected flow do not linger.
        preserved = {*self.field_order, "code", "_type", "get_final_results_only"}
        for stale_key in [existing for existing in build_config if existing not in preserved]:
            del build_config[stale_key]

        if field_value is None or not flow_name_changed:
            return build_config

        try:
            selected_flow = self.get_flow(field_value)
        except Exception:  # noqa: BLE001
            logger.exception(f"Error getting flow {field_value}")
            return build_config

        if not selected_flow:
            logger.error(f"Flow {field_value} not found.")
            return build_config

        try:
            flow_graph = Graph.from_payload(selected_flow.data["data"])
            # Collect the graph's input vertices and expose each one as a field.
            flow_inputs = get_flow_inputs(flow_graph)
            build_config = self.add_inputs_to_build_config(flow_inputs, build_config)
        except Exception:  # noqa: BLE001
            logger.exception(f"Error building graph for flow {field_value}")
        return build_config

    def add_inputs_to_build_config(self, inputs: list[Vertex], build_config: dotdict):
        """Add one string ``Input`` field per vertex to ``build_config`` and return it.

        Each field is stored under the vertex id and takes its display name and
        info text from the vertex.
        """
        generated: list[Input] = [
            Input(
                display_name=vertex.display_name,
                name=vertex.id,
                info=vertex.description,
                field_type="str",
                value=None,
            )
            for vertex in inputs
        ]
        logger.debug(generated)
        for generated_field in generated:
            build_config[generated_field.name] = generated_field.to_dict()
        return build_config

    def build_config(self):
        """Return the static field definitions of this component."""
        input_value_field = {
            "display_name": "Input Value",
            "multiline": True,
        }
        flow_name_field = {
            "display_name": "Flow Name",
            "info": "The name of the flow to run.",
            "options": [],
            "real_time_refresh": True,
            "refresh_button": True,
        }
        tweaks_field = {
            "display_name": "Tweaks",
            "info": "Tweaks to apply to the flow.",
        }
        final_results_field = {
            "display_name": "Get Final Results Only",
            "info": "If False, the output will contain all outputs from the flow.",
            "advanced": True,
        }
        return {
            "input_value": input_value_field,
            "flow_name": flow_name_field,
            "tweaks": tweaks_field,
            "get_final_results_only": final_results_field,
        }

    async def build(self, flow_name: str, **kwargs) -> list[Data]:
        """Run the flow called ``flow_name`` and return the data of its first run output.

        Every keyword argument becomes a tweak of the form
        ``{key: {"input_value": value}}``. An empty list is returned when
        ``run_flow`` yields nothing; otherwise only the first run output is
        read, and its falsy entries are skipped.
        """
        tweaks = {target: {"input_value": supplied} for target, supplied in kwargs.items()}
        run_outputs: list[RunOutputs | None] = await self.run_flow(tweaks=tweaks, flow_name=flow_name)
        if not run_outputs:
            return []

        first_run = run_outputs[0]
        collected = []
        if first_run is not None:
            for result in filter(None, first_run.outputs):
                collected.extend(build_data_from_result_data(result))

        self.status = collected
        logger.debug(collected)
        return collected