|
|
|
|
|
import torch |
|
import torch.nn.functional as F |
|
import copy |
|
import math |
|
import random |
|
from contextlib import nullcontext |
|
from einops import rearrange |
|
from scepter.modules.model.network.ldm import LatentDiffusion |
|
from scepter.modules.model.registry import MODELS, DIFFUSIONS, BACKBONES, LOSSES, TOKENIZERS, EMBEDDERS |
|
from scepter.modules.model.utils.basic_utils import check_list_of_list, to_device, pack_imagelist_into_tensor, \ |
|
limit_batch_data, unpack_tensor_into_imagelist, count_params, disabled_train |
|
from scepter.modules.utils.config import dict_to_yaml |
|
from scepter.modules.utils.distribute import we |
|
|
|
@MODELS.register_class()
class LatentDiffusionACEPlus(LatentDiffusion):
    """``LatentDiffusion`` subclass that conditions on reference/edit images.

    The class encodes text prompts with ``cond_stage_model``, encodes images
    with ``first_stage_model`` and hands everything to ``self.diffusion``
    (for the loss during training, for sampling during inference) as a
    single ``cond`` dictionary.
    """

    para_dict = LatentDiffusion.para_dict

    def __init__(self, cfg, logger=None):
        """Build the model from ``cfg`` and remember the default guidance scale.

        Args:
            cfg: configuration object; ``GUIDE_SCALE`` is read (default 1.0).
            logger: optional logger forwarded to the parent constructor.
        """
        super().__init__(cfg, logger=logger)
        self.guide_scale = cfg.get('GUIDE_SCALE', 1.0)

    def init_params(self):
        """Read hyper-parameters and sub-module configs from ``self.cfg``.

        Also builds ``self.diffusion`` from the ``DIFFUSION`` config entry.

        Raises:
            AssertionError: if ``PARAMETERIZATION`` is not one of
                ``eps``/``x0``/``v``/``rf`` or if ``DIFFUSION`` is missing.
        """
        self.parameterization = self.cfg.get('PARAMETERIZATION', 'rf')
        assert self.parameterization in [
            'eps', 'x0', 'v', 'rf'
        ], 'currently only supporting "eps" and "x0" and "v" and "rf"'

        diffusion_cfg = self.cfg.get("DIFFUSION", None)
        assert diffusion_cfg is not None
        if self.cfg.have("WORK_DIR"):
            # Propagate the working directory to the diffusion scheduler.
            diffusion_cfg.WORK_DIR = self.cfg.WORK_DIR
        self.diffusion = DIFFUSIONS.build(diffusion_cfg, logger=self.logger)

        self.pretrained_model = self.cfg.get('PRETRAINED_MODEL', None)
        self.ignore_keys = self.cfg.get('IGNORE_KEYS', [])

        self.model_config = self.cfg.DIFFUSION_MODEL
        self.first_stage_config = self.cfg.FIRST_STAGE_MODEL
        self.cond_stage_config = self.cfg.COND_STAGE_MODEL
        self.tokenizer_config = self.cfg.get('TOKENIZER', None)
        self.loss_config = self.cfg.get('LOSS', None)

        self.scale_factor = self.cfg.get('SCALE_FACTOR', 0.18215)
        self.size_factor = self.cfg.get('SIZE_FACTOR', 16)

        # An explicit ``None`` in the config is normalised to the empty string.
        default_n_prompt = self.cfg.get('DEFAULT_N_PROMPT', '')
        self.default_n_prompt = '' if default_n_prompt is None else default_n_prompt
        self.p_zero = self.cfg.get('P_ZERO', 0.0)
        train_n_prompt = self.cfg.get('TRAIN_N_PROMPT', '')
        self.train_n_prompt = '' if train_n_prompt is None else train_n_prompt

        self.use_ema = self.cfg.get('USE_EMA', False)
        self.model_ema_config = self.cfg.get('DIFFUSION_MODEL_EMA', None)

    def construct_network(self):
        """Instantiate the backbone, optional EMA copy, loss, tokenizer,
        first-stage model and conditioning (text) model.

        The first-stage, EMA and conditioning models are put in eval mode
        with gradients disabled; the backbone is cast to ``bfloat16``.
        """
        self.model = BACKBONES.build(self.model_config,
                                     logger=self.logger).to(torch.bfloat16)
        self.logger.info('all parameters:{}'.format(count_params(self.model)))

        if self.use_ema:
            if self.model_ema_config:
                self.model_ema = BACKBONES.build(self.model_ema_config,
                                                 logger=self.logger)
            else:
                self.model_ema = copy.deepcopy(self.model)
            self.model_ema = self.model_ema.eval()
            for param in self.model_ema.parameters():
                param.requires_grad = False

        if self.loss_config:
            self.loss = LOSSES.build(self.loss_config, logger=self.logger)

        if self.tokenizer_config is not None:
            self.tokenizer = TOKENIZERS.build(self.tokenizer_config,
                                              logger=self.logger)

        if self.first_stage_config:
            self.first_stage_model = MODELS.build(self.first_stage_config,
                                                  logger=self.logger)
            self.first_stage_model = self.first_stage_model.eval()
            # Replace ``train`` so later ``.train()`` calls cannot switch
            # the frozen first-stage model out of eval mode.
            self.first_stage_model.train = disabled_train
            for param in self.first_stage_model.parameters():
                param.requires_grad = False
        else:
            self.first_stage_model = None

        if self.tokenizer_config is not None:
            self.cond_stage_config.KWARGS = {
                'vocab_size': self.tokenizer.vocab_size
            }

        if self.cond_stage_config == '__is_unconditional__':
            print(
                f'Training {self.__class__.__name__} as an unconditional model.'
            )
            self.cond_stage_model = None
        else:
            model = EMBEDDERS.build(self.cond_stage_config, logger=self.logger)
            self.cond_stage_model = model.eval().requires_grad_(False)
            self.cond_stage_model.train = disabled_train

    @torch.no_grad()
    def encode_first_stage(self, x, **kwargs):
        """Encode each image in ``x`` separately with the first-stage model.

        Args:
            x: iterable of image tensors; 3-D tensors get a leading batch
                dimension added before encoding.
            **kwargs: accepted for interface compatibility; unused here.

        Returns:
            list with one encoded latent per input image. If the encoder
            returns a tuple/list, only its first element is kept.
        """
        latents = []
        for u in x:
            if u.dim() == 3:
                u = u.unsqueeze(0)
            zu = self.first_stage_model.encode(u)
            if isinstance(zu, (tuple, list)):
                zu = zu[0]
            latents.append(zu)
        return latents

    @torch.no_grad()
    def decode_first_stage(self, z):
        """Decode every latent in ``z`` with the first-stage model.

        Returns:
            list of decoded outputs, one per latent.
        """
        return [self.first_stage_model.decode(zu) for zu in z]

    def noise_sample(self, num_samples, h, w, seed, dtype=torch.bfloat16):
        """Draw seeded Gaussian noise for an image of pixel size ``h`` x ``w``.

        Args:
            num_samples: size of the leading (batch) dimension.
            h: image height; the noise height is ``2 * ceil(h / 16)``.
            w: image width; the noise width is ``2 * ceil(w / 16)``.
            seed: seed for the per-call ``torch.Generator``.
            dtype: dtype of the returned tensor.

        Returns:
            Tensor of shape ``(num_samples, 16, 2*ceil(h/16), 2*ceil(w/16))``
            on ``we.device_id``. The channel count 16 is hard-coded.
        """
        generator = torch.Generator(device=we.device_id).manual_seed(seed)
        return torch.randn(
            num_samples,
            16,
            2 * math.ceil(h / 16),
            2 * math.ceil(w / 16),
            device=we.device_id,
            dtype=dtype,
            generator=generator,
        )

    def resize_func(self, x, size):
        """Resize ``x`` to ``size`` with nearest-exact interpolation.

        ``None`` is passed through unchanged. A batch dimension is added
        before interpolation and is kept in the result.
        """
        if x is None:
            return x
        return F.interpolate(x.unsqueeze(0), size=size, mode='nearest-exact')

    def parse_ref_and_edit(self, src_image,
                           modify_image,
                           src_image_mask,
                           text_embedding,
                           edit_id):
        """Split per-sample source data into reference and edit conditions.

        Args:
            src_image: per-sample lists of source images.
            modify_image: per-sample lists of modified images, indexed by
                ``edit_id`` like ``src_image``.
            src_image_mask: per-sample lists of source masks.
            text_embedding: mapping with ``"context"`` and ``"y"`` entries,
                each iterable per sample (as produced by the conditioning
                model's ``encode_list_of_list`` — TODO confirm exact layout).
            edit_id: per-sample iterables of indices selecting which source
                images are to be edited.

        Returns:
            dict with keys ``edit``, ``modify``, ``edit_mask``, ``edit_id``,
            ``ref_context``, ``ref_y``, ``context``, ``y``, ``ref_x``,
            ``ref_mask`` and ``ref_id``. ``ref_x``, ``ref_context`` and
            ``ref_y`` are ``None`` when no sample has any source image.
        """
        edit_image, modi_image, edit_mask = [], [], []
        ref_image, ref_mask, ref_context, ref_y, ref_id = [], [], [], [], []
        txt, txt_y = [], []

        use_ref_encoder = bool(hasattr(self, "ref_cond_stage_model")
                               and self.ref_cond_stage_model)

        per_sample = zip(src_image, modify_image, src_image_mask,
                         text_embedding["context"], text_embedding["y"],
                         edit_id)
        for (one_src, one_modify, one_src_mask,
             one_text_embedding, one_text_y, one_edit_id) in per_sample:
            n_ref = len(one_src)
            ref_id.append(list(range(n_ref)))

            if use_ref_encoder:
                # Map images from [-1, 1] to uint8 [0, 255] for the
                # reference encoder.
                ref_image.append(
                    self.ref_cond_stage_model.encode_list(
                        [((i + 1.0) / 2.0 * 255).type(torch.uint8)
                         for i in one_src]))
            else:
                ref_image.append(one_src)
            ref_mask.append(one_src_mask)

            # Latents of the source images selected for editing.
            current_edit_image = to_device(
                [one_src[i] for i in one_edit_id], strict=False)
            current_edit_image = [
                v.squeeze(0)
                for v in self.encode_first_stage(current_edit_image)
            ]

            # Latents of the matching modified images.
            current_modify_image = to_device(
                [one_modify[i] for i in one_edit_id], strict=False)
            current_modify_image = [
                v.squeeze(0)
                for v in self.encode_first_stage(current_modify_image)
            ]

            # Masks of the edited images, folded into 8x8 patch channels.
            current_edit_image_mask = to_device(
                [one_src_mask[i] for i in one_edit_id], strict=False)
            current_edit_image_mask = [
                self.reshape_func(m).squeeze(0)
                for m in current_edit_image_mask
            ]

            edit_image.append(current_edit_image)
            modi_image.append(current_modify_image)
            edit_mask.append(current_edit_image_mask)
            ref_context.append(one_text_embedding[:n_ref])
            ref_y.append(one_text_y[:n_ref])

        if not sum(len(src_) for src_ in src_image) > 0:
            ref_image = None
            ref_context = None
            ref_y = None

        # The last embedding of every sample is the main instruction.
        for one_text_embedding, one_text_y in zip(text_embedding["context"],
                                                  text_embedding["y"]):
            txt.append(one_text_embedding[-1].squeeze(0))
            txt_y.append(one_text_y[-1])

        return {
            "edit": edit_image,
            'modify': modi_image,
            "edit_mask": edit_mask,
            "edit_id": edit_id,
            "ref_context": ref_context,
            "ref_y": ref_y,
            "context": txt,
            "y": txt_y,
            "ref_x": ref_image,
            "ref_mask": ref_mask,
            "ref_id": ref_id
        }

    def reshape_func(self, mask):
        """Fold a pixel-space mask into 8x8 patch channels.

        The mask is cast to ``bfloat16``, flattened to ``(c, H, W)`` and
        rearranged to ``(c, 64, H/8, W/8)``.
        NOTE(review): ``H`` and ``W`` are expected to be multiples of 8 for
        the rearrange pattern to apply — confirm against callers.
        """
        mask = mask.to(torch.bfloat16)
        mask = mask.view((-1, mask.shape[-2], mask.shape[-1]))
        return rearrange(
            mask,
            "c (h ph) (w pw) -> c (ph pw) h w",
            ph=8,
            pw=8,
        )

    def forward_train(self,
                      src_image_list=[],
                      modify_image_list=[],
                      src_mask_list=[],
                      edit_id=[],
                      image=None,
                      image_mask=None,
                      noise=None,
                      prompt=[],
                      **kwargs):
        """Compute the diffusion training loss for one batch.

        Args:
            src_image_list: list of lists of source images (asserted to be
                a list of lists).
            modify_image_list: list of lists of modified images, aligned
                with ``src_image_list``.
            src_mask_list: list of lists of source masks (asserted to be a
                list of lists).
            edit_id: per-sample indices of the source images to edit.
            image: target images; either a flat list or a list of lists.
                For a list of lists the encoded latents of each sample are
                concatenated along the last dimension.
            image_mask: target masks matching ``image``.
            noise: noise forwarded to the diffusion loss; it is regenerated
                here when ``image`` is a list of lists.
            prompt: list of prompts; each item is a string or a list of
                strings.
            **kwargs: ``gc_seg`` and ``align`` are consumed here; everything
                else is forwarded to ``encode_first_stage`` and
                ``self.diffusion.loss``.

        Returns:
            dict with ``loss`` (scalar mean over valid latent positions)
            and ``probe_data`` (``{'prompt': prompt}``).

        Note:
            The list defaults are shared objects; they are only read here.
        """
        assert check_list_of_list(src_image_list) and check_list_of_list(
            src_mask_list)
        assert self.cond_stage_model is not None

        gc_seg = kwargs.pop("gc_seg", [])
        gc_seg = int(gc_seg[0]) if len(gc_seg) > 0 else 0
        align = kwargs.pop("align", [])
        prompt_ = [[pp] if isinstance(pp, str) else pp for pp in prompt]
        if len(align) < 1:
            align = [0] * len(prompt_)
        context = self.cond_stage_model.encode_list_of_list(prompt_)

        guide_scale = self.guide_scale
        if guide_scale is not None:
            guide_scale = torch.full((len(prompt_),), guide_scale,
                                     device=we.device_id)

        if check_list_of_list(image):
            image = [to_device(ix) for ix in image]
            x_start = [self.encode_first_stage(ix, **kwargs) for ix in image]
            # Draw noise per latent first so that every piece gets its own
            # sample, then concatenate exactly like the latents.
            noise = [[torch.randn_like(ii) for ii in ix] for ix in x_start]
            x_start = [torch.cat(ix, dim=-1) for ix in x_start]
            noise = [torch.cat(ix, dim=-1) for ix in noise]
            noise, _ = pack_imagelist_into_tensor(noise)

            image_mask = [to_device(im, strict=False) for im in image_mask]
            x_mask = [
                [self.reshape_func(i).squeeze(0) for i in im]
                if im is not None else [None] * len(ix)
                for ix, im in zip(image, image_mask)
            ]
            x_mask = [torch.cat(im, dim=-1) for im in x_mask]
        else:
            image = to_device(image)
            x_start = self.encode_first_stage(image, **kwargs)
            image_mask = to_device(image_mask, strict=False)
            if image_mask is not None:
                x_mask = [self.reshape_func(i).squeeze(0) for i in image_mask]
            else:
                x_mask = [None] * len(image)

        # Boolean mask of real (non-padding) positions in the packed batch.
        loss_mask, _ = pack_imagelist_into_tensor(
            tuple(
                torch.ones_like(ix, dtype=torch.bool, device=ix.device)
                for ix in x_start))
        x_start, x_shapes = pack_imagelist_into_tensor(x_start)
        context['x_shapes'] = x_shapes
        context['align'] = align
        context['x_mask'] = x_mask

        ref_edit_context = self.parse_ref_and_edit(src_image_list,
                                                   modify_image_list,
                                                   src_mask_list, context,
                                                   edit_id)
        context.update(ref_edit_context)

        loss = self.diffusion.loss(x_0=x_start,
                                   model=self.model,
                                   model_kwargs={
                                       "cond": context,
                                       "gc_seg": gc_seg,
                                       "guidance": guide_scale
                                   },
                                   noise=noise,
                                   reduction='none',
                                   **kwargs)
        loss = loss[loss_mask].mean()
        return {'loss': loss, 'probe_data': {'prompt': prompt}}

    @torch.no_grad()
    def forward_test(self,
                     src_image_list=[],
                     modify_image_list=[],
                     src_mask_list=[],
                     edit_id=[],
                     image=None,
                     image_mask=None,
                     prompt=[],
                     sampler='flow_euler',
                     sample_steps=20,
                     seed=2023,
                     guide_scale=3.5,
                     guide_rescale=0.0,
                     show_process=False,
                     log_num=-1,
                     **kwargs):
        """Inference entry point; forwards every argument to
        ``forward_editing`` and returns its result.

        ``guide_rescale`` and ``show_process`` are not named parameters of
        ``forward_editing``; they travel through its ``**kwargs``.
        """
        return self.forward_editing(
            src_image_list=src_image_list,
            src_mask_list=src_mask_list,
            modify_image_list=modify_image_list,
            edit_id=edit_id,
            image=image,
            image_mask=image_mask,
            prompt=prompt,
            sampler=sampler,
            sample_steps=sample_steps,
            seed=seed,
            guide_scale=guide_scale,
            guide_rescale=guide_rescale,
            show_process=show_process,
            log_num=log_num,
            **kwargs
        )

    @torch.no_grad()
    def forward_editing(self,
                        src_image_list=[],
                        modify_image_list=None,
                        src_mask_list=[],
                        edit_id=[],
                        image=None,
                        image_mask=None,
                        prompt=[],
                        sampler='flow_euler',
                        sample_steps=20,
                        seed=2023,
                        guide_scale=3.5,
                        log_num=-1,
                        **kwargs
                        ):
        """Sample edited images and collect them with their inputs.

        Args:
            src_image_list: list of lists of source images.
            modify_image_list: list of lists of modified images aligned
                with ``src_image_list``.
            src_mask_list: list of lists of source masks.
            edit_id: per-sample indices of the source images to edit.
            image: optional target images (flat list or list of lists);
                when given, their spatial size defines the noise size.
            image_mask: optional target masks matching ``image``.
            prompt: list of prompts (string or list of strings each).
            sampler: sampler name forwarded to ``self.diffusion.sample``.
            sample_steps: number of sampling steps.
            seed: noise seed; a negative value draws a random seed.
            guide_scale: guidance scale; a falsy value falls back to
                ``self.guide_scale``.
            log_num: forwarded to ``limit_batch_data`` to limit the batch.
            **kwargs: ``align`` is consumed here, as are ``height`` and
                ``width`` when ``image`` is ``None``; the remainder is
                forwarded to ``self.diffusion.sample``.

        Returns:
            list with one dict per prompt containing ``reconstruct_image``,
            ``instruction``, ``edit_image``, ``modify_image``,
            ``edit_mask`` and, when ``image`` is given, ``target_image`` and
            ``target_mask``. Images are mapped from [-1, 1] to [0, 1].
        """
        (prompt, image, image_mask, src_image, modify_image, src_image_mask,
         edit_id) = limit_batch_data([
             prompt, image, image_mask, src_image_list, modify_image_list,
             src_mask_list, edit_id
         ], log_num)
        assert check_list_of_list(src_image) and check_list_of_list(
            src_image_mask)
        assert self.cond_stage_model is not None

        align = kwargs.pop("align", [])
        prompt_ = [[pp] if isinstance(pp, str) else pp for pp in prompt]
        if len(align) < 1:
            align = [0] * len(prompt_)
        context = self.cond_stage_model.encode_list_of_list(prompt_)

        guide_scale = guide_scale or self.guide_scale
        if guide_scale is not None:
            guide_scale = torch.full((len(prompt),), guide_scale,
                                     device=we.device_id)

        seed = seed if seed >= 0 else random.randint(0, 2 ** 32 - 1)
        if image is not None:
            if check_list_of_list(image):
                image = [torch.cat(ix, dim=-1) for ix in image]
                if image_mask is not None:
                    image_mask = [torch.cat(im, dim=-1) for im in image_mask]
            noise = [
                self.noise_sample(1, ix.shape[1], ix.shape[2], seed)
                for ix in image
            ]
        else:
            height, width = kwargs.pop("height"), kwargs.pop("width")
            noise = [self.noise_sample(1, height, width, seed) for _ in prompt]
        noise, x_shapes = pack_imagelist_into_tensor(noise)
        context['x_shapes'] = x_shapes
        context['align'] = align

        image_mask = to_device(image_mask, strict=False)
        if image_mask is None:
            # Mirrors ``forward_train``: a missing mask becomes one ``None``
            # entry per sample instead of failing on iteration.
            x_mask = [None] * len(prompt)
        else:
            x_mask = [
                None if m is None else self.reshape_func(m).squeeze(0)
                for m in image_mask
            ]
        context['x_mask'] = x_mask

        ref_edit_context = self.parse_ref_and_edit(src_image, modify_image,
                                                   src_image_mask, context,
                                                   edit_id)
        context.update(ref_edit_context)

        model = self.model
        embedding_context = model.no_sync if isinstance(
            model, torch.distributed.fsdp.FullyShardedDataParallel) \
            else nullcontext
        with embedding_context():
            samples = self.diffusion.sample(
                noise=noise,
                sampler=sampler,
                model=self.model,
                model_kwargs={
                    "cond": context,
                    "guidance": guide_scale,
                    "gc_seg": -1
                },
                steps=sample_steps,
                show_progress=True,
                guide_scale=guide_scale,
                return_intermediate=None,
                **kwargs).float()
        samples = unpack_tensor_into_imagelist(samples, x_shapes)
        with torch.autocast(device_type="cuda", dtype=torch.bfloat16):
            x_samples = self.decode_first_stage(samples)

        if image is not None and image_mask is None:
            image_mask = [None] * len(image)

        outputs = []
        for i, instruction in enumerate(prompt):
            rec_img = torch.clamp((x_samples[i].float() + 1.0) / 2.0,
                                  min=0.0,
                                  max=1.0)
            rec_img = rec_img.squeeze(0)

            edit_imgs, modify_imgs, edit_img_masks = [], [], []
            if src_image is not None and src_image[i] is not None:
                # Use a local fallback instead of writing into the caller's
                # mask list.
                sample_masks = src_image_mask[i]
                if sample_masks is None:
                    sample_masks = [None] * len(src_image[i])
                # ``modify_image`` is the batch-limited list, consistent
                # with ``src_image`` and ``src_image_mask``.
                for edit_img, modify_img, edit_mask in zip(
                        src_image[i], modify_image[i], sample_masks):
                    edit_img = torch.clamp((edit_img.float() + 1.0) / 2.0,
                                           min=0.0,
                                           max=1.0)
                    edit_imgs.append(edit_img.squeeze(0))
                    modify_img = torch.clamp((modify_img.float() + 1.0) / 2.0,
                                             min=0.0,
                                             max=1.0)
                    modify_imgs.append(modify_img.squeeze(0))
                    if edit_mask is None:
                        edit_mask = torch.ones_like(edit_img[[0], :, :])
                    edit_img_masks.append(edit_mask)

            one_tup = {
                'reconstruct_image': rec_img,
                'instruction': instruction,
                'edit_image': edit_imgs if len(edit_imgs) > 0 else None,
                'modify_image': modify_imgs if len(modify_imgs) > 0 else None,
                'edit_mask': edit_img_masks if len(edit_imgs) > 0 else None
            }
            if image is not None:
                ori_img = torch.clamp((image[i] + 1.0) / 2.0, min=0.0, max=1.0)
                one_tup['target_image'] = ori_img.squeeze(0)
                if image_mask[i] is not None:
                    one_tup['target_mask'] = image_mask[i]
                else:
                    one_tup['target_mask'] = torch.ones_like(
                        ori_img[[0], :, :])
            outputs.append(one_tup)
        return outputs

    @staticmethod
    def get_config_template():
        """Return the YAML configuration template for this model class."""
        return dict_to_yaml('MODEL',
                            __class__.__name__,
                            LatentDiffusionACEPlus.para_dict,
                            set_name=True)
|
|
|
|