pipeline2 / template_db_creation.py
Nourhenem's picture
initial commit
f92da22 verified
import re
import json
import numpy as np
import os
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass, field
from pathlib import Path
import pickle
from sentence_transformers import SentenceTransformer
import faiss
from docx import Document
import logging
from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from docx import Document
from docx.shared import RGBColor
import glob
import logging
from datetime import datetime
EMBEDDING_MODEL = os.getenv("EMBEDDING_MODEL", "sentence-transformers/all-MiniLM-L6-v2")
GPT_MODEL = os.getenv("GPT_MODEL", "gpt-5")
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
# Configuration du logging
def setup_logging(log_file: str = None):
"""Configure le système de logging"""
if log_file is None:
log_file = f"medical_parser_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
# Créer le dossier logs s'il n'existe pas
log_dir = "logs"
if not os.path.exists(log_dir):
os.makedirs(log_dir)
log_path = os.path.join(log_dir, log_file)
# Configuration du logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(log_path, encoding='utf-8'),
logging.StreamHandler() # Pour afficher aussi dans la console
]
)
return log_path
# Initialiser le logging
log_path = setup_logging()
logger = logging.getLogger(__name__)
# Configuration du logging
#logging.basicConfig(level=logging.INFO)
#logger = logging.getLogger(__name__)
@dataclass
class TemplateInfo:
"""Structure pour stocker les informations d'un template"""
id: str
type: str
has_asr_zone: bool
asr_tag_position: int
detected_sections: List[str]
medecin: str
embedding: np.ndarray
filepath: str
content: str
asr_context: str = ""
sections_data: Dict = field(default_factory=dict)
user_fields: List[str] = field(default_factory=list)
class MedicalTemplateParser:
"""Parser pour templates médicaux avec base vectorielle et GPT"""
def __init__(self, model_name: str = EMBEDDING_MODEL):
"""
Initialise le parser avec un modèle d'embedding et GPT
Args:
model_name: Nom du modèle SentenceTransformer à utiliser
"""
self.model = SentenceTransformer(model_name)
self.templates: Dict[str, TemplateInfo] = {}
self.vector_index = None
self.template_ids = []
# Initialiser GPT pour l'analyse des sections
self.llm = None
self.section_classifier = None
self._initialize_gpt()
# Types de documents médicaux
self.document_types = {
"compte_rendu_imagerie": ["imagerie", "scanner", "IRM", "échographie", "radiologie", "TECHNIQUE", "RESULTATS"],
"lettre_confrere": ["confrère", "cher", "collègue", "salutations", "cordialement"],
"resultats_laboratoire": ["laboratoire", "analyses", "biologie", "résultats", "valeurs"],
"demande_examen": ["demande", "prescription", "examen", "bilan"],
"ordonnance": ["ordonnance", "prescription", "posologie", "traitement"]
}
def _initialize_gpt(self):
"""Initialise le modèle GPT pour l'analyse des sections"""
api_key = OPENAI_API_KEY
if not api_key:
logger.warning("OPENAI_API_KEY non définie. L'analyse GPT ne sera pas disponible.")
return
try:
self.llm = ChatOpenAI(
model=GPT_MODEL,
temperature=0,
max_tokens=4000,
api_key=api_key
)
# Définir le prompt pour l'analyse des sections
section_prompt = ChatPromptTemplate.from_messages([
("system", """
Vous êtes un expert en analyse de documents médicaux. Je vais vous fournir le texte complet d'un rapport médical.
IMPORTANT : Réponds UNIQUEMENT avec un JSON valide. Aucun texte supplémentaire, aucune explication, aucune balise markdown.
Votre tâche est de :
1. **Identifier le type de document** :
- Si le document contient "Destinataire :" → c'est une "lettre médicale"
- Si le document contient des mots-clés d'examen (SCANNER, IRM, ECHOGRAPHIE, RADIO, etc.) → c'est un "rapport médical"
- Si le document contient "TITRE" seul → c'est un "rapport médical" (pas type "TITRE")
- Sinon → "rapport médical" par défaut
2. **Extraire les informations du centre médical** (si elles existent) :
- Chercher le nom du centre/association/hôpital
- Chercher l'adresse si présente
- Chercher le téléphone si présent
- Chercher le service médical (ex: "Service D'IMAGERIE MEDICALE")
- Chercher l'équipement médical mentionné
- Ces informations doivent être stockées dans un champ "center_info" séparé
- Si aucune information de centre n'est trouvée, laisser center_info vide ou null
3. **Identifier le médecin** (si mentionné) :
- Chercher les signatures en fin de document (ex: "Dr Eric AUBERTON")
- Chercher les mentions "PRESCRIPTEUR :" suivi du nom
- Chercher les formules comme "DR M NOM Prénom" où NOM/Prénom sont des champs à remplir
- Si aucun médecin n'est identifié, retourner "Non spécifié"
4. **Identifier les sections à remplir** :
Une section est définie par :
- Un champ d'information suivi de deux-points : "Patient:", "Initiales:", "Date:", "Médecin:", etc.
- Un titre de section suivi de deux-points : "Indication:", "Technique:", "Résultats:", "Conclusion:", "Compte-rendu:", etc.
- Une ligne seule en majuscules représentant un champ à remplir : "TITRE", "CLINIQUE", "TECHNIQUE", "RÉSULTATS", "CONCLUSION"
- Pour les lettres : "Destinataire:" et le contenu principal de la lettre (zone ASR_VOX)
**EXCLUSIONS importantes** :
- Les titres encadrés de tirets (ex: "- EXAMEN TOMODENSITOMETRIQUE THORACIQUE -") → document_type uniquement
- Les informations d'en-tête du centre médical → center_info uniquement
- Les informations administratives fixes → ne pas traiter comme sections
5. **Pour chaque section identifiée** :
- Extraire son contenu en collectant toutes les lignes suivantes jusqu'à la prochaine section
- Identifier les champs à remplir par l'utilisateur :
* Les balises <ASR_VOX> indiquent des champs à remplir
* Les balises <ASR> indiquent des champs à remplir
* Les textes génériques comme "xxx", "xxxx", "XXX" indiquent des champs à remplir
* Les formules conditionnelles comme "SI(Civilité Nom usuel médecin..." indiquent des champs à remplir
* Les balises [NOM_PATIENT], [DATE], [MEDECIN], etc. indiquent des champs à remplir
* Les champs vides après ":" indiquent des champs à remplir
* Les mots "NOM", "Prénom" dans le contexte médical indiquent des champs à remplir
6. **Gestion spéciale des lettres médicales** :
- "Destinataire:" est une section à remplir
- Le contenu principal (zone ASR_VOX) doit être identifié comme "Contenu" ou "Corps de lettre"
7. **Identifier les zones ASR** et leur position dans le document
8. **Retourner un objet JSON valide** avec cette structure exacte :
{{
"document_type": "rapport médical|lettre médicale|autre",
"center_info": {{
"name": "nom du centre/association si trouvé, sinon null",
"address": "adresse complète si trouvée, sinon null",
"phone": "téléphone si trouvé, sinon null",
"service": "service médical si trouvé, sinon null",
"equipment": "équipement mentionné si trouvé, sinon null"
}},
"physician": "nom du médecin identifié ou 'Non spécifié'",
"asr_zones": [
{{
"tag": "balise ASR trouvée",
"position": "position approximative dans le texte",
"context": "contexte autour de la balise"
}}
],
"sections": {{
"nom_section": {{
"content": "contenu brut de la section",
"has_user_fields": true,
"user_fields": ["liste des champs à remplir"]
}}
}}
}}
**Règles importantes** :
- Extraire UNIQUEMENT les informations qui existent réellement dans le document
- Les informations administratives du centre ne sont PAS des sections à remplir (si elles existent)
- Si aucune information de centre n'est trouvée, laisser les champs center_info à null
- "TITRE" seul indique un rapport médical, pas un type "TITRE"
- Les lettres ont un traitement spécial avec Destinataire + Contenu principal
- Identifier le médecin quand c'est possible, sinon "Non spécifié"
- Distinguer clairement les champs à remplir des informations fixes
- Adaptation flexible : tous les documents n'ont pas la même structure
Répondez UNIQUEMENT avec le JSON—aucun commentaire supplémentaire.
"""),
("human", "Voici le texte complet du rapport médical :\n\n{document_text}\n\nExtrayez toutes les sections, identifiez les champs à remplir et les zones ASR.")
])
self.section_classifier = section_prompt | self.llm
print("✅ GPT initialisé avec succès")
logger.info(f"✅ GPT initialisé avec succès")
except Exception as e:
logger.info(f"❌ Erreur lors de l'initialisation de GPT: {e}")
self.llm = None
self.section_classifier = None
def extract_text_from_docx(self, filepath: str) -> Tuple[str, Dict]:
"""
Extrait le texte d'un fichier Word en préservant la structure
Args:
filepath: Chemin vers le fichier DOCX
Returns:
Tuple[str, Dict]: (texte_complet, informations_structure)
"""
logger.info(f"📄 Extraction du texte DOCX: {os.path.basename(filepath)}")
try:
doc = Document(filepath)
text_content = []
structure_info = {
"paragraphs": [],
"tables": [],
"headers": [],
"footers": [],
"styles": [],
"formatting": []
}
# Traiter les paragraphes
for i, paragraph in enumerate(doc.paragraphs):
para_text = paragraph.text.strip()
if para_text: # Ignorer les paragraphes vides
text_content.append(para_text)
# Collecter les informations de structure
para_info = {
"index": i,
"text": para_text,
"style": paragraph.style.name if paragraph.style else "Normal",
"alignment": str(paragraph.alignment) if paragraph.alignment else "None",
"runs": []
}
# Analyser les runs (formatage)
for run in paragraph.runs:
if run.text.strip():
run_info = {
"text": run.text,
"bold": run.bold,
"italic": run.italic,
"underline": run.underline,
"font_name": run.font.name if run.font.name else "Default",
"font_size": run.font.size.pt if run.font.size else None,
"color": self._get_color_info(run.font.color) if run.font.color else None
}
para_info["runs"].append(run_info)
structure_info["paragraphs"].append(para_info)
structure_info["styles"].append(paragraph.style.name if paragraph.style else "Normal")
# Traiter les tableaux
for table_idx, table in enumerate(doc.tables):
table_info = {
"index": table_idx,
"rows": len(table.rows),
"cols": len(table.columns),
"content": []
}
table_text = []
for row_idx, row in enumerate(table.rows):
row_data = []
row_text = []
for cell_idx, cell in enumerate(row.cells):
cell_text = cell.text.strip()
row_data.append(cell_text)
row_text.append(cell_text)
if cell_text:
text_content.append(cell_text)
table_info["content"].append(row_data)
table_text.append(" | ".join(row_text))
structure_info["tables"].append(table_info)
# Ajouter le contenu du tableau au texte principal
text_content.extend(table_text)
# Traiter les en-têtes et pieds de page
for section in doc.sections:
# En-têtes
if section.header:
header_text = []
for paragraph in section.header.paragraphs:
if paragraph.text.strip():
header_text.append(paragraph.text.strip())
text_content.append(paragraph.text.strip())
if header_text:
structure_info["headers"].append({
"content": header_text,
"section_index": doc.sections.index(section)
})
# Pieds de page
if section.footer:
footer_text = []
for paragraph in section.footer.paragraphs:
if paragraph.text.strip():
footer_text.append(paragraph.text.strip())
text_content.append(paragraph.text.strip())
if footer_text:
structure_info["footers"].append({
"content": footer_text,
"section_index": doc.sections.index(section)
})
# Nettoyer les styles dupliqués
structure_info["styles"] = list(set(structure_info["styles"]))
# Créer le texte final
final_text = "\n".join(text_content)
logger.info(f"✅ Texte extrait avec succès:")
logger.info(f" - Paragraphes: {len(structure_info['paragraphs'])}")
logger.info(f" - Tableaux: {len(structure_info['tables'])}")
logger.info(f" - En-têtes: {len(structure_info['headers'])}")
logger.info(f" - Pieds de page: {len(structure_info['footers'])}")
logger.info(f" - Styles utilisés: {len(structure_info['styles'])}")
return final_text, structure_info
except Exception as e:
logger.info(f"❌ Erreur lors de l'extraction du texte DOCX de {filepath}: {e}")
return "", {}
def _get_color_info(self, color):
"""Extrait les informations de couleur d'un run"""
try:
if color.rgb:
return f"rgb({color.rgb.red}, {color.rgb.green}, {color.rgb.blue})"
elif color.theme_color:
return f"theme_{color.theme_color}"
else:
return "default"
except:
return "default"
def analyze_document_with_gpt(self, text: str) -> Dict:
"""
Analyse le document avec GPT pour extraire sections et zones ASR
Args:
text: Texte complet du document
Returns:
Dict: Résultats de l'analyse GPT
"""
if not self.section_classifier:
logger.info("⚠️ GPT non disponible, utilisation des méthodes classiques")
return self._fallback_analysis(text)
try:
logger.info("🔍 Analyse du document avec GPT...")
response = self.section_classifier.invoke({"document_text": text})
result = response.content.strip()
# Vérifier si la réponse est vide
if not result:
logger.info("❌ Réponse GPT vide")
return self._fallback_analysis(text)
logger.info(f"📝 Réponse GPT (premiers 200 caractères): {result[:200]}...")
if result.startswith("```json"):
result = result[7:] # Supprimer ```json
if result.endswith("```"):
result = result[:-3] # Supprimer ```
# Supprimer les espaces en début et fin
result = result.strip()
# Vérifier que ça commence par { et finit par }
if not result.startswith('{') or not result.endswith('}'):
logger.info(f"❌ Format JSON invalide. Début: '{result[:50]}...' Fin: '...{result[-50:]}'")
return self._fallback_analysis(text)
# Parser la réponse JSON
analysis_data = json.loads(result)
logger.info("✅ Analyse GPT terminée avec succès")
return analysis_data
except json.JSONDecodeError as e:
logger.info(f"❌ Erreur de parsing JSON GPT: {e}")
return self._fallback_analysis(text)
except Exception as e:
logger.info(f"❌ Erreur lors de l'analyse GPT: {e}")
return self._fallback_analysis(text)
def _fallback_analysis(self, text: str) -> Dict:
"""Analyse de fallback sans GPT"""
logger.info("📊 Utilisation de l'analyse classique...")
# Détection ASR classique
has_asr, asr_pos, asr_context = self.detect_asr_zone_classic(text)
# Extraction sections classique
sections = self.extract_sections_classic(text)
# Classification type classique
doc_type = self.classify_document_type(text, sections)
return {
"document_type": doc_type,
"asr_zones": [{"tag": "<ASR_VOX>", "position": asr_pos, "context": asr_context}] if has_asr else [],
"sections": {section: {"content": "", "has_user_fields": False, "user_fields": []} for section in sections}
}
def detect_asr_zone_classic(self, text: str) -> Tuple[bool, int, str]:
"""
Détection ASR classique (fallback)
Returns:
(has_asr_zone, position, context_around_asr)
"""
asr_patterns = [
r"<ASR_VOX>",
r"<ASR>",
r"\[DICTEE\]",
r"\[ASR\]",
r"<!-- ASR -->"
]
for pattern in asr_patterns:
match = re.search(pattern, text, re.IGNORECASE)
if match:
position = match.start()
start_context = max(0, position - 200)
end_context = min(len(text), position + 200)
context = text[start_context:end_context]
return True, position, context
return False, -1, ""
def extract_sections_classic(self, text: str) -> List[str]:
"""Extraction de sections classique (fallback)"""
sections = set()
section_patterns = [
r"([A-ZÉÈÀÇÊ][A-ZÉÈÀÇÊ\s]{2,}):",
r"([A-ZÉÈÀÇÊ][a-zéèàçê\s]{3,}):",
r"(\d+\.\s*[A-ZÉÈÀÇÊ][a-zéèàçê\s]{3,}):",
]
for pattern in section_patterns:
matches = re.findall(pattern, text, re.MULTILINE)
for match in matches:
section = match.strip().rstrip(':').strip()
if len(section) > 2 and len(section) < 50:
sections.add(section)
return sorted(list(sections))
def classify_document_type(self, text: str, sections: List[str]) -> str:
"""Classifie le type de document basé sur le contenu et les sections"""
text_lower = text.lower()
sections_lower = [s.lower() for s in sections]
all_text = text_lower + " " + " ".join(sections_lower)
max_score = 0
best_type = "autre"
for doc_type, keywords in self.document_types.items():
score = 0
for keyword in keywords:
if keyword.lower() in all_text:
score += 1
if doc_type == "compte_rendu_imagerie" and any("technique" in s for s in sections_lower):
score += 2
if score > max_score:
max_score = score
best_type = doc_type
return best_type
def extract_doctor_name(self, text: str) -> str:
"""Extrait le nom du médecin du template"""
doctor_patterns = [
r"Dr\.?\s+([A-ZÉÈÀÇÊ][a-zéèàçê]+\s+[A-ZÉÈÀÇÊ][a-zéèàçê]+)",
r"Docteur\s+([A-ZÉÈÀÇÊ][a-zéèàçê]+\s+[A-ZÉÈÀÇÊ][a-zéèàçê]+)",
r"Praticien\s*:\s*([A-ZÉÈÀÇÊ][a-zéèàçê]+\s+[A-ZÉÈÀÇÊ][a-zéèàçê]+)",
]
for pattern in doctor_patterns:
match = re.search(pattern, text)
if match:
return match.group(1).strip()
return "Non spécifié"
def parse_template(self, filepath: str, template_id: str = None) -> TemplateInfo:
"""
Parse un template et extrait toutes les informations avec GPT
Args:
filepath: Chemin vers le fichier template
template_id: ID unique du template (optionnel)
Returns:
TemplateInfo: Informations structurées du template
"""
if template_id is None:
template_id = Path(filepath).stem
logger.info(f"\n📄 Traitement du fichier: {os.path.basename(filepath)}")
# Extraire le texte selon le type de fichier
if filepath.endswith('.docx'):
text, _ = self.extract_text_from_docx(filepath) # Utiliser la méthode existante
else:
with open(filepath, 'r', encoding='utf-8') as f:
text = f.read()
if not text.strip():
logger.info("❌ Aucun texte extrait du fichier")
return None
# Analyser avec GPT
analysis_data = self.analyze_document_with_gpt(text)
# Extraire les informations de l'analyse GPT
doc_type = analysis_data.get("document_type", "autre")
sections_data = analysis_data.get("sections", {})
asr_zones = analysis_data.get("asr_zones", [])
logger.info(f"📋 Type de document détecté: {doc_type}")
logger.info(f"🔍 Zones ASR trouvées: {len(asr_zones)}")
logger.info(f"📑 Sections détectées: {len(sections_data)}")
# Déterminer les informations ASR
has_asr = len(asr_zones) > 0
asr_pos = asr_zones[0]["position"] if asr_zones else -1
asr_context = asr_zones[0]["context"] if asr_zones else ""
# Extraire les sections détectées
detected_sections = list(sections_data.keys())
# Collecter tous les champs utilisateur
user_fields = []
for section_data in sections_data.values():
if isinstance(section_data, dict) and section_data.get("has_user_fields"):
user_fields.extend(section_data.get("user_fields", []))
# Extraire le nom du médecin
medecin = self.extract_doctor_name(text)
logger.info(f"👨‍⚕️ Médecin détecté: {medecin}")
# Créer le texte pour l'embedding
embedding_text = self.create_embedding_text(text, asr_context, detected_sections, doc_type)
# Générer l'embedding
embedding = self.model.encode([embedding_text])[0]
# Créer l'objet TemplateInfo
template_info = TemplateInfo(
id=template_id,
type=doc_type,
has_asr_zone=has_asr,
asr_tag_position=asr_pos,
detected_sections=detected_sections,
medecin=medecin,
embedding=embedding,
filepath=filepath,
content=text,
asr_context=asr_context,
sections_data=sections_data,
user_fields=user_fields
)
logger.info(f"✅ Template {template_id} traité avec succès")
return template_info
def create_embedding_text(self, text: str, asr_context: str, sections: List[str], doc_type: str) -> str:
"""
Crée le texte optimisé pour l'embedding
Args:
text: Texte complet du template
asr_context: Contexte autour de la zone ASR
sections: Sections détectées
doc_type: Type de document
Returns:
str: Texte optimisé pour l'embedding
"""
lines = text.split('\n')
header = ' '.join(lines[:5])
embedding_parts = [
f"Type: {doc_type}",
f"Sections: {', '.join(sections[:5])}",
f"Contexte: {header[:200]}",
]
if asr_context:
embedding_parts.append(f"Zone ASR: {asr_context[:100]}")
return ' | '.join(embedding_parts)
def process_docx_folder(self, folder_path: str) -> List[TemplateInfo]:
"""
Traite tous les fichiers DOCX dans un dossier
Args:
folder_path: Chemin vers le dossier contenant les fichiers DOCX
Returns:
List[TemplateInfo]: Liste des templates traités
"""
logger.info(f"🗂️ Traitement du dossier: {folder_path}")
# Chercher tous les fichiers DOCX
docx_files = glob.glob(os.path.join(folder_path, "*.docx"))
if not docx_files:
logger.info("❌ Aucun fichier DOCX trouvé dans le dossier")
return []
logger.info(f"📁 {len(docx_files)} fichiers DOCX trouvés")
templates = []
for i, filepath in enumerate(docx_files, 1):
logger.info(f"\n{'='*60}")
logger.info(f"📄 Fichier {i}/{len(docx_files)}: {os.path.basename(filepath)}")
logger.info(f"{'='*60}")
try:
template_info = self.parse_template(filepath)
if template_info:
templates.append(template_info)
self.templates[template_info.id] = template_info
except Exception as e:
logger.info(f"❌ Erreur lors du traitement de {filepath}: {e}")
continue
logger.info(f"\n🎉 Traitement terminé: {len(templates)} templates traités avec succès")
return templates
def build_vector_database(self, templates: List[TemplateInfo]):
"""
Construit la base vectorielle avec FAISS
Args:
templates: Liste des templates parsés
"""
if not templates:
logger.info("❌ Aucun template fourni pour construire la base vectorielle")
return
logger.info(f"🔧 Construction de la base vectorielle avec {len(templates)} templates...")
embeddings = np.array([template.embedding for template in templates])
dimension = embeddings.shape[1]
self.vector_index = faiss.IndexFlatIP(dimension)
faiss.normalize_L2(embeddings)
self.vector_index.add(embeddings)
self.template_ids = [template.id for template in templates]
logger.info(f"✅ Base vectorielle construite avec succès")
def search_similar_templates(self, query_text: str, k: int = 5) -> List[Tuple[str, float]]:
"""
Recherche les templates similaires à une requête
Args:
query_text: Texte de la requête
k: Nombre de résultats à retourner
Returns:
List[Tuple[str, float]]: Liste des (template_id, score) les plus similaires
"""
if self.vector_index is None:
logger.info("❌ Base vectorielle non construite")
return []
logger.info(f"🔍 Recherche pour: '{query_text}'")
query_embedding = self.model.encode([query_text])
faiss.normalize_L2(query_embedding)
scores, indices = self.vector_index.search(query_embedding, k)
results = []
for i, (score, idx) in enumerate(zip(scores[0], indices[0])):
if idx < len(self.template_ids):
template_id = self.template_ids[idx]
results.append((template_id, float(score)))
return results
def save_database(self, filepath: str):
"""Sauvegarde la base vectorielle et les templates"""
logger.info(f"💾 Sauvegarde de la base de données...")
database_data = {
'templates': self.templates,
'template_ids': self.template_ids,
'model_name': self.model.get_sentence_embedding_dimension()
}
with open(filepath, 'wb') as f:
pickle.dump(database_data, f)
if self.vector_index is not None:
faiss.write_index(self.vector_index, filepath.replace('.pkl', '.faiss'))
logger.info(f"✅ Base de données sauvegardée dans {filepath}")
def load_database(self, filepath: str):
"""Charge la base vectorielle et les templates"""
logger.info(f"📂 Chargement de la base de données depuis {filepath}...")
with open(filepath, 'rb') as f:
database_data = pickle.load(f)
self.templates = database_data['templates']
self.template_ids = database_data['template_ids']
faiss_path = filepath.replace('.pkl', '.faiss')
if Path(faiss_path).exists():
self.vector_index = faiss.read_index(faiss_path)
logger.info(f"✅ Base de données chargée avec succès")
def get_template_info(self, template_id: str) -> Optional[TemplateInfo]:
"""Récupère les informations d'un template par son ID"""
return self.templates.get(template_id)
def print_template_summary(self, template_id: str):
"""Affiche un résumé des informations d'un template"""
template = self.get_template_info(template_id)
if template:
logger.info(f"\n{'='*60}")
logger.info(f"📋 Template: {template.id}")
logger.info(f"{'='*60}")
logger.info(f"📄 Type: {template.type}")
logger.info(f"👨‍⚕️ Médecin: {template.medecin}")
logger.info(f"🎤 Zone ASR: {'✅ Oui' if template.has_asr_zone else '❌ Non'}")
logger.info(f"📑 Sections détectées ({len(template.detected_sections)}): {', '.join(template.detected_sections)}")
logger.info(f"⚠️ Champs utilisateur ({len(template.user_fields)}): {', '.join(template.user_fields[:3])}{'...' if len(template.user_fields) > 3 else ''}")
logger.info(f"📁 Fichier: {os.path.basename(template.filepath)}")
logger.info(f"{'='*60}")
# Afficher les détails des sections
for section_name, section_data in template.sections_data.items():
if isinstance(section_data, dict):
logger.info(f"📋 Section: {section_name}")
if section_data.get("has_user_fields"):
fields = section_data.get('user_fields', [])
logger.info(f" ⚠️ Champs à remplir: {', '.join(fields)}")
else:
logger.info(f" ✅ Section complète")
def print_global_summary(self):
"""Affiche un résumé global de tous les templates"""
logger.info(f"\n{'='*80}")
logger.info(f"📊 RÉSUMÉ GLOBAL - {len(self.templates)} TEMPLATES TRAITÉS")
logger.info(f"{'='*80}")
# Statistiques par type
types_count = {}
asr_count = 0
total_sections = 0
total_user_fields = 0
for template in self.templates.values():
types_count[template.type] = types_count.get(template.type, 0) + 1
if template.has_asr_zone:
asr_count += 1
total_sections += len(template.detected_sections)
total_user_fields += len(template.user_fields)
logger.info(f"📈 Statistiques générales:")
logger.info(f" - Total templates: {len(self.templates)}")
logger.info(f" - Templates avec ASR: {asr_count}")
logger.info(f" - Total sections: {total_sections}")
logger.info(f" - Total champs utilisateur: {total_user_fields}")
logger.info(f"\n📊 Répartition par type:")
for doc_type, count in types_count.items():
logger.info(f" - {doc_type}: {count}")
logger.info(f"\n📋 Templates individuels:")
for template_id in sorted(self.templates.keys()):
template = self.templates[template_id]
asr_icon = "🎤" if template.has_asr_zone else "❌"
logger.info(f" {asr_icon} {template_id} ({template.type}) - {len(template.detected_sections)} sections - {template.medecin}")
def main():
"""Fonction principale pour traiter un dossier de fichiers docx"""
# Chemin vers le dossier contenant les fichiers docx
docx_folder = input("Entrez le chemin vers le dossier contenant les fichiers docx: ").strip()
if not os.path.exists(docx_folder):
logger.info(f"❌ Le dossier {docx_folder} n'existe pas")
return
logger.info(f"\n🚀 Démarrage du traitement des fichiers docx...")
logger.info(f"📁 Dossier source: {docx_folder}")
# Initialiser le parser
parser = MedicalTemplateParser()
# CORRECTION: Utiliser process_docx_folder au lieu de extract_text_from_docx
templates = parser.process_docx_folder(docx_folder)
if not templates:
logger.info("❌ Aucun template traité avec succès")
return
# Construire la base vectorielle
parser.build_vector_database(templates)
# Afficher le résumé global
parser.print_global_summary()
# Afficher les détails de chaque template
logger.info(f"\n{'='*80}")
logger.info(f"📄 DÉTAILS DES TEMPLATES")
logger.info(f"{'='*80}")
for template_id in sorted(parser.templates.keys()):
parser.print_template_summary(template_id)
# Tester la recherche
logger.info(f"\n{'='*80}")
logger.info(f"🔍 TEST DE RECHERCHE")
logger.info(f"{'='*80}")
test_queries = [
"échographie abdominale",
"scanner thoracique",
"compte rendu imagerie",
"résultats laboratoire"
]
for query in test_queries:
logger.info(f"\n🔍 Recherche pour: '{query}'")
results = parser.search_similar_templates(query, k=3)
if results:
logger.info("📊 Résultats:")
for i, (template_id, score) in enumerate(results, 1):
template = parser.get_template_info(template_id)
logger.info(f" {i}. {template_id} (score: {score:.3f}) - {template.type} - {template.medecin}")
else:
logger.info("❌ Aucun résultat trouvé")
# Sauvegarder la base
save_path = os.path.join(docx_folder, 'medical_templates.pkl')
parser.save_database(save_path)
logger.info(f"\n✅ Traitement terminé avec succès!")
logger.info(f"💾 Base de données sauvegardée: {save_path}")
if __name__ == "__main__":
main()