File size: 20,444 Bytes
01115c6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
"""Advanced voice memory system for consistent voice generation."""
import os
import torch
import torchaudio
import numpy as np
import random
import logging
from typing import Dict, List, Optional
from dataclasses import dataclass
from app.models import Segment

# Setup logging
logger = logging.getLogger(__name__)

# Path to store voice memories - use persistent location
VOICE_MEMORIES_DIR = "/app/voice_memories"
os.makedirs(VOICE_MEMORIES_DIR, exist_ok=True)

@dataclass
class VoiceMemory:
    """Store voice characteristics for consistent generation."""
    name: str  # Voice name (alloy, echo, etc.)
    speaker_id: int  # Speaker ID (0-5)
    # Store multiple audio segments for context
    audio_segments: List[torch.Tensor]  
    # Store text prompts that produced good results
    text_segments: List[str]
    # Base characteristics for this voice
    pitch_base: float  # Base pitch characteristic (Hz)
    timbre: str  # Voice quality descriptor
    
    def get_context_segments(self, device: torch.device, max_segments: int = 2) -> List[Segment]:
        """Get context segments for this voice."""
        if not self.audio_segments:
            return []
            
        # Select a limited number of segments to avoid context overflow
        num_segments = min(len(self.audio_segments), max_segments)
        indices = list(range(len(self.audio_segments)))
        random.shuffle(indices)
        selected_indices = indices[:num_segments]
        
        segments = []
        for i in selected_indices:
            segments.append(
                Segment(
                    speaker=self.speaker_id,
                    text=self.text_segments[i] if i < len(self.text_segments) else f"Voice sample {i}",
                    audio=self.audio_segments[i].to(device)
                )
            )
            
        return segments
    
    def update_with_new_audio(self, audio: torch.Tensor, text: str, max_stored: int = 5):
        """Update voice memory with newly generated audio."""
        # Add new audio and text
        self.audio_segments.append(audio.detach().cpu())
        self.text_segments.append(text)
        
        # Keep only the most recent segments
        if len(self.audio_segments) > max_stored:
            self.audio_segments = self.audio_segments[-max_stored:]
            self.text_segments = self.text_segments[-max_stored:]
            
    def save(self):
        """Save voice memory to persistent storage."""
        data = {
            "name": self.name,
            "speaker_id": self.speaker_id,
            "audio_segments": self.audio_segments,
            "text_segments": self.text_segments,
            "pitch_base": self.pitch_base,
            "timbre": self.timbre
        }
        
        # Save to the persistent directory
        save_path = os.path.join(VOICE_MEMORIES_DIR, f"{self.name}.pt")
        try:
            torch.save(data, save_path)
            logger.info(f"Saved voice memory for {self.name} to {save_path}")
        except Exception as e:
            logger.error(f"Error saving voice memory for {self.name}: {e}")
        
    @classmethod
    def load(cls, name: str) -> Optional['VoiceMemory']:
        """Load voice memory from persistent storage."""
        path = os.path.join(VOICE_MEMORIES_DIR, f"{name}.pt")
        if not os.path.exists(path):
            logger.info(f"No saved voice memory found for {name} at {path}")
            return None
            
        try:
            data = torch.load(path)
            return cls(
                name=data["name"],
                speaker_id=data["speaker_id"],
                audio_segments=data["audio_segments"],
                text_segments=data["text_segments"],
                pitch_base=data["pitch_base"],
                timbre=data["timbre"]
            )
        except Exception as e:
            logger.error(f"Error loading voice memory for {name}: {e}")
            return None

# Dictionary of voice memories
VOICE_MEMORIES: Dict[str, VoiceMemory] = {}

# Voice characteristics
VOICE_CHARACTERISTICS = {
    "alloy": {"pitch": 220.0, "timbre": "balanced", "description": "A balanced, natural voice with medium pitch"},
    "echo": {"pitch": 330.0, "timbre": "resonant", "description": "A resonant voice with a reverberant quality"},
    "fable": {"pitch": 523.0, "timbre": "bright", "description": "A bright, higher-pitched voice with clear articulation"},
    "onyx": {"pitch": 165.0, "timbre": "deep", "description": "A deep, authoritative voice with lower pitch"},
    "nova": {"pitch": 392.0, "timbre": "warm", "description": "A warm, smooth voice with pleasant midrange tone"},
    "shimmer": {"pitch": 587.0, "timbre": "light", "description": "A light, airy voice with higher frequencies"}
}

# Voice intro texts - carefully crafted to capture voice characteristics
VOICE_INTROS = {
    "alloy": [
        "Hello, I'm Alloy. My voice is designed to be clear and balanced.",
        "This is the Alloy voice. I aim to sound natural and easy to understand.",
        "Welcome, I'm the voice known as Alloy. I have a balanced, medium-range tone."
    ],
    "echo": [
        "Hello, I'm Echo. My voice has a rich, resonant quality.",
        "This is the Echo voice. Notice my distinctive resonance and depth.",
        "Welcome, I'm the voice known as Echo. My tone is designed to resonate clearly."
    ],
    "fable": [
        "Hello, I'm Fable. My voice is bright and articulate.",
        "This is the Fable voice. I have a higher pitch with clear pronunciation.",
        "Welcome, I'm the voice known as Fable. I speak with a bright, energetic tone."
    ],
    "onyx": [
        "Hello, I'm Onyx. My voice is deep and authoritative.",
        "This is the Onyx voice. I speak with a lower pitch and commanding presence.",
        "Welcome, I'm the voice known as Onyx. My tone is deep and resonant."
    ],
    "nova": [
        "Hello, I'm Nova. My voice is warm and harmonious.",
        "This is the Nova voice. I have a smooth, pleasant mid-range quality.",
        "Welcome, I'm the voice known as Nova. I speak with a warm, friendly tone."
    ],
    "shimmer": [
        "Hello, I'm Shimmer. My voice is light and expressive.",
        "This is the Shimmer voice. I have a higher-pitched, airy quality.",
        "Welcome, I'm the voice known as Shimmer. My tone is bright and crisp."
    ]
}

def initialize_voices(sample_rate: int = 24000):
    """Initialize voice memories with consistent base samples."""
    global VOICE_MEMORIES
    
    # Check if persistent directory exists, create if needed
    os.makedirs(VOICE_MEMORIES_DIR, exist_ok=True)
    logger.info(f"Using voice memories directory: {VOICE_MEMORIES_DIR}")
    
    # First try to load existing memories from persistent storage
    for voice_name in ["alloy", "echo", "fable", "onyx", "nova", "shimmer"]:
        memory = VoiceMemory.load(voice_name)
        if memory:
            VOICE_MEMORIES[voice_name] = memory
            logger.info(f"Loaded existing voice memory for {voice_name} with {len(memory.audio_segments)} segments")
            continue
            
        # If no memory exists, create a new one
        speaker_id = ["alloy", "echo", "fable", "onyx", "nova", "shimmer"].index(voice_name)
        characteristics = VOICE_CHARACTERISTICS[voice_name]
        
        # Create deterministic seed audio
        np.random.seed(speaker_id + 42)
        duration = 1.0  # seconds
        t = np.linspace(0, duration, int(sample_rate * duration), endpoint=False)
        
        # Create characteristic waveform
        pitch = characteristics["pitch"]
        if voice_name == "alloy":
            audio = 0.5 * np.sin(2 * np.pi * pitch * t) + 0.3 * np.sin(2 * np.pi * pitch * 2 * t)
        elif voice_name == "echo":
            audio = np.sin(2 * np.pi * pitch * t) * np.exp(-t * 3)
        elif voice_name == "fable":
            audio = 0.7 * np.sin(2 * np.pi * pitch * t)
        elif voice_name == "onyx":
            audio = 0.8 * np.sin(2 * np.pi * pitch * t) + 0.1 * np.sin(2 * np.pi * pitch * 0.5 * t)
        elif voice_name == "nova":
            audio = 0.4 * np.sin(2 * np.pi * pitch * t) + 0.4 * np.sin(2 * np.pi * pitch * 0.5 * t)
        else:  # shimmer
            audio = 0.3 * np.sin(2 * np.pi * pitch * t) + 0.2 * np.sin(2 * np.pi * pitch * 1.5 * t) + 0.1 * np.sin(2 * np.pi * pitch * 2 * t)
            
        # Normalize
        audio = audio / np.max(np.abs(audio))
        
        # Convert to tensor
        audio_tensor = torch.tensor(audio, dtype=torch.float32)
        
        # Create voice memory
        memory = VoiceMemory(
            name=voice_name,
            speaker_id=speaker_id,
            audio_segments=[audio_tensor],
            text_segments=[f"This is the voice of {voice_name}"],
            pitch_base=characteristics["pitch"],
            timbre=characteristics["timbre"]
        )
        
        # Save the voice memory to persistent storage
        memory.save()
        
        # Store in dictionary
        VOICE_MEMORIES[voice_name] = memory
        
        # Save as wav for reference
        save_path = os.path.join(VOICE_MEMORIES_DIR, f"{voice_name}_seed.wav")
        torchaudio.save(save_path, audio_tensor.unsqueeze(0), sample_rate)
        
        logger.info(f"Initialized new voice memory for {voice_name}")

def get_voice_context(voice_name: str, device: torch.device, max_segments: int = 2) -> List[Segment]:
    """Get context segments for a given voice."""
    if not VOICE_MEMORIES:
        initialize_voices()
        
    if voice_name in VOICE_MEMORIES:
        return VOICE_MEMORIES[voice_name].get_context_segments(device, max_segments=max_segments)
        
    # Default to alloy if voice not found
    logger.warning(f"Voice {voice_name} not found, defaulting to alloy")
    return VOICE_MEMORIES["alloy"].get_context_segments(device, max_segments=max_segments)

def update_voice_memory(voice_name: str, audio: torch.Tensor, text: str):
    """Update voice memory with newly generated audio and save to persistent storage."""
    if not VOICE_MEMORIES:
        initialize_voices()
        
    if voice_name in VOICE_MEMORIES:
        VOICE_MEMORIES[voice_name].update_with_new_audio(audio, text)
        VOICE_MEMORIES[voice_name].save()
        logger.info(f"Updated voice memory for {voice_name}, now has {len(VOICE_MEMORIES[voice_name].audio_segments)} segments")

def generate_voice_samples(app_state):
    """Generate high-quality voice samples for each voice.
    
    Args:
        app_state: The FastAPI app state containing the generator
    """
    generator = app_state.generator
    if not generator:
        logger.error("Cannot generate voice samples: generator not available")
        return
    
    logger.info("Beginning voice sample generation...")
    
    # Ensure persistent directory exists
    os.makedirs(VOICE_MEMORIES_DIR, exist_ok=True)
    
    for voice_name in ["alloy", "echo", "fable", "onyx", "nova", "shimmer"]:
        speaker_id = ["alloy", "echo", "fable", "onyx", "nova", "shimmer"].index(voice_name)
        
        # Get multiple sample texts for this voice
        sample_texts = VOICE_INTROS[voice_name]
        
        # Generate a collection of samples for this voice
        logger.info(f"Generating samples for voice: {voice_name}")
        audio_segments = []
        text_segments = []
        
        for i, sample_text in enumerate(sample_texts):
            try:
                # Check if we already have a sample
                sample_path = os.path.join(VOICE_MEMORIES_DIR, f"{voice_name}_sample_{i}.wav")
                if os.path.exists(sample_path):
                    logger.info(f"Found existing sample {i+1} for {voice_name}, loading from {sample_path}")
                    audio_tensor, sr = torchaudio.load(sample_path)
                    if sr != generator.sample_rate:
                        audio_tensor = torchaudio.functional.resample(
                            audio_tensor.squeeze(0), orig_freq=sr, new_freq=generator.sample_rate
                        )
                    else:
                        audio_tensor = audio_tensor.squeeze(0)
                    audio_segments.append(audio_tensor)
                    text_segments.append(sample_text)
                    continue
                    
                # Generate without context first for seed samples
                logger.info(f"Generating sample {i+1}/{len(sample_texts)} for {voice_name}: '{sample_text}'")
                
                # Use a lower temperature for more stable output
                audio = generator.generate(
                    text=sample_text,
                    speaker=speaker_id,
                    context=[],  # No context for initial samples
                    max_audio_length_ms=10000,
                    temperature=0.7,  # Lower temperature for more stable output
                    topk=30,
                )
                
                # Save this segment
                audio_segments.append(audio.detach().cpu())
                text_segments.append(sample_text)
                
                # Save as WAV for reference to persistent storage
                torchaudio.save(sample_path, audio.unsqueeze(0).cpu(), generator.sample_rate)
                
                logger.info(f"Generated sample {i+1} for {voice_name}, length: {audio.shape[0]/generator.sample_rate:.2f}s")
            
            except Exception as e:
                logger.error(f"Error generating sample {i+1} for {voice_name}: {e}")
        
        # Use the generated samples to update the voice memory
        if voice_name in VOICE_MEMORIES and audio_segments:
            # Replace existing samples with these high quality ones
            VOICE_MEMORIES[voice_name].audio_segments = audio_segments
            VOICE_MEMORIES[voice_name].text_segments = text_segments
            VOICE_MEMORIES[voice_name].save()
            
            logger.info(f"Updated voice memory for {voice_name} with {len(audio_segments)} high-quality samples")
        
        # Now generate a second pass with context from these samples
        if len(audio_segments) >= 2:
            try:
                # Check if we already have a character sample
                character_path = os.path.join(VOICE_MEMORIES_DIR, f"{voice_name}_character.wav")
                if os.path.exists(character_path):
                    logger.info(f"Found existing character sample for {voice_name}, loading from {character_path}")
                    audio_tensor, sr = torchaudio.load(character_path)
                    if sr != generator.sample_rate:
                        audio_tensor = torchaudio.functional.resample(
                            audio_tensor.squeeze(0), orig_freq=sr, new_freq=generator.sample_rate
                        )
                    else:
                        audio_tensor = audio_tensor.squeeze(0)
                
                    character_sample_text = f"I'm the voice assistant known as {voice_name}. I'm designed to have a distinctive voice that you can easily recognize."
                    VOICE_MEMORIES[voice_name].audio_segments.append(audio_tensor)
                    VOICE_MEMORIES[voice_name].text_segments.append(character_sample_text)
                    VOICE_MEMORIES[voice_name].save()
                    continue
                
                # Get intro and conclusion prompts that build voice consistency
                context = [
                    Segment(
                        speaker=speaker_id,
                        text=text_segments[0],
                        audio=audio_segments[0].to(generator.device)
                    )
                ]
                
                # Create a longer sample with the voice characteristics now established
                character_sample_text = f"I'm the voice assistant known as {voice_name}. I'm designed to have a distinctive voice that you can easily recognize. My speech patterns and tone should remain consistent throughout our conversation."
                
                logger.info(f"Generating character sample for {voice_name} with context")
                character_audio = generator.generate(
                    text=character_sample_text,
                    speaker=speaker_id,
                    context=context,
                    max_audio_length_ms=15000,
                    temperature=0.7, 
                    topk=30,
                )
                
                # Save this comprehensive character sample to persistent storage
                torchaudio.save(character_path, character_audio.unsqueeze(0).cpu(), generator.sample_rate)
                
                # Add this to the memory as well
                VOICE_MEMORIES[voice_name].audio_segments.append(character_audio.detach().cpu()) 
                VOICE_MEMORIES[voice_name].text_segments.append(character_sample_text)
                VOICE_MEMORIES[voice_name].save()
                
                logger.info(f"Generated character sample for {voice_name}, length: {character_audio.shape[0]/generator.sample_rate:.2f}s")
                
            except Exception as e:
                logger.error(f"Error generating character sample for {voice_name}: {e}")

def create_custom_voice(
    app_state,
    name: str, 
    initial_text: str, 
    speaker_id: int = 0,
    pitch: Optional[float] = None,
    timbre: str = "custom"
) -> Dict:
    """Create a new custom voice.
    
    Args:
        app_state: The FastAPI app state containing the generator
        name: Name for the new voice
        initial_text: Text for the initial voice sample
        speaker_id: Base speaker ID (0-5)
        pitch: Base pitch in Hz (optional)
        timbre: Voice quality descriptor
        
    Returns:
        Dict with creation status and voice info
    """
    generator = app_state.generator
    if not generator:
        return {"status": "error", "message": "Generator not available"}
    
    # Check if voice already exists
    if not VOICE_MEMORIES:
        initialize_voices()
        
    if name in VOICE_MEMORIES:
        return {"status": "error", "message": f"Voice '{name}' already exists"}
    
    # Generate a voice sample
    try:
        logger.info(f"Creating custom voice '{name}' with text: '{initial_text}'")
        
        audio = generator.generate(
            text=initial_text,
            speaker=speaker_id,
            context=[],
            max_audio_length_ms=10000,
            temperature=0.7,
        )
        
        # Determine base pitch if not provided
        if pitch is None:
            if speaker_id == 0:  # alloy
                pitch = 220.0
            elif speaker_id == 1:  # echo
                pitch = 330.0
            elif speaker_id == 2:  # fable
                pitch = 523.0
            elif speaker_id == 3:  # onyx
                pitch = 165.0
            elif speaker_id == 4:  # nova
                pitch = 392.0
            else:  # shimmer
                pitch = 587.0
        
        # Create a new voice memory
        memory = VoiceMemory(
            name=name,
            speaker_id=speaker_id,
            audio_segments=[audio.detach().cpu()],
            text_segments=[initial_text],
            pitch_base=pitch,
            timbre=timbre
        )
        
        # Save the voice memory to persistent storage
        memory.save()
        VOICE_MEMORIES[name] = memory
        
        # Save sample as WAV for reference to persistent storage
        sample_path = os.path.join(VOICE_MEMORIES_DIR, f"{name}_sample.wav")
        torchaudio.save(sample_path, audio.unsqueeze(0).cpu(), generator.sample_rate)
        
        logger.info(f"Created custom voice '{name}' successfully")
        
        return {
            "status": "success",
            "message": f"Voice '{name}' created successfully",
            "voice": {
                "name": name,
                "speaker_id": speaker_id,
                "pitch": pitch,
                "timbre": timbre,
                "sample_length_seconds": audio.shape[0] / generator.sample_rate
            }
        }
    
    except Exception as e:
        logger.error(f"Error creating custom voice '{name}': {e}")
        return {
            "status": "error",
            "message": f"Error creating voice: {str(e)}"
        }