voicebot / services /streaming_voice_service.py
datbkpro's picture
Update services/streaming_voice_service.py
1067054 verified
# import io
# import numpy as np
# import soundfile as sf
# import time
# import traceback
# import threading
# from groq import Groq
# import queue
# from typing import Optional, Dict, Any, Callable
# from config.settings import settings
# from core.rag_system import EnhancedRAGSystem
# from core.tts_service import EnhancedTTSService
# from core.speechbrain_vad import SpeechBrainVAD
# from core.silero_vad import SileroVAD
# class StreamingVoiceService:
# def __init__(self, groq_client: Groq, rag_system, tts_service):
# self.client = groq_client
# self.rag_system = rag_system
# self.tts_service = tts_service
# # Khởi tạo VAD tối ưu với double buffer
# self.vad_processor = SileroVAD()
# self.is_listening = False
# self.speech_callback = None
# self.is_processing = False
# self.processing_lock = threading.Lock()
# # Conversation context
# self.conversation_history = []
# self.current_transcription = ""
# # Multi-thread processing
# self.response_queue = queue.Queue()
# self.processing_threads = []
# self.max_workers = 2 # Số lượng thread xử lý song song
# # Latency tracking
# self.latency_metrics = {
# 'asr': [],
# 'rag': [],
# 'llm': [],
# 'tts': [],
# 'total': []
# }
# # Audio buffer management
# self.pending_audio_segments = []
# self.segment_lock = threading.Lock()
# def start_listening(self, speech_callback: Callable) -> bool:
# """Bắt đầu lắng nghe với multi-thread processing"""
# if self.is_listening:
# return False
# self.speech_callback = speech_callback
# success = self.vad_processor.start_stream(self._on_speech_detected)
# if success:
# self.is_listening = True
# self.is_processing = False
# # Khởi động worker threads
# for i in range(self.max_workers):
# thread = threading.Thread(
# target=self._process_response_worker,
# daemon=True,
# name=f"ASR-Worker-{i}"
# )
# thread.start()
# self.processing_threads.append(thread)
# print("🎙️ Đã bắt đầu lắng nghe với multi-thread processing")
# return success
# def stop_listening(self):
# """Dừng lắng nghe"""
# self.vad_processor.stop_stream()
# self.is_listening = False
# self.is_processing = False
# self.speech_callback = None
# # Clear queue
# while not self.response_queue.empty():
# try:
# self.response_queue.get_nowait()
# self.response_queue.task_done()
# except queue.Empty:
# break
# print("🛑 Đã dừng lắng nghe")
# def _on_speech_detected(self, speech_audio: np.ndarray, sample_rate: int):
# """Callback khi VAD phát hiện speech - không blocking"""
# audio_duration = len(speech_audio) / sample_rate
# print(f"🎯 VAD phát hiện speech: {audio_duration:.2f}s - Đưa vào queue")
# # Đưa vào queue mà không chờ đợi
# self.response_queue.put((speech_audio, sample_rate))
# def _process_response_worker(self):
# """Worker thread xử lý speech segments"""
# while self.is_listening:
# try:
# # Chờ item từ queue với timeout
# speech_audio, sample_rate = self.response_queue.get(timeout=1.0)
# # Xử lý speech segment
# self._process_speech_segment(speech_audio, sample_rate)
# # Đánh dấu task hoàn thành
# self.response_queue.task_done()
# except queue.Empty:
# continue
# except Exception as e:
# print(f"❌ Lỗi trong worker thread: {e}")
# continue
# def _process_speech_segment(self, speech_audio: np.ndarray, sample_rate: int):
# """Xử lý speech segment - optimized version"""
# total_start_time = time.time()
# try:
# # 1. ASR Transcription
# asr_start = time.time()
# transcription = self._transcribe_audio_optimized(speech_audio, sample_rate)
# asr_latency = time.time() - asr_start
# if not transcription or len(transcription.strip()) < 2:
# print("⚠️ Transcription quá ngắn hoặc trống")
# return
# print(f"📝 Transcription: {transcription}")
# self.current_transcription = transcription
# # 2. AI Response Generation
# rag_start = time.time()
# response = self._generate_ai_response_optimized(transcription)
# rag_latency = time.time() - rag_start
# # 3. TTS Conversion
# tts_start = time.time()
# tts_audio_path = self._text_to_speech_optimized(response)
# tts_latency = time.time() - tts_start
# total_latency = time.time() - total_start_time
# # Log latency metrics
# self._log_latency_metrics({
# 'asr': asr_latency,
# 'rag': rag_latency,
# 'tts': tts_latency,
# 'total': total_latency
# })
# # Gửi kết quả đến callback
# if self.speech_callback:
# result = {
# 'transcription': transcription,
# 'response': response,
# 'tts_audio': tts_audio_path,
# 'status': 'completed',
# 'latency': total_latency
# }
# self.speech_callback(result)
# except Exception as e:
# print(f"❌ Lỗi xử lý speech segment: {e}")
# traceback.print_exc()
# if self.speech_callback:
# self.speech_callback({
# 'transcription': f"Lỗi: {str(e)}",
# 'response': "Xin lỗi, có lỗi xảy ra",
# 'tts_audio': None,
# 'status': 'error'
# })
# def _transcribe_audio_optimized(self, audio_data: np.ndarray, sample_rate: int) -> Optional[str]:
# """Chuyển audio -> text với tối ưu hiệu suất"""
# asr_start = time.time()
# try:
# # Chuẩn hóa audio data
# if audio_data.dtype != np.int16:
# if audio_data.dtype in [np.float32, np.float64]:
# audio_data = (audio_data * 32767).astype(np.int16)
# else:
# audio_data = audio_data.astype(np.int16)
# if audio_data.ndim > 1:
# audio_data = np.mean(audio_data, axis=1).astype(np.int16)
# # Resample nếu cần
# target_sample_rate = 16000
# if sample_rate != target_sample_rate:
# audio_data = self._resample_audio(audio_data, sample_rate, target_sample_rate)
# sample_rate = target_sample_rate
# # Giới hạn độ dài audio
# max_duration = 30
# max_samples = sample_rate * max_duration
# if len(audio_data) > max_samples:
# audio_data = audio_data[:max_samples]
# print(f"🔊 Gửi audio đến Whisper: {len(audio_data)} samples, {sample_rate}Hz")
# # Tạo temporary file trong memory
# buffer = io.BytesIO()
# sf.write(buffer, audio_data, sample_rate, format='wav', subtype='PCM_16')
# buffer.seek(0)
# # Gọi API Whisper với timeout
# api_start = time.time()
# try:
# transcription = self.client.audio.transcriptions.create(
# model=settings.WHISPER_MODEL,
# file=("speech.wav", buffer.read(), "audio/wav"),
# response_format="text",
# language="vi",
# temperature=0.0,
# )
# except Exception as e:
# print(f"❌ Lỗi Whisper API: {e}")
# return None
# api_latency = time.time() - api_start
# # Xử lý response
# if hasattr(transcription, 'text'):
# result = transcription.text.strip()
# elif isinstance(transcription, str):
# result = transcription.strip()
# else:
# result = str(transcription).strip()
# total_asr_latency = time.time() - asr_start
# print(f"✅ ASR Latency: {total_asr_latency:.2f}s (API: {api_latency:.2f}s)")
# # Kiểm tra chất lượng transcription
# if self._is_valid_transcription(result):
# return result
# else:
# return None
# except Exception as e:
# print(f"❌ Lỗi transcription: {e}")
# return None
# def _is_valid_transcription(self, text: str) -> bool:
# """Kiểm tra tính hợp lệ của transcription"""
# if not text or len(text.strip()) == 0:
# return False
# # Danh sách từ ngắn được chấp nhận
# short_responses = {
# 'ừ', 'um', 'à', 'ờ', 'ê', 'ô',
# 'có', 'không', 'đúng', 'sai', 'ok', 'okay', 'ừm',
# 'vâng', 'dạ', 'ạ', 'được', 'tốt', 'hay', 'ừ ừ',
# 'yes', 'no', 'yeah', 'yep', 'nope'
# }
# normalized_text = text.lower().strip()
# # Loại bỏ dấu câu và khoảng trắng thừa
# import re
# normalized_text = re.sub(r'[^\w\s]', '', normalized_text)
# # Nếu là từ ngắn thông dụng, chấp nhận
# if normalized_text in short_responses:
# return True
# # Kiểm tra độ dài tối thiểu cho các từ khác
# min_length = 2
# return len(normalized_text) >= min_length
# def _generate_ai_response_optimized(self, user_input: str) -> str:
# """Sinh phản hồi AI với tối ưu hiệu suất"""
# llm_start = time.time()
# try:
# # Thêm vào lịch sử
# self.conversation_history.append({"role": "user", "content": user_input})
# # Tìm kiếm RAG (có thể bỏ qua nếu cần tốc độ)
# rag_start = time.time()
# rag_results = self.rag_system.semantic_search(user_input, top_k=2) if self.rag_system else []
# rag_latency = time.time() - rag_start
# context_text = "\n".join([f"- {result.get('text', str(result))}" for result in rag_results]) if rag_results else ""
# system_prompt = f"""Bạn là trợ lý AI thông minh chuyên về tiếng Việt.
# Hãy trả lời ngắn gọn, tự nhiên và hữu ích (dưới 100 từ).
# Thông tin tham khảo:
# {context_text}
# """
# messages = [{"role": "system", "content": system_prompt}]
# # Giữ lại 6 tin nhắn gần nhất
# messages.extend(self.conversation_history[-6:])
# llm_inference_start = time.time()
# completion = self.client.chat.completions.create(
# model=settings.MULTILINGUAL_LLM_MODEL,
# messages=messages,
# max_tokens=300,
# temperature=0.7
# )
# ttft = time.time() - llm_inference_start
# response = completion.choices[0].message.content
# self.conversation_history.append({"role": "assistant", "content": response})
# total_llm_latency = time.time() - llm_start
# # Giới hạn lịch sử
# if len(self.conversation_history) > 12:
# self.conversation_history = self.conversation_history[-12:]
# print(f"✅ RAG Latency: {rag_latency:.2f}s")
# print(f"✅ LLM TTFT: {ttft:.2f}s")
# print(f"✅ Total LLM Latency: {total_llm_latency:.2f}s")
# return response
# except Exception as e:
# print(f"❌ Lỗi tạo AI response: {e}")
# return "Xin lỗi, tôi gặp lỗi khi tạo phản hồi. Vui lòng thử lại."
# def _text_to_speech_optimized(self, text: str) -> Optional[str]:
# """Chuyển văn bản thành giọng nói với tối ưu"""
# tts_start = time.time()
# try:
# if not text or text.startswith("❌") or text.startswith("Xin lỗi"):
# return None
# tts_bytes = self.tts_service.text_to_speech(text, 'vi')
# tts_latency = time.time() - tts_start
# if tts_bytes:
# audio_path = self.tts_service.save_audio_to_file(tts_bytes)
# print(f"✅ TTS Latency: {tts_latency:.2f}s")
# return audio_path
# except Exception as e:
# print(f"❌ Lỗi TTS: {e}")
# return None
# def process_streaming_audio(self, audio_data: tuple) -> Dict[str, Any]:
# """Xử lý audio streaming manual mode"""
# if not audio_data:
# return self._create_error_response("❌ Không có dữ liệu âm thanh")
# try:
# sample_rate, audio_array = audio_data
# print(f"🎯 Nhận audio manual: {len(audio_array)} samples, SR: {sample_rate}")
# # Kiểm tra và chuyển đổi kiểu dữ liệu
# if isinstance(audio_array, np.ndarray):
# if audio_array.dtype == np.float32 or audio_array.dtype == np.float64:
# audio_array = (audio_array * 32767).astype(np.int16)
# if len(audio_array) == 0:
# return self._create_error_response("❌ Âm thanh trống")
# # Kiểm tra âm lượng
# audio_abs = np.abs(audio_array.astype(np.float32))
# audio_rms = np.sqrt(np.mean(audio_abs**2)) / 32767.0
# if audio_rms < 0.005:
# return self._create_error_response("❌ Âm thanh quá yếu")
# # Sử dụng VAD để kiểm tra speech
# if not self.vad_processor.is_speech(audio_array, sample_rate):
# return self._create_error_response("❌ Không phát hiện giọng nói")
# # Chuyển đổi thành văn bản
# transcription = self._transcribe_audio_optimized(audio_array, sample_rate)
# if not transcription:
# return self._create_error_response("❌ Không nghe rõ")
# print(f"📝 Đã chuyển đổi: {transcription}")
# self.current_transcription = transcription
# # Tạo phản hồi AI
# response = self._generate_ai_response_optimized(transcription)
# # Tạo TTS
# tts_audio_path = self._text_to_speech_optimized(response)
# return {
# 'transcription': transcription,
# 'response': response,
# 'tts_audio': tts_audio_path,
# 'status': 'completed'
# }
# except Exception as e:
# print(f"❌ Lỗi xử lý streaming audio: {e}")
# return self._create_error_response(f"❌ Lỗi: {str(e)}")
# def _create_error_response(self, message: str) -> Dict[str, Any]:
# """Tạo response lỗi chuẩn"""
# return {
# 'transcription': message,
# 'response': "Vui lòng thử lại",
# 'tts_audio': None,
# 'status': 'error'
# }
# def _resample_audio(self, audio_data: np.ndarray, orig_sr: int, target_sr: int) -> np.ndarray:
# """Resample audio"""
# try:
# from scipy import signal
# duration = len(audio_data) / orig_sr
# new_length = int(duration * target_sr)
# resampled_audio = signal.resample(audio_data, new_length)
# return np.clip(resampled_audio, -32768, 32767).astype(np.int16)
# except Exception:
# return audio_data
# def _log_latency_metrics(self, latencies: dict):
# """Log và theo dõi latency metrics"""
# for key, value in latencies.items():
# if key in self.latency_metrics:
# self.latency_metrics[key].append(value)
# # Giữ chỉ 100 mẫu gần nhất
# if len(self.latency_metrics[key]) > 100:
# self.latency_metrics[key] = self.latency_metrics[key][-100:]
# print("📊 LATENCY REPORT:")
# for component, latency in latencies.items():
# print(f" {component.upper()}: {latency:.2f}s")
# def get_latency_stats(self) -> dict:
# """Lấy thống kê latency"""
# stats = {}
# for component, latencies in self.latency_metrics.items():
# if latencies:
# stats[component] = {
# 'avg': sum(latencies) / len(latencies),
# 'min': min(latencies),
# 'max': max(latencies),
# 'count': len(latencies),
# 'recent_avg': sum(latencies[-10:]) / min(10, len(latencies))
# }
# else:
# stats[component] = {'avg': 0, 'min': 0, 'max': 0, 'count': 0, 'recent_avg': 0}
# return stats
# def get_conversation_state(self) -> dict:
# """Lấy trạng thái hội thoại"""
# return {
# 'is_listening': self.is_listening,
# 'is_processing': self.is_processing,
# 'history_length': len(self.conversation_history),
# 'current_transcription': self.current_transcription,
# 'queue_size': self.response_queue.qsize(),
# 'worker_threads': len(self.processing_threads),
# 'last_update': time.strftime("%H:%M:%S")
# }
# def clear_conversation(self):
# """Xóa lịch sử hội thoại"""
# self.conversation_history = []
# self.current_transcription = ""
# print("🗑️ Đã xóa lịch sử hội thoại")
# import io
# import numpy as np
# import soundfile as sf
# import time
# import traceback
# import threading
# from groq import Groq
# import queue
# from typing import Optional, Dict, Any, Callable
# from config.settings import settings
# from core.rag_system import EnhancedRAGSystem
# from core.tts_service import EnhancedTTSService
# from core.speechbrain_vad import SpeechBrainVAD
# from core.silero_vad import SileroVAD
# class StreamingVoiceService:
# def __init__(self, groq_client: Groq, rag_system, tts_service):
# self.client = groq_client
# self.rag_system = rag_system
# self.tts_service = tts_service
# # Khởi tạo VAD
# self.vad_processor = SileroVAD()
# self.is_listening = False
# self.speech_callback = None
# self.is_processing = False
# self.processing_lock = threading.Lock()
# # Conversation context
# self.conversation_history = []
# self.current_transcription = ""
# # Multi-thread processing - FIXED: Khởi tạo queue đúng cách
# self.response_queue = queue.Queue()
# self.processing_threads = []
# self.max_workers = 2
# # Latency tracking - FIXED: Thêm các metrics còn thiếu
# self.latency_metrics = {
# 'asr': [],
# 'rag': [],
# 'llm': [],
# 'tts': [],
# 'total': [],
# 'vad_detection': [],
# 'queue_waiting': []
# }
# # FIXED: Thêm biến để track callback function
# self.current_callback = None
# def start_listening(self, speech_callback: Callable) -> bool:
# """Bắt đầu lắng nghe với multi-thread processing - FIXED VERSION"""
# if self.is_listening:
# return False
# # FIXED: Lưu callback function
# self.current_callback = speech_callback
# success = self.vad_processor.start_stream(self._on_speech_detected)
# if success:
# self.is_listening = True
# self.is_processing = False
# # FIXED: Khởi động worker threads chỉ khi chưa có
# if not self.processing_threads:
# for i in range(self.max_workers):
# thread = threading.Thread(
# target=self._process_response_worker,
# daemon=True,
# name=f"ASR-Worker-{i}"
# )
# thread.start()
# self.processing_threads.append(thread)
# print("🎙️ Đã bắt đầu lắng nghe với multi-thread processing")
# return True
# return False
# def stop_listening(self):
# """Dừng lắng nghe - FIXED VERSION"""
# self.vad_processor.stop_stream()
# self.is_listening = False
# self.is_processing = False
# self.current_callback = None
# # FIXED: Clear queue đúng cách
# while not self.response_queue.empty():
# try:
# self.response_queue.get_nowait()
# self.response_queue.task_done()
# except queue.Empty:
# break
# print("🛑 Đã dừng lắng nghe")
# def _on_speech_detected(self, speech_audio: np.ndarray, sample_rate: int):
# """Callback khi VAD phát hiện speech - FIXED: Thêm latency tracking"""
# vad_start = time.time()
# audio_duration = len(speech_audio) / sample_rate
# # FIXED: Đo lường VAD detection latency
# vad_latency = time.time() - vad_start
# self._add_latency_sample('vad_detection', vad_latency)
# print(f"🎯 VAD phát hiện speech: {audio_duration:.2f}s - Đưa vào queue")
# # FIXED: Thêm vào queue với timeout
# try:
# self.response_queue.put((speech_audio, sample_rate), timeout=0.5)
# except queue.Full:
# print("⚠️ Queue đầy, bỏ qua audio segment")
# def _process_response_worker(self):
# """Worker thread xử lý speech segments - FIXED VERSION"""
# while self.is_listening:
# try:
# # FIXED: Thêm queue waiting latency tracking
# queue_start = time.time()
# speech_audio, sample_rate = self.response_queue.get(timeout=1.0)
# queue_latency = time.time() - queue_start
# self._add_latency_sample('queue_waiting', queue_latency)
# # Xử lý speech segment
# self._process_speech_segment(speech_audio, sample_rate)
# # Đánh dấu task hoàn thành
# self.response_queue.task_done()
# except queue.Empty:
# continue
# except Exception as e:
# print(f"❌ Lỗi trong worker thread: {e}")
# continue
# def _process_speech_segment(self, speech_audio: np.ndarray, sample_rate: int):
# """Xử lý speech segment - FIXED: Sử dụng callback đúng cách"""
# total_start_time = time.time()
# try:
# # 1. ASR Transcription
# asr_start = time.time()
# transcription = self._transcribe_audio_optimized(speech_audio, sample_rate)
# asr_latency = time.time() - asr_start
# if not transcription or len(transcription.strip()) < 2:
# print("⚠️ Transcription quá ngắn hoặc trống")
# return
# print(f"📝 Transcription: {transcription}")
# self.current_transcription = transcription
# # 2. AI Response Generation
# rag_start = time.time()
# response = self._generate_ai_response_optimized(transcription)
# rag_latency = time.time() - rag_start
# # 3. TTS Conversion
# tts_start = time.time()
# tts_audio_path = self._text_to_speech_optimized(response)
# tts_latency = time.time() - tts_start
# total_latency = time.time() - total_start_time
# # Log latency metrics
# self._log_latency_metrics({
# 'asr': asr_latency,
# 'rag': rag_latency,
# 'tts': tts_latency,
# 'total': total_latency
# })
# # FIXED: Sử dụng callback đã lưu
# if self.current_callback:
# result = {
# 'transcription': transcription,
# 'response': response,
# 'tts_audio': tts_audio_path,
# 'status': 'completed',
# 'latency': total_latency
# }
# self.current_callback(result)
# except Exception as e:
# print(f"❌ Lỗi xử lý speech segment: {e}")
# traceback.print_exc()
# # FIXED: Gọi callback ngay cả khi có lỗi
# if self.current_callback:
# self.current_callback({
# 'transcription': f"Lỗi: {str(e)}",
# 'response': "Xin lỗi, có lỗi xảy ra",
# 'tts_audio': None,
# 'status': 'error'
# })
# def _transcribe_audio_optimized(self, audio_data: np.ndarray, sample_rate: int) -> Optional[str]:
# """Chuyển audio -> text với tối ưu hiệu suất - FIXED VERSION"""
# asr_start = time.time()
# try:
# # Chuẩn hóa audio data
# if audio_data.dtype != np.int16:
# if audio_data.dtype in [np.float32, np.float64]:
# audio_data = (audio_data * 32767).astype(np.int16)
# else:
# audio_data = audio_data.astype(np.int16)
# if audio_data.ndim > 1:
# audio_data = np.mean(audio_data, axis=1).astype(np.int16)
# # Resample nếu cần
# target_sample_rate = 16000
# if sample_rate != target_sample_rate:
# audio_data = self._resample_audio(audio_data, sample_rate, target_sample_rate)
# sample_rate = target_sample_rate
# # Giới hạn độ dài audio
# max_duration = 30
# max_samples = sample_rate * max_duration
# if len(audio_data) > max_samples:
# audio_data = audio_data[:max_samples]
# print(f"🔊 Gửi audio đến Whisper: {len(audio_data)} samples, {sample_rate}Hz")
# # Tạo temporary file trong memory
# buffer = io.BytesIO()
# sf.write(buffer, audio_data, sample_rate, format='wav', subtype='PCM_16')
# buffer.seek(0)
# # Gọi API Whisper với timeout
# api_start = time.time()
# try:
# # FIXED: Sửa lỗi buffer - đọc lại buffer sau khi seek
# audio_buffer_content = buffer.getvalue()
# transcription = self.client.audio.transcriptions.create(
# model=settings.WHISPER_MODEL,
# file=("speech.wav", audio_buffer_content, "audio/wav"),
# response_format="text",
# language="vi",
# temperature=0.0,
# )
# except Exception as e:
# print(f"❌ Lỗi Whisper API: {e}")
# return None
# api_latency = time.time() - api_start
# # Xử lý response
# if hasattr(transcription, 'text'):
# result = transcription.text.strip()
# elif isinstance(transcription, str):
# result = transcription.strip()
# else:
# result = str(transcription).strip()
# total_asr_latency = time.time() - asr_start
# print(f"✅ ASR Latency: {total_asr_latency:.2f}s (API: {api_latency:.2f}s)")
# # Kiểm tra chất lượng transcription
# if self._is_valid_transcription(result):
# return result
# else:
# print(f"⚠️ Transcription không hợp lệ: '{result}'")
# return None
# except Exception as e:
# print(f"❌ Lỗi transcription: {e}")
# traceback.print_exc()
# return None
# def _is_valid_transcription(self, text: str) -> bool:
# """Kiểm tra tính hợp lệ của transcription"""
# if not text or len(text.strip()) == 0:
# return False
# # Danh sách từ ngắn được chấp nhận
# short_responses = {
# 'ừ', 'um', 'à', 'ờ', 'ê', 'ô',
# 'có', 'không', 'đúng', 'sai', 'ok', 'okay', 'ừm',
# 'vâng', 'dạ', 'ạ', 'được', 'tốt', 'hay', 'ừ ừ',
# 'yes', 'no', 'yeah', 'yep', 'nope'
# }
# normalized_text = text.lower().strip()
# # Loại bỏ dấu câu và khoảng trắng thừa
# import re
# normalized_text = re.sub(r'[^\w\s]', '', normalized_text)
# normalized_text = re.sub(r'\s+', ' ', normalized_text).strip()
# # Nếu là từ ngắn thông dụng, chấp nhận
# if normalized_text in short_responses:
# return True
# # Kiểm tra độ dài tối thiểu cho các từ khác
# min_length = 2
# return len(normalized_text) >= min_length
# def _generate_ai_response_optimized(self, user_input: str) -> str:
# """Sinh phản hồi AI với tối ưu hiệu suất - FIXED: Thêm LLM latency tracking"""
# llm_start = time.time()
# try:
# # Thêm vào lịch sử
# self.conversation_history.append({"role": "user", "content": user_input})
# # Tìm kiếm RAG
# rag_start = time.time()
# rag_results = self.rag_system.semantic_search(user_input, top_k=2) if self.rag_system else []
# rag_latency = time.time() - rag_start
# context_text = "\n".join([f"- {result.get('text', str(result))}" for result in rag_results]) if rag_results else ""
# system_prompt = f"""Bạn là trợ lý AI thông minh chuyên về tiếng Việt.
# Hãy trả lời ngắn gọn, tự nhiên và hữu ích (dưới 100 từ).
# Thông tin tham khảo:
# {context_text}
# """
# messages = [{"role": "system", "content": system_prompt}]
# # Giữ lại 6 tin nhắn gần nhất
# messages.extend(self.conversation_history[-6:])
# llm_inference_start = time.time()
# completion = self.client.chat.completions.create(
# model=settings.MULTILINGUAL_LLM_MODEL,
# messages=messages,
# max_tokens=300,
# temperature=0.7
# )
# ttft = time.time() - llm_inference_start
# response = completion.choices[0].message.content
# self.conversation_history.append({"role": "assistant", "content": response})
# total_llm_latency = time.time() - llm_start
# # Giới hạn lịch sử
# if len(self.conversation_history) > 12:
# self.conversation_history = self.conversation_history[-12:]
# # FIXED: Thêm LLM latency tracking
# self._add_latency_sample('llm', total_llm_latency)
# self._add_latency_sample('rag', rag_latency)
# print(f"✅ RAG Latency: {rag_latency:.2f}s")
# print(f"✅ LLM TTFT: {ttft:.2f}s")
# print(f"✅ Total LLM Latency: {total_llm_latency:.2f}s")
# return response
# except Exception as e:
# print(f"❌ Lỗi tạo AI response: {e}")
# return "Xin lỗi, tôi gặp lỗi khi tạo phản hồi. Vui lòng thử lại."
# def _text_to_speech_optimized(self, text: str) -> Optional[str]:
# """Chuyển văn bản thành giọng nói với tối ưu"""
# tts_start = time.time()
# try:
# if not text or text.startswith("❌") or text.startswith("Xin lỗi"):
# return None
# tts_bytes = self.tts_service.text_to_speech(text, 'vi')
# tts_latency = time.time() - tts_start
# if tts_bytes:
# audio_path = self.tts_service.save_audio_to_file(tts_bytes)
# print(f"✅ TTS Latency: {tts_latency:.2f}s")
# # FIXED: Thêm TTS latency tracking
# self._add_latency_sample('tts', tts_latency)
# return audio_path
# except Exception as e:
# print(f"❌ Lỗi TTS: {e}")
# return None
# def process_streaming_audio(self, audio_data: tuple) -> Dict[str, Any]:
# """Xử lý audio streaming manual mode - FIXED VERSION"""
# if not audio_data:
# return self._create_error_response("❌ Không có dữ liệu âm thanh")
# try:
# sample_rate, audio_array = audio_data
# print(f"🎯 Nhận audio manual: {len(audio_array)} samples, SR: {sample_rate}")
# # Kiểm tra và chuyển đổi kiểu dữ liệu
# if isinstance(audio_array, np.ndarray):
# if audio_array.dtype == np.float32 or audio_array.dtype == np.float64:
# audio_array = (audio_array * 32767).astype(np.int16)
# if len(audio_array) == 0:
# return self._create_error_response("❌ Âm thanh trống")
# # Kiểm tra âm lượng
# audio_abs = np.abs(audio_array.astype(np.float32))
# audio_rms = np.sqrt(np.mean(audio_abs**2)) / 32767.0
# if audio_rms < 0.005:
# return self._create_error_response("❌ Âm thanh quá yếu")
# # Sử dụng VAD để kiểm tra speech
# if not self.vad_processor.is_speech(audio_array, sample_rate):
# return self._create_error_response("❌ Không phát hiện giọng nói")
# # Chuyển đổi thành văn bản
# transcription = self._transcribe_audio_optimized(audio_array, sample_rate)
# if not transcription:
# return self._create_error_response("❌ Không nghe rõ")
# print(f"📝 Đã chuyển đổi: {transcription}")
# self.current_transcription = transcription
# # Tạo phản hồi AI
# response = self._generate_ai_response_optimized(transcription)
# # Tạo TTS
# tts_audio_path = self._text_to_speech_optimized(response)
# return {
# 'transcription': transcription,
# 'response': response,
# 'tts_audio': tts_audio_path,
# 'status': 'completed'
# }
# except Exception as e:
# print(f"❌ Lỗi xử lý streaming audio: {e}")
# return self._create_error_response(f"❌ Lỗi: {str(e)}")
# def _create_error_response(self, message: str) -> Dict[str, Any]:
# """Tạo response lỗi chuẩn"""
# return {
# 'transcription': message,
# 'response': "Vui lòng thử lại",
# 'tts_audio': None,
# 'status': 'error'
# }
# def _resample_audio(self, audio_data: np.ndarray, orig_sr: int, target_sr: int) -> np.ndarray:
# """Resample audio"""
# try:
# from scipy import signal
# duration = len(audio_data) / orig_sr
# new_length = int(duration * target_sr)
# resampled_audio = signal.resample(audio_data, new_length)
# return np.clip(resampled_audio, -32768, 32767).astype(np.int16)
# except Exception:
# return audio_data
# def _add_latency_sample(self, component: str, latency: float):
# """Thêm mẫu latency - FIXED VERSION"""
# if component in self.latency_metrics:
# self.latency_metrics[component].append(latency)
# # Giữ chỉ 100 mẫu gần nhất
# if len(self.latency_metrics[component]) > 100:
# self.latency_metrics[component] = self.latency_metrics[component][-100:]
# def _log_latency_metrics(self, latencies: dict):
# """Log và theo dõi latency metrics - FIXED VERSION"""
# for key, value in latencies.items():
# self._add_latency_sample(key, value)
# print("📊 LATENCY REPORT:")
# for component, latency in latencies.items():
# print(f" {component.upper()}: {latency:.2f}s")
# def get_latency_stats(self) -> dict:
# """Lấy thống kê latency - FIXED VERSION"""
# stats = {}
# for component, latencies in self.latency_metrics.items():
# if latencies:
# recent_latencies = latencies[-10:] # Lấy 10 mẫu gần nhất
# stats[component] = {
# 'avg': sum(latencies) / len(latencies),
# 'min': min(latencies),
# 'max': max(latencies),
# 'count': len(latencies),
# 'recent_avg': sum(recent_latencies) / len(recent_latencies),
# 'recent_min': min(recent_latencies),
# 'recent_max': max(recent_latencies)
# }
# else:
# stats[component] = {
# 'avg': 0, 'min': 0, 'max': 0, 'count': 0,
# 'recent_avg': 0, 'recent_min': 0, 'recent_max': 0
# }
# return stats
# def get_conversation_state(self) -> dict:
# """Lấy trạng thái hội thoại - FIXED VERSION"""
# return {
# 'is_listening': self.is_listening,
# 'is_processing': self.is_processing,
# 'history_length': len(self.conversation_history),
# 'current_transcription': self.current_transcription,
# 'queue_size': self.response_queue.qsize(),
# 'worker_threads': len(self.processing_threads),
# 'last_update': time.strftime("%H:%M:%S")
# }
# def clear_conversation(self):
# """Xóa lịch sử hội thoại"""
# self.conversation_history = []
# self.current_transcription = ""
# print("🗑️ Đã xóa lịch sử hội thoại")
import io
import numpy as np
import soundfile as sf
import time
import traceback
import threading
import queue
import json
import os
from vosk import Model, KaldiRecognizer
from groq import Groq
from typing import Optional, Dict, Any, Callable
from config.settings import settings
from core.rag_system import EnhancedRAGSystem
from core.tts_service import EnhancedTTSService
from core.silero_vad import SileroVAD
class VoskStreamingASR:
def __init__(self, model_path: str = None):
"""Khởi tạo VOSK ASR streaming với debug"""
self.model = None
self.recognizer = None
self.sample_rate = 16000
self.is_streaming = False
# Tự động tải model nếu không có đường dẫn
if model_path is None:
model_path = self._download_vosk_model()
if model_path and os.path.exists(model_path):
print(f"🔄 Đang tải VOSK model từ: {model_path}")
try:
self.model = Model(model_path)
self.recognizer = KaldiRecognizer(self.model, self.sample_rate)
self.recognizer.SetWords(True)
print("✅ Đã tải VOSK model thành công")
except Exception as e:
print(f"❌ Lỗi khởi tạo VOSK model: {e}")
else:
print(f"❌ Không tìm thấy VOSK model tại: {model_path}")
def _download_vosk_model(self):
"""Tải VOSK model tiếng Việt tự động"""
try:
import urllib.request
import zipfile
model_url = "https://alphacephei.com/vosk/models/vosk-model-small-vn-0.4.zip"
model_dir = "models/vosk-model-small-vn-0.4"
zip_path = "models/vosk-model-small-vn-0.4.zip"
# Tạo thư mục nếu chưa có
os.makedirs("models", exist_ok=True)
if not os.path.exists(model_dir):
print("📥 Đang tải VOSK Vietnamese model...")
urllib.request.urlretrieve(model_url, zip_path)
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
zip_ref.extractall("models/")
# Đảm bảo thư mục tồn tại
if os.path.exists("models/vosk-model-small-vn-0.4"):
os.rename("models/vosk-model-small-vn-0.4", model_dir)
if os.path.exists(zip_path):
os.remove(zip_path)
print("✅ Đã tải VOSK model thành công")
return model_dir if os.path.exists(model_dir) else None
except Exception as e:
print(f"❌ Lỗi tải VOSK model: {e}")
return None
def start_stream(self):
"""Bắt đầu stream mới"""
if self.model is None:
print("❌ VOSK model chưa được khởi tạo")
return False
try:
self.recognizer = KaldiRecognizer(self.model, self.sample_rate)
self.recognizer.SetWords(True)
self.is_streaming = True
print("🎤 Đã khởi động VOSK stream")
return True
except Exception as e:
print(f"❌ Lỗi khởi động VOSK stream: {e}")
return False
def process_audio_chunk(self, audio_chunk: np.ndarray, sample_rate: int = None) -> Dict[str, Any]:
"""Xử lý audio chunk và trả về kết quả - FIXED VERSION"""
if self.recognizer is None or not self.is_streaming:
return {"text": "", "partial": "", "is_final": False}
try:
# DEBUG: Thông tin audio chunk
print(f"🔊 Audio chunk: {len(audio_chunk)} samples, dtype: {audio_chunk.dtype}, max: {np.max(audio_chunk):.4f}")
# Chuẩn hóa audio - QUAN TRỌNG: VOSK cần audio ở dạng int16
if sample_rate and sample_rate != self.sample_rate:
audio_chunk = self._resample_audio(audio_chunk, sample_rate, self.sample_rate)
# Đảm bảo là int16 với giá trị phù hợp
if audio_chunk.dtype != np.int16:
if audio_chunk.dtype in [np.float32, np.float64]:
# Audio float cần được scale về [-32768, 32767]
audio_chunk = (audio_chunk * 32767).astype(np.int16)
else:
audio_chunk = audio_chunk.astype(np.int16)
# Kiểm tra âm lượng
audio_rms = np.sqrt(np.mean(audio_chunk.astype(np.float32)**2)) / 32767.0
print(f"📊 Audio RMS: {audio_rms:.4f}")
if audio_rms < 0.01: # Âm lượng quá thấp
print("⚠️ Âm lượng quá thấp, bỏ qua")
return {"text": "", "partial": "", "is_final": False}
# Chuyển đổi sang bytes
audio_bytes = audio_chunk.tobytes()
# Xử lý với VOSK
if self.recognizer.AcceptWaveform(audio_bytes):
# Kết quả cuối cùng
result_json = self.recognizer.Result()
result = json.loads(result_json)
text = result.get('text', '').strip()
print(f"✅ VOSK Final Result: '{text}'")
if text:
return {"text": text, "partial": "", "is_final": True}
else:
# Kết quả tạm thời
partial_json = self.recognizer.PartialResult()
partial_result = json.loads(partial_json)
partial_text = partial_result.get('partial', '').strip()
if partial_text:
print(f"🎯 VOSK Partial: '{partial_text}'")
return {"text": "", "partial": partial_text, "is_final": False}
except Exception as e:
print(f"❌ Lỗi VOSK processing: {e}")
traceback.print_exc()
return {"text": "", "partial": "", "is_final": False}
def stop_stream(self) -> str:
"""Kết thúc stream và lấy kết quả cuối"""
if self.recognizer:
try:
result_json = self.recognizer.FinalResult()
result = json.loads(result_json)
text = result.get('text', '').strip()
self.is_streaming = False
print(f"🛑 VOSK Final: '{text}'")
return text
except Exception as e:
print(f"❌ Lỗi khi dừng VOSK stream: {e}")
return ""
def _resample_audio(self, audio: np.ndarray, orig_sr: int, target_sr: int) -> np.ndarray:
"""Resample audio với chất lượng tốt hơn"""
if orig_sr == target_sr:
return audio
try:
from scipy import signal
# Tính số sample mới
num_samples = int(len(audio) * target_sr / orig_sr)
resampled_audio = signal.resample(audio, num_samples)
return resampled_audio.astype(np.int16)
except Exception as e:
print(f"❌ Lỗi resample audio: {e}")
return audio
class StreamingVoiceService:
def __init__(self, groq_client: Groq, rag_system, tts_service):
self.client = groq_client
self.rag_system = rag_system
self.tts_service = tts_service
# Khởi tạo VOSK ASR - FIXED: Thêm timeout và retry
print("🔄 Đang khởi tạo VOSK ASR...")
self.vosk_asr = VoskStreamingASR()
# Khởi tạo VAD
self.vad_processor = SileroVAD()
self.is_listening = False
self.speech_callback = None
self.is_processing = False
# Conversation context
self.conversation_history = []
self.current_transcription = ""
self.partial_transcription = ""
# Multi-thread processing
self.response_queue = queue.Queue()
self.processing_threads = []
self.max_workers = 2
# Streaming state
self.vosk_stream_active = False
self.last_voice_time = 0
self.silence_timeout = 3.0 # Tăng timeout lên 3 giây
# Audio buffer để cải thiện nhận diện
self.audio_buffer = []
self.buffer_duration = 1.0 # Buffer 1 giây
self.max_buffer_samples = 16000 # 1 giây ở 16kHz
# Latency tracking
self.latency_metrics = {
'asr': [], 'rag': [], 'llm': [], 'tts': [], 'total': [],
'vad_detection': [], 'queue_waiting': [], 'vosk_processing': []
}
self.current_callback = None
def start_listening(self, speech_callback: Callable) -> bool:
"""Bắt đầu lắng nghe với VOSK streaming - FIXED VERSION"""
if self.is_listening:
print("⚠️ Đã đang lắng nghe")
return False
self.current_callback = speech_callback
# Kiểm tra VOSK model
if self.vosk_asr.model is None:
print("❌ VOSK model không khả dụng")
if self.current_callback:
self.current_callback({
'transcription': "Lỗi: VOSK model không khả dụng",
'response': "Không thể khởi động nhận diện giọng nói",
'tts_audio': None,
'status': 'error'
})
return False
# Khởi động VOSK stream
if not self.vosk_asr.start_stream():
print("❌ Không thể khởi động VOSK stream")
return False
# Khởi động VAD
success = self.vad_processor.start_stream(self._on_speech_detected)
if success:
self.is_listening = True
self.is_processing = False
self.vosk_stream_active = True
self.last_voice_time = time.time()
self.audio_buffer = []
# Khởi động worker threads
if not self.processing_threads:
for i in range(self.max_workers):
thread = threading.Thread(
target=self._process_response_worker,
daemon=True,
name=f"ASR-Worker-{i}"
)
thread.start()
self.processing_threads.append(thread)
# Bắt đầu thread theo dõi VOSK streaming
threading.Thread(target=self._vosk_streaming_monitor, daemon=True).start()
print("🎙️ Đã bắt đầu lắng nghe với VOSK ASR streaming")
# Thông báo trạng thái
if self.current_callback:
self.current_callback({
'transcription': "Đã bắt đầu lắng nghe... Hãy nói gì đó",
'response': "",
'tts_audio': None,
'status': 'listening'
})
return True
return False
def _vosk_streaming_monitor(self):
"""Theo dõi VOSK streaming và xử lý kết quả real-time"""
while self.is_listening and self.vosk_stream_active:
try:
current_time = time.time()
silence_duration = current_time - self.last_voice_time
# Xử lý audio buffer nếu có dữ liệu
if self.audio_buffer and silence_duration > 0.5: # 0.5 giây im lặng
combined_audio = np.concatenate(self.audio_buffer)
if len(combined_audio) > 1600: # Ít nhất 0.1 giây audio
result = self.vosk_asr.process_audio_chunk(combined_audio, 16000)
self._handle_vosk_result(result)
self.audio_buffer = []
# Timeout im lặng
if silence_duration > self.silence_timeout and self.partial_transcription:
print(f"⏰ Silence timeout, xử lý: '{self.partial_transcription}'")
if len(self.partial_transcription) > 2: # Chỉ xử lý nếu có nội dung
self._process_final_transcription(self.partial_transcription)
self.partial_transcription = ""
self.vosk_asr.start_stream()
time.sleep(0.1)
except Exception as e:
print(f"❌ Lỗi VOSK monitor: {e}")
break
def _on_speech_detected(self, speech_audio: np.ndarray, sample_rate: int):
"""Callback khi VAD phát hiện speech - FIXED VERSION"""
if not self.vosk_stream_active or not self.is_listening:
return
# Cập nhật thời gian có giọng nói
self.last_voice_time = time.time()
# Thêm vào audio buffer để cải thiện nhận diện
self.audio_buffer.append(speech_audio)
# Giới hạn buffer size
total_samples = sum(len(chunk) for chunk in self.audio_buffer)
if total_samples > self.max_buffer_samples:
# Giữ lại các chunk gần nhất
while total_samples > self.max_buffer_samples and len(self.audio_buffer) > 1:
removed = self.audio_buffer.pop(0)
total_samples -= len(removed)
print(f"🎯 VAD detected: {len(speech_audio)} samples, Buffer: {len(self.audio_buffer)} chunks")
def _handle_vosk_result(self, result: Dict[str, Any]):
"""Xử lý kết quả từ VOSK"""
# Xử lý kết quả partial
if result['partial'] and len(result['partial']) > 1:
self.partial_transcription = result['partial']
print(f"🎯 VOSK Partial: '{result['partial']}'")
# Gửi partial result real-time
if self.current_callback:
self.current_callback({
'transcription': result['partial'],
'response': "",
'tts_audio': None,
'status': 'partial'
})
# Xử lý kết quả final
if result['is_final'] and result['text'] and len(result['text']) > 1:
print(f"✅ VOSK Final: '{result['text']}'")
self._process_final_transcription(result['text'])
self.partial_transcription = ""
self.audio_buffer = [] # Clear buffer
self.vosk_asr.start_stream() # Bắt đầu stream mới
def _process_final_transcription(self, transcription: str):
"""Xử lý transcription cuối cùng"""
if not transcription or len(transcription.strip()) < 2:
return
print(f"📝 Final Transcription: '{transcription}'")
self.current_transcription = transcription
# Đưa vào queue để xử lý
try:
self.response_queue.put(transcription, timeout=0.5)
print(f"📦 Đã đưa vào queue: '{transcription}'")
except queue.Full:
print("⚠️ Queue đầy, bỏ qua transcription")
def process_streaming_audio(self, audio_data: tuple) -> Dict[str, Any]:
"""Xử lý audio streaming manual mode với VOSK - FIXED VERSION"""
if not audio_data:
return self._create_error_response("❌ Không có dữ liệu âm thanh")
try:
sample_rate, audio_array = audio_data
print(f"🎤 Manual audio: {len(audio_array)} samples, {sample_rate}Hz")
# Kiểm tra âm lượng
if isinstance(audio_array, np.ndarray):
if audio_array.dtype in [np.float32, np.float64]:
audio_rms = np.sqrt(np.mean(audio_array**2))
print(f"📊 Manual audio RMS: {audio_rms:.4f}")
if audio_rms < 0.01:
return {
'transcription': "Âm thanh quá nhỏ, hãy nói to hơn",
'response': "",
'tts_audio': None,
'status': 'listening'
}
# Khởi động VOSK stream tạm thời
if not self.vosk_asr.start_stream():
return self._create_error_response("❌ Không thể khởi động VOSK")
# Xử lý audio với VOSK
result = self.vosk_asr.process_audio_chunk(audio_array, sample_rate)
if result['is_final'] and result['text'] and len(result['text']) > 1:
transcription = result['text']
print(f"📝 Manual Transcription: '{transcription}'")
# Tạo phản hồi AI
response = self._generate_ai_response_optimized(transcription)
tts_audio_path = self._text_to_speech_optimized(response)
return {
'transcription': transcription,
'response': response,
'tts_audio': tts_audio_path,
'status': 'completed'
}
elif result['partial']:
return {
'transcription': result['partial'],
'response': "",
'tts_audio': None,
'status': 'listening'
}
else:
return {
'transcription': "Đang nghe... Hãy nói rõ hơn",
'response': "",
'tts_audio': None,
'status': 'listening'
}
except Exception as e:
print(f"❌ Lỗi xử lý streaming audio: {e}")
traceback.print_exc()
return self._create_error_response(f"❌ Lỗi: {str(e)}")
def _create_error_response(self, message: str) -> Dict[str, Any]:
"""Tạo response lỗi chuẩn"""
return {
'transcription': message,
'response': "Vui lòng thử lại",
'tts_audio': None,
'status': 'error'
}
def _add_latency_sample(self, component: str, latency: float):
"""Thêm mẫu latency"""
if component in self.latency_metrics:
self.latency_metrics[component].append(latency)
if len(self.latency_metrics[component]) > 100:
self.latency_metrics[component] = self.latency_metrics[component][-100:]
def _log_latency_metrics(self, latencies: dict):
"""Log và theo dõi latency metrics"""
for key, value in latencies.items():
self._add_latency_sample(key, value)
print("📊 LATENCY REPORT:")
for component, latency in latencies.items():
print(f" {component.upper()}: {latency:.2f}s")
def get_latency_stats(self) -> dict:
"""Lấy thống kê latency"""
stats = {}
for component, latencies in self.latency_metrics.items():
if latencies:
recent_latencies = latencies[-10:]
stats[component] = {
'avg': sum(latencies) / len(latencies),
'min': min(latencies),
'max': max(latencies),
'count': len(latencies),
'recent_avg': sum(recent_latencies) / len(recent_latencies),
'recent_min': min(recent_latencies),
'recent_max': max(recent_latencies)
}
else:
stats[component] = {
'avg': 0, 'min': 0, 'max': 0, 'count': 0,
'recent_avg': 0, 'recent_min': 0, 'recent_max': 0
}
return stats
def get_conversation_state(self) -> dict:
"""Lấy trạng thái hội thoại"""
return {
'is_listening': self.is_listening,
'is_processing': self.is_processing,
'history_length': len(self.conversation_history),
'current_transcription': self.current_transcription,
'partial_transcription': self.partial_transcription,
'queue_size': self.response_queue.qsize(),
'worker_threads': len(self.processing_threads),
'vosk_active': self.vosk_stream_active,
'audio_buffer_chunks': len(self.audio_buffer),
'last_voice_time': time.strftime("%H:%M:%S", time.localtime(self.last_voice_time)),
'last_update': time.strftime("%H:%M:%S")
}
def clear_conversation(self):
"""Xóa lịch sử hội thoại"""
self.conversation_history = []
self.current_transcription = ""
self.partial_transcription = ""
print("🗑️ Đã xóa lịch sử hội thoại")