Spaces:
Sleeping
Sleeping
| import cv2 | |
| from django import conf | |
| import numpy as np | |
| from ultralytics import YOLO | |
| from insightface.app import FaceAnalysis | |
| import torchreid | |
| import torch | |
# Configuration
# Minimum detector confidence; passed as `conf` to the tracker call so that
# lower-scoring person detections are discarded.
DETECTION_THRESHOLD: float = 0.75
# =============================================================================
# MODEL INITIALIZATION
# =============================================================================
# Decide once whether a CUDA device is usable so that every model below runs on
# the same device. On a GPU host this resolves to the same settings the script
# always used; on a CPU-only host the models fall back to CPU instead of
# failing at start-up.
_USE_CUDA = torch.cuda.is_available()

# Load YOLOv8 model with ByteTrack tracker for person detection and tracking
# YOLOv8 handles object detection while ByteTrack provides consistent tracking IDs
model = YOLO(r'detection.pt')  # Replace with your trained model path

# Initialize InsightFace for facial feature extraction
# Uses buffalo_l model which provides high-quality face embeddings
# The CPU provider is always listed last so ONNX Runtime has a fallback when
# the CUDA provider cannot be loaded.
face_app = FaceAnalysis(
    name='buffalo_l',
    providers=(
        ['CUDAExecutionProvider', 'CPUExecutionProvider']
        if _USE_CUDA
        else ['CPUExecutionProvider']
    ),
)
# ctx_id=0 targets the first GPU; a negative ctx_id asks InsightFace for CPU.
face_app.prepare(ctx_id=0 if _USE_CUDA else -1)

# Initialize TorchReID for full-body person re-identification
# OSNet is a lightweight but effective model for person ReID
reid_extractor = torchreid.utils.FeatureExtractor(
    model_name='osnet_x0_25',
    model_path='osnet_x0_25_market1501.pth',  # Pre-trained on Market1501 dataset
    device='cuda' if _USE_CUDA else 'cpu',
)
# =============================================================================
# GLOBAL VARIABLES FOR PERSON RE-IDENTIFICATION
# =============================================================================
# Gallery of people seen so far, kept as two parallel lists:
# known_embeddings[i] is an embedding and known_ids[i] is the global ID it
# was registered under.
known_embeddings, known_ids = [], []

# Next unused global ID; incremented each time a new person is registered.
next_global_id = 1

# ByteTrack tracker ID -> global person ID. Lets a detection without a usable
# embedding still be labelled with the global ID its track was given earlier.
track_to_global = {}
# =============================================================================
# VIDEO INPUT/OUTPUT SETUP
# =============================================================================
# Initialize video capture and output writer
cap = cv2.VideoCapture("demo.mp4")  # Input video file
if not cap.isOpened():
    # Without this check a missing or unreadable input silently yields an
    # empty output file, because the first read() simply fails.
    raise RuntimeError('Could not open input video "demo.mp4"')

width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = cap.get(cv2.CAP_PROP_FPS)
if not fps > 0:
    # Some containers report 0 (or NaN) FPS; a writer created with that
    # value produces an unplayable file, so use a conventional default.
    fps = 30.0

# Create output video writer with same properties as input
out = cv2.VideoWriter("output.mp4", cv2.VideoWriter_fourcc(*"mp4v"), fps, (width, height))
if not out.isOpened():
    # Keep going (the live preview still works) but make the problem visible.
    print('Warning: could not open "output.mp4" for writing; frames will not be saved')
# =============================================================================
# MAIN PROCESSING LOOP
# =============================================================================
# Cosine similarity a gallery entry must exceed to count as the same person.
SIMILARITY_THRESHOLD = 0.6


def _l2_normalize(vector):
    """Return *vector* as a float32 array scaled to (approximately) unit length."""
    vector = np.asarray(vector, dtype=np.float32)
    return vector / (np.linalg.norm(vector) + 1e-6)


def _clamp_box(box, frame_width, frame_height):
    """Convert an (x1, y1, x2, y2) box to ints clipped to the frame bounds."""
    x1, y1, x2, y2 = map(int, box)
    x1 = min(max(x1, 0), frame_width)
    x2 = min(max(x2, 0), frame_width)
    y1 = min(max(y1, 0), frame_height)
    y2 = min(max(y2, 0), frame_height)
    return x1, y1, x2, y2


def _find_match(embedding, gallery, gallery_ids, threshold):
    """Return the global ID of the most similar gallery entry, or None.

    Only gallery entries with the same length as *embedding* are compared,
    since face-only, body-only and combined embeddings have different sizes.
    """
    candidates = [
        (emb, gid) for emb, gid in zip(gallery, gallery_ids)
        if emb.shape[0] == embedding.shape[0]
    ]
    if not candidates:
        return None
    embs, gids = zip(*candidates)
    embs = np.array(embs)
    # Cosine similarity against every comparable gallery entry
    sims = np.dot(embs, embedding) / (
        np.linalg.norm(embs, axis=1) * np.linalg.norm(embedding) + 1e-6
    )
    best = int(np.argmax(sims))
    return gids[best] if sims[best] > threshold else None


while True:
    ret, frame = cap.read()
    if not ret:
        break  # End of video

    frame_height, frame_width = frame.shape[:2]
    # Draw on a copy: crops are taken from the untouched frame, so boxes and
    # labels of earlier detections never leak into later overlapping crops.
    annotated = frame.copy()

    # Run YOLOv8 detection with ByteTrack tracking
    # persist=True maintains tracking across frames
    results = model.track(frame, tracker="bytetrack.yaml", persist=True,
                          verbose=False, conf=DETECTION_THRESHOLD)

    # Process each detection result
    for result in results:
        # Extract bounding boxes in (x1, y1, x2, y2) format
        boxes = result.boxes.xyxy.cpu().numpy()

        # Extract tracking IDs if available
        if result.boxes.id is not None:
            track_ids = result.boxes.id.int().cpu().tolist()
        else:
            # No tracking IDs available, assign None for each detection
            track_ids = [None] * len(boxes)

        # Process each detected person
        for box, track_id in zip(boxes, track_ids):
            # Clamp first: a negative coordinate would wrap around in NumPy
            # slicing and produce a wrong or empty crop.
            x1, y1, x2, y2 = _clamp_box(box, frame_width, frame_height)
            if x2 <= x1 or y2 <= y1:
                continue  # Degenerate box, nothing to crop or draw

            # Crop the person from the (unannotated) frame
            person_crop = frame[y1:y2, x1:x2]

            face_embedding = None
            body_embedding = None

            # =============================================================
            # FACE EMBEDDING EXTRACTION
            # =============================================================
            faces = face_app.get(person_crop)
            if faces:
                # Use the first detected face; normalise so that face and
                # body parts carry equal weight once concatenated.
                face_embedding = _l2_normalize(faces[0].embedding)

            # =============================================================
            # BODY EMBEDDING EXTRACTION
            # =============================================================
            try:
                # TorchReID expects 128x256 RGB input
                body_input = cv2.resize(person_crop, (128, 256))
                body_input = cv2.cvtColor(body_input, cv2.COLOR_BGR2RGB)
                body_embedding = _l2_normalize(
                    reid_extractor(body_input)[0].cpu().numpy()
                )
            except (cv2.error, RuntimeError, ValueError) as exc:
                # Best effort: keep processing with the face embedding only,
                # but report the failure instead of hiding it.
                print(f"Body embedding failed for track {track_id}: {exc}")

            # =============================================================
            # EMBEDDING COMBINATION AND PERSON MATCHING
            # =============================================================
            # Face part first, body part second; whichever is available.
            parts = [
                part for part in (face_embedding, body_embedding)
                if part is not None
            ]
            embedding = np.concatenate(parts).astype(np.float32) if parts else None

            if embedding is not None:
                global_id = _find_match(
                    embedding, known_embeddings, known_ids, SIMILARITY_THRESHOLD
                )
                if global_id is None:
                    # Nobody similar enough in the gallery: register a new person
                    global_id = next_global_id
                    next_global_id += 1
                    known_embeddings.append(embedding)
                    known_ids.append(global_id)

                # Update tracker ID to global ID mapping
                if track_id is not None:
                    track_to_global[track_id] = global_id
            elif track_id is not None:
                # No usable embedding available, fallback to tracker ID
                global_id = track_to_global.get(track_id, f"T{track_id}")
            else:
                # Neither an embedding nor a tracker ID: nothing to identify by
                global_id = "?"
            display_id = global_id

            # =============================================================
            # VISUALIZATION
            # =============================================================
            cv2.rectangle(annotated, (x1, y1), (x2, y2), (0, 255, 0), 2)
            # Keep the label inside the image for boxes touching the top edge
            cv2.putText(annotated, f"ID {display_id}", (x1, max(y1 - 10, 20)),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)

    # =========================================================================
    # OUTPUT AND DISPLAY
    # =========================================================================
    # Write before checking for quit so every displayed frame is also saved
    out.write(annotated)

    # Show the frame with tracking results
    cv2.imshow("Tracking + ReID", annotated)

    # Break loop if 'q' key is pressed
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break
# =============================================================================
# CLEANUP
# =============================================================================
# Close the input capture first, then the output writer, and finally tear
# down any preview windows opened by cv2.imshow.
for stream in (cap, out):
    stream.release()
cv2.destroyAllWindows()