Spaces:
Runtime error
Runtime error
| from __future__ import absolute_import | |
| import numpy as np | |
| # from sklearn.utils.linear_assignment_ import linear_assignment | |
| from scipy.optimize import linear_sum_assignment as linear_assignment | |
| from yolox.deepsort_tracker import kalman_filter | |
| INFTY_COST = 1e+5 | |
| def min_cost_matching( | |
| distance_metric, max_distance, tracks, detections, track_indices=None, | |
| detection_indices=None): | |
| """Solve linear assignment problem. | |
| Parameters | |
| ---------- | |
| distance_metric : Callable[List[Track], List[Detection], List[int], List[int]) -> ndarray | |
| The distance metric is given a list of tracks and detections as well as | |
| a list of N track indices and M detection indices. The metric should | |
| return the NxM dimensional cost matrix, where element (i, j) is the | |
| association cost between the i-th track in the given track indices and | |
| the j-th detection in the given detection_indices. | |
| max_distance : float | |
| Gating threshold. Associations with cost larger than this value are | |
| disregarded. | |
| tracks : List[track.Track] | |
| A list of predicted tracks at the current time step. | |
| detections : List[detection.Detection] | |
| A list of detections at the current time step. | |
| track_indices : List[int] | |
| List of track indices that maps rows in `cost_matrix` to tracks in | |
| `tracks` (see description above). | |
| detection_indices : List[int] | |
| List of detection indices that maps columns in `cost_matrix` to | |
| detections in `detections` (see description above). | |
| Returns | |
| ------- | |
| (List[(int, int)], List[int], List[int]) | |
| Returns a tuple with the following three entries: | |
| * A list of matched track and detection indices. | |
| * A list of unmatched track indices. | |
| * A list of unmatched detection indices. | |
| """ | |
| if track_indices is None: | |
| track_indices = np.arange(len(tracks)) | |
| if detection_indices is None: | |
| detection_indices = np.arange(len(detections)) | |
| if len(detection_indices) == 0 or len(track_indices) == 0: | |
| return [], track_indices, detection_indices # Nothing to match. | |
| cost_matrix = distance_metric( | |
| tracks, detections, track_indices, detection_indices) | |
| cost_matrix[cost_matrix > max_distance] = max_distance + 1e-5 | |
| row_indices, col_indices = linear_assignment(cost_matrix) | |
| matches, unmatched_tracks, unmatched_detections = [], [], [] | |
| for col, detection_idx in enumerate(detection_indices): | |
| if col not in col_indices: | |
| unmatched_detections.append(detection_idx) | |
| for row, track_idx in enumerate(track_indices): | |
| if row not in row_indices: | |
| unmatched_tracks.append(track_idx) | |
| for row, col in zip(row_indices, col_indices): | |
| track_idx = track_indices[row] | |
| detection_idx = detection_indices[col] | |
| if cost_matrix[row, col] > max_distance: | |
| unmatched_tracks.append(track_idx) | |
| unmatched_detections.append(detection_idx) | |
| else: | |
| matches.append((track_idx, detection_idx)) | |
| return matches, unmatched_tracks, unmatched_detections | |
| def matching_cascade( | |
| distance_metric, max_distance, cascade_depth, tracks, detections, | |
| track_indices=None, detection_indices=None): | |
| """Run matching cascade. | |
| Parameters | |
| ---------- | |
| distance_metric : Callable[List[Track], List[Detection], List[int], List[int]) -> ndarray | |
| The distance metric is given a list of tracks and detections as well as | |
| a list of N track indices and M detection indices. The metric should | |
| return the NxM dimensional cost matrix, where element (i, j) is the | |
| association cost between the i-th track in the given track indices and | |
| the j-th detection in the given detection indices. | |
| max_distance : float | |
| Gating threshold. Associations with cost larger than this value are | |
| disregarded. | |
| cascade_depth: int | |
| The cascade depth, should be se to the maximum track age. | |
| tracks : List[track.Track] | |
| A list of predicted tracks at the current time step. | |
| detections : List[detection.Detection] | |
| A list of detections at the current time step. | |
| track_indices : Optional[List[int]] | |
| List of track indices that maps rows in `cost_matrix` to tracks in | |
| `tracks` (see description above). Defaults to all tracks. | |
| detection_indices : Optional[List[int]] | |
| List of detection indices that maps columns in `cost_matrix` to | |
| detections in `detections` (see description above). Defaults to all | |
| detections. | |
| Returns | |
| ------- | |
| (List[(int, int)], List[int], List[int]) | |
| Returns a tuple with the following three entries: | |
| * A list of matched track and detection indices. | |
| * A list of unmatched track indices. | |
| * A list of unmatched detection indices. | |
| """ | |
| if track_indices is None: | |
| track_indices = list(range(len(tracks))) | |
| if detection_indices is None: | |
| detection_indices = list(range(len(detections))) | |
| unmatched_detections = detection_indices | |
| matches = [] | |
| for level in range(cascade_depth): | |
| if len(unmatched_detections) == 0: # No detections left | |
| break | |
| track_indices_l = [ | |
| k for k in track_indices | |
| if tracks[k].time_since_update == 1 + level | |
| ] | |
| if len(track_indices_l) == 0: # Nothing to match at this level | |
| continue | |
| matches_l, _, unmatched_detections = \ | |
| min_cost_matching( | |
| distance_metric, max_distance, tracks, detections, | |
| track_indices_l, unmatched_detections) | |
| matches += matches_l | |
| unmatched_tracks = list(set(track_indices) - set(k for k, _ in matches)) | |
| return matches, unmatched_tracks, unmatched_detections | |
| def gate_cost_matrix( | |
| kf, cost_matrix, tracks, detections, track_indices, detection_indices, | |
| gated_cost=INFTY_COST, only_position=False): | |
| """Invalidate infeasible entries in cost matrix based on the state | |
| distributions obtained by Kalman filtering. | |
| Parameters | |
| ---------- | |
| kf : The Kalman filter. | |
| cost_matrix : ndarray | |
| The NxM dimensional cost matrix, where N is the number of track indices | |
| and M is the number of detection indices, such that entry (i, j) is the | |
| association cost between `tracks[track_indices[i]]` and | |
| `detections[detection_indices[j]]`. | |
| tracks : List[track.Track] | |
| A list of predicted tracks at the current time step. | |
| detections : List[detection.Detection] | |
| A list of detections at the current time step. | |
| track_indices : List[int] | |
| List of track indices that maps rows in `cost_matrix` to tracks in | |
| `tracks` (see description above). | |
| detection_indices : List[int] | |
| List of detection indices that maps columns in `cost_matrix` to | |
| detections in `detections` (see description above). | |
| gated_cost : Optional[float] | |
| Entries in the cost matrix corresponding to infeasible associations are | |
| set this value. Defaults to a very large value. | |
| only_position : Optional[bool] | |
| If True, only the x, y position of the state distribution is considered | |
| during gating. Defaults to False. | |
| Returns | |
| ------- | |
| ndarray | |
| Returns the modified cost matrix. | |
| """ | |
| gating_dim = 2 if only_position else 4 | |
| gating_threshold = kalman_filter.chi2inv95[gating_dim] | |
| measurements = np.asarray( | |
| [detections[i].to_xyah() for i in detection_indices]) | |
| for row, track_idx in enumerate(track_indices): | |
| track = tracks[track_idx] | |
| gating_distance = kf.gating_distance( | |
| track.mean, track.covariance, measurements, only_position) | |
| cost_matrix[row, gating_distance > gating_threshold] = gated_cost | |
| return cost_matrix |