medchat / enhanced_data_loader.py
vihashini-18
i
0a5c991
"""
Enhanced data loader for comprehensive medical datasets from multiple sources
- Hugging Face medical datasets
- HealthData.gov
- PhysioNet
- WHO Global Health Observatory
- Kaggle medical datasets
"""
import pandas as pd
from datasets import load_dataset
import re
import requests
import json
# Compiled once at import time; clean_text is called for every loaded document.
_WHITESPACE_RUN_RE = re.compile(r'\s+')
# Everything that is NOT a word character, whitespace, or one of . , ? ! - :
_DISALLOWED_CHARS_RE = re.compile(r'[^\w\s\.\,\?\!\-\:]')


def clean_text(text):
    """Normalize whitespace and strip unsupported punctuation from ``text``.

    Args:
        text: Any value. Missing scalars (``None``, ``NaN``, ``pd.NA``,
            ``NaT``) are treated as empty; everything else is converted
            with ``str()`` before cleaning.

    Returns:
        The cleaned string: runs of whitespace (including newlines) are
        collapsed to a single space, every character other than word
        characters, whitespace and ``. , ? ! - :`` is removed, and the
        result is stripped. Note that this also removes ``/``, ``%`` and
        parentheses, so ``"140/90"`` becomes ``"14090"``.
    """
    # pd.isna() is element-wise on list-likes and the resulting array cannot
    # be used in a boolean context, so only ask it about scalar values.
    if pd.api.types.is_scalar(text) and pd.isna(text):
        return ""

    collapsed = _WHITESPACE_RUN_RE.sub(' ', str(text))
    return _DISALLOWED_CHARS_RE.sub('', collapsed).strip()
# (Hugging Face dataset id, maximum number of rows to read) pairs.
_HUGGINGFACE_DATASETS = (
    ("openlifescienceai/medmcqa", 8000),
    ("openlifescienceai/pubmedqa", 3000),
    ("openlifescienceai/medqa", 3000),
    ("openlifescienceai/mmlu_clinical_knowledge", 299),
    ("openlifescienceai/mmlu_college_medicine", 200),
    ("openlifescienceai/mmlu_college_biology", 165),
    ("openlifescienceai/mmlu_professional_medicine", 308),
    ("openlifescienceai/mmlu_anatomy", 154),
    ("openlifescienceai/mmlu_medical_genetics", 116),
    ("medalpaca/medical_meadow_mmmlu", 2000),
)

# Splits probed in order; the first one that loads is used.
_CANDIDATE_SPLITS = ("train", "test", "validation")


def _load_first_available_split(dataset_name, limit):
    """Return up to ``limit`` rows of the first split that loads, else None."""
    # A combined "train+test+validation[:limit]" fallback is intentionally not
    # attempted: it needs all three splits to exist, so it cannot succeed once
    # each of them has failed on its own, and its slice only limits the last
    # split rather than the whole request.
    for split_name in _CANDIDATE_SPLITS:
        try:
            return load_dataset(dataset_name, split=f"{split_name}[:{limit}]")
        except Exception:
            # Best-effort probing: a missing split is expected, try the next.
            # (Not a bare ``except`` so Ctrl-C / SystemExit still propagate.)
            continue
    return None


def _field_text(item, key):
    """Return ``item[key]`` as a string, mapping a missing/None value to ''."""
    value = item.get(key)
    return "" if value is None else str(value)


def _extract_question_answer(item):
    """Derive a ``(question, answer)`` string pair from one dataset row."""
    # Question: 'input' overrides 'question'; 'text' is only a fallback.
    question = ""
    for key in ('question', 'input'):
        if key in item:
            question = _field_text(item, key)
    if not question and 'text' in item:
        question = _field_text(item, 'text')

    # Answer: later keys override earlier ones ...
    answer = ""
    for key in ('answer', 'target', 'final_decision'):
        if key in item:
            answer = _field_text(item, key)
    # ... and these are consulted only while the answer is still empty.
    for key in ('exp', 'context', 'label'):
        if key in item and not answer:
            answer = _field_text(item, key)

    # Handle MMLU/medmcqa style multiple choice
    if 'options' in item:
        options = item.get('options', [])
        options_str = None
        if isinstance(options, list) and len(options) >= 2:
            # str() each option: join() raises TypeError on non-string items.
            options_str = "Choices: " + " | ".join(map(str, options))
        elif isinstance(options, dict):
            options_str = ", ".join(f"{k}: {v}" for k, v in options.items())
        if options_str is not None:
            answer = answer + " " + options_str if answer else options_str

    if 'cop' in item and answer:
        cop = item.get('cop', '')
        # Explicit None/'' test: a plain truthiness check would silently drop
        # a correct-option index of 0.
        if cop is not None and cop != '':
            answer = f"Correct answer: {cop}. {answer}"

    return question, answer


def _build_document(item, dataset_name):
    """Turn one dataset row into a document dict, or None if it is unusable."""
    question, answer = _extract_question_answer(item)

    # Combine question and answer
    if question and answer:
        context = f"Question: {question}\n\nAnswer: {answer}"
    elif question:
        context = f"Question: {question}"
    elif answer:
        context = f"Medical Information: {answer}"
    else:
        return None

    context = clean_text(context)
    # Skip fragments too short to be useful (20 characters or fewer).
    if len(context) <= 20:
        return None

    short_name = dataset_name.split('/')[-1]
    return {
        'text': context,
        'source': f"HF_{short_name}",
        'metadata': {
            'question': question[:200],
            'answer': answer[:200],
            'type': short_name,
        },
    }


def _common_medical_knowledge():
    """Return a fresh list of the built-in medical knowledge documents."""
    # Built on every call so callers may mutate the returned dicts safely.
    return [
        {
            'text': 'Eye irritation symptoms include redness, itching, burning sensation, tearing, dryness, and sensitivity to light. Common causes include allergies, dry eyes, infections, foreign objects, and environmental factors.',
            'source': 'MEDICAL_COMMON',
            'metadata': {'type': 'Ophthalmology', 'category': 'Symptoms'}
        },
        {
            'text': 'Diabetes mellitus is a metabolic disorder characterized by high blood sugar levels. Type 1 diabetes is an autoimmune condition where the pancreas produces little or no insulin. Type 2 diabetes is characterized by insulin resistance. Symptoms include increased thirst, frequent urination, fatigue, and blurred vision.',
            'source': 'MEDICAL_COMMON',
            'metadata': {'type': 'Endocrinology', 'category': 'Disease'}
        },
        {
            'text': 'Hypertension or high blood pressure is when blood pressure is persistently elevated above 140/90 mmHg. Risk factors include age, family history, obesity, lack of physical activity, tobacco use, excessive alcohol, and stress.',
            'source': 'MEDICAL_COMMON',
            'metadata': {'type': 'Cardiology', 'category': 'Condition'}
        },
        {
            'text': 'Chest pain can have various causes including cardiac issues like angina or myocardial infarction, pulmonary causes like pneumonia or pulmonary embolism, gastrointestinal issues like GERD, or musculoskeletal problems. Cardiac causes require immediate medical attention.',
            'source': 'MEDICAL_COMMON',
            'metadata': {'type': 'Emergency Medicine', 'category': 'Symptoms'}
        },
        {
            'text': 'Shortness of breath or dyspnea can be caused by cardiac problems like heart failure or arrhythmias, respiratory conditions like asthma or COPD, anxiety, anemia, or physical exertion. Sudden onset requires immediate evaluation.',
            'source': 'MEDICAL_COMMON',
            'metadata': {'type': 'Pulmonology', 'category': 'Symptoms'}
        },
    ]


def load_comprehensive_medical_datasets():
    """Load medical Q&A documents and return them as a list of dicts.

    Rows are pulled from a fixed list of Hugging Face datasets (each capped
    at a per-dataset row limit) and followed by a handful of built-in
    medical knowledge entries. Although the printed banner names other
    sources as well, those are the only two this function reads from.

    Loading is best-effort: a dataset that cannot be loaded, or that raises
    while being processed, is reported on stdout and skipped (documents
    already collected from it are kept). Progress is printed throughout.

    Returns:
        list[dict]: One dict per document with the keys ``'text'``,
        ``'source'`` and ``'metadata'``.
    """
    rule = "=" * 70

    print(rule)
    print("Loading Comprehensive Medical Datasets")
    print("Sources: Hugging Face, HealthData.gov, PhysioNet, WHO, Kaggle")
    print(rule)

    documents = []

    # Hugging Face Medical Datasets
    print("\n" + rule)
    print("LOADING FROM HUGGING FACE")
    print(rule)

    for dataset_name, limit in _HUGGINGFACE_DATASETS:
        try:
            print(f"\nLoading {dataset_name}...")
            dataset = _load_first_available_split(dataset_name, limit)
            if dataset is None:
                print(f" Could not load any data from {dataset_name}")
                continue

            count = 0
            for item in dataset:
                document = _build_document(item, dataset_name)
                if document is not None:
                    documents.append(document)
                    count += 1
            print(f"✓ Loaded {dataset_name.split('/')[-1]}: {count} items")
        except Exception as e:
            # Top-level boundary for one dataset: report and move on so a
            # single broken dataset does not abort the whole load.
            print(f"✗ Error loading {dataset_name}: {str(e)[:100]}")
            continue

    print("\n" + rule)
    print(f"Hugging Face Total: {len(documents)} documents")
    print(rule + "\n")

    # Add sample medical knowledge from various sources
    print("\n" + rule)
    print("ADDING COMPREHENSIVE MEDICAL KNOWLEDGE")
    print(rule)

    # Add common medical conditions and their descriptions
    common_medical_knowledge = _common_medical_knowledge()
    documents.extend(common_medical_knowledge)
    print(f"✓ Added {len(common_medical_knowledge)} common medical knowledge entries")

    print("\n" + rule)
    print(f"Successfully loaded {len(documents)} total medical documents")
    print(rule + "\n")
    return documents
def chunk_text(text, chunk_size=512, overlap=50):
    """Split ``text`` into overlapping word-based chunks for retrieval.

    The text is split on whitespace; each chunk holds up to ``chunk_size``
    words and consecutive chunks share ``overlap`` words.

    Args:
        text: The string to split.
        chunk_size: Maximum number of words per chunk; must be positive.
        overlap: Number of words shared between consecutive chunks; must
            satisfy ``0 <= overlap < chunk_size``.

    Returns:
        A list of chunk strings with words joined by single spaces. Text
        containing no words yields an empty list.

    Raises:
        ValueError: If ``chunk_size`` or ``overlap`` is out of range.
    """
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be positive, got {chunk_size}")
    # overlap >= chunk_size would make the stride zero or negative (a crash
    # or an empty result); a negative overlap would skip words between chunks.
    if not 0 <= overlap < chunk_size:
        raise ValueError(
            f"overlap must satisfy 0 <= overlap < chunk_size, "
            f"got overlap={overlap}, chunk_size={chunk_size}"
        )

    words = text.split()
    stride = chunk_size - overlap
    chunks = []
    for start in range(0, len(words), stride):
        end = start + chunk_size
        chunks.append(' '.join(words[start:end]))
        # Stop once a chunk reaches the last word; otherwise the next stride
        # would emit a tail made only of words already in this chunk.
        if end >= len(words):
            break
    return chunks