import typing

import julius
import numpy as np
import torch
import torchaudio

from . import util


class EffectMixin:
    GAIN_FACTOR = np.log(10) / 20
    """Gain factor for converting between amplitude and decibels."""
    CODEC_PRESETS = {
        "8-bit": {"format": "wav", "encoding": "ULAW", "bits_per_sample": 8},
        "GSM-FR": {"format": "gsm"},
        "MP3": {"format": "mp3", "compression": -9},
        "Vorbis": {"format": "vorbis", "compression": -1},
        "Ogg": {
            "format": "ogg",
            "compression": -1,
        },
        "Amr-nb": {"format": "amr-nb"},
    }
    """Presets for applying codecs via torchaudio."""

    def mix(
        self,
        other,
        snr: typing.Union[torch.Tensor, np.ndarray, float] = 10,
        other_eq: typing.Union[torch.Tensor, np.ndarray] = None,
    ):
        """Mixes noise with signal at specified
        signal-to-noise ratio. Optionally, the
        other signal can be equalized in-place.

        Parameters
        ----------
        other : AudioSignal
            AudioSignal object to mix with.
        snr : typing.Union[torch.Tensor, np.ndarray, float], optional
            Signal to noise ratio, by default 10
        other_eq : typing.Union[torch.Tensor, np.ndarray], optional
            EQ curve to apply to other signal, if any, by default None

        Returns
        -------
        AudioSignal
            In-place modification of AudioSignal.
        """
        snr = util.ensure_tensor(snr).to(self.device)

        pad_len = max(0, self.signal_length - other.signal_length)
        other.zero_pad(0, pad_len)
        other.truncate_samples(self.signal_length)
        if other_eq is not None:
            other = other.equalizer(other_eq)

        tgt_loudness = self.loudness() - snr
        other = other.normalize(tgt_loudness)

        self.audio_data = self.audio_data + other.audio_data
        return self
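    # Usage sketch (hypothetical variables, not part of this module): mix a
    # clean recording with a noise recording at a 6 dB signal-to-noise ratio.
    # ``speech`` and ``noise`` stand in for AudioSignal instances loaded
    # elsewhere, and ``clone()`` is used so the original is not modified
    # in-place. The noise is padded/truncated to the speech length and its
    # loudness is set 6 dB below the speech loudness before summing.
    #
    #     noisy = speech.clone().mix(noise, snr=6)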

    def convolve(self, other, start_at_max: bool = True):
        """Convolves self with other.
        This function uses FFTs to do the convolution.

        Parameters
        ----------
        other : AudioSignal
            Signal to convolve with.
        start_at_max : bool, optional
            Whether to start at the max value of other signal, to
            avoid inducing delays, by default True

        Returns
        -------
        AudioSignal
            Convolved signal, in-place.
        """
        from . import AudioSignal

        pad_len = self.signal_length - other.signal_length

        if pad_len > 0:
            other.zero_pad(0, pad_len)
        else:
            other.truncate_samples(self.signal_length)

        if start_at_max:
            # Use roll to rotate over the max for every item
            # so that the impulse responses don't induce any
            # delay.
            idx = other.audio_data.abs().argmax(axis=-1)
            irs = torch.zeros_like(other.audio_data)
            for i in range(other.batch_size):
                irs[i] = torch.roll(other.audio_data[i], -idx[i].item(), -1)
            other = AudioSignal(irs, other.sample_rate)

        delta = torch.zeros_like(other.audio_data)
        delta[..., 0] = 1

        length = self.signal_length
        delta_fft = torch.fft.rfft(delta, length)
        other_fft = torch.fft.rfft(other.audio_data, length)
        self_fft = torch.fft.rfft(self.audio_data, length)

        convolved_fft = other_fft * self_fft
        convolved_audio = torch.fft.irfft(convolved_fft, length)

        delta_convolved_fft = other_fft * delta_fft
        delta_audio = torch.fft.irfft(delta_convolved_fft, length)

        # Use the delta to rescale the audio exactly as needed.
        delta_max = delta_audio.abs().max(dim=-1, keepdims=True)[0]
        scale = 1 / delta_max.clamp(1e-5)
        convolved_audio = convolved_audio * scale

        self.audio_data = convolved_audio

        return self
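    # FFT convolution refresher (a standalone sketch, independent of this
    # class): circular convolution of two signals is elementwise multiplication
    # of their rfft spectra, so convolving with an impulse response ``h`` is
    # ``irfft(rfft(x, n) * rfft(h, n), n)``. A minimal check with plain
    # tensors (names are illustrative only):
    #
    #     x = torch.randn(1, 1, 16000)
    #     h = torch.zeros(1, 1, 16000); h[..., 0] = 1.0   # identity impulse
    #     y = torch.fft.irfft(
    #         torch.fft.rfft(x, 16000) * torch.fft.rfft(h, 16000), 16000
    #     )
    #     assert torch.allclose(x, y, atol=1e-5)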

    def apply_ir(
        self,
        ir,
        drr: typing.Union[torch.Tensor, np.ndarray, float] = None,
        ir_eq: typing.Union[torch.Tensor, np.ndarray] = None,
        use_original_phase: bool = False,
    ):
        """Applies an impulse response to the signal. If ``ir_eq``
        is specified, the impulse response is equalized before
        it is applied, using the given curve.

        Parameters
        ----------
        ir : AudioSignal
            Impulse response to convolve with.
        drr : typing.Union[torch.Tensor, np.ndarray, float], optional
            Direct-to-reverberant ratio that impulse response will be
            altered to, if specified, by default None
        ir_eq : typing.Union[torch.Tensor, np.ndarray], optional
            Equalization that will be applied to impulse response
            if specified, by default None
        use_original_phase : bool, optional
            Whether to use the original phase, instead of the convolved
            phase, by default False

        Returns
        -------
        AudioSignal
            Signal with impulse response applied to it
        """
        if ir_eq is not None:
            ir = ir.equalizer(ir_eq)
        if drr is not None:
            ir = ir.alter_drr(drr)

        # Save the peak before
        max_spk = self.audio_data.abs().max(dim=-1, keepdims=True).values

        # Augment the impulse response to simulate microphone effects
        # and with varying direct-to-reverberant ratio.
        phase = self.phase
        self.convolve(ir)

        # Use the input phase
        if use_original_phase:
            self.stft()
            self.stft_data = self.magnitude * torch.exp(1j * phase)
            self.istft()

        # Rescale to the input's amplitude
        max_transformed = self.audio_data.abs().max(dim=-1, keepdims=True).values
        scale_factor = max_spk.clamp(1e-8) / max_transformed.clamp(1e-8)
        self = self * scale_factor

        return self
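    # Usage sketch (hypothetical variables): apply a measured room impulse
    # response to a dry signal, equalizing the IR with an illustrative 6-band
    # curve and forcing its direct-to-reverberant ratio to 20 dB first.
    #
    #     eq_curve = -1 * torch.rand(6)           # illustrative EQ curve
    #     wet = dry.clone().apply_ir(room_ir, drr=20, ir_eq=eq_curve)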

    def ensure_max_of_audio(self, max: float = 1.0):
        """Ensures that ``abs(audio_data) <= max``.

        Parameters
        ----------
        max : float, optional
            Max absolute value of signal, by default 1.0

        Returns
        -------
        AudioSignal
            Signal with values scaled between -max and max.
        """
        peak = self.audio_data.abs().max(dim=-1, keepdims=True)[0]
        peak_gain = torch.ones_like(peak)
        peak_gain[peak > max] = max / peak[peak > max]
        self.audio_data = self.audio_data * peak_gain
        return self

    def normalize(self, db: typing.Union[torch.Tensor, np.ndarray, float] = -24.0):
        """Normalizes the signal's volume to the specified db, in LUFS.
        This is GPU-compatible, making for very fast loudness normalization.

        Parameters
        ----------
        db : typing.Union[torch.Tensor, np.ndarray, float], optional
            Loudness to normalize to, by default -24.0

        Returns
        -------
        AudioSignal
            Normalized audio signal.
        """
        db = util.ensure_tensor(db).to(self.device)
        ref_db = self.loudness()
        gain = db - ref_db
        gain = torch.exp(gain * self.GAIN_FACTOR)

        self.audio_data = self.audio_data * gain[:, None, None]
        return self
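    # Why ``exp(gain_db * GAIN_FACTOR)`` works: GAIN_FACTOR is ln(10) / 20, so
    # exp(gain_db * ln(10) / 20) == 10 ** (gain_db / 20), the standard
    # dB-to-linear-amplitude conversion. A quick standalone check
    # (illustrative only):
    #
    #     gain_db = torch.tensor(-6.0)
    #     linear = torch.exp(gain_db * np.log(10) / 20)
    #     assert torch.isclose(linear, torch.tensor(10 ** (-6.0 / 20)))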

    def volume_change(self, db: typing.Union[torch.Tensor, np.ndarray, float]):
        """Change volume of signal by some amount, in dB.

        Parameters
        ----------
        db : typing.Union[torch.Tensor, np.ndarray, float]
            Amount to change volume by.

        Returns
        -------
        AudioSignal
            Signal at new volume.
        """
        db = util.ensure_tensor(db, ndim=1).to(self.device)
        gain = torch.exp(db * self.GAIN_FACTOR)
        self.audio_data = self.audio_data * gain[:, None, None]
        return self

    def _to_2d(self):
        # Flatten (batch, channels, time) to (batch * channels, time) so the
        # waveform can be passed to channels-first torchaudio/sox operations.
        waveform = self.audio_data.reshape(-1, self.signal_length)
        return waveform

    def _to_3d(self, waveform):
        # Restore the (batch, channels, time) layout produced by ``_to_2d``.
        return waveform.reshape(self.batch_size, self.num_channels, -1)

    def pitch_shift(self, n_semitones: int, quick: bool = True):
        """Pitch shift the signal. All items in the batch
        get the same pitch shift.

        Parameters
        ----------
        n_semitones : int
            How many semitones to shift the signal by.
        quick : bool, optional
            Using quick pitch shifting, by default True

        Returns
        -------
        AudioSignal
            Pitch shifted audio signal.
        """
        device = self.device
        effects = [
            ["pitch", str(n_semitones * 100)],
            ["rate", str(self.sample_rate)],
        ]
        if quick:
            effects[0].insert(1, "-q")

        waveform = self._to_2d().cpu()
        waveform, sample_rate = torchaudio.sox_effects.apply_effects_tensor(
            waveform, self.sample_rate, effects, channels_first=True
        )
        self.sample_rate = sample_rate
        self.audio_data = self._to_3d(waveform)
        return self.to(device)
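    # Note: sox's ``pitch`` effect is specified in cents, hence the factor of
    # 100 above (1 semitone == 100 cents). Usage sketch (hypothetical signal):
    #
    #     shifted = signal.clone().pitch_shift(n_semitones=2)   # up a whole tone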

    def time_stretch(self, factor: float, quick: bool = True):
        """Time stretch the audio signal.

        Parameters
        ----------
        factor : float
            Factor by which to stretch the AudioSignal. Typically
            between 0.8 and 1.2.
        quick : bool, optional
            Whether to use quick time stretching, by default True

        Returns
        -------
        AudioSignal
            Time-stretched AudioSignal.
        """
        device = self.device
        effects = [
            ["tempo", str(factor)],
            ["rate", str(self.sample_rate)],
        ]
        if quick:
            effects[0].insert(1, "-q")

        waveform = self._to_2d().cpu()
        waveform, sample_rate = torchaudio.sox_effects.apply_effects_tensor(
            waveform, self.sample_rate, effects, channels_first=True
        )
        self.sample_rate = sample_rate
        self.audio_data = self._to_3d(waveform)
        return self.to(device)
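    # Usage sketch (hypothetical signal): sox's ``tempo`` effect changes
    # duration while preserving pitch; a factor above 1.0 speeds the audio up,
    # below 1.0 slows it down.
    #
    #     faster = signal.clone().time_stretch(1.1)
    #     slower = signal.clone().time_stretch(0.9)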

    def apply_codec(
        self,
        preset: str = None,
        format: str = "wav",
        encoding: str = None,
        bits_per_sample: int = None,
        compression: int = None,
    ):  # pragma: no cover
        """Applies an audio codec to the signal.

        Parameters
        ----------
        preset : str, optional
            One of the keys in ``self.CODEC_PRESETS``, by default None
        format : str, optional
            Format for audio codec, by default "wav"
        encoding : str, optional
            Encoding to use, by default None
        bits_per_sample : int, optional
            How many bits per sample, by default None
        compression : int, optional
            Compression amount of codec, by default None

        Returns
        -------
        AudioSignal
            AudioSignal with codec applied.

        Raises
        ------
        ValueError
            If preset is not in ``self.CODEC_PRESETS``, an error
            is thrown.
        """
        torchaudio_version_070 = "0.7" in torchaudio.__version__
        if torchaudio_version_070:
            return self

        kwargs = {
            "format": format,
            "encoding": encoding,
            "bits_per_sample": bits_per_sample,
            "compression": compression,
        }

        if preset is not None:
            if preset in self.CODEC_PRESETS:
                kwargs = self.CODEC_PRESETS[preset]
            else:
                raise ValueError(
                    f"Unknown preset: {preset}. "
                    f"Known presets: {list(self.CODEC_PRESETS.keys())}"
                )

        waveform = self._to_2d()
        if kwargs["format"] in ["vorbis", "mp3", "ogg", "amr-nb"]:
            # Apply it in a for loop
            augmented = torch.cat(
                [
                    torchaudio.functional.apply_codec(
                        waveform[i][None, :], self.sample_rate, **kwargs
                    )
                    for i in range(waveform.shape[0])
                ],
                dim=0,
            )
        else:
            augmented = torchaudio.functional.apply_codec(
                waveform, self.sample_rate, **kwargs
            )
        augmented = self._to_3d(augmented)
        self.audio_data = augmented
        return self
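    # Usage sketch (hypothetical signal): the presets above bundle keyword
    # arguments for ``torchaudio.functional.apply_codec``, so simulating a
    # lossy or low-bit-depth codec is a single call.
    #
    #     degraded = signal.clone().apply_codec(preset="8-bit")
    #     degraded = signal.clone().apply_codec(preset="MP3")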

    def mel_filterbank(self, n_bands: int):
        """Breaks signal into mel bands.

        Parameters
        ----------
        n_bands : int
            Number of mel bands to use.

        Returns
        -------
        torch.Tensor
            Mel-filtered bands, with last axis being the band index.
        """
        filterbank = (
            julius.SplitBands(self.sample_rate, n_bands).float().to(self.device)
        )
        filtered = filterbank(self.audio_data)
        return filtered.permute(1, 2, 3, 0)
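    # Shape note: julius.SplitBands returns (n_bands, batch, channels, time);
    # the permute above moves the band axis last, giving
    # (batch, channels, time, n_bands). Because the bands partition the
    # spectrum, summing over that last axis reconstructs the original signal,
    # which is what ``equalizer`` below relies on.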

    def equalizer(self, db: typing.Union[torch.Tensor, np.ndarray]):
        """Applies a mel-spaced equalizer to the audio signal.

        Parameters
        ----------
        db : typing.Union[torch.Tensor, np.ndarray]
            EQ curve to apply.

        Returns
        -------
        AudioSignal
            AudioSignal with equalization applied.
        """
        db = util.ensure_tensor(db)
        n_bands = db.shape[-1]
        fbank = self.mel_filterbank(n_bands)

        # If there's a batch dimension, make sure it's the same.
        if db.ndim == 2:
            if db.shape[0] != 1:
                assert db.shape[0] == fbank.shape[0]
        else:
            db = db.unsqueeze(0)

        weights = (10**db).to(self.device).float()
        fbank = fbank * weights[:, None, None, :]
        eq_audio_data = fbank.sum(-1)
        self.audio_data = eq_audio_data
        return self
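    # Usage sketch (hypothetical signal): apply an illustrative 6-band,
    # cut-only EQ curve. Each entry is applied as a per-band weight of
    # ``10 ** db``, so 0 leaves a band untouched and negative values
    # attenuate it.
    #
    #     eq = -1 * torch.rand(6)                 # illustrative curve
    #     equalized = signal.clone().equalizer(eq)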

    def clip_distortion(
        self, clip_percentile: typing.Union[torch.Tensor, np.ndarray, float]
    ):
        """Clips the signal at a given percentile. The higher it is,
        the lower the threshold for clipping.

        Parameters
        ----------
        clip_percentile : typing.Union[torch.Tensor, np.ndarray, float]
            Values are between 0.0 and 1.0. Typical values are 0.1 or below.

        Returns
        -------
        AudioSignal
            Audio signal with clipped audio data.
        """
        clip_percentile = util.ensure_tensor(clip_percentile, ndim=1)
        min_thresh = torch.quantile(self.audio_data, clip_percentile / 2, dim=-1)
        max_thresh = torch.quantile(self.audio_data, 1 - (clip_percentile / 2), dim=-1)

        nc = self.audio_data.shape[1]
        min_thresh = min_thresh[:, :nc, :]
        max_thresh = max_thresh[:, :nc, :]

        self.audio_data = self.audio_data.clamp(min_thresh, max_thresh)

        return self
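    # Usage sketch (hypothetical signal): clipping the top and bottom 2.5% of
    # sample values (clip_percentile=0.05) produces audible hard-clipping
    # distortion while keeping the waveform bounded.
    #
    #     distorted = signal.clone().clip_distortion(0.05)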

    def quantization(
        self, quantization_channels: typing.Union[torch.Tensor, np.ndarray, int]
    ):
        """Applies quantization to the input waveform.

        Parameters
        ----------
        quantization_channels : typing.Union[torch.Tensor, np.ndarray, int]
            Number of evenly spaced quantization channels to quantize
            to.

        Returns
        -------
        AudioSignal
            Quantized AudioSignal.
        """
        quantization_channels = util.ensure_tensor(quantization_channels, ndim=3)

        x = self.audio_data
        x = (x + 1) / 2
        x = x * quantization_channels
        x = x.floor()
        x = x / quantization_channels
        x = 2 * x - 1

        residual = (self.audio_data - x).detach()
        self.audio_data = self.audio_data - residual
        return self
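    # The final two lines are a straight-through estimator: the forward value
    # equals the quantized waveform ``x`` (audio_data - (audio_data - x) == x),
    # but because the residual is detached, gradients flow through
    # ``audio_data`` as if quantization were the identity. The same trick is
    # used in ``mulaw_quantization`` below.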

    def mulaw_quantization(
        self, quantization_channels: typing.Union[torch.Tensor, np.ndarray, int]
    ):
        """Applies mu-law quantization to the input waveform.

        Parameters
        ----------
        quantization_channels : typing.Union[torch.Tensor, np.ndarray, int]
            Number of mu-law spaced quantization channels to quantize
            to.

        Returns
        -------
        AudioSignal
            Quantized AudioSignal.
        """
        mu = quantization_channels - 1.0
        mu = util.ensure_tensor(mu, ndim=3)

        x = self.audio_data

        # quantize
        x = torch.sign(x) * torch.log1p(mu * torch.abs(x)) / torch.log1p(mu)
        x = ((x + 1) / 2 * mu + 0.5).to(torch.int64)

        # unquantize
        x = (x / mu) * 2 - 1.0
        x = torch.sign(x) * (torch.exp(torch.abs(x) * torch.log1p(mu)) - 1.0) / mu

        residual = (self.audio_data - x).detach()
        self.audio_data = self.audio_data - residual
        return self
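    # Usage sketch (hypothetical signal): 8-bit-style degradation, either
    # uniform or mu-law companded; 256 channels matches WaveNet-style audio
    # preprocessing.
    #
    #     crushed = signal.clone().quantization(256)
    #     mulawed = signal.clone().mulaw_quantization(256)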

    def __matmul__(self, other):
        # ``signal @ other`` is shorthand for ``signal.convolve(other)``.
        return self.convolve(other)


class ImpulseResponseMixin:
    """These functions are generally only used with AudioSignals that are derived
    from impulse responses, not other sources like music or speech. These methods
    are used to replicate the data augmentation described in [1].

    1.  Bryan, Nicholas J. "Impulse response data augmentation and deep
        neural networks for blind room acoustic parameter estimation."
        ICASSP 2020-2020 IEEE International Conference on Acoustics,
        Speech and Signal Processing (ICASSP). IEEE, 2020.
    """

    def decompose_ir(self):
        """Decomposes an impulse response into early and late
        field responses.
        """
        # Equations 1 and 2
        # -----------------
        # Breaking up into early
        # response + late field response.

        td = torch.argmax(self.audio_data, dim=-1, keepdim=True)
        t0 = int(self.sample_rate * 0.0025)

        idx = torch.arange(self.audio_data.shape[-1], device=self.device)[None, None, :]
        idx = idx.expand(self.batch_size, -1, -1)
        early_idx = (idx >= td - t0) * (idx <= td + t0)

        early_response = torch.zeros_like(self.audio_data, device=self.device)
        early_response[early_idx] = self.audio_data[early_idx]

        late_idx = ~early_idx
        late_field = torch.zeros_like(self.audio_data, device=self.device)
        late_field[late_idx] = self.audio_data[late_idx]

        # Equation 4
        # ----------
        # Decompose early response into windowed
        # direct path and windowed residual.

        window = torch.zeros_like(self.audio_data, device=self.device)
        for idx in range(self.batch_size):
            window_idx = early_idx[idx, 0].nonzero()
            window[idx, ..., window_idx] = self.get_window(
                "hann", window_idx.shape[-1], self.device
            )
        return early_response, late_field, window

    def measure_drr(self):
        """Measures the direct-to-reverberant ratio of the impulse
        response.

        Returns
        -------
        float
            Direct-to-reverberant ratio
        """
        early_response, late_field, _ = self.decompose_ir()
        num = (early_response**2).sum(dim=-1)
        den = (late_field**2).sum(dim=-1)
        drr = 10 * torch.log10(num / den)
        return drr
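    # In other words, DRR in dB is 10 * log10( sum(early^2) / sum(late^2) ):
    # the energy of the early (direct-path) response relative to the late
    # reverberant field, following the decomposition in [1].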

    def solve_alpha(self, early_response, late_field, wd, target_drr):
        """Used to solve for the alpha value, which is used
        to alter the drr.
        """
        # Equation 5
        # ----------
        # Apply the good ol' quadratic formula.

        wd_sq = wd**2
        wd_sq_1 = (1 - wd) ** 2
        e_sq = early_response**2
        l_sq = late_field**2
        a = (wd_sq * e_sq).sum(dim=-1)
        b = (2 * (1 - wd) * wd * e_sq).sum(dim=-1)
        c = (wd_sq_1 * e_sq).sum(dim=-1) - torch.pow(10, target_drr / 10) * l_sq.sum(
            dim=-1
        )

        expr = ((b**2) - 4 * a * c).sqrt()
        alpha = torch.maximum(
            (-b - expr) / (2 * a),
            (-b + expr) / (2 * a),
        )
        return alpha
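    # What the quadratic solves: the altered IR scales the windowed direct
    # path by alpha, so the target-DRR constraint
    #     sum((alpha * wd * e + (1 - wd) * e)^2) = 10^(target_drr / 10) * sum(l^2)
    # expands to a * alpha^2 + b * alpha + c = 0 with the a, b, c computed
    # above; the larger root is kept.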

    def alter_drr(self, drr: typing.Union[torch.Tensor, np.ndarray, float]):
        """Alters the direct-to-reverberant ratio of the impulse response.

        Parameters
        ----------
        drr : typing.Union[torch.Tensor, np.ndarray, float]
            Direct-to-reverberant ratio that the impulse response will be
            altered to, in dB.

        Returns
        -------
        AudioSignal
            Altered impulse response.
        """
        drr = util.ensure_tensor(drr, 2, self.batch_size).to(self.device)

        early_response, late_field, window = self.decompose_ir()
        alpha = self.solve_alpha(early_response, late_field, window, drr)
        min_alpha = (
            late_field.abs().max(dim=-1)[0] / early_response.abs().max(dim=-1)[0]
        )
        alpha = torch.maximum(alpha, min_alpha)[..., None]

        aug_ir_data = (
            alpha * window * early_response
            + ((1 - window) * early_response)
            + late_field
        )
        self.audio_data = aug_ir_data
        self.ensure_max_of_audio()
        return self
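    # Usage sketch (hypothetical impulse response): push a recorded room IR
    # toward a drier, more direct sound by raising its DRR, then check the
    # result.
    #
    #     drier = room_ir.clone().alter_drr(25)
    #     print(drier.measure_drr())   # close to 25 dB, within the limits
    #                                  # imposed by min_alpha above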