|
|
import os |
|
|
import json |
|
|
import logging |
|
|
import numpy as np |
|
|
from typing import Dict, List, Optional, Tuple, Set |
|
|
from dataclasses import dataclass |
|
|
from pathlib import Path |
|
|
import pickle |
|
|
import re |
|
|
from sentence_transformers import SentenceTransformer |
|
|
import faiss |
|
|
from langchain_openai import ChatOpenAI |
|
|
from langchain.prompts import ChatPromptTemplate |
|
|
|
|
|
|
|
|
from template_db_creation import MedicalTemplateParser, TemplateInfo |
|
|
|
|
|
@dataclass
class SectionMatch:
    """Result of matching one template section against a transcription."""

    # Name of the template section this result belongs to.
    section_name: str
    # Confidence attached to the extracted content.
    confidence: float
    # Text taken from the transcription for this section ("" when none).
    extracted_content: str
    # True when the extracted content is considered usable to fill the section.
    can_fill: bool
    # Human-readable notes about what is missing for this section.
    missing_info: List[str]
|
|
|
|
|
@dataclass
class TemplateMatch:
    """Detailed outcome of scoring one template against a transcription."""

    # Identifier of the scored template and its parsed description.
    template_id: str
    template_info: TemplateInfo

    # Aggregate score followed by the individual component scores.
    overall_score: float
    type_match_score: float
    physician_match_score: float
    center_match_score: float
    content_match_score: float
    filename_match_score: float
    fillability_score: float

    # Per-section results, keyed by template section name.
    section_matches: Dict[str, SectionMatch]

    # Qualitative confidence label for the match.
    confidence_level: str
    # Whether the template is considered fillable, and to what extent.
    can_be_filled: bool
    filling_percentage: float
    # Names/descriptions of what could not be filled.
    missing_critical_info: List[str]
    # Extracted values intended for filling the template.
    extracted_data: Dict[str, str]
    # Indicators shared by the transcription and template file names.
    filename_indicators: List[str]
|
|
|
|
|
@dataclass
class FilenameAnalysis:
    """Medical information derived from a document file name."""

    # File name exactly as it was passed in by the caller.
    original_filename: str
    # Medical keywords recognised in the name.
    medical_keywords: List[str]
    # Hints about the document type (e.g. "irm", "echographie").
    document_type_indicators: List[str]
    # Hints about the medical specialty.
    specialty_indicators: List[str]
    # Hints about the medical centre.
    center_indicators: List[str]
    # Anatomical regions mentioned in the name.
    anatomical_regions: List[str]
    # Main procedure type, or None when none was recognised.
    procedure_type: Optional[str]
    # Confidence of the analysis.
    confidence_score: float
|
|
|
|
|
class SmartTranscriptionMatcher: |
|
|
"""Système intelligent de matching entre transcriptions et templates médicaux""" |
|
|
|
|
|
def __init__(self, database_path: str = None):
    """Create the parser, the optional GPT chains and the keyword tables.

    Args:
        database_path: Passed to ``load_database`` when it is non-empty and
            exists on disk; otherwise only a warning is logged.
    """
    self.parser = MedicalTemplateParser()

    # The chain attributes start unset; _initialize_gpt() assigns them only
    # when its setup succeeds.
    self.llm = None
    self.content_analyzer = None
    self.section_extractor = None
    self.filename_analyzer = None

    self._initialize_gpt()
    self._initialize_filename_keywords()

    database_available = bool(database_path) and os.path.exists(database_path)
    if not database_available:
        logging.warning("Base de données non trouvée ou non spécifiée")
        return

    self.load_database(database_path)
|
|
|
|
|
def _initialize_filename_keywords(self): |
|
|
"""Initialise les mots-clés pour l'analyse des noms de fichiers""" |
|
|
self.filename_keywords = { |
|
|
|
|
|
"imagerie": { |
|
|
"irm": ["irm", "mri", "resonance"], |
|
|
"scanner": ["scanner", "tdm", "ct", "tomodensitometrie"], |
|
|
"echographie": ["echo", "echographie", "doppler", "ultrasound"], |
|
|
"radiologie": ["radio", "radiologie", "rx", "xray"], |
|
|
"pet": ["pet", "tep", "scintigraphie"], |
|
|
"mammographie": ["mammo", "mammographie", "breast"] |
|
|
}, |
|
|
|
|
|
|
|
|
"specialites": { |
|
|
"cardiologie": ["cardio", "coeur", "heart", "ecg", "holter"], |
|
|
"neurologie": ["neuro", "brain", "cerveau", "eeg"], |
|
|
"orthopedic": ["ortho", "os", "bone", "fracture"], |
|
|
"gynecologie": ["gyneco", "utérus", "ovaire", "pelvien"], |
|
|
"urologie": ["uro", "vessie", "rein", "prostate"], |
|
|
"pneumologie": ["pneumo", "poumon", "thorax", "resp"], |
|
|
"gastro": ["gastro", "abdomen", "foie", "intestin"] |
|
|
}, |
|
|
|
|
|
|
|
|
"anatomie": { |
|
|
"tete": ["tete", "crane", "cerebral", "encephale"], |
|
|
"thorax": ["thorax", "poumon", "coeur", "mediastin"], |
|
|
"abdomen": ["abdomen", "foie", "rate", "pancreas"], |
|
|
"pelvis": ["pelvis", "pelvien", "utérus", "ovaire", "vessie"], |
|
|
"membres": ["membre", "bras", "jambe", "genou", "epaule"], |
|
|
"rachis": ["rachis", "colonne", "vertebral", "lombaire"] |
|
|
}, |
|
|
|
|
|
|
|
|
"procedures": { |
|
|
"arteriel": ["arteriel", "artere", "vasculaire"], |
|
|
"veineux": ["veineux", "veine", "phlebo"], |
|
|
"fonctionnel": ["fonctionnel", "dynamique", "stress"], |
|
|
"contraste": ["contraste", "injection", "gadolinium"] |
|
|
}, |
|
|
|
|
|
|
|
|
"centres": { |
|
|
"roseraie": ["roseraie", "rose"], |
|
|
"4villes": ["4villes", "quatre"], |
|
|
"mstruk": ["mstruk", "struktur"], |
|
|
"radioroseraie": ["radioroseraie"] |
|
|
} |
|
|
} |
|
|
|
|
|
def _initialize_gpt(self): |
|
|
"""Initialise GPT pour l'analyse de contenu""" |
|
|
api_key = os.getenv('OPENAI_API_KEY') |
|
|
if not api_key: |
|
|
logging.warning("OPENAI_API_KEY non définie. L'analyse GPT ne sera pas disponible.") |
|
|
return |
|
|
|
|
|
try: |
|
|
self.llm = ChatOpenAI( |
|
|
model="gpt-4o", |
|
|
temperature=0, |
|
|
max_tokens=4000, |
|
|
api_key=api_key |
|
|
) |
|
|
|
|
|
|
|
|
content_prompt = ChatPromptTemplate.from_messages([ |
|
|
("system", """Vous êtes un expert en analyse de transcriptions médicales. Analysez la transcription fournie et retournez UNIQUEMENT un JSON valide. |
|
|
|
|
|
Votre tâche est de : |
|
|
|
|
|
1. **Identifier le type de document précis** : |
|
|
- "compte_rendu_imagerie" : IRM, scanner, échographie, radiologie |
|
|
- "rapport_biologique" : analyses de laboratoire, résultats biologiques |
|
|
- "lettre_medicale" : correspondance entre médecins, lettres de sortie |
|
|
- "compte_rendu_consultation" : consultation médicale, examen clinique |
|
|
- "rapport_operatoire" : comptes-rendus d'intervention chirurgicale |
|
|
- "autre" : si aucun type ne correspond clairement |
|
|
|
|
|
2. **Extraire les informations d'identification** : |
|
|
- Médecin/praticien (nom complet si trouvé) |
|
|
- Centre médical/hôpital/clinique |
|
|
- Service médical |
|
|
- Adresse et contacts si mentionnés |
|
|
|
|
|
3. **Décomposer en sections structurées** : |
|
|
- Identifier toutes les sections présentes (Technique, Résultats, Conclusion, etc.) |
|
|
- Extraire le contenu complet de chaque section |
|
|
- Identifier les sections manquantes mais attendues pour ce type de document |
|
|
|
|
|
4. **Extraire les données médicales spécifiques** : |
|
|
- Examens/procédures réalisés |
|
|
- Mesures et valeurs numériques |
|
|
- Diagnostics et observations |
|
|
- Traitements ou recommandations |
|
|
- Dates et références |
|
|
|
|
|
5. **Évaluer la complétude** : |
|
|
- Score de complétude (0-1) |
|
|
- Informations manquantes importantes |
|
|
- Qualité de la transcription |
|
|
|
|
|
Retournez un JSON avec cette structure exacte : |
|
|
{{ |
|
|
"document_type": "type identifié", |
|
|
"identification": {{ |
|
|
"physician": "nom complet du médecin ou 'Non identifié'", |
|
|
"center": "nom du centre médical ou 'Non identifié'", |
|
|
"service": "service médical ou 'Non identifié'", |
|
|
"address": "adresse complète si trouvée", |
|
|
"phone": "numéro de téléphone si trouvé" |
|
|
}}, |
|
|
"sections": {{ |
|
|
"nom_section": {{ |
|
|
"content": "contenu complet de la section", |
|
|
"confidence": 0.9, |
|
|
"keywords": ["mots", "clés", "identifiés"] |
|
|
}} |
|
|
}}, |
|
|
"medical_data": {{ |
|
|
"procedures": ["liste des procédures/examens"], |
|
|
"measurements": ["mesures avec valeurs numériques"], |
|
|
"diagnoses": ["diagnostics identifiés"], |
|
|
"treatments": ["traitements mentionnés"], |
|
|
"dates": ["dates importantes trouvées"], |
|
|
"anatomical_regions": ["régions anatomiques concernées"] |
|
|
}}, |
|
|
"completeness": {{ |
|
|
"score": 0.85, |
|
|
"missing_sections": ["sections manquantes attendues"], |
|
|
"missing_info": ["informations importantes manquantes"], |
|
|
"transcription_quality": "excellent|good|fair|poor" |
|
|
}}, |
|
|
"key_indicators": ["indicateurs clés pour le matching"] |
|
|
}}"""), |
|
|
("human", "Analysez cette transcription médicale :\n\n{transcription}") |
|
|
]) |
|
|
|
|
|
|
|
|
section_prompt = ChatPromptTemplate.from_messages([ |
|
|
("system", """Vous êtes un expert en extraction d'informations médicales. |
|
|
|
|
|
On vous donne : |
|
|
1. Une transcription médicale complète |
|
|
2. Le nom d'une section spécifique à remplir dans un template |
|
|
3. La description de ce qui est attendu dans cette section |
|
|
|
|
|
Votre tâche est d'extraire UNIQUEMENT le contenu pertinent de la transcription pour remplir cette section du template. |
|
|
|
|
|
Retournez UNIQUEMENT un JSON avec cette structure : |
|
|
{{ |
|
|
"extracted_content": "contenu extrait pertinent pour cette section", |
|
|
"confidence": 0.85, |
|
|
"can_fill": true/false, |
|
|
"missing_elements": ["éléments manquants pour compléter la section"], |
|
|
"source_indicators": ["mots/phrases de la transcription qui justifient l'extraction"] |
|
|
}} |
|
|
|
|
|
Si aucun contenu pertinent n'est trouvé, retournez can_fill: false."""), |
|
|
("human", """Transcription complète : |
|
|
{transcription} |
|
|
|
|
|
Section à remplir : {section_name} |
|
|
Description attendue : {section_description} |
|
|
|
|
|
Extrayez le contenu pertinent :""") |
|
|
]) |
|
|
|
|
|
|
|
|
filename_prompt = ChatPromptTemplate.from_messages([ |
|
|
("system", """Vous êtes un expert en analyse de noms de fichiers médicaux. Analysez le nom de fichier fourni et extrayez les informations médicales qu'il contient. |
|
|
|
|
|
Retournez UNIQUEMENT un JSON avec cette structure : |
|
|
{{ |
|
|
"medical_keywords": ["mots-clés médicaux identifiés"], |
|
|
"document_type_indicators": ["indicateurs du type de document"], |
|
|
"specialty_indicators": ["indicateurs de spécialité médicale"], |
|
|
"center_indicators": ["indicateurs de centre médical"], |
|
|
"anatomical_regions": ["régions anatomiques mentionnées"], |
|
|
"procedure_type": "type de procédure principal ou null", |
|
|
"confidence_score": 0.85 |
|
|
}} |
|
|
|
|
|
Exemples d'analyse : |
|
|
- "ECHOGRAPHIE" → document_type_indicators: ["echographie"] |
|
|
- "ECHODOPPLER" → procedure_type: "echo-doppler" |
|
|
- "ARTERIEL" → medical_keywords: ["arteriel"] |
|
|
- "MEMBRES.SUPERIEURS" → anatomical_regions: ["membres supérieurs"] |
|
|
- "radioroseraie" → center_indicators: ["roseraie"], specialty_indicators: ["radiologie"]"""), |
|
|
("human", "Analysez ce nom de fichier médical : {filename}") |
|
|
]) |
|
|
|
|
|
self.content_analyzer = content_prompt | self.llm |
|
|
self.section_extractor = section_prompt | self.llm |
|
|
self.filename_analyzer = filename_prompt | self.llm |
|
|
logging.info("✅ GPT initialisé pour l'analyse intelligente avec noms de fichiers") |
|
|
|
|
|
except Exception as e: |
|
|
logging.error(f"❌ Erreur lors de l'initialisation GPT: {e}") |
|
|
self.llm = None |
|
|
|
|
|
def analyze_filename(self, filename: str) -> FilenameAnalysis: |
|
|
"""Analyse le nom de fichier pour extraire des informations médicales""" |
|
|
|
|
|
|
|
|
clean_filename = os.path.basename(filename) |
|
|
clean_filename = clean_filename.replace('.docx', '').replace('.doc', '').replace('.rtf', '') |
|
|
|
|
|
|
|
|
if self.filename_analyzer: |
|
|
try: |
|
|
response = self.filename_analyzer.invoke({"filename": clean_filename}) |
|
|
result = response.content.strip() |
|
|
|
|
|
if result.startswith("```json"): |
|
|
result = result[7:] |
|
|
if result.endswith("```"): |
|
|
result = result[:-3] |
|
|
|
|
|
gpt_analysis = json.loads(result) |
|
|
|
|
|
return FilenameAnalysis( |
|
|
original_filename=filename, |
|
|
medical_keywords=gpt_analysis.get("medical_keywords", []), |
|
|
document_type_indicators=gpt_analysis.get("document_type_indicators", []), |
|
|
specialty_indicators=gpt_analysis.get("specialty_indicators", []), |
|
|
center_indicators=gpt_analysis.get("center_indicators", []), |
|
|
anatomical_regions=gpt_analysis.get("anatomical_regions", []), |
|
|
procedure_type=gpt_analysis.get("procedure_type"), |
|
|
confidence_score=gpt_analysis.get("confidence_score", 0.0) |
|
|
) |
|
|
|
|
|
except Exception as e: |
|
|
logging.warning(f"Erreur analyse GPT du nom de fichier: {e}") |
|
|
|
|
|
|
|
|
return self._analyze_filename_fallback(filename) |
|
|
|
|
|
def _analyze_filename_fallback(self, filename: str) -> FilenameAnalysis:
    """Analyse a file name with keyword tables and regexes, without GPT.

    Every keyword of ``self.filename_keywords`` is searched as a substring of
    the lowercased base name (known extensions removed). A few regex patterns
    then add procedure, keyword, anatomy and specialty hints. Each indicator
    is recorded at most once, in order of first appearance, so several
    keywords pointing at the same label no longer inflate the lists or the
    confidence score.

    Args:
        filename: File name or path; only the base name is analysed.

    Returns:
        The analysis; ``confidence_score`` is
        ``min(1.0, (keywords + type indicators + specialty indicators) / 5)``.
    """
    clean_filename = os.path.basename(filename).lower()
    clean_filename = clean_filename.replace('.docx', '').replace('.doc', '').replace('.rtf', '')

    medical_keywords: List[str] = []
    document_type_indicators: List[str] = []
    specialty_indicators: List[str] = []
    center_indicators: List[str] = []
    anatomical_regions: List[str] = []
    procedure_type = None

    def add_unique(bucket: List[str], value: str) -> None:
        if value not in bucket:
            bucket.append(value)

    # Categories whose label is recorded; "procedures" only contributes its
    # matched keyword to medical_keywords.
    category_buckets = {
        "imagerie": document_type_indicators,
        "specialites": specialty_indicators,
        "anatomie": anatomical_regions,
        "centres": center_indicators,
    }

    # NOTE(review): plain substring matching means short keywords also match
    # inside longer words (e.g. "os" inside "roseraie", "uro" inside "neuro").
    for category, subcategories in self.filename_keywords.items():
        bucket = category_buckets.get(category)
        for subcat, keywords in subcategories.items():
            for keyword in keywords:
                if keyword not in clean_filename:
                    continue
                if bucket is not None:
                    add_unique(bucket, subcat)
                if category == "imagerie" and subcat in ("echographie", "irm", "scanner"):
                    procedure_type = subcat
                add_unique(medical_keywords, keyword)

    patterns = {
        "doppler": r"doppler|echo.*doppler",
        "arteriel": r"arteriel|artere",
        "veineux": r"veineux|veine",
        "membres_superieurs": r"membre.*superieur|bras",
        "membres_inferieurs": r"membre.*inferieur|jambe",
        "pelvien": r"pelvi|utérus|ovaire",
        "radiologie": r"radio"
    }

    for pattern_name, pattern in patterns.items():
        if not re.search(pattern, clean_filename):
            continue
        if pattern_name == "doppler":
            # Takes precedence over a procedure type set by the keyword pass.
            procedure_type = "echo-doppler"
        elif pattern_name in ("arteriel", "veineux"):
            add_unique(medical_keywords, pattern_name)
        elif "membre" in pattern_name:
            add_unique(anatomical_regions, pattern_name.replace("_", " "))
        elif pattern_name == "pelvien":
            add_unique(anatomical_regions, "pelvis")
        elif pattern_name == "radiologie":
            add_unique(specialty_indicators, "radiologie")

    total_elements = len(medical_keywords) + len(document_type_indicators) + len(specialty_indicators)
    confidence_score = min(1.0, total_elements / 5.0)

    return FilenameAnalysis(
        original_filename=filename,
        medical_keywords=medical_keywords,
        document_type_indicators=document_type_indicators,
        specialty_indicators=specialty_indicators,
        center_indicators=center_indicators,
        anatomical_regions=anatomical_regions,
        procedure_type=procedure_type,
        confidence_score=confidence_score
    )
|
|
|
|
|
def calculate_filename_match_score(self, transcription_filename: str, transcription_analysis: Dict, |
|
|
template_filename: str) -> Tuple[float, List[str]]: |
|
|
"""Calcule le score de correspondance basé sur les noms de fichiers""" |
|
|
|
|
|
|
|
|
trans_filename_analysis = self.analyze_filename(transcription_filename) |
|
|
template_filename_analysis = self.analyze_filename(template_filename) |
|
|
|
|
|
score_components = [] |
|
|
matching_indicators = [] |
|
|
|
|
|
|
|
|
trans_types = set(trans_filename_analysis.document_type_indicators) |
|
|
template_types = set(template_filename_analysis.document_type_indicators) |
|
|
|
|
|
if trans_types & template_types: |
|
|
type_match_score = len(trans_types & template_types) / max(len(trans_types | template_types), 1) |
|
|
score_components.append(type_match_score * 0.4) |
|
|
matching_indicators.extend(list(trans_types & template_types)) |
|
|
|
|
|
|
|
|
trans_specialties = set(trans_filename_analysis.specialty_indicators) |
|
|
template_specialties = set(template_filename_analysis.specialty_indicators) |
|
|
|
|
|
if trans_specialties & template_specialties: |
|
|
specialty_match_score = len(trans_specialties & template_specialties) / max(len(trans_specialties | template_specialties), 1) |
|
|
score_components.append(specialty_match_score * 0.25) |
|
|
matching_indicators.extend(list(trans_specialties & template_specialties)) |
|
|
|
|
|
|
|
|
trans_anatomy = set(trans_filename_analysis.anatomical_regions) |
|
|
template_anatomy = set(template_filename_analysis.anatomical_regions) |
|
|
|
|
|
if trans_anatomy & template_anatomy: |
|
|
anatomy_match_score = len(trans_anatomy & template_anatomy) / max(len(trans_anatomy | template_anatomy), 1) |
|
|
score_components.append(anatomy_match_score * 0.2) |
|
|
matching_indicators.extend(list(trans_anatomy & template_anatomy)) |
|
|
|
|
|
|
|
|
trans_centers = set(trans_filename_analysis.center_indicators) |
|
|
template_centers = set(template_filename_analysis.center_indicators) |
|
|
|
|
|
if trans_centers & template_centers: |
|
|
center_match_score = len(trans_centers & template_centers) / max(len(trans_centers | template_centers), 1) |
|
|
score_components.append(center_match_score * 0.1) |
|
|
matching_indicators.extend(list(trans_centers & template_centers)) |
|
|
|
|
|
|
|
|
if (trans_filename_analysis.procedure_type and |
|
|
template_filename_analysis.procedure_type and |
|
|
trans_filename_analysis.procedure_type == template_filename_analysis.procedure_type): |
|
|
score_components.append(0.05) |
|
|
matching_indicators.append(f"procédure: {trans_filename_analysis.procedure_type}") |
|
|
|
|
|
|
|
|
trans_keywords = set(trans_filename_analysis.medical_keywords) |
|
|
template_keywords = set(template_filename_analysis.medical_keywords) |
|
|
|
|
|
common_keywords = trans_keywords & template_keywords |
|
|
if common_keywords: |
|
|
keyword_bonus = min(0.1, len(common_keywords) * 0.02) |
|
|
score_components.append(keyword_bonus) |
|
|
matching_indicators.extend(list(common_keywords)) |
|
|
|
|
|
|
|
|
final_score = sum(score_components) |
|
|
|
|
|
|
|
|
if ("radiologie" in transcription_filename.lower() and |
|
|
any("radio" in indicator for indicator in matching_indicators)): |
|
|
final_score += 0.05 |
|
|
matching_indicators.append("cohérence radiologie") |
|
|
|
|
|
return min(1.0, final_score), matching_indicators |
|
|
|
|
|
def load_database(self, filepath: str): |
|
|
"""Charge la base de données vectorielle""" |
|
|
self.parser.load_database(filepath) |
|
|
logging.info(f"✅ Base de données chargée: {len(self.parser.templates)} templates") |
|
|
|
|
|
def analyze_transcription_detailed(self, transcription: str, transcription_filename: str = "") -> Dict: |
|
|
"""Analyse détaillée d'une transcription avec GPT, en incluant le nom de fichier""" |
|
|
if not self.content_analyzer: |
|
|
return self._fallback_analysis(transcription, transcription_filename) |
|
|
|
|
|
try: |
|
|
logging.info("🔍 Analyse détaillée de la transcription...") |
|
|
|
|
|
|
|
|
enhanced_transcription = transcription |
|
|
if transcription_filename: |
|
|
enhanced_transcription = f"Nom de fichier: {transcription_filename}\n\nContenu:\n{transcription}" |
|
|
|
|
|
response = self.content_analyzer.invoke({"transcription": enhanced_transcription}) |
|
|
result = response.content.strip() |
|
|
|
|
|
|
|
|
if result.startswith("```json"): |
|
|
result = result[7:] |
|
|
if result.endswith("```"): |
|
|
result = result[:-3] |
|
|
result = result.strip() |
|
|
|
|
|
analysis = json.loads(result) |
|
|
|
|
|
|
|
|
if transcription_filename: |
|
|
filename_analysis = self.analyze_filename(transcription_filename) |
|
|
analysis["filename_analysis"] = { |
|
|
"medical_keywords": filename_analysis.medical_keywords, |
|
|
"document_type_indicators": filename_analysis.document_type_indicators, |
|
|
"specialty_indicators": filename_analysis.specialty_indicators, |
|
|
"anatomical_regions": filename_analysis.anatomical_regions, |
|
|
"procedure_type": filename_analysis.procedure_type |
|
|
} |
|
|
|
|
|
logging.info("✅ Analyse détaillée terminée") |
|
|
return analysis |
|
|
|
|
|
except Exception as e: |
|
|
logging.error(f"❌ Erreur analyse détaillée: {e}") |
|
|
return self._fallback_analysis(transcription, transcription_filename) |
|
|
|
|
|
def _fallback_analysis(self, transcription: str, transcription_filename: str = "") -> Dict: |
|
|
"""Analyse de fallback sans GPT""" |
|
|
text_lower = transcription.lower() |
|
|
|
|
|
|
|
|
document_types = { |
|
|
"compte_rendu_imagerie": ["irm", "scanner", "échographie", "radiologie", "t1", "t2", "doppler"], |
|
|
"rapport_biologique": ["laboratoire", "analyse", "biologie", "sang", "urine", "sérum"], |
|
|
"lettre_medicale": ["lettre", "courrier", "correspondance", "cher confrère"], |
|
|
"compte_rendu_consultation": ["consultation", "examen clinique", "patient", "antécédents"] |
|
|
} |
|
|
|
|
|
detected_type = "autre" |
|
|
|
|
|
|
|
|
if transcription_filename: |
|
|
filename_lower = transcription_filename.lower() |
|
|
for doc_type, keywords in document_types.items(): |
|
|
if sum(1 for kw in keywords if kw in filename_lower) >= 1: |
|
|
detected_type = doc_type |
|
|
break |
|
|
|
|
|
|
|
|
if detected_type == "autre": |
|
|
for doc_type, keywords in document_types.items(): |
|
|
if sum(1 for kw in keywords if kw in text_lower) >= 2: |
|
|
detected_type = doc_type |
|
|
break |
|
|
|
|
|
|
|
|
sections = {} |
|
|
section_patterns = { |
|
|
"technique": ["technique", "méthode", "protocole"], |
|
|
"résultats": ["résultat", "observation", "constatation"], |
|
|
"conclusion": ["conclusion", "diagnostic", "synthèse"] |
|
|
} |
|
|
|
|
|
for section, keywords in section_patterns.items(): |
|
|
for keyword in keywords: |
|
|
if keyword in text_lower: |
|
|
start = text_lower.find(keyword) |
|
|
end = min(len(transcription), start + 500) |
|
|
content = transcription[start:end] |
|
|
sections[section] = { |
|
|
"content": content, |
|
|
"confidence": 0.6, |
|
|
"keywords": [keyword] |
|
|
} |
|
|
break |
|
|
|
|
|
analysis = { |
|
|
"document_type": detected_type, |
|
|
"identification": { |
|
|
"physician": "Non identifié", |
|
|
"center": "Non identifié", |
|
|
"service": "Non identifié" |
|
|
}, |
|
|
"sections": sections, |
|
|
"medical_data": { |
|
|
"procedures": [], |
|
|
"measurements": re.findall(r'\d+\s*(?:mm|cm|ml)', transcription), |
|
|
"diagnoses": [], |
|
|
"treatments": [] |
|
|
}, |
|
|
"completeness": { |
|
|
"score": 0.6, |
|
|
"transcription_quality": "fair" |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
if transcription_filename: |
|
|
filename_analysis = self.analyze_filename(transcription_filename) |
|
|
analysis["filename_analysis"] = { |
|
|
"medical_keywords": filename_analysis.medical_keywords, |
|
|
"document_type_indicators": filename_analysis.document_type_indicators, |
|
|
"specialty_indicators": filename_analysis.specialty_indicators, |
|
|
"anatomical_regions": filename_analysis.anatomical_regions, |
|
|
"procedure_type": filename_analysis.procedure_type |
|
|
} |
|
|
|
|
|
return analysis |
|
|
|
|
|
def calculate_type_match_score(self, transcription_analysis: Dict, template_info: TemplateInfo) -> float: |
|
|
"""Calcule le score de correspondance du type de document""" |
|
|
transcription_type = transcription_analysis.get("document_type", "") |
|
|
template_type = template_info.type.lower() |
|
|
|
|
|
|
|
|
type_mappings = { |
|
|
"compte_rendu_imagerie": ["irm", "scanner", "échographie", "imagerie", "radiologie"], |
|
|
"rapport_biologique": ["laboratoire", "biologie", "analyse"], |
|
|
"lettre_medicale": ["lettre", "courrier", "correspondance"], |
|
|
"compte_rendu_consultation": ["consultation", "examen"] |
|
|
} |
|
|
|
|
|
if transcription_type in type_mappings: |
|
|
expected_keywords = type_mappings[transcription_type] |
|
|
matches = sum(1 for kw in expected_keywords if kw in template_type) |
|
|
return min(1.0, matches / len(expected_keywords) * 2) |
|
|
|
|
|
return 0.3 |
|
|
|
|
|
def calculate_physician_match_score(self, transcription_analysis: Dict, template_info: TemplateInfo) -> float: |
|
|
"""Calcule le score de correspondance du médecin""" |
|
|
transcription_physician = transcription_analysis.get("identification", {}).get("physician", "") |
|
|
template_physician = template_info.medecin |
|
|
|
|
|
if not transcription_physician or transcription_physician == "Non identifié": |
|
|
return 0.5 |
|
|
|
|
|
if not template_physician: |
|
|
return 0.5 |
|
|
|
|
|
|
|
|
trans_words = set(transcription_physician.lower().split()) |
|
|
temp_words = set(template_physician.lower().split()) |
|
|
|
|
|
if trans_words & temp_words: |
|
|
return 1.0 |
|
|
|
|
|
return 0.0 |
|
|
|
|
|
def calculate_center_match_score(self, transcription_analysis: Dict, template_info: TemplateInfo) -> float: |
|
|
"""Calcule le score de correspondance du centre médical""" |
|
|
transcription_center = transcription_analysis.get("identification", {}).get("center", "") |
|
|
template_center = getattr(template_info, 'centre_medical', '') or getattr(template_info, 'center', '') |
|
|
|
|
|
if not transcription_center or transcription_center == "Non identifié": |
|
|
return 0.5 |
|
|
|
|
|
if not template_center: |
|
|
return 0.5 |
|
|
|
|
|
|
|
|
if transcription_center.lower() in template_center.lower() or template_center.lower() in transcription_center.lower(): |
|
|
return 1.0 |
|
|
|
|
|
return 0.0 |
|
|
|
|
|
def calculate_section_matches(self, transcription: str, transcription_analysis: Dict, template_info: TemplateInfo) -> Dict[str, SectionMatch]: |
|
|
"""Calcule les correspondances pour chaque section du template""" |
|
|
section_matches = {} |
|
|
transcription_sections = transcription_analysis.get("sections", {}) |
|
|
|
|
|
for section_name in template_info.detected_sections: |
|
|
section_match = self._match_single_section( |
|
|
section_name, |
|
|
transcription, |
|
|
transcription_sections, |
|
|
template_info |
|
|
) |
|
|
section_matches[section_name] = section_match |
|
|
|
|
|
return section_matches |
|
|
|
|
|
def _match_single_section(self, section_name: str, transcription: str,
                          transcription_sections: Dict, template_info: TemplateInfo) -> SectionMatch:
    """Find the content that can fill one template section.

    The analysed sections are tried first: a candidate qualifies when more
    than 30% of the template section's words appear in its name, and the
    candidate with the highest ``confidence * word overlap`` wins. Candidates
    are now compared on that weighted value (previously the raw confidence
    was compared with the stored weighted one, so a weaker match could
    replace a stronger one). When nothing qualifies and the GPT section chain
    is available, the chain is asked to extract the content from the whole
    transcription.

    Args:
        section_name: Name of the template section.
        transcription: Raw transcription text.
        transcription_sections: ``{name: {"content": ..., "confidence": ...}}``
            as produced by the transcription analysis.
        template_info: Template description (not used by this method).

    Returns:
        The match; ``can_fill`` is True when more than 10 non-blank
        characters of content were found.
    """
    section_keywords = section_name.lower().split()
    section_keyword_set = set(section_keywords)

    def to_confidence(value) -> float:
        # The model may return null or a string instead of a number.
        try:
            return float(value)
        except (TypeError, ValueError):
            return 0.0

    best_match_content = ""
    best_confidence = 0.0

    for analyzed_section, section_data in (transcription_sections or {}).items():
        if not isinstance(section_data, dict):
            continue

        content = section_data.get("content", "")
        if not isinstance(content, str):
            content = ""
        confidence = to_confidence(section_data.get("confidence", 0.0))

        analyzed_keywords = set(str(analyzed_section).lower().split())
        keyword_match = len(section_keyword_set & analyzed_keywords) / max(len(section_keywords), 1)

        # Compare like with like: the stored best value is already weighted.
        weighted_confidence = confidence * keyword_match
        if keyword_match > 0.3 and weighted_confidence > best_confidence:
            best_match_content = content
            best_confidence = weighted_confidence

    if self.section_extractor and not best_match_content:
        try:
            section_description = f"Section {section_name} d'un document médical"
            response = self.section_extractor.invoke({
                "transcription": transcription,
                "section_name": section_name,
                "section_description": section_description
            })

            result = response.content.strip()
            # Remove an optional Markdown code fence, with or without the
            # "json" language tag.
            if result.startswith("```"):
                result = result[3:]
                if result[:4].lower() == "json":
                    result = result[4:]
            if result.endswith("```"):
                result = result[:-3]

            extraction_result = json.loads(result)
            if not isinstance(extraction_result, dict):
                raise ValueError("réponse GPT inattendue (objet JSON attendu)")

            if extraction_result.get("can_fill", False):
                extracted = extraction_result.get("extracted_content", "")
                best_match_content = extracted if isinstance(extracted, str) else ""
                best_confidence = to_confidence(extraction_result.get("confidence", 0.0))

        except Exception as e:
            # Best effort: an extraction failure leaves the section unfilled.
            logging.warning(f"Erreur extraction section {section_name}: {e}")

    can_fill = bool(best_match_content) and len(best_match_content.strip()) > 10
    missing_info = [] if can_fill else [f"Contenu manquant pour {section_name}"]

    return SectionMatch(
        section_name=section_name,
        confidence=best_confidence,
        extracted_content=best_match_content,
        can_fill=can_fill,
        missing_info=missing_info
    )
|
|
|
|
|
def calculate_fillability_score(self, section_matches: Dict[str, SectionMatch], template_info: TemplateInfo) -> Tuple[float, float, List[str]]: |
|
|
"""Calcule le score de remplissage possible du template""" |
|
|
total_sections = len(template_info.detected_sections) |
|
|
fillable_sections = sum(1 for match in section_matches.values() if match.can_fill) |
|
|
critical_sections = sum(1 for match in section_matches.values() if match.can_fill and match.confidence > 0.7) |
|
|
|
|
|
if total_sections == 0: |
|
|
return 0.0, 0.0, ["Template sans sections"] |
|
|
|
|
|
fillability_score = fillable_sections / total_sections |
|
|
filling_percentage = (critical_sections / total_sections) * 100 |
|
|
|
|
|
missing_critical = [ |
|
|
match.section_name for match in section_matches.values() |
|
|
if not match.can_fill |
|
|
] |
|
|
|
|
|
return fillability_score, filling_percentage, missing_critical |
|
|
|
|
|
def smart_match_transcription(self, transcription: str, transcription_filename: str = "", k: int = 10) -> List[TemplateMatch]: |
|
|
"""Matching intelligent entre transcription et templates avec analyse des noms de fichiers""" |
|
|
if not self.parser.templates: |
|
|
logging.error("Aucun template chargé") |
|
|
return [] |
|
|
|
|
|
logging.info("Analyse intelligente de la transcription...") |
|
|
|
|
|
|
|
|
analysis = self.analyze_transcription_detailed(transcription, transcription_filename) |
|
|
|
|
|
|
|
|
candidate_templates = self._filter_templates_by_type_and_filename(analysis, transcription_filename) |
|
|
|
|
|
if not candidate_templates: |
|
|
logging.warning("Aucun template candidat trouvé, utilisation de tous les templates") |
|
|
candidate_templates = list(self.parser.templates.keys()) |
|
|
|
|
|
logging.info(f"{len(candidate_templates)} templates candidats retenus") |
|
|
|
|
|
|
|
|
template_matches = [] |
|
|
|
|
|
for template_id in candidate_templates: |
|
|
template_info = self.parser.get_template_info(template_id) |
|
|
if not template_info: |
|
|
continue |
|
|
|
|
|
|
|
|
type_score = self.calculate_type_match_score(analysis, template_info) |
|
|
physician_score = self.calculate_physician_match_score(analysis, template_info) |
|
|
center_score = self.calculate_center_match_score(analysis, template_info) |
|
|
|
|
|
|
|
|
filename_score, filename_indicators = self.calculate_filename_match_score( |
|
|
transcription_filename, analysis, template_info.filepath |
|
|
) |
|
|
|
|
|
|
|
|
section_matches = self.calculate_section_matches(transcription, analysis, template_info) |
|
|
|
|
|
|
|
|
fillability_score, filling_percentage, missing_critical = self.calculate_fillability_score(section_matches, template_info) |
|
|
|
|
|
|
|
|
content_score = self._calculate_content_similarity(transcription, template_id) |
|
|
|
|
|
|
|
|
overall_score = ( |
|
|
type_score * 0.25 + |
|
|
fillability_score * 0.3 + |
|
|
filename_score * 0.2 + |
|
|
content_score * 0.15 + |
|
|
physician_score * 0.05 + |
|
|
center_score * 0.05 |
|
|
) |
|
|
|
|
|
|
|
|
confidence_level = self._determine_confidence_level(overall_score, fillability_score, analysis) |
|
|
|
|
|
|
|
|
extracted_data = self._extract_template_data(section_matches) |
|
|
|
|
|
template_match = TemplateMatch( |
|
|
template_id=template_id, |
|
|
template_info=template_info, |
|
|
overall_score=overall_score, |
|
|
type_match_score=type_score, |
|
|
physician_match_score=physician_score, |
|
|
center_match_score=center_score, |
|
|
content_match_score=content_score, |
|
|
filename_match_score=filename_score, |
|
|
fillability_score=fillability_score, |
|
|
section_matches=section_matches, |
|
|
confidence_level=confidence_level, |
|
|
can_be_filled=fillability_score > 0.6, |
|
|
filling_percentage=filling_percentage, |
|
|
missing_critical_info=missing_critical, |
|
|
extracted_data=extracted_data, |
|
|
filename_indicators=filename_indicators |
|
|
) |
|
|
|
|
|
template_matches.append(template_match) |
|
|
|
|
|
|
|
|
template_matches.sort(key=lambda x: x.overall_score, reverse=True) |
|
|
|
|
|
logging.info(f"{len(template_matches)} templates analysés") |
|
|
return template_matches[:k] |
|
|
|
|
|
def _filter_templates_by_type_and_filename(self, analysis: Dict, transcription_filename: str) -> List[str]: |
|
|
"""Filtre les templates par type de document et nom de fichier""" |
|
|
document_type = analysis.get("document_type", "") |
|
|
filename_analysis = analysis.get("filename_analysis", {}) |
|
|
|
|
|
|
|
|
filter_keywords = set() |
|
|
|
|
|
|
|
|
if document_type != "autre": |
|
|
type_keywords = { |
|
|
"compte_rendu_imagerie": ["irm", "scanner", "echo", "radio", "imagerie"], |
|
|
"rapport_biologique": ["labo", "biologie", "analyse", "sang"], |
|
|
"lettre_medicale": ["lettre", "courrier"], |
|
|
"compte_rendu_consultation": ["consultation", "examen", "clinique"] |
|
|
} |
|
|
filter_keywords.update(type_keywords.get(document_type, [])) |
|
|
|
|
|
|
|
|
if filename_analysis: |
|
|
filter_keywords.update(filename_analysis.get("medical_keywords", [])) |
|
|
filter_keywords.update(filename_analysis.get("document_type_indicators", [])) |
|
|
filter_keywords.update(filename_analysis.get("specialty_indicators", [])) |
|
|
|
|
|
|
|
|
if not filter_keywords: |
|
|
return list(self.parser.templates.keys()) |
|
|
|
|
|
|
|
|
matching_templates = [] |
|
|
|
|
|
for template_id, template_info in self.parser.templates.items(): |
|
|
template_filepath_lower = template_info.filepath.lower() |
|
|
template_type_lower = template_info.type.lower() |
|
|
|
|
|
|
|
|
filename_matches = sum(1 for keyword in filter_keywords if keyword in template_filepath_lower) |
|
|
type_matches = sum(1 for keyword in filter_keywords if keyword in template_type_lower) |
|
|
|
|
|
|
|
|
if filename_matches > 0 or type_matches > 0: |
|
|
matching_templates.append(template_id) |
|
|
|
|
|
return matching_templates if matching_templates else list(self.parser.templates.keys()) |
|
|
|
|
|
def _calculate_content_similarity(self, transcription: str, template_id: str) -> float: |
|
|
"""Calcule la similarité de contenu via recherche vectorielle""" |
|
|
try: |
|
|
results = self.parser.search_similar_templates(transcription, k=50) |
|
|
for tid, score in results: |
|
|
if tid == template_id: |
|
|
return score |
|
|
return 0.0 |
|
|
except Exception as e: |
|
|
logging.warning(f"Erreur similarité vectorielle: {e}") |
|
|
return 0.0 |
|
|
|
|
|
def _determine_confidence_level(self, overall_score: float, fillability_score: float, analysis: Dict) -> str: |
|
|
"""Détermine le niveau de confiance global""" |
|
|
transcription_quality = analysis.get("completeness", {}).get("transcription_quality", "fair") |
|
|
|
|
|
|
|
|
quality_modifier = { |
|
|
"excellent": 1.0, |
|
|
"good": 0.9, |
|
|
"fair": 0.8, |
|
|
"poor": 0.6 |
|
|
}.get(transcription_quality, 0.8) |
|
|
|
|
|
adjusted_score = overall_score * quality_modifier |
|
|
|
|
|
if adjusted_score > 0.8 and fillability_score > 0.8: |
|
|
return "excellent" |
|
|
elif adjusted_score > 0.6 and fillability_score > 0.6: |
|
|
return "good" |
|
|
elif adjusted_score > 0.4 and fillability_score > 0.4: |
|
|
return "fair" |
|
|
else: |
|
|
return "poor" |
|
|
|
|
|
def _extract_template_data(self, section_matches: Dict[str, SectionMatch]) -> Dict[str, str]: |
|
|
"""Extrait les données prêtes pour remplir le template""" |
|
|
extracted_data = {} |
|
|
|
|
|
for section_name, match in section_matches.items(): |
|
|
if match.can_fill and match.extracted_content: |
|
|
|
|
|
content = match.extracted_content.strip() |
|
|
if content: |
|
|
extracted_data[section_name] = content |
|
|
|
|
|
return extracted_data |
|
|
|
|
|
def print_smart_results(self, matches: List[TemplateMatch]):
    """Print a detailed report of the matching results to stdout.

    Prints a single "no result" line when ``matches`` is empty. Otherwise
    prints a banner followed, for each match, by its identity, the individual
    scores, the filename indicators (if any), the fillability summary, the
    fillable/missing section names and a preview (first 100 characters) of
    each extracted value.
    """
    if not matches:
        print("Aucun résultat trouvé")
        return

    banner = '=' * 100
    rule = '=' * 60

    print(f"\n{banner}")
    print(f"RÉSULTATS DE MATCHING INTELLIGENT - {len(matches)} templates analysés")
    print(f"{banner}")

    for rank, candidate in enumerate(matches, start=1):
        info = candidate.template_info

        print(f"\nTEMPLATE #{rank}")
        print(f"{rule}")
        print(f"ID: {candidate.template_id}")
        print(f"Score global: {candidate.overall_score:.3f}")
        print(f"Confiance: {candidate.confidence_level}")
        print(f"Template: {os.path.basename(info.filepath)}")
        print(f"Médecin: {info.medecin}")

        print(f"\nSCORES DÉTAILLÉS:")
        print(f" • Type de document: {candidate.type_match_score:.3f}")
        print(f" • Nom de fichier: {candidate.filename_match_score:.3f}")
        print(f" • Médecin: {candidate.physician_match_score:.3f}")
        print(f" • Centre: {candidate.center_match_score:.3f}")
        print(f" • Contenu: {candidate.content_match_score:.3f}")
        print(f" • Remplissage: {candidate.fillability_score:.3f}")

        if candidate.filename_indicators:
            print(f"\nINDICATEURS NOM DE FICHIER:")
            print(f" • Correspondances: {', '.join(candidate.filename_indicators)}")

        print(f"\nCAPACITÉ DE REMPLISSAGE:")
        print(f" • Peut être rempli: {'OUI' if candidate.can_be_filled else 'NON'}")
        print(f" • Pourcentage: {candidate.filling_percentage:.1f}%")

        sections = candidate.section_matches
        if sections:
            fillable_names = [s.section_name for s in sections.values() if s.can_fill]
            missing_names = [s.section_name for s in sections.values() if not s.can_fill]

            print(f" • Sections remplissables: {len(fillable_names)}/{len(sections)}")
            if fillable_names:
                print(f" • Remplissables: {', '.join(fillable_names)}")
            if missing_names:
                print(f" • Manquantes: {', '.join(missing_names)}")

        if candidate.extracted_data:
            print(f"\nDONNÉES EXTRAITES:")
            for section, content in candidate.extracted_data.items():
                # Long values are cut to 100 characters for the preview.
                if len(content) > 100:
                    preview = content[:100] + "..."
                else:
                    preview = content
                print(f" • {section}: {preview}")

        print(f"{rule}")
|
|
|
|
|
def get_best_fillable_match(self, transcription: str, transcription_filename: str = "") -> Optional[TemplateMatch]:
    """Return the highest-ranked match that can actually be filled, if any.

    Runs ``smart_match_transcription`` with ``k=10`` and returns the first
    result flagged ``can_be_filled`` whose ``fillability_score`` is strictly
    above 0.6, or ``None`` when no result qualifies.
    """
    ranked = self.smart_match_transcription(transcription, transcription_filename, k=10)
    for candidate in ranked:
        if candidate.can_be_filled and candidate.fillability_score > 0.6:
            return candidate
    return None
|
|
|
|
|
def test_with_provided_example(self):
    """Run the whole pipeline on a built-in pelvic MRI sample and print each step.

    Prints the filename analysis, runs ``smart_match_transcription`` (k=5),
    displays the results with ``print_smart_results`` and finally prints a
    summary of ``get_best_fillable_match`` when one is found.
    """
    sample_filename = "default.73.931915433.rtf_3650535_radiologie.doc"
    sample_text = (
        "**Technique :** 3 plans T2, diffusion axiale, T2 grand champ et T1 Dixon.\n"
        "**Résultats :**\n"
        "* L'utérus est antéversé, antéfléchi, latéralisé à droite, de taille normale pour l'âge.\n"
        "* L'endomètre est fin, mesurant moins de 2 mm.\n"
        "* Pas d'adénomyose franche.\n"
        "* Aspect normal du col utérin et du vagin.\n"
        "* L'ovaire droit, en position postérieure, mesure 18 x 11 mm avec présence de 4 follicules.\n"
        "* L'ovaire gauche, en position latéro-utérine, présente un volumineux endométriome de 45 mm, typique en hypersignal T1 Dixon.\n"
        "* Deuxième endométriome accolé à l'ovaire droit, périphérique, mesurant 13 mm.\n"
        "* Pas d'épaississement marqué du torus ni des ligaments utéro-sacrés.\n"
        "* Pas d'autre localisation pelvienne.\n"
        "* Pas d'épanchement pelvien.\n"
        "* Pas d'anomalie de la vessie.\n"
        "* Pas d'adénomégalie pelvienne, pas de dilatation des uretères.\n"
        "**Conclusion :**\n"
        "* Endométriome ovarien droit périphérique de 13 mm.\n"
        "* Endométriome ovarien gauche centro-ovarien de 45 mm."
    )

    print("ANALYSE DE L'EXEMPLE FOURNI")
    print("=" * 80)
    print(f"Nom de fichier: {sample_filename}")
    print(f"Contenu: {len(sample_text.split())} mots")

    # Step 1: what the filename alone reveals.
    name_report = self.analyze_filename(sample_filename)
    print(f"\nANALYSE DU NOM DE FICHIER:")
    print(f"Mots-clés médicaux: {name_report.medical_keywords}")
    print(f"Indicateurs de type: {name_report.document_type_indicators}")
    print(f"Spécialités: {name_report.specialty_indicators}")
    print(f"Centres: {name_report.center_indicators}")
    print(f"Régions anatomiques: {name_report.anatomical_regions}")
    print(f"Type de procédure: {name_report.procedure_type}")
    print(f"Score de confiance: {name_report.confidence_score:.3f}")

    # Step 2: full matching and detailed display.
    print(f"\nMATCHING EN COURS...")
    ranked = self.smart_match_transcription(sample_text, sample_filename, k=5)
    self.print_smart_results(ranked)

    # Step 3: best template that can really be filled.
    winner = self.get_best_fillable_match(sample_text, sample_filename)
    if not winner:
        return

    print(f"\nMEILLEUR TEMPLATE REMPLISSABLE:")
    print(f"Template: {winner.template_id}")
    print(f"Score global: {winner.overall_score:.3f}")
    print(f"Score nom de fichier: {winner.filename_match_score:.3f}")
    print(f"Indicateurs nom de fichier: {', '.join(winner.filename_indicators)}")
    print(f"Capacité de remplissage: {winner.filling_percentage:.1f}%")
|
|
|
|
|
def _prompt_line(prompt: str, default: str = "") -> str:
    """Ask one question on stdin; return the stripped answer, or ``default`` at end of input."""
    try:
        return input(prompt).strip()
    except EOFError:
        return default


def _read_manual_transcription() -> str:
    """Read lines from stdin until a line equal to 'FIN' or end of input; return them joined."""
    lines = []
    while True:
        try:
            line = input()
        except EOFError:
            # Piped or closed stdin without the terminator: keep what was read.
            break
        if line.strip() == 'FIN':
            break
        lines.append(line)
    return '\n'.join(lines)


def main():
    """Interactive entry point for trying the filename-aware smart matching.

    Asks for the template database path, builds a ``SmartTranscriptionMatcher``
    and offers three modes: the built-in example, manual entry terminated by
    'FIN', or reading a UTF-8 text file. It then prints the ranked results and
    the best fillable template, and optionally shows the template summary and
    generates the filled template.

    Every prompt treats end of input as an empty answer, so a closed or piped
    stdin no longer aborts the run with ``EOFError``.
    """
    # NOTE(review): the prompt advertises "templates/medical_templates.pkl" but the
    # fallback below is "medical_templates.pkl" — confirm which default is intended.
    db_path = _prompt_line("Chemin vers la base de données (templates/medical_templates.pkl): ")
    if not db_path:
        db_path = "medical_templates.pkl"

    if not os.path.exists(db_path):
        print(f"Fichier de base de données non trouvé: {db_path}")
        return

    print(f"\nInitialisation du système de matching intelligent...")
    matcher = SmartTranscriptionMatcher(db_path)

    print(f"\nOPTIONS DE TEST:")
    print("1. Utiliser l'exemple fourni (radiologie)")
    print("2. Saisie manuelle")
    print("3. Lecture depuis fichier")

    choice = _prompt_line("\nChoisissez une option (1-3): ")

    if choice == "1":
        matcher.test_with_provided_example()
        return

    if choice == "2":
        transcription_filename = _prompt_line("Nom du fichier de transcription: ")
        print("\nEntrez votre transcription (tapez 'FIN' sur une ligne vide pour terminer):")
        transcription = _read_manual_transcription()
    elif choice == "3":
        filepath = _prompt_line("Chemin vers le fichier de transcription: ")
        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                transcription = f.read()
        except (OSError, ValueError) as e:
            # ValueError covers UnicodeDecodeError and malformed paths.
            print(f"Erreur de lecture: {e}")
            return
        transcription_filename = os.path.basename(filepath)
        print(f"Fichier lu: {len(transcription.split())} mots")
    else:
        print("Choix invalide")
        return

    if not transcription.strip():
        print("Aucune transcription fournie")
        return

    print(f"\nAnalyse intelligente en cours...")

    results = matcher.smart_match_transcription(transcription, transcription_filename, k=5)
    matcher.print_smart_results(results)

    # NOTE(review): get_best_fillable_match is a second call on the matcher for
    # the same input; presumably it repeats the matching work — confirm.
    best_fillable = matcher.get_best_fillable_match(transcription, transcription_filename)

    if best_fillable:
        print(f"\nMEILLEUR TEMPLATE REMPLISSABLE:")
        print(f"{'='*60}")
        print(f"Template: {best_fillable.template_id}")
        print(f"Score global: {best_fillable.overall_score:.3f}")
        print(f"Score nom de fichier: {best_fillable.filename_match_score:.3f}")
        print(f"Indicateurs fichier: {', '.join(best_fillable.filename_indicators)}")
        print(f"Capacité de remplissage: {best_fillable.filling_percentage:.1f}%")
        print(f"Confiance: {best_fillable.confidence_level}")

        if best_fillable.extracted_data:
            print(f"\nTEMPLATE PRÊT À REMPLIR:")
            print(f"Sections avec données extraites:")
            for section, content in best_fillable.extracted_data.items():
                print(f"\n[{section.upper()}]")
                print(f"{content}")

        show_details = _prompt_line(f"\nAfficher les détails complets du template? (y/n): ").lower()
        if show_details == 'y':
            matcher.parser.print_template_summary(best_fillable.template_id)

        generate_filled = _prompt_line(f"\nGénérer le template rempli? (y/n): ").lower()
        if generate_filled == 'y':
            generate_filled_template(matcher, best_fillable, transcription)
    else:
        print(f"\nAucun template ne peut être suffisamment rempli avec cette transcription")

        if results:
            print(f"\nMeilleurs candidats (mais insuffisamment remplissables):")
            for i, result in enumerate(results[:3], 1):
                print(f"{i}. {result.template_id} - Score: {result.overall_score:.3f}")
                print(f" Score fichier: {result.filename_match_score:.3f}")
                print(f" Remplissage: {result.filling_percentage:.1f}%")
|
|
|
|
|
def _fill_template_sections(template_content: str, extracted_data: Dict[str, str]) -> Tuple[str, int]:
    """Insert each extracted section into the template text.

    For every section, a list of placeholder spellings is tried in order
    (``[NAME]``, ``[name]``, ``{name}``, ``__name__``, ``<!-- name -->``,
    ``_name_``, then ``[WORD]``, ``{word}``, ``__word__`` for each word of the
    section name). The first spelling found is replaced everywhere by the
    extracted content. When none is found, the content is inserted right after
    the first line containing one of the section-name words. A progress line
    is printed for every section.

    Returns:
        ``(filled_text, integrated_count)`` where ``integrated_count`` is the
        number of sections that were replaced or inserted.
    """
    filled_content = template_content
    replacement_count = 0

    for section_name, extracted_content in extracted_data.items():
        section_keywords = section_name.lower().split()

        patterns = [
            f"[{section_name.upper()}]",
            f"[{section_name}]",
            f"{{{section_name}}}",
            f"__{section_name}__",
            f"<!-- {section_name} -->",
            f"_{section_name}_",
        ]
        for keyword in section_keywords:
            patterns.extend([
                f"[{keyword.upper()}]",
                f"{{{keyword}}}",
                f"__{keyword}__",
            ])

        for pattern in patterns:
            if pattern in filled_content:
                filled_content = filled_content.replace(pattern, extracted_content)
                replacement_count += 1
                print(f"Section '{section_name}' remplie ({pattern})")
                break
        else:
            # No placeholder matched: fall back to inserting after a heading-like line.
            lines = filled_content.split('\n')
            for i, line in enumerate(lines):
                if any(keyword in line.lower() for keyword in section_keywords):
                    lines.insert(i + 1, f"\n{extracted_content}\n")
                    filled_content = '\n'.join(lines)
                    replacement_count += 1
                    print(f"Section '{section_name}' insérée après ligne similaire")
                    break
            else:
                print(f"Section '{section_name}' non intégrée - pattern non trouvé")

    return filled_content, replacement_count


def generate_filled_template(matcher: SmartTranscriptionMatcher, best_match: TemplateMatch, transcription: str):
    """Fill the matched template with the extracted data and save the result.

    Reads ``best_match.template_info.filepath`` as UTF-8 text, integrates
    ``best_match.extracted_data`` (see ``_fill_template_sections``), reports
    how many sections were integrated, writes the result to
    ``template_rempli_<template_id>.txt`` in the current directory and offers
    a preview of the first 2000 characters.

    Args:
        matcher: Unused here; kept for interface compatibility.
        best_match: Match providing the template path, id and extracted data.
        transcription: Unused here; kept for interface compatibility.

    Any unexpected error is printed and logged instead of being raised.
    """
    print(f"\nGÉNÉRATION DU TEMPLATE REMPLI")
    print(f"{'='*80}")

    try:
        template_info = best_match.template_info

        if not os.path.exists(template_info.filepath):
            print(f"Fichier template non trouvé: {template_info.filepath}")
            return

        with open(template_info.filepath, 'r', encoding='utf-8') as f:
            template_content = f.read()

        filled_content, replacement_count = _fill_template_sections(
            template_content, best_match.extracted_data
        )
        # The counter was previously computed but never shown to the user.
        print(f"\n{replacement_count}/{len(best_match.extracted_data)} sections intégrées")

        output_filename = f"template_rempli_{best_match.template_id}.txt"
        try:
            with open(output_filename, 'w', encoding='utf-8') as f:
                f.write(filled_content)
            print(f"\nTemplate rempli sauvegardé: {output_filename}")
        except OSError as e:
            print(f"Erreur lors de la sauvegarde: {e}")

        try:
            show_preview = input(f"\nAfficher un aperçu du template rempli? (y/n): ").strip().lower()
        except EOFError:
            # Non-interactive run: skip the preview rather than report a
            # generation failure after the file was already written.
            show_preview = ''

        if show_preview == 'y':
            print(f"\n{'='*80}")
            print(f"APERÇU DU TEMPLATE REMPLI")
            print(f"{'='*80}")

            preview = filled_content[:2000]
            if len(filled_content) > 2000:
                preview += "\n\n[... Tronqué pour l'aperçu ...]"

            print(preview)
            print(f"\n{'='*80}")

    except Exception as e:
        print(f"Erreur lors de la génération: {e}")
        logging.error(f"Erreur génération template: {e}")
|
|
|
|
|
def analyze_transcription_quality(transcription: str) -> Dict:
    """Compute quick heuristic quality metrics for a transcription.

    Returns a dict with:
        word_count: number of whitespace-separated tokens.
        sentence_count: number of non-blank segments between periods.
        avg_sentence_length: words per non-blank sentence (0 for empty text).
        has_medical_terms: a unit or imaging/report term is present
            (case-insensitive).
        has_measurements: a number followed by mm, cm or ml is present.
        has_sections: a section heading word is present (case-insensitive).
        structure_score: share of structure words found, capped at 1.0
            (three or more words give 1.0).
        overall_quality: "excellent", "good", "fair" or "poor", derived from
            the metrics above.
    """
    lowered = transcription.lower()
    words = transcription.split()
    sentence_count = sum(1 for segment in transcription.split('.') if segment.strip())

    structure_indicators = ['technique', 'résultat', 'conclusion', 'indication', 'observation']
    structure_count = sum(1 for indicator in structure_indicators if indicator in lowered)

    metrics = {
        "word_count": len(words),
        "sentence_count": sentence_count,
        # Divide by the same non-blank count reported above; the raw split
        # length includes the empty fragment after a trailing period.
        "avg_sentence_length": len(words) / max(sentence_count, 1),
        # The text is lower-cased, so every alternative must be lower-case
        # too ("IRM" could never match).
        "has_medical_terms": bool(re.search(r'\b(mm|cm|ml|irm|scanner|échographie|résultats?|conclusion)\b', lowered)),
        "has_measurements": bool(re.search(r'\d+\s*(mm|cm|ml)', transcription)),
        "has_sections": bool(re.search(r'\b(technique|résultats?|conclusion|indication)\b', lowered)),
        "structure_score": min(1.0, structure_count / 3.0),
    }

    if (metrics["word_count"] > 100 and
            metrics["has_medical_terms"] and
            metrics["has_sections"] and
            metrics["structure_score"] > 0.5):
        quality = "excellent"
    elif (metrics["word_count"] > 50 and
            metrics["has_medical_terms"] and
            metrics["structure_score"] > 0.3):
        quality = "good"
    elif metrics["word_count"] > 20 and metrics["has_medical_terms"]:
        quality = "fair"
    else:
        quality = "poor"

    metrics["overall_quality"] = quality
    return metrics
|
|
|
|
|
# Launch the interactive matcher only when this file is executed as a script.
if __name__ == "__main__":
    main()