# Construct the manifest file for training; note that we only have one train
# split.
# Also create the neighbors folder for each sample, which is simply done
# through the speaker label in the original manifest, where each file has rows
#     path\tdistance\tduration
# and distance is always 0 because we don't know the distance between the
# samples.
# waiting on Yushen Chen to provide data filtering approach
import copy
import glob
import json
import os
import random
import socket
import sys
from collections import defaultdict
from multiprocessing import Pool

import numpy as np
import tqdm


def write_jsonl(data, fn):
    """Write each entry of ``data`` to ``fn`` as one JSON object per line.

    Non-ASCII characters are written verbatim (``ensure_ascii=False``), so the
    file is opened explicitly as UTF-8 instead of relying on the locale.
    """
    with open(fn, "w", encoding="utf-8") as file:
        file.writelines(
            json.dumps(entry, ensure_ascii=False) + "\n" for entry in data
        )


def read_jsonl(file_path):
    """Return the list of JSON objects stored one per line in ``file_path``.

    The file is decoded as ``utf-8-sig`` so a leading BOM is tolerated.
    Blank lines are skipped instead of raising ``json.JSONDecodeError``.
    """
    cur_data = []
    with open(file_path, "r", encoding="utf-8-sig") as file:
        for line in file:
            line = line.strip()
            if line:
                cur_data.append(json.loads(line))
    return cur_data


def repetition_found(text, length=2, tolerance=10):
    """Return True if any substring of ``length`` characters occurs more than
    ``tolerance`` times in ``text`` (overlapping occurrences are counted)."""
    pattern_count = defaultdict(int)
    for start in range(len(text) - length + 1):
        pattern = text[start : start + length]
        pattern_count[pattern] += 1
        # Stop as soon as one pattern exceeds the tolerance.
        if pattern_count[pattern] > tolerance:
            return True
    return False


# IDs whose utterances are excluded from the training manifest.
out_en = {
    "EN_B00013_S00913",
    "EN_B00042_S00120",
    "EN_B00055_S04111",
    "EN_B00061_S00693",
    "EN_B00061_S01494",
    "EN_B00061_S03375",
    "EN_B00059_S00092",
    "EN_B00111_S04300",
    "EN_B00100_S03759",
    "EN_B00087_S03811",
    "EN_B00059_S00950",
    "EN_B00089_S00946",
    "EN_B00078_S05127",
    "EN_B00070_S04089",
    "EN_B00074_S09659",
    "EN_B00061_S06983",
    "EN_B00061_S07060",
    "EN_B00059_S08397",
    "EN_B00082_S06192",
    "EN_B00091_S01238",
    "EN_B00089_S07349",
    "EN_B00070_S04343",
    "EN_B00061_S02400",
    "EN_B00076_S01262",
    "EN_B00068_S06467",
    "EN_B00076_S02943",
    "EN_B00064_S05954",
    "EN_B00061_S05386",
    "EN_B00066_S06544",
    "EN_B00076_S06944",
    "EN_B00072_S08620",
    "EN_B00076_S07135",
    "EN_B00076_S09127",
    "EN_B00065_S00497",
    "EN_B00059_S06227",
    "EN_B00063_S02859",
    "EN_B00075_S01547",
    "EN_B00061_S08286",
    "EN_B00079_S02901",
    "EN_B00092_S03643",
    "EN_B00096_S08653",
    "EN_B00063_S04297",
    "EN_B00063_S04614",
    "EN_B00079_S04698",
    "EN_B00104_S01666",
    "EN_B00061_S09504",
    "EN_B00061_S09694",
    "EN_B00065_S05444",
    "EN_B00063_S06860",
    "EN_B00065_S05725",
    "EN_B00069_S07628",
    "EN_B00083_S03875",
    "EN_B00071_S07665",
    "EN_B00062_S04187",
    "EN_B00065_S09873",
    "EN_B00065_S09922",
    "EN_B00084_S02463",
    "EN_B00067_S05066",
    "EN_B00106_S08060",
    "EN_B00073_S06399",
    "EN_B00073_S09236",
    "EN_B00087_S00432",
    "EN_B00085_S05618",
    "EN_B00064_S01262",
    "EN_B00072_S01739",
    "EN_B00059_S03913",
    "EN_B00069_S04036",
    "EN_B00067_S05623",
    "EN_B00060_S05389",
    "EN_B00060_S07290",
    "EN_B00062_S08995",
}
# Characters that should not appear in an English transcript.
en_filters = ["ا", "い", "て"]


def process_meta_item(item, root, sub_root, audio_folder, audio_ext, text_ext):
    """Filter one manifest entry and, if it is kept, write its transcript.

    Args:
        item: dict read from the segment ``.jsonl``; the keys ``wav``,
            ``text``, ``duration`` and ``speaker`` are accessed.
        root, sub_root, audio_folder: joined to form the directory under
            which the transcript file is written.
        audio_ext: substring of ``item["wav"]`` replaced by ``text_ext`` to
            derive the transcript path.
        text_ext: extension of the transcript file.

    Returns:
        A 6-tuple ``(manifest_line, filtered_duration, filtered_count,
        kept_duration, kept_count, (speaker, item))``. For a filtered item the
        manifest line is ``None``, the kept fields are 0 and the last element
        is ``(None, None)``; for a kept item the filtered fields are 0.

    Note:
        ``item["text"]`` is modified in place when it starts with a space.
    """
    # Data filtering following Yushen's approach.
    # NOTE(review): the IDs in ``out_en`` carry no file extension, so comparing
    # only the last path component (a file name ending in ``audio_ext``) can
    # never match. Every path component is checked instead, which is a superset
    # of the previous check - confirm against the manifest's ``wav`` layout.
    if (
        any(part in out_en for part in item["wav"].split("/"))
        or any(t in item["text"] for t in en_filters)
        or repetition_found(item["text"], length=4)
    ):
        return None, item["duration"], 1, 0, 0, (None, None)

    # Trim a single leading space from the text if present.
    if item["text"].startswith(" "):
        item["text"] = item["text"][1:]

    # Write the transcript next to where the audio lives.
    text_fn = os.path.join(
        root, sub_root, audio_folder, item["wav"].replace(audio_ext, text_ext)
    )
    os.makedirs(os.path.dirname(text_fn), exist_ok=True)
    with open(text_fn, "w", encoding="utf-8") as f:
        f.write(item["text"])

    return (
        f"{item['wav']}\t{item['duration']}\n",
        0,
        0,
        item["duration"],
        1,
        (item["speaker"], item),
    )


def parallel_process_meta(
    meta, root, sub_root, audio_folder, num_workers, audio_ext, text_ext
):
    """Run ``process_meta_item`` over ``meta`` in a process pool and aggregate.

    Returns:
        ``(processed_items, filtered_duration, filtered_count, total_duration,
        total_count, spkitem)`` where ``processed_items`` holds the manifest
        lines of kept items, the four numbers are sums over all results, and
        ``spkitem`` holds one ``(speaker, item)`` pair per input entry
        (``(None, None)`` for filtered entries).
    """
    with Pool(num_workers) as pool:
        results = pool.starmap(
            process_meta_item,
            [
                (item, root, sub_root, audio_folder, audio_ext, text_ext)
                for item in meta
            ],
        )

    processed_items = []
    spkitem = []
    filtered_duration = 0
    filtered_count = 0
    total_duration = 0
    total_count = 0
    for line, f_dur, f_cnt, k_dur, k_cnt, spk_pair in results:
        if line:  # Only kept items contribute a manifest line.
            processed_items.append(line)
        # Statistics are accumulated for every result, kept or filtered.
        filtered_duration += f_dur
        filtered_count += f_cnt
        total_duration += k_dur
        total_count += k_cnt
        spkitem.append(spk_pair)

    return (
        processed_items,
        filtered_duration,
        filtered_count,
        total_duration,
        total_count,
        spkitem,
    )


def _write_neighbor_files(spk2info, neighbors_root, audio_ext, text_ext):
    """Write, for every item, a file listing the other items of its speaker.

    Each row is ``path\\t0\\tduration``; no file is written for an item that
    has no neighbor.
    """
    for items in spk2info.values():
        # Format each row once per speaker rather than once per (item, neighbor).
        rows = [
            (
                neighbor["wav"],
                f"{neighbor['wav'].replace(audio_ext, text_ext)}\t0\t{neighbor['duration']}\n",
            )
            for neighbor in items
        ]
        for item in items:
            neighbor_fn = os.path.join(
                neighbors_root, item["wav"].replace(audio_ext, text_ext)
            )
            os.makedirs(os.path.dirname(neighbor_fn), exist_ok=True)
            tobe_write = [row for wav, row in rows if wav != item["wav"]]
            if tobe_write:
                with open(neighbor_fn, "w", encoding="utf-8") as f:
                    f.writelines(tobe_write)


def main(
    root: str = "/data/scratch/pyp/datasets/emilia",
    sub_root: str = "preprocessed",
    audio_folder: str = "audio",
    manifest_folder: str = "manifest_for_codec",
    neighbors_folder: str = "neighbors",
    audio_ext: str = ".mp3",
    text_ext: str = ".txt",
    num_workers: int = 8,  # Specify the number of workers
):
    """Build ``train.txt`` and the per-sample neighbor files.

    For every ``EN_*`` directory under ``{root}/{sub_root}/{audio_folder}`` the
    matching ``{root}/EN/<name>.jsonl`` is read, its entries are filtered and
    their transcripts written, neighbor files are written under
    ``{root}/{sub_root}/{neighbors_folder}``, and finally the manifest is saved
    to ``{root}/{sub_root}/{manifest_folder}/train.txt``.
    """
    # Find the segments that are untarred; sorted so the manifest order is
    # reproducible across runs.
    all_fns = sorted(
        item
        for item in glob.glob(f"{root}/{sub_root}/{audio_folder}/*")
        if os.path.basename(item).startswith("EN_") and os.path.isdir(item)
    )
    print(f"found {len(all_fns)} untarred segments")
    print(f"{all_fns[:3]}")

    res = []
    total_duration = 0
    total_count = 0
    filtered_duration = 0
    filtered_count = 0
    neighbors_root = os.path.join(root, sub_root, neighbors_folder)

    for fn in tqdm.tqdm(all_fns, desc="overall progress"):
        metafn = os.path.join(root, "EN", os.path.basename(fn) + ".jsonl")
        meta = read_jsonl(metafn)

        # Parallel process metadata
        processed_items, fd, fc, td, tc, spkitem = parallel_process_meta(
            meta, root, sub_root, audio_folder, num_workers, audio_ext, text_ext
        )

        # Aggregate results
        res.extend(processed_items)
        filtered_duration += fd
        filtered_count += fc
        total_duration += td
        total_count += tc

        # Speakers are grouped per segment; filtered entries carry spk=None.
        spk2info = defaultdict(list)
        for spk, item in spkitem:
            if spk:
                spk2info[spk].append(item)

        # Save neighbor files
        _write_neighbor_files(spk2info, neighbors_root, audio_ext, text_ext)

    print(
        f"total duration: {total_duration / 3600:.2f} hours, total count: {total_count}"
    )
    print(
        f"filtered duration: {filtered_duration / 3600:.2f} hours, filtered count: {filtered_count}"
    )

    save_fn = os.path.join(root, sub_root, manifest_folder, "train.txt")
    os.makedirs(os.path.dirname(save_fn), exist_ok=True)
    with open(save_fn, "w", encoding="utf-8") as f:
        f.writelines(res)


if __name__ == "__main__":
    import fire

    fire.Fire(main)