Spaces:
Runtime error
Runtime error
| import torch | |
| import time | |
| import moviepy.editor as mp | |
| import psutil | |
| import gradio as gr | |
| import spaces | |
| from transformers import pipeline | |
| from transformers.pipelines.audio_utils import ffmpeg_read | |
# Batch size handed to the ASR pipeline on every transcription call.
BATCH_SIZE = 8

# Pick the device and the default checkpoint together: the distilled large
# model when CUDA is available, otherwise the tiny checkpoint on CPU.
if torch.cuda.is_available():
    device = 0
    DEFAULT_MODEL_NAME = "distil-whisper/distil-large-v3"
else:
    device = "cpu"
    DEFAULT_MODEL_NAME = "openai/whisper-tiny"
def load_pipeline(model_name):
    """Build an automatic-speech-recognition pipeline for ``model_name``.

    Args:
        model_name: Checkpoint identifier passed to ``pipeline`` as ``model``.

    Returns:
        The pipeline object, configured with 30-second chunks and placed on
        the module-level ``device``.
    """
    asr_options = {
        "task": "automatic-speech-recognition",
        "model": model_name,
        "chunk_length_s": 30,
        "device": device,
    }
    return pipeline(**asr_options)


# Shared pipeline instance, created once at import time with the default
# checkpoint; ``transcribe`` swaps it out when another model is requested.
pipe = load_pipeline(DEFAULT_MODEL_NAME)
def transcribe(inputs, task, model_name):
    """Transcribe or translate an audio file and report timing details.

    Args:
        inputs: Path to the audio file (the audio components in this file use
            ``type="filepath"``), or ``None`` when nothing was submitted.
        task: Generation task forwarded to the model, ``"transcribe"`` or
            ``"translate"``.
        model_name: Checkpoint identifier. When it differs from the checkpoint
            of the currently loaded pipeline, the global pipeline is replaced.

    Returns:
        A ``(text, info)`` tuple: the recognised text and a multi-line summary
        with the transcription time, audio duration, model and device.

    Raises:
        gr.Error: If no audio was submitted, the model name is empty, or the
            requested model cannot be loaded.
    """
    if inputs is None:
        raise gr.Error("No audio file submitted! Please upload or record an audio file before submitting your request.")

    global pipe

    # The model name comes from a free-text box, so tolerate stray whitespace
    # and reject an empty value before trying to load anything.
    model_name = (model_name or "").strip()
    if not model_name:
        raise gr.Error("No model name given! Please enter a model name before submitting your request.")

    if model_name != pipe.model.name_or_path:
        try:
            pipe = load_pipeline(model_name)
        except Exception as err:
            # Surface load failures (unknown or incompatible checkpoints) as a
            # readable message in the UI instead of a raw traceback.
            raise gr.Error(f"Could not load model '{model_name}': {err}") from err

    # Read the duration, then release the clip right away so the underlying
    # audio reader is not left open after every request.
    audio = mp.AudioFileClip(inputs)
    try:
        audio_duration = audio.duration
    finally:
        audio.close()

    # Time only the inference call; perf_counter is monotonic, so the elapsed
    # value cannot be skewed by wall-clock adjustments.
    start_time = time.perf_counter()
    text = pipe(inputs, batch_size=BATCH_SIZE, generate_kwargs={"task": task}, return_timestamps=True)["text"]
    transcription_time = time.perf_counter() - start_time

    transcription_time_output = (
        f"Transcription Time: {transcription_time:.2f} seconds\n"
        f"Audio Duration: {audio_duration:.2f} seconds\n"
        f"Model Used: {model_name}\n"
        f"Device Used: {'GPU' if torch.cuda.is_available() else 'CPU'}"
    )
    return text, transcription_time_output
| from gpustat import GPUStatCollection | |
def update_gpu_status():
    """Return a one-line status string for the first GPU reported by gpustat.

    Returns:
        A string in every case: a fixed message when CUDA is unavailable, the
        name/utilization/memory summary of the first GPU, a message when
        gpustat reports no GPUs, or an error description if the query fails.
    """
    if not torch.cuda.is_available():
        return "No Nviadia Device"
    try:
        gpu_stats = GPUStatCollection.new_query()
        # Only the first GPU is shown; take it directly instead of looping.
        gpu = next(iter(gpu_stats), None)
        if gpu is None:
            return "No GPU reported by gpustat"

        gpu_id = gpu.index
        gpu_name = gpu.name
        gpu_utilization = gpu.utilization
        memory_used = gpu.memory_used
        memory_total = gpu.memory_total
        # Guard against a zero total so the percentage cannot divide by zero.
        memory_utilization = (memory_used / memory_total) * 100 if memory_total else 0.0
        return (
            f"GPU {gpu_id}: {gpu_name}, Utilization: {gpu_utilization}%, "
            f"Memory Used: {memory_used}MB, Memory Total: {memory_total}MB, "
            f"Memory Utilization: {memory_utilization:.2f}%"
        )
    except Exception as e:
        # Best-effort monitoring: log the failure and still hand the caller a
        # string, so the status box never receives None.
        print(f"Error getting GPU stats: {e}")
        return f"Error getting GPU stats: {e}"
| # def update_gpu_status(): | |
| # if torch.cuda.is_available(): | |
| # gpu_info = torch.cuda.get_device_name(0) | |
| # gpu_memory = torch.cuda.mem_get_info(0) | |
| # total_memory = gpu_memory[1] / (1024 * 1024) | |
| # used_memory = (gpu_memory[1] - gpu_memory[0]) / (1024 * 1024) | |
| # gpu_status = f"GPU: {gpu_info}\nTotal Memory: {total_memory:.2f} MB\nUsed Memory: {used_memory:.2f} MB" | |
| # else: | |
| # gpu_status = "No GPU available" | |
| # return gpu_status | |
def update_cpu_status():
    """Return the current CPU usage followed by the local time.

    Returns:
        A string of the form ``"CPU Usage: <percent>% HH:MM:SS"``, using
        ``psutil.cpu_percent()`` and the current local clock time.
    """
    from datetime import datetime

    # Take the timestamp first, then sample the CPU load.
    stamp = datetime.now().strftime("%H:%M:%S")
    usage = psutil.cpu_percent()
    return f"CPU Usage: {usage}% {stamp}"
def update_status():
    """Collect the GPU and CPU status strings.

    Returns:
        A ``(gpu_status, cpu_status)`` tuple; the GPU status is queried first.
    """
    return update_gpu_status(), update_cpu_status()
def refresh_status():
    """Return fresh status values by delegating to ``update_status``.

    Returns:
        The ``(gpu_status, cpu_status)`` tuple produced by ``update_status``.
    """
    statuses = update_status()
    return statuses
def _build_transcribe_interface(model_hint, audio_label=None):
    """Create one transcription ``gr.Interface``.

    Args:
        model_hint: Text shown as the ``info`` of the "Model Name" text box.
        audio_label: Optional label for the audio input component.

    Returns:
        A ``gr.Interface`` that calls ``transcribe`` with an audio file path,
        a task choice and a model name, and shows the transcription and its
        info text.
    """
    return gr.Interface(
        fn=transcribe,
        inputs=[
            gr.Audio(type="filepath", label=audio_label),
            gr.Radio(["transcribe", "translate"], label="Task", value="transcribe"),
            gr.Textbox(
                label="Model Name",
                value=DEFAULT_MODEL_NAME,
                placeholder="Enter the model name",
                info=model_hint,
            ),
        ],
        outputs=[gr.TextArea(label="Transcription"), gr.TextArea(label="Transcription Info")],
        theme="huggingface",
        title="Whisper Transcription",
        description=(
            "Transcribe long-form microphone or audio inputs with the click of a button! Demo uses the specified OpenAI Whisper"
            " checkpoint and 🤗 Transformers to transcribe audio files of arbitrary length."
        ),
        allow_flagging="never",
    )


# Top-level container; the tabs and status widgets are attached to it later.
demo = gr.Blocks()

# The two interfaces differ only in the audio label and the model hint text.
mf_transcribe = _build_transcribe_interface(
    model_hint="Some available models: distil-whisper/distil-large-v3 distil-whisper/distil-medium.en Systran/faster-distil-whisper-large-v3 Systran/faster-whisper-large-v3 Systran/faster-whisper-medium openai/whisper-tiny, openai/whisper-base, openai/whisper-medium, openai/whisper-large-v3",
)

file_transcribe = _build_transcribe_interface(
    model_hint="Some available models: openai/whisper-tiny, openai/whisper-base, openai/whisper-medium, openai/whisper-large-v2",
    audio_label="Audio file",
)
# Assemble the page: the two transcription tabs on top, a status row below.
with demo:
    gr.TabbedInterface([mf_transcribe, file_transcribe], ["Microphone", "Audio file"])

    # NOTE(review): the original indentation is not recoverable here; the
    # button and both status boxes are assumed to share this row - confirm.
    with gr.Row():
        refresh_button = gr.Button("Refresh Status")
        gpu_status_output = gr.Textbox(label="GPU Status", interactive=False)
        cpu_status_output = gr.Textbox(label="CPU Status", interactive=False)

    # Manual refresh of both status boxes.
    refresh_button.click(refresh_status, None, [gpu_status_output, cpu_status_output])

    # Fill the status boxes on page load and keep polling every 2 seconds.
    # Periodic (`every`) events need the queue, so it is no longer disabled
    # for this event.
    demo.load(update_status, inputs=None, outputs=[gpu_status_output, cpu_status_output], every=2)

# Make sure the queue is enabled for the periodic status event, then launch.
demo.queue()
demo.launch(share=True)