Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| Integration test for the MCP server | |
| """ | |
| import asyncio | |
| import sys | |
| import os | |
| import time | |
| import subprocess | |
| import signal | |
# Make the parent ("web") directory importable; presumably this is where
# mcp_server lives. The path is resolved to an absolute, normalized form
# because the test functions import mcp_server lazily, and a relative entry
# would stop resolving if the working directory changed before those imports.
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
def test_mcp_server_startup():
    """Check that an ``RTSGameMCP`` instance is created with the expected configuration.

    The server is only constructed and inspected here; it is never started.
    ``mcp_server`` is imported inside the ``try`` so that an import failure is
    reported as a failed test instead of aborting the whole script.

    Any exception, including a failed assertion, is caught and turned into a
    ``False`` return, so callers must check the return value.

    Returns:
        bool: ``True`` when the instance has an ``mcp`` attribute named
        "RTS Commander MCP Server" whose settings use port 8001; ``False``
        when the import, the construction, or any of the checks raises.
    """
    try:
        from mcp_server import RTSGameMCP

        server = RTSGameMCP()

        assert hasattr(server, 'mcp'), "Server should have an mcp attribute"
        assert server.mcp.name == "RTS Commander MCP Server", "Server should have the correct name"
        assert server.mcp.settings.port == 8001, "Server should be configured to run on port 8001"
    except Exception as e:
        print(f"❌ Failed to create or configure MCP server: {e}")
        return False
    else:
        print("✅ MCP server creation and configuration successful")
        return True
def test_mcp_tools_registration():
    """Smoke-test tool registration by constructing an ``RTSGameMCP`` instance.

    Only import and construction are exercised; the registered tools
    themselves are not inspected. Any exception is caught and turned into a
    ``False`` return, so callers must check the return value.

    Returns:
        bool: ``True`` when the import and construction succeed, ``False``
        when either of them raises.
    """
    try:
        from mcp_server import RTSGameMCP

        # Construction is the entire check, so the instance is not kept.
        # Note: This is a simplified test - verifying the actual tool list
        # would require inspecting the FastMCP internals.
        RTSGameMCP()
    except Exception as e:
        print(f"❌ Failed to test tools registration: {e}")
        return False
    else:
        print("✅ MCP tools registration test completed")
        return True
def test_mcp_resources_registration():
    """Smoke-test resource registration by constructing an ``RTSGameMCP`` instance.

    Only import and construction are exercised; the registered resources
    themselves are not inspected. Any exception is caught and turned into a
    ``False`` return, so callers must check the return value.

    Returns:
        bool: ``True`` when the import and construction succeed, ``False``
        when either of them raises.
    """
    try:
        from mcp_server import RTSGameMCP

        # Construction is the entire check, so the instance is not kept.
        # Note: This is a simplified test - verifying the actual resource list
        # would require inspecting the FastMCP internals.
        RTSGameMCP()
    except Exception as e:
        print(f"❌ Failed to test resources registration: {e}")
        return False
    else:
        print("✅ MCP resources registration test completed")
        return True
def main():
    """Run every MCP integration test and report the overall outcome.

    Each test is called in turn and its return value recorded. A test that
    raises instead of returning is reported and recorded as a failure, so one
    broken test does not prevent the remaining ones from running.

    Returns:
        int: 0 when every test returned a truthy result, 1 otherwise
        (suitable as a process exit code).
    """
    print("Running MCP integration tests...")

    tests = [
        test_mcp_server_startup,
        test_mcp_tools_registration,
        test_mcp_resources_registration,
    ]

    results = []
    for test in tests:
        try:
            results.append(test())
        except Exception as e:
            print(f"❌ Test {test.__name__} failed with exception: {e}")
            results.append(False)

    if all(results):
        print("\n✅ All MCP integration tests passed!")
        return 0

    print("\n❌ Some MCP integration tests failed!")
    return 1
# Script entry point: run the suite and use main()'s result as the exit status.
if __name__ == "__main__":
    raise SystemExit(main())