import asyncio
import io
import os
import re
import time
from typing import List, Optional

import edge_tts
from gtts import gTTS

from config.settings import settings
from models.schemas import TTSRequest
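# Note (assumption, not defined in this module): `settings` is expected to expose
# SUPPORTED_LANGUAGES (e.g. a list of language codes) and MAX_CHUNK_LENGTH
# (an int character limit per TTS request); both come from the project's config module.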


class EnhancedTTSService:
    """Text-to-speech service with multiple fallback providers (gTTS and Edge-TTS)."""

    def __init__(self):
        self.supported_languages = settings.SUPPORTED_LANGUAGES
        self.max_chunk_length = settings.MAX_CHUNK_LENGTH
    def detect_language(self, text: str) -> str:
        """Simple character-based language detection."""
        # Rough heuristic: only a handful of marker characters per language are checked.
        vietnamese_chars = set('àáâãèéêìíòóôõùúýăđĩũơưạảấầẩẫậắằẳẵặẹẻẽếềểễệỉịọỏốồổỗộớờởỡợụủứừửữựỳỵỷỹ')
        if any(char in vietnamese_chars for char in text.lower()):
            return 'vi'
        elif any(char in text for char in 'あいうえお'):
            return 'ja'
        elif any(char in text for char in '你好'):
            return 'zh'
        elif any(char in text for char in '안녕'):
            return 'ko'
        else:
            return 'en'
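    # For example, detect_language("Xin chào") returns 'vi' (accented characters),
    # while detect_language("hello") falls through to 'en'.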
    def split_text_into_chunks(self, text: str, max_length: Optional[int] = None) -> List[str]:
        """Split text into small chunks suitable for TTS requests."""
        if max_length is None:
            max_length = self.max_chunk_length
        # Split on sentence boundaries first; over-long sentences are further
        # split on commas, semicolons, and colons.
        sentences = re.split(r'[.!?]+', text)
        chunks = []
        current_chunk = ""
        for sentence in sentences:
            sentence = sentence.strip()
            if not sentence:
                continue
            if len(sentence) > max_length:
                parts = re.split(r'[,;:]', sentence)
                for part in parts:
                    part = part.strip()
                    if not part:
                        continue
                    if len(current_chunk) + len(part) + 2 <= max_length:
                        if current_chunk:
                            current_chunk += ". " + part
                        else:
                            current_chunk = part
                    else:
                        if current_chunk:
                            chunks.append(current_chunk)
                        current_chunk = part
            else:
                if len(current_chunk) + len(sentence) + 2 <= max_length:
                    if current_chunk:
                        current_chunk += ". " + sentence
                    else:
                        current_chunk = sentence
                else:
                    if current_chunk:
                        chunks.append(current_chunk)
                    current_chunk = sentence
        if current_chunk:
            chunks.append(current_chunk)
        return chunks
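    # For example, with max_length=20, split_text_into_chunks("Hello world. This is a
    # longer example sentence, with commas.") yields ["Hello world",
    # "This is a longer example sentence", "with commas"]; note that a single part
    # longer than max_length is still kept whole rather than split further.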
    def text_to_speech_gtts(self, text: str, language: str = 'vi') -> Optional[bytes]:
        """Synthesize speech with the gTTS (Google Text-to-Speech) library."""
        try:
            chunks = self.split_text_into_chunks(text)
            audio_chunks = []
            for chunk in chunks:
                if not chunk.strip():
                    continue
                tts = gTTS(text=chunk, lang=language, slow=False)
                audio_buffer = io.BytesIO()
                tts.write_to_fp(audio_buffer)
                audio_buffer.seek(0)
                audio_chunks.append(audio_buffer.read())
                time.sleep(0.1)  # small delay to avoid hammering the gTTS endpoint
            if audio_chunks:
                # Concatenated MP3 streams play back correctly in most players.
                return b''.join(audio_chunks)
            return None
        except Exception as e:
            print(f"❌ gTTS error: {e}")
            return None
    async def text_to_speech_edgetts(self, text: str, voice: str = 'vi-VN-NamMinhNeural') -> Optional[bytes]:
        """Synthesize speech with Edge-TTS (Microsoft Edge voices) - async version."""
        try:
            communicate = edge_tts.Communicate(text, voice)
            audio_buffer = io.BytesIO()
            async for chunk in communicate.stream():
                if chunk["type"] == "audio":
                    audio_buffer.write(chunk["data"])
            audio_bytes = audio_buffer.getvalue()
            return audio_bytes if audio_bytes else None
        except Exception as e:
            print(f"❌ Edge-TTS error: {e}")
            return None
    def text_to_speech_edgetts_sync(self, text: str, voice: str = 'vi-VN-NamMinhNeural') -> Optional[bytes]:
        """Synchronous wrapper around the async Edge-TTS call."""
        try:
            # asyncio.run() fails if an event loop is already running; callers in
            # async contexts should await text_to_speech_edgetts() directly.
            return asyncio.run(self.text_to_speech_edgetts(text, voice))
        except Exception as e:
            print(f"❌ Edge-TTS sync error: {e}")
            return None
    def text_to_speech(self, text: str, language: Optional[str] = None, provider: str = "auto") -> Optional[bytes]:
        """Convert text to speech, falling back across providers."""
        if not text or len(text.strip()) == 0:
            return None
        if language is None:
            language = self.detect_language(text)
        text = self.clean_text(text)
        try:
            if provider in ("auto", "gtts"):
                print(f"🔊 Using gTTS for {len(text)} characters of text...")
                audio_bytes = self.text_to_speech_gtts(text, language)
                if audio_bytes:
                    return audio_bytes
            if provider in ("auto", "edgetts"):
                print(f"🔊 Trying Edge-TTS for {len(text)} characters of text...")
                voice_map = {
                    'vi': 'vi-VN-NamMinhNeural',
                    'en': 'en-US-AriaNeural',
                    'fr': 'fr-FR-DeniseNeural',
                    'es': 'es-ES-ElviraNeural',
                    'de': 'de-DE-KatjaNeural',
                    'ja': 'ja-JP-NanamiNeural',
                    'ko': 'ko-KR-SunHiNeural',
                    'zh': 'zh-CN-XiaoxiaoNeural'
                }
                voice = voice_map.get(language, 'vi-VN-NamMinhNeural')
                audio_bytes = self.text_to_speech_edgetts_sync(text, voice)
                if audio_bytes:
                    return audio_bytes
            # Last resort: retry gTTS (also covers provider values not handled above).
            return self.text_to_speech_gtts(text, language)
        except Exception as e:
            print(f"❌ TTS error: {e}")
            return None
    def clean_text(self, text: str) -> str:
        """Clean text before sending it to a TTS engine."""
        # Drop URLs, strip characters outside word characters, whitespace,
        # Vietnamese diacritics, and basic punctuation, then collapse whitespace.
        text = re.sub(r'http\S+', '', text)
        text = re.sub(r'[^\w\sàáâãèéêìíòóôõùúýăđĩũơưạảấầẩẫậắằẳẵặẹẻẽếềểễệỉịọỏốồổỗộớờởỡợụủứừửữựỳỵỷỹ.,!?;:()-]', '', text)
        text = re.sub(r'\s+', ' ', text)
        return text.strip()
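    # For example, clean_text("Xem https://example.com   ngay!") returns "Xem ngay!":
    # the URL is dropped and the whitespace is collapsed.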
    def save_audio_to_file(self, audio_bytes: bytes, filename: Optional[str] = None) -> Optional[str]:
        """Save audio bytes to a temporary file and return its path."""
        if audio_bytes is None:
            return None
        if filename is None:
            filename = f"tts_output_{int(time.time())}.mp3"
        temp_dir = "temp_audio"
        os.makedirs(temp_dir, exist_ok=True)
        filepath = os.path.join(temp_dir, filename)
        with open(filepath, 'wb') as f:
            f.write(audio_bytes)
        return filepath
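

# Minimal usage sketch (illustrative, not part of the original module); it assumes
# the config and schema imports above resolve within your project.
if __name__ == "__main__":
    service = EnhancedTTSService()
    audio = service.text_to_speech("Xin chào, đây là bản thử nghiệm.", provider="auto")
    if audio:
        path = service.save_audio_to_file(audio)
        print(f"Saved TTS output to {path}")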