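"""Phonemize Emilia transcripts listed under the manifest folder and build a phoneme vocabulary.

For every manifest entry, the matching transcript (.txt) is phonemized with
data.tokenizer.TextTokenizer, the phoneme sequence is written under the phoneme
folder, and the union of all phoneme tokens is saved to vocab.txt.
"""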
import sys
import os
import glob
import json
from multiprocessing import Pool

import fire
import tqdm
sys.path.insert(0, "../../")
from data.tokenizer import TextTokenizer, tokenize_text
def write_jsonl(data, fn):
    """Write a list of dicts to `fn`, one JSON object per line."""
    with open(fn, "w") as file:
        for entry in data:
            file.write(json.dumps(entry, ensure_ascii=False) + "\n")
def read_jsonl(file_path):
    """Read a JSONL file and return its records as a list of dicts."""
    cur_data = []
    with open(file_path, 'r', encoding='utf-8-sig') as file:
        for line in file:
            cur_data.append(json.loads(line.strip()))
    return cur_data
def phonemize_and_save(text, fn, text_tokenizer):
"""Phonemizes the text and saves the result to a file."""
phn = tokenize_text(text_tokenizer, text)
os.makedirs(os.path.dirname(fn), exist_ok=True)
with open(fn, "w") as f:
f.write(" ".join(phn))
return set(phn)
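# Illustrative usage (the path is hypothetical and the actual phoneme symbols
# depend on the TextTokenizer backend):
#   phonemize_and_save("hello world", "/tmp/phoneme/hello.txt", TextTokenizer())
#   writes the space-joined phoneme string to the file and returns the phoneme set.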
def process_item(item, root, sub_root, audio_folder, phn_folder, audio_ext, text_ext, phn_ext, text_tokenizer):
"""Worker function to process a single item."""
text_path = os.path.join(root, sub_root, audio_folder, item[0].replace(audio_ext, text_ext))
if not os.path.exists(text_path):
return {"missing_text": text_path, "success": False, "cur_phn_set": set()}
with open(text_path, "r") as f:
text = [line.strip() for line in f.readlines()]
text = " ".join(text)
phn_path = os.path.join(root, sub_root, phn_folder, item[0].replace(audio_ext, phn_ext))
cur_phn_set = phonemize_and_save(text, phn_path, text_tokenizer)
return {"missing_text": None, "success": True, "cur_phn_set": cur_phn_set}
def process_item_star(args):
"""Unpacks arguments for `process_item` to work with `imap`."""
return process_item(*args)
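# Note: with multiprocessing, every element of the task tuple (including
# text_tokenizer) must be picklable; if the tokenizer is not, see the
# per-worker initializer sketch inside main() below.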
def main(
root="/data/scratch/pyp/datasets/emilia",
sub_root="preprocessed",
manifest_folder="manifest_for_codec",
audio_folder="audio",
phn_folder="phoneme",
audio_ext=".mp3",
text_ext=".txt",
phn_ext=".txt",
num_workers=8,
):
"""Main function to process phoneme generation in parallel."""
    # Initialize the tokenizer
text_tokenizer = TextTokenizer()
all_fns = glob.glob(f"{root}/{sub_root}/{manifest_folder}/*.txt")
print(f"found {len(all_fns)} manifest files")
print(f"{all_fns[:3]=}")
data = []
for fn in all_fns:
with open(fn, "r") as f:
data += [line.strip().split("\t") for line in f]
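    # Each manifest line is tab-separated; only the first field (an audio
    # filename ending in audio_ext) is used to locate the transcript and to
    # name the phoneme output file.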
vocab = set()
    ################## parallel processing (disabled) ##################
# Prepare arguments for the worker function
# tasks = [
# (
# item,
# root,
# sub_root,
# audio_folder,
# phn_folder,
# audio_ext,
# text_ext,
# phn_ext,
# text_tokenizer,
# )
# for item in data
# ]
# # Parallel processing with progress monitoring
# results = []
# with Pool(num_workers) as pool:
# for result in tqdm.tqdm(
# pool.imap_unordered(process_item_star, tasks),
# total=len(tasks),
# desc="Processing items",
# ):
# results.append(result)
    # # Collect missing text paths and merge phoneme sets from the worker results
# missing_text = [result["missing_text"] for result in results if not result["success"]]
# for result in results:
# if result['success']:
# vocab.update(result['cur_phn_set'])
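    # Sketch (assumption, not part of the original script): if the parallel path
    # is re-enabled and TextTokenizer turns out not to be picklable, a common
    # workaround is to create one tokenizer per worker with a Pool initializer
    # defined at module scope, e.g.:
    #
    #     _worker_tokenizer = None
    #
    #     def _init_worker():
    #         global _worker_tokenizer
    #         _worker_tokenizer = TextTokenizer()
    #
    # and then:
    #
    #     with Pool(num_workers, initializer=_init_worker) as pool:
    #         ...
    #
    # with process_item reading _worker_tokenizer instead of receiving the
    # tokenizer as an argument.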
    ################## end parallel processing ##################

    ################## sequential processing ##################
missing_text = []
for item in tqdm.tqdm(data):
text_path = os.path.join(root, sub_root, audio_folder, item[0].replace(audio_ext, text_ext))
if not os.path.exists(text_path):
missing_text.append(text_path)
continue
try:
with open(text_path, "r") as f:
text = [line.strip() for line in f.readlines()]
text = " ".join(text)
        except Exception as e:
            print(f"Error reading {text_path}: {e}")
continue
cur_phn_set = phonemize_and_save(text, os.path.join(root, sub_root, phn_folder, item[0].replace(audio_ext, phn_ext)), text_tokenizer)
vocab.update(cur_phn_set)
    ################## end sequential processing ##################
    # Sort and save the phoneme vocabulary
    vocab = sorted(vocab)
with open(os.path.join(root, sub_root, "vocab.txt"), "w") as f:
f.write("\n".join(vocab))
    # Report transcripts that could not be found
print(f"Missing text files: {len(missing_text)}")
if missing_text:
print("Some missing files:", missing_text[:10]) # Print the first 10 missing files as an example
if __name__ == "__main__":
fire.Fire(main)
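
# Example invocation (script name and paths are illustrative):
#   python phonemize_emilia.py --root /data/scratch/pyp/datasets/emilia --num_workers 8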