Spaces:
Runtime error
Runtime error
| import os | |
| import librosa | |
| import soundfile as sf | |
| import numpy as np | |
| from pathlib import Path | |
| import io | |
| # Split audio stream at silence points to prevent playback stuttering issues | |
| # caused by AAC encoder frame padding when streaming audio through Gradio audio components. | |
class AudioStreamProcessor:
    """Re-chunk a stream of audio samples so every emitted piece ends in silence.

    Incoming chunks are accumulated in an internal buffer. Whenever the buffer
    contains a long-enough run of silence, everything up to the end of that
    silence is emitted as a standalone WAV byte string and the remainder is
    kept for the next call. Cutting at silence keeps the encoder frame padding
    added to each streamed piece from landing in the middle of audible
    content.

    Attributes:
        sr: Sample rate in Hz; used to convert ``min_silence_duration`` into a
            sample count and written into every WAV header.
        min_silence_duration: Minimum length, in seconds, of a silent run that
            qualifies as a split point.
        threshold_db: Level in dB relative to the buffer's peak amplitude;
            samples strictly below it count as silent.
        buffer: Samples received but not yet emitted.
    """

    def __init__(self, sr=22050, min_silence_duration=0.1, threshold_db=-40):
        self.sr = sr
        self.min_silence_duration = min_silence_duration
        self.threshold_db = threshold_db
        self.buffer = np.array([])

    def process(self, audio_data, last=False):
        """Append a chunk to the buffer and emit audio up to a silence boundary.

        Args:
            audio_data: New samples (NumPy array or array-like).
            last: True when this is the final chunk of the stream; the whole
                buffer is then flushed regardless of silence.

        Returns:
            WAV-encoded bytes of the emitted audio, or None when no split
            point was found and the data stays buffered.
        """
        audio_data = np.asarray(audio_data)
        # Adopt the first chunk as-is (rather than concatenating onto the
        # empty float64 placeholder) so its dtype is preserved.
        if len(self.buffer) > 0:
            self.buffer = np.concatenate([self.buffer, audio_data])
        else:
            self.buffer = audio_data

        if last:
            pending = self.buffer
            self.buffer = np.array([])
            return self._to_wav_bytes(pending)

        split_point = self._find_silence_boundary(self.buffer)
        if split_point is None:
            return None

        # Extend the cut to the end of the silent stretch so the retained
        # remainder starts at the next audible sample.
        silence_end = self._find_silence_end(split_point)
        emitted = self.buffer[:silence_end]
        self.buffer = self.buffer[silence_end:]
        return self._to_wav_bytes(emitted)

    def _find_silence_boundary(self, audio):
        """Locate the last qualifying run of silence in ``audio``.

        A sample is silent when its level, in dB relative to the peak of
        ``audio``, is below ``threshold_db``. A run qualifies when it spans at
        least ``min_silence_duration`` seconds.

        Returns:
            Index where the last window of ``min_silence_duration`` seconds of
            uninterrupted silence starts, or None if there is no such window
            (including when ``audio`` is empty).
        """
        # np.max has no identity on an empty array; nothing to split anyway.
        if len(audio) == 0:
            return None

        db = librosa.amplitude_to_db(np.abs(audio), ref=np.max)
        silence_points = np.where(db < self.threshold_db)[0]
        min_silence_samples = int(self.min_silence_duration * self.sr)
        return self._last_silent_run_start(silence_points, min_silence_samples)

    @staticmethod
    def _last_silent_run_start(silence_points, min_run):
        """Return the start of the last window of ``min_run`` consecutive indices.

        Args:
            silence_points: Strictly increasing integer sample indices.
            min_run: Required window length in samples; values below 1 are
                treated as 1 so a degenerate setting cannot index past the
                end of ``silence_points``.

        Returns:
            The sample index (int) at which the right-most window of
            ``min_run`` consecutive indices begins, or None if none exists.
        """
        min_run = max(1, int(min_run))
        count = len(silence_points)
        if count < min_run:
            return None

        # Because the indices are strictly increasing integers, a window of
        # min_run entries is gap-free exactly when last - first == min_run - 1.
        # This checks every window in one vectorized pass.
        spans = silence_points[min_run - 1:] - silence_points[:count - min_run + 1]
        window_starts = np.flatnonzero(spans == min_run - 1)
        if window_starts.size == 0:
            return None
        return int(silence_points[window_starts[-1]])

    def _find_silence_end(self, start_point):
        """Return the index of the first non-silent sample at or after ``start_point``.

        Args:
            start_point: Buffer index inside a silent stretch.

        Returns:
            Index into ``self.buffer`` of the first sample at or above
            ``threshold_db``, or ``len(self.buffer)`` when the silence runs to
            the end of the buffer.
        """
        # Measure levels against the peak of the WHOLE buffer, exactly as
        # _find_silence_boundary does. Referencing only the tail's own peak
        # would put the tail's loudest sample at 0 dB, so a quiet tail could
        # never be recognised as silence and the cut would land mid-silence.
        db = librosa.amplitude_to_db(np.abs(self.buffer), ref=np.max)[start_point:]
        audible = np.where(db >= self.threshold_db)[0]
        if len(audible) == 0:
            return len(self.buffer)
        return int(start_point + audible[0])

    def _to_wav_bytes(self, audio_data):
        """Encode ``audio_data`` as an in-memory WAV file at ``self.sr`` Hz."""
        wav_buffer = io.BytesIO()
        sf.write(wav_buffer, audio_data, self.sr, format='WAV')
        return wav_buffer.getvalue()