# File listing metadata: yoloserv/modules/paravision/recognition/sdk.py (472 lines, 16 KiB, Python)

"""sdk: Instantiate the Paravision model."""
from typing import List, Optional, Sequence
import numpy as np
import warnings
import os
from ._internal import SplitGraph
from . import _utils as utils
from .types import (
BoundingBox,
Face,
Embedding,
InferenceResult,
ImageInferenceData,
Landmarks,
ScoringMode,
)
from .exceptions import InvalidInputException, InternalErrorException
from .engine import Engine
# Message used when an embedding's scoring mode is not the one a scoring
# routine expects (or is not a mode this module knows how to score).
ERR_INVALID_EMB_MODE = "Invalid embedding scoring mode"
# Message used when get_embedding_from_prepared_image() is given None.
ERR_INVALID_EMB_PREPARED_IMAGE = "Invalid prepared image for embedding"
# get_match_score() multiplies the sigmoid-transformed similarity by this
# factor before rounding and clipping to [0, 1000].
MATCH_SCORE_SCALE = 1000
# Weight/bias pairs handed to utils.sigmoid_transform() by get_match_score(),
# one pair per scoring mode.
# NOTE(review): the exact formula lives in utils.sigmoid_transform — confirm
# there how weight and bias are applied.
ENHANCED_MATCH_SCORE_WEIGHT = 2.3
ENHANCED_MATCH_SCORE_BIAS = -0.5
STANDARD_MATCH_SCORE_WEIGHT = 2.1
STANDARD_MATCH_SCORE_BIAS = -5.3
class SDK:
    """
    SDK()

    An SDK object contains an instance of the Paravision model and its
    associated resources.

    SDK objects are long-living and do not need to be re-instantiated between
    method calls.
    """

    def __init__(
        self,
        models_dir: Optional[str] = None,
        settings: Optional[dict] = None,
        engine: Engine = Engine.AUTO,
    ):
        """
        Create a SDK instance.

        models_dir: directory holding the model files. When omitted, the
            location reported by utils.model_location() is used.
        settings: optional configuration passed on to the graph. The keys
            read here are "attributes", "mask" and "scoring_mode"; missing
            "attributes"/"mask" entries are filled with defaults. The
            caller's dictionary is never modified.
        engine: inference engine. Engine.AUTO is resolved with
            utils.match_engine() when models_dir was omitted, and with
            utils.match_engine_given_path(models_dir) otherwise.
        """
        # Work on a shallow copy so the defaults filled in below do not leak
        # back into the dictionary owned by the caller.
        settings = {} if settings is None else dict(settings)

        if models_dir is None:
            models_dir = str(utils.model_location())
            if engine == Engine.AUTO:
                engine = utils.match_engine()
        elif engine == Engine.AUTO:
            # An explicit models_dir was given: pick the engine that matches
            # what is found at that path.
            engine = utils.match_engine_given_path(models_dir)

        if "attributes" not in settings:
            settings["attributes"] = {"models_dir": models_dir}

        if "mask" not in settings:
            bundled_mask_dir = os.path.join(models_dir, "mask")
            if os.path.isdir(bundled_mask_dir):
                settings["mask"] = {"models_dir": bundled_mask_dir}
            else:
                try:
                    settings["mask"] = {"models_dir": utils.mask_model_location()}
                except Exception:
                    # Best effort: when no mask model can be located the SDK
                    # is created without a "mask" entry.
                    # TODO: temp solution to silent SonarCloud, should update when logging is added.
                    settings.pop("mask", None)

        self._graph = SplitGraph(models_dir, settings, engine=engine)
        # Weight and bias from the model spec; used by get_confidence().
        self._weight = utils.read_spec_value(models_dir, "weight")
        self._bias = utils.read_spec_value(models_dir, "bias")
        self._scoring_mode = settings.get("scoring_mode", ScoringMode.StandardEmbedding)

    def get_faces(
        self,
        imgs: Sequence[np.ndarray],
        qualities: bool = False,
        landmarks: bool = False,
        embeddings: bool = False,
    ) -> InferenceResult:
        """
        Detect faces in the images.

        Includes bounding boxes and, optionally, landmarks, embeddings and
        image quality details.

        Accepts a list of NumPy arrays (images).
        Returns InferenceResult object with one entry per input image.
        """
        # Truthiness (not `is True`) so that truthy non-bool flags such as
        # numpy.bool_ are honoured as well.
        requested = (
            (landmarks, "find_landmarks"),
            (embeddings, "compute_embeddings"),
            (qualities, "get_qualities"),
        )
        options = [name for flag, name in requested if flag]

        outputs, img_idxs = self._graph.run(imgs, self._scoring_mode, options)
        faces = utils.build_faces(outputs)

        image_inferences = []
        for img in imgs:
            height, width = img.shape[:2]
            image_inferences.append(ImageInferenceData(width, height))

        # img_idxs gives, for each face, the index of the image it came from.
        for img_idx, face in zip(img_idxs, faces):
            image_inferences[img_idx].faces.append(face)

        return InferenceResult(image_inferences)

    def get_qualities(self, faces: Sequence[Face]) -> None:
        """
        Get qualities for faces in the image.

        Accepts a list of Face objects.
        No return values. Updates the face objects in place with quality and
        acceptability.
        Raises InvalidInputException if a face has no landmarks_input_image.
        """
        if len(faces) == 0:
            return

        imgs = []
        for face in faces:
            # Same precondition get_masks() and get_landmarks() enforce.
            if face.landmarks_input_image is None:
                raise InvalidInputException("Face.landmarks_input_image is None.")
            imgs.append(face.landmarks_input_image)

        qualities, acceptabilities = self._graph.get_qualities(imgs)
        for face, quality, acceptability in zip(faces, qualities, acceptabilities):
            face.quality = quality
            face.acceptability = acceptability

    def get_masks(self, faces: Sequence[Face]) -> None:
        """
        Deprecated: This will be removed in the next major release. An Attributes SDK
        will be provided in the future to replace functionality.

        Get the mask probabilities for faces.

        Accepts a list of faces.
        No return values. Updates the face objects in place with mask probabilities.
        Raises InvalidInputException if a face has no landmarks_input_image.
        """
        warnings.warn(
            "get_masks is deprecated and will be removed in the next major release.\n"
            "An Attributes SDK will be provided in the future to replace functionality.",
            DeprecationWarning,
            stacklevel=2,  # attribute the warning to the caller, not this file
        )
        if len(faces) == 0:
            return

        mask_input_images = []
        for face in faces:
            if face.landmarks_input_image is None:
                raise InvalidInputException(
                    "Face.landmarks_input_image is needed but is None"
                )
            mask_input_images.append(face.landmarks_input_image)

        probability = self._graph.check_for_mask(mask_input_images)
        for i, face in enumerate(faces):
            face.mask = float(probability[i])

    def get_bounding_boxes(self, imgs: Sequence[np.ndarray]) -> InferenceResult:
        """
        Detect bounding boxes of faces in the images.

        Accepts a list of NumPy arrays (images).
        Returns InferenceResult object (the result of get_faces() with no
        optional outputs requested).
        """
        return self.get_faces(imgs)

    def get_landmarks_from_bounding_boxes(
        self, img: np.ndarray, bboxes: Sequence[BoundingBox]
    ) -> InferenceResult:
        """
        Build faces for a single image from caller-supplied bounding boxes.

        Accepts one NumPy array (image) and a list of BoundingBox objects.
        Returns InferenceResult object holding a single image entry whose
        faces are built from the output of the graph's run_from_landmarks().
        """
        outputs = self._graph.run_from_landmarks(img, bboxes)
        faces = utils.build_faces(outputs)

        height, width = img.shape[:2]
        image_inference = ImageInferenceData(width, height)
        image_inference.faces.extend(faces)
        return InferenceResult([image_inference])

    def get_landmarks(self, faces: Sequence[Face]) -> None:
        """
        Get the landmarks for faces.

        Accepts a list of faces.
        No return values. Updates the face objects in place with landmark
        values and the recognition input image.
        Raises InvalidInputException if a face is missing any of
        landmarks_input_image, landmarks_input_bounding_box, alignment_image
        or alignment_bounding_box.
        """
        if len(faces) == 0:
            return

        landmarks_input_bounding_boxes = []
        landmarks_input_images = []
        alignment_images = []
        alignment_bounding_boxes = []

        for face in faces:
            if face.landmarks_input_image is None:
                raise InvalidInputException("Face.landmarks_input_image is None.")
            if face.landmarks_input_bounding_box is None:
                raise InvalidInputException(
                    "Face.landmarks_input_bounding_box is None."
                )
            if face.alignment_image is None:
                raise InvalidInputException("Face.alignment_image is None.")
            if face.alignment_bounding_box is None:
                raise InvalidInputException("Face.alignment_bounding_box is None.")

            landmarks_input_images.append(face.landmarks_input_image)
            landmarks_input_bounding_boxes.append(face.landmarks_input_bounding_box)
            alignment_images.append(face.alignment_image)
            alignment_bounding_boxes.append(face.alignment_bounding_box)

        landmarks, recognition_input_images = self._graph.find_landmarks(
            landmarks_input_bounding_boxes,
            landmarks_input_images,
            alignment_bounding_boxes,
            alignment_images,
        )

        for i, face in enumerate(faces):
            face.landmarks = Landmarks(*landmarks[i])
            face.recognition_input_image = recognition_input_images[i]

    def get_embeddings(self, faces: Sequence[Face]) -> None:
        """
        Get embeddings for faces.

        Accepts a list of Face objects.
        No return values. Updates the face objects in place with embeddings.
        Raises InvalidInputException if a face has no recognition_input_image.
        """
        if len(faces) == 0:
            return

        recognition_input_images = []
        for face in faces:
            if face.recognition_input_image is None:
                raise InvalidInputException("Face.recognition_input_image is None.")
            recognition_input_images.append(face.recognition_input_image)

        embeddings = self._graph.compute_embeddings(recognition_input_images)
        for i, face in enumerate(faces):
            face.embedding = Embedding(embeddings[i], self._scoring_mode)

    def get_embeddings_from_landmarks(
        self, image: np.ndarray, landmarks: Sequence[Landmarks]
    ) -> List[Embedding]:
        """
        Compute embeddings for faces described by landmarks in one image.

        Each Landmarks entry is used to crop and align the image to the
        engine's recognition input shape before embeddings are computed.

        Accepts one NumPy array (image) and a list of Landmarks objects.
        Returns a list of Embedding objects in this SDK's scoring mode.
        """
        recognition_input_images = [
            utils.crop_and_align(
                image, landmark.astuple(), self._graph.engine.fr_input_shape
            )
            for landmark in landmarks
        ]
        return [
            Embedding(data, self._scoring_mode)
            for data in self._graph.compute_embeddings(recognition_input_images)
        ]

    def get_embedding_from_prepared_image(
        self, prepared_image: np.ndarray
    ) -> Embedding:
        """
        Compute embedding using the prepared image i.e. recognition_input_image.

        Accepts one prepared image.
        Returns embedding.
        Raises InvalidInputException if prepared_image is None.
        """
        if prepared_image is None:
            raise InvalidInputException(ERR_INVALID_EMB_PREPARED_IMAGE)

        embeddings = self._graph.compute_embeddings([prepared_image])
        return Embedding(embeddings[0], self._scoring_mode)

    def get_attributes(self, faces: Sequence[Face]) -> None:
        """
        Deprecated: This will be removed in the next major release. An Attributes SDK
        will be provided in the future to replace functionality.

        Computes age and gender attributes for faces.

        Accepts a list of Face objects.
        No return values. Updates the face objects in place with age and gender values.
        Raises InvalidInputException if a face has no recognition_input_image.
        """
        warnings.warn(
            "get_attributes is deprecated and will be removed in the next major release.\n"
            "An Attributes SDK will be provided in the future to replace functionality.",
            DeprecationWarning,
            stacklevel=2,  # attribute the warning to the caller, not this file
        )
        if len(faces) == 0:
            return

        recognition_input_images = []
        for face in faces:
            if face.recognition_input_image is None:
                raise InvalidInputException("Face.recognition_input_image is None.")
            recognition_input_images.append(face.recognition_input_image)

        ages, genders = self._graph.get_attributes(recognition_input_images)
        for i, face in enumerate(faces):
            face.ages = ages[i]
            face.genders = genders[i]

    @staticmethod
    def _get_standard_score(emb1: Embedding, emb2: Embedding) -> float:
        """
        Compute the similarity score of two face embeddings based on the
        squared Euclidean distance between them. A larger number indicates a
        greater similarity between the two embeddings; a lower number
        indicates a greater difference between the two embeddings.

        Accepts 2 embedding objects.
        Returns a float between [0, 4]. If both embeddings are not in standard
        scoring mode, an InvalidInputException is thrown.
        """
        if (
            emb1.scoring_mode != ScoringMode.StandardEmbedding
            or emb1.scoring_mode != emb2.scoring_mode
        ):
            raise InvalidInputException(ERR_INVALID_EMB_MODE)

        score = 4 - np.sum((emb1.data - emb2.data) ** 2)
        return float(np.clip(score, 0, 4))

    @staticmethod
    def _get_enhanced_score(emb1: Embedding, emb2: Embedding) -> float:
        """
        Compute quality-aware score between two face embeddings. A larger
        number indicates a greater similarity between the two embeddings; a
        lower number indicates a greater difference between the two
        embeddings.

        Accepts 2 embedding objects.
        Returns a float between [0, 2]. If both embeddings are not in enhanced
        scoring mode, an InvalidInputException is thrown. If the summed
        uncertainty is negative, an InternalErrorException is thrown.
        """
        if (
            emb1.scoring_mode != ScoringMode.EnhancedEmbedding
            or emb1.scoring_mode != emb2.scoring_mode
        ):
            raise InvalidInputException(ERR_INVALID_EMB_MODE)

        # The last element of the data is treated as the uncertainty; the
        # remaining elements are the embedding vector itself.
        base_emb1, uncertainty1 = emb1.data[:-1], emb1.data[-1]
        base_emb2, uncertainty2 = emb2.data[:-1], emb2.data[-1]

        total_uncertainty = uncertainty1 + uncertainty2
        if total_uncertainty < 0:
            raise InternalErrorException("Uncertainty values cannot be negative.")

        # The 1e-10 terms keep the division and the logarithm defined when
        # the total uncertainty is exactly zero.
        attention = 2 * (1 - base_emb1 @ base_emb2) / (1e-10 + total_uncertainty)
        dist = attention + np.log(1e-10 + total_uncertainty)
        score = np.exp(-dist)
        return float(np.clip(score, 0, 2))

    @staticmethod
    def get_similarity(emb1: Embedding, emb2: Embedding) -> float:
        """
        Compute the similarity score of two face embeddings. A larger number
        indicates a greater similarity between the two embeddings; a lower
        number indicates a greater difference between the two embeddings.

        Accepts 2 embedding objects.
        Returns a float between [0, 2] for enhanced mode or [0, 4] for standard mode.
        If either of the embeddings is not an Embedding (e.g. None), or if the
        embeddings are of different sizes, or if the embeddings have different
        scoring modes, raises InvalidInputException.
        """
        if not (
            isinstance(emb1, Embedding)
            and isinstance(emb2, Embedding)
            and len(emb1.data) == len(emb2.data)
        ):
            raise InvalidInputException("Invalid input embedding")

        if emb1.scoring_mode != emb2.scoring_mode:
            raise InvalidInputException("Scoring mode mismatch for input embeddings")

        if emb1.scoring_mode == ScoringMode.EnhancedEmbedding:
            return SDK._get_enhanced_score(emb1, emb2)
        if emb1.scoring_mode == ScoringMode.StandardEmbedding:
            return SDK._get_standard_score(emb1, emb2)
        raise InvalidInputException(ERR_INVALID_EMB_MODE)

    @staticmethod
    def get_match_score(emb1: Embedding, emb2: Embedding) -> int:
        """
        Compute the match score of two face embeddings. A larger number
        indicates a greater similarity between the two embeddings; a lower
        number indicates a greater difference between the two embeddings.

        Accepts 2 embedding objects.
        Returns an int between [0, 1000]. If either of the embeddings is None,
        or if the embeddings are of different sizes, or if the embeddings
        have different scoring modes, raises InvalidInputException.
        """
        # get_similarity() validates both inputs and their scoring modes.
        similarity = SDK.get_similarity(emb1, emb2)

        if emb1.scoring_mode == ScoringMode.EnhancedEmbedding:
            weight, bias = ENHANCED_MATCH_SCORE_WEIGHT, ENHANCED_MATCH_SCORE_BIAS
        elif emb1.scoring_mode == ScoringMode.StandardEmbedding:
            weight, bias = STANDARD_MATCH_SCORE_WEIGHT, STANDARD_MATCH_SCORE_BIAS
        else:
            raise InvalidInputException(ERR_INVALID_EMB_MODE)

        match_score = round(
            utils.sigmoid_transform(similarity, weight, bias) * MATCH_SCORE_SCALE
        )
        return int(np.clip(match_score, 0, MATCH_SCORE_SCALE))

    def get_confidence(self, emb1: Embedding, emb2: Embedding) -> float:
        """
        Deprecated: This will be removed in the next major release. Use the
        get_match_score or get_similarity functions instead.

        Compute the probability of two faces being the same using the standard mode.

        Accepts 2 embedding objects.
        Returns a float between [0, 1]. If either of the embeddings is None,
        or if the embeddings are of different sizes, or if the embeddings
        have different scoring modes, raises InvalidInputException.
        """
        warnings.warn(
            "get_confidence is deprecated and will be removed in the next major release.\n"
            "Use the get_match_score or get_similarity functions instead.",
            DeprecationWarning,
            stacklevel=2,  # attribute the warning to the caller, not this file
        )

        # Enhanced embeddings are re-wrapped (same data) as standard ones so
        # that the standard scoring path is always used here.
        if emb1 is not None and emb1.scoring_mode == ScoringMode.EnhancedEmbedding:
            emb1 = Embedding(emb1.data, ScoringMode.StandardEmbedding)
        if emb2 is not None and emb2.scoring_mode == ScoringMode.EnhancedEmbedding:
            emb2 = Embedding(emb2.data, ScoringMode.StandardEmbedding)

        score = self.get_similarity(emb1, emb2)
        return float(utils.sigmoid_transform(score, self._weight, self._bias))