""" Authentication middleware for W&B MCP Server. Implements Bearer token validation for HTTP transport as per MCP specification: https://modelcontextprotocol.io/specification/draft/basic/authorization Clients send their W&B API keys as Bearer tokens, which the server then uses for all W&B operations on behalf of that client. """ import os import logging import re from typing import Optional, Dict, Any from fastapi import HTTPException, Request, status from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from fastapi.responses import JSONResponse logger = logging.getLogger(__name__) # Bearer token security scheme bearer_scheme = HTTPBearer(auto_error=False) class MCPAuthConfig: """ Configuration for MCP authentication. For HTTP transport: Accepts any W&B API key as a Bearer token. The server uses the client's token for all W&B operations. """ pass # Simple config, no OAuth metadata needed def is_valid_wandb_api_key(token: str) -> bool: """ Check if a token looks like a valid W&B API key format. W&B API keys are typically 40 characters but we'll be permissive. """ if not token: return False # Strip any whitespace that might have been included token = token.strip() # Be permissive - accept keys between 20 and 100 characters # The actual W&B API will validate the exact format if len(token) < 20 or len(token) > 100: return False # Basic validation - W&B keys contain alphanumeric and some special characters if re.match(r'^[a-zA-Z0-9_\-\.]+$', token): return True return False async def validate_bearer_token( credentials: Optional[HTTPAuthorizationCredentials], config: MCPAuthConfig ) -> str: """ Validate Bearer token (W&B API key) for MCP access. Accepts any valid-looking W&B API key. The actual validation happens when the key is used to call W&B APIs. Returns: The W&B API key to use for operations Raises: HTTPException: 401 Unauthorized with WWW-Authenticate header """ if not credentials or not credentials.credentials: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Authorization required - please provide your W&B API key as a Bearer token", headers={ "WWW-Authenticate": 'Bearer realm="W&B MCP"' } ) token = credentials.credentials.strip() # Strip any whitespace # Basic format validation if not is_valid_wandb_api_key(token): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail=f"Invalid W&B API key format. Got {len(token)} characters. " f"Get your key at: https://wandb.ai/authorize", headers={ "WWW-Authenticate": 'Bearer realm="W&B MCP", error="invalid_token"' } ) logger.debug(f"Bearer token validated successfully (length: {len(token)})") return token async def mcp_auth_middleware(request: Request, call_next): """ FastAPI middleware for MCP authentication on HTTP transport. Only applies to MCP endpoints (/mcp/*). Extracts the client's W&B API key from the Bearer token and stores it for use in W&B operations. """ # Only apply auth to MCP endpoints if not request.url.path.startswith("/mcp"): return await call_next(request) # Skip auth if explicitly disabled (development only) if os.environ.get("MCP_AUTH_DISABLED", "false").lower() == "true": logger.warning("MCP authentication is disabled - endpoints are publicly accessible") return await call_next(request) config = MCPAuthConfig() try: # Extract bearer token from Authorization header authorization = request.headers.get("Authorization", "") credentials = None if authorization.startswith("Bearer "): # Remove "Bearer " prefix and strip any whitespace token = authorization[7:].strip() credentials = HTTPAuthorizationCredentials( scheme="Bearer", credentials=token ) # Validate and get the W&B API key wandb_api_key = await validate_bearer_token(credentials, config) # Make sure the key is clean (no extra whitespace or encoding issues) wandb_api_key = wandb_api_key.strip() # Store the API key in request state for W&B operations request.state.wandb_api_key = wandb_api_key # Set the API key for this request # Note: We don't restore the original value because with streaming responses, # the tool execution happens after call_next returns. Each request sets its own key. os.environ["WANDB_API_KEY"] = wandb_api_key # Debug logging logger.debug(f"Auth middleware: Set WANDB_API_KEY with length={len(wandb_api_key)}, " f"is_40_chars={len(wandb_api_key) == 40}") # Continue processing without restoring the env var # Each request will set its own API key response = await call_next(request) return response except HTTPException as e: # Return proper error response return JSONResponse( status_code=e.status_code, content={"error": e.detail}, headers=e.headers ) except Exception as e: logger.error(f"Authentication error: {e}") return JSONResponse( status_code=status.HTTP_401_UNAUTHORIZED, content={"error": "Authentication failed"}, headers={ "WWW-Authenticate": 'Bearer realm="W&B MCP"' } ) # OAuth-related functions removed - see AUTH_README.md for details