Spaces:
Paused
Paused
File size: 5,712 Bytes
f647629 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 |
"""
Pytest conftest.py for MCP Server test suite.
This file contains shared fixtures and hooks for the test suite, with a particular
focus on managing Weave evaluation logging in a distributed testing environment
using pytest-xdist.
The Weave aggregation logic has been extracted to `weave_test_aggregator.py` to
keep this file focused on pytest-specific concerns.
Problem with pytest-xdist and session-level Weave Logging:
When using pytest-xdist for parallel test execution, the `pytest_sessionfinish`
hook runs in each worker process AND the master process. To avoid duplicate Weave
evaluations, we ensure aggregation only happens in the master process.
Solution:
- Worker detection via `session.config.workerinput`
- Weave aggregation only runs when `worker_id == "master"`
- All Weave logic is delegated to the `WeaveTestAggregator` class
"""
import json
import logging
import os
import uuid
from datetime import datetime
import pytest
from dotenv import load_dotenv
from .weave_test_aggregator import aggregate_and_log_test_results
# Pull in variables from a local .env file, if one exists
load_dotenv()

# Every process that imports this conftest starts with Weave tracing switched
# off; pytest_sessionfinish flips it back on temporarily for the master process.
# WANDB_SILENT is set alongside it.
os.environ.update({"WEAVE_DISABLED": "true", "WANDB_SILENT": "true"})

# Dedicated logger for this conftest; the handler is attached only once so a
# repeated import does not produce duplicated log lines.
logger = logging.getLogger("pytest.conftest")
logger.setLevel(logging.INFO)
if not logger.handlers:
    handler = logging.StreamHandler()
    formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    handler.setFormatter(formatter)
    logger.addHandler(handler)

logger.info(f"Initial WEAVE_DISABLED set to: {os.environ.get('WEAVE_DISABLED')}")

# Destination for Weave/W&B results; WANDB_PROJECT / WANDB_ENTITY override the defaults
WANDB_TEST_SUITE_PROJECT = os.getenv("WANDB_PROJECT", "wandb-mcp-server-test-suite-outputs")
WANDB_TEST_SUITE_ENTITY = os.getenv("WANDB_ENTITY", "wandb-applied-ai-team")
# Name of the temp sub-directory that holds the per-test JSON result files
WEAVE_RESULTS_DIR_NAME = "weave_eval_results_json"
@pytest.fixture(scope="session", autouse=True)
def setup_weave_session_config(request):
    """
    Announce the target Weave project once at the start of the session.

    Autouse and session-scoped, so it runs without being requested by any
    test. It only emits a log line naming the entity/project pair; the
    `request` argument is accepted but not used.
    """
    logger.info(f"Pytest session starting. Target Weave project: {WANDB_TEST_SUITE_ENTITY}/{WANDB_TEST_SUITE_PROJECT}")
def pytest_configure(config):
    """
    Force the asyncio-related options when they are available.

    If `config.option` exposes an `asyncio_mode` attribute (presumably
    registered by pytest-asyncio; confirm against the installed plugins),
    the mode is set to "auto" and the default fixture loop scope to
    "function". When the attribute is missing nothing is modified.
    """
    options = config.option
    if not hasattr(options, "asyncio_mode"):
        # No asyncio option registered: leave the configuration untouched.
        return

    options.asyncio_mode = "auto"
    options.asyncio_default_fixture_loop_scope = "function"
class DateTimeEncoder(json.JSONEncoder):
    """JSON encoder that serializes `datetime` values as ISO-8601 strings."""

    def default(self, obj):
        """
        Return a JSON-compatible stand-in for `obj`.

        `datetime` instances become their `isoformat()` string; anything
        else is passed to the base class, which raises `TypeError` for
        types it cannot serialize.
        """
        if not isinstance(obj, datetime):
            return super().default(obj)
        return obj.isoformat()
@pytest.fixture(scope="session")
def weave_results_dir(tmp_path_factory):
    """
    Provide the session-wide directory where Weave result files are written.

    The directory is created through `tmp_path_factory` under the fixed name
    `WEAVE_RESULTS_DIR_NAME`; `numbered=False` keeps that name free of a
    numeric suffix. The created path is logged and then yielded.
    """
    results_dir = tmp_path_factory.mktemp(
        WEAVE_RESULTS_DIR_NAME,
        numbered=False,
    )
    logger.info(f"Session temp results directory created: {results_dir}")

    yield results_dir
def pytest_sessionfinish(session):
    """
    Aggregate and log Weave results at session end, from the master process only.

    Under pytest-xdist this hook fires in every worker as well as in the
    master. A process whose config carries `workerinput` is treated as a
    worker: it logs a skip message and returns. The master temporarily sets
    WEAVE_DISABLED to "false", hands the base temp directory to
    `aggregate_and_log_test_results`, logs the outcome, and finally restores
    the previous WEAVE_DISABLED value.
    """
    # Random tag that ties together all log lines from this invocation.
    invocation_id = str(uuid.uuid4())

    # Absence of `workerinput` on the config means this is the master process.
    xdist_input = getattr(session.config, "workerinput", None)
    worker_id = "master" if xdist_input is None else xdist_input.get("workerid", "worker_unknown")

    logger.info(f"pytest_sessionfinish invoked (ID: {invocation_id}, PID: {os.getpid()}, Worker: {worker_id})")

    if worker_id != "master":
        logger.info(f"WORKER_LOGIC_SKIP: Skipping aggregation for worker '{worker_id}' (ID: {invocation_id})")
        return

    logger.info(f"MASTER_LOGIC_RUN: Running Weave aggregation in master process (ID: {invocation_id})")

    # Remember the current flag so the `finally` clause can put it back.
    original_weave_disabled = os.environ.get("WEAVE_DISABLED")
    logger.info(f"(ID: {invocation_id}) Original WEAVE_DISABLED: {original_weave_disabled}")

    try:
        os.environ["WEAVE_DISABLED"] = "false"
        logger.info(f"(ID: {invocation_id}) WEAVE_DISABLED temporarily set to 'false' for master")

        # The base temp directory is read from the config's temp-path factory;
        # if that lookup fails, log the error and give up on aggregation.
        try:
            base_tmp_dir = session.config._tmp_path_factory.getbasetemp()
            logger.info(f"(ID: {invocation_id}) Base temp directory: {base_tmp_dir}")
        except Exception as e:
            logger.error(f"(ID: {invocation_id}) Error accessing temp directory: {e}", exc_info=True)
            return

        # All Weave-specific work is delegated to the aggregator helper.
        aggregated_ok = aggregate_and_log_test_results(
            entity=WANDB_TEST_SUITE_ENTITY,
            project=WANDB_TEST_SUITE_PROJECT,
            base_tmp_dir=base_tmp_dir,
            invocation_id=invocation_id,
            session_config=session.config,
            results_dir_name=WEAVE_RESULTS_DIR_NAME,
        )

        if not aggregated_ok:
            logger.warning(f"(ID: {invocation_id}) Weave aggregation completed with issues")
        else:
            logger.info(f"(ID: {invocation_id}) Weave aggregation completed successfully")
    finally:
        # Runs on every exit path above, including the early return.
        if original_weave_disabled is not None:
            os.environ["WEAVE_DISABLED"] = original_weave_disabled
        elif os.environ.get("WEAVE_DISABLED") == "false":
            # The variable was unset before; drop it only if it still holds
            # the value written by this function.
            del os.environ["WEAVE_DISABLED"]
        logger.info(f"(ID: {invocation_id}) WEAVE_DISABLED restored to: {os.environ.get('WEAVE_DISABLED')}")
|