# Spaces: Running
import gradio as gr | |
import re | |
import difflib | |
import os | |
from typing import List, Dict, Tuple, Optional | |
from dataclasses import dataclass | |
import numpy as np | |
@dataclass
class Segment:
    """A segment of a transcript with a speaker and text.

    Declared as a dataclass so that instances can be built positionally as
    ``Segment(speaker, timestamp, text, original_text, index)``, which is how
    the transcript parser in this file constructs them.
    """

    speaker: str  # Speaker name captured from the segment header
    timestamp: str  # Timestamp string as written in the segment header
    text: str  # Segment text in the form used for similarity matching
    original_text: str  # The text as it appears in the original transcript
    index: int  # Position in the original transcript
def clean_text_for_matching(text: str) -> str:
    """Normalise text so two versions of a segment can be compared.

    Markdown links are reduced to their link text, asterisk emphasis markers
    are dropped, the punctuation characters ``,.;:!?`` become spaces, runs of
    whitespace collapse to one space, and the result is lower-cased and
    stripped.
    """
    # Applied in order: each (pattern, replacement) pair feeds the next one.
    substitutions = (
        (r'\[([^\]]+)\]\([^)]+\)', r'\1'),  # markdown link -> link text only
        (r'\*\*|\*', ''),                   # bold / italic markers
        (r'[,.;:!?]', ' '),                 # punctuation -> space
        (r'\s+', ' '),                      # collapse whitespace runs
    )
    normalized = text
    for pattern, replacement in substitutions:
        normalized = re.sub(pattern, replacement, normalized)
    return normalized.lower().strip()
def load_transcript_file(file_path: str) -> str:
    """Return the full contents of the transcript at ``file_path``, read as UTF-8."""
    with open(file_path, mode='r', encoding='utf-8') as handle:
        contents = handle.read()
    return contents
def parse_transcript(transcript: str) -> List[Segment]:
    """
    Parse transcript into segments.

    Works with both formats:
    - Speaker LastName 00:00:00
    - **Speaker LastName** *00:00:00*

    A segment is a header (a name token followed by a timestamp), a blank
    line, and then all text up to the next header or the end of the input.
    Blank lines inside a speaker turn do not end the segment: only a
    following line that itself ends in a name token plus timestamp does, so
    multi-paragraph turns are kept whole.

    Args:
        transcript: Full transcript text.

    Returns:
        Segments in document order. ``text`` is the cleaned form used for
        matching, ``original_text`` is the stripped body as written, and
        ``index`` is the segment's position in the transcript.
    """
    # Header: optional bold markers, optional "Speaker" prefix, a name token,
    # an optionally italic timestamp, then a blank line. Groups: name, timestamp.
    header = r"(?:\*\*)?(?:Speaker\s+)?([A-Za-z]+)(?:\*\*)?\s+(?:\*)?([0-9:]+)(?:\*)?\s*\n\n"
    # Boundary for the segment body: a blank line followed by a line that
    # contains a header (name token + timestamp at the end of the line).
    # Requiring the timestamp is what stops an ordinary paragraph that merely
    # starts with a word from truncating the segment.
    next_header = r"\n\n[^\n]*?[A-Za-z]+(?:\*\*)?\s+(?:\*)?[0-9:]+(?:\*)?\s*(?:\n\n|\Z)"
    pattern = header + r"(.*?)(?=" + next_header + r"|\Z)"

    segments = []
    for i, match in enumerate(re.finditer(pattern, transcript, re.DOTALL)):
        speaker, timestamp, text = match.groups()
        original_text = text.strip()
        cleaned_text = clean_text_for_matching(original_text)
        segments.append(Segment(speaker, timestamp, cleaned_text, original_text, i))
    return segments
def align_segments(auto_segments: List[Segment], human_segments: List[Segment]) -> Dict[int, int]:
    """
    Align segments from human-edited transcript to auto-generated transcript.

    Human segments are processed in order. Each one is matched to the most
    similar auto segment (``difflib.SequenceMatcher`` ratio on the cleaned
    ``text``) that lies after every auto segment already used, so the mapping
    is strictly increasing on both sides. A match is only accepted when its
    similarity is greater than 0.5; on ties the lowest auto index wins.

    Args:
        auto_segments: Segments parsed from the auto-generated transcript.
        human_segments: Segments parsed from the human-edited transcript.

    Returns:
        A dictionary mapping human segment indices to auto segment indices.
        Human segments without an acceptable match are absent from it.
    """
    min_similarity = 0.5  # Threshold for considering a match
    alignments: Dict[int, int] = {}
    auto_count = len(auto_segments)
    # Matches are strictly increasing, so "after every earlier match" reduces
    # to "after the most recent match"; no need to rescan all alignments.
    last_auto_idx = -1

    for h_idx, h_segment in enumerate(human_segments):
        best_match = -1
        best_similarity = min_similarity
        # Only auto segments past the last match are eligible, so similarity
        # is computed for those pairs alone.
        for a_idx in range(last_auto_idx + 1, auto_count):
            similarity = difflib.SequenceMatcher(
                None, h_segment.text, auto_segments[a_idx].text
            ).ratio()
            if similarity > best_similarity:
                best_match = a_idx
                best_similarity = similarity
        if best_match >= 0:
            alignments[h_idx] = best_match
            last_auto_idx = best_match
    return alignments
def update_transcript(human_segments: List[Segment], auto_segments: List[Segment],
                      alignments: Dict[int, int], is_markdown: bool) -> str:
    """
    Rebuild the human transcript with timestamps taken from the auto transcript.

    Every human segment keeps its speaker and ``original_text``. A matched
    segment gets the timestamp of its aligned auto segment; an unmatched one
    keeps its own timestamp with a ``[NO MATCH]`` marker appended. Headers are
    written in markdown or plain form according to ``is_markdown``, and
    segments are joined by blank lines.
    """
    rendered = []
    for position, segment in enumerate(human_segments):
        matched_auto = alignments.get(position)
        if matched_auto is None:
            # No match found, keep original timestamp but mark it
            stamp = f"{segment.timestamp} [NO MATCH]"
        else:
            # Segment was matched, use timestamp from auto segment
            stamp = auto_segments[matched_auto].timestamp

        if is_markdown:
            header = f"**{segment.speaker}** *{stamp}*"
        else:
            header = f"Speaker {segment.speaker} {stamp}"
        rendered.append(f"{header}\n\n{segment.original_text}")
    return "\n\n".join(rendered)
def generate_match_report(human_segments: List[Segment], auto_segments: List[Segment],
                          alignments: Dict[int, int]) -> str:
    """Generate a markdown report about the matching process.

    The report lists segment counts and the matched percentage, then every
    unmatched human segment (speaker, timestamp, first 50 characters of its
    cleaned text), and finally the average similarity over matched pairs.

    Args:
        human_segments: Segments from the human-edited transcript.
        auto_segments: Segments from the auto-generated transcript.
        alignments: Mapping of human segment index to auto segment index.

    Returns:
        The report as a single markdown string.
    """
    total_human = len(human_segments)
    total_auto = len(auto_segments)
    total_matched = len(alignments)
    # Guard the percentage: an empty human transcript would divide by zero.
    matched_pct = total_matched / total_human * 100 if total_human else 0.0

    parts = [
        "### Matching Report\n\n",
        f"- Human segments: {total_human}\n",
        f"- Auto segments: {total_auto}\n",
        f"- Matched segments: {total_matched} ({matched_pct:.1f}%)\n",
    ]

    if total_matched < total_human:
        parts.append(f"\n### Unmatched Segments ({total_human - total_matched})\n\n")
        parts.extend(
            f"- Speaker {h_segment.speaker} at {h_segment.timestamp}: '{h_segment.text[:50]}...'\n"
            for h_idx, h_segment in enumerate(human_segments)
            if h_idx not in alignments
        )

    # Calculate average similarity of matches
    if alignments:
        similarities = [
            difflib.SequenceMatcher(
                None, human_segments[h_idx].text, auto_segments[a_idx].text
            ).ratio()
            for h_idx, a_idx in alignments.items()
        ]
        avg_similarity = sum(similarities) / len(similarities)
        parts.append("\n### Match Quality\n\n")
        parts.append(f"- Average similarity: {avg_similarity:.2f}\n")

    return "".join(parts)
def process_transcripts(auto_transcript, human_transcript):
    """Run the whole pipeline: parse both transcripts, align them, rebuild, report.

    Either input may be ``bytes`` (decoded as UTF-8) or already-decoded text.
    Returns a ``(updated_transcript, report)`` pair. On any failure the first
    element is an error message and the second is an empty string.
    """
    def _as_text(payload):
        # Uploaded content may arrive as raw bytes; pasted content is already text.
        return payload.decode('utf-8') if isinstance(payload, bytes) else payload

    try:
        auto_content = _as_text(auto_transcript)
        human_content = _as_text(human_transcript)

        # Output format follows the human transcript: markdown if it uses "**".
        is_markdown = "**" in human_content

        auto_segments = parse_transcript(auto_content)
        human_segments = parse_transcript(human_content)
        if not (auto_segments and human_segments):
            return "Error: Could not parse transcripts. Please check the format.", ""

        alignments = align_segments(auto_segments, human_segments)
        return (
            update_transcript(human_segments, auto_segments, alignments, is_markdown),
            generate_match_report(human_segments, auto_segments, alignments),
        )
    except Exception as e:
        return f"Error processing transcripts: {str(e)}", ""
def save_transcript(transcript: str) -> str:
    """Write the transcript to ``output/updated_transcript.md`` and return that path.

    The ``output`` directory is created relative to the current working
    directory if it does not exist. The file name is fixed, so each call
    overwrites the previous file.

    Args:
        transcript: Text to write, encoded as UTF-8.

    Returns:
        The relative path of the written file.
    """
    output_dir = "output"
    # exist_ok avoids the check-then-create race of testing for the directory first.
    os.makedirs(output_dir, exist_ok=True)
    output_path = os.path.join(output_dir, "updated_transcript.md")
    with open(output_path, 'w', encoding='utf-8') as f:
        f.write(transcript)
    return output_path
# Create Gradio interface
with gr.Blocks(title="Transcript Timestamp Synchronizer") as demo:
    # Intro text. The string body is kept flush-left so that no leading
    # whitespace ends up inside the Markdown source.
    gr.Markdown("""
# 🎙️ Transcript Timestamp Synchronizer
This tool updates timestamps in human-edited transcripts based on new auto-generated transcripts.
## Instructions:
1. Upload or paste your new auto-generated transcript (with updated timestamps)
2. Upload or paste your human-edited transcript (with old timestamps)
3. Click "Synchronize Timestamps" to generate an updated transcript
The tool will match segments between the transcripts and update the timestamps while preserving all human edits.
""")

    # Two side-by-side input columns: auto-generated (left) and human-edited (right).
    # Each offers a source selector, a file upload (hidden by default) and a text area.
    with gr.Row():
        with gr.Column():
            auto_source = gr.Radio(
                ["Upload File", "Paste Text"],
                label="Auto-generated Transcript Source",
                value="Paste Text"
            )
            auto_file = gr.File(
                label="Upload Auto-generated Transcript",
                file_types=[".md", ".txt"],
                visible=False
            )
            auto_text = gr.TextArea(
                label="Auto-generated Transcript (with new timestamps)",
                placeholder="Paste the auto-generated transcript here...",
                lines=15,
                visible=True
            )
        with gr.Column():
            human_source = gr.Radio(
                ["Upload File", "Paste Text"],
                label="Human-edited Transcript Source",
                value="Paste Text"
            )
            human_file = gr.File(
                label="Upload Human-edited Transcript",
                file_types=[".md", ".txt"],
                visible=False
            )
            human_text = gr.TextArea(
                label="Human-edited Transcript (with old timestamps)",
                placeholder="Paste the human-edited transcript here...",
                lines=15,
                visible=True
            )

    update_btn = gr.Button("Synchronize Timestamps")

    with gr.Tabs():
        with gr.TabItem("Updated Transcript"):
            updated_transcript = gr.TextArea(
                label="Updated Transcript",
                placeholder="The updated transcript will appear here...",
                lines=20
            )
            download_btn = gr.Button("Download Updated Transcript")
            # Hidden until a file has actually been prepared (see prepare_download).
            download_path = gr.File(label="Download", visible=False)
        with gr.TabItem("Matching Report"):
            matching_report = gr.Markdown(
                label="Matching Report",
                value="The matching report will appear here..."
            )

    # Handle visibility of upload/paste options
    def _toggle_source(choice):
        """Return (file upload update, text area update) for the chosen source."""
        return gr.update(visible=choice == "Upload File"), gr.update(visible=choice == "Paste Text")

    def update_auto_visibility(choice):
        return _toggle_source(choice)

    def update_human_visibility(choice):
        return _toggle_source(choice)

    auto_source.change(update_auto_visibility, auto_source, [auto_file, auto_text])
    human_source.change(update_human_visibility, human_source, [human_file, human_text])

    # Load file content if uploaded
    def _read_upload(file):
        """Return the UTF-8 text of an uploaded file, or "" when nothing is uploaded."""
        if file is None:
            return ""
        # The upload may be an object exposing its path as ``.name`` or a plain
        # path string; accept both.
        path = getattr(file, "name", file)
        with open(path, "r", encoding="utf-8") as f:
            return f.read()

    def load_auto_file(file):
        return _read_upload(file)

    def load_human_file(file):
        return _read_upload(file)

    # Uploaded content is copied into the matching text area, which is what
    # the synchronize button reads from.
    auto_file.change(load_auto_file, auto_file, auto_text)
    human_file.change(load_human_file, human_file, human_text)

    # Process transcripts
    def handle_process(auto_content, human_content):
        return process_transcripts(auto_content, human_content)

    update_btn.click(
        fn=handle_process,
        inputs=[auto_text, human_text],
        outputs=[updated_transcript, matching_report]
    )

    # Handle download
    def prepare_download(transcript):
        """Save the transcript and reveal the download component pointing at it."""
        if not transcript:
            return gr.update(value=None, visible=False)
        # The File component starts hidden; setting only its value would leave
        # the download invisible, so it is made visible together with the path.
        return gr.update(value=save_transcript(transcript), visible=True)

    download_btn.click(
        fn=prepare_download,
        inputs=[updated_transcript],
        outputs=[download_path]
    )
# For local testing: launch the Gradio app only when this file is run as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    demo.launch()