# -*- coding: utf-8 -*-
# Copyright (c) Alibaba, Inc. and its affiliates.
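# Gradio demo for VACE preprocessing (vace/gradios/vace_preproccess_demo.py).
# It exposes the image, video, and composition annotators behind separate tabs
# and saves processed samples plus a .txt manifest under --save_dir.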
import shutil
import sys
import json
import os
import argparse
import datetime
import copy
import random
import cv2
import imageio
import numpy as np
import gradio as gr
import tempfile
from pycocotools import mask as mask_utils
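# Make the repository root importable so the `vace.*` packages resolve when this
# file is run directly as a script.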
sys.path.insert(0, os.path.sep.join(os.path.realpath(__file__).split(os.path.sep)[:-3]))
from vace.annotators.utils import single_rle_to_mask, read_video_frames, save_one_video, read_video_one_frame
from vace.configs import VACE_IMAGE_PREPROCCESS_CONFIGS, VACE_IMAGE_MASK_PREPROCCESS_CONFIGS, VACE_IMAGE_MASKAUG_PREPROCCESS_CONFIGS, VACE_VIDEO_PREPROCCESS_CONFIGS, VACE_VIDEO_MASK_PREPROCCESS_CONFIGS, VACE_VIDEO_MASKAUG_PREPROCCESS_CONFIGS, VACE_COMPOSITION_PREPROCCESS_CONFIGS
import vace.annotators as annotators
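# Timestamp-based id (YYYYmmddHHMMSS + microseconds) used to group and name saved samples.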
def tid_maker():
return '{0:%Y%m%d%H%M%S%f}'.format(datetime.datetime.now())
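# Render a flat dict as a two-column Markdown table shown in the save-log panel.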
def dict_to_markdown_table(d):
markdown = "| Key | Value |\n"
markdown += "| --- | ----- |\n"
for key, value in d.items():
markdown += f"| {key} | {value} |\n"
return markdown
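# Image tab: instantiates the configured image, mask, and mask-augmentation
# annotators and wires them to an ImageMask editor for single-image preprocessing.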
class VACEImageTag():
def __init__(self, cfg):
self.save_dir = os.path.join(cfg.save_dir, 'image')
if not os.path.exists(self.save_dir):
os.makedirs(self.save_dir)
self.image_anno_processor = {}
self.load_image_anno_list = ["image_plain", "image_depth", "image_gray", "image_pose", "image_scribble", "image_outpainting"]
for anno_name, anno_cfg in copy.deepcopy(VACE_IMAGE_PREPROCCESS_CONFIGS).items():
if anno_name not in self.load_image_anno_list: continue
class_name = anno_cfg.pop("NAME")
input_params = anno_cfg.pop("INPUTS")
output_params = anno_cfg.pop("OUTPUTS")
anno_ins = getattr(annotators, class_name)(cfg=anno_cfg)
self.image_anno_processor[anno_name] = {"inputs": input_params, "outputs": output_params,
"anno_ins": anno_ins}
self.mask_anno_processor = {}
self.load_mask_anno_list = ["image_mask_plain", "image_mask_seg", "image_mask_draw", "image_mask_face"]
for anno_name, anno_cfg in copy.deepcopy(VACE_IMAGE_MASK_PREPROCCESS_CONFIGS).items():
if anno_name not in self.load_mask_anno_list: continue
class_name = anno_cfg.pop("NAME")
input_params = anno_cfg.pop("INPUTS")
output_params = anno_cfg.pop("OUTPUTS")
anno_ins = getattr(annotators, class_name)(cfg=anno_cfg)
self.mask_anno_processor[anno_name] = {"inputs": input_params, "outputs": output_params,
"anno_ins": anno_ins}
self.maskaug_anno_processor = {}
self.load_maskaug_anno_list = ["image_maskaug_plain", "image_maskaug_invert", "image_maskaug", "image_maskaug_region_random", "image_maskaug_region_crop"]
for anno_name, anno_cfg in copy.deepcopy(VACE_IMAGE_MASKAUG_PREPROCCESS_CONFIGS).items():
if anno_name not in self.load_maskaug_anno_list: continue
class_name = anno_cfg.pop("NAME")
input_params = anno_cfg.pop("INPUTS")
output_params = anno_cfg.pop("OUTPUTS")
anno_ins = getattr(annotators, class_name)(cfg=anno_cfg)
self.maskaug_anno_processor[anno_name] = {"inputs": input_params, "outputs": output_params,
"anno_ins": anno_ins}
self.seg_type = ['maskpointtrack', 'maskbboxtrack', 'masktrack', 'salientmasktrack', 'salientbboxtrack', 'label', 'caption']
self.seg_draw_type = ['maskpoint', 'maskbbox', 'mask']
def create_ui_image(self, *args, **kwargs):
with gr.Row(variant="panel"):
with gr.Column(scale=1):
with gr.Row():
self.input_process_image = gr.ImageMask(
label="input_process_image",
layers=False,
type='pil',
format='png',
interactive=True)
with gr.Row(variant="panel"):
with gr.Column(scale=1):
with gr.Row():
self.output_process_image = gr.Image(
label="output_process_image",
value=None,
type='pil',
image_mode='RGB',
format='png',
interactive=False)
with gr.Column(scale=1):
with gr.Row():
self.output_process_masked_image = gr.Image(
label="output_process_masked_image",
value=None,
type='pil',
image_mode='RGB',
format='png',
interactive=False)
with gr.Column(scale=1):
with gr.Row():
self.output_process_mask = gr.Image(
label="output_process_mask",
value=None,
type='pil',
image_mode='L',
format='png',
interactive=False)
with gr.Row(variant="panel"):
with gr.Column(scale=1):
with gr.Row():
self.image_process_type = gr.Dropdown(
label='Image Annotator',
choices=list(self.image_anno_processor.keys()),
value=list(self.image_anno_processor.keys())[0],
interactive=True)
with gr.Row(visible=False) as self.outpainting_setting:
self.outpainting_direction = gr.Dropdown(
multiselect=True,
label='Outpainting Direction',
choices=['left', 'right', 'up', 'down'],
value=['left', 'right', 'up', 'down'],
interactive=True)
self.outpainting_ratio = gr.Slider(
label='Outpainting Ratio',
minimum=0.0,
maximum=2.0,
step=0.1,
value=0.3,
interactive=True)
with gr.Column(scale=1):
with gr.Row():
self.mask_process_type = gr.Dropdown(
label='Mask Annotator',
choices=list(self.mask_anno_processor.keys()),
value=list(self.mask_anno_processor.keys())[0],
interactive=True)
with gr.Row():
self.mask_opacity = gr.Slider(
label='Mask Opacity',
minimum=0.0,
maximum=1.0,
step=0.1,
value=1.0,
interactive=True)
self.mask_gray = gr.Checkbox(
label='Mask Gray',
value=True,
interactive=True)
with gr.Row(visible=False) as self.segment_setting:
self.mask_type = gr.Dropdown(
label='Segment Type',
choices=self.seg_type,
value='maskpointtrack',
interactive=True)
self.mask_segtag = gr.Textbox(
label='Mask Seg Tag',
value='',
interactive=True)
with gr.Column(scale=1):
with gr.Row():
self.mask_aug_process_type = gr.Dropdown(
label='Mask Aug Annotator',
choices=list(self.maskaug_anno_processor.keys()),
value=list(self.maskaug_anno_processor.keys())[0],
interactive=True)
with gr.Row(visible=False) as self.maskaug_setting:
self.mask_aug_type = gr.Dropdown(
label='Mask Aug Type',
choices=['random', 'original', 'original_expand', 'hull', 'hull_expand', 'bbox', 'bbox_expand'],
value='original',
interactive=True)
self.mask_expand_ratio = gr.Slider(
label='Mask Expand Ratio',
minimum=0.0,
maximum=1.0,
step=0.1,
value=0.3,
interactive=True)
self.mask_expand_iters = gr.Slider(
label='Mask Expand Iters',
minimum=1,
maximum=10,
step=1,
value=5,
interactive=True)
with gr.Row(variant="panel"):
with gr.Column(scale=1):
with gr.Row():
self.process_button = gr.Button(
value='[1]Sample Process',
elem_classes='type_row',
elem_id='process_button',
visible=True)
with gr.Row():
self.save_button = gr.Button(
value='[2]Sample Save',
elem_classes='type_row',
elem_id='save_button',
visible=True)
with gr.Row():
self.save_log = gr.Markdown()
def change_process_type(self, image_process_type, mask_process_type, mask_aug_process_type):
outpainting_setting_visible = False
segment_setting = False
maskaug_setting = False
segment_choices = self.seg_type
if image_process_type == "image_outpainting":
outpainting_setting_visible = True
if mask_process_type in ["image_mask_seg", "image_mask_draw"]:
segment_setting = True
if mask_process_type in ["image_mask_draw"]:
segment_choices = self.seg_draw_type
if mask_aug_process_type in ["image_maskaug", "image_maskaug_region_random", "image_maskaug_region_crop"]:
maskaug_setting = True
return gr.update(visible=outpainting_setting_visible), gr.update(visible=segment_setting), gr.update(choices=segment_choices, value=segment_choices[0]), gr.update(visible=maskaug_setting)
def process_image_data(self, input_process_image, image_process_type, outpainting_direction, outpainting_ratio, mask_process_type, mask_type, mask_segtag, mask_opacity, mask_gray, mask_aug_process_type, mask_aug_type, mask_expand_ratio, mask_expand_iters):
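# Pipeline: image annotator -> mask annotator -> optional mask augmentation,
# then a masked preview is composed for display.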
image = np.array(input_process_image['background'].convert('RGB'))
mask = np.array(input_process_image['layers'][0].split()[-1].convert('L'))
image_shape = image.shape
if image_process_type in ['image_outpainting']:
ret_data = self.image_anno_processor[image_process_type]['anno_ins'].forward(image, direction=outpainting_direction, expand_ratio=outpainting_ratio)
image, mask = ret_data['image'], ret_data['mask']
else:
image = self.image_anno_processor[image_process_type]['anno_ins'].forward(image)
if image.shape != image_shape:
image = cv2.resize(image, image_shape[:2][::-1], interpolation=cv2.INTER_LINEAR)
if mask_process_type in ["image_mask_seg"]:
mask = mask[..., None]
mask = self.mask_anno_processor[mask_process_type]['anno_ins'].forward(image, mask=mask, label=mask_segtag, caption=mask_segtag, mode=mask_type)['mask']
elif mask_process_type in ['image_mask_draw']:
ret_data = self.mask_anno_processor[mask_process_type]['anno_ins'].forward(mask=mask, mode=mask_type)
mask = ret_data['mask'] if isinstance(ret_data, dict) and 'mask' in ret_data else ret_data
elif mask_process_type in ['image_mask_face']:
ret_data = self.mask_anno_processor[mask_process_type]['anno_ins'].forward(image=image)
mask = ret_data['mask'] if isinstance(ret_data, dict) and 'mask' in ret_data else ret_data
else:
ret_data = self.mask_anno_processor[mask_process_type]['anno_ins'].forward(mask=mask)
mask = ret_data['mask'] if isinstance(ret_data, dict) and 'mask' in ret_data else ret_data
mask_cfg = {
'mode': mask_aug_type,
'kwargs': {
'expand_ratio': mask_expand_ratio,
'expand_iters': mask_expand_iters
}
}
if mask_aug_process_type == 'image_maskaug':
mask = self.maskaug_anno_processor[mask_aug_process_type]['anno_ins'].forward(np.array(mask), mask_cfg)
elif mask_aug_process_type in ["image_maskaug_region_random", "image_maskaug_region_crop"]:
image = self.maskaug_anno_processor[mask_aug_process_type]['anno_ins'].forward(np.array(image), np.array(mask), mask_cfg=mask_cfg)
else:
ret_data = self.maskaug_anno_processor[mask_aug_process_type]['anno_ins'].forward(mask=mask)
mask = ret_data['mask'] if isinstance(ret_data, dict) and 'mask' in ret_data else ret_data
if mask_opacity > 0:
if mask.shape[:2] != image.shape[:2]:
raise gr.Error(f"Mask shape {mask.shape[:2]} should be the same as image shape {image.shape[:2]} or set mask_opacity to 0.")
if mask_gray:
masked_image = image.copy()
masked_image[mask == 255] = 127  # mid-gray fill; masked_image is uint8
else:
mask_weight = mask / 255 * mask_opacity
masked_image = np.clip(image * (1 - mask_weight[:, :, None]), 0, 255).astype(np.uint8)
else:
masked_image = image
return image, masked_image, mask
def save_image_data(self, input_image, image, masked_image, mask):
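# Saves the input/output images as PNGs under save_dir/<yyyymmdd>/ and writes a
# .txt manifest listing the saved paths.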
save_data = {
"input_image": input_image['background'].convert('RGB') if isinstance(input_image, dict) else input_image,
"input_image_mask": input_image['layers'][0].split()[-1].convert('L') if isinstance(input_image, dict) else None,
"output_image": image,
"output_masked_image": masked_image,
"output_image_mask": mask
}
save_info = {}
tid = tid_maker()
for name, image in save_data.items():
if image is None: continue
save_image_dir = os.path.join(self.save_dir, tid[:8])
if not os.path.exists(save_image_dir): os.makedirs(save_image_dir)
save_image_path = os.path.join(save_image_dir, tid + '-' + name + '.png')
save_info[name] = save_image_path
image.save(save_image_path)
gr.Info(f'Save {name} to {save_image_path}', duration=15)
save_txt_path = os.path.join(self.save_dir, tid[:8], tid + '.txt')
save_info['save_info'] = save_txt_path
with open(save_txt_path, 'w') as f:
f.write(json.dumps(save_info, ensure_ascii=False))
return dict_to_markdown_table(save_info)
def set_callbacks_image(self, **kwargs):
inputs = [self.input_process_image, self.image_process_type, self.outpainting_direction, self.outpainting_ratio, self.mask_process_type, self.mask_type, self.mask_segtag, self.mask_opacity, self.mask_gray, self.mask_aug_process_type, self.mask_aug_type, self.mask_expand_ratio, self.mask_expand_iters]
outputs = [self.output_process_image, self.output_process_masked_image, self.output_process_mask]
self.process_button.click(self.process_image_data,
inputs=inputs,
outputs=outputs)
self.save_button.click(self.save_image_data,
inputs=[self.input_process_image, self.output_process_image, self.output_process_masked_image, self.output_process_mask],
outputs=[self.save_log])
process_inputs = [self.image_process_type, self.mask_process_type, self.mask_aug_process_type]
process_outputs = [self.outpainting_setting, self.segment_setting, self.mask_type, self.maskaug_setting]
self.image_process_type.change(self.change_process_type, inputs=process_inputs, outputs=process_outputs)
self.mask_process_type.change(self.change_process_type, inputs=process_inputs, outputs=process_outputs)
self.mask_aug_process_type.change(self.change_process_type, inputs=process_inputs, outputs=process_outputs)
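# Video tab: applies the configured video, mask, and mask-augmentation
# annotators to an uploaded clip; the ImageMask editor supplies the first-frame
# mask prompt used for segmentation or expansion.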
class VACEVideoTag():
def __init__(self, cfg):
self.save_dir = os.path.join(cfg.save_dir, 'video')
if not os.path.exists(self.save_dir):
os.makedirs(self.save_dir)
self.video_anno_processor = {}
self.load_video_anno_list = ["plain", "depth", "flow", "gray", "pose", "scribble", "outpainting", "outpainting_inner", "framerefext"]
for anno_name, anno_cfg in copy.deepcopy(VACE_VIDEO_PREPROCCESS_CONFIGS).items():
if anno_name not in self.load_video_anno_list: continue
class_name = anno_cfg.pop("NAME")
input_params = anno_cfg.pop("INPUTS")
output_params = anno_cfg.pop("OUTPUTS")
anno_ins = getattr(annotators, class_name)(cfg=anno_cfg)
self.video_anno_processor[anno_name] = {"inputs": input_params, "outputs": output_params,
"anno_ins": anno_ins}
self.mask_anno_processor = {}
self.load_mask_anno_list = ["mask_expand", "mask_seg"]
for anno_name, anno_cfg in copy.deepcopy(VACE_VIDEO_MASK_PREPROCCESS_CONFIGS).items():
if anno_name not in self.load_mask_anno_list: continue
class_name = anno_cfg.pop("NAME")
input_params = anno_cfg.pop("INPUTS")
output_params = anno_cfg.pop("OUTPUTS")
anno_ins = getattr(annotators, class_name)(cfg=anno_cfg)
self.mask_anno_processor[anno_name] = {"inputs": input_params, "outputs": output_params,
"anno_ins": anno_ins}
self.maskaug_anno_processor = {}
self.load_maskaug_anno_list = ["maskaug_plain", "maskaug_invert", "maskaug", "maskaug_layout"]
for anno_name, anno_cfg in copy.deepcopy(VACE_VIDEO_MASKAUG_PREPROCCESS_CONFIGS).items():
if anno_name not in self.load_maskaug_anno_list: continue
class_name = anno_cfg.pop("NAME")
input_params = anno_cfg.pop("INPUTS")
output_params = anno_cfg.pop("OUTPUTS")
anno_ins = getattr(annotators, class_name)(cfg=anno_cfg)
self.maskaug_anno_processor[anno_name] = {"inputs": input_params, "outputs": output_params,
"anno_ins": anno_ins}
def create_ui_video(self, *args, **kwargs):
with gr.Row(variant="panel"):
with gr.Column(scale=1):
self.input_process_video = gr.Video(
label="input_process_video",
sources=['upload'],
interactive=True)
self.input_process_image_show = gr.Image(
label="input_process_image_show",
format='png',
interactive=False)
with gr.Column(scale=2):
self.input_process_image = gr.ImageMask(
label="input_process_image",
layers=False,
type='pil',
format='png',
interactive=True)
with gr.Row(variant="panel"):
with gr.Column(scale=1):
with gr.Row():
self.output_process_video = gr.Video(
label="output_process_video",
value=None,
interactive=False)
with gr.Column(scale=1):
with gr.Row():
self.output_process_masked_video = gr.Video(
label="output_process_masked_video",
value=None,
interactive=False)
with gr.Column(scale=1):
with gr.Row():
self.output_process_video_mask = gr.Video(
label="output_process_video_mask",
value=None,
interactive=False)
with gr.Row(variant="panel"):
with gr.Column(scale=1):
with gr.Row():
self.video_process_type = gr.Dropdown(
label='Video Annotator',
choices=list(self.video_anno_processor.keys()),
value=list(self.video_anno_processor.keys())[0],
interactive=True)
with gr.Row(visible=False) as self.outpainting_setting:
self.outpainting_direction = gr.Dropdown(
multiselect=True,
label='Outpainting Direction',
choices=['left', 'right', 'up', 'down'],
value=['left', 'right', 'up', 'down'],
interactive=True)
self.outpainting_ratio = gr.Slider(
label='Outpainting Ratio',
minimum=0.0,
maximum=2.0,
step=0.1,
value=0.3,
interactive=True)
with gr.Row(visible=False) as self.frame_reference_setting:
self.frame_reference_mode = gr.Dropdown(
label='Frame Reference Mode',
choices=['first', 'last', 'firstlast', 'random'],
value='first',
interactive=True)
self.frame_reference_num = gr.Textbox(
label='Frame Reference Num',
value='1',
interactive=True)
with gr.Column(scale=1):
with gr.Row():
self.mask_process_type = gr.Dropdown(
label='Mask Annotator',
choices=list(self.mask_anno_processor.keys()),
value=list(self.mask_anno_processor.keys())[0],
interactive=True)
with gr.Row():
self.mask_opacity = gr.Slider(
label='Mask Opacity',
minimum=0.0,
maximum=1.0,
step=0.1,
value=1.0,
interactive=True)
self.mask_gray = gr.Checkbox(
label='Mask Gray',
value=True,
interactive=True)
with gr.Row(visible=False) as self.segment_setting:
self.mask_type = gr.Dropdown(
label='Segment Type',
choices=['maskpointtrack', 'maskbboxtrack', 'masktrack', 'salientmasktrack', 'salientbboxtrack',
'label', 'caption'],
value='maskpointtrack',
interactive=True)
self.mask_segtag = gr.Textbox(
label='Mask Seg Tag',
value='',
interactive=True)
with gr.Column(scale=1):
with gr.Row():
self.mask_aug_process_type = gr.Dropdown(
label='Mask Aug Annotator',
choices=list(self.maskaug_anno_processor.keys()),
value=list(self.maskaug_anno_processor.keys())[0],
interactive=True)
with gr.Row(visible=False) as self.maskaug_setting:
self.mask_aug_type = gr.Dropdown(
label='Mask Aug Type',
choices=['random', 'original', 'original_expand', 'hull', 'hull_expand', 'bbox', 'bbox_expand'],
value='original',
interactive=True)
self.mask_expand_ratio = gr.Slider(
label='Mask Expand Ratio',
minimum=0.0,
maximum=1.0,
step=0.1,
value=0.3,
interactive=True)
self.mask_expand_iters = gr.Slider(
label='Mask Expand Iters',
minimum=1,
maximum=10,
step=1,
value=5,
interactive=True)
self.mask_layout_label = gr.Textbox(
label='Mask Layout Label',
value='',
interactive=True)
with gr.Row(variant="panel"):
with gr.Column(scale=1):
with gr.Row():
self.process_button = gr.Button(
value='[1]Sample Process',
elem_classes='type_row',
elem_id='process_button',
visible=True)
with gr.Row():
self.save_button = gr.Button(
value='[2]Sample Save',
elem_classes='type_row',
elem_id='save_button',
visible=True)
with gr.Row():
self.save_log = gr.Markdown()
def process_video_data(self, input_process_video, input_process_image, video_process_type, outpainting_direction, outpainting_ratio, frame_reference_mode, frame_reference_num, mask_process_type, mask_type, mask_segtag, mask_opacity, mask_gray, mask_aug_process_type, mask_aug_type, mask_expand_ratio, mask_expand_iters, mask_layout_label):
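# Reads the clip, derives per-frame masks (segmentation or expansion), runs the
# selected video and mask-augmentation annotators, and writes three temporary
# MP4s: processed video, masked preview, and mask.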
video_frames, fps, width, height, total_frames = read_video_frames(input_process_video, use_type='cv2', info=True)
# image = np.array(input_process_image['background'].convert('RGB'))
mask = input_process_image['layers'][0].split()[-1].convert('L')
if mask.height != height or mask.width != width:  # resize if either dimension differs from the video
mask = mask.resize((width, height))
if mask_process_type in ['mask_seg']:
mask_data = self.mask_anno_processor[mask_process_type]['anno_ins'].forward(video=input_process_video, mask=mask, label=mask_segtag, caption=mask_segtag, mode=mask_type, return_frame=False)
mask_frames = mask_data['masks']
elif mask_process_type in ['mask_expand']:
mask_frames = self.mask_anno_processor[mask_process_type]['anno_ins'].forward(mask=np.array(mask), expand_num=total_frames)
else:
raise NotImplementedError
output_video = []
if video_process_type in ['framerefext']:
output_data = self.video_anno_processor[video_process_type]['anno_ins'].forward(video_frames, ref_cfg={'mode': frame_reference_mode}, ref_num=frame_reference_num)
output_video, mask_frames = output_data['frames'], output_data['masks']
elif video_process_type in ['outpainting', 'outpainting_inner']:
# ratio = ((16 / 9 * height) / width - 1) / 2
output_data = self.video_anno_processor[video_process_type]['anno_ins'].forward(video_frames, direction=outpainting_direction, expand_ratio=outpainting_ratio)
output_video, mask_frames = output_data['frames'], output_data['masks']
else:
output_video = self.video_anno_processor[video_process_type]['anno_ins'].forward(video_frames)
mask_cfg = {
'mode': mask_aug_type,
'kwargs': {
'expand_ratio': mask_expand_ratio,
'expand_iters': mask_expand_iters
}
}
# print(mask_cfg)
if mask_aug_process_type == 'maskaug_layout':
output_video = self.maskaug_anno_processor[mask_aug_process_type]['anno_ins'].forward(mask_frames, mask_cfg=mask_cfg, label=mask_layout_label)
mask_aug_frames = [ np.ones_like(submask) * 255 for submask in mask_frames ]
else:
mask_aug_frames = self.maskaug_anno_processor[mask_aug_process_type]['anno_ins'].forward(mask_frames)
with (tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as output_video_path, \
tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as masked_video_path, \
tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as mask_video_path):
output_video_writer = imageio.get_writer(output_video_path.name, codec='libx264', fps=fps, quality=8, macro_block_size=None)
masked_video_writer = imageio.get_writer(masked_video_path.name, codec='libx264', fps=fps, quality=8, macro_block_size=None)
mask_video_writer = imageio.get_writer(mask_video_path.name, codec='libx264', fps=fps, quality=8, macro_block_size=None)
for i in range(total_frames):
frame = output_video[i] if len(output_video) > 0 else video_frames[i]
mask = mask_aug_frames[i]
if mask_gray:
masked_image = frame.copy()
masked_image[mask == 255] = 127  # mid-gray fill; frames are uint8
else:
mask_weight = mask / 255 * mask_opacity
masked_image = np.clip(frame * (1 - mask_weight[:, :, None]), 0, 255).astype(np.uint8)
output_video_writer.append_data(frame)
masked_video_writer.append_data(masked_image)
mask_video_writer.append_data(mask)
output_video_writer.close()
masked_video_writer.close()
mask_video_writer.close()
return output_video_path.name, masked_video_path.name, mask_video_path.name
def save_video_data(self, input_video_path, input_image, video_path, masked_video_path, mask_path):
save_image_data = {
"input_image": input_image['background'].convert('RGB') if isinstance(input_image, dict) else input_image,
"input_image_mask": input_image['layers'][0].split()[-1].convert('L') if isinstance(input_image, dict) else None
}
save_video_data = {
"input_video": input_video_path,
"output_video": video_path,
"output_masked_video": masked_video_path,
"output_video_mask": mask_path
}
save_info = {}
tid = tid_maker()
for name, image in save_image_data.items():
if image is None: continue
save_image_dir = os.path.join(self.save_dir, tid[:8])
if not os.path.exists(save_image_dir): os.makedirs(save_image_dir)
save_image_path = os.path.join(save_image_dir, tid + '-' + name + '.png')
save_info[name] = save_image_path
image.save(save_image_path)
gr.Info(f'Save {name} to {save_image_path}', duration=15)
for name, ori_video_path in save_video_data.items():
if ori_video_path is None: continue
save_video_dir = os.path.join(self.save_dir, tid[:8])
if not os.path.exists(save_video_dir): os.makedirs(save_video_dir)
save_video_path = os.path.join(save_video_dir, tid + '-' + name + os.path.splitext(ori_video_path)[-1])
save_info[name] = save_video_path
shutil.copy(ori_video_path, save_video_path)
gr.Info(f'Save {name} to {save_video_path}', duration=15)
save_txt_path = os.path.join(self.save_dir, tid[:8], tid + '.txt')
save_info['save_info'] = save_txt_path
with open(save_txt_path, 'w') as f:
f.write(json.dumps(save_info, ensure_ascii=False))
return dict_to_markdown_table(save_info)
def change_process_type(self, video_process_type, mask_process_type, mask_aug_process_type):
frame_reference_setting_visible = False
outpainting_setting_visible = False
segment_setting = False
maskaug_setting = False
if video_process_type in ["framerefext"]:
frame_reference_setting_visible = True
elif video_process_type in ["outpainting", "outpainting_inner"]:
outpainting_setting_visible = True
if mask_process_type in ["mask_seg"]:
segment_setting = True
if mask_aug_process_type in ["maskaug", "maskaug_layout"]:
maskaug_setting = True
return gr.update(visible=frame_reference_setting_visible), gr.update(visible=outpainting_setting_visible), gr.update(visible=segment_setting), gr.update(visible=maskaug_setting)
def set_callbacks_video(self, **kwargs):
inputs = [self.input_process_video, self.input_process_image, self.video_process_type, self.outpainting_direction, self.outpainting_ratio, self.frame_reference_mode, self.frame_reference_num, self.mask_process_type, self.mask_type, self.mask_segtag, self.mask_opacity, self.mask_gray, self.mask_aug_process_type, self.mask_aug_type, self.mask_expand_ratio, self.mask_expand_iters, self.mask_layout_label]
outputs = [self.output_process_video, self.output_process_masked_video, self.output_process_video_mask]
self.process_button.click(self.process_video_data, inputs=inputs, outputs=outputs)
self.input_process_video.change(read_video_one_frame, inputs=[self.input_process_video], outputs=[self.input_process_image_show])
self.save_button.click(self.save_video_data,
inputs=[self.input_process_video, self.input_process_image, self.output_process_video, self.output_process_masked_video, self.output_process_video_mask],
outputs=[self.save_log])
process_inputs = [self.video_process_type, self.mask_process_type, self.mask_aug_process_type]
process_outputs = [self.frame_reference_setting, self.outpainting_setting, self.segment_setting, self.maskaug_setting]
self.video_process_type.change(self.change_process_type, inputs=process_inputs, outputs=process_outputs)
self.mask_process_type.change(self.change_process_type, inputs=process_inputs, outputs=process_outputs)
self.mask_aug_process_type.change(self.change_process_type, inputs=process_inputs, outputs=process_outputs)
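# Composition tab: merges two preprocessed video/mask pairs (repaint, extension,
# or control) into a single conditioning video and mask.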
class VACETagComposition():
def __init__(self, cfg):
self.save_dir = os.path.join(cfg.save_dir, 'composition')
if not os.path.exists(self.save_dir):
os.makedirs(self.save_dir)
anno_name = 'composition'
anno_cfg = copy.deepcopy(VACE_COMPOSITION_PREPROCCESS_CONFIGS[anno_name])
class_name = anno_cfg.pop("NAME")
input_params = anno_cfg.pop("INPUTS")
output_params = anno_cfg.pop("OUTPUTS")
anno_ins = getattr(annotators, class_name)(cfg=anno_cfg)
self.comp_anno_processor = {"inputs": input_params, "outputs": output_params,
"anno_ins": anno_ins}
self.process_types = ["repaint", "extension", "control"]
def create_ui_composition(self, *args, **kwargs):
with gr.Row(variant="panel"):
with gr.Column(scale=1):
self.input_process_video_1 = gr.Video(
label="input_process_video_1",
sources=['upload'],
interactive=True)
with gr.Column(scale=1):
self.input_process_video_2 = gr.Video(
label="input_process_video_1",
sources=['upload'],
interactive=True)
with gr.Row(variant="panel"):
with gr.Column(scale=1):
with gr.Row():
self.input_process_video_mask_1 = gr.Video(
label="input_process_video_mask_1",
sources=['upload'],
interactive=True)
with gr.Column(scale=1):
with gr.Row():
self.input_process_video_mask_2 = gr.Video(
label="input_process_video_mask_2",
sources=['upload'],
interactive=True)
with gr.Row(variant="panel"):
with gr.Column(scale=1):
with gr.Row():
self.input_process_type_1 = gr.Dropdown(
label='input_process_type_1',
choices=list(self.process_types),
value=list(self.process_types)[0],
interactive=True)
with gr.Column(scale=1):
with gr.Row():
self.input_process_type_2 = gr.Dropdown(
label='input_process_type_2',
choices=list(self.process_types),
value=list(self.process_types)[0],
interactive=True)
with gr.Row(variant="panel"):
with gr.Column(scale=1):
with gr.Row():
self.process_button = gr.Button(
value='[1]Sample Process',
elem_classes='type_row',
elem_id='process_button',
visible=True)
with gr.Row(variant="panel"):
with gr.Column(scale=1):
self.output_process_video = gr.Video(
label="output_process_video",
sources=['upload'],
interactive=False)
with gr.Column(scale=1):
self.output_process_mask = gr.Video(
label="output_process_mask",
sources=['upload'],
interactive=False)
with gr.Row(variant="panel"):
with gr.Column(scale=1):
with gr.Row():
self.save_button = gr.Button(
value='[2]Sample Save',
elem_classes='type_row',
elem_id='save_button',
visible=True)
with gr.Row():
self.save_log = gr.Markdown()
def process_composition_data(self, input_process_video_1, input_process_video_2, input_process_video_mask_1, input_process_video_mask_2, input_process_type_1, input_process_type_2):
# "repaint", "extension", "control"
# ('repaint', 'repaint') / ('repaint', 'extension') / ('repaint', 'control')
# ('extension', 'extension') / ('extension', 'repaint') / ('extension', 'control')
# ('control', 'control') / ('control', 'repaint') / ('control', 'extension')
video_frames_1, video_fps_1, video_width_1, video_height_1, video_total_frames_1 = read_video_frames(input_process_video_1, use_type='cv2', info=True)
video_frames_2, video_fps_2, video_width_2, video_height_2, video_total_frames_2 = read_video_frames(input_process_video_2, use_type='cv2', info=True)
mask_frames_1, mask_fps_1, mask_width_1, mask_height_1, mask_total_frames_1 = read_video_frames(input_process_video_mask_1, use_type='cv2', info=True)
mask_frames_2, mask_fps_2, mask_width_2, mask_height_2, mask_total_frames_2 = read_video_frames(input_process_video_mask_2, use_type='cv2', info=True)
mask_frames_1 = [np.where(mask > 127, 1, 0).astype(np.uint8) for mask in mask_frames_1]
mask_frames_2 = [np.where(mask > 127, 1, 0).astype(np.uint8) for mask in mask_frames_2]
assert video_width_1 == video_width_2 == mask_width_1 == mask_width_2
assert video_height_1 == video_height_2 == mask_height_1 == mask_height_2
assert video_fps_1 == video_fps_2
output_video, output_mask = self.comp_anno_processor['anno_ins'].forward(input_process_type_1, input_process_type_2, video_frames_1, video_frames_2, mask_frames_1, mask_frames_2)
fps = video_fps_1
if output_video is not None and output_mask is not None:
total_frames = len(output_video)
with (tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as output_video_path, \
tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as mask_video_path):
output_video_writer = imageio.get_writer(output_video_path.name, codec='libx264', fps=fps, quality=8, macro_block_size=None)
mask_video_writer = imageio.get_writer(mask_video_path.name, codec='libx264', fps=fps, quality=8, macro_block_size=None)
for i in range(total_frames):
output_video_writer.append_data(output_video[i])
mask_video_writer.append_data(output_mask[i])
output_video_writer.close()
mask_video_writer.close()
return output_video_path.name, mask_video_path.name
else:
return None, None
def save_composition_data(self, video_path, mask_path):
save_video_data = {
"output_video": video_path,
"output_video_mask": mask_path
}
save_info = {}
tid = tid_maker()
for name, ori_video_path in save_video_data.items():
if ori_video_path is None: continue
save_video_dir = os.path.join(self.save_dir, tid[:8])
if not os.path.exists(save_video_dir): os.makedirs(save_video_dir)
save_video_path = os.path.join(save_video_dir, tid + '-' + name + os.path.splitext(ori_video_path)[-1])
save_info[name] = save_video_path
shutil.copy(ori_video_path, save_video_path)
gr.Info(f'Save {name} to {save_video_path}', duration=15)
save_txt_path = os.path.join(self.save_dir, tid[:8], tid + '.txt')
save_info['save_info'] = save_txt_path
with open(save_txt_path, 'w') as f:
f.write(json.dumps(save_info, ensure_ascii=False))
return dict_to_markdown_table(save_info)
def set_callbacks_composition(self, **kwargs):
inputs = [self.input_process_video_1, self.input_process_video_2, self.input_process_video_mask_1, self.input_process_video_mask_2, self.input_process_type_1, self.input_process_type_2]
outputs = [self.output_process_video, self.output_process_mask]
self.process_button.click(self.process_composition_data,
inputs=inputs,
outputs=outputs)
self.save_button.click(self.save_composition_data,
inputs=[self.output_process_video, self.output_process_mask],
outputs=[self.save_log])
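# Utility tab (disabled in VACETag below): builds frame-expansion, clip-extension,
# clip-concatenation, and blank-mask conditioning videos from stills and clips.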
class VACEVideoTool():
def __init__(self, cfg):
self.save_dir = os.path.join(cfg.save_dir, 'video_tool')
if not os.path.exists(self.save_dir):
os.makedirs(self.save_dir)
self.process_types = ["expand_frame", "expand_clipframe", "concat_clip", "blank_mask"]
def create_ui_video_tool(self, *args, **kwargs):
with gr.Row(variant="panel"):
with gr.Column(scale=1):
with gr.Row():
self.input_process_image_1 = gr.Image(
label="input_process_image_1",
type='pil',
format='png',
interactive=True)
with gr.Column(scale=1):
with gr.Row():
self.input_process_image_2 = gr.Image(
label="input_process_image_2",
type='pil',
format='png',
interactive=True)
with gr.Row(variant="panel"):
with gr.Column(scale=1):
self.input_process_video_1 = gr.Video(
label="input_process_video_1",
sources=['upload'],
interactive=True)
with gr.Column(scale=1):
self.input_process_video_2 = gr.Video(
label="input_process_video_1",
sources=['upload'],
interactive=True)
with gr.Row(variant="panel"):
with gr.Column(scale=1):
with gr.Row():
self.input_process_video_mask_1 = gr.Video(
label="input_process_video_mask_1",
sources=['upload'],
interactive=True)
with gr.Column(scale=1):
with gr.Row():
self.input_process_video_mask_2 = gr.Video(
label="input_process_video_mask_2",
sources=['upload'],
interactive=True)
with gr.Row(variant="panel"):
with gr.Column(scale=1):
with gr.Row():
self.input_process_type = gr.Dropdown(
label='input_process_type',
choices=list(self.process_types),
value=list(self.process_types)[0],
interactive=True)
with gr.Row(variant="panel"):
with gr.Column(scale=1):
with gr.Row():
self.output_height = gr.Textbox(
label='resolutions_height',
value=720,
interactive=True)
self.output_width = gr.Textbox(
label='resolutions_width',
value=1280,
interactive=True)
self.frame_rate = gr.Textbox(
label='frame_rate',
value=16,
interactive=True)
self.num_frames = gr.Textbox(
label='num_frames',
value=81,
interactive=True)
self.mask_gray = gr.Checkbox(
label='Mask Gray',
value=False,
interactive=True)
with gr.Row(variant="panel"):
with gr.Column(scale=1):
with gr.Row():
self.process_button = gr.Button(
value='[1]Sample Process',
elem_classes='type_row',
elem_id='process_button',
visible=True)
with gr.Row(variant="panel"):
with gr.Column(scale=1):
with gr.Row():
self.output_process_image = gr.Image(
label="output_process_image",
value=None,
type='pil',
image_mode='RGB',
format='png',
interactive=False)
with gr.Column(scale=1):
self.output_process_video = gr.Video(
label="output_process_video",
sources=['upload'],
interactive=False)
with gr.Column(scale=1):
self.output_process_mask = gr.Video(
label="output_process_mask",
sources=['upload'],
interactive=False)
with gr.Row(variant="panel"):
with gr.Column(scale=1):
with gr.Row():
self.save_button = gr.Button(
value='[2]Sample Save',
elem_classes='type_row',
elem_id='save_button',
visible=True)
with gr.Row():
self.save_log = gr.Markdown()
def process_tool_data(self, input_process_image_1, input_process_image_2, input_process_video_1, input_process_video_2, input_process_video_mask_1, input_process_video_mask_2, input_process_type, output_height, output_width, frame_rate, num_frames):
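# expand_frame: gray clip bracketed by optional first/last stills;
# expand_clipframe: extend a clip with gray frames ending on a still;
# concat_clip: concatenate two clips; blank_mask: all-white mask video.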
output_height, output_width, frame_rate, num_frames = int(output_height), int(output_width), int(frame_rate), int(num_frames)
output_video, output_mask = None, None
if input_process_type == 'expand_frame':
assert input_process_image_1 or input_process_image_2
output_video = [np.ones((output_height, output_width, 3), dtype=np.uint8) * 127.5] * num_frames
output_mask = [np.ones((output_height, output_width), dtype=np.uint8) * 255] * num_frames
if input_process_image_1 is not None:
output_video[0] = np.array(input_process_image_1.resize((output_width, output_height)))
output_mask[0] = np.zeros((output_height, output_width))
if input_process_image_2 is not None:
output_video[-1] = np.array(input_process_image_2.resize((output_width, output_height)))
output_mask[-1] = np.zeros((output_height, output_width))
elif input_process_type == 'expand_clipframe':
video_frames, fps, width, height, total_frames = read_video_frames(input_process_video_1, use_type='cv2', info=True)
frame_rate = fps
output_video = video_frames + [np.ones((height, width, 3), dtype=np.uint8) * 127.5] * num_frames
output_mask = [np.zeros((height, width), dtype=np.uint8)] * total_frames + [np.ones((height, width), dtype=np.uint8) * 255] * num_frames
if input_process_image_2 is not None:
output_video[-1] = np.array(input_process_image_2.resize((width, height)))
output_mask[-1] = np.zeros((height, width))
elif input_process_type == 'concat_clip':
video_frames_1, fps_1, width_1, height_1, total_frames_1 = read_video_frames(input_process_video_1, use_type='cv2', info=True)
video_frames_2, fps_2, width_2, height_2, total_frames_2 = read_video_frames(input_process_video_2, use_type='cv2', info=True)
if width_1 != width_2 or height_1 != height_2:
video_frames_2 = [np.array(frame.resize((width_1, height_1))) for frame in video_frames_2]
frame_rate = fps_1
output_video = video_frames_1 + video_frames_2
output_mask = [np.ones((height_1, width_1), dtype=np.uint8) * 255] * len(output_video)
elif input_process_type == 'blank_mask':
output_mask = [np.ones((output_height, output_width), dtype=np.uint8) * 255] * num_frames
else:
raise NotImplementedError
output_image_path = None
if output_video is not None:
with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as output_path:
flag = save_one_video(videos=output_video, file_path=output_path.name, fps=frame_rate)
output_video_path = output_path.name if flag else None
else:
output_video_path = None
if output_mask is not None:
with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as output_path:
flag = save_one_video(videos=output_mask, file_path=output_path.name, fps=frame_rate)
output_mask_path = output_path.name if flag else None
else:
output_mask_path = None
return output_image_path, output_video_path, output_mask_path
def save_tool_data(self, image_path, video_path, mask_path):
save_video_data = {
"output_video": video_path,
"output_video_mask": mask_path
}
save_info = {}
tid = tid_maker()
for name, ori_video_path in save_video_data.items():
if ori_video_path is None: continue
save_video_dir = os.path.join(self.save_dir, tid[:8])
if not os.path.exists(save_video_dir): os.makedirs(save_video_dir)
save_video_path = os.path.join(save_video_dir, tid + '-' + name + os.path.splitext(ori_video_path)[-1])
save_info[name] = save_video_path
shutil.copy(ori_video_path, save_video_path)
gr.Info(f'Save {name} to {save_video_path}', duration=15)
save_txt_path = os.path.join(self.save_dir, tid[:8], tid + '.txt')
save_info['save_info'] = save_txt_path
with open(save_txt_path, 'w') as f:
f.write(json.dumps(save_info, ensure_ascii=False))
return dict_to_markdown_table(save_info)
def set_callbacks_video_tool(self, **kwargs):
inputs = [self.input_process_image_1, self.input_process_image_2, self.input_process_video_1, self.input_process_video_2, self.input_process_video_mask_1, self.input_process_video_mask_2, self.input_process_type, self.output_height, self.output_width, self.frame_rate, self.num_frames]
outputs = [self.output_process_image, self.output_process_video, self.output_process_mask]
self.process_button.click(self.process_tool_data,
inputs=inputs,
outputs=outputs)
self.save_button.click(self.save_tool_data,
inputs=[self.output_process_image, self.output_process_video, self.output_process_mask],
outputs=[self.save_log])
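# Top-level UI: assembles the video, image, and composition tabs and registers
# their callbacks.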
class VACETag():
def __init__(self, cfg):
self.cfg = cfg
self.save_dir = cfg.save_dir
self.current_index = 0
self.loaded_data = {}
self.vace_video_tag = VACEVideoTag(cfg)
self.vace_image_tag = VACEImageTag(cfg)
self.vace_tag_composition = VACETagComposition(cfg)
# self.vace_video_tool = VACEVideoTool(cfg)
def create_ui(self, *args, **kwargs):
gr.Markdown("""
<div style="text-align: center; font-size: 24px; font-weight: bold; margin-bottom: 15px;">
<a href="https://ali-vilab.github.io/VACE-Page/" style="text-decoration: none; color: inherit;">VACE Preprocessor</a>
</div>
""")
with gr.Tabs(elem_id='VACE Tag') as vace_tab:
with gr.TabItem('VACE Video Tag', id=1, elem_id='video_tab'):
self.vace_video_tag.create_ui_video(*args, **kwargs)
with gr.TabItem('VACE Image Tag', id=2, elem_id='image_tab'):
self.vace_image_tag.create_ui_image(*args, **kwargs)
with gr.TabItem('VACE Composition Tag', id=3, elem_id='composition_tab'):
self.vace_tag_composition.create_ui_composition(*args, **kwargs)
# with gr.TabItem('VACE Video Tool', id=4, elem_id='video_tool_tab'):
# self.vace_video_tool.create_ui_video_tool(*args, **kwargs)
def set_callbacks(self, **kwargs):
self.vace_video_tag.set_callbacks_video(**kwargs)
self.vace_image_tag.set_callbacks_image(**kwargs)
self.vace_tag_composition.set_callbacks_composition(**kwargs)
# self.vace_video_tool.set_callbacks_video_tool(**kwargs)
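# Example launch (defaults shown; paths are illustrative):
#   python vace/gradios/vace_preproccess_demo.py --server_port 7860 --save_dir cache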
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Argparser for VACE-Preprocessor:\n')
parser.add_argument('--server_port', dest='server_port', help='Gradio server port', default=7860)
parser.add_argument('--server_name', dest='server_name', help='Gradio server host', default='0.0.0.0')
parser.add_argument('--root_path', dest='root_path', help='Root path when serving behind a reverse proxy', default=None)
parser.add_argument('--save_dir', dest='save_dir', help='Directory for saved preprocessed samples', default='cache')
args = parser.parse_args()
if not os.path.exists(args.save_dir):
os.makedirs(args.save_dir, exist_ok=True)
vace_tag = VACETag(args)
with gr.Blocks() as demo:
vace_tag.create_ui()
vace_tag.set_callbacks()
demo.queue(status_update_rate=1).launch(server_name=args.server_name,
server_port=int(args.server_port),
root_path=args.root_path,
show_api=False, show_error=True,
debug=True)