import gradio as gr
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer, AutoConfig  # BitsAndBytesConfig removed (CPU only)
import os
import gc
import psutil
import time

# --- Configuration ---
# Your HF Hub model.
# This model appears to be ~7B parameters (based on typical naming for Llama/Mistral fine-tunes).
# A 7B model in float32 is roughly 28 GB, so loading it with 18 GB of RAM WILL require disk offloading.
MODEL_DIR = "ErenalpCet/E-Model-Reasoning-Coder-V1"

IM_START = "<|im_start|>"
IM_END = "<|im_end|>"
ASSISTANT_TAG = f"{IM_START}assistant\n"


def load_model():
    """Loads the fine-tuned model and tokenizer for CPU using offloading."""
    print(f"Loading model from: {MODEL_DIR}")

    # Force garbage collection before loading the model.
    # No CUDA cache to clear since we are on CPU.
    gc.collect()

    # --- Configuration (quantization config removed) ---
    # BitsAndBytes requires CUDA for its quantization methods (load_in_4bit, load_in_8bit).
    # Since only CPU is available, BitsAndBytesConfig is dropped and we rely on
    # device_map="cpu" plus offloading to handle memory.

    # --- Loading ---
    try:
        # Load just the tokenizer first.
        tokenizer = AutoTokenizer.from_pretrained(MODEL_DIR)

        # Check memory before loading the model.
        process = psutil.Process(os.getpid())
        print(f"Memory usage before model load: {process.memory_info().rss / (1024 * 1024):.2f} MB")

        start_time = time.time()
        print("Starting model loading...")

        # Load the model with device_map="cpu" and offloading for memory management.
        # The weights load in float32 (approx. 28 GB for 7B), with parts split between
        # RAM and disk ('offload_folder') as needed.
        model = AutoModelForCausalLM.from_pretrained(
            MODEL_DIR,
            trust_remote_code=True,
            # On CPU there is no need for torch_dtype=torch.float16/bfloat16;
            # float32 is the default and standard for CPU compute.
            # Quantization via BitsAndBytesConfig is removed.
            low_cpu_mem_usage=True,           # Very important for large models on CPU
            offload_folder="offload_folder",  # Required with low_cpu_mem_usage if model > RAM
            offload_state_dict=True,          # Offload the state dict during loading
            device_map="cpu",                 # Explicitly set to CPU
            # device_map="auto" with a max_memory budget also works (see the sketch below),
            # but device_map="cpu" is simpler when you know you only have CPU.
        )
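
        # Optional, hedged sketch (not used here): let accelerate place weights
        # automatically under an explicit RAM budget. The "18GiB" figure is an
        # assumption based on the ~18 GB of RAM mentioned above; adjust it to your machine.
        # model = AutoModelForCausalLM.from_pretrained(
        #     MODEL_DIR,
        #     trust_remote_code=True,
        #     low_cpu_mem_usage=True,
        #     offload_folder="offload_folder",
        #     offload_state_dict=True,
        #     device_map="auto",
        #     max_memory={"cpu": "18GiB"},
        # )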
        end_time = time.time()
        print(f"Model loading took {end_time - start_time:.2f} seconds.")

        # The model is already on CPU due to device_map="cpu";
        # model = model.to("cpu") would be redundant.

        # Set model to evaluation mode.
        model.eval()

        # Add special tokens if needed.
        original_vocab_size = len(tokenizer)
        special_tokens = [IM_START, IM_END]
        added_tokens_dict = {"additional_special_tokens": []}
        for token in special_tokens:
            # Check whether the token is already in the vocab by converting it to an ID
            # and comparing against the unknown-token ID.
            token_id = tokenizer.convert_tokens_to_ids(token)
            if token_id is None or token_id == tokenizer.unk_token_id:
                added_tokens_dict["additional_special_tokens"].append(token)

        if added_tokens_dict["additional_special_tokens"]:
            num_added = tokenizer.add_special_tokens(added_tokens_dict)
            print(f"Added {num_added} special tokens: {added_tokens_dict['additional_special_tokens']}")
            # Resize token embeddings ONLY if tokens were added and the new size is larger.
            if len(tokenizer) > original_vocab_size:
                print(f"Resizing model embeddings from {original_vocab_size} to {len(tokenizer)}")
                model.resize_token_embeddings(len(tokenizer))
            else:
                print("Vocab size unchanged, no need to resize embeddings.")
        else:
            print("Special tokens already in tokenizer vocab.")

        # Verify the stop token.
        im_end_id = tokenizer.convert_tokens_to_ids(IM_END)
        if im_end_id is None or im_end_id == tokenizer.unk_token_id:
            print(f"Warning: '{IM_END}' not recognized by tokenizer. Using EOS token ({tokenizer.eos_token}) as stop sequence (ID: {tokenizer.eos_token_id}).")
            stop_token_id = tokenizer.eos_token_id
        else:
            print(f"Using '{IM_END}' (ID: {im_end_id}) as stop token.")
            stop_token_id = im_end_id
        # The stop token ID could also be passed alongside others in the generation
        # parameters, though eos_token_id is the primary mechanism.

        # Print memory usage and parameter count after loading.
        process = psutil.Process(os.getpid())
        print(f"Memory usage after model load: {process.memory_info().rss / (1024 * 1024):.2f} MB")
        print(f"Number of model parameters: {model.num_parameters():,}")

        return model, tokenizer, stop_token_id

    except Exception as e:
        print(f"Error loading model: {e}")
        print("Attempting to print traceback:")
        import traceback
        traceback.print_exc()
        print("-" * 20)
        print("Troubleshooting Steps (for CPU loading without BitsAndBytes):")
        print("1. Verify the model path/name is correct on Hugging Face Hub.")
        print(f"2. Ensure you have sufficient *disk space* in '{os.path.abspath('offload_folder')}' for offloading (likely tens of GB needed).")
        print("3. Ensure you have the 'accelerate' library installed (`pip install accelerate`). This is crucial for low_cpu_mem_usage and offloading.")
        print("4. The model might still be too large even with offloading if total system memory (RAM + swap) is insufficient or disk I/O is a bottleneck during loading.")
        print("5. Check system logs for any out-of-memory or disk-full errors.")
        print("6. Consider whether your system has sufficient swap space configured, as offloading might rely on it.")
        return None, None, None
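
# Optional, hedged sketch (not wired into generate_response below): a custom
# StoppingCriteria that halts generation once a multi-token stop string such as
# f"{IM_START}user" appears, for cases a single eos_token_id cannot cover.
# The class name StopOnStrings and the usage shown are illustrative assumptions.
from transformers import StoppingCriteria, StoppingCriteriaList


class StopOnStrings(StoppingCriteria):
    """Stop generation when any of the given strings appears in the newly generated text."""

    def __init__(self, tokenizer, stop_strings, prompt_length):
        self.tokenizer = tokenizer
        self.stop_strings = stop_strings
        self.prompt_length = prompt_length  # number of prompt tokens to skip when decoding

    def __call__(self, input_ids, scores, **kwargs):
        # Decode only the tokens generated after the prompt and look for stop strings.
        new_text = self.tokenizer.decode(input_ids[0][self.prompt_length:], skip_special_tokens=False)
        return any(stop in new_text for stop in self.stop_strings)

# Illustrative usage (commented out; generate_response relies on eos_token_id instead):
# criteria = StoppingCriteriaList(
#     [StopOnStrings(tokenizer, [IM_END, f"{IM_START}user"], inputs["input_ids"].shape[1])]
# )
# outputs = model.generate(**inputs, stopping_criteria=criteria, ...)
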

# Initialize the model as None and load it lazily.
model, tokenizer, stop_token_id = None, None, None


# --- Rest of the code (generate_response, Gradio interface, __main__ block) ---

def lazy_load_model():
    """Lazily load the model only when needed."""
    global model, tokenizer, stop_token_id
    if model is None:
        print("Model not loaded, attempting to load now...")
        model, tokenizer, stop_token_id = load_model()
        if model is None:
            print("Model loading failed.")
        else:
            print("Model loaded successfully.")
    return model is not None


def generate_response(
    user_input,
    history,
    max_tokens=1024,
    temperature=0.7,
    top_p=0.95,
):
    # Lazily load the model on the first request.
    if not lazy_load_model():
        return "Model loading failed. Check server logs for details."

    # Build the conversation history.
    context = []
    for user_msg, bot_msg in history:
        context.append(f"{IM_START}user\n{user_msg}\n{IM_END}")
        # Ensure bot_msg is not None or empty before adding.
        if bot_msg and bot_msg.strip():
            context.append(f"{ASSISTANT_TAG}{bot_msg}\n{IM_END}")

    # Add the current input.
    context.append(f"{IM_START}user\n{user_input}\n{IM_END}")
    context.append(ASSISTANT_TAG)

    # Tokenize with efficient settings.
    input_text = "\n".join(context)

    # Get the maximum input length from the model config if available, otherwise use a default.
    # Some models expose the context length under a different config attribute.
    max_model_input_length = getattr(model.config, "max_position_embeddings", None)
    if max_model_input_length is None:
        # Fallback for configs without max_position_embeddings.
        # 4096 is a common safe value for many 7B models; check the model card/config.json.
        max_model_input_length = 4096
        print(f"Warning: model.config.max_position_embeddings not found. Using default max_length: {max_model_input_length}")

    # Ensure the input is not truncated so aggressively that there is no room for output.
    # `max_length` below applies to the *input*: if the prompt exceeds it, the prompt is
    # truncated. A common pattern is to limit the conversation history when the prompt
    # plus the requested max_tokens exceeds the model's capacity (see the sketch below);
    # here we simply rely on truncation.
    effective_max_length = max_model_input_length
    if effective_max_length - max_tokens < len(tokenizer.encode(input_text)):
        # The prompt plus the desired output length exceeds the model window.
        # No specific action here; tokenizer(..., max_length=...) handles truncation.
        pass
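
    # Hedged sketch (optional, commented out): drop the oldest context entries until
    # the encoded prompt leaves room for max_tokens of output. This illustrates the
    # "limit conversation history" pattern mentioned above; it is not part of the
    # original flow.
    # while len(context) > 2 and len(tokenizer.encode("\n".join(context))) > effective_max_length - max_tokens:
    #     context = context[2:]  # drop the oldest turn(s)
    #     input_text = "\n".join(context)
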
    inputs = tokenizer(
        input_text,
        return_tensors="pt",
        truncation=True,                  # Explicitly enable truncation if the input is too long
        max_length=effective_max_length,  # Truncate the input if it exceeds model capacity
        padding=False,                    # Avoid unnecessary padding
    ).to(model.device)  # model.device will be 'cpu' due to device_map

    # Generate the response with settings tuned for CPU.
    with torch.no_grad():
        try:
            # Keep generation as memory-efficient as possible on CPU:
            # num_beams > 1 increases memory significantly, so stick to 1.
            outputs = model.generate(
                **inputs,
                max_new_tokens=max_tokens,
                temperature=temperature,
                top_p=top_p,
                do_sample=True,
                eos_token_id=stop_token_id,  # Use the verified stop_token_id
                # Use pad_token_id if available; eos_token_id is a common fallback.
                pad_token_id=tokenizer.pad_token_id if tokenizer.pad_token_id is not None else tokenizer.eos_token_id,
                repetition_penalty=1.2,
                use_cache=True,  # Enable KV caching (still beneficial on CPU)
                num_beams=1,     # Keep beam search off for lower memory
                # Multi-token stops such as f"{IM_START}user" that eos_token_id cannot
                # express could be handled with stopping_criteria=StoppingCriteriaList([...])
                # and a custom StoppingCriteria subclass (see the optional StopOnStrings
                # sketch above). The simple approach used here: rely on eos_token_id
                # plus post-processing of the decoded text.
            )
        except Exception as e:
            print(f"Error during generation: {e}")
            # Fall back to simpler generation settings if needed.
            try:
                print("Attempting simplified generation (greedy decoding)...")
                outputs = model.generate(
                    **inputs,
                    max_new_tokens=max_tokens,
                    do_sample=False,  # Greedy decoding: less memory, potentially faster
                    use_cache=True,
                    eos_token_id=stop_token_id,
                    pad_token_id=tokenizer.pad_token_id if tokenizer.pad_token_id is not None else tokenizer.eos_token_id,
                    num_beams=1,
                )
            except Exception as e2:
                print(f"Simplified generation also failed: {e2}")
                return "An error occurred during response generation."

    # Free memory used during generation.
    del inputs  # Delete the input tensors
    # Move output tensors to CPU if they aren't already (they should be, given device_map="cpu").
    if 'outputs' in locals() and isinstance(outputs, torch.Tensor):
        outputs = outputs.cpu()
    # Keep `outputs` for decoding; no CUDA cache to clear on CPU.
    gc.collect()

    # Decode and clean the response.
    # Ensure outputs is a non-empty 2D tensor before decoding.
    if isinstance(outputs, torch.Tensor) and outputs.ndim == 2 and outputs.shape[0] > 0:
        # Decode the whole sequence first (including the stop token if present), then parse.
        full_text = tokenizer.decode(outputs[0], skip_special_tokens=False)
    else:
        print("Warning: Generation output was not a tensor or was empty. Cannot decode.")
        print(f"Output type: {type(outputs)}")
        print(f"Output value: {outputs}")
        return "Error: Failed to generate valid output."

    # --- Parsing logic ---
    # Find the start of the assistant's response tag.
    response_start_marker = ASSISTANT_TAG  # Look for the exact tag
    response_start_idx = full_text.rfind(response_start_marker)
    response = "Error: Could not parse response."  # Default error message
    if response_start_idx != -1:
        response_start = response_start_idx + len(response_start_marker)

        # Look for stop sequences *after* the assistant tag.
        stop_sequences = [IM_END, f"{IM_START}user"]  # Common chat stops
        earliest_stop = len(full_text)  # Default end is the end of the generated text
        for stop_seq in stop_sequences:
            idx = full_text.find(stop_seq, response_start)
            if idx != -1 and idx < earliest_stop:
                earliest_stop = idx

        response = full_text[response_start:earliest_stop].strip()
        # Clean up potential trailing tokens if parsing wasn't perfect.
        response = response.replace(IM_START, "").replace(IM_END, "").replace("user\n", "").replace("assistant\n", "").strip()
    else:
        # Fallback if the assistant tag wasn't found in the output.
        print(f"Warning: Assistant tag '{ASSISTANT_TAG}' not found in generated text. Generated text:")
        print(full_text)
        # Attempt a simpler cleanup by just removing the special tokens.
        response = full_text.replace(IM_START, "").replace(IM_END, "").replace("user\n", "").replace("assistant\n", "").strip()
        if not response:
            response = "Warning: Could not find assistant response tag in generated text."

    return response


# Gradio interface with lazy loading
demo = gr.ChatInterface(
    generate_response,
    additional_inputs=[
        gr.Slider(1, 2048, 1024, step=1, label="Max Tokens"),
        gr.Slider(0.1, 2.0, 0.7, step=0.1, label="Temperature"),
        gr.Slider(0.1, 1.0, 0.95, step=0.05, label="Top-p"),
    ],
    title="Code Reasoning Assistant (CPU)",
    description="Fine-tuned coding assistant specialized in code reasoning and generation (running on CPU).",
    theme="soft",
)

if __name__ == "__main__":
    # Create the offload folder if it doesn't exist.
    OFFLOAD_DIR = "offload_folder"
    os.makedirs(OFFLOAD_DIR, exist_ok=True)
    print(f"Offload folder '{os.path.abspath(OFFLOAD_DIR)}' created or already exists.")

    # Lazy loading: the model is only loaded when the first query arrives.
    print("Starting server with lazy model loading...")
    print(f"Ensure '{os.path.abspath(OFFLOAD_DIR)}' has sufficient disk space for model offloading.")
    print("Performance will be limited by CPU and disk speed.")
    print("Monitoring memory (RAM + swap) and disk usage during the first query is highly recommended.")

    demo.queue().launch()
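
# Hedged usage note: this script assumes gradio, torch, transformers, accelerate,
# and psutil are installed, e.g. `pip install gradio torch transformers accelerate psutil`
# (accelerate is required for low_cpu_mem_usage/offloading, as noted in the
# troubleshooting steps above). Run it with `python <this_file>.py`; Gradio prints
# its local URL on startup.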