radames's picture
break it apart
9982a93
import torch
from transformers import (
Wav2Vec2ForCTC,
Wav2Vec2Processor,
AutomaticSpeechRecognitionPipeline,
)
import gradio as gr
import json
from difflib import Differ
import ffmpeg
from pathlib import Path
import spaces
# Set true if you're using huggingface inference API API https://huggingface.co/inference-api
API_BACKEND = True
# MODEL = 'facebook/wav2vec2-large-960h-lv60-self'
MODEL = "facebook/wav2vec2-large-960h"
# MODEL = "facebook/wav2vec2-base-960h"
# MODEL = "patrickvonplaten/wav2vec2-large-960h-lv60-self-4-gram"
# Load model and processor for manual processing (Spaces Zero compatible)
model = Wav2Vec2ForCTC.from_pretrained(MODEL).to("cuda")
processor = Wav2Vec2Processor.from_pretrained(MODEL)
# Create pipeline with pre-loaded model and processor
speech_recognizer = AutomaticSpeechRecognitionPipeline(
model=model,
feature_extractor=processor.feature_extractor,
tokenizer=processor.tokenizer,
device=0, # Use first CUDA device
)
videos_out_path = Path("./videos_out")
videos_out_path.mkdir(parents=True, exist_ok=True)
samples_data = sorted(Path("examples").glob("*.json"))
SAMPLES = []
for file in samples_data:
with open(file) as f:
sample = json.load(f)
SAMPLES.append(sample)
VIDEOS = list(map(lambda x: [x["video"]], SAMPLES))
@spaces.GPU(duration=120)
def speech_to_text(video_file_path):
"""
Takes a video path to convert to audio, transcribe audio channel to text and char timestamps
Using AutomaticSpeechRecognitionPipeline with pre-loaded model for Spaces Zero compatibility
"""
if video_file_path == None:
raise ValueError("Error no video input")
video_path = Path(video_file_path)
try:
# convert video to audio 16k using PIPE to audio_memory
audio_memory, _ = (
ffmpeg.input(video_path)
.output("-", format="wav", ac=1, ar="16k")
.overwrite_output()
.global_args("-loglevel", "quiet")
.run(capture_stdout=True)
)
except Exception as e:
raise RuntimeError("Error converting video to audio")
try:
print("Transcribing via local model")
output = speech_recognizer(
audio_memory,
return_timestamps="char",
chunk_length_s=10,
stride_length_s=(4, 2),
)
transcription = output["text"].lower()
timestamps = [
[
chunk["text"].lower(),
chunk["timestamp"][0].tolist(),
chunk["timestamp"][1].tolist(),
]
for chunk in output["chunks"]
]
return (transcription, transcription, timestamps)
except Exception as e:
raise RuntimeError("Error Running inference with local model", e)
def cut_timestamps_to_video(video_in, transcription, text_in, timestamps):
"""
Given original video input, text transcript + timestamps,
and edit ext cuts video segments into a single video
"""
video_path = Path(video_in)
video_file_name = video_path.stem
if video_in == None or text_in == None or transcription == None:
raise ValueError("Inputs undefined")
d = Differ()
# compare original transcription with edit text
diff_chars = d.compare(transcription, text_in)
# remove all text aditions from diff
filtered = list(filter(lambda x: x[0] != "+", diff_chars))
# filter timestamps to be removed
# timestamps_to_cut = [b for (a,b) in zip(filtered, timestamps_var) if a[0]== '-' ]
# return diff tokes and cutted video!!
# groupping character timestamps so there are less cuts
idx = 0
grouped = {}
for a, b in zip(filtered, timestamps):
if a[0] != "-":
if idx in grouped:
grouped[idx].append(b)
else:
grouped[idx] = []
grouped[idx].append(b)
else:
idx += 1
# after grouping, gets the lower and upter start and time for each group
timestamps_to_cut = [[v[0][1], v[-1][2]] for v in grouped.values()]
between_str = "+".join(
map(lambda t: f"between(t,{t[0]},{t[1]})", timestamps_to_cut)
)
if timestamps_to_cut:
video_file = ffmpeg.input(video_in)
video = video_file.video.filter("select", f"({between_str})").filter(
"setpts", "N/FRAME_RATE/TB"
)
audio = video_file.audio.filter("aselect", f"({between_str})").filter(
"asetpts", "N/SR/TB"
)
output_video = f"./videos_out/{video_file_name}.mp4"
ffmpeg.concat(video, audio, v=1, a=1).output(
output_video
).overwrite_output().global_args("-loglevel", "quiet").run()
else:
output_video = video_in
tokens = [(token[2:], token[0] if token[0] != " " else None) for token in filtered]
return (tokens, output_video)
# ---- Gradio Layout -----
video_in = gr.Video(label="Video file", elem_id="video-container")
text_in = gr.Textbox(label="Transcription", lines=10, interactive=True)
video_out = gr.Video(label="Video Out")
diff_out = gr.HighlightedText(label="Cuts Diffs", combine_adjacent=True)
examples = gr.Dataset(components=[video_in], samples=VIDEOS, type="index")
css = """
#cut_btn, #reset_btn { align-self:stretch; }
#\\31 3 { max-width: 540px; }
.output-markdown {max-width: 65ch !important;}
#video-container{
max-width: 40rem;
}
"""
with gr.Blocks(css=css) as demo:
transcription_var = gr.State()
timestamps_var = gr.State()
with gr.Row():
with gr.Column():
gr.Markdown("""
# Edit Video By Editing Text
This project is a quick proof of concept of a simple video editor where the edits
are made by editing the audio transcription.
Using the [Huggingface Automatic Speech Recognition Pipeline](https://huggingface.co/tasks/automatic-speech-recognition)
with a fine tuned [Wav2Vec2 model using Connectionist Temporal Classification (CTC)](https://huggingface.co/facebook/wav2vec2-large-960h-lv60-self)
you can predict not only the text transcription but also the [character or word base timestamps](https://huggingface.co/docs/transformers/v4.19.2/en/main_classes/pipelines#transformers.AutomaticSpeechRecognitionPipeline.__call__.return_timestamps)
""")
with gr.Row():
examples.render()
def load_example(id):
video = SAMPLES[id]["video"]
transcription = SAMPLES[id]["transcription"].lower()
timestamps = SAMPLES[id]["timestamps"]
return (video, transcription, transcription, timestamps)
examples.click(
load_example,
inputs=[examples],
outputs=[video_in, text_in, transcription_var, timestamps_var],
queue=False,
)
with gr.Row():
with gr.Column():
video_in.render()
transcribe_btn = gr.Button("Transcribe Audio")
transcribe_btn.click(
speech_to_text, [video_in], [text_in, transcription_var, timestamps_var]
)
with gr.Row():
gr.Markdown("""
### Now edit as text
After running the video transcription, you can make cuts to the text below (only cuts, not additions!)""")
with gr.Row():
with gr.Column():
text_in.render()
with gr.Row():
cut_btn = gr.Button("Cut to video", elem_id="cut_btn")
# send audio path and hidden variables
cut_btn.click(
cut_timestamps_to_video,
[video_in, transcription_var, text_in, timestamps_var],
[diff_out, video_out],
)
reset_transcription = gr.Button(
"Reset to last trascription", elem_id="reset_btn"
)
reset_transcription.click(lambda x: x, transcription_var, text_in)
with gr.Column():
video_out.render()
diff_out.render()
with gr.Row():
gr.Markdown("""
#### Video Credits
1. [Cooking](https://vimeo.com/573792389)
1. [Shia LaBeouf "Just Do It"](https://www.youtube.com/watch?v=n2lTxIk_Dr0)
1. [Mark Zuckerberg & Yuval Noah Harari in Conversation](https://www.youtube.com/watch?v=Boj9eD0Wug8)
""")
demo.queue()
if __name__ == "__main__":
demo.launch(debug=True)