import asyncio
import concurrent.futures
import os
import re
import tempfile
import time
import warnings
from typing import Dict, List, Optional

import numpy as np
from fastapi import APIRouter, File, Form, HTTPException, UploadFile
from loguru import logger
from pydantic import BaseModel

from src.utils.speaking_utils import convert_numpy_types

# Import the new evaluation system
from src.apis.controllers.speaking_controller import (
    EnhancedG2P,
    ProductionPronunciationAssessor,
)

warnings.filterwarnings("ignore")

router = APIRouter(prefix="/speaking", tags=["Speaking"])

# Punctuation stripped from both ends of a word before it is sent to G2P.
_WORD_PUNCTUATION = ".,!?;:"

# Suffix used for the temporary audio file when the upload has no usable one.
_DEFAULT_AUDIO_SUFFIX = ".wav"

# Upper bound on the shared G2P cache and how many entries one eviction drops.
_G2P_CACHE_MAX_ENTRIES = 1000
_G2P_CACHE_EVICT_COUNT = 100


# =============================================================================
# INTERNAL HELPERS
# =============================================================================


def _text_color_class(score: float) -> str:
    """Return the text color CSS class for a 0-1 score (character view)."""
    if score > 0.8:
        return "text-green-600"
    if score > 0.6:
        return "text-yellow-600"
    return "text-red-600"


def _badge_color_class(score: float) -> str:
    """Return the badge color CSS classes for a 0-1 score (phoneme view)."""
    if score > 0.8:
        return "bg-green-100 text-green-800"
    if score > 0.6:
        return "bg-yellow-100 text-yellow-800"
    return "bg-red-100 text-red-800"


def _build_phoneme_score_map(base_result: Dict) -> Dict[str, float]:
    """Map each reference phoneme in ``phoneme_differences`` to its score."""
    score_map: Dict[str, float] = {}
    for phoneme_diff in base_result.get("phoneme_differences") or []:
        ref_phoneme = phoneme_diff.get("reference_phoneme")
        if ref_phoneme:
            # Later entries for the same phoneme overwrite earlier ones.
            score_map[ref_phoneme] = phoneme_diff.get("score", 0.0)
    return score_map


def _parse_focus_phonemes(focus_phonemes: Optional[str]) -> List[str]:
    """Split a comma-separated phoneme string, dropping empty items."""
    if not focus_phonemes:
        return []
    stripped = (part.strip() for part in focus_phonemes.split(","))
    # Empty items (e.g. from a trailing comma) would otherwise match the
    # empty phoneme given to characters that have no IPA counterpart.
    return [phoneme for phoneme in stripped if phoneme]


def _build_character_analysis(
    target_word: str,
    final_target_ipa: str,
    score_map: Dict[str, float],
    default_score: float,
    focus_phonemes_list: List[str],
) -> List[Dict]:
    """Pair each character of the word with the IPA symbol at the same index."""
    ipa_chars = list(final_target_ipa.replace("/", ""))
    analysis = []
    for index, char in enumerate(target_word):
        # Positional pairing: characters past the end of the IPA string get
        # an empty phoneme and therefore the default score.
        char_phoneme = ipa_chars[index] if index < len(ipa_chars) else ""
        char_score = score_map.get(char_phoneme, default_score)
        analysis.append({
            "character": char,
            "phoneme": char_phoneme,
            "score": float(char_score),
            "color_class": _text_color_class(char_score),
            "is_focus": char_phoneme in focus_phonemes_list,
        })
    return analysis


def _build_phoneme_scores(
    target_phonemes: List[str],
    score_map: Dict[str, float],
    default_score: float,
    focus_phonemes_list: List[str],
) -> List[Dict]:
    """Build the per-phoneme score entries for the target phonemes."""
    scores = []
    for phoneme in target_phonemes:
        phoneme_score = score_map.get(phoneme, default_score)
        scores.append({
            "phoneme": phoneme,
            "score": float(phoneme_score),
            "color_class": _badge_color_class(phoneme_score),
            "percentage": int(phoneme_score * 100),
            "is_focus": phoneme in focus_phonemes_list,
        })
    return scores


def _build_focus_analysis(
    focus_phonemes_list: List[str],
    score_map: Dict[str, float],
    default_score: float,
) -> List[Dict]:
    """Build the detailed entries for the phonemes the learner focuses on."""
    analysis = []
    for focus_phoneme in focus_phonemes_list:
        score = score_map.get(focus_phoneme, default_score)
        analysis.append({
            "phoneme": focus_phoneme,
            "score": float(score),
            "status": "correct" if score > 0.8 else "incorrect",
            "vietnamese_tip": get_vietnamese_tip(focus_phoneme),
            "difficulty": "medium",
            "color_class": _badge_color_class(score),
        })
    return analysis


async def _save_upload_to_temp_file(audio_file: UploadFile) -> str:
    """Write the uploaded audio to a temporary file and return its path.

    The file is created with ``delete=False``; the caller must remove it.
    """
    suffix = _DEFAULT_AUDIO_SUFFIX
    if audio_file.filename:
        # The filename is client-controlled: keep only a plain alphanumeric
        # extension so path separators can never reach the temp-file name.
        candidate = os.path.splitext(os.path.basename(audio_file.filename))[1]
        if re.fullmatch(r"\.[A-Za-z0-9]+", candidate):
            suffix = candidate

    content = await audio_file.read()
    with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp_file:
        tmp_file.write(content)
    # The file is closed (and flushed) here, before anything reads it by path.
    return tmp_file.name


def _remove_temp_file(path: Optional[str]) -> None:
    """Delete a temporary file, logging instead of raising on failure."""
    if not path:
        return
    try:
        os.unlink(path)
    except OSError as e:
        logger.warning(f"Could not remove temporary file {path}: {e}")


# =============================================================================
# OPTIMIZATION FUNCTIONS
# =============================================================================


async def optimize_post_assessment_processing(result: Dict, reference_text: str) -> None:
    """
    Fill in the reference and user IPA fields of an assessment result.

    Mutates ``result`` in place, setting ``reference_phonemes``,
    ``reference_ipa`` and ``user_ipa``. The per-word G2P lookups are
    dispatched to the shared thread pool and awaited together, and the
    reference and user branches run concurrently.

    Args:
        result: Assessment result dict; its ``transcript`` entry, if any,
            is the source for ``user_ipa``.
        reference_text: The text the user was asked to pronounce.
    """
    start_time = time.time()

    # One shared G2P instance, loop and executor for both branches.
    g2p = get_shared_g2p()
    loop = asyncio.get_running_loop()
    executor = get_shared_executor()

    async def process_reference_phonemes_and_ipa() -> None:
        """Compute the reference phoneme string and IPA, one lookup per word."""
        cleaned_words = (word.strip(_WORD_PUNCTUATION) for word in reference_text.split())
        # Skip tokens that were punctuation only, as the user branch does.
        futures = [
            loop.run_in_executor(executor, g2p.text_to_phonemes, clean_word)
            for clean_word in cleaned_words
            if clean_word
        ]
        word_results = await asyncio.gather(*futures)

        # Only the first entry of each lookup is used; empty lookups are skipped.
        first_entries = [word_data[0] for word_data in word_results if word_data]
        result["reference_phonemes"] = " ".join(
            entry["phoneme_string"] for entry in first_entries
        )
        result["reference_ipa"] = " ".join(entry["ipa"] for entry in first_entries)

    async def process_user_ipa() -> None:
        """Compute the user's IPA from the transcript; ``None`` on any failure."""
        if "transcript" not in result or not result["transcript"]:
            result["user_ipa"] = None
            return

        # Best effort: a failure here must not fail the whole assessment.
        try:
            user_transcript = result["transcript"].strip()
            user_words = user_transcript.split()
            if not user_words:
                result["user_ipa"] = None
                return

            cleaned_words = (word.strip(_WORD_PUNCTUATION).lower() for word in user_words)
            futures = [
                loop.run_in_executor(executor, safe_get_word_ipa, g2p, clean_word)
                for clean_word in cleaned_words
                if clean_word  # Skip empty words
            ]

            if futures:
                user_ipa_results = await asyncio.gather(*futures)
                user_ipa_list = [ipa for ipa in user_ipa_results if ipa]
                result["user_ipa"] = " ".join(user_ipa_list) if user_ipa_list else None
            else:
                result["user_ipa"] = None

            logger.info(f"Generated user IPA from transcript '{user_transcript}': '{result.get('user_ipa', 'None')}'")
        except Exception as e:
            logger.warning(f"Failed to generate user IPA from transcript: {e}")
            result["user_ipa"] = None

    # Run both main tasks concurrently.
    await asyncio.gather(
        process_reference_phonemes_and_ipa(),
        process_user_ipa(),
    )

    optimization_time = time.time() - start_time
    logger.info(f"Post-assessment optimization completed in {optimization_time:.3f}s")


def safe_get_word_ipa(g2p: EnhancedG2P, word: str) -> Optional[str]:
    """
    Safely get the IPA for a word, with a fallback.

    Args:
        g2p: G2P converter whose ``text_to_phonemes`` is called.
        word: The word to convert.

    Returns:
        The ``ipa`` value of the first G2P entry, or ``/<word>/`` when the
        lookup raises.
    """
    try:
        word_phonemes = g2p.text_to_phonemes(word)[0]
        return word_phonemes["ipa"]
    except Exception as e:
        logger.warning(f"Failed to get IPA for word '{word}': {e}")
        # Fallback: use the word itself with IPA notation
        return f"/{word}/"


# =============================================================================
# OPTIMIZED CACHE MANAGEMENT
# =============================================================================

# G2P cache shared across requests
_shared_g2p_cache = {}
_cache_lock = asyncio.Lock()


async def get_cached_g2p_result(word: str) -> Optional[Dict]:
    """
    Return the cached G2P result for ``word``, or ``None`` when not cached.
    """
    async with _cache_lock:
        return _shared_g2p_cache.get(word)


async def cache_g2p_result(word: str, result: Dict) -> None:
    """
    Store a G2P result in the shared cache, keeping the cache bounded.

    When inserting a new word would exceed the size limit, the oldest
    entries (dict insertion order) are evicted first.
    """
    async with _cache_lock:
        is_new_word = word not in _shared_g2p_cache
        # Evict only when the insert actually grows the cache, so it never
        # holds more than the configured maximum.
        if is_new_word and len(_shared_g2p_cache) >= _G2P_CACHE_MAX_ENTRIES:
            for key in list(_shared_g2p_cache)[:_G2P_CACHE_EVICT_COUNT]:
                del _shared_g2p_cache[key]

        _shared_g2p_cache[word] = result


async def optimize_ipa_assessment_processing(
    base_result: Dict,
    target_word: str,
    target_ipa: Optional[str],
    focus_phonemes: Optional[str]
) -> Dict:
    """
    Build the IPA-focused analysis on top of a base assessment result.

    Args:
        base_result: Result dict of the base assessment; ``overall_score``
            and ``phoneme_differences`` are read from it.
        target_word: The word being assessed.
        target_ipa: Target IPA supplied by the caller. When empty, the IPA
            and phonemes come from the shared G2P instance instead.
        focus_phonemes: Comma-separated phonemes to highlight, or ``None``.

    Returns:
        Dict with the keys ``target_ipa``, ``character_analysis``,
        ``phoneme_scores``, ``focus_phonemes_analysis``,
        ``vietnamese_tips`` and ``practice_recommendations``.
    """
    start_time = time.time()

    focus_phonemes_list = _parse_focus_phonemes(focus_phonemes)

    if target_ipa:
        # Caller-provided IPA: every character of it counts as one phoneme.
        final_target_ipa = target_ipa
        target_phonemes = list(target_ipa.replace("/", "").strip())
    else:
        g2p = get_shared_g2p()
        loop = asyncio.get_running_loop()
        # The G2P lookup runs in the shared thread pool, off the event loop.
        target_phonemes_data = await loop.run_in_executor(
            get_shared_executor(), lambda: g2p.text_to_phonemes(target_word)[0]
        )
        final_target_ipa = target_phonemes_data["ipa"]
        target_phonemes = target_phonemes_data["phonemes"]

    # Phonemes without their own score fall back to the overall score.
    overall_score = base_result.get("overall_score", 0.0)
    # Built once and shared by all three analyses below.
    score_map = _build_phoneme_score_map(base_result)

    character_analysis = _build_character_analysis(
        target_word, final_target_ipa, score_map, overall_score, focus_phonemes_list
    )
    phoneme_scores = _build_phoneme_scores(
        target_phonemes, score_map, overall_score, focus_phonemes_list
    )
    focus_phonemes_analysis = _build_focus_analysis(
        focus_phonemes_list, score_map, overall_score
    )

    # Both generators are small in-memory loops, so they are called directly
    # rather than dispatched to the thread pool.
    vietnamese_tips = generate_vietnamese_tips(target_phonemes, focus_phonemes_list)
    practice_recommendations = generate_practice_recommendations(
        overall_score, focus_phonemes_analysis
    )

    optimization_time = time.time() - start_time
    logger.info(f"IPA assessment optimization completed in {optimization_time:.3f}s")

    return {
        "target_ipa": final_target_ipa,
        "character_analysis": character_analysis,
        "phoneme_scores": phoneme_scores,
        "focus_phonemes_analysis": focus_phonemes_analysis,
        "vietnamese_tips": vietnamese_tips,
        "practice_recommendations": practice_recommendations
    }


# Phonemes for which a Vietnamese tip is included in the tips list.
_DIFFICULT_PHONEMES = frozenset(["θ", "ð", "v", "z", "ʒ", "r", "w", "æ", "ɪ", "ʊ", "ɛ"])


def generate_vietnamese_tips(target_phonemes: List[str], focus_phonemes_list: List[str]) -> List[str]:
    """
    Generate Vietnamese tips for the difficult phonemes present.

    Tips are returned without duplicates, in order of first appearance
    (target phonemes first, then focus phonemes).
    """
    vietnamese_tips = []
    # dict.fromkeys de-duplicates while keeping a stable order.
    for phoneme in dict.fromkeys([*target_phonemes, *focus_phonemes_list]):
        if phoneme not in _DIFFICULT_PHONEMES:
            continue
        tip = get_vietnamese_tip(phoneme)
        if tip not in vietnamese_tips:
            vietnamese_tips.append(tip)

    return vietnamese_tips


def generate_practice_recommendations(overall_score: float, focus_phonemes_analysis: List[Dict]) -> List[str]:
    """
    Generate practice recommendations based on the scores.

    Args:
        overall_score: Overall score of the attempt.
        focus_phonemes_analysis: Focus phoneme entries; ``score``,
            ``phoneme`` and ``vietnamese_tip`` are read from each.

    Returns:
        Recommendation messages: general advice, then per-phoneme advice
        for weak focus phonemes, then a summary line.
    """
    practice_recommendations = []

    if overall_score < 0.7:
        practice_recommendations.extend([
            "Nghe từ mẫu nhiều lần trước khi phát âm",
            "Phát âm chậm và rõ ràng từng âm vị",
            "Chú ý đến vị trí lưỡi và môi khi phát âm"
        ])

    # Add specific recommendations for weak focus phonemes
    practice_recommendations.extend(
        f"Luyện đặc biệt âm /{analysis['phoneme']}/: {analysis['vietnamese_tip']}"
        for analysis in focus_phonemes_analysis
        if analysis["score"] < 0.6
    )

    if overall_score >= 0.8:
        practice_recommendations.append("Phát âm rất tốt! Tiếp tục luyện tập để duy trì chất lượng")
    elif overall_score >= 0.6:
        practice_recommendations.append("Phát âm khá tốt, cần cải thiện một số âm vị")

    return practice_recommendations


# =============================================================================
# MODEL DEFINITIONS
# =============================================================================


class PronunciationAssessmentResult(BaseModel):
    """Response model for the general pronunciation assessment endpoint"""

    transcript: str  # What the user actually said (character transcript)
    transcript_phonemes: str  # User's phonemes
    user_phonemes: str  # Alias for transcript_phonemes for UI clarity
    user_ipa: Optional[str] = None  # User's IPA notation
    reference_ipa: str  # Reference IPA notation
    reference_phonemes: str  # Reference phonemes
    character_transcript: str
    overall_score: float
    word_highlights: List[Dict]
    phoneme_differences: List[Dict]
    wrong_words: List[Dict]
    feedback: List[str]
    processing_info: Dict
    # Enhanced features
    phoneme_pairs: Optional[List[Dict]] = None
    phoneme_comparison: Optional[Dict] = None
    prosody_analysis: Optional[Dict] = None
    assessment_mode: Optional[str] = None
    character_level_analysis: Optional[bool] = None


class IPAAssessmentResult(BaseModel):
    """Optimized response model for IPA-focused pronunciation assessment"""

    # Core assessment data
    transcript: str  # What the user actually said
    user_ipa: Optional[str] = None  # User's IPA transcription
    target_word: str  # Target word being assessed
    target_ipa: str  # Target IPA transcription
    overall_score: float  # Overall pronunciation score (0-1)

    # Character-level analysis for IPA mapping
    character_analysis: List[Dict]  # Each character with its IPA and score

    # Phoneme-specific analysis
    phoneme_scores: List[Dict]  # Individual phoneme scores with colors
    focus_phonemes_analysis: List[Dict]  # Detailed analysis of target phonemes

    # Feedback and recommendations
    vietnamese_tips: List[str]  # Vietnamese-specific pronunciation tips
    practice_recommendations: List[str]  # Practice suggestions
    feedback: List[str]  # General feedback messages

    # Assessment metadata
    processing_info: Dict  # Processing details
    assessment_type: str = "ipa_focused"
    error: Optional[str] = None


# Global assessor instance - singleton pattern for performance
global_assessor = None
global_g2p = None  # Shared G2P instance for caching
global_executor = None  # Shared ThreadPoolExecutor


def get_assessor():
    """Get or create the global assessor instance"""
    global global_assessor
    if global_assessor is None:
        logger.info("Creating global ProductionPronunciationAssessor instance...")
        global_assessor = ProductionPronunciationAssessor()
    return global_assessor


def get_shared_g2p():
    """Get or create the shared G2P instance for caching"""
    global global_g2p
    if global_g2p is None:
        logger.info("Creating shared EnhancedG2P instance...")
        global_g2p = EnhancedG2P()
    return global_g2p


def get_shared_executor():
    """Get or create the shared ThreadPoolExecutor"""
    global global_executor
    if global_executor is None:
        logger.info("Creating shared ThreadPoolExecutor...")
        global_executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)
    return global_executor


@router.post("/assess", response_model=PronunciationAssessmentResult)
async def assess_pronunciation(
    audio_file: UploadFile = File(..., description="Audio file (.wav, .mp3, .m4a)"),
    reference_text: str = Form(..., description="Reference text to pronounce"),
    mode: str = Form(
        "auto",
        description="Assessment mode: 'word', 'sentence', or 'auto' (determined by text length)",
    ),
):
    """
    Enhanced Pronunciation Assessment API with word/sentence mode support

    Key Features:
    - Word mode: For single words or short phrases (1-3 words)
    - Sentence mode: For longer sentences with prosody analysis
    - Advanced phoneme comparison using Levenshtein distance
    - Prosody analysis (pitch, rhythm, intensity) for sentence mode
    - Detailed phoneme pair visualization
    - Vietnamese-optimized feedback and tips

    Input: Audio file + Reference text + Mode
    Output: Enhanced assessment results with visualization data
    """
    start_time = time.time()

    # An invalid mode falls back to "auto" instead of failing the request.
    if mode not in ("word", "sentence", "auto"):
        # Log before reassigning so the message shows the rejected value.
        logger.info(f"Invalid mode '{mode}' provided, defaulting to 'auto' mode")
        mode = "auto"

    # Validate inputs
    if not reference_text.strip():
        raise HTTPException(status_code=400, detail="Reference text cannot be empty")

    if len(reference_text) > 500:
        raise HTTPException(
            status_code=400, detail="Reference text too long (max 500 characters)"
        )

    # Check for valid English characters
    if not re.match(r"^[a-zA-Z\s\'\-\.!?,;:]+$", reference_text):
        raise HTTPException(
            status_code=400,
            detail="Text must contain only English letters, spaces, and basic punctuation",
        )

    tmp_path = None
    try:
        # Save uploaded file temporarily
        tmp_path = await _save_upload_to_temp_file(audio_file)

        logger.info(f"Processing audio file: {tmp_path} with mode: {mode}")

        # Run assessment using enhanced assessor (singleton)
        # NOTE(review): this call is synchronous and runs on the event loop;
        # confirm the assessor is thread-safe before moving it to an executor.
        assessor = get_assessor()
        result = assessor.assess_pronunciation(tmp_path, reference_text, mode)

        # Add reference/user IPA fields to the result
        await optimize_post_assessment_processing(result, reference_text)

        # Add processing time
        processing_time = time.time() - start_time
        result.setdefault("processing_info", {})["processing_time"] = processing_time

        # Convert numpy types for JSON serialization
        final_result = convert_numpy_types(result)

        logger.info(
            f"Assessment completed in {processing_time:.2f} seconds using {mode} mode"
        )

        return PronunciationAssessmentResult(**final_result)

    except Exception as e:
        # logger.exception records the message together with the traceback.
        logger.exception(f"Assessment error: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Assessment failed: {str(e)}") from e
    finally:
        # The temp file is created with delete=False, so remove it here.
        _remove_temp_file(tmp_path)


@router.post("/assess-ipa", response_model=IPAAssessmentResult)
async def assess_ipa_pronunciation(
    audio_file: UploadFile = File(..., description="Audio file (.wav, .mp3, .m4a)"),
    target_word: str = Form(..., description="Target word to assess (e.g., 'bed')"),
    target_ipa: str = Form(None, description="Target IPA notation (e.g., '/bɛd/')"),
    focus_phonemes: str = Form(None, description="Comma-separated focus phonemes (e.g., 'ɛ,b')"),
):
    """
    Optimized IPA pronunciation assessment for phoneme-focused learning

    Evaluates:
    - Overall word pronunciation accuracy
    - Character-to-phoneme mapping accuracy
    - Specific phoneme pronunciation (e.g., /ɛ/ in 'bed')
    - Vietnamese-optimized feedback and tips
    - Dynamic color scoring for UI visualization

    Example: Assessing 'bed' /bɛd/ with focus on /ɛ/ phoneme
    """
    start_time = time.time()

    # Validate inputs
    if not target_word.strip():
        raise HTTPException(status_code=400, detail="Target word cannot be empty")

    if len(target_word) > 50:
        raise HTTPException(status_code=400, detail="Target word too long (max 50 characters)")

    # Clean target word
    target_word = target_word.strip().lower()

    tmp_path = None
    try:
        # Save uploaded file temporarily
        tmp_path = await _save_upload_to_temp_file(audio_file)

        logger.info(f"IPA assessment for word '{target_word}' with IPA '{target_ipa}'")

        # Run base pronunciation assessment in word mode
        # NOTE(review): synchronous call on the event loop, as in /assess.
        assessor = get_assessor()
        base_result = assessor.assess_pronunciation(tmp_path, target_word, "word")

        # Build the IPA-specific analysis on top of the base result
        optimized_results = await optimize_ipa_assessment_processing(
            base_result, target_word, target_ipa, focus_phonemes
        )
        target_ipa = optimized_results["target_ipa"]

        # Get overall score from base result
        overall_score = base_result.get("overall_score", 0.0)

        # An error reported by the assessor replaces the regular feedback.
        error_message = None
        feedback = base_result.get("feedback", [])
        if base_result.get("error"):
            error_message = base_result["error"]
            feedback = [f"Lỗi: {error_message}"]

        # Processing information
        processing_time = time.time() - start_time
        processing_info = {
            "processing_time": processing_time,
            "mode": "ipa_focused",
            "model_used": "Wav2Vec2-Enhanced",
            "confidence": base_result.get("processing_info", {}).get("confidence", 0.0),
            "enhanced_features": True
        }

        # Create final result
        result = IPAAssessmentResult(
            transcript=base_result.get("transcript", ""),
            user_ipa=base_result.get("user_ipa", ""),
            target_word=target_word,
            target_ipa=target_ipa,
            overall_score=float(overall_score),
            character_analysis=optimized_results["character_analysis"],
            phoneme_scores=optimized_results["phoneme_scores"],
            focus_phonemes_analysis=optimized_results["focus_phonemes_analysis"],
            vietnamese_tips=optimized_results["vietnamese_tips"],
            practice_recommendations=optimized_results["practice_recommendations"],
            feedback=feedback,
            processing_info=processing_info,
            error=error_message
        )

        logger.info(f"IPA assessment completed for '{target_word}' in {processing_time:.2f}s with score {overall_score:.2f}")

        return result

    except Exception as e:
        # logger.exception records the message together with the traceback.
        logger.exception(f"IPA assessment error: {str(e)}")
        raise HTTPException(status_code=500, detail=f"IPA assessment failed: {str(e)}") from e
    finally:
        # The temp file is created with delete=False, so remove it here.
        _remove_temp_file(tmp_path)


# =============================================================================
# UTILITY ENDPOINTS
# =============================================================================


@router.get("/phonemes/{word}")
def get_word_phonemes(word: str):
    """Get phoneme breakdown for a specific word"""
    try:
        # NOTE(review): this imports EnhancedG2P from a module named
        # "evalution", not from the controller imported at the top of this
        # file - confirm that module exists and that this is intended.
        from evalution import EnhancedG2P

        g2p = EnhancedG2P()
        phoneme_data = g2p.text_to_phonemes(word)[0]

        # Difficulty analysis for Vietnamese speakers: score each phoneme
        # once and reuse the value for the average and the filter below.
        # NOTE(review): assumes get_difficulty_score returns the same value
        # for repeated calls with the same phoneme.
        scored_phonemes = [
            (phoneme, g2p.get_difficulty_score(phoneme))
            for phoneme in phoneme_data["phonemes"]
        ]
        difficulty_scores = [difficulty for _, difficulty in scored_phonemes]

        avg_difficulty = float(np.mean(difficulty_scores)) if difficulty_scores else 0.3

        if avg_difficulty > 0.6:
            difficulty_level = "hard"
        elif avg_difficulty > 0.4:
            difficulty_level = "medium"
        else:
            difficulty_level = "easy"

        return {
            "word": word,
            "phonemes": phoneme_data["phonemes"],
            "phoneme_string": phoneme_data["phoneme_string"],
            "ipa": phoneme_data["ipa"],
            "difficulty_score": avg_difficulty,
            "difficulty_level": difficulty_level,
            "challenging_phonemes": [
                {
                    "phoneme": phoneme,
                    "difficulty": difficulty,
                    "vietnamese_tip": get_vietnamese_tip(phoneme),
                }
                for phoneme, difficulty in scored_phonemes
                if difficulty > 0.6
            ],
        }

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Word analysis error: {str(e)}") from e


# Vietnamese pronunciation tips keyed by IPA phoneme.
_VIETNAMESE_TIPS = {
    "θ": "Đặt lưỡi giữa răng, thổi nhẹ",
    "ð": "Giống θ nhưng rung dây thanh âm",
    "v": "Môi dưới chạm răng trên",
    "r": "Cuộn lưỡi, không chạm vòm miệng",
    "l": "Lưỡi chạm vòm miệng sau răng",
    "z": "Như 's' nhưng rung dây thanh",
    "ʒ": "Như 'ʃ' nhưng rung dây thanh",
    "w": "Tròn môi như 'u'",
    "ɛ": "Mở miệng vừa phải, lưỡi hạ thấp như 'e' tiếng Việt",
    "æ": "Mở miệng rộng, lưỡi thấp như nói 'a' nhưng ngắn hơn",
    "ɪ": "Âm 'i' ngắn, lưỡi không căng như 'i' tiếng Việt",
    "ʊ": "Âm 'u' ngắn, môi tròn nhẹ",
    "ə": "Âm trung tính, miệng thả lỏng",
    "ɔ": "Mở miệng tròn như 'o' nhưng rộng hơn",
    "ʌ": "Miệng mở vừa, lưỡi ở giữa",
    "f": "Răng trên chạm môi dưới, thổi nhẹ",
    "b": "Hai môi chạm nhau, rung dây thanh",
    "p": "Hai môi chạm nhau, không rung dây thanh",
    "d": "Lưỡi chạm nướu răng trên, rung dây thanh",
    "t": "Lưỡi chạm nướu răng trên, không rung dây thanh",
    "k": "Lưỡi chạm vòm miệng, không rung dây thanh",
    "g": "Lưỡi chạm vòm miệng, rung dây thanh"
}

# Difficulty buckets for Vietnamese speakers; anything else is "easy".
_HARD_PHONEMES = frozenset(["θ", "ð", "r", "w", "æ", "ʌ", "ɪ", "ʊ"])
_MEDIUM_PHONEMES = frozenset(["v", "z", "ʒ", "ɛ", "ə", "ɔ", "f"])


def get_vietnamese_tip(phoneme: str) -> str:
    """Get the Vietnamese pronunciation tip for a phoneme.

    Phonemes without a dedicated tip get a generic practice message that
    embeds the phoneme.
    """
    return _VIETNAMESE_TIPS.get(phoneme, f"Luyện tập phát âm /{phoneme}/")


def get_phoneme_difficulty(phoneme: str) -> str:
    """Get the difficulty level ("hard", "medium" or "easy") for Vietnamese speakers"""
    if phoneme in _HARD_PHONEMES:
        return "hard"
    if phoneme in _MEDIUM_PHONEMES:
        return "medium"
    return "easy"