|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | import argparse | 
					
						
						|  | import glob | 
					
						
						|  | import logging | 
					
						
						|  | import os | 
					
						
						|  | import sys | 
					
						
						|  | from typing import Any, ClassVar, Dict, List | 
					
						
						|  | import torch | 
					
						
						|  |  | 
					
						
						|  | from detectron2.config import CfgNode, get_cfg | 
					
						
						|  | from detectron2.data.detection_utils import read_image | 
					
						
						|  | from detectron2.engine.defaults import DefaultPredictor | 
					
						
						|  | from detectron2.structures.instances import Instances | 
					
						
						|  | from detectron2.utils.logger import setup_logger | 
					
						
						|  |  | 
					
						
						|  | from densepose import add_densepose_config | 
					
						
						|  | from densepose.structures import DensePoseChartPredictorOutput, DensePoseEmbeddingPredictorOutput | 
					
						
						|  | from densepose.utils.logger import verbosity_to_level | 
					
						
						|  | from densepose.vis.base import CompoundVisualizer | 
					
						
						|  | from densepose.vis.bounding_box import ScoredBoundingBoxVisualizer | 
					
						
						|  | from densepose.vis.densepose_outputs_vertex import ( | 
					
						
						|  | DensePoseOutputsTextureVisualizer, | 
					
						
						|  | DensePoseOutputsVertexVisualizer, | 
					
						
						|  | get_texture_atlases, | 
					
						
						|  | ) | 
					
						
						|  | from densepose.vis.densepose_results import ( | 
					
						
						|  | DensePoseResultsContourVisualizer, | 
					
						
						|  | DensePoseResultsFineSegmentationVisualizer, | 
					
						
						|  | DensePoseResultsUVisualizer, | 
					
						
						|  | DensePoseResultsVVisualizer, | 
					
						
						|  | ) | 
					
						
						|  | from densepose.vis.densepose_results_textures import ( | 
					
						
						|  | DensePoseResultsVisualizerWithTexture, | 
					
						
						|  | get_texture_atlas, | 
					
						
						|  | ) | 
					
						
						|  | from densepose.vis.extractor import ( | 
					
						
						|  | CompoundExtractor, | 
					
						
						|  | DensePoseOutputsExtractor, | 
					
						
						|  | DensePoseResultExtractor, | 
					
						
						|  | create_extractor, | 
					
						
						|  | ) | 
					
						
						|  |  | 
					
						
						|  | DOC = """Apply Net - a tool to print / visualize DensePose results | 
					
						
						|  | """ | 
					
						
						|  |  | 
					
						
						|  | LOGGER_NAME = "apply_net" | 
					
						
						|  | logger = logging.getLogger(LOGGER_NAME) | 
					
						
						|  |  | 
					
						
						|  | _ACTION_REGISTRY: Dict[str, "Action"] = {} | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | class Action: | 
					
						
						|  | @classmethod | 
					
						
						|  | def add_arguments(cls: type, parser: argparse.ArgumentParser): | 
					
						
						|  | parser.add_argument( | 
					
						
						|  | "-v", | 
					
						
						|  | "--verbosity", | 
					
						
						|  | action="count", | 
					
						
						|  | help="Verbose mode. Multiple -v options increase the verbosity.", | 
					
						
						|  | ) | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | def register_action(cls: type): | 
					
						
						|  | """ | 
					
						
						|  | Decorator for action classes to automate action registration | 
					
						
						|  | """ | 
					
						
						|  | global _ACTION_REGISTRY | 
					
						
						|  | _ACTION_REGISTRY[cls.COMMAND] = cls | 
					
						
						|  | return cls | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | class InferenceAction(Action): | 
					
						
						|  | @classmethod | 
					
						
						|  | def add_arguments(cls: type, parser: argparse.ArgumentParser): | 
					
						
						|  | super(InferenceAction, cls).add_arguments(parser) | 
					
						
						|  | parser.add_argument("cfg", metavar="<config>", help="Config file") | 
					
						
						|  | parser.add_argument("model", metavar="<model>", help="Model file") | 
					
						
						|  | parser.add_argument( | 
					
						
						|  | "--opts", | 
					
						
						|  | help="Modify config options using the command-line 'KEY VALUE' pairs", | 
					
						
						|  | default=[], | 
					
						
						|  | nargs=argparse.REMAINDER, | 
					
						
						|  | ) | 
					
						
						|  |  | 
					
						
						|  | @classmethod | 
					
						
						|  | def execute(cls: type, args: argparse.Namespace, human_img): | 
					
						
						|  | logger.info(f"Loading config from {args.cfg}") | 
					
						
						|  | opts = [] | 
					
						
						|  | cfg = cls.setup_config(args.cfg, args.model, args, opts) | 
					
						
						|  | logger.info(f"Loading model from {args.model}") | 
					
						
						|  | predictor = DefaultPredictor(cfg) | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | context = cls.create_context(args, cfg) | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | with torch.no_grad(): | 
					
						
						|  | outputs = predictor(human_img)["instances"] | 
					
						
						|  | out_pose = cls.execute_on_outputs(context, {"image": human_img}, outputs) | 
					
						
						|  | cls.postexecute(context) | 
					
						
						|  | return out_pose | 
					
						
						|  |  | 
					
						
						|  | @classmethod | 
					
						
						|  | def setup_config( | 
					
						
						|  | cls: type, config_fpath: str, model_fpath: str, args: argparse.Namespace, opts: List[str] | 
					
						
						|  | ): | 
					
						
						|  | cfg = get_cfg() | 
					
						
						|  | add_densepose_config(cfg) | 
					
						
						|  | cfg.merge_from_file(config_fpath) | 
					
						
						|  | cfg.merge_from_list(args.opts) | 
					
						
						|  | if opts: | 
					
						
						|  | cfg.merge_from_list(opts) | 
					
						
						|  | cfg.MODEL.WEIGHTS = model_fpath | 
					
						
						|  | cfg.freeze() | 
					
						
						|  | return cfg | 
					
						
						|  |  | 
					
						
						|  | @classmethod | 
					
						
						|  | def _get_input_file_list(cls: type, input_spec: str): | 
					
						
						|  | if os.path.isdir(input_spec): | 
					
						
						|  | file_list = [ | 
					
						
						|  | os.path.join(input_spec, fname) | 
					
						
						|  | for fname in os.listdir(input_spec) | 
					
						
						|  | if os.path.isfile(os.path.join(input_spec, fname)) | 
					
						
						|  | ] | 
					
						
						|  | elif os.path.isfile(input_spec): | 
					
						
						|  | file_list = [input_spec] | 
					
						
						|  | else: | 
					
						
						|  | file_list = glob.glob(input_spec) | 
					
						
						|  | return file_list | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | @register_action | 
					
						
						|  | class DumpAction(InferenceAction): | 
					
						
						|  | """ | 
					
						
						|  | Dump action that outputs results to a pickle file | 
					
						
						|  | """ | 
					
						
						|  |  | 
					
						
						|  | COMMAND: ClassVar[str] = "dump" | 
					
						
						|  |  | 
					
						
						|  | @classmethod | 
					
						
						|  | def add_parser(cls: type, subparsers: argparse._SubParsersAction): | 
					
						
						|  | parser = subparsers.add_parser(cls.COMMAND, help="Dump model outputs to a file.") | 
					
						
						|  | cls.add_arguments(parser) | 
					
						
						|  | parser.set_defaults(func=cls.execute) | 
					
						
						|  |  | 
					
						
						|  | @classmethod | 
					
						
						|  | def add_arguments(cls: type, parser: argparse.ArgumentParser): | 
					
						
						|  | super(DumpAction, cls).add_arguments(parser) | 
					
						
						|  | parser.add_argument( | 
					
						
						|  | "--output", | 
					
						
						|  | metavar="<dump_file>", | 
					
						
						|  | default="results.pkl", | 
					
						
						|  | help="File name to save dump to", | 
					
						
						|  | ) | 
					
						
						|  |  | 
					
						
						|  | @classmethod | 
					
						
						|  | def execute_on_outputs( | 
					
						
						|  | cls: type, context: Dict[str, Any], entry: Dict[str, Any], outputs: Instances | 
					
						
						|  | ): | 
					
						
						|  | image_fpath = entry["file_name"] | 
					
						
						|  | logger.info(f"Processing {image_fpath}") | 
					
						
						|  | result = {"file_name": image_fpath} | 
					
						
						|  | if outputs.has("scores"): | 
					
						
						|  | result["scores"] = outputs.get("scores").cpu() | 
					
						
						|  | if outputs.has("pred_boxes"): | 
					
						
						|  | result["pred_boxes_XYXY"] = outputs.get("pred_boxes").tensor.cpu() | 
					
						
						|  | if outputs.has("pred_densepose"): | 
					
						
						|  | if isinstance(outputs.pred_densepose, DensePoseChartPredictorOutput): | 
					
						
						|  | extractor = DensePoseResultExtractor() | 
					
						
						|  | elif isinstance(outputs.pred_densepose, DensePoseEmbeddingPredictorOutput): | 
					
						
						|  | extractor = DensePoseOutputsExtractor() | 
					
						
						|  | result["pred_densepose"] = extractor(outputs)[0] | 
					
						
						|  | context["results"].append(result) | 
					
						
						|  |  | 
					
						
						|  | @classmethod | 
					
						
						|  | def create_context(cls: type, args: argparse.Namespace, cfg: CfgNode): | 
					
						
						|  | context = {"results": [], "out_fname": args.output} | 
					
						
						|  | return context | 
					
						
						|  |  | 
					
						
						|  | @classmethod | 
					
						
						|  | def postexecute(cls: type, context: Dict[str, Any]): | 
					
						
						|  | out_fname = context["out_fname"] | 
					
						
						|  | out_dir = os.path.dirname(out_fname) | 
					
						
						|  | if len(out_dir) > 0 and not os.path.exists(out_dir): | 
					
						
						|  | os.makedirs(out_dir) | 
					
						
						|  | with open(out_fname, "wb") as hFile: | 
					
						
						|  | torch.save(context["results"], hFile) | 
					
						
						|  | logger.info(f"Output saved to {out_fname}") | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | @register_action | 
					
						
						|  | class ShowAction(InferenceAction): | 
					
						
						|  | """ | 
					
						
						|  | Show action that visualizes selected entries on an image | 
					
						
						|  | """ | 
					
						
						|  |  | 
					
						
						|  | COMMAND: ClassVar[str] = "show" | 
					
						
						|  | VISUALIZERS: ClassVar[Dict[str, object]] = { | 
					
						
						|  | "dp_contour": DensePoseResultsContourVisualizer, | 
					
						
						|  | "dp_segm": DensePoseResultsFineSegmentationVisualizer, | 
					
						
						|  | "dp_u": DensePoseResultsUVisualizer, | 
					
						
						|  | "dp_v": DensePoseResultsVVisualizer, | 
					
						
						|  | "dp_iuv_texture": DensePoseResultsVisualizerWithTexture, | 
					
						
						|  | "dp_cse_texture": DensePoseOutputsTextureVisualizer, | 
					
						
						|  | "dp_vertex": DensePoseOutputsVertexVisualizer, | 
					
						
						|  | "bbox": ScoredBoundingBoxVisualizer, | 
					
						
						|  | } | 
					
						
						|  |  | 
					
						
						|  | @classmethod | 
					
						
						|  | def add_parser(cls: type, subparsers: argparse._SubParsersAction): | 
					
						
						|  | parser = subparsers.add_parser(cls.COMMAND, help="Visualize selected entries") | 
					
						
						|  | cls.add_arguments(parser) | 
					
						
						|  | parser.set_defaults(func=cls.execute) | 
					
						
						|  |  | 
					
						
						|  | @classmethod | 
					
						
						|  | def add_arguments(cls: type, parser: argparse.ArgumentParser): | 
					
						
						|  | super(ShowAction, cls).add_arguments(parser) | 
					
						
						|  | parser.add_argument( | 
					
						
						|  | "visualizations", | 
					
						
						|  | metavar="<visualizations>", | 
					
						
						|  | help="Comma separated list of visualizations, possible values: " | 
					
						
						|  | "[{}]".format(",".join(sorted(cls.VISUALIZERS.keys()))), | 
					
						
						|  | ) | 
					
						
						|  | parser.add_argument( | 
					
						
						|  | "--min_score", | 
					
						
						|  | metavar="<score>", | 
					
						
						|  | default=0.8, | 
					
						
						|  | type=float, | 
					
						
						|  | help="Minimum detection score to visualize", | 
					
						
						|  | ) | 
					
						
						|  | parser.add_argument( | 
					
						
						|  | "--nms_thresh", metavar="<threshold>", default=None, type=float, help="NMS threshold" | 
					
						
						|  | ) | 
					
						
						|  | parser.add_argument( | 
					
						
						|  | "--texture_atlas", | 
					
						
						|  | metavar="<texture_atlas>", | 
					
						
						|  | default=None, | 
					
						
						|  | help="Texture atlas file (for IUV texture transfer)", | 
					
						
						|  | ) | 
					
						
						|  | parser.add_argument( | 
					
						
						|  | "--texture_atlases_map", | 
					
						
						|  | metavar="<texture_atlases_map>", | 
					
						
						|  | default=None, | 
					
						
						|  | help="JSON string of a dict containing texture atlas files for each mesh", | 
					
						
						|  | ) | 
					
						
						|  | parser.add_argument( | 
					
						
						|  | "--output", | 
					
						
						|  | metavar="<image_file>", | 
					
						
						|  | default="outputres.png", | 
					
						
						|  | help="File name to save output to", | 
					
						
						|  | ) | 
					
						
						|  |  | 
					
						
						|  | @classmethod | 
					
						
						|  | def setup_config( | 
					
						
						|  | cls: type, config_fpath: str, model_fpath: str, args: argparse.Namespace, opts: List[str] | 
					
						
						|  | ): | 
					
						
						|  | opts.append("MODEL.ROI_HEADS.SCORE_THRESH_TEST") | 
					
						
						|  | opts.append(str(args.min_score)) | 
					
						
						|  | if args.nms_thresh is not None: | 
					
						
						|  | opts.append("MODEL.ROI_HEADS.NMS_THRESH_TEST") | 
					
						
						|  | opts.append(str(args.nms_thresh)) | 
					
						
						|  | cfg = super(ShowAction, cls).setup_config(config_fpath, model_fpath, args, opts) | 
					
						
						|  | return cfg | 
					
						
						|  |  | 
					
						
						|  | @classmethod | 
					
						
						|  | def execute_on_outputs( | 
					
						
						|  | cls: type, context: Dict[str, Any], entry: Dict[str, Any], outputs: Instances | 
					
						
						|  | ): | 
					
						
						|  | import cv2 | 
					
						
						|  | import numpy as np | 
					
						
						|  | visualizer = context["visualizer"] | 
					
						
						|  | extractor = context["extractor"] | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | image = cv2.cvtColor(entry["image"], cv2.COLOR_BGR2GRAY) | 
					
						
						|  | image = np.tile(image[:, :, np.newaxis], [1, 1, 3]) | 
					
						
						|  | data = extractor(outputs) | 
					
						
						|  | image_vis = visualizer.visualize(image, data) | 
					
						
						|  |  | 
					
						
						|  | return image_vis | 
					
						
						|  | entry_idx = context["entry_idx"] + 1 | 
					
						
						|  | out_fname = './image-densepose/' + image_fpath.split('/')[-1] | 
					
						
						|  | out_dir = './image-densepose' | 
					
						
						|  | out_dir = os.path.dirname(out_fname) | 
					
						
						|  | if len(out_dir) > 0 and not os.path.exists(out_dir): | 
					
						
						|  | os.makedirs(out_dir) | 
					
						
						|  | cv2.imwrite(out_fname, image_vis) | 
					
						
						|  | logger.info(f"Output saved to {out_fname}") | 
					
						
						|  | context["entry_idx"] += 1 | 
					
						
						|  |  | 
					
						
						|  | @classmethod | 
					
						
						|  | def postexecute(cls: type, context: Dict[str, Any]): | 
					
						
						|  | pass | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | @classmethod | 
					
						
						|  | def _get_out_fname(cls: type, entry_idx: int, fname_base: str): | 
					
						
						|  | base, ext = os.path.splitext(fname_base) | 
					
						
						|  | return base + ".{0:04d}".format(entry_idx) + ext | 
					
						
						|  |  | 
					
						
						|  | @classmethod | 
					
						
						|  | def create_context(cls: type, args: argparse.Namespace, cfg: CfgNode) -> Dict[str, Any]: | 
					
						
						|  | vis_specs = args.visualizations.split(",") | 
					
						
						|  | visualizers = [] | 
					
						
						|  | extractors = [] | 
					
						
						|  | for vis_spec in vis_specs: | 
					
						
						|  | texture_atlas = get_texture_atlas(args.texture_atlas) | 
					
						
						|  | texture_atlases_dict = get_texture_atlases(args.texture_atlases_map) | 
					
						
						|  | vis = cls.VISUALIZERS[vis_spec]( | 
					
						
						|  | cfg=cfg, | 
					
						
						|  | texture_atlas=texture_atlas, | 
					
						
						|  | texture_atlases_dict=texture_atlases_dict, | 
					
						
						|  | ) | 
					
						
						|  | visualizers.append(vis) | 
					
						
						|  | extractor = create_extractor(vis) | 
					
						
						|  | extractors.append(extractor) | 
					
						
						|  | visualizer = CompoundVisualizer(visualizers) | 
					
						
						|  | extractor = CompoundExtractor(extractors) | 
					
						
						|  | context = { | 
					
						
						|  | "extractor": extractor, | 
					
						
						|  | "visualizer": visualizer, | 
					
						
						|  | "out_fname": args.output, | 
					
						
						|  | "entry_idx": 0, | 
					
						
						|  | } | 
					
						
						|  | return context | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | def create_argument_parser() -> argparse.ArgumentParser: | 
					
						
						|  | parser = argparse.ArgumentParser( | 
					
						
						|  | description=DOC, | 
					
						
						|  | formatter_class=lambda prog: argparse.HelpFormatter(prog, max_help_position=120), | 
					
						
						|  | ) | 
					
						
						|  | parser.set_defaults(func=lambda _: parser.print_help(sys.stdout)) | 
					
						
						|  | subparsers = parser.add_subparsers(title="Actions") | 
					
						
						|  | for _, action in _ACTION_REGISTRY.items(): | 
					
						
						|  | action.add_parser(subparsers) | 
					
						
						|  | return parser | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | def main(): | 
					
						
						|  | parser = create_argument_parser() | 
					
						
						|  | args = parser.parse_args() | 
					
						
						|  | verbosity = getattr(args, "verbosity", None) | 
					
						
						|  | global logger | 
					
						
						|  | logger = setup_logger(name=LOGGER_NAME) | 
					
						
						|  | logger.setLevel(verbosity_to_level(verbosity)) | 
					
						
						|  | args.func(args) | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | if __name__ == "__main__": | 
					
						
						|  | main() | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  |