Spaces:
Build error
Build error
| import os | |
| import shutil | |
| from unittest.mock import MagicMock, Mock | |
| import pytest | |
| from litellm import ChatCompletionMessageToolCall | |
| from openhands.controller.state.state import State | |
| from openhands.core.config.agent_config import AgentConfig | |
| from openhands.core.message import ImageContent, Message, TextContent | |
| from openhands.events.action import ( | |
| AgentFinishAction, | |
| CmdRunAction, | |
| MessageAction, | |
| ) | |
| from openhands.events.action.message import SystemMessageAction | |
| from openhands.events.event import ( | |
| Event, | |
| EventSource, | |
| FileEditSource, | |
| FileReadSource, | |
| RecallType, | |
| ) | |
| from openhands.events.observation import CmdOutputObservation | |
| from openhands.events.observation.agent import ( | |
| MicroagentKnowledge, | |
| RecallObservation, | |
| ) | |
| from openhands.events.observation.browse import BrowserOutputObservation | |
| from openhands.events.observation.commands import ( | |
| CmdOutputMetadata, | |
| IPythonRunCellObservation, | |
| ) | |
| from openhands.events.observation.delegate import AgentDelegateObservation | |
| from openhands.events.observation.error import ErrorObservation | |
| from openhands.events.observation.files import FileEditObservation, FileReadObservation | |
| from openhands.events.observation.reject import UserRejectObservation | |
| from openhands.events.tool import ToolCallMetadata | |
| from openhands.memory.conversation_memory import ConversationMemory | |
| from openhands.utils.prompt import PromptManager, RepositoryInfo, RuntimeInfo | |
| def agent_config(): | |
| return AgentConfig( | |
| enable_prompt_extensions=True, | |
| enable_som_visual_browsing=True, | |
| disabled_microagents=['disabled_agent'], | |
| ) | |
| def conversation_memory(agent_config): | |
| prompt_manager = MagicMock(spec=PromptManager) | |
| prompt_manager.get_system_message.return_value = 'System message' | |
| prompt_manager.build_workspace_context.return_value = ( | |
| 'Formatted repository and runtime info' | |
| ) | |
| # Make build_microagent_info return the actual content from the triggered agents | |
| def build_microagent_info(triggered_agents): | |
| if not triggered_agents: | |
| return '' | |
| return '\n'.join(agent.content for agent in triggered_agents) | |
| prompt_manager.build_microagent_info.side_effect = build_microagent_info | |
| return ConversationMemory(agent_config, prompt_manager) | |
| def prompt_dir(tmp_path): | |
| # Copy contents from "openhands/agenthub/codeact_agent" to the temp directory | |
| shutil.copytree( | |
| 'openhands/agenthub/codeact_agent/prompts', tmp_path, dirs_exist_ok=True | |
| ) | |
| # Return the temporary directory path | |
| return tmp_path | |
| def mock_state(): | |
| state = MagicMock(spec=State) | |
| state.history = [] | |
| return state | |
| def mock_prompt_manager(): | |
| return MagicMock() | |
| def test_process_events_with_message_action(conversation_memory): | |
| """Test that MessageAction is processed correctly.""" | |
| # Create a system message action | |
| system_message = SystemMessageAction(content='System message') | |
| system_message._source = EventSource.AGENT | |
| # Create user and assistant messages | |
| user_message = MessageAction(content='Hello') | |
| user_message._source = EventSource.USER | |
| assistant_message = MessageAction(content='Hi there') | |
| assistant_message._source = EventSource.AGENT | |
| # Process events | |
| messages = conversation_memory.process_events( | |
| condensed_history=[system_message, user_message, assistant_message], | |
| initial_user_action=user_message, | |
| max_message_chars=None, | |
| vision_is_active=False, | |
| ) | |
| # Check that the messages were processed correctly | |
| assert len(messages) == 3 | |
| assert messages[0].role == 'system' | |
| assert messages[0].content[0].text == 'System message' | |
| # Test cases for _ensure_system_message | |
| def test_ensure_system_message_adds_if_missing(conversation_memory): | |
| """Test that _ensure_system_message adds a system message if none exists.""" | |
| user_message = MessageAction(content='User message') | |
| user_message._source = EventSource.USER | |
| events = [user_message] | |
| conversation_memory._ensure_system_message(events) | |
| assert len(events) == 2 | |
| assert isinstance(events[0], SystemMessageAction) | |
| assert events[0].content == 'System message' # From fixture | |
| assert isinstance(events[1], MessageAction) # Original event is still there | |
| def test_ensure_system_message_does_nothing_if_present(conversation_memory): | |
| """Test that _ensure_system_message does nothing if a system message is already present.""" | |
| original_system_message = SystemMessageAction(content='Existing system message') | |
| user_message = MessageAction(content='User message') | |
| user_message._source = EventSource.USER | |
| events = [ | |
| original_system_message, | |
| user_message, | |
| ] | |
| original_events = list(events) # Copy before modification | |
| conversation_memory._ensure_system_message(events) | |
| assert events == original_events # List should be unchanged | |
| # Test cases for _ensure_initial_user_message | |
| def initial_user_action(): | |
| msg = MessageAction(content='Initial User Message') | |
| msg._source = EventSource.USER | |
| return msg | |
| def test_ensure_initial_user_message_adds_if_only_system( | |
| conversation_memory, initial_user_action | |
| ): | |
| """Test adding the initial user message when only the system message exists.""" | |
| system_message = SystemMessageAction(content='System') | |
| system_message._source = EventSource.AGENT | |
| events = [system_message] | |
| conversation_memory._ensure_initial_user_message(events, initial_user_action) | |
| assert len(events) == 2 | |
| assert events[0] == system_message | |
| assert events[1] == initial_user_action | |
| def test_ensure_initial_user_message_correct_already_present( | |
| conversation_memory, initial_user_action | |
| ): | |
| """Test that nothing changes if the correct initial user message is at index 1.""" | |
| system_message = SystemMessageAction(content='System') | |
| agent_message = MessageAction(content='Assistant') | |
| agent_message._source = EventSource.USER | |
| events = [ | |
| system_message, | |
| initial_user_action, | |
| agent_message, | |
| ] | |
| original_events = list(events) | |
| conversation_memory._ensure_initial_user_message(events, initial_user_action) | |
| assert events == original_events | |
| def test_ensure_initial_user_message_incorrect_at_index_1( | |
| conversation_memory, initial_user_action | |
| ): | |
| """Test inserting the correct initial user message when an incorrect message is at index 1.""" | |
| system_message = SystemMessageAction(content='System') | |
| incorrect_second_message = MessageAction(content='Assistant') | |
| incorrect_second_message._source = EventSource.AGENT | |
| events = [system_message, incorrect_second_message] | |
| conversation_memory._ensure_initial_user_message(events, initial_user_action) | |
| assert len(events) == 3 | |
| assert events[0] == system_message | |
| assert events[1] == initial_user_action # Correct one inserted | |
| assert events[2] == incorrect_second_message # Original second message shifted | |
| def test_ensure_initial_user_message_correct_present_later( | |
| conversation_memory, initial_user_action | |
| ): | |
| """Test inserting the correct initial user message at index 1 even if it exists later.""" | |
| system_message = SystemMessageAction(content='System') | |
| incorrect_second_message = MessageAction(content='Assistant') | |
| incorrect_second_message._source = EventSource.AGENT | |
| # Correct initial message is present, but later in the list | |
| events = [system_message, incorrect_second_message] | |
| conversation_memory._ensure_system_message(events) | |
| conversation_memory._ensure_initial_user_message(events, initial_user_action) | |
| assert len(events) == 3 # Should still insert at index 1, not remove the later one | |
| assert events[0] == system_message | |
| assert events[1] == initial_user_action # Correct one inserted at index 1 | |
| assert events[2] == incorrect_second_message # Original second message shifted | |
| # The duplicate initial_user_action originally at index 2 is now at index 3 (implicitly tested by length and content) | |
| def test_ensure_initial_user_message_different_user_msg_at_index_1( | |
| conversation_memory, initial_user_action | |
| ): | |
| """Test inserting the correct initial user message when a *different* user message is at index 1.""" | |
| system_message = SystemMessageAction(content='System') | |
| different_user_message = MessageAction(content='Different User Message') | |
| different_user_message._source = EventSource.USER | |
| events = [system_message, different_user_message] | |
| conversation_memory._ensure_initial_user_message(events, initial_user_action) | |
| assert len(events) == 2 | |
| assert events[0] == system_message | |
| assert events[1] == different_user_message # Original second message remains | |
| def test_ensure_initial_user_message_different_user_msg_at_index_1_and_orphaned_obs( | |
| conversation_memory, initial_user_action | |
| ): | |
| """ | |
| Test process_events when an incorrect user message is at index 1 AND | |
| an orphaned observation (with tool_call_metadata but no matching action) exists. | |
| Expect: System msg, CORRECT initial user msg, the incorrect user msg (shifted). | |
| The orphaned observation should be filtered out. | |
| """ | |
| system_message = SystemMessageAction(content='System') | |
| different_user_message = MessageAction(content='Different User Message') | |
| different_user_message._source = EventSource.USER | |
| # Create an orphaned observation (no matching action/tool call request will exist) | |
| # Use a dictionary that mimics ModelResponse structure to satisfy Pydantic | |
| mock_response = { | |
| 'id': 'mock_response_id', | |
| 'choices': [{'message': {'content': None, 'tool_calls': []}}], | |
| 'created': 0, | |
| 'model': '', | |
| 'object': '', | |
| 'usage': {'completion_tokens': 0, 'prompt_tokens': 0, 'total_tokens': 0}, | |
| } | |
| orphaned_obs = CmdOutputObservation( | |
| command='orphan_cmd', | |
| content='Orphaned output', | |
| command_id=99, | |
| exit_code=0, | |
| ) | |
| orphaned_obs.tool_call_metadata = ToolCallMetadata( | |
| tool_call_id='orphan_call_id', | |
| function_name='execute_bash', | |
| model_response=mock_response, | |
| total_calls_in_response=1, | |
| ) | |
| # Initial events list: system, wrong user message, orphaned observation | |
| events = [system_message, different_user_message, orphaned_obs] | |
| # Call the main process_events method | |
| messages = conversation_memory.process_events( | |
| condensed_history=events, | |
| initial_user_action=initial_user_action, # Provide the *correct* initial action | |
| max_message_chars=None, | |
| vision_is_active=False, | |
| ) | |
| # Assertions on the final messages list | |
| assert len(messages) == 2 | |
| # 1. System message should be first | |
| assert messages[0].role == 'system' | |
| assert messages[0].content[0].text == 'System' | |
| # 2. The different user message should be left at index 1 | |
| assert messages[1].role == 'user' | |
| assert messages[1].content[0].text == different_user_message.content | |
| # Implicitly assert that the orphaned_obs was filtered out by checking the length (2) | |
| def test_process_events_with_cmd_output_observation(conversation_memory): | |
| obs = CmdOutputObservation( | |
| command='echo hello', | |
| content='Command output', | |
| metadata=CmdOutputMetadata( | |
| exit_code=0, | |
| prefix='[THIS IS PREFIX]', | |
| suffix='[THIS IS SUFFIX]', | |
| ), | |
| ) | |
| initial_user_message = MessageAction(content='Initial user message') | |
| initial_user_message._source = EventSource.USER | |
| messages = conversation_memory.process_events( | |
| condensed_history=[obs], | |
| initial_user_action=initial_user_message, | |
| max_message_chars=None, | |
| vision_is_active=False, | |
| ) | |
| assert len(messages) == 3 # System + initial user + result | |
| result = messages[2] # The actual result is now at index 2 | |
| assert result.role == 'user' | |
| assert len(result.content) == 1 | |
| assert isinstance(result.content[0], TextContent) | |
| assert 'Observed result of command executed by user:' in result.content[0].text | |
| assert '[Command finished with exit code 0]' in result.content[0].text | |
| assert '[THIS IS PREFIX]' in result.content[0].text | |
| assert '[THIS IS SUFFIX]' in result.content[0].text | |
| def test_process_events_with_ipython_run_cell_observation(conversation_memory): | |
| obs = IPythonRunCellObservation( | |
| code='plt.plot()', | |
| content='IPython output\n', | |
| ) | |
| initial_user_message = MessageAction(content='Initial user message') | |
| initial_user_message._source = EventSource.USER | |
| messages = conversation_memory.process_events( | |
| condensed_history=[obs], | |
| initial_user_action=initial_user_message, | |
| max_message_chars=None, | |
| vision_is_active=False, | |
| ) | |
| assert len(messages) == 3 # System + initial user + result | |
| result = messages[2] # The actual result is now at index 2 | |
| assert result.role == 'user' | |
| assert len(result.content) == 1 | |
| assert isinstance(result.content[0], TextContent) | |
| assert 'IPython output' in result.content[0].text | |
| assert ( | |
| ' already displayed to user' | |
| in result.content[0].text | |
| ) | |
| assert 'ABC123' not in result.content[0].text | |
| def test_process_events_with_agent_delegate_observation(conversation_memory): | |
| obs = AgentDelegateObservation( | |
| content='Content', outputs={'content': 'Delegated agent output'} | |
| ) | |
| initial_user_message = MessageAction(content='Initial user message') | |
| initial_user_message._source = EventSource.USER | |
| messages = conversation_memory.process_events( | |
| condensed_history=[obs], | |
| initial_user_action=initial_user_message, | |
| max_message_chars=None, | |
| vision_is_active=False, | |
| ) | |
| assert len(messages) == 3 # System + initial user + result | |
| result = messages[2] # The actual result is now at index 2 | |
| assert result.role == 'user' | |
| assert len(result.content) == 1 | |
| assert isinstance(result.content[0], TextContent) | |
| assert 'Delegated agent output' in result.content[0].text | |
| def test_process_events_with_error_observation(conversation_memory): | |
| obs = ErrorObservation('Error message') | |
| initial_user_message = MessageAction(content='Initial user message') | |
| initial_user_message._source = EventSource.USER | |
| messages = conversation_memory.process_events( | |
| condensed_history=[obs], | |
| initial_user_action=initial_user_message, | |
| max_message_chars=None, | |
| vision_is_active=False, | |
| ) | |
| assert len(messages) == 3 # System + initial user + result | |
| result = messages[2] # The actual result is now at index 2 | |
| assert result.role == 'user' | |
| assert len(result.content) == 1 | |
| assert isinstance(result.content[0], TextContent) | |
| assert 'Error message' in result.content[0].text | |
| assert 'Error occurred in processing last action' in result.content[0].text | |
| def test_process_events_with_unknown_observation(conversation_memory): | |
| # Create a mock that inherits from Event but not Action or Observation | |
| obs = Mock(spec=Event) | |
| initial_user_message = MessageAction(content='Initial user message') | |
| initial_user_message._source = EventSource.USER | |
| with pytest.raises(ValueError, match='Unknown event type'): | |
| conversation_memory.process_events( | |
| condensed_history=[obs], | |
| initial_user_action=initial_user_message, | |
| max_message_chars=None, | |
| vision_is_active=False, | |
| ) | |
| def test_process_events_with_file_edit_observation(conversation_memory): | |
| obs = FileEditObservation( | |
| path='/test/file.txt', | |
| prev_exist=True, | |
| old_content='old content', | |
| new_content='new content', | |
| content='diff content', | |
| impl_source=FileEditSource.LLM_BASED_EDIT, | |
| ) | |
| initial_user_message = MessageAction(content='Initial user message') | |
| initial_user_message._source = EventSource.USER | |
| messages = conversation_memory.process_events( | |
| condensed_history=[obs], | |
| initial_user_action=initial_user_message, | |
| max_message_chars=None, | |
| vision_is_active=False, | |
| ) | |
| assert len(messages) == 3 # System + initial user + result | |
| result = messages[2] # The actual result is now at index 2 | |
| assert result.role == 'user' | |
| assert len(result.content) == 1 | |
| assert isinstance(result.content[0], TextContent) | |
| assert '[Existing file /test/file.txt is edited with' in result.content[0].text | |
| def test_process_events_with_file_read_observation(conversation_memory): | |
| obs = FileReadObservation( | |
| path='/test/file.txt', | |
| content='File content', | |
| impl_source=FileReadSource.DEFAULT, | |
| ) | |
| initial_user_message = MessageAction(content='Initial user message') | |
| initial_user_message._source = EventSource.USER | |
| messages = conversation_memory.process_events( | |
| condensed_history=[obs], | |
| initial_user_action=initial_user_message, | |
| max_message_chars=None, | |
| vision_is_active=False, | |
| ) | |
| assert len(messages) == 3 # System + initial user + result | |
| result = messages[2] # The actual result is now at index 2 | |
| assert result.role == 'user' | |
| assert len(result.content) == 1 | |
| assert isinstance(result.content[0], TextContent) | |
| assert result.content[0].text == '\n\nFile content' | |
| def test_process_events_with_browser_output_observation(conversation_memory): | |
| obs = BrowserOutputObservation( | |
| url='http://example.com', | |
| trigger_by_action='browse', | |
| screenshot='', | |
| content='Page loaded', | |
| error=False, | |
| ) | |
| initial_user_message = MessageAction(content='Initial user message') | |
| initial_user_message._source = EventSource.USER | |
| messages = conversation_memory.process_events( | |
| condensed_history=[obs], | |
| initial_user_action=initial_user_message, | |
| max_message_chars=None, | |
| vision_is_active=False, | |
| ) | |
| assert len(messages) == 3 # System + initial user + result | |
| result = messages[2] # The actual result is now at index 2 | |
| assert result.role == 'user' | |
| assert len(result.content) == 1 | |
| assert isinstance(result.content[0], TextContent) | |
| assert '[Current URL: http://example.com]' in result.content[0].text | |
| def test_process_events_with_user_reject_observation(conversation_memory): | |
| obs = UserRejectObservation('Action rejected') | |
| initial_user_message = MessageAction(content='Initial user message') | |
| initial_user_message._source = EventSource.USER | |
| messages = conversation_memory.process_events( | |
| condensed_history=[obs], | |
| initial_user_action=initial_user_message, | |
| max_message_chars=None, | |
| vision_is_active=False, | |
| ) | |
| assert len(messages) == 3 # System + initial user + result | |
| result = messages[2] # The actual result is now at index 2 | |
| assert result.role == 'user' | |
| assert len(result.content) == 1 | |
| assert isinstance(result.content[0], TextContent) | |
| assert 'Action rejected' in result.content[0].text | |
| assert '[Last action has been rejected by the user]' in result.content[0].text | |
| def test_process_events_with_empty_environment_info(conversation_memory): | |
| """Test that empty environment info observations return an empty list of messages without calling build_workspace_context.""" | |
| # Create a RecallObservation with empty info | |
| empty_obs = RecallObservation( | |
| recall_type=RecallType.WORKSPACE_CONTEXT, | |
| repo_name='', | |
| repo_directory='', | |
| repo_instructions='', | |
| runtime_hosts={}, | |
| additional_agent_instructions='', | |
| microagent_knowledge=[], | |
| content='Retrieved environment info', | |
| ) | |
| initial_user_message = MessageAction(content='Initial user message') | |
| initial_user_message._source = EventSource.USER | |
| messages = conversation_memory.process_events( | |
| condensed_history=[empty_obs], | |
| initial_user_action=initial_user_message, | |
| max_message_chars=None, | |
| vision_is_active=False, | |
| ) | |
| # Should only contain system message and initial user message | |
| assert len(messages) == 2 | |
| # Verify that build_workspace_context was NOT called since all input values were empty | |
| conversation_memory.prompt_manager.build_workspace_context.assert_not_called() | |
| def test_process_events_with_function_calling_observation(conversation_memory): | |
| mock_response = { | |
| 'id': 'mock_id', | |
| 'total_calls_in_response': 1, | |
| 'choices': [{'message': {'content': 'Task completed'}}], | |
| } | |
| obs = CmdOutputObservation( | |
| command='echo hello', | |
| content='Command output', | |
| command_id=1, | |
| exit_code=0, | |
| ) | |
| obs.tool_call_metadata = ToolCallMetadata( | |
| tool_call_id='123', | |
| function_name='execute_bash', | |
| model_response=mock_response, | |
| total_calls_in_response=1, | |
| ) | |
| # Define initial user action | |
| initial_user_action = MessageAction(content='Initial user message') | |
| initial_user_action._source = EventSource.USER | |
| messages = conversation_memory.process_events( | |
| condensed_history=[obs], | |
| initial_user_action=initial_user_action, | |
| max_message_chars=None, | |
| vision_is_active=False, | |
| ) | |
| # No direct message when using function calling | |
| assert ( | |
| len(messages) == 2 | |
| ) # should be no messages except system message and initial user message | |
| def test_process_events_with_message_action_with_image(conversation_memory): | |
| action = MessageAction( | |
| content='Message with image', | |
| image_urls=['http://example.com/image.jpg'], | |
| ) | |
| action._source = EventSource.AGENT | |
| # Define initial user action | |
| initial_user_action = MessageAction(content='Initial user message') | |
| initial_user_action._source = EventSource.USER | |
| messages = conversation_memory.process_events( | |
| condensed_history=[action], | |
| initial_user_action=initial_user_action, | |
| max_message_chars=None, | |
| vision_is_active=True, | |
| ) | |
| assert len(messages) == 3 | |
| result = messages[2] | |
| assert result.role == 'assistant' | |
| assert len(result.content) == 2 | |
| assert isinstance(result.content[0], TextContent) | |
| assert isinstance(result.content[1], ImageContent) | |
| assert result.content[0].text == 'Message with image' | |
| assert result.content[1].image_urls == ['http://example.com/image.jpg'] | |
| def test_process_events_with_user_cmd_action(conversation_memory): | |
| action = CmdRunAction(command='ls -l') | |
| action._source = EventSource.USER | |
| # Define initial user action | |
| initial_user_action = MessageAction(content='Initial user message') | |
| initial_user_action._source = EventSource.USER | |
| messages = conversation_memory.process_events( | |
| condensed_history=[action], | |
| initial_user_action=initial_user_action, | |
| max_message_chars=None, | |
| vision_is_active=False, | |
| ) | |
| assert len(messages) == 3 | |
| result = messages[2] | |
| assert result.role == 'user' | |
| assert len(result.content) == 1 | |
| assert isinstance(result.content[0], TextContent) | |
| assert 'User executed the command' in result.content[0].text | |
| assert 'ls -l' in result.content[0].text | |
| def test_process_events_with_agent_finish_action_with_tool_metadata( | |
| conversation_memory, | |
| ): | |
| mock_response = { | |
| 'id': 'mock_id', | |
| 'total_calls_in_response': 1, | |
| 'choices': [{'message': {'content': 'Task completed'}}], | |
| } | |
| action = AgentFinishAction(thought='Initial thought') | |
| action._source = EventSource.AGENT | |
| action.tool_call_metadata = ToolCallMetadata( | |
| tool_call_id='123', | |
| function_name='finish', | |
| model_response=mock_response, | |
| total_calls_in_response=1, | |
| ) | |
| # Define initial user action | |
| initial_user_action = MessageAction(content='Initial user message') | |
| initial_user_action._source = EventSource.USER | |
| messages = conversation_memory.process_events( | |
| condensed_history=[action], | |
| initial_user_action=initial_user_action, | |
| max_message_chars=None, | |
| vision_is_active=False, | |
| ) | |
| assert len(messages) == 3 | |
| result = messages[2] | |
| assert result.role == 'assistant' | |
| assert len(result.content) == 1 | |
| assert isinstance(result.content[0], TextContent) | |
| assert 'Initial thought\nTask completed' in result.content[0].text | |
| def test_apply_prompt_caching(conversation_memory): | |
| messages = [ | |
| Message(role='system', content=[TextContent(text='System message')]), | |
| Message(role='user', content=[TextContent(text='User message')]), | |
| Message(role='assistant', content=[TextContent(text='Assistant message')]), | |
| Message(role='user', content=[TextContent(text='Another user message')]), | |
| ] | |
| conversation_memory.apply_prompt_caching(messages) | |
| # System message is hard-coded to be cached always | |
| assert messages[0].content[0].cache_prompt is True | |
| assert messages[1].content[0].cache_prompt is False | |
| assert messages[2].content[0].cache_prompt is False | |
| # Only the last user message should have cache_prompt=True | |
| assert messages[3].content[0].cache_prompt is True | |
| def test_process_events_with_environment_microagent_observation(conversation_memory): | |
| """Test processing a RecallObservation with ENVIRONMENT info type.""" | |
| obs = RecallObservation( | |
| recall_type=RecallType.WORKSPACE_CONTEXT, | |
| repo_name='test-repo', | |
| repo_directory='/path/to/repo', | |
| repo_instructions='# Test Repository\nThis is a test repository.', | |
| runtime_hosts={'localhost': 8080}, | |
| content='Retrieved environment info', | |
| ) | |
| # Define initial user action | |
| initial_user_action = MessageAction(content='Initial user message') | |
| initial_user_action._source = EventSource.USER | |
| messages = conversation_memory.process_events( | |
| condensed_history=[obs], | |
| initial_user_action=initial_user_action, | |
| max_message_chars=None, | |
| vision_is_active=False, | |
| ) | |
| assert len(messages) == 3 | |
| result = messages[2] | |
| assert result.role == 'user' | |
| assert len(result.content) == 1 | |
| assert isinstance(result.content[0], TextContent) | |
| assert result.content[0].text == '\n\nFormatted repository and runtime info' | |
| # Verify the prompt_manager was called with the correct parameters | |
| conversation_memory.prompt_manager.build_workspace_context.assert_called_once() | |
| call_args = conversation_memory.prompt_manager.build_workspace_context.call_args[1] | |
| assert isinstance(call_args['repository_info'], RepositoryInfo) | |
| assert call_args['repository_info'].repo_name == 'test-repo' | |
| assert call_args['repository_info'].repo_directory == '/path/to/repo' | |
| assert isinstance(call_args['runtime_info'], RuntimeInfo) | |
| assert call_args['runtime_info'].available_hosts == {'localhost': 8080} | |
| assert ( | |
| call_args['repo_instructions'] | |
| == '# Test Repository\nThis is a test repository.' | |
| ) | |
| def test_process_events_with_knowledge_microagent_microagent_observation( | |
| conversation_memory, | |
| ): | |
| """Test processing a RecallObservation with KNOWLEDGE type.""" | |
| microagent_knowledge = [ | |
| MicroagentKnowledge( | |
| name='test_agent', | |
| trigger='test', | |
| content='This is test agent content', | |
| ), | |
| MicroagentKnowledge( | |
| name='another_agent', | |
| trigger='another', | |
| content='This is another agent content', | |
| ), | |
| MicroagentKnowledge( | |
| name='disabled_agent', | |
| trigger='disabled', | |
| content='This is disabled agent content', | |
| ), | |
| ] | |
| obs = RecallObservation( | |
| recall_type=RecallType.KNOWLEDGE, | |
| microagent_knowledge=microagent_knowledge, | |
| content='Retrieved knowledge from microagents', | |
| ) | |
| # Define initial user action | |
| initial_user_action = MessageAction(content='Initial user message') | |
| initial_user_action._source = EventSource.USER | |
| messages = conversation_memory.process_events( | |
| condensed_history=[obs], | |
| initial_user_action=initial_user_action, | |
| max_message_chars=None, | |
| vision_is_active=False, | |
| ) | |
| assert len(messages) == 3 # System + Initial User + Result | |
| result = messages[2] # Result is now at index 2 | |
| assert result.role == 'user' | |
| assert len(result.content) == 1 | |
| assert isinstance(result.content[0], TextContent) | |
| # Verify that disabled_agent is filtered out and enabled agents are included | |
| assert 'This is test agent content' in result.content[0].text | |
| assert 'This is another agent content' in result.content[0].text | |
| assert 'This is disabled agent content' not in result.content[0].text | |
| # Verify the prompt_manager was called with the correct parameters | |
| conversation_memory.prompt_manager.build_microagent_info.assert_called_once() | |
| call_args = conversation_memory.prompt_manager.build_microagent_info.call_args[1] | |
| # Check that disabled_agent was filtered out | |
| triggered_agents = call_args['triggered_agents'] | |
| assert len(triggered_agents) == 2 | |
| agent_names = [agent.name for agent in triggered_agents] | |
| assert 'test_agent' in agent_names | |
| assert 'another_agent' in agent_names | |
| assert 'disabled_agent' not in agent_names | |
| def test_process_events_with_microagent_observation_extensions_disabled( | |
| agent_config, conversation_memory | |
| ): | |
| """Test processing a RecallObservation when prompt extensions are disabled.""" | |
| # Modify the agent config to disable prompt extensions | |
| agent_config.enable_prompt_extensions = False | |
| obs = RecallObservation( | |
| recall_type=RecallType.WORKSPACE_CONTEXT, | |
| repo_name='test-repo', | |
| repo_directory='/path/to/repo', | |
| content='Retrieved environment info', | |
| ) | |
| # Define initial user action | |
| initial_user_action = MessageAction(content='Initial user message') | |
| initial_user_action._source = EventSource.USER | |
| messages = conversation_memory.process_events( | |
| condensed_history=[obs], | |
| initial_user_action=initial_user_action, | |
| max_message_chars=None, | |
| vision_is_active=False, | |
| ) | |
| # When prompt extensions are disabled, the RecallObservation should be ignored | |
| assert len(messages) == 2 # System + Initial User | |
| # Verify the prompt_manager was not called | |
| conversation_memory.prompt_manager.build_workspace_context.assert_not_called() | |
| conversation_memory.prompt_manager.build_microagent_info.assert_not_called() | |
| def test_process_events_with_empty_microagent_knowledge(conversation_memory): | |
| """Test processing a RecallObservation with empty microagent knowledge.""" | |
| obs = RecallObservation( | |
| recall_type=RecallType.KNOWLEDGE, | |
| microagent_knowledge=[], | |
| content='Retrieved knowledge from microagents', | |
| ) | |
| # Define initial user action | |
| initial_user_action = MessageAction(content='Initial user message') | |
| initial_user_action._source = EventSource.USER | |
| messages = conversation_memory.process_events( | |
| condensed_history=[obs], | |
| initial_user_action=initial_user_action, | |
| max_message_chars=None, | |
| vision_is_active=False, | |
| ) | |
| # The implementation returns an empty string and it doesn't creates a message | |
| assert len(messages) == 2 # System + Initial User | |
| # When there are no triggered agents, build_microagent_info is not called | |
| conversation_memory.prompt_manager.build_microagent_info.assert_not_called() | |
| def test_conversation_memory_processes_microagent_observation(prompt_dir): | |
| """Test that ConversationMemory processes RecallObservations correctly.""" | |
| # Create a microagent_info.j2 template file | |
| template_path = os.path.join(prompt_dir, 'microagent_info.j2') | |
| if not os.path.exists(template_path): | |
| with open(template_path, 'w') as f: | |
| f.write("""{% for agent_info in triggered_agents %} | |
| <EXTRA_INFO> | |
| The following information has been included based on a keyword match for "{{ agent_info.trigger_word }}". | |
| It may or may not be relevant to the user's request. | |
| # Verify the template was correctly rendered | |
| {{ agent_info.content }} | |
| </EXTRA_INFO> | |
| {% endfor %} | |
| """) | |
| # Create a mock agent config | |
| agent_config = MagicMock(spec=AgentConfig) | |
| agent_config.enable_prompt_extensions = True | |
| agent_config.disabled_microagents = [] | |
| # Create a PromptManager | |
| prompt_manager = PromptManager(prompt_dir=prompt_dir) | |
| # Initialize ConversationMemory | |
| conversation_memory = ConversationMemory( | |
| config=agent_config, prompt_manager=prompt_manager | |
| ) | |
| # Create a RecallObservation with microagent knowledge | |
| microagent_observation = RecallObservation( | |
| recall_type=RecallType.KNOWLEDGE, | |
| microagent_knowledge=[ | |
| MicroagentKnowledge( | |
| name='test_agent', | |
| trigger='test_trigger', | |
| content='This is triggered content for testing.', | |
| ) | |
| ], | |
| content='Retrieved knowledge from microagents', | |
| ) | |
| # Process the observation | |
| messages = conversation_memory._process_observation( | |
| obs=microagent_observation, tool_call_id_to_message={}, max_message_chars=None | |
| ) | |
| # Verify the message was created correctly | |
| assert len(messages) == 1 | |
| message = messages[0] | |
| assert message.role == 'user' | |
| assert len(message.content) == 1 | |
| assert isinstance(message.content[0], TextContent) | |
| expected_text = """<EXTRA_INFO> | |
| The following information has been included based on a keyword match for "test_trigger". | |
| It may or may not be relevant to the user's request. | |
| This is triggered content for testing. | |
| </EXTRA_INFO>""" | |
| assert message.content[0].text.strip() == expected_text.strip() | |
| # Clean up | |
| os.remove(os.path.join(prompt_dir, 'microagent_info.j2')) | |
| def test_conversation_memory_processes_environment_microagent_observation(prompt_dir): | |
| """Test that ConversationMemory processes environment info RecallObservations correctly.""" | |
| # Create an additional_info.j2 template file | |
| template_path = os.path.join(prompt_dir, 'additional_info.j2') | |
| if not os.path.exists(template_path): | |
| with open(template_path, 'w') as f: | |
| f.write(""" | |
| {% if repository_info %} | |
| <REPOSITORY_INFO> | |
| At the user's request, repository {{ repository_info.repo_name }} has been cloned to directory {{ repository_info.repo_directory }}. | |
| </REPOSITORY_INFO> | |
| {% endif %} | |
| {% if repository_instructions %} | |
| <REPOSITORY_INSTRUCTIONS> | |
| {{ repository_instructions }} | |
| </REPOSITORY_INSTRUCTIONS> | |
| {% endif %} | |
| {% if runtime_info and runtime_info.available_hosts %} | |
| <RUNTIME_INFORMATION> | |
| The user has access to the following hosts for accessing a web application, | |
| each of which has a corresponding port: | |
| {% for host, port in runtime_info.available_hosts.items() %} | |
| * {{ host }} (port {{ port }}) | |
| {% endfor %} | |
| </RUNTIME_INFORMATION> | |
| {% endif %} | |
| """) | |
| # Create a mock agent config | |
| agent_config = MagicMock(spec=AgentConfig) | |
| agent_config.enable_prompt_extensions = True | |
| # Create a PromptManager | |
| prompt_manager = PromptManager(prompt_dir=prompt_dir) | |
| # Initialize ConversationMemory | |
| conversation_memory = ConversationMemory( | |
| config=agent_config, prompt_manager=prompt_manager | |
| ) | |
| # Create a RecallObservation with environment info | |
| microagent_observation = RecallObservation( | |
| recall_type=RecallType.WORKSPACE_CONTEXT, | |
| repo_name='owner/repo', | |
| repo_directory='/workspace/repo', | |
| repo_instructions='This repository contains important code.', | |
| runtime_hosts={'example.com': 8080}, | |
| content='Retrieved environment info', | |
| ) | |
| # Process the observation | |
| messages = conversation_memory._process_observation( | |
| obs=microagent_observation, tool_call_id_to_message={}, max_message_chars=None | |
| ) | |
| # Verify the message was created correctly | |
| assert len(messages) == 1 | |
| message = messages[0] | |
| assert message.role == 'user' | |
| assert len(message.content) == 1 | |
| assert isinstance(message.content[0], TextContent) | |
| # Check that the message contains the repository info | |
| assert '<REPOSITORY_INFO>' in message.content[0].text | |
| assert 'owner/repo' in message.content[0].text | |
| assert '/workspace/repo' in message.content[0].text | |
| # Check that the message contains the repository instructions | |
| assert '<REPOSITORY_INSTRUCTIONS>' in message.content[0].text | |
| assert 'This repository contains important code.' in message.content[0].text | |
| # Check that the message contains the runtime info | |
| assert '<RUNTIME_INFORMATION>' in message.content[0].text | |
| assert 'example.com (port 8080)' in message.content[0].text | |
| def test_process_events_with_microagent_observation_deduplication(conversation_memory): | |
| """Test that RecallObservations are properly deduplicated based on agent name. | |
| The deduplication logic should keep the FIRST occurrence of each microagent | |
| and filter out later occurrences to avoid redundant information. | |
| """ | |
| # Create a sequence of RecallObservations with overlapping agents | |
| obs1 = RecallObservation( | |
| recall_type=RecallType.KNOWLEDGE, | |
| microagent_knowledge=[ | |
| MicroagentKnowledge( | |
| name='python_agent', | |
| trigger='python', | |
| content='Python best practices v1', | |
| ), | |
| MicroagentKnowledge( | |
| name='git_agent', | |
| trigger='git', | |
| content='Git best practices v1', | |
| ), | |
| MicroagentKnowledge( | |
| name='image_agent', | |
| trigger='image', | |
| content='Image best practices v1', | |
| ), | |
| ], | |
| content='First retrieval', | |
| ) | |
| obs2 = RecallObservation( | |
| recall_type=RecallType.KNOWLEDGE, | |
| microagent_knowledge=[ | |
| MicroagentKnowledge( | |
| name='python_agent', | |
| trigger='python', | |
| content='Python best practices v2', | |
| ), | |
| ], | |
| content='Second retrieval', | |
| ) | |
| obs3 = RecallObservation( | |
| recall_type=RecallType.KNOWLEDGE, | |
| microagent_knowledge=[ | |
| MicroagentKnowledge( | |
| name='git_agent', | |
| trigger='git', | |
| content='Git best practices v3', | |
| ), | |
| ], | |
| content='Third retrieval', | |
| ) | |
| # Define initial user action | |
| initial_user_action = MessageAction(content='Initial user message') | |
| initial_user_action._source = EventSource.USER | |
| messages = conversation_memory.process_events( | |
| condensed_history=[obs1, obs2, obs3], | |
| initial_user_action=initial_user_action, | |
| max_message_chars=None, | |
| vision_is_active=False, | |
| ) | |
| # Verify that only the first occurrence of content for each agent is included | |
| assert len(messages) == 3 # System + Initial User + Result | |
| # Result is now at index 2 | |
| # First microagent should include all agents since they appear here first | |
| assert 'Image best practices v1' in messages[2].content[0].text | |
| assert 'Git best practices v1' in messages[2].content[0].text | |
| assert 'Python best practices v1' in messages[2].content[0].text | |
| def test_process_events_with_microagent_observation_deduplication_disabled_agents( | |
| conversation_memory, | |
| ): | |
| """Test that disabled agents are filtered out and deduplication keeps the first occurrence.""" | |
| # Create a sequence of RecallObservations with disabled agents | |
| obs1 = RecallObservation( | |
| recall_type=RecallType.KNOWLEDGE, | |
| microagent_knowledge=[ | |
| MicroagentKnowledge( | |
| name='disabled_agent', | |
| trigger='disabled', | |
| content='Disabled agent content', | |
| ), | |
| MicroagentKnowledge( | |
| name='enabled_agent', | |
| trigger='enabled', | |
| content='Enabled agent content v1', | |
| ), | |
| ], | |
| content='First retrieval', | |
| ) | |
| obs2 = RecallObservation( | |
| recall_type=RecallType.KNOWLEDGE, | |
| microagent_knowledge=[ | |
| MicroagentKnowledge( | |
| name='enabled_agent', | |
| trigger='enabled', | |
| content='Enabled agent content v2', | |
| ), | |
| ], | |
| content='Second retrieval', | |
| ) | |
| # Define initial user action | |
| initial_user_action = MessageAction(content='Initial user message') | |
| initial_user_action._source = EventSource.USER | |
| messages = conversation_memory.process_events( | |
| condensed_history=[obs1, obs2], | |
| initial_user_action=initial_user_action, | |
| max_message_chars=None, | |
| vision_is_active=False, | |
| ) | |
| # Verify that disabled agents are filtered out and only the first occurrence of enabled agents is included | |
| assert len(messages) == 3 # System + Initial User + Result | |
| # Result is now at index 2 | |
| # First microagent should include enabled_agent but not disabled_agent | |
| assert 'Disabled agent content' not in messages[2].content[0].text | |
| assert 'Enabled agent content v1' in messages[2].content[0].text | |
| def test_process_events_with_microagent_observation_deduplication_empty( | |
| conversation_memory, | |
| ): | |
| """Test that empty RecallObservations are handled correctly.""" | |
| obs = RecallObservation( | |
| recall_type=RecallType.KNOWLEDGE, | |
| microagent_knowledge=[], | |
| content='Empty retrieval', | |
| ) | |
| # Define initial user action | |
| initial_user_action = MessageAction(content='Initial user message') | |
| initial_user_action._source = EventSource.USER | |
| messages = conversation_memory.process_events( | |
| condensed_history=[obs], | |
| initial_user_action=initial_user_action, | |
| max_message_chars=None, | |
| vision_is_active=False, | |
| ) | |
| # Verify that empty RecallObservations are handled gracefully | |
| assert ( | |
| len(messages) == 2 # System + Initial User | |
| ) # an empty microagent is not added to Messages | |
| assert messages[0].role == 'system' | |
| assert messages[1].role == 'user' # Initial user message | |
| def test_has_agent_in_earlier_events(conversation_memory): | |
| """Test the _has_agent_in_earlier_events helper method.""" | |
| # Create test RecallObservations | |
| obs1 = RecallObservation( | |
| recall_type=RecallType.KNOWLEDGE, | |
| microagent_knowledge=[ | |
| MicroagentKnowledge( | |
| name='agent1', | |
| trigger='trigger1', | |
| content='Content 1', | |
| ), | |
| ], | |
| content='First retrieval', | |
| ) | |
| obs2 = RecallObservation( | |
| recall_type=RecallType.KNOWLEDGE, | |
| microagent_knowledge=[ | |
| MicroagentKnowledge( | |
| name='agent2', | |
| trigger='trigger2', | |
| content='Content 2', | |
| ), | |
| ], | |
| content='Second retrieval', | |
| ) | |
| obs3 = RecallObservation( | |
| recall_type=RecallType.WORKSPACE_CONTEXT, | |
| content='Environment info', | |
| ) | |
| # Create a list with mixed event types | |
| events = [obs1, MessageAction(content='User message'), obs2, obs3] | |
| # Test looking for existing agents | |
| assert conversation_memory._has_agent_in_earlier_events('agent1', 2, events) is True | |
| assert conversation_memory._has_agent_in_earlier_events('agent1', 3, events) is True | |
| assert conversation_memory._has_agent_in_earlier_events('agent1', 4, events) is True | |
| # Test looking for an agent in a later position (should not find it) | |
| assert ( | |
| conversation_memory._has_agent_in_earlier_events('agent2', 0, events) is False | |
| ) | |
| assert ( | |
| conversation_memory._has_agent_in_earlier_events('agent2', 1, events) is False | |
| ) | |
| # Test looking for an agent in a different microagent type (should not find it) | |
| assert ( | |
| conversation_memory._has_agent_in_earlier_events('non_existent', 3, events) | |
| is False | |
| ) | |
| class TestFilterUnmatchedToolCalls: | |
| def processor(self): | |
| return ConversationMemory() | |
| def test_empty_is_unchanged(self): | |
| assert list(ConversationMemory._filter_unmatched_tool_calls([])) == [] | |
| def test_no_tool_calls_is_unchanged(self): | |
| messages = [ | |
| Message(role='user', content=[TextContent(text='Hello')]), | |
| Message(role='assistant', content=[TextContent(text='Hi there')]), | |
| Message(role='user', content=[TextContent(text='How are you?')]), | |
| ] | |
| assert ( | |
| list(ConversationMemory._filter_unmatched_tool_calls(messages)) == messages | |
| ) | |
| def test_matched_tool_calls_are_unchanged(self): | |
| messages = [ | |
| Message(role='user', content=[TextContent(text="What's the weather?")]), | |
| Message( | |
| role='assistant', | |
| content=[], | |
| tool_calls=[ | |
| ChatCompletionMessageToolCall( | |
| id='call_1', | |
| type='function', | |
| function={'name': 'get_weather', 'arguments': ''}, | |
| ) | |
| ], | |
| ), | |
| Message( | |
| role='tool', | |
| tool_call_id='call_1', | |
| content=[TextContent(text='Sunny, 75°F')], | |
| ), | |
| Message(role='assistant', content=[TextContent(text="It's sunny today.")]), | |
| ] | |
| # All tool calls have matching responses, should remain unchanged | |
| assert ( | |
| list(ConversationMemory._filter_unmatched_tool_calls(messages)) == messages | |
| ) | |
| def test_tool_call_without_response_is_removed(self): | |
| messages = [ | |
| Message(role='user', content=[TextContent(text='Query')]), | |
| Message( | |
| role='tool', | |
| tool_call_id='missing_call', | |
| content=[TextContent(text='Response')], | |
| ), | |
| Message(role='assistant', content=[TextContent(text='Answer')]), | |
| ] | |
| expected_after_filter = [ | |
| Message(role='user', content=[TextContent(text='Query')]), | |
| Message(role='assistant', content=[TextContent(text='Answer')]), | |
| ] | |
| result = list(ConversationMemory._filter_unmatched_tool_calls(messages)) | |
| assert result == expected_after_filter | |
| def test_tool_response_without_call_is_removed(self): | |
| messages = [ | |
| Message(role='user', content=[TextContent(text='Query')]), | |
| Message( | |
| role='assistant', | |
| content=[], | |
| tool_calls=[ | |
| ChatCompletionMessageToolCall( | |
| id='unmatched_call', | |
| type='function', | |
| function={'name': 'some_function', 'arguments': ''}, | |
| ) | |
| ], | |
| ), | |
| Message(role='assistant', content=[TextContent(text='Answer')]), | |
| ] | |
| expected_after_filter = [ | |
| Message(role='user', content=[TextContent(text='Query')]), | |
| Message(role='assistant', content=[TextContent(text='Answer')]), | |
| ] | |
| result = list(ConversationMemory._filter_unmatched_tool_calls(messages)) | |
| assert result == expected_after_filter | |
| def test_partial_matched_tool_calls_retains_matched(self): | |
| """When there are both matched and unmatched tools calls in a message, retain the message and only matched calls""" | |
| messages = [ | |
| Message(role='user', content=[TextContent(text='Get data')]), | |
| Message( | |
| role='assistant', | |
| content=[], | |
| tool_calls=[ | |
| ChatCompletionMessageToolCall( | |
| id='matched_call', | |
| type='function', | |
| function={'name': 'function1', 'arguments': ''}, | |
| ), | |
| ChatCompletionMessageToolCall( | |
| id='unmatched_call', | |
| type='function', | |
| function={'name': 'function2', 'arguments': ''}, | |
| ), | |
| ], | |
| ), | |
| Message( | |
| role='tool', | |
| tool_call_id='matched_call', | |
| content=[TextContent(text='Data')], | |
| ), | |
| Message(role='assistant', content=[TextContent(text='Result')]), | |
| ] | |
| expected = [ | |
| Message(role='user', content=[TextContent(text='Get data')]), | |
| # This message should be modified to only include the matched tool call | |
| Message( | |
| role='assistant', | |
| content=[], | |
| tool_calls=[ | |
| ChatCompletionMessageToolCall( | |
| id='matched_call', | |
| type='function', | |
| function={'name': 'function1', 'arguments': ''}, | |
| ) | |
| ], | |
| ), | |
| Message( | |
| role='tool', | |
| tool_call_id='matched_call', | |
| content=[TextContent(text='Data')], | |
| ), | |
| Message(role='assistant', content=[TextContent(text='Result')]), | |
| ] | |
| result = list(ConversationMemory._filter_unmatched_tool_calls(messages)) | |
| # Verify result structure | |
| assert len(result) == len(expected) | |
| for i, msg in enumerate(result): | |
| assert msg == expected[i] | |
| def test_system_message_in_events(conversation_memory): | |
| """Test that SystemMessageAction in condensed_history is processed correctly.""" | |
| # Create a system message action | |
| system_message = SystemMessageAction(content='System message', tools=['test_tool']) | |
| system_message._source = EventSource.AGENT | |
| # Process events with the system message in condensed_history | |
| # Define initial user action | |
| initial_user_action = MessageAction(content='Initial user message') | |
| initial_user_action._source = EventSource.USER | |
| messages = conversation_memory.process_events( | |
| condensed_history=[system_message], | |
| initial_user_action=initial_user_action, | |
| max_message_chars=None, | |
| vision_is_active=False, | |
| ) | |
| # Check that the system message was processed correctly | |
| assert len(messages) == 2 # System + Initial User | |
| assert messages[0].role == 'system' | |
| assert messages[0].content[0].text == 'System message' | |
| assert messages[1].role == 'user' # Initial user message | |
| # Helper function to create mock tool call metadata | |
| def _create_mock_tool_call_metadata( | |
| tool_call_id: str, function_name: str, response_id: str = 'mock_response_id' | |
| ) -> ToolCallMetadata: | |
| # Use a dictionary that mimics ModelResponse structure to satisfy Pydantic | |
| mock_response = { | |
| 'id': response_id, | |
| 'choices': [ | |
| { | |
| 'message': { | |
| 'role': 'assistant', | |
| 'content': None, # Content is None for tool calls | |
| 'tool_calls': [ | |
| { | |
| 'id': tool_call_id, | |
| 'type': 'function', | |
| 'function': { | |
| 'name': function_name, | |
| 'arguments': '{}', | |
| }, # Args don't matter for this test | |
| } | |
| ], | |
| } | |
| } | |
| ], | |
| 'created': 0, | |
| 'model': 'mock_model', | |
| 'object': 'chat.completion', | |
| 'usage': {'completion_tokens': 0, 'prompt_tokens': 0, 'total_tokens': 0}, | |
| } | |
| return ToolCallMetadata( | |
| tool_call_id=tool_call_id, | |
| function_name=function_name, | |
| model_response=mock_response, | |
| total_calls_in_response=1, | |
| ) | |
| def test_process_events_partial_history(conversation_memory): | |
| """ | |
| Tests process_events with full and partial histories to verify | |
| _ensure_system_message, _ensure_initial_user_message, and tool call matching logic. | |
| """ | |
| # --- Define Common Events --- | |
| system_message = SystemMessageAction(content='System message') | |
| system_message._source = EventSource.AGENT | |
| user_message = MessageAction( | |
| content='Initial user query' | |
| ) # This is the crucial initial_user_action | |
| user_message._source = EventSource.USER | |
| recall_obs = RecallObservation( | |
| recall_type=RecallType.WORKSPACE_CONTEXT, | |
| repo_name='test-repo', | |
| repo_directory='/path/to/repo', | |
| content='Retrieved environment info', | |
| ) | |
| recall_obs._source = EventSource.AGENT | |
| cmd_action = CmdRunAction(command='ls', thought='Running ls') | |
| cmd_action._source = EventSource.AGENT | |
| cmd_action.tool_call_metadata = _create_mock_tool_call_metadata( | |
| tool_call_id='call_ls_1', function_name='execute_bash', response_id='resp_ls_1' | |
| ) | |
| cmd_obs = CmdOutputObservation( | |
| command_id=1, command='ls', content='file1.txt\nfile2.py', exit_code=0 | |
| ) | |
| cmd_obs._source = EventSource.AGENT | |
| cmd_obs.tool_call_metadata = _create_mock_tool_call_metadata( | |
| tool_call_id='call_ls_1', function_name='execute_bash', response_id='resp_ls_1' | |
| ) | |
| # --- Scenario 1: Full History --- | |
| full_history: list[Event] = [ | |
| system_message, | |
| user_message, # Correct initial user message at index 1 | |
| recall_obs, | |
| cmd_action, | |
| cmd_obs, | |
| ] | |
| messages_full = conversation_memory.process_events( | |
| condensed_history=list(full_history), # Pass a copy | |
| initial_user_action=user_message, # Provide the initial action | |
| max_message_chars=None, | |
| vision_is_active=False, | |
| ) | |
| # Expected: System, User, Recall (formatted), Assistant (tool call), Tool Response | |
| assert len(messages_full) == 5 | |
| assert messages_full[0].role == 'system' | |
| assert messages_full[0].content[0].text == 'System message' | |
| assert messages_full[1].role == 'user' | |
| assert messages_full[1].content[0].text == 'Initial user query' | |
| assert messages_full[2].role == 'user' # Recall obs becomes user message | |
| assert ( | |
| 'Formatted repository and runtime info' in messages_full[2].content[0].text | |
| ) # From fixture mock | |
| assert messages_full[3].role == 'assistant' | |
| assert messages_full[3].tool_calls is not None | |
| assert len(messages_full[3].tool_calls) == 1 | |
| assert messages_full[3].tool_calls[0].id == 'call_ls_1' | |
| assert messages_full[4].role == 'tool' | |
| assert messages_full[4].tool_call_id == 'call_ls_1' | |
| assert 'file1.txt' in messages_full[4].content[0].text | |
| # --- Scenario 2: Partial History (Action + Observation) --- | |
| # Simulates processing only the last action/observation pair | |
| partial_history_action_obs: list[Event] = [ | |
| cmd_action, | |
| cmd_obs, | |
| ] | |
| messages_partial_action_obs = conversation_memory.process_events( | |
| condensed_history=list(partial_history_action_obs), # Pass a copy | |
| initial_user_action=user_message, # Provide the initial action | |
| max_message_chars=None, | |
| vision_is_active=False, | |
| ) | |
| # Expected: System (added), Initial User (added), Assistant (tool call), Tool Response | |
| assert len(messages_partial_action_obs) == 4 | |
| assert ( | |
| messages_partial_action_obs[0].role == 'system' | |
| ) # Added by _ensure_system_message | |
| assert messages_partial_action_obs[0].content[0].text == 'System message' | |
| assert ( | |
| messages_partial_action_obs[1].role == 'user' | |
| ) # Added by _ensure_initial_user_message | |
| assert messages_partial_action_obs[1].content[0].text == 'Initial user query' | |
| assert messages_partial_action_obs[2].role == 'assistant' | |
| assert messages_partial_action_obs[2].tool_calls is not None | |
| assert len(messages_partial_action_obs[2].tool_calls) == 1 | |
| assert messages_partial_action_obs[2].tool_calls[0].id == 'call_ls_1' | |
| assert messages_partial_action_obs[3].role == 'tool' | |
| assert messages_partial_action_obs[3].tool_call_id == 'call_ls_1' | |
| assert 'file1.txt' in messages_partial_action_obs[3].content[0].text | |
| # --- Scenario 3: Partial History (Observation Only) --- | |
| # Simulates processing only the last observation | |
| partial_history_obs_only: list[Event] = [ | |
| cmd_obs, | |
| ] | |
| messages_partial_obs_only = conversation_memory.process_events( | |
| condensed_history=list(partial_history_obs_only), # Pass a copy | |
| initial_user_action=user_message, # Provide the initial action | |
| max_message_chars=None, | |
| vision_is_active=False, | |
| ) | |
| # Expected: System (added), Initial User (added). | |
| # The CmdOutputObservation has tool_call_metadata, but there's no corresponding | |
| # assistant message (from CmdRunAction) with the matching tool_call.id in the input history. | |
| # Therefore, _filter_unmatched_tool_calls should remove the tool response message. | |
| assert len(messages_partial_obs_only) == 2 | |
| assert ( | |
| messages_partial_obs_only[0].role == 'system' | |
| ) # Added by _ensure_system_message | |
| assert messages_partial_obs_only[0].content[0].text == 'System message' | |
| assert ( | |
| messages_partial_obs_only[1].role == 'user' | |
| ) # Added by _ensure_initial_user_message | |
| assert messages_partial_obs_only[1].content[0].text == 'Initial user query' | |
| def test_process_ipython_observation_with_vision_enabled( | |
| agent_config, mock_prompt_manager | |
| ): | |
| """Test that _process_observation correctly handles IPythonRunCellObservation with image_urls when vision is enabled.""" | |
| # Create a ConversationMemory instance | |
| memory = ConversationMemory(agent_config, mock_prompt_manager) | |
| # Create an observation with image URLs | |
| obs = IPythonRunCellObservation( | |
| content='Test output', | |
| code="print('test')", | |
| image_urls=['data:image/png;base64,abc123'], | |
| ) | |
| # Process the observation with vision enabled | |
| messages = memory._process_observation( | |
| obs=obs, | |
| tool_call_id_to_message={}, | |
| max_message_chars=None, | |
| vision_is_active=True, | |
| ) | |
| # Check that the message contains both text and image content | |
| assert len(messages) == 1 | |
| message = messages[0] | |
| assert len(message.content) == 2 | |
| assert isinstance(message.content[0], TextContent) | |
| assert isinstance(message.content[1], ImageContent) | |
| assert message.content[1].image_urls == ['data:image/png;base64,abc123'] | |
| def test_process_ipython_observation_with_vision_disabled( | |
| agent_config, mock_prompt_manager | |
| ): | |
| """Test that _process_observation correctly handles IPythonRunCellObservation with image_urls when vision is disabled.""" | |
| # Create a ConversationMemory instance | |
| memory = ConversationMemory(agent_config, mock_prompt_manager) | |
| # Create an observation with image URLs | |
| obs = IPythonRunCellObservation( | |
| content='Test output', | |
| code="print('test')", | |
| image_urls=['data:image/png;base64,abc123'], | |
| ) | |
| # Process the observation with vision disabled | |
| messages = memory._process_observation( | |
| obs=obs, | |
| tool_call_id_to_message={}, | |
| max_message_chars=None, | |
| vision_is_active=False, | |
| ) | |
| # Check that the message contains only text content | |
| assert len(messages) == 1 | |
| message = messages[0] | |
| assert len(message.content) == 1 | |
| assert isinstance(message.content[0], TextContent) | |