|
|
|
|
|
""" |
|
|
Medical Transcription Retriever from Langfuse |
|
|
Retrieves medical transcriptions from Langfuse traces and saves them locally. |
|
|
""" |
|
|
|
|
|
import os |
|
|
import json |
|
|
import time |
|
|
from datetime import datetime, timedelta |
|
|
from dotenv import load_dotenv |
|
|
from langfuse import Langfuse |
|
|
|
|
|
|
|
|
load_dotenv() |
|
|
|
|
|
|
|
|
class MedicalTranscriptionRetriever: |
|
|
"""Retrieves medical transcriptions from Langfuse traces.""" |
|
|
|
|
|
def __init__(self): |
|
|
"""Initialize the retriever with Langfuse credentials.""" |
|
|
self.public_key = os.getenv('LANGFUSE_PUBLIC_KEY') |
|
|
self.secret_key = os.getenv('LANGFUSE_SECRET_KEY') |
|
|
self.host = os.getenv('LANGFUSE_HOST', 'https://cloud.langfuse.com') |
|
|
|
|
|
if not self.public_key or not self.secret_key: |
|
|
raise ValueError("Missing Langfuse keys in .env file") |
|
|
|
|
|
self.client = Langfuse( |
|
|
public_key=self.public_key, |
|
|
secret_key=self.secret_key, |
|
|
host=self.host |
|
|
) |
|
|
|
|
|
def extract_transcription_from_input(self, input_data): |
|
|
"""Extract transcription from document input data.""" |
|
|
if isinstance(input_data, str): |
|
|
if "Voici le document:" in input_data: |
|
|
parts = input_data.split("Voici le document:") |
|
|
if len(parts) > 1: |
|
|
return parts[1].strip() |
|
|
|
|
|
elif isinstance(input_data, dict): |
|
|
|
|
|
if 'messages' in input_data: |
|
|
for message in input_data['messages']: |
|
|
if isinstance(message, dict) and message.get('role') == 'user': |
|
|
content = message.get('content', '') |
|
|
if isinstance(content, str) and "Voici le document:" in content: |
|
|
parts = content.split("Voici le document:") |
|
|
if len(parts) > 1: |
|
|
return parts[1].strip() |
|
|
|
|
|
|
|
|
for key, value in input_data.items(): |
|
|
if isinstance(value, str) and "Voici le document:" in value: |
|
|
parts = value.split("Voici le document:") |
|
|
if len(parts) > 1: |
|
|
return parts[1].strip() |
|
|
|
|
|
elif isinstance(input_data, list): |
|
|
for message in input_data: |
|
|
if isinstance(message, dict): |
|
|
content = message.get('content', '') |
|
|
if isinstance(content, str) and "Voici le document:" in content: |
|
|
parts = content.split("Voici le document:") |
|
|
if len(parts) > 1: |
|
|
return parts[1].strip() |
|
|
|
|
|
return None |
|
|
|
|
|
def get_traces_with_transcriptions(self, limit=50, days_back=7): |
|
|
"""Retrieve traces containing medical transcriptions.""" |
|
|
print(f"π Searching for transcriptions in the last {limit} traces...") |
|
|
|
|
|
try: |
|
|
|
|
|
traces = self.client.get_traces(limit=limit) |
|
|
print(f"β
{len(traces.data)} traces retrieved") |
|
|
|
|
|
transcriptions = [] |
|
|
|
|
|
for i, trace in enumerate(traces.data): |
|
|
print( |
|
|
f"π Analyzing trace {i+1}/{len(traces.data)}: {trace.id}") |
|
|
|
|
|
try: |
|
|
|
|
|
if hasattr(trace, 'input') and trace.input is not None: |
|
|
transcription = self.extract_transcription_from_input( |
|
|
trace.input) |
|
|
|
|
|
if transcription: |
|
|
trans_info = { |
|
|
'trace_id': trace.id, |
|
|
'trace_name': trace.name, |
|
|
'user_id': trace.user_id, |
|
|
'trace_timestamp': trace.timestamp.isoformat() if trace.timestamp else None, |
|
|
'transcription': transcription, |
|
|
'extracted_at': datetime.now().isoformat() |
|
|
} |
|
|
transcriptions.append(trans_info) |
|
|
print(f" β
Transcription found and extracted!") |
|
|
else: |
|
|
print(f" β No transcription found in trace.input") |
|
|
else: |
|
|
print(f" β οΈ No input available for this trace") |
|
|
|
|
|
except Exception as e: |
|
|
print(f" β οΈ Error analyzing trace {trace.id}: {e}") |
|
|
continue |
|
|
|
|
|
|
|
|
if i < len(traces.data) - 1: |
|
|
time.sleep(1) |
|
|
|
|
|
print(f"\nπ Summary: {len(transcriptions)} transcriptions found") |
|
|
return transcriptions |
|
|
|
|
|
except Exception as e: |
|
|
print(f"β Error retrieving traces: {e}") |
|
|
return [] |
|
|
|
|
|
def save_transcriptions(self, transcriptions, filename=None): |
|
|
"""Save transcriptions to a JSON file.""" |
|
|
if not filename: |
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") |
|
|
filename = f"medical_transcriptions_{timestamp}.json" |
|
|
|
|
|
try: |
|
|
|
|
|
transcription_texts = [trans['transcription'] |
|
|
for trans in transcriptions] |
|
|
concatenated_transcription = "\n\n".join(transcription_texts) |
|
|
|
|
|
|
|
|
data_to_save = { |
|
|
"extracted_at": datetime.now().isoformat(), |
|
|
"total_transcriptions": len(transcriptions), |
|
|
"transcription": concatenated_transcription |
|
|
} |
|
|
|
|
|
with open(filename, 'w', encoding='utf-8') as f: |
|
|
json.dump(data_to_save, f, ensure_ascii=False, indent=2) |
|
|
|
|
|
print(f"πΎ Transcriptions saved to {filename}") |
|
|
return filename |
|
|
|
|
|
except Exception as e: |
|
|
print(f"β Error during save: {e}") |
|
|
return None |
|
|
|
|
|
def save_transcriptions_by_user(self, transcriptions): |
|
|
"""Save transcriptions by user in separate files.""" |
|
|
if not transcriptions: |
|
|
print("π No transcriptions to save") |
|
|
return |
|
|
|
|
|
|
|
|
transcriptions_dir = "transcriptions" |
|
|
if not os.path.exists(transcriptions_dir): |
|
|
os.makedirs(transcriptions_dir) |
|
|
print(f"π Directory '{transcriptions_dir}' created") |
|
|
|
|
|
|
|
|
user_transcriptions = {} |
|
|
for trans in transcriptions: |
|
|
user_id = trans.get('user_id', 'unknown') |
|
|
if user_id not in user_transcriptions: |
|
|
user_transcriptions[user_id] = [] |
|
|
user_transcriptions[user_id].append(trans) |
|
|
|
|
|
|
|
|
saved_files = [] |
|
|
for user_id, user_trans in user_transcriptions.items(): |
|
|
|
|
|
if '.rtf' not in user_id: |
|
|
print(f"βοΈ Skipped {user_id} (no .rtf)") |
|
|
continue |
|
|
|
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") |
|
|
filename = f"transcriptions_{user_id}_{timestamp}.json" |
|
|
filepath = os.path.join(transcriptions_dir, filename) |
|
|
|
|
|
try: |
|
|
|
|
|
transcription_texts = [trans['transcription'] |
|
|
for trans in user_trans] |
|
|
concatenated_transcription = "\n\n".join(transcription_texts) |
|
|
|
|
|
|
|
|
data_to_save = { |
|
|
"user_id": user_id, |
|
|
"extracted_at": datetime.now().isoformat(), |
|
|
"total_transcriptions": len(user_trans), |
|
|
"transcription": concatenated_transcription |
|
|
} |
|
|
|
|
|
with open(filepath, 'w', encoding='utf-8') as f: |
|
|
json.dump(data_to_save, f, ensure_ascii=False, indent=2) |
|
|
|
|
|
saved_files.append(filepath) |
|
|
print(f"πΎ Saved transcriptions for {user_id}: {filename}") |
|
|
|
|
|
except Exception as e: |
|
|
print(f"β Error saving transcriptions for {user_id}: {e}") |
|
|
|
|
|
print(f"\nπ Summary: {len(saved_files)} files saved") |
|
|
return saved_files |
|
|
|
|
|
def display_transcriptions_summary(self, transcriptions): |
|
|
"""Display a summary of retrieved transcriptions.""" |
|
|
if not transcriptions: |
|
|
print("π No transcriptions to display") |
|
|
return |
|
|
|
|
|
print("\nπ TRANSCRIPTIONS SUMMARY") |
|
|
print("=" * 50) |
|
|
print(f"Total transcriptions: {len(transcriptions)}") |
|
|
|
|
|
|
|
|
user_counts = {} |
|
|
for trans in transcriptions: |
|
|
user_id = trans.get('user_id', 'unknown') |
|
|
user_counts[user_id] = user_counts.get(user_id, 0) + 1 |
|
|
|
|
|
print(f"Unique users: {len(user_counts)}") |
|
|
for user_id, count in user_counts.items(): |
|
|
print(f" - {user_id}: {count} transcriptions") |
|
|
|
|
|
def run(self, limit=50, save_to_file=True, save_by_user=True): |
|
|
"""Run the complete transcription retrieval process.""" |
|
|
print("π Starting medical transcription retrieval...") |
|
|
print("=" * 60) |
|
|
|
|
|
|
|
|
transcriptions = self.get_traces_with_transcriptions(limit=limit) |
|
|
|
|
|
if not transcriptions: |
|
|
print("β No transcriptions found") |
|
|
return None |
|
|
|
|
|
|
|
|
self.display_transcriptions_summary(transcriptions) |
|
|
|
|
|
|
|
|
saved_files = [] |
|
|
if save_to_file: |
|
|
saved_file = self.save_transcriptions(transcriptions) |
|
|
if saved_file: |
|
|
saved_files.append(saved_file) |
|
|
|
|
|
if save_by_user: |
|
|
user_files = self.save_transcriptions_by_user(transcriptions) |
|
|
saved_files.extend(user_files) |
|
|
|
|
|
print(f"\nβ
Retrieval completed! {len(saved_files)} files saved") |
|
|
return saved_files |
|
|
|
|
|
|
|
|
def main(): |
|
|
"""Main function to run the transcription retriever.""" |
|
|
print("π₯ Medical Transcription Retriever") |
|
|
print("=" * 40) |
|
|
|
|
|
try: |
|
|
retriever = MedicalTranscriptionRetriever() |
|
|
saved_files = retriever.run( |
|
|
limit=50, save_to_file=True, save_by_user=True) |
|
|
|
|
|
if saved_files: |
|
|
print(f"\nπ Success! Files saved: {len(saved_files)}") |
|
|
else: |
|
|
print("\nβ No files were saved") |
|
|
|
|
|
except Exception as e: |
|
|
print(f"β Error: {e}") |
|
|
import traceback |
|
|
traceback.print_exc() |
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
main() |
|
|
|