import gradio as gr
import os
import shutil
import ssl
import subprocess
import time
import warnings
import whisper
import zipfile
from pydub import AudioSegment
from pydub.silence import detect_nonsilent
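# Allow whisper to download model weights even when the SSL certificate chain
# cannot be verified (a common issue on some hosted environments)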
ssl._create_default_https_context = ssl._create_unverified_context
def process_audio(
audio_paths,
remove_silence=False,
min_silence_len=500,
silence_thresh=-50,
enable_chunking=False,
chunk_duration=600,
ffmpeg_path="ffmpeg",
model_size="large-v3-turbo",
language="de"
):
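    """Optionally chunk and de-silence the uploaded audio files, then
    transcribe them with Whisper.

    Returns a tuple of (combined transcription, combined segment listing,
    path to a ZIP of processed audio and transcripts); on failure the first
    element carries the error message instead.
    """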
try:
if not audio_paths:
return "No files selected.", "", None
        # Clean up any stale temp directories left over from previous runs
        for stale_dir in (d for d in os.listdir('.') if d.startswith("temp_processing") and os.path.isdir(d)):
            try:
                shutil.rmtree(stale_dir)
            except OSError as e:
                print(f"Error removing stale temp directory {stale_dir}: {e}")
# Create fresh temp directory with unique timestamp
temp_dir = f"temp_processing_{int(time.time())}"
os.makedirs(temp_dir, exist_ok=True)
processed_files = []
all_results = []
all_segments = []
all_txt_paths = []
try:
# Step 1: Process each audio file
for audio_path in audio_paths:
if not audio_path:
continue
current_file = audio_path
temp_files = []
# Step 1a: Split audio if chunking is enabled
if enable_chunking:
base_name = os.path.splitext(os.path.basename(current_file))[0]
                    output_pattern = os.path.join(temp_dir, f"{base_name}_part_%03d.mp3")
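                    # ffmpeg's segment muxer splits the audio into fixed-length
                    # chunks; -c:a copy avoids re-encoding, and the zero-padded
                    # %03d numbering keeps the chunks in lexicographic order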
cmd = [
ffmpeg_path, "-i", current_file,
"-f", "segment",
"-segment_time", str(chunk_duration),
"-c:a", "copy",
"-segment_start_number", "1",
output_pattern
]
                    subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True)
chunk_files = sorted([os.path.join(temp_dir, f) for f in os.listdir(temp_dir)
if f.startswith(f"{base_name}_part_")])
temp_files.extend(chunk_files)
else:
temp_files.append(current_file)
# Step 1b: Remove silence if requested
if remove_silence:
silence_removed_files = []
for file in temp_files:
audio = AudioSegment.from_file(file)
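                        # detect_nonsilent returns [start_ms, end_ms] pairs for
                        # every stretch of audio louder than silence_thresh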
nonsilent = detect_nonsilent(
audio,
min_silence_len=min_silence_len,
silence_thresh=silence_thresh
)
output = AudioSegment.empty()
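                        # Stitch the non-silent ranges back together, dropping the gaps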
for start, end in nonsilent:
output += audio[start:end]
# Save the silence-removed file
silence_removed_path = os.path.join(temp_dir, f"silence_removed_{os.path.basename(file)}")
output.export(silence_removed_path, format="mp3")
silence_removed_files.append(silence_removed_path)
processed_files.extend(silence_removed_files)
else:
processed_files.extend(temp_files)
# Step 2: Transcribe all processed files
print(f"Loading Whisper model '{model_size}'...")
model = whisper.load_model(model_size, device="cpu")
for file in processed_files:
print(f"Transcribing: {file}")
warnings.filterwarnings("ignore", message="FP16 is not supported on CPU; using FP32 instead")
result = model.transcribe(file, fp16=False, language=language, temperature=0.0)
full_text = result["text"]
segments = ""
for segment in result["segments"]:
segments += f"[{segment['start']:.2f} - {segment['end']:.2f}]: {segment['text']}\n"
# Store transcript files in temp directory
txt_path = os.path.join(temp_dir, f"transcript_{os.path.splitext(os.path.basename(file))[0]}.txt")
with open(txt_path, "w", encoding="utf-8") as f:
f.write("=== Full Transcription ===\n\n")
f.write(full_text)
f.write("\n\n=== Segment-wise Transcription ===\n")
f.write(segments)
all_results.append(full_text)
all_segments.append(segments)
all_txt_paths.append(txt_path)
# Create combined transcript file in temp directory
combined_txt_path = os.path.join(temp_dir, "combined_transcripts.txt")
with open(combined_txt_path, "w", encoding="utf-8") as f:
f.write("=== Combined Transcriptions ===\n\n")
                for i, (result, segment) in enumerate(zip(all_results, all_segments)):
filename = os.path.basename(processed_files[i])
f.write(f"File: {filename}\n")
f.write("=== Full Transcription ===\n")
f.write(result)
f.write("\n\n=== Segment-wise Transcription ===\n")
f.write(segment)
f.write("\n" + "-"*50 + "\n\n")
# Format display output
combined_results = "=== File Transcriptions ===\n\n"
combined_segments = "=== File Segments ===\n\n"
for i, (result, segment) in enumerate(zip(all_results, all_segments)):
filename = os.path.basename(processed_files[i])
combined_results += f"File: {filename}\n{result}\n\n"
combined_segments += f"File: {filename}\n{segment}\n\n"
# Create ZIP with all processed files and transcripts
zip_path = f"processed_files_and_transcripts_{int(time.time())}.zip"
with zipfile.ZipFile(zip_path, 'w') as zipf:
for file in processed_files:
if os.path.exists(file):
zipf.write(file, os.path.basename(file))
                for txt_file in all_txt_paths:
                    if os.path.exists(txt_file):
                        zipf.write(txt_file, os.path.basename(txt_file))
                if os.path.exists(combined_txt_path):
                    zipf.write(combined_txt_path, os.path.basename(combined_txt_path))
            # Everything generated during processing (chunks, silence-removed
            # audio, transcripts) lives in temp_dir, so a single recursive
            # delete cleans it all up without touching the original uploads
            shutil.rmtree(temp_dir, ignore_errors=True)
return combined_results, combined_segments, zip_path
        except Exception as inner_e:
            print(f"Error during processing: {inner_e}")
            raise
    except Exception as e:
        print(f"Error in process_audio: {e}")
        # Best-effort cleanup before reporting the error to the UI
        if 'temp_dir' in locals() and os.path.exists(temp_dir):
            shutil.rmtree(temp_dir, ignore_errors=True)
        return f"Error: {str(e)}", "", None
def create_interface():
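    """Build and return the Gradio Blocks interface."""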
with gr.Blocks(title="Interview Audio Processing App") as app:
gr.Markdown("""
# Audio Processing App
Upload audio files (MP3 or M4A) for processing and transcription.\\
Intended use case: transcription of interviews.
""")
with gr.Row():
with gr.Column():
audio_input = gr.File(
label="Upload Audio Files",
file_count="multiple",
type="filepath"
)
with gr.Group():
gr.Markdown("### Silence Removal Settings")
gr.Markdown(" Default settings are working very well. Silence removal helps to reduce hallucination.")
remove_silence = gr.Checkbox(
label="Remove Silence",
value=False
)
min_silence_len = gr.Slider(
minimum=100,
maximum=2000,
value=500,
step=100,
label="Minimum Silence Length (ms)",
visible=False
)
silence_thresh = gr.Slider(
minimum=-70,
maximum=-30,
value=-50,
step=5,
label="Silence Threshold (dB)",
visible=False
)
with gr.Group():
gr.Markdown("### Chunking Settings")
gr.Markdown(" Chunking reduces the load on the model. 10min chunks work really good.")
enable_chunking = gr.Checkbox(
label="Enable Chunking",
value=False
)
chunk_duration = gr.Slider(
minimum=60,
maximum=3600,
value=600,
step=60,
label="Chunk Duration (seconds)",
visible=False
)
ffmpeg_path = gr.Textbox(
label="FFmpeg Path",
value="ffmpeg",
placeholder="Path to ffmpeg executable",
visible=False
)
with gr.Group():
gr.Markdown("### Transcription Settings")
gr.Markdown(" tiny is the fastest, but the worst quality. Large-v3-turbo is the best, but slower.")
model_size = gr.Dropdown(
choices=["tiny", "base", "small", "medium", "large", "large-v2", "large-v3", "turbo", "large-v3-turbo"],
value="large-v3-turbo",
label="Whisper Model Size"
)
language = gr.Dropdown(
choices=["de", "en", "fr", "es", "it"],
value="de",
label="Language"
)
process_btn = gr.Button("Process", variant="primary")
delete_btn = gr.Button("Delete Everything", variant="stop")
with gr.Column():
full_transcription = gr.Textbox(label="Full Transcription", lines=15)
segmented_transcription = gr.Textbox(label="Segmented Transcription", lines=15)
download_output = gr.File(label="Download Processed Files and Transcripts (ZIP)")
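        # Show the detail controls only while their checkbox is ticked, and
        # clear any stale results so old output is never shown with new settings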
def update_silence_controls(remove_silence):
return {
min_silence_len: gr.update(visible=remove_silence),
silence_thresh: gr.update(visible=remove_silence),
full_transcription: gr.update(value=""),
segmented_transcription: gr.update(value=""),
download_output: gr.update(value=None)
}
def update_chunking_controls(enable_chunking):
return {
chunk_duration: gr.update(visible=enable_chunking),
ffmpeg_path: gr.update(visible=enable_chunking),
full_transcription: gr.update(value=""),
segmented_transcription: gr.update(value=""),
download_output: gr.update(value=None)
}
remove_silence.change(
fn=update_silence_controls,
inputs=[remove_silence],
outputs=[
min_silence_len,
silence_thresh,
full_transcription,
segmented_transcription,
download_output
]
)
enable_chunking.change(
fn=update_chunking_controls,
inputs=[enable_chunking],
outputs=[
chunk_duration,
ffmpeg_path,
full_transcription,
segmented_transcription,
download_output
]
)
process_btn.click(
fn=process_audio,
inputs=[
audio_input,
remove_silence,
min_silence_len,
silence_thresh,
enable_chunking,
chunk_duration,
ffmpeg_path,
model_size,
language,
],
outputs=[
full_transcription,
segmented_transcription,
download_output,
]
)
# Add cleanup function
def cleanup_files():
try:
                # Remove any temp directories created by process_audio
                for temp_dir in (d for d in os.listdir('.') if d.startswith('temp_processing') and os.path.isdir(d)):
                    shutil.rmtree(temp_dir, ignore_errors=True)
# Clean up ZIP files
zip_files = [f for f in os.listdir('.') if f.startswith('processed_files_and_transcripts_')]
for zip_file in zip_files:
if os.path.exists(zip_file):
os.remove(zip_file)
# Clean up transcript files
transcript_files = [f for f in os.listdir('.') if f.startswith('transcript_')]
for transcript_file in transcript_files:
if os.path.exists(transcript_file):
os.remove(transcript_file)
# Return updates for all output fields
return {
full_transcription: gr.update(value="All temporary files have been deleted."),
segmented_transcription: gr.update(value=""),
download_output: gr.update(value=None)
}
except Exception as e:
return {
full_transcription: gr.update(value=f"Error during cleanup: {str(e)}"),
segmented_transcription: gr.update(value=""),
download_output: gr.update(value=None)
}
        # Wire the delete button to the cleanup handler
delete_btn.click(
fn=cleanup_files,
inputs=[],
outputs=[
full_transcription,
segmented_transcription,
download_output
]
)
return app
if __name__ == "__main__":
app = create_interface()
app.launch(share=False)