Spaces:
Running
on
Zero
Running
on
Zero
import spaces | |
import torch | |
import gradio as gr | |
import yt_dlp as youtube_dl | |
from transformers import pipeline | |
from transformers.pipelines.audio_utils import ffmpeg_read | |
import tempfile | |
import os | |
import time | |
# Environment and model configuration | |
hf_token = os.getenv('HF_TOKEN') | |
MODEL_NAME = "nyrahealth/CrisperWhisper" | |
BATCH_SIZE = 8 | |
FILE_LIMIT_MB = 1000 | |
YT_LENGTH_LIMIT_S = 3600 # limit to 1 hour YouTube files | |
# Device setup | |
device = 0 if torch.cuda.is_available() else "cpu" | |
torch_dtype = torch.float16 if torch.cuda.is_available() else torch.float32 | |
# Timestamp adjustment function | |
def adjust_pauses_for_hf_pipeline_output(pipeline_output, split_threshold=0.12): | |
""" | |
Adjust pause timings by distributing pauses up to the threshold evenly between adjacent words. | |
""" | |
adjusted_chunks = pipeline_output["chunks"].copy() | |
for i in range(len(adjusted_chunks) - 1): | |
current_chunk = adjusted_chunks[i] | |
next_chunk = adjusted_chunks[i + 1] | |
current_start, current_end = current_chunk["timestamp"] | |
next_start, next_end = next_chunk["timestamp"] | |
pause_duration = next_start - current_end | |
if pause_duration > 0: | |
if pause_duration > split_threshold: | |
distribute = split_threshold / 2 | |
else: | |
distribute = pause_duration / 2 | |
# Adjust current chunk end time | |
adjusted_chunks[i]["timestamp"] = (current_start, current_end + distribute) | |
# Adjust next chunk start time | |
adjusted_chunks[i + 1]["timestamp"] = (next_start - distribute, next_end) | |
pipeline_output["chunks"] = adjusted_chunks | |
return pipeline_output | |
# Initialize pipeline | |
pipe = pipeline( | |
task="automatic-speech-recognition", | |
model=MODEL_NAME, | |
token=hf_token, | |
torch_dtype=torch_dtype, | |
chunk_length_s=30, | |
device=device, | |
return_timestamps='word', # Enable word-level timestamps | |
) | |
# Transcribe function for microphone and file inputs | |
def transcribe(inputs, task): | |
if inputs is None: | |
raise gr.Error("No audio file submitted! Please upload or record an audio file before submitting your request.") | |
# Get full pipeline output | |
raw_output = pipe( | |
inputs, | |
batch_size=BATCH_SIZE, | |
generate_kwargs={"task": task}, | |
return_timestamps='word' | |
) | |
# Apply timestamp adjustment | |
adjusted_output = adjust_pauses_for_hf_pipeline_output(raw_output) | |
# Format output with timestamps | |
formatted_text = "" | |
for chunk in adjusted_output["chunks"]: | |
start = chunk["timestamp"][0] | |
text = chunk["text"] | |
formatted_text += f"[{start:.2f}] {text}\n" | |
return formatted_text | |
# YouTube HTML embed function | |
def _return_yt_html_embed(yt_url): | |
video_id = yt_url.split("?v=")[-1] | |
HTML_str = ( | |
f'<center> <iframe width="500" height="320" src="https://www.youtube.com/embed/{video_id}"> </iframe>' | |
" </center>" | |
) | |
return HTML_str | |
# YouTube audio download function | |
def download_yt_audio(yt_url, filename): | |
info_loader = youtube_dl.YoutubeDL() | |
try: | |
info = info_loader.extract_info(yt_url, download=False) | |
except youtube_dl.utils.DownloadError as err: | |
raise gr.Error(str(err)) | |
file_length = info["duration_string"] | |
file_h_m_s = file_length.split(":") | |
file_h_m_s = [int(sub_length) for sub_length in file_h_m_s] | |
if len(file_h_m_s) == 1: | |
file_h_m_s.insert(0, 0) | |
if len(file_h_m_s) == 2: | |
file_h_m_s.insert(0, 0) | |
file_length_s = file_h_m_s[0] * 3600 + file_h_m_s[1] * 60 + file_h_m_s[2] | |
if file_length_s > YT_LENGTH_LIMIT_S: | |
yt_length_limit_hms = time.strftime("%HH:%MM:%SS", time.gmtime(YT_LENGTH_LIMIT_S)) | |
file_length_hms = time.strftime("%HH:%MM:%SS", time.gmtime(file_length_s)) | |
raise gr.Error(f"Maximum YouTube length is {yt_length_limit_hms}, got {file_length_hms} YouTube video.") | |
ydl_opts = {"outtmpl": filename, "format": "worstvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best"} | |
with youtube_dl.YoutubeDL(ydl_opts) as ydl: | |
try: | |
ydl.download([yt_url]) | |
except youtube_dl.utils.ExtractorError as err: | |
raise gr.Error(str(err)) | |
# Transcribe function for YouTube inputs | |
def yt_transcribe(yt_url, task, max_filesize=75.0): | |
html_embed_str = _return_yt_html_embed(yt_url) | |
with tempfile.TemporaryDirectory() as tmpdirname: | |
filepath = os.path.join(tmpdirname, "video.mp4") | |
download_yt_audio(yt_url, filepath) | |
with open(filepath, "rb") as f: | |
inputs = f.read() | |
inputs = ffmpeg_read(inputs, pipe.feature_extractor.sampling_rate) | |
inputs = {"array": inputs, "sampling_rate": pipe.feature_extractor.sampling_rate} | |
# Get full pipeline output | |
raw_output = pipe( | |
inputs, | |
batch_size=BATCH_SIZE, | |
generate_kwargs={"task": task}, | |
return_timestamps='word' | |
) | |
# Apply timestamp adjustment | |
adjusted_output = adjust_pauses_for_hf_pipeline_output(raw_output) | |
# Format output with timestamps | |
formatted_text = "" | |
for chunk in adjusted_output["chunks"]: | |
start = chunk["timestamp"][0] | |
text = chunk["text"] | |
formatted_text += f"[{start:.2f}] {text}\n" | |
return html_embed_str, formatted_text | |
# Gradio interface setup | |
demo = gr.Blocks() | |
mf_transcribe = gr.Interface( | |
fn=transcribe, | |
inputs=[ | |
gr.Audio(sources="microphone", type="filepath"), | |
gr.Radio(["transcribe", "translate"], label="Task", value="transcribe"), | |
], | |
outputs="text", | |
title="CrisperWhisper: Transcribe Audio as it is", | |
description=( | |
"Transcribe long-form microphone or audio inputs with the click of a button! Demo uses the" | |
f" checkpoint [{MODEL_NAME}](https://huggingface.co./{MODEL_NAME}) and 🤗 Transformers to transcribe audio files" | |
" of arbitrary length." | |
), | |
allow_flagging="never", | |
) | |
file_transcribe = gr.Interface( | |
fn=transcribe, | |
inputs=[ | |
gr.Audio(sources="upload", type="filepath", label="Audio file"), | |
gr.Radio(["transcribe", "translate"], label="Task", value="transcribe"), | |
], | |
outputs="text", | |
title="CrisperWhisper: Transcribe Audio as it is", | |
description=( | |
"Transcribe long-form microphone or audio inputs with the click of a button! Demo uses the" | |
f" checkpoint [{MODEL_NAME}](https://huggingface.co./{MODEL_NAME}) and 🤗 Transformers to transcribe audio files" | |
" of arbitrary length." | |
), | |
allow_flagging="never", | |
) | |
yt_transcribe = gr.Interface( | |
fn=yt_transcribe, | |
inputs=[ | |
gr.Textbox(lines=1, placeholder="Paste the URL to a YouTube video here", label="YouTube URL"), | |
gr.Radio(["transcribe", "translate"], label="Task", value="transcribe") | |
], | |
outputs=["html", "text"], | |
title="CrisperWhisper: Transcribe Audio as it is", | |
description=( | |
"Transcribe long-form YouTube videos with the click of a button! Demo uses the checkpoint" | |
f" [{MODEL_NAME}](https://huggingface.co./{MODEL_NAME}) and 🤗 Transformers to transcribe video files of" | |
" arbitrary length." | |
), | |
allow_flagging="never", | |
) | |
# Combine interfaces into a tabbed layout | |
with demo: | |
gr.TabbedInterface([mf_transcribe, file_transcribe, yt_transcribe], ["Microphone", "Audio file", "YouTube"]) | |
# Launch the app | |
demo.queue().launch() |