# hotel-chat/app.py
# Granite 3.3 hotel chatbot demo: loads a GGUF build of the model locally via
# llama-cpp-python, or the full Hugging Face model when running in a Space.
import os
import json # For debug printing
import gradio as gr
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
try:
# Try importing llama-cpp-python for GGUF support
from llama_cpp import Llama
LLAMA_CPP_AVAILABLE = True
except ImportError:
print("⚠️ WARNING: llama-cpp-python library not found. Local GGUF execution will not be available.")
print(" To enable local GGUF, run: pip install llama-cpp-python")
Llama = None # Define as None if import fails
LLAMA_CPP_AVAILABLE = False
# --- Configuration ---
# HF Repo ID for the standard model (used in Space and for tokenizer)
HF_CHECKPOINT = "ibm-granite/granite-3.3-2b-instruct"
# GGUF Settings for Local Execution (Using llama-cpp-python)
GGUF_REPO_ID = "ibm-granite/granite-3.3-2b-instruct-gguf" # Official IBM v3.3 GGUF repo
GGUF_FILENAME = "granite-3.3-2b-instruct-Q2_K.gguf" # Smallest Q2_K quantization
# GGUF_FILENAME = "granite-3.3-2b-instruct-Q4_K_M.gguf" # Fallback if Q2_K fails
# Template Filename (Use v3.3 template for both paths now)
TEMPLATE_FILENAME = "granite3.3_2b_chat_template.jinja"
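# The v3.3 template is expected to render the `documents` list passed to
# apply_chat_template in chat(); the file should sit in the same directory as app.py.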
# --- End Configuration ---
# Detect Space environment
env = os.environ
is_space = env.get("SPACE_ID") is not None
print(f"RUNNING IN SPACE? {is_space}")
# Device setup (primarily for HF model in Space)
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using device: {device}")
# --- Load model function ---
def load_model():
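    """Load the shared HF tokenizer plus either the HF model (in a Space) or the local
    GGUF model (via llama-cpp-python). Returns (tokenizer, model, model_name_display)."""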
primary_checkpoint = HF_CHECKPOINT
model_name_display = primary_checkpoint
# --- Function to load and apply template ---
def apply_template_from_file(tokenizer, template_filename):
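        """Read a Jinja chat template located next to this script and assign it to
        tokenizer.chat_template. Returns True if the template was applied."""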
applied_template = False
try:
print(f"Attempting to load chat template from: {template_filename}")
script_dir = os.path.dirname(os.path.abspath(__file__))
template_path = os.path.join(script_dir, template_filename)
if not os.path.exists(template_path):
print(f"⚠️ WARNING: Template file not found at: {template_path}")
return False
with open(template_path, "r", encoding="utf-8") as f:
custom_chat_template_content = f.read()
if hasattr(tokenizer, 'chat_template'):
tokenizer.chat_template = custom_chat_template_content
applied_template = True
print(f"βœ… Loaded and applied chat template from: {template_filename}")
else:
print(f"⚠️ WARNING: Tokenizer object does not support setting 'chat_template'.")
except Exception as e:
print(f"❌ ERROR reading or applying template file '{template_filename}': {e}")
if not applied_template:
print("Falling back to tokenizer's default built-in template (if any).")
print("--- Final Chat Template Being Used (by HF Tokenizer) ---")
print(tokenizer.chat_template if hasattr(tokenizer, 'chat_template') and tokenizer.chat_template else "No template found or template empty/default.")
print("-------------------------------------------------------")
return applied_template
# --- End function ---
# --- Load Tokenizer (Common for both paths now) ---
try:
print(f"Loading HF Tokenizer: {primary_checkpoint}")
tokenizer = AutoTokenizer.from_pretrained(primary_checkpoint, use_fast=True)
print("βœ… Loaded HF Tokenizer.")
# Apply the v3.3 template UNCONDITIONALLY
apply_template_from_file(tokenizer, TEMPLATE_FILENAME)
except Exception as e:
print(f"❌ Failed to load tokenizer {primary_checkpoint}: {e}")
raise RuntimeError("Failed to load the necessary tokenizer.") from e
# --- End Tokenizer Loading ---
if is_space:
print(f"πŸš€ Running in Space. Loading HF model: {primary_checkpoint}")
try:
# Load HF Model for Space
model = AutoModelForCausalLM.from_pretrained(
primary_checkpoint,
torch_dtype=torch.float16,
low_cpu_mem_usage=True,
device_map="auto"
)
print(f"βœ… Loaded HF {primary_checkpoint}")
model_name_display = primary_checkpoint
# Tokenizer already loaded and template applied
return tokenizer, model, model_name_display
except Exception as e:
print(f"❌ HF Primary load failed: {e}")
raise RuntimeError(f"Failed to load primary HF model {primary_checkpoint} in Space.") from e
else: # Running Locally - Load GGUF using llama-cpp-python
print(f"πŸ’» Running Locally. Attempting GGUF setup via llama-cpp-python.")
if not LLAMA_CPP_AVAILABLE:
raise RuntimeError("llama-cpp-python library is required but not installed/found.")
print(f" GGUF Repo ID: {GGUF_REPO_ID}")
print(f" GGUF Filename: {GGUF_FILENAME}")
try:
# Load GGUF Model using llama-cpp-python
print(f"Attempting to load GGUF model using Llama.from_pretrained...")
model = Llama.from_pretrained(
repo_id=GGUF_REPO_ID,
filename=GGUF_FILENAME,
n_gpu_layers=0, # Force CPU execution
verbose=True,
n_ctx=4096 # Increased context window
)
print(f"βœ… Loaded GGUF model {GGUF_FILENAME} using llama-cpp-python")
model_name_display = f"GGUF (llama-cpp): {GGUF_FILENAME}"
# Return tokenizer loaded earlier and the Llama model object
return tokenizer, model, model_name_display
except Exception as e:
print(f"❌ Local GGUF load failed using llama-cpp-python: {e}")
if "Not Found" in str(e) or "404" in str(e):
print(f" File not found. Please ensure Repo ID '{GGUF_REPO_ID}' and Filename '{GGUF_FILENAME}' are correct and the file exists on Hugging Face Hub.")
elif "invalid GGUF file" in str(e) or "failed to load model" in str(e):
print(f" Model loading failed. The GGUF file '{GGUF_FILENAME}' might be corrupted, incompatible with this version of llama-cpp-python, or the quantization level is unsupported.")
print(f" Consider trying a different quantization like 'Q4_K_M'.")
# Add other potential error checks based on llama-cpp-python exceptions
raise RuntimeError(f"Failed to load local GGUF model '{GGUF_FILENAME}' using llama-cpp-python.") from e
# --- Call load_model ---
try:
# Tokenizer should now be loaded for both paths
tokenizer, model, model_name = load_model()
if tokenizer is None: # Should not happen now
raise RuntimeError("Tokenizer failed to load.")
except Exception as load_err:
print(f"🚨 CRITICAL ERROR DURING MODEL LOADING: {load_err}")
# For UI testing, you might want to create dummy objects instead of raising
# tokenizer = None
# model = None
# model_name = "LOAD FAILED"
raise # Re-raise for now
# --- Load hotel docs function ---
def load_hotel_docs(hotel_id):
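    """Return the hotel's knowledge file as a [(hotel_id, content)] list, or [] if the
    file is missing, empty, or unreadable."""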
knowledge_dir = "knowledge"
path = os.path.join(knowledge_dir, f"{hotel_id}.txt")
if not os.path.exists(path):
print(f"⚠️ Knowledge file not found: {path}")
return []
try:
with open(path, encoding="utf-8") as f:
content = f.read().strip()
print(f"DEBUG [load_hotel_docs]: Read {len(content)} chars from {path}.")
if not content:
print(f"⚠️ WARNING [load_hotel_docs]: File {path} is empty.")
return []
return [(hotel_id, content)] # Return list with tuple: [(id, content)]
except Exception as e:
print(f"❌ Error reading knowledge file {path}: {e}")
return []
# --- Dynamic Hotel ID Detection ---
available_hotels = []
knowledge_dir = "knowledge"
if os.path.exists(knowledge_dir):
print("πŸ” Scanning for available hotels...")
files = os.listdir(knowledge_dir)
potential_ids = set()
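    # A hotel is listed only if both "<id>.txt" (knowledge) and "<id>-system.txt"
    # (system prompt) exist in the knowledge directory.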
for f in files:
if f.endswith(".txt") and not f.endswith("-system.txt"):
potential_ids.add(f[:-4]) # Add ID without .txt
for hotel_id in sorted(list(potential_ids)):
doc_file = os.path.join(knowledge_dir, f"{hotel_id}.txt")
sys_file = os.path.join(knowledge_dir, f"{hotel_id}-system.txt")
if os.path.exists(doc_file) and os.path.exists(sys_file):
available_hotels.append(hotel_id)
print(f" βœ… Found valid hotel pair: {hotel_id}")
else:
print(f" ⚠️ Skipping '{hotel_id}': Missing either '{hotel_id}.txt' or '{hotel_id}-system.txt'")
print("Hotel scan complete.")
else:
print(f"⚠️ Knowledge directory '{knowledge_dir}' not found. No hotels loaded.")
# --- End Hotel Scanning ---
# --- Chat function ---
def chat(message, history, hotel_id):
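    """Gradio generator callback: yields (updated chat history, cleared textbox text).

    Builds a system + history + user message list, grounds it with the hotel's knowledge
    document via the chat template's `documents` kwarg, then generates a reply with either
    the HF model (in a Space) or the local GGUF model (llama-cpp-python).
    """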
if history is None: history = []
# Convert Gradio history
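    # Gradio's tuple-style history is [[user, assistant], ...]; the chat template
    # expects a flat list of {"role": ..., "content": ...} dicts.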
history_hf_format = []
for user_msg, assistant_msg in history:
if user_msg: history_hf_format.append({"role": "user", "content": user_msg})
if assistant_msg: history_hf_format.append({"role": "assistant", "content": assistant_msg})
current_turn = {"role": "user", "content": message}
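    # Echo the user's message immediately with an empty assistant slot and yield once,
    # so the UI updates before generation starts.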
ui_history = history + [[message, None]]
yield ui_history, ""
response = "Sorry, an error occurred."
input_text = "" # Initialize input_text
try:
# --- System Prompt Loading ---
default_system_prompt = "You are a helpful hotel assistant."
system_prompt_filename = f"{hotel_id}-system.txt"
system_prompt_path = os.path.join("knowledge", system_prompt_filename)
system_prompt_content = default_system_prompt
if os.path.exists(system_prompt_path):
try:
with open(system_prompt_path, "r", encoding="utf-8") as f: loaded_prompt = f.read().strip()
if loaded_prompt: system_prompt_content = loaded_prompt
else: print(f"⚠️ System prompt file '{system_prompt_path}' is empty. Using default.")
except Exception as e: print(f"❌ Error reading system prompt file '{system_prompt_path}': {e}. Using default.")
else: print(f"⚠️ System prompt file not found: '{system_prompt_path}'. Using default.")
# --- Document Loading ---
hotel_docs_list = load_hotel_docs(hotel_id)
# --- Message List Construction (Base: System, History, User) ---
messages = [{"role": "system", "content": system_prompt_content}]
messages.extend(history_hf_format)
messages.append(current_turn)
print(f"DEBUG [chat]: Base messages list:\n{json.dumps(messages, indent=2)}")
# --- Prepare documents kwarg (Used by apply_chat_template in BOTH paths) ---
documents_for_kwarg = []
if hotel_docs_list:
# Use 'doc_id' and 'text' keys for v3.3 template
documents_for_kwarg = [{"doc_id": doc_id, "text": doc_content} for doc_id, doc_content in hotel_docs_list]
print(f"DEBUG [chat]: Preparing documents kwarg: {len(documents_for_kwarg)} docs")
        # --- Template Application (uses the HF tokenizer's chat template on both paths) ---
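        # apply_chat_template renders the complete prompt string (system prompt, documents,
        # conversation history, current user turn, and the generation prompt) in one pass.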
input_text = tokenizer.apply_chat_template(
messages,
documents=documents_for_kwarg, # Use kwarg for v3.3 template
tokenize=False,
add_generation_prompt=True
)
        # --- Debug: print the final formatted prompt string ---
print("\n" + "="*40 + " FINAL PROMPT STRING " + "="*40)
print(input_text)
print("="*99 + "\n")
        # --- End debug print ---
except Exception as e:
print(f"❌ Error during prompt preparation: {e}")
ui_history[-1][1] = "Sorry, an error occurred while preparing the prompt."
yield ui_history, ""
return
# --- Generation ---
try:
if is_space:
# --- HF Space Generation (model.generate) ---
print("πŸš€ Generating response using HF model...")
inputs = tokenizer(input_text, return_tensors="pt").to(device)
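            # Note (assumption): input_text already contains the template's special tokens;
            # if this tokenizer prepends a BOS token by default, add_special_tokens=False
            # may be needed here to avoid duplicating it.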
input_length = inputs.input_ids.shape[1]
with torch.no_grad():
outputs = model.generate(
inputs.input_ids,
attention_mask=inputs.attention_mask,
max_new_tokens=1024,
do_sample=False,
eos_token_id=tokenizer.eos_token_id
)
new_token_ids = outputs[0][input_length:]
response = tokenizer.decode(new_token_ids, skip_special_tokens=True).strip()
print("βœ… HF Generation complete.")
else: # Local GGUF Generation using llama-cpp-python's lower-level call
print("πŸ’» Generating response using GGUF model (llama-cpp-python)...")
# --- Use model(prompt_string, ...) ---
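            # The prompt was already formatted by the HF tokenizer's chat template, so the
            # Llama object is called as a plain completion rather than via its own chat API.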
output = model( # Call the Llama object directly with the formatted string
input_text,
max_tokens=512, # Max tokens to generate
stop=["<|end_of_text|>"], # Use model's stop token(s)
temperature=0.1,
# echo=False # Usually default, don't echo the prompt
)
# Extract response content
if output and 'choices' in output and output['choices'] and 'text' in output['choices'][0]:
response = output['choices'][0]['text'].strip()
else:
print(f"⚠️ Unexpected output format from model call: {output}")
response = "Sorry, received an unexpected response structure."
# --- End model(prompt_string, ...) ---
print("βœ… GGUF Generation complete (llama-cpp-python).")
if not response:
response = "Sorry, I encountered an issue generating a response (empty)."
except Exception as e:
print(f"❌ Error during model generation or processing: {e}")
response = f"Sorry, an error occurred: {e}"
print(f"DEBUG: Final response variable before UI append = {repr(response)}")
ui_history[-1][1] = response
yield ui_history, ""
# --- Gradio UI ---
with gr.Blocks() as demo:
with gr.Column(variant="panel"):
gr.Markdown("### 🏨 Multi‑Hotel Chatbot Demo")
gr.Markdown(f"**Running:** {model_name}") # Displays HF name or GGUF info
hotel_selector = gr.Dropdown(
choices=available_hotels,
label="Hotel",
value=available_hotels[0] if available_hotels else None,
interactive=bool(available_hotels)
)
with gr.Row():
chatbot = gr.Chatbot(label="Chat History", height=500)
msg = gr.Textbox(
show_label=False,
placeholder="Ask about the hotel..."
)
clear_btn = gr.Button("Clear")
clear_btn.click(lambda: ([], ""), None, [chatbot, msg])
msg.submit(
fn=chat,
inputs=[msg, chatbot, hotel_selector],
outputs=[chatbot, msg]
)
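        # chat() is a generator, so each yield streams an incremental update to the chatbot.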
if is_space:
gr.Markdown("⚠️ Pause the Space when done to avoid charges.")
# Enable streaming queue
demo.queue(default_concurrency_limit=2, max_size=32)
if __name__ == "__main__":
print("Launching Gradio Interface...")
demo.launch()
print("Gradio Interface closed.")