Spaces:
Sleeping
Sleeping
| import os | |
| import requests | |
| from smolagents.tools import tool | |
| from difflib import SequenceMatcher | |
| try: | |
| from gradio_client import Client | |
| except ImportError: | |
| # Fallback import for older versions | |
| import gradio_client | |
| Client = gradio_client.Client | |
| from google import genai | |
| from google.genai import types | |
| import json | |
| import time | |
| import numpy as np | |
| from pathlib import Path | |
| from typing import Dict, List, Optional, Tuple | |
| from dotenv import load_dotenv | |
# Load environment variables from a local .env file, if one is present.
load_dotenv()

# Service credentials read from the environment; each is None when unset.
TTS_API = os.getenv("TTS_API")
STT_API = os.getenv("STT_API")
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")

# Always bind gemini_client so that code using it can check for None rather
# than hitting a NameError when GOOGLE_API_KEY is missing.
gemini_client = genai.Client(api_key=GOOGLE_API_KEY) if GOOGLE_API_KEY else None
def generate_story(name: str, grade: str, topic: str) -> str:
    """
    Generate a short, age-appropriate story for reading practice using LLM.

    Args:
        name (str): The child's name.
        grade (str): The student's grade level, e.g., "Grade 3".
        topic (str): The story topic, e.g., "space", "animals".

    Returns:
        str: Generated story text.

    Raises:
        RuntimeError: If no Gemini client is configured, or the model
            response carries no text.
    """
    # The module only creates gemini_client when GOOGLE_API_KEY is set, so
    # look it up defensively and fail with an actionable message instead of
    # a NameError/AttributeError deep inside the call.
    client = globals().get("gemini_client")
    if client is None:
        raise RuntimeError(
            "Gemini client is not configured. Set the GOOGLE_API_KEY environment variable."
        )

    # Extract grade number (falls back to 3 when the label has no digits)
    # and derive an approximate age: Grade 1 = ~6 years old, Grade 6 = ~11.
    grade_num = int(''.join(filter(str.isdigit, grade)) or "3")
    age = grade_num + 5

    # Dynamically determine story parameters based on grade
    if grade_num <= 2:
        # Grades 1-2: Very simple stories
        story_length = "2-3 short sentences"
        vocabulary_level = "very simple words (mostly 1-2 syllables)"
        sentence_structure = "short, simple sentences"
        complexity = "basic concepts"
        reading_level = "beginner"
    elif grade_num <= 4:
        # Grades 3-4: Intermediate stories
        story_length = "1-2 short paragraphs"
        vocabulary_level = "age-appropriate words with some longer words"
        sentence_structure = "mix of simple and compound sentences"
        complexity = "intermediate concepts with some detail"
        reading_level = "intermediate"
    else:
        # Grades 5-6: More advanced stories
        story_length = "2-3 paragraphs"
        vocabulary_level = "varied vocabulary including descriptive words"
        sentence_structure = "complex sentences with descriptive language"
        complexity = "detailed concepts and explanations"
        reading_level = "advanced elementary"

    # Create dynamic, grade-adaptive prompt
    prompt = f"""
You are an expert children's reading coach. Create an engaging, educational story for a {age}-year-old child named {name} about {topic}.
GRADE LEVEL: {grade} ({reading_level} level)
Story Requirements:
- Length: {story_length}
- Vocabulary: Use {vocabulary_level}
- Sentence structure: {sentence_structure}
- Complexity: {complexity}
- Include {name} as the main character
- Teach something interesting about {topic}
- End with a positive, encouraging message
- Make it engaging and fun to read aloud
Additional Guidelines:
- For younger students (Grades 1-2): Focus on simple actions, basic emotions, and clear cause-and-effect
- For middle students (Grades 3-4): Include some problem-solving, friendship themes, and basic science/nature facts
- For older students (Grades 5-6): Add character development, more detailed explanations, and encourage curiosity
The story should be perfectly suited for a {grade} student's reading ability and attention span.
Story:
"""

    # Longer stories for higher grades need a larger output budget.
    max_tokens = 300 if grade_num <= 2 else 600 if grade_num <= 4 else 1000
    generation_config = types.GenerateContentConfig(
        temperature=0.8,
        max_output_tokens=max_tokens,
        top_p=0.9,
    )
    response = client.models.generate_content(
        model="gemini-2.0-flash",
        contents=[prompt],
        config=generation_config,
    )

    # response.text may be empty or None when the response carries no text
    # part; report that explicitly rather than crashing on .strip().
    story = (response.text or "").strip()
    if not story:
        raise RuntimeError("Gemini returned no story text. Please try again.")
    return story
def text_to_speech(text: str) -> Optional[str]:
    """
    Convert story text into an audio file via the TTS service using gradio_client.

    Args:
        text (str): The story to convert to speech.

    Returns:
        Optional[str]: File path (or URL) of the generated audio, or None when
        the text is blank, the service call fails, or the result has an
        unexpected shape.
    """
    # Nothing to synthesize: skip the remote call entirely.
    if not text or not str(text).strip():
        print("TTS skipped: no text to convert")
        return None

    try:
        # Use the gradio_client to interact with the TTS API
        client = Client("NihalGazi/Text-To-Speech-Unlimited")
        result = client.predict(
            prompt=text,            # Required: The text to convert to speech
            voice="nova",           # Voice selection from available options
            emotion="neutral",      # Required: Emotion style
            use_random_seed=True,   # Use random seed for variety
            specific_seed=12345,    # Specific seed value
            api_name="/text_to_speech_app",
        )
        print(f"TTS result: {result}")
        print(f"TTS result type: {type(result)}")

        # The endpoint is expected to return (filepath, status_str); accept a
        # list as well as a tuple so either sequence shape is handled.
        if not (isinstance(result, (tuple, list)) and len(result) >= 2):
            print(f"Unexpected TTS result format: {result}")
            return None

        audio_path, status = result[0], result[1]
        print(f"TTS Status: {status}")
        if audio_path and isinstance(audio_path, str):
            print(f"TTS generated audio at: {audio_path}")
            return audio_path

        print(f"Invalid audio path: {audio_path}")
        return None
    except Exception as e:
        # Best-effort service: log the failure and signal it with None.
        print(f"TTS Error: {e}")
        import traceback
        traceback.print_exc()
        return None
def _extract_transcript_text(result) -> str:
    """Return the transcript string held by a str, list/tuple, dict, or other STT result."""
    if isinstance(result, str):
        return result.strip()
    if isinstance(result, (list, tuple)):
        text = str(result[0]).strip()
        print(f"Extracted from list/tuple: {text}")
        return text
    if isinstance(result, dict):
        # Try common keys before falling back to the dict's repr.
        text = str(result.get('text', result.get('transcription', str(result)))).strip()
        print(f"Extracted from dict: {text}")
        return text
    text = str(result).strip()
    print(f"Converted to string: {text}")
    return text


def _clean_transcript(text: str) -> str:
    """Remove code-fence/brace wrapper characters and collapse all whitespace."""
    import re

    # Only strip "json" when it is a code-fence language tag, so ordinary
    # words in the reading are never altered.
    text = re.sub(r"```(?:json)?", "", text, flags=re.IGNORECASE)
    text = text.replace('{', '').replace('}', '')
    return ' '.join(text.split())


def transcribe_audio(audio_input: str) -> str:
    """
    Transcribe the student's audio into text via Whisper STT service.
    Using abidlabs/whisper-large-v2 Hugging Face Space API.

    Args:
        audio_input: Either a file path (str or Path) or a
            (sample_rate, numpy_array) tuple from Gradio.

    Returns:
        str: Transcribed speech text, or a user-facing message describing why
        transcription failed. This function does not raise.
    """
    try:
        print(f"Received audio input: {type(audio_input)}")

        # Handle different input formats
        if isinstance(audio_input, tuple) and len(audio_input) == 2:
            # Gradio microphone format: (sample_rate, numpy_array)
            sample_rate, audio_data = audio_input
            print(f"Audio tuple: sample_rate={sample_rate}, data_shape={getattr(audio_data, 'shape', None)}")
            # NOTE(review): the tuple is forwarded unchanged; confirm the STT
            # endpoint accepts raw samples rather than requiring a file path.
            audio_for_stt = audio_input
        elif isinstance(audio_input, (str, Path)):
            audio_for_stt = str(audio_input)
        else:
            print(f"Unsupported audio input type: {type(audio_input)}")
            return "Error: Unsupported audio format. Please try recording again."

        print("Initializing Gradio client for STT...")
        try:
            client = Client("abidlabs/whisper-large-v2")
        except Exception as client_error:
            print(f"Failed to initialize client: {client_error}")
            return "Error: STT service initialization failed. Please try again."

        print("Sending audio for transcription...")
        try:
            result = client.predict(audio_for_stt, api_name="/predict")
        except Exception as api_error:
            print(f"API call failed: {api_error}")
            if "extra_headers" in str(api_error):
                return "Error: Connection protocol mismatch. Please try recording again."
            if "connection" in str(api_error).lower():
                return "Error: Network connection issue. Please check your internet and try again."
            return "Error: Transcription service temporarily unavailable. Please try again."

        print(f"Raw transcription result: {result}")
        print(f"Result type: {type(result)}")

        if result is None:
            return "Error: No transcription result. Please try speaking more clearly and loudly."
        if isinstance(result, (list, tuple)) and len(result) == 0:
            return "Error: Empty transcription result. Please try again."

        transcribed_text = _clean_transcript(_extract_transcript_text(result))

        # Placeholder values mean the service heard nothing usable.
        if not transcribed_text or transcribed_text.lower() in ('none', 'null', 'error', 'undefined'):
            return "I couldn't hear any speech clearly. Please try recording again and speak more loudly."

        # Treat the text as a service error only when it *starts* with an
        # error phrase. Matching these phrases anywhere would reject valid
        # readings of stories that contain e.g. "could not".
        error_prefixes = ('error', 'failed', 'could not', 'unable to', 'timeout')
        if transcribed_text.lower().startswith(error_prefixes):
            return "Transcription service had an issue. Please try recording again."

        if len(transcribed_text) < 3:
            return "The recording was too short or unclear. Please try reading more slowly and clearly."

        print(f"Final transcribed text: {transcribed_text}")
        return transcribed_text
    except ImportError as e:
        print(f"Import error: {str(e)}")
        return "Error: Missing required libraries. Please check your installation."
    except ConnectionError as e:
        print(f"Connection error: {str(e)}")
        return "Network connection error. Please check your internet connection and try again."
    except TimeoutError as e:
        print(f"Timeout error: {str(e)}")
        return "Transcription service is taking too long. Please try again with a shorter recording."
    except Exception as e:
        print(f"Unexpected transcription error: {str(e)}")
        error_msg = str(e).lower()
        # Provide helpful error messages based on the error type
        if "timeout" in error_msg or "connection" in error_msg:
            return "Network timeout. Please check your internet connection and try again."
        if "file" in error_msg or "path" in error_msg:
            return "Audio file error. Please try recording again."
        if "api" in error_msg or "client" in error_msg or "gradio" in error_msg:
            return "Transcription service temporarily unavailable. Please try again in a moment."
        if "memory" in error_msg or "size" in error_msg:
            return "Audio file is too large or complex. Please try with a shorter recording."
        return "Transcription failed. Please try recording again. If the problem persists, try speaking more clearly."
def compare_texts_for_feedback(original: str, spoken: str) -> str:
    """
    Compare the original and spoken text, provide age-appropriate feedback with pronunciation help.

    Args:
        original (str): The original story text.
        spoken (str): The student's transcribed reading.

    Returns:
        str: Feedback text. Either a short warning (reading too short, or too
        few words in common with the story) or the full message built by
        generate_adaptive_feedback.
    """
    # Check if the spoken text is too short to be meaningful
    if not spoken or len(spoken.split()) < 3:
        return "⚠️ Your reading was too short. Please try reading the complete story."

    # Normalize both texts the same way: lowercase, surrounding punctuation
    # removed, and tokens that were punctuation only dropped.
    strip_chars = ".,!?;:\"'"
    orig_words = [w for w in (t.strip(strip_chars).lower() for t in original.split()) if w]
    spoken_words = [w for w in (t.strip(strip_chars).lower() for t in spoken.split()) if w]

    # If almost nothing matches, the student likely read something else:
    # require at least 2 shared words or 10% of the story's words.
    common_words = set(orig_words).intersection(spoken_words)
    if len(common_words) < max(2, len(orig_words) * 0.1):
        return "⚠️ I couldn't recognize enough words from the story. Please try reading the story text shown on the screen.\n\nReading accuracy: 0.0%"

    # Calculate accuracy using sequence matching
    matcher = SequenceMatcher(None, orig_words, spoken_words, autojunk=False)
    accuracy = matcher.ratio() * 100

    # Missed words are the original words the matcher could not align. They
    # come from the same cleaned tokens as the accuracy, so a word that only
    # differs by punctuation ("moon." vs "moon") is not reported as missed.
    missed_words = []
    for tag, start, end, _j1, _j2 in matcher.get_opcodes():
        if tag in ("delete", "replace"):
            missed_words.extend(w for w in orig_words[start:end] if len(w) > 1)

    # De-duplicate while keeping first-seen order.
    unique_missed = list(dict.fromkeys(missed_words))

    # Prioritize longer words; keep at most three short ones alongside them.
    important_missed = [w for w in unique_missed if len(w) > 4]
    if important_missed:
        short_missed = [w for w in unique_missed if len(w) <= 4]
        missed_words_set = set(important_missed) | set(short_missed[:3])
    else:
        missed_words_set = set(unique_missed)

    # Extra words (might be mispronunciations or additions)
    extra_words = set(spoken_words) - set(orig_words)

    # Find mispronounced words (similar to, but not the same as, a story word)
    mispronounced = find_similar_words(orig_words, spoken_words)

    # Generate age-appropriate feedback
    return generate_adaptive_feedback(accuracy, missed_words_set, extra_words, mispronounced, len(orig_words))
def find_similar_words(original_words: list, spoken_words: list) -> list:
    """
    Find words that might be mispronounced (similar but not exact matches).

    Each original word is reported at most once, so a word that is repeated
    in the story cannot use up several of the five result slots.

    Args:
        original_words (list): Original story words
        spoken_words (list): Transcribed words

    Returns:
        list: Up to five (original_word, spoken_word) tuples for potential
        mispronunciations, in story order.
    """
    from difflib import get_close_matches

    max_results = 5
    spoken_lookup = set(spoken_words)                # O(1) membership tests
    candidates = list(dict.fromkeys(spoken_words))   # unique, order kept
    already_checked = set()
    mispronounced = []

    for orig_word in original_words:
        if len(mispronounced) == max_results:
            break
        # Skip very short words, words that were read exactly, and repeats.
        if len(orig_word) <= 2 or orig_word in spoken_lookup or orig_word in already_checked:
            continue
        already_checked.add(orig_word)
        close_matches = get_close_matches(orig_word, candidates, n=1, cutoff=0.6)
        if close_matches:
            mispronounced.append((orig_word, close_matches[0]))

    return mispronounced
def generate_adaptive_feedback(accuracy: float, missed_words: set, extra_words: set,
                               mispronounced: list, total_words: int) -> str:
    """
    Build the encouraging feedback message shown to the student.

    The message has four parts: a praise headline chosen by accuracy tier,
    the accuracy line, optional word-practice and pronunciation sections,
    and a closing suggestion for the next step.

    Args:
        accuracy (float): Reading accuracy percentage
        missed_words (set): Words that were skipped
        extra_words (set): Words that were added (accepted but not used here)
        mispronounced (list): (original, spoken) pairs of likely mispronunciations
        total_words (int): Total words in story (accepted but not used here)

    Returns:
        str: Feedback sections joined with newlines
    """
    # Headline tiers, highest threshold first; the first one reached wins.
    praise_tiers = (
        (95, "🌟 AMAZING! You read almost perfectly!"),
        (85, "🎉 GREAT JOB! You're doing wonderful!"),
        (70, "👍 GOOD WORK! You're getting better!"),
        (50, "😊 NICE TRY! Keep practicing!"),
    )
    headline = next(
        (text for floor, text in praise_tiers if accuracy >= floor),
        "🚀 GREAT START! Every practice makes you better!",
    )
    lines = [headline, f"Reading accuracy: {accuracy:.1f}%"]

    # Word practice: alphabetical, capped at eight entries.
    if missed_words:
        lines.append("\n📚 PRACTICE THESE WORDS:")
        for missed in sorted(missed_words)[:8]:
            lines.append(f"• {missed.upper()} - {get_pronunciation_tip(missed)}")

    # Pronunciation corrections for near-miss words.
    if mispronounced:
        lines.append("\n🎯 PRONUNCIATION PRACTICE:")
        for expected, heard in mispronounced:
            correction = get_pronunciation_correction(expected, heard)
            lines.append(f"• {expected.upper()} (you said '{heard}') - {correction}")

    # Closing suggestion for what to do next.
    if accuracy >= 80:
        next_step = "\n🏆 You're ready for more challenging stories!"
    elif accuracy >= 60:
        next_step = "\n💪 Try reading this story again to improve your score!"
    else:
        next_step = "\n🌱 Let's practice with shorter, simpler stories first!"
    lines.append(next_step)

    return "\n".join(lines)
def get_pronunciation_tip(word: str) -> str:
    """
    Pick a pronunciation hint for a word from simple spelling patterns.

    Rules are tried in a fixed order and the first one that applies wins:
    very short words, '-tion'/'-sion' endings, '-ed' endings, letter-pair
    patterns, then long words, then a letter-by-letter fallback.

    Args:
        word (str): Word to provide pronunciation help for

    Returns:
        str: Pronunciation tip
    """
    lowered = word.lower()
    spelled_out = "-".join(lowered)

    # Words of up to three letters are simply spelled out.
    if len(lowered) <= 3:
        return f"Sound it out: {spelled_out}"

    if lowered.endswith('tion'):
        return "Ends with 'shun' sound"
    if lowered.endswith('sion'):
        return "Ends with 'zhun' or 'shun' sound"

    # '-ed' endings: the letter just before 'ed' decides the hint.
    if lowered.endswith('ed'):
        ending = "ed" if lowered[-3] in 'td' else "d"
        return f"Past tense - ends with '{ending}' sound"

    # Ordered (match kind, letters, tip) rules; order matters.
    pattern_tips = (
        ("has", 'th', "Put your tongue between your teeth for 'th'"),
        ("has", 'ch', "Make the 'ch' sound like in 'cheese'"),
        ("has", 'sh', "Make the 'sh' sound like in 'ship'"),
        ("starts", 'kn', "The 'k' is silent, start with the 'n' sound"),
        ("starts", 'ph', "The 'ph' makes an 'f' sound"),
        ("starts", 'wh', "Starts with 'w' sound (like 'when')"),
        ("ends", 'igh', "The 'igh' makes a long 'i' sound like in 'night'"),
        ("has", 'ou', "The 'ou' often sounds like 'ow' in 'cow'"),
        ("has", 'ai', "The 'ai' makes the long 'a' sound"),
        ("has", 'ea', "The 'ea' usually makes the long 'e' sound"),
    )
    matchers = {
        "has": lambda letters: letters in lowered,
        "starts": lowered.startswith,
        "ends": lowered.endswith,
    }
    for kind, letters, tip in pattern_tips:
        if matchers[kind](letters):
            return tip

    # Longer words are broken into syllables.
    if len(lowered) >= 6:
        return f"Break it down: {break_into_syllables(lowered)}"

    # Only 4- and 5-letter words reach this point; joining the two halves
    # letter by letter is the same as spelling out the whole word.
    return f"Sound it out slowly: {spelled_out}"
def get_pronunciation_correction(original: str, spoken: str) -> str:
    """
    Suggest how to fix a word that was read as a similar but different word.

    Checks run in order: length difference, first letter, last letter, first
    differing vowel, then a generic "listen carefully" hint.

    Args:
        original (str): Correct word
        spoken (str): How it was pronounced

    Returns:
        str: Correction tip
    """
    target = original.lower()
    heard = spoken.lower()

    # A length mismatch suggests dropped or added sounds.
    if len(target) > len(heard):
        return f"Don't skip letters! Say all sounds in '{target}'"
    if len(heard) > len(target):
        return f"Not too fast! The word is just '{target}'"

    # Same length: compare the edges of the word first.
    if target[0] != heard[0]:
        return f"Starts with '{target[0]}' sound, not '{heard[0]}'"
    if target[-1] != heard[-1]:
        return f"Ends with '{target[-1]}' sound"

    # Then look for the first vowel that differs, position by position.
    vowel_sounds = {
        'a': "ah (like in 'cat')",
        'e': "eh (like in 'bed')",
        'i': "ih (like in 'sit')",
        'o': "oh (like in 'hot')",
        'u': "uh (like in 'cup')",
    }
    target_vowels = [letter for letter in target if letter in 'aeiou']
    heard_vowels = [letter for letter in heard if letter in 'aeiou']
    for right, wrong in zip(target_vowels, heard_vowels):
        if right != wrong:
            return f"Say the vowel sound as {vowel_sounds[right]}, not {vowel_sounds[wrong]}"

    # Default case
    return f"Listen carefully: '{target}' - try saying it slower"
def break_into_syllables(word: str) -> str:
    """
    Heuristically break a word into hyphen-separated chunks for pronunciation help.

    The steps are: split off one known prefix, split off one known suffix
    (returning immediately if one is found), otherwise scan the remaining
    letters grouping consonants around vowels. If no break is produced for a
    word longer than three letters, fall back to two-letter chunks.

    Args:
        word (str): Word to break into syllables

    Returns:
        str: Word broken into syllables, joined with '-'
    """
    vowels = 'aeiouy'
    word = word.lower()
    syllables = []
    current_syllable = ''
    consonant_cluster = ''

    # Handle common prefixes. 'inter' is listed first because 'in' also
    # matches every 'inter...' word and would otherwise always win.
    common_prefixes = ['inter', 're', 'pre', 'un', 'in', 'im', 'dis', 'mis', 'non', 'sub', 'ex']
    for prefix in common_prefixes:
        if word.startswith(prefix) and len(word) > len(prefix) + 1:
            syllables.append(prefix)
            word = word[len(prefix):]
            break

    # Handle common suffixes: the stem is kept whole and returned at once.
    common_suffixes = ['ing', 'ed', 'er', 'est', 'ly', 'ful', 'ness', 'less', 'ment', 'able', 'ible']
    for suffix in common_suffixes:
        if word.endswith(suffix) and len(word) > len(suffix) + 1:
            syllables.append(word[:-len(suffix)])
            syllables.append(suffix)
            return '-'.join(syllables)

    # Process the word character by character
    i = 0
    while i < len(word):
        char = word[i]
        if char in vowels:
            if consonant_cluster:
                if len(consonant_cluster) > 1:
                    if current_syllable:
                        # Close the open syllable with the first consonant
                        # and start the next one with the rest.
                        current_syllable += consonant_cluster[0]
                        syllables.append(current_syllable)
                        current_syllable = consonant_cluster[1:] + char
                    else:
                        # Leading consonant cluster starts a syllable.
                        current_syllable = consonant_cluster + char
                else:
                    # Single consonant
                    current_syllable += consonant_cluster + char
                consonant_cluster = ''
            else:
                current_syllable += char
            # Keep common vowel pairs together
            if i < len(word) - 1 and word[i+1] in vowels:
                vowel_pairs = ['ea', 'ee', 'oo', 'ou', 'ie', 'ai', 'oa']
                if word[i:i+2] in vowel_pairs:
                    current_syllable += word[i+1]
                    i += 1  # Skip the next vowel since we've added it
        else:
            if current_syllable:
                if i < len(word) - 1 and word[i+1] not in vowels:
                    consonant_cluster += char
                else:
                    current_syllable += char
            else:
                # Starting with a consonant or building a consonant cluster
                consonant_cluster += char

        # Close the syllable at the end of the word, or when a vowel is
        # followed by a consonant.
        if i == len(word) - 1 or (char in vowels and i < len(word) - 1 and word[i+1] not in vowels):
            if current_syllable:
                syllables.append(current_syllable)
                current_syllable = ''
        i += 1

    # Attach trailing consonants to the last syllable
    if consonant_cluster:
        if syllables:
            syllables[-1] += consonant_cluster
        else:
            syllables.append(consonant_cluster)
    if current_syllable:
        syllables.append(current_syllable)

    result = '-'.join(syllables) if syllables else word

    # If we ended up with no breaks, fall back to two-letter chunks
    if result == word and len(word) > 3:
        syllables = [word[k:k+2] for k in range(0, len(word), 2)]
        result = '-'.join(syllables)
    return result
def generate_targeted_story(previous_feedback: str, name: str, grade: str, missed_words: list = None) -> str:
    """
    Generate a new story that specifically targets words the student struggled with.

    Args:
        previous_feedback (str): Previous reading feedback
        name (str): Student's name
        grade (str): Student's grade level
        missed_words (list): Words the student had trouble with

    Returns:
        str: New targeted story for practice

    Raises:
        RuntimeError: If no Gemini client is configured, or the model
            response carries no text.
    """
    import re

    # The module only creates gemini_client when GOOGLE_API_KEY is set, so
    # look it up defensively and fail with an actionable message.
    client = globals().get("gemini_client")
    if client is None:
        raise RuntimeError(
            "Gemini client is not configured. Set the GOOGLE_API_KEY environment variable."
        )

    grade_num = int(''.join(filter(str.isdigit, grade)) or "3")
    age = grade_num + 5

    # Story parameters by grade - same criteria as the main story generation
    if grade_num <= 2:
        # Grades 1-2: Very simple stories
        story_length = "2-3 short sentences"
        vocabulary_level = "very simple words (mostly 1-2 syllables)"
        sentence_structure = "short, simple sentences"
        complexity = "basic concepts"
        reading_level = "beginner"
    elif grade_num <= 4:
        # Grades 3-4: Intermediate stories
        story_length = "1-2 short paragraphs"
        vocabulary_level = "age-appropriate words with some longer words"
        sentence_structure = "mix of simple and compound sentences"
        complexity = "intermediate concepts with some detail"
        reading_level = "intermediate"
    else:
        # Grades 5-6: More advanced stories
        story_length = "2-3 paragraphs"
        vocabulary_level = "varied vocabulary including descriptive words"
        sentence_structure = "complex sentences with descriptive language"
        complexity = "detailed concepts and explanations"
        reading_level = "advanced elementary"

    # Read the numeric accuracy out of the feedback instead of matching a
    # digit prefix: "accuracy: 9" would also match 9.x%, and 100% would be
    # missed entirely.
    feedback_text = previous_feedback or ""
    accuracy_match = re.search(r"accuracy:\s*(\d+(?:\.\d+)?)\s*%", feedback_text)
    accuracy = float(accuracy_match.group(1)) if accuracy_match else None

    if "AMAZING" in feedback_text or (accuracy is not None and accuracy >= 90):
        difficulty_adjustment = "slightly more challenging but still within grade level"
        focus_area = "new vocabulary and longer sentences"
    elif "GOOD" in feedback_text or (accuracy is not None and accuracy >= 80):
        difficulty_adjustment = "similar level with some new words"
        focus_area = "reinforcing current skills"
    else:
        difficulty_adjustment = "slightly simpler but still grade-appropriate"
        focus_area = "basic vocabulary and simple sentences"

    # Create targeted practice words
    if missed_words:
        practice_words = missed_words[:5]  # Focus on top 5 missed words
        word_focus = f"Include and repeat these practice words: {', '.join(practice_words)}"
    else:
        word_focus = "Focus on common sight words for this grade level"

    # Generate adaptive prompt
    prompt = f"""
You are an expert reading coach creating a personalized story for {name}, a {age}-year-old in {grade}.
GRADE LEVEL: {grade} ({reading_level} level)
STORY SPECIFICATIONS:
- Length: {story_length}
- Vocabulary: {vocabulary_level}
- Sentence structure: {sentence_structure}
- Complexity: {complexity}
LEARNING ADAPTATION:
- Make this story {difficulty_adjustment}
- Focus on: {focus_area}
- {word_focus}
STORY REQUIREMENTS:
- Feature {name} as the main character
- Include an engaging adventure or discovery theme
- Naturally incorporate the practice words multiple times
- Make it fun and encouraging
- End with {name} feeling proud and accomplished
Create a story that helps {name} practice the words they found challenging while building confidence.
Story:
"""

    # Use the same config type and keyword as generate_story: the client's
    # generate_content call takes config=types.GenerateContentConfig(...).
    max_tokens = 300 if grade_num <= 2 else 600 if grade_num <= 4 else 1000
    generation_config = types.GenerateContentConfig(
        temperature=0.7,
        max_output_tokens=max_tokens,
        top_p=0.9,
    )
    response = client.models.generate_content(
        model="gemini-2.5-flash",
        contents=[prompt],
        config=generation_config,
    )

    # response.text may be empty or None when the response carries no text
    # part; report that explicitly rather than crashing on .strip().
    story = (response.text or "").strip()
    if not story:
        raise RuntimeError("Gemini returned no story text. Please try again.")
    return story
class SessionManager:
    """Keeps per-session reading records in memory, keyed by session id."""

    def __init__(self):
        # session id -> record dict created by start_session
        self.sessions = {}
        # Initialised empty; nothing in this class writes to it.
        self.student_progress = {}

    def start_session(self, student_name: str, grade: str) -> str:
        """Create a fresh record for the student and return its session id."""
        # The id is the student's name plus the current time in whole seconds.
        session_id = "{}_{}".format(student_name, int(time.time()))
        record = dict(
            student_name=student_name,
            grade=grade,
            start_time=time.time(),
            stories_read=0,
            total_accuracy=0,
            feedback_history=[],
        )
        self.sessions[session_id] = record
        return session_id

    def get_session(self, session_id: str) -> dict:
        """Return the record for session_id, or an empty dict if it is unknown."""
        try:
            return self.sessions[session_id]
        except KeyError:
            return {}

    def update_session(self, session_id: str, accuracy: float, feedback: str):
        """Add one reading result to the session; unknown ids are ignored."""
        record = self.sessions.get(session_id)
        if record is None:
            return
        record["stories_read"] += 1
        record["total_accuracy"] += accuracy
        entry = {"timestamp": time.time(), "accuracy": accuracy, "feedback": feedback}
        record["feedback_history"].append(entry)
| class ReadingCoachAgent: | |
| """ | |
| Main agent class that provides the interface for the reading coach system. | |
| Wraps the individual tool functions and manages student sessions. | |
| """ | |
def __init__(self):
    """Set up an agent with a new session manager and no active student."""
    # Holds the session records for this agent.
    self.session_manager = SessionManager()
    # Id of the active session; None until generate_story_for_student runs.
    self.current_session = None
    # Text of the most recently generated story.
    self.current_story = ""
    # Name and grade of the current student; empty strings until set.
    self.student_info = {"name": "", "grade": ""}
def generate_story_for_student(self, name: str, grade: str, topic: str) -> str:
    """Record the student, open a new session, and return a newly generated story."""
    # Remember who is reading so later passages can reuse name and grade.
    self.student_info = dict(name=name, grade=grade)
    # Each call opens a new session, which becomes the active one.
    self.current_session = self.session_manager.start_session(name, grade)
    # Generate the story with the module-level tool function and keep it as
    # the text later readings are compared against.
    self.current_story = generate_story(name, grade, topic)
    return self.current_story
def create_audio_from_story(self, story: str) -> str:
    """Convert story to audio using TTS.

    Delegates to the module-level text_to_speech and returns its result
    unchanged: the generated audio's file path, or None when text_to_speech
    could not produce audio.
    """
    return text_to_speech(story)
def analyze_student_reading(self, audio_path: str) -> tuple:
    """Transcribe a recording, score it against the current story, and log the result.

    Args:
        audio_path (str): Audio input passed straight to transcribe_audio.

    Returns:
        tuple: (transcribed_text, feedback, accuracy). When transcription
        failed, feedback is a retry message and accuracy is 0.0; the session
        is not updated in that case.
    """
    # Transcribe the audio
    transcribed_text = transcribe_audio(audio_path) or ""

    # transcribe_audio reports failures as plain messages. Recognise every
    # one of them, not only the "Error:" ones, so a failure message is never
    # scored as if it were the student's reading.
    failure_prefixes = (
        "Error:",
        "I couldn't hear",
        "Transcription service",
        "Transcription failed",
        "The recording was too short",
        "Network connection error",
        "Network timeout",
        "Audio file error",
        "Audio file is too large",
    )
    if transcribed_text.startswith(failure_prefixes) or len(transcribed_text.strip()) < 3:
        # Return a helpful message instead of giving feedback with accuracy points
        error_feedback = "⚠️ I couldn't hear your reading clearly. Please try again and make sure to:\n"
        error_feedback += "• Speak clearly and at a normal pace\n"
        error_feedback += "• Make sure your microphone is working properly\n"
        error_feedback += "• Try reading in a quieter environment\n"
        error_feedback += "• Read the complete story from beginning to end\n\n"
        error_feedback += "Reading accuracy: 0.0%"
        return transcribed_text, error_feedback, 0.0

    # Compare with original story and get feedback
    feedback = compare_texts_for_feedback(self.current_story, transcribed_text)

    # Extract accuracy from feedback
    accuracy = self._extract_accuracy_from_feedback(feedback)

    # Update session if we have one
    if self.current_session:
        self.session_manager.update_session(self.current_session, accuracy, feedback)
    return transcribed_text, feedback, accuracy
| def generate_new_passage(self, topic: str) -> str: | |
| """Generate a new passage with the current student info""" | |
| if not self.student_info["name"] or not self.student_info["grade"]: | |
| raise ValueError("No active session. Please start a new session first.") | |
| # Generate new story | |
| story = generate_story(self.student_info["name"], self.student_info["grade"], topic) | |
| self.current_story = story | |
| return story | |
| def generate_practice_story(self, name: str, grade: str) -> str: | |
| """Generate a new targeted practice story based on previous feedback""" | |
| if not self.student_info.get("name") or not self.student_info.get("grade"): | |
| # Use provided parameters if student info is not available | |
| name = name or "Student" | |
| grade = grade or "Grade 3" | |
| else: | |
| name = self.student_info["name"] | |
| grade = self.student_info["grade"] | |
| # Get the last feedback to personalize the practice story | |
| last_feedback = "" | |
| missed_words_list = [] | |
| # Extract missed words from feedback if available | |
| if self.current_session: | |
| session_data = self.session_manager.get_session(self.current_session) | |
| if session_data and "feedback_history" in session_data and session_data["feedback_history"]: | |
| last_feedback = session_data["feedback_history"][-1]["feedback"] | |
| # Extract missed words from the feedback | |
| import re | |
| if "PRACTICE THESE WORDS:" in last_feedback: | |
| # Find all words that appear after bullet points | |
| matches = re.findall(r'• ([A-Z]+)', last_feedback) | |
| missed_words_list = [word.lower() for word in matches] | |
| # Generate a new practice story using the targeted story function | |
| practice_story = generate_targeted_story(last_feedback, name, grade, missed_words_list) | |
| self.current_story = practice_story | |
| return practice_story | |
| def clear_session(self): | |
| """Clear current session""" | |
| self.current_session = None | |
| self.current_story = "" | |
| self.student_info = {"name": "", "grade": ""} | |
| def reset_all_data(self): | |
| """Reset all current session state but keep tracked sessions.""" | |
| self.clear_session() | |
| def _extract_accuracy_from_feedback(self, feedback: str) -> float: | |
| """Extract accuracy percentage from feedback text""" | |
| import re | |
| # Look for "Reading accuracy: XX.X%" pattern in feedback | |
| match = re.search(r'Reading accuracy:\s*(\d+\.?\d*)%', feedback) | |
| if match: | |
| return float(match.group(1)) | |
| return 0.0 | |
| def _extract_missed_words_from_feedback(feedback: str) -> list: | |
| """ | |
| Extract missed words from feedback text. | |
| Args: | |
| feedback (str): Feedback text containing missed words | |
| Returns: | |
| list: List of missed words | |
| """ | |
| import re | |
| missed_words = [] | |
| # Check if feedback contains practice words section | |
| if "PRACTICE THESE WORDS:" in feedback: | |
| # Extract the section with practice words | |
| practice_section = feedback.split("PRACTICE THESE WORDS:")[1].split("\n")[1:] | |
| # Extract words that appear after bullet points | |
| for line in practice_section: | |
| if "•" in line and "-" in line: | |
| # Extract word before the dash | |
| match = re.search(r'• ([A-Z]+) -', line) | |
| if match: | |
| missed_words.append(match.group(1).lower()) | |
| # If we also have mispronounced words, add them too | |
| if "PRONUNCIATION PRACTICE:" in feedback: | |
| pronun_section = feedback.split("PRONUNCIATION PRACTICE:")[1].split("\n")[1:] | |
| for line in pronun_section: | |
| if "•" in line and "(you said" in line: | |
| match = re.search(r'• ([A-Z]+) \(you said', line) | |
| if match: | |
| missed_words.append(match.group(1).lower()) | |
| return missed_words | |