Spaces:
Running
Running
| from fastapi import APIRouter, HTTPException, Depends | |
| from sqlalchemy.orm import Session | |
| from typing import List, Dict, Any | |
| from .. import crud, database, schemas | |
| from ..services.schema_validator import schema_validator | |
| from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials | |
| from ..routers.admin import verify_admin_token | |
| router = APIRouter() | |
| security = HTTPBearer() | |
| def get_db(): | |
| db = database.SessionLocal() | |
| try: | |
| yield db | |
| finally: | |
| db.close() | |
| def verify_admin_access(credentials: HTTPAuthorizationCredentials = Depends(security)): | |
| """Verify admin token for schema endpoints""" | |
| token = credentials.credentials | |
| if not verify_admin_token(token): | |
| raise HTTPException( | |
| status_code=401, | |
| detail="Invalid or expired admin token" | |
| ) | |
| return token | |
| async def get_schemas( | |
| db: Session = Depends(get_db), | |
| token: str = Depends(verify_admin_access) | |
| ): | |
| """Get all JSON schemas (admin only)""" | |
| try: | |
| # Get schemas from database | |
| db_schemas = crud.get_all_schemas(db) | |
| schemas_list = [] | |
| for schema in db_schemas: | |
| schemas_list.append({ | |
| "schema_id": schema.schema_id, | |
| "title": schema.title, | |
| "version": schema.version, | |
| "created_at": schema.created_at.isoformat() if schema.created_at else None, | |
| "schema": schema.schema | |
| }) | |
| return schemas_list | |
| except Exception as e: | |
| raise HTTPException(500, f"Failed to get schemas: {str(e)}") | |
| async def get_schema( | |
| schema_id: str, | |
| db: Session = Depends(get_db), | |
| token: str = Depends(verify_admin_access) | |
| ): | |
| """Get a specific JSON schema (admin only)""" | |
| try: | |
| schema = crud.get_schema(db, schema_id) | |
| if not schema: | |
| raise HTTPException(404, f"Schema {schema_id} not found") | |
| return { | |
| "schema_id": schema.schema_id, | |
| "title": schema.title, | |
| "version": schema.version, | |
| "created_at": schema.created_at.isoformat() if schema.created_at else None, | |
| "schema": schema.schema | |
| } | |
| except HTTPException: | |
| raise | |
| except Exception as e: | |
| raise HTTPException(500, f"Failed to get schema: {str(e)}") | |
| async def validate_data_against_schema( | |
| data: Dict[str, Any], | |
| schema_id: str, | |
| db: Session = Depends(get_db), | |
| token: str = Depends(verify_admin_access) | |
| ): | |
| """Validate JSON data against a specific schema (admin only)""" | |
| try: | |
| # Get the schema from database | |
| schema = crud.get_schema(db, schema_id) | |
| if not schema: | |
| raise HTTPException(404, f"Schema {schema_id} not found") | |
| # Validate the data | |
| is_valid, error_msg = schema_validator.validate_against_schema( | |
| data, schema.schema, schema_id | |
| ) | |
| return { | |
| "is_valid": is_valid, | |
| "error_message": error_msg, | |
| "schema_id": schema_id, | |
| "data": data | |
| } | |
| except HTTPException: | |
| raise | |
| except Exception as e: | |
| raise HTTPException(500, f"Validation failed: {str(e)}") | |
| async def test_schema_validation( | |
| image_type: str, | |
| data: Dict[str, Any], | |
| token: str = Depends(verify_admin_access) | |
| ): | |
| """Test data validation against image type schemas (admin only)""" | |
| try: | |
| # Validate data against the appropriate schema for the image type | |
| cleaned_data, is_valid, error_msg = schema_validator.clean_and_validate_data( | |
| data, image_type | |
| ) | |
| return { | |
| "is_valid": is_valid, | |
| "error_message": error_msg, | |
| "image_type": image_type, | |
| "original_data": data, | |
| "cleaned_data": cleaned_data if is_valid else None | |
| } | |
| except Exception as e: | |
| raise HTTPException(500, f"Test validation failed: {str(e)}") | |
| async def get_validation_stats( | |
| db: Session = Depends(get_db), | |
| token: str = Depends(verify_admin_access) | |
| ): | |
| """Get validation statistics (admin only)""" | |
| try: | |
| # Get recent images with validation info | |
| recent_images = crud.get_recent_images_with_validation(db, limit=100) | |
| stats = { | |
| "total_images": len(recent_images), | |
| "validation_passed": 0, | |
| "validation_failed": 0, | |
| "validation_errors": [], | |
| "by_image_type": { | |
| "crisis_map": {"total": 0, "passed": 0, "failed": 0}, | |
| "drone_image": {"total": 0, "passed": 0, "failed": 0} | |
| } | |
| } | |
| for img in recent_images: | |
| if hasattr(img, 'raw_json') and img.raw_json: | |
| image_type = img.image_type | |
| if image_type in stats["by_image_type"]: | |
| stats["by_image_type"][image_type]["total"] += 1 | |
| # Check if validation failed | |
| if img.raw_json.get("validation_failed"): | |
| stats["validation_failed"] += 1 | |
| if image_type in stats["by_image_type"]: | |
| stats["by_image_type"][image_type]["failed"] += 1 | |
| error = img.raw_json.get("validation_error", "Unknown error") | |
| stats["validation_errors"].append({ | |
| "image_id": str(img.image_id), | |
| "image_type": image_type, | |
| "error": error, | |
| "created_at": img.created_at.isoformat() if img.created_at else None | |
| }) | |
| else: | |
| stats["validation_passed"] += 1 | |
| if image_type in stats["by_image_type"]: | |
| stats["by_image_type"][image_type]["passed"] += 1 | |
| return stats | |
| except Exception as e: | |
| raise HTTPException(500, f"Failed to get validation stats: {str(e)}") | |
| async def update_schema( | |
| schema_id: str, | |
| schema_data: Dict[str, Any], | |
| db: Session = Depends(get_db), | |
| token: str = Depends(verify_admin_access) | |
| ): | |
| """Update a JSON schema (admin only)""" | |
| try: | |
| # Get the existing schema | |
| existing_schema = crud.get_schema(db, schema_id) | |
| if not existing_schema: | |
| raise HTTPException(404, f"Schema {schema_id} not found") | |
| # Update only the schema content | |
| existing_schema.schema = schema_data.get("schema", existing_schema.schema) | |
| db.commit() | |
| db.refresh(existing_schema) | |
| # Clear schema cache to ensure fresh data on next validation | |
| schema_validator.clear_schema_cache(schema_id) | |
| return { | |
| "schema_id": existing_schema.schema_id, | |
| "title": existing_schema.title, | |
| "version": existing_schema.version, | |
| "created_at": existing_schema.created_at.isoformat() if existing_schema.created_at else None, | |
| "schema": existing_schema.schema | |
| } | |
| except HTTPException: | |
| raise | |
| except Exception as e: | |
| db.rollback() | |
| raise HTTPException(500, f"Failed to update schema: {str(e)}") |