"""Audio processing utilities for CSM-1B TTS API.""" import logging import numpy as np import torch from scipy import signal logger = logging.getLogger(__name__) def remove_long_silences( audio: torch.Tensor, sample_rate: int, min_speech_energy: float = 0.015, max_silence_sec: float = 0.4, keep_silence_sec: float = 0.1, ) -> torch.Tensor: """ Remove uncomfortably long silences from audio while preserving natural pauses. Args: audio: Audio tensor sample_rate: Sample rate in Hz min_speech_energy: Minimum RMS energy to consider as speech max_silence_sec: Maximum silence duration to keep in seconds keep_silence_sec: Amount of silence to keep at speech boundaries Returns: Audio with long silences removed """ # Convert to numpy for processing audio_np = audio.cpu().numpy() # Calculate frame size and hop length frame_size = int(0.02 * sample_rate) # 20ms frames hop_length = int(0.01 * sample_rate) # 10ms hop # Compute frame energy frames = [] for i in range(0, len(audio_np) - frame_size + 1, hop_length): frames.append(audio_np[i:i+frame_size]) if len(frames) < 2: # If audio is too short for analysis return audio frames = np.array(frames) # Root mean square energy frame_energy = np.sqrt(np.mean(frames**2, axis=1)) # Adaptive threshold based on audio content # Uses a percentile to adapt to different audio characteristics energy_threshold = max( min_speech_energy, # Minimum threshold np.percentile(frame_energy, 10) # Adapt to audio ) # Identify speech frames is_speech = frame_energy > energy_threshold # Convert frame indices to sample indices considering overlapping frames speech_segments = [] in_speech = False speech_start = 0 for i in range(len(is_speech)): if is_speech[i] and not in_speech: # Start of speech in_speech = True # Calculate start sample including keep_silence speech_start = max(0, i * hop_length - int(keep_silence_sec * sample_rate)) elif not is_speech[i] and in_speech: # Potential end of speech, look ahead to check if silence continues silence_length = 0 for j in range(i, min(len(is_speech), i + int(max_silence_sec * sample_rate / hop_length))): if not is_speech[j]: silence_length += 1 else: break if silence_length * hop_length >= max_silence_sec * sample_rate: # End of speech, long enough silence detected in_speech = False # Calculate end sample including keep_silence speech_end = min(len(audio_np), i * hop_length + int(keep_silence_sec * sample_rate)) speech_segments.append((speech_start, speech_end)) # Handle the case where audio ends during speech if in_speech: speech_segments.append((speech_start, len(audio_np))) if not speech_segments: logger.warning("No speech segments detected, returning original audio") return audio # Combine speech segments with controlled silence durations result = [] # Add initial silence if the first segment doesn't start at the beginning if speech_segments[0][0] > 0: # Add a short leading silence (100ms) silence_samples = min(int(0.1 * sample_rate), speech_segments[0][0]) if silence_samples > 0: result.append(audio_np[speech_segments[0][0] - silence_samples:speech_segments[0][0]]) # Process each speech segment for i, (start, end) in enumerate(speech_segments): # Add this speech segment result.append(audio_np[start:end]) # Add a controlled silence between segments if i < len(speech_segments) - 1: next_start = speech_segments[i+1][0] # Calculate available silence duration available_silence = next_start - end if available_silence > 0: # Use either the actual silence (if shorter than max) or the max allowed silence_duration = min(available_silence, int(max_silence_sec * sample_rate)) # Take the first portion of the silence - usually cleaner result.append(audio_np[end:end + silence_duration]) # Combine all parts processed_audio = np.concatenate(result) # Log the results original_duration = len(audio_np) / sample_rate processed_duration = len(processed_audio) / sample_rate logger.info(f"Silence removal: {original_duration:.2f}s -> {processed_duration:.2f}s ({processed_duration/original_duration*100:.1f}%)") # Return as tensor with original device and dtype return torch.tensor(processed_audio, device=audio.device, dtype=audio.dtype) def create_high_shelf_filter(audio, sample_rate, frequency=4000, gain_db=3.0): """ Create a high shelf filter to boost frequencies above the given frequency. Args: audio: Audio numpy array sample_rate: Sample rate in Hz frequency: Shelf frequency in Hz gain_db: Gain in dB for frequencies above the shelf Returns: Filtered audio """ # Convert gain from dB to linear gain = 10 ** (gain_db / 20.0) # Normalized frequency (0 to 1, where 1 is Nyquist frequency) normalized_freq = 2.0 * frequency / sample_rate # Design a high-shelf biquad filter # This is a standard second-order section (SOS) implementation b0 = gain b1 = 0 b2 = 0 a0 = 1 a1 = 0 a2 = 0 # Simple first-order high-shelf filter alpha = np.sin(np.pi * normalized_freq) / 2 * np.sqrt((gain + 1/gain) * (1/0.5 - 1) + 2) cos_w0 = np.cos(np.pi * normalized_freq) b0 = gain * ((gain + 1) + (gain - 1) * cos_w0 + 2 * np.sqrt(gain) * alpha) b1 = -2 * gain * ((gain - 1) + (gain + 1) * cos_w0) b2 = gain * ((gain + 1) + (gain - 1) * cos_w0 - 2 * np.sqrt(gain) * alpha) a0 = (gain + 1) - (gain - 1) * cos_w0 + 2 * np.sqrt(gain) * alpha a1 = 2 * ((gain - 1) - (gain + 1) * cos_w0) a2 = (gain + 1) - (gain - 1) * cos_w0 - 2 * np.sqrt(gain) * alpha # Normalize coefficients b = np.array([b0, b1, b2]) / a0 a = np.array([1.0, a1/a0, a2/a0]) # Apply the filter return signal.lfilter(b, a, audio) def enhance_audio_quality(audio: torch.Tensor, sample_rate: int) -> torch.Tensor: """ Enhance audio quality by applying various processing techniques. Args: audio: Audio tensor sample_rate: Sample rate in Hz Returns: Enhanced audio tensor """ try: audio_np = audio.cpu().numpy() # Remove DC offset audio_np = audio_np - np.mean(audio_np) # Apply light compression to improve perceived loudness # Compress by reducing peaks and increasing quieter parts slightly threshold = 0.5 ratio = 1.5 attack = 0.01 release = 0.1 # Simple compression algorithm gain = np.ones_like(audio_np) for i in range(1, len(audio_np)): level = abs(audio_np[i]) if level > threshold: gain[i] = threshold + (level - threshold) / ratio gain[i] = gain[i] / level if level > 0 else 1.0 else: gain[i] = 1.0 # Smooth gain changes gain[i] = gain[i-1] + (gain[i] - gain[i-1]) * (attack if gain[i] < gain[i-1] else release) audio_np = audio_np * gain # Apply high-shelf filter to enhance speech clarity # Boost frequencies above 4000 Hz by 3 dB audio_np = create_high_shelf_filter(audio_np, sample_rate, frequency=4000, gain_db=3.0) # Normalize to prevent clipping max_val = np.max(np.abs(audio_np)) if max_val > 0: audio_np = audio_np * 0.95 / max_val return torch.tensor(audio_np, device=audio.device, dtype=audio.dtype) except Exception as e: logger.warning(f"Audio quality enhancement failed: {e}") return audio