import cv2
import numpy as np

from . import _utils as utils
from .engine import Engine
from .types import BoundingBox, Landmarks, Embedding
from .exceptions import ModelLoadingException

LANDMARKS_EXPAND_FACTOR = 0.3
ALIGNMENT_EXPAND_FACTOR = 1.0
MASK_EXPAND_FACTOR = 0.3

# TODO: The way the pipeline is run needs to be refactored.
# Making temporary fixes for now.

AVAILABLE_OPTIONS = ["find_landmarks", "compute_embeddings"]


class SplitGraph:
    """Face pipeline split into detection, landmark, and embedding stages."""

    def __init__(self, models_dirpath, settings=None, engine=Engine.OPENVINO):
        if settings is None:
            settings = {}
        if isinstance(engine, Engine):
            self.engine_name = engine
        else:
            self.engine_name = engine.split("-")[0]
        if self.engine_name == Engine.OPENVINO:
            from .openvino.engine import Engine as E
        elif self.engine_name == Engine.TENSORRT:
            from .tensorrt.engine import Engine as E
        else:
            raise ModelLoadingException(
                f"This is not a valid engine choice: {engine}. "
                f"Available choices are: {Engine.all()}."
            )
        self.engine = E(models_dirpath, settings)

    def prepare_for_detection(self, img):
        """Resize the image to the detector's input shape while preserving aspect
        ratio, padding the bottom/right with black pixels."""
        height, width = img.shape[:2]
        fd_input_height, fd_input_width = self.engine.fd_input_shape
        ratio = min(fd_input_height / height, fd_input_width / width)
        target_width = round(width * ratio)
        target_height = round(height * ratio)
        resized = utils.resize(img, target_height, target_width)
        offset_pad_height = fd_input_height - target_height
        offset_pad_width = fd_input_width - target_width
        padded = cv2.copyMakeBorder(
            resized,
            0,
            offset_pad_height,
            0,
            offset_pad_width,
            cv2.BORDER_CONSTANT,
            value=[0, 0, 0],
        )
        return padded, (target_height, target_width)

    def prepare_for_landmarks(self, np_img, bbox, original_size):
        """Expand and crop the detection box, then resize the crop to the landmark
        model's input shape."""
        exp_bbox, _, pre_pad_exp_img = utils.expand_and_crop(
            np_img, LANDMARKS_EXPAND_FACTOR, bbox, original_size
        )
        image_h, image_w = np_img.shape[:2]
        exp_img = utils.maybe_pad(pre_pad_exp_img, exp_bbox, image_h, image_w)
        target_h, target_w = self.engine.lm_input_shape
        resized_img = utils.resize(exp_img, target_h, target_w)
        return exp_bbox, resized_img

    def process_post_detection(
        self, imgs, relative_bboxes, detection_input_sizes, img_indexes
    ):
        """Convert relative detections to absolute boxes and prepare the crops
        needed by the landmark and alignment steps."""
        absolute_bboxes = []
        alignment_images = []
        alignment_bounding_boxes = []
        landmarks_input_bounding_boxes = []
        landmarks_input_images = []
        for i, relative_bbox in enumerate(relative_bboxes):
            img = imgs[img_indexes[i]]
            detection_input_size = detection_input_sizes[img_indexes[i]]
            img_size = np.asarray(img.shape[:2])
            absolute_bbox = utils.convert_to_absolute_coordinates(
                relative_bbox,
                detection_input_size,
                img_size,
                self.engine.fd_input_shape,
            )
            # Skip boxes whose top-left corner falls outside the image.
            if absolute_bbox[0] > img_size[1] or absolute_bbox[1] > img_size[0]:
                continue
            square_bb = utils.square(absolute_bbox)
            landmarks_input_bbox, landmarks_input_image = self.prepare_for_landmarks(
                img, square_bb, img_size
            )
            _, alignment_bbox, alignment_image = utils.expand_and_crop(
                img, ALIGNMENT_EXPAND_FACTOR, square_bb, img_size
            )
            absolute_bboxes.append(absolute_bbox)
            alignment_images.append(alignment_image)
            alignment_bounding_boxes.append(alignment_bbox)
            landmarks_input_images.append(landmarks_input_image)
            landmarks_input_bounding_boxes.append(landmarks_input_bbox)
        values = {
            "bounding_boxes": [BoundingBox(*_bb) for _bb in absolute_bboxes],
            "landmarks_input_bounding_boxes": landmarks_input_bounding_boxes,
            "landmarks_input_images": landmarks_input_images,
            "alignment_bounding_boxes": alignment_bounding_boxes,
            "alignment_images": alignment_images,
        }
        return values

    def process_detection_options(self, detect_resp, scoring_mode, options):
        values = {}
        if "get_qualities" in options:
            qualities, acceptabilities = self.get_qualities(
                detect_resp["landmarks_input_images"]
            )
            values["qualities"] = qualities
            values["acceptabilities"] = acceptabilities
        if any(option in AVAILABLE_OPTIONS for option in options):
            (landmarks, recognition_input_images) = self.find_landmarks(
                detect_resp["landmarks_input_bounding_boxes"],
                detect_resp["landmarks_input_images"],
                detect_resp["alignment_bounding_boxes"],
                detect_resp["alignment_images"],
            )
            values["landmarks"] = [Landmarks(*x) for x in landmarks]
            values["recognition_input_images"] = recognition_input_images
            if "compute_embeddings" in options:
                values["embeddings"] = [
                    Embedding(data, scoring_mode)
                    for data in self.compute_embeddings(recognition_input_images)
                ]
        return values

    def run(self, imgs, scoring_mode, options=None):
        """Run detection on a batch of images, then any requested options."""
        if options is None:
            options = []
        detection_inputs = []
        detection_input_sizes = []
        for img in imgs:
            img_for_fd, resized_size = self.prepare_for_detection(img)
            detection_inputs.append(img_for_fd)
            detection_input_sizes.append(resized_size)
        relative_bboxes, confidences, img_indexes = self.engine.predict_bounding_boxes(
            detection_inputs
        )
        values = {"confidences": confidences}

        # post-process detection
        detect_resp = self.process_post_detection(
            imgs, relative_bboxes, detection_input_sizes, img_indexes
        )
        values.update(detect_resp)

        # process options
        options_resp = self.process_detection_options(
            detect_resp, scoring_mode, options
        )
        values.update(options_resp)
        return values, img_indexes

    def run_from_landmarks(self, img, bboxes):
        """Run the landmark and alignment stages for bounding boxes that are
        already known."""
        absolute_bboxes = []
        alignment_images = []
        alignment_bounding_boxes = []
        landmarks_input_bounding_boxes = []
        landmarks_input_images = []
        for absolute_bbox in bboxes:
            img_size = np.asarray(img.shape[:2])
            bounding_box = np.array(
                [
                    absolute_bbox.origin.x,
                    absolute_bbox.origin.y,
                    absolute_bbox.origin.x + absolute_bbox.width,
                    absolute_bbox.origin.y + absolute_bbox.height,
                ]
            )
            if bounding_box[0] > img_size[1] or bounding_box[1] > img_size[0]:
                continue
            square_bb = utils.square(bounding_box)
            landmarks_input_bbox, landmarks_input_image = self.prepare_for_landmarks(
                img, square_bb, img_size
            )
            _, alignment_bbox, alignment_image = utils.expand_and_crop(
                img, ALIGNMENT_EXPAND_FACTOR, square_bb, img_size
            )
            absolute_bboxes.append(bounding_box)
            alignment_images.append(alignment_image)
            alignment_bounding_boxes.append(alignment_bbox)
            landmarks_input_images.append(landmarks_input_image)
            landmarks_input_bounding_boxes.append(landmarks_input_bbox)
        (landmarks, recognition_input_images) = self.find_landmarks(
            landmarks_input_bounding_boxes,
            landmarks_input_images,
            alignment_bounding_boxes,
            alignment_images,
        )
        values = {
            "bounding_boxes": [BoundingBox(*_bb) for _bb in absolute_bboxes],
            "landmarks_input_bounding_boxes": landmarks_input_bounding_boxes,
            "landmarks_input_images": landmarks_input_images,
            "alignment_bounding_boxes": alignment_bounding_boxes,
            "alignment_images": alignment_images,
            "landmarks": [Landmarks(*x) for x in landmarks],
            "recognition_input_images": recognition_input_images,
        }
        return values

    def find_landmarks(
        self,
        landmarks_input_bounding_boxes,
        landmarks_input_images,
        alignment_bounding_boxes,
        alignment_images,
    ):
        """Predict five facial landmarks per crop and produce aligned crops for
        the recognition model."""
        if len(landmarks_input_bounding_boxes) == 0:
            return [], []
        relative_landmarks = self.engine.predict_landmarks(landmarks_input_images)
        relative_landmarks = relative_landmarks.reshape(-1, 5, 2)
        absolute_landmarks = []
        recognition_input_images = []
        for i, landmarks in enumerate(relative_landmarks):
            landmarks_input_bounding_box = landmarks_input_bounding_boxes[i]
            alignment_bounding_box = alignment_bounding_boxes[i]
            alignment_image = alignment_images[i]
            landmarks = utils.normalize(landmarks_input_bounding_box, landmarks)
            recognition_input_image = utils.crop_and_align(
                alignment_image,
                landmarks - alignment_bounding_box[:2],
                self.engine.fr_input_shape,
            )
            absolute_landmarks.append(landmarks)
            recognition_input_images.append(recognition_input_image)
        return absolute_landmarks, recognition_input_images

    def compute_embeddings(self, recognition_input_images):
        if len(recognition_input_images) == 0:
            return []
        return self.engine.predict_embeddings(recognition_input_images)

    def get_attributes(self, recognition_input_images):
        if len(recognition_input_images) == 0:
            return [], []
        return self.engine.predict_attributes(recognition_input_images)

    def get_fr_input_shape(self):
        return self.engine.fr_input_shape

    def get_fr_output_shape(self):
        return self.engine.fr_output_shape

    def check_for_mask(self, landmarks_input_images):
        if len(landmarks_input_images) == 0:
            return []
        return self.engine.check_for_masks(landmarks_input_images)

    def get_qualities(self, landmarks_input_images):
        if len(landmarks_input_images) == 0:
            return [], []
        qualities, acceptabilities = self.engine.get_qualities(landmarks_input_images)
        qualities = np.clip(qualities, 0, 1).tolist()
        acceptabilities = np.clip(acceptabilities, 0, 1).tolist()
        return qualities, acceptabilities
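

if __name__ == "__main__":
    # Minimal usage sketch, not part of the library API: run as
    # `python -m <package>.<module> <models_dir> <image_path>`. The command-line
    # arguments and the scoring_mode value below are illustrative assumptions;
    # scoring_mode is simply forwarded to the Embedding constructor.
    import sys

    graph = SplitGraph(sys.argv[1], engine=Engine.OPENVINO)
    image = cv2.imread(sys.argv[2])  # BGR uint8 image, as loaded by OpenCV
    values, img_indexes = graph.run(
        [image],
        scoring_mode=None,  # assumption: any value accepted by Embedding works here
        options=["find_landmarks", "compute_embeddings"],
    )
    print(f"{len(values['bounding_boxes'])} face(s) detected")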