"""Streamlit helpers for loading, converting and running a causal LM, plus
pandas-based dataset loading, cleaning, scoring and transformation utilities.
"""

import asyncio
import json
import os
import re
import tempfile
import time

import llama_cpp
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pyarrow as pa
import streamlit as st
import torch
from dotenv import load_dotenv
from scipy.stats import kurtosis, skew, zscore
from transformers import AutoModelForCausalLM, AutoTokenizer

# -------------------------------
# Environment and Token Management
# -------------------------------

# Load environment variables from .env file in local development
load_dotenv()


def get_hf_token():
    """
    Retrieve the Hugging Face token.

    The ``HF_TOKEN`` environment variable takes priority; Streamlit secrets
    are used as a fallback.

    Returns:
        str | None: The token, or None (after showing a Streamlit error) when
        it is found in neither place.
    """
    token = os.getenv("HF_TOKEN")  # Prioritize environment variable

    # If not found, fall back to Streamlit secrets
    if not token:
        try:
            token = st.secrets["HF_TOKEN"]
        except (FileNotFoundError, KeyError):
            st.error("❌ HF_TOKEN not found. Add it to .env or secrets.toml.")
            return None
    return token


# -------------------------------
# Model Loading and Management
# -------------------------------

async def async_load(model_id: str):
    """
    Dummy async function to initialize the event loop.

    ``model_id`` is accepted but not used.
    """
    await asyncio.sleep(0.1)


@st.cache_resource
def load_model(model_id: str, token: str, checkpoint_path: str = None):
    """
    Loads and caches the model and tokenizer with the Hugging Face token.

    Args:
        model_id (str): The Hugging Face model ID.
        token (str): The authentication token.
        checkpoint_path (str): Optional path to a fine-tuned model checkpoint.
            It is applied only when the path exists on disk.

    Returns:
        tuple: ``(tokenizer, model)``, or ``(None, None)`` after showing a
        Streamlit error when loading fails.
    """
    # NOTE(review): this function is wrapped in st.cache_resource, so a
    # (None, None) failure result is presumably cached for these arguments
    # too - confirm whether callers clear the cache before retrying.
    try:
        # Best-effort event-loop warm-up. asyncio.run() raises RuntimeError
        # when a loop is already running in this thread; that must not abort
        # the actual model load, so the coroutine is closed and we move on.
        warmup = async_load(model_id)
        try:
            asyncio.run(warmup)
        except RuntimeError:
            warmup.close()

        tokenizer = AutoTokenizer.from_pretrained(model_id, token=token)
        model = AutoModelForCausalLM.from_pretrained(model_id, token=token)

        # Load fine-tuned checkpoint if provided
        if checkpoint_path and os.path.exists(checkpoint_path):
            # NOTE(security): torch.load unpickles the file; only point this
            # at checkpoints from a trusted source.
            state_dict = torch.load(
                checkpoint_path, map_location=torch.device('cpu')
            )
            model.load_state_dict(state_dict)
            model.eval()
            st.success("✅ Fine-tuned model loaded successfully!")

        return tokenizer, model
    except Exception as e:
        st.error(f"❌ Model loading failed: {e}")
        return None, None


# -------------------------------
# Model Saving Function
# -------------------------------

def save_model(model, model_name: str):
    """
    Saves the model's state dict to the specified path.

    Args:
        model (torch.nn.Module): The PyTorch model instance.
        model_name (str): The file path to save the model.

    Returns:
        str | None: The path where the model is saved, or None (after showing
        a Streamlit error) when saving fails.
    """
    try:
        # Ensure the target directory exists. A bare file name has an empty
        # dirname, and os.makedirs("") raises, so skip it in that case.
        target_dir = os.path.dirname(model_name)
        if target_dir:
            os.makedirs(target_dir, exist_ok=True)

        # Save the model
        torch.save(model.state_dict(), model_name)
        st.success(f"✅ Model saved successfully at `{model_name}`")
        return model_name
    except Exception as e:
        st.error(f"❌ Failed to save model: {e}")
        return None


# -------------------------------
# File Processing and Cleaning
# -------------------------------

def preprocess_data(uploaded_file, file_extension):
    """
    Reads the uploaded file and returns a processed version.

    Supports CSV, JSONL, and TXT.

    Args:
        uploaded_file: File-like object; ``readlines()`` is used for JSONL and
            ``read()`` must return bytes for TXT.
        file_extension (str): One of "csv", "jsonl" or "txt".

    Returns:
        A DataFrame for CSV; a DataFrame (or the raw list of parsed records if
        table conversion fails) for JSONL; a list of lines for TXT; None for
        any other extension or when processing fails.
    """
    try:
        if file_extension == "csv":
            return pd.read_csv(uploaded_file)

        if file_extension == "jsonl":
            # Blank lines (e.g. a trailing newline) are not valid JSON
            # documents, so skip them instead of failing the whole file.
            records = [
                json.loads(line)
                for line in uploaded_file.readlines()
                if line.strip()
            ]
            try:
                return pd.DataFrame(records)
            except Exception:
                st.warning("⚠️ Unable to convert JSONL to table. Previewing raw JSON.")
                return records

        if file_extension == "txt":
            text_data = uploaded_file.read().decode("utf-8")
            return text_data.splitlines()
    except Exception as e:
        st.error(f"❌ Error processing file: {e}")
        return None

    # Unsupported extension: nothing to return.
    return None


def clean_text(text, lowercase=True, remove_punctuation=True):
    """
    Cleans text data by applying basic normalization.

    Args:
        text (str): Input text.
        lowercase (bool): Lower-case the text when True.
        remove_punctuation (bool): Remove every character that is neither a
            word character nor whitespace when True.

    Returns:
        str: The cleaned text.
    """
    if lowercase:
        text = text.lower()
    if remove_punctuation:
        text = re.sub(r'[^\w\s]', '', text)
    return text


# -------------------------------
# Model Conversion and Quantization
# -------------------------------

def quantize_model(model):
    """
    Applies dynamic int8 quantization to the model's ``torch.nn.Linear`` layers.

    Returns:
        The quantized model, or the original model (after showing a Streamlit
        error) when quantization fails.
    """
    try:
        quantized_model = torch.quantization.quantize_dynamic(
            model, {torch.nn.Linear}, dtype=torch.qint8
        )
        st.success("✅ Model quantized successfully!")
        return quantized_model
    except Exception as e:
        st.error(f"❌ Quantization failed: {e}")
        return model


def convert_to_torchscript(model, output_path="model_ts.pt"):
    """
    Converts the model to TorchScript format by tracing it.

    The model is traced with a random integer tensor of shape (1, 10).

    Returns:
        str | None: ``output_path``, or None (after showing a Streamlit error)
        when conversion fails.
    """
    try:
        example_input = torch.randint(0, 100, (1, 10))
        traced_model = torch.jit.trace(model, example_input)
        traced_model.save(output_path)
        return output_path
    except Exception as e:
        st.error(f"❌ TorchScript conversion failed: {e}")
        return None


def convert_to_onnx(model, output_path="model.onnx"):
    """
    Converts the model to ONNX format.

    The export uses a random integer tensor of shape (1, 10) as dummy input.

    Returns:
        str | None: ``output_path``, or None (after showing a Streamlit error)
        when conversion fails.
    """
    try:
        dummy_input = torch.randint(0, 100, (1, 10))
        torch.onnx.export(
            model,
            dummy_input,
            output_path,
            input_names=["input"],
            output_names=["output"],
        )
        return output_path
    except Exception as e:
        st.error(f"❌ ONNX conversion failed: {e}")
        return None


# Convert to GGUF (for Llama.cpp)
def convert_to_gguf(model, output_path="model.gguf"):
    """
    Converts the model to GGUF format via ``llama_cpp.export_gguf``.

    Returns:
        str | None: ``output_path``, or None (after showing a Streamlit error)
        when conversion fails.
    """
    # NOTE(review): verify that the installed llama_cpp package really exposes
    # `export_gguf`; if it does not, the resulting AttributeError is reported
    # below instead of crashing the app.
    try:
        llama_cpp.export_gguf(model, output_path)
        return output_path
    except Exception as e:
        st.error(f"❌ GGUF conversion failed: {e}")
        return None


# Convert to TensorFlow SavedModel
def convert_to_tf_saved_model(model, output_path="model_tf"):
    """
    Converts the model to a TensorFlow SavedModel using ONNX as intermediary.

    Requires the optional ``onnx`` and ``onnx_tf`` packages, which are
    imported lazily here.

    Returns:
        str | None: ``output_path``, or None (after showing a Streamlit error)
        when conversion fails.
    """
    try:
        # Imported first so a missing optional dependency fails fast, before
        # the export work is done.
        import onnx
        from onnx_tf.backend import prepare

        # NOTE(review): this dummy input is a float tensor of shape
        # (1, 3, 224, 224), unlike the (1, 10) integer tensor used by the
        # other converters - confirm it matches the models passed in.
        dummy_input = torch.randn(1, 3, 224, 224)

        # The intermediate ONNX file lives in a temporary directory so no
        # stray file is left behind in the working directory.
        with tempfile.TemporaryDirectory() as tmp_dir:
            onnx_path = os.path.join(tmp_dir, "temp_model.onnx")
            torch.onnx.export(model, dummy_input, onnx_path)
            onnx_model = onnx.load(onnx_path)

        # Load ONNX model into TensorFlow
        tf_rep = prepare(onnx_model)
        tf_rep.export_graph(output_path)
        return output_path
    except Exception as e:
        st.error(f"❌ TensorFlow conversion failed: {e}")
        return None


# Convert to PyTorch format
def convert_to_pytorch(model, output_path="model.pth"):
    """
    Saves the model's state dict to ``output_path`` and returns the path.
    """
    torch.save(model.state_dict(), output_path)
    return output_path


# -------------------------------
# Model Inference and Training
# -------------------------------

def simulate_training(num_epochs):
    """
    Simulates a training loop for demonstration.

    Yields:
        tuple: ``(epoch, loss_values, accuracy_values)`` after each epoch. The
        same two list objects are yielded every time and keep growing.
    """
    loss_values = []
    accuracy_values = []
    for epoch in range(1, num_epochs + 1):
        loss = np.exp(-epoch) + np.random.random() * 0.1
        acc = 0.5 + (epoch / num_epochs) * 0.5 + np.random.random() * 0.05
        loss_values.append(loss)
        accuracy_values.append(acc)
        yield epoch, loss_values, accuracy_values
        # Pause between epochs once the consumer asks for the next one.
        time.sleep(1)


def plot_training_metrics(epochs, loss_values, accuracy_values):
    """
    Plots training loss and accuracy side by side.

    Args:
        epochs (int): Number of epochs plotted; the x axis is 1..epochs, so
            both value lists must contain exactly ``epochs`` entries.
        loss_values: Loss per epoch.
        accuracy_values: Accuracy per epoch.

    Returns:
        matplotlib.figure.Figure: The figure holding both plots.
    """
    fig, ax = plt.subplots(1, 2, figsize=(12, 4))
    epoch_axis = range(1, epochs + 1)

    ax[0].plot(epoch_axis, loss_values, marker='o', color='red')
    ax[0].set_title("Training Loss")
    ax[0].set_xlabel("Epoch")
    ax[0].set_ylabel("Loss")

    ax[1].plot(epoch_axis, accuracy_values, marker='o', color='green')
    ax[1].set_title("Training Accuracy")
    ax[1].set_xlabel("Epoch")
    ax[1].set_ylabel("Accuracy")

    return fig


def generate_response(prompt, model, tokenizer, max_length=200):
    """
    Generates a response using the fine-tuned model.

    Args:
        prompt (str): Input prompt.
        model: Model exposing ``generate``.
        tokenizer: Tokenizer used to encode the prompt and decode the output.
        max_length (int): Passed to ``generate`` as ``max_length``.

    Returns:
        str: The decoded text of the first returned sequence, or "" (after
        showing a Streamlit error) when generation fails.
    """
    try:
        encoded = tokenizer(prompt, return_tensors="pt")
        with torch.no_grad():
            outputs = model.generate(
                encoded["input_ids"],
                # Pass the mask along (None if the tokenizer did not produce
                # one) so generate() does not have to guess it.
                attention_mask=encoded.get("attention_mask"),
                max_length=max_length,
                num_return_sequences=1,
                # temperature only applies to sampling-based decoding.
                do_sample=True,
                temperature=0.7,
            )
        return tokenizer.decode(outputs[0], skip_special_tokens=True)
    except Exception as e:
        st.error(f"❌ Response generation failed: {e}")
        return ""


# -------------------------------
# Model Loading for Inference
# -------------------------------

def load_finetuned_model(model, checkpoint_path="fine_tuned_model.pt"):
    """
    Loads a fine-tuned state dict from a checkpoint into ``model``.

    Returns:
        The same model instance. When the checkpoint does not exist, a
        Streamlit error is shown and the model is returned unchanged.
    """
    if os.path.exists(checkpoint_path):
        # NOTE(security): torch.load unpickles the file; only load checkpoints
        # from a trusted source.
        state_dict = torch.load(
            checkpoint_path, map_location=torch.device('cpu')
        )
        model.load_state_dict(state_dict)
        model.eval()
        st.success("✅ Fine-tuned model loaded successfully!")
    else:
        st.error(f"❌ Checkpoint not found: {checkpoint_path}")
    return model


# ======================================
# Dataset Operations
# ======================================

# Readers for the non-CSV extensions that list_datasets() advertises.
_DATASET_READERS = {
    ".json": pd.read_json,
    ".xlsx": pd.read_excel,
}


def load_dataset(path: str) -> pd.DataFrame:
    """
    Load a dataset with error handling.

    ``.json`` and ``.xlsx`` files are read with the matching pandas reader;
    every other path is read as CSV.

    Returns:
        pd.DataFrame: The loaded frame passed through make_arrow_compatible,
        or an empty DataFrame (after printing the error) on failure.
    """
    try:
        extension = os.path.splitext(path)[1].lower()
        reader = _DATASET_READERS.get(extension, pd.read_csv)
        df = reader(path)
        return make_arrow_compatible(df)
    except Exception as e:
        print(f"Error loading dataset: {e}")
        return pd.DataFrame()


def save_dataset(df: pd.DataFrame, path: str):
    """Save dataset to CSV (without the index); errors are printed."""
    try:
        df.to_csv(path, index=False)
    except Exception as e:
        print(f"Error saving dataset: {e}")


def list_datasets(directory: str = "datasets") -> list:
    """
    List all available datasets in the directory.

    Returns:
        list: File names ending in .csv, .json or .xlsx, or an empty list
        (after printing the error) when the directory cannot be listed.
    """
    try:
        return [
            f for f in os.listdir(directory)
            if f.endswith(('.csv', '.json', '.xlsx'))
        ]
    except Exception as e:
        print(f"Error listing datasets: {e}")
        return []


# ======================================
# Data Cleaning Functions
# ======================================

def _strip_if_str(value):
    """Strip surrounding whitespace from strings; return anything else as is."""
    return value.strip() if isinstance(value, str) else value


def clean_dataset(
    df: pd.DataFrame,
    remove_duplicates: bool = True,
    fill_missing: bool = False,
    fill_value: str = "0",
    trim_spaces: bool = True
) -> pd.DataFrame:
    """
    Clean the dataset with multiple operations:
    - Remove duplicates
    - Fill missing values
    - Trim spaces
    - Remove empty columns and rows
    - Auto-cast date columns

    The input frame is not modified; a cleaned frame is returned.
    """
    # Remove duplicates
    if remove_duplicates:
        df = df.drop_duplicates()

    # Fill missing values
    if fill_missing:
        df = df.fillna(fill_value)

    # Trim spaces. This is done element-wise because `.str.strip()` would
    # replace every non-string cell of a mixed column with NaN.
    if trim_spaces:
        df = df.apply(
            lambda x: x.map(_strip_if_str) if x.dtype == "object" else x
        )

    # Remove empty columns & rows; copy so the assignments below work on an
    # independent frame.
    df = df.dropna(how="all", axis=1)
    df = df.dropna(how="all", axis=0).copy()

    # Auto-cast date columns. Only columns whose non-null values are all
    # strings are tried: pd.to_datetime also accepts plain numbers (read as
    # epoch offsets), which would turn numeric columns into timestamps.
    for col in df.columns:
        if df[col].dtype != "object":
            continue
        non_null = df[col].dropna()
        if not non_null.map(lambda v: isinstance(v, str)).all():
            continue
        try:
            df[col] = pd.to_datetime(df[col])
        except (ValueError, TypeError):
            pass

    return make_arrow_compatible(df)


# --------------------------------------
# Dataset Quality Score
# --------------------------------------

def compute_dataset_score(df):
    """
    Compute dataset quality score.

    The score is the product of (1 - missing ratio), (1 - duplicate ratio)
    and a numeric-shape factor derived from the mean absolute skewness and
    kurtosis of the numeric columns, scaled to 0-100 and rounded to two
    decimals. An empty frame scores 0.0.
    """
    if df.empty:
        return 0.0

    total_cells = np.prod(df.shape)
    missing_cells = df.isnull().sum().sum()
    missing_ratio = missing_cells / total_cells

    duplicate_ratio = 1 - (df.drop_duplicates().shape[0] / df.shape[0])

    numeric_cols = df.select_dtypes(include=["number"]).columns
    numeric_score = 1.0
    if len(numeric_cols) > 0:
        skew_vals = df[numeric_cols].apply(
            lambda x: np.abs(skew(x.dropna())), axis=0
        )
        kurt_vals = df[numeric_cols].apply(
            lambda x: np.abs(kurtosis(x.dropna())), axis=0
        )
        # The means are NaN when no column has a defined statistic; nansum
        # then counts them as zero penalty instead of poisoning the score.
        penalty = np.nansum([skew_vals.mean(), kurt_vals.mean()]) / 10
        # Heavily skewed data must not push the factor below zero.
        numeric_score = float(np.clip(1 - penalty, 0.0, 1.0))

    score = (1 - missing_ratio) * (1 - duplicate_ratio) * numeric_score * 100
    return round(float(score), 2)


# --------------------------------------
# Outlier Detection
# --------------------------------------

def detect_outliers(df, threshold=3):
    """
    Detect outliers in numeric columns using Z-score.

    Returns:
        dict: Column name -> number of non-null values whose absolute z-score
        exceeds ``threshold``.
    """
    numeric_cols = df.select_dtypes(include=["number"]).columns
    outliers = {}
    for col in numeric_cols:
        values = df[col].dropna()
        if values.empty:
            # Nothing to score.
            outliers[col] = 0
            continue
        z_scores = np.abs(zscore(values))
        # Plain int rather than a numpy scalar.
        outliers[col] = int(np.sum(z_scores > threshold))
    return outliers


# --------------------------------------
# Detect Inconsistent Types
# --------------------------------------

def detect_inconsistent_types(df):
    """
    Detect inconsistent data types across columns.

    Returns:
        dict: For every column holding more than one Python type, a mapping
        of type -> number of cells of that type.
    """
    inconsistent_cols = {}
    for col in df.columns:
        cell_types = df[col].apply(type)
        if cell_types.nunique() > 1:
            inconsistent_cols[col] = cell_types.value_counts().to_dict()
    return inconsistent_cols


# ======================================
# Data Transformations
# ======================================

def apply_transformation(df: pd.DataFrame, col: str, transform: str) -> pd.DataFrame:
    """
    Apply transformations to a specified column:
    - "Log": Log Transformation
    - "Normalize": Min-Max Normalization
    - "Standardize": Z-score Standardization

    Any other ``transform`` value leaves the column untouched. The frame is
    modified in place and also returned (via make_arrow_compatible).

    Raises:
        KeyError: If ``col`` is not a column of ``df``.
    """
    if col not in df.columns:
        raise KeyError(f"Column '{col}' not found in dataset")

    if transform == "Log":
        # Missing values end up as 0 because of the trailing fillna.
        df[col] = np.log1p(df[col].replace(0, np.nan)).fillna(0)
    elif transform == "Normalize":
        df = normalize_column(df, col)
    elif transform == "Standardize":
        df = standardize_column(df, col)

    return make_arrow_compatible(df)


# ======================================
# Normalization & Standardization
# ======================================

def normalize_column(df: pd.DataFrame, col: str) -> pd.DataFrame:
    """
    Normalize column (Min-Max Scaling), in place.

    A constant column has zero range; it becomes 0.0 for every non-null value
    instead of NaN from a 0/0 division.
    """
    lowest = df[col].min()
    value_range = df[col].max() - lowest
    shifted = df[col] - lowest
    if value_range == 0:
        df[col] = shifted.astype(float)
    else:
        df[col] = shifted / value_range
    return df


def standardize_column(df: pd.DataFrame, col: str) -> pd.DataFrame:
    """
    Standardize column (Z-score), in place.

    A column with zero standard deviation becomes 0.0 for every non-null
    value instead of NaN from a 0/0 division.
    """
    spread = df[col].std()
    centered = df[col] - df[col].mean()
    if spread == 0:
        df[col] = centered.astype(float)
    else:
        df[col] = centered / spread
    return df


# ======================================
# Arrow Compatibility & Fixes
# ======================================

def _stringify_preserving_nulls(series: pd.Series) -> pd.Series:
    """Cast non-null values to str, keeping missing values missing."""
    # A plain astype(str) would turn NaN/None into the text "nan"/"None".
    return series.where(series.isna(), series.astype(str))


def make_arrow_compatible(df: pd.DataFrame) -> pd.DataFrame:
    """
    Ensure dataset compatibility with Streamlit Arrow serialization.

    Every object-dtype column has its non-null values converted to ``str``.
    The frame is modified in place and returned.
    """
    for col in df.columns:
        if df[col].dtype == object:
            try:
                df[col] = _stringify_preserving_nulls(df[col])
            except Exception as e:
                print(f"Could not convert column {col}: {e}")
    return df


def fix_arrow_incompatibility(df: pd.DataFrame) -> pd.DataFrame:
    """
    Fix Arrow incompatibility by converting mixed types to `str`.

    Each column is test-converted with pyarrow on its own; columns that fail
    are converted to strings (missing values stay missing). The frame is
    modified in place and returned.
    """
    for col in df.columns:
        try:
            pa.Table.from_pandas(df[[col]])
        except (pa.ArrowInvalid, pa.ArrowTypeError):
            print(f"Arrow compatibility issue in column: {col}")
            df[col] = _stringify_preserving_nulls(df[col])
    return df