# yoloserv/modules/paravision/recognition/_internal.py

import cv2
import numpy as np
from . import _utils as utils
from .engine import Engine
from .types import BoundingBox, Landmarks, Embedding
from .exceptions import ModelLoadingException

LANDMARKS_EXPAND_FACTOR = 0.3
ALIGNMENT_EXPAND_FACTOR = 1.0
MASK_EXPAND_FACTOR = 0.3

# TODO: The way the pipeline is run needs to be refactored; the option
# handling below is a temporary fix.
AVAILABLE_OPTIONS = ["find_landmarks", "compute_embeddings"]

class SplitGraph:
    """Runs the face pipeline (detection, landmarks, alignment, recognition)
    as split model graphs on the selected inference engine."""

    def __init__(self, models_dirpath, settings=None, engine=Engine.OPENVINO):
        if settings is None:
            settings = {}
        # `engine` may be an Engine value or a string such as "openvino-gpu";
        # only the prefix before the first "-" selects the backend.
        if isinstance(engine, Engine):
            self.engine_name = engine
        else:
            self.engine_name = engine.split("-")[0]
        if self.engine_name == Engine.OPENVINO:
            from .openvino.engine import Engine as E
        elif self.engine_name == Engine.TENSORRT:
            from .tensorrt.engine import Engine as E
        else:
            raise ModelLoadingException(
                f"This is not a valid engine choice: {engine}. "
                f"Available choices are: {Engine.all()}."
            )
        self.engine = E(models_dirpath, settings)
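
    # A minimal construction sketch (the models directory path below is
    # hypothetical, not part of this module):
    #
    #     graph = SplitGraph("/opt/models/paravision", engine=Engine.OPENVINO)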

    def prepare_for_detection(self, img):
        # Letterbox the image: scale it to fit the detector input while
        # keeping the aspect ratio, then pad the bottom/right with black.
        height, width = img.shape[:2]
        fd_input_height, fd_input_width = self.engine.fd_input_shape
        ratio = min(fd_input_height / height, fd_input_width / width)
        target_width = round(width * ratio)
        target_height = round(height * ratio)
        resized = utils.resize(img, target_height, target_width)
        offset_pad_height = fd_input_height - target_height
        offset_pad_width = fd_input_width - target_width
        padded = cv2.copyMakeBorder(
            resized,
            0,
            offset_pad_height,
            0,
            offset_pad_width,
            cv2.BORDER_CONSTANT,
            value=[0, 0, 0],
        )
        return padded, (target_height, target_width)
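
    # Worked example of the letterbox arithmetic above (a 640x640 detector
    # input shape is assumed for illustration): a 720x1280 image gives
    # ratio = min(640/720, 640/1280) = 0.5, so it is resized to 360x640,
    # padded with 280 black rows at the bottom, and (360, 640) is returned
    # alongside the padded image.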

    def prepare_for_landmarks(self, np_img, bbox, original_size):
        # Expand the box around the face, crop it (padding if the expanded
        # box runs off the image), and resize to the landmark model's input.
        exp_bbox, _, pre_pad_exp_img = utils.expand_and_crop(
            np_img, LANDMARKS_EXPAND_FACTOR, bbox, original_size
        )
        image_h, image_w = np_img.shape[:2]
        exp_img = utils.maybe_pad(pre_pad_exp_img, exp_bbox, image_h, image_w)
        target_h, target_w = self.engine.lm_input_shape
        resized_img = utils.resize(exp_img, target_h, target_w)
        return exp_bbox, resized_img

    def process_post_detection(
        self, imgs, relative_bboxes, detection_input_sizes, img_indexes
    ):
        absolute_bboxes = []
        alignment_images = []
        alignment_bounding_boxes = []
        landmarks_input_bounding_boxes = []
        landmarks_input_images = []
        for i, relative_bbox in enumerate(relative_bboxes):
            img = imgs[img_indexes[i]]
            detection_input_size = detection_input_sizes[img_indexes[i]]
            img_size = np.asarray(img.shape[:2])
            absolute_bbox = utils.convert_to_absolute_coordinates(
                relative_bbox,
                detection_input_size,
                img_size,
                self.engine.fd_input_shape,
            )
            # Drop detections whose origin falls outside the source image.
            if absolute_bbox[0] > img_size[1] or absolute_bbox[1] > img_size[0]:
                continue
            square_bb = utils.square(absolute_bbox)
            landmarks_input_bbox, landmarks_input_image = self.prepare_for_landmarks(
                img, square_bb, img_size
            )
            _, alignment_bbox, alignment_image = utils.expand_and_crop(
                img, ALIGNMENT_EXPAND_FACTOR, square_bb, img_size
            )
            absolute_bboxes.append(absolute_bbox)
            alignment_images.append(alignment_image)
            alignment_bounding_boxes.append(alignment_bbox)
            landmarks_input_images.append(landmarks_input_image)
            landmarks_input_bounding_boxes.append(landmarks_input_bbox)
        values = {
            "bounding_boxes": [BoundingBox(*_bb) for _bb in absolute_bboxes],
            "landmarks_input_bounding_boxes": landmarks_input_bounding_boxes,
            "landmarks_input_images": landmarks_input_images,
            "alignment_bounding_boxes": alignment_bounding_boxes,
            "alignment_images": alignment_images,
        }
        return values

    def process_detection_options(self, detect_resp, scoring_mode, options):
        values = {}
        if "get_qualities" in options:
            qualities, acceptabilities = self.get_qualities(
                detect_resp["landmarks_input_images"]
            )
            values["qualities"] = qualities
            values["acceptabilities"] = acceptabilities
        # Landmarks are a prerequisite for embeddings, so they are computed
        # whenever any option in AVAILABLE_OPTIONS is requested.
        if any(option in AVAILABLE_OPTIONS for option in options):
            landmarks, recognition_input_images = self.find_landmarks(
                detect_resp["landmarks_input_bounding_boxes"],
                detect_resp["landmarks_input_images"],
                detect_resp["alignment_bounding_boxes"],
                detect_resp["alignment_images"],
            )
            values["landmarks"] = [Landmarks(*x) for x in landmarks]
            values["recognition_input_images"] = recognition_input_images
            if "compute_embeddings" in options:
                values["embeddings"] = [
                    Embedding(data, scoring_mode)
                    for data in self.compute_embeddings(recognition_input_images)
                ]
        return values
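
    # For example, with options=["compute_embeddings"] the dict returned
    # above carries "landmarks", "recognition_input_images", and
    # "embeddings"; "qualities" and "acceptabilities" appear only when
    # "get_qualities" is also requested.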

    def run(self, imgs, scoring_mode, options=None):
        if options is None:
            options = []
        detection_inputs = []
        detection_input_sizes = []
        for img in imgs:
            img_for_fd, resized_size = self.prepare_for_detection(img)
            detection_inputs.append(img_for_fd)
            detection_input_sizes.append(resized_size)
        relative_bboxes, confidences, img_indexes = self.engine.predict_bounding_boxes(
            detection_inputs
        )
        values = {"confidences": confidences}
        # Post-process detection.
        detect_resp = self.process_post_detection(
            imgs, relative_bboxes, detection_input_sizes, img_indexes
        )
        values.update(detect_resp)
        # Process the requested options.
        options_resp = self.process_detection_options(
            detect_resp, scoring_mode, options
        )
        values.update(options_resp)
        return values, img_indexes
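
    # Illustrative call (the image path and scoring mode are hypothetical;
    # scoring_mode is only consumed by Embedding):
    #
    #     values, img_indexes = graph.run(
    #         [cv2.imread("face.jpg")],
    #         scoring_mode="default",
    #         options=["compute_embeddings"],
    #     )
    #     embeddings = values["embeddings"]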

    def run_from_landmarks(self, img, bboxes):
        # Same post-detection pipeline as run(), but starting from
        # caller-supplied bounding boxes on a single image.
        absolute_bboxes = []
        alignment_images = []
        alignment_bounding_boxes = []
        landmarks_input_bounding_boxes = []
        landmarks_input_images = []
        img_size = np.asarray(img.shape[:2])
        for absolute_bbox in bboxes:
            # Convert the BoundingBox object to [x1, y1, x2, y2] form.
            bounding_box = np.array(
                [
                    absolute_bbox.origin.x,
                    absolute_bbox.origin.y,
                    absolute_bbox.origin.x + absolute_bbox.width,
                    absolute_bbox.origin.y + absolute_bbox.height,
                ]
            )
            # Drop boxes whose origin falls outside the image.
            if bounding_box[0] > img_size[1] or bounding_box[1] > img_size[0]:
                continue
            square_bb = utils.square(bounding_box)
            landmarks_input_bbox, landmarks_input_image = self.prepare_for_landmarks(
                img, square_bb, img_size
            )
            _, alignment_bbox, alignment_image = utils.expand_and_crop(
                img, ALIGNMENT_EXPAND_FACTOR, square_bb, img_size
            )
            absolute_bboxes.append(bounding_box)
            alignment_images.append(alignment_image)
            alignment_bounding_boxes.append(alignment_bbox)
            landmarks_input_images.append(landmarks_input_image)
            landmarks_input_bounding_boxes.append(landmarks_input_bbox)
        landmarks, recognition_input_images = self.find_landmarks(
            landmarks_input_bounding_boxes,
            landmarks_input_images,
            alignment_bounding_boxes,
            alignment_images,
        )
        values = {
            "bounding_boxes": [BoundingBox(*_bb) for _bb in absolute_bboxes],
            "landmarks_input_bounding_boxes": landmarks_input_bounding_boxes,
            "landmarks_input_images": landmarks_input_images,
            "alignment_bounding_boxes": alignment_bounding_boxes,
            "alignment_images": alignment_images,
            "landmarks": [Landmarks(*x) for x in landmarks],
            "recognition_input_images": recognition_input_images,
        }
        return values
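
    # For example, boxes produced by an earlier run() on the same image can
    # be fed back through this entry point:
    #
    #     values = graph.run_from_landmarks(img, prior_values["bounding_boxes"])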

    def find_landmarks(
        self,
        landmarks_input_bounding_boxes,
        landmarks_input_images,
        alignment_bounding_boxes,
        alignment_images,
    ):
        if len(landmarks_input_bounding_boxes) == 0:
            return [], []
        relative_landmarks = self.engine.predict_landmarks(landmarks_input_images)
        # One row of five (x, y) points per face.
        relative_landmarks = relative_landmarks.reshape(-1, 5, 2)
        absolute_landmarks = []
        recognition_input_images = []
        for i, landmarks in enumerate(relative_landmarks):
            landmarks_input_bounding_box = landmarks_input_bounding_boxes[i]
            alignment_bounding_box = alignment_bounding_boxes[i]
            alignment_image = alignment_images[i]
            # Map the relative landmarks into absolute image coordinates.
            landmarks = utils.normalize(landmarks_input_bounding_box, landmarks)
            # Shift into the alignment crop's frame before aligning for FR.
            recognition_input_image = utils.crop_and_align(
                alignment_image,
                landmarks - alignment_bounding_box[:2],
                self.engine.fr_input_shape,
            )
            absolute_landmarks.append(landmarks)
            recognition_input_images.append(recognition_input_image)
        return absolute_landmarks, recognition_input_images

    def compute_embeddings(self, recognition_input_images):
        if len(recognition_input_images) == 0:
            return []
        return self.engine.predict_embeddings(recognition_input_images)

    def get_attributes(self, recognition_input_images):
        if len(recognition_input_images) == 0:
            return [], []
        return self.engine.predict_attributes(recognition_input_images)

    def get_fr_input_shape(self):
        return self.engine.fr_input_shape

    def get_fr_output_shape(self):
        return self.engine.fr_output_shape

    def check_for_mask(self, landmarks_input_images):
        if len(landmarks_input_images) == 0:
            return []
        return self.engine.check_for_masks(landmarks_input_images)

    def get_qualities(self, landmarks_input_images):
        if len(landmarks_input_images) == 0:
            return [], []
        qualities, acceptabilities = self.engine.get_qualities(landmarks_input_images)
        # Clamp scores into [0, 1] and return plain Python lists.
        qualities = np.clip(qualities, 0, 1).tolist()
        acceptabilities = np.clip(acceptabilities, 0, 1).tolist()
        return qualities, acceptabilities
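
# A minimal end-to-end sketch, assuming model files exist under the
# (hypothetical) directory below; the scoring mode is unused for these
# options but is required positionally:
#
#     graph = SplitGraph("/opt/models/paravision")
#     values, img_indexes = graph.run(
#         [cv2.imread("face.jpg")],
#         scoring_mode="default",
#         options=["find_landmarks", "get_qualities"],
#     )
#     for bb, lm, q in zip(
#         values["bounding_boxes"], values["landmarks"], values["qualities"]
#     ):
#         print(bb, lm, q)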