Spaces:
Running
Running
import os | |
import os.path as osp | |
import sys | |
import tempfile | |
from uuid import uuid4 | |
import gradio as gr | |
import soundfile | |
import torch | |
import torch.nn.functional as F | |
from huggingface_hub import snapshot_download | |
from transformers import AutoTokenizer | |
from src.internvl.eval import load_video | |
from src.moviedubber.infer.utils_infer import ( | |
cfg_strength, | |
chunk_text, | |
nfe_step, | |
sway_sampling_coef, | |
) | |
from src.moviedubber.infer.video_preprocess import VideoFeatureExtractor | |
from src.moviedubber.infer_with_mmlm_result import get_spk_emb, get_video_duration, load_models, merge_video_audio | |
from src.moviedubber.model.utils import convert_char_to_pinyin | |
sys.path.insert(0, "src/third_party") | |
sys.path.append("src/third_party/BigVGAN") | |
from InternVL.internvl_chat.internvl.model.internvl_chat.modeling_internvl_chat import InternVLChatModel # type: ignore | |
device = torch.device("cuda" if torch.cuda.is_available() else "cpu") | |
repo_local_path = snapshot_download(repo_id="woak-oa/DeepDubber-V1") | |
mmlm_path = osp.join(repo_local_path, "mmlm") | |
mmlm = InternVLChatModel.from_pretrained( | |
mmlm_path, | |
torch_dtype=torch.bfloat16, | |
low_cpu_mem_usage=True, | |
use_flash_attn=False, | |
) | |
mmlm = mmlm.eval().to(device) | |
tokenizer = AutoTokenizer.from_pretrained(mmlm_path, trust_remote_code=True, use_fast=False) | |
generation_config = dict(max_new_tokens=1024, do_sample=False) | |
ema_model, vocoder, ort_session = load_models(repo_local_path, device=device) | |
videofeature_extractor = VideoFeatureExtractor(device=device) | |
out_dir = "./output" | |
if not os.path.exists(out_dir): | |
os.makedirs(out_dir) | |
def deepdubber(video_path: str, subtitle_text: str, audio_path: str = None) -> str: | |
pixel_values, num_patches_list = load_video(video_path, num_segments=8, max_num=1) | |
pixel_values = pixel_values.to(torch.bfloat16).to(device) | |
video_prefix = "".join([f"Frame{i + 1}: <image>\n" for i in range(len(num_patches_list))]) | |
question = ( | |
video_prefix | |
+ "What is the voice-over category for this video? Options: A. dialogue, B. monologue, C. narration." | |
) | |
response = mmlm.chat( | |
tokenizer, | |
pixel_values, | |
question, | |
generation_config, | |
num_patches_list=num_patches_list, | |
history=None, | |
return_history=False, | |
) | |
try: | |
response = response.split("<REASONING>")[1].split("</REASONING>")[0].strip() | |
except Exception as e: | |
print(f"Error: {e}, response: {response}") | |
response = response.strip()[0] | |
print(f"Starting deepdubber with video_path: {video_path} and subtitle_text: {subtitle_text}") | |
gen_clip = videofeature_extractor.extract_features(video_path) | |
gen_text = subtitle_text | |
v_dur = get_video_duration(video_path) | |
gen_audio_len = int(v_dur * 24000 // 256) | |
gen_clip = gen_clip.unsqueeze(0).to(device=device, dtype=torch.float32).transpose(1, 2) | |
gen_clip = F.interpolate(gen_clip, size=(gen_audio_len,), mode="linear", align_corners=False).transpose(1, 2) | |
if audio_path is not None: | |
spk_emb = get_spk_emb(audio_path, ort_session) | |
spk_emb = torch.tensor(spk_emb).to(device=device, dtype=torch.float32).unsqueeze(0).unsqueeze(0) | |
else: | |
spk_emb = torch.zeros(1, 1, 256).to(device=device, dtype=torch.float32) | |
gen_text_batches = chunk_text(gen_text, max_chars=1024) | |
final_text_list = convert_char_to_pinyin(gen_text_batches) | |
cond = torch.zeros(1, gen_audio_len, 100).to(device) | |
with torch.inference_mode(): | |
generated, _ = ema_model.sample( | |
cond=cond, | |
text=final_text_list, | |
clip=gen_clip, | |
spk_emb=spk_emb, | |
duration=gen_audio_len, | |
steps=nfe_step, | |
cfg_strength=cfg_strength, | |
sway_sampling_coef=sway_sampling_coef, | |
no_ref_audio=True, | |
) | |
generated = generated.to(torch.float32) | |
generated_mel_spec = generated.permute(0, 2, 1) | |
generated_wave = vocoder(generated_mel_spec) | |
generated_wave = generated_wave.squeeze().cpu().numpy() | |
# using a temporary wav file to save the generated audio | |
with tempfile.NamedTemporaryFile(delete=False, suffix=".wav", dir="./output") as temp_wav_file: | |
temp_wav_path = temp_wav_file.name | |
soundfile.write(temp_wav_path, generated_wave, samplerate=24000) | |
video_out_path = os.path.join(out_dir, f"dubbed_video_{uuid4[:6]}.mp4") | |
concated_video = merge_video_audio( | |
video_path, temp_wav_path, video_out_path, 0, soundfile.info(temp_wav_path).duration | |
) | |
# Ensure the temporary file is deleted after use | |
os.remove(temp_wav_path) | |
print(f"Deepdubber completed successfully, output path: {concated_video}") | |
return response, concated_video | |
def process_video_dubbing( | |
video_path: str, subtitle_text: str, audio_path: str = None, caption_input: str = None | |
) -> str: | |
try: | |
if not os.path.exists(video_path): | |
raise ValueError("Video file does not exist") | |
if not subtitle_text.strip(): | |
raise ValueError("Subtitle text cannot be empty") | |
if audio_path is None: | |
audio_path = "datasets/CoTMovieDubbing/GT.wav" | |
print(f"Processing video: {video_path}") | |
res, output_path = deepdubber(video_path, subtitle_text, audio_path) | |
return res, output_path | |
except Exception as e: | |
print(f"Error in process_video_dubbing: {e}") | |
return None, None | |
def create_ui(): | |
with gr.Blocks(title="DeepDubber-V1") as app: | |
gr.Markdown("# DeepDubber-V1\nUpload your video file and enter the subtitle you want to dub") | |
with gr.Row(): | |
video_input = gr.Video(label="Upload video") | |
subtitle_input = gr.Textbox( | |
label="Enter the subtitle", placeholder="Enter the subtitle to be dubbed...", lines=5 | |
) | |
audio_input = gr.Audio(label="Upload speech prompt (Optional)", type="filepath") | |
# caption_input = gr.Textbox(label="Enter the description of Video (Optional)", lines=1) | |
process_btn = gr.Button("Start Dubbing") | |
with gr.Row(): | |
output_response = gr.Textbox(label="Response", placeholder="Response from MMLM", lines=5) | |
output_video = gr.Video(label="Dubbed Video") | |
# add some examples | |
examples = [ | |
[ | |
"datasets/CoTMovieDubbing/demo/v01input.mp4", | |
"it isn't simply a question of creating a robot who can love", | |
"datasets/CoTMovieDubbing/demo/speech_prompt_01.mp3", | |
# "datasets/CoTMovieDubbing/demo/speech_prompt_01.mp3", | |
], | |
[ | |
"datasets/CoTMovieDubbing/demo/v02input.mp4", | |
"Me, I'd be happy with one who's not... fixed.", | |
"datasets/CoTMovieDubbing/demo/speech_prompt_02.mp3", | |
# "datasets/CoTMovieDubbing/demo/speech_prompt_02.mp3", | |
], | |
[ | |
"datasets/CoTMovieDubbing/demo/v03input.mp4", | |
"Man, Papi. What am I gonna do?", | |
"datasets/CoTMovieDubbing/demo/speech_prompt_03.mp3", | |
# "datasets/CoTMovieDubbing/demo/speech_prompt_02.mp3", | |
], | |
] | |
process_btn.click( | |
fn=process_video_dubbing, | |
inputs=[video_input, subtitle_input, audio_input], | |
outputs=[output_response, output_video], | |
) | |
# gr.Examples(examples=examples, inputs=[video_input, subtitle_input, audio_input, caption_input]) | |
gr.Examples(examples=examples, inputs=[video_input, subtitle_input, audio_input]) | |
return app | |
if __name__ == "__main__": | |
app = create_ui() | |
app.launch(allowed_paths=["./output", "./datasets"]) | |