Spaces:
Running
Running
| import random | |
| import time | |
| from pathlib import Path | |
| import httpx | |
| import orjson | |
| from locust import FastHttpUser, between, task | |
| from rich import print # noqa: A004 | |
class NameTest(FastHttpUser):
    """Locust user that checks whether a chat flow remembers a name within a session.

    Each simulated user logs in, uploads a flow and creates an API key in
    ``on_start``. Its task then sends a name to the flow and, reusing the
    returned session id, asks the flow to repeat that name.
    """

    wait_time = between(1, 5)

    # Read once at class-definition time; the path is relative to the working directory.
    with Path("names.txt").open(encoding="utf-8") as file:
        # Blank lines are skipped so an empty string is never picked as a name.
        names = [line.strip() for line in file if line.strip()]

    # Class-level default only. ``on_start`` rebinds ``self.headers`` per instance
    # instead of mutating this dict, which is shared by every simulated user.
    headers: dict = {}

    def poll_task(self, task_id, sleep_time=1, timeout=None):
        """Poll ``/task/{task_id}`` until the task reaches a terminal status.

        Args:
            task_id: Identifier returned by the ``/process`` endpoint.
            sleep_time: Seconds to wait between two polls.
            timeout: Optional upper bound, in seconds, on the total polling
                time. ``None`` (the default) polls without limit.

        Returns:
            The ``result`` field of the response once the status is ``SUCCESS``.

        Raises:
            ValueError: If the task status is ``FAILURE`` or ``REVOKED``.
            TimeoutError: If ``timeout`` is set and elapses first.
        """
        deadline = None if timeout is None else time.monotonic() + timeout
        while True:
            failure_msg = None
            with self.rest(
                "GET",
                f"/task/{task_id}",
                name="task_status",
                headers=self.headers,
            ) as response:
                # ``js`` stays None when the body could not be parsed as JSON.
                body = response.js if isinstance(response.js, dict) else {}
                print(f"Poll Response: {response.js}")
                status = body.get("status")
                if status == "SUCCESS":
                    return body.get("result")
                if status in {"FAILURE", "REVOKED"}:
                    failure_msg = f"Task failed with status: {status}"
                    response.failure(failure_msg)
            # Raise outside the with-block: Locust documents that ``rest()`` catches
            # exceptions raised inside it and only marks the sample as failed, which
            # would otherwise let this loop keep polling a failed task forever.
            if failure_msg is not None:
                raise ValueError(failure_msg)
            if deadline is not None and time.monotonic() >= deadline:
                msg = f"Task {task_id} did not finish within {timeout} seconds"
                raise TimeoutError(msg)
            time.sleep(sleep_time)

    def process(self, name, flow_id, payload):
        """Submit ``payload`` to ``/process/{flow_id}`` and wait for the task result.

        Args:
            name: Name under test; only used in the printed result line.
            flow_id: Identifier of the flow to run.
            payload: JSON-serialisable request body.

        Returns:
            A ``(result, session_id)`` tuple, where ``result`` comes from
            ``poll_task`` and ``session_id`` from the process response.

        Raises:
            ValueError: If the process call does not return HTTP 200.
            AssertionError: If the response carries no task id.
        """
        print(f"Processing {payload}")
        task_id = None
        session_id = None
        error_msg = None
        with self.rest(
            "POST",
            f"/process/{flow_id}",
            json=payload,
            name="process",
            headers=self.headers,
        ) as response:
            print(response.js)
            if response.status_code != 200:
                error_msg = "Process call failed"
                response.failure(error_msg)
            else:
                # ``js`` stays None when the body could not be parsed as JSON.
                body = response.js if isinstance(response.js, dict) else {}
                task_id = body.get("id")
                session_id = body.get("session_id")
                if not task_id:
                    response.failure("Inner Task ID not found")

        # Errors are raised after the with-block because ``rest()`` would swallow
        # them, hiding the real cause behind the generic missing-id check below.
        if error_msg is not None:
            raise ValueError(error_msg)
        if not task_id:
            # Explicit raise (not ``assert``) so the guard survives ``python -O``.
            msg = "Task ID not found"
            raise AssertionError(msg)

        result = self.poll_task(task_id)
        print(f"Result for {name}: {result}")
        return result, session_id

    @task
    def send_name_and_check(self):
        """Tell the flow a random name, then verify it can repeat the name back.

        Raises:
            AssertionError: If the second answer does not contain
                ``"Your name is <name>"``.
        """
        name = random.choice(self.names)  # noqa: S311

        payload1 = {
            "inputs": {"text": f"Hello, My name is {name}"},
            "sync": False,
        }
        _result1, session_id = self.process(name, self.flow_id, payload1)

        # Reuse the session id of the first call for the follow-up question.
        payload2 = {
            "inputs": {"text": "What is my name? Please, answer like this: Your name is <name>"},
            "session_id": session_id,
            "sync": False,
        }
        result2, _ = self.process(name, self.flow_id, payload2)

        assert f"Your name is {name}" in str(result2), "Name not found in response"

    def on_start(self):
        """Log in, create the flow under test and obtain an API key for this user.

        Sets ``self.flow_id`` and a per-instance ``self.headers`` that carries
        the ``x-api-key`` header used by the task requests.

        Raises:
            httpx.HTTPStatusError: If any setup request returns an error status.
            LookupError: If the ``superuser`` account is not in the user list.
        """
        print("Starting")
        login_data = {"username": "superuser", "password": "superuser"}
        response = httpx.post(f"{self.host}/login", data=login_data)
        response.raise_for_status()
        tokens = response.json()
        print(tokens)
        a_token = tokens["access_token"]
        logged_in_headers = {"Authorization": f"Bearer {a_token}"}
        print("Logged in")

        json_flow = (Path(__file__).parent.parent / "data" / "BasicChatwithPromptandHistory.json").read_text(
            encoding="utf-8"
        )
        data = orjson.loads(json_flow)["data"]

        # Create test data
        flow = {"name": "Flow 1", "description": "description", "data": data}
        print("Creating flow")
        # Make request to endpoint
        response = httpx.post(
            f"{self.host}/flows/",
            json=flow,
            headers=logged_in_headers,
        )
        response.raise_for_status()
        self.flow_id = response.json()["id"]
        print(f"Flow ID: {self.flow_id}")

        # read all users
        response = httpx.get(
            f"{self.host}/users/",
            headers=logged_in_headers,
        )
        response.raise_for_status()
        users_payload = response.json()
        print(users_payload)
        user_id = next(
            (user["id"] for user in users_payload["users"] if user["username"] == "superuser"),
            None,
        )
        if user_id is None:
            msg = "User 'superuser' not found in /users/ response"
            raise LookupError(msg)

        # Create api key
        response = httpx.post(
            f"{self.host}/api_key/",
            json={"user_id": user_id},
            headers=logged_in_headers,
        )
        response.raise_for_status()
        api_key_payload = response.json()
        print(api_key_payload)
        # Rebind instead of mutating so simulated users do not overwrite each
        # other's key through the shared class-level dict.
        self.headers = {**self.headers, "x-api-key": api_key_payload["api_key"]}