# Construct the manifest file for training; note that there is only one train split.
# Data filtering follows Yushen Chen's approach (see process_meta_item below).
# Also create a neighbors folder for each sample, built from the speaker labels in
# the original manifest. Each neighbors file contains rows of the form
#     path\tdistance\tduration
# where distance is always 0 because the distance between samples is unknown.
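# An illustrative neighbors row (path and duration are hypothetical):
#     EN_B00001/EN_B00001_S00001_W000000.txt\t0\t4.25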
import os
import glob
import json
from collections import defaultdict
from multiprocessing import Pool

import tqdm
def write_jsonl(data, fn):
    with open(fn, "w") as file:
        for entry in data:
            file.write(json.dumps(entry, ensure_ascii=False) + "\n")


def read_jsonl(file_path):
    cur_data = []
    with open(file_path, "r", encoding="utf-8-sig") as file:
        for line in file:
            cur_data.append(json.loads(line.strip()))
    return cur_data
def repetition_found(text, length=2, tolerance=10):
    # Flag transcripts in which any character n-gram of the given length repeats
    # more than `tolerance` times, a cheap heuristic for ASR hallucination loops.
    pattern_count = defaultdict(int)
    for i in range(len(text) - length + 1):
        pattern = text[i : i + length]
        pattern_count[pattern] += 1
    return any(count > tolerance for count in pattern_count.values())
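# Illustrative check (not part of the pipeline): the 2-gram "ab" occurs 12 times
# below, exceeding tolerance=10, so this transcript would be flagged.
# repetition_found("ab" * 12, length=2, tolerance=10)  -> True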
# Emilia EN utterance IDs excluded from training (checked against each item's
# wav basename in process_meta_item).
out_en = {
    "EN_B00013_S00913",
    "EN_B00042_S00120",
    "EN_B00055_S04111",
    "EN_B00061_S00693",
    "EN_B00061_S01494",
    "EN_B00061_S03375",
    "EN_B00059_S00092",
    "EN_B00111_S04300",
    "EN_B00100_S03759",
    "EN_B00087_S03811",
    "EN_B00059_S00950",
    "EN_B00089_S00946",
    "EN_B00078_S05127",
    "EN_B00070_S04089",
    "EN_B00074_S09659",
    "EN_B00061_S06983",
    "EN_B00061_S07060",
    "EN_B00059_S08397",
    "EN_B00082_S06192",
    "EN_B00091_S01238",
    "EN_B00089_S07349",
    "EN_B00070_S04343",
    "EN_B00061_S02400",
    "EN_B00076_S01262",
    "EN_B00068_S06467",
    "EN_B00076_S02943",
    "EN_B00064_S05954",
    "EN_B00061_S05386",
    "EN_B00066_S06544",
    "EN_B00076_S06944",
    "EN_B00072_S08620",
    "EN_B00076_S07135",
    "EN_B00076_S09127",
    "EN_B00065_S00497",
    "EN_B00059_S06227",
    "EN_B00063_S02859",
    "EN_B00075_S01547",
    "EN_B00061_S08286",
    "EN_B00079_S02901",
    "EN_B00092_S03643",
    "EN_B00096_S08653",
    "EN_B00063_S04297",
    "EN_B00063_S04614",
    "EN_B00079_S04698",
    "EN_B00104_S01666",
    "EN_B00061_S09504",
    "EN_B00061_S09694",
    "EN_B00065_S05444",
    "EN_B00063_S06860",
    "EN_B00065_S05725",
    "EN_B00069_S07628",
    "EN_B00083_S03875",
    "EN_B00071_S07665",
    "EN_B00062_S04187",
    "EN_B00065_S09873",
    "EN_B00065_S09922",
    "EN_B00084_S02463",
    "EN_B00067_S05066",
    "EN_B00106_S08060",
    "EN_B00073_S06399",
    "EN_B00073_S09236",
    "EN_B00087_S00432",
    "EN_B00085_S05618",
    "EN_B00064_S01262",
    "EN_B00072_S01739",
    "EN_B00059_S03913",
    "EN_B00069_S04036",
    "EN_B00067_S05623",
    "EN_B00060_S05389",
    "EN_B00060_S07290",
    "EN_B00062_S08995",
}
# Transcripts containing these characters (Arabic alef, Japanese hiragana) are
# treated as mislabeled non-English audio and filtered out.
en_filters = ["ا", "い", "て"]
def process_meta_item(item, root, sub_root, audio_folder, audio_ext, text_ext):
    # Data filtering following Yushen Chen's approach: drop known-bad utterances,
    # transcripts containing non-English characters, and repetitive transcripts.
    if (
        item["wav"].split("/")[-1] in out_en
        or any(t in item["text"] for t in en_filters)
        or repetition_found(item["text"], length=4)
    ):
        # Filtered: (manifest_line, filtered_dur, filtered_cnt, total_dur, total_cnt, (spk, item))
        return None, item["duration"], 1, 0, 0, (None, None)

    # Trim a leading space from the text if present
    if item["text"].startswith(" "):
        item["text"] = item["text"][1:]

    # Write the transcript to a text file alongside the audio
    text_fn = os.path.join(root, sub_root, audio_folder, item["wav"].replace(audio_ext, text_ext))
    os.makedirs(os.path.dirname(text_fn), exist_ok=True)
    with open(text_fn, "w") as f:
        f.write(item["text"])

    # Kept: the manifest line plus per-item stats and the (speaker, item) pair
    return (
        f"{item['wav']}\t{item['duration']}\n",
        0,
        0,
        item["duration"],
        1,
        (item["speaker"], item),
    )
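# For illustration (values are hypothetical): a kept item with
# wav "EN_B00001/EN_B00001_S00001_W000000.mp3", duration 4.25, speaker "EN_B00001_S00001"
# yields ("EN_B00001/EN_B00001_S00001_W000000.mp3\t4.25\n", 0, 0, 4.25, 1, (speaker, item)).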
def parallel_process_meta(meta, root, sub_root, audio_folder, num_workers, audio_ext, text_ext):
    with Pool(num_workers) as pool:
        results = pool.starmap(
            process_meta_item,
            [(item, root, sub_root, audio_folder, audio_ext, text_ext) for item in meta],
        )

    processed_items = []
    spkitem = []
    filtered_duration = 0
    filtered_count = 0
    total_duration = 0
    total_count = 0
    for result in results:
        if result[0]:  # the item survived filtering
            processed_items.append(result[0])
        # Stats accumulate over every result: filtered items contribute only to the
        # filtered_* counters, kept items only to the total_* counters.
        filtered_duration += result[1]
        filtered_count += result[2]
        total_duration += result[3]
        total_count += result[4]
        spkitem.append(result[5])
    return processed_items, filtered_duration, filtered_count, total_duration, total_count, spkitem
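# Design note: a fresh Pool is created per call (i.e., per untarred segment), and
# pool.starmap preserves input order, so results line up with the input manifest.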
def main(
    root: str = "/data/scratch/pyp/datasets/emilia",
    sub_root: str = "preprocessed",
    audio_folder: str = "audio",
    manifest_folder: str = "manifest_for_codec",
    neighbors_folder: str = "neighbors",
    audio_ext: str = ".mp3",
    text_ext: str = ".txt",
    num_workers: int = 8,  # number of parallel workers
):
    # Find the segments that have been untarred
    all_fns = [
        item
        for item in glob.glob(f"{root}/{sub_root}/{audio_folder}/*")
        if os.path.basename(item).startswith("EN_") and os.path.isdir(item)
    ]
    print(f"found {len(all_fns)} untarred segments")
    print(f"{all_fns[:3]}")

    res = []
    total_duration = 0
    total_count = 0
    filtered_duration = 0
    filtered_count = 0
    for fn in tqdm.tqdm(all_fns, desc="overall progress"):
        spk2info = defaultdict(list)
        metafn = os.path.join(root, "EN", os.path.basename(fn) + ".jsonl")
        meta = read_jsonl(metafn)

        # Process the metadata in parallel
        processed_items, fd, fc, td, tc, spkitem = parallel_process_meta(
            meta, root, sub_root, audio_folder, num_workers, audio_ext, text_ext
        )

        # Aggregate results
        res.extend(processed_items)
        filtered_duration += fd
        filtered_count += fc
        total_duration += td
        total_count += tc
        for spk, item in spkitem:
            if spk:
                spk2info[spk].append(item)

        # Save a neighbors file for each sample: all other samples by the same speaker
        for spk in spk2info:
            for item in spk2info[spk]:
                neighbor_fn = os.path.join(
                    root,
                    sub_root,
                    neighbors_folder,
                    item["wav"].replace(audio_ext, text_ext),
                )
                os.makedirs(os.path.dirname(neighbor_fn), exist_ok=True)
                tobe_write = [
                    f"{neighbor_item['wav'].replace(audio_ext, text_ext)}\t0\t{neighbor_item['duration']}\n"
                    for neighbor_item in spk2info[spk]
                    if neighbor_item["wav"] != item["wav"]
                ]
                if tobe_write:
                    with open(neighbor_fn, "w") as f:
                        f.writelines(tobe_write)

    print(f"total duration: {total_duration / 3600:.2f} hours, total count: {total_count}")
    print(f"filtered duration: {filtered_duration / 3600:.2f} hours, filtered count: {filtered_count}")

    save_fn = os.path.join(root, sub_root, manifest_folder, "train.txt")
    os.makedirs(os.path.dirname(save_fn), exist_ok=True)
    with open(save_fn, "w") as f:
        for item in res:
            f.write(item)
if __name__ == "__main__":
    import fire

    fire.Fire(main)
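# Example invocation (script name and paths are illustrative; fire exposes
# main()'s keyword arguments as command-line flags):
#   python construct_manifest_emilia.py --root /data/scratch/pyp/datasets/emilia --num_workers 16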