Spaces:
Running
Running
| from uuid import UUID | |
| import pytest | |
| from httpx import AsyncClient | |
| from langflow.memory import aadd_messagetables | |
| # Assuming you have these imports available | |
| from langflow.services.database.models.message import MessageCreate, MessageRead, MessageUpdate | |
| from langflow.services.database.models.message.model import MessageTable | |
| from langflow.services.deps import async_session_scope | |
| async def created_message(): | |
| async with async_session_scope() as session: | |
| message = MessageCreate(text="Test message", sender="User", sender_name="User", session_id="session_id") | |
| messagetable = MessageTable.model_validate(message, from_attributes=True) | |
| messagetables = await aadd_messagetables([messagetable], session) | |
| return MessageRead.model_validate(messagetables[0], from_attributes=True) | |
| async def created_messages(session): # noqa: ARG001 | |
| async with async_session_scope() as _session: | |
| messages = [ | |
| MessageCreate(text="Test message 1", sender="User", sender_name="User", session_id="session_id2"), | |
| MessageCreate(text="Test message 2", sender="User", sender_name="User", session_id="session_id2"), | |
| MessageCreate(text="Test message 3", sender="User", sender_name="User", session_id="session_id2"), | |
| ] | |
| messagetables = [MessageTable.model_validate(message, from_attributes=True) for message in messages] | |
| return await aadd_messagetables(messagetables, _session) | |
| async def test_delete_messages(client: AsyncClient, created_messages, logged_in_headers): | |
| response = await client.request( | |
| "DELETE", "api/v1/monitor/messages", json=[str(msg.id) for msg in created_messages], headers=logged_in_headers | |
| ) | |
| assert response.status_code == 204, response.text | |
| assert response.reason_phrase == "No Content" | |
| async def test_update_message(client: AsyncClient, logged_in_headers, created_message): | |
| message_id = created_message.id | |
| message_update = MessageUpdate(text="Updated content") | |
| response = await client.put( | |
| f"api/v1/monitor/messages/{message_id}", json=message_update.model_dump(), headers=logged_in_headers | |
| ) | |
| assert response.status_code == 200, response.text | |
| updated_message = MessageRead(**response.json()) | |
| assert updated_message.text == "Updated content" | |
| async def test_update_message_not_found(client: AsyncClient, logged_in_headers): | |
| non_existent_id = UUID("00000000-0000-0000-0000-000000000000") | |
| message_update = MessageUpdate(text="Updated content") | |
| response = await client.put( | |
| f"api/v1/monitor/messages/{non_existent_id}", json=message_update.model_dump(), headers=logged_in_headers | |
| ) | |
| assert response.status_code == 404, response.text | |
| assert response.json()["detail"] == "Message not found" | |
| async def test_delete_messages_session(client: AsyncClient, created_messages, logged_in_headers): | |
| session_id = "session_id2" | |
| response = await client.delete(f"api/v1/monitor/messages/session/{session_id}", headers=logged_in_headers) | |
| assert response.status_code == 204 | |
| assert response.reason_phrase == "No Content" | |
| assert len(created_messages) == 3 | |
| response = await client.get("api/v1/monitor/messages", headers=logged_in_headers) | |
| assert response.status_code == 200 | |
| assert len(response.json()) == 0 | |
| # Successfully update session ID for all messages with the old session ID | |
| async def test_successfully_update_session_id(client, logged_in_headers, created_messages): | |
| old_session_id = "session_id2" | |
| new_session_id = "new_session_id" | |
| response = await client.patch( | |
| f"api/v1/monitor/messages/session/{old_session_id}", | |
| params={"new_session_id": new_session_id}, | |
| headers=logged_in_headers, | |
| ) | |
| assert response.status_code == 200, response.text | |
| updated_messages = response.json() | |
| assert len(updated_messages) == len(created_messages) | |
| for message in updated_messages: | |
| assert message["session_id"] == new_session_id | |
| response = await client.get( | |
| "api/v1/monitor/messages", headers=logged_in_headers, params={"session_id": new_session_id} | |
| ) | |
| assert response.status_code == 200 | |
| assert len(response.json()) == len(created_messages) | |
| for message in response.json(): | |
| assert message["session_id"] == new_session_id | |
| # No messages found with the given session ID | |
| async def test_no_messages_found_with_given_session_id(client, logged_in_headers): | |
| old_session_id = "non_existent_session_id" | |
| new_session_id = "new_session_id" | |
| response = await client.patch( | |
| f"/messages/session/{old_session_id}", params={"new_session_id": new_session_id}, headers=logged_in_headers | |
| ) | |
| assert response.status_code == 404, response.text | |
| assert response.json()["detail"] == "Not Found" | |