# OpenAI Audio Chat App (Hugging Face Space)
import base64 | |
import tempfile | |
import os | |
import requests | |
import gradio as gr | |
import random | |
import time | |
from openai import OpenAI | |
from requests.exceptions import RequestException, Timeout, ConnectionError | |
# Available voices for audio generation
VOICES = ["alloy", "ash", "ballad", "coral", "echo", "fable", "onyx", "nova", "sage", "shimmer", "verse"]

# Example audio URLs — OpenAI's public demo clips, one per voice.
# NOTE(review): there are no demo clips for "ballad" or "verse", so the random
# example picker can never load those two voices — confirm this is intended.
EXAMPLE_AUDIO_URLS = [
    "https://cdn.openai.com/API/docs/audio/alloy.wav",
    "https://cdn.openai.com/API/docs/audio/ash.wav",
    "https://cdn.openai.com/API/docs/audio/coral.wav",
    "https://cdn.openai.com/API/docs/audio/echo.wav",
    "https://cdn.openai.com/API/docs/audio/fable.wav",
    "https://cdn.openai.com/API/docs/audio/onyx.wav",
    "https://cdn.openai.com/API/docs/audio/nova.wav",
    "https://cdn.openai.com/API/docs/audio/sage.wav",
    "https://cdn.openai.com/API/docs/audio/shimmer.wav"
]

# Supported languages for translation (shown in the UI accordion only; the
# actual language support comes from the whisper-1 model)
SUPPORTED_LANGUAGES = [
    "Afrikaans", "Arabic", "Armenian", "Azerbaijani", "Belarusian", "Bosnian",
    "Bulgarian", "Catalan", "Chinese", "Croatian", "Czech", "Danish", "Dutch",
    "English", "Estonian", "Finnish", "French", "Galician", "German", "Greek",
    "Hebrew", "Hindi", "Hungarian", "Icelandic", "Indonesian", "Italian", "Japanese",
    "Kannada", "Kazakh", "Korean", "Latvian", "Lithuanian", "Macedonian", "Malay",
    "Marathi", "Maori", "Nepali", "Norwegian", "Persian", "Polish", "Portuguese",
    "Romanian", "Russian", "Serbian", "Slovak", "Slovenian", "Spanish", "Swahili",
    "Swedish", "Tagalog", "Tamil", "Thai", "Turkish", "Ukrainian", "Urdu",
    "Vietnamese", "Welsh"
]

# Max retries for API calls
MAX_RETRIES = 3
RETRY_DELAY = 2  # seconds (base delay between retry attempts)
def create_openai_client(api_key):
    """Build an OpenAI client configured with sane network limits.

    Args:
        api_key: The user's OpenAI API key.

    Returns:
        An ``OpenAI`` client that uses a 60-second per-request timeout
        and up to 3 automatic client-side retries.
    """
    client = OpenAI(
        api_key=api_key,
        max_retries=3,   # let the SDK retry transient failures
        timeout=60.0,    # fail a hung request after one minute
    )
    return client
def process_text_input(api_key, text_prompt, selected_voice):
    """Generate a spoken + text response for a plain-text prompt.

    Args:
        api_key: OpenAI API key used to authenticate the request.
        text_prompt: The user's question or instruction.
        selected_voice: One of VOICES, used for the spoken reply.

    Returns:
        ``(text_response, audio_path)`` on success, or
        ``(error_message, None)`` on failure.
    """
    try:
        # Initialize OpenAI client with the provided API key
        client = create_openai_client(api_key)
        completion = client.chat.completions.create(
            model="gpt-4o-audio-preview",
            modalities=["text", "audio"],
            audio={"voice": selected_voice, "format": "wav"},
            messages=[
                {
                    "role": "user",
                    "content": text_prompt
                }
            ]
        )
        message = completion.choices[0].message
        # Save the audio reply. NamedTemporaryFile(delete=False) avoids the
        # filename race inherent in the deprecated tempfile.mktemp().
        wav_bytes = base64.b64decode(message.audio.data)
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
            f.write(wav_bytes)
            temp_path = f.name
        # When the audio modality is requested, message.content is typically
        # None and the reply text lives in the audio transcript — fall back.
        text_response = message.content or message.audio.transcript
        return text_response, temp_path
    except ConnectionError as e:
        return f"Connection error: {str(e)}. Please check your internet connection and try again.", None
    except Timeout as e:
        return f"Timeout error: {str(e)}. The request took too long to complete. Please try again.", None
    except Exception as e:
        return f"Error: {str(e)}", None
def process_audio_input(api_key, audio_path, text_prompt, selected_voice):
    """Send recorded/uploaded audio (plus optional text) and get a spoken reply.

    Args:
        api_key: OpenAI API key used to authenticate the request.
        audio_path: Path to a WAV file with the user's audio input.
        text_prompt: Optional text context sent alongside the audio.
        selected_voice: One of VOICES, used for the spoken reply.

    Returns:
        ``(text_response, audio_path)`` on success, or
        ``(error_message, None)`` on failure / missing input.
    """
    try:
        if not audio_path:
            return "Please upload or record audio first.", None
        # Initialize OpenAI client with the provided API key
        client = create_openai_client(api_key)
        # Read audio file and encode to base64
        with open(audio_path, "rb") as audio_file:
            encoded_audio = base64.b64encode(audio_file.read()).decode('utf-8')
        # Build multimodal message content: optional text part + audio part.
        message_content = []
        if text_prompt:
            message_content.append({
                "type": "text",
                "text": text_prompt
            })
        message_content.append({
            "type": "input_audio",
            "input_audio": {
                "data": encoded_audio,
                "format": "wav"
            }
        })
        # Call OpenAI API
        completion = client.chat.completions.create(
            model="gpt-4o-audio-preview",
            modalities=["text", "audio"],
            audio={"voice": selected_voice, "format": "wav"},
            messages=[
                {
                    "role": "user",
                    "content": message_content
                }
            ]
        )
        message = completion.choices[0].message
        # Save the audio reply. NamedTemporaryFile(delete=False) avoids the
        # filename race inherent in the deprecated tempfile.mktemp().
        wav_bytes = base64.b64decode(message.audio.data)
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
            f.write(wav_bytes)
            temp_path = f.name
        # With the audio modality requested, message.content is typically None;
        # the reply text lives in the audio transcript — fall back to it.
        text_response = message.content or message.audio.transcript
        return text_response, temp_path
    except ConnectionError as e:
        return f"Connection error: {str(e)}. Please check your internet connection and try again.", None
    except Timeout as e:
        return f"Timeout error: {str(e)}. The request took too long to complete. Please try again.", None
    except Exception as e:
        return f"Error: {str(e)}", None
def transcribe_audio(api_key, audio_path):
    """Transcribe an audio file using OpenAI's gpt-4o-transcribe model.

    Args:
        api_key: OpenAI API key used to authenticate the request.
        audio_path: Path to the audio file to transcribe.

    Returns:
        The transcription text, or a human-readable error string.
        Connection/timeout failures are retried up to MAX_RETRIES times.
    """
    try:
        if not audio_path:
            return "No audio file provided for transcription."
        # Validate the file locally BEFORE constructing the API client, so
        # bad input never costs a client setup.
        if not os.path.exists(audio_path):
            return "Audio file not found or inaccessible."
        if os.path.getsize(audio_path) == 0:
            return "Audio file is empty."
        client = create_openai_client(api_key)
        with open(audio_path, "rb") as audio_file:
            for attempt in range(MAX_RETRIES):
                try:
                    transcription = client.audio.transcriptions.create(
                        model="gpt-4o-transcribe",
                        file=audio_file
                    )
                    return transcription.text
                except (ConnectionError, Timeout) as e:
                    if attempt < MAX_RETRIES - 1:
                        time.sleep(RETRY_DELAY)
                        # Rewind so the retry re-uploads from the start.
                        audio_file.seek(0)
                        continue
                    return f"Transcription failed after {MAX_RETRIES} attempts: {str(e)}"
                except Exception as e:
                    return f"Transcription error: {str(e)}"
    except Exception as e:
        return f"Transcription error: {str(e)}"
def translate_audio(api_key, audio_path):
    """Translate speech in *audio_path* to English with the whisper-1 model.

    Args:
        api_key: OpenAI API key used to authenticate the request.
        audio_path: Path to the audio file to translate.

    Returns:
        The English translation text on success, otherwise a descriptive
        error string. Connection/timeout failures are retried up to
        MAX_RETRIES times with a linearly increasing delay.
    """
    try:
        # Cheap local validation before touching the network.
        if not audio_path:
            return "No audio file provided for translation."
        if not os.path.exists(audio_path):
            return "Audio file not found or inaccessible."
        if os.path.getsize(audio_path) == 0:
            return "Audio file is empty."
        client = create_openai_client(api_key)
        for attempt in range(MAX_RETRIES):
            try:
                with open(audio_path, "rb") as audio_file:
                    translation = client.audio.translations.create(
                        model="whisper-1",
                        file=audio_file,
                        timeout=90.0  # Extended timeout for translation
                    )
                return translation.text
            except (ConnectionError, Timeout):
                if attempt == MAX_RETRIES - 1:
                    return f"Translation failed after {MAX_RETRIES} attempts: Connection error. Please check your internet connection and try again."
                # Back off a little longer each round before retrying.
                time.sleep(RETRY_DELAY * (attempt + 1))
            except Exception as e:
                error_message = str(e)
                if "connection" in error_message.lower():
                    return f"Connection error: {error_message}. Please check your internet connection and try again."
                return f"Translation error: {error_message}"
    except Exception as e:
        return f"Translation error: {str(e)}"
def download_example_audio():
    """Download one of OpenAI's sample voice clips to a temporary file.

    Returns:
        ``(path, status_message)``; ``path`` is None when the download
        failed. Connection/timeout failures are retried up to MAX_RETRIES
        times.
    """
    try:
        # Randomly select one of the example audio URLs
        url = random.choice(EXAMPLE_AUDIO_URLS)
        # The URL's file stem is the voice name, e.g. ".../alloy.wav".
        voice_name = url.split('/')[-1].split('.')[0]
        for attempt in range(MAX_RETRIES):
            try:
                response = requests.get(url, timeout=30)
                response.raise_for_status()
                # NamedTemporaryFile(delete=False) avoids the filename race
                # inherent in the deprecated tempfile.mktemp().
                with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
                    f.write(response.content)
                    temp_path = f.name
                return temp_path, f"Loaded example voice: {voice_name}"
            except (ConnectionError, Timeout) as e:
                if attempt < MAX_RETRIES - 1:
                    time.sleep(RETRY_DELAY)
                    continue
                return None, f"Failed to download example after {MAX_RETRIES} attempts: {str(e)}"
            except Exception as e:
                # Covers HTTP errors from raise_for_status() and disk errors.
                return None, f"Error loading example: {str(e)}"
    except Exception as e:
        return None, f"Error loading example: {str(e)}"
def use_example_audio():
    """Fetch a random example clip; returns (audio_path, status_message)."""
    return download_example_audio()
def check_api_key(api_key):
    """Return True when a non-blank API key string was supplied."""
    return bool(api_key and api_key.strip())
# Create Gradio Interface: one Blocks app with four tabs (text chat, audio
# chat, voice samples, translation) that all share a single API-key textbox.
# NOTE(review): original indentation was lost in extraction; the nesting below
# is reconstructed from the Gradio context-manager structure — verify layout.
with gr.Blocks(title="OpenAI Audio Chat App") as app:
    gr.Markdown("# OpenAI Audio Chat App")
    gr.Markdown("Interact with GPT-4o audio model through text and audio inputs")
    # API Key input (used across all tabs)
    api_key = gr.Textbox(
        label="OpenAI API Key",
        placeholder="Enter your OpenAI API key here",
        type="password"
    )

    with gr.Tab("Text to Audio"):
        with gr.Row():
            with gr.Column():
                # Left column: prompt, voice choice, and submit button.
                text_input = gr.Textbox(
                    label="Text Prompt",
                    placeholder="Enter your question or prompt here...",
                    lines=3
                )
                text_voice = gr.Dropdown(
                    choices=VOICES,
                    value="alloy",
                    label="Voice"
                )
                text_submit = gr.Button("Generate Response")
            with gr.Column():
                # Right column: text reply, audio reply, and its transcription.
                text_output = gr.Textbox(label="AI Response (Checks Error)", lines=5)
                audio_output = gr.Audio(label="AI Response (Audio)")
                transcribed_output = gr.Textbox(label="Transcription of Audio Response", lines=3)

        # Function to process text input and then transcribe the resulting audio
        def text_input_with_transcription(api_key, text_prompt, voice):
            """Generate a reply from text, then transcribe the reply audio."""
            if not check_api_key(api_key):
                return "Please enter your OpenAI API key first.", None, "No API key provided."
            text_response, audio_path = process_text_input(api_key, text_prompt, voice)
            # Get transcription of the generated audio
            if audio_path:
                transcription = transcribe_audio(api_key, audio_path)
            else:
                transcription = "No audio generated to transcribe."
            return text_response, audio_path, transcription

        text_submit.click(
            fn=text_input_with_transcription,
            inputs=[api_key, text_input, text_voice],
            outputs=[text_output, audio_output, transcribed_output]
        )

    with gr.Tab("Audio Input to Audio Response"):
        with gr.Row():
            with gr.Column():
                # Left column: audio source (mic/upload), example loader,
                # optional accompanying text, voice choice, submit.
                audio_input = gr.Audio(
                    label="Audio Input",
                    type="filepath",
                    sources=["microphone", "upload"]
                )
                example_btn = gr.Button("Use Random Example Audio")
                example_message = gr.Textbox(label="Example Status", interactive=False)
                accompanying_text = gr.Textbox(
                    label="Accompanying Text (Optional)",
                    placeholder="Add any text context or question about the audio...",
                    lines=2
                )
                audio_voice = gr.Dropdown(
                    choices=VOICES,
                    value="alloy",
                    label="Response Voice"
                )
                audio_submit = gr.Button("Process Audio & Generate Response")
            with gr.Column():
                audio_text_output = gr.Textbox(label="AI Response (Checks Error)", lines=5)
                audio_audio_output = gr.Audio(label="AI Response (Audio)")
                audio_transcribed_output = gr.Textbox(label="Transcription of Audio Response", lines=3)
                # NOTE(review): this component name is shadowed by a local
                # variable of the same name inside the handler below — the
                # outputs list still resolves to this component correctly.
                input_transcription = gr.Textbox(label="Transcription of Input Audio", lines=3)

        # Function to process audio input, generate response, and provide transcriptions
        def audio_input_with_transcription(api_key, audio_path, text_prompt, voice):
            """Transcribe the input audio, generate a reply, transcribe the reply."""
            if not check_api_key(api_key):
                return "Please enter your OpenAI API key first.", None, "No API key provided.", "No API key provided."
            # First transcribe the input audio
            input_transcription = "N/A"
            if audio_path:
                input_transcription = transcribe_audio(api_key, audio_path)
            else:
                return "Please upload or record audio first.", None, "No audio to transcribe.", "No audio provided."
            # Process the audio input and get response
            text_response, response_audio_path = process_audio_input(api_key, audio_path, text_prompt, voice)
            # Transcribe the response audio
            response_transcription = "No audio generated to transcribe."
            if response_audio_path:
                response_transcription = transcribe_audio(api_key, response_audio_path)
            return text_response, response_audio_path, response_transcription, input_transcription

        audio_submit.click(
            fn=audio_input_with_transcription,
            inputs=[api_key, audio_input, accompanying_text, audio_voice],
            outputs=[audio_text_output, audio_audio_output, audio_transcribed_output, input_transcription]
        )
        example_btn.click(
            fn=use_example_audio,
            inputs=[],
            outputs=[audio_input, example_message]
        )

    with gr.Tab("Voice Samples"):
        gr.Markdown("## Listen to samples of each voice")

        def generate_voice_sample(api_key, voice_type):
            """Generate a short demo clip in the chosen voice plus its transcription."""
            if not check_api_key(api_key):
                return "Please enter your OpenAI API key first.", None, "No API key provided."
            try:
                client = create_openai_client(api_key)
                # Use retry mechanism for transient network failures.
                for attempt in range(MAX_RETRIES):
                    try:
                        completion = client.chat.completions.create(
                            model="gpt-4o-audio-preview",
                            modalities=["text", "audio"],
                            audio={"voice": voice_type, "format": "wav"},
                            messages=[
                                {
                                    "role": "user",
                                    "content": f"This is a sample of the {voice_type} voice. It has its own unique tone and character."
                                }
                            ]
                        )
                        # Save the audio to a temporary file
                        wav_bytes = base64.b64decode(completion.choices[0].message.audio.data)
                        temp_path = tempfile.mktemp(suffix=".wav")
                        with open(temp_path, "wb") as f:
                            f.write(wav_bytes)
                        # Get transcription of the generated clip for display.
                        transcription = transcribe_audio(api_key, temp_path)
                        return f"Sample generated with voice: {voice_type}", temp_path, transcription
                    except (ConnectionError, Timeout) as e:
                        if attempt < MAX_RETRIES - 1:
                            time.sleep(RETRY_DELAY)
                            continue
                        else:
                            return f"Connection error after {MAX_RETRIES} attempts: {str(e)}. Please check your internet connection.", None, "No sample generated."
            except Exception as e:
                return f"Error: {str(e)}", None, "No transcription available."

        with gr.Row():
            sample_voice = gr.Dropdown(
                choices=VOICES,
                value="alloy",
                label="Select Voice Sample"
            )
            sample_btn = gr.Button("Generate Sample")
        with gr.Row():
            sample_text = gr.Textbox(label="Status")
            sample_audio = gr.Audio(label="Voice Sample")
            sample_transcription = gr.Textbox(label="Transcription", lines=3)
        sample_btn.click(
            fn=generate_voice_sample,
            inputs=[api_key, sample_voice],
            outputs=[sample_text, sample_audio, sample_transcription]
        )

    # New tab for audio translation with improved error handling
    with gr.Tab("Audio Translation"):
        gr.Markdown("## Translate audio from other languages to English")
        gr.Markdown("Supports 50+ languages including: Arabic, Chinese, French, German, Japanese, Spanish, and many more.")
        with gr.Row():
            with gr.Column():
                translation_audio_input = gr.Audio(
                    label="Audio to Translate",
                    type="filepath",
                    sources=["microphone", "upload"]
                )
                translate_btn = gr.Button("Translate to English")
                connection_status = gr.Textbox(label="Connection Status", value="Ready", interactive=False)
            with gr.Column():
                translation_output = gr.Textbox(label="English Translation", lines=5)
                original_transcription = gr.Textbox(label="Original Transcription (if available)", lines=5)

        def translate_audio_input(api_key, audio_path):
            """Handle the translation of uploaded audio with better connection handling.

            Implemented as a generator so intermediate status updates can be
            streamed to the three output components via ``yield``.
            """
            # NOTE(review): because this function contains ``yield``, every
            # ``return value`` below (API-key check, missing-audio check) ends
            # the generator WITHOUT emitting those values to the UI — Gradio
            # will show no update on those paths; these should likely be
            # ``yield`` followed by a bare ``return``. Confirm intended.
            if not check_api_key(api_key):
                return "Please enter your OpenAI API key first.", "No API key provided.", "No API key provided."
            try:
                if not audio_path:
                    return "Please upload or record audio first.", "No audio to translate.", "Connection ready"
                # Stream an interim "working" status to the UI.
                yield "Processing...", "Preparing audio for translation...", "Connecting to OpenAI API..."
                # Get the translation
                translation = translate_audio(api_key, audio_path)
                # If there's a connection error message in the translation
                if "connection error" in translation.lower():
                    yield translation, "Translation failed due to connection issues.", "Connection failed"
                    return
                # Try to get original transcription (this might be in the original language)
                try:
                    original = transcribe_audio(api_key, audio_path)
                    if "error" in original.lower():
                        original = "Could not transcribe original audio due to connection issues."
                except Exception:
                    original = "Could not transcribe original audio."
                yield translation, original, "Connection successful"
            except ConnectionError as e:
                yield f"Connection error: {str(e)}. Please check your internet connection and try again.", "Translation failed.", "Connection failed"
            except Timeout as e:
                yield f"Timeout error: {str(e)}. The request took too long to complete. Please try again.", "Translation timed out.", "Connection timed out"
            except Exception as e:
                yield f"Translation error: {str(e)}", "Error occurred during processing.", "Error occurred"

        translate_btn.click(
            fn=translate_audio_input,
            inputs=[api_key, translation_audio_input],
            outputs=[translation_output, original_transcription, connection_status]
        )

        # Show supported languages
        with gr.Accordion("Supported Languages", open=False):
            gr.Markdown(", ".join(SUPPORTED_LANGUAGES))
        # Connection troubleshooting tips
        with gr.Accordion("Connection Troubleshooting", open=False):
            gr.Markdown("""
            ### If you experience connection errors:
            1. **Check your internet connection** - Ensure you have a stable internet connection
            2. **Verify your API key** - Make sure your OpenAI API key is valid and has sufficient credits
            3. **Try a smaller audio file** - Large audio files may time out during upload
            4. **Wait and retry** - OpenAI servers might be experiencing high traffic
            5. **Check file format** - Make sure your audio file is in a supported format (MP3, WAV, etc.)
            6. **Try on a different network** - Some networks might block API calls to OpenAI
            The app will automatically retry failed connections up to 3 times.
            """)

    # App-wide usage notes shown under all tabs.
    gr.Markdown("""
    ## Notes:
    - You must provide your OpenAI API key in the field above
    - The model used is `gpt-4o-audio-preview` for conversation, `gpt-4o-transcribe` for transcriptions, and `whisper-1` for translations
    - Audio inputs should be in WAV format for chat and any supported format for translation
    - Available voices: alloy, ash, ballad, coral, echo, fable, onyx, nova, sage, shimmer, and verse
    - Each audio response is automatically transcribed for verification
    - The "Use Random Example Audio" button will load a random sample from OpenAI's demo voices
    - The translation feature supports 50+ languages, translating them to English
    - If you experience connection errors, the app will automatically retry up to 3 times
    """)
# Launch the Gradio server only when run as a script (not when imported).
if __name__ == "__main__":
    app.launch()