311 lines
7.6 KiB
Python
311 lines
7.6 KiB
Python
import json
|
|
import cv2
|
|
import importlib
|
|
import numpy as np
|
|
|
|
from os import walk, path
|
|
|
|
from .engine import Engine
|
|
from .exceptions import ModelLoadingException, InternalErrorException
|
|
from .types import Face
|
|
|
|
# Substring looked for in model file names to select the OpenVINO engine
# (see match_engine_given_path).
OPENVINO_EXT = "xml"
# Substring looked for in model file names to select the TensorRT engine
# (see match_engine_given_path).
TENSORRT_EXT = "onnx"
# Sub-directory of the models root whose file names are inspected.
MODELS_DIRECTORY = "recognition"
|
|
|
|
# Maps the plural, per-batch key names found in a graph output dict to the
# singular name used for each individual face (see build_faces). Keys that
# are not listed here are passed through unchanged by build_faces.
KEYS = {
    "acceptabilities": "acceptability",
    "bounding_boxes": "bounding_box",
    "confidences": "score",
    "recognition_input_images": "recognition_input_image",
    "embeddings": "embedding",
    "landmarks_input_images": "landmarks_input_image",
    "mask_input_images": "mask_input_image",
    "landmarks_input_bounding_boxes": "landmarks_input_bounding_box",
    "alignment_bounding_boxes": "alignment_bounding_box",
    "alignment_images": "alignment_image",
    "qualities": "quality",
}
|
|
|
|
_SQUARE_TO_POINTS = [
|
|
[38.2946, 51.6963],
|
|
[73.5318, 51.5014],
|
|
[56.0252, 71.7366],
|
|
[41.5493, 92.3655],
|
|
[70.7299, 92.2041],
|
|
]
|
|
|
|
|
|
def model_location():
    """Return the location reported by the ``paravision_models`` package.

    The package is imported lazily so that this module can be loaded even
    when the models package is not installed.

    Returns:
        Whatever ``paravision_models.location()`` returns.

    Raises:
        ModelLoadingException: if a ``ModuleNotFoundError`` is raised while
            importing the package or while calling ``location()``.
    """
    try:
        models_pkg = importlib.import_module("paravision_models")
        return models_pkg.location()
    except ModuleNotFoundError as missing:
        message = "You need to install Paravision Models package"
        raise ModelLoadingException(message) from missing
|
|
|
|
|
|
def match_engine():
    """Return the engine reported by the ``paravision_models`` package.

    The package is imported lazily so that this module can be loaded even
    when the models package is not installed.

    Returns:
        Whatever ``paravision_models.engine()`` returns.

    Raises:
        ModelLoadingException: if a ``ModuleNotFoundError`` is raised while
            importing the package or while calling ``engine()``.
    """
    try:
        models_pkg = importlib.import_module("paravision_models")
        return models_pkg.engine()
    except ModuleNotFoundError as missing:
        message = "You need to install Paravision Models package"
        raise ModelLoadingException(message) from missing
|
|
|
|
|
|
def match_engine_given_path(models_dir):
    """Pick the inference engine based on the files found under ``models_dir``.

    Only the file names directly inside ``<models_dir>/<MODELS_DIRECTORY>``
    are inspected (sub-directories are not descended into). OpenVINO is
    checked first, so it wins when both kinds of file are present.

    Args:
        models_dir: root directory that contains the models sub-directory.

    Returns:
        ``Engine.OPENVINO`` if any file name contains ``OPENVINO_EXT``,
        otherwise ``Engine.TENSORRT`` if any file name contains
        ``TENSORRT_EXT``.

    Raises:
        ModelLoadingException: if no matching file is found, including when
            the directory does not exist or cannot be listed.
    """
    recognition_dir = path.join(models_dir, MODELS_DIRECTORY)

    # os.walk swallows listing errors and yields nothing for a missing or
    # unreadable directory; a bare next() would then leak StopIteration to
    # the caller. Fall back to an empty listing so that the documented
    # ModelLoadingException below is raised instead.
    _, _, filenames = next(walk(recognition_dir), (recognition_dir, [], []))

    if any(OPENVINO_EXT in f_name for f_name in filenames):
        return Engine.OPENVINO

    if any(TENSORRT_EXT in f_name for f_name in filenames):
        return Engine.TENSORRT

    raise ModelLoadingException(
        "No compatible models found. Please ensure that your model path is correct."
    )
|
|
|
|
|
|
def mask_model_location():
    """Return the location reported by the ``paravision_models.mask`` module.

    The module is imported lazily so that this module can be loaded even
    when the mask model package is not installed.

    Returns:
        Whatever ``paravision_models.mask.location()`` returns.

    Raises:
        ModelLoadingException: if a ``ModuleNotFoundError`` is raised while
            importing the module or while calling ``location()``.
    """
    try:
        mask_pkg = importlib.import_module("paravision_models.mask")
        return mask_pkg.location()
    except ModuleNotFoundError as missing:
        message = "You need to install Paravision Mask Model package"
        raise ModelLoadingException(message) from missing
|
|
|
|
|
|
def read_spec_value(model_loc, key):
    """Read a single entry from ``<model_loc>/spec.json``.

    Args:
        model_loc: directory that contains the ``spec.json`` file.
        key: top-level key to look up in the parsed JSON document.

    Returns:
        The value stored under ``key``.

    Raises:
        ModelLoadingException: if the spec file is missing, is not valid
            JSON, or does not contain ``key``.
    """
    try:
        with open(path.join(model_loc, "spec.json"), "r", encoding="utf-8") as f:
            spec = json.load(f)

        return spec[key]
    # A corrupt spec file is just as much an "invalid spec file" as a missing
    # one, so malformed JSON is reported through the same exception type
    # instead of leaking json.JSONDecodeError to the caller.
    except (FileNotFoundError, json.JSONDecodeError, KeyError) as err:
        raise ModelLoadingException(
            "Invalid spec file. Please verify the models are installed correctly."
        ) from err
|
|
|
|
|
|
def build_faces(graph_dict):
    """Turn a dict of parallel per-face sequences into a list of ``Face`` objects.

    Each key of ``graph_dict`` is renamed through ``KEYS`` (unknown keys are
    kept as-is). The i-th element of every value is then gathered into the
    i-th face: the ``bounding_box`` entry is used to construct the ``Face``,
    the ``score`` entry (``None`` when absent) is stored on the bounding box,
    and every remaining entry is set as an attribute on the face.

    Args:
        graph_dict: mapping from output name to a sequence with one element
            per face. Iteration stops at the shortest sequence.

    Returns:
        A list of ``Face`` objects, empty when there is nothing to iterate.

    Raises:
        KeyError: if a face row has no ``bounding_box`` entry.
    """
    attr_names = [KEYS.get(name, name) for name in graph_dict]

    faces = []
    for row in zip(*graph_dict.values()):
        attrs = dict(zip(attr_names, row))

        # The score lives on the bounding box rather than on the face itself.
        bounding_box = attrs.pop("bounding_box")
        bounding_box.score = attrs.pop("score", None)

        face = Face(bounding_box)
        for attr, value in attrs.items():
            setattr(face, attr, value)
        faces.append(face)

    return faces
|
|
|
|
|
|
def read_fd_input_shape(model_loc, fd_model_type):
    """Read the detector input shape for the given detection model type.

    Args:
        model_loc: directory that contains the ``spec.json`` file.
        fd_model_type: ``"streaming"`` selects the
            ``fd_streaming_input_shape`` entry; any other value selects
            ``fd_input_shape``.

    Returns:
        The value stored in the spec file under the selected key.
    """
    spec_key = (
        "fd_streaming_input_shape"
        if fd_model_type == "streaming"
        else "fd_input_shape"
    )
    return read_spec_value(model_loc, spec_key)
|
|
|
|
|
|
def read_lm_input_shape(model_loc):
    """Return the ``lm_input_shape`` entry of the spec file in ``model_loc``."""
    spec_key = "lm_input_shape"
    return read_spec_value(model_loc, spec_key)
|
|
|
|
|
|
def read_fr_input_shape(model_loc):
    """Return the ``fr_input_shape`` entry of the spec file in ``model_loc``."""
    spec_key = "fr_input_shape"
    return read_spec_value(model_loc, spec_key)
|
|
|
|
|
|
def read_fr_output_shape(model_loc):
    """Return the ``embedding_size`` entry of the spec file in ``model_loc``.

    Note that, unlike the sibling readers, the spec key is not named after
    this function: the output shape is stored as ``embedding_size``.
    """
    spec_key = "embedding_size"
    return read_spec_value(model_loc, spec_key)
|
|
|
|
|
|
def read_at_input_shape(model_loc):
    """Return the ``at_input_shape`` entry of the spec file in ``model_loc``."""
    spec_key = "at_input_shape"
    return read_spec_value(model_loc, spec_key)
|
|
|
|
|
|
def read_em_input_shape(model_loc):
    """Return the ``em_input_shape`` entry of the spec file in ``model_loc``."""
    spec_key = "em_input_shape"
    return read_spec_value(model_loc, spec_key)
|
|
|
|
|
|
def read_md_input_shape(model_loc):
    """Return the ``md_input_shape`` entry of the spec file in ``model_loc``."""
    spec_key = "md_input_shape"
    return read_spec_value(model_loc, spec_key)
|
|
|
|
|
|
def resize(np_img, height, width):
    """Resize an image with ``cv2.resize`` using its default interpolation.

    Args:
        np_img: image array accepted by ``cv2.resize``.
        height: target height.
        width: target width.

    Returns:
        The resized image returned by ``cv2.resize``.
    """
    # This function takes (height, width) but cv2.resize is given the size
    # as (width, height), hence the swap.
    target_size = (width, height)
    return cv2.resize(np_img, target_size)
|
|
|
|
|
|
def expand_bb(bbox, p=1.0):
    """Grow a bounding box about its centre by a factor of ``1 + p``.

    Args:
        bbox: bounding box in the format ``[x1, y1, x2, y2]``.
        p: additive factor; the width and height each grow by ``p`` times
            their current size, split evenly between the two sides.

    Returns:
        The expanded box as a tuple ``(x1, y1, x2, y2)``. No clipping to any
        image bounds is performed.
    """
    left, top, right, bottom = bbox

    half_grow_x = (right - left) * p / 2
    half_grow_y = (bottom - top) * p / 2

    return (
        left - half_grow_x,
        top - half_grow_y,
        right + half_grow_x,
        bottom + half_grow_y,
    )
|
|
|
|
|
|
def restrict_bbox_to_edges(h, w, bbox):
    """Clip a bounding box so that it lies within an ``h`` x ``w`` image.

    Args:
        h: image height (upper bound for ``y2``).
        w: image width (upper bound for ``x2``).
        bbox: bounding box in the format ``(x1, y1, x2, y2)``.

    Returns:
        Tuple ``(x1, y1, x2, y2)`` where ``x1``/``y1`` are raised to at least
        0 and ``x2``/``y2`` are lowered to at most ``w``/``h``.
    """
    left, top, right, bottom = bbox
    return max(left, 0), max(top, 0), min(right, w), min(bottom, h)
|
|
|
|
|
|
def maybe_pad(crop_img, exp_bbox, h, w):
    """Zero-pad a crop by the amount its bounding box overhangs the image.

    Args:
        crop_img: array with three axes (rows, columns, and a last axis that
            is never padded).
        exp_bbox: bounding box ``(x1, y1, x2, y2)`` that may extend beyond
            the image.
        h: image height.
        w: image width.

    Returns:
        ``crop_img`` padded with constant zeros; each side is padded by the
        rounded amount the box extends past that image edge (0 if it does
        not).
    """
    x1, y1, x2, y2 = exp_bbox

    # How far the box sticks out beyond each image edge (never negative).
    overhang_left = max(0 - x1, 0)
    overhang_right = max(0, x2 - w)
    overhang_top = max(0 - y1, 0)
    overhang_bottom = max(0, y2 - h)

    pad_widths = np.array(
        [
            (overhang_top, overhang_bottom),
            (overhang_left, overhang_right),
            (0, 0),
        ]
    )
    pad_widths = np.rint(pad_widths).astype(np.int32)

    return np.pad(crop_img, pad_widths, mode="constant")
|
|
|
|
|
|
def square(bb):
    """Extend the shorter side of a bounding box so that it becomes square.

    The box keeps its centre: the shorter dimension is grown equally on both
    sides until it matches the longer one.

    Args:
        bb: bounding box in the format ``(x1, y1, x2, y2)``.

    Returns:
        Tuple ``(x1, y1, x2, y2)`` of the squared box; the input values are
        returned unchanged when the box is already square.
    """
    left, top, right, bottom = bb
    half_diff = ((right - left) - (bottom - top)) / 2

    if half_diff < 0:
        # Taller than wide: half_diff is negative, so this widens the box.
        return left + half_diff, top, right - half_diff, bottom

    if half_diff > 0:
        # Wider than tall: grow the height.
        return left, top - half_diff, right, bottom + half_diff

    return left, top, right, bottom
|
|
|
|
|
|
def crop(np_img, bb, h, w):
    """Crop a region out of an image, clipping the box to the image bounds.

    Args:
        np_img: H x W x C image.
        bb: list or tuple of format ``(x1, y1, x2, y2)``.
        h: image height used as the upper bound for ``y2``.
        w: image width used as the upper bound for ``x2``.

    Returns:
        The cropped numpy image (a slice of ``np_img``).

    Raises:
        InternalErrorException: if the box has no positive width or height.
    """
    left, top, right, bottom = bb

    is_degenerate = left >= right or top >= bottom
    if is_degenerate:
        raise InternalErrorException("Invalid bounding box for image cropping.")

    clipped = [max(left, 0), max(top, 0), min(right, w), min(bottom, h)]

    # Round to the nearest integer pixel index before slicing.
    col_start, row_start, col_stop, row_stop = np.rint(clipped).astype(np.int32)

    return np_img[row_start:row_stop, col_start:col_stop, :]
|
|
|
|
|
|
def compute_transform(src_points, dst_points):
    """Estimate the transform that maps ``src_points`` onto ``dst_points``.

    Both point sets are centred and scaled to unit standard deviation, the
    rotation between them is obtained from an SVD of their cross-covariance,
    and the result is combined with the scale ratio and the translation
    between the two centroids.

    Args:
        src_points: N x 2 array-like of source ``(x, y)`` points.
        dst_points: N x 2 array-like of destination ``(x, y)`` points, in the
            same order as ``src_points``.

    Returns:
        A 2 x 3 float array ``[s * R | t]``, the layout passed to
        ``cv2.warpAffine`` by ``crop_and_align``.
    """
    # Work on private float copies. np.asarray would alias a caller-owned
    # ndarray, and the in-place centring/scaling below would then silently
    # overwrite the caller's points (or fail outright for integer arrays).
    points1 = np.array(src_points, dtype=np.float64)
    points2 = np.array(dst_points, dtype=np.float64)

    # zero-mean
    center1 = np.expand_dims(np.mean(points1, axis=0), axis=0)
    center2 = np.expand_dims(np.mean(points2, axis=0), axis=0)
    points1 -= center1
    points2 -= center2

    # unit scale; the ratio of the two spreads is the scale of the transform
    std1 = np.std(points1)
    std2 = np.std(points2)
    points1 /= std1
    points2 /= std2

    # np.linalg.svd returns V already transposed (Vh).
    U, _, Vh = np.linalg.svd(points1.T.dot(points2))
    R = (U.dot(Vh)).T

    scaled_rotation = (std2 / std1) * R
    trans = np.hstack(
        (scaled_rotation, center2.T - scaled_rotation.dot(center1.T))
    )

    return trans
|
|
|
|
|
|
def crop_and_align(np_img, from_points, img_shape):
    """Warp an image so that ``from_points`` land on ``_SQUARE_TO_POINTS``.

    Args:
        np_img: source image passed to ``cv2.warpAffine``.
        from_points: points in ``np_img`` to be mapped onto the template;
            must correspond one-to-one with ``_SQUARE_TO_POINTS``.
        img_shape: ``(height, width)`` of the output image.

    Returns:
        The warped image returned by ``cv2.warpAffine``.
    """
    out_height, out_width = img_shape
    affine = compute_transform(from_points, _SQUARE_TO_POINTS)

    # The output size is handed to cv2.warpAffine as (width, height).
    return cv2.warpAffine(np_img, affine, (out_width, out_height))
|
|
|
|
|
|
def normalize(exp_bbox, lmks):
    """Scale landmarks by a box's size and shift them by its top-left corner.

    Args:
        exp_bbox: bounding box in the format ``(x1, y1, x2, y2)``.
        lmks: array of ``(x, y)`` landmarks that broadcasts against a
            two-element ``[x, y]`` list.
            NOTE(review): presumably given as fractions of the box size --
            confirm with the caller.

    Returns:
        ``lmks * [width, height] + [x1, y1]``.
    """
    left, top, right, bottom = exp_bbox
    box_size = [right - left, bottom - top]
    box_origin = [left, top]
    return lmks * box_size + box_origin
|
|
|
|
|
|
def expand_and_crop(np_img, p, bbox, original_size):
    """Expand a bounding box, clip it to the image, and crop that region.

    Args:
        np_img: H x W x C image to crop from.
        p: additive expansion factor passed to ``expand_bb``.
        bbox: bounding box in the format ``(x1, y1, x2, y2)``.
        original_size: ``(height, width)`` of ``np_img``.

    Returns:
        A 3-tuple ``(expanded_bbox, clipped_bbox, cropped_image)`` where
        ``expanded_bbox`` may extend beyond the image and ``clipped_bbox`` is
        the same box restricted to the image edges.
    """
    img_h, img_w = original_size

    expanded = expand_bb(bbox, p)
    clipped = restrict_bbox_to_edges(img_h, img_w, expanded)
    cropped = crop(np_img, clipped, img_h, img_w)

    return expanded, clipped, cropped
|
|
|
|
|
|
def convert_to_absolute_coordinates(bbox, resized_size, original_size, fd_input_shape):
    """Scale a bounding box by the original image size and the resize ratio.

    Args:
        bbox: array in the format ``[x1, y1, x2, y2]``.
            NOTE(review): presumably relative to the detector input --
            confirm with the caller.
        resized_size: ``(height, width)`` of the resized image.
        original_size: ``(height, width)`` of the original image.
        fd_input_shape: detector input shape; it is divided element-wise by
            ``resized_size``.

    Returns:
        ``bbox`` multiplied by ``[w, h, w, h]`` of the original image and by
        the matching width/height ratios.
    """
    orig_h, orig_w = original_size
    ratio = fd_input_shape / np.asarray(resized_size)

    # ratio is ordered (height, width) while bbox is ordered (x, y, x, y).
    image_extent = np.asarray([orig_w, orig_h, orig_w, orig_h])
    rescale = np.asarray([ratio[1], ratio[0], ratio[1], ratio[0]])

    return bbox * image_extent * rescale
|
|
|
|
|
|
def sigmoid_transform(value, weight, bias):
    """Apply a logistic sigmoid to the affine input ``weight * value + bias``.

    Args:
        value: input value (scalar or array accepted by ``np.exp``).
        weight: multiplicative coefficient applied to ``value``.
        bias: additive offset.

    Returns:
        ``1 / (1 + exp(-(weight * value + bias)))``.
    """
    logit = weight * value + bias
    return 1 / (1 + np.exp(-logit))
|
|
|
|
|
|
def get_model_types(settings):
    """Return the model type to use for each of the six model slots.

    Only the detection model type is configurable, through the
    ``"detection_model"`` setting; every other slot is always ``"default"``.

    Args:
        settings: mapping with an optional ``"detection_model"`` entry.

    Returns:
        A 6-tuple ``(fd, lm, ql, fr, at, md)`` of model type strings.
    """
    detection_type = settings.get("detection_model", "default")

    # lm, ql, fr, at and md have no configurable variant.
    return (detection_type,) + ("default",) * 5
|