Spaces:
Running
Running
| import os | |
| import tqdm | |
| import time | |
| import numpy as np | |
| from typing import BinaryIO, Union, Tuple | |
| from datetime import datetime, timedelta | |
| import faster_whisper | |
| import ctranslate2 | |
| import whisper | |
| import torch | |
| import gradio as gr | |
| from .base_interface import BaseInterface | |
| from modules.subtitle_manager import get_srt, get_vtt, get_txt, write_file, safe_filename | |
| from modules.youtube_manager import get_ytdata, get_ytaudio | |
| class FasterWhisperInference(BaseInterface): | |
| def __init__(self): | |
| super().__init__() | |
| self.current_model_size = None | |
| self.model = None | |
| self.available_models = whisper.available_models() | |
| self.available_langs = sorted(list(whisper.tokenizer.LANGUAGES.values())) | |
| self.translatable_models = ["large", "large-v1", "large-v2"] | |
| self.device = "cuda" if torch.cuda.is_available() else "cpu" | |
| self.available_compute_types = ctranslate2.get_supported_compute_types("cuda") if self.device == "cuda" else ctranslate2.get_supported_compute_types("cpu") | |
| self.current_compute_type = "float16" if self.device == "cuda" else "float32" | |
| self.default_beam_size = 1 | |
| def transcribe_file(self, | |
| fileobjs: list, | |
| model_size: str, | |
| lang: str, | |
| file_format: str, | |
| istranslate: bool, | |
| add_timestamp: bool, | |
| beam_size: int, | |
| log_prob_threshold: float, | |
| no_speech_threshold: float, | |
| compute_type: str, | |
| progress=gr.Progress() | |
| ) -> str: | |
| """ | |
| Write subtitle file from Files | |
| Parameters | |
| ---------- | |
| fileobjs: list | |
| List of files to transcribe from gr.Files() | |
| model_size: str | |
| Whisper model size from gr.Dropdown() | |
| lang: str | |
| Source language of the file to transcribe from gr.Dropdown() | |
| file_format: str | |
| File format to write from gr.Dropdown(). Supported format: [SRT, WebVTT, txt] | |
| istranslate: bool | |
| Boolean value from gr.Checkbox() that determines whether to translate to English. | |
| It's Whisper's feature to translate speech from another language directly into English end-to-end. | |
| add_timestamp: bool | |
| Boolean value from gr.Checkbox() that determines whether to add a timestamp at the end of the filename. | |
| beam_size: int | |
| Int value from gr.Number() that is used for decoding option. | |
| log_prob_threshold: float | |
| float value from gr.Number(). If the average log probability over sampled tokens is | |
| below this value, treat as failed. | |
| no_speech_threshold: float | |
| float value from gr.Number(). If the no_speech probability is higher than this value AND | |
| the average log probability over sampled tokens is below `log_prob_threshold`, | |
| consider the segment as silent. | |
| compute_type: str | |
| compute type from gr.Dropdown(). | |
| see more info : https://opennmt.net/CTranslate2/quantization.html | |
| progress: gr.Progress | |
| Indicator to show progress directly in gradio. | |
| Returns | |
| ---------- | |
| String to return to gr.Textbox() | |
| """ | |
| try: | |
| self.update_model_if_needed(model_size=model_size, compute_type=compute_type, progress=progress) | |
| files_info = {} | |
| for fileobj in fileobjs: | |
| transcribed_segments, time_for_task = self.transcribe( | |
| audio=fileobj.name, | |
| lang=lang, | |
| istranslate=istranslate, | |
| beam_size=beam_size, | |
| log_prob_threshold=log_prob_threshold, | |
| no_speech_threshold=no_speech_threshold, | |
| progress=progress | |
| ) | |
| file_name, file_ext = os.path.splitext(os.path.basename(fileobj.orig_name)) | |
| file_name = safe_filename(file_name) | |
| subtitle = self.generate_and_write_file( | |
| file_name=file_name, | |
| transcribed_segments=transcribed_segments, | |
| add_timestamp=add_timestamp, | |
| file_format=file_format | |
| ) | |
| print(f"{subtitle}") | |
| files_info[file_name] = {"subtitle": subtitle, "time_for_task": time_for_task} | |
| total_result = '' | |
| total_time = 0 | |
| for file_name, info in files_info.items(): | |
| total_result += '------------------------------------\n' | |
| total_result += f'{file_name}\n\n' | |
| total_result += f'{info["subtitle"]}' | |
| total_time += info["time_for_task"] | |
| return f"Done in {self.format_time(total_time)}! Subtitle is in the outputs folder.\n\n{total_result}" | |
| except Exception as e: | |
| print(f"Error transcribing file on line {e}") | |
| finally: | |
| self.release_cuda_memory() | |
| self.remove_input_files([fileobj.name for fileobj in fileobjs]) | |
| def transcribe_youtube(self, | |
| youtubelink: str, | |
| model_size: str, | |
| lang: str, | |
| file_format: str, | |
| istranslate: bool, | |
| add_timestamp: bool, | |
| beam_size: int, | |
| log_prob_threshold: float, | |
| no_speech_threshold: float, | |
| compute_type: str, | |
| progress=gr.Progress() | |
| ) -> str: | |
| """ | |
| Write subtitle file from Youtube | |
| Parameters | |
| ---------- | |
| youtubelink: str | |
| Link of Youtube to transcribe from gr.Textbox() | |
| model_size: str | |
| Whisper model size from gr.Dropdown() | |
| lang: str | |
| Source language of the file to transcribe from gr.Dropdown() | |
| file_format: str | |
| File format to write from gr.Dropdown(). Supported format: [SRT, WebVTT, txt] | |
| istranslate: bool | |
| Boolean value from gr.Checkbox() that determines whether to translate to English. | |
| It's Whisper's feature to translate speech from another language directly into English end-to-end. | |
| add_timestamp: bool | |
| Boolean value from gr.Checkbox() that determines whether to add a timestamp at the end of the filename. | |
| beam_size: int | |
| Int value from gr.Number() that is used for decoding option. | |
| log_prob_threshold: float | |
| float value from gr.Number(). If the average log probability over sampled tokens is | |
| below this value, treat as failed. | |
| no_speech_threshold: float | |
| float value from gr.Number(). If the no_speech probability is higher than this value AND | |
| the average log probability over sampled tokens is below `log_prob_threshold`, | |
| consider the segment as silent. | |
| compute_type: str | |
| compute type from gr.Dropdown(). | |
| see more info : https://opennmt.net/CTranslate2/quantization.html | |
| progress: gr.Progress | |
| Indicator to show progress directly in gradio. | |
| Returns | |
| ---------- | |
| String to return to gr.Textbox() | |
| """ | |
| try: | |
| self.update_model_if_needed(model_size=model_size, compute_type=compute_type, progress=progress) | |
| progress(0, desc="Loading Audio from Youtube..") | |
| yt = get_ytdata(youtubelink) | |
| audio = get_ytaudio(yt) | |
| transcribed_segments, time_for_task = self.transcribe( | |
| audio=audio, | |
| lang=lang, | |
| istranslate=istranslate, | |
| beam_size=beam_size, | |
| log_prob_threshold=log_prob_threshold, | |
| no_speech_threshold=no_speech_threshold, | |
| progress=progress | |
| ) | |
| progress(1, desc="Completed!") | |
| file_name = safe_filename(yt.title) | |
| subtitle = self.generate_and_write_file( | |
| file_name=file_name, | |
| transcribed_segments=transcribed_segments, | |
| add_timestamp=add_timestamp, | |
| file_format=file_format | |
| ) | |
| return f"Done in {self.format_time(time_for_task)}! Subtitle file is in the outputs folder.\n\n{subtitle}" | |
| except Exception as e: | |
| return f"Error: {str(e)}" | |
| finally: | |
| try: | |
| if 'yt' not in locals(): | |
| yt = get_ytdata(youtubelink) | |
| file_path = get_ytaudio(yt) | |
| else: | |
| file_path = get_ytaudio(yt) | |
| self.release_cuda_memory() | |
| self.remove_input_files([file_path]) | |
| except Exception as cleanup_error: | |
| pass | |
| def transcribe_mic(self, | |
| micaudio: str, | |
| model_size: str, | |
| lang: str, | |
| file_format: str, | |
| istranslate: bool, | |
| beam_size: int, | |
| log_prob_threshold: float, | |
| no_speech_threshold: float, | |
| compute_type: str, | |
| progress=gr.Progress() | |
| ) -> str: | |
| """ | |
| Write subtitle file from microphone | |
| Parameters | |
| ---------- | |
| micaudio: str | |
| Audio file path from gr.Microphone() | |
| model_size: str | |
| Whisper model size from gr.Dropdown() | |
| lang: str | |
| Source language of the file to transcribe from gr.Dropdown() | |
| file_format: str | |
| File format to write from gr.Dropdown(). Supported format: [SRT, WebVTT, txt] | |
| istranslate: bool | |
| Boolean value from gr.Checkbox() that determines whether to translate to English. | |
| It's Whisper's feature to translate speech from another language directly into English end-to-end. | |
| beam_size: int | |
| Int value from gr.Number() that is used for decoding option. | |
| log_prob_threshold: float | |
| float value from gr.Number(). If the average log probability over sampled tokens is | |
| below this value, treat as failed. | |
| no_speech_threshold: float | |
| float value from gr.Number(). If the no_speech probability is higher than this value AND | |
| the average log probability over sampled tokens is below `log_prob_threshold`, | |
| compute_type: str | |
| compute type from gr.Dropdown(). | |
| see more info : https://opennmt.net/CTranslate2/quantization.html | |
| consider the segment as silent. | |
| progress: gr.Progress | |
| Indicator to show progress directly in gradio. | |
| Returns | |
| ---------- | |
| String to return to gr.Textbox() | |
| """ | |
| try: | |
| self.update_model_if_needed(model_size=model_size, compute_type=compute_type, progress=progress) | |
| progress(0, desc="Loading Audio..") | |
| transcribed_segments, time_for_task = self.transcribe( | |
| audio=micaudio, | |
| lang=lang, | |
| istranslate=istranslate, | |
| beam_size=beam_size, | |
| log_prob_threshold=log_prob_threshold, | |
| no_speech_threshold=no_speech_threshold, | |
| progress=progress | |
| ) | |
| progress(1, desc="Completed!") | |
| subtitle = self.generate_and_write_file( | |
| file_name="Mic", | |
| transcribed_segments=transcribed_segments, | |
| add_timestamp=True, | |
| file_format=file_format | |
| ) | |
| return f"Done in {self.format_time(time_for_task)}! Subtitle file is in the outputs folder.\n\n{subtitle}" | |
| except Exception as e: | |
| return f"Error: {str(e)}" | |
| finally: | |
| self.release_cuda_memory() | |
| self.remove_input_files([micaudio]) | |
| def transcribe(self, | |
| audio: Union[str, BinaryIO, np.ndarray], | |
| lang: str, | |
| istranslate: bool, | |
| beam_size: int, | |
| log_prob_threshold: float, | |
| no_speech_threshold: float, | |
| progress: gr.Progress | |
| ) -> Tuple[list, float]: | |
| """ | |
| transcribe method for faster-whisper. | |
| Parameters | |
| ---------- | |
| audio: Union[str, BinaryIO, np.ndarray] | |
| Audio path or file binary or Audio numpy array | |
| lang: str | |
| Source language of the file to transcribe from gr.Dropdown() | |
| istranslate: bool | |
| Boolean value from gr.Checkbox() that determines whether to translate to English. | |
| It's Whisper's feature to translate speech from another language directly into English end-to-end. | |
| beam_size: int | |
| Int value from gr.Number() that is used for decoding option. | |
| log_prob_threshold: float | |
| float value from gr.Number(). If the average log probability over sampled tokens is | |
| below this value, treat as failed. | |
| no_speech_threshold: float | |
| float value from gr.Number(). If the no_speech probability is higher than this value AND | |
| the average log probability over sampled tokens is below `log_prob_threshold`, | |
| consider the segment as silent. | |
| progress: gr.Progress | |
| Indicator to show progress directly in gradio. | |
| Returns | |
| ---------- | |
| segments_result: list[dict] | |
| list of dicts that includes start, end timestamps and transcribed text | |
| elapsed_time: float | |
| elapsed time for transcription | |
| """ | |
| start_time = time.time() | |
| if lang == "Automatic Detection": | |
| lang = None | |
| else: | |
| language_code_dict = {value: key for key, value in whisper.tokenizer.LANGUAGES.items()} | |
| lang = language_code_dict[lang] | |
| segments, info = self.model.transcribe( | |
| audio=audio, | |
| language=lang, | |
| task="translate" if istranslate and self.current_model_size in self.translatable_models else "transcribe", | |
| beam_size=beam_size, | |
| log_prob_threshold=log_prob_threshold, | |
| no_speech_threshold=no_speech_threshold, | |
| ) | |
| progress(0, desc="Loading audio..") | |
| segments_result = [] | |
| for segment in segments: | |
| progress(segment.start / info.duration, desc="Transcribing..") | |
| segments_result.append({ | |
| "start": segment.start, | |
| "end": segment.end, | |
| "text": segment.text | |
| }) | |
| elapsed_time = time.time() - start_time | |
| return segments_result, elapsed_time | |
| def update_model_if_needed(self, | |
| model_size: str, | |
| compute_type: str, | |
| progress: gr.Progress | |
| ): | |
| """ | |
| Initialize model if it doesn't match with current model setting | |
| """ | |
| if model_size != self.current_model_size or self.model is None or self.current_compute_type != compute_type: | |
| progress(0, desc="Initializing Model..") | |
| self.current_model_size = model_size | |
| self.current_compute_type = compute_type | |
| self.model = faster_whisper.WhisperModel( | |
| device=self.device, | |
| model_size_or_path=model_size, | |
| download_root=os.path.join("models", "Whisper", "faster-whisper"), | |
| compute_type=self.current_compute_type | |
| ) | |
| def generate_and_write_file(file_name: str, | |
| transcribed_segments: list, | |
| add_timestamp: bool, | |
| file_format: str, | |
| ) -> str: | |
| """ | |
| This method writes subtitle file and returns str to gr.Textbox | |
| """ | |
| timestamp = datetime.now().strftime("%m%d%H%M%S") | |
| if add_timestamp: | |
| output_path = os.path.join("outputs", f"{file_name}-{timestamp}") | |
| else: | |
| output_path = os.path.join("outputs", f"{file_name}") | |
| if file_format == "SRT": | |
| content = get_srt(transcribed_segments) | |
| write_file(content, f"{output_path}.srt") | |
| elif file_format == "WebVTT": | |
| content = get_vtt(transcribed_segments) | |
| write_file(content, f"{output_path}.vtt") | |
| elif file_format == "txt": | |
| content = get_txt(transcribed_segments) | |
| write_file(content, f"{output_path}.txt") | |
| return content | |
| def format_time(elapsed_time: float) -> str: | |
| hours, rem = divmod(elapsed_time, 3600) | |
| minutes, seconds = divmod(rem, 60) | |
| time_str = "" | |
| if hours: | |
| time_str += f"{hours} hours " | |
| if minutes: | |
| time_str += f"{minutes} minutes " | |
| seconds = round(seconds) | |
| time_str += f"{seconds} seconds" | |
| return time_str.strip() | |