# Granite 3.3 hotel chatbot: runs the HF checkpoint inside a Hugging Face Space,
# or loads the GGUF model locally via llama-cpp-python.
import os
import json  # For debug printing
import gradio as gr
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer

try:
    # Try importing llama-cpp-python for GGUF support
    from llama_cpp import Llama
    LLAMA_CPP_AVAILABLE = True
except ImportError:
    print("⚠️ WARNING: llama-cpp-python library not found. Local GGUF execution will not be available.")
    print("   To enable local GGUF, run: pip install llama-cpp-python")
    Llama = None  # Define as None if import fails
    LLAMA_CPP_AVAILABLE = False

# --- Configuration ---
# HF Repo ID for the standard model (used in Space and for tokenizer)
HF_CHECKPOINT = "ibm-granite/granite-3.3-2b-instruct"

# GGUF Settings for Local Execution (Using llama-cpp-python)
GGUF_REPO_ID = "ibm-granite/granite-3.3-2b-instruct-gguf"  # Official IBM v3.3 GGUF repo
GGUF_FILENAME = "granite-3.3-2b-instruct-Q2_K.gguf"        # Smallest Q2_K quantization
# GGUF_FILENAME = "granite-3.3-2b-instruct-Q4_K_M.gguf"    # Fallback if Q2_K fails

# Template Filename (Use v3.3 template for both paths now)
TEMPLATE_FILENAME = "granite3.3_2b_chat_template.jinja"
# --- End Configuration ---
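# Minimal sketch of the local dependencies (package names only; exact versions and a
# CPU-only torch wheel are assumptions about your setup):
#   pip install gradio torch transformers huggingface_hub llama-cpp-python
# huggingface_hub is what Llama.from_pretrained relies on to download the GGUF file
# from the Hub on first run.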
# Detect Space environment
env = os.environ
is_space = env.get("SPACE_ID") is not None
print(f"RUNNING IN SPACE? {is_space}")

# Device setup (primarily for HF model in Space)
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using device: {device}")
# --- Load model function ---
def load_model():
    primary_checkpoint = HF_CHECKPOINT
    model_name_display = primary_checkpoint

    # --- Function to load and apply template ---
    def apply_template_from_file(tokenizer, template_filename):
        applied_template = False
        try:
            print(f"Attempting to load chat template from: {template_filename}")
            script_dir = os.path.dirname(os.path.abspath(__file__))
            template_path = os.path.join(script_dir, template_filename)
            if not os.path.exists(template_path):
                print(f"⚠️ WARNING: Template file not found at: {template_path}")
                return False
            with open(template_path, "r", encoding="utf-8") as f:
                custom_chat_template_content = f.read()
            if hasattr(tokenizer, 'chat_template'):
                tokenizer.chat_template = custom_chat_template_content
                applied_template = True
                print(f"✅ Loaded and applied chat template from: {template_filename}")
            else:
                print("⚠️ WARNING: Tokenizer object does not support setting 'chat_template'.")
        except Exception as e:
            print(f"❌ ERROR reading or applying template file '{template_filename}': {e}")
        if not applied_template:
            print("Falling back to tokenizer's default built-in template (if any).")
        print("--- Final Chat Template Being Used (by HF Tokenizer) ---")
        print(tokenizer.chat_template if hasattr(tokenizer, 'chat_template') and tokenizer.chat_template else "No template found or template empty/default.")
        print("-------------------------------------------------------")
        return applied_template
    # --- End function ---
    # --- Load Tokenizer (Common for both paths now) ---
    try:
        print(f"Loading HF Tokenizer: {primary_checkpoint}")
        tokenizer = AutoTokenizer.from_pretrained(primary_checkpoint, use_fast=True)
        print("✅ Loaded HF Tokenizer.")
        # Apply the v3.3 template UNCONDITIONALLY
        apply_template_from_file(tokenizer, TEMPLATE_FILENAME)
    except Exception as e:
        print(f"❌ Failed to load tokenizer {primary_checkpoint}: {e}")
        raise RuntimeError("Failed to load the necessary tokenizer.") from e
    # --- End Tokenizer Loading ---

    if is_space:
        print(f"🚀 Running in Space. Loading HF model: {primary_checkpoint}")
        try:
            # Load HF Model for Space
            model = AutoModelForCausalLM.from_pretrained(
                primary_checkpoint,
                torch_dtype=torch.float16,
                low_cpu_mem_usage=True,
                device_map="auto"
            )
            print(f"✅ Loaded HF {primary_checkpoint}")
            model_name_display = primary_checkpoint
            # Tokenizer already loaded and template applied
            return tokenizer, model, model_name_display
        except Exception as e:
            print(f"❌ HF Primary load failed: {e}")
            raise RuntimeError(f"Failed to load primary HF model {primary_checkpoint} in Space.") from e
    else:  # Running locally - load GGUF using llama-cpp-python
        print("💻 Running Locally. Attempting GGUF setup via llama-cpp-python.")
        if not LLAMA_CPP_AVAILABLE:
            raise RuntimeError("llama-cpp-python library is required but not installed/found.")
        print(f"   GGUF Repo ID: {GGUF_REPO_ID}")
        print(f"   GGUF Filename: {GGUF_FILENAME}")
        try:
            # Load GGUF Model using llama-cpp-python
            print("Attempting to load GGUF model using Llama.from_pretrained...")
            model = Llama.from_pretrained(
                repo_id=GGUF_REPO_ID,
                filename=GGUF_FILENAME,
                n_gpu_layers=0,  # Force CPU execution
                verbose=True,
                n_ctx=4096  # Increased context window
            )
            print(f"✅ Loaded GGUF model {GGUF_FILENAME} using llama-cpp-python")
            model_name_display = f"GGUF (llama-cpp): {GGUF_FILENAME}"
            # Return tokenizer loaded earlier and the Llama model object
            return tokenizer, model, model_name_display
        except Exception as e:
            print(f"❌ Local GGUF load failed using llama-cpp-python: {e}")
            if "Not Found" in str(e) or "404" in str(e):
                print(f"   File not found. Please ensure Repo ID '{GGUF_REPO_ID}' and Filename '{GGUF_FILENAME}' are correct and the file exists on Hugging Face Hub.")
            elif "invalid GGUF file" in str(e) or "failed to load model" in str(e):
                print(f"   Model loading failed. The GGUF file '{GGUF_FILENAME}' might be corrupted, incompatible with this version of llama-cpp-python, or the quantization level is unsupported.")
                print("   Consider trying a different quantization like 'Q4_K_M'.")
            # Add other potential error checks based on llama-cpp-python exceptions
            raise RuntimeError(f"Failed to load local GGUF model '{GGUF_FILENAME}' using llama-cpp-python.") from e
# --- Call load_model ---
try:
    # Tokenizer should now be loaded for both paths
    tokenizer, model, model_name = load_model()
    if tokenizer is None:  # Should not happen now
        raise RuntimeError("Tokenizer failed to load.")
except Exception as load_err:
    print(f"🚨 CRITICAL ERROR DURING MODEL LOADING: {load_err}")
    # For UI testing, you might want to create dummy objects instead of raising
    # tokenizer = None
    # model = None
    # model_name = "LOAD FAILED"
    raise  # Re-raise for now
# --- Load hotel docs function ---
def load_hotel_docs(hotel_id):
    knowledge_dir = "knowledge"
    path = os.path.join(knowledge_dir, f"{hotel_id}.txt")
    if not os.path.exists(path):
        print(f"⚠️ Knowledge file not found: {path}")
        return []
    try:
        with open(path, encoding="utf-8") as f:
            content = f.read().strip()
        print(f"DEBUG [load_hotel_docs]: Read {len(content)} chars from {path}.")
        if not content:
            print(f"⚠️ WARNING [load_hotel_docs]: File {path} is empty.")
            return []
        return [(hotel_id, content)]  # Return list with tuple: [(id, content)]
    except Exception as e:
        print(f"❌ Error reading knowledge file {path}: {e}")
        return []
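
# Expected on-disk layout for each hotel (the id "hotel01" is hypothetical):
#   knowledge/hotel01.txt         -> document content passed via the template's `documents` kwarg
#   knowledge/hotel01-system.txt  -> per-hotel system prompt read in chat()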
# --- Dynamic Hotel ID Detection ---
available_hotels = []
knowledge_dir = "knowledge"
if os.path.exists(knowledge_dir):
    print("🔍 Scanning for available hotels...")
    files = os.listdir(knowledge_dir)
    potential_ids = set()
    for f in files:
        if f.endswith(".txt") and not f.endswith("-system.txt"):
            potential_ids.add(f[:-4])  # Add ID without .txt
    for hotel_id in sorted(potential_ids):
        doc_file = os.path.join(knowledge_dir, f"{hotel_id}.txt")
        sys_file = os.path.join(knowledge_dir, f"{hotel_id}-system.txt")
        if os.path.exists(doc_file) and os.path.exists(sys_file):
            available_hotels.append(hotel_id)
            print(f"   ✅ Found valid hotel pair: {hotel_id}")
        else:
            print(f"   ⚠️ Skipping '{hotel_id}': Missing either '{hotel_id}.txt' or '{hotel_id}-system.txt'")
    print("Hotel scan complete.")
else:
    print(f"⚠️ Knowledge directory '{knowledge_dir}' not found. No hotels loaded.")
# --- End Hotel Scanning ---
# --- Chat function ---
def chat(message, history, hotel_id):
    if history is None:
        history = []

    # Convert Gradio history
    history_hf_format = []
    for user_msg, assistant_msg in history:
        if user_msg:
            history_hf_format.append({"role": "user", "content": user_msg})
        if assistant_msg:
            history_hf_format.append({"role": "assistant", "content": assistant_msg})
    current_turn = {"role": "user", "content": message}

    ui_history = history + [[message, None]]
    yield ui_history, ""

    response = "Sorry, an error occurred."
    input_text = ""  # Initialize input_text

    try:
        # --- System Prompt Loading ---
        default_system_prompt = "You are a helpful hotel assistant."
        system_prompt_filename = f"{hotel_id}-system.txt"
        system_prompt_path = os.path.join("knowledge", system_prompt_filename)
        system_prompt_content = default_system_prompt
        if os.path.exists(system_prompt_path):
            try:
                with open(system_prompt_path, "r", encoding="utf-8") as f:
                    loaded_prompt = f.read().strip()
                if loaded_prompt:
                    system_prompt_content = loaded_prompt
                else:
                    print(f"⚠️ System prompt file '{system_prompt_path}' is empty. Using default.")
            except Exception as e:
                print(f"❌ Error reading system prompt file '{system_prompt_path}': {e}. Using default.")
        else:
            print(f"⚠️ System prompt file not found: '{system_prompt_path}'. Using default.")

        # --- Document Loading ---
        hotel_docs_list = load_hotel_docs(hotel_id)

        # --- Message List Construction (Base: System, History, User) ---
        messages = [{"role": "system", "content": system_prompt_content}]
        messages.extend(history_hf_format)
        messages.append(current_turn)
        print(f"DEBUG [chat]: Base messages list:\n{json.dumps(messages, indent=2)}")

        # --- Prepare documents kwarg (Used by apply_chat_template in BOTH paths) ---
        documents_for_kwarg = []
        if hotel_docs_list:
            # Use 'doc_id' and 'text' keys for v3.3 template
            documents_for_kwarg = [{"doc_id": doc_id, "text": doc_content} for doc_id, doc_content in hotel_docs_list]
            print(f"DEBUG [chat]: Preparing documents kwarg: {len(documents_for_kwarg)} docs")
        # --- Template Application (Now UNCONDITIONAL - uses tokenizer) ---
        input_text = tokenizer.apply_chat_template(
            messages,
            documents=documents_for_kwarg,  # Use kwarg for v3.3 template
            tokenize=False,
            add_generation_prompt=True
        )

        # --- Debug: dump the fully rendered prompt string ---
        print("\n" + "=" * 40 + " FINAL PROMPT STRING " + "=" * 40)
        print(input_text)
        print("=" * 99 + "\n")
        # --- End debug print ---
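        # The rendered prompt should follow Granite's role-marker format, roughly
        # <|start_of_role|>system<|end_of_role|> ... <|end_of_text|> per turn; the exact
        # layout depends on the Jinja template file shipped alongside this script.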
    except Exception as e:
        print(f"❌ Error during prompt preparation: {e}")
        ui_history[-1][1] = "Sorry, an error occurred while preparing the prompt."
        yield ui_history, ""
        return
    # --- Generation ---
    try:
        if is_space:
            # --- HF Space Generation (model.generate) ---
            print("🚀 Generating response using HF model...")
            inputs = tokenizer(input_text, return_tensors="pt").to(device)
            input_length = inputs.input_ids.shape[1]
            with torch.no_grad():
                outputs = model.generate(
                    inputs.input_ids,
                    attention_mask=inputs.attention_mask,
                    max_new_tokens=1024,
                    do_sample=False,
                    eos_token_id=tokenizer.eos_token_id
                )
            new_token_ids = outputs[0][input_length:]
            response = tokenizer.decode(new_token_ids, skip_special_tokens=True).strip()
            print("✅ HF Generation complete.")
        else:  # Local GGUF generation using llama-cpp-python's lower-level call
            print("💻 Generating response using GGUF model (llama-cpp-python)...")
            # --- Use model(prompt_string, ...) ---
            output = model(  # Call the Llama object directly with the formatted string
                input_text,
                max_tokens=512,  # Max tokens to generate
                stop=["<|end_of_text|>"],  # Use model's stop token(s)
                temperature=0.1,
                # echo=False  # Usually the default; don't echo the prompt
            )
            # Extract response content
            if output and 'choices' in output and output['choices'] and 'text' in output['choices'][0]:
                response = output['choices'][0]['text'].strip()
            else:
                print(f"⚠️ Unexpected output format from model call: {output}")
                response = "Sorry, received an unexpected response structure."
            # --- End model(prompt_string, ...) ---
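            # Note: llama-cpp-python also exposes model.create_chat_completion(messages=...),
            # which would render the prompt from the template metadata in the GGUF file.
            # The raw model(prompt_string) call is used here so the Jinja template applied
            # to the HF tokenizer above is reused verbatim on both execution paths.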
print("β GGUF Generation complete (llama-cpp-python).") | |
if not response: | |
response = "Sorry, I encountered an issue generating a response (empty)." | |
except Exception as e: | |
print(f"β Error during model generation or processing: {e}") | |
response = f"Sorry, an error occurred: {e}" | |
print(f"DEBUG: Final response variable before UI append = {repr(response)}") | |
ui_history[-1][1] = response | |
yield ui_history, "" | |
# --- Gradio UI ---
with gr.Blocks() as demo:
    with gr.Column(variant="panel"):
        gr.Markdown("### 🏨 Multi-Hotel Chatbot Demo")
        gr.Markdown(f"**Running:** {model_name}")  # Displays HF name or GGUF info
        hotel_selector = gr.Dropdown(
            choices=available_hotels,
            label="Hotel",
            value=available_hotels[0] if available_hotels else None,
            interactive=bool(available_hotels)
        )
        with gr.Row():
            chatbot = gr.Chatbot(label="Chat History", height=500)
        msg = gr.Textbox(
            show_label=False,
            placeholder="Ask about the hotel..."
        )
        clear_btn = gr.Button("Clear")
        clear_btn.click(lambda: ([], ""), None, [chatbot, msg])
        msg.submit(
            fn=chat,
            inputs=[msg, chatbot, hotel_selector],
            outputs=[chatbot, msg]
        )
        if is_space:
            gr.Markdown("⚠️ Pause the Space when done to avoid charges.")

# Enable streaming queue
demo.queue(default_concurrency_limit=2, max_size=32)

if __name__ == "__main__":
    print("Launching Gradio Interface...")
    demo.launch()
    print("Gradio Interface closed.")