#!/usr/bin/env python3
"""
app.py – Quranic Data Training Pipeline Endpoint for ZeroGPU Spaces
--------------------------------------------------------------------
This script integrates a full Quranic data processing and training pipeline
into a Gradio interface endpoint. It is optimized for CPU/GPU-based training
on Hugging Face ZeroGPU (using the Gradio SDK) and uses chunked incremental
training, memory management, and gradient checkpointing to efficiently update
Google's Gemma-2-2b model with Quranic data.

Requirements:
- Transformers (==4.45.0)
- Gradio (>=5.12.0)
- PyTorch (==2.3.0)
- psutil (==5.9.5)
- Accelerate (>=0.26.0)
- Hugging Face PRO subscription with ZeroGPU enabled (ensure your HF token is set as the environment variable HF_TOKEN)
- Linux (Ubuntu) with access to ZeroGPU hardware via Spaces
- Input data files placed in the project root
- Sufficient storage in "working_directory"

Author: [M-Saddam Hussain]
Date: March 2025
Data References: [Tanzil.net, IslamSource, QuranicCorpus]
"""
import json
import logging
import os
import traceback
import gc
import time
import psutil
import math
import shutil
from datetime import datetime
from typing import Dict, List, Optional
from dataclasses import dataclass, asdict

import torch

# Limit PyTorch threads for CPU stability.
torch.set_num_threads(8)

from torch.utils.data import Dataset
from transformers import (
    AutoTokenizer,
    AutoModelForCausalLM,
    TrainingArguments,
    Trainer,
    DataCollatorForLanguageModeling,
    __version__ as transformers_version
)
from threading import Lock
import gradio as gr
import spaces

# Check for minimum required Transformers version for custom model support
MIN_TRANSFORMERS_VERSION = "4.42.0"
if tuple(map(int, transformers_version.split("."))) < tuple(map(int, MIN_TRANSFORMERS_VERSION.split("."))):
    logging.warning(
        f"Transformers version {transformers_version} detected. Please upgrade to at least "
        f"{MIN_TRANSFORMERS_VERSION} for proper support of the 'gemma2' architecture."
    )

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('pipeline.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)
def manage_memory(threshold_percent: int = 90, min_available_mb: int = 500, sleep_duration: int = 10):
    """
    Check memory usage; if usage is high or available memory is low,
    force garbage collection and sleep briefly.
    """
    vm = psutil.virtual_memory()
    used_percent = vm.percent
    available_mb = vm.available / (1024 * 1024)
    logger.info(f"Memory usage: {used_percent}% used, {available_mb:.2f} MB available")
    if used_percent > threshold_percent or available_mb < min_available_mb:
        logger.warning("High memory usage detected, forcing garbage collection and sleeping...")
        gc.collect()
        time.sleep(sleep_duration)


def manage_gpu_resources(sleep_duration: int = 5):
    """
    Checks GPU memory and empties cache if necessary.
    """
    if torch.cuda.is_available():
        allocated = torch.cuda.memory_allocated() / (1024 * 1024)
        cached = torch.cuda.memory_reserved() / (1024 * 1024)
        logger.info(f"GPU Memory Allocated: {allocated:.2f} MB, Reserved: {cached:.2f} MB")
        torch.cuda.empty_cache()
        time.sleep(sleep_duration)
def zip_checkpoint(checkpoint_dir: str) -> str:
    """
    Zips the checkpoint directory and returns the file name of the resulting zip archive.
    """
    zip_file = checkpoint_dir + ".zip"
    if os.path.exists(zip_file):
        os.remove(zip_file)
    shutil.make_archive(checkpoint_dir, 'zip', checkpoint_dir)
    return os.path.basename(zip_file)
@dataclass
class WordAnalysis:
    """Structured representation of word-level analysis"""
    arabic: str
    translation: str
    position: str
    morphology: Dict
    features: List[str]
    root: str
    location: str
    metadata: Dict


@dataclass
class VerseData:
    """Structured representation of verse-level data"""
    chapter: int
    verse: int
    arabic_text: str
    translation: str
    words: List[WordAnalysis]
    metadata: Dict
class QuranicDataset(Dataset):
    """Custom dataset for Quranic text training."""

    def __init__(self, processed_data: List[Dict], tokenizer):
        self.examples = []
        self.tokenizer = tokenizer
        for verse_data in processed_data:
            self.examples.extend(self._create_training_examples(verse_data))

    def _create_training_examples(self, verse_data: Dict) -> List[Dict]:
        examples = []
        text_block = (
            f"[VERSE {verse_data['chapter']}:{verse_data['verse']}]\n"
            f"Arabic: {verse_data['arabic_text']}\n"
            f"Translation: {verse_data['translation']}\n"
            "Morphological Analysis:\n"
        )
        for word in verse_data['words']:
            text_block += (
                f"[WORD] {word['arabic']}\n"
                f"Root: {word['root']}\n"
                f"Features: {', '.join(word['features'])}\n"
            )
        examples.append(self._format_example(text_block))
        return examples

    def _format_example(self, text: str) -> Dict:
        encodings = self.tokenizer(
            text,
            truncation=True,
            max_length=64,
            padding="max_length",
            return_tensors="pt"
        )
        # Explicitly move tensors to CPU.
        return {
            "input_ids": encodings["input_ids"][0].cpu(),
            "attention_mask": encodings["attention_mask"][0].cpu()
        }

    def __len__(self):
        return len(self.examples)

    def __getitem__(self, idx):
        return self.examples[idx]
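# For reference, _create_training_examples() above produces one text block per
# verse in roughly the following shape (the values here are placeholders, not
# real corpus data):
#
#   [VERSE 1:1]
#   Arabic: <full verse text assembled from word forms>
#   Translation: <verse translation>
#   Morphological Analysis:
#   [WORD] <word form>
#   Root: <root>
#   Features: <feature1>, <feature2>, ...
#   ...
#
# Each block is then tokenized to a fixed length of 64 tokens in _format_example().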
class QuranicDataProcessor:
    """Processes Quranic data into structured training examples."""

    def __init__(self, source_dir: str, output_dir: str):
        self.source_dir = source_dir
        self.output_dir = output_dir
        self.morphological_data: Dict[str, Dict] = {}
        self.word_by_word_data: Dict[str, List[str]] = {}
        self.translation_data: Dict[str, str] = {}
        self.processing_lock = Lock()
        os.makedirs(output_dir, exist_ok=True)
        os.makedirs(os.path.join(output_dir, 'json'), exist_ok=True)
        os.makedirs(os.path.join(output_dir, 'txt'), exist_ok=True)
        os.makedirs(os.path.join(output_dir, 'checkpoints'), exist_ok=True)
        logger.info(f"Initialized processor with source dir: {source_dir}")

    def load_source_files(self) -> bool:
        """Loads morphological, translation, and word-by-word data from project root."""
        try:
            logger.info("Loading morphological data...")
            morph_path = os.path.join(self.source_dir, 'quranic-corpus-morphology-0.4.txt')
            with open(morph_path, 'r', encoding='utf-8') as f:
                next(f)
                for line in f:
                    if line.strip() and not line.startswith('#'):
                        parts = line.strip().split('\t')
                        if len(parts) >= 4:
                            location = parts[0].strip('()')
                            self.morphological_data[location] = {
                                'form': parts[1],
                                'tag': parts[2],
                                'features': parts[3]
                            }
            logger.info(f"Loaded {len(self.morphological_data)} morphological entries")
            logger.info("Loading translation data...")
            trans_path = os.path.join(self.source_dir, 'en.sample.quran-maududi.txt')
            with open(trans_path, 'r', encoding='utf-8') as f:
                next(f)
                for line in f:
                    if line.strip():
                        parts = line.strip().split('|')
                        if len(parts) >= 3:
                            key = f"{parts[0]}:{parts[1]}"
                            self.translation_data[key] = parts[2].strip()
            logger.info(f"Loaded {len(self.translation_data)} verse translations")
            logger.info("Loading word-by-word data...")
            word_path = os.path.join(self.source_dir, 'en.w4w.qurandev.txt')
            with open(word_path, 'r', encoding='utf-8-sig') as f:
                lines = [line.strip() for line in f if line.strip()]
            sorted_keys = sorted(self.translation_data.keys(), key=lambda x: (int(x.split(':')[0]), int(x.split(':')[1])))
            if len(lines) != len(sorted_keys):
                logger.warning("Mismatch between word-by-word file and translation data")
            for i, verse_key in enumerate(sorted_keys):
                if i < len(lines):
                    words = [w.strip() for w in lines[i].split('|') if w.strip()]
                    self.word_by_word_data[verse_key] = words
            logger.info(f"Loaded word-by-word data for {len(self.word_by_word_data)} verses")
            return True
        except Exception as e:
            logger.error(f"Error loading source files: {str(e)}")
            logger.error(traceback.format_exc())
            return False
    def process_verse(self, chapter: int, verse: int) -> Optional[VerseData]:
        """Processes a single verse into structured format."""
        try:
            verse_ref = f"{chapter}:{verse}"
            logger.info(f"Processing verse {verse_ref}")
            translation = self.translation_data.get(verse_ref)
            if not translation:
                logger.warning(f"No translation for verse {verse_ref}")
                return None
            verse_word_list = self.word_by_word_data.get(verse_ref, [])
            if not verse_word_list:
                logger.warning(f"No word-by-word data for verse {verse_ref}")
                return None
            verse_words: List[WordAnalysis] = []
            arabic_text = ""
            for pos in range(1, len(verse_word_list) + 1):
                pattern = f"{chapter}:{verse}:{pos}:"
                matching_entries = [data for loc, data in self.morphological_data.items() if loc.startswith(pattern)]
                if not matching_entries:
                    logger.debug(f"No morphological data for {pattern}")
                    continue
                combined_form = " ".join(entry['form'] for entry in matching_entries)
                combined_features = []
                root = ""
                for entry in matching_entries:
                    features = entry['features'].split('|')
                    combined_features.extend(features)
                    if not root:
                        for f in features:
                            if 'ROOT:' in f:
                                root = f.split('ROOT:')[1]
                                break
                word_translation = verse_word_list[pos - 1]
                word = WordAnalysis(
                    arabic=combined_form,
                    translation=word_translation,
                    position=str(pos),
                    morphology=matching_entries[0],
                    features=combined_features,
                    root=root,
                    location=f"{chapter}:{verse}:{pos}",
                    metadata={}
                )
                verse_words.append(word)
                arabic_text += f" {combined_form}"
            verse_data = VerseData(
                chapter=chapter,
                verse=verse,
                arabic_text=arabic_text.strip(),
                translation=translation,
                words=verse_words,
                metadata={
                    "processed_timestamp": datetime.now().isoformat(),
                    "word_count": len(verse_words)
                }
            )
            self._save_verse_data(verse_data)
            return verse_data
        except Exception as e:
            logger.error(f"Error processing verse {chapter}:{verse}: {str(e)}")
            logger.error(traceback.format_exc())
            return None
    def _save_verse_data(self, verse_data: VerseData):
        """Saves processed verse data as JSON and TXT."""
        try:
            verse_ref = f"{verse_data.chapter}:{verse_data.verse}"
            json_path = os.path.join(self.output_dir, 'json', f'verse_{verse_ref.replace(":", "_")}.json')
            with open(json_path, 'w', encoding='utf-8') as f:
                json.dump(asdict(verse_data), f, ensure_ascii=False, indent=2)
            txt_path = os.path.join(self.output_dir, 'txt', f'verse_{verse_ref.replace(":", "_")}.txt')
            with open(txt_path, 'w', encoding='utf-8') as f:
                f.write(f"=== Verse {verse_ref} ===\n\n")
                f.write(f"Arabic Text:\n{verse_data.arabic_text}\n\n")
                f.write(f"Translation:\n{verse_data.translation}\n\n")
                f.write("Word Analysis:\n")
                for i, word in enumerate(verse_data.words, 1):
                    f.write(f"\nWord {i}:\n")
                    f.write(f"  Arabic: {word.arabic}\n")
                    f.write(f"  Translation: {word.translation}\n")
                    f.write(f"  Root: {word.root}\n")
                    f.write("  Features:\n")
                    for feature in word.features:
                        f.write(f"    - {feature}\n")
                    f.write("\n")
            logger.info(f"Saved verse data to {json_path} and {txt_path}")
        except Exception as e:
            logger.error(f"Error saving verse data: {str(e)}")
            logger.error(traceback.format_exc())
class QuranicModelTrainer:
    """Trains the Gemma-2-2b model on Quranic data using chunked incremental updates."""

    def __init__(self,
                 model_name: str = "google/gemma-2-2b",
                 processed_data_dir: str = "processed_data",
                 checkpoint_dir: str = "checkpoints"):
        self.processed_data_dir = processed_data_dir
        self.checkpoint_dir = checkpoint_dir
        # Force CPU mode initially regardless of GPU availability.
        self.device = "cpu"
        logger.info("Forcing training on CPU initially.")
        logger.info("Loading tokenizer and model...")
        self.tokenizer = AutoTokenizer.from_pretrained(
            model_name,
            token=os.environ.get("HF_TOKEN"),
            additional_special_tokens=["[VERSE]", "[WORD]", "[ROOT]", "[FEATURES]"],
            trust_remote_code=True
        )
        if self.tokenizer.pad_token is None:
            self.tokenizer.add_special_tokens({"pad_token": "[PAD]"})
        try:
            self.model = AutoModelForCausalLM.from_pretrained(
                model_name,
                token=os.environ.get("HF_TOKEN"),
                torch_dtype=torch.float32,
                low_cpu_mem_usage=True,
                trust_remote_code=True,
                attn_implementation="eager"
            )
        except Exception as e:
            logger.error(f"Error loading model directly: {str(e)}")
            logger.info("Attempting to load with fallback parameters...")
            from transformers import AutoConfig
            config = AutoConfig.from_pretrained(
                model_name,
                token=os.environ.get("HF_TOKEN"),
                trust_remote_code=True
            )
            self.model = AutoModelForCausalLM.from_pretrained(
                model_name,
                token=os.environ.get("HF_TOKEN"),
                config=config,
                torch_dtype=torch.float32,
                low_cpu_mem_usage=True,
                trust_remote_code=True,
                revision="main",
                attn_implementation="eager"
            )
        self.model.resize_token_embeddings(len(self.tokenizer))
        self.model.train()
        self.model.config.use_cache = False
        if hasattr(self.model, "gradient_checkpointing_enable"):
            self.model.gradient_checkpointing_enable()
        else:
            logger.warning("Gradient checkpointing not available for this model")
        # Use Accelerate for device management; force CPU initially.
        from accelerate import Accelerator
        self.accelerator = Accelerator(cpu=True)
        self.model = self.accelerator.prepare(self.model)
    def prepare_training_data(self, chapter_data: List[Dict]) -> Dataset:
        """Creates a QuranicDataset from processed chapter data."""
        return QuranicDataset(chapter_data, self.tokenizer)

    def train_chunk(self, training_args: TrainingArguments, dataset: Dataset, chunk_output_dir: str) -> bool:
        """
        Trains a single chunk. Returns True if successful.
        """
        try:
            data_collator = DataCollatorForLanguageModeling(
                tokenizer=self.tokenizer,
                mlm=False
            )
            trainer = Trainer(
                model=self.model,
                args=training_args,
                train_dataset=dataset,
                processing_class=self.tokenizer,  # Updated per deprecation notice.
                data_collator=data_collator
            )
            logger.info(f"Starting training on chunk at {chunk_output_dir} with device {self.device}")
            trainer.train()
            trainer.save_model(chunk_output_dir)
            zip_filename = zip_checkpoint(chunk_output_dir)
            base_url = os.environ.get("HF_SPACE_URL", "http://localhost")
            download_link = f"{base_url}/file/{zip_filename}"
            logger.info(f"Checkpoint download link: {download_link}")
            with open(os.path.join(chunk_output_dir, "download_link.txt"), "w") as f:
                f.write(download_link)
            del trainer
            gc.collect()
            manage_memory()
            manage_gpu_resources()
            return True
        except Exception as e:
            logger.error(f"Error in training chunk at {chunk_output_dir}: {str(e)}")
            logger.error(traceback.format_exc())
            return False

    def poll_for_gpu(self, poll_interval: int = 10, max_attempts: int = 30) -> bool:
        """
        Polls periodically to check if a GPU is available.
        Returns True if a GPU becomes available within max_attempts, otherwise False.
        """
        attempts = 0
        while attempts < max_attempts:
            if torch.cuda.is_available():
                manage_gpu_resources(1)
                logger.info("GPU is now available for training.")
                return True
            time.sleep(poll_interval)
            attempts += 1
            logger.info(f"Polling for GPU availability... attempt {attempts}/{max_attempts}")
        return False
    def train_chapter(self,
                      chapter_num: int,
                      processed_verses: List[Dict],
                      chunk_size: int = 5,        # Reduced chunk size
                      num_train_epochs: int = 5,  # Lower epochs for testing
                      per_device_train_batch_size: int = 1,
                      learning_rate: float = 3e-5,
                      weight_decay: float = 0.01,
                      gradient_accumulation_steps: int = 32) -> bool:
        """
        Splits chapter data into chunks and trains incrementally.
        The pipeline starts on CPU. After each chunk is trained on CPU, it polls for GPU.
        If GPU becomes available, the model is moved to GPU for subsequent training.
        In case GPU training fails, it falls back to CPU.
        """
        total_examples = len(processed_verses)
        total_chunks = math.ceil(total_examples / chunk_size)
        logger.info(f"Chapter {chapter_num}: {total_examples} examples, {total_chunks} chunks.")
        for chunk_index in range(total_chunks):
            chunk_data = processed_verses[chunk_index * chunk_size: (chunk_index + 1) * chunk_size]
            dataset = self.prepare_training_data(chunk_data)
            chunk_output_dir = os.path.join(self.checkpoint_dir, f"chapter_{chapter_num}", f"chunk_{chunk_index}")
            os.makedirs(chunk_output_dir, exist_ok=True)
            training_args = TrainingArguments(
                output_dir=chunk_output_dir,
                overwrite_output_dir=True,
                num_train_epochs=num_train_epochs,
                per_device_train_batch_size=per_device_train_batch_size,
                learning_rate=learning_rate,
                weight_decay=weight_decay,
                gradient_accumulation_steps=gradient_accumulation_steps,
                fp16=False,
                remove_unused_columns=False,
                logging_steps=50,
                report_to="none",
                eval_strategy="no",
                no_cuda=(self.device == "cpu"),  # Force-disable CUDA when on CPU
                dataloader_num_workers=0,
                dataloader_pin_memory=False
            )
            logger.info(f"Training chunk {chunk_index+1}/{total_chunks} for Chapter {chapter_num} on device {self.device}...")
            success = self.train_chunk(training_args, dataset, chunk_output_dir)
            # If training fails on GPU, fall back to CPU.
            if not success and self.device == "cuda":
                logger.info(f"GPU error detected on chunk {chunk_index+1}. Shifting to CPU for this chunk...")
                self.model.to("cpu")
                self.device = "cpu"
                training_args.no_cuda = True
                training_args.optim = "adamw_torch"  # Explicit optimizer for CPU
                success = self.train_chunk(training_args, dataset, chunk_output_dir)
            if not success:
                logger.error(f"Training failed for Chapter {chapter_num} on chunk {chunk_index+1} even on CPU. Stopping chapter training.")
                return False
            # If running on CPU, poll for GPU availability after the chunk.
            if self.device == "cpu":
                if self.poll_for_gpu():
                    logger.info("GPU available; switching model to GPU for subsequent chunks.")
                    self.model.to("cuda")
                    self.device = "cuda"
        logger.info(f"Completed training for Chapter {chapter_num}")
        return True
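# A quick sanity check of the chunking arithmetic above (illustrative only,
# using the default arguments): with chunk_size=5, a chapter with 7 processed
# verses yields math.ceil(7 / 5) = 2 chunks of 5 and 2 verses. With
# per_device_train_batch_size=1 and gradient_accumulation_steps=32, one
# optimizer step corresponds to 1 * 32 = 32 examples, so a 5-verse chunk
# supplies only 5 micro-batches per epoch, fewer than one full accumulation
# window.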
class QuranicPipeline:
    """Integrates data processing and incremental model training for all chapters."""

    def __init__(self,
                 source_dir: str = ".",
                 working_dir: str = "working_directory",
                 start_chapter: int = 1,
                 end_chapter: int = 114):
        self.source_dir = source_dir
        self.working_dir = working_dir
        self.start_chapter = start_chapter
        self.end_chapter = end_chapter
        self.setup_directories()
        global logger
        logger = logging.getLogger(__name__)
        self.state = {
            "last_processed_chapter": 0,
            "last_trained_chapter": 0,
            "current_state": "initialized",
            "errors": [],
            "start_time": datetime.now().isoformat()
        }
        self.load_state()
        try:
            logger.info("Initializing Quranic Data Processor...")
            self.processor = QuranicDataProcessor(
                source_dir=self.source_dir,
                output_dir=os.path.join(self.working_dir, "processed_data")
            )
            logger.info("Initializing Quranic Model Trainer...")
            self.trainer = QuranicModelTrainer(
                model_name="google/gemma-2-2b",
                processed_data_dir=os.path.join(self.working_dir, "processed_data"),
                checkpoint_dir=os.path.join(self.working_dir, "checkpoints")
            )
            self.state["current_state"] = "ready"
            self.save_state()
        except Exception as e:
            self.handle_error("Initialization failed", e)
            raise

    def setup_directories(self):
        dirs = [
            self.working_dir,
            os.path.join(self.working_dir, "processed_data"),
            os.path.join(self.working_dir, "checkpoints"),
            os.path.join(self.working_dir, "logs"),
            os.path.join(self.working_dir, "state")
        ]
        for d in dirs:
            os.makedirs(d, exist_ok=True)

    def load_state(self):
        state_file = os.path.join(self.working_dir, "state", "pipeline_state.json")
        if os.path.exists(state_file):
            try:
                with open(state_file, 'r') as f:
                    saved_state = json.load(f)
                self.state.update(saved_state)
                logger.info(f"Loaded previous state: Last processed chapter {self.state.get('last_processed_chapter')}, "
                            f"last trained chapter {self.state.get('last_trained_chapter')}")
            except Exception as e:
                logger.warning(f"Could not load previous state: {str(e)}")

    def save_state(self):
        state_file = os.path.join(self.working_dir, "state", "pipeline_state.json")
        with open(state_file, 'w') as f:
            json.dump(self.state, f, indent=2)

    def handle_error(self, context: str, error: Exception):
        error_detail = {
            "timestamp": datetime.now().isoformat(),
            "context": context,
            "error": str(error),
            "traceback": traceback.format_exc()
        }
        self.state.setdefault("errors", []).append(error_detail)
        logger.error(f"{context}: {str(error)}")
        self.save_state()
    def run_pipeline(self):
        """Runs processing and training for chapters sequentially, then saves the final model."""
        logger.info("Starting pipeline execution")
        try:
            if not self.processor.load_source_files():
                raise Exception("Failed to load source files")
            for chapter in range(self.start_chapter, self.end_chapter + 1):
                logger.info(f"=== Processing Chapter {chapter} ===")
                processed_chapter_data = []
                verse = 1
                while True:
                    verse_data = self.processor.process_verse(chapter, verse)
                    if verse_data is None:
                        break
                    processed_chapter_data.append(asdict(verse_data))
                    verse += 1
                if processed_chapter_data:
                    success = self.trainer.train_chapter(chapter, processed_chapter_data)
                    if not success:
                        logger.error(f"Training failed for Chapter {chapter}. Stopping pipeline.")
                        break
                    self.state["last_trained_chapter"] = chapter
                    self.save_state()
                else:
                    logger.warning(f"No processed data for Chapter {chapter}")
                self.state["last_processed_chapter"] = chapter
                self.save_state()
                manage_memory()
                manage_gpu_resources()
            logger.info("Pipeline execution completed")
            final_model_dir = os.path.join(self.working_dir, "final_model")
            os.makedirs(final_model_dir, exist_ok=True)
            self.trainer.model.save_pretrained(final_model_dir)
            self.trainer.tokenizer.save_pretrained(final_model_dir)
            logger.info(f"Final model saved to {final_model_dir}")
        except Exception as e:
            self.handle_error("Pipeline execution failed", e)
            raise
# Request ZeroGPU hardware for the Space. The `spaces.GPU` decorator attaches a
# GPU to this call on ZeroGPU Spaces; the duration value below is an assumption
# and should be adjusted to the Space's quota.
@spaces.GPU(duration=300)
def start_pipeline():
    try:
        logger.info("Starting Quranic Training Pipeline with Gemma-2-2b")
        logger.info(f"PyTorch version: {torch.__version__}")
        logger.info(f"CUDA available: {torch.cuda.is_available()}")
        if torch.cuda.is_available():
            logger.info(f"CUDA device count: {torch.cuda.device_count()}")
            logger.info(f"CUDA device name: {torch.cuda.get_device_name(0)}")
        if not os.environ.get("HF_TOKEN"):
            logger.warning("HF_TOKEN environment variable not set. Model loading may fail.")
        required_files = [
            'quranic-corpus-morphology-0.4.txt',
            'en.sample.quran-maududi.txt',
            'en.w4w.qurandev.txt'
        ]
        missing_files = [f for f in required_files if not os.path.exists(f)]
        if missing_files:
            return f"Missing required data files: {', '.join(missing_files)}"
        pipeline = QuranicPipeline(
            source_dir=".",
            working_dir="working_directory",
            start_chapter=1,
            end_chapter=114
        )
        pipeline.run_pipeline()
        return "Pipeline execution completed successfully."
    except Exception as e:
        error_msg = f"Pipeline execution failed: {str(e)}\n{traceback.format_exc()}"
        logger.error(error_msg)
        return error_msg
iface = gr.Interface(
    fn=start_pipeline,
    inputs=[],
    outputs=gr.Textbox(label="Pipeline Status", lines=10),
    title="Quranic Training Pipeline for Gemma-2-2b",
    description="""This pipeline fine-tunes Google's Gemma-2-2b model on Quranic data.
Click 'Submit' to trigger the Quranic data processing and training pipeline on ZeroGPU.

Requirements:
- Transformers (==4.45.0)
- Gradio (>=5.12.0)
- PyTorch (==2.3.0)
- psutil (==5.9.5)
- Accelerate (>=0.26.0)

The pipeline processes all 114 chapters of the Quran sequentially, with memory and GPU resource management optimizations.
Checkpoint download links are provided after every training chunk."""
)

if __name__ == "__main__":
    iface.launch()
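# Note: for local debugging without the Gradio UI, the pipeline can also be
# driven directly (a minimal sketch; the chapter range is narrowed here purely
# for illustration):
#
#     pipeline = QuranicPipeline(source_dir=".", working_dir="working_directory",
#                                start_chapter=1, end_chapter=1)
#     pipeline.run_pipeline()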