Spaces:
Runtime error
Runtime error
| from argparse import Namespace | |
| import os | |
| import torch | |
| import cv2 | |
| from time import time | |
| from pathlib import Path | |
| import matplotlib.cm as cm | |
| import numpy as np | |
| from src.models.topic_fm import TopicFM | |
| from src import get_model_cfg | |
| from .base import Viz | |
| from src.utils.metrics import compute_symmetrical_epipolar_errors, compute_pose_errors | |
| from src.utils.plotting import draw_topics, draw_topicfm_demo, error_colormap | |
class VizTopicFM(Viz):
    """Visualization wrapper around the TopicFM feature matcher.

    Loads a pretrained TopicFM checkpoint and provides two entry points:
    - ``match_and_draw``: match one image pair, optionally draw matches /
      topic assignments and accumulate evaluation metrics.
    - ``run_demo``: interactive webcam/sequence demo with keyboard control.
    """

    def __init__(self, args):
        """Build the TopicFM model from the project config and a checkpoint.

        Args:
            args: ``argparse.Namespace`` (or plain ``dict``) providing
                ``match_threshold``, ``n_sampling_topics``, ``show_n_topics``
                and ``ckpt`` (path to the checkpoint file).
        """
        super().__init__()
        if isinstance(args, dict):  # accept plain dicts for convenience
            args = Namespace(**args)

        self.match_threshold = args.match_threshold
        self.n_sampling_topics = args.n_sampling_topics
        self.show_n_topics = args.show_n_topics

        # Load model
        conf = dict(get_model_cfg())
        conf["match_coarse"]["thr"] = self.match_threshold
        conf["coarse"]["n_samples"] = self.n_sampling_topics
        print("model config: ", conf)
        self.model = TopicFM(config=conf)
        # map_location="cpu" lets a GPU-saved checkpoint load on CPU-only
        # machines; the model is moved to self.device right below anyway.
        ckpt_dict = torch.load(args.ckpt, map_location="cpu")
        self.model.load_state_dict(ckpt_dict["state_dict"])
        self.model = self.model.eval().to(self.device)

        # Name the method
        # self.ckpt_name = args.ckpt.split('/')[-1].split('.')[0]
        self.name = "TopicFM"
        print(f"Initialize {self.name}")

    def match_and_draw(
        self,
        data_dict,
        root_dir=None,
        ground_truth=False,
        measure_time=False,
        viz_matches=True,
    ):
        """Run the matcher on one pair and optionally visualize the result.

        Args:
            data_dict: batched input dict consumed (and mutated in place) by
                ``self.model``; afterwards it holds ``mkpts0_f``/``mkpts1_f``
                (fine matches), ``mconf`` and ``pair_names``.
                NOTE(review): assumes batch size 1 for the pair-name lookup
                and the ScanNet resize — confirm against the dataloader.
            root_dir: directory that contains the images referenced by
                ``pair_names``; output folders are created under it.
            ground_truth: if True, compute epipolar/pose errors and append
                evaluation metrics to ``self.eval_stats``.
            measure_time: if True, record GPU inference time (ms) into
                ``self.time_stats`` via CUDA events.
            viz_matches: if True, draw matches (and topics) to disk.
        """
        if measure_time:
            torch.cuda.synchronize()
            start = torch.cuda.Event(enable_timing=True)
            end = torch.cuda.Event(enable_timing=True)
            start.record()
        self.model(data_dict)
        if measure_time:
            torch.cuda.synchronize()
            end.record()
            torch.cuda.synchronize()
            self.time_stats.append(start.elapsed_time(end))

        kpts0 = data_dict["mkpts0_f"].cpu().numpy()
        kpts1 = data_dict["mkpts1_f"].cpu().numpy()

        # pair_names is batched; take the first (only) pair in the batch.
        img_name0, img_name1 = list(zip(*data_dict["pair_names"]))[0]
        img0 = cv2.imread(os.path.join(root_dir, img_name0))
        img1 = cv2.imread(os.path.join(root_dir, img_name1))
        if str(data_dict["dataset_name"][0]).lower() == "scannet":
            # ScanNet images are evaluated at the fixed 640x480 resolution.
            img0 = cv2.resize(img0, (640, 480))
            img1 = cv2.resize(img1, (640, 480))

        if viz_matches:
            # e.g. "scene0_frame0_scene0_frame5" from the two file names.
            saved_name = "_".join(
                [
                    img_name0.split("/")[-1].split(".")[0],
                    img_name1.split("/")[-1].split(".")[0],
                ]
            )
            folder_matches = os.path.join(root_dir, "{}_viz_matches".format(self.name))
            # exist_ok avoids the check-then-create race of os.path.exists().
            os.makedirs(folder_matches, exist_ok=True)
            path_to_save_matches = os.path.join(
                folder_matches, "{}.png".format(saved_name)
            )

            if ground_truth:
                compute_symmetrical_epipolar_errors(
                    data_dict
                )  # compute epi_errs for each match
                compute_pose_errors(
                    data_dict
                )  # compute R_errs, t_errs, pose_errs for each pair
                epi_errors = data_dict["epi_errs"].cpu().numpy()
                R_errors, t_errors = data_dict["R_errs"][0], data_dict["t_errs"][0]

                self.draw_matches(
                    kpts0,
                    kpts1,
                    img0,
                    img1,
                    epi_errors,
                    path=path_to_save_matches,
                    R_errs=R_errors,
                    t_errs=t_errors,
                )

                # compute evaluation metrics
                rel_pair_names = list(zip(*data_dict["pair_names"]))
                bs = data_dict["image0"].size(0)
                metrics = {
                    # to filter duplicate pairs caused by DistributedSampler
                    "identifiers": ["#".join(rel_pair_names[b]) for b in range(bs)],
                    "epi_errs": [
                        data_dict["epi_errs"][data_dict["m_bids"] == b].cpu().numpy()
                        for b in range(bs)
                    ],
                    "R_errs": data_dict["R_errs"],
                    "t_errs": data_dict["t_errs"],
                    "inliers": data_dict["inliers"],
                }
                self.eval_stats.append({"metrics": metrics})
            else:
                # No ground truth: color matches by (inverted) confidence.
                m_conf = 1 - data_dict["mconf"].cpu().numpy()
                self.draw_matches(
                    kpts0,
                    kpts1,
                    img0,
                    img1,
                    m_conf,
                    path=path_to_save_matches,
                    conf_thr=0.4,
                )
                if self.show_n_topics > 0:
                    folder_topics = os.path.join(
                        root_dir, "{}_viz_topics".format(self.name)
                    )
                    os.makedirs(folder_topics, exist_ok=True)
                    draw_topics(
                        data_dict,
                        img0,
                        img1,
                        saved_folder=folder_topics,
                        show_n_topics=self.show_n_topics,
                        saved_name=saved_name,
                    )

    def run_demo(
        self, dataloader, writer=None, output_dir=None, no_display=False, skip_frames=1
    ):
        """Interactive matching demo over a frame iterator.

        The first frame becomes the reference image; each subsequent frame is
        matched against it. Press 'n' to re-anchor on the current frame,
        'q' to quit.

        Args:
            dataloader: iterator yielding dicts with ``img`` (tensor),
                ``img_path`` and ``id`` entries — TODO confirm exact schema
                against the demo dataset class.
            writer: optional ``cv2.VideoWriter`` to record the visualization.
            output_dir: directory for per-pair PNGs when ``no_display`` is set.
            no_display: disable the GUI window (requires ``output_dir``).
            skip_frames: process only every ``skip_frames``-th frame.
        """
        data_dict = next(dataloader)
        frame_id = 0
        last_image_id = 0
        img0 = (
            np.array(cv2.imread(str(data_dict["img_path"][0])), dtype=np.float32) / 255
        )
        frame_tensor = data_dict["img"].to(self.device)
        pair_data = {"image0": frame_tensor}
        # BUGFIX: the interpolation flag must be passed by keyword; the third
        # positional argument of cv2.resize is `dst`, not `interpolation`.
        last_frame = cv2.resize(
            img0,
            (frame_tensor.shape[-1], frame_tensor.shape[-2]),
            interpolation=cv2.INTER_LINEAR,
        )

        if output_dir is not None:
            print("==> Will write outputs to {}".format(output_dir))
            Path(output_dir).mkdir(exist_ok=True)

        # Create a window to display the demo.
        if not no_display:
            window_name = "Topic-assisted Feature Matching"
            cv2.namedWindow(window_name, cv2.WINDOW_NORMAL)
            cv2.resizeWindow(window_name, (640 * 2, 480 * 2))
        else:
            print("Skipping visualization, will not show a GUI.")

        # Print the keyboard help menu.
        print(
            "==> Keyboard control:\n"
            "\tn: select the current frame as the reference image (left)\n"
            "\tq: quit"
        )

        while True:
            frame_id += 1
            if frame_id == len(dataloader):
                print("Finished demo_loftr.py")
                break
            data_dict = next(dataloader)
            if frame_id % skip_frames != 0:
                continue

            stem0, stem1 = last_image_id, data_dict["id"][0].item() - 1
            frame = (
                np.array(cv2.imread(str(data_dict["img_path"][0])), dtype=np.float32)
                / 255
            )
            frame_tensor = data_dict["img"].to(self.device)
            frame = cv2.resize(
                frame,
                (frame_tensor.shape[-1], frame_tensor.shape[-2]),
                interpolation=cv2.INTER_LINEAR,
            )
            pair_data = {**pair_data, "image1": frame_tensor}
            self.model(pair_data)

            total_n_matches = len(pair_data["mkpts0_f"])
            mkpts0 = pair_data["mkpts0_f"].cpu().numpy()
            mkpts1 = pair_data["mkpts1_f"].cpu().numpy()
            mconf = pair_data["mconf"].cpu().numpy()

            # Normalize confidence (low value = good match for the colormap).
            if len(mconf) > 0:
                mconf = 1 - mconf

            color = error_colormap(mconf, thr=0.4, alpha=0.1)
            text = [
                "Topics",
                "#Matches: {}".format(total_n_matches),
            ]

            out = draw_topicfm_demo(
                pair_data,
                last_frame,
                frame,
                mkpts0,
                mkpts1,
                color,
                text,
                show_n_topics=4,
                path=None,
            )

            if not no_display:
                if writer is not None:
                    writer.write(out)
                # BUGFIX: show in the window created above; previously a
                # second window ("TopicFM Matches") was opened, leaving the
                # resized `window_name` window empty.
                cv2.imshow(window_name, out)
                key = chr(cv2.waitKey(10) & 0xFF)
                if key == "q":
                    if writer is not None:
                        writer.release()
                    print("Exiting...")
                    break
                elif key == "n":
                    # Re-anchor: current frame becomes the reference image.
                    pair_data["image0"] = frame_tensor
                    last_frame = frame
                    last_image_id = data_dict["id"][0].item() - 1
            elif output_dir is not None:
                stem = "matches_{:06}_{:06}".format(stem0, stem1)
                out_file = str(Path(output_dir, stem + ".png"))
                print("\nWriting image to {}".format(out_file))
                cv2.imwrite(out_file, out)
            else:
                raise ValueError("output_dir is required when no display is given.")

        cv2.destroyAllWindows()
        if writer is not None:
            writer.release()