import gradio as gr
import cv2
import json
import tempfile
from ultralytics import YOLO
from typing import Dict, Tuple, Any


class HumanTracker:
    def __init__(self):
        # Load YOLOv11 model - the nano version trades accuracy for speed.
        # Swap in yolo11s.pt, yolo11m.pt, yolo11l.pt, or yolo11x.pt for better accuracy.
        self.model = YOLO("yolo11n.pt")

    def calculate_center(self, x1: float, y1: float, x2: float, y2: float) -> Tuple[float, float]:
        """Calculate center coordinates from bounding box coordinates."""
        center_x = (x1 + x2) / 2
        center_y = (y1 + y2) / 2
        return center_x, center_y
    def process_video(self, video_path: str, progress_callback=None) -> Dict[str, Any]:
        """
        Process a video file and extract human tracking data.

        Args:
            video_path: Path to the input video file
            progress_callback: Optional callback function for progress updates

        Returns:
            Dictionary with "metadata" and "frames" keys, ready to serialize as JSON
        """
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            raise ValueError(f"Could not open video file: {video_path}")

        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        fps = cap.get(cv2.CAP_PROP_FPS)

        frame_data = {}
        id_mapping = {}  # Maps original YOLO track IDs to simplified sequential IDs
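        # e.g., raw track IDs 7 and 12, first seen in that order, become person_ids 1 and 2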
        next_person_id = 1

        print(f"Processing video: {total_frames} frames at {fps} FPS")

        # Run YOLO tracking over the video.
        # stream=True yields one result per frame for memory efficiency with large videos.
        results = self.model.track(
            video_path,
            classes=[0],   # Only detect humans (COCO class 0: person)
            persist=True,  # Keep track IDs stable across frames
            stream=True,
            verbose=False
        )

        frame_count = 0
        for result in results:
            if progress_callback:
                progress = (frame_count + 1) / total_frames
                progress_callback(progress, f"Processing frame {frame_count + 1}/{total_frames}")

            # Only record frames that contain detections
            if result.boxes is not None and len(result.boxes) > 0:
                # Extract bounding boxes, track IDs, and confidences
                boxes = result.boxes.xyxy.cpu().numpy()  # x1, y1, x2, y2 format
                track_ids = result.boxes.id  # None until the tracker has assigned IDs
                confidences = result.boxes.conf.cpu().numpy()

                if track_ids is not None:
                    track_ids = track_ids.int().cpu().numpy()
                    people_in_frame = []
                    for box, track_id, confidence in zip(boxes, track_ids, confidences):
                        x1, y1, x2, y2 = box
                        track_id = int(track_id)

                        # Map the original YOLO ID to a simplified sequential ID
                        if track_id not in id_mapping:
                            id_mapping[track_id] = next_person_id
                            next_person_id += 1
                        person_id = id_mapping[track_id]

                        # Calculate center coordinates
                        center_x, center_y = self.calculate_center(x1, y1, x2, y2)

                        person_data = {
                            "person_id": person_id,
                            "center_x": float(center_x),
                            "center_y": float(center_y),
                            "confidence": float(confidence),
                            "bbox": {
                                "x1": float(x1),
                                "y1": float(y1),
                                "x2": float(x2),
                                "y2": float(y2)
                            }
                        }
                        people_in_frame.append(person_data)

                    if people_in_frame:
                        # Sort people by person_id for consistent ordering
                        people_in_frame.sort(key=lambda x: x["person_id"])
                        frame_data[frame_count] = people_in_frame

            frame_count += 1

        cap.release()
        # Convert to the output JSON format
        frames_list = []
        for frame_num in sorted(frame_data.keys()):
            frames_list.append({
                "frame": frame_num,
                "people": frame_data[frame_num]
            })

        # Assemble the final output structure
        output = {
            "metadata": {
                "total_frames": len(frames_list),
                "total_people": len(id_mapping),
                "video_info": {
                    "fps": float(fps),
                    "total_video_frames": total_frames
                },
                "id_mapping": {str(original_id): simplified_id
                               for original_id, simplified_id in id_mapping.items()}
            },
            "frames": frames_list
        }
        return output
def process_video_gradio(video_file, progress=gr.Progress()):
    """
    Gradio interface function for processing videos.

    Args:
        video_file: Uploaded video file from Gradio
        progress: Gradio progress tracker

    Returns:
        Tuple of (JSON file path, status message, preview of results)
    """
    if video_file is None:
        return None, "❌ Please upload a video file", "No video uploaded"

    try:
        # Initialize the tracker
        tracker = HumanTracker()

        # Forward progress updates to the Gradio progress bar
        def update_progress(prog, msg):
            progress(prog, desc=msg)

        # Process the video
        progress(0.1, desc="Starting video processing...")
        results = tracker.process_video(video_file, update_progress)

        progress(0.9, desc="Generating JSON output...")
        # Write the results to a temporary JSON file for download
        with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
            json.dump(results, f, indent=2)
            json_path = f.name

        # Build a preview of the results
        metadata = results["metadata"]
        total_frames = metadata["total_frames"]
        total_people = metadata["total_people"]

        preview = f"""
📊 **Processing Results:**
- **Total frames with detections:** {total_frames}
- **Unique people detected:** {total_people}
- **Original video frames:** {metadata['video_info']['total_video_frames']}
- **Video FPS:** {metadata['video_info']['fps']:.2f}

🆔 **ID Mapping:**
{json.dumps(metadata["id_mapping"], indent=2)}

📋 **Sample Frame Data (first frame):**
{json.dumps(results["frames"][:1] if results["frames"] else [], indent=2)}
"""

        progress(1.0, desc="✅ Processing complete!")
        return (
            json_path,
            f"✅ Successfully processed video! Detected {total_people} unique people across {total_frames} frames.",
            preview
        )
    except Exception as e:
        error_msg = f"❌ Error processing video: {e}"
        print(error_msg)
        return None, error_msg, f"Error details: {e}"
# Create the Gradio interface
def create_interface():
    with gr.Blocks(
        title="Dynamic Veme Processor",
        theme=gr.themes.Soft()
    ) as demo:
        gr.Markdown("""
        # 🎯 Dynamic Veme Processor

        Upload a video to detect and track humans using YOLOv11. The app will:
        - 🔍 Detect humans in each frame
        - 🎯 Track individuals across frames with unique IDs
        - 📐 Extract bounding box coordinates and center points
        - 📄 Generate JSON output for text overlay positioning

        **Supported formats:** MP4, AVI, MOV, WEBM
        """)

        with gr.Row():
            with gr.Column(scale=1):
                video_input = gr.Video(
                    label="📹 Upload Video",
                    height=400
                )
                process_btn = gr.Button(
                    "🚀 Process Video",
                    variant="primary",
                    size="lg"
                )
            with gr.Column(scale=1):
                json_output = gr.File(
                    label="📥 Download JSON Results",
                    file_count="single"
                )
                status_output = gr.Textbox(
                    label="📊 Status",
                    value="Ready to process video...",
                    interactive=False
                )

        with gr.Row():
            preview_output = gr.Textbox(
                label="👁️ Results Preview",
                lines=15,
                interactive=False,
                placeholder="Results preview will appear here after processing..."
            )

        # Event handlers
        process_btn.click(
            fn=process_video_gradio,
            inputs=[video_input],
            outputs=[json_output, status_output, preview_output],
            show_progress=True
        )
        # Example section
        gr.Markdown("""
        ## 📋 Output Format

        The generated JSON file contains (see the example below):
        - **metadata**: Video info, total people count, ID mappings
        - **frames**: Array of frame data with person detections

        Each person detection includes:
        - `person_id`: Unique identifier for tracking
        - `center_x`, `center_y`: Center coordinates for text overlay positioning
        - `confidence`: Detection confidence score
        - `bbox`: Full bounding box coordinates (x1, y1, x2, y2)
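
        An illustrative example with made-up values (one frame, one person):
        ```json
        {
          "metadata": {
            "total_frames": 1,
            "total_people": 1,
            "video_info": {"fps": 30.0, "total_video_frames": 450},
            "id_mapping": {"3": 1}
          },
          "frames": [
            {"frame": 0,
             "people": [
               {"person_id": 1, "center_x": 320.0, "center_y": 240.0,
                "confidence": 0.92,
                "bbox": {"x1": 280.0, "y1": 120.0, "x2": 360.0, "y2": 360.0}}
             ]}
          ]
        }
        ```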
| """) | |
| return demo | |
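

# A minimal headless sketch, in case you want the tracker without the web UI;
# "input.mp4" and "tracking.json" are placeholder paths:
#
#   tracker = HumanTracker()
#   data = tracker.process_video("input.mp4")
#   with open("tracking.json", "w") as f:
#       json.dump(data, f, indent=2)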
if __name__ == "__main__":
    # Create and launch the interface
    demo = create_interface()
    demo.launch(
        server_name="0.0.0.0",  # Allow external access
        server_port=7860,
        share=False,  # Set to True for a public link
        show_error=True
    )