Spaces:
Runtime error
Runtime error
import os
import sys
from argparse import Namespace
from pathlib import Path

import cv2
import torch

from .base import Viz
from src.utils.metrics import compute_symmetrical_epipolar_errors, compute_pose_errors

# Put the vendored patch2pix checkout on sys.path before importing from it.
# NOTE(review): presumably needed because patch2pix modules import each other
# by top-level names -- confirm against third_party/patch2pix.
patch2pix_path = Path(__file__).parent / "../../third_party/patch2pix"
sys.path.append(str(patch2pix_path))
from third_party.patch2pix.utils.eval.model_helper import load_model, estimate_matches
class VizPatch2Pix(Viz):
    """Run the Patch2Pix matcher on an image pair and optionally draw the matches.

    The model is loaded once in ``__init__``; ``match_and_draw`` then handles
    one image pair per call.

    NOTE(review): ``self.device``, ``self.time_stats``, ``self.eval_stats`` and
    ``self.draw_matches`` are used here but not defined in this class; they are
    expected to come from the ``Viz`` base class -- confirm.
    """

    def __init__(self, args):
        """Store the matcher settings and load the Patch2Pix checkpoint.

        Args:
            args: Either a ``dict`` (or dict subclass) or an attribute-style
                object exposing ``imsize``, ``match_threshold``, ``ksize`` and
                ``ckpt``. A dict is converted to an ``argparse.Namespace``.
        """
        super().__init__()

        # isinstance (not an exact type comparison) so dict subclasses are
        # converted as well instead of being treated as attribute objects.
        if isinstance(args, dict):
            args = Namespace(**args)

        self.imsize = args.imsize
        self.match_threshold = args.match_threshold
        self.ksize = args.ksize
        self.model = load_model(args.ckpt, method="patch2pix")
        self.name = "Patch2Pix"
        print(f"Initialize {self.name} with image size {self.imsize}")

    def match_and_draw(
        self,
        data_dict,
        root_dir=None,
        ground_truth=False,
        measure_time=False,
        viz_matches=True,
    ):
        """Estimate matches for the first pair in ``data_dict`` and optionally draw them.

        Args:
            data_dict: Batch dictionary. ``"pair_names"`` and ``"dataset_name"``
                are always read. With ``viz_matches`` and ``ground_truth`` set,
                ``"mkpts0_f"``, ``"mkpts1_f"`` and ``"m_bids"`` are written, and
                ``"image0"``, ``"epi_errs"``, ``"R_errs"``, ``"t_errs"`` and
                ``"inliers"`` are read afterwards.
            root_dir: Directory that the pair names are relative to; the
                ``<name>_viz_matches`` output folder is created inside it.
                Despite the ``None`` default a real path is required, because
                the value is passed straight to ``os.path.join``.
            ground_truth: When drawing, compute epipolar/pose errors, colour the
                matches by epipolar error and append the metrics to
                ``self.eval_stats``. Otherwise matches are drawn from the
                matcher confidence.
            measure_time: Forwarded to ``estimate_matches``; when true the last
                element of its output is appended to ``self.time_stats``.
            viz_matches: When false, nothing is drawn and no metrics are
                recorded; only the matcher is run.

        Returns:
            None. Results are written to disk and/or appended to
            ``self.time_stats`` / ``self.eval_stats``.
        """
        # Only the first pair of the batch is processed.
        img_name0, img_name1 = list(zip(*data_dict["pair_names"]))[0]
        path_img0 = os.path.join(root_dir, img_name0)
        path_img1 = os.path.join(root_dir, img_name1)

        # These arrays are used for drawing only; the matcher is given the
        # file paths and loads the images itself.
        img0, img1 = cv2.imread(path_img0), cv2.imread(path_img1)

        return_m_upscale = True
        if str(data_dict["dataset_name"][0]).lower() == "scannet":
            # For ScanNet the drawn images are resized to self.imsize and the
            # matcher is asked not to upscale its matches.
            # NOTE(review): presumably this keeps the keypoints in the resized
            # image frame -- confirm against estimate_matches.
            img0 = cv2.resize(img0, tuple(self.imsize))
            img1 = cv2.resize(img1, tuple(self.imsize))
            return_m_upscale = False

        outputs = estimate_matches(
            self.model,
            path_img0,
            path_img1,
            ksize=self.ksize,
            io_thres=self.match_threshold,
            eval_type="fine",
            imsize=self.imsize,
            return_upscale=return_m_upscale,
            measure_time=measure_time,
        )
        if measure_time:
            self.time_stats.append(outputs[-1])

        matches, mconf = outputs[0], outputs[1]
        # Columns 0-1 are the keypoints in image 0, columns 2-3 those in image 1.
        kpts0 = matches[:, :2]
        kpts1 = matches[:, 2:4]

        if viz_matches:
            # Output file name: "<stem0>_<stem1>.png", where a stem is the
            # basename cut at its first dot.
            saved_name = "_".join(
                [
                    img_name0.split("/")[-1].split(".")[0],
                    img_name1.split("/")[-1].split(".")[0],
                ]
            )
            folder_matches = os.path.join(root_dir, "{}_viz_matches".format(self.name))
            # exist_ok avoids the check-then-create race when several
            # processes write into the same folder.
            os.makedirs(folder_matches, exist_ok=True)
            path_to_save_matches = os.path.join(
                folder_matches, "{}.png".format(saved_name)
            )

            if ground_truth:
                data_dict["mkpts0_f"] = (
                    torch.from_numpy(matches[:, :2]).float().to(self.device)
                )
                data_dict["mkpts1_f"] = (
                    torch.from_numpy(matches[:, 2:4]).float().to(self.device)
                )
                # Every match is assigned batch index 0.
                data_dict["m_bids"] = torch.zeros(
                    matches.shape[0], device=self.device, dtype=torch.float32
                )

                # compute epi_errs for each match
                compute_symmetrical_epipolar_errors(data_dict)
                # compute R_errs, t_errs, pose_errs for each pair
                compute_pose_errors(data_dict)

                epi_errors = data_dict["epi_errs"].cpu().numpy()
                R_errors, t_errors = data_dict["R_errs"][0], data_dict["t_errs"][0]

                self.draw_matches(
                    kpts0,
                    kpts1,
                    img0,
                    img1,
                    epi_errors,
                    path=path_to_save_matches,
                    R_errs=R_errors,
                    t_errs=t_errors,
                )

                rel_pair_names = list(zip(*data_dict["pair_names"]))
                bs = data_dict["image0"].size(0)
                metrics = {
                    # to filter duplicate pairs caused by DistributedSampler
                    "identifiers": ["#".join(rel_pair_names[b]) for b in range(bs)],
                    "epi_errs": [
                        data_dict["epi_errs"][data_dict["m_bids"] == b].cpu().numpy()
                        for b in range(bs)
                    ],
                    "R_errs": data_dict["R_errs"],
                    "t_errs": data_dict["t_errs"],
                    "inliers": data_dict["inliers"],
                }
                self.eval_stats.append({"metrics": metrics})
            else:
                # draw_matches is given 1 - confidence here.
                # NOTE(review): presumably so it can be thresholded like an
                # error (lower is better) -- confirm against Viz.draw_matches.
                m_conf = 1 - mconf
                self.draw_matches(
                    kpts0,
                    kpts1,
                    img0,
                    img1,
                    m_conf,
                    path=path_to_save_matches,
                    conf_thr=0.4,
                )