update to v1.1
Browse files
app.py
CHANGED
|
@@ -7,21 +7,16 @@ from typing import Optional, List, Dict, Any
|
|
| 7 |
import torch
|
| 8 |
import gradio as gr
|
| 9 |
import yt_dlp as youtube_dl
|
| 10 |
-
import numpy as np
|
| 11 |
from transformers import pipeline
|
| 12 |
from transformers.pipelines.audio_utils import ffmpeg_read
|
| 13 |
-
from punctuators.models import PunctCapSegModelONNX
|
| 14 |
-
from stable_whisper import WhisperResult
|
| 15 |
|
| 16 |
|
| 17 |
# configuration
|
| 18 |
-
MODEL_NAME = "kotoba-tech/kotoba-whisper-v1.
|
| 19 |
BATCH_SIZE = 16
|
| 20 |
CHUNK_LENGTH_S = 15
|
| 21 |
FILE_LIMIT_MB = 1000
|
| 22 |
YT_LENGTH_LIMIT_S = 3600 # limit to 1 hour YouTube files
|
| 23 |
-
|
| 24 |
-
|
| 25 |
# device setting
|
| 26 |
if torch.cuda.is_available():
|
| 27 |
torch_dtype = torch.bfloat16
|
|
@@ -31,117 +26,19 @@ else:
|
|
| 31 |
torch_dtype = torch.float32
|
| 32 |
device = "cpu"
|
| 33 |
model_kwargs = {}
|
| 34 |
-
|
| 35 |
# define the pipeline
|
| 36 |
pipe = pipeline(
|
| 37 |
-
task="automatic-speech-recognition",
|
| 38 |
model=MODEL_NAME,
|
| 39 |
chunk_length_s=CHUNK_LENGTH_S,
|
| 40 |
batch_size=BATCH_SIZE,
|
| 41 |
torch_dtype=torch_dtype,
|
| 42 |
device=device,
|
| 43 |
-
model_kwargs=model_kwargs
|
|
|
|
|
|
|
| 44 |
)
|
| 45 |
|
| 46 |
|
| 47 |
-
class Punctuator:
|
| 48 |
-
|
| 49 |
-
ja_punctuations = ["!", "?", "γ", "γ"]
|
| 50 |
-
|
| 51 |
-
def __init__(self, model: str = "pcs_47lang"):
|
| 52 |
-
self.punctuation_model = PunctCapSegModelONNX.from_pretrained(model)
|
| 53 |
-
|
| 54 |
-
def punctuate(self, pipeline_chunk: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
|
| 55 |
-
|
| 56 |
-
def validate_punctuation(raw: str, punctuated: str):
|
| 57 |
-
if 'unk' in punctuated:
|
| 58 |
-
return raw
|
| 59 |
-
if punctuated.count("γ") > 1:
|
| 60 |
-
ind = punctuated.rfind("γ")
|
| 61 |
-
punctuated = punctuated.replace("γ", "")
|
| 62 |
-
punctuated = punctuated[:ind] + "γ" + punctuated[ind:]
|
| 63 |
-
return punctuated
|
| 64 |
-
|
| 65 |
-
text_edit = self.punctuation_model.infer([c['text'] for c in pipeline_chunk])
|
| 66 |
-
return [
|
| 67 |
-
{
|
| 68 |
-
'timestamp': c['timestamp'],
|
| 69 |
-
'text': validate_punctuation(c['text'], "".join(e))
|
| 70 |
-
} for c, e in zip(pipeline_chunk, text_edit)
|
| 71 |
-
]
|
| 72 |
-
|
| 73 |
-
|
| 74 |
-
PUNCTUATOR = Punctuator()
|
| 75 |
-
|
| 76 |
-
|
| 77 |
-
def _fix_timestamp(sample_rate: int, result: List[Dict[str, Any]], audio: np.ndarray) -> WhisperResult or None:
|
| 78 |
-
|
| 79 |
-
def replace_none_ts(parts):
|
| 80 |
-
total_dur = round(audio.shape[-1] / sample_rate, 3)
|
| 81 |
-
_medium_dur = _ts_nonzero_mask = None
|
| 82 |
-
|
| 83 |
-
def ts_nonzero_mask() -> np.ndarray:
|
| 84 |
-
nonlocal _ts_nonzero_mask
|
| 85 |
-
if _ts_nonzero_mask is None:
|
| 86 |
-
_ts_nonzero_mask = np.array([(p['end'] or p['start']) is not None for p in parts])
|
| 87 |
-
return _ts_nonzero_mask
|
| 88 |
-
|
| 89 |
-
def medium_dur() -> float:
|
| 90 |
-
nonlocal _medium_dur
|
| 91 |
-
if _medium_dur is None:
|
| 92 |
-
nonzero_dus = [p['end'] - p['start'] for p in parts if None not in (p['end'], p['start'])]
|
| 93 |
-
nonzero_durs = np.array(nonzero_dus)
|
| 94 |
-
_medium_dur = np.median(nonzero_durs) * 2 if len(nonzero_durs) else 2.0
|
| 95 |
-
return _medium_dur
|
| 96 |
-
|
| 97 |
-
def _curr_max_end(start: float, next_idx: float) -> float:
|
| 98 |
-
max_end = total_dur
|
| 99 |
-
if next_idx != len(parts):
|
| 100 |
-
mask = np.flatnonzero(ts_nonzero_mask()[next_idx:])
|
| 101 |
-
if len(mask):
|
| 102 |
-
_part = parts[mask[0]+next_idx]
|
| 103 |
-
max_end = _part['start'] or _part['end']
|
| 104 |
-
|
| 105 |
-
new_end = round(start + medium_dur(), 3)
|
| 106 |
-
if new_end > max_end:
|
| 107 |
-
return max_end
|
| 108 |
-
return new_end
|
| 109 |
-
|
| 110 |
-
for i, part in enumerate(parts, 1):
|
| 111 |
-
if part['start'] is None:
|
| 112 |
-
is_first = i == 1
|
| 113 |
-
if is_first:
|
| 114 |
-
new_start = round((part['end'] or 0) - medium_dur(), 3)
|
| 115 |
-
part['start'] = max(new_start, 0.0)
|
| 116 |
-
else:
|
| 117 |
-
part['start'] = parts[i - 2]['end']
|
| 118 |
-
if part['end'] is None:
|
| 119 |
-
no_next_start = i == len(parts) or parts[i]['start'] is None
|
| 120 |
-
part['end'] = _curr_max_end(part['start'], i) if no_next_start else parts[i]['start']
|
| 121 |
-
|
| 122 |
-
words = [dict(start=word['timestamp'][0], end=word['timestamp'][1], word=word['text']) for word in result]
|
| 123 |
-
replace_none_ts(words)
|
| 124 |
-
return WhisperResult([words], force_order=True, check_sorted=True)
|
| 125 |
-
|
| 126 |
-
|
| 127 |
-
def fix_timestamp(pipeline_output: List[Dict[str, Any]], audio: np.ndarray, sample_rate: int) -> List[Dict[str, Any]]:
|
| 128 |
-
result = _fix_timestamp(sample_rate=sample_rate, audio=audio, result=pipeline_output)
|
| 129 |
-
result.adjust_by_silence(
|
| 130 |
-
audio,
|
| 131 |
-
q_levels=20,
|
| 132 |
-
k_size=5,
|
| 133 |
-
sample_rate=sample_rate,
|
| 134 |
-
min_word_dur=None,
|
| 135 |
-
word_level=True,
|
| 136 |
-
verbose=True,
|
| 137 |
-
nonspeech_error=0.1,
|
| 138 |
-
use_word_position=True
|
| 139 |
-
)
|
| 140 |
-
if result.has_words:
|
| 141 |
-
result.regroup(True)
|
| 142 |
-
return [{"timestamp": [s.start, s.end], "text": s.text} for s in result.segments]
|
| 143 |
-
|
| 144 |
-
|
| 145 |
def format_time(start: Optional[float], end: Optional[float]):
|
| 146 |
|
| 147 |
def _format_time(seconds: Optional[float]):
|
|
@@ -157,17 +54,11 @@ def format_time(start: Optional[float], end: Optional[float]):
|
|
| 157 |
return f"[{_format_time(start)}-> {_format_time(end)}]:"
|
| 158 |
|
| 159 |
|
| 160 |
-
def get_prediction(inputs, prompt: Optional[str]
|
| 161 |
generate_kwargs = {"language": "japanese", "task": "transcribe"}
|
| 162 |
if prompt:
|
| 163 |
generate_kwargs['prompt_ids'] = pipe.tokenizer.get_prompt_ids(prompt, return_tensors='pt').to(device)
|
| 164 |
-
array = inputs["array"]
|
| 165 |
-
sr = inputs["sampling_rate"]
|
| 166 |
prediction = pipe(inputs, return_timestamps=True, generate_kwargs=generate_kwargs)
|
| 167 |
-
if stabilize_timestamp:
|
| 168 |
-
prediction['chunks'] = fix_timestamp(pipeline_output=prediction['chunks'], audio=array, sample_rate=sr)
|
| 169 |
-
if punctuate_text:
|
| 170 |
-
prediction['chunks'] = PUNCTUATOR.punctuate(prediction['chunks'])
|
| 171 |
text = "".join([c['text'] for c in prediction['chunks']])
|
| 172 |
text_timestamped = "\n".join([
|
| 173 |
f"{format_time(*c['timestamp'])} {c['text']}" for c in prediction['chunks']
|
|
@@ -175,14 +66,14 @@ def get_prediction(inputs, prompt: Optional[str], punctuate_text: bool = True, s
|
|
| 175 |
return text, text_timestamped
|
| 176 |
|
| 177 |
|
| 178 |
-
def transcribe(inputs: str, prompt
|
| 179 |
if inputs is None:
|
| 180 |
raise gr.Error("No audio file submitted! Please upload or record an audio file before submitting your request.")
|
| 181 |
with open(inputs, "rb") as f:
|
| 182 |
inputs = f.read()
|
| 183 |
inputs = ffmpeg_read(inputs, pipe.feature_extractor.sampling_rate)
|
| 184 |
inputs = {"array": inputs, "sampling_rate": pipe.feature_extractor.sampling_rate}
|
| 185 |
-
return get_prediction(inputs, prompt
|
| 186 |
|
| 187 |
|
| 188 |
def _return_yt_html_embed(yt_url):
|
|
@@ -216,7 +107,7 @@ def download_yt_audio(yt_url, filename):
|
|
| 216 |
raise gr.Error(str(err))
|
| 217 |
|
| 218 |
|
| 219 |
-
def yt_transcribe(yt_url, prompt
|
| 220 |
html_embed_str = _return_yt_html_embed(yt_url)
|
| 221 |
with tempfile.TemporaryDirectory() as tmpdirname:
|
| 222 |
filepath = os.path.join(tmpdirname, "video.mp4")
|
|
@@ -225,7 +116,7 @@ def yt_transcribe(yt_url, prompt, punctuate_text: bool = True, stabilize_timesta
|
|
| 225 |
inputs = f.read()
|
| 226 |
inputs = ffmpeg_read(inputs, pipe.feature_extractor.sampling_rate)
|
| 227 |
inputs = {"array": inputs, "sampling_rate": pipe.feature_extractor.sampling_rate}
|
| 228 |
-
text, text_timestamped = get_prediction(inputs, prompt
|
| 229 |
return html_embed_str, text, text_timestamped
|
| 230 |
|
| 231 |
|
|
@@ -235,8 +126,6 @@ mf_transcribe = gr.Interface(
|
|
| 235 |
inputs=[
|
| 236 |
gr.inputs.Audio(source="microphone", type="filepath", optional=True),
|
| 237 |
gr.inputs.Textbox(lines=1, placeholder="Prompt", optional=True),
|
| 238 |
-
gr.inputs.Checkbox(default=True, label="Add punctuations"),
|
| 239 |
-
gr.inputs.Checkbox(default=True, label="Stabilize timestamp")
|
| 240 |
],
|
| 241 |
outputs=["text", "text"],
|
| 242 |
layout="horizontal",
|
|
@@ -251,8 +140,6 @@ file_transcribe = gr.Interface(
|
|
| 251 |
inputs=[
|
| 252 |
gr.inputs.Audio(source="upload", type="filepath", optional=True, label="Audio file"),
|
| 253 |
gr.inputs.Textbox(lines=1, placeholder="Prompt", optional=True),
|
| 254 |
-
gr.inputs.Checkbox(default=True, label="Add punctuations"),
|
| 255 |
-
gr.inputs.Checkbox(default=True, label="Stabilize timestamp")
|
| 256 |
],
|
| 257 |
outputs=["text", "text"],
|
| 258 |
layout="horizontal",
|
|
@@ -266,8 +153,6 @@ yt_transcribe = gr.Interface(
|
|
| 266 |
inputs=[
|
| 267 |
gr.inputs.Textbox(lines=1, placeholder="Paste the URL to a YouTube video here", label="YouTube URL"),
|
| 268 |
gr.inputs.Textbox(lines=1, placeholder="Prompt", optional=True),
|
| 269 |
-
gr.inputs.Checkbox(default=True, label="Add punctuations"),
|
| 270 |
-
gr.inputs.Checkbox(default=True, label="Stabilize timestamp")
|
| 271 |
],
|
| 272 |
outputs=["html", "text", "text"],
|
| 273 |
layout="horizontal",
|
|
|
|
| 7 |
import torch
|
| 8 |
import gradio as gr
|
| 9 |
import yt_dlp as youtube_dl
|
|
|
|
| 10 |
from transformers import pipeline
|
| 11 |
from transformers.pipelines.audio_utils import ffmpeg_read
|
|
|
|
|
|
|
| 12 |
|
| 13 |
|
| 14 |
# configuration
|
| 15 |
+
MODEL_NAME = "kotoba-tech/kotoba-whisper-v1.1"
|
| 16 |
BATCH_SIZE = 16
|
| 17 |
CHUNK_LENGTH_S = 15
|
| 18 |
FILE_LIMIT_MB = 1000
|
| 19 |
YT_LENGTH_LIMIT_S = 3600 # limit to 1 hour YouTube files
|
|
|
|
|
|
|
| 20 |
# device setting
|
| 21 |
if torch.cuda.is_available():
|
| 22 |
torch_dtype = torch.bfloat16
|
|
|
|
| 26 |
torch_dtype = torch.float32
|
| 27 |
device = "cpu"
|
| 28 |
model_kwargs = {}
|
|
|
|
| 29 |
# define the pipeline
|
| 30 |
pipe = pipeline(
|
|
|
|
| 31 |
model=MODEL_NAME,
|
| 32 |
chunk_length_s=CHUNK_LENGTH_S,
|
| 33 |
batch_size=BATCH_SIZE,
|
| 34 |
torch_dtype=torch_dtype,
|
| 35 |
device=device,
|
| 36 |
+
model_kwargs=model_kwargs,
|
| 37 |
+
punctuator=True,
|
| 38 |
+
stable_ts=True,
|
| 39 |
)
|
| 40 |
|
| 41 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 42 |
def format_time(start: Optional[float], end: Optional[float]):
|
| 43 |
|
| 44 |
def _format_time(seconds: Optional[float]):
|
|
|
|
| 54 |
return f"[{_format_time(start)}-> {_format_time(end)}]:"
|
| 55 |
|
| 56 |
|
| 57 |
+
def get_prediction(inputs, prompt: Optional[str]):
|
| 58 |
generate_kwargs = {"language": "japanese", "task": "transcribe"}
|
| 59 |
if prompt:
|
| 60 |
generate_kwargs['prompt_ids'] = pipe.tokenizer.get_prompt_ids(prompt, return_tensors='pt').to(device)
|
|
|
|
|
|
|
| 61 |
prediction = pipe(inputs, return_timestamps=True, generate_kwargs=generate_kwargs)
|
|
|
|
|
|
|
|
|
|
|
|
|
| 62 |
text = "".join([c['text'] for c in prediction['chunks']])
|
| 63 |
text_timestamped = "\n".join([
|
| 64 |
f"{format_time(*c['timestamp'])} {c['text']}" for c in prediction['chunks']
|
|
|
|
| 66 |
return text, text_timestamped
|
| 67 |
|
| 68 |
|
| 69 |
+
def transcribe(inputs: str, prompt):
|
| 70 |
if inputs is None:
|
| 71 |
raise gr.Error("No audio file submitted! Please upload or record an audio file before submitting your request.")
|
| 72 |
with open(inputs, "rb") as f:
|
| 73 |
inputs = f.read()
|
| 74 |
inputs = ffmpeg_read(inputs, pipe.feature_extractor.sampling_rate)
|
| 75 |
inputs = {"array": inputs, "sampling_rate": pipe.feature_extractor.sampling_rate}
|
| 76 |
+
return get_prediction(inputs, prompt)
|
| 77 |
|
| 78 |
|
| 79 |
def _return_yt_html_embed(yt_url):
|
|
|
|
| 107 |
raise gr.Error(str(err))
|
| 108 |
|
| 109 |
|
| 110 |
+
def yt_transcribe(yt_url, prompt):
|
| 111 |
html_embed_str = _return_yt_html_embed(yt_url)
|
| 112 |
with tempfile.TemporaryDirectory() as tmpdirname:
|
| 113 |
filepath = os.path.join(tmpdirname, "video.mp4")
|
|
|
|
| 116 |
inputs = f.read()
|
| 117 |
inputs = ffmpeg_read(inputs, pipe.feature_extractor.sampling_rate)
|
| 118 |
inputs = {"array": inputs, "sampling_rate": pipe.feature_extractor.sampling_rate}
|
| 119 |
+
text, text_timestamped = get_prediction(inputs, prompt)
|
| 120 |
return html_embed_str, text, text_timestamped
|
| 121 |
|
| 122 |
|
|
|
|
| 126 |
inputs=[
|
| 127 |
gr.inputs.Audio(source="microphone", type="filepath", optional=True),
|
| 128 |
gr.inputs.Textbox(lines=1, placeholder="Prompt", optional=True),
|
|
|
|
|
|
|
| 129 |
],
|
| 130 |
outputs=["text", "text"],
|
| 131 |
layout="horizontal",
|
|
|
|
| 140 |
inputs=[
|
| 141 |
gr.inputs.Audio(source="upload", type="filepath", optional=True, label="Audio file"),
|
| 142 |
gr.inputs.Textbox(lines=1, placeholder="Prompt", optional=True),
|
|
|
|
|
|
|
| 143 |
],
|
| 144 |
outputs=["text", "text"],
|
| 145 |
layout="horizontal",
|
|
|
|
| 153 |
inputs=[
|
| 154 |
gr.inputs.Textbox(lines=1, placeholder="Paste the URL to a YouTube video here", label="YouTube URL"),
|
| 155 |
gr.inputs.Textbox(lines=1, placeholder="Prompt", optional=True),
|
|
|
|
|
|
|
| 156 |
],
|
| 157 |
outputs=["html", "text", "text"],
|
| 158 |
layout="horizontal",
|