import time
import threading
from typing import Callable

import numpy as np
import torch

from config.settings import settings


class SileroVAD:
    def __init__(self):
        self.model = None
        self.sample_rate = 16000
        self.is_streaming = False
        self.speech_callback = None
        self.audio_buffer = []
        self.speech_buffer = []
        self.state = "silence"
        self.speech_start_time = 0
        self.last_voice_time = 0

        # Tuned configuration
        self.chunk_size = 512  # Silero VAD expects 512-sample chunks at 16 kHz
        self.speech_threshold = settings.VAD_THRESHOLD
        self.min_speech_duration = settings.VAD_MIN_SPEECH_DURATION
        self.min_silence_duration = settings.VAD_MIN_SILENCE_DURATION
        self.speech_pad_duration = settings.VAD_SPEECH_PAD_DURATION
        self.pre_speech_buffer = settings.VAD_PRE_SPEECH_BUFFER  # Pre-speech buffer length in seconds

        self.pre_speech_samples = int(self.pre_speech_buffer * self.sample_rate)
        self.pre_speech_buffer_data = []

        # Double-buffer system to avoid losing audio while a segment is processed
        self.active_speech_buffer = []
        self.backup_speech_buffer = []

        self._initialize_model()

    def _initialize_model(self):
        """Initialize the Silero VAD model."""
        try:
            print("🔄 Loading Silero VAD model...")
            self.model, utils = torch.hub.load(
                repo_or_dir='snakers4/silero-vad',
                model='silero_vad',
                force_reload=False,
                trust_repo=True
            )
            self.model.eval()
            print("✅ Silero VAD model loaded successfully")
        except Exception as e:
            print(f"❌ Failed to load Silero VAD model: {e}")
            self.model = None

    def start_stream(self, speech_callback: Callable):
        """Start streaming with VAD."""
        if self.model is None:
            return False

        self.is_streaming = True
        self.speech_callback = speech_callback
        self.audio_buffer = []
        self.speech_buffer = []
        self.pre_speech_buffer_data = []
        self.active_speech_buffer = []
        self.backup_speech_buffer = []
        self.state = "silence"
        self.speech_start_time = 0
        self.last_voice_time = 0

        print("🎙️ Started VAD streaming with double-buffer system...")
        return True

    def stop_stream(self):
        """Stop streaming."""
        self.is_streaming = False
        self.speech_callback = None
        self.audio_buffer = []
        self.speech_buffer = []
        self.pre_speech_buffer_data = []
        self.active_speech_buffer = []
        self.backup_speech_buffer = []
        self.state = "silence"
        print("🛑 Stopped VAD streaming")

    def process_stream(self, audio_chunk: np.ndarray, sample_rate: int):
        """Process an incoming audio chunk with VAD and the double buffers."""
        if not self.is_streaming or self.model is None:
            return

        try:
            # Resample if needed
            if sample_rate != self.sample_rate:
                audio_chunk = self._resample_audio(audio_chunk, sample_rate, self.sample_rate)

            # Append only to the raw audio buffer here; _process_vad_chunk
            # appends normalized audio to both speech buffers, so also
            # extending backup_speech_buffer at this point would duplicate data
            self.audio_buffer.extend(audio_chunk)

            # Run VAD over fixed-size chunks
            while len(self.audio_buffer) >= self.chunk_size:
                chunk = self.audio_buffer[:self.chunk_size]
                self._process_vad_chunk(np.array(chunk))
                self.audio_buffer = self.audio_buffer[self.chunk_size:]
        except Exception as e:
            print(f"❌ VAD processing error: {e}")
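    # State machine: "silence" -> "speech" -> "processing" -> "speech".
    # While in "speech", every normalized chunk is written to both the active
    # and the backup buffer. When a segment is finalized, the active buffer is
    # snapshotted for the callback and only audio that arrived after the
    # snapshot is carried forward, so no samples are lost at segment borders.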
    def _process_vad_chunk(self, audio_chunk: np.ndarray):
        """Run VAD on a single chunk, maintaining the double buffers."""
        current_time = time.time()

        # Normalize audio
        audio_chunk = self._normalize_audio(audio_chunk)

        # Get the speech probability
        speech_prob = self._get_speech_probability(audio_chunk)

        if self.state == "silence":
            if speech_prob > self.speech_threshold:
                print("🎤 Speech detected, starting capture")
                self.state = "speech"
                self.speech_start_time = current_time
                self.last_voice_time = current_time

                # Seed both the active and the backup buffer with pre-speech audio
                self.active_speech_buffer = self.pre_speech_buffer_data.copy()
                self.active_speech_buffer.extend(audio_chunk)
                self.backup_speech_buffer = self.active_speech_buffer.copy()
            else:
                # Keep a rolling pre-speech buffer
                self.pre_speech_buffer_data.extend(audio_chunk)
                if len(self.pre_speech_buffer_data) > self.pre_speech_samples:
                    self.pre_speech_buffer_data = self.pre_speech_buffer_data[-self.pre_speech_samples:]

        elif self.state == "speech":
            # Append to both buffers
            self.active_speech_buffer.extend(audio_chunk)
            self.backup_speech_buffer.extend(audio_chunk)

            # Update the time of the last detected voice
            if speech_prob > self.speech_threshold:
                self.last_voice_time = current_time

            # Timing calculations
            silence_duration = current_time - self.last_voice_time
            speech_duration = current_time - self.speech_start_time

            # End-of-speech heuristics
            is_short_response = speech_duration < self.min_speech_duration
            is_long_silence_after_short = silence_duration >= self.min_silence_duration

            if is_short_response and is_long_silence_after_short:
                print(f"🎯 Short response detected: {speech_duration:.2f}s, silence: {silence_duration:.2f}s")
                self._finalize_speech()
            elif (speech_duration >= self.min_speech_duration and
                  silence_duration >= self.min_silence_duration):
                print(f"🎯 Long speech segment ended: {speech_duration:.2f}s")
                self._finalize_speech()
            elif speech_duration > settings.MAX_AUDIO_DURATION:
                print(f"⏰ Speech timeout ({speech_duration:.2f}s), processing even though speech continues")
                self._finalize_speech()

        elif self.state == "processing":
            # While a segment is being processed, keep recording into the backup buffer
            self.backup_speech_buffer.extend(audio_chunk)

    def _finalize_speech(self):
        """Finalize the current speech segment and switch buffers."""
        if not self.active_speech_buffer:
            self._reset_buffers()
            return

        # Switch to the processing state
        self.state = "processing"

        # Snapshot the active buffer for the current segment
        speech_audio = np.array(self.active_speech_buffer, dtype=np.float32)

        # Run the callback in its own thread
        if self.speech_callback:
            threading.Thread(
                target=self.speech_callback,
                args=(speech_audio, self.sample_rate),
                daemon=True
            ).start()

        # Prepare for the next segment: carry over only audio that reached the
        # backup buffer after the snapshot (carrying the full backup copy would
        # replay the entire segment that was just sent to the callback)
        self.active_speech_buffer = self.backup_speech_buffer[len(speech_audio):]
        self.backup_speech_buffer = self.active_speech_buffer.copy()

        # Return to the speech state to keep receiving data
        self.state = "speech"
        self.speech_start_time = time.time()
        self.last_voice_time = time.time()

    def _reset_buffers(self):
        """Reset all buffers."""
        self.active_speech_buffer = []
        self.backup_speech_buffer = []
        self.audio_buffer = []
        self.state = "silence"

    def _normalize_audio(self, audio: np.ndarray) -> np.ndarray:
        """Normalize audio to float32 in [-1.0, 1.0]."""
        if audio.dtype != np.float32:
            audio = audio.astype(np.float32)
        # Values outside [-1, 1] are assumed to be int16-range samples
        if np.max(np.abs(audio)) > 1.0:
            audio = audio / 32768.0
        return np.clip(audio, -1.0, 1.0)

    def _get_speech_probability(self, audio_chunk: np.ndarray) -> float:
        """Get the speech probability for one chunk."""
        try:
            if len(audio_chunk) != self.chunk_size:
                return 0.0
            audio_tensor = torch.from_numpy(audio_chunk).float().unsqueeze(0)
            with torch.no_grad():
                return self.model(audio_tensor, self.sample_rate).item()
        except Exception as e:
            print(f"❌ Speech probability error: {e}")
            return 0.0

    def _resample_audio(self, audio: np.ndarray, orig_sr: int, target_sr: int) -> np.ndarray:
        """Resample audio to the target sample rate."""
        if orig_sr == target_sr:
            return audio
        try:
            from scipy import signal
            duration = len(audio) / orig_sr
            new_length = int(duration * target_sr)
            resampled_audio = signal.resample(audio, new_length)
            return resampled_audio.astype(np.float32)
        except Exception:
            return audio
    def is_speech(self, audio_chunk: np.ndarray, sample_rate: int) -> bool:
        """Check whether a chunk contains speech (kept for compatibility)."""
        if self.model is None:
            return True

        try:
            if sample_rate != self.sample_rate:
                audio_chunk = self._resample_audio(audio_chunk, sample_rate, self.sample_rate)

            audio_chunk = self._normalize_audio(audio_chunk)

            # Average the speech probability over fixed-size chunks
            chunk_size = 512
            speech_probs = []
            for i in range(0, len(audio_chunk), chunk_size):
                chunk = audio_chunk[i:i + chunk_size]
                if len(chunk) == chunk_size:
                    prob = self._get_speech_probability(chunk)
                    speech_probs.append(prob)

            return np.mean(speech_probs) > self.speech_threshold if speech_probs else False
        except Exception as e:
            print(f"❌ Speech check error: {e}")
            return True
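
# A minimal usage sketch. It assumes the `sounddevice` package is installed
# (`pip install sounddevice`) and is not part of this module's actual capture
# layer; treat it as an illustration of how the streaming API fits together.
if __name__ == "__main__":
    import sounddevice as sd

    vad = SileroVAD()

    def on_speech(audio: np.ndarray, sample_rate: int):
        # Runs in a worker thread with one finalized speech segment
        print(f"Got speech segment: {len(audio) / sample_rate:.2f}s")

    def mic_callback(indata, frames, time_info, status):
        # Feed mono float32 microphone chunks into the VAD; copy because
        # sounddevice reuses its input buffer between callbacks
        vad.process_stream(indata[:, 0].copy(), 16000)

    if vad.start_stream(on_speech):
        with sd.InputStream(samplerate=16000, channels=1,
                            dtype="float32", blocksize=512,
                            callback=mic_callback):
            try:
                while True:
                    time.sleep(0.1)
            except KeyboardInterrupt:
                vad.stop_stream()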