LS-AI-img-gen / src /img_gen_modal.py
Andre
dynamic data
d1cc8ad
#img_gen_modal.py
import modal
import random
import io
from config.config import api_token
from config.prompts import prompts
from config.models import models_modal
import os
import gradio as gr
import torch
import sentencepiece
import torch
from huggingface_hub import login
from transformers import AutoTokenizer
import random
from datetime import datetime
from diffusers.callbacks import SDXLCFGCutoffCallback
from diffusers import FluxPipeline
from diffusers import DPMSolverMultistepScheduler, StableDiffusionPipeline, AutoencoderTiny, AutoencoderKL, DiffusionPipeline, FluxTransformer2DModel, GGUFQuantizationConfig
from PIL import Image
from src.check_dependecies import check_dependencies
import numpy as np
from src.prompt_gen import prompt_gen
MAX_SEED = np.iinfo(np.int32).max
MAX_IMAGE_SIZE = 2048
CACHE_DIR = "/model_cache"
# Define the Modal image
image = (
modal.Image.from_registry("nvidia/cuda:12.2.0-devel-ubuntu22.04", add_python="3.9")
.pip_install_from_requirements("requirements.txt")
#modal.Image.debian_slim(python_version="3.9") # Base image
# .apt_install(
# "git",
# )
# .pip_install(
# "diffusers",
# f"git+https://github.com/huggingface/transformers.git"
# )
.env(
{
"HF_HUB_ENABLE_HF_TRANSFER": "1", "HF_HOME": "HF_HOME", "HF_HUB_CACHE": CACHE_DIR
}
)
)
# Create a Modal app
app = modal.App("LS-img-gen-modal", image=image)
with image.imports():
import os
flux_model_vol = modal.Volume.from_name("flux-model-vol", create_if_missing=True) # Reference your volume
# GPU FUNCTION
@app.function(volumes={"/data": flux_model_vol},
secrets=[modal.Secret.from_name("huggingface-token")],
gpu="L40S",
timeout = 300
)
# MAIN GENERATE IMAGE FUNCTION
def generate_image_gpu(
adventurer_id,
#prompt_alias,
#character_dropdown,
scene_dropdown,
#model_alias,
custom_prompt,
height=360,
width=640,
num_inference_steps=20,
guidance_scale=2.0,
seed=-1):
# Find the selected prompt and model
print("Hello from LS_img_gen!")
#check_dependencies()
print ("Sending to prompt gen")
adventurer, prompt = prompt_gen(adventurer_id, scene_dropdown)
# Print the original prompt and dynamic values for debugging
print("Original Prompt:")
print(prompt)
print("Defining advanturer variables")
id = adventurer['id']
name = adventurer['name']
weapon_equipment = adventurer['weapon']
head_equipment = adventurer['head']
hand_equipment = adventurer['hand']
chest_equipment = adventurer['chest']
waist_equipment = adventurer['waist']
foot_equipment = adventurer['foot']
gold_equipment = adventurer['gold']
beast_last_battle = adventurer['beast']
print("Loading Model")
model_alias = "FLUX.1-dev"
try:
#prompt = next(p for p in prompts if p["alias"] == prompt_alias)["text"]
#model_name = next(m for m in models_modal if m["alias"] == model_alias)["name"]
model_name = model_alias
except StopIteration:
return None, "ERROR: Invalid prompt or model selected."
# # Append the custom character (if provided)
# if character_dropdown == "Wizard":
# prompt += f" A wizard combats using powerful magic against the {beast_last_battle}"
# elif character_dropdown == "Warrior":
# prompt += f" A warrior combats using his weapons against the {beast_last_battle}"
# else:
# pass
# Append the custom prompt (if provided)
if custom_prompt and len(custom_prompt.strip()) > 0:
prompt += " " + custom_prompt.strip()
# Print the formatted prompt for debugging
print("\nFormatted Prompt:")
print(prompt)
# Randomize the seed if needed
if seed == -1:
seed = random.randint(0, 1000000)
# HF LOGIN
print("Initializing HF TOKEN")
print (api_token)
# login(token=api_token)
# print("model_name:")
# print(model_name)
# Use absolute path with leading slash
model_path = f"/data/{model_name}" # Changed from "data/" to "/data/"
print(f"Loading model from local path: {model_path}")
# Debug: Check if the directory exists and list its contents
if os.path.exists(model_path):
print("Directory exists. Contents:")
for item in os.listdir(model_path):
print(f" - {item}")
else:
print(f"Directory does not exist: {model_path}")
print("Contents of /data:")
print(os.listdir("/data"))
# CHECK FOR TORCH USING CUDA
print("CHECK FOR TORCH USING CUDA")
print(f"CUDA available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
print("inside if")
print(f"CUDA device count: {torch.cuda.device_count()}")
print(f"Current device: {torch.cuda.current_device()}")
print(f"Device name: {torch.cuda.get_device_name(torch.cuda.current_device())}")
try:
print("-----INITIALIZING PIPE-----")
pipe = FluxPipeline.from_pretrained(
model_path,
torch_dtype=torch.bfloat16,
#torch_dtype=torch.float16,
#torch_dtype=torch.float32,
#vae=taef1,
local_files_only=False,
)
#torch.cuda.empty_cache()
if model_alias == "Flux-Midjourney-Mix2-LoRA":
pipe.load_lora_weights("/data/Flux-Midjourney-Mix2-LoRA")
if torch.cuda.is_available():
print("CUDA available")
print("using gpu")
pipe = pipe.to("cuda")
pipe_message = "CUDA"
#pipe.enable_model_cpu_offload() # official recommended method but is running slower w it
else:
print("CUDA not available")
print("using cpu")
pipe = pipe.to("cpu")
pipe_message = "CPU"
print(f"-----{pipe_message} PIPE INITIALIZED-----")
print(f"Using device: {pipe.device}")
except Exception as e:
print(f"Detailed error: {str(e)}")
return None, f"ERROR: Failed to initialize PIPE2. Details: {e}"
########## SENDING IMG GEN TO PIPE - WORKING CODE ##########
try:
print("-----SENDING IMG GEN TO PIPE-----")
print("-----HOLD ON-----")
image = pipe(
prompt,
guidance_scale=guidance_scale,
num_inference_steps=num_inference_steps,
width=width,
height=height,
max_sequence_length=512,
#callback_on_step_end=decode_tensors,
#callback_on_step_end_tensor_inputs=["latents"],
# seed=seed
).images[0]
#############################################################
print("-----IMAGE GENERATED SUCCESSFULLY!-----")
print(image)
except Exception as e:
return f"ERROR: Failed to initialize InferenceClient. Details: {e}"
try:
# Save the image with a timestamped filename
print("-----SAVING-----", image)
message = f"Image generated successfully! Call the banners! \n\n=====ADVENTURER GENERATED===== \nID: {id}\nNAME: {name}\nWEAPON: {weapon_equipment}\nHEAD: {head_equipment}\nHAND: {hand_equipment}\nCHEST: {chest_equipment}\nWAIST: {waist_equipment}\nFOOT: {foot_equipment}\nGOLD: {gold_equipment}\nLAST BATTLE BEAST: {beast_last_battle}"
file_name_extension = f"{id}_{beast_last_battle.replace(' ', '_').lower()}"
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_filename = f"/data/LS_images/{timestamp}_{seed}_{model_alias.replace(' ', '_').lower()}_{scene_dropdown.replace(' ', '_').lower()}_{file_name_extension.replace(' ', '_').lower()}.png"
try:
image.save(output_filename)
except Exception as e:
return None, f"ERROR: Failed to save image. Details: {e}"
print("-----DONE!-----")
print("-----CALL THE BANNERS!-----")
except Exception as e:
print(f"ERROR: Failed to save image. Details: {e}")
# Return the filename and success message
return image, message