# SIMPLIFIED PRONUNCIATION ASSESSMENT API
# Input: Audio + Reference Text → Output: Word highlights + Phoneme diff + Wrong words

from fastapi import FastAPI, UploadFile, File, Form, HTTPException, APIRouter
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import List, Dict, Optional
import tempfile
import os
import numpy as np
import nltk
import eng_to_ipa as ipa
import whisper
import re
from collections import defaultdict
import warnings

warnings.filterwarnings("ignore")

# Download required NLTK data
try:
    nltk.download("cmudict", quiet=True)
    from nltk.corpus import cmudict
except:
    print("Warning: NLTK data not available")

# =============================================================================
# MODELS
# =============================================================================

router = APIRouter(prefix="/pronunciation", tags=["Pronunciation"])

class PronunciationAssessmentResult(BaseModel):
    transcript: str
    overall_score: float
    word_highlights: List[Dict]
    phoneme_differences: List[Dict]
    wrong_words: List[Dict]
    feedback: List[str]

# =============================================================================
# CORE COMPONENTS
# =============================================================================


class SimpleG2P:
    """Simple Grapheme-to-Phoneme converter"""

    def __init__(self):
        try:
            self.cmu_dict = cmudict.dict()
        except:
            self.cmu_dict = {}
            print("Warning: CMU dictionary not available")

    def text_to_phonemes(self, text: str) -> List[Dict]:
        """Convert text to phoneme sequence"""
        words = self._clean_text(text).split()
        phoneme_sequence = []
        for word in words:
            word_phonemes = self._get_word_phonemes(word)
            phoneme_sequence.append(
                {"word": word, "phonemes": word_phonemes, "ipa": self._get_ipa(word)}
            )
        return phoneme_sequence

    def _clean_text(self, text: str) -> str:
        """Clean text for processing"""
        text = re.sub(r"[^\w\s\']", " ", text)
        text = re.sub(r"\s+", " ", text)
        return text.lower().strip()

    def _get_word_phonemes(self, word: str) -> List[str]:
        """Get phonemes for a word"""
        word_lower = word.lower()
        if word_lower in self.cmu_dict:
            # Use the first CMU pronunciation and remove stress markers
            phonemes = self.cmu_dict[word_lower][0]
            return [re.sub(r"[0-9]", "", p) for p in phonemes]
        else:
            # Simple fallback
            return self._estimate_phonemes(word)

    def _get_ipa(self, word: str) -> str:
        """Get IPA transcription"""
        try:
            return ipa.convert(word)
        except:
            return f"/{word}/"

    def _estimate_phonemes(self, word: str) -> List[str]:
        """Estimate phonemes for unknown words"""
        phoneme_map = {
            "ch": ["CH"],
            "sh": ["SH"],
            "th": ["TH"],
            "ph": ["F"],
            "ck": ["K"],
            "ng": ["NG"],
            "qu": ["K", "W"],
            "a": ["AE"],
            "e": ["EH"],
            "i": ["IH"],
            "o": ["AH"],
            "u": ["AH"],
            "b": ["B"],
            "c": ["K"],
            "d": ["D"],
            "f": ["F"],
            "g": ["G"],
            "h": ["HH"],
            "j": ["JH"],
            "k": ["K"],
            "l": ["L"],
            "m": ["M"],
            "n": ["N"],
            "p": ["P"],
            "r": ["R"],
            "s": ["S"],
            "t": ["T"],
            "v": ["V"],
            "w": ["W"],
            "x": ["K", "S"],
            "y": ["Y"],
            "z": ["Z"],
        }
        word = word.lower()
        phonemes = []
        i = 0
        while i < len(word):
            # Check 2-letter combinations first
            if i <= len(word) - 2:
                two_char = word[i : i + 2]
                if two_char in phoneme_map:
                    phonemes.extend(phoneme_map[two_char])
                    i += 2
                    continue
            # Single character; always advance so unmapped characters are skipped
            char = word[i]
            if char in phoneme_map:
                phonemes.extend(phoneme_map[char])
            i += 1
        return phonemes

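
# Illustrative sketch (not part of the original module): what text_to_phonemes()
# returns for a short phrase, assuming the CMU dictionary loaded successfully.
# The exact phoneme and IPA values come from cmudict / eng_to_ipa, so treat the
# values below as an example of the shape rather than guaranteed output:
#
#   SimpleG2P().text_to_phonemes("think fast")
#   -> [
#        {"word": "think", "phonemes": ["TH", "IH", "NG", "K"], "ipa": "θɪŋk"},
#        {"word": "fast",  "phonemes": ["F", "AE", "S", "T"],   "ipa": "fæst"},
#      ]
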
class SimplePhonemeComparator:
    """Simple phoneme comparison"""

    def __init__(self):
        # Vietnamese difficulty map
        self.difficulty_map = {
            "TH": 0.9,
            "DH": 0.9,
            "V": 0.8,
            "Z": 0.8,
            "ZH": 0.9,
            "R": 0.7,
            "L": 0.6,
            "W": 0.5,
            "F": 0.4,
            "S": 0.3,
            "SH": 0.5,
            "CH": 0.4,
            "JH": 0.5,
            "NG": 0.3,
        }
        # Common substitution patterns for Vietnamese speakers
        self.substitution_patterns = {
            "TH": ["F", "S", "T"],
            "DH": ["D", "Z", "V"],
            "V": ["W", "F"],
            "R": ["L"],
            "L": ["R"],
            "Z": ["S"],
        }

    def compare_phonemes(
        self, reference_phonemes: List[Dict], learner_phonemes: List[Dict]
    ) -> List[Dict]:
        """Compare reference and learner phoneme sequences"""
        # Flatten phoneme sequences
        ref_sequence = []
        learner_sequence = []
        for word_data in reference_phonemes:
            for phoneme in word_data["phonemes"]:
                ref_sequence.append({"phoneme": phoneme, "word": word_data["word"]})
        for word_data in learner_phonemes:
            for phoneme in word_data["phonemes"]:
                learner_sequence.append({"phoneme": phoneme, "word": word_data["word"]})

        # Simple position-by-position alignment and comparison
        comparisons = []
        max_len = max(len(ref_sequence), len(learner_sequence))
        for i in range(max_len):
            ref_item = ref_sequence[i] if i < len(ref_sequence) else None
            learner_item = learner_sequence[i] if i < len(learner_sequence) else None
            if ref_item and learner_item:
                ref_phoneme = ref_item["phoneme"]
                learner_phoneme = learner_item["phoneme"]
                if ref_phoneme == learner_phoneme:
                    status = "correct"
                    score = 1.0
                elif self._is_acceptable_substitution(ref_phoneme, learner_phoneme):
                    status = "acceptable"
                    score = 0.7
                else:
                    status = "wrong"
                    score = 0.3
                comparisons.append(
                    {
                        "position": i,
                        "reference_phoneme": ref_phoneme,
                        "learner_phoneme": learner_phoneme,
                        "status": status,
                        "score": score,
                        "word": ref_item["word"],
                        "difficulty": self.difficulty_map.get(ref_phoneme, 0.3),
                    }
                )
            elif ref_item and not learner_item:
                # Missing phoneme
                comparisons.append(
                    {
                        "position": i,
                        "reference_phoneme": ref_item["phoneme"],
                        "learner_phoneme": "",
                        "status": "missing",
                        "score": 0.0,
                        "word": ref_item["word"],
                        "difficulty": self.difficulty_map.get(ref_item["phoneme"], 0.3),
                    }
                )
            elif learner_item and not ref_item:
                # Extra phoneme
                comparisons.append(
                    {
                        "position": i,
                        "reference_phoneme": "",
                        "learner_phoneme": learner_item["phoneme"],
                        "status": "extra",
                        "score": 0.0,
                        "word": learner_item["word"],
                        "difficulty": 0.3,
                    }
                )
        return comparisons

    def _is_acceptable_substitution(self, reference: str, learner: str) -> bool:
        """Check if substitution is acceptable for Vietnamese speakers"""
        acceptable = self.substitution_patterns.get(reference, [])
        return learner in acceptable

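
# Illustrative sketch (not in the original module): how the index-based
# alignment scores a substitution. With reference phonemes TH IH NG K ("think")
# and a learner who said "tink" (T IH NG K), position 0 pairs TH against T;
# T appears in substitution_patterns["TH"], so it is scored "acceptable" (0.7)
# rather than "wrong" (0.3), and the remaining positions match exactly (1.0).
# Note the alignment is purely positional: an inserted or dropped sound shifts
# every later phoneme; that is a deliberate simplification of this comparator.
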
class SimplePronunciationAssessor:
    """Simplified pronunciation assessor focused on core functionality"""

    def __init__(self):
        print("Initializing Whisper model...")
        self.whisper_model = whisper.load_model("base.en", in_memory=True)
        print("Whisper model loaded successfully")
        self.g2p = SimpleG2P()
        self.comparator = SimplePhonemeComparator()
        self.sample_rate = 16000

    def assess_pronunciation(self, audio_path: str, reference_text: str) -> Dict:
        """Main assessment function"""
        # Step 1: Whisper ASR
        print("Running Whisper transcription...")
        asr_result = self.whisper_model.transcribe(audio_path)
        transcript = asr_result["text"].strip()
        print(f"Transcript: '{transcript}'")

        # Step 2: Get reference phonemes
        print("Getting reference phonemes...")
        reference_phonemes = self.g2p.text_to_phonemes(reference_text)

        # Step 3: Get learner phonemes from transcript
        print("Getting learner phonemes...")
        learner_phonemes = self.g2p.text_to_phonemes(transcript)

        # Step 4: Compare phonemes
        print("Comparing phonemes...")
        phoneme_comparisons = self.comparator.compare_phonemes(
            reference_phonemes, learner_phonemes
        )

        # Step 5: Generate word highlights
        print("Generating word highlights...")
        word_highlights = self._generate_word_highlights(
            reference_phonemes, learner_phonemes, phoneme_comparisons
        )

        # Step 6: Identify wrong words
        print("Identifying wrong words...")
        wrong_words = self._identify_wrong_words(word_highlights, phoneme_comparisons)

        # Step 7: Calculate overall score
        overall_score = self._calculate_overall_score(phoneme_comparisons)

        # Step 8: Generate feedback
        feedback = self._generate_simple_feedback(
            overall_score, wrong_words, phoneme_comparisons
        )

        return {
            "transcript": transcript,
            "overall_score": overall_score,
            "word_highlights": word_highlights,
            "phoneme_differences": phoneme_comparisons,
            "wrong_words": wrong_words,
            "feedback": feedback,
        }

    def _generate_word_highlights(
        self,
        reference_phonemes: List[Dict],
        learner_phonemes: List[Dict],
        phoneme_comparisons: List[Dict],
    ) -> List[Dict]:
        """Generate word highlighting data"""
        word_highlights = []
        # Group comparisons by word
        word_scores = defaultdict(list)
        for comparison in phoneme_comparisons:
            word = comparison.get("word", "unknown")
            if comparison["status"] in ["correct", "acceptable", "wrong"]:
                word_scores[word].append(comparison["score"])
        # Create highlights for reference words
        for word_data in reference_phonemes:
            word = word_data["word"]
            scores = word_scores.get(word, [0.0])
            avg_score = float(np.mean(scores))
            highlight = {
                "word": word,
                "score": avg_score,
                "status": self._get_word_status(avg_score),
                "color": self._get_word_color(avg_score),
                "phonemes": word_data["phonemes"],
                "ipa": word_data["ipa"],
                "issues": self._get_word_issues(word, phoneme_comparisons),
            }
            word_highlights.append(highlight)
        return word_highlights

    def _identify_wrong_words(
        self, word_highlights: List[Dict], phoneme_comparisons: List[Dict]
    ) -> List[Dict]:
        """Identify words that were pronounced incorrectly"""
        wrong_words = []
        for word_highlight in word_highlights:
            if word_highlight["score"] < 0.6:  # Threshold for "wrong"
                word = word_highlight["word"]
                # Find specific issues for this word
                word_issues = []
                wrong_phonemes = []
                missing_phonemes = []
                for comparison in phoneme_comparisons:
                    if comparison.get("word") == word:
                        if comparison["status"] == "wrong":
                            wrong_phonemes.append(
                                {
                                    "expected": comparison["reference_phoneme"],
                                    "actual": comparison["learner_phoneme"],
                                }
                            )
                        elif comparison["status"] == "missing":
                            missing_phonemes.append(comparison["reference_phoneme"])
                if wrong_phonemes:
                    word_issues.append(
                        f"Wrong sounds: {', '.join([p['expected'] for p in wrong_phonemes])}"
                    )
                if missing_phonemes:
                    word_issues.append(f"Missing sounds: {', '.join(missing_phonemes)}")
                wrong_word = {
                    "word": word,
                    "score": word_highlight["score"],
                    "expected_phonemes": word_highlight["phonemes"],
                    "ipa": word_highlight["ipa"],
                    "issues": word_issues,
                    "wrong_phonemes": wrong_phonemes,
                    "missing_phonemes": missing_phonemes,
                    "tips": self._get_pronunciation_tips(
                        word, wrong_phonemes, missing_phonemes
                    ),
                }
                wrong_words.append(wrong_word)
        return wrong_words

    def _calculate_overall_score(self, phoneme_comparisons: List[Dict]) -> float:
        """Calculate overall pronunciation score"""
        if not phoneme_comparisons:
            return 0.0
        total_score = 0.0
        for comparison in phoneme_comparisons:
            total_score += comparison["score"]
        return total_score / len(phoneme_comparisons)

    def _generate_simple_feedback(
        self,
        overall_score: float,
        wrong_words: List[Dict],
        phoneme_comparisons: List[Dict],
    ) -> List[str]:
        """Generate simple, actionable feedback (messages are in Vietnamese)"""
        feedback = []
        # Overall feedback
        if overall_score >= 0.8:
            feedback.append("Phát âm tốt! Bạn đã làm rất tốt.")  # "Good pronunciation! You did very well."
        elif overall_score >= 0.6:
            feedback.append("Phát âm khá tốt, còn một vài điểm cần cải thiện.")  # "Fairly good; a few points to improve."
        elif overall_score >= 0.4:
            feedback.append(
                "Cần luyện tập thêm. Tập trung vào những từ được đánh dấu đỏ."
            )  # "More practice needed. Focus on the words marked in red."
        else:
            feedback.append("Hãy luyện tập chậm và rõ ràng hơn.")  # "Practice more slowly and clearly."
        # Wrong words feedback
        if wrong_words:
            word_names = [w["word"] for w in wrong_words[:3]]
            feedback.append(f"Các từ cần luyện tập: {', '.join(word_names)}")  # "Words to practice: ..."
        # Phoneme-specific feedback for Vietnamese speakers
        problem_phonemes = defaultdict(int)
        for comparison in phoneme_comparisons:
            if comparison["status"] == "wrong":
                phoneme = comparison["reference_phoneme"]
                problem_phonemes[phoneme] += 1
        # Vietnamese-specific tips for the most problematic sounds
        vietnamese_tips = {
            "TH": "Đặt lưỡi giữa răng, thổi nhẹ",  # "Place the tongue between the teeth, blow gently"
            "DH": "Giống TH nhưng rung dây thanh",  # "Like TH but with the vocal cords vibrating"
            "V": "Chạm môi dưới vào răng trên",  # "Touch the lower lip to the upper teeth"
            "R": "Cuộn lưỡi, không chạm vòm miệng",  # "Curl the tongue, do not touch the roof of the mouth"
            "L": "Đầu lưỡi chạm vòm miệng",  # "Tongue tip touches the roof of the mouth"
            "Z": "Giống S nhưng có rung dây thanh",  # "Like S but voiced"
        }
        if problem_phonemes:
            most_difficult = sorted(
                problem_phonemes.items(), key=lambda x: x[1], reverse=True
            )
            for phoneme, count in most_difficult[:2]:
                if phoneme in vietnamese_tips:
                    feedback.append(f"Âm {phoneme}: {vietnamese_tips[phoneme]}")  # "Sound {phoneme}: ..."
        return feedback

    def _get_word_status(self, score: float) -> str:
        """Get word status from score"""
        if score >= 0.8:
            return "excellent"
        elif score >= 0.6:
            return "good"
        elif score >= 0.4:
            return "needs_practice"
        else:
            return "poor"

    def _get_word_color(self, score: float) -> str:
        """Get color for word highlighting"""
        if score >= 0.8:
            return "#22c55e"  # Green
        elif score >= 0.6:
            return "#84cc16"  # Light green
        elif score >= 0.4:
            return "#eab308"  # Yellow
        else:
            return "#ef4444"  # Red

    def _get_word_issues(self, word: str, phoneme_comparisons: List[Dict]) -> List[str]:
        """Get specific issues for a word (messages are in Vietnamese)"""
        issues = []
        word_comparisons = [c for c in phoneme_comparisons if c.get("word") == word]
        wrong_count = len([c for c in word_comparisons if c["status"] == "wrong"])
        missing_count = len([c for c in word_comparisons if c["status"] == "missing"])
        if wrong_count > 0:
            issues.append(f"{wrong_count} sai âm")  # "{n} wrong sounds"
        if missing_count > 0:
            issues.append(f"{missing_count} thiếu âm")  # "{n} missing sounds"
        return issues

    def _get_pronunciation_tips(
        self, word: str, wrong_phonemes: List[Dict], missing_phonemes: List[str]
    ) -> List[str]:
        """Get pronunciation tips for wrong words (tips are in Vietnamese)"""
        tips = []
        # Tips for specific problematic phonemes
        phoneme_tips = {
            "TH": "Đặt lưỡi giữa răng trên và dưới, thổi nhẹ",  # "Place the tongue between the upper and lower teeth, blow gently"
            "DH": "Giống TH nhưng rung dây thanh âm",  # "Like TH but with the vocal cords vibrating"
            "V": "Chạm môi dưới vào răng trên, không dùng cả hai môi",  # "Touch the lower lip to the upper teeth, do not use both lips"
            "R": "Cuộn lưỡi nhưng không chạm vào vòm miệng",  # "Curl the tongue without touching the roof of the mouth"
            "L": "Đầu lưỡi chạm vào vòm miệng sau răng",  # "Tongue tip touches the roof of the mouth behind the teeth"
            "Z": "Giống âm S nhưng có rung dây thanh âm",  # "Like S but voiced"
        }
        # Add tips for wrong phonemes
        for wrong in wrong_phonemes:
            expected = wrong["expected"]
            if expected in phoneme_tips:
                tips.append(f"Âm {expected}: {phoneme_tips[expected]}")  # "Sound {expected}: ..."
        # Add tips for missing phonemes
        for missing in missing_phonemes:
            if missing in phoneme_tips:
                tips.append(f"Thiếu âm {missing}: {phoneme_tips[missing]}")  # "Missing sound {missing}: ..."
        # General tip if no specific tips
        if not tips:
            tips.append(f"Luyện tập từ '{word}' chậm và rõ ràng")  # "Practice the word '{word}' slowly and clearly"
        return tips

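
# Illustrative sketch (not part of the original module): the assessor can be
# driven directly, e.g. from a script or test, without going through FastAPI.
# The audio filename below is a placeholder:
#
#   assessor = SimplePronunciationAssessor()
#   result = assessor.assess_pronunciation("recording.wav", "I think this is fast")
#   print(result["overall_score"])
#   print([w["word"] for w in result["wrong_words"]])
#
# The returned dict carries the six fields consumed by the API layer below:
# transcript, overall_score, word_highlights, phoneme_differences, wrong_words,
# and feedback.
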
# =============================================================================
# MAIN API ENDPOINT
# =============================================================================

# Initialize assessor (loads the Whisper model once at import time)
assessor = SimplePronunciationAssessor()


def convert_numpy_types(obj):
    """Convert numpy types to Python native types for JSON serialization"""
    if isinstance(obj, np.integer):
        return int(obj)
    elif isinstance(obj, np.floating):
        return float(obj)
    elif isinstance(obj, np.ndarray):
        return obj.tolist()
    elif isinstance(obj, dict):
        return {key: convert_numpy_types(value) for key, value in obj.items()}
    elif isinstance(obj, list):
        return [convert_numpy_types(item) for item in obj]
    else:
        return obj

# Route path is an assumption; the original registration for this endpoint is
# not shown in this file.
@router.post("/assess", response_model=PronunciationAssessmentResult)
async def assess_pronunciation(
    audio: UploadFile = File(..., description="Audio file (.wav, .mp3, .m4a)"),
    reference_text: str = Form(..., description="Reference text to compare against"),
):
    """
    Main API: Pronunciation Assessment

    Input: Audio file + Reference text
    Output: Word highlights + Phoneme differences + Wrong words

    Features:
    - Whisper ASR for transcript
    - CMU Dict phoneme mapping
    - Vietnamese-optimized comparison
    - Simple UI-ready output
    """
    import time

    start_time = time.time()

    # Validate inputs
    if not reference_text.strip():
        raise HTTPException(status_code=400, detail="Reference text cannot be empty")
    if len(reference_text) > 500:
        raise HTTPException(
            status_code=400, detail="Reference text too long (max 500 characters)"
        )
    # Check for valid English characters
    if not re.match(r"^[a-zA-Z\s\'\-\.!?,;:]+$", reference_text):
        raise HTTPException(
            status_code=400,
            detail="Text must contain only English letters, spaces, and basic punctuation",
        )

    try:
        # Save uploaded file temporarily
        file_extension = ".wav"
        if audio.filename and "." in audio.filename:
            file_extension = f".{audio.filename.split('.')[-1]}"
        with tempfile.NamedTemporaryFile(
            delete=False, suffix=file_extension
        ) as tmp_file:
            content = await audio.read()
            tmp_file.write(content)
            tmp_file.flush()
            print(f"Processing audio file: {tmp_file.name}")
            # Run assessment
            result = assessor.assess_pronunciation(tmp_file.name, reference_text)

        # Clean up temporary file
        os.unlink(tmp_file.name)

        # Convert numpy types for JSON serialization
        final_result = convert_numpy_types(result)
        processing_time = time.time() - start_time
        print(f"Assessment completed in {processing_time:.2f} seconds")
        return PronunciationAssessmentResult(**final_result)
    except Exception as e:
        print(f"Assessment error: {str(e)}")
        import traceback

        traceback.print_exc()
        raise HTTPException(status_code=500, detail=f"Assessment failed: {str(e)}")

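
# Illustrative request (assuming the "/assess" path above and the router prefix
# "/pronunciation"; adjust host/port to your deployment):
#
#   curl -X POST "http://localhost:8000/pronunciation/assess" \
#        -F "audio=@recording.wav" \
#        -F "reference_text=I think this is fast"
#
# The JSON response mirrors PronunciationAssessmentResult: transcript,
# overall_score (0-1), per-word highlights with status/color, the phoneme-level
# diff, wrong_words with tips, and feedback strings (in Vietnamese).
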
# =============================================================================
# UTILITY ENDPOINTS
# =============================================================================


# Route path is an assumption; the original registration is not shown in this file.
@router.get("/word/{word}")
async def get_word_phonemes(word: str):
    """Get phoneme breakdown for a specific word"""
    try:
        phoneme_data = assessor.g2p.text_to_phonemes(word)[0]
        # Add difficulty analysis
        difficulty_scores = []
        for phoneme in phoneme_data["phonemes"]:
            difficulty = assessor.comparator.difficulty_map.get(phoneme, 0.3)
            difficulty_scores.append(difficulty)
        avg_difficulty = float(np.mean(difficulty_scores)) if difficulty_scores else 0.3
        return {
            "word": word,
            "phonemes": phoneme_data["phonemes"],
            "ipa": phoneme_data["ipa"],
            "difficulty_score": avg_difficulty,
            "difficulty_level": (
                "hard"
                if avg_difficulty > 0.6
                else "medium" if avg_difficulty > 0.4 else "easy"
            ),
            "challenging_phonemes": [
                {
                    "phoneme": p,
                    "difficulty": assessor.comparator.difficulty_map.get(p, 0.3),
                }
                for p in phoneme_data["phonemes"]
                if assessor.comparator.difficulty_map.get(p, 0.3) > 0.6
            ],
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Word analysis error: {str(e)}")

# Route path is an assumption; the original registration is not shown in this file.
@router.get("/health")
async def health_check():
    """Simple health check endpoint"""
    return {
        "status": "healthy",
        "whisper_model": "base.en",  # matches the model loaded in SimplePronunciationAssessor
        "cmu_dict_size": len(assessor.g2p.cmu_dict),
        "vietnamese_optimized": True,
    }
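

# Minimal sketch (an assumption, not part of the original module): how this
# router could be mounted into a FastAPI app for local testing, which is also
# why the FastAPI and CORSMiddleware imports at the top are kept. Assumes
# uvicorn is installed; the title, CORS policy, and port are placeholders.
if __name__ == "__main__":
    import uvicorn

    app = FastAPI(title="Simplified Pronunciation Assessment API")
    app.add_middleware(
        CORSMiddleware,
        allow_origins=["*"],
        allow_methods=["*"],
        allow_headers=["*"],
    )
    app.include_router(router)
    uvicorn.run(app, host="0.0.0.0", port=8000)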