"""
Complete Pipeline Test

Tests the full pipeline including Langfuse transcription download.
"""

import os
import sys
import time
from pathlib import Path
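
# The path tweak below makes sibling modules in this directory (e.g.
# medical_transcription_retriever, langchain_medical_agents_refactored,
# sftp_config) importable when the script is run directly.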
sys.path.append(os.path.dirname(os.path.abspath(__file__)))


def test_complete_pipeline():
    """Test the complete pipeline including Langfuse transcription download."""
    print("Complete Medical Document Pipeline Test")
    print("=" * 70)
    print("This test will:")
    print("1. Download transcriptions from Langfuse")
    print("2. Run the complete document processing pipeline")
    print("3. Validate the results")
    print("=" * 70)

    print("\nStep 1: Downloading transcriptions from Langfuse...")
    try:
        from medical_transcription_retriever import MedicalTranscriptionRetriever

        retriever = MedicalTranscriptionRetriever()
        saved_files = retriever.run(
            limit=5, save_to_file=True, save_by_user=True)
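        # Assumption: run() returns the collection of transcription files it
        # saved (save_to_file=True), so an empty result means nothing was
        # downloaded; the exact return type is defined by
        # MedicalTranscriptionRetriever, not by this test.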

        if not saved_files:
            print("No transcriptions downloaded from Langfuse")
            print("Please check your Langfuse configuration and try again")
            return None

        print(
            f"Successfully downloaded transcriptions: {len(saved_files)} files")

    except Exception as e:
        print(f"Error downloading transcriptions: {e}")
        print("Continuing with existing transcriptions if available...")

    transcriptions_dir = "transcriptions"
    if not os.path.exists(transcriptions_dir):
        print(f"Transcriptions directory not found: {transcriptions_dir}")
        return None

    transcription_files = list(Path(transcriptions_dir).glob("*.json"))
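    # The retriever is expected to have written one JSON file per
    # transcription into this directory (save_to_file=True above); any
    # pre-existing JSON files are picked up as well.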
    if not transcription_files:
        print(f"No transcription files found in {transcriptions_dir}")
        return None

    print(f"Found {len(transcription_files)} transcription files")

    first_transcription = transcription_files[0]
    print(f"Using transcription file: {first_transcription.name}")

    try:
        print(
            "\nStep 2: Initializing orchestrator with automatic SFTP model detection...")
        from langchain_medical_agents_refactored import MedicalDocumentOrchestrator

        orchestrator = MedicalDocumentOrchestrator(
            template_path=None,
            transcription_path=str(first_transcription),
            transcriptions_dir=transcriptions_dir
        )
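        # Assumption: template_path=None leaves template selection to the
        # orchestrator (e.g. via the SFTP model download in Step 0); the
        # keyword arguments above simply mirror how this test calls the
        # constructor.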

        print("\nStep 3: Running complete pipeline...")
        print("This will include:")
        print("  Step 0: SFTP Download (.rtf -> .doc) - AUTOMATIC MODEL DETECTION")
        print("  Step 1: Template Analysis")
        print("  Step 2: Transcription Correction")
        print("  Step 3: Medical Data Analysis")
        print("  Step 4: Title Generation")
        print("  Step 5: Section Generation")
        print("  Step 6: Document Assembly")
        print("  Step 7: Validation")

        start_time = time.time()
        output_file = orchestrator.run_full_pipeline()
        end_time = time.time()
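        # Assumption: run_full_pipeline() returns the path of the generated
        # document; the verification below treats output_file as that path.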

        execution_time = end_time - start_time
        print(f"\nPipeline execution time: {execution_time:.2f} seconds")

        print("\nPipeline completed successfully!")
        print(f"Output file: {output_file}")

        if orchestrator.downloaded_models:
            successful_downloads = [
                m for m in orchestrator.downloaded_models if m['status'] == 'success']
            failed_downloads = [
                m for m in orchestrator.downloaded_models if m['status'] == 'error']
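            # Each downloaded_models entry is treated as a dict with at least
            # 'status', 'model_id' and 'local_filename' keys, as used in the
            # summary below.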

            print("\nSFTP Download Summary:")
            print(
                f"  Successfully downloaded: {len(successful_downloads)} models")
            print(f"  Failed downloads: {len(failed_downloads)} models")

            if successful_downloads:
                print("  Downloaded models:")
                for model in successful_downloads[:5]:
                    print(
                        f"    - {model['model_id']}: {model['local_filename']}")
                if len(successful_downloads) > 5:
                    print(f"    ... and {len(successful_downloads) - 5} more")

        if os.path.exists(output_file):
            file_size = os.path.getsize(output_file)
            print("\nOutput file verified:")
            print(f"  File: {output_file}")
            print(f"  Size: {file_size} bytes")
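
            # The structural check below uses the python-docx package (imported
            # as `docx`); if it is missing or the file is not a valid .docx,
            # the except clause reports that as a validation failure.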
            try:
                from docx import Document
                doc = Document(output_file)
                paragraph_count = len(doc.paragraphs)
                print(f"  Paragraphs: {paragraph_count}")
                print("  Document is readable and valid")
            except Exception as e:
                print(f"  Document validation failed: {e}")
        else:
            print(f"\nOutput file not found: {output_file}")

        return output_file

    except Exception as e:
        print(f"Error running pipeline: {e}")
        import traceback
        traceback.print_exc()
        return None


def cleanup_test_files():
    """Clean up test files after testing."""
    print("\nCleaning up test files...")
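    # Note: these globs are broad: they delete every matching .json file in
    # ./transcriptions and every .docx/.json file in the current directory,
    # not only files created by this test run.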

    for file in Path("./transcriptions").glob("*.json"):
        try:
            os.remove(file)
            print(f"Removed: {file}")
        except Exception as e:
            print(f"Could not remove {file}: {e}")

    for file in Path("./").glob("*.docx"):
        try:
            os.remove(file)
            print(f"Removed: {file}")
        except Exception as e:
            print(f"Could not remove {file}: {e}")

    for file in Path("./").glob("*.json"):
        try:
            os.remove(file)
            print(f"Removed: {file}")
        except Exception as e:
            print(f"Could not remove {file}: {e}")

    models_dir = "models"
    if os.path.exists(models_dir):
        for file in Path(models_dir).glob("*.doc"):
            try:
                os.remove(file)
                print(f"Removed: {file}")
            except Exception as e:
                print(f"Could not remove {file}: {e}")


def main():
    """Main test function."""
    print("Complete Pipeline Test with Langfuse Integration")
    print("=" * 70)

    if not os.path.exists("transcriptions"):
        print("Please run this script from the project root directory")
        print("  (where the 'transcriptions' folder is located)")
        return

    try:
        from sftp_config import print_sftp_config
        print_sftp_config()
    except ImportError:
        print("SFTP config not available")

    result = test_complete_pipeline()

    if result:
        print("\nComplete pipeline test completed successfully!")
        print(f"Generated document: {result}")

        cleanup = input(
            "\nDo you want to clean up test files? (y/n): ").lower().strip()
        if cleanup in ['y', 'yes']:
            cleanup_test_files()
    else:
        print("\nComplete pipeline test failed")


if __name__ == "__main__":
    main()