Spaces:
Runtime error
Runtime error
| from dataclasses import dataclass | |
| import os | |
| from typing import Union, List, Optional | |
| import warnings | |
| import copy | |
| from mmcv import Config | |
| import numpy as np | |
| import torch | |
| from torch import Tensor | |
| from torch.utils.data import Dataset | |
| from risk_biased.scene_dataset.pedestrian import RandomPedestrians | |
| from risk_biased.utils.torch_utils import torch_linspace | |
| class RandomSceneParams: | |
| """Dataclass that defines all the listed parameters that are necessary for a RandomScene object | |
| Args: | |
| batch_size: number of scenes in the batch | |
| time_scene: time length of the scene in seconds | |
| sample_times: list of times to get the positions | |
| ego_ref_speed: constant reference speed of the ego vehicle in meters/seconds | |
| ego_speed_init_low: lowest initial speed of the ego vehicle in meters/seconds | |
| ego_speed_init_high: higest initial speed of the ego vehicle in meters/seconds | |
| ego_acceleration_mean_low: lowest mean acceleration of the ego vehicle in m/s^2 | |
| ego_acceleration_mean_high: highest mean acceleration of the ego vehicle in m/s^2 | |
| ego_acceleration_std: std for acceleration of the ego vehicle in m/s^2 | |
| ego_length: length of the ego vehicle in meters | |
| ego_width: width of the ego vehicle in meters | |
| dt: time step to use in the trajectory sequence | |
| fast_speed: fast walking speed for the random pedestrian in meters/seconds | |
| slow_speed: slow walking speed for the random pedestrian in meters/seconds | |
| p_change_pace: probability that a slow (resp. fast) pedestrian walk at fast_speed (resp. slow_speed) at each time step | |
| proportion_fast: proportion of the pedestrians that are mainly walking at fast_speed | |
| perception_noise_std: standard deviation of the gaussian noise that is affecting the position observations | |
| """ | |
| batch_size: int | |
| time_scene: float | |
| sample_times: list | |
| ego_ref_speed: float | |
| ego_speed_init_low: float | |
| ego_speed_init_high: float | |
| ego_acceleration_mean_low: float | |
| ego_acceleration_mean_high: float | |
| ego_acceleration_std: float | |
| ego_length: float | |
| ego_width: float | |
| dt: float | |
| fast_speed: float | |
| slow_speed: float | |
| p_change_pace: float | |
| proportion_fast: float | |
| perception_noise_std: float | |
| def from_config(cfg: Config): | |
| return RandomSceneParams( | |
| batch_size=cfg.batch_size, | |
| sample_times=cfg.sample_times, | |
| time_scene=cfg.time_scene, | |
| ego_ref_speed=cfg.ego_ref_speed, | |
| ego_speed_init_low=cfg.ego_speed_init_low, | |
| ego_speed_init_high=cfg.ego_speed_init_high, | |
| ego_acceleration_mean_low=cfg.ego_acceleration_mean_low, | |
| ego_acceleration_mean_high=cfg.ego_acceleration_mean_high, | |
| ego_acceleration_std=cfg.ego_acceleration_std, | |
| ego_length=cfg.ego_length, | |
| ego_width=cfg.ego_width, | |
| dt=cfg.dt, | |
| fast_speed=cfg.fast_speed, | |
| slow_speed=cfg.slow_speed, | |
| p_change_pace=cfg.p_change_pace, | |
| proportion_fast=cfg.proportion_fast, | |
| perception_noise_std=cfg.perception_noise_std, | |
| ) | |
| class RandomScene: | |
| """ | |
| Batched scenes with one vehicle at constant velocity and one random pedestrian. Utility functions to draw the scene and compute risk factors (time to collision etc...) | |
| Args: | |
| params: dataclass containing the necessary parameters | |
| is_torch: set to True to produce Tensor batches and to False to produce numpy arrays | |
| """ | |
| def __init__( | |
| self, | |
| params: RandomSceneParams, | |
| is_torch: bool = False, | |
| ) -> None: | |
| self._is_torch = is_torch | |
| self._batch_size = params.batch_size | |
| self._fast_speed = params.fast_speed | |
| self._slow_speed = params.slow_speed | |
| self._p_change_pace = params.p_change_pace | |
| self._proportion_fast = params.proportion_fast | |
| self.dt = params.dt | |
| self.sample_times = params.sample_times | |
| self.ego_ref_speed = params.ego_ref_speed | |
| self._ego_speed_init_low = params.ego_speed_init_low | |
| self._ego_speed_init_high = params.ego_speed_init_high | |
| self._ego_acceleration_mean_low = params.ego_acceleration_mean_low | |
| self._ego_acceleration_mean_high = params.ego_acceleration_mean_high | |
| self._ego_acceleration_std = params.ego_acceleration_std | |
| self.perception_noise_std = params.perception_noise_std | |
| self.road_length = ( | |
| params.ego_ref_speed + params.fast_speed | |
| ) * params.time_scene | |
| self.time_scene = params.time_scene | |
| self.lane_width = 3 | |
| self.sidewalks_width = 1.5 | |
| self.road_width = 2 * self.lane_width + 2 * self.sidewalks_width | |
| self.bottom = -self.lane_width / 2 - self.sidewalks_width | |
| self.top = 3 * self.lane_width / 2 + self.sidewalks_width | |
| self.ego_width = 1.75 | |
| self.ego_length = 4 | |
| self.current_time = 0 | |
| if self._is_torch: | |
| pedestrians_x = ( | |
| torch.rand(params.batch_size, 1) | |
| * (self.road_length - self.ego_length / 2) | |
| + self.ego_length / 2 | |
| ) | |
| pedestrians_y = ( | |
| torch.rand(params.batch_size, 1) * (self.top - self.bottom) | |
| + self.bottom | |
| ) | |
| self._pedestrians_positions = torch.stack( | |
| (pedestrians_x, pedestrians_y), -1 | |
| ) | |
| else: | |
| pedestrians_x = np.random.uniform( | |
| low=self.ego_length / 2, | |
| high=self.road_length, | |
| size=(params.batch_size, 1), | |
| ) | |
| pedestrians_y = np.random.uniform( | |
| low=self.bottom, high=self.top, size=(params.batch_size, 1) | |
| ) | |
| self._pedestrians_positions = np.stack((pedestrians_x, pedestrians_y), -1) | |
| self.pedestrians = RandomPedestrians( | |
| batch_size=self._batch_size, | |
| dt=self.dt, | |
| fast_speed=self._fast_speed, | |
| slow_speed=self._slow_speed, | |
| p_change_pace=self._p_change_pace, | |
| proportion_fast=self._proportion_fast, | |
| is_torch=self._is_torch, | |
| ) | |
| self._set_pedestrians() | |
| def pedestrians_positions(self): | |
| # relative_positions = self._pedestrians_positions/[[(self.road_length - self.ego_length / 2), (self.top - self.bottom)]] - [[self.ego_length / 2, self.bottom]] | |
| return self._pedestrians_positions | |
| def set_pedestrians_states( | |
| self, | |
| relative_pedestrians_positions: Union[torch.Tensor, np.ndarray], | |
| pedestrians_angles: Optional[Union[torch.Tensor, np.ndarray]] = None, | |
| ): | |
| """Force pedestrian initial states | |
| Args: | |
| relative_pedestrians_positions: Relative positions in the scene as percentage distance from left to right and from bottom to top | |
| pedestrians_angles: Pedestrian heading angles in radiants | |
| """ | |
| if self._is_torch: | |
| assert isinstance(relative_pedestrians_positions, torch.Tensor) | |
| else: | |
| assert isinstance(relative_pedestrians_positions, np.ndarray) | |
| self._batch_size = relative_pedestrians_positions.shape[0] | |
| if (0 > relative_pedestrians_positions).any() or ( | |
| relative_pedestrians_positions > 1 | |
| ).any(): | |
| warnings.warn( | |
| "Some of the given pedestrian initial positions are outside of the road range" | |
| ) | |
| center_y = (self.top - self.bottom) * relative_pedestrians_positions[ | |
| :, :, 1 | |
| ] + self.bottom | |
| center_x = ( | |
| self.road_length - self.ego_length / 2 | |
| ) * relative_pedestrians_positions[:, :, 0] + self.ego_length / 2 | |
| if self._is_torch: | |
| pedestrians_positions = torch.stack([center_x, center_y], -1) | |
| else: | |
| pedestrians_positions = np.stack([center_x, center_y], -1) | |
| self.pedestrians = RandomPedestrians( | |
| batch_size=self._batch_size, | |
| dt=self.dt, | |
| fast_speed=self._fast_speed, | |
| slow_speed=self._slow_speed, | |
| p_change_pace=self._p_change_pace, | |
| proportion_fast=self._proportion_fast, | |
| is_torch=self._is_torch, | |
| ) | |
| self._pedestrians_positions = pedestrians_positions | |
| if pedestrians_angles is not None: | |
| self.pedestrians.angle = pedestrians_angles | |
| self._set_pedestrians() | |
| def _set_pedestrians(self): | |
| self.pedestrians_trajectories = self.sample_pedestrians_trajectories( | |
| self.sample_times | |
| ) | |
| self.final_pedestrians_positions = self.pedestrians_trajectories[:, :, -1] | |
| def get_ego_ref_trajectory(self, time_sequence: list): | |
| """ | |
| Returns only one ego reference trajectory and not a batch because it is always the same. | |
| Args: | |
| time_sequence: the time points at which to get the positions | |
| """ | |
| out = np.array([[[[t * self.ego_ref_speed, 0] for t in time_sequence]]]) | |
| if self._is_torch: | |
| return torch.from_numpy(out.astype("float32")) | |
| else: | |
| return out | |
| def get_pedestrians_velocities(self): | |
| """ | |
| Returns the batch of mean pedestrian velocities between their positions and their final positions. | |
| """ | |
| return (self.final_pedestrians_positions - self._pedestrians_positions)[ | |
| :, None | |
| ] / self.time_scene | |
| def get_ego_ref_velocity(self): | |
| """ | |
| Returns the reference ego velocity. | |
| """ | |
| if self._is_torch: | |
| return torch.from_numpy( | |
| np.array([[[[self.ego_ref_speed, 0]]]], dtype="float32") | |
| ) | |
| else: | |
| return np.array([[[[self.ego_ref_speed, 0]]]]) | |
| def get_ego_ref_position(self): | |
| """ | |
| Returns the current reference ego position (at set time self.current_time) | |
| """ | |
| if self._is_torch: | |
| return torch.from_numpy( | |
| np.array( | |
| [[[[self.ego_ref_speed * self.current_time, 0]]]], dtype="float32" | |
| ) | |
| ) | |
| else: | |
| return np.array([[[[self.ego_ref_speed * self.current_time, 0]]]]) | |
| def set_current_time(self, time: float): | |
| """ | |
| Set the current time of the scene. | |
| Args: | |
| time : The current time to set. It should be between 0 and 1 | |
| """ | |
| assert 0 <= time <= self.time_scene | |
| self.current_time = time | |
| def sample_ego_velocities(self, time_sequence: list): | |
| """ | |
| Get ego velocity trajectories following the ego's acceleration distribution and the initial | |
| velocity distribution. | |
| Args: | |
| time_sequence: a list of time points at which to sample the trajectory positions. | |
| Returns: | |
| batch of sequence of velocities of shape (batch_size, 1, len(time_sequence), 2) | |
| """ | |
| vel_traj = [] | |
| # uniform sampling of acceleration_mean between self._ego_acceleration_mean_low and | |
| # self._ego_acceleration_mean_high | |
| acceleration_mean = np.random.rand(self._batch_size, 2) * np.array( | |
| [ | |
| self._ego_acceleration_mean_high - self._ego_acceleration_mean_low, | |
| 0.0, | |
| ] | |
| ) + np.array([self._ego_acceleration_mean_low, 0.0]) | |
| t_prev = 0 | |
| # uniform sampling of initial velocity between self._ego_speed_init_low and | |
| # self._ego_speed_init_high | |
| vel_prev = np.random.rand(self._batch_size, 2) * np.array( | |
| [self._ego_speed_init_high - self._ego_speed_init_low, 0.0] | |
| ) + np.array([self._ego_speed_init_low, 0.0]) | |
| for t in time_sequence: | |
| # integrate accelerations once to get velocities | |
| acceleration = acceleration_mean + np.random.randn( | |
| self._batch_size, 2 | |
| ) * np.array([self._ego_acceleration_std, 0.0]) | |
| vel_prev = vel_prev + acceleration * (t - t_prev) | |
| t_prev = t | |
| vel_traj.append(vel_prev) | |
| vel_traj = np.stack(vel_traj, 1) | |
| if self._is_torch: | |
| vel_traj = torch.from_numpy(vel_traj.astype("float32")) | |
| return vel_traj[:, None] | |
| def sample_ego_trajectories(self, time_sequence: list): | |
| """ | |
| Get ego trajectories following the ego's acceleration distribution and the initial velocity | |
| distribution. | |
| Args: | |
| time_sequence: a list of time points at which to sample the trajectory positions. | |
| Returns: | |
| batch of sequence of positions of shape (batch_size, len(time_sequence), 2) | |
| """ | |
| vel_traj = self.sample_ego_velocities(time_sequence) | |
| traj = [] | |
| t_prev = 0 | |
| pos_prev = np.array([[0, 0]], dtype="float32") | |
| if self._is_torch: | |
| pos_prev = torch.from_numpy(pos_prev) | |
| for idx, t in enumerate(time_sequence): | |
| # integrate velocities once to get positions | |
| vel = vel_traj[:, :, idx, :] | |
| pos_prev = pos_prev + vel * (t - t_prev) | |
| t_prev = t | |
| traj.append(pos_prev) | |
| if self._is_torch: | |
| return torch.stack(traj, -2) | |
| else: | |
| return np.stack(traj, -2) | |
| def sample_pedestrians_trajectories(self, time_sequence: list): | |
| """ | |
| Produce pedestrian trajectories following the pedestrian behavior distribution | |
| (it is resampled, the final position will not match self.final_pedestrians_positions) | |
| Args: | |
| time_sequence: a list of time points at which to sample the trajectory positions. | |
| Returns: | |
| batch of sequence of positions of shape (batch_size, len(time_sequence), 2) | |
| """ | |
| traj = [] | |
| t_prev = 0 | |
| pos_prev = self.pedestrians_positions | |
| for t in time_sequence: | |
| pos_prev = ( | |
| pos_prev | |
| + self.pedestrians.get_final_position(t - t_prev) | |
| - self.pedestrians.position | |
| ) | |
| t_prev = t | |
| traj.append(pos_prev) | |
| if self._is_torch: | |
| traj = torch.stack(traj, 2) | |
| return traj + torch.randn_like(traj) * self.perception_noise_std | |
| else: | |
| traj = np.stack(traj, 2) | |
| return traj + np.random.randn(*traj.shape) * self.perception_noise_std | |
| def get_pedestrians_trajectories(self): | |
| """ | |
| Returns the batch of pedestrian trajectories sampled every dt. | |
| """ | |
| return self.pedestrians_trajectories | |
| def get_pedestrian_trajectory(self, ind: int, time_sequence: list = None): | |
| """ | |
| Returns one pedestrian trajectory of index ind sampled at times set in time_sequence. | |
| Args: | |
| ind: index of the pedestrian in the batch. | |
| time_sequence: a list of time points at which to sample the trajectory positions. | |
| Returns: | |
| A pedestrian trajectory of shape (len(time_sequence), 2) | |
| """ | |
| len_traj = len(self.sample_times) | |
| if self._is_torch: | |
| ped_traj = torch_linspace( | |
| self.pedestrians_positions[ind], | |
| self.final_pedestrians_positions[ind], | |
| len_traj, | |
| ) | |
| else: | |
| ped_traj = np.linspace( | |
| self.pedestrians_positions[ind], | |
| self.final_pedestrians_positions[ind], | |
| len_traj, | |
| ) | |
| if time_sequence is not None: | |
| n_steps = [int(t / self.dt) for t in time_sequence] | |
| else: | |
| n_steps = range(int(self.time_scene / self.dt)) | |
| return ped_traj[n_steps] | |
| class SceneDataset(Dataset): | |
| """ | |
| Dataset of scenes with one vehicle at constant velocity and one random pedestrian. | |
| The scenes are randomly generated so the distribution can be sampled at each batch or pre-fetched. | |
| Args: | |
| len: int number of scenes per epoch | |
| params: dataclass defining all the necessary parameters | |
| pre_fetch: set to True to fetch the whole dataset at initialization | |
| """ | |
| def __init__( | |
| self, | |
| len: int, | |
| params: RandomSceneParams, | |
| pre_fetch: bool = True, | |
| ) -> None: | |
| super().__init__() | |
| self._pre_fetch = pre_fetch | |
| self._len = len | |
| self._sample_times = params.sample_times | |
| self.params = copy.deepcopy(params) | |
| params.batch_size = len | |
| if self._pre_fetch: | |
| self.scene_set = RandomScene( | |
| params, is_torch=True | |
| ).sample_pedestrians_trajectories(self._sample_times) | |
| def __len__(self) -> int: | |
| return self._len | |
| # This is a hack, get item only returns the index so that the collate_fn can handle making the batch without looping on RandomScene creation. | |
| def __getitem__(self, index: int) -> Tensor: | |
| return index | |
| def collate_fn(self, index_list: list) -> Tensor: | |
| if self._pre_fetch: | |
| return self.scene_set[torch.from_numpy(np.array(index_list))] | |
| else: | |
| self.params.batch_size = len(index_list) | |
| return RandomScene( | |
| self.params, | |
| is_torch=True, | |
| ).sample_pedestrians_trajectories(self._sample_times) | |
| # Call this function to create a dataset as a .npy file that can be loaded as a numpy array with np.load(file_name.npy) | |
| def save_dataset(file_path: str, size: int, config: Config): | |
| """ | |
| Save a dataset at file_path using the configuration. | |
| Args: | |
| file_path: Where to save the dataset | |
| size: Number of samples to save | |
| config: Configuration to use for the dataset generation | |
| """ | |
| dir_path = os.path.dirname(file_path) | |
| config_path = os.path.join(dir_path, "config.py") | |
| config = copy.deepcopy(config) | |
| config.batch_size = size | |
| params = RandomSceneParams.from_config(config) | |
| scene = RandomScene( | |
| params, | |
| is_torch=False, | |
| ) | |
| data_pedestrian = scene.sample_pedestrians_trajectories(config.sample_times) | |
| data_ego = scene.sample_ego_trajectories(config.sample_times) | |
| data = np.stack([data_pedestrian, data_ego], 0) | |
| np.save(file_path, data) | |
| # Cannot use config.dump here because it is buggy and does not work if config was not loaded from a file. | |
| with open(config_path, "w", encoding="utf-8") as f: | |
| f.write(config.pretty_text) | |
| def load_create_dataset( | |
| config: Config, | |
| base_dir=None, | |
| ) -> List: | |
| """ | |
| Load the dataset described by its config if it exists or create one. | |
| Args: | |
| config: Configuration to use for the dataset | |
| base_dir: Where to look for the dataset or to save it. | |
| """ | |
| if base_dir is None: | |
| base_dir = os.path.join( | |
| os.path.dirname(os.path.realpath(__file__)), "..", "..", "data" | |
| ) | |
| found = False | |
| dataset_out = [] | |
| i = 0 | |
| dir_path = os.path.join(base_dir, f"scene_dataset_{i:03d}") | |
| while os.path.exists(dir_path): | |
| config_path = os.path.join(dir_path, "config.py") | |
| if os.path.exists(config_path): | |
| config_check = Config.fromfile(config_path) | |
| if config_check.dataset_parameters == config.dataset_parameters: | |
| found = True | |
| break | |
| else: | |
| warnings.warn( | |
| f"Dataset directory {dir_path} exists but doesn't contain a config file. Cannot use it." | |
| ) | |
| i += 1 | |
| dir_path = os.path.join(base_dir, f"scene_dataset_{i:03d}") | |
| if not found: | |
| print(f"Dataset not found, creating a new one.") | |
| os.makedirs(dir_path) | |
| for dataset in config.datasets: | |
| dataset_name = f"scene_dataset_{dataset}.npy" | |
| dataset_path = os.path.join(dir_path, dataset_name) | |
| save_dataset(dataset_path, config.datasets_sizes[dataset], config) | |
| if found: | |
| print(f"Loading existing dataset at {dir_path}.") | |
| for dataset in config.datasets: | |
| dataset_path = os.path.join(dir_path, f"scene_dataset_{dataset}.npy") | |
| dataset_out.append(torch.from_numpy(np.load(dataset_path).astype("float32"))) | |
| return dataset_out | |