import os
from typing import Any, Dict, List

import joblib  # Use joblib to load files saved with joblib.dump
import pandas as pd


# Define the models and the version based on the training script output
class EndpointHandler:
    """Inference entry point that serves fraud probabilities from saved models.

    On construction every file named in ``MODEL_MAP`` is loaded with
    ``joblib.load``. Calling the instance with a request payload validates
    the payload, selects one of the loaded models and returns the
    positive-class probability for each submitted transaction.
    """

    VERSION = "1.0"

    # Maps user-friendly aliases to the EXACT filenames from your training script
    MODEL_MAP = {
        "decision_tree": "classifier_FULL_Decision_Tree.pkl",
        "random_forest": "classifier_FULL_Random_Forest.pkl",
        "xgboost": "classifier_FULL_XGBoost.pkl",
    }

    # List of all features the model expects (is_fraud is excluded as it's the target)
    EXPECTED_FEATURES = [
        "cc_num", "merchant", "category", "amt", "gender", "state", "zip",
        "lat", "long", "city_pop", "job", "unix_time", "merch_lat",
        "merch_long", "age", "trans_hour", "trans_day", "trans_month",
        "trans_weekday", "distance",
    ]

    def __init__(self, path: str = "."):
        """Load every model listed in ``MODEL_MAP`` from ``path``.

        Loading is best-effort: a file that cannot be loaded is reported on
        stdout and skipped, so the handler still starts with whichever
        models did load. Skipped aliases are simply absent from
        ``self.models``.

        Args:
            path: Directory that contains the serialized model files.
        """
        self.models: Dict[str, Any] = {}
        print(f"Server starting up for version: {self.VERSION}")

        for alias, filename in self.MODEL_MAP.items():
            model_path = os.path.join(path, filename)
            try:
                # NOTE(security): joblib.load unpickles the file, which can
                # execute arbitrary code. Only point `path` at trusted artifacts.
                self.models[alias] = joblib.load(model_path)
                print(f"✅ Pipeline loaded for {alias}")
            except Exception as e:
                # Note: Errors here are often due to package version mismatch
                print(f"❌ Error loading {filename}. Check scikit-learn/xgboost versions: {e}")

    def _error(self, message: str, **extra: Any) -> Dict[str, Any]:
        """Build an error response carrying the server version and ``message``."""
        return {"server_version": self.VERSION, "error": message, **extra}

    def __call__(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Handle one API request: validate it, pick the model, run inference.

        Args:
            data: Request payload. ``data["inputs"]`` must be a dict with
                ``model_name`` (a key of the loaded models),
                ``model_version`` (must equal ``VERSION``) and ``features``
                (a list of per-transaction dicts, or a single flat dict for
                one transaction).

        Returns:
            A dict that always contains ``server_version``. On success it
            also contains ``model_used``, ``model_version`` and
            ``prediction_probabilities`` (one float per input row). On
            failure it contains ``error`` and, for a column mismatch,
            ``missing_features`` / ``unexpected_features``. Request problems
            are reported in the response rather than raised.
        """
        # A malformed payload must produce an error response, not an
        # AttributeError from calling .get() on a non-dict.
        inputs = data.get("inputs", {}) if isinstance(data, dict) else None
        if not isinstance(inputs, dict):
            return self._error(
                "Request payload must be an object whose 'inputs' field is an object."
            )

        target_model_alias = inputs.get("model_name")
        requested_version = inputs.get("model_version")
        features = inputs.get("features")

        # 1. Validation Checks
        if requested_version != self.VERSION:
            return self._error(
                f"Requested version '{requested_version}' does not match server version '{self.VERSION}'."
            )

        # The isinstance guard also keeps unhashable aliases (e.g. a list)
        # from raising TypeError in the membership test.
        if not isinstance(target_model_alias, str) or target_model_alias not in self.models:
            # Advertise only the models that actually loaded.
            return self._error(
                f"Model '{target_model_alias}' not found. Available: {list(self.models)}"
            )

        if not features:
            return self._error("No transaction features provided in the 'features' list.")

        # 2. Prepare Data
        model_pipeline = self.models[target_model_alias]
        try:
            # A single transaction sent as one flat dict of scalars cannot be
            # turned into a DataFrame directly; treat it as a one-row batch.
            # Column-oriented dicts (values are lists) are left untouched.
            if isinstance(features, dict) and all(
                pd.api.types.is_scalar(value) for value in features.values()
            ):
                features = [features]

            df_features = pd.DataFrame(features)

            expected = set(self.EXPECTED_FEATURES)
            received = set(df_features.columns)
            if received != expected:
                return self._error(
                    "Input features do not match expected features. Check column names.",
                    missing_features=sorted(expected - received),
                    unexpected_features=sorted(str(col) for col in received - expected),
                )

            # CRITICAL: select the columns in the exact order the pipeline expects
            df_features = df_features[self.EXPECTED_FEATURES]
        except Exception as e:
            return self._error(
                f"Data preparation failed. Ensure JSON fields match all expected features: {str(e)}"
            )

        # 3. Predict Probability
        try:
            # Column [:, 1] of predict_proba is taken as the probability of
            # the positive class (Fraud=1).
            probabilities = model_pipeline.predict_proba(df_features)[:, 1]
        except Exception as e:
            return self._error(
                f"Prediction execution failed. This may indicate a data type mismatch: {str(e)}"
            )

        return {
            "server_version": self.VERSION,
            "model_used": target_model_alias,
            "model_version": requested_version,
            "prediction_probabilities": probabilities.tolist(),
        }