# Spaces: Running on Zero
import argparse
import csv
import json
import os

import cv2
import numpy as np
import torch
from tqdm import tqdm

import metric
from metric import *
# Device used for metric computation; metrics run on torch tensors in eval_single.
device = 'cuda'

# Names of metric functions resolved from the `metric` module via getattr().
# Commented-out entries are available but disabled for this evaluation run.
eval_metrics = [
    "abs_relative_difference",
    "rmse_linear",
    "delta1_acc",
    # "squared_relative_difference",
    # "rmse_log",
    # "log10",
    # "delta2_acc",
    # "delta3_acc",
    # "i_rmse",
    # "silog_rmse",
]
def depth2disparity(depth, return_mask=False):
    """Convert depth to disparity by element-wise reciprocal.

    Entries with depth <= 0 map to 0 disparity, avoiding division by zero.

    Args:
        depth: ``torch.Tensor`` or ``np.ndarray`` of depth values.
        return_mask: if True, also return the boolean mask of positive depths.

    Returns:
        Disparity array/tensor of the same type and shape as ``depth``;
        with ``return_mask=True``, a ``(disparity, mask)`` tuple.

    Raises:
        TypeError: if ``depth`` is neither a torch.Tensor nor a np.ndarray.
    """
    if isinstance(depth, torch.Tensor):
        disparity = torch.zeros_like(depth)
    elif isinstance(depth, np.ndarray):
        disparity = np.zeros_like(depth)
    else:
        # Original fell through with `disparity` unbound (NameError); fail explicitly.
        raise TypeError(f"Unsupported depth type: {type(depth)}")
    positive_mask = depth > 0
    disparity[positive_mask] = 1.0 / depth[positive_mask]
    if return_mask:
        return disparity, positive_mask
    return disparity
def resize_images(images, new_size):
    """Resize a batch of HWC images to ``new_size`` = (height, width).

    Args:
        images: array of shape (n, h, w, c).
        new_size: target (height, width) pair.

    Returns:
        float64 array of shape (n, height, width, c) — dtype comes from the
        ``np.empty`` preallocation, matching the original behavior.
    """
    n_imgs = images.shape[0]
    n_channels = images.shape[3]
    out_h, out_w = new_size
    resized = np.empty((n_imgs, out_h, out_w, n_channels))
    for idx in range(n_imgs):
        # cv2.resize takes (width, height) and drops a trailing singleton
        # channel axis, so restore it for single-channel inputs.
        r = cv2.resize(images[idx], (out_w, out_h))
        resized[idx] = r[..., None] if n_channels == 1 else r
    return resized
def eval_single(
    pred_disp_path,
    gt_disp_path,
    seq_len=98,
    domain='depth',
    method_type="ours",
    dataset_max_depth=70,  # was the string "70"; numeric comparison below needs a number
):
    """Evaluate one predicted sequence against ground truth.

    Least-squares aligns the prediction to GT with a scale and shift, then
    computes every metric in ``eval_metrics`` over valid pixels.

    Args:
        pred_disp_path: .npz with key 'depth' (ours) or 'disparity'
            (depth_anything), shape (t, h, w).
        gt_disp_path: .npz with key 'disparity' (or 'arr_0'), shape (t, 1, h, w).
            NOTE(review): despite the name, values are treated as metric depth
            (clipped to ``dataset_max_depth``) — confirm against the dataset.
        seq_len: max number of frames to evaluate.
        domain: 'depth' (align in disparity = 1/depth, evaluate in depth)
            or 'disp' (align and evaluate in disparity).
        method_type: 'ours' or 'depth_anything'; selects the npz key.
        dataset_max_depth: upper bound for valid GT values.

    Returns:
        list[float]: one value per metric in ``eval_metrics``.

    Raises:
        ValueError: on an unknown ``method_type`` or ``domain`` (the original
        silently fell through and crashed later with a NameError).
    """
    # load data — read the GT file once instead of three times
    gt_npz = np.load(gt_disp_path)
    gt_disp = gt_npz['disparity'] if 'disparity' in gt_npz.files else gt_npz['arr_0']  # (t, 1, h, w)
    if method_type == "ours":
        pred_disp = np.load(pred_disp_path)['depth']  # (t, h, w)
    elif method_type == "depth_anything":
        pred_disp = np.load(pred_disp_path)['disparity']  # (t, h, w)
    else:
        raise ValueError(f"Invalid method type: {method_type!r}")
    # clamp seq_len to the available number of frames
    seq_len = min(seq_len, pred_disp.shape[0])
    # preprocess: match prediction to GT spatial resolution
    pred_disp = resize_images(pred_disp[..., None], (gt_disp.shape[-2], gt_disp.shape[-1]))
    pred_disp = pred_disp[..., 0]  # (t, h, w)
    pred_disp = pred_disp[:seq_len]
    gt_disp = gt_disp[:seq_len, 0]  # (t, h, w)
    # valid mask: GT inside (1e-3, dataset_max_depth)
    valid_mask = np.logical_and(
        (gt_disp > 1e-3),
        (gt_disp < dataset_max_depth)
    )
    pred_disp = np.clip(pred_disp, a_min=1e-3, a_max=None)
    pred_disp_masked = pred_disp[valid_mask].reshape((-1, 1))
    # choose evaluation domain (typo "maksed" fixed; local variable only)
    if domain == 'disp':
        # align in real disp, calc in disp
        gt_masked = gt_disp[valid_mask].reshape((-1, 1)).astype(np.float64)
    elif domain == 'depth':
        # align in disp = 1/depth, calc in depth
        gt_masked = 1. / (gt_disp[valid_mask].reshape((-1, 1)).astype(np.float64) + 1e-8)
    else:
        raise ValueError(f"Invalid domain: {domain!r}")
    # calc scale and shift: gt = scale * pred + shift, in least-squares sense
    _ones = np.ones_like(pred_disp_masked)
    A = np.concatenate([pred_disp_masked, _ones], axis=-1)
    X = np.linalg.lstsq(A, gt_masked, rcond=None)[0]
    scale, shift = X
    # align
    aligned_pred = scale * pred_disp + shift
    aligned_pred = np.clip(aligned_pred, a_min=1e-3, a_max=None)
    if domain == 'disp':
        # aligned in real disp, calc in disp
        pred_depth = aligned_pred
        gt_depth = gt_disp
    else:  # domain == 'depth', validated above
        # aligned in disp = 1/depth; invert back to depth for evaluation
        pred_depth = depth2disparity(aligned_pred)
        gt_depth = gt_disp
    # metric evaluation, clip to dataset min/max
    pred_depth = np.clip(pred_depth, a_min=1e-3, a_max=dataset_max_depth)
    # evaluate metrics on frames that contain at least one valid pixel
    metric_funcs = [getattr(metric, _met) for _met in eval_metrics]
    pred_depth_ts = torch.from_numpy(pred_depth).to(device)
    gt_depth_ts = torch.from_numpy(gt_depth).to(device)
    valid_mask_ts = torch.from_numpy(valid_mask).to(device)
    valid_frame = (valid_mask.sum((-1, -2)) > 0)
    pred_depth_ts = pred_depth_ts[valid_frame]
    gt_depth_ts = gt_depth_ts[valid_frame]
    valid_mask_ts = valid_mask_ts[valid_frame]
    sample_metric = []
    for met_func in metric_funcs:
        sample_metric.append(met_func(pred_depth_ts, gt_depth_ts, valid_mask_ts).item())
    return sample_metric
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--seq_len",
        type=int,
        default=50,
        help="Max video frame length for evaluation."
    )
    parser.add_argument(
        "--domain",
        type=str,
        default="depth",
        choices=["depth", "disp"],
        help="Domain of metric calculation."
    )
    parser.add_argument(
        "--method_type",
        type=str,
        default="ours",
        choices=["ours", "depth_anything"],
        help="Choose the methods."
    )
    parser.add_argument(
        "--dataset_max_depth",
        type=int,
        default=70,
        help="Dataset max depth clip."
    )
    parser.add_argument(
        "--pred_disp_root",
        type=str,
        default="./demo_output",
        help="Predicted output directory."
    )
    parser.add_argument(
        "--gt_disp_root",
        type=str,
        required=True,
        help="GT depth directory."
    )
    parser.add_argument(
        "--dataset",
        type=str,
        required=True,
        help="Choose the datasets."
    )
    parser.add_argument(
        "--meta_path",
        type=str,
        required=True,
        help="Path of test dataset csv file."
    )
    args = parser.parse_args()

    SEQ_LEN = args.seq_len
    method_type = args.method_type
    # Both method types share the same result layout (the original if/else
    # had two identical branches).
    pred_disp_root = os.path.join(args.pred_disp_root, f'results_{args.dataset}')
    domain = args.domain
    dataset_max_depth = args.dataset_max_depth
    saved_json_path = os.path.join(args.pred_disp_root, f"results_{args.dataset}.json")
    # method_type and domain are already validated by argparse `choices`;
    # the original asserts were redundant.

    # read the test-set metadata CSV: one row per sequence
    with open(args.meta_path, mode="r", encoding="utf-8") as csvfile:
        samples = list(csv.DictReader(csvfile))

    # iterate all cases
    results_all = []
    for sample in tqdm(samples):
        gt_disp_path = os.path.join(args.gt_disp_root, sample['filepath_disparity'])
        pred_disp_path = os.path.join(pred_disp_root, sample['filepath_disparity'])
        if method_type == "ours":
            pred_disp_path = pred_disp_path.replace("disparity", "rgb_left")
        else:  # depth_anything
            pred_disp_path = pred_disp_path.replace("disparity", "rgb_left_depth")
        results_all.append(eval_single(
            pred_disp_path,
            gt_disp_path,
            seq_len=SEQ_LEN,
            domain=domain,
            method_type=method_type,
            dataset_max_depth=dataset_max_depth,
        ))

    # average over all cases
    final_results_mean = np.mean(np.array(results_all), axis=0)
    print("")
    # save mean to json; loop variable renamed so it no longer shadows the
    # imported `metric` module
    result_dict = {'name': method_type}
    for i, metric_name in enumerate(eval_metrics):
        result_dict[metric_name] = float(final_results_mean[i])
        print(f"{metric_name}: {final_results_mean[i]:04f}")
    # save each case to json, keyed by its GT file path
    for sample, results in zip(samples, results_all):
        result_dict[sample['filepath_disparity']] = results
    # write json
    with open(saved_json_path, 'w') as f:
        json.dump(result_dict, f, indent=4)
    print("")
    print(f"Evaluation results json are saved to {saved_json_path}")