# app.py import streamlit as st from PIL import Image import cv2 import numpy as np import torch from ultralytics import YOLO import time import tempfile import os import requests from io import BytesIO # Tạo module depth_pro đơn giản (để thay thế module gốc) class DepthPro: @staticmethod def create_model_and_transforms(): # Nhập các thư viện cần thiết ở đây để tránh lỗi khi khởi tạo import torch from transformers import AutoImageProcessor, AutoModelForDepthEstimation # Tải mô hình depth estimation từ Hugging Face processor = AutoImageProcessor.from_pretrained("vinvino02/glpn-nyu") model = AutoModelForDepthEstimation.from_pretrained("vinvino02/glpn-nyu") # Tạo hàm transform đơn giản def transform(image): return processor(images=image, return_tensors="pt").pixel_values # Mở rộng model với phương thức infer def infer_method(self, image, f_px=None): with torch.no_grad(): outputs = self(image) predicted_depth = outputs.predicted_depth # Chuẩn hóa độ sâu depth_min = torch.min(predicted_depth) depth_max = torch.max(predicted_depth) predicted_depth = (predicted_depth - depth_min) / (depth_max - depth_min) predicted_depth = predicted_depth * 10 # Nhân với 10 để có giá trị mét hợp lý hơn return {"depth": predicted_depth} # Thêm phương thức infer vào model model.infer = infer_method.__get__(model) return model, transform # Hàm tải mô hình YOLO từ Hugging Face @st.cache_resource def load_yolo_model(): # Sử dụng mô hình YOLOv8n từ Hugging Face model = YOLO("yolov8n.pt") return model # Hàm tải và chuẩn bị mô hình độ sâu @st.cache_resource def load_depth_model(): depth_pro = DepthPro() model, transform = depth_pro.create_model_and_transforms() return model, transform # Hàm xử lý video def process_video(video_path): # Kiểm tra CUDA device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') st.info(f"Đang sử dụng thiết bị: {device}") # Tải mô hình YOLO with st.spinner('Đang tải mô hình YOLO...'): yolo_model = load_yolo_model() if device.type == 'cuda': yolo_model.to(device) # Tải mô hình độ sâu with st.spinner('Đang tải mô hình độ sâu...'): depth_model, transform = load_depth_model() depth_model.eval() if device.type == 'cuda': depth_model.to(device) # Mở video để xử lý cap = cv2.VideoCapture(video_path) # Lấy thuộc tính video cho đầu ra width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) fps = cap.get(cv2.CAP_PROP_FPS) total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) # Tạo tệp tạm thời cho video đầu ra temp_output_dir = tempfile.mkdtemp() output_video_path = os.path.join(temp_output_dir, "person_detection_with_depth.mp4") output_depth_path = os.path.join(temp_output_dir, "depth_colormap.mp4") # Sử dụng codec phù hợp với môi trường Hugging Face fourcc = cv2.VideoWriter_fourcc(*'XVID') # Thay đổi từ mp4v sang XVID cho tương thích tốt hơn out_detection = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height)) out_depth = cv2.VideoWriter(output_depth_path, fourcc, fps, (width, height)) # Ước tính chiều dài tiêu cự và chuyển đổi sang tensor focal_length_px = torch.tensor([max(width, height)], device=device) # Hiển thị thanh tiến trình progress_bar = st.progress(0) progress_text = st.empty() frame_counter = 0 start_time = time.time() # Tạo cột để hiển thị khung video col1, col2 = st.columns(2) detection_placeholder = col1.empty() depth_placeholder = col2.empty() # Giảm kích thước frame để tăng tốc độ xử lý target_width = 640 # Kích thước đích scale_factor = target_width / width target_height = int(height * scale_factor) try: while cap.isOpened(): ret, frame = cap.read() if not ret: break frame_counter += 1 # Cập nhật tiến trình progress = int(frame_counter / total_frames * 100) progress_bar.progress(progress) if frame_counter % 10 == 0: # Hiển thị tiến trình mỗi 10 khung hình elapsed_time = time.time() - start_time frames_left = total_frames - frame_counter est_time_left = (elapsed_time / frame_counter) * frames_left if frame_counter > 0 else 0 progress_text.text(f"Đang xử lý khung hình {frame_counter}/{total_frames} - Thời gian còn lại: {est_time_left:.2f}s") # Giảm kích thước khung hình để tăng tốc xử lý if scale_factor < 1: frame_resized = cv2.resize(frame, (target_width, target_height)) else: frame_resized = frame # Phát hiện YOLO results = yolo_model(frame_resized) person_boxes = [] for result in results: boxes = result.boxes.xyxy.cpu().numpy() classes = result.boxes.cls.cpu().numpy() confs = result.boxes.conf.cpu().numpy() for box, cls, conf in zip(boxes, classes, confs): if result.names[int(cls)] == "person" and conf > 0.5: # Thêm ngưỡng tin cậy if scale_factor < 1: # Điều chỉnh lại khung giới hạn nếu đã thay đổi kích thước x1, y1, x2, y2 = map(int, [box[0]/scale_factor, box[1]/scale_factor, box[2]/scale_factor, box[3]/scale_factor]) else: x1, y1, x2, y2 = map(int, box[:4]) person_boxes.append((x1, y1, x2, y2)) cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2) # Chuyển đổi khung hình cho đầu vào mô hình độ sâu rgb_frame = cv2.cvtColor(frame_resized, cv2.COLOR_BGR2RGB) pil_image = Image.fromarray(rgb_frame) depth_input = transform(pil_image) if device.type == 'cuda': depth_input = depth_input.to(device) # Ước tính độ sâu with torch.no_grad(): predictions = depth_model.infer(depth_input, f_px=focal_length_px) depth = predictions["depth"] # Độ sâu theo [m] depth_np = depth.squeeze().cpu().numpy() # Điều chỉnh lại kích thước bản đồ độ sâu if scale_factor < 1: depth_np = cv2.resize(depth_np, (width, height), interpolation=cv2.INTER_LINEAR) # Tạo bản đồ màu độ sâu depth_np_normalized = (depth_np - depth_np.min()) / (depth_np.max() - depth_np.min()) inv_depth_np_normalized = 1 - depth_np_normalized depth_colormap = cv2.applyColorMap((inv_depth_np_normalized * 255).astype(np.uint8), cv2.COLORMAP_TURBO) # Thêm giá trị độ sâu cho người được phát hiện for x1, y1, x2, y2 in person_boxes: center_x = (x1 + x2) // 2 center_y = (y1 + y2) // 2 # Đảm bảo tọa độ nằm trong giới hạn center_x = min(center_x, depth_np.shape[1] - 1) center_y = min(center_y, depth_np.shape[0] - 1) depth_value = depth_np[center_y, center_x] text = f"Depth: {depth_value:.2f} m" font = cv2.FONT_HERSHEY_SIMPLEX font_scale = 0.8 # Giảm kích thước font để phù hợp font_thickness = 2 text_size = cv2.getTextSize(text, font, font_scale, font_thickness)[0] text_x = x1 text_y = y1 - 10 rect_x1 = text_x - 5 rect_y1 = text_y - text_size[1] - 10 rect_x2 = text_x + text_size[0] + 5 rect_y2 = text_y + 5 cv2.rectangle(frame, (rect_x1, rect_y1), (rect_x2, rect_y2), (0, 255, 0), -1) cv2.putText(frame, text, (text_x, text_y), font, font_scale, (0, 0, 0), font_thickness) # Hiển thị khung hình trong Streamlit (cập nhật mỗi 5 khung hình để tránh làm chậm) if frame_counter % 5 == 0: detection_placeholder.image(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB), caption="Detect people", use_container_width=True) depth_placeholder.image(depth_colormap, caption="depth: ", use_container_width=True) # Ghi khung hình vào video đầu ra out_detection.write(frame) out_depth.write(depth_colormap) finally: # Giải phóng tài nguyên cap.release() out_detection.release() out_depth.release() total_time = time.time() - start_time st.success(f"Xử lý hoàn tất! Tổng thời gian: {total_time:.2f}s") st.success(f"FPS trung bình: {frame_counter / total_time:.2f}") return output_video_path, output_depth_path # Giao diện Streamlit chính def main(): st.title("Ứng dụng Phát hiện Người và Ước tính Độ sâu") st.write("Tải lên video để phát hiện người và hiển thị thông tin độ sâu") video_path = None # Tải lên tệp video uploaded_file = st.file_uploader("Chọn một tệp video", type=['mp4', 'avi', 'mov']) if uploaded_file is not None: # Lưu tệp đã tải lên vào thư mục tạm thời temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.mp4') temp_file.write(uploaded_file.read()) video_path = temp_file.name temp_file.close() st.video(video_path) # Hiển thị thông tin về mô hình st.sidebar.header("Thông tin mô hình") st.sidebar.markdown(""" - Phát hiện người: YOLOv8n - Ước tính độ sâu: Depth_Pro """) # Thêm tùy chọn cho độ tin cậy phát hiện confidence = st.sidebar.slider("Ngưỡng tin cậy", 0.0, 1.0, 0.5) # Nút để bắt đầu xử lý if video_path and st.button("Xử lý Video"): with st.spinner("Đang xử lý video..."): detection_video_path, depth_video_path = process_video(video_path) # Hiển thị video đã xử lý st.subheader("Video đã xử lý") col1, col2 = st.columns(2) with col1: st.video(detection_video_path) st.download_button( label="Tải xuống video phát hiện", data=open(detection_video_path, 'rb').read(), file_name="person_detection_with_depth.mp4", mime="video/mp4" ) with col2: st.video(depth_video_path) st.download_button( label="Tải xuống bản đồ độ sâu", data=open(depth_video_path, 'rb').read(), file_name="depth_colormap.mp4", mime="video/mp4" ) # Xóa tệp tạm thời os.unlink(video_path) # Tệp requirements.txt def create_requirements(): requirements = """ streamlit numpy Pillow opencv-python torch torchvision transformers ultralytics requests opencv-python """ return requirements if __name__ == "__main__": main()