Spaces:
Runtime error
Runtime error
| # -*- coding: utf-8 -*- | |
| # Copyright (c) Alibaba, Inc. and its affiliates. | |
| import os | |
| import copy | |
| import time | |
| import inspect | |
| import argparse | |
| import importlib | |
| from configs import VACE_PREPROCCESS_CONFIGS | |
| import annotators | |
| from annotators.utils import read_image, read_mask, read_video_frames, save_one_video, save_one_image | |
| def parse_bboxes(s): | |
| bboxes = [] | |
| for bbox_str in s.split(): | |
| coords = list(map(float, bbox_str.split(','))) | |
| if len(coords) != 4: | |
| raise ValueError(f"The bounding box requires 4 values, but the input is {len(coords)}.") | |
| bboxes.append(coords) | |
| return bboxes | |
| def validate_args(args): | |
| assert args.task in VACE_PREPROCCESS_CONFIGS, f"Unsupport task: [{args.task}]" | |
| assert args.video is not None or args.image is not None or args.bbox is not None, "Please specify the video or image or bbox." | |
| return args | |
| def get_parser(): | |
| parser = argparse.ArgumentParser( | |
| description="Data processing carried out by VACE" | |
| ) | |
| parser.add_argument( | |
| "--task", | |
| type=str, | |
| default='', | |
| choices=list(VACE_PREPROCCESS_CONFIGS.keys()), | |
| help="The task to run.") | |
| parser.add_argument( | |
| "--video", | |
| type=str, | |
| default=None, | |
| help="The path of the videos to be processed, separated by commas if there are multiple.") | |
| parser.add_argument( | |
| "--image", | |
| type=str, | |
| default=None, | |
| help="The path of the images to be processed, separated by commas if there are multiple.") | |
| parser.add_argument( | |
| "--mode", | |
| type=str, | |
| default=None, | |
| help="The specific mode of the task, such as firstframe, mask, bboxtrack, label...") | |
| parser.add_argument( | |
| "--mask", | |
| type=str, | |
| default=None, | |
| help="The path of the mask images to be processed, separated by commas if there are multiple.") | |
| parser.add_argument( | |
| "--bbox", | |
| type=parse_bboxes, | |
| default=None, | |
| help="Enter the bounding box, with each four numbers separated by commas (x1, y1, x2, y2), and each pair separated by a space." | |
| ) | |
| parser.add_argument( | |
| "--label", | |
| type=str, | |
| default=None, | |
| help="Enter the label to be processed, separated by commas if there are multiple." | |
| ) | |
| parser.add_argument( | |
| "--caption", | |
| type=str, | |
| default=None, | |
| help="Enter the caption to be processed." | |
| ) | |
| parser.add_argument( | |
| "--direction", | |
| type=str, | |
| default=None, | |
| help="The direction of outpainting includes any combination of left, right, up, down, with multiple combinations separated by commas.") | |
| parser.add_argument( | |
| "--expand_ratio", | |
| type=float, | |
| default=None, | |
| help="The outpainting's outward expansion ratio.") | |
| parser.add_argument( | |
| "--expand_num", | |
| type=int, | |
| default=None, | |
| help="The number of frames extended by the extension task.") | |
| parser.add_argument( | |
| "--maskaug_mode", | |
| type=str, | |
| default=None, | |
| help="The mode of mask augmentation, such as original, original_expand, hull, hull_expand, bbox, bbox_expand.") | |
| parser.add_argument( | |
| "--maskaug_ratio", | |
| type=float, | |
| default=None, | |
| help="The ratio of mask augmentation.") | |
| parser.add_argument( | |
| "--pre_save_dir", | |
| type=str, | |
| default=None, | |
| help="The path to save the processed data.") | |
| parser.add_argument( | |
| "--save_fps", | |
| type=int, | |
| default=16, | |
| help="The fps to save the processed data.") | |
| return parser | |
| def preproccess(): | |
| pass | |
| def proccess(): | |
| pass | |
| def postproccess(): | |
| pass | |
| def main(args): | |
| args = argparse.Namespace(**args) if isinstance(args, dict) else args | |
| args = validate_args(args) | |
| task_name = args.task | |
| video_path = args.video | |
| image_path = args.image | |
| mask_path = args.mask | |
| bbox = args.bbox | |
| caption = args.caption | |
| label = args.label | |
| save_fps = args.save_fps | |
| # init class | |
| task_cfg = copy.deepcopy(VACE_PREPROCCESS_CONFIGS)[task_name] | |
| class_name = task_cfg.pop("NAME") | |
| input_params = task_cfg.pop("INPUTS") | |
| output_params = task_cfg.pop("OUTPUTS") | |
| # input data | |
| fps = None | |
| input_data = copy.deepcopy(input_params) | |
| if 'video' in input_params: | |
| assert video_path is not None, "Please set video or check configs" | |
| frames, fps, width, height, num_frames = read_video_frames(video_path.split(",")[0], use_type='cv2', info=True) | |
| assert frames is not None, "Video read error" | |
| input_data['frames'] = frames | |
| input_data['video'] = video_path | |
| if 'frames' in input_params: | |
| assert video_path is not None, "Please set video or check configs" | |
| frames, fps, width, height, num_frames = read_video_frames(video_path.split(",")[0], use_type='cv2', info=True) | |
| assert frames is not None, "Video read error" | |
| input_data['frames'] = frames | |
| if 'frames_2' in input_params: | |
| # assert video_path is not None and len(video_path.split(",")[1]) >= 2, "Please set two videos or check configs" | |
| if len(video_path.split(",")) >= 2: | |
| frames, fps, width, height, num_frames = read_video_frames(video_path.split(",")[1], use_type='cv2', info=True) | |
| assert frames is not None, "Video read error" | |
| input_data['frames_2'] = frames | |
| if 'image' in input_params: | |
| assert image_path is not None, "Please set image or check configs" | |
| image, width, height = read_image(image_path.split(",")[0], use_type='pil', info=True) | |
| assert image is not None, "Image read error" | |
| input_data['image'] = image | |
| if 'image_2' in input_params: | |
| # assert image_path is not None and len(image_path.split(",")[1]) >= 2, "Please set two images or check configs" | |
| if len(image_path.split(",")) >= 2: | |
| image, width, height = read_image(image_path.split(",")[1], use_type='pil', info=True) | |
| assert image is not None, "Image read error" | |
| input_data['image_2'] = image | |
| if 'images' in input_params: | |
| assert image_path is not None, "Please set image or check configs" | |
| images = [ read_image(path, use_type='pil', info=True)[0] for path in image_path.split(",") ] | |
| input_data['images'] = images | |
| if 'mask' in input_params: | |
| # assert mask_path is not None, "Please set mask or check configs" | |
| if mask_path is not None: | |
| mask, width, height = read_mask(mask_path.split(",")[0], use_type='pil', info=True) | |
| assert mask is not None, "Mask read error" | |
| input_data['mask'] = mask | |
| if 'bbox' in input_params: | |
| # assert bbox is not None, "Please set bbox" | |
| if bbox is not None: | |
| input_data['bbox'] = bbox[0] if len(bbox) == 1 else bbox | |
| if 'label' in input_params: | |
| # assert label is not None, "Please set label or check configs" | |
| input_data['label'] = label.split(',') if label is not None else None | |
| if 'caption' in input_params: | |
| # assert caption is not None, "Please set caption or check configs" | |
| input_data['caption'] = caption | |
| if 'mode' in input_params: | |
| input_data['mode'] = args.mode | |
| if 'direction' in input_params: | |
| if args.direction is not None: | |
| input_data['direction'] = args.direction.split(',') | |
| if 'expand_ratio' in input_params: | |
| if args.expand_ratio is not None: | |
| input_data['expand_ratio'] = args.expand_ratio | |
| if 'expand_num' in input_params: | |
| # assert args.expand_num is not None, "Please set expand_num or check configs" | |
| if args.expand_num is not None: | |
| input_data['expand_num'] = args.expand_num | |
| if 'mask_cfg' in input_params: | |
| # assert args.maskaug_mode is not None and args.maskaug_ratio is not None, "Please set maskaug_mode and maskaug_ratio or check configs" | |
| if args.maskaug_mode is not None: | |
| if args.maskaug_ratio is not None: | |
| input_data['mask_cfg'] = {"mode": args.maskaug_mode, "kwargs": {'expand_ratio': args.maskaug_ratio, 'expand_iters': 5}} | |
| else: | |
| input_data['mask_cfg'] = {"mode": args.maskaug_mode} | |
| # processing | |
| pre_ins = getattr(annotators, class_name)(cfg=task_cfg, device=f'cuda:{os.getenv("RANK", 0)}') | |
| results = pre_ins.forward(**input_data) | |
| # output data | |
| save_fps = fps if fps is not None else save_fps | |
| if args.pre_save_dir is None: | |
| pre_save_dir = os.path.join('processed', task_name, time.strftime('%Y-%m-%d-%H-%M-%S', time.localtime(time.time()))) | |
| else: | |
| pre_save_dir = args.pre_save_dir | |
| if not os.path.exists(pre_save_dir): | |
| os.makedirs(pre_save_dir) | |
| ret_data = {} | |
| if 'frames' in output_params: | |
| frames = results['frames'] if isinstance(results, dict) else results | |
| if frames is not None: | |
| save_path = os.path.join(pre_save_dir, f'src_video-{task_name}.mp4') | |
| save_one_video(save_path, frames, fps=save_fps) | |
| print(f"Save frames result to {save_path}") | |
| ret_data['src_video'] = save_path | |
| if 'masks' in output_params: | |
| frames = results['masks'] if isinstance(results, dict) else results | |
| if frames is not None: | |
| save_path = os.path.join(pre_save_dir, f'src_mask-{task_name}.mp4') | |
| save_one_video(save_path, frames, fps=save_fps) | |
| print(f"Save frames result to {save_path}") | |
| ret_data['src_mask'] = save_path | |
| if 'image' in output_params: | |
| ret_image = results['image'] if isinstance(results, dict) else results | |
| if ret_image is not None: | |
| save_path = os.path.join(pre_save_dir, f'src_ref_image-{task_name}.png') | |
| save_one_image(save_path, ret_image, use_type='pil') | |
| print(f"Save image result to {save_path}") | |
| ret_data['src_ref_images'] = save_path | |
| if 'images' in output_params: | |
| ret_images = results['images'] if isinstance(results, dict) else results | |
| if ret_images is not None: | |
| src_ref_images = [] | |
| for i, img in enumerate(ret_images): | |
| if img is not None: | |
| save_path = os.path.join(pre_save_dir, f'src_ref_image_{i}-{task_name}.png') | |
| save_one_image(save_path, img, use_type='pil') | |
| print(f"Save image result to {save_path}") | |
| src_ref_images.append(save_path) | |
| if len(src_ref_images) > 0: | |
| ret_data['src_ref_images'] = ','.join(src_ref_images) | |
| else: | |
| ret_data['src_ref_images'] = None | |
| if 'mask' in output_params: | |
| ret_image = results['mask'] if isinstance(results, dict) else results | |
| if ret_image is not None: | |
| save_path = os.path.join(pre_save_dir, f'src_mask-{task_name}.png') | |
| save_one_image(save_path, ret_image, use_type='pil') | |
| print(f"Save mask result to {save_path}") | |
| return ret_data | |
| if __name__ == "__main__": | |
| args = get_parser().parse_args() | |
| main(args) | |