Spaces:
Running
Running
| from typing import Any | |
| from openai.lib.streaming import AssistantEventHandler | |
| from langflow.base.astra_assistants.util import get_patched_openai_client | |
| from langflow.custom.custom_component.component_with_cache import ComponentWithCache | |
| from langflow.inputs import MultilineInput | |
| from langflow.schema import dotdict | |
| from langflow.schema.message import Message | |
| from langflow.template import Output | |
| class AssistantsRun(ComponentWithCache): | |
| display_name = "Run Assistant" | |
| description = "Executes an Assistant Run against a thread" | |
| icon = "AstraDB" | |
| def __init__(self, **kwargs) -> None: | |
| super().__init__(**kwargs) | |
| self.client = get_patched_openai_client(self._shared_component_cache) | |
| self.thread_id = None | |
| def update_build_config( | |
| self, | |
| build_config: dotdict, | |
| field_value: Any, | |
| field_name: str | None = None, | |
| ) -> None: | |
| if field_name == "thread_id": | |
| if field_value is None: | |
| thread = self.client.beta.threads.create() | |
| self.thread_id = thread.id | |
| build_config["thread_id"] = field_value | |
| inputs = [ | |
| MultilineInput( | |
| name="assistant_id", | |
| display_name="Assistant ID", | |
| info=( | |
| "The ID of the assistant to run. \n\n" | |
| "Can be retrieved using the List Assistants component or created with the Create Assistant component." | |
| ), | |
| ), | |
| MultilineInput( | |
| name="user_message", | |
| display_name="User Message", | |
| info="User message to pass to the run.", | |
| ), | |
| MultilineInput( | |
| name="thread_id", | |
| display_name="Thread ID", | |
| required=False, | |
| info="Thread ID to use with the run. If not provided, a new thread will be created.", | |
| ), | |
| MultilineInput( | |
| name="env_set", | |
| display_name="Environment Set", | |
| info="Dummy input to allow chaining with Dotenv Component.", | |
| ), | |
| ] | |
| outputs = [Output(display_name="Assistant Response", name="assistant_response", method="process_inputs")] | |
| def process_inputs(self) -> Message: | |
| text = "" | |
| if self.thread_id is None: | |
| thread = self.client.beta.threads.create() | |
| self.thread_id = thread.id | |
| # add the user message | |
| self.client.beta.threads.messages.create(thread_id=self.thread_id, role="user", content=self.user_message) | |
| class EventHandler(AssistantEventHandler): | |
| def __init__(self) -> None: | |
| super().__init__() | |
| def on_exception(self, exception: Exception) -> None: | |
| raise exception | |
| event_handler = EventHandler() | |
| with self.client.beta.threads.runs.create_and_stream( | |
| thread_id=self.thread_id, | |
| assistant_id=self.assistant_id, | |
| event_handler=event_handler, | |
| ) as stream: | |
| for part in stream.text_deltas: | |
| text += part | |
| return Message(text=text) | |