Spaces:
Running
Running
""" | |
Text-to-speech audio generation for translated subtitles. | |
""" | |
import os | |
import time | |
import shutil | |
import tempfile | |
from pathlib import Path | |
from tqdm import tqdm | |
import subprocess | |
from gtts import gTTS | |
import pysrt | |
from src.utils.logger import get_logger | |
from src.audio.extractor import create_silent_audio | |
from config import OUTPUT_DIR, TTS_VOICES, MAX_RETRY_ATTEMPTS | |
logger = get_logger(__name__) | |
def _to_seconds(stamp):
    """Convert a subtitle timestamp (hours/minutes/seconds/milliseconds) to seconds."""
    return (stamp.hours * 3600
            + stamp.minutes * 60
            + stamp.seconds
            + stamp.milliseconds / 1000)


def _silent_fallback(target_lang, video_duration):
    """Write a silent WAV of ``video_duration`` seconds and return its path."""
    silent_audio = OUTPUT_DIR / f"translated_audio_{target_lang}.wav"
    create_silent_audio(video_duration, silent_audio)
    return silent_audio


def _save_tts(text, tts_lang, fallback_lang, slow, audio_file):
    """Run one gTTS synthesis into ``audio_file``; raise if nothing usable was written."""
    try:
        tts = gTTS(text=text, lang=tts_lang, slow=slow)
    except ValueError:
        # gTTS raises ValueError for a language code it does not support.
        # The configured voice may not be a gTTS code, so fall back to the
        # raw target language instead of losing the subtitle.
        if fallback_lang == tts_lang:
            raise
        tts = gTTS(text=text, lang=fallback_lang, slow=slow)
    tts.save(str(audio_file))
    if not audio_file.exists() or audio_file.stat().st_size == 0:
        raise RuntimeError("Generated audio file is empty")


def _synthesize_chunk(index, text, target_lang, audio_file):
    """Generate speech for one subtitle with retries; return True on success."""
    # Prefer the configured voice for this language; default to the code itself.
    tts_lang = TTS_VOICES.get(target_lang, target_lang)
    # For certain languages, use slower speed.
    slow_option = target_lang in ("hi", "ja", "zh-CN", "ar")
    can_shorten = len(text) > 100

    for attempt in range(1, MAX_RETRY_ATTEMPTS + 1):
        try:
            _save_tts(text, tts_lang, target_lang, slow_option, audio_file)
        except Exception as e:
            logger.warning(f"TTS attempt {attempt} failed for {target_lang}: {str(e)}")
            # Only pause when another attempt will actually follow.
            if attempt < MAX_RETRY_ATTEMPTS or can_shorten:
                time.sleep(1)
        else:
            logger.info(
                f"Generated TTS file size for chunk {index}: {audio_file.stat().st_size} bytes"
            )
            return True

    # Last resort, after every full-text attempt has failed: a truncated
    # subtitle read slowly. Guarded so one failure here only drops this chunk
    # instead of aborting the whole track.
    if can_shorten:
        logger.warning(f"Trying with shortened text for {target_lang}")
        try:
            _save_tts(text[:100] + "...", tts_lang, target_lang, True, audio_file)
        except Exception as e:
            logger.warning(f"Shortened-text TTS failed for {target_lang}: {str(e)}")
        else:
            return True
    return False


def _build_mix_command(silence_file, timed_chunks, video_duration, output_audio):
    """Build the FFmpeg command that delays each chunk to its start time and mixes them."""
    # The silence file is input 0, so the chunk inputs are numbered from 1.
    cmd = ['ffmpeg', '-y', '-i', str(silence_file)]
    for _, chunk_file in timed_chunks:
        cmd += ['-i', str(chunk_file)]

    # One filter chain per chunk: boost volume, shift to the subtitle start
    # (same delay on both channels), then pad to the full video length.
    filter_chains = []
    for i, (start_time, _) in enumerate(timed_chunks):
        delay_ms = int(start_time * 1000)
        filter_chains.append(
            f"[{i+1}:a]volume=12dB,adelay={delay_ms}|{delay_ms},apad=whole_dur={video_duration}[a{i}]"
        )

    # Mix all delayed streams without amix's automatic level normalization.
    mix_inputs = ''.join(f"[a{i}]" for i in range(len(timed_chunks)))
    filter_complex = (
        ";".join(filter_chains)
        + f";{mix_inputs}amix=inputs={len(timed_chunks)}:duration=longest:normalize=0,volume=3dB[aout]"
    )

    cmd += [
        '-filter_complex', filter_complex,
        '-map', '[aout]',
        '-c:a', 'libmp3lame',
        '-b:a', '192k',
        str(output_audio),
    ]
    return cmd


def generate_translated_audio(srt_path, target_lang, video_duration=180):
    """
    Generate translated audio using text-to-speech for each subtitle.

    Each non-empty subtitle is synthesized with gTTS into a temporary MP3
    chunk, then all chunks are placed at their subtitle start times and mixed
    into a single track with FFmpeg. The temporary chunk directory is removed
    on every exit path.

    Args:
        srt_path (str): Path to the SRT subtitle file
        target_lang (str): Target language code (e.g., 'en', 'es')
        video_duration (float): Duration of the original video in seconds

    Returns:
        Path: Path to the mixed MP3 on success. If no speech could be
        generated, an input file is missing, mixing fails, or any other error
        occurs, a silent WAV of ``video_duration`` seconds is written and its
        path is returned instead.

    Raises:
        Exception: Only if writing the silent fallback track itself fails.
    """
    temp_dir = None
    try:
        srt_path = Path(srt_path)
        logger.info(f"Generating translated audio for {target_lang} from {srt_path}")

        # Load subtitles
        subs = pysrt.open(srt_path, encoding="utf-8")
        logger.info(f"Loaded {len(subs)} subtitles from SRT file")

        # Create temporary directory for audio chunks; make sure its parent
        # exists first, because mkdtemp does not create it.
        temp_root = OUTPUT_DIR / "temp"
        temp_root.mkdir(parents=True, exist_ok=True)
        temp_dir = Path(tempfile.mkdtemp(prefix=f"audio_{target_lang}_", dir=temp_root))
        logger.debug(f"Created temporary directory: {temp_dir}")

        # Generate TTS for each subtitle, remembering where each chunk starts.
        timed_chunks = []
        logger.info(f"Generating speech for {len(subs)} subtitles in {target_lang}")
        for i, sub in enumerate(tqdm(subs, desc=f"Generating {target_lang} speech")):
            text = sub.text.strip()
            if not text:
                continue
            audio_file = temp_dir / f"chunk_{i:04d}.mp3"
            if _synthesize_chunk(i, text, target_lang, audio_file):
                timed_chunks.append((_to_seconds(sub.start), audio_file))
            else:
                logger.warning(f"Failed to generate audio for subtitle {i}")

        # Fallback if no audio generated
        if not timed_chunks:
            logger.warning(f"No audio files generated for {target_lang}")
            return _silent_fallback(target_lang, video_duration)

        # Output configuration
        output_audio = OUTPUT_DIR / f"translated_audio_{target_lang}.mp3"
        silence_file = temp_dir / "silence.wav"
        create_silent_audio(video_duration, silence_file)

        # Validate input files before handing them to FFmpeg
        inputs = [silence_file] + [chunk_file for _, chunk_file in timed_chunks]
        missing = [f for f in inputs if not f.exists()]
        if missing:
            for f in missing:
                logger.error(f"Missing input file: {f}")
            return _silent_fallback(target_lang, video_duration)

        cmd = _build_mix_command(silence_file, timed_chunks, video_duration, output_audio)
        logger.debug(f"Running FFmpeg command: {' '.join(cmd)}")

        # Execute audio mixing
        process = subprocess.run(cmd, capture_output=True, text=True)
        if process.returncode != 0:
            logger.error(f"Audio mixing failed: {process.stderr}")
            return _silent_fallback(target_lang, video_duration)

        logger.info(f"Final audio file size: {output_audio.stat().st_size} bytes")
        return output_audio
    except Exception as e:
        logger.error(f"Audio generation failed: {str(e)}", exc_info=True)
        return _silent_fallback(target_lang, video_duration)
    finally:
        # Cleanup temporary files on every exit path, not only on success.
        if temp_dir is not None:
            try:
                shutil.rmtree(temp_dir)
                logger.debug(f"Cleaned temporary directory: {temp_dir}")
            except Exception as e:
                logger.warning(f"Temp cleanup failed: {str(e)}")