import numpy as np
# from numba import jit
from collections import OrderedDict, deque
import itertools
import os
import cv2
import torch
import torchvision

from yolox.motdt_tracker import matching
from .kalman_filter import KalmanFilter
from .reid_model import load_reid_model, extract_reid_features
from yolox.data.dataloading import get_yolox_datadir
from .basetrack import BaseTrack, TrackState

class STrack(BaseTrack):

    def __init__(self, tlwh, score, max_n_features=100, from_det=True):
        # wait activate
        self._tlwh = np.asarray(tlwh, dtype=np.float64)
        self.kalman_filter = None
        self.mean, self.covariance = None, None
        self.is_activated = False

        self.score = score
        self.max_n_features = max_n_features
        self.curr_feature = None
        self.last_feature = None
        self.features = deque([], maxlen=self.max_n_features)

        # classification
        self.from_det = from_det
        self.tracklet_len = 0
        self.time_by_tracking = 0

        # self-tracking
        self.tracker = None

    def set_feature(self, feature):
        if feature is None:
            return False
        self.features.append(feature)
        self.curr_feature = feature
        self.last_feature = feature
        # self._p_feature = 0
        return True

    def predict(self):
        if self.time_since_update > 0:
            self.tracklet_len = 0

        self.time_since_update += 1

        mean_state = self.mean.copy()
        if self.state != TrackState.Tracked:
            mean_state[7] = 0
        self.mean, self.covariance = self.kalman_filter.predict(mean_state, self.covariance)

        if self.tracker:
            self.tracker.update_roi(self.tlwh)

    def self_tracking(self, image):
        tlwh = self.tracker.predict(image) if self.tracker else self.tlwh
        return tlwh

    def activate(self, kalman_filter, frame_id, image):
        """Start a new tracklet"""
        self.kalman_filter = kalman_filter  # type: KalmanFilter
        self.track_id = self.next_id()
        # state layout: cx, cy, aspect_ratio, height, dx, dy, da, dh
        self.mean, self.covariance = self.kalman_filter.initiate(self.tlwh_to_xyah(self._tlwh))

        # self.tracker = sot.SingleObjectTracker()
        # self.tracker.init(image, self.tlwh)

        del self._tlwh

        self.time_since_update = 0
        self.time_by_tracking = 0
        self.tracklet_len = 0
        self.state = TrackState.Tracked
        # self.is_activated = True
        self.frame_id = frame_id
        self.start_frame = frame_id

    def re_activate(self, new_track, frame_id, image, new_id=False):
        # self.mean, self.covariance = self.kalman_filter.initiate(self.tlwh_to_xyah(new_track.tlwh))
        self.mean, self.covariance = self.kalman_filter.update(
            self.mean, self.covariance, self.tlwh_to_xyah(new_track.tlwh)
        )
        self.time_since_update = 0
        self.time_by_tracking = 0
        self.tracklet_len = 0
        self.state = TrackState.Tracked
        self.is_activated = True
        self.frame_id = frame_id
        if new_id:
            self.track_id = self.next_id()

        self.set_feature(new_track.curr_feature)

    def update(self, new_track, frame_id, image, update_feature=True):
        """
        Update a matched track
        :type new_track: STrack
        :type frame_id: int
        :type update_feature: bool
        :return:
        """
        self.frame_id = frame_id
        self.time_since_update = 0
        if new_track.from_det:
            self.time_by_tracking = 0
        else:
            self.time_by_tracking += 1
        self.tracklet_len += 1

        new_tlwh = new_track.tlwh
        self.mean, self.covariance = self.kalman_filter.update(
            self.mean, self.covariance, self.tlwh_to_xyah(new_tlwh))
        self.state = TrackState.Tracked
        self.is_activated = True
        self.score = new_track.score

        if update_feature:
            self.set_feature(new_track.curr_feature)
            if self.tracker:
                self.tracker.update(image, self.tlwh)

    @property
    # @jit
    def tlwh(self):
        """Get current position in bounding box format `(top left x, top left y,
        width, height)`.
        """
        if self.mean is None:
            return self._tlwh.copy()

        ret = self.mean[:4].copy()
        ret[2] *= ret[3]
        ret[:2] -= ret[2:] / 2
        return ret

    @property
    # @jit
    def tlbr(self):
        """Convert bounding box to format `(min x, min y, max x, max y)`, i.e.,
        `(top left, bottom right)`.
        """
        ret = self.tlwh.copy()
        ret[2:] += ret[:2]
        return ret

    @staticmethod
    # @jit
    def tlwh_to_xyah(tlwh):
        """Convert bounding box to format `(center x, center y, aspect ratio,
        height)`, where the aspect ratio is `width / height`.
        """
        ret = np.asarray(tlwh).copy()
        ret[:2] += ret[2:] / 2
        ret[2] /= ret[3]
        return ret
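
    # Worked example (illustrative, not from the original source): for
    # tlwh = (10, 20, 50, 100) the center is (10 + 50 / 2, 20 + 100 / 2) = (35, 70)
    # and the aspect ratio is 50 / 100 = 0.5, so tlwh_to_xyah returns
    # (35, 70, 0.5, 100).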

    def to_xyah(self):
        return self.tlwh_to_xyah(self.tlwh)

    def tracklet_score(self):
        # score = (1 - np.exp(-0.6 * self.hit_streak)) * np.exp(-0.03 * self.time_by_tracking)
        # Decay the score with time spent on self-tracking alone; zero it unless
        # more than two of the track's updates came from real detections.
        score = max(0, 1 - np.log(1 + 0.05 * self.time_by_tracking)) * (self.tracklet_len - self.time_by_tracking > 2)
        # score = max(0, 1 - np.log(1 + 0.05 * self.n_tracking)) * (1 - np.exp(-0.6 * self.hit_streak))
        return score

    def __repr__(self):
        return 'OT_{}_({}-{})'.format(self.track_id, self.start_frame, self.end_frame)
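
# Lifecycle sketch (illustrative summary, not code from the source): a detection
# enters as an STrack and is started with activate(); each frame, predict()
# advances the Kalman state and update() folds in a matched box; an unmatched
# track drops to Lost via mark_lost() and can return through re_activate(), or
# is discarded with mark_removed().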

class OnlineTracker(object):

    def __init__(self, model_folder, min_cls_score=0.4, min_ap_dist=0.8, max_time_lost=30, use_tracking=True, use_refind=True):
        self.min_cls_score = min_cls_score
        self.min_ap_dist = min_ap_dist
        self.max_time_lost = max_time_lost

        self.kalman_filter = KalmanFilter()

        self.tracked_stracks = []  # type: list[STrack]
        self.lost_stracks = []  # type: list[STrack]
        self.removed_stracks = []  # type: list[STrack]

        self.use_refind = use_refind
        self.use_tracking = use_tracking
        self.classifier = None
        self.reid_model = load_reid_model(model_folder)

        self.frame_id = 0

    def update(self, output_results, img_info, img_size, img_file_name):
        img_file_name = os.path.join(get_yolox_datadir(), 'mot', 'train', img_file_name)
        image = cv2.imread(img_file_name)

        # post-process detections
        output_results = output_results.cpu().numpy()
        confidences = output_results[:, 4] * output_results[:, 5]

        bboxes = output_results[:, :4]  # x1y1x2y2
        img_h, img_w = img_info[0], img_info[1]
        # undo the letterbox scaling applied before inference
        scale = min(img_size[0] / float(img_h), img_size[1] / float(img_w))
        bboxes /= scale
        bbox_xyxy = bboxes
        tlwhs = self._xyxy_to_tlwh_array(bbox_xyxy)
        remain_inds = confidences > self.min_cls_score
        tlwhs = tlwhs[remain_inds]
        det_scores = confidences[remain_inds]

        self.frame_id += 1

        activated_stracks = []
        refind_stracks = []
        lost_stracks = []
        removed_stracks = []
| """step 1: prediction""" | |
| for strack in itertools.chain(self.tracked_stracks, self.lost_stracks): | |
| strack.predict() | |
| """step 2: scoring and selection""" | |
| if det_scores is None: | |
| det_scores = np.ones(len(tlwhs), dtype=float) | |
| detections = [STrack(tlwh, score, from_det=True) for tlwh, score in zip(tlwhs, det_scores)] | |
| if self.use_tracking: | |
| tracks = [STrack(t.self_tracking(image), t.score * t.tracklet_score(), from_det=False) | |
| for t in itertools.chain(self.tracked_stracks, self.lost_stracks) if t.is_activated] | |
| detections.extend(tracks) | |
| rois = np.asarray([d.tlbr for d in detections], dtype=np.float32) | |
| scores = np.asarray([d.score for d in detections], dtype=np.float32) | |
        # class-agnostic NMS over detections and self-tracked boxes
        if len(detections) > 0:
            rois_t = torch.from_numpy(rois)
            scores_t = torch.from_numpy(scores.reshape(-1)).to(rois_t.dtype)
            nms_out_index = torchvision.ops.batched_nms(
                rois_t,
                scores_t,
                torch.zeros_like(scores_t),  # one shared class id, so NMS is class-agnostic
                0.7,
            )
            keep = nms_out_index.numpy()
            mask = np.zeros(len(rois), dtype=bool)
            mask[keep] = True
            keep = np.where(mask & (scores >= self.min_cls_score))[0]
            detections = [detections[i] for i in keep]
            scores = scores[keep]
            for d, score in zip(detections, scores):
                d.score = score
        pred_dets = [d for d in detections if not d.from_det]
        detections = [d for d in detections if d.from_det]

        # set features
        tlbrs = [det.tlbr for det in detections]
        features = extract_reid_features(self.reid_model, image, tlbrs)
        features = features.cpu().numpy()
        for i, det in enumerate(detections):
            det.set_feature(features[i])
| """step 3: association for tracked""" | |
| # matching for tracked targets | |
| unconfirmed = [] | |
| tracked_stracks = [] # type: list[STrack] | |
| for track in self.tracked_stracks: | |
| if not track.is_activated: | |
| unconfirmed.append(track) | |
| else: | |
| tracked_stracks.append(track) | |
| dists = matching.nearest_reid_distance(tracked_stracks, detections, metric='euclidean') | |
| dists = matching.gate_cost_matrix(self.kalman_filter, dists, tracked_stracks, detections) | |
| matches, u_track, u_detection = matching.linear_assignment(dists, thresh=self.min_ap_dist) | |
| for itracked, idet in matches: | |
| tracked_stracks[itracked].update(detections[idet], self.frame_id, image) | |
| # matching for missing targets | |
| detections = [detections[i] for i in u_detection] | |
| dists = matching.nearest_reid_distance(self.lost_stracks, detections, metric='euclidean') | |
| dists = matching.gate_cost_matrix(self.kalman_filter, dists, self.lost_stracks, detections) | |
| matches, u_lost, u_detection = matching.linear_assignment(dists, thresh=self.min_ap_dist) | |
| for ilost, idet in matches: | |
| track = self.lost_stracks[ilost] # type: STrack | |
| det = detections[idet] | |
| track.re_activate(det, self.frame_id, image, new_id=not self.use_refind) | |
| refind_stracks.append(track) | |

        # remaining tracked targets: fall back to IoU matching
        len_det = len(u_detection)
        detections = [detections[i] for i in u_detection] + pred_dets
        r_tracked_stracks = [tracked_stracks[i] for i in u_track]
        dists = matching.iou_distance(r_tracked_stracks, detections)
        matches, u_track, u_detection = matching.linear_assignment(dists, thresh=0.5)
        for itracked, idet in matches:
            r_tracked_stracks[itracked].update(detections[idet], self.frame_id, image, update_feature=True)
        for it in u_track:
            track = r_tracked_stracks[it]
            track.mark_lost()
            lost_stracks.append(track)

        # unconfirmed tracks: IoU matching against the remaining detections
        detections = [detections[i] for i in u_detection if i < len_det]
        dists = matching.iou_distance(unconfirmed, detections)
        matches, u_unconfirmed, u_detection = matching.linear_assignment(dists, thresh=0.7)
        for itracked, idet in matches:
            unconfirmed[itracked].update(detections[idet], self.frame_id, image, update_feature=True)
        for it in u_unconfirmed:
            track = unconfirmed[it]
            track.mark_removed()
            removed_stracks.append(track)
| """step 4: init new stracks""" | |
| for inew in u_detection: | |
| track = detections[inew] | |
| if not track.from_det or track.score < 0.6: | |
| continue | |
| track.activate(self.kalman_filter, self.frame_id, image) | |
| activated_starcks.append(track) | |
| """step 6: update state""" | |
| for track in self.lost_stracks: | |
| if self.frame_id - track.end_frame > self.max_time_lost: | |
| track.mark_removed() | |
| removed_stracks.append(track) | |
| self.tracked_stracks = [t for t in self.tracked_stracks if t.state == TrackState.Tracked] | |
| self.lost_stracks = [t for t in self.lost_stracks if t.state == TrackState.Lost] # type: list[STrack] | |
| self.tracked_stracks.extend(activated_starcks) | |
| self.tracked_stracks.extend(refind_stracks) | |
| self.lost_stracks.extend(lost_stracks) | |
| self.removed_stracks.extend(removed_stracks) | |
| # output_stracks = self.tracked_stracks + self.lost_stracks | |
| # get scores of lost tracks | |
| output_tracked_stracks = [track for track in self.tracked_stracks if track.is_activated] | |
| output_stracks = output_tracked_stracks | |
| return output_stracks | |

    @staticmethod
    def _xyxy_to_tlwh_array(bbox_xyxy):
        if isinstance(bbox_xyxy, np.ndarray):
            bbox_tlwh = bbox_xyxy.copy()
        elif isinstance(bbox_xyxy, torch.Tensor):
            bbox_tlwh = bbox_xyxy.clone()
        bbox_tlwh[:, 2] = bbox_xyxy[:, 2] - bbox_xyxy[:, 0]
        bbox_tlwh[:, 3] = bbox_xyxy[:, 3] - bbox_xyxy[:, 1]
        return bbox_tlwh
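

if __name__ == "__main__":
    # Minimal usage sketch (illustrative; the checkpoint folder, image path and
    # detection tensor are hypothetical, and the image must exist under
    # <get_yolox_datadir()>/mot/train/). `output_results` uses the layout that
    # `update` expects: columns 0-3 are x1y1x2y2 boxes in network-input
    # coordinates, column 4 is objectness and column 5 the class score.
    tracker = OnlineTracker(model_folder="pretrained/reid_model")
    dets = torch.tensor([[100.0, 120.0, 180.0, 300.0, 0.9, 0.95]])
    img_info = (1080, 1920)  # original image (height, width)
    img_size = (800, 1440)   # network input (height, width)
    targets = tracker.update(dets, img_info, img_size, "MOT17-02/img1/000001.jpg")
    for t in targets:
        print(t.track_id, t.tlwh, t.score)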