Spaces:
Running
Running
| import json | |
| import os | |
| from unittest.mock import patch | |
| import pytest | |
| from langflow.logging.logger import SizedLogBuffer | |
| def sized_log_buffer(): | |
| return SizedLogBuffer() | |
| def test_init_default(): | |
| buffer = SizedLogBuffer() | |
| assert buffer.max == 0 | |
| assert buffer._max_readers == 20 | |
| def test_init_with_env_variable(): | |
| with patch.dict(os.environ, {"LANGFLOW_LOG_RETRIEVER_BUFFER_SIZE": "100"}): | |
| buffer = SizedLogBuffer() | |
| assert buffer.max == 100 | |
| def test_write(sized_log_buffer): | |
| message = json.dumps({"text": "Test log", "record": {"time": {"timestamp": 1625097600.1244334}}}) | |
| sized_log_buffer.max = 1 # Set max size to 1 for testing | |
| sized_log_buffer.write(message) | |
| assert len(sized_log_buffer.buffer) == 1 | |
| assert sized_log_buffer.buffer[0][0] == 1625097600124 | |
| assert sized_log_buffer.buffer[0][1] == "Test log" | |
| def test_write_overflow(sized_log_buffer): | |
| sized_log_buffer.max = 2 | |
| messages = [json.dumps({"text": f"Log {i}", "record": {"time": {"timestamp": 1625097600 + i}}}) for i in range(3)] | |
| for message in messages: | |
| sized_log_buffer.write(message) | |
| assert len(sized_log_buffer.buffer) == 2 | |
| assert sized_log_buffer.buffer[0][0] == 1625097601000 | |
| assert sized_log_buffer.buffer[1][0] == 1625097602000 | |
| def test_len(sized_log_buffer): | |
| sized_log_buffer.max = 3 | |
| messages = [json.dumps({"text": f"Log {i}", "record": {"time": {"timestamp": 1625097600 + i}}}) for i in range(3)] | |
| for message in messages: | |
| sized_log_buffer.write(message) | |
| assert len(sized_log_buffer) == 3 | |
| def test_get_after_timestamp(sized_log_buffer): | |
| sized_log_buffer.max = 5 | |
| messages = [json.dumps({"text": f"Log {i}", "record": {"time": {"timestamp": 1625097600 + i}}}) for i in range(5)] | |
| for message in messages: | |
| sized_log_buffer.write(message) | |
| result = sized_log_buffer.get_after_timestamp(1625097602000, lines=2) | |
| assert len(result) == 2 | |
| assert 1625097603000 in result | |
| assert 1625097602000 in result | |
| def test_get_before_timestamp(sized_log_buffer): | |
| sized_log_buffer.max = 5 | |
| messages = [json.dumps({"text": f"Log {i}", "record": {"time": {"timestamp": 1625097600 + i}}}) for i in range(5)] | |
| for message in messages: | |
| sized_log_buffer.write(message) | |
| result = sized_log_buffer.get_before_timestamp(1625097603000, lines=2) | |
| assert len(result) == 2 | |
| assert 1625097601000 in result | |
| assert 1625097602000 in result | |
| def test_get_last_n(sized_log_buffer): | |
| sized_log_buffer.max = 5 | |
| messages = [json.dumps({"text": f"Log {i}", "record": {"time": {"timestamp": 1625097600 + i}}}) for i in range(5)] | |
| for message in messages: | |
| sized_log_buffer.write(message) | |
| result = sized_log_buffer.get_last_n(3) | |
| assert len(result) == 3 | |
| assert 1625097602000 in result | |
| assert 1625097603000 in result | |
| assert 1625097604000 in result | |
| def test_enabled(sized_log_buffer): | |
| assert not sized_log_buffer.enabled() | |
| sized_log_buffer.max = 1 | |
| assert sized_log_buffer.enabled() | |
| def test_max_size(sized_log_buffer): | |
| assert sized_log_buffer.max_size() == 0 | |
| sized_log_buffer.max = 100 | |
| assert sized_log_buffer.max_size() == 100 | |