import asyncio
import concurrent.futures
import io
import os
import re
import textwrap
import time
import unicodedata
import uuid
from typing import List, Optional

import edge_tts
from gtts import gTTS

from config.settings import settings
from models.schemas import TTSRequest


class EnhancedTTSService:
    """Text-to-speech service that can synthesize audio through gTTS or Edge-TTS.

    ``text_to_speech`` is the main entry point; the provider-specific methods
    can also be called directly. All synthesis methods return the audio as
    ``bytes`` or ``None`` when nothing could be produced.
    """

    # Directory (relative to the working directory) where audio files are saved.
    _TEMP_AUDIO_DIR = "temp_audio"

    # Voice used by Edge-TTS when no voice is mapped for the language.
    _DEFAULT_EDGE_VOICE = 'vi-VN-NamMinhNeural'

    # Language code -> Edge-TTS voice name.
    _EDGE_VOICES = {
        'vi': 'vi-VN-NamMinhNeural',
        'en': 'en-US-AriaNeural',
        'fr': 'fr-FR-DeniseNeural',
        'es': 'es-ES-ElviraNeural',
        'de': 'de-DE-KatjaNeural',
        'ja': 'ja-JP-NanamiNeural',
        'ko': 'ko-KR-SunHiNeural',
        'zh': 'zh-CN-XiaoxiaoNeural',
    }

    # Lower-case accented letters treated as evidence of Vietnamese text.
    _VIETNAMESE_CHARS = frozenset(
        'àáâãèéêìíòóôõùúýăđĩũơưạảấầẩẫậắằẳẵặẹẻẽếềểễệỉịọỏốồổỗộớờởỡợụủứừửữựỳỵỷỹ'
    )

    # Script detectors, checked in this order after the Vietnamese test.
    _KANA_RE = re.compile(r'[\u3040-\u30ff]')  # Hiragana + Katakana
    _HAN_RE = re.compile(r'[\u3400-\u4dbf\u4e00-\u9fff]')  # CJK ideographs
    _HANGUL_RE = re.compile(r'[\u1100-\u11ff\u3130-\u318f\uac00-\ud7a3]')

    def __init__(self):
        self.supported_languages = settings.SUPPORTED_LANGUAGES
        self.max_chunk_length = settings.MAX_CHUNK_LENGTH

    def detect_language(self, text: str) -> str:
        """Guess the language of *text* from the characters it contains.

        This is a character-set heuristic, not real language identification.
        Checks run in a fixed order and the first match wins: Vietnamese
        accented letters -> ``'vi'``, kana -> ``'ja'``, CJK ideographs ->
        ``'zh'``, Hangul -> ``'ko'``; anything else is ``'en'``.

        Note that several of the "Vietnamese" letters (e.g. ``é``, ``à``) also
        occur in other Latin-script languages, which are therefore reported
        as ``'vi'``.

        Args:
            text: The text to inspect.

        Returns:
            One of ``'vi'``, ``'ja'``, ``'zh'``, ``'ko'`` or ``'en'``.
        """
        # Compose base letter + combining marks so decomposed (NFD) input is
        # matched against the precomposed letters in the lookup set.
        text = unicodedata.normalize('NFC', text)

        if any(char in self._VIETNAMESE_CHARS for char in text.lower()):
            return 'vi'
        # Kana is tested before ideographs because Japanese text normally
        # mixes both, while Chinese text contains no kana.
        if self._KANA_RE.search(text):
            return 'ja'
        if self._HAN_RE.search(text):
            return 'zh'
        if self._HANGUL_RE.search(text):
            return 'ko'
        return 'en'

    def split_text_into_chunks(self, text: str, max_length: Optional[int] = None) -> List[str]:
        """Split *text* into chunks of at most *max_length* characters for TTS.

        The text is split into sentences on ``.``, ``!`` and ``?``. Sentences
        longer than the limit are split further on ``,``, ``;`` and ``:``, and
        any part that is still too long is wrapped at whitespace (a single
        over-long word is sliced). Consecutive pieces are then packed into
        chunks, joined with ``". "``, as long as the chunk stays within the
        limit.

        Args:
            text: The text to split.
            max_length: Maximum chunk length; defaults to
                ``self.max_chunk_length``. A non-positive value disables the
                whitespace wrapping, so pieces are only split at punctuation.

        Returns:
            The non-empty chunks in order; an empty list when *text* contains
            no speakable piece.
        """
        if max_length is None:
            max_length = self.max_chunk_length

        # Step 1: break the text into pieces that each respect the limit.
        pieces: List[str] = []
        for sentence in re.split(r'[.!?]+', text):
            sentence = sentence.strip()
            if not sentence:
                continue
            if len(sentence) <= max_length:
                pieces.append(sentence)
                continue
            for part in re.split(r'[,;:]', sentence):
                part = part.strip()
                if part:
                    pieces.extend(self._wrap_oversized(part, max_length))

        # Step 2: greedily pack pieces into chunks. The "+ 2" accounts for the
        # ". " separator inserted between pieces.
        chunks: List[str] = []
        current_chunk = ""
        for piece in pieces:
            if not current_chunk:
                current_chunk = piece
            elif len(current_chunk) + len(piece) + 2 <= max_length:
                current_chunk += ". " + piece
            else:
                chunks.append(current_chunk)
                current_chunk = piece

        if current_chunk:
            chunks.append(current_chunk)
        return chunks

    @staticmethod
    def _wrap_oversized(piece: str, max_length: int) -> List[str]:
        """Return *piece* wrapped at whitespace so no fragment exceeds *max_length*."""
        if max_length <= 0 or len(piece) <= max_length:
            return [piece]
        # break_long_words (on by default) slices words longer than the width;
        # hyphen breaking is disabled so compound words are kept together.
        return textwrap.wrap(piece, width=max_length, break_on_hyphens=False)

    def text_to_speech_gtts(self, text: str, language: str = 'vi') -> Optional[bytes]:
        """Synthesize *text* with the gTTS (Google Text-to-Speech) library.

        The text is split with ``split_text_into_chunks``, each chunk is
        synthesized separately and the resulting byte strings are concatenated.

        Args:
            text: The text to speak.
            language: Language code passed to gTTS as ``lang``.

        Returns:
            The concatenated audio bytes, or ``None`` when there was nothing
            to synthesize or any error occurred (the error is printed).
        """
        try:
            audio_chunks = []
            for chunk in self.split_text_into_chunks(text):
                if not chunk.strip():
                    continue
                if audio_chunks:
                    # Short pause between consecutive requests.
                    time.sleep(0.1)
                audio_buffer = io.BytesIO()
                gTTS(text=chunk, lang=language, slow=False).write_to_fp(audio_buffer)
                audio_chunks.append(audio_buffer.getvalue())

            if audio_chunks:
                return b''.join(audio_chunks)
            return None
        except Exception as e:
            print(f"❌ Lỗi gTTS: {e}")
            return None

    async def text_to_speech_edgetts(self, text: str, voice: str = 'vi-VN-NamMinhNeural') -> Optional[bytes]:
        """Synthesize *text* with Edge-TTS (Microsoft Edge); async version.

        Args:
            text: The text to speak.
            voice: Edge-TTS voice name.

        Returns:
            The audio bytes collected from the stream, or ``None`` when the
            stream produced no audio or an error occurred (the error is
            printed).
        """
        try:
            communicate = edge_tts.Communicate(text, voice)
            audio_buffer = io.BytesIO()

            async for chunk in communicate.stream():
                # The stream also carries non-audio events; keep audio only.
                if chunk["type"] == "audio":
                    audio_buffer.write(chunk["data"])

            # An empty result is reported as None, like the gTTS path.
            return audio_buffer.getvalue() or None
        except Exception as e:
            print(f"❌ Lỗi Edge-TTS: {e}")
            return None

    def text_to_speech_edgetts_sync(self, text: str, voice: str = 'vi-VN-NamMinhNeural') -> Optional[bytes]:
        """Blocking wrapper around ``text_to_speech_edgetts``.

        ``asyncio.run`` refuses to start when the current thread already has a
        running event loop, so in that situation the coroutine is run on a
        fresh loop in a worker thread instead. The call blocks the caller
        until synthesis finishes in both cases.

        Args:
            text: The text to speak.
            voice: Edge-TTS voice name.

        Returns:
            The audio bytes, or ``None`` on failure (the error is printed).
        """
        try:
            try:
                asyncio.get_running_loop()
                loop_running = True
            except RuntimeError:
                loop_running = False

            if not loop_running:
                return asyncio.run(self.text_to_speech_edgetts(text, voice))

            with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
                future = pool.submit(asyncio.run, self.text_to_speech_edgetts(text, voice))
                return future.result()
        except Exception as e:
            print(f"❌ Lỗi Edge-TTS sync: {e}")
            return None

    def text_to_speech(self, text: str, language: Optional[str] = None, provider: str = "auto") -> Optional[bytes]:
        """Convert *text* to speech, trying one or more providers.

        Provider selection:
          * ``"auto"``: gTTS first, then Edge-TTS.
          * ``"gtts"``: gTTS only.
          * ``"edgetts"``: Edge-TTS first, then gTTS as a fallback.
          * any other value: gTTS.

        Args:
            text: The text to speak. It is cleaned with ``clean_text`` first.
            language: Language code; detected with ``detect_language`` from
                the uncleaned text when omitted.
            provider: Which provider(s) to use, see above.

        Returns:
            The audio bytes, or ``None`` when the text is empty (before or
            after cleaning) or every attempted provider failed.
        """
        if not text or not text.strip():
            return None

        if language is None:
            language = self.detect_language(text)

        text = self.clean_text(text)
        if not text:
            # Nothing speakable is left (e.g. the input was only a URL).
            return None

        try:
            gtts_attempted = False

            if provider in ("auto", "gtts"):
                print(f"🔊 Đang sử dụng gTTS cho văn bản {len(text)} ký tự...")
                gtts_attempted = True
                audio_bytes = self.text_to_speech_gtts(text, language)
                if audio_bytes:
                    return audio_bytes

            if provider in ("auto", "edgetts"):
                print(f"🔊 Đang thử Edge-TTS cho văn bản {len(text)} ký tự...")
                voice = self._EDGE_VOICES.get(language, self._DEFAULT_EDGE_VOICE)
                audio_bytes = self.text_to_speech_edgetts_sync(text, voice)
                if audio_bytes:
                    return audio_bytes

            if gtts_attempted:
                # gTTS already failed for this exact input; do not repeat it.
                return None
            return self.text_to_speech_gtts(text, language)

        except Exception as e:
            print(f"❌ Lỗi TTS tổng hợp: {e}")
            return None

    def clean_text(self, text: str) -> str:
        """Clean *text* before it is sent to a TTS provider.

        Removes URLs (anything starting with ``http`` up to the next
        whitespace), drops every character that is not a word character,
        whitespace or one of ``.,!?;:()-``, and collapses whitespace runs to a
        single space.

        Args:
            text: The raw text.

        Returns:
            The cleaned, stripped text (possibly empty).
        """
        # Compose combining marks first: a decomposed tone mark is not a word
        # character and would otherwise be stripped from its base letter.
        text = unicodedata.normalize('NFC', text)
        text = re.sub(r'http\S+', '', text)
        text = re.sub(r'[^\w\sàáâãèéêìíòóôõùúýăđĩũơưạảấầẩẫậắằẳẵặẹẻẽếềểễệỉịọỏốồổỗộớờởỡợụủứừửữựỳỵỷỹ.,!?;:()-]', '', text)
        text = re.sub(r'\s+', ' ', text)
        return text.strip()

    def save_audio_to_file(self, audio_bytes: Optional[bytes], filename: Optional[str] = None) -> Optional[str]:
        """Write *audio_bytes* to a file under the ``temp_audio`` directory.

        The directory is created if it does not exist.

        Args:
            audio_bytes: The audio data; ``None`` makes the call a no-op.
            filename: File name inside ``temp_audio``. When omitted, a name of
                the form ``tts_output_<unix-seconds>_<random>.mp3`` is
                generated; the random suffix keeps files saved within the same
                second from overwriting each other.

        Returns:
            The path of the written file, or ``None`` when *audio_bytes* is
            ``None``.
        """
        if audio_bytes is None:
            return None

        if filename is None:
            filename = f"tts_output_{int(time.time())}_{uuid.uuid4().hex[:8]}.mp3"

        os.makedirs(self._TEMP_AUDIO_DIR, exist_ok=True)
        filepath = os.path.join(self._TEMP_AUDIO_DIR, filename)

        with open(filepath, 'wb') as f:
            f.write(audio_bytes)

        return filepath

    def save_tts_audio(self, audio_bytes: Optional[bytes], filename: Optional[str] = None) -> Optional[str]:
        """Alias of ``save_audio_to_file``, kept for the chat service.

        Args:
            audio_bytes: The audio data; ``None`` makes the call a no-op.
            filename: Optional file name inside ``temp_audio``.

        Returns:
            The path of the written file, or ``None`` when *audio_bytes* is
            ``None``.
        """
        return self.save_audio_to_file(audio_bytes, filename)