pipeline2 / test_complete_pipeline.py
Nourhenem's picture
initial commit
f92da22 verified
#!/usr/bin/env python3
"""
Complete Pipeline Test
Tests the full pipeline including Langfuse transcription download
"""
import os
import sys
import time
from pathlib import Path
from datetime import datetime
# Add the current directory to Python path
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
def test_complete_pipeline():
"""Test the complete pipeline including Langfuse transcription download."""
print("πŸ₯ Complete Medical Document Pipeline Test")
print("=" * 70)
print("This test will:")
print("1. Download transcriptions from Langfuse")
print("2. Run the complete document processing pipeline")
print("3. Validate the results")
print("=" * 70)
# Step 1: Download transcriptions from Langfuse
print("\nπŸ“₯ Step 1: Downloading transcriptions from Langfuse...")
try:
from medical_transcription_retriever import MedicalTranscriptionRetriever
retriever = MedicalTranscriptionRetriever()
saved_files = retriever.run(
limit=5, save_to_file=True, save_by_user=True)
if not saved_files:
print("❌ No transcriptions downloaded from Langfuse")
print("Please check your Langfuse configuration and try again")
return None
print(
f"βœ… Successfully downloaded transcriptions: {len(saved_files)} files")
except Exception as e:
print(f"❌ Error downloading transcriptions: {e}")
print("Continuing with existing transcriptions if available...")
# Step 2: Check if we have transcription files
transcriptions_dir = "transcriptions"
if not os.path.exists(transcriptions_dir):
print(f"❌ Transcriptions directory not found: {transcriptions_dir}")
return None
transcription_files = list(Path(transcriptions_dir).glob("*.json"))
if not transcription_files:
print(f"❌ No transcription files found in {transcriptions_dir}")
return None
print(f"πŸ“ Found {len(transcription_files)} transcription files")
# Step 3: Test with the first transcription file
first_transcription = transcription_files[0]
print(f"πŸ“„ Using transcription file: {first_transcription.name}")
try:
# Step 4: Initialize the orchestrator
print(
"\nπŸš€ Step 2: Initializing orchestrator with automatic SFTP model detection...")
from langchain_medical_agents_refactored import MedicalDocumentOrchestrator
orchestrator = MedicalDocumentOrchestrator(
template_path=None, # Let the SFTP agent find the template
transcription_path=str(first_transcription),
transcriptions_dir=transcriptions_dir
)
# Step 5: Run the complete pipeline
print("\nπŸ”„ Step 3: Running complete pipeline...")
print("This will include:")
print(" πŸ“₯ Step 0: SFTP Download (.rtf β†’ .doc) - AUTOMATIC MODEL DETECTION")
print(" πŸ“‹ Step 1: Template Analysis")
print(" ✏️ Step 2: Transcription Correction")
print(" πŸ”¬ Step 3: Medical Data Analysis")
print(" πŸ“ Step 4: Title Generation")
print(" πŸ“ Step 5: Section Generation")
print(" πŸ“„ Step 6: Document Assembly")
print(" πŸ“‹ Step 7: Validation")
start_time = time.time()
output_file = orchestrator.run_full_pipeline()
end_time = time.time()
execution_time = end_time - start_time
print(f"\n⏱️ Pipeline execution time: {execution_time:.2f} seconds")
print(f"\nπŸŽ‰ Pipeline completed successfully!")
print(f"πŸ“„ Output file: {output_file}")
# Step 6: Show SFTP download summary
if orchestrator.downloaded_models:
successful_downloads = [
m for m in orchestrator.downloaded_models if m['status'] == 'success']
failed_downloads = [
m for m in orchestrator.downloaded_models if m['status'] == 'error']
print(f"\nπŸ“₯ SFTP Download Summary:")
print(
f" βœ… Successfully downloaded: {len(successful_downloads)} models")
print(f" ❌ Failed downloads: {len(failed_downloads)} models")
if successful_downloads:
print(" πŸ“ Downloaded models:")
for model in successful_downloads[:5]: # Show first 5
print(
f" - {model['model_id']}: {model['local_filename']}")
if len(successful_downloads) > 5:
print(f" ... and {len(successful_downloads) - 5} more")
# Step 7: Verify output file exists
if os.path.exists(output_file):
file_size = os.path.getsize(output_file)
print(f"\nβœ… Output file verified:")
print(f" πŸ“„ File: {output_file}")
print(f" πŸ“ Size: {file_size} bytes")
# Check if file is readable
try:
from docx import Document
doc = Document(output_file)
paragraph_count = len(doc.paragraphs)
print(f" πŸ“ Paragraphs: {paragraph_count}")
print(f" βœ… Document is readable and valid")
except Exception as e:
print(f" ⚠️ Document validation failed: {e}")
else:
print(f"\n❌ Output file not found: {output_file}")
return output_file
except Exception as e:
print(f"❌ Error running pipeline: {str(e)}")
import traceback
traceback.print_exc()
return None
def cleanup_test_files():
"""Clean up test files after testing."""
print("\n🧹 Cleaning up test files...")
# Remove generated documents
for file in Path("./transcriptions").glob("*.json"):
try:
os.remove(file)
print(f"πŸ—‘οΈ Removed: {file}")
except Exception as e:
print(f"⚠️ Could not remove {file}: {e}")
for file in Path("./").glob("*.docx"):
try:
os.remove(file)
print(f"πŸ—‘οΈ Removed: {file}")
except Exception as e:
print(f"⚠️ Could not remove {file}: {e}")
for file in Path("./").glob("*.json"):
try:
os.remove(file)
print(f"πŸ—‘οΈ Removed: {file}")
except Exception as e:
print(f"⚠️ Could not remove {file}: {e}")
# Remove downloaded models
models_dir = "models"
if os.path.exists(models_dir):
for file in Path(models_dir).glob("*.doc"):
try:
os.remove(file)
print(f"πŸ—‘οΈ Removed: {file}")
except Exception as e:
print(f"⚠️ Could not remove {file}: {e}")
def main():
"""Main test function."""
print("πŸ§ͺ Complete Pipeline Test with Langfuse Integration")
print("=" * 70)
# Check if we're in the right directory
if not os.path.exists("transcriptions"):
print("❌ Please run this script from the project root directory")
print(" (where the 'transcriptions' folder is located)")
return
# Show current configuration
try:
from sftp_config import print_sftp_config
print_sftp_config()
except ImportError:
print("⚠️ SFTP config not available")
# Run the complete pipeline test
result = test_complete_pipeline()
if result:
print(f"\nπŸŽ‰ Complete pipeline test completed successfully!")
print(f"πŸ“„ Generated document: {result}")
# Ask if user wants to clean up
cleanup = input(
"\n🧹 Do you want to clean up test files? (y/n): ").lower().strip()
if cleanup in ['y', 'yes']:
cleanup_test_files()
else:
print(f"\n❌ Complete pipeline test failed")
if __name__ == "__main__":
main()