|
|
import re |
|
|
import json |
|
|
import numpy as np |
|
|
import os |
|
|
from typing import Dict, List, Optional, Tuple |
|
|
from dataclasses import dataclass, field |
|
|
from pathlib import Path |
|
|
import pickle |
|
|
from sentence_transformers import SentenceTransformer |
|
|
import faiss |
|
|
from docx import Document |
|
|
import logging |
|
|
from langchain_openai import ChatOpenAI |
|
|
from langchain.prompts import ChatPromptTemplate |
|
|
from docx import Document |
|
|
from docx.shared import RGBColor |
|
|
import glob |
|
|
import logging |
|
|
from datetime import datetime |
|
|
|
|
|
|
|
|
|
|
|
EMBEDDING_MODEL = os.getenv("EMBEDDING_MODEL", "sentence-transformers/all-MiniLM-L6-v2") |
|
|
GPT_MODEL = os.getenv("GPT_MODEL", "gpt-5") |
|
|
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") |
|
|
|
|
|
|
|
|
def setup_logging(log_file: str = None): |
|
|
"""Configure le système de logging""" |
|
|
if log_file is None: |
|
|
log_file = f"medical_parser_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log" |
|
|
|
|
|
|
|
|
log_dir = "logs" |
|
|
if not os.path.exists(log_dir): |
|
|
os.makedirs(log_dir) |
|
|
|
|
|
log_path = os.path.join(log_dir, log_file) |
|
|
|
|
|
|
|
|
logging.basicConfig( |
|
|
level=logging.INFO, |
|
|
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', |
|
|
handlers=[ |
|
|
logging.FileHandler(log_path, encoding='utf-8'), |
|
|
logging.StreamHandler() |
|
|
] |
|
|
) |
|
|
|
|
|
return log_path |
|
|
|
|
|
|
|
|
log_path = setup_logging() |
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass |
|
|
class TemplateInfo: |
|
|
"""Structure pour stocker les informations d'un template""" |
|
|
id: str |
|
|
type: str |
|
|
has_asr_zone: bool |
|
|
asr_tag_position: int |
|
|
detected_sections: List[str] |
|
|
medecin: str |
|
|
embedding: np.ndarray |
|
|
filepath: str |
|
|
content: str |
|
|
asr_context: str = "" |
|
|
sections_data: Dict = field(default_factory=dict) |
|
|
user_fields: List[str] = field(default_factory=list) |
|
|
|
|
|
class MedicalTemplateParser: |
|
|
"""Parser pour templates médicaux avec base vectorielle et GPT""" |
|
|
|
|
|
def __init__(self, model_name: str = EMBEDDING_MODEL): |
|
|
""" |
|
|
Initialise le parser avec un modèle d'embedding et GPT |
|
|
|
|
|
Args: |
|
|
model_name: Nom du modèle SentenceTransformer à utiliser |
|
|
""" |
|
|
self.model = SentenceTransformer(model_name) |
|
|
self.templates: Dict[str, TemplateInfo] = {} |
|
|
self.vector_index = None |
|
|
self.template_ids = [] |
|
|
|
|
|
|
|
|
self.llm = None |
|
|
self.section_classifier = None |
|
|
self._initialize_gpt() |
|
|
|
|
|
|
|
|
self.document_types = { |
|
|
"compte_rendu_imagerie": ["imagerie", "scanner", "IRM", "échographie", "radiologie", "TECHNIQUE", "RESULTATS"], |
|
|
"lettre_confrere": ["confrère", "cher", "collègue", "salutations", "cordialement"], |
|
|
"resultats_laboratoire": ["laboratoire", "analyses", "biologie", "résultats", "valeurs"], |
|
|
"demande_examen": ["demande", "prescription", "examen", "bilan"], |
|
|
"ordonnance": ["ordonnance", "prescription", "posologie", "traitement"] |
|
|
} |
|
|
|
|
|
def _initialize_gpt(self): |
|
|
"""Initialise le modèle GPT pour l'analyse des sections""" |
|
|
api_key = OPENAI_API_KEY |
|
|
if not api_key: |
|
|
logger.warning("OPENAI_API_KEY non définie. L'analyse GPT ne sera pas disponible.") |
|
|
return |
|
|
|
|
|
try: |
|
|
self.llm = ChatOpenAI( |
|
|
model=GPT_MODEL, |
|
|
temperature=0, |
|
|
max_tokens=4000, |
|
|
api_key=api_key |
|
|
) |
|
|
|
|
|
|
|
|
section_prompt = ChatPromptTemplate.from_messages([ |
|
|
("system", """ |
|
|
Vous êtes un expert en analyse de documents médicaux. Je vais vous fournir le texte complet d'un rapport médical. |
|
|
IMPORTANT : Réponds UNIQUEMENT avec un JSON valide. Aucun texte supplémentaire, aucune explication, aucune balise markdown. |
|
|
|
|
|
Votre tâche est de : |
|
|
|
|
|
1. **Identifier le type de document** : |
|
|
- Si le document contient "Destinataire :" → c'est une "lettre médicale" |
|
|
- Si le document contient des mots-clés d'examen (SCANNER, IRM, ECHOGRAPHIE, RADIO, etc.) → c'est un "rapport médical" |
|
|
- Si le document contient "TITRE" seul → c'est un "rapport médical" (pas type "TITRE") |
|
|
- Sinon → "rapport médical" par défaut |
|
|
|
|
|
2. **Extraire les informations du centre médical** (si elles existent) : |
|
|
- Chercher le nom du centre/association/hôpital |
|
|
- Chercher l'adresse si présente |
|
|
- Chercher le téléphone si présent |
|
|
- Chercher le service médical (ex: "Service D'IMAGERIE MEDICALE") |
|
|
- Chercher l'équipement médical mentionné |
|
|
- Ces informations doivent être stockées dans un champ "center_info" séparé |
|
|
- Si aucune information de centre n'est trouvée, laisser center_info vide ou null |
|
|
|
|
|
3. **Identifier le médecin** (si mentionné) : |
|
|
- Chercher les signatures en fin de document (ex: "Dr Eric AUBERTON") |
|
|
- Chercher les mentions "PRESCRIPTEUR :" suivi du nom |
|
|
- Chercher les formules comme "DR M NOM Prénom" où NOM/Prénom sont des champs à remplir |
|
|
- Si aucun médecin n'est identifié, retourner "Non spécifié" |
|
|
|
|
|
4. **Identifier les sections à remplir** : |
|
|
Une section est définie par : |
|
|
- Un champ d'information suivi de deux-points : "Patient:", "Initiales:", "Date:", "Médecin:", etc. |
|
|
- Un titre de section suivi de deux-points : "Indication:", "Technique:", "Résultats:", "Conclusion:", "Compte-rendu:", etc. |
|
|
- Une ligne seule en majuscules représentant un champ à remplir : "TITRE", "CLINIQUE", "TECHNIQUE", "RÉSULTATS", "CONCLUSION" |
|
|
- Pour les lettres : "Destinataire:" et le contenu principal de la lettre (zone ASR_VOX) |
|
|
|
|
|
**EXCLUSIONS importantes** : |
|
|
- Les titres encadrés de tirets (ex: "- EXAMEN TOMODENSITOMETRIQUE THORACIQUE -") → document_type uniquement |
|
|
- Les informations d'en-tête du centre médical → center_info uniquement |
|
|
- Les informations administratives fixes → ne pas traiter comme sections |
|
|
|
|
|
5. **Pour chaque section identifiée** : |
|
|
- Extraire son contenu en collectant toutes les lignes suivantes jusqu'à la prochaine section |
|
|
- Identifier les champs à remplir par l'utilisateur : |
|
|
* Les balises <ASR_VOX> indiquent des champs à remplir |
|
|
* Les balises <ASR> indiquent des champs à remplir |
|
|
* Les textes génériques comme "xxx", "xxxx", "XXX" indiquent des champs à remplir |
|
|
* Les formules conditionnelles comme "SI(Civilité Nom usuel médecin..." indiquent des champs à remplir |
|
|
* Les balises [NOM_PATIENT], [DATE], [MEDECIN], etc. indiquent des champs à remplir |
|
|
* Les champs vides après ":" indiquent des champs à remplir |
|
|
* Les mots "NOM", "Prénom" dans le contexte médical indiquent des champs à remplir |
|
|
|
|
|
6. **Gestion spéciale des lettres médicales** : |
|
|
- "Destinataire:" est une section à remplir |
|
|
- Le contenu principal (zone ASR_VOX) doit être identifié comme "Contenu" ou "Corps de lettre" |
|
|
|
|
|
7. **Identifier les zones ASR** et leur position dans le document |
|
|
|
|
|
8. **Retourner un objet JSON valide** avec cette structure exacte : |
|
|
{{ |
|
|
"document_type": "rapport médical|lettre médicale|autre", |
|
|
"center_info": {{ |
|
|
"name": "nom du centre/association si trouvé, sinon null", |
|
|
"address": "adresse complète si trouvée, sinon null", |
|
|
"phone": "téléphone si trouvé, sinon null", |
|
|
"service": "service médical si trouvé, sinon null", |
|
|
"equipment": "équipement mentionné si trouvé, sinon null" |
|
|
}}, |
|
|
"physician": "nom du médecin identifié ou 'Non spécifié'", |
|
|
"asr_zones": [ |
|
|
{{ |
|
|
"tag": "balise ASR trouvée", |
|
|
"position": "position approximative dans le texte", |
|
|
"context": "contexte autour de la balise" |
|
|
}} |
|
|
], |
|
|
"sections": {{ |
|
|
"nom_section": {{ |
|
|
"content": "contenu brut de la section", |
|
|
"has_user_fields": true, |
|
|
"user_fields": ["liste des champs à remplir"] |
|
|
}} |
|
|
}} |
|
|
}} |
|
|
|
|
|
**Règles importantes** : |
|
|
- Extraire UNIQUEMENT les informations qui existent réellement dans le document |
|
|
- Les informations administratives du centre ne sont PAS des sections à remplir (si elles existent) |
|
|
- Si aucune information de centre n'est trouvée, laisser les champs center_info à null |
|
|
- "TITRE" seul indique un rapport médical, pas un type "TITRE" |
|
|
- Les lettres ont un traitement spécial avec Destinataire + Contenu principal |
|
|
- Identifier le médecin quand c'est possible, sinon "Non spécifié" |
|
|
- Distinguer clairement les champs à remplir des informations fixes |
|
|
- Adaptation flexible : tous les documents n'ont pas la même structure |
|
|
|
|
|
Répondez UNIQUEMENT avec le JSON—aucun commentaire supplémentaire. |
|
|
"""), |
|
|
("human", "Voici le texte complet du rapport médical :\n\n{document_text}\n\nExtrayez toutes les sections, identifiez les champs à remplir et les zones ASR.") |
|
|
]) |
|
|
|
|
|
self.section_classifier = section_prompt | self.llm |
|
|
print("✅ GPT initialisé avec succès") |
|
|
logger.info(f"✅ GPT initialisé avec succès") |
|
|
|
|
|
except Exception as e: |
|
|
logger.info(f"❌ Erreur lors de l'initialisation de GPT: {e}") |
|
|
|
|
|
self.llm = None |
|
|
self.section_classifier = None |
|
|
|
|
|
def extract_text_from_docx(self, filepath: str) -> Tuple[str, Dict]: |
|
|
""" |
|
|
Extrait le texte d'un fichier Word en préservant la structure |
|
|
|
|
|
Args: |
|
|
filepath: Chemin vers le fichier DOCX |
|
|
|
|
|
Returns: |
|
|
Tuple[str, Dict]: (texte_complet, informations_structure) |
|
|
""" |
|
|
logger.info(f"📄 Extraction du texte DOCX: {os.path.basename(filepath)}") |
|
|
|
|
|
try: |
|
|
doc = Document(filepath) |
|
|
text_content = [] |
|
|
structure_info = { |
|
|
"paragraphs": [], |
|
|
"tables": [], |
|
|
"headers": [], |
|
|
"footers": [], |
|
|
"styles": [], |
|
|
"formatting": [] |
|
|
} |
|
|
|
|
|
|
|
|
for i, paragraph in enumerate(doc.paragraphs): |
|
|
para_text = paragraph.text.strip() |
|
|
if para_text: |
|
|
text_content.append(para_text) |
|
|
|
|
|
|
|
|
para_info = { |
|
|
"index": i, |
|
|
"text": para_text, |
|
|
"style": paragraph.style.name if paragraph.style else "Normal", |
|
|
"alignment": str(paragraph.alignment) if paragraph.alignment else "None", |
|
|
"runs": [] |
|
|
} |
|
|
|
|
|
|
|
|
for run in paragraph.runs: |
|
|
if run.text.strip(): |
|
|
run_info = { |
|
|
"text": run.text, |
|
|
"bold": run.bold, |
|
|
"italic": run.italic, |
|
|
"underline": run.underline, |
|
|
"font_name": run.font.name if run.font.name else "Default", |
|
|
"font_size": run.font.size.pt if run.font.size else None, |
|
|
"color": self._get_color_info(run.font.color) if run.font.color else None |
|
|
} |
|
|
para_info["runs"].append(run_info) |
|
|
|
|
|
structure_info["paragraphs"].append(para_info) |
|
|
structure_info["styles"].append(paragraph.style.name if paragraph.style else "Normal") |
|
|
|
|
|
|
|
|
for table_idx, table in enumerate(doc.tables): |
|
|
table_info = { |
|
|
"index": table_idx, |
|
|
"rows": len(table.rows), |
|
|
"cols": len(table.columns), |
|
|
"content": [] |
|
|
} |
|
|
|
|
|
table_text = [] |
|
|
for row_idx, row in enumerate(table.rows): |
|
|
row_data = [] |
|
|
row_text = [] |
|
|
for cell_idx, cell in enumerate(row.cells): |
|
|
cell_text = cell.text.strip() |
|
|
row_data.append(cell_text) |
|
|
row_text.append(cell_text) |
|
|
if cell_text: |
|
|
text_content.append(cell_text) |
|
|
|
|
|
table_info["content"].append(row_data) |
|
|
table_text.append(" | ".join(row_text)) |
|
|
|
|
|
structure_info["tables"].append(table_info) |
|
|
|
|
|
text_content.extend(table_text) |
|
|
|
|
|
|
|
|
for section in doc.sections: |
|
|
|
|
|
if section.header: |
|
|
header_text = [] |
|
|
for paragraph in section.header.paragraphs: |
|
|
if paragraph.text.strip(): |
|
|
header_text.append(paragraph.text.strip()) |
|
|
text_content.append(paragraph.text.strip()) |
|
|
|
|
|
if header_text: |
|
|
structure_info["headers"].append({ |
|
|
"content": header_text, |
|
|
"section_index": doc.sections.index(section) |
|
|
}) |
|
|
|
|
|
|
|
|
if section.footer: |
|
|
footer_text = [] |
|
|
for paragraph in section.footer.paragraphs: |
|
|
if paragraph.text.strip(): |
|
|
footer_text.append(paragraph.text.strip()) |
|
|
text_content.append(paragraph.text.strip()) |
|
|
|
|
|
if footer_text: |
|
|
structure_info["footers"].append({ |
|
|
"content": footer_text, |
|
|
"section_index": doc.sections.index(section) |
|
|
}) |
|
|
|
|
|
|
|
|
structure_info["styles"] = list(set(structure_info["styles"])) |
|
|
|
|
|
|
|
|
final_text = "\n".join(text_content) |
|
|
|
|
|
logger.info(f"✅ Texte extrait avec succès:") |
|
|
logger.info(f" - Paragraphes: {len(structure_info['paragraphs'])}") |
|
|
logger.info(f" - Tableaux: {len(structure_info['tables'])}") |
|
|
logger.info(f" - En-têtes: {len(structure_info['headers'])}") |
|
|
logger.info(f" - Pieds de page: {len(structure_info['footers'])}") |
|
|
logger.info(f" - Styles utilisés: {len(structure_info['styles'])}") |
|
|
|
|
|
return final_text, structure_info |
|
|
|
|
|
except Exception as e: |
|
|
logger.info(f"❌ Erreur lors de l'extraction du texte DOCX de {filepath}: {e}") |
|
|
return "", {} |
|
|
|
|
|
def _get_color_info(self, color): |
|
|
"""Extrait les informations de couleur d'un run""" |
|
|
try: |
|
|
if color.rgb: |
|
|
return f"rgb({color.rgb.red}, {color.rgb.green}, {color.rgb.blue})" |
|
|
elif color.theme_color: |
|
|
return f"theme_{color.theme_color}" |
|
|
else: |
|
|
return "default" |
|
|
except: |
|
|
return "default" |
|
|
|
|
|
def analyze_document_with_gpt(self, text: str) -> Dict: |
|
|
""" |
|
|
Analyse le document avec GPT pour extraire sections et zones ASR |
|
|
|
|
|
Args: |
|
|
text: Texte complet du document |
|
|
|
|
|
Returns: |
|
|
Dict: Résultats de l'analyse GPT |
|
|
""" |
|
|
if not self.section_classifier: |
|
|
logger.info("⚠️ GPT non disponible, utilisation des méthodes classiques") |
|
|
return self._fallback_analysis(text) |
|
|
|
|
|
try: |
|
|
logger.info("🔍 Analyse du document avec GPT...") |
|
|
response = self.section_classifier.invoke({"document_text": text}) |
|
|
result = response.content.strip() |
|
|
|
|
|
if not result: |
|
|
logger.info("❌ Réponse GPT vide") |
|
|
return self._fallback_analysis(text) |
|
|
logger.info(f"📝 Réponse GPT (premiers 200 caractères): {result[:200]}...") |
|
|
if result.startswith("```json"): |
|
|
result = result[7:] |
|
|
if result.endswith("```"): |
|
|
result = result[:-3] |
|
|
|
|
|
|
|
|
result = result.strip() |
|
|
|
|
|
if not result.startswith('{') or not result.endswith('}'): |
|
|
logger.info(f"❌ Format JSON invalide. Début: '{result[:50]}...' Fin: '...{result[-50:]}'") |
|
|
return self._fallback_analysis(text) |
|
|
|
|
|
analysis_data = json.loads(result) |
|
|
logger.info("✅ Analyse GPT terminée avec succès") |
|
|
return analysis_data |
|
|
|
|
|
except json.JSONDecodeError as e: |
|
|
logger.info(f"❌ Erreur de parsing JSON GPT: {e}") |
|
|
return self._fallback_analysis(text) |
|
|
except Exception as e: |
|
|
logger.info(f"❌ Erreur lors de l'analyse GPT: {e}") |
|
|
return self._fallback_analysis(text) |
|
|
|
|
|
def _fallback_analysis(self, text: str) -> Dict: |
|
|
"""Analyse de fallback sans GPT""" |
|
|
logger.info("📊 Utilisation de l'analyse classique...") |
|
|
|
|
|
|
|
|
has_asr, asr_pos, asr_context = self.detect_asr_zone_classic(text) |
|
|
|
|
|
|
|
|
sections = self.extract_sections_classic(text) |
|
|
|
|
|
|
|
|
doc_type = self.classify_document_type(text, sections) |
|
|
|
|
|
return { |
|
|
"document_type": doc_type, |
|
|
"asr_zones": [{"tag": "<ASR_VOX>", "position": asr_pos, "context": asr_context}] if has_asr else [], |
|
|
"sections": {section: {"content": "", "has_user_fields": False, "user_fields": []} for section in sections} |
|
|
} |
|
|
|
|
|
def detect_asr_zone_classic(self, text: str) -> Tuple[bool, int, str]: |
|
|
""" |
|
|
Détection ASR classique (fallback) |
|
|
|
|
|
Returns: |
|
|
(has_asr_zone, position, context_around_asr) |
|
|
""" |
|
|
asr_patterns = [ |
|
|
r"<ASR_VOX>", |
|
|
r"<ASR>", |
|
|
r"\[DICTEE\]", |
|
|
r"\[ASR\]", |
|
|
r"<!-- ASR -->" |
|
|
] |
|
|
|
|
|
for pattern in asr_patterns: |
|
|
match = re.search(pattern, text, re.IGNORECASE) |
|
|
if match: |
|
|
position = match.start() |
|
|
start_context = max(0, position - 200) |
|
|
end_context = min(len(text), position + 200) |
|
|
context = text[start_context:end_context] |
|
|
|
|
|
return True, position, context |
|
|
|
|
|
return False, -1, "" |
|
|
|
|
|
def extract_sections_classic(self, text: str) -> List[str]: |
|
|
"""Extraction de sections classique (fallback)""" |
|
|
sections = set() |
|
|
|
|
|
section_patterns = [ |
|
|
r"([A-ZÉÈÀÇÊ][A-ZÉÈÀÇÊ\s]{2,}):", |
|
|
r"([A-ZÉÈÀÇÊ][a-zéèàçê\s]{3,}):", |
|
|
r"(\d+\.\s*[A-ZÉÈÀÇÊ][a-zéèàçê\s]{3,}):", |
|
|
] |
|
|
|
|
|
for pattern in section_patterns: |
|
|
matches = re.findall(pattern, text, re.MULTILINE) |
|
|
for match in matches: |
|
|
section = match.strip().rstrip(':').strip() |
|
|
if len(section) > 2 and len(section) < 50: |
|
|
sections.add(section) |
|
|
|
|
|
return sorted(list(sections)) |
|
|
|
|
|
def classify_document_type(self, text: str, sections: List[str]) -> str: |
|
|
"""Classifie le type de document basé sur le contenu et les sections""" |
|
|
text_lower = text.lower() |
|
|
sections_lower = [s.lower() for s in sections] |
|
|
all_text = text_lower + " " + " ".join(sections_lower) |
|
|
|
|
|
max_score = 0 |
|
|
best_type = "autre" |
|
|
|
|
|
for doc_type, keywords in self.document_types.items(): |
|
|
score = 0 |
|
|
for keyword in keywords: |
|
|
if keyword.lower() in all_text: |
|
|
score += 1 |
|
|
|
|
|
if doc_type == "compte_rendu_imagerie" and any("technique" in s for s in sections_lower): |
|
|
score += 2 |
|
|
|
|
|
if score > max_score: |
|
|
max_score = score |
|
|
best_type = doc_type |
|
|
|
|
|
return best_type |
|
|
|
|
|
def extract_doctor_name(self, text: str) -> str: |
|
|
"""Extrait le nom du médecin du template""" |
|
|
doctor_patterns = [ |
|
|
r"Dr\.?\s+([A-ZÉÈÀÇÊ][a-zéèàçê]+\s+[A-ZÉÈÀÇÊ][a-zéèàçê]+)", |
|
|
r"Docteur\s+([A-ZÉÈÀÇÊ][a-zéèàçê]+\s+[A-ZÉÈÀÇÊ][a-zéèàçê]+)", |
|
|
r"Praticien\s*:\s*([A-ZÉÈÀÇÊ][a-zéèàçê]+\s+[A-ZÉÈÀÇÊ][a-zéèàçê]+)", |
|
|
] |
|
|
|
|
|
for pattern in doctor_patterns: |
|
|
match = re.search(pattern, text) |
|
|
if match: |
|
|
return match.group(1).strip() |
|
|
|
|
|
return "Non spécifié" |
|
|
|
|
|
def parse_template(self, filepath: str, template_id: str = None) -> TemplateInfo: |
|
|
""" |
|
|
Parse un template et extrait toutes les informations avec GPT |
|
|
|
|
|
Args: |
|
|
filepath: Chemin vers le fichier template |
|
|
template_id: ID unique du template (optionnel) |
|
|
|
|
|
Returns: |
|
|
TemplateInfo: Informations structurées du template |
|
|
""" |
|
|
if template_id is None: |
|
|
template_id = Path(filepath).stem |
|
|
|
|
|
logger.info(f"\n📄 Traitement du fichier: {os.path.basename(filepath)}") |
|
|
|
|
|
|
|
|
if filepath.endswith('.docx'): |
|
|
text, _ = self.extract_text_from_docx(filepath) |
|
|
else: |
|
|
with open(filepath, 'r', encoding='utf-8') as f: |
|
|
text = f.read() |
|
|
|
|
|
if not text.strip(): |
|
|
logger.info("❌ Aucun texte extrait du fichier") |
|
|
return None |
|
|
|
|
|
|
|
|
analysis_data = self.analyze_document_with_gpt(text) |
|
|
|
|
|
|
|
|
doc_type = analysis_data.get("document_type", "autre") |
|
|
sections_data = analysis_data.get("sections", {}) |
|
|
asr_zones = analysis_data.get("asr_zones", []) |
|
|
|
|
|
logger.info(f"📋 Type de document détecté: {doc_type}") |
|
|
logger.info(f"🔍 Zones ASR trouvées: {len(asr_zones)}") |
|
|
logger.info(f"📑 Sections détectées: {len(sections_data)}") |
|
|
|
|
|
|
|
|
has_asr = len(asr_zones) > 0 |
|
|
asr_pos = asr_zones[0]["position"] if asr_zones else -1 |
|
|
asr_context = asr_zones[0]["context"] if asr_zones else "" |
|
|
|
|
|
|
|
|
detected_sections = list(sections_data.keys()) |
|
|
|
|
|
|
|
|
user_fields = [] |
|
|
for section_data in sections_data.values(): |
|
|
if isinstance(section_data, dict) and section_data.get("has_user_fields"): |
|
|
user_fields.extend(section_data.get("user_fields", [])) |
|
|
|
|
|
|
|
|
medecin = self.extract_doctor_name(text) |
|
|
logger.info(f"👨⚕️ Médecin détecté: {medecin}") |
|
|
|
|
|
|
|
|
embedding_text = self.create_embedding_text(text, asr_context, detected_sections, doc_type) |
|
|
|
|
|
|
|
|
embedding = self.model.encode([embedding_text])[0] |
|
|
|
|
|
|
|
|
template_info = TemplateInfo( |
|
|
id=template_id, |
|
|
type=doc_type, |
|
|
has_asr_zone=has_asr, |
|
|
asr_tag_position=asr_pos, |
|
|
detected_sections=detected_sections, |
|
|
medecin=medecin, |
|
|
embedding=embedding, |
|
|
filepath=filepath, |
|
|
content=text, |
|
|
asr_context=asr_context, |
|
|
sections_data=sections_data, |
|
|
user_fields=user_fields |
|
|
) |
|
|
|
|
|
logger.info(f"✅ Template {template_id} traité avec succès") |
|
|
return template_info |
|
|
|
|
|
def create_embedding_text(self, text: str, asr_context: str, sections: List[str], doc_type: str) -> str: |
|
|
""" |
|
|
Crée le texte optimisé pour l'embedding |
|
|
|
|
|
Args: |
|
|
text: Texte complet du template |
|
|
asr_context: Contexte autour de la zone ASR |
|
|
sections: Sections détectées |
|
|
doc_type: Type de document |
|
|
|
|
|
Returns: |
|
|
str: Texte optimisé pour l'embedding |
|
|
""" |
|
|
lines = text.split('\n') |
|
|
header = ' '.join(lines[:5]) |
|
|
|
|
|
embedding_parts = [ |
|
|
f"Type: {doc_type}", |
|
|
f"Sections: {', '.join(sections[:5])}", |
|
|
f"Contexte: {header[:200]}", |
|
|
] |
|
|
|
|
|
if asr_context: |
|
|
embedding_parts.append(f"Zone ASR: {asr_context[:100]}") |
|
|
|
|
|
return ' | '.join(embedding_parts) |
|
|
|
|
|
def process_docx_folder(self, folder_path: str) -> List[TemplateInfo]: |
|
|
""" |
|
|
Traite tous les fichiers DOCX dans un dossier |
|
|
|
|
|
Args: |
|
|
folder_path: Chemin vers le dossier contenant les fichiers DOCX |
|
|
|
|
|
Returns: |
|
|
List[TemplateInfo]: Liste des templates traités |
|
|
""" |
|
|
logger.info(f"🗂️ Traitement du dossier: {folder_path}") |
|
|
|
|
|
|
|
|
docx_files = glob.glob(os.path.join(folder_path, "*.docx")) |
|
|
|
|
|
if not docx_files: |
|
|
logger.info("❌ Aucun fichier DOCX trouvé dans le dossier") |
|
|
return [] |
|
|
|
|
|
logger.info(f"📁 {len(docx_files)} fichiers DOCX trouvés") |
|
|
|
|
|
templates = [] |
|
|
for i, filepath in enumerate(docx_files, 1): |
|
|
logger.info(f"\n{'='*60}") |
|
|
logger.info(f"📄 Fichier {i}/{len(docx_files)}: {os.path.basename(filepath)}") |
|
|
logger.info(f"{'='*60}") |
|
|
|
|
|
try: |
|
|
template_info = self.parse_template(filepath) |
|
|
if template_info: |
|
|
templates.append(template_info) |
|
|
self.templates[template_info.id] = template_info |
|
|
except Exception as e: |
|
|
logger.info(f"❌ Erreur lors du traitement de {filepath}: {e}") |
|
|
continue |
|
|
|
|
|
logger.info(f"\n🎉 Traitement terminé: {len(templates)} templates traités avec succès") |
|
|
return templates |
|
|
|
|
|
def build_vector_database(self, templates: List[TemplateInfo]): |
|
|
""" |
|
|
Construit la base vectorielle avec FAISS |
|
|
|
|
|
Args: |
|
|
templates: Liste des templates parsés |
|
|
""" |
|
|
if not templates: |
|
|
logger.info("❌ Aucun template fourni pour construire la base vectorielle") |
|
|
return |
|
|
|
|
|
logger.info(f"🔧 Construction de la base vectorielle avec {len(templates)} templates...") |
|
|
|
|
|
embeddings = np.array([template.embedding for template in templates]) |
|
|
|
|
|
dimension = embeddings.shape[1] |
|
|
self.vector_index = faiss.IndexFlatIP(dimension) |
|
|
|
|
|
faiss.normalize_L2(embeddings) |
|
|
self.vector_index.add(embeddings) |
|
|
|
|
|
self.template_ids = [template.id for template in templates] |
|
|
|
|
|
logger.info(f"✅ Base vectorielle construite avec succès") |
|
|
|
|
|
def search_similar_templates(self, query_text: str, k: int = 5) -> List[Tuple[str, float]]: |
|
|
""" |
|
|
Recherche les templates similaires à une requête |
|
|
|
|
|
Args: |
|
|
query_text: Texte de la requête |
|
|
k: Nombre de résultats à retourner |
|
|
|
|
|
Returns: |
|
|
List[Tuple[str, float]]: Liste des (template_id, score) les plus similaires |
|
|
""" |
|
|
if self.vector_index is None: |
|
|
logger.info("❌ Base vectorielle non construite") |
|
|
return [] |
|
|
|
|
|
logger.info(f"🔍 Recherche pour: '{query_text}'") |
|
|
|
|
|
query_embedding = self.model.encode([query_text]) |
|
|
faiss.normalize_L2(query_embedding) |
|
|
|
|
|
scores, indices = self.vector_index.search(query_embedding, k) |
|
|
|
|
|
results = [] |
|
|
for i, (score, idx) in enumerate(zip(scores[0], indices[0])): |
|
|
if idx < len(self.template_ids): |
|
|
template_id = self.template_ids[idx] |
|
|
results.append((template_id, float(score))) |
|
|
|
|
|
return results |
|
|
|
|
|
def save_database(self, filepath: str): |
|
|
"""Sauvegarde la base vectorielle et les templates""" |
|
|
logger.info(f"💾 Sauvegarde de la base de données...") |
|
|
|
|
|
database_data = { |
|
|
'templates': self.templates, |
|
|
'template_ids': self.template_ids, |
|
|
'model_name': self.model.get_sentence_embedding_dimension() |
|
|
} |
|
|
|
|
|
with open(filepath, 'wb') as f: |
|
|
pickle.dump(database_data, f) |
|
|
|
|
|
if self.vector_index is not None: |
|
|
faiss.write_index(self.vector_index, filepath.replace('.pkl', '.faiss')) |
|
|
|
|
|
logger.info(f"✅ Base de données sauvegardée dans {filepath}") |
|
|
|
|
|
def load_database(self, filepath: str): |
|
|
"""Charge la base vectorielle et les templates""" |
|
|
logger.info(f"📂 Chargement de la base de données depuis {filepath}...") |
|
|
|
|
|
with open(filepath, 'rb') as f: |
|
|
database_data = pickle.load(f) |
|
|
|
|
|
self.templates = database_data['templates'] |
|
|
self.template_ids = database_data['template_ids'] |
|
|
|
|
|
faiss_path = filepath.replace('.pkl', '.faiss') |
|
|
if Path(faiss_path).exists(): |
|
|
self.vector_index = faiss.read_index(faiss_path) |
|
|
|
|
|
logger.info(f"✅ Base de données chargée avec succès") |
|
|
|
|
|
def get_template_info(self, template_id: str) -> Optional[TemplateInfo]: |
|
|
"""Récupère les informations d'un template par son ID""" |
|
|
return self.templates.get(template_id) |
|
|
|
|
|
def print_template_summary(self, template_id: str): |
|
|
"""Affiche un résumé des informations d'un template""" |
|
|
template = self.get_template_info(template_id) |
|
|
if template: |
|
|
logger.info(f"\n{'='*60}") |
|
|
logger.info(f"📋 Template: {template.id}") |
|
|
logger.info(f"{'='*60}") |
|
|
logger.info(f"📄 Type: {template.type}") |
|
|
logger.info(f"👨⚕️ Médecin: {template.medecin}") |
|
|
logger.info(f"🎤 Zone ASR: {'✅ Oui' if template.has_asr_zone else '❌ Non'}") |
|
|
logger.info(f"📑 Sections détectées ({len(template.detected_sections)}): {', '.join(template.detected_sections)}") |
|
|
logger.info(f"⚠️ Champs utilisateur ({len(template.user_fields)}): {', '.join(template.user_fields[:3])}{'...' if len(template.user_fields) > 3 else ''}") |
|
|
logger.info(f"📁 Fichier: {os.path.basename(template.filepath)}") |
|
|
logger.info(f"{'='*60}") |
|
|
|
|
|
|
|
|
for section_name, section_data in template.sections_data.items(): |
|
|
if isinstance(section_data, dict): |
|
|
logger.info(f"📋 Section: {section_name}") |
|
|
if section_data.get("has_user_fields"): |
|
|
fields = section_data.get('user_fields', []) |
|
|
logger.info(f" ⚠️ Champs à remplir: {', '.join(fields)}") |
|
|
else: |
|
|
logger.info(f" ✅ Section complète") |
|
|
|
|
|
|
|
|
def print_global_summary(self): |
|
|
"""Affiche un résumé global de tous les templates""" |
|
|
logger.info(f"\n{'='*80}") |
|
|
logger.info(f"📊 RÉSUMÉ GLOBAL - {len(self.templates)} TEMPLATES TRAITÉS") |
|
|
logger.info(f"{'='*80}") |
|
|
|
|
|
|
|
|
types_count = {} |
|
|
asr_count = 0 |
|
|
total_sections = 0 |
|
|
total_user_fields = 0 |
|
|
|
|
|
for template in self.templates.values(): |
|
|
types_count[template.type] = types_count.get(template.type, 0) + 1 |
|
|
if template.has_asr_zone: |
|
|
asr_count += 1 |
|
|
total_sections += len(template.detected_sections) |
|
|
total_user_fields += len(template.user_fields) |
|
|
|
|
|
logger.info(f"📈 Statistiques générales:") |
|
|
logger.info(f" - Total templates: {len(self.templates)}") |
|
|
logger.info(f" - Templates avec ASR: {asr_count}") |
|
|
logger.info(f" - Total sections: {total_sections}") |
|
|
logger.info(f" - Total champs utilisateur: {total_user_fields}") |
|
|
|
|
|
logger.info(f"\n📊 Répartition par type:") |
|
|
for doc_type, count in types_count.items(): |
|
|
logger.info(f" - {doc_type}: {count}") |
|
|
|
|
|
logger.info(f"\n📋 Templates individuels:") |
|
|
for template_id in sorted(self.templates.keys()): |
|
|
template = self.templates[template_id] |
|
|
asr_icon = "🎤" if template.has_asr_zone else "❌" |
|
|
logger.info(f" {asr_icon} {template_id} ({template.type}) - {len(template.detected_sections)} sections - {template.medecin}") |
|
|
|
|
|
def main(): |
|
|
"""Fonction principale pour traiter un dossier de fichiers docx""" |
|
|
|
|
|
|
|
|
docx_folder = input("Entrez le chemin vers le dossier contenant les fichiers docx: ").strip() |
|
|
|
|
|
if not os.path.exists(docx_folder): |
|
|
logger.info(f"❌ Le dossier {docx_folder} n'existe pas") |
|
|
return |
|
|
|
|
|
logger.info(f"\n🚀 Démarrage du traitement des fichiers docx...") |
|
|
logger.info(f"📁 Dossier source: {docx_folder}") |
|
|
|
|
|
|
|
|
parser = MedicalTemplateParser() |
|
|
|
|
|
|
|
|
templates = parser.process_docx_folder(docx_folder) |
|
|
|
|
|
if not templates: |
|
|
logger.info("❌ Aucun template traité avec succès") |
|
|
return |
|
|
|
|
|
|
|
|
parser.build_vector_database(templates) |
|
|
|
|
|
|
|
|
parser.print_global_summary() |
|
|
|
|
|
|
|
|
logger.info(f"\n{'='*80}") |
|
|
logger.info(f"📄 DÉTAILS DES TEMPLATES") |
|
|
logger.info(f"{'='*80}") |
|
|
|
|
|
for template_id in sorted(parser.templates.keys()): |
|
|
parser.print_template_summary(template_id) |
|
|
|
|
|
|
|
|
logger.info(f"\n{'='*80}") |
|
|
logger.info(f"🔍 TEST DE RECHERCHE") |
|
|
logger.info(f"{'='*80}") |
|
|
|
|
|
test_queries = [ |
|
|
"échographie abdominale", |
|
|
"scanner thoracique", |
|
|
"compte rendu imagerie", |
|
|
"résultats laboratoire" |
|
|
] |
|
|
|
|
|
for query in test_queries: |
|
|
logger.info(f"\n🔍 Recherche pour: '{query}'") |
|
|
results = parser.search_similar_templates(query, k=3) |
|
|
|
|
|
if results: |
|
|
logger.info("📊 Résultats:") |
|
|
for i, (template_id, score) in enumerate(results, 1): |
|
|
template = parser.get_template_info(template_id) |
|
|
logger.info(f" {i}. {template_id} (score: {score:.3f}) - {template.type} - {template.medecin}") |
|
|
else: |
|
|
logger.info("❌ Aucun résultat trouvé") |
|
|
|
|
|
|
|
|
save_path = os.path.join(docx_folder, 'medical_templates.pkl') |
|
|
parser.save_database(save_path) |
|
|
|
|
|
logger.info(f"\n✅ Traitement terminé avec succès!") |
|
|
logger.info(f"💾 Base de données sauvegardée: {save_path}") |
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
main() |