|
import gc |
|
from math import ceil |
|
from typing import Any, List |
|
import random |
|
|
|
import numpy as np |
|
import torch |
|
from backend.device import is_openvino_device |
|
from backend.controlnet import ( |
|
load_controlnet_adapters, |
|
update_controlnet_arguments, |
|
) |
|
from backend.models.lcmdiffusion_setting import ( |
|
DiffusionTask, |
|
LCMDiffusionSetting, |
|
LCMLora, |
|
) |
|
from backend.openvino.pipelines import ( |
|
get_ov_image_to_image_pipeline, |
|
get_ov_text_to_image_pipeline, |
|
ov_load_taesd, |
|
) |
|
from backend.pipelines.lcm import ( |
|
get_image_to_image_pipeline, |
|
get_lcm_model_pipeline, |
|
load_taesd, |
|
) |
|
from backend.pipelines.lcm_lora import get_lcm_lora_pipeline |
|
from constants import DEVICE, GGUF_THREADS |
|
from diffusers import LCMScheduler |
|
from image_ops import resize_pil_image |
|
from backend.openvino.flux_pipeline import get_flux_pipeline |
|
from backend.openvino.ov_hc_stablediffusion_pipeline import OvHcLatentConsistency |
|
from backend.gguf.gguf_diffusion import ( |
|
GGUFDiffusion, |
|
ModelConfig, |
|
Txt2ImgConfig, |
|
SampleMethod, |
|
) |
|
from paths import get_app_path |
|
from pprint import pprint |
|
|
|
try: |
|
|
|
import tomesd |
|
except ImportError: |
|
print("tomesd library unavailable; disabling token merging support") |
|
tomesd = None |
|
|
|
|
|
class LCMTextToImage: |
|
def __init__( |
|
self, |
|
device: str = "cpu", |
|
) -> None: |
|
self.pipeline = None |
|
self.use_openvino = False |
|
self.device = "" |
|
self.previous_model_id = None |
|
self.previous_use_tae_sd = False |
|
self.previous_use_lcm_lora = False |
|
self.previous_ov_model_id = "" |
|
self.previous_token_merging = 0.0 |
|
self.previous_safety_checker = False |
|
self.previous_use_openvino = False |
|
self.img_to_img_pipeline = None |
|
self.is_openvino_init = False |
|
self.previous_lora = None |
|
self.task_type = DiffusionTask.text_to_image |
|
self.previous_use_gguf_model = False |
|
self.previous_gguf_model = None |
|
self.torch_data_type = ( |
|
torch.float32 if is_openvino_device() or DEVICE == "mps" else torch.float16 |
|
) |
|
self.ov_model_id = None |
|
print(f"Torch datatype : {self.torch_data_type}") |
|
|
|
def _pipeline_to_device(self): |
|
print(f"Pipeline device : {DEVICE}") |
|
print(f"Pipeline dtype : {self.torch_data_type}") |
|
self.pipeline.to( |
|
torch_device=DEVICE, |
|
torch_dtype=self.torch_data_type, |
|
) |
|
|
|
def _add_freeu(self): |
|
pipeline_class = self.pipeline.__class__.__name__ |
|
if isinstance(self.pipeline.scheduler, LCMScheduler): |
|
if pipeline_class == "StableDiffusionPipeline": |
|
print("Add FreeU - SD") |
|
self.pipeline.enable_freeu( |
|
s1=0.9, |
|
s2=0.2, |
|
b1=1.2, |
|
b2=1.4, |
|
) |
|
elif pipeline_class == "StableDiffusionXLPipeline": |
|
print("Add FreeU - SDXL") |
|
self.pipeline.enable_freeu( |
|
s1=0.6, |
|
s2=0.4, |
|
b1=1.1, |
|
b2=1.2, |
|
) |
|
|
|
def _enable_vae_tiling(self): |
|
self.pipeline.vae.enable_tiling() |
|
|
|
def _update_lcm_scheduler_params(self): |
|
if isinstance(self.pipeline.scheduler, LCMScheduler): |
|
self.pipeline.scheduler = LCMScheduler.from_config( |
|
self.pipeline.scheduler.config, |
|
beta_start=0.001, |
|
beta_end=0.01, |
|
) |
|
|
|
def _is_hetero_pipeline(self) -> bool: |
|
return "square" in self.ov_model_id.lower() |
|
|
|
def _load_ov_hetero_pipeline(self): |
|
print("Loading Heterogeneous Compute pipeline") |
|
if DEVICE.upper() == "NPU": |
|
device = ["NPU", "NPU", "NPU"] |
|
self.pipeline = OvHcLatentConsistency(self.ov_model_id, device) |
|
else: |
|
self.pipeline = OvHcLatentConsistency(self.ov_model_id) |
|
|
|
def _generate_images_hetero_compute( |
|
self, |
|
lcm_diffusion_setting: LCMDiffusionSetting, |
|
): |
|
print("Using OpenVINO ") |
|
if lcm_diffusion_setting.diffusion_task == DiffusionTask.text_to_image.value: |
|
return [ |
|
self.pipeline.generate( |
|
prompt=lcm_diffusion_setting.prompt, |
|
neg_prompt=lcm_diffusion_setting.negative_prompt, |
|
init_image=None, |
|
strength=1.0, |
|
num_inference_steps=lcm_diffusion_setting.inference_steps, |
|
) |
|
] |
|
else: |
|
return [ |
|
self.pipeline.generate( |
|
prompt=lcm_diffusion_setting.prompt, |
|
neg_prompt=lcm_diffusion_setting.negative_prompt, |
|
init_image=lcm_diffusion_setting.init_image, |
|
strength=lcm_diffusion_setting.strength, |
|
num_inference_steps=lcm_diffusion_setting.inference_steps, |
|
) |
|
] |
|
|
|
def _is_valid_mode( |
|
self, |
|
modes: List, |
|
) -> bool: |
|
return modes.count(True) == 1 or modes.count(False) == 3 |
|
|
|
def _validate_mode( |
|
self, |
|
modes: List, |
|
) -> None: |
|
if not self._is_valid_mode(modes): |
|
raise ValueError("Invalid mode,delete configs/settings.yaml and retry!") |
|
|
|
def init( |
|
self, |
|
device: str = "cpu", |
|
lcm_diffusion_setting: LCMDiffusionSetting = LCMDiffusionSetting(), |
|
) -> None: |
|
|
|
|
|
modes = [ |
|
lcm_diffusion_setting.use_gguf_model, |
|
lcm_diffusion_setting.use_openvino, |
|
lcm_diffusion_setting.use_lcm_lora, |
|
] |
|
self._validate_mode(modes) |
|
self.device = device |
|
self.use_openvino = lcm_diffusion_setting.use_openvino |
|
model_id = lcm_diffusion_setting.lcm_model_id |
|
use_local_model = lcm_diffusion_setting.use_offline_model |
|
use_tiny_auto_encoder = lcm_diffusion_setting.use_tiny_auto_encoder |
|
use_lora = lcm_diffusion_setting.use_lcm_lora |
|
lcm_lora: LCMLora = lcm_diffusion_setting.lcm_lora |
|
token_merging = lcm_diffusion_setting.token_merging |
|
self.ov_model_id = lcm_diffusion_setting.openvino_lcm_model_id |
|
|
|
if lcm_diffusion_setting.diffusion_task == DiffusionTask.image_to_image.value: |
|
lcm_diffusion_setting.init_image = resize_pil_image( |
|
lcm_diffusion_setting.init_image, |
|
lcm_diffusion_setting.image_width, |
|
lcm_diffusion_setting.image_height, |
|
) |
|
|
|
if ( |
|
self.pipeline is None |
|
or self.previous_model_id != model_id |
|
or self.previous_use_tae_sd != use_tiny_auto_encoder |
|
or self.previous_lcm_lora_base_id != lcm_lora.base_model_id |
|
or self.previous_lcm_lora_id != lcm_lora.lcm_lora_id |
|
or self.previous_use_lcm_lora != use_lora |
|
or self.previous_ov_model_id != self.ov_model_id |
|
or self.previous_token_merging != token_merging |
|
or self.previous_safety_checker != lcm_diffusion_setting.use_safety_checker |
|
or self.previous_use_openvino != lcm_diffusion_setting.use_openvino |
|
or self.previous_use_gguf_model != lcm_diffusion_setting.use_gguf_model |
|
or self.previous_gguf_model != lcm_diffusion_setting.gguf_model |
|
or ( |
|
self.use_openvino |
|
and ( |
|
self.previous_task_type != lcm_diffusion_setting.diffusion_task |
|
or self.previous_lora != lcm_diffusion_setting.lora |
|
) |
|
) |
|
or lcm_diffusion_setting.rebuild_pipeline |
|
): |
|
if self.use_openvino and is_openvino_device(): |
|
if self.pipeline: |
|
del self.pipeline |
|
self.pipeline = None |
|
gc.collect() |
|
self.is_openvino_init = True |
|
if ( |
|
lcm_diffusion_setting.diffusion_task |
|
== DiffusionTask.text_to_image.value |
|
): |
|
print( |
|
f"***** Init Text to image (OpenVINO) - {self.ov_model_id} *****" |
|
) |
|
if "flux" in self.ov_model_id.lower(): |
|
print("Loading OpenVINO Flux pipeline") |
|
self.pipeline = get_flux_pipeline( |
|
self.ov_model_id, |
|
lcm_diffusion_setting.use_tiny_auto_encoder, |
|
) |
|
elif self._is_hetero_pipeline(): |
|
self._load_ov_hetero_pipeline() |
|
else: |
|
self.pipeline = get_ov_text_to_image_pipeline( |
|
self.ov_model_id, |
|
use_local_model, |
|
) |
|
elif ( |
|
lcm_diffusion_setting.diffusion_task |
|
== DiffusionTask.image_to_image.value |
|
): |
|
if not self.pipeline and self._is_hetero_pipeline(): |
|
self._load_ov_hetero_pipeline() |
|
else: |
|
print( |
|
f"***** Image to image (OpenVINO) - {self.ov_model_id} *****" |
|
) |
|
self.pipeline = get_ov_image_to_image_pipeline( |
|
self.ov_model_id, |
|
use_local_model, |
|
) |
|
elif lcm_diffusion_setting.use_gguf_model: |
|
model = lcm_diffusion_setting.gguf_model.diffusion_path |
|
print(f"***** Init Text to image (GGUF) - {model} *****") |
|
|
|
|
|
|
|
|
|
self._init_gguf_diffusion(lcm_diffusion_setting) |
|
else: |
|
if self.pipeline or self.img_to_img_pipeline: |
|
self.pipeline = None |
|
self.img_to_img_pipeline = None |
|
gc.collect() |
|
|
|
controlnet_args = load_controlnet_adapters(lcm_diffusion_setting) |
|
if use_lora: |
|
print( |
|
f"***** Init LCM-LoRA pipeline - {lcm_lora.base_model_id} *****" |
|
) |
|
self.pipeline = get_lcm_lora_pipeline( |
|
lcm_lora.base_model_id, |
|
lcm_lora.lcm_lora_id, |
|
use_local_model, |
|
torch_data_type=self.torch_data_type, |
|
pipeline_args=controlnet_args, |
|
) |
|
|
|
else: |
|
print(f"***** Init LCM Model pipeline - {model_id} *****") |
|
self.pipeline = get_lcm_model_pipeline( |
|
model_id, |
|
use_local_model, |
|
controlnet_args, |
|
) |
|
|
|
self.img_to_img_pipeline = get_image_to_image_pipeline(self.pipeline) |
|
|
|
if tomesd and token_merging > 0.001: |
|
print(f"***** Token Merging: {token_merging} *****") |
|
tomesd.apply_patch(self.pipeline, ratio=token_merging) |
|
tomesd.apply_patch(self.img_to_img_pipeline, ratio=token_merging) |
|
|
|
if use_tiny_auto_encoder: |
|
if self.use_openvino and is_openvino_device(): |
|
if self.pipeline.__class__.__name__ != "OVFluxPipeline": |
|
print("Using Tiny Auto Encoder (OpenVINO)") |
|
ov_load_taesd( |
|
self.pipeline, |
|
use_local_model, |
|
) |
|
else: |
|
print("Using Tiny Auto Encoder") |
|
load_taesd( |
|
self.pipeline, |
|
use_local_model, |
|
self.torch_data_type, |
|
) |
|
load_taesd( |
|
self.img_to_img_pipeline, |
|
use_local_model, |
|
self.torch_data_type, |
|
) |
|
|
|
if not self.use_openvino and not is_openvino_device(): |
|
self._pipeline_to_device() |
|
|
|
if not self._is_hetero_pipeline(): |
|
if ( |
|
lcm_diffusion_setting.diffusion_task |
|
== DiffusionTask.image_to_image.value |
|
and lcm_diffusion_setting.use_openvino |
|
): |
|
self.pipeline.scheduler = LCMScheduler.from_config( |
|
self.pipeline.scheduler.config, |
|
) |
|
else: |
|
if not lcm_diffusion_setting.use_gguf_model: |
|
self._update_lcm_scheduler_params() |
|
|
|
if use_lora: |
|
self._add_freeu() |
|
|
|
self.previous_model_id = model_id |
|
self.previous_ov_model_id = self.ov_model_id |
|
self.previous_use_tae_sd = use_tiny_auto_encoder |
|
self.previous_lcm_lora_base_id = lcm_lora.base_model_id |
|
self.previous_lcm_lora_id = lcm_lora.lcm_lora_id |
|
self.previous_use_lcm_lora = use_lora |
|
self.previous_token_merging = lcm_diffusion_setting.token_merging |
|
self.previous_safety_checker = lcm_diffusion_setting.use_safety_checker |
|
self.previous_use_openvino = lcm_diffusion_setting.use_openvino |
|
self.previous_task_type = lcm_diffusion_setting.diffusion_task |
|
self.previous_lora = lcm_diffusion_setting.lora.model_copy(deep=True) |
|
self.previous_use_gguf_model = lcm_diffusion_setting.use_gguf_model |
|
self.previous_gguf_model = lcm_diffusion_setting.gguf_model.model_copy( |
|
deep=True |
|
) |
|
lcm_diffusion_setting.rebuild_pipeline = False |
|
if ( |
|
lcm_diffusion_setting.diffusion_task |
|
== DiffusionTask.text_to_image.value |
|
): |
|
print(f"Pipeline : {self.pipeline}") |
|
elif ( |
|
lcm_diffusion_setting.diffusion_task |
|
== DiffusionTask.image_to_image.value |
|
): |
|
if self.use_openvino and is_openvino_device(): |
|
print(f"Pipeline : {self.pipeline}") |
|
else: |
|
print(f"Pipeline : {self.img_to_img_pipeline}") |
|
if self.use_openvino: |
|
if lcm_diffusion_setting.lora.enabled: |
|
print("Warning: Lora models not supported on OpenVINO mode") |
|
elif not lcm_diffusion_setting.use_gguf_model: |
|
adapters = self.pipeline.get_active_adapters() |
|
print(f"Active adapters : {adapters}") |
|
|
|
def _get_timesteps(self): |
|
time_steps = self.pipeline.scheduler.config.get("timesteps") |
|
time_steps_value = [int(time_steps)] if time_steps else None |
|
return time_steps_value |
|
|
|
def generate( |
|
self, |
|
lcm_diffusion_setting: LCMDiffusionSetting, |
|
reshape: bool = False, |
|
) -> Any: |
|
guidance_scale = lcm_diffusion_setting.guidance_scale |
|
img_to_img_inference_steps = lcm_diffusion_setting.inference_steps |
|
check_step_value = int( |
|
lcm_diffusion_setting.inference_steps * lcm_diffusion_setting.strength |
|
) |
|
if ( |
|
lcm_diffusion_setting.diffusion_task == DiffusionTask.image_to_image.value |
|
and check_step_value < 1 |
|
): |
|
img_to_img_inference_steps = ceil(1 / lcm_diffusion_setting.strength) |
|
print( |
|
f"Strength: {lcm_diffusion_setting.strength},{img_to_img_inference_steps}" |
|
) |
|
|
|
pipeline_extra_args = {} |
|
|
|
if lcm_diffusion_setting.use_seed: |
|
cur_seed = lcm_diffusion_setting.seed |
|
|
|
seeds = [ |
|
(cur_seed + i) for i in range(lcm_diffusion_setting.number_of_images) |
|
] |
|
else: |
|
seeds = [ |
|
random.randint(0, 999999999) |
|
for i in range(lcm_diffusion_setting.number_of_images) |
|
] |
|
|
|
if self.use_openvino: |
|
|
|
np.random.seed(seeds[0]) |
|
if self._is_hetero_pipeline(): |
|
torch.manual_seed(seeds[0]) |
|
lcm_diffusion_setting.seed = seeds[0] |
|
else: |
|
pipeline_extra_args["generator"] = [ |
|
torch.Generator(device=self.device).manual_seed(s) for s in seeds |
|
] |
|
|
|
is_openvino_pipe = lcm_diffusion_setting.use_openvino and is_openvino_device() |
|
if is_openvino_pipe and not self._is_hetero_pipeline(): |
|
print("Using OpenVINO") |
|
if reshape and not self.is_openvino_init: |
|
print("Reshape and compile") |
|
self.pipeline.reshape( |
|
batch_size=-1, |
|
height=lcm_diffusion_setting.image_height, |
|
width=lcm_diffusion_setting.image_width, |
|
num_images_per_prompt=lcm_diffusion_setting.number_of_images, |
|
) |
|
self.pipeline.compile() |
|
|
|
if self.is_openvino_init: |
|
self.is_openvino_init = False |
|
|
|
if is_openvino_pipe and self._is_hetero_pipeline(): |
|
return self._generate_images_hetero_compute(lcm_diffusion_setting) |
|
elif lcm_diffusion_setting.use_gguf_model: |
|
return self._generate_images_gguf(lcm_diffusion_setting) |
|
|
|
if lcm_diffusion_setting.clip_skip > 1: |
|
|
|
|
|
pipeline_extra_args["clip_skip"] = lcm_diffusion_setting.clip_skip - 1 |
|
|
|
if not lcm_diffusion_setting.use_safety_checker: |
|
self.pipeline.safety_checker = None |
|
if ( |
|
lcm_diffusion_setting.diffusion_task |
|
== DiffusionTask.image_to_image.value |
|
and not is_openvino_pipe |
|
): |
|
self.img_to_img_pipeline.safety_checker = None |
|
|
|
if ( |
|
not lcm_diffusion_setting.use_lcm_lora |
|
and not lcm_diffusion_setting.use_openvino |
|
and lcm_diffusion_setting.guidance_scale != 1.0 |
|
): |
|
print("Not using LCM-LoRA so setting guidance_scale 1.0") |
|
guidance_scale = 1.0 |
|
|
|
controlnet_args = update_controlnet_arguments(lcm_diffusion_setting) |
|
if lcm_diffusion_setting.use_openvino: |
|
if ( |
|
lcm_diffusion_setting.diffusion_task |
|
== DiffusionTask.text_to_image.value |
|
): |
|
result_images = self.pipeline( |
|
prompt=lcm_diffusion_setting.prompt, |
|
negative_prompt=lcm_diffusion_setting.negative_prompt, |
|
num_inference_steps=lcm_diffusion_setting.inference_steps, |
|
guidance_scale=guidance_scale, |
|
width=lcm_diffusion_setting.image_width, |
|
height=lcm_diffusion_setting.image_height, |
|
num_images_per_prompt=lcm_diffusion_setting.number_of_images, |
|
).images |
|
elif ( |
|
lcm_diffusion_setting.diffusion_task |
|
== DiffusionTask.image_to_image.value |
|
): |
|
result_images = self.pipeline( |
|
image=lcm_diffusion_setting.init_image, |
|
strength=lcm_diffusion_setting.strength, |
|
prompt=lcm_diffusion_setting.prompt, |
|
negative_prompt=lcm_diffusion_setting.negative_prompt, |
|
num_inference_steps=img_to_img_inference_steps * 3, |
|
guidance_scale=guidance_scale, |
|
num_images_per_prompt=lcm_diffusion_setting.number_of_images, |
|
).images |
|
|
|
else: |
|
if ( |
|
lcm_diffusion_setting.diffusion_task |
|
== DiffusionTask.text_to_image.value |
|
): |
|
result_images = self.pipeline( |
|
prompt=lcm_diffusion_setting.prompt, |
|
negative_prompt=lcm_diffusion_setting.negative_prompt, |
|
num_inference_steps=lcm_diffusion_setting.inference_steps, |
|
guidance_scale=guidance_scale, |
|
width=lcm_diffusion_setting.image_width, |
|
height=lcm_diffusion_setting.image_height, |
|
num_images_per_prompt=lcm_diffusion_setting.number_of_images, |
|
timesteps=self._get_timesteps(), |
|
**pipeline_extra_args, |
|
**controlnet_args, |
|
).images |
|
|
|
elif ( |
|
lcm_diffusion_setting.diffusion_task |
|
== DiffusionTask.image_to_image.value |
|
): |
|
result_images = self.img_to_img_pipeline( |
|
image=lcm_diffusion_setting.init_image, |
|
strength=lcm_diffusion_setting.strength, |
|
prompt=lcm_diffusion_setting.prompt, |
|
negative_prompt=lcm_diffusion_setting.negative_prompt, |
|
num_inference_steps=img_to_img_inference_steps, |
|
guidance_scale=guidance_scale, |
|
width=lcm_diffusion_setting.image_width, |
|
height=lcm_diffusion_setting.image_height, |
|
num_images_per_prompt=lcm_diffusion_setting.number_of_images, |
|
**pipeline_extra_args, |
|
**controlnet_args, |
|
).images |
|
|
|
for i, seed in enumerate(seeds): |
|
result_images[i].info["image_seed"] = seed |
|
|
|
return result_images |
|
|
|
def _init_gguf_diffusion( |
|
self, |
|
lcm_diffusion_setting: LCMDiffusionSetting, |
|
): |
|
config = ModelConfig() |
|
config.model_path = lcm_diffusion_setting.gguf_model.diffusion_path |
|
config.diffusion_model_path = lcm_diffusion_setting.gguf_model.diffusion_path |
|
config.clip_l_path = lcm_diffusion_setting.gguf_model.clip_path |
|
config.t5xxl_path = lcm_diffusion_setting.gguf_model.t5xxl_path |
|
config.vae_path = lcm_diffusion_setting.gguf_model.vae_path |
|
config.n_threads = GGUF_THREADS |
|
print(f"GGUF Threads : {GGUF_THREADS} ") |
|
print("GGUF - Model config") |
|
pprint(lcm_diffusion_setting.gguf_model.model_dump()) |
|
self.pipeline = GGUFDiffusion( |
|
get_app_path(), |
|
config, |
|
True, |
|
) |
|
|
|
def _generate_images_gguf( |
|
self, |
|
lcm_diffusion_setting: LCMDiffusionSetting, |
|
): |
|
if lcm_diffusion_setting.diffusion_task == DiffusionTask.text_to_image.value: |
|
t2iconfig = Txt2ImgConfig() |
|
t2iconfig.prompt = lcm_diffusion_setting.prompt |
|
t2iconfig.batch_count = lcm_diffusion_setting.number_of_images |
|
t2iconfig.cfg_scale = lcm_diffusion_setting.guidance_scale |
|
t2iconfig.height = lcm_diffusion_setting.image_height |
|
t2iconfig.width = lcm_diffusion_setting.image_width |
|
t2iconfig.sample_steps = lcm_diffusion_setting.inference_steps |
|
t2iconfig.sample_method = SampleMethod.EULER |
|
if lcm_diffusion_setting.use_seed: |
|
t2iconfig.seed = lcm_diffusion_setting.seed |
|
else: |
|
t2iconfig.seed = -1 |
|
|
|
return self.pipeline.generate_text2mg(t2iconfig) |
|
|