Spaces:
Running
Running
| import gradio as gr | |
| import os | |
| import time | |
| import sys | |
| import io | |
| import tempfile | |
| import subprocess | |
| import requests | |
| from urllib.parse import urlparse | |
| from pydub import AudioSegment | |
| import logging | |
| import torch | |
| import importlib | |
| from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor, pipeline | |
| import yt_dlp | |
# Print the installed yt-dlp version once at startup.
print("Current yt-dlp version:", yt_dlp.version.__version__)
class LogCapture(io.StringIO):
    """In-memory text stream that also forwards each written chunk to a callback.

    Args:
        callback (callable): Called with every string passed to ``write``.
    """

    def __init__(self, callback):
        super().__init__()
        self.callback = callback

    def write(self, s):
        """
        Stores ``s`` in the buffer and hands it to the callback.

        Args:
            s (str): Text to write.

        Returns:
            int: Number of characters written, as text streams are expected to report.
        """
        # Buffer first so getvalue() is already up to date when the callback runs.
        written = super().write(s)
        self.callback(s)
        return written
# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Clone and install faster-whisper from GitHub
_FASTER_WHISPER_DIR = "./faster-whisper"
try:
    # `git clone` fails when the target directory already exists, so only clone
    # on the first start; later starts reuse the existing checkout.
    if not os.path.isdir(_FASTER_WHISPER_DIR):
        subprocess.run(
            ["git", "clone", "https://github.com/SYSTRAN/faster-whisper.git", _FASTER_WHISPER_DIR],
            check=True,
        )
    # Invoke pip through the running interpreter so the package is installed
    # into the environment this script actually uses.
    subprocess.run([sys.executable, "-m", "pip", "install", "-e", _FASTER_WHISPER_DIR], check=True)
except (subprocess.CalledProcessError, OSError) as e:
    # OSError covers a missing `git` executable, which would otherwise crash
    # with an unlogged traceback.
    logging.error(f"Error during faster-whisper installation: {e}")
    sys.exit(1)

sys.path.append("./faster-whisper")
from faster_whisper import WhisperModel
from faster_whisper.transcribe import BatchedInferencePipeline

# Check for CUDA availability
device = "cuda:0" if torch.cuda.is_available() else "cpu"
logging.info(f"Using device: {device}")
def download_audio(url, method_choice, proxy_url, proxy_username, proxy_password):
    """
    Fetches audio for a URL, picking the downloader from the URL's host and scheme.

    YouTube hosts go to ``download_youtube_audio``, ``rtsp`` URLs go to
    ``download_rtsp_audio``, and everything else goes to ``download_direct_audio``.

    Args:
        url (str): The URL of the audio.
        method_choice (str): The method to use for downloading audio.
        proxy_url (str): Proxy URL if needed.
        proxy_username (str): Proxy username.
        proxy_password (str): Proxy password.

    Returns:
        tuple: (path to the downloaded audio file, True) on success,
        or (None, False) if the download failed or raised.
    """
    target = urlparse(url)
    logging.info(f"Downloading audio from URL: {url} using method: {method_choice}")
    try:
        host = target.netloc
        if 'youtube.com' in host or 'youtu.be' in host:
            fetched = download_youtube_audio(url, method_choice, proxy_url, proxy_username, proxy_password)
            failure = f"Failed to download audio from {url} using method {method_choice}. Ensure yt-dlp is up to date."
        elif target.scheme == 'rtsp':
            fetched = download_rtsp_audio(url, proxy_url)
            failure = f"Failed to download RTSP audio from {url}"
        else:
            fetched = download_direct_audio(url, method_choice, proxy_url, proxy_username, proxy_password)
            failure = f"Failed to download audio from {url} using method {method_choice}"

        if fetched:
            return fetched, True
        logging.error(failure)
        return None, False
    except Exception as e:
        logging.error(f"Error downloading audio from {url} using method {method_choice}: {str(e)}")
        return None, False
def download_youtube_audio(url, method_choice, proxy_url, proxy_username, proxy_password):
    """
    Dispatches a YouTube download to the backend named by ``method_choice``.

    Unknown method names fall back to ``yt_dlp_method``.

    Args:
        url (str): The YouTube URL.
        method_choice (str): The method to use for downloading.
        proxy_url (str): Proxy URL if needed.
        proxy_username (str): Proxy username.
        proxy_password (str): Proxy password.

    Returns:
        str: Path to the downloaded audio file, or None if the backend failed or raised.
    """
    backends = {'yt-dlp': yt_dlp_method, 'pytube': pytube_method}
    downloader = backends.get(method_choice, yt_dlp_method)
    result = None
    try:
        logging.info(f"Attempting to download YouTube audio using {method_choice}")
        result = downloader(url, proxy_url, proxy_username, proxy_password)
    except Exception as e:
        logging.error(f"Error downloading using {method_choice}: {str(e)}")
        result = None
    return result
def yt_dlp_method(url, proxy_url, proxy_username, proxy_password):
    """
    Downloads YouTube audio using yt-dlp and saves it as MP3 in a fresh temporary directory.

    Args:
        url (str): The YouTube URL.
        proxy_url (str): Proxy URL if needed.
        proxy_username (str): Proxy username (currently unused by this method).
        proxy_password (str): Proxy password (currently unused by this method).

    Returns:
        str: Path to the downloaded audio file, or None if failed.
    """
    import shutil

    # The version string lives in `__version__`; `yt_dlp.version.version` does
    # not exist and raised AttributeError before any download was attempted.
    logging.info(f"Using yt-dlp {yt_dlp.version.__version__} method")
    temp_dir = tempfile.mkdtemp()
    output_template = os.path.join(temp_dir, '%(id)s.%(ext)s')
    ydl_opts = {
        'format': 'bestaudio/best',
        'outtmpl': output_template,
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'mp3',
            'preferredquality': '192',
        }],
        'quiet': False,
        'no_warnings': False,
        'logger': MyLogger(),  # Use a custom logger to capture yt-dlp logs
        'progress_hooks': [my_hook],  # Hook to capture download progress and errors
    }
    if proxy_url and proxy_url.strip():
        ydl_opts['proxy'] = proxy_url

    output_file = None
    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(url, download=True)
            if info and 'entries' in info:
                # Can be a playlist or a list of videos; use the first usable entry.
                entries = [entry for entry in info['entries'] if entry]
                info = entries[0] if entries else None
            if info:
                # The post-processor replaces the downloaded extension with .mp3.
                output_file = os.path.splitext(ydl.prepare_filename(info))[0] + '.mp3'
        if output_file and os.path.exists(output_file):
            logging.info(f"Downloaded YouTube audio: {output_file}")
            return output_file
        logging.error("yt-dlp did not produce an output file.")
    except Exception as e:
        logging.error(f"yt-dlp failed to download audio: {str(e)}")

    # Failure path: drop the temporary directory so failed attempts do not pile up.
    shutil.rmtree(temp_dir, ignore_errors=True)
    return None
class MyLogger(object):
    """
    Adapter passed to yt-dlp as its ``logger`` option.

    Each method forwards the message unchanged to the root logger at the
    matching severity.
    """

    def debug(self, msg):
        logging.log(logging.DEBUG, msg)

    def info(self, msg):
        logging.log(logging.INFO, msg)

    def warning(self, msg):
        logging.log(logging.WARNING, msg)

    def error(self, msg):
        logging.log(logging.ERROR, msg)
def my_hook(d):
    """
    Progress hook for yt-dlp: logs when a download finishes or errors.

    Args:
        d (dict): Status dictionary from yt-dlp; ``status`` is always read,
            ``filename`` only when the status is ``error``.
    """
    status = d['status']
    if status == 'error':
        logging.error(f"Download error: {d['filename']}")
    elif status == 'finished':
        logging.info('Download finished, now converting...')
def pytube_method(url, proxy_url, proxy_username, proxy_password):
    """
    Fetches the first audio-only stream of a YouTube video with pytube.

    The downloaded file is renamed to carry an ``.mp3`` extension; no
    re-encoding is performed here.

    Args:
        url (str): The YouTube URL.
        proxy_url (str): Proxy URL if needed.
        proxy_username (str): Proxy username.
        proxy_password (str): Proxy password.

    Returns:
        str: Path to the downloaded audio file, or None if failed.
    """
    logging.info("Using pytube method")
    from pytube import YouTube
    try:
        if proxy_url and len(proxy_url.strip()) > 0:
            proxies = {"http": proxy_url, "https": proxy_url}
        else:
            proxies = None

        video = YouTube(url, proxies=proxies)
        stream = video.streams.filter(only_audio=True).first()
        if stream is None:
            logging.error("No audio streams available with pytube.")
            return None

        download_dir = tempfile.mkdtemp()
        downloaded = stream.download(output_path=download_dir)
        renamed = os.path.splitext(downloaded)[0] + '.mp3'
        os.rename(downloaded, renamed)
        logging.info(f"Downloaded and converted audio to: {renamed}")
        return renamed
    except Exception as e:
        logging.error(f"pytube failed to download audio: {str(e)}")
        return None
def download_rtsp_audio(url, proxy_url):
    """
    Downloads audio from an RTSP URL using FFmpeg and encodes it as MP3.

    Args:
        url (str): The RTSP URL.
        proxy_url (str): Proxy URL if needed; exported to FFmpeg as
            ``http_proxy``/``https_proxy``.

    Returns:
        str: Path to the downloaded audio file, or None if failed.
    """
    logging.info("Using FFmpeg to download RTSP stream")
    env = os.environ.copy()
    if proxy_url and proxy_url.strip():
        env['http_proxy'] = proxy_url
        env['https_proxy'] = proxy_url

    output_file = None
    try:
        # Reserve the path atomically; tempfile.mktemp only returns a name and
        # is open to a race with other processes.
        with tempfile.NamedTemporaryFile(delete=False, suffix='.mp3') as handle:
            output_file = handle.name
        # -nostdin keeps FFmpeg from waiting on the server's stdin; -y lets it
        # overwrite the placeholder created above.
        command = ['ffmpeg', '-nostdin', '-i', url, '-acodec', 'libmp3lame', '-ab', '192k', '-y', output_file]
        # NOTE(review): no duration limit is passed, so a live stream that never
        # ends would keep this call running — confirm that is intended.
        subprocess.run(command, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env)
        logging.info(f"Downloaded RTSP audio to: {output_file}")
        return output_file
    except subprocess.CalledProcessError as e:
        logging.error(f"FFmpeg error: {e.stderr.decode(errors='replace')}")
    except Exception as e:
        logging.error(f"Error downloading RTSP audio: {str(e)}")

    # Failure path: best-effort removal of the empty or partial output file.
    if output_file and os.path.exists(output_file):
        try:
            os.remove(output_file)
        except OSError as cleanup_error:
            logging.warning(f"Could not remove temporary file {output_file}: {cleanup_error}")
    return None
def download_direct_audio(url, method_choice, proxy_url, proxy_username, proxy_password):
    """
    Dispatches a direct-URL download to the backend named by ``method_choice``.

    Unknown method names fall back to ``requests_method``. The result counts
    as a success only if the returned path exists on disk.

    Args:
        url (str): The direct URL of the audio file.
        method_choice (str): The method to use for downloading.
        proxy_url (str): Proxy URL if needed.
        proxy_username (str): Proxy username.
        proxy_password (str): Proxy password.

    Returns:
        str: Path to the downloaded audio file, or None if failed.
    """
    logging.info(f"Downloading direct audio from: {url} using method: {method_choice}")
    backends = {
        'wget': wget_method,
        'requests': requests_method,
        'yt-dlp': yt_dlp_direct_method,
        'ffmpeg': ffmpeg_method,
        'aria2': aria2_method,
    }
    downloader = backends.get(method_choice, requests_method)
    try:
        fetched = downloader(url, proxy_url, proxy_username, proxy_password)
        if fetched and os.path.exists(fetched):
            return fetched
        logging.error(f"Failed to download direct audio from {url} using method {method_choice}")
        return None
    except Exception as e:
        logging.error(f"Error downloading direct audio with {method_choice}: {str(e)}")
        return None
def requests_method(url, proxy_url, proxy_username, proxy_password):
    """
    Downloads audio using the requests library, streaming it to a temporary file.

    Args:
        url (str): The URL of the audio file.
        proxy_url (str): Proxy URL if needed.
        proxy_username (str): Proxy username.
        proxy_password (str): Proxy password.

    Returns:
        str: Path to the downloaded audio file, or None if failed.
    """
    from urllib.parse import quote, urlsplit, urlunsplit

    temp_path = None
    try:
        proxies = None
        if proxy_url and proxy_url.strip():
            proxy = proxy_url.strip()
            if proxy_username and proxy_password:
                # Proxy credentials belong in the proxy URL. Passing them via
                # `auth=` would send them as HTTP basic auth to the target
                # server and leave the proxy unauthenticated.
                parts = urlsplit(proxy)
                if parts.hostname and parts.username is None:
                    credentials = f"{quote(proxy_username, safe='')}:{quote(proxy_password, safe='')}"
                    proxy = urlunsplit(parts._replace(netloc=f"{credentials}@{parts.netloc}"))
            proxies = {"http": proxy, "https": proxy}

        # The timeout bounds connection setup and each read, so a stalled
        # server cannot block the worker forever.
        with requests.get(url, stream=True, proxies=proxies, timeout=30) as response:
            if response.status_code != 200:
                logging.error(f"Failed to download audio from {url} with status code {response.status_code}")
                return None
            with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as temp_file:
                temp_path = temp_file.name
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        temp_file.write(chunk)
        logging.info(f"Downloaded direct audio to: {temp_path}")
        return temp_path
    except Exception as e:
        logging.error(f"Error in requests_method: {str(e)}")
        # Best-effort removal of a partially written file.
        if temp_path and os.path.exists(temp_path):
            try:
                os.remove(temp_path)
            except OSError as cleanup_error:
                logging.warning(f"Could not remove temporary file {temp_path}: {cleanup_error}")
        return None
def wget_method(url, proxy_url, proxy_username, proxy_password):
    """
    Downloads audio using the wget command-line tool.

    Args:
        url (str): The URL of the audio file.
        proxy_url (str): Proxy URL if needed; exported to wget as
            ``http_proxy``/``https_proxy``.
        proxy_username (str): Proxy username (currently unused by this method).
        proxy_password (str): Proxy password (currently unused by this method).

    Returns:
        str: Path to the downloaded audio file, or None if failed.
    """
    logging.info("Using wget method")
    env = os.environ.copy()
    if proxy_url and proxy_url.strip():
        env['http_proxy'] = proxy_url
        env['https_proxy'] = proxy_url

    output_file = None
    try:
        # Reserve the path atomically instead of using the race-prone tempfile.mktemp.
        with tempfile.NamedTemporaryFile(delete=False, suffix='.mp3') as handle:
            output_file = handle.name
        # `--` ends option parsing so a user-supplied URL that starts with '-'
        # cannot be interpreted as a wget option.
        command = ['wget', '-O', output_file, '--', url]
        subprocess.run(command, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env)
        logging.info(f"Downloaded audio to: {output_file}")
        return output_file
    except subprocess.CalledProcessError as e:
        logging.error(f"Wget error: {e.stderr.decode(errors='replace')}")
    except Exception as e:
        logging.error(f"Error in wget_method: {str(e)}")

    # Failure path: best-effort removal of the empty or partial output file.
    if output_file and os.path.exists(output_file):
        try:
            os.remove(output_file)
        except OSError as cleanup_error:
            logging.warning(f"Could not remove temporary file {output_file}: {cleanup_error}")
    return None
def yt_dlp_direct_method(url, proxy_url, proxy_username, proxy_password):
    """
    Downloads audio using yt-dlp (supports various protocols and sites) and converts it to MP3.

    Args:
        url (str): The URL of the audio or webpage containing audio.
        proxy_url (str): Proxy URL if needed.
        proxy_username (str): Proxy username (currently unused by this method).
        proxy_password (str): Proxy password (currently unused by this method).

    Returns:
        str: Path to the downloaded audio file, or None if failed.
    """
    import shutil

    logging.info("Using yt-dlp direct method")
    # A private directory replaces the race-prone tempfile.mktemp. The file
    # itself is not pre-created, so yt-dlp never sees an existing target.
    temp_dir = tempfile.mkdtemp()
    output_file = os.path.join(temp_dir, 'audio.mp3')
    ydl_opts = {
        'format': 'bestaudio/best',
        # The download keeps its native extension; the post-processor below
        # then writes audio.mp3 next to it.
        'outtmpl': os.path.join(temp_dir, 'audio.%(ext)s'),
        'quiet': True,
        'no_warnings': True,
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'mp3',
            'preferredquality': '192',
        }],
    }
    if proxy_url and proxy_url.strip():
        ydl_opts['proxy'] = proxy_url
    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.download([url])
        if not os.path.exists(output_file):
            raise FileNotFoundError("yt-dlp did not produce an output file.")
        logging.info(f"Downloaded audio to: {output_file}")
        return output_file
    except Exception as e:
        logging.error(f"Error in yt_dlp_direct_method: {str(e)}")
        # Drop the temporary directory so failed attempts do not pile up.
        shutil.rmtree(temp_dir, ignore_errors=True)
        return None
def ffmpeg_method(url, proxy_url, proxy_username, proxy_password):
    """
    Downloads audio using FFmpeg and encodes it as MP3.

    Args:
        url (str): The URL of the audio file.
        proxy_url (str): Proxy URL if needed; exported to FFmpeg as
            ``http_proxy``/``https_proxy``.
        proxy_username (str): Proxy username (currently unused by this method).
        proxy_password (str): Proxy password (currently unused by this method).

    Returns:
        str: Path to the downloaded audio file, or None if failed.
    """
    logging.info("Using ffmpeg method")
    env = os.environ.copy()
    if proxy_url and proxy_url.strip():
        env['http_proxy'] = proxy_url
        env['https_proxy'] = proxy_url

    output_file = None
    try:
        # Reserve the path atomically instead of using the race-prone tempfile.mktemp.
        with tempfile.NamedTemporaryFile(delete=False, suffix='.mp3') as handle:
            output_file = handle.name
        # -y overwrites the placeholder created above; -nostdin keeps FFmpeg
        # from prompting or reading the server's stdin.
        command = ['ffmpeg', '-nostdin', '-y', '-i', url, '-vn', '-acodec', 'libmp3lame', '-q:a', '2', output_file]
        subprocess.run(command, check=True, capture_output=True, text=True, env=env)
        logging.info(f"Downloaded and converted audio to: {output_file}")
        return output_file
    except subprocess.CalledProcessError as e:
        logging.error(f"FFmpeg error: {e.stderr}")
    except Exception as e:
        logging.error(f"Error in ffmpeg_method: {str(e)}")

    # Failure path: best-effort removal of the empty or partial output file.
    if output_file and os.path.exists(output_file):
        try:
            os.remove(output_file)
        except OSError as cleanup_error:
            logging.warning(f"Could not remove temporary file {output_file}: {cleanup_error}")
    return None
def aria2_method(url, proxy_url, proxy_username, proxy_password):
    """
    Downloads audio using aria2.

    Args:
        url (str): The URL of the audio file.
        proxy_url (str): Proxy URL if needed; passed as ``--all-proxy``.
        proxy_username (str): Proxy username (currently unused by this method).
        proxy_password (str): Proxy password (currently unused by this method).

    Returns:
        str: Path to the downloaded audio file, or None if failed.
    """
    logging.info("Using aria2 method")
    output_file = None
    try:
        # Reserve the path atomically instead of using the race-prone tempfile.mktemp.
        with tempfile.NamedTemporaryFile(delete=False, suffix='.mp3') as handle:
            output_file = handle.name
        command = [
            'aria2c',
            '--split=4',
            '--max-connection-per-server=4',
            # aria2 resolves --out relative to --dir, so an absolute path in
            # --out ends up under the working directory. Pass the two parts
            # separately so the file lands where we report it.
            '--dir', os.path.dirname(output_file),
            '--out', os.path.basename(output_file),
            # The placeholder already exists; overwrite it rather than letting
            # aria2 pick a renamed target.
            '--allow-overwrite=true',
            '--auto-file-renaming=false',
            url,
        ]
        if proxy_url and proxy_url.strip():
            command.extend(['--all-proxy', proxy_url])
        subprocess.run(command, check=True, capture_output=True, text=True)
        logging.info(f"Downloaded audio to: {output_file}")
        return output_file
    except subprocess.CalledProcessError as e:
        logging.error(f"Aria2 error: {e.stderr}")
    except Exception as e:
        logging.error(f"Error in aria2_method: {str(e)}")

    # Failure path: best-effort removal of the empty or partial output file.
    if output_file and os.path.exists(output_file):
        try:
            os.remove(output_file)
        except OSError as cleanup_error:
            logging.warning(f"Could not remove temporary file {output_file}: {cleanup_error}")
    return None
def trim_audio(audio_path, start_time, end_time):
    """
    Cuts an audio file down to a time window and writes the result as WAV.

    A missing start defaults to 0 and a missing end to the clip duration;
    given values are clamped into that range.

    Args:
        audio_path (str): Path to the audio file.
        start_time (float): Start time in seconds, or None.
        end_time (float): End time in seconds, or None.

    Returns:
        str: Path to the trimmed audio file.

    Raises:
        gr.Error: If the window is empty or any step of the trimming fails.
    """
    try:
        logging.info(f"Trimming audio from {start_time} to {end_time}")
        clip = AudioSegment.from_file(audio_path)
        total_seconds = len(clip) / 1000  # len() of the segment is in milliseconds

        # Fill in defaults, clamping supplied values to the clip bounds
        if start_time is None:
            start_time = 0
        else:
            start_time = max(0, start_time)
        if end_time is None:
            end_time = total_seconds
        else:
            end_time = min(total_seconds, end_time)

        if start_time >= end_time:
            raise gr.Error("End time must be greater than start time.")

        begin_ms = int(start_time * 1000)
        stop_ms = int(end_time * 1000)
        window = clip[begin_ms:stop_ms]
        with tempfile.NamedTemporaryFile(delete=False, suffix='.wav') as wav_handle:
            window.export(wav_handle.name, format="wav")
        logging.info(f"Trimmed audio saved to: {wav_handle.name}")
        return wav_handle.name
    except Exception as e:
        logging.error(f"Error trimming audio: {str(e)}")
        raise gr.Error(f"Error trimming audio: {str(e)}")
def save_transcription(transcription):
    """
    Writes the transcription to a new UTF-8 ``.txt`` temporary file that is kept on disk.

    Args:
        transcription (str): The transcription text.

    Returns:
        str: The path to the transcription file.
    """
    handle = tempfile.NamedTemporaryFile(delete=False, suffix='.txt', mode='w', encoding='utf-8')
    try:
        handle.write(transcription)
    finally:
        handle.close()
    saved_path = handle.name
    logging.info(f"Transcription saved to: {saved_path}")
    return saved_path
def get_model_options(pipeline_type):
    """
    Looks up the model IDs offered for a pipeline type.

    Args:
        pipeline_type (str): The type of pipeline.

    Returns:
        list: A new list of model IDs; empty for an unknown pipeline type.
    """
    catalogue = {
        "faster-batched": (
            "cstr/whisper-large-v3-turbo-german-int8_float32",
            "cstr/whisper-large-v3-turbo-int8_float32",
            "SYSTRAN/faster-whisper-large-v1",
            "GalaktischeGurke/primeline-whisper-large-v3-german-ct2",
        ),
        "faster-sequenced": (
            "cstr/whisper-large-v3-turbo-german-int8_float32",
            "SYSTRAN/faster-whisper-large-v1",
            "GalaktischeGurke/primeline-whisper-large-v3-german-ct2",
        ),
        "transformers": (
            "cstr/whisper-large-v3-turbo-german-int8_float32",
            "openai/whisper-large-v3",
            "openai/whisper-large-v2",
            "openai/whisper-medium",
            "openai/whisper-small",
        ),
    }
    return list(catalogue.get(pipeline_type, ()))
# Dictionary to store loaded models, keyed by (pipeline_type, model_id, dtype)
loaded_models = {}


def _load_transcriber(pipeline_type, model_id, dtype, batch_size):
    """Builds the model or pipeline for a pipeline type; returns None for an unknown type."""
    if pipeline_type == "faster-batched":
        model = WhisperModel(model_id, device=device, compute_type=dtype)
        return BatchedInferencePipeline(model=model)
    if pipeline_type == "faster-sequenced":
        return WhisperModel(model_id, device=device, compute_type=dtype)
    if pipeline_type == "transformers":
        # float16 is only used off-CPU; every other combination runs in float32.
        use_half = dtype == "float16" and device != "cpu"
        torch_dtype = torch.float16 if use_half else torch.float32
        model = AutoModelForSpeechSeq2Seq.from_pretrained(model_id, torch_dtype=torch_dtype)
        processor = AutoProcessor.from_pretrained(model_id)
        return pipeline(
            "automatic-speech-recognition",
            model=model,
            tokenizer=processor.tokenizer,
            feature_extractor=processor.feature_extractor,
            chunk_length_s=30,
            batch_size=batch_size,
            return_timestamps=True,
            device=device,
        )
    return None


def _format_segment(segment, pipeline_type, include_timecodes):
    """Renders one transcription segment as a single newline-terminated line."""
    is_faster = pipeline_type in ("faster-batched", "faster-sequenced")
    text = segment.text if is_faster else segment['text']
    if not include_timecodes:
        return f"{text}\n"
    if is_faster:
        begin, end = segment.start, segment.end
    else:
        begin, end = segment['timestamp']
    # A chunk boundary can be None (e.g. an open-ended last chunk); formatting
    # None with ':.2f' would raise, so show a placeholder instead.
    begin_label = f"{begin:.2f}s" if begin is not None else "?"
    end_label = f"{end:.2f}s" if end is not None else "?"
    return f"[{begin_label} -> {end_label}] {text}\n"


def transcribe_audio(audio_input, audio_url, proxy_url, proxy_username, proxy_password, pipeline_type, model_id, dtype, batch_size, download_method, start_time=None, end_time=None, verbose=False, include_timecodes=False):
    """
    Transcribes audio from a given source using the specified pipeline and model.

    Args:
        audio_input (str): Path to uploaded audio file or recorded audio.
        audio_url (str): URL of audio.
        proxy_url (str): Proxy URL if needed.
        proxy_username (str): Proxy username.
        proxy_password (str): Proxy password.
        pipeline_type (str): Type of pipeline to use ('faster-batched', 'faster-sequenced', or 'transformers').
        model_id (str): The ID of the model to use.
        dtype (str): Data type for model computations ('int8', 'float16', or 'float32').
        batch_size (int): Batch size for transcription.
        download_method (str): Method to use for downloading audio.
        start_time (float, optional): Start time in seconds for trimming audio.
        end_time (float, optional): End time in seconds for trimming audio.
        verbose (bool, optional): Whether to output verbose logging.
        include_timecodes (bool, optional): Whether to include timecodes in the transcription.

    Yields:
        Tuple[str, str, str or None]: Metrics and messages, transcription text, path to transcription file.
    """
    # Initialised before `try` so the except/finally clauses can always use them.
    verbose_messages = ""
    # Every temporary audio file created for this request, whether downloaded
    # or trimmed; all of them are removed in `finally`.
    temp_files = []
    try:
        logging.getLogger().setLevel(logging.INFO if verbose else logging.WARNING)

        logging.info(f"Transcription parameters: pipeline_type={pipeline_type}, model_id={model_id}, dtype={dtype}, batch_size={batch_size}, download_method={download_method}")
        verbose_messages = f"Starting transcription with parameters:\nPipeline Type: {pipeline_type}\nModel ID: {model_id}\nData Type: {dtype}\nBatch Size: {batch_size}\nDownload Method: {download_method}\n"

        if verbose:
            yield verbose_messages, "", None

        # Determine the audio source
        if audio_input is not None and len(audio_input) > 0:
            # audio_input is a filepath to uploaded or recorded audio; it is
            # not ours to delete.
            audio_path = audio_input
        elif audio_url is not None and len(audio_url.strip()) > 0:
            audio_path, is_temp_file = download_audio(audio_url, download_method, proxy_url, proxy_username, proxy_password)
            if not audio_path:
                error_msg = f"Error downloading audio from {audio_url} using method {download_method}. Check logs for details."
                logging.error(error_msg)
                yield verbose_messages + error_msg, "", None
                return
            if is_temp_file:
                temp_files.append(audio_path)
        else:
            error_msg = "No audio source provided. Please upload an audio file, record audio, or enter a URL."
            logging.error(error_msg)
            yield verbose_messages + error_msg, "", None
            return

        # Convert start_time and end_time to float or None
        start_time = float(start_time) if start_time else None
        end_time = float(end_time) if end_time else None

        if start_time is not None or end_time is not None:
            # The trimmed copy replaces audio_path; the untrimmed download (if
            # any) stays in temp_files so it is still cleaned up.
            audio_path = trim_audio(audio_path, start_time, end_time)
            temp_files.append(audio_path)
            verbose_messages += f"Audio trimmed from {start_time} to {end_time}\n"
            if verbose:
                yield verbose_messages, "", None

        # The UI slider may deliver a float; the backends expect an integer.
        batch_size = int(batch_size)

        # Model caching
        model_key = (pipeline_type, model_id, dtype)
        if model_key in loaded_models:
            model_or_pipeline = loaded_models[model_key]
            logging.info("Loaded model from cache")
        else:
            model_or_pipeline = _load_transcriber(pipeline_type, model_id, dtype, batch_size)
            if model_or_pipeline is None:
                error_msg = "Invalid pipeline type"
                logging.error(error_msg)
                yield verbose_messages + error_msg, "", None
                return
            loaded_models[model_key] = model_or_pipeline  # Cache the model or pipeline

        # Perform the transcription
        start_time_perf = time.time()
        if pipeline_type == "faster-batched":
            segments, info = model_or_pipeline.transcribe(audio_path, batch_size=batch_size)
            segments = list(segments)  # Exhaust the generator so the work is timed
        elif pipeline_type == "faster-sequenced":
            segments, info = model_or_pipeline.transcribe(audio_path)
            segments = list(segments)  # Exhaust the generator so the work is timed
        else:
            # Pass batch_size per call: the cached pipeline would otherwise
            # keep the value from whichever request first built it.
            result = model_or_pipeline(audio_path, batch_size=batch_size)
            segments = result["chunks"]
        end_time_perf = time.time()

        # Calculate metrics
        transcription_time = end_time_perf - start_time_perf
        audio_file_size = os.path.getsize(audio_path) / (1024 * 1024)
        metrics_output = (
            f"Transcription time: {transcription_time:.2f} seconds\n"
            f"Audio file size: {audio_file_size:.2f} MB\n"
        )

        if verbose:
            yield verbose_messages + metrics_output, "", None

        # Compile the transcription text, streaming partial text when verbose
        transcription = ""
        for segment in segments:
            transcription += _format_segment(segment, pipeline_type, include_timecodes)
            if verbose:
                yield verbose_messages + metrics_output, transcription, None

        # Save the transcription to a file
        transcription_file = save_transcription(transcription)
        yield verbose_messages + metrics_output, transcription, transcription_file

    except Exception as e:
        error_msg = f"An error occurred during transcription: {str(e)}"
        logging.error(error_msg)
        yield verbose_messages + error_msg, "", None

    finally:
        # Clean up temporary audio files
        for temp_path in temp_files:
            if temp_path and os.path.exists(temp_path):
                try:
                    os.remove(temp_path)
                except OSError as cleanup_error:
                    logging.warning(f"Could not remove temporary file {temp_path}: {cleanup_error}")
# Gradio user interface.
# NOTE(review): the nesting of rows and the accordion below was reconstructed
# from a listing that had lost its indentation — confirm the intended layout.
with gr.Blocks() as iface:
    gr.Markdown("# Audio Transcription")
    gr.Markdown("Transcribe audio using multiple pipelines and (Faster) Whisper models.")

    with gr.Row():
        audio_input = gr.Audio(label="Upload or Record Audio", sources=["upload", "microphone"], type="filepath")
        audio_url = gr.Textbox(label="Or Enter URL of audio file or YouTube link")

    transcribe_button = gr.Button("Transcribe")

    with gr.Accordion("Advanced Options", open=False):
        with gr.Row():
            proxy_url = gr.Textbox(label="Proxy URL", placeholder="Enter proxy URL if needed", value="", lines=1)
            proxy_username = gr.Textbox(label="Proxy Username", placeholder="Proxy username (optional)", value="", lines=1)
            proxy_password = gr.Textbox(label="Proxy Password", placeholder="Proxy password (optional)", value="", lines=1, type="password")

        with gr.Row():
            pipeline_type = gr.Dropdown(
                choices=["faster-batched", "faster-sequenced", "transformers"],
                label="Pipeline Type",
                value="faster-batched"
            )
            model_id = gr.Dropdown(
                label="Model",
                choices=get_model_options("faster-batched"),
                value="cstr/whisper-large-v3-turbo-int8_float32"
            )

        with gr.Row():
            dtype = gr.Dropdown(choices=["int8", "float16", "float32"], label="Data Type", value="int8")
            batch_size = gr.Slider(minimum=1, maximum=32, step=1, value=16, label="Batch Size")
            download_method = gr.Dropdown(
                # "requests" is implemented by the direct downloader but was not
                # selectable. "youtube-dl" and "yt-dlp-alt" have no dedicated
                # backend and fall back to the default one; they are kept so
                # existing selections keep working.
                choices=["yt-dlp", "pytube", "youtube-dl", "yt-dlp-alt", "ffmpeg", "aria2", "wget", "requests"],
                label="Download Method",
                value="yt-dlp"
            )

        with gr.Row():
            start_time = gr.Number(label="Start Time (seconds)", value=None, minimum=0)
            end_time = gr.Number(label="End Time (seconds)", value=None, minimum=0)
            verbose = gr.Checkbox(label="Verbose Output", value=False)
            include_timecodes = gr.Checkbox(label="Include timecodes in transcription", value=False)

    with gr.Row():
        metrics_output = gr.Textbox(label="Transcription Metrics and Verbose Messages", lines=10)
        transcription_output = gr.Textbox(label="Transcription", lines=10)
        transcription_file = gr.File(label="Download Transcription")

    def update_model_dropdown(pipeline_type):
        """
        Updates the model dropdown choices based on the selected pipeline type.

        Args:
            pipeline_type (str): The selected pipeline type.

        Returns:
            gr.update: Updated model dropdown component; hidden when the
            pipeline type has no models.
        """
        try:
            model_choices = get_model_options(pipeline_type)
            logging.info(f"Model choices for {pipeline_type}: {model_choices}")

            if model_choices:
                return gr.update(choices=model_choices, value=model_choices[0], visible=True)
            return gr.update(choices=["No models available"], value=None, visible=False)
        except Exception as e:
            logging.error(f"Error in update_model_dropdown: {str(e)}")
            return gr.update(choices=["Error"], value="Error", visible=True)

    # Event handler for pipeline_type change
    pipeline_type.change(update_model_dropdown, inputs=[pipeline_type], outputs=[model_id])

    def transcribe_with_progress(*args):
        """Streams every intermediate result of transcribe_audio to the UI outputs."""
        # The audio_input is the first argument
        yield from transcribe_audio(*args)

    transcribe_button.click(
        transcribe_with_progress,
        inputs=[audio_input, audio_url, proxy_url, proxy_username, proxy_password, pipeline_type, model_id, dtype, batch_size, download_method, start_time, end_time, verbose, include_timecodes],
        outputs=[metrics_output, transcription_output, transcription_file]
    )

    gr.Examples(
        examples=[
            [None, "https://www.youtube.com/watch?v=daQ_hqA6HDo", "", "", "", "faster-batched", "cstr/whisper-large-v3-turbo-int8_float32", "int8", 16, "yt-dlp", None, None, False, False],
            [None, "https://mcdn.podbean.com/mf/web/dir5wty678b6g4vg/HoP_453.mp3", "", "", "", "faster-sequenced", "SYSTRAN/faster-whisper-large-v1", "float16", 1, "ffmpeg", 0, 300, False, False],
        ],
        inputs=[audio_input, audio_url, proxy_url, proxy_username, proxy_password, pipeline_type, model_id, dtype, batch_size, download_method, start_time, end_time, verbose, include_timecodes],
    )

iface.launch(share=False, debug=True)