# File size: 7,439 bytes (revision dbf2148)
import asyncio
import io
import os
import re
import textwrap
import time
import unicodedata
from concurrent.futures import ThreadPoolExecutor
from typing import List, Optional

import edge_tts
from gtts import gTTS

from config.settings import settings
from models.schemas import TTSRequest
class EnhancedTTSService:
    """Text-to-speech service backed by gTTS with Edge-TTS as a second provider.

    The public entry point is :meth:`text_to_speech`; the remaining methods
    are the building blocks it uses (language guess, text clean-up, chunking,
    one method per provider) plus a helper that writes audio bytes to disk.
    """

    # Lower-case Vietnamese letters that carry diacritics.  Some of them
    # (e.g. "é", "à") also occur in other Latin-script languages, so the
    # language guess built on this set is only a heuristic.
    _VIETNAMESE_CHARS = frozenset(
        'àáâãèéêìíòóôõùúýăđĩũơưạảấầẩẫậắằẳẵặẹẻẽếềểễệỉịọỏốồổỗộớờởỡợụủứừửữựỳỵỷỹ'
    )
    # Unicode script ranges used by detect_language().
    _KANA_RE = re.compile(r'[\u3040-\u30ff]')  # Hiragana + Katakana
    _HAN_RE = re.compile(r'[\u4e00-\u9fff]')  # CJK Unified Ideographs
    _HANGUL_RE = re.compile(r'[\u1100-\u11ff\uac00-\ud7af]')  # Jamo + syllables

    # Patterns used by split_text_into_chunks().
    _SENTENCE_END_RE = re.compile(r'[.!?]+')
    _CLAUSE_SEP_RE = re.compile(r'[,;:]')

    # Patterns used by clean_text().
    _URL_RE = re.compile(r'http\S+')
    _DISALLOWED_RE = re.compile(
        r'[^\w\sàáâãèéêìíòóôõùúýăđĩũơưạảấầẩẫậắằẳẵặẹẻẽếềểễệỉịọỏốồổỗộớờởỡợụủứừửữựỳỵỷỹ.,!?;:()-]'
    )
    _WHITESPACE_RE = re.compile(r'\s+')

    # Edge-TTS voice used for each language code; unknown codes fall back to
    # the default (Vietnamese) voice.
    _DEFAULT_VOICE = 'vi-VN-NamMinhNeural'
    _VOICE_MAP = {
        'vi': 'vi-VN-NamMinhNeural',
        'en': 'en-US-AriaNeural',
        'fr': 'fr-FR-DeniseNeural',
        'es': 'es-ES-ElviraNeural',
        'de': 'de-DE-KatjaNeural',
        'ja': 'ja-JP-NanamiNeural',
        'ko': 'ko-KR-SunHiNeural',
        'zh': 'zh-CN-XiaoxiaoNeural',
    }

    def __init__(self):
        """Read the supported languages and the chunk-size limit from settings."""
        self.supported_languages = settings.SUPPORTED_LANGUAGES
        self.max_chunk_length = settings.MAX_CHUNK_LENGTH

    def detect_language(self, text: str) -> str:
        """Guess the language of *text* from the characters it contains.

        Checks run in this order and the first match wins: Vietnamese
        diacritics -> ``'vi'``, Japanese kana -> ``'ja'``, Han ideographs ->
        ``'zh'``, Hangul -> ``'ko'``; anything else (including empty input)
        is reported as ``'en'``.

        This is a character-level heuristic: accented letters shared with
        other Latin-script languages are reported as ``'vi'`` and Japanese
        written only in kanji is reported as ``'zh'``.
        """
        if not text:
            return 'en'
        # Compose "a" + combining accent into a single code point so that
        # decomposed input is matched by the precomposed character set.
        text = unicodedata.normalize('NFC', text)
        if any(char in self._VIETNAMESE_CHARS for char in text.lower()):
            return 'vi'
        if self._KANA_RE.search(text):
            return 'ja'
        if self._HAN_RE.search(text):
            return 'zh'
        if self._HANGUL_RE.search(text):
            return 'ko'
        return 'en'

    def split_text_into_chunks(self, text: str, max_length: Optional[int] = None) -> List[str]:
        """Split *text* into chunks of at most *max_length* characters.

        The text is cut at sentence terminators (``.``, ``!``, ``?``);
        sentences that are too long are cut at ``,``, ``;`` and ``:``, and
        clauses that are still too long are wrapped at word boundaries
        (breaking a single over-long word if necessary).  Neighbouring pieces
        are then packed together, joined by ``". "``, as long as the result
        stays within the limit.

        Args:
            text: Text to split.
            max_length: Maximum chunk length; defaults to
                ``self.max_chunk_length``.

        Returns:
            The non-empty chunks, in order.  Empty input yields ``[]``.

        Raises:
            ValueError: If the effective *max_length* is smaller than 1.
        """
        if max_length is None:
            max_length = self.max_chunk_length
        if max_length < 1:
            raise ValueError("max_length must be a positive integer")
        if not text:
            return []

        chunks: List[str] = []
        current = ""
        for piece in self._split_into_pieces(text, max_length):
            # "+ 2" accounts for the ". " separator placed between pieces.
            if current and len(current) + len(piece) + 2 <= max_length:
                current = f"{current}. {piece}"
            else:
                if current:
                    chunks.append(current)
                current = piece
        if current:
            chunks.append(current)
        return chunks

    def _split_into_pieces(self, text: str, max_length: int) -> List[str]:
        """Return the stripped, non-empty pieces of *text*, none longer than *max_length*."""
        pieces: List[str] = []
        for sentence in self._SENTENCE_END_RE.split(text):
            sentence = sentence.strip()
            if not sentence:
                continue
            if len(sentence) <= max_length:
                pieces.append(sentence)
                continue
            for clause in self._CLAUSE_SEP_RE.split(sentence):
                clause = clause.strip()
                if not clause:
                    continue
                if len(clause) <= max_length:
                    pieces.append(clause)
                else:
                    # No punctuation left to cut at: wrap on whitespace and
                    # hard-break any word that alone exceeds the limit.
                    pieces.extend(
                        textwrap.wrap(
                            clause,
                            width=max_length,
                            break_long_words=True,
                            break_on_hyphens=False,
                        )
                    )
        return pieces

    def text_to_speech_gtts(self, text: str, language: str = 'vi') -> Optional[bytes]:
        """Synthesize *text* with the gTTS (Google Text-to-Speech) library.

        The text is split with :meth:`split_text_into_chunks`, each chunk is
        synthesized separately and the resulting bytes are concatenated.

        Returns:
            The audio bytes, or ``None`` when nothing was produced or any
            error occurred (the error is printed, not raised).
        """
        try:
            chunks = [c for c in self.split_text_into_chunks(text) if c.strip()]
            audio_parts = []
            for index, chunk in enumerate(chunks):
                if index:
                    # Short pause between consecutive requests only; there is
                    # nothing to wait for after the last one.
                    time.sleep(0.1)
                audio_buffer = io.BytesIO()
                gTTS(text=chunk, lang=language, slow=False).write_to_fp(audio_buffer)
                audio_parts.append(audio_buffer.getvalue())
            return b''.join(audio_parts) or None
        except Exception as e:
            print(f"❌ Lỗi gTTS: {e}")
            return None

    async def text_to_speech_edgetts(self, text: str, voice: str = 'vi-VN-NamMinhNeural') -> Optional[bytes]:
        """Synthesize *text* with Edge-TTS (Microsoft Edge), asynchronously.

        Collects every ``"audio"`` item of the Edge-TTS stream.

        Returns:
            The audio bytes, or ``None`` when the stream carried no audio or
            an error occurred (the error is printed, not raised).
        """
        try:
            communicate = edge_tts.Communicate(text, voice)
            audio_buffer = io.BytesIO()
            async for chunk in communicate.stream():
                if chunk["type"] == "audio":
                    audio_buffer.write(chunk["data"])
            # Report "no audio" as None, like the gTTS path does.
            return audio_buffer.getvalue() or None
        except Exception as e:
            print(f"❌ Lỗi Edge-TTS: {e}")
            return None

    def text_to_speech_edgetts_sync(self, text: str, voice: str = 'vi-VN-NamMinhNeural') -> Optional[bytes]:
        """Blocking wrapper around :meth:`text_to_speech_edgetts`.

        ``asyncio.run`` cannot be used in a thread whose event loop is already
        running, so in that situation the coroutine is executed on a
        short-lived worker thread with its own loop.  The call blocks the
        current thread until the audio is ready either way.

        Returns:
            The audio bytes, or ``None`` on failure (the error is printed).
        """

        def run_in_fresh_loop() -> Optional[bytes]:
            return asyncio.run(self.text_to_speech_edgetts(text, voice))

        try:
            try:
                asyncio.get_running_loop()
                loop_is_running = True
            except RuntimeError:
                loop_is_running = False

            if not loop_is_running:
                return run_in_fresh_loop()
            with ThreadPoolExecutor(max_workers=1) as executor:
                return executor.submit(run_in_fresh_loop).result()
        except Exception as e:
            print(f"❌ Lỗi Edge-TTS sync: {e}")
            return None

    def text_to_speech(self, text: str, language: Optional[str] = None, provider: str = "auto") -> Optional[bytes]:
        """Convert *text* to speech using one or more providers.

        Args:
            text: Text to speak.  It is cleaned with :meth:`clean_text` first.
            language: Language code; guessed with :meth:`detect_language`
                (on the uncleaned text) when omitted.
            provider: ``"gtts"``, ``"edgetts"`` or ``"auto"`` (gTTS first,
                then Edge-TTS); compared case-insensitively.  When gTTS has
                not been tried yet (``"edgetts"`` or an unrecognised value)
                it is used as the last resort.

        Returns:
            The audio bytes, or ``None`` when the text is empty (before or
            after cleaning) or every attempted provider failed.
        """
        if not text or not text.strip():
            return None
        if language is None:
            language = self.detect_language(text)
        text = self.clean_text(text)
        if not text:
            # Cleaning removed everything (e.g. the input was only a URL).
            return None
        if isinstance(provider, str):
            provider = provider.lower()

        try:
            gtts_attempted = False
            if provider in ("auto", "gtts"):
                print(f"🔊 Đang sử dụng gTTS cho văn bản {len(text)} ký tự...")
                gtts_attempted = True
                audio_bytes = self.text_to_speech_gtts(text, language)
                if audio_bytes:
                    return audio_bytes

            if provider in ("auto", "edgetts"):
                print(f"🔊 Đang thử Edge-TTS cho văn bản {len(text)} ký tự...")
                voice = self._VOICE_MAP.get(language, self._DEFAULT_VOICE)
                audio_bytes = self.text_to_speech_edgetts_sync(text, voice)
                if audio_bytes:
                    return audio_bytes

            if gtts_attempted:
                # gTTS already failed for this exact input; do not run the
                # same request a second time.
                return None
            return self.text_to_speech_gtts(text, language)
        except Exception as e:
            print(f"❌ Lỗi TTS tổng hợp: {e}")
            return None

    def clean_text(self, text: str) -> str:
        """Clean *text* before it is sent to a TTS provider.

        Removes ``http...`` URLs, drops every character that is not a word
        character, whitespace or one of ``.,!?;:()-``, collapses whitespace
        runs to a single space and strips the ends.
        """
        # Compose base letter + combining accent first: combining marks are
        # not word characters and would otherwise be stripped, turning
        # decomposed "chào" into "chao".
        text = unicodedata.normalize('NFC', text)
        text = self._URL_RE.sub('', text)
        text = self._DISALLOWED_RE.sub('', text)
        return self._WHITESPACE_RE.sub(' ', text).strip()

    def save_audio_to_file(self, audio_bytes: bytes, filename: str = None) -> Optional[str]:
        """Write *audio_bytes* to a file inside the ``temp_audio`` directory.

        Args:
            audio_bytes: Audio data to store.
            filename: File name inside ``temp_audio``; when omitted a name
                based on the current time in nanoseconds is generated, so
                calls made within the same second do not overwrite each
                other.

        Returns:
            The path of the written file, or ``None`` when there is no audio
            data (``None`` or empty bytes).
        """
        if not audio_bytes:
            return None
        if filename is None:
            filename = f"tts_output_{time.time_ns()}.mp3"
        # NOTE(review): filename is joined as-is; if it can come from an
        # untrusted caller it should be validated before use.
        temp_dir = "temp_audio"
        os.makedirs(temp_dir, exist_ok=True)
        filepath = os.path.join(temp_dir, filename)
        with open(filepath, 'wb') as f:
            f.write(audio_bytes)
        return filepath