# LinguaStream / src/audio/generator.py
# Latest commit: e0f225e (verified) by Maaz1 - "Update src/audio/generator.py"
"""
Text-to-speech audio generation for translated subtitles.
"""
import os
import time
import shutil
import tempfile
from pathlib import Path
from tqdm import tqdm
import subprocess
from gtts import gTTS
import pysrt
from src.utils.logger import get_logger
from src.audio.extractor import create_silent_audio
from config import OUTPUT_DIR, TTS_VOICES, MAX_RETRY_ATTEMPTS
# Module-level logger, obtained from the project's logging helper and keyed by
# this module's import name.
logger = get_logger(__name__)
def _subrip_to_ms(moment):
    """Return a pysrt timestamp as a whole number of milliseconds."""
    # Integer arithmetic avoids the float rounding that int(seconds * 1000)
    # can introduce (e.g. 1.001 s truncating to 1000 ms).
    total_seconds = (moment.hours * 60 + moment.minutes) * 60 + moment.seconds
    return total_seconds * 1000 + moment.milliseconds


def _is_nonempty_file(path):
    """Return True when *path* exists and contains at least one byte."""
    return path.exists() and path.stat().st_size > 0


def _silent_fallback(target_lang, video_duration):
    """Write a silent WAV track for *target_lang* and return its path."""
    silent_audio = OUTPUT_DIR / f"translated_audio_{target_lang}.wav"
    create_silent_audio(video_duration, silent_audio)
    return silent_audio


def _synthesize_chunk(text, lang, slow, audio_file, index):
    """Synthesize *text* into *audio_file* with retries; return True on success."""
    for attempt in range(1, MAX_RETRY_ATTEMPTS + 1):
        try:
            gTTS(text=text, lang=lang, slow=slow).save(str(audio_file))
            if not _is_nonempty_file(audio_file):
                raise RuntimeError("Generated audio file is empty")
            logger.info(f"Generated TTS file size for chunk {index}: {audio_file.stat().st_size} bytes")
            return True
        except Exception as e:
            # Broad on purpose: gTTS surfaces network, HTTP and language
            # errors through several exception types, and one bad chunk must
            # not abort the whole track.
            logger.warning(f"TTS attempt {attempt} failed for {lang}: {str(e)}")
            time.sleep(1)

    # Last resort for long subtitles: a truncated prompt at slow speed. It
    # runs only after every full-text attempt has failed, so a successful
    # fallback file is never overwritten by a later retry.
    if len(text) > 100:
        logger.warning(f"Trying with shortened text for {lang}")
        try:
            gTTS(text=text[:100] + "...", lang=lang, slow=True).save(str(audio_file))
            return _is_nonempty_file(audio_file)
        except Exception as e:
            logger.warning(f"Shortened-text TTS failed for {lang}: {str(e)}")
    return False


def _build_mix_filter(delays_ms, video_duration):
    """Build the FFmpeg filter graph that delays, pads and mixes every chunk."""
    # Each chunk is boosted, shifted to its subtitle start time and padded to
    # the full video duration, so the mix always spans the whole video.
    chains = [
        f"[{idx}:a]volume=12dB,adelay={delay}|{delay},apad=whole_dur={video_duration}[a{idx}]"
        for idx, delay in enumerate(delays_ms)
    ]
    labels = "".join(f"[a{idx}]" for idx in range(len(delays_ms)))
    mix = f"{labels}amix=inputs={len(delays_ms)}:duration=longest:normalize=0,volume=3dB[aout]"
    return ";".join(chains + [mix])


def generate_translated_audio(srt_path, target_lang, video_duration=180):
    """
    Generate translated audio using text-to-speech for each subtitle.

    Every non-empty subtitle is synthesized with gTTS into its own MP3 chunk,
    then FFmpeg places each chunk at the subtitle's start time and mixes all
    chunks into a single MP3 track.

    Args:
        srt_path (str): Path to the SRT subtitle file
        target_lang (str): Target language code (e.g., 'en', 'es')
        video_duration (float): Duration of the original video in seconds

    Returns:
        Path: Path to the mixed MP3 file on success. If no chunk could be
        synthesized, mixing fails, or any other error occurs, a silent WAV
        track is written instead and its path is returned.

    Raises:
        Exception: Only if writing the silent fallback track itself fails;
        all other errors are logged and turned into the silent fallback.
    """
    temp_dir = None
    try:
        srt_path = Path(srt_path)
        logger.info(f"Generating translated audio for {target_lang} from {srt_path}")

        # Load subtitles
        subs = pysrt.open(srt_path, encoding="utf-8")
        logger.info(f"Loaded {len(subs)} subtitles from SRT file")

        # Create temporary directory for audio chunks; mkdtemp needs the
        # parent directory to exist already.
        temp_root = OUTPUT_DIR / "temp"
        temp_root.mkdir(parents=True, exist_ok=True)
        temp_dir = Path(tempfile.mkdtemp(prefix=f"audio_{target_lang}_", dir=temp_root))
        logger.debug(f"Created temporary directory: {temp_dir}")

        # For certain languages, use slower speed
        slow_option = target_lang in ("hi", "ja", "zh-CN", "ar")
        # NOTE(review): TTS_VOICES is imported by this module but gTTS is
        # called with target_lang directly - confirm whether the mapping
        # should be applied here.

        # Generate TTS for each subtitle: (start delay in ms, chunk path)
        chunks = []
        logger.info(f"Generating speech for {len(subs)} subtitles in {target_lang}")
        for i, sub in enumerate(tqdm(subs, desc=f"Generating {target_lang} speech")):
            text = sub.text.strip()
            if not text:
                continue
            audio_file = temp_dir / f"chunk_{i:04d}.mp3"
            if _synthesize_chunk(text, target_lang, slow_option, audio_file, i):
                chunks.append((_subrip_to_ms(sub.start), audio_file))
            else:
                logger.warning(f"Failed to generate audio for subtitle {i}")

        # Fallback if no audio generated
        if not chunks:
            logger.warning(f"No audio files generated for {target_lang}")
            return _silent_fallback(target_lang, video_duration)

        # Build FFmpeg command: one input per chunk, mixed by the filter graph
        output_audio = OUTPUT_DIR / f"translated_audio_{target_lang}.mp3"
        cmd = ['ffmpeg', '-y']
        for _, audio_file in chunks:
            cmd += ['-i', str(audio_file)]
        cmd += [
            '-filter_complex', _build_mix_filter([delay for delay, _ in chunks], video_duration),
            '-map', '[aout]',
            '-c:a', 'libmp3lame',
            '-b:a', '192k',
            str(output_audio)
        ]
        logger.debug(f"Running FFmpeg command: {' '.join(cmd)}")

        # Execute audio mixing
        process = subprocess.run(cmd, capture_output=True, text=True)
        if process.returncode != 0:
            logger.error(f"Audio mixing failed: {process.stderr}")
            return _silent_fallback(target_lang, video_duration)

        logger.info(f"Final audio file size: {output_audio.stat().st_size} bytes")
        return output_audio
    except Exception as e:
        logger.error(f"Audio generation failed: {str(e)}", exc_info=True)
        return _silent_fallback(target_lang, video_duration)
    finally:
        # Cleanup temporary files on every exit path, not only on success
        if temp_dir is not None:
            try:
                shutil.rmtree(temp_dir)
                logger.debug(f"Cleaned temporary directory: {temp_dir}")
            except OSError as e:
                logger.warning(f"Temp cleanup failed: {str(e)}")