#!/usr/bin/env python3 """ Complete Pipeline Test Tests the full pipeline including Langfuse transcription download """ import os import sys import time from pathlib import Path from datetime import datetime # Add the current directory to Python path sys.path.append(os.path.dirname(os.path.abspath(__file__))) def test_complete_pipeline(): """Test the complete pipeline including Langfuse transcription download.""" print("๐Ÿฅ Complete Medical Document Pipeline Test") print("=" * 70) print("This test will:") print("1. Download transcriptions from Langfuse") print("2. Run the complete document processing pipeline") print("3. Validate the results") print("=" * 70) # Step 1: Download transcriptions from Langfuse print("\n๐Ÿ“ฅ Step 1: Downloading transcriptions from Langfuse...") try: from medical_transcription_retriever import MedicalTranscriptionRetriever retriever = MedicalTranscriptionRetriever() saved_files = retriever.run( limit=5, save_to_file=True, save_by_user=True) if not saved_files: print("โŒ No transcriptions downloaded from Langfuse") print("Please check your Langfuse configuration and try again") return None print( f"โœ… Successfully downloaded transcriptions: {len(saved_files)} files") except Exception as e: print(f"โŒ Error downloading transcriptions: {e}") print("Continuing with existing transcriptions if available...") # Step 2: Check if we have transcription files transcriptions_dir = "transcriptions" if not os.path.exists(transcriptions_dir): print(f"โŒ Transcriptions directory not found: {transcriptions_dir}") return None transcription_files = list(Path(transcriptions_dir).glob("*.json")) if not transcription_files: print(f"โŒ No transcription files found in {transcriptions_dir}") return None print(f"๐Ÿ“ Found {len(transcription_files)} transcription files") # Step 3: Test with the first transcription file first_transcription = transcription_files[0] print(f"๐Ÿ“„ Using transcription file: {first_transcription.name}") try: # Step 4: Initialize the orchestrator print( "\n๐Ÿš€ Step 2: Initializing orchestrator with automatic SFTP model detection...") from langchain_medical_agents_refactored import MedicalDocumentOrchestrator orchestrator = MedicalDocumentOrchestrator( template_path=None, # Let the SFTP agent find the template transcription_path=str(first_transcription), transcriptions_dir=transcriptions_dir ) # Step 5: Run the complete pipeline print("\n๐Ÿ”„ Step 3: Running complete pipeline...") print("This will include:") print(" ๐Ÿ“ฅ Step 0: SFTP Download (.rtf โ†’ .doc) - AUTOMATIC MODEL DETECTION") print(" ๐Ÿ“‹ Step 1: Template Analysis") print(" โœ๏ธ Step 2: Transcription Correction") print(" ๐Ÿ”ฌ Step 3: Medical Data Analysis") print(" ๐Ÿ“ Step 4: Title Generation") print(" ๐Ÿ“ Step 5: Section Generation") print(" ๐Ÿ“„ Step 6: Document Assembly") print(" ๐Ÿ“‹ Step 7: Validation") start_time = time.time() output_file = orchestrator.run_full_pipeline() end_time = time.time() execution_time = end_time - start_time print(f"\nโฑ๏ธ Pipeline execution time: {execution_time:.2f} seconds") print(f"\n๐ŸŽ‰ Pipeline completed successfully!") print(f"๐Ÿ“„ Output file: {output_file}") # Step 6: Show SFTP download summary if orchestrator.downloaded_models: successful_downloads = [ m for m in orchestrator.downloaded_models if m['status'] == 'success'] failed_downloads = [ m for m in orchestrator.downloaded_models if m['status'] == 'error'] print(f"\n๐Ÿ“ฅ SFTP Download Summary:") print( f" โœ… Successfully downloaded: {len(successful_downloads)} models") print(f" โŒ Failed downloads: {len(failed_downloads)} models") if successful_downloads: print(" ๐Ÿ“ Downloaded models:") for model in successful_downloads[:5]: # Show first 5 print( f" - {model['model_id']}: {model['local_filename']}") if len(successful_downloads) > 5: print(f" ... and {len(successful_downloads) - 5} more") # Step 7: Verify output file exists if os.path.exists(output_file): file_size = os.path.getsize(output_file) print(f"\nโœ… Output file verified:") print(f" ๐Ÿ“„ File: {output_file}") print(f" ๐Ÿ“ Size: {file_size} bytes") # Check if file is readable try: from docx import Document doc = Document(output_file) paragraph_count = len(doc.paragraphs) print(f" ๐Ÿ“ Paragraphs: {paragraph_count}") print(f" โœ… Document is readable and valid") except Exception as e: print(f" โš ๏ธ Document validation failed: {e}") else: print(f"\nโŒ Output file not found: {output_file}") return output_file except Exception as e: print(f"โŒ Error running pipeline: {str(e)}") import traceback traceback.print_exc() return None def cleanup_test_files(): """Clean up test files after testing.""" print("\n๐Ÿงน Cleaning up test files...") # Remove generated documents for file in Path("./transcriptions").glob("*.json"): try: os.remove(file) print(f"๐Ÿ—‘๏ธ Removed: {file}") except Exception as e: print(f"โš ๏ธ Could not remove {file}: {e}") for file in Path("./").glob("*.docx"): try: os.remove(file) print(f"๐Ÿ—‘๏ธ Removed: {file}") except Exception as e: print(f"โš ๏ธ Could not remove {file}: {e}") for file in Path("./").glob("*.json"): try: os.remove(file) print(f"๐Ÿ—‘๏ธ Removed: {file}") except Exception as e: print(f"โš ๏ธ Could not remove {file}: {e}") # Remove downloaded models models_dir = "models" if os.path.exists(models_dir): for file in Path(models_dir).glob("*.doc"): try: os.remove(file) print(f"๐Ÿ—‘๏ธ Removed: {file}") except Exception as e: print(f"โš ๏ธ Could not remove {file}: {e}") def main(): """Main test function.""" print("๐Ÿงช Complete Pipeline Test with Langfuse Integration") print("=" * 70) # Check if we're in the right directory if not os.path.exists("transcriptions"): print("โŒ Please run this script from the project root directory") print(" (where the 'transcriptions' folder is located)") return # Show current configuration try: from sftp_config import print_sftp_config print_sftp_config() except ImportError: print("โš ๏ธ SFTP config not available") # Run the complete pipeline test result = test_complete_pipeline() if result: print(f"\n๐ŸŽ‰ Complete pipeline test completed successfully!") print(f"๐Ÿ“„ Generated document: {result}") # Ask if user wants to clean up cleanup = input( "\n๐Ÿงน Do you want to clean up test files? (y/n): ").lower().strip() if cleanup in ['y', 'yes']: cleanup_test_files() else: print(f"\nโŒ Complete pipeline test failed") if __name__ == "__main__": main()