"""Streamlit app for building and editing a JSONL dataset.

The working dataset lives in ``st.session_state.data`` (a list of dict
records) and is mirrored to ``TMP_FILE`` so it survives a browser refresh.
Field names and their input widget types ("text" or "textarea") are kept in
``st.session_state.all_fields`` / ``st.session_state.field_types`` and
mirrored to ``META_FILE``.
"""

import json
import math
import os

import pandas as pd
import streamlit as st

st.set_page_config(page_title="Dataset Builder and Editor", layout="wide")
st.title("📚 JSONL Dataset Builder and Editor")

TMP_DIR = "temp"
TMP_FILE = os.path.join(TMP_DIR, "session_dataset.jsonl")
META_FILE = os.path.join(TMP_DIR, "metadata.json")

# The temp directory must exist before any of the save helpers run.
os.makedirs(TMP_DIR, exist_ok=True)


def get_all_fields(data):
    """Return the sorted union of the keys of every record in *data*."""
    return sorted({key for record in data for key in record.keys()})


def parse_jsonl(lines):
    """Parse an iterable of JSONL lines into a list of dict records.

    Blank (whitespace-only) lines are skipped.

    Raises:
        ValueError: if a line is not valid JSON or is not a JSON object.
            The message carries the 1-based line number.
    """
    records = []
    for line_number, line in enumerate(lines, start=1):
        if not line.strip():
            continue
        try:
            record = json.loads(line)
        except json.JSONDecodeError as err:
            raise ValueError(
                f"line {line_number} is not valid JSON ({err.msg})"
            ) from err
        if not isinstance(record, dict):
            raise ValueError(
                f"line {line_number} is a JSON {type(record).__name__}, "
                "expected an object"
            )
        records.append(record)
    return records


def save_dataset(records):
    """Write *records* to TMP_FILE, one JSON object per line."""
    with open(TMP_FILE, "w", encoding="utf-8") as out:
        out.writelines(
            json.dumps(record, ensure_ascii=False) + "\n" for record in records
        )


def save_metadata(fields, field_types):
    """Write the field list and each field's widget type to META_FILE."""
    metadata = {
        "fields": [
            {"name": name, "type": field_types.get(name, "text")}
            for name in fields
        ]
    }
    with open(META_FILE, "w", encoding="utf-8") as out:
        json.dump(metadata, out, indent=2, ensure_ascii=False)


def cell_to_text(value):
    """Convert a record value to the string shown in the table editor.

    Missing values (None / NaN) become "" instead of the literal text
    "None" / "nan", and nested dicts/lists are rendered as JSON rather than
    as a Python repr.
    """
    if value is None or (isinstance(value, float) and math.isnan(value)):
        return ""
    if isinstance(value, str):
        return value
    if isinstance(value, (dict, list)):
        return json.dumps(value, ensure_ascii=False)
    return str(value)


# --- Reset: drop session state and both temp files ---
if st.button("🔄 Reset Session"):
    st.session_state.clear()
    for path in (TMP_FILE, META_FILE):
        if os.path.exists(path):
            os.remove(path)
    st.success("🧹 Session has been reset. Starting fresh!")
    st.rerun()

# --- Load session data from the temp files on the first run of a session ---
if "data" not in st.session_state:
    restored = []
    if os.path.exists(TMP_FILE):
        try:
            with open(TMP_FILE, "r", encoding="utf-8") as saved:
                restored = parse_jsonl(saved)
        except ValueError as err:
            # Stop instead of continuing with an empty dataset: the next
            # auto-save would overwrite the unreadable file. The reset button
            # above has already been rendered, so the user can discard it.
            st.error(
                f"Could not restore the saved dataset: {err}. "
                "Fix the file or use Reset Session to discard it."
            )
            st.stop()
    st.session_state.data = restored

    if os.path.exists(META_FILE):
        with open(META_FILE, "r", encoding="utf-8") as saved:
            entries = json.load(saved).get("fields", [])
        fields = [entry["name"] for entry in entries]
        types = {entry["name"]: entry.get("type", "text") for entry in entries}
        # Keys present in the data but absent from the metadata would be
        # dropped by the editor below; surface them as plain text fields.
        for field in get_all_fields(restored):
            if field not in types:
                fields.append(field)
                types[field] = "text"
    else:
        fields = get_all_fields(restored)
        types = {field: "text" for field in fields}
    st.session_state.all_fields = fields
    st.session_state.field_types = types

# --- Upload JSONL File ---
uploaded_file = st.file_uploader("Upload a JSONL file", type=["jsonl"])
if uploaded_file is not None:
    # The uploader keeps returning the file on every rerun while it stays
    # selected. Ingest each upload only once; otherwise every rerun would
    # replace the session data and throw away edits and added entries.
    upload_marker = (
        getattr(uploaded_file, "file_id", None),
        uploaded_file.name,
        uploaded_file.size,
    )
    if st.session_state.get("ingested_upload") != upload_marker:
        try:
            # getvalue() does not depend on the current read position.
            # Split on "\n" only: str.splitlines() also breaks on characters
            # such as U+2028 that may appear unescaped inside JSON strings.
            content = uploaded_file.getvalue().decode("utf-8-sig")
            uploaded_records = parse_jsonl(content.split("\n"))
        except ValueError as err:  # includes UnicodeDecodeError
            st.error(f"Could not load '{uploaded_file.name}': {err}")
        else:
            st.session_state.ingested_upload = upload_marker
            st.session_state.data = uploaded_records
            st.session_state.all_fields = get_all_fields(uploaded_records)

            # Keep known field types; fields first seen in this upload
            # default to "textarea".
            if "field_types" in st.session_state:
                for field in st.session_state.all_fields:
                    st.session_state.field_types.setdefault(field, "textarea")
            else:
                st.session_state.field_types = {
                    field: "text" for field in st.session_state.all_fields
                }

            save_dataset(st.session_state.data)
            save_metadata(
                st.session_state.all_fields, st.session_state.field_types
            )

            st.success(
                f"Loaded {len(st.session_state.data)} records with fields: {st.session_state.all_fields}"
            )

# If still no data, use safe fallback fields
if not st.session_state.data and not st.session_state.all_fields:
    st.session_state.all_fields = []

# --- Edit Existing Records ---
if st.session_state.data:
    st.markdown("### ✏️ Edit Records")

    # Ensure field_types is initialized
    if "field_types" not in st.session_state:
        st.session_state.field_types = {
            field: "text" for field in st.session_state.all_fields
        }

    # Every cell is edited as text; fields a record lacks show up empty.
    table_rows = [
        {
            field: cell_to_text(record.get(field))
            for field in st.session_state.all_fields
        }
        for record in st.session_state.data
    ]
    df = pd.DataFrame(table_rows, columns=st.session_state.all_fields)

    # "textarea" fields get a wide column; everything else the default width.
    column_configs = {
        field: (
            st.column_config.TextColumn(label=field, width="large")
            if st.session_state.field_types.get(field) == "textarea"
            else st.column_config.TextColumn(label=field)
        )
        for field in st.session_state.all_fields
    }

    edited_df = st.data_editor(
        df,
        use_container_width=True,
        num_rows="dynamic",
        column_config=column_configs,
    )

    # Persist as soon as the editor's output differs from what was shown.
    if not edited_df.equals(df):
        st.session_state.data = edited_df.fillna("").to_dict(orient="records")
        save_dataset(st.session_state.data)
        st.toast("✅ Auto-saved!", icon="💾")
        st.rerun()

# --- Add New Entry ---
if st.session_state.all_fields:
    st.markdown("### ➕ Add New Entry")

    with st.form("new_entry_form"):
        new_record = {}

        # Widget values can only be reset before the widgets are created in
        # this run, so a successful submit sets a flag and the inputs are
        # cleared here on the following run.
        if st.session_state.get("reset_form"):
            for field in st.session_state.all_fields:
                st.session_state[f"input_{field}"] = ""
            st.session_state.reset_form = False

        # One input per field, using the widget kind stored for that field.
        for field in st.session_state.all_fields:
            input_type = st.session_state.field_types.get(field, "text")
            if input_type == "textarea":
                new_record[field] = st.text_area(f"{field}", key=f"input_{field}")
            else:
                new_record[field] = st.text_input(f"{field}", key=f"input_{field}")

        submitted = st.form_submit_button("Add Entry")

    if submitted:
        st.session_state.data.append(new_record)
        save_dataset(st.session_state.data)
        st.session_state.reset_form = True
        st.success("✅ New entry added!")
        st.rerun()

# --- Add New Field ---
with st.expander("➕ Add New Field"):
    new_field = st.text_input("Field name", key="new_field_name")
    new_type = st.selectbox("Field type", ["text", "textarea"], key="new_field_type")
    if st.button("Add Field"):
        field_name = new_field.strip()
        if not field_name:
            st.warning("Field name cannot be empty.")
        elif field_name in st.session_state.all_fields:
            st.warning(f"Field '{field_name}' already exists.")
        else:
            st.session_state.all_fields.append(field_name)
            st.session_state.field_types[field_name] = new_type
            save_metadata(
                st.session_state.all_fields, st.session_state.field_types
            )
            st.success(f"✅ Field '{field_name}' added!")
            st.rerun()

# --- Download Dataset Button ---
st.markdown("### 📤 Download Dataset")

# Serialize the in-memory records as JSONL text.
dataset_content = "\n".join(
    [json.dumps(row, ensure_ascii=False) for row in st.session_state.data]
)

# The button is offered only once a dataset has been saved to the temp file.
if os.path.exists(TMP_FILE):
    st.download_button(
        label="⬇️ Download Dataset as JSONL",
        data=dataset_content,
        file_name="session_dataset.jsonl",
        mime="application/json",
    )
else:
    st.warning("Dataset not yet generated!")