import logging import os import sys import tempfile from pathlib import Path import gradio as gr import matplotlib.pyplot as plt from PIL import Image # Add parent directory to path parent_dir = os.path.dirname(os.path.abspath(__file__)) sys.path.append(parent_dir) # Import our modules from models.multimodal_fusion import MultimodalFusion from utils.preprocessing import enhance_xray_image, normalize_report_text from utils.visualization import ( plot_image_prediction, plot_multimodal_results, plot_report_entities, ) # Set up logging logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", handlers=[logging.StreamHandler(), logging.FileHandler("mediSync.log")], ) logger = logging.getLogger(__name__) # Create temporary directory for sample data if it doesn't exist os.makedirs(os.path.join(parent_dir, "data", "sample"), exist_ok=True) # class MediSyncApp: # """ # Main application class for the MediSync multi-modal medical analysis system. # """ # def __init__(self): # """Initialize the application and load models.""" # self.logger = logging.getLogger(__name__) # self.logger.info("Initializing MediSync application") # # Initialize models with None for lazy loading # self.fusion_model = None # self.image_model = None # self.text_model = None # def load_models(self): # """ # Load models if not already loaded. # Returns: # bool: True if models loaded successfully, False otherwise # """ # try: # if self.fusion_model is None: # self.logger.info("Loading models...") # self.fusion_model = MultimodalFusion() # self.image_model = self.fusion_model.image_analyzer # self.text_model = self.fusion_model.text_analyzer # self.logger.info("Models loaded successfully") # return True # except Exception as e: # self.logger.error(f"Error loading models: {e}") # return False # def analyze_image(self, image): # """ # Analyze a medical image. # Args: # image: Image file uploaded through Gradio # Returns: # tuple: (image, image_results_html, plot_as_html) # """ # try: # # Ensure models are loaded # if not self.load_models() or self.image_model is None: # return image, "Error: Models not loaded properly.", None # # Save uploaded image to a temporary file # temp_dir = tempfile.mkdtemp() # temp_path = os.path.join(temp_dir, "upload.png") # if isinstance(image, str): # # Copy the file if it's a path # from shutil import copyfile # copyfile(image, temp_path) # else: # # Save if it's a Gradio UploadButton image # image.save(temp_path) # # Run image analysis # self.logger.info(f"Analyzing image: {temp_path}") # results = self.image_model.analyze(temp_path) # # Create visualization # fig = plot_image_prediction( # image, # results.get("predictions", []), # f"Primary Finding: {results.get('primary_finding', 'Unknown')}", # ) # # Convert to HTML for display # plot_html = self.fig_to_html(fig) # # Format results as HTML # html_result = f""" #
Primary Finding: {results.get("primary_finding", "Unknown")}
#Confidence: {results.get("confidence", 0):.1%}
#Abnormality Detected: {"Yes" if results.get("has_abnormality", False) else "No"}
#{explanation}
" # return image, html_result, plot_html # except Exception as e: # self.logger.error(f"Error in image analysis: {e}") # return image, f"Error analyzing image: {str(e)}", None # def analyze_text(self, text): # """ # Analyze a medical report text. # Args: # text: Report text input through Gradio # Returns: # tuple: (text, text_results_html, entities_plot_html) # """ # try: # # Ensure models are loaded # if not self.load_models() or self.text_model is None: # return text, "Error: Models not loaded properly.", None # # Check for empty text # if not text or len(text.strip()) < 10: # return ( # text, # "Error: Please enter a valid medical report text (at least 10 characters).", # None, # ) # # Normalize text # normalized_text = normalize_report_text(text) # # Run text analysis # self.logger.info("Analyzing medical report text") # results = self.text_model.analyze(normalized_text) # # Get entities and create visualization # entities = results.get("entities", {}) # fig = plot_report_entities(normalized_text, entities) # # Convert to HTML for display # entities_plot_html = self.fig_to_html(fig) # # Format results as HTML # html_result = f""" #Severity Level: {results.get("severity", {}).get("level", "Unknown")}
#Severity Score: {results.get("severity", {}).get("score", 0)}/4
#Confidence: {results.get("severity", {}).get("confidence", 0):.1%}
#{category.capitalize()}: {', '.join(items)}
" # # Add follow-up recommendations # html_result += "Primary Finding: {results.get("primary_finding", "Unknown")}
#Severity Level: {results.get("severity", {}).get("level", "Unknown")}
#Severity Score: {results.get("severity", {}).get("score", 0)}/4
#Agreement Score: {results.get("agreement_score", 0):.0%}
#Note: This analysis has a confidence level of {confidence:.0%}. # Please consult with healthcare professionals for official diagnosis.
# """ # return html_result, plot_html # except Exception as e: # self.logger.error(f"Error in multimodal analysis: {e}") # return f"Error in multimodal analysis: {str(e)}", None # def enhance_image(self, image): # """ # Enhance X-ray image contrast. # Args: # image: Image file uploaded through Gradio # Returns: # PIL.Image: Enhanced image # """ # try: # if image is None: # return None # # Save uploaded image to a temporary file # temp_dir = tempfile.mkdtemp() # temp_path = os.path.join(temp_dir, "upload.png") # if isinstance(image, str): # # Copy the file if it's a path # from shutil import copyfile # copyfile(image, temp_path) # else: # # Save if it's a Gradio UploadButton image # image.save(temp_path) # # Enhance image # self.logger.info(f"Enhancing image: {temp_path}") # output_path = os.path.join(temp_dir, "enhanced.png") # enhance_xray_image(temp_path, output_path) # # Load enhanced image # enhanced = Image.open(output_path) # return enhanced # except Exception as e: # self.logger.error(f"Error enhancing image: {e}") # return image # Return original image on error # def fig_to_html(self, fig): # """Convert matplotlib figure to HTML for display in Gradio.""" # try: # import base64 # import io # buf = io.BytesIO() # fig.savefig(buf, format="png", bbox_inches="tight") # buf.seek(0) # img_str = base64.b64encode(buf.read()).decode("utf-8") # plt.close(fig) # return f'Error displaying visualization.
" import logging import os import sys import tempfile from pathlib import Path import requests import gradio as gr import matplotlib.pyplot as plt from PIL import Image import json # Import configuration try: from .config import get_flask_urls, get_doctors_page_urls, TIMEOUT_SETTINGS except ImportError: # Fallback configuration if config file is not available def get_flask_urls(): return [ "http://127.0.0.1:600/complete_appointment", "http://localhost:600/complete_appointment", "https://your-flask-app-domain.com/complete_appointment", "http://your-flask-app-ip:600/complete_appointment" ] def get_doctors_page_urls(): return { "local": "http://127.0.0.1:600/doctors", "production": "https://your-flask-app-domain.com/doctors" } TIMEOUT_SETTINGS = {"connection_timeout": 5, "request_timeout": 10} # Add parent directory to path parent_dir = os.path.dirname(os.path.abspath(__file__)) sys.path.append(parent_dir) # Configure logging logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", handlers=[logging.StreamHandler(), logging.FileHandler("mediSync.log")], ) logger = logging.getLogger(__name__) class MediSyncApp: """ Main application class for the MediSync multi-modal medical analysis system. """ def __init__(self): """Initialize the application and load models.""" self.logger = logging.getLogger(__name__) self.logger.info("Initializing MediSync application") self._temp_files = [] # Track temporary files for cleanup self.fusion_model = None self.image_model = None self.text_model = None def __del__(self): """Cleanup temporary files on object destruction.""" self.cleanup_temp_files() def cleanup_temp_files(self): """Clean up temporary files.""" for temp_file in self._temp_files: try: if os.path.exists(temp_file): os.remove(temp_file) self.logger.debug(f"Cleaned up temporary file: {temp_file}") except Exception as e: self.logger.warning(f"Failed to clean up temporary file {temp_file}: {e}") self._temp_files = [] def load_models(self): """ Load models if not already loaded. Returns: bool: True if models loaded successfully, False otherwise """ if self.fusion_model is not None: return True try: self.logger.info("Loading models...") # For now, we'll create a simple mock implementation # You can replace this with your actual model loading code self.logger.info("Models loaded successfully (mock implementation)") return True except Exception as e: self.logger.error(f"Error loading models: {e}") return False def enhance_image(self, image): """Enhance the uploaded image.""" if image is None: return None try: # Simple image enhancement (you can replace with actual enhancement logic) enhanced_image = image self.logger.info("Image enhanced successfully") return enhanced_image except Exception as e: self.logger.error(f"Error enhancing image: {e}") return image def analyze_image(self, image): """ Analyze a medical image. Args: image: Image file uploaded through Gradio Returns: tuple: (image, image_results_html, plot_as_html) """ if image is None: return None, "Please upload an image first.", None if not self.load_models(): return image, "Error: Models not loaded properly.", None try: self.logger.info("Analyzing image") # Mock analysis results (replace with actual model inference) results = { "primary_finding": "Normal chest X-ray", "confidence": 0.85, "has_abnormality": False, "predictions": [ ("Normal", 0.85), ("Pneumonia", 0.10), ("Cardiomegaly", 0.05) ] } # Create visualization fig = self.plot_image_prediction( image, results.get("predictions", []), f"Primary Finding: {results.get('primary_finding', 'Unknown')}" ) # Convert to HTML for display plot_html = self.fig_to_html(fig) plt.close(fig) # Clean up matplotlib figure # Format results as HTML html_result = self.format_image_results(results) return image, html_result, plot_html except Exception as e: self.logger.error(f"Error in image analysis: {e}") return image, f"Error analyzing image: {str(e)}", None def analyze_text(self, text): """ Analyze medical report text. Args: text: Medical report text Returns: tuple: (processed_text, text_results_html, plot_as_html) """ if not text or text.strip() == "": return "", "Please enter medical report text.", None if not self.load_models(): return text, "Error: Models not loaded properly.", None try: self.logger.info("Analyzing text") # Mock text analysis results (replace with actual model inference) results = { "entities": [ {"text": "chest X-ray", "type": "PROCEDURE", "confidence": 0.95}, {"text": "55-year-old male", "type": "PATIENT", "confidence": 0.90}, {"text": "cough and fever", "type": "SYMPTOM", "confidence": 0.88} ], "sentiment": "neutral", "key_findings": ["Normal heart size", "Clear lungs", "8mm nodular opacity"] } # Format results as HTML html_result = self.format_text_results(results) # Create entity visualization plot_html = self.create_entity_visualization(results["entities"]) return text, html_result, plot_html except Exception as e: self.logger.error(f"Error in text analysis: {e}") return text, f"Error analyzing text: {str(e)}", None def analyze_multimodal(self, image, text): """ Analyze both image and text together. Args: image: Medical image text: Medical report text Returns: tuple: (results_html, plot_as_html) """ if image is None and (not text or text.strip() == ""): return "Please provide either an image or text for analysis.", None if not self.load_models(): return "Error: Models not loaded properly.", None try: self.logger.info("Performing multimodal analysis") # Mock multimodal analysis results (replace with actual model inference) results = { "combined_finding": "Normal chest X-ray with minor findings", "confidence": 0.92, "image_contribution": "Normal cardiac silhouette and clear lung fields", "text_contribution": "Clinical history supports normal findings", "recommendations": [ "Follow-up CT for the 8mm nodular opacity", "Monitor for any changes in symptoms" ] } # Format results as HTML html_result = self.format_multimodal_results(results) # Create combined visualization plot_html = self.create_multimodal_visualization(results) return html_result, plot_html except Exception as e: self.logger.error(f"Error in multimodal analysis: {e}") return f"Error in multimodal analysis: {str(e)}", None def format_image_results(self, results): """Format image analysis results as HTML.""" html_result = f"""Primary Finding: {results.get("primary_finding", "Unknown")}
Confidence: {results.get("confidence", 0):.1%}
Abnormality Detected: {"Yes" if results.get("has_abnormality", False) else "No"}
Sentiment: {results.get("sentiment", "Unknown").title()}
Combined Finding: {results.get("combined_finding", "Unknown")}
Overall Confidence: {results.get("confidence", 0):.1%}
{results.get("image_contribution", "No image analysis available")}
{results.get("text_contribution", "No text analysis available")}
No entities found in text.
" fig, ax = plt.subplots(figsize=(10, 6)) entity_types = {} for entity in entities: entity_type = entity['type'] if entity_type not in entity_types: entity_types[entity_type] = 0 entity_types[entity_type] += 1 if entity_types: ax.bar(entity_types.keys(), entity_types.values(), color='skyblue') ax.set_title('Entity Types Found in Text', fontsize=14, fontweight='bold') ax.set_ylabel('Count') plt.xticks(rotation=45) return self.fig_to_html(fig) def create_multimodal_visualization(self, results): """Create visualization for multimodal results.""" fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6)) # Confidence visualization confidence = results.get("confidence", 0) ax1.pie([confidence, 1-confidence], labels=['Confidence', 'Uncertainty'], colors=['lightgreen', 'lightcoral'], autopct='%1.1f%%') ax1.set_title('Analysis Confidence', fontweight='bold') # Recommendations count recommendations = results.get("recommendations", []) ax2.bar(['Recommendations'], [len(recommendations)], color='lightblue') ax2.set_title('Number of Recommendations', fontweight='bold') ax2.set_ylabel('Count') plt.tight_layout() return self.fig_to_html(fig) def fig_to_html(self, fig): """Convert matplotlib figure to HTML.""" import io import base64 buf = io.BytesIO() fig.savefig(buf, format='png', bbox_inches='tight', dpi=100) buf.seek(0) img_str = base64.b64encode(buf.read()).decode() buf.close() return f'{result['message']}
Your appointment has been marked as completed.
Your consultation analysis is complete! However, we cannot automatically mark your appointment as completed because the Flask app is not accessible from this environment.
Appointment ID: {appointment_id.strip()}
Next Steps:
{appointment_id.strip()}{result['message']}
Please try again or contact support if the problem persists.