Spaces:
Runtime error
Runtime error
| import logging | |
| import time | |
| import torch | |
| import psutil # Ensure psutil is imported here as well | |
| import GPUtil | |
| from datetime import datetime, timedelta | |
| import gc # Import garbage collector | |
| logger = logging.getLogger(__name__) | |
| class EnsembleMonitorAgent: | |
| def __init__(self): | |
| logger.info("Initializing EnsembleMonitorAgent.") | |
| self.performance_metrics = {} | |
| self.alerts = [] | |
| def monitor_prediction(self, model_id, prediction_label, confidence_score, inference_time): | |
| logger.info(f"Monitoring prediction for model '{model_id}'. Label: {prediction_label}, Confidence: {confidence_score:.2f}, Time: {inference_time:.4f}s") | |
| if model_id not in self.performance_metrics: | |
| self.performance_metrics[model_id] = { | |
| "total_predictions": 0, | |
| "correct_predictions": 0, # This would require ground truth, which we don't have here. | |
| "total_confidence": 0.0, | |
| "total_inference_time": 0.0 | |
| } | |
| metrics = self.performance_metrics[model_id] | |
| metrics["total_predictions"] += 1 | |
| metrics["total_confidence"] += confidence_score | |
| metrics["total_inference_time"] += inference_time | |
| # Example alert: model taking too long | |
| if inference_time > 5.0: # Threshold for slow inference | |
| alert_msg = f"ALERT: Model '{model_id}' inference time exceeded 5.0s: {inference_time:.4f}s" | |
| self.alerts.append(alert_msg) | |
| logger.warning(alert_msg) | |
| # Example alert: low confidence | |
| if confidence_score < 0.5: # Threshold for low confidence | |
| alert_msg = f"ALERT: Model '{model_id}' returned low confidence: {confidence_score:.2f}" | |
| self.alerts.append(alert_msg) | |
| logger.warning(alert_msg) | |
| logger.info(f"Updated metrics for '{model_id}': {metrics}") | |
| def get_performance_summary(self): | |
| logger.info("Generating performance summary for all models.") | |
| summary = {} | |
| for model_id, metrics in self.performance_metrics.items(): | |
| avg_confidence = metrics["total_confidence"] / metrics["total_predictions"] if metrics["total_predictions"] > 0 else 0 | |
| avg_inference_time = metrics["total_inference_time"] / metrics["total_predictions"] if metrics["total_predictions"] > 0 else 0 | |
| summary[model_id] = { | |
| "avg_confidence": avg_confidence, | |
| "avg_inference_time": avg_inference_time, | |
| "total_predictions": metrics["total_predictions"] | |
| } | |
| logger.info(f"Performance summary: {summary}") | |
| return summary | |
| class WeightOptimizationAgent: | |
| def __init__(self, weight_manager): | |
| logger.info("Initializing WeightOptimizationAgent.") | |
| self.weight_manager = weight_manager | |
| self.prediction_history = [] | |
| self.performance_window = timedelta(hours=24) # Evaluate performance over last 24 hours | |
| def analyze_performance(self, final_prediction, ground_truth=None): | |
| logger.info(f"Analyzing performance. Final prediction: {final_prediction}, Ground truth: {ground_truth}") | |
| timestamp = datetime.now() | |
| self.prediction_history.append({ | |
| "timestamp": timestamp, | |
| "final_prediction": final_prediction, | |
| "ground_truth": ground_truth # Ground truth is often not available in real-time | |
| }) | |
| # Keep history windowed | |
| self.prediction_history = [p for p in self.prediction_history if timestamp - p["timestamp"] < self.performance_window] | |
| logger.info(f"Prediction history length: {len(self.prediction_history)}") | |
| # In a real scenario, this would involve a more complex optimization logic | |
| # For now, it just logs the history length. | |
| class SystemHealthAgent: | |
| def __init__(self): | |
| logger.info("Initializing SystemHealthAgent.") | |
| self.health_metrics = { | |
| "cpu_percent": 0, | |
| "memory_usage": {"total": 0, "available": 0, "percent": 0}, | |
| "gpu_utilization": [] | |
| } | |
| def monitor_system_health(self): | |
| logger.info("Monitoring system health...") | |
| self.health_metrics["cpu_percent"] = psutil.cpu_percent(interval=1) | |
| mem = psutil.virtual_memory() | |
| self.health_metrics["memory_usage"] = { | |
| "total": mem.total, | |
| "available": mem.available, | |
| "percent": mem.percent | |
| } | |
| # Holy moly, been at 99% for hours whoops | |
| if mem.percent > 90: | |
| logger.warning(f"CRITICAL: System memory usage is at {mem.percent}%. Attempting to clear memory cache...") | |
| gc.collect() | |
| logger.info("Garbage collection triggered. Re-checking memory usage...") | |
| mem_after_gc = psutil.virtual_memory() | |
| self.health_metrics["memory_usage_after_gc"] = { | |
| "total": mem_after_gc.total, | |
| "available": mem_after_gc.available, | |
| "percent": mem_after_gc.percent | |
| } | |
| logger.info(f"Memory usage after GC: {mem_after_gc.percent}%") | |
| gpu_info = [] | |
| try: | |
| gpus = GPUtil.getGPUs() | |
| for gpu in gpus: | |
| gpu_info.append({ | |
| "id": gpu.id, | |
| "name": gpu.name, | |
| "load": gpu.load, | |
| "memoryUtil": gpu.memoryUtil, | |
| "memoryTotal": gpu.memoryTotal, | |
| "memoryUsed": gpu.memoryUsed | |
| }) | |
| except Exception as e: | |
| logger.warning(f"Could not retrieve GPU information: {e}") | |
| gpu_info.append({"error": str(e)}) | |
| self.health_metrics["gpu_utilization"] = gpu_info | |
| logger.info(f"System health metrics: CPU: {self.health_metrics['cpu_percent']}%, Memory: {self.health_metrics['memory_usage']['percent']}%, GPU: {gpu_info}") |