| """ | |
| Emotion Detection: | |
| Model from: https://github.com/onnx/models/blob/main/vision/body_analysis/emotion_ferplus/model/emotion-ferplus-8.onnx | |
| Model name: emotion-ferplus-8.onnx | |
| """ | |
import logging
import time
from math import ceil
from pathlib import Path

import av
import cv2
import numpy as np
import streamlit as st
from cv2 import dnn
from streamlit_webrtc import WebRtcMode, webrtc_streamer

from sample_utils.download import download_file
from sample_utils.turn import get_ice_servers
HERE = Path(__file__).parent
ROOT = HERE.parent

logger = logging.getLogger(__name__)
ONNX_MODEL_URL = "https://github.com/spmallick/learnopencv/raw/master/Facial-Emotion-Recognition/emotion-ferplus-8.onnx"  # noqa: E501
ONNX_MODEL_LOCAL_PATH = ROOT / "emotion-ferplus-8.onnx"
CAFFE_MODEL_URL = "https://github.com/spmallick/learnopencv/raw/master/Facial-Emotion-Recognition/RFB-320/RFB-320.caffemodel"  # noqa: E501
CAFFE_MODEL_LOCAL_PATH = ROOT / "RFB-320/RFB-320.caffemodel"
PROTOTXT_URL = "https://github.com/spmallick/learnopencv/raw/master/Facial-Emotion-Recognition/RFB-320/RFB-320.prototxt"  # noqa: E501
PROTOTXT_LOCAL_PATH = ROOT / "RFB-320/RFB-320.prototxt.txt"

download_file(ONNX_MODEL_URL, ONNX_MODEL_LOCAL_PATH, expected_size=None)
download_file(CAFFE_MODEL_URL, CAFFE_MODEL_LOCAL_PATH, expected_size=None)
download_file(PROTOTXT_URL, PROTOTXT_LOCAL_PATH, expected_size=None)
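# download_file comes from this repo's sample_utils helpers; it is assumed
# to skip the download when the file already exists locally, and
# expected_size=None is assumed to disable the size check.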
# Session-specific caching: load the networks once per Streamlit session
# instead of re-reading the weight files on every script rerun.
onnx_cache_key = "onnx_model"
caffe_cache_key = "caffe_model"
if onnx_cache_key in st.session_state and caffe_cache_key in st.session_state:
    model = st.session_state[onnx_cache_key]
    net = st.session_state[caffe_cache_key]
else:
    # emotion classification model (FER+)
    model = cv2.dnn.readNetFromONNX(str(ONNX_MODEL_LOCAL_PATH))
    # face detection model (RFB-320)
    net = cv2.dnn.readNetFromCaffe(str(PROTOTXT_LOCAL_PATH), str(CAFFE_MODEL_LOCAL_PATH))
    st.session_state[onnx_cache_key] = model
    st.session_state[caffe_cache_key] = net
# Pre/post-processing constants for the RFB-320 SSD-style face detector.
# image_mean is kept for reference; blobFromImage below uses the equivalent
# scalar mean 127.
image_mean = np.array([127, 127, 127])
image_std = 128.0
iou_threshold = 0.3
center_variance = 0.1
size_variance = 0.2
min_boxes = [
    [10.0, 16.0, 24.0],
    [32.0, 48.0],
    [64.0, 96.0],
    [128.0, 192.0, 256.0],
]
strides = [8.0, 16.0, 32.0, 64.0]
threshold = 0.5

# FER+ class indices -> emotion labels. emotion-ferplus-8 has eight outputs,
# so 'contempt' (index 7) is mapped as well to avoid a KeyError when it wins
# the argmax.
emotion_dict = {
    0: 'neutral',
    1: 'happiness',
    2: 'surprise',
    3: 'sadness',
    4: 'anger',
    5: 'disgust',
    6: 'fear',
    7: 'contempt',
}
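# The RFB-320 detector takes a 320x240 image normalized with the mean/std
# above; the FER+ classifier takes a 1x1x64x64 grayscale face crop.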
def define_img_size(image_size):
    # Build the SSD prior boxes for the given detector input size.
    shrinkage_list = []
    feature_map_w_h_list = []
    for size in image_size:
        feature_map = [int(ceil(size / stride)) for stride in strides]
        feature_map_w_h_list.append(feature_map)
    for i in range(0, len(image_size)):
        shrinkage_list.append(strides)
    priors = generate_priors(
        feature_map_w_h_list, shrinkage_list, image_size, min_boxes
    )
    return priors
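# For the 320x240 input used below, the feature maps are 40x30, 20x15, 10x8
# and 5x4, with 3, 2, 2 and 3 anchor sizes per cell respectively, giving
# 40*30*3 + 20*15*2 + 10*8*2 + 5*4*3 = 4420 priors in total.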
def generate_priors(
    feature_map_list, shrinkage_list, image_size, min_boxes
):
    # One prior per (cell, anchor size), in normalized center form
    # [cx, cy, w, h].
    priors = []
    for index in range(0, len(feature_map_list[0])):
        scale_w = image_size[0] / shrinkage_list[0][index]
        scale_h = image_size[1] / shrinkage_list[1][index]
        for j in range(0, feature_map_list[1][index]):
            for i in range(0, feature_map_list[0][index]):
                x_center = (i + 0.5) / scale_w
                y_center = (j + 0.5) / scale_h
                for min_box in min_boxes[index]:
                    w = min_box / image_size[0]
                    h = min_box / image_size[1]
                    priors.append([
                        x_center,
                        y_center,
                        w,
                        h
                    ])
    # logger.debug instead of print: this runs once per frame.
    logger.debug("priors nums: %d", len(priors))
    return np.clip(priors, 0.0, 1.0)
def hard_nms(box_scores, iou_threshold, top_k=-1, candidate_size=200):
    # Greedy non-maximum suppression: repeatedly keep the highest-scoring
    # box and drop the remaining boxes that overlap it above iou_threshold.
    scores = box_scores[:, -1]
    boxes = box_scores[:, :-1]
    picked = []
    indexes = np.argsort(scores)
    indexes = indexes[-candidate_size:]
    while len(indexes) > 0:
        current = indexes[-1]
        picked.append(current)
        if 0 < top_k == len(picked) or len(indexes) == 1:
            break
        current_box = boxes[current, :]
        indexes = indexes[:-1]
        rest_boxes = boxes[indexes, :]
        iou = iou_of(
            rest_boxes,
            np.expand_dims(current_box, axis=0),
        )
        indexes = indexes[iou <= iou_threshold]
    return box_scores[picked, :]
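# Illustrative call: each box_scores row is [x1, y1, x2, y2, score];
# hard_nms(box_scores, iou_threshold=0.3) keeps the best-scoring box of each
# overlapping cluster, and top_k=-1 puts no cap on the number of boxes kept.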
def area_of(left_top, right_bottom):
    # Area of boxes given corner coordinates; negative extents clip to zero.
    hw = np.clip(right_bottom - left_top, 0.0, None)
    return hw[..., 0] * hw[..., 1]


def iou_of(boxes0, boxes1, eps=1e-5):
    # Intersection-over-union between two sets of corner-form boxes.
    overlap_left_top = np.maximum(boxes0[..., :2], boxes1[..., :2])
    overlap_right_bottom = np.minimum(boxes0[..., 2:], boxes1[..., 2:])
    overlap_area = area_of(overlap_left_top, overlap_right_bottom)
    area0 = area_of(boxes0[..., :2], boxes0[..., 2:])
    area1 = area_of(boxes1[..., :2], boxes1[..., 2:])
    return overlap_area / (area0 + area1 - overlap_area + eps)
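# Worked example: boxes [0, 0, 2, 2] and [1, 1, 3, 3] overlap on the unit
# square (area 1), so IoU = 1 / (4 + 4 - 1) ≈ 0.14.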
def predict(
    width,
    height,
    confidences,
    boxes,
    prob_threshold,
    iou_threshold=0.3,
    top_k=-1
):
    # Filter detections by confidence, run NMS per class, and scale the
    # surviving boxes from normalized coordinates to pixel coordinates.
    boxes = boxes[0]
    confidences = confidences[0]
    picked_box_probs = []
    picked_labels = []
    for class_index in range(1, confidences.shape[1]):
        probs = confidences[:, class_index]
        mask = probs > prob_threshold
        probs = probs[mask]
        if probs.shape[0] == 0:
            continue
        subset_boxes = boxes[mask, :]
        box_probs = np.concatenate(
            [subset_boxes, probs.reshape(-1, 1)], axis=1
        )
        box_probs = hard_nms(
            box_probs,
            iou_threshold=iou_threshold,
            top_k=top_k,
        )
        picked_box_probs.append(box_probs)
        picked_labels.extend([class_index] * box_probs.shape[0])
    if not picked_box_probs:
        return np.array([]), np.array([]), np.array([])
    picked_box_probs = np.concatenate(picked_box_probs)
    picked_box_probs[:, 0] *= width
    picked_box_probs[:, 1] *= height
    picked_box_probs[:, 2] *= width
    picked_box_probs[:, 3] *= height
    return (
        picked_box_probs[:, :4].astype(np.int32),
        np.array(picked_labels),
        picked_box_probs[:, 4]
    )
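# predict returns pixel-space corner boxes (int32), their class labels, and
# confidence scores, or three empty arrays when nothing clears the threshold.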
def convert_locations_to_boxes(locations, priors, center_variance,
                               size_variance):
    # Decode SSD regression outputs into center-form boxes using the priors.
    if len(priors.shape) + 1 == len(locations.shape):
        priors = np.expand_dims(priors, 0)
    return np.concatenate([
        locations[..., :2] * center_variance * priors[..., 2:] + priors[..., :2],
        np.exp(locations[..., 2:] * size_variance) * priors[..., 2:]
    ], axis=len(locations.shape) - 1)
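# With center_variance=0.1, a predicted offset of 1.0 moves the box center
# by 0.1 of the prior's extent; with size_variance=0.2, a size logit of 1.0
# scales the prior by exp(0.2) ≈ 1.22.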
def center_form_to_corner_form(locations):
    # Convert [cx, cy, w, h] boxes to [x1, y1, x2, y2].
    return np.concatenate(
        [locations[..., :2] - locations[..., 2:] / 2,
         locations[..., :2] + locations[..., 2:] / 2],
        len(locations.shape) - 1
    )
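# Example: the center-form box [0.5, 0.5, 0.2, 0.4] becomes the corner-form
# box [0.4, 0.3, 0.6, 0.7].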
def video_frame_callback(frame: av.VideoFrame) -> av.VideoFrame:
    frame = frame.to_ndarray(format="bgr24")
    input_size = [320, 240]
    width = input_size[0]
    height = input_size[1]
    # The priors depend only on the fixed input size; recomputing them each
    # frame is redundant but cheap, and kept to match the original structure.
    priors = define_img_size(input_size)
    img_ori = frame
    rect = cv2.resize(img_ori, (width, height))
    rect = cv2.cvtColor(rect, cv2.COLOR_BGR2RGB)
    net.setInput(dnn.blobFromImage(
        rect, 1 / image_std, (width, height), 127
    ))
    start_time = time.time()
    boxes, scores = net.forward(["boxes", "scores"])
    boxes = np.expand_dims(np.reshape(boxes, (-1, 4)), axis=0)
    scores = np.expand_dims(np.reshape(scores, (-1, 2)), axis=0)
    boxes = convert_locations_to_boxes(
        boxes, priors, center_variance, size_variance
    )
    boxes = center_form_to_corner_form(boxes)
    boxes, labels, probs = predict(
        img_ori.shape[1],
        img_ori.shape[0],
        scores,
        boxes,
        threshold
    )
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    for (x1, y1, x2, y2) in boxes:
        # Clamp the box to the frame so the crop below is never empty;
        # detections near the border can otherwise spill outside the image.
        x1, y1 = max(0, x1), max(0, y1)
        x2, y2 = min(frame.shape[1], x2), min(frame.shape[0], y2)
        if x2 <= x1 or y2 <= y1:
            continue
        resize_frame = cv2.resize(gray[y1:y2, x1:x2], (64, 64))
        resize_frame = resize_frame.reshape(1, 1, 64, 64).astype(np.float32)
        model.setInput(resize_frame)
        output = model.forward()
        end_time = time.time()
        fps = 1 / (end_time - start_time)
        logger.debug("FPS: %.1f", fps)
        pred = emotion_dict[int(np.argmax(output[0]))]
        # img_ori aliases frame, so a single rectangle per face suffices.
        cv2.rectangle(
            frame,
            (x1, y1),
            (x2, y2),
            (215, 5, 247),
            2,
            lineType=cv2.LINE_AA
        )
        cv2.putText(
            frame,
            pred,
            (x1, y1 - 10),
            cv2.FONT_HERSHEY_SIMPLEX,
            0.8,
            (215, 5, 247),
            2,
            lineType=cv2.LINE_AA
        )
    return av.VideoFrame.from_ndarray(frame, format="bgr24")
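# Minimal offline check of the callback, assuming a local test image at the
# hypothetical path "face.jpg"; uncomment to run without a WebRTC session:
# _test = av.VideoFrame.from_ndarray(cv2.imread("face.jpg"), format="bgr24")
# _out = video_frame_callback(_test)
# cv2.imwrite("face_annotated.jpg", _out.to_ndarray(format="bgr24"))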
if __name__ == "__main__":
    webrtc_ctx = webrtc_streamer(
        key="face-emotion-recognition",
        mode=WebRtcMode.SENDRECV,
        rtc_configuration={
            "iceServers": get_ice_servers(),
            "iceTransportPolicy": "relay",
        },
        video_frame_callback=video_frame_callback,
        media_stream_constraints={"video": True, "audio": False},
        async_processing=True,
    )

    st.markdown(
        "This demo uses a model and code from "
        "https://github.com/spmallick/learnopencv. "
        "Many thanks to the project."
    )