import json

import cv2
import numpy as np
import triton_python_backend_utils as pb_utils


class TritonPythonModel:
    """Your Python model must use the same class name. Every Python model
    that is created must have "TritonPythonModel" as the class name.
    """

    def initialize(self, args):
        """`initialize` is called only once when the model is being loaded."""
        self.model_config = model_config = json.loads(args['model_config'])
        # Get the configuration of each Triton output tensor
        num_detections_config = pb_utils.get_output_config_by_name(
            model_config, "num_detections")
        detection_boxes_config = pb_utils.get_output_config_by_name(
            model_config, "detection_boxes")
        detection_scores_config = pb_utils.get_output_config_by_name(
            model_config, "detection_scores")
        detection_classes_config = pb_utils.get_output_config_by_name(
            model_config, "detection_classes")

        # Convert Triton types to numpy types
        self.num_detections_dtype = pb_utils.triton_string_to_numpy(
            num_detections_config['data_type'])
        self.detection_boxes_dtype = pb_utils.triton_string_to_numpy(
            detection_boxes_config['data_type'])
        self.detection_scores_dtype = pb_utils.triton_string_to_numpy(
            detection_scores_config['data_type'])
        self.detection_classes_dtype = pb_utils.triton_string_to_numpy(
            detection_classes_config['data_type'])

        # Thresholds for detection filtering
        self.score_threshold = 0.25  # Minimum confidence required to keep a detection
        self.nms_threshold = 0.45    # IoU threshold used to suppress overlapping boxes
    def execute(self, requests):
        """`execute` is called whenever inference requests arrive for this model."""
        num_detections_dtype = self.num_detections_dtype
        detection_boxes_dtype = self.detection_boxes_dtype
        detection_scores_dtype = self.detection_scores_dtype
        detection_classes_dtype = self.detection_classes_dtype

        responses = []
        # Process each inference request
        for request in requests:
            # Get INPUT_0 - the raw detector output to post-process
            in_0 = pb_utils.get_input_tensor_by_name(request, "INPUT_0")

            # Get the output array from the tensor (assuming a batch size of 1)
            outputs = in_0.as_numpy()
            # Transpose from (4 + num_classes, num_candidates) to
            # (num_candidates, 4 + num_classes) so each row is one candidate
            outputs = np.array([cv2.transpose(outputs[0])])
            rows = outputs.shape[1]

            boxes = []
            scores = []
            class_ids = []
            # Iterate over each candidate detection
            for i in range(rows):
                # Extract the class scores and find the best class and its score
                classes_scores = outputs[0][i][4:]
                (minScore, maxScore, minClassLoc,
                 (x, maxClassIndex)) = cv2.minMaxLoc(classes_scores)
                if maxScore >= self.score_threshold:  # Filter out low-confidence predictions
                    # Convert YOLO format (x_center, y_center, width, height)
                    # to (x_min, y_min, width, height) as expected by cv2.dnn.NMSBoxes
                    box = [
                        outputs[0][i][0] - (0.5 * outputs[0][i][2]),  # x_min
                        outputs[0][i][1] - (0.5 * outputs[0][i][3]),  # y_min
                        outputs[0][i][2],  # width
                        outputs[0][i][3],  # height
                    ]
                    boxes.append(box)
                    scores.append(maxScore)
                    class_ids.append(maxClassIndex)  # Store the predicted class ID
            # Apply non-maximum suppression (NMS) to remove redundant boxes
            result_boxes = cv2.dnn.NMSBoxes(
                boxes, scores, self.score_threshold, self.nms_threshold)

            num_detections = 0
            output_boxes = []
            output_scores = []
            output_classids = []

            # Collect the boxes, scores, and class IDs kept after NMS
            for i in range(len(result_boxes)):
                index = result_boxes[i]
                output_boxes.append(boxes[index])      # Bounding box
                output_scores.append(scores[index])    # Confidence score
                output_classids.append(class_ids[index])  # Predicted class ID
                num_detections += 1
            # Create output tensors for Triton
            num_detections = np.array([num_detections], dtype=num_detections_dtype)
            detection_boxes = np.array(output_boxes, dtype=detection_boxes_dtype)
            detection_scores = np.array(output_scores, dtype=detection_scores_dtype)
            detection_classes = np.array(output_classids, dtype=detection_classes_dtype)

            # Create the inference response
            inference_response = pb_utils.InferenceResponse(
                output_tensors=[
                    pb_utils.Tensor("num_detections", num_detections),
                    pb_utils.Tensor("detection_boxes", detection_boxes),
                    pb_utils.Tensor("detection_scores", detection_scores),
                    pb_utils.Tensor("detection_classes", detection_classes)
                ]
            )
            responses.append(inference_response)

        return responses
    def finalize(self):
        """`finalize` is called only once when the model is being unloaded. Clean-up goes here."""
        pass
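
For reference, below is a minimal client-side sketch of how this post-processing model could be queried over Triton's HTTP endpoint with the official tritonclient package. The model name ("yolo_postprocess"), the server URL, and the input shape (1, 84, 8400, i.e. a YOLOv8-style head with 80 classes) are assumptions for illustration only; adjust them to the actual deployment and config.pbtxt.

import numpy as np
import tritonclient.http as httpclient

# Assumed deployment details - not taken from the model above
MODEL_NAME = "yolo_postprocess"   # hypothetical model name
SERVER_URL = "localhost:8000"     # default Triton HTTP port

client = httpclient.InferenceServerClient(url=SERVER_URL)

# Dummy detector output with an assumed YOLOv8-style shape:
# (batch, 4 box coordinates + num_classes, num_candidates)
raw_output = np.random.rand(1, 84, 8400).astype(np.float32)

# INPUT_0 matches the tensor name read in execute(); FP32 is assumed
infer_input = httpclient.InferInput("INPUT_0", list(raw_output.shape), "FP32")
infer_input.set_data_from_numpy(raw_output)

# Request the four output tensors produced by the Python backend
requested_outputs = [
    httpclient.InferRequestedOutput("num_detections"),
    httpclient.InferRequestedOutput("detection_boxes"),
    httpclient.InferRequestedOutput("detection_scores"),
    httpclient.InferRequestedOutput("detection_classes"),
]

result = client.infer(model_name=MODEL_NAME,
                      inputs=[infer_input],
                      outputs=requested_outputs)

print("num_detections:", result.as_numpy("num_detections"))
print("boxes:", result.as_numpy("detection_boxes"))
print("scores:", result.as_numpy("detection_scores"))
print("classes:", result.as_numpy("detection_classes"))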