Spaces:
Runtime error
Runtime error
| # Lloyd algorithm while estimating average cost? | |
| import os | |
| import matplotlib.pyplot as plt | |
| import matplotlib | |
| import numpy as np | |
| from pytorch_lightning.utilities.seed import seed_everything | |
| # from scipy.cluster.vq import kmeans2 | |
| # from scipy.spatial import voronoi_plot_2d, Voronoi | |
| import torch | |
| import torch.nn as nn | |
| from risk_biased.scene_dataset.loaders import SceneDataLoaders | |
| from risk_biased.scene_dataset.scene import RandomScene, RandomSceneParams | |
| # from risk_biased.scene_dataset.scene_plotter import ScenePlotter | |
| from risk_biased.utils.callbacks import get_fast_slow_scenes, DrawCallbackParams | |
| from risk_biased.utils.config_argparse import config_argparse | |
| from risk_biased.utils.cost import TTCCostNumpy | |
| from risk_biased.utils.load_model import load_from_config | |
| def draw_cost_map( | |
| model: nn.Module, | |
| selected_agent: int, | |
| device, | |
| scene: RandomScene, | |
| sqrt_n_samples: int, | |
| params: DrawCallbackParams, | |
| ): | |
| n_samples = sqrt_n_samples**2 | |
| ped_trajs = scene.get_pedestrians_trajectories() | |
| n_scenes, n_agents, n_steps, features = ped_trajs.shape | |
| input_traj = ped_trajs[:, :, : params.num_steps] | |
| normalized_input, offset = SceneDataLoaders.normalize_trajectory( | |
| torch.from_numpy(input_traj.astype("float32")).contiguous().to(device) | |
| ) | |
| n_scenes = ped_trajs.shape[0] | |
| x = np.linspace(-3, 3, sqrt_n_samples) | |
| y = np.linspace(-3, 3, sqrt_n_samples) | |
| xx, yy = np.meshgrid(x, y) | |
| prior_samples = ( | |
| torch.from_numpy(np.stack((xx, yy), -1).astype("float32")) | |
| .view(1, 1, n_samples, 2) | |
| .repeat(n_scenes, n_agents, 1, 1) | |
| ) | |
| mask_z = torch.ones_like(prior_samples[..., 0, 0]) | |
| mask_input = torch.ones_like(normalized_input[..., 0]) | |
| map = torch.empty(n_scenes, 0, 0, features, device=device) | |
| mask_map = torch.empty(n_scenes, 0, 0) | |
| generated_trajs = ( | |
| SceneDataLoaders.unnormalize_trajectory( | |
| model.decode( | |
| z_samples=prior_samples, | |
| mask_z=mask_z, | |
| x=normalized_input, | |
| mask_x=mask_input, | |
| map=map, | |
| mask_map=mask_map, | |
| offset=offset, | |
| ), | |
| offset, | |
| ) | |
| .cpu() | |
| .detach() | |
| .numpy() | |
| ) | |
| input_traj = np.repeat( | |
| input_traj.reshape((n_scenes, n_agents, 1, params.num_steps, features)), | |
| n_samples, | |
| axis=2, | |
| ) | |
| generated_ped_trajs = np.concatenate((input_traj, generated_trajs), axis=3) | |
| ego_traj = scene.get_ego_ref_trajectory(params.scene_params.sample_times)[ | |
| None, :, : | |
| ] | |
| ttc_cost_func = TTCCostNumpy(params.ttc_cost_params) | |
| sample_times = np.array(params.scene_params.sample_times) | |
| ped_velocities = ( | |
| generated_ped_trajs[:, :, :, 1:] - generated_ped_trajs[:, :, :, :-1] | |
| ) / ((sample_times[1:] - sample_times[:-1])[None, None, None, :, None]) | |
| ped_velocities = np.concatenate((ped_velocities[:, :, :, 0:1], ped_velocities), 3) | |
| ttc_cost_pred, (ttc_pred, dist_pred) = ttc_cost_func( | |
| ego_traj[:, :, :, params.num_steps :], | |
| generated_ped_trajs[:, :, :, params.num_steps :], | |
| scene.get_ego_ref_velocity()[:, :, None], | |
| ped_velocities[:, :, :, params.num_steps :], | |
| ) | |
| ttc_cost_pred = ( | |
| ttc_cost_pred[:, selected_agent] | |
| .reshape(n_scenes, sqrt_n_samples, sqrt_n_samples) | |
| .mean(0) | |
| ) | |
| cmap = plt.get_cmap("RdBu_r") | |
| plt.contourf( | |
| xx, | |
| yy, | |
| ttc_cost_pred.reshape((sqrt_n_samples, sqrt_n_samples)), | |
| 50, | |
| cmap=cmap, | |
| extent=(-3, 3, -3, 3), | |
| vmin=0, | |
| vmax=2, | |
| ) | |
| norm = matplotlib.colors.Normalize(vmin=0, vmax=2, clip=True) | |
| sm = plt.cm.ScalarMappable(cmap=cmap, norm=norm) | |
| plt.colorbar(sm, label="TTC cost") | |
| plt.axis([-3, 3, -3, 3]) | |
| plt.show() | |
| if __name__ == "__main__": | |
| # Draws a contour plot of the cost associated with the latent samples in two scenarios: safer_fast and safer_slow | |
| working_dir = os.path.dirname(os.path.realpath(__file__)) | |
| config_path = os.path.join( | |
| working_dir, "..", "..", "risk_biased", "config", "learning_config.py" | |
| ) | |
| cfg = config_argparse(config_path) | |
| model, loaders, cfg = load_from_config(cfg) | |
| assert ( | |
| cfg.latent_dim == 2 | |
| and "The latent dimension of the model must be exactly 2 to be plotted (no dimensionality reduction capabilities)" | |
| ) | |
| scene_params = RandomSceneParams.from_config(cfg) | |
| safer_fast_scene, safer_slow_scene = get_fast_slow_scenes(scene_params, 100) | |
| draw_params = DrawCallbackParams.from_config(cfg) | |
| if cfg.seed is not None: | |
| seed_everything(cfg.seed) | |
| sqrt_n_samples = 100 | |
| n_quantize = 100 | |
| draw_cost_map( | |
| model.model, | |
| 0, | |
| model.device, | |
| safer_fast_scene, | |
| sqrt_n_samples, | |
| draw_params, | |
| ) | |
| draw_cost_map( | |
| model.model, | |
| 0, | |
| model.device, | |
| safer_slow_scene, | |
| sqrt_n_samples, | |
| draw_params, | |
| ) | |