# Page header captured along with this file: "Spaces: Running".
import os
import shutil
import ssl
import subprocess
import tempfile
import time
import warnings
import zipfile

import gradio as gr
import torch
import whisper
from pydub import AudioSegment
from pydub.silence import detect_nonsilent
# SECURITY NOTE(review): this replaces the ssl module's default HTTPS context
# factory with the unverified one, so every HTTPS request in this process that
# relies on the stdlib default context skips certificate verification.
# Presumably added so the Whisper model download works on hosts with a broken
# CA store -- TODO confirm, and prefer fixing the trust store instead.
ssl._create_default_https_context = ssl._create_unverified_context
def _discard_temp_dir(dir_path):
    """Recursively delete *dir_path* if it exists; log instead of raising on failure."""
    if not os.path.isdir(dir_path):
        return
    try:
        shutil.rmtree(dir_path)
    except OSError as e:
        print(f"Error removing temp directory {dir_path}: {e}")


def _chunk_number(file_name, prefix, suffix=".mp3"):
    """Return the integer N of a ``<prefix>N<suffix>`` chunk file name, else None."""
    if not (file_name.startswith(prefix) and file_name.endswith(suffix)):
        return None
    digits = file_name[len(prefix):-len(suffix)]
    if digits.isascii() and digits.isdigit():
        return int(digits)
    return None


def _split_into_chunks(source_path, temp_dir, chunk_duration, ffmpeg_path):
    """Split *source_path* into numbered MP3 chunks in *temp_dir*; return them in order."""
    base_name = os.path.splitext(os.path.basename(source_path))[0]
    prefix = f"{base_name}_part_"
    cmd = [
        ffmpeg_path, "-i", source_path,
        "-f", "segment",
        "-segment_time", str(chunk_duration),
        "-c:a", "copy",
        "-segment_start_number", "1",
        os.path.join(temp_dir, f"{prefix}%d.mp3"),
    ]
    completed = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    if completed.returncode != 0:
        # Surface the failure instead of silently transcribing nothing.
        stderr_tail = completed.stderr.decode("utf-8", errors="replace").strip()[-500:]
        raise RuntimeError(
            f"ffmpeg failed for {source_path} "
            f"(exit code {completed.returncode}): {stderr_tail}"
        )

    numbered = []
    for name in os.listdir(temp_dir):
        number = _chunk_number(name, prefix)
        if number is not None:
            numbered.append((number, os.path.join(temp_dir, name)))
    # Sort by the numeric index so that part_10 follows part_9, not part_1.
    return [path for _, path in sorted(numbered)]


def _strip_silence(source_path, temp_dir, min_silence_len, silence_thresh):
    """Write a copy of *source_path* without its silent stretches; return the new path."""
    audio = AudioSegment.from_file(source_path)
    nonsilent = detect_nonsilent(
        audio,
        min_silence_len=min_silence_len,
        silence_thresh=silence_thresh,
    )
    output = AudioSegment.empty()
    for start, end in nonsilent:
        output += audio[start:end]
    silence_removed_path = os.path.join(
        temp_dir, f"silence_removed_{os.path.basename(source_path)}"
    )
    output.export(silence_removed_path, format="mp3")
    return silence_removed_path


def process_audio(
    audio_paths,
    remove_silence=False,
    min_silence_len=500,
    silence_thresh=-50,
    enable_chunking=False,
    chunk_duration=600,
    ffmpeg_path="ffmpeg",
    model_size="large-v3-turbo",
    language="de"
):
    """Optionally chunk and de-silence audio files, transcribe them, and zip the results.

    Args:
        audio_paths: Iterable of audio file paths; falsy entries are skipped.
        remove_silence: When true, silent stretches are cut out before transcription.
        min_silence_len: Passed to ``detect_nonsilent`` as ``min_silence_len``.
        silence_thresh: Passed to ``detect_nonsilent`` as ``silence_thresh``.
        enable_chunking: When true, each file is first split with ffmpeg.
        chunk_duration: Value given to ffmpeg's ``-segment_time`` option.
        ffmpeg_path: ffmpeg executable to invoke for chunking.
        model_size: Model name handed to ``whisper.load_model``.
        language: Language code handed to ``model.transcribe``.

    Returns:
        A ``(full_text, segment_text, zip_path)`` tuple. On failure, or when no
        files are given, the tuple is ``(message, "", None)``. The ZIP is written
        to the current directory and contains the processed audio files, one
        transcript per file and a combined transcript, all stored by base name.

    Intermediate files live in a ``temp_processing_<timestamp>`` directory that
    is always removed before returning. The caller's input files are never
    deleted.
    """
    if not audio_paths:
        return "No files selected.", "", None

    temp_dir = None
    try:
        # Remove the fixed-name directory first, as the original code did
        # (nothing in this function creates it).
        _discard_temp_dir("temp_processing")

        temp_dir = f"temp_processing_{int(time.time())}"
        os.makedirs(temp_dir, exist_ok=True)

        # Step 1: prepare every input file (optional chunking, optional silence removal).
        processed_files = []
        for audio_path in audio_paths:
            if not audio_path:
                continue
            if enable_chunking:
                stage_files = _split_into_chunks(
                    audio_path, temp_dir, chunk_duration, ffmpeg_path
                )
            else:
                stage_files = [audio_path]
            if remove_silence:
                stage_files = [
                    _strip_silence(path, temp_dir, min_silence_len, silence_thresh)
                    for path in stage_files
                ]
            processed_files.extend(stage_files)

        # Step 2: transcribe all prepared files with a single model instance.
        print(f"Loading Whisper model '{model_size}'...")
        model = whisper.load_model(model_size, device="cpu")
        warnings.filterwarnings(
            "ignore", message="FP16 is not supported on CPU; using FP32 instead"
        )

        # One (display name, full text, segment text, transcript path) tuple per file.
        transcripts = []
        for file in processed_files:
            print(f"Transcribing: {file}")
            result = model.transcribe(file, fp16=False, language=language, temperature=0.0)
            full_text = result["text"]
            segments = "".join(
                f"[{segment['start']:.2f} - {segment['end']:.2f}]: {segment['text']}\n"
                for segment in result["segments"]
            )
            stem = os.path.splitext(os.path.basename(file))[0]
            txt_path = os.path.join(temp_dir, f"transcript_{stem}.txt")
            with open(txt_path, "w", encoding="utf-8") as f:
                f.write("=== Full Transcription ===\n\n")
                f.write(full_text)
                f.write("\n\n=== Segment-wise Transcription ===\n")
                f.write(segments)
            transcripts.append((os.path.basename(file), full_text, segments, txt_path))

        # Step 3: write the combined transcript file.
        combined_txt_path = os.path.join(temp_dir, "combined_transcripts.txt")
        with open(combined_txt_path, "w", encoding="utf-8") as f:
            f.write("=== Combined Transcriptions ===\n\n")
            for filename, full_text, segments, _ in transcripts:
                f.write(f"File: {filename}\n")
                f.write("=== Full Transcription ===\n")
                f.write(full_text)
                f.write("\n\n=== Segment-wise Transcription ===\n")
                f.write(segments)
                f.write("\n" + "-"*50 + "\n\n")

        # Step 4: build the text shown in the UI.
        combined_results = "=== File Transcriptions ===\n\n" + "".join(
            f"File: {filename}\n{full_text}\n\n"
            for filename, full_text, _, _ in transcripts
        )
        combined_segments = "=== File Segments ===\n\n" + "".join(
            f"File: {filename}\n{segments}\n\n"
            for filename, _, segments, _ in transcripts
        )

        # Step 5: bundle audio and transcripts. Entries are stored by base name so
        # the archive does not embed the soon-to-be-deleted temp directory.
        zip_path = f"processed_files_and_transcripts_{int(time.time())}.zip"
        with zipfile.ZipFile(zip_path, 'w') as zipf:
            for file in processed_files:
                if os.path.exists(file):
                    zipf.write(file, os.path.basename(file))
            for _, _, _, txt_path in transcripts:
                if os.path.exists(txt_path):
                    zipf.write(txt_path, os.path.basename(txt_path))
            if os.path.exists(combined_txt_path):
                zipf.write(combined_txt_path, os.path.basename(combined_txt_path))

        return combined_results, combined_segments, zip_path
    except Exception as e:
        # UI boundary: report the problem as text rather than raising into Gradio.
        print(f"Error in process_audio: {e}")
        return f"Error: {str(e)}", "", None
    finally:
        # Only the temp directory is removed; files outside it (the caller's
        # inputs and the finished ZIP) are left in place.
        if temp_dir is not None:
            _discard_temp_dir(temp_dir)
def create_interface():
    """Build the Gradio Blocks UI for the audio processing app and return it unlaunched.

    The left column holds the file upload, the silence-removal, chunking and
    transcription settings, and the "Process" / "Delete Everything" buttons.
    The right column holds the two transcription text boxes and the ZIP
    download. "Process" calls ``process_audio`` with the current settings.
    """
    with gr.Blocks(title="Interview Audio Processing App") as app:
        gr.Markdown(
            "\n"
            "# Audio Processing App\n"
            "Upload audio files (MP3 or M4A) for processing and transcription.\\\n"
            "Intended use case: transcription of interviews.\n"
        )
        with gr.Row():
            with gr.Column():
                audio_input = gr.File(
                    label="Upload Audio Files",
                    file_count="multiple",
                    type="filepath"
                )
                with gr.Group():
                    gr.Markdown("### Silence Removal Settings")
                    gr.Markdown(" Default settings are working very well. Silence removal helps to reduce hallucination.")
                    remove_silence = gr.Checkbox(
                        label="Remove Silence",
                        value=False
                    )
                    min_silence_len = gr.Slider(
                        minimum=100,
                        maximum=2000,
                        value=500,
                        step=100,
                        label="Minimum Silence Length (ms)",
                        visible=False
                    )
                    silence_thresh = gr.Slider(
                        minimum=-70,
                        maximum=-30,
                        value=-50,
                        step=5,
                        label="Silence Threshold (dB)",
                        visible=False
                    )
                with gr.Group():
                    gr.Markdown("### Chunking Settings")
                    gr.Markdown(" Chunking reduces the load on the model. 10min chunks work really good.")
                    enable_chunking = gr.Checkbox(
                        label="Enable Chunking",
                        value=False
                    )
                    chunk_duration = gr.Slider(
                        minimum=60,
                        maximum=3600,
                        value=600,
                        step=60,
                        label="Chunk Duration (seconds)",
                        visible=False
                    )
                    ffmpeg_path = gr.Textbox(
                        label="FFmpeg Path",
                        value="ffmpeg",
                        placeholder="Path to ffmpeg executable",
                        visible=False
                    )
                with gr.Group():
                    gr.Markdown("### Transcription Settings")
                    gr.Markdown(" tiny is the fastest, but the worst quality. Large-v3-turbo is the best, but slower.")
                    model_size = gr.Dropdown(
                        choices=["tiny", "base", "small", "medium", "large", "large-v2", "large-v3", "turbo", "large-v3-turbo"],
                        value="large-v3-turbo",
                        label="Whisper Model Size"
                    )
                    language = gr.Dropdown(
                        choices=["de", "en", "fr", "es", "it"],
                        value="de",
                        label="Language"
                    )
                process_btn = gr.Button("Process", variant="primary")
                delete_btn = gr.Button("Delete Everything", variant="stop")
            with gr.Column():
                full_transcription = gr.Textbox(label="Full Transcription", lines=15)
                segmented_transcription = gr.Textbox(label="Segmented Transcription", lines=15)
                download_output = gr.File(label="Download Processed Files and Transcripts (ZIP)")

        def reset_outputs(message=""):
            """Return updates that clear the outputs, showing *message* in the first box."""
            return {
                full_transcription: gr.update(value=message),
                segmented_transcription: gr.update(value=""),
                download_output: gr.update(value=None),
            }

        def update_silence_controls(enabled):
            """Show the silence sliders only while the checkbox is ticked; clear old results."""
            return {
                min_silence_len: gr.update(visible=enabled),
                silence_thresh: gr.update(visible=enabled),
                **reset_outputs(),
            }

        def update_chunking_controls(enabled):
            """Show the chunking inputs only while the checkbox is ticked; clear old results."""
            return {
                chunk_duration: gr.update(visible=enabled),
                ffmpeg_path: gr.update(visible=enabled),
                **reset_outputs(),
            }

        remove_silence.change(
            fn=update_silence_controls,
            inputs=[remove_silence],
            outputs=[
                min_silence_len,
                silence_thresh,
                full_transcription,
                segmented_transcription,
                download_output
            ]
        )
        enable_chunking.change(
            fn=update_chunking_controls,
            inputs=[enable_chunking],
            outputs=[
                chunk_duration,
                ffmpeg_path,
                full_transcription,
                segmented_transcription,
                download_output
            ]
        )
        process_btn.click(
            fn=process_audio,
            inputs=[
                audio_input,
                remove_silence,
                min_silence_len,
                silence_thresh,
                enable_chunking,
                chunk_duration,
                ffmpeg_path,
                model_size,
                language,
            ],
            outputs=[
                full_transcription,
                segmented_transcription,
                download_output,
            ]
        )

        def cleanup_files():
            """Delete leftover temp directories, ZIPs and transcripts from the working directory."""
            try:
                for name in os.listdir('.'):
                    if name.startswith('temp_processing'):
                        # A stray regular file with this prefix is skipped, so it
                        # cannot abort the rest of the cleanup.
                        if not os.path.isdir(name):
                            continue
                        for entry in os.listdir(name):
                            entry_path = os.path.join(name, entry)
                            if os.path.isfile(entry_path):
                                os.remove(entry_path)
                        os.rmdir(name)
                    elif name.startswith(('processed_files_and_transcripts_', 'transcript_')):
                        # Only regular files are removed; directories are left alone.
                        if os.path.isfile(name):
                            os.remove(name)
                return reset_outputs("All temporary files have been deleted.")
            except Exception as e:
                return reset_outputs(f"Error during cleanup: {str(e)}")

        delete_btn.click(
            fn=cleanup_files,
            inputs=[],
            outputs=[
                full_transcription,
                segmented_transcription,
                download_output
            ]
        )
    return app
if __name__ == "__main__":
    # Script entry point: build the Blocks UI and start serving it.
    # share=False is passed explicitly to launch().
    app = create_interface()
    app.launch(share=False)