# Spaces: Running on Zero
import os | |
import sys | |
import importlib.util | |
import site | |
import json | |
import torch | |
import gradio as gr | |
import torchaudio | |
import numpy as np | |
from huggingface_hub import snapshot_download, hf_hub_download | |
import subprocess | |
import re | |
import spaces | |
# Global registry tracking which resources have already been downloaded.
# Every entry starts out False and is flipped to True by the code that
# fetches the corresponding resource.
downloaded_resources = dict.fromkeys(
    (
        "configs",
        "tokenizer_vq32",
        "tokenizer_vq8192",
        "ar_Vq32ToVq8192",
        "ar_PhoneToVq8192",
        "fmt_Vq8192ToMels",
        "vocoder",
    ),
    False,
)
def install_espeak():
    """Detect espeak-ng and install it with apt-get when it is missing.

    This is a best-effort step: every failure is reported on stdout and
    swallowed, so the function always returns ``None`` and never raises.
    After the presence check (and optional installation) it asks espeak-ng
    for its ``cmn`` voices and prints whether Chinese appears to be supported.
    """
    # Local import so this block does not depend on the file-level import list.
    import shutil

    try:
        # shutil.which searches PATH in-process; unlike spawning `which`, it
        # cannot fail merely because the `which` binary itself is unavailable.
        if shutil.which("espeak-ng") is None:
            print("Detected espeak-ng not installed in the system, attempting to install...")
            # Install espeak-ng together with its language data package
            subprocess.run(["apt-get", "update"], check=True)
            subprocess.run(
                ["apt-get", "install", "-y", "espeak-ng", "espeak-ng-data"],
                check=True,
            )
            print("espeak-ng and its data packages installed successfully!")
        else:
            print("espeak-ng is already installed in the system.")

        # Verify Chinese support (optional; a failure here is only reported)
        try:
            voices_result = subprocess.run(
                ["espeak-ng", "--voices=cmn"],
                capture_output=True,
                text=True,
                check=True,
            )
            if "cmn" in voices_result.stdout:
                print("espeak-ng supports 'cmn' language.")
            else:
                print("Warning: espeak-ng is installed, but 'cmn' language still seems unavailable.")
        except Exception as e:
            print(f"Error verifying espeak-ng Chinese support (may not affect functionality): {e}")
    except Exception as e:
        # Deliberately broad: a failed installation must not stop the app from starting.
        print(f"Error installing espeak-ng: {e}")
        print("Please try to run manually: apt-get update && apt-get install -y espeak-ng espeak-ng-data")
# Run the espeak-ng check/installation at import time, before the rest of
# the module is executed.
install_espeak()
def patch_langsegment_init():
    """Remove ``setLangfilters``/``getLangfilters`` from LangSegment's ``__init__.py``.

    The function locates the installed LangSegment package, rewrites every
    single-line ``from .LangSegment import ...`` statement so that it no
    longer imports those two names, writes the file back and then tries to
    reload the module. All failures are reported on stdout; nothing is raised.
    """
    unwanted_names = {"setLangfilters", "getLangfilters"}
    target_line_prefix = "from .LangSegment import"

    def _clean_import_line(line):
        """Return ``line`` without the unwanted names (unchanged if none are present)."""
        stripped = line.strip()
        if not stripped.startswith(target_line_prefix):
            return line
        names_part = stripped[len(target_line_prefix):].strip()
        parenthesized = names_part.startswith("(") and names_part.endswith(")")
        if parenthesized:
            names_part = names_part[1:-1]
        # Parse the import list instead of pattern-matching commas, so the
        # result does not depend on spacing or on the position of the names.
        names = [name.strip() for name in names_part.split(",") if name.strip()]
        kept = [name for name in names if name.split()[0] not in unwanted_names]
        if len(kept) == len(names):
            return line
        if not kept:
            # Nothing left to import: drop the statement entirely.
            return ""
        joined = ", ".join(kept)
        if parenthesized:
            joined = f"({joined})"
        indent = line[: len(line) - len(line.lstrip())]
        return f"{indent}{target_line_prefix} {joined}\n"

    try:
        # Try to find the location of the LangSegment package
        spec = importlib.util.find_spec("LangSegment")
        if spec is None or spec.origin is None:
            print("Unable to locate LangSegment package.")
            return

        # Build the path to __init__.py
        init_path = os.path.join(os.path.dirname(spec.origin), '__init__.py')
        if not os.path.exists(init_path):
            print(f"LangSegment __init__.py file not found at: {init_path}")
            # Try to find it in site-packages, applicable in some environments
            for site_pkg_path in site.getsitepackages():
                potential_path = os.path.join(site_pkg_path, 'LangSegment', '__init__.py')
                if os.path.exists(potential_path):
                    init_path = potential_path
                    print(f"Found __init__.py in site-packages: {init_path}")
                    break
            else:  # The loop ended without a break: no candidate exists
                print(f"Also unable to find __init__.py in site-packages")
                return

        print(f"Attempting to read LangSegment __init__.py: {init_path}")
        with open(init_path, 'r', encoding="utf-8") as f:
            lines = f.readlines()

        modified = False
        new_lines = []
        for line in lines:
            new_line = _clean_import_line(line)
            if new_line != line:
                print(f"Found line that needs modification: {line.strip()}")
                print(f"Modified line: {new_line.strip()}")
                modified = True
            new_lines.append(new_line)

        if not modified:
            print("LangSegment __init__.py doesn't need modification.")
            return

        print(f"Attempting to write back modified LangSegment __init__.py to: {init_path}")
        try:
            with open(init_path, 'w', encoding="utf-8") as f:
                f.writelines(new_lines)
            print("LangSegment __init__.py modified successfully.")
            # Try to reload the module to make changes effective
            # (may not work, depending on the import chain)
            try:
                import LangSegment
                importlib.reload(LangSegment)
                print("LangSegment module has been attempted to reload.")
            except Exception as reload_e:
                print(f"Error reloading LangSegment (may have no impact): {reload_e}")
        except PermissionError:
            print(f"Error: Insufficient permissions to modify {init_path}. Consider modifying requirements.txt.")
        except Exception as write_e:
            print(f"Other error occurred when writing LangSegment __init__.py: {write_e}")
    except ImportError:
        print("LangSegment package not found, unable to fix.")
    except Exception as e:
        print(f"Unexpected error occurred when fixing LangSegment package: {e}")
# Execute the fix before all other imports (especially Amphion) that might trigger LangSegment
patch_langsegment_init()

# Clone the Amphion repository if needed and make it the working directory.
# The current directory is checked first: if this script is re-run while
# already inside the checkout, looking for "./Amphion" would not find it and
# a nested copy would be cloned.
if os.path.basename(os.getcwd()) != "Amphion":
    if not os.path.exists("Amphion"):
        # check=True surfaces a failed clone here instead of as a confusing
        # FileNotFoundError from the chdir below.
        subprocess.run(
            ["git", "clone", "https://github.com/open-mmlab/Amphion.git"],
            check=True,
        )
    os.chdir("Amphion")

# Add the Amphion checkout (now the working directory) to the import path
if os.getcwd() not in sys.path:
    sys.path.append(os.getcwd())

# Ensure needed directories exist
os.makedirs("wav", exist_ok=True)
os.makedirs("ckpts/Vevo", exist_ok=True)
from models.vc.vevo.vevo_utils import VevoInferencePipeline, save_audio, load_wav | |
# Download and setup config files | |
def setup_configs():
    """Make sure the Vevo config files exist under ``models/vc/vevo/config``.

    Files that are missing locally are fetched from the ``amphion/Vevo`` model
    repository with ``hf_hub_download`` and copied into place. The
    ``downloaded_resources["configs"]`` flag is set only when every file is
    present afterwards, so a later call retries anything that failed.
    Errors are reported on stdout and never raised.
    """
    # Local import so this block does not depend on the file-level import list.
    import shutil

    if downloaded_resources["configs"]:
        print("Config files already downloaded, skipping...")
        return

    config_path = "models/vc/vevo/config"
    os.makedirs(config_path, exist_ok=True)
    config_files = (
        "PhoneToVq8192.json",
        "Vocoder.json",
        "Vq32ToVq8192.json",
        "Vq8192ToMels.json",
        "hubert_large_l18_c32.yaml",
    )

    all_present = True
    for file_name in config_files:
        file_path = f"{config_path}/{file_name}"
        if os.path.exists(file_path):
            continue
        try:
            cached_file = hf_hub_download(
                repo_id="amphion/Vevo",
                filename=f"config/{file_name}",
                repo_type="model",
            )
            # Copy in-process: unlike an unchecked `cp` subprocess, a failed
            # copy raises and is reported by the handler below.
            shutil.copyfile(cached_file, file_path)
        except Exception as e:
            all_present = False
            print(f"Error downloading config file {file_name}: {e}")

    downloaded_resources["configs"] = all_present
setup_configs()

# Use CUDA when torch reports it as available, otherwise fall back to the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Cache of constructed inference pipelines, keyed by pipeline type
inference_pipelines = dict()
# Download all necessary model resources at startup | |
def preload_all_resources():
    """Download every Vevo model component and record where each snapshot landed.

    For each component whose ``downloaded_resources`` flag is still unset, the
    matching files are fetched from the ``amphion/Vevo`` repository with
    ``snapshot_download``, the returned directory is stored in the
    corresponding module-level ``downloaded_*_path`` variable, and the flag is
    set. The config files are handled first through ``setup_configs``.
    """
    print("Preloading all model resources...")
    # Download configuration files
    setup_configs()

    # One row per component:
    # (resource flag, module-level path variable, display label, file pattern)
    components = (
        ("tokenizer_vq32", "downloaded_content_tokenizer_path",
         "Content Tokenizer (vq32)", "tokenizer/vq32/*"),
        ("tokenizer_vq8192", "downloaded_content_style_tokenizer_path",
         "Content-Style Tokenizer (vq8192)", "tokenizer/vq8192/*"),
        ("ar_Vq32ToVq8192", "downloaded_ar_vq32_path",
         "Autoregressive Transformer (Vq32ToVq8192)", "contentstyle_modeling/Vq32ToVq8192/*"),
        ("ar_PhoneToVq8192", "downloaded_ar_phone_path",
         "Autoregressive Transformer (PhoneToVq8192)", "contentstyle_modeling/PhoneToVq8192/*"),
        ("fmt_Vq8192ToMels", "downloaded_fmt_path",
         "Flow Matching Transformer (Vq8192ToMels)", "acoustic_modeling/Vq8192ToMels/*"),
        ("vocoder", "downloaded_vocoder_path",
         "Vocoder", "acoustic_modeling/Vocoder/*"),
    )

    # Writing through globals() rebinds the module-level path variables.
    module_namespace = globals()
    for flag, path_variable, label, pattern in components:
        if downloaded_resources[flag]:
            continue
        print(f"Preloading {label}...")
        module_namespace[path_variable] = snapshot_download(
            repo_id="amphion/Vevo",
            repo_type="model",
            cache_dir="./ckpts/Vevo",
            allow_patterns=[pattern],
        )
        downloaded_resources[flag] = True
        print(f"{label} download completed")

    print("All model resources preloading completed!")
# Snapshot directories of the downloaded model components; all start as None
# and are filled in by preload_all_resources().
downloaded_content_tokenizer_path = downloaded_content_style_tokenizer_path = None
downloaded_ar_vq32_path = downloaded_ar_phone_path = None
downloaded_fmt_path = downloaded_vocoder_path = None

# Fetch every resource up front, before the Gradio interface is created
preload_all_resources()
def get_pipeline(pipeline_type):
    """Return the Vevo inference pipeline for ``pipeline_type``, building it on first use.

    Built pipelines are cached in ``inference_pipelines``. Checkpoint
    directories come from the module-level ``downloaded_*_path`` variables
    when the matching ``downloaded_resources`` flag is set; otherwise the
    files are fetched on demand with ``snapshot_download``.

    Args:
        pipeline_type: One of ``"style"``, ``"voice"``, ``"timbre"`` or ``"tts"``.

    Returns:
        The ``VevoInferencePipeline`` instance for the requested type.

    Raises:
        ValueError: If ``pipeline_type`` is not one of the supported types.
    """
    supported_types = ("style", "voice", "timbre", "tts")
    if pipeline_type not in supported_types:
        # Without this guard an unknown type falls through every branch and
        # fails with an UnboundLocalError when the result is cached.
        raise ValueError(
            f"Unknown pipeline type {pipeline_type!r}; expected one of {supported_types}"
        )

    if pipeline_type in inference_pipelines:
        return inference_pipelines[pipeline_type]

    def _checkpoint_dir(resource_key, preloaded_root, subdir):
        """Return the local directory of one component, downloading it if needed."""
        if downloaded_resources[resource_key] and preloaded_root is not None:
            root = preloaded_root
        else:
            # Fallback to direct download
            root = snapshot_download(
                repo_id="amphion/Vevo",
                repo_type="model",
                cache_dir="./ckpts/Vevo",
                allow_patterns=[f"{subdir}/*"],
            )
        return os.path.join(root, subdir)

    config_dir = "./models/vc/vevo/config"

    # Components shared by every pipeline type
    pipeline_kwargs = {
        "content_style_tokenizer_ckpt_path": _checkpoint_dir(
            "tokenizer_vq8192", downloaded_content_style_tokenizer_path, "tokenizer/vq8192"
        ),
        "fmt_cfg_path": f"{config_dir}/Vq8192ToMels.json",
        "fmt_ckpt_path": _checkpoint_dir(
            "fmt_Vq8192ToMels", downloaded_fmt_path, "acoustic_modeling/Vq8192ToMels"
        ),
        "vocoder_cfg_path": f"{config_dir}/Vocoder.json",
        "vocoder_ckpt_path": _checkpoint_dir(
            "vocoder", downloaded_vocoder_path, "acoustic_modeling/Vocoder"
        ),
    }

    if pipeline_type in ("style", "voice"):
        # These additionally need the content tokenizer and the Vq32 -> Vq8192 AR model
        pipeline_kwargs["content_tokenizer_ckpt_path"] = os.path.join(
            _checkpoint_dir("tokenizer_vq32", downloaded_content_tokenizer_path, "tokenizer/vq32"),
            "hubert_large_l18_c32.pkl",
        )
        pipeline_kwargs["ar_cfg_path"] = f"{config_dir}/Vq32ToVq8192.json"
        pipeline_kwargs["ar_ckpt_path"] = _checkpoint_dir(
            "ar_Vq32ToVq8192", downloaded_ar_vq32_path, "contentstyle_modeling/Vq32ToVq8192"
        )
    elif pipeline_type == "tts":
        # TTS uses the phone-based AR model
        pipeline_kwargs["ar_cfg_path"] = f"{config_dir}/PhoneToVq8192.json"
        pipeline_kwargs["ar_ckpt_path"] = _checkpoint_dir(
            "ar_PhoneToVq8192", downloaded_ar_phone_path, "contentstyle_modeling/PhoneToVq8192"
        )
    # "timbre" needs only the shared components.

    inference_pipeline = VevoInferencePipeline(device=device, **pipeline_kwargs)

    # "style" and "voice" are constructed from exactly the same arguments, so
    # one instance is cached under both keys instead of loading the models twice.
    if pipeline_type in ("style", "voice"):
        cache_keys = ("style", "voice")
    else:
        cache_keys = (pipeline_type,)
    for cache_key in cache_keys:
        inference_pipelines[cache_key] = inference_pipeline
    return inference_pipeline
# Implement VEVO functionality functions | |
def vevo_style(content_wav, style_wav):
    """Generate audio with the content of ``content_wav`` and the style of ``style_wav``.

    Each input is a 2-tuple holding a sample rate and a NumPy sample array, in
    either order. Both clips are averaged down to one channel, resampled to
    24 kHz, peak-normalized to 0.95 and written to temporary WAV files under
    ``wav/``. The "style" pipeline then runs ``inference_ar_and_fm`` with the
    content clip as both the source and the timbre reference.

    Args:
        content_wav: Source audio tuple.
        style_wav: Style reference audio tuple.

    Returns:
        Path of the generated file, ``wav/output_vevostyle.wav``.

    Raises:
        ValueError: If an input is missing, is not a 2-tuple, or holds no samples.
    """
    temp_content_path = "wav/temp_content.wav"
    temp_style_path = "wav/temp_style.wav"
    output_path = "wav/output_vevostyle.wav"
    target_sr = 24000

    def _prepare(wav, label):
        """Return ``wav`` as a peak-normalized 24 kHz tensor of shape (1, samples)."""
        if not (isinstance(wav, tuple) and len(wav) == 2):
            raise ValueError(f"Invalid {label} audio format")
        # Accept both (data, sample_rate) and (sample_rate, data)
        if isinstance(wav[0], np.ndarray):
            data, sample_rate = wav
        else:
            sample_rate, data = wav
        data = np.asarray(data)
        if data.ndim == 0 or data.size == 0:
            raise ValueError(f"Invalid {label} audio format: audio is empty")
        if data.ndim > 1:
            # Average everything beyond the first axis. This also flattens
            # (samples, 1) input, which would otherwise become a 3-D tensor.
            data = data.reshape(data.shape[0], -1).mean(axis=1)
        tensor = torch.tensor(data, dtype=torch.float32).unsqueeze(0)
        if sample_rate != target_sr:
            tensor = torchaudio.functional.resample(tensor, sample_rate, target_sr)
        # Normalize volume; the epsilon avoids dividing by zero on silence
        return tensor / (torch.max(torch.abs(tensor)) + 1e-6) * 0.95

    if content_wav is None or style_wav is None:
        raise ValueError("Please upload audio files")

    content_tensor = _prepare(content_wav, "content")
    style_tensor = _prepare(style_wav, "style")

    # Print debug information
    print(f"Content audio shape: {content_tensor.shape}, sample rate: {target_sr}")
    print(f"Style audio shape: {style_tensor.shape}, sample rate: {target_sr}")

    # Save audio so the pipeline can read it from disk
    torchaudio.save(temp_content_path, content_tensor, target_sr)
    torchaudio.save(temp_style_path, style_tensor, target_sr)

    try:
        pipeline = get_pipeline("style")
        gen_audio = pipeline.inference_ar_and_fm(
            src_wav_path=temp_content_path,
            src_text=None,
            style_ref_wav_path=temp_style_path,
            timbre_ref_wav_path=temp_content_path,
        )
        # Replace NaN/Inf samples so the output file stays playable
        if torch.isnan(gen_audio).any() or torch.isinf(gen_audio).any():
            print("Warning: Generated audio contains NaN or Inf values")
            gen_audio = torch.nan_to_num(gen_audio, nan=0.0, posinf=0.95, neginf=-0.95)
        print(f"Generated audio shape: {gen_audio.shape}, max: {torch.max(gen_audio)}, min: {torch.min(gen_audio)}")
        save_audio(gen_audio, output_path=output_path)
        return output_path
    except Exception as e:
        print(f"Error during processing: {e}")
        import traceback
        traceback.print_exc()
        # Bare raise keeps the original traceback intact
        raise
def vevo_timbre(content_wav, reference_wav):
    """Generate audio with the content of ``content_wav`` and the timbre of ``reference_wav``.

    Each input is a 2-tuple holding a sample rate and a NumPy sample array, in
    either order. Both clips are averaged down to one channel, resampled to
    24 kHz, peak-normalized to 0.95 and written to temporary WAV files under
    ``wav/``. The "timbre" pipeline then runs ``inference_fm`` with 32 flow
    matching steps.

    Args:
        content_wav: Source audio tuple.
        reference_wav: Timbre reference audio tuple.

    Returns:
        Path of the generated file, ``wav/output_vevotimbre.wav``.

    Raises:
        ValueError: If an input is missing, is not a 2-tuple, or holds no samples.
    """
    temp_content_path = "wav/temp_content.wav"
    temp_reference_path = "wav/temp_reference.wav"
    output_path = "wav/output_vevotimbre.wav"
    target_sr = 24000

    def _prepare(wav, label):
        """Return ``wav`` as a peak-normalized 24 kHz tensor of shape (1, samples)."""
        if not (isinstance(wav, tuple) and len(wav) == 2):
            raise ValueError(f"Invalid {label} audio format")
        # Accept both (data, sample_rate) and (sample_rate, data)
        if isinstance(wav[0], np.ndarray):
            data, sample_rate = wav
        else:
            sample_rate, data = wav
        data = np.asarray(data)
        if data.ndim == 0 or data.size == 0:
            raise ValueError(f"Invalid {label} audio format: audio is empty")
        if data.ndim > 1:
            # Average everything beyond the first axis. This also flattens
            # (samples, 1) input, which would otherwise become a 3-D tensor.
            data = data.reshape(data.shape[0], -1).mean(axis=1)
        tensor = torch.tensor(data, dtype=torch.float32).unsqueeze(0)
        if sample_rate != target_sr:
            tensor = torchaudio.functional.resample(tensor, sample_rate, target_sr)
        # Normalize volume; the epsilon avoids dividing by zero on silence
        return tensor / (torch.max(torch.abs(tensor)) + 1e-6) * 0.95

    if content_wav is None or reference_wav is None:
        raise ValueError("Please upload audio files")

    content_tensor = _prepare(content_wav, "content")
    reference_tensor = _prepare(reference_wav, "reference")

    # Print debug information
    print(f"Content audio shape: {content_tensor.shape}, sample rate: {target_sr}")
    print(f"Reference audio shape: {reference_tensor.shape}, sample rate: {target_sr}")

    # Save uploaded audio so the pipeline can read it from disk
    torchaudio.save(temp_content_path, content_tensor, target_sr)
    torchaudio.save(temp_reference_path, reference_tensor, target_sr)

    try:
        pipeline = get_pipeline("timbre")
        gen_audio = pipeline.inference_fm(
            src_wav_path=temp_content_path,
            timbre_ref_wav_path=temp_reference_path,
            flow_matching_steps=32,
        )
        # Replace NaN/Inf samples so the output file stays playable
        if torch.isnan(gen_audio).any() or torch.isinf(gen_audio).any():
            print("Warning: Generated audio contains NaN or Inf values")
            gen_audio = torch.nan_to_num(gen_audio, nan=0.0, posinf=0.95, neginf=-0.95)
        print(f"Generated audio shape: {gen_audio.shape}, max: {torch.max(gen_audio)}, min: {torch.min(gen_audio)}")
        save_audio(gen_audio, output_path=output_path)
        return output_path
    except Exception as e:
        print(f"Error during processing: {e}")
        import traceback
        traceback.print_exc()
        # Bare raise keeps the original traceback intact
        raise
def vevo_voice(content_wav, style_reference_wav, timbre_reference_wav):
    """Generate audio from ``content_wav`` using separate style and timbre references.

    Each input is a 2-tuple holding a sample rate and a NumPy sample array, in
    either order. All clips are averaged down to one channel, resampled to
    24 kHz, peak-normalized to 0.95 and written to temporary WAV files under
    ``wav/``. The "voice" pipeline then runs ``inference_ar_and_fm``.

    Args:
        content_wav: Source audio tuple.
        style_reference_wav: Style reference audio tuple.
        timbre_reference_wav: Timbre reference audio tuple.

    Returns:
        Path of the generated file, ``wav/output_vevovoice.wav``.

    Raises:
        ValueError: If an input is missing, is not a 2-tuple, or holds no samples.
    """
    temp_content_path = "wav/temp_content.wav"
    temp_style_path = "wav/temp_style.wav"
    temp_timbre_path = "wav/temp_timbre.wav"
    output_path = "wav/output_vevovoice.wav"
    target_sr = 24000

    def _prepare(wav, label):
        """Return ``wav`` as a peak-normalized 24 kHz tensor of shape (1, samples)."""
        if not (isinstance(wav, tuple) and len(wav) == 2):
            raise ValueError(f"Invalid {label} audio format")
        # Accept both (data, sample_rate) and (sample_rate, data)
        if isinstance(wav[0], np.ndarray):
            data, sample_rate = wav
        else:
            sample_rate, data = wav
        data = np.asarray(data)
        if data.ndim == 0 or data.size == 0:
            raise ValueError(f"Invalid {label} audio format: audio is empty")
        if data.ndim > 1:
            # Average everything beyond the first axis. This also flattens
            # (samples, 1) input, which would otherwise become a 3-D tensor.
            data = data.reshape(data.shape[0], -1).mean(axis=1)
        tensor = torch.tensor(data, dtype=torch.float32).unsqueeze(0)
        if sample_rate != target_sr:
            tensor = torchaudio.functional.resample(tensor, sample_rate, target_sr)
        # Normalize volume; the epsilon avoids dividing by zero on silence
        return tensor / (torch.max(torch.abs(tensor)) + 1e-6) * 0.95

    if content_wav is None or style_reference_wav is None or timbre_reference_wav is None:
        raise ValueError("Please upload all required audio files")

    content_tensor = _prepare(content_wav, "content")
    style_tensor = _prepare(style_reference_wav, "style reference")
    timbre_tensor = _prepare(timbre_reference_wav, "timbre reference")

    # Print debug information
    print(f"Content audio shape: {content_tensor.shape}, sample rate: {target_sr}")
    print(f"Style reference audio shape: {style_tensor.shape}, sample rate: {target_sr}")
    print(f"Timbre reference audio shape: {timbre_tensor.shape}, sample rate: {target_sr}")

    # Save uploaded audio so the pipeline can read it from disk
    torchaudio.save(temp_content_path, content_tensor, target_sr)
    torchaudio.save(temp_style_path, style_tensor, target_sr)
    torchaudio.save(temp_timbre_path, timbre_tensor, target_sr)

    try:
        pipeline = get_pipeline("voice")
        gen_audio = pipeline.inference_ar_and_fm(
            src_wav_path=temp_content_path,
            src_text=None,
            style_ref_wav_path=temp_style_path,
            timbre_ref_wav_path=temp_timbre_path,
        )
        # Replace NaN/Inf samples so the output file stays playable
        if torch.isnan(gen_audio).any() or torch.isinf(gen_audio).any():
            print("Warning: Generated audio contains NaN or Inf values")
            gen_audio = torch.nan_to_num(gen_audio, nan=0.0, posinf=0.95, neginf=-0.95)
        print(f"Generated audio shape: {gen_audio.shape}, max: {torch.max(gen_audio)}, min: {torch.min(gen_audio)}")
        save_audio(gen_audio, output_path=output_path)
        return output_path
    except Exception as e:
        print(f"Error during processing: {e}")
        import traceback
        traceback.print_exc()
        # Bare raise keeps the original traceback intact
        raise
def _prepare_tts_reference_audio(audio, label):
    """Convert a Gradio numpy-audio value into a normalized mono 24 kHz tensor.

    Args:
        audio: A 2-tuple holding a sample rate and a NumPy sample array, in
            either order.
        label: Human-readable name used in error messages, e.g.
            "reference" or "timbre reference".

    Returns:
        A ``(waveform, sample_rate)`` tuple where ``waveform`` is a float
        tensor of shape ``(1, num_samples)`` peak-normalized to 0.95 and
        ``sample_rate`` is 24000.

    Raises:
        ValueError: If ``audio`` is not a 2-tuple or contains no samples.
    """
    target_sr = 24000

    if not (isinstance(audio, tuple) and len(audio) == 2):
        raise ValueError(f"Invalid {label} audio format")

    # Accept both (data, sample_rate) and (sample_rate, data) orderings.
    if isinstance(audio[0], np.ndarray):
        samples, sample_rate = audio
    else:
        sample_rate, samples = audio

    samples = np.asarray(samples)
    if samples.size == 0:
        # torch.max() on an empty tensor would fail with an opaque error.
        raise ValueError(f"The {label} audio is empty")

    # Collapse every trailing axis so that both (N, 1) and (N, C) inputs end up
    # as a 1-D mono signal; otherwise unsqueeze(0) would produce a 3-D tensor
    # that cannot be written as a wav file.
    if samples.ndim > 1:
        samples = samples.reshape(samples.shape[0], -1).mean(axis=1)

    waveform = torch.FloatTensor(samples).unsqueeze(0)

    # Resample to 24kHz
    if sample_rate != target_sr:
        waveform = torchaudio.functional.resample(waveform, sample_rate, target_sr)

    # Normalize volume (the epsilon avoids division by zero on silent input)
    waveform = waveform / (torch.max(torch.abs(waveform)) + 1e-6) * 0.95
    return waveform, target_sr


def vevo_tts(text, ref_wav, timbre_ref_wav=None, style_ref_text=None, src_language="en", ref_language="en", style_ref_text_language="en"):
    """Synthesize speech for ``text`` using style and timbre reference audio.

    The reference clips are converted to mono, resampled to 24 kHz,
    peak-normalized and written to temporary wav files, which are then handed
    to the pipeline returned by ``get_pipeline("tts")``.

    Args:
        text: Target text to synthesize. Must not be empty or whitespace-only.
        ref_wav: Style reference audio as a 2-tuple of sample rate and NumPy
            samples (either order).
        timbre_ref_wav: Optional timbre reference audio in the same format.
            When ``None`` the style reference is also used for timbre.
        style_ref_text: Optional transcript of the style reference audio.
        src_language: Language code of ``text``.
        ref_language: Currently unused; kept for backward compatibility. Note
            that it precedes ``style_ref_text_language`` in positional order.
        style_ref_text_language: Language code of ``style_ref_text``.

    Returns:
        Path of the generated wav file.

    Raises:
        ValueError: If the reference audio is missing or malformed, or if
            ``text`` is empty.
        Exception: Any error raised during inference or saving is logged and
            re-raised unchanged.
    """
    temp_ref_path = "wav/temp_ref.wav"
    temp_timbre_path = "wav/temp_timbre.wav"
    output_path = "wav/output_vevotts.wav"

    # Check inputs before doing any work
    if ref_wav is None:
        raise ValueError("Please upload a reference audio file")
    if text is None or not str(text).strip():
        raise ValueError("Please enter the text to synthesize")

    # Validate and convert every input before touching the filesystem so that a
    # bad timbre reference does not leave a half-written set of temp files.
    ref_tensor, ref_sr = _prepare_tts_reference_audio(ref_wav, "reference")
    timbre_tensor = None
    timbre_sr = None
    if timbre_ref_wav is not None:
        timbre_tensor, timbre_sr = _prepare_tts_reference_audio(timbre_ref_wav, "timbre reference")

    # Print debug information
    print(f"Reference audio shape: {ref_tensor.shape}, sample rate: {ref_sr}")
    if style_ref_text:
        print(f"Style reference text: {style_ref_text}, language: {style_ref_text_language}")

    # Save uploaded audio; make sure the target directory exists first
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    torchaudio.save(temp_ref_path, ref_tensor, ref_sr)
    if timbre_tensor is not None:
        print(f"Timbre reference audio shape: {timbre_tensor.shape}, sample rate: {timbre_sr}")
        torchaudio.save(temp_timbre_path, timbre_tensor, timbre_sr)
    else:
        # No dedicated timbre reference: reuse the style reference
        temp_timbre_path = temp_ref_path

    try:
        # Get pipeline
        pipeline = get_pipeline("tts")
        # Inference
        gen_audio = pipeline.inference_ar_and_fm(
            src_wav_path=None,
            src_text=text,
            style_ref_wav_path=temp_ref_path,
            timbre_ref_wav_path=temp_timbre_path,
            style_ref_wav_text=style_ref_text,
            src_text_language=src_language,
            style_ref_wav_text_language=style_ref_text_language,
        )
        # Replace NaN/Inf samples so the saved file stays playable
        if torch.isnan(gen_audio).any() or torch.isinf(gen_audio).any():
            print("Warning: Generated audio contains NaN or Inf values")
            gen_audio = torch.nan_to_num(gen_audio, nan=0.0, posinf=0.95, neginf=-0.95)
        print(f"Generated audio shape: {gen_audio.shape}, max: {torch.max(gen_audio)}, min: {torch.min(gen_audio)}")
        # Save generated audio
        save_audio(gen_audio, output_path=output_path)
        return output_path
    except Exception as e:
        print(f"Error during processing: {e}")
        import traceback
        traceback.print_exc()
        # Bare raise keeps the original traceback intact
        raise
# Create Gradio interface
_APP_TITLE = "Vevo: Controllable Zero-Shot Voice Imitation with Self-Supervised Disentanglement"

# Language codes offered for both the target text and the style reference text
_TTS_LANGUAGES = ["en", "zh", "de", "fr", "ja", "ko"]

# Badge links shown under the page title
_LINKS_HTML = """
<div style="display: flex; justify-content: flex-start; gap: 8px; margin: 0 0; padding-left: 0px;">
<a href="https://arxiv.org/abs/2502.07243" target="_blank" style="text-decoration: none;">
<img alt="arXiv Paper" src="https://img.shields.io/badge/arXiv-Paper-red">
</a>
<a href="https://openreview.net/pdf?id=anQDiQZhDP" target="_blank" style="text-decoration: none;">
<img alt="ICLR Paper" src="https://img.shields.io/badge/ICLR-Paper-64b63a">
</a>
<a href="https://huggingface.co/amphion/Vevo" target="_blank" style="text-decoration: none;">
<img alt="HuggingFace Model" src="https://img.shields.io/badge/%F0%9F%A4%97%20HuggingFace-Model-yellow">
</a>
<a href="https://github.com/open-mmlab/Amphion/tree/main/models/vc/vevo" target="_blank" style="text-decoration: none;">
<img alt="GitHub Repo" src="https://img.shields.io/badge/GitHub-Repo-blue">
</a>
</div>
"""

# Footer text describing the four modes
_ABOUT_MARKDOWN = """
## About VEVO
VEVO is a versatile voice synthesis and conversion model that offers four main functionalities:
1. **Vevo-Style**: Maintains timbre but transfers style (accent, emotion, etc.)
2. **Vevo-Timbre**: Maintains style but transfers timbre
3. **Vevo-Voice**: Transfers both style and timbre with separate references
4. **Vevo-TTS**: Text-to-speech with separate style and timbre references
For more information, visit the [Amphion project](https://github.com/open-mmlab/Amphion)
"""


def _vevo_tts_from_ui(text, ref_wav, timbre_ref_wav, style_ref_text, src_language, style_ref_text_language):
    """Forward the TTS tab's inputs to ``vevo_tts`` using keyword arguments.

    Gradio passes component values positionally. ``vevo_tts`` has an extra
    ``ref_language`` parameter before ``style_ref_text_language``, so wiring
    the six components to it directly would bind the style-text language to
    ``ref_language`` and leave ``style_ref_text_language`` at its default.
    """
    return vevo_tts(
        text,
        ref_wav,
        timbre_ref_wav=timbre_ref_wav,
        style_ref_text=style_ref_text,
        src_language=src_language,
        style_ref_text_language=style_ref_text_language,
    )


with gr.Blocks(title=_APP_TITLE) as demo:
    gr.Markdown(f"# {_APP_TITLE}")
    # Add link tag line
    with gr.Row(elem_id="links_row"):
        gr.HTML(_LINKS_HTML)

    with gr.Tab("Vevo-Timbre"):
        gr.Markdown("### Vevo-Timbre: Maintain style but transfer timbre")
        with gr.Row():
            with gr.Column():
                timbre_content = gr.Audio(label="Source Audio", type="numpy")
                timbre_reference = gr.Audio(label="Timbre Reference", type="numpy")
                timbre_button = gr.Button("Generate")
            with gr.Column():
                timbre_output = gr.Audio(label="Result")
        timbre_button.click(vevo_timbre, inputs=[timbre_content, timbre_reference], outputs=timbre_output)

    with gr.Tab("Vevo-Style"):
        gr.Markdown("### Vevo-Style: Maintain timbre but transfer style (accent, emotion, etc.)")
        with gr.Row():
            with gr.Column():
                style_content = gr.Audio(label="Source Audio", type="numpy")
                style_reference = gr.Audio(label="Style Reference", type="numpy")
                style_button = gr.Button("Generate")
            with gr.Column():
                style_output = gr.Audio(label="Result")
        style_button.click(vevo_style, inputs=[style_content, style_reference], outputs=style_output)

    with gr.Tab("Vevo-Voice"):
        gr.Markdown("### Vevo-Voice: Transfers both style and timbre with separate references")
        with gr.Row():
            with gr.Column():
                voice_content = gr.Audio(label="Source Audio", type="numpy")
                voice_style_reference = gr.Audio(label="Style Reference", type="numpy")
                voice_timbre_reference = gr.Audio(label="Timbre Reference", type="numpy")
                voice_button = gr.Button("Generate")
            with gr.Column():
                voice_output = gr.Audio(label="Result")
        voice_button.click(vevo_voice, inputs=[voice_content, voice_style_reference, voice_timbre_reference], outputs=voice_output)

    with gr.Tab("Vevo-TTS"):
        gr.Markdown("### Vevo-TTS: Text-to-speech with separate style and timbre references")
        with gr.Row():
            with gr.Column():
                tts_text = gr.Textbox(label="Target Text", placeholder="Enter text to synthesize...", lines=3)
                tts_src_language = gr.Dropdown(_TTS_LANGUAGES, label="Text Language", value="en")
                tts_reference = gr.Audio(label="Style Reference", type="numpy")
                tts_style_ref_text = gr.Textbox(label="Style Reference Text", placeholder="Enter style reference text...", lines=3)
                tts_style_ref_text_language = gr.Dropdown(_TTS_LANGUAGES, label="Style Reference Text Language", value="en")
                tts_timbre_reference = gr.Audio(label="Timbre Reference", type="numpy")
                tts_button = gr.Button("Generate")
            with gr.Column():
                tts_output = gr.Audio(label="Result")
        # Route through the adapter so each dropdown reaches the parameter it
        # is labelled for.
        tts_button.click(
            _vevo_tts_from_ui,
            inputs=[tts_text, tts_reference, tts_timbre_reference, tts_style_ref_text, tts_src_language, tts_style_ref_text_language],
            outputs=tts_output,
        )

    gr.Markdown(_ABOUT_MARKDOWN)

# Launch application
demo.launch()