Spaces:
Running
Running

Refactor model structure: update import paths from 'app.modelz' to 'app.models' across multiple files for consistency, remove obsolete 'modelz' directory, and adjust Dockerfile and migration script to reflect these changes, enhancing clarity and organization in the codebase.
c27f115
"""Advanced voice memory system for consistent voice generation.""" | |
import os | |
import torch | |
import torchaudio | |
import numpy as np | |
import random | |
import logging | |
from typing import Dict, List, Optional | |
from dataclasses import dataclass | |
from app.model import Segment | |
# Setup logging | |
logger = logging.getLogger(__name__) | |
# Path to store voice memories - use persistent location | |
VOICE_MEMORIES_DIR = "/app/voice_memories" | |
os.makedirs(VOICE_MEMORIES_DIR, exist_ok=True) | |
class VoiceMemory: | |
"""Store voice characteristics for consistent generation.""" | |
name: str # Voice name (alloy, echo, etc.) | |
speaker_id: int # Speaker ID (0-5) | |
# Store multiple audio segments for context | |
audio_segments: List[torch.Tensor] | |
# Store text prompts that produced good results | |
text_segments: List[str] | |
# Base characteristics for this voice | |
pitch_base: float # Base pitch characteristic (Hz) | |
timbre: str # Voice quality descriptor | |
def get_context_segments(self, device: torch.device, max_segments: int = 2) -> List[Segment]: | |
"""Get context segments for this voice.""" | |
if not self.audio_segments: | |
return [] | |
# Select a limited number of segments to avoid context overflow | |
num_segments = min(len(self.audio_segments), max_segments) | |
indices = list(range(len(self.audio_segments))) | |
random.shuffle(indices) | |
selected_indices = indices[:num_segments] | |
segments = [] | |
for i in selected_indices: | |
segments.append( | |
Segment( | |
speaker=self.speaker_id, | |
text=self.text_segments[i] if i < len(self.text_segments) else f"Voice sample {i}", | |
audio=self.audio_segments[i].to(device) | |
) | |
) | |
return segments | |
def update_with_new_audio(self, audio: torch.Tensor, text: str, max_stored: int = 5): | |
"""Update voice memory with newly generated audio.""" | |
# Add new audio and text | |
self.audio_segments.append(audio.detach().cpu()) | |
self.text_segments.append(text) | |
# Keep only the most recent segments | |
if len(self.audio_segments) > max_stored: | |
self.audio_segments = self.audio_segments[-max_stored:] | |
self.text_segments = self.text_segments[-max_stored:] | |
def save(self): | |
"""Save voice memory to persistent storage.""" | |
data = { | |
"name": self.name, | |
"speaker_id": self.speaker_id, | |
"audio_segments": self.audio_segments, | |
"text_segments": self.text_segments, | |
"pitch_base": self.pitch_base, | |
"timbre": self.timbre | |
} | |
# Save to the persistent directory | |
save_path = os.path.join(VOICE_MEMORIES_DIR, f"{self.name}.pt") | |
try: | |
torch.save(data, save_path) | |
logger.info(f"Saved voice memory for {self.name} to {save_path}") | |
except Exception as e: | |
logger.error(f"Error saving voice memory for {self.name}: {e}") | |
def load(cls, name: str) -> Optional['VoiceMemory']: | |
"""Load voice memory from persistent storage.""" | |
path = os.path.join(VOICE_MEMORIES_DIR, f"{name}.pt") | |
if not os.path.exists(path): | |
logger.info(f"No saved voice memory found for {name} at {path}") | |
return None | |
try: | |
data = torch.load(path) | |
return cls( | |
name=data["name"], | |
speaker_id=data["speaker_id"], | |
audio_segments=data["audio_segments"], | |
text_segments=data["text_segments"], | |
pitch_base=data["pitch_base"], | |
timbre=data["timbre"] | |
) | |
except Exception as e: | |
logger.error(f"Error loading voice memory for {name}: {e}") | |
return None | |
# Dictionary of voice memories | |
VOICE_MEMORIES: Dict[str, VoiceMemory] = {} | |
# Voice characteristics | |
VOICE_CHARACTERISTICS = { | |
"alloy": {"pitch": 220.0, "timbre": "balanced", "description": "A balanced, natural voice with medium pitch"}, | |
"echo": {"pitch": 330.0, "timbre": "resonant", "description": "A resonant voice with a reverberant quality"}, | |
"fable": {"pitch": 523.0, "timbre": "bright", "description": "A bright, higher-pitched voice with clear articulation"}, | |
"onyx": {"pitch": 165.0, "timbre": "deep", "description": "A deep, authoritative voice with lower pitch"}, | |
"nova": {"pitch": 392.0, "timbre": "warm", "description": "A warm, smooth voice with pleasant midrange tone"}, | |
"shimmer": {"pitch": 587.0, "timbre": "light", "description": "A light, airy voice with higher frequencies"} | |
} | |
# Voice intro texts - carefully crafted to capture voice characteristics | |
VOICE_INTROS = { | |
"alloy": [ | |
"Hello, I'm Alloy. My voice is designed to be clear and balanced.", | |
"This is the Alloy voice. I aim to sound natural and easy to understand.", | |
"Welcome, I'm the voice known as Alloy. I have a balanced, medium-range tone." | |
], | |
"echo": [ | |
"Hello, I'm Echo. My voice has a rich, resonant quality.", | |
"This is the Echo voice. Notice my distinctive resonance and depth.", | |
"Welcome, I'm the voice known as Echo. My tone is designed to resonate clearly." | |
], | |
"fable": [ | |
"Hello, I'm Fable. My voice is bright and articulate.", | |
"This is the Fable voice. I have a higher pitch with clear pronunciation.", | |
"Welcome, I'm the voice known as Fable. I speak with a bright, energetic tone." | |
], | |
"onyx": [ | |
"Hello, I'm Onyx. My voice is deep and authoritative.", | |
"This is the Onyx voice. I speak with a lower pitch and commanding presence.", | |
"Welcome, I'm the voice known as Onyx. My tone is deep and resonant." | |
], | |
"nova": [ | |
"Hello, I'm Nova. My voice is warm and harmonious.", | |
"This is the Nova voice. I have a smooth, pleasant mid-range quality.", | |
"Welcome, I'm the voice known as Nova. I speak with a warm, friendly tone." | |
], | |
"shimmer": [ | |
"Hello, I'm Shimmer. My voice is light and expressive.", | |
"This is the Shimmer voice. I have a higher-pitched, airy quality.", | |
"Welcome, I'm the voice known as Shimmer. My tone is bright and crisp." | |
] | |
} | |
def initialize_voices(sample_rate: int = 24000): | |
"""Initialize voice memories with consistent base samples.""" | |
global VOICE_MEMORIES | |
# Check if persistent directory exists, create if needed | |
os.makedirs(VOICE_MEMORIES_DIR, exist_ok=True) | |
logger.info(f"Using voice memories directory: {VOICE_MEMORIES_DIR}") | |
# First try to load existing memories from persistent storage | |
for voice_name in ["alloy", "echo", "fable", "onyx", "nova", "shimmer"]: | |
memory = VoiceMemory.load(voice_name) | |
if memory: | |
VOICE_MEMORIES[voice_name] = memory | |
logger.info(f"Loaded existing voice memory for {voice_name} with {len(memory.audio_segments)} segments") | |
continue | |
# If no memory exists, create a new one | |
speaker_id = ["alloy", "echo", "fable", "onyx", "nova", "shimmer"].index(voice_name) | |
characteristics = VOICE_CHARACTERISTICS[voice_name] | |
# Create deterministic seed audio | |
np.random.seed(speaker_id + 42) | |
duration = 1.0 # seconds | |
t = np.linspace(0, duration, int(sample_rate * duration), endpoint=False) | |
# Create characteristic waveform | |
pitch = characteristics["pitch"] | |
if voice_name == "alloy": | |
audio = 0.5 * np.sin(2 * np.pi * pitch * t) + 0.3 * np.sin(2 * np.pi * pitch * 2 * t) | |
elif voice_name == "echo": | |
audio = np.sin(2 * np.pi * pitch * t) * np.exp(-t * 3) | |
elif voice_name == "fable": | |
audio = 0.7 * np.sin(2 * np.pi * pitch * t) | |
elif voice_name == "onyx": | |
audio = 0.8 * np.sin(2 * np.pi * pitch * t) + 0.1 * np.sin(2 * np.pi * pitch * 0.5 * t) | |
elif voice_name == "nova": | |
audio = 0.4 * np.sin(2 * np.pi * pitch * t) + 0.4 * np.sin(2 * np.pi * pitch * 0.5 * t) | |
else: # shimmer | |
audio = 0.3 * np.sin(2 * np.pi * pitch * t) + 0.2 * np.sin(2 * np.pi * pitch * 1.5 * t) + 0.1 * np.sin(2 * np.pi * pitch * 2 * t) | |
# Normalize | |
audio = audio / np.max(np.abs(audio)) | |
# Convert to tensor | |
audio_tensor = torch.tensor(audio, dtype=torch.float32) | |
# Create voice memory | |
memory = VoiceMemory( | |
name=voice_name, | |
speaker_id=speaker_id, | |
audio_segments=[audio_tensor], | |
text_segments=[f"This is the voice of {voice_name}"], | |
pitch_base=characteristics["pitch"], | |
timbre=characteristics["timbre"] | |
) | |
# Save the voice memory to persistent storage | |
memory.save() | |
# Store in dictionary | |
VOICE_MEMORIES[voice_name] = memory | |
# Save as wav for reference | |
save_path = os.path.join(VOICE_MEMORIES_DIR, f"{voice_name}_seed.wav") | |
torchaudio.save(save_path, audio_tensor.unsqueeze(0), sample_rate) | |
logger.info(f"Initialized new voice memory for {voice_name}") | |
def get_voice_context(voice_name: str, device: torch.device, max_segments: int = 2) -> List[Segment]: | |
"""Get context segments for a given voice.""" | |
if not VOICE_MEMORIES: | |
initialize_voices() | |
if voice_name in VOICE_MEMORIES: | |
return VOICE_MEMORIES[voice_name].get_context_segments(device, max_segments=max_segments) | |
# Default to alloy if voice not found | |
logger.warning(f"Voice {voice_name} not found, defaulting to alloy") | |
return VOICE_MEMORIES["alloy"].get_context_segments(device, max_segments=max_segments) | |
def update_voice_memory(voice_name: str, audio: torch.Tensor, text: str): | |
"""Update voice memory with newly generated audio and save to persistent storage.""" | |
if not VOICE_MEMORIES: | |
initialize_voices() | |
if voice_name in VOICE_MEMORIES: | |
VOICE_MEMORIES[voice_name].update_with_new_audio(audio, text) | |
VOICE_MEMORIES[voice_name].save() | |
logger.info(f"Updated voice memory for {voice_name}, now has {len(VOICE_MEMORIES[voice_name].audio_segments)} segments") | |
def generate_voice_samples(app_state): | |
"""Generate high-quality voice samples for each voice. | |
Args: | |
app_state: The FastAPI app state containing the generator | |
""" | |
generator = app_state.generator | |
if not generator: | |
logger.error("Cannot generate voice samples: generator not available") | |
return | |
logger.info("Beginning voice sample generation...") | |
# Ensure persistent directory exists | |
os.makedirs(VOICE_MEMORIES_DIR, exist_ok=True) | |
for voice_name in ["alloy", "echo", "fable", "onyx", "nova", "shimmer"]: | |
speaker_id = ["alloy", "echo", "fable", "onyx", "nova", "shimmer"].index(voice_name) | |
# Get multiple sample texts for this voice | |
sample_texts = VOICE_INTROS[voice_name] | |
# Generate a collection of samples for this voice | |
logger.info(f"Generating samples for voice: {voice_name}") | |
audio_segments = [] | |
text_segments = [] | |
for i, sample_text in enumerate(sample_texts): | |
try: | |
# Check if we already have a sample | |
sample_path = os.path.join(VOICE_MEMORIES_DIR, f"{voice_name}_sample_{i}.wav") | |
if os.path.exists(sample_path): | |
logger.info(f"Found existing sample {i+1} for {voice_name}, loading from {sample_path}") | |
audio_tensor, sr = torchaudio.load(sample_path) | |
if sr != generator.sample_rate: | |
audio_tensor = torchaudio.functional.resample( | |
audio_tensor.squeeze(0), orig_freq=sr, new_freq=generator.sample_rate | |
) | |
else: | |
audio_tensor = audio_tensor.squeeze(0) | |
audio_segments.append(audio_tensor) | |
text_segments.append(sample_text) | |
continue | |
# Generate without context first for seed samples | |
logger.info(f"Generating sample {i+1}/{len(sample_texts)} for {voice_name}: '{sample_text}'") | |
# Use a lower temperature for more stable output | |
audio = generator.generate( | |
text=sample_text, | |
speaker=speaker_id, | |
context=[], # No context for initial samples | |
max_audio_length_ms=10000, | |
temperature=0.7, # Lower temperature for more stable output | |
topk=30, | |
) | |
# Save this segment | |
audio_segments.append(audio.detach().cpu()) | |
text_segments.append(sample_text) | |
# Save as WAV for reference to persistent storage | |
torchaudio.save(sample_path, audio.unsqueeze(0).cpu(), generator.sample_rate) | |
logger.info(f"Generated sample {i+1} for {voice_name}, length: {audio.shape[0]/generator.sample_rate:.2f}s") | |
except Exception as e: | |
logger.error(f"Error generating sample {i+1} for {voice_name}: {e}") | |
# Use the generated samples to update the voice memory | |
if voice_name in VOICE_MEMORIES and audio_segments: | |
# Replace existing samples with these high quality ones | |
VOICE_MEMORIES[voice_name].audio_segments = audio_segments | |
VOICE_MEMORIES[voice_name].text_segments = text_segments | |
VOICE_MEMORIES[voice_name].save() | |
logger.info(f"Updated voice memory for {voice_name} with {len(audio_segments)} high-quality samples") | |
# Now generate a second pass with context from these samples | |
if len(audio_segments) >= 2: | |
try: | |
# Check if we already have a character sample | |
character_path = os.path.join(VOICE_MEMORIES_DIR, f"{voice_name}_character.wav") | |
if os.path.exists(character_path): | |
logger.info(f"Found existing character sample for {voice_name}, loading from {character_path}") | |
audio_tensor, sr = torchaudio.load(character_path) | |
if sr != generator.sample_rate: | |
audio_tensor = torchaudio.functional.resample( | |
audio_tensor.squeeze(0), orig_freq=sr, new_freq=generator.sample_rate | |
) | |
else: | |
audio_tensor = audio_tensor.squeeze(0) | |
character_sample_text = f"I'm the voice assistant known as {voice_name}. I'm designed to have a distinctive voice that you can easily recognize." | |
VOICE_MEMORIES[voice_name].audio_segments.append(audio_tensor) | |
VOICE_MEMORIES[voice_name].text_segments.append(character_sample_text) | |
VOICE_MEMORIES[voice_name].save() | |
continue | |
# Get intro and conclusion prompts that build voice consistency | |
context = [ | |
Segment( | |
speaker=speaker_id, | |
text=text_segments[0], | |
audio=audio_segments[0].to(generator.device) | |
) | |
] | |
# Create a longer sample with the voice characteristics now established | |
character_sample_text = f"I'm the voice assistant known as {voice_name}. I'm designed to have a distinctive voice that you can easily recognize. My speech patterns and tone should remain consistent throughout our conversation." | |
logger.info(f"Generating character sample for {voice_name} with context") | |
character_audio = generator.generate( | |
text=character_sample_text, | |
speaker=speaker_id, | |
context=context, | |
max_audio_length_ms=15000, | |
temperature=0.7, | |
topk=30, | |
) | |
# Save this comprehensive character sample to persistent storage | |
torchaudio.save(character_path, character_audio.unsqueeze(0).cpu(), generator.sample_rate) | |
# Add this to the memory as well | |
VOICE_MEMORIES[voice_name].audio_segments.append(character_audio.detach().cpu()) | |
VOICE_MEMORIES[voice_name].text_segments.append(character_sample_text) | |
VOICE_MEMORIES[voice_name].save() | |
logger.info(f"Generated character sample for {voice_name}, length: {character_audio.shape[0]/generator.sample_rate:.2f}s") | |
except Exception as e: | |
logger.error(f"Error generating character sample for {voice_name}: {e}") | |
def create_custom_voice( | |
app_state, | |
name: str, | |
initial_text: str, | |
speaker_id: int = 0, | |
pitch: Optional[float] = None, | |
timbre: str = "custom" | |
) -> Dict: | |
"""Create a new custom voice. | |
Args: | |
app_state: The FastAPI app state containing the generator | |
name: Name for the new voice | |
initial_text: Text for the initial voice sample | |
speaker_id: Base speaker ID (0-5) | |
pitch: Base pitch in Hz (optional) | |
timbre: Voice quality descriptor | |
Returns: | |
Dict with creation status and voice info | |
""" | |
generator = app_state.generator | |
if not generator: | |
return {"status": "error", "message": "Generator not available"} | |
# Check if voice already exists | |
if not VOICE_MEMORIES: | |
initialize_voices() | |
if name in VOICE_MEMORIES: | |
return {"status": "error", "message": f"Voice '{name}' already exists"} | |
# Generate a voice sample | |
try: | |
logger.info(f"Creating custom voice '{name}' with text: '{initial_text}'") | |
audio = generator.generate( | |
text=initial_text, | |
speaker=speaker_id, | |
context=[], | |
max_audio_length_ms=10000, | |
temperature=0.7, | |
) | |
# Determine base pitch if not provided | |
if pitch is None: | |
if speaker_id == 0: # alloy | |
pitch = 220.0 | |
elif speaker_id == 1: # echo | |
pitch = 330.0 | |
elif speaker_id == 2: # fable | |
pitch = 523.0 | |
elif speaker_id == 3: # onyx | |
pitch = 165.0 | |
elif speaker_id == 4: # nova | |
pitch = 392.0 | |
else: # shimmer | |
pitch = 587.0 | |
# Create a new voice memory | |
memory = VoiceMemory( | |
name=name, | |
speaker_id=speaker_id, | |
audio_segments=[audio.detach().cpu()], | |
text_segments=[initial_text], | |
pitch_base=pitch, | |
timbre=timbre | |
) | |
# Save the voice memory to persistent storage | |
memory.save() | |
VOICE_MEMORIES[name] = memory | |
# Save sample as WAV for reference to persistent storage | |
sample_path = os.path.join(VOICE_MEMORIES_DIR, f"{name}_sample.wav") | |
torchaudio.save(sample_path, audio.unsqueeze(0).cpu(), generator.sample_rate) | |
logger.info(f"Created custom voice '{name}' successfully") | |
return { | |
"status": "success", | |
"message": f"Voice '{name}' created successfully", | |
"voice": { | |
"name": name, | |
"speaker_id": speaker_id, | |
"pitch": pitch, | |
"timbre": timbre, | |
"sample_length_seconds": audio.shape[0] / generator.sample_rate | |
} | |
} | |
except Exception as e: | |
logger.error(f"Error creating custom voice '{name}': {e}") | |
return { | |
"status": "error", | |
"message": f"Error creating voice: {str(e)}" | |
} |