"""Template matching between medical transcriptions and report templates.

Scores each stored template against a transcription using document-type,
filename, section-fillability and content heuristics, then returns the
best-ranked candidates.
"""

import os
import logging
import re
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple

from dotenv import load_dotenv

load_dotenv()

# Configuration, overridable through environment variables
DB_PATH = os.getenv("TEMPLATE_DB_PATH", "templates/medical_templates.pkl")
GPT_MODEL = os.getenv("GPT_MODEL", "gpt-5")
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")

# Optional dependency: without LangChain the matcher falls back to
# keyword-based analysis only.
try:
    from langchain_openai import ChatOpenAI
    from langchain.prompts import ChatPromptTemplate
    HAS_LANGCHAIN = True
except ImportError:
    HAS_LANGCHAIN = False
    logging.warning("LangChain not available")

# Hard dependency: TemplateInfo is referenced in the dataclasses below,
# so there is no point in continuing without it.
try:
    from template_db_creation import MedicalTemplateParser, TemplateInfo
except ImportError:
    logging.error("template_db_creation module not found")
    raise


@dataclass
class SectionMatch:
    """Represents the match for a single template section."""
    section_name: str
    confidence: float
    extracted_content: str
    can_fill: bool
    missing_info: List[str]


@dataclass
class TemplateMatch:
    """Detailed result of matching one template."""
    template_id: str
    template_info: TemplateInfo
    overall_score: float
    type_match_score: float
    physician_match_score: float
    center_match_score: float
    content_match_score: float
    filename_match_score: float
    fillability_score: float
    section_matches: Dict[str, SectionMatch]
    confidence_level: str
    can_be_filled: bool
    filling_percentage: float
    missing_critical_info: List[str]
    extracted_data: Dict[str, str]
    filename_indicators: List[str]


@dataclass
class FilenameAnalysis:
    """Analysis of a medical document filename."""
    original_filename: str
    medical_keywords: List[str]
    document_type_indicators: List[str]
    specialty_indicators: List[str]
    center_indicators: List[str]
    anatomical_regions: List[str]
    procedure_type: Optional[str]
    confidence_score: float


class TemplateMatcher:
    """Matches medical transcriptions against medical report templates."""

    def __init__(self, database_path: Optional[str] = None):
        """Initializes the matcher from an existing template database."""
        self.parser = None
        self.llm = None
        self.content_analyzer = None
        self.section_extractor = None
        self.filename_analyzer = None

        self._initialize_filename_keywords()
        self._initialize_gpt()

        if database_path and os.path.exists(database_path):
            self.load_database(database_path)
        else:
            logging.warning("Template database not found or not specified")

    def _initialize_filename_keywords(self):
        """Initializes the keyword lists used for filename analysis.

        Keys and keywords stay in French because they are matched against
        French filenames and reused as category labels downstream.
        """
        self.filename_keywords = {
            "imagerie": {
                "irm": ["irm", "mri", "resonance"],
                "scanner": ["scanner", "tdm", "ct", "tomodensitometrie"],
                "echographie": ["echo", "echographie", "doppler", "ultrasound"],
                "radiologie": ["radio", "radiologie", "rx", "xray"],
                "pet": ["pet", "tep", "scintigraphie"],
                "mammographie": ["mammo", "mammographie", "breast"]
            },
            "specialites": {
                "cardiologie": ["cardio", "coeur", "heart", "ecg", "holter"],
                "neurologie": ["neuro", "brain", "cerveau", "eeg"],
                "orthopedie": ["ortho", "os", "bone", "fracture"],
                "gynecologie": ["gyneco", "utérus", "ovaire", "pelvien"],
                "urologie": ["uro", "vessie", "rein", "prostate"],
                "pneumologie": ["pneumo", "poumon", "thorax", "resp"],
                "gastro": ["gastro", "abdomen", "foie", "intestin"]
            },
            "anatomie": {
                "tete": ["tete", "crane", "cerebral", "encephale"],
                "thorax": ["thorax", "poumon", "coeur", "mediastin"],
                "abdomen": ["abdomen", "foie", "rate", "pancreas"],
                "pelvis": ["pelvis", "pelvien", "utérus", "ovaire", "vessie"],
                "membres": ["membre", "bras", "jambe", "genou", "epaule"],
                "rachis": ["rachis", "colonne", "vertebral", "lombaire"]
            },
            "procedures": {
                "arteriel": ["arteriel", "artere", "vasculaire"],
                "veineux": ["veineux", "veine", "phlebo"],
                "fonctionnel": ["fonctionnel", "dynamique", "stress"],
                "contraste": ["contraste", "injection", "gadolinium"]
            },
            "centres": {
                "roseraie": ["roseraie", "rose"],
                "4villes": ["4villes", "quatre"],
                "mstruk": ["mstruk", "struktur"],
                "radioroseraie": ["radioroseraie"]
            }
        }

    def _initialize_gpt(self):
        """Initializes GPT-based content analysis, with graceful degradation."""
        if not HAS_LANGCHAIN:
            logging.warning("LangChain not available; using fallback mode.")
            return

        if not OPENAI_API_KEY:
            logging.warning("OPENAI_API_KEY not set; GPT analysis will be unavailable.")
            return

        try:
            self.llm = ChatOpenAI(
                model=GPT_MODEL,
                temperature=0,
                max_tokens=4000,
                api_key=OPENAI_API_KEY
            )

            content_prompt = ChatPromptTemplate.from_messages([
                ("system", "Analyze this medical transcription and return a JSON with document_type, sections, and medical_data."),
                ("human", "Analyze: {transcription}")
            ])

            # LCEL pipe: prompt -> model
            self.content_analyzer = content_prompt | self.llm
            logging.info("✅ GPT initialized")

        except Exception as e:
            logging.error(f"❌ GPT initialization error: {e}")
            self.llm = None
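
    # Illustrative sketch (assumes LangChain is installed and the key is
    # valid): the analyzer is a LangChain runnable, so a transcription
    # could be analyzed with something like:
    #
    #   response = matcher.content_analyzer.invoke({"transcription": text})
    #   data = json.loads(response.content)  # hypothetical: assumes the model returns raw JSON
    #
    # Neither call is exercised by the fallback path used elsewhere in this file.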

    def analyze_filename(self, filename: str) -> FilenameAnalysis:
        """Analyzes a filename for medical information (keyword fallback only)."""
        return self._analyze_filename_fallback(filename)

    def _analyze_filename_fallback(self, filename: str) -> FilenameAnalysis:
        """Keyword-based filename analysis that needs no GPT access."""
        clean_filename = os.path.basename(filename).lower()
        clean_filename = clean_filename.replace('.docx', '').replace('.doc', '').replace('.rtf', '')

        medical_keywords = []
        document_type_indicators = []
        specialty_indicators = []
        center_indicators = []
        anatomical_regions = []
        procedure_type = None

        # Route every keyword hit to the list matching its category.
        for category, subcategories in self.filename_keywords.items():
            for subcat, keywords in subcategories.items():
                for keyword in keywords:
                    if keyword in clean_filename:
                        if category == "imagerie":
                            document_type_indicators.append(subcat)
                            if subcat in ["echographie", "irm", "scanner"]:
                                procedure_type = subcat
                        elif category == "specialites":
                            specialty_indicators.append(subcat)
                        elif category == "anatomie":
                            anatomical_regions.append(subcat)
                        elif category == "centres":
                            center_indicators.append(subcat)
                        medical_keywords.append(keyword)

        # Confidence saturates at 1.0 once five elements have been found.
        total_elements = len(medical_keywords) + len(document_type_indicators) + len(specialty_indicators)
        confidence_score = min(1.0, total_elements / 5.0)

        return FilenameAnalysis(
            original_filename=filename,
            medical_keywords=medical_keywords,
            document_type_indicators=document_type_indicators,
            specialty_indicators=specialty_indicators,
            center_indicators=center_indicators,
            anatomical_regions=anatomical_regions,
            procedure_type=procedure_type,
            confidence_score=confidence_score
        )
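
    # Worked example (hypothetical filename): for "irm_pelvienne_roseraie.doc"
    # the keyword scan yields document_type_indicators containing "irm",
    # procedure_type == "irm", anatomical_regions containing "pelvis"
    # ("pelvien" matches), specialty_indicators containing "gynecologie",
    # and center_indicators containing "roseraie".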

    def load_database(self, filepath: str):
        """Loads the vector template database, with error handling."""
        try:
            if not hasattr(self, 'parser') or self.parser is None:
                self.parser = MedicalTemplateParser()
            self.parser.load_database(filepath)
            logging.info(f"✅ Database loaded: {len(self.parser.templates)} templates")
        except Exception as e:
            logging.error(f"Error while loading the database: {e}")
            raise

    def analyze_transcription_detailed(self, transcription: str, transcription_filename: str = "") -> Dict:
        """Simplified analysis without GPT, to avoid API errors."""
        return self._fallback_analysis(transcription, transcription_filename)

    def _fallback_analysis(self, transcription: str, transcription_filename: str = "") -> Dict:
        """Improved keyword-based fallback analysis without GPT."""
        text_lower = transcription.lower()

        # Keyword signatures per document type.
        document_types = {
            "compte_rendu_imagerie": ["irm", "scanner", "échographie", "radiologie", "t1", "t2", "doppler", "technique", "plans"],
            "rapport_biologique": ["laboratoire", "analyse", "biologie", "sang", "urine", "sérum"],
            "lettre_medicale": ["lettre", "courrier", "correspondance", "cher confrère"],
            "compte_rendu_consultation": ["consultation", "examen clinique", "patient", "antécédents"]
        }

        detected_type = "compte_rendu_imagerie"  # default

        # First pass: the filename may already name the document type.
        if transcription_filename:
            filename_lower = transcription_filename.lower()
            for doc_type, keywords in document_types.items():
                if any(kw in filename_lower for kw in keywords):
                    detected_type = doc_type
                    break

        # Second pass: content evidence (two keyword hits) overrides the filename.
        for doc_type, keywords in document_types.items():
            if sum(1 for kw in keywords if kw in text_lower) >= 2:
                detected_type = doc_type
                break
        sections = {}

        # Pull out sections written as "**Title :** content" markdown blocks.
        markdown_sections = re.findall(r'\*\*(.*?)\s*:\s*\*\*(.*?)(?=\*\*|\Z)', transcription, re.DOTALL | re.IGNORECASE)

        for section_title, section_content in markdown_sections:
            section_title_clean = section_title.strip().lower()
            section_content_clean = section_content.strip()

            # Map free-form titles onto canonical section names.
            section_mapping = {
                "technique": ["technique", "méthode", "protocole", "acquisition"],
                "résultats": ["résultat", "résultats", "observation", "constatation", "analyse", "description"],
                "conclusion": ["conclusion", "diagnostic", "synthèse", "impression", "avis"],
                "indication": ["indication", "motif", "demande", "contexte"],
                "histoire": ["histoire", "antécédent", "contexte", "clinique"]
            }

            mapped_section = None
            for standard_name, variations in section_mapping.items():
                if any(var in section_title_clean for var in variations):
                    mapped_section = standard_name
                    break

            final_section_name = mapped_section if mapped_section else section_title_clean

            if section_content_clean:
                sections[final_section_name] = {
                    "content": section_content_clean,
                    "confidence": 0.8,
                    "keywords": [section_title_clean]
                }

        # If no markdown sections were found, fall back to line-by-line scanning.
        if not sections:
            text_lines = transcription.split('\n')
            current_section = None
            current_content = []

            for line in text_lines:
                line_stripped = line.strip()
                if not line_stripped:
                    continue

                # Short lines containing a section keyword are treated as titles.
                line_lower = line_stripped.lower()
                is_section_title = False

                for section_name, keywords in [
                    ("technique", ["technique", "méthode", "protocole"]),
                    ("résultats", ["résultat", "observation", "constatation"]),
                    ("conclusion", ["conclusion", "diagnostic", "synthèse"])
                ]:
                    if any(kw in line_lower for kw in keywords) and len(line_stripped) < 50:
                        # Close the previous section before opening the new one.
                        if current_section and current_content:
                            sections[current_section] = {
                                "content": '\n'.join(current_content),
                                "confidence": 0.7,
                                "keywords": [current_section]
                            }
                        current_section = section_name
                        current_content = []
                        is_section_title = True
                        break

                if not is_section_title and current_section:
                    current_content.append(line_stripped)

            # Flush the last open section.
            if current_section and current_content:
                sections[current_section] = {
                    "content": '\n'.join(current_content),
                    "confidence": 0.7,
                    "keywords": [current_section]
                }

        analysis = {
            "document_type": detected_type,
            "identification": {
                "physician": "Non identifié",
                "center": "Non identifié",
                "service": "Non identifié"
            },
            "sections": sections,
            "medical_data": {
                # NOTE: procedures, diagnoses and anatomical_regions are
                # hard-coded placeholders matching the demo transcription;
                # only the measurements are actually extracted.
                "procedures": ["IRM pelvienne", "T1 Dixon", "T2"],
                "measurements": re.findall(r'\d+\s*(?:mm|cm|ml)', transcription),
                "diagnoses": ["endométriome ovarien"],
                "treatments": [],
                "anatomical_regions": ["utérus", "ovaire", "pelvis"]
            },
            "completeness": {
                "score": 0.8,
                "transcription_quality": "good"
            }
        }

        # Enrich the analysis with filename-derived indicators when available.
        if transcription_filename:
            filename_analysis = self.analyze_filename(transcription_filename)
            analysis["filename_analysis"] = {
                "medical_keywords": filename_analysis.medical_keywords,
                "document_type_indicators": filename_analysis.document_type_indicators,
                "specialty_indicators": filename_analysis.specialty_indicators,
                "anatomical_regions": filename_analysis.anatomical_regions,
                "procedure_type": filename_analysis.procedure_type
            }

        return analysis
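
    # Shape of the returned analysis (illustrative values):
    #   {"document_type": "compte_rendu_imagerie",
    #    "sections": {"technique": {"content": "...", "confidence": 0.8,
    #                               "keywords": ["technique"]}},
    #    "medical_data": {...}, "completeness": {...},
    #    "filename_analysis": {...}}  # only present when a filename was given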

    def calculate_filename_match_score(self, transcription_filename: str, transcription_analysis: Dict,
                                       template_filename: str) -> Tuple[float, List[str]]:
        """Computes a match score based on the two filenames."""
        trans_filename_analysis = self.analyze_filename(transcription_filename)
        template_filename_analysis = self.analyze_filename(template_filename)

        score_components = []
        matching_indicators = []

        # Document-type overlap (Jaccard), weighted 0.4.
        trans_types = set(trans_filename_analysis.document_type_indicators)
        template_types = set(template_filename_analysis.document_type_indicators)

        if trans_types & template_types:
            type_match_score = len(trans_types & template_types) / max(len(trans_types | template_types), 1)
            score_components.append(type_match_score * 0.4)
            matching_indicators.extend(list(trans_types & template_types))

        # Specialty overlap (Jaccard), weighted 0.25.
        trans_specialties = set(trans_filename_analysis.specialty_indicators)
        template_specialties = set(template_filename_analysis.specialty_indicators)

        if trans_specialties & template_specialties:
            specialty_match_score = len(trans_specialties & template_specialties) / max(len(trans_specialties | template_specialties), 1)
            score_components.append(specialty_match_score * 0.25)
            matching_indicators.extend(list(trans_specialties & template_specialties))

        final_score = sum(score_components) if score_components else 0.0
        return min(1.0, final_score), matching_indicators
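
    # Worked example: if both filenames indicate exactly {"irm"} as document
    # type and {"gynecologie"} as specialty, each Jaccard overlap is 1.0 and
    # the final score is 1.0 * 0.4 + 1.0 * 0.25 = 0.65.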

    def calculate_basic_scores(self, transcription_analysis: Dict, template_info: TemplateInfo) -> Tuple[float, float, float]:
        """Computes the basic scores without the error-prone helpers."""
        # Type score: how well the template's declared type matches the
        # keywords expected for the detected document type.
        transcription_type = transcription_analysis.get("document_type", "")
        template_type = template_info.type.lower()

        type_mappings = {
            "compte_rendu_imagerie": ["irm", "scanner", "échographie", "imagerie", "radiologie"],
            "rapport_biologique": ["laboratoire", "biologie", "analyse"],
            "lettre_medicale": ["lettre", "courrier", "correspondance"],
            "compte_rendu_consultation": ["consultation", "examen"]
        }

        type_score = 0.3  # neutral default for unknown types
        if transcription_type in type_mappings:
            expected_keywords = type_mappings[transcription_type]
            matches = sum(1 for kw in expected_keywords if kw in template_type)
            type_score = min(1.0, matches / len(expected_keywords) * 2)

        # Physician and center matching are not implemented; use neutral scores.
        physician_score = 0.5
        center_score = 0.5

        return type_score, physician_score, center_score
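
    # Worked example: for a "compte_rendu_imagerie" transcription and a
    # template typed "IRM pelvienne", one of the five expected keywords
    # ("irm") matches, so type_score = min(1.0, 1 / 5 * 2) = 0.4.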

    def calculate_simple_section_matches(self, transcription: str, transcription_analysis: Dict, template_info: TemplateInfo) -> Dict[str, SectionMatch]:
        """Improved section-level matching."""
        section_matches = {}
        transcription_sections = transcription_analysis.get("sections", {})

        # Canonical section names and their accepted variants.
        section_mapping = {
            "technique": ["technique", "méthode", "protocole", "acquisition"],
            "résultats": ["résultat", "observation", "constatation", "description", "analyse"],
            "conclusion": ["conclusion", "diagnostic", "synthèse", "impression"],
            "indication": ["indication", "motif", "demande"],
            "histoire": ["histoire", "antécédent", "contexte", "clinique"],
            "examen": ["examen", "exploration", "investigation"]
        }

        for section_name in template_info.detected_sections:
            section_lower = section_name.lower()
            best_content = ""
            best_confidence = 0.0

            # 1) Try the sections already extracted during analysis.
            for analyzed_section, section_data in transcription_sections.items():
                if isinstance(section_data, dict):
                    content = section_data.get("content", "")
                    confidence = section_data.get("confidence", 0.0)

                    # Direct name containment either way counts as a match.
                    if section_lower in analyzed_section.lower() or analyzed_section.lower() in section_lower:
                        best_content = content
                        best_confidence = confidence
                        break

                    # Otherwise, match through the variant keyword mapping.
                    if section_lower in section_mapping:
                        expected_keywords = section_mapping[section_lower]
                        if any(kw in analyzed_section.lower() for kw in expected_keywords):
                            best_content = content
                            best_confidence = confidence * 0.9
                            break

            # 2) No analyzed section matched: search the raw transcription
            # with markdown-style patterns (section name escaped for regex).
            if not best_content:
                escaped = re.escape(section_lower)
                markdown_patterns = [
                    rf"\*\*{escaped}[:\s]*\*\*(.*?)(?=\*\*|\n\n|$)",
                    rf"{escaped}[:\s]+(.*?)(?=\n\*\*|\n\n|$)",
                    rf"#{escaped}[:\s]+(.*?)(?=\n#|\n\n|$)"
                ]

                for pattern in markdown_patterns:
                    matches = re.findall(pattern, transcription, re.IGNORECASE | re.DOTALL)
                    if matches:
                        best_content = matches[0].strip()
                        best_confidence = 0.8
                        break

            # 3) Last resort: grab a context window around a variant keyword.
            if not best_content and section_lower in section_mapping:
                keywords = section_mapping[section_lower]
                for keyword in keywords:
                    if keyword in transcription.lower():
                        start_pos = transcription.lower().find(keyword)
                        start = max(0, start_pos - 50)
                        end = min(len(transcription), start_pos + 400)
                        best_content = transcription[start:end].strip()
                        best_confidence = 0.6
                        break

            # A section counts as fillable only with substantial content.
            can_fill = bool(best_content) and len(best_content.strip()) > 20
            missing_info = [] if can_fill else [f"Missing content for {section_name}"]

            section_matches[section_name] = SectionMatch(
                section_name=section_name,
                confidence=best_confidence,
                extracted_content=best_content,
                can_fill=can_fill,
                missing_info=missing_info
            )

        return section_matches

    def calculate_fillability_score(self, section_matches: Dict[str, SectionMatch], template_info: TemplateInfo) -> Tuple[float, float, List[str]]:
        """Computes how much of the template can be filled (corrected version)."""
        total_sections = len(template_info.detected_sections)
        fillable_sections = sum(1 for match in section_matches.values() if match.can_fill)

        if total_sections == 0:
            return 0.0, 0.0, ["Template has no sections"]

        fillability_score = fillable_sections / total_sections
        filling_percentage = (fillable_sections / total_sections) * 100

        # Every unfillable section is reported as missing.
        missing_critical = [
            match.section_name for match in section_matches.values()
            if not match.can_fill
        ]

        return fillability_score, filling_percentage, missing_critical
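
    # Worked example: with 4 detected sections of which 3 are fillable,
    # the method returns (0.75, 75.0, ["<the unfillable section>"]).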

    def match_templates(self, transcription: str, transcription_filename: str = "", k: int = 3) -> List[TemplateMatch]:
        """
        Main entry point: runs the matching and returns the k best templates.

        Args:
            transcription: The content of the medical transcription
            transcription_filename: The transcription's filename
            k: Number of results to return (default: 3)

        Returns:
            List[TemplateMatch]: The k templates with the highest scores
        """
        if not self.parser or not self.parser.templates:
            logging.error("No templates loaded")
            return []

        logging.info(f"🔍 Starting matching for: {transcription_filename}")
        logging.info(f"📄 Transcription content: {len(transcription.split())} words")

        # Analyze the transcription once, then score every template against it.
        analysis = self.analyze_transcription_detailed(transcription, transcription_filename)
        logging.info(f"📊 Detected document type: {analysis.get('document_type')}")
        logging.info(f"🔧 Detected sections: {list(analysis.get('sections', {}).keys())}")

        template_matches = []

        for template_id, template_info in self.parser.templates.items():
            try:
                # Individual score components.
                type_score, physician_score, center_score = self.calculate_basic_scores(analysis, template_info)

                filename_score, filename_indicators = self.calculate_filename_match_score(
                    transcription_filename, analysis, template_info.filepath
                )

                section_matches = self.calculate_simple_section_matches(transcription, analysis, template_info)

                fillability_score, filling_percentage, missing_critical = self.calculate_fillability_score(section_matches, template_info)

                content_score = 0.5  # neutral placeholder without GPT analysis

                # Weighted combination; the weights sum to 1.0.
                overall_score = (
                    type_score * 0.25 +
                    fillability_score * 0.35 +
                    filename_score * 0.25 +
                    content_score * 0.1 +
                    physician_score * 0.025 +
                    center_score * 0.025
                )

                # Bonus when at least two sections are fillable, capped at 1.0.
                if len([s for s in section_matches.values() if s.can_fill]) >= 2:
                    overall_score = min(1.0, overall_score + 0.1)

                confidence_level = "excellent" if overall_score > 0.7 else "good" if overall_score > 0.5 else "fair" if overall_score > 0.3 else "poor"

                # Collect the extracted content of every fillable section.
                extracted_data = {}
                for section_name, match in section_matches.items():
                    if match.can_fill and match.extracted_content.strip():
                        extracted_data[section_name] = match.extracted_content

                can_be_filled = len(extracted_data) > 0 or fillability_score > 0.3

                template_match = TemplateMatch(
                    template_id=template_id,
                    template_info=template_info,
                    overall_score=overall_score,
                    type_match_score=type_score,
                    physician_match_score=physician_score,
                    center_match_score=center_score,
                    content_match_score=content_score,
                    filename_match_score=filename_score,
                    fillability_score=fillability_score,
                    section_matches=section_matches,
                    confidence_level=confidence_level,
                    can_be_filled=can_be_filled,
                    filling_percentage=filling_percentage,
                    missing_critical_info=missing_critical,
                    extracted_data=extracted_data,
                    filename_indicators=filename_indicators
                )

                template_matches.append(template_match)

            except Exception as e:
                logging.warning(f"Error while analyzing template {template_id}: {e}")
                continue

        # Keep the k best-scoring templates.
        template_matches.sort(key=lambda x: x.overall_score, reverse=True)
        top_matches = template_matches[:k]

        logging.info(f"✅ Matching finished - {len(top_matches)} templates selected")
        for i, match in enumerate(top_matches, 1):
            logging.info(f"🏆 Template #{i}: {match.template_id}")
            logging.info(f"   📊 Overall score: {match.overall_score:.3f}")
            logging.info(f"   📋 Fillable sections: {len(match.extracted_data)}")
            logging.info(f"   🎯 Confidence level: {match.confidence_level}")
            logging.info(f"   📁 Template: {os.path.basename(match.template_info.filepath)}")

        return top_matches

    def print_matching_results(self, matches: List[TemplateMatch]):
        """Prints the matching results in detail."""
        if not matches:
            print("❌ No results found")
            return

        print(f"\n{'='*80}")
        print(f"🎯 MATCHING RESULTS - Top {len(matches)} templates")
        print(f"{'='*80}")

        for i, match in enumerate(matches, 1):
            print(f"\n🏆 TEMPLATE #{i}")
            print(f"   🆔 ID: {match.template_id}")
            print(f"   📊 Overall score: {match.overall_score:.3f}")
            print(f"   📁 File: {os.path.basename(match.template_info.filepath)}")
            print(f"   👨‍⚕️ Physician: {match.template_info.medecin}")
            print(f"   🏥 Center: {getattr(match.template_info, 'centre_medical', 'Not specified')}")
            print(f"   📝 Type: {match.template_info.type}")
            print(f"   🔧 Fillable: {match.filling_percentage:.1f}%")
            print(f"   🎯 Confidence level: {match.confidence_level}")

            print(f"   📈 Score breakdown:")
            print(f"      - Type: {match.type_match_score:.3f}")
            print(f"      - Fillability: {match.fillability_score:.3f}")
            print(f"      - Filename: {match.filename_match_score:.3f}")
            print(f"      - Content: {match.content_match_score:.3f}")

            if match.filename_indicators:
                print(f"   🏷️ Filename indicators: {', '.join(match.filename_indicators)}")

            if match.extracted_data:
                print(f"   📋 Extracted sections ({len(match.extracted_data)}):")
                for section_name, content in match.extracted_data.items():
                    preview = content[:100] + "..." if len(content) > 100 else content
                    print(f"      • {section_name}: {preview}")

            if match.missing_critical_info:
                print(f"   ⚠️ Missing sections: {', '.join(match.missing_critical_info)}")


def main():
    """Entry point for testing the matching."""
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s'
    )

    # Sample transcription kept in French: the matcher's keyword lists
    # match against French text.
    transcription_filename = "default.73.931915433.rtf_3650535_radiologie.doc"
    transcription_content = """**Technique :** 3 plans T2, diffusion axiale, T2 grand champ et T1 Dixon.

**Résultats :**

* L'utérus est antéversé, antéfléchi, latéralisé à droite, de taille normale pour l'âge.
* L'endomètre est fin, mesurant moins de 2 mm.
* Pas d'adénomyose franche.
* Aspect normal du col utérin et du vagin.
* L'ovaire droit, en position postérieure, mesure 18 x 11 mm avec présence de 4 follicules.
* L'ovaire gauche, en position latéro-utérine, présente un volumineux endométriome de 45 mm, typique en hypersignal T1 Dixon.
* Deuxième endométriome accolé à l'ovaire droit, périphérique, mesurant 13 mm.
* Pas d'épaississement marqué du torus ni des ligaments utéro-sacrés.
* Pas d'autre localisation pelvienne.
* Pas d'épanchement pelvien.
* Pas d'anomalie de la vessie.
* Pas d'adénomégalie pelvienne, pas de dilatation des uretères.

**Conclusion :**

* Endométriome ovarien droit périphérique de 13 mm.
* Endométriome ovarien gauche centro-ovarien de 45 mm."""

    db_path = DB_PATH
    if not os.path.exists(db_path):
        print(f"❌ Database not found: {db_path}")
        return

    try:
        matcher = TemplateMatcher(db_path)
        matches = matcher.match_templates(transcription_content, transcription_filename, k=3)
        matcher.print_matching_results(matches)
        return matches
    except Exception as e:
        logging.error(f"❌ Error: {e}")
        return []


if __name__ == "__main__":
    main()