# Spaces: Running on Zero (ZeroGPU)
import torch
from transformers import (
    Wav2Vec2ForCTC,
    Wav2Vec2Processor,
    AutomaticSpeechRecognitionPipeline,
)
import gradio as gr
import json
from difflib import Differ
import ffmpeg
from pathlib import Path
import spaces

# Set True if you're using the Hugging Face Inference API: https://huggingface.co/inference-api
API_BACKEND = True
# MODEL = 'facebook/wav2vec2-large-960h-lv60-self'
MODEL = "facebook/wav2vec2-large-960h"
# MODEL = "facebook/wav2vec2-base-960h"
# MODEL = "patrickvonplaten/wav2vec2-large-960h-lv60-self-4-gram"

# Load model and processor for manual processing (Spaces Zero compatible)
model = Wav2Vec2ForCTC.from_pretrained(MODEL).to("cuda")
processor = Wav2Vec2Processor.from_pretrained(MODEL)

# Create pipeline with pre-loaded model and processor
speech_recognizer = AutomaticSpeechRecognitionPipeline(
    model=model,
    feature_extractor=processor.feature_extractor,
    tokenizer=processor.tokenizer,
    device=0,  # Use first CUDA device
)
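# Note (assumption about the ZeroGPU runtime): the `spaces` package allows moving
# the model to "cuda" at startup; a physical GPU is only attached while a
# `@spaces.GPU`-decorated function such as `speech_to_text` below is running.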

videos_out_path = Path("./videos_out")
videos_out_path.mkdir(parents=True, exist_ok=True)

samples_data = sorted(Path("examples").glob("*.json"))
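# Each examples/*.json file is assumed to look roughly like this (inferred from
# how SAMPLES is consumed in `load_example` below; values are illustrative):
# {
#     "video": "examples/some_clip.mp4",
#     "transcription": "full transcript of the clip",
#     "timestamps": [["t", 0.01, 0.05], ["h", 0.05, 0.09], ...]   # [char, start_s, end_s]
# }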
SAMPLES = []
for file in samples_data:
    with open(file) as f:
        sample = json.load(f)
        SAMPLES.append(sample)
VIDEOS = list(map(lambda x: [x["video"]], SAMPLES))


@spaces.GPU  # request a ZeroGPU device while transcribing
def speech_to_text(video_file_path):
    """
    Takes a video path, extracts the audio track, and transcribes it to text with
    character-level timestamps, using the AutomaticSpeechRecognitionPipeline
    built above from the pre-loaded model (Spaces Zero compatible).
    """
    if video_file_path is None:
        raise ValueError("Error: no video input")
    video_path = Path(video_file_path)
    try:
        # convert video to 16 kHz mono audio, piped into memory as audio_memory
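        # The ffmpeg-python chain below is roughly equivalent to the CLI call
        #   ffmpeg -i <video> -f wav -ac 1 -ar 16k -y -loglevel quiet -
        # with stdout captured into `audio_memory` as in-memory WAV bytes.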
        audio_memory, _ = (
            ffmpeg.input(video_path)
            .output("-", format="wav", ac=1, ar="16k")
            .overwrite_output()
            .global_args("-loglevel", "quiet")
            .run(capture_stdout=True)
        )
    except Exception as e:
        raise RuntimeError("Error converting video to audio") from e
    try:
        print("Transcribing via local model")
        output = speech_recognizer(
            audio_memory,
            return_timestamps="char",
            chunk_length_s=10,
            stride_length_s=(4, 2),
        )
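        # With return_timestamps="char" the output is roughly of the form
        # {"text": "...", "chunks": [{"text": "c", "timestamp": (start_s, end_s)}, ...]}
        # (exact timestamp types may vary between transformers versions).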
        transcription = output["text"].lower()
        timestamps = [
            [
                chunk["text"].lower(),
                chunk["timestamp"][0].tolist(),
                chunk["timestamp"][1].tolist(),
            ]
            for chunk in output["chunks"]
        ]
        return (transcription, transcription, timestamps)
    except Exception as e:
        raise RuntimeError("Error running inference with local model") from e


def cut_timestamps_to_video(video_in, transcription, text_in, timestamps):
    """
    Given the original video, the full transcript with character timestamps,
    and the edited text, cuts the remaining segments into a single output video.
    """
    if video_in is None or text_in is None or transcription is None:
        raise ValueError("Inputs undefined")
    video_path = Path(video_in)
    video_file_name = video_path.stem
    d = Differ()
    # compare the original transcription with the edited text
    diff_chars = d.compare(transcription, text_in)
    # remove all text additions from the diff
    filtered = list(filter(lambda x: x[0] != "+", diff_chars))
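    # Differ.compare yields one entry per character, prefixed with "- " (present
    # only in the transcription, i.e. cut by the edit), "+ " (added by the edit)
    # or "  " (unchanged); the code below relies on `filtered` lining up with the
    # per-character `timestamps` list.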
    # filter timestamps to be removed
    # timestamps_to_cut = [b for (a,b) in zip(filtered, timestamps_var) if a[0]== '-' ]
    # return diff tokens and the cut video!!
    # group character timestamps so there are fewer cuts
    idx = 0
    grouped = {}
    for a, b in zip(filtered, timestamps):
        if a[0] != "-":
            grouped.setdefault(idx, []).append(b)
        else:
            idx += 1
    # after grouping, take the start of the first and the end of the last
    # timestamp in each group
    timestamps_to_cut = [[v[0][1], v[-1][2]] for v in grouped.values()]

    between_str = "+".join(
        map(lambda t: f"between(t,{t[0]},{t[1]})", timestamps_to_cut)
    )
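    # The "select"/"aselect" filters keep only frames whose time t matches one of
    # the between(t, start, end) expressions; "setpts=N/FRAME_RATE/TB" and
    # "asetpts=N/SR/TB" then regenerate timestamps so the kept frames play back
    # contiguously.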
    if timestamps_to_cut:
        video_file = ffmpeg.input(video_in)
        video = video_file.video.filter("select", f"({between_str})").filter(
            "setpts", "N/FRAME_RATE/TB"
        )
        audio = video_file.audio.filter("aselect", f"({between_str})").filter(
            "asetpts", "N/SR/TB"
        )
        output_video = f"./videos_out/{video_file_name}.mp4"
        ffmpeg.concat(video, audio, v=1, a=1).output(
            output_video
        ).overwrite_output().global_args("-loglevel", "quiet").run()
    else:
        output_video = video_in
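    # Build (character, label) pairs for gr.HighlightedText: the label is the
    # diff code ("-" for cut characters) or None for unchanged characters, which
    # are left unhighlighted.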
    tokens = [(token[2:], token[0] if token[0] != " " else None) for token in filtered]
    return (tokens, output_video)


# ---- Gradio Layout -----
video_in = gr.Video(label="Video file", elem_id="video-container")
text_in = gr.Textbox(label="Transcription", lines=10, interactive=True)
video_out = gr.Video(label="Video Out")
diff_out = gr.HighlightedText(label="Cuts Diffs", combine_adjacent=True)
examples = gr.Dataset(components=[video_in], samples=VIDEOS, type="index")
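# With type="index", clicking an example sends the index of the selected sample
# to the handler (see `load_example` below) rather than the sample values.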
| css = """ | |
| #cut_btn, #reset_btn { align-self:stretch; } | |
| #\\31 3 { max-width: 540px; } | |
| .output-markdown {max-width: 65ch !important;} | |
| #video-container{ | |
| max-width: 40rem; | |
| } | |
| """ | |

with gr.Blocks(css=css) as demo:
    transcription_var = gr.State()
    timestamps_var = gr.State()

    with gr.Row():
        with gr.Column():
            gr.Markdown("""
            # Edit Video By Editing Text
            This project is a quick proof of concept of a simple video editor where the edits
            are made by editing the audio transcription.
            Using the [Hugging Face Automatic Speech Recognition Pipeline](https://huggingface.co/tasks/automatic-speech-recognition)
            with a fine-tuned [Wav2Vec2 model using Connectionist Temporal Classification (CTC)](https://huggingface.co/facebook/wav2vec2-large-960h-lv60-self),
            you can predict not only the text transcription but also [character- or word-based timestamps](https://huggingface.co/docs/transformers/v4.19.2/en/main_classes/pipelines#transformers.AutomaticSpeechRecognitionPipeline.__call__.return_timestamps).
            """)

    with gr.Row():
        examples.render()

        def load_example(id):
            video = SAMPLES[id]["video"]
            transcription = SAMPLES[id]["transcription"].lower()
            timestamps = SAMPLES[id]["timestamps"]
            return (video, transcription, transcription, timestamps)
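        # The transcription is returned twice: once for the editable Textbox and
        # once for the hidden transcription_var State, which is later diffed
        # against the edited text in cut_timestamps_to_video.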
        examples.click(
            load_example,
            inputs=[examples],
            outputs=[video_in, text_in, transcription_var, timestamps_var],
            queue=False,
        )

    with gr.Row():
        with gr.Column():
            video_in.render()
            transcribe_btn = gr.Button("Transcribe Audio")
            transcribe_btn.click(
                speech_to_text, [video_in], [text_in, transcription_var, timestamps_var]
            )

    with gr.Row():
        gr.Markdown("""
        ### Now edit as text
        After running the video transcription, you can make cuts to the text below (only cuts, not additions!)""")

    with gr.Row():
        with gr.Column():
            text_in.render()
            with gr.Row():
                cut_btn = gr.Button("Cut to video", elem_id="cut_btn")
                # send the video path and hidden state variables
                cut_btn.click(
                    cut_timestamps_to_video,
                    [video_in, transcription_var, text_in, timestamps_var],
                    [diff_out, video_out],
                )
                reset_transcription = gr.Button(
                    "Reset to last transcription", elem_id="reset_btn"
                )
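                # The identity lambda copies the stored transcription State back
                # into the editable Textbox.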
                reset_transcription.click(lambda x: x, transcription_var, text_in)
        with gr.Column():
            video_out.render()
            diff_out.render()

    with gr.Row():
        gr.Markdown("""
        #### Video Credits
        1. [Cooking](https://vimeo.com/573792389)
        1. [Shia LaBeouf "Just Do It"](https://www.youtube.com/watch?v=n2lTxIk_Dr0)
        1. [Mark Zuckerberg & Yuval Noah Harari in Conversation](https://www.youtube.com/watch?v=Boj9eD0Wug8)
        """)

demo.queue()
if __name__ == "__main__":
    demo.launch(debug=True)