yoloserv: changes for editing; going to remove camera live_stream

ox 2026-01-06 14:29:53 -04:00
parent 3c4cbfd300
commit 1737999c86
127 changed files with 3518 additions and 71881 deletions

.gitignore vendored Normal file

@ -0,0 +1,3 @@
# Ignore these folders
src/__pycache__/
var/

doc/para.md Normal file

@ -0,0 +1,41 @@
## Paravision
#### Creds
Username: 2a76e3b5-733a-4c93-98fb-339927b0f90c
Passwd: 453a6a2d-2935-430b-a22f-9b2880ca3818
#### Setup
As with most things, the Paravision docs do not work if followed as written. Below is how I was able to
install the models and set up my environment.
##### To Install a Model
To install a model we ran: ~$ pip3 install --no-cache-dir --timeout 60 --no-deps --extra-index-url https://2a76e3b5-733a-4c93-98fb-339927b0f90c:453a6a2d-2935-430b-a22f-9b2880ca3818@paravision.mycloudrepo.io/repositories/python-recognition "paravision-recognition" "paravision-models-gen6-balanced-openvino-2022-3"
By visiting the URL you can view which models are available.
##### To Install the Liveness
First we must install the Liveness SDK:
pip3 install --no-cache-dir \
--extra-index-url https://2a76e3b5-733a-4c93-98fb-339927b0f90c:453a6a2d-2935-430b-a22f-9b2880ca3818@paravision.mycloudrepo.io/repositories/python-liveness2d \
"paravision-liveness2d-gen6==2.1.1"
Then we must install the Liveness models (same pattern as the SDK install above):
pip3 install --no-cache-dir \
--extra-index-url https://2a76e3b5-733a-4c93-98fb-339927b0f90c:453a6a2d-2935-430b-a22f-9b2880ca3818@paravision.mycloudrepo.io/repositories/python-liveness2d \
"paravision-models-gen6-liveness2d-openvino-2022-3"
Then we must install the Validness models:
pip3 install --no-cache-dir \
--extra-index-url https://2a76e3b5-733a-4c93-98fb-339927b0f90c:453a6a2d-2935-430b-a22f-9b2880ca3818@paravision.mycloudrepo.io/repositories/python-validness \
"paravision-models-gen6-validness-openvino-2022-3==1.0.0"
Finally, install the recognition SDK with the OpenVINO extra and its models:
pip3 install --no-cache-dir --timeout 60 \
--extra-index-url https://2a76e3b5-733a-4c93-98fb-339927b0f90c:453a6a2d-2935-430b-a22f-9b2880ca3818@paravision.mycloudrepo.io/repositories/python-recognition \
--extra-index-url https://2a76e3b5-733a-4c93-98fb-339927b0f90c:453a6a2d-2935-430b-a22f-9b2880ca3818@paravision.mycloudrepo.io/repositories/python-libs \
"paravision-recognition[openvino]" "paravision-models-gen6-balanced-openvino-2022-3"

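##### To Verify the Install
A quick sanity check I would run (my own, not from the Paravision docs) to confirm the packages import and report their versions; `get_metadata` comes from the liveness2d stubs included in this commit:
python3 -c "import paravision.recognition as pr; print(pr.__version__)"
python3 -c "import paravision.liveness2d as pl; print(pl.SDK.get_metadata())"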
Binary files not shown (11 files).


@ -0,0 +1 @@
<ie><plugins><plugin name="CPU" location="libopenvino_intel_cpu_plugin.so"></plugin></plugins></ie>


@ -0,0 +1,6 @@
try:
from .session import Liveness, CameraParams # noqa
except Exception:
pass
__version__ = "7.0.0"


@ -0,0 +1,7 @@
class InvalidWindowSizeException(Exception):
    def __init__(self, message):
        super().__init__(message)
        self.message = message
class InvalidSpecError(Exception):
pass


@ -0,0 +1,64 @@
import pyrealsense2 as rs
import cv2
import numpy as np
from .tensorrt.engine import Engine
from .utils import (
estimate_depth_bounding_box,
expand_bbox_to_edge_and_crop,
model_location,
)
from .exceptions import InvalidWindowSizeException
# validity constants
WINDOW_SIZE = 5
class CameraParams(object):
def __init__(self, depth_intr, color_intr, color_to_depth_extr):
self.depth_intr = depth_intr
self.color_intr = color_intr
self.color_to_depth_extr = color_to_depth_extr
class Liveness(object):
    def __init__(self, model_path=None, settings=None):
        # settings defaults to None to avoid a shared mutable default argument
        if model_path is None:
            model_path = model_location()
        self.predictor = Engine(model_path, settings or {})
def load_depth_data_from_file(self, file_path):
return np.loadtxt(file_path, dtype=np.int16, delimiter=",")
def write_depth_data_to_file(self, file_path, depth_data):
np.savetxt(file_path, depth_data, fmt="%d", delimiter=",")
def crop_depth_frame(self, camera_params, depth_frame, bounding_box):
if camera_params is None or depth_frame is None or bounding_box is None:
raise Exception("Invalid input arguments")
proj_depth_bb = estimate_depth_bounding_box(bounding_box, camera_params)
cropped_depth_frame = expand_bbox_to_edge_and_crop(depth_frame, proj_depth_bb)
return cropped_depth_frame
def compute_liveness_probability(self, depth_imgs):
if len(depth_imgs) != WINDOW_SIZE:
            raise InvalidWindowSizeException("Window size must equal {}".format(WINDOW_SIZE))
resized = [
cv2.resize(
depth_img.astype(np.float32),
(self.predictor.input_shape[1], self.predictor.input_shape[0]),
)
for depth_img in depth_imgs
]
frame_probs = self.predictor.predict(resized)
return np.mean(frame_probs)
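A minimal usage sketch of the class above (my own, not repo code); `camera_params` and `bounding_box` are assumed to come from a RealSense capture pipeline, as in the tests:

# Hedged sketch: collect WINDOW_SIZE cropped depth frames, then score them.
liveness = Liveness()  # resolves the model via model_location()
window = []
for _ in range(WINDOW_SIZE):
    depth = liveness.load_depth_data_from_file("depth.txt")  # or a live frame
    window.append(liveness.crop_depth_frame(camera_params, depth, bounding_box))
print(liveness.compute_liveness_probability(window))  # mean over the 5 frame scores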


@ -0,0 +1,114 @@
import os
import cv2
import numpy as np
import pycuda.driver as cuda
import pycuda.autoinit # noqa
import tensorrt as trt
from .utils import do_inference, allocate_buffers, GiB
from ..utils import _read_spec_value
LOGGER = trt.Logger(trt.Logger.Severity.ERROR)
DEFAULT_MAX_BATCH_SIZE = 1
class Engine(object):
def __init__(self, model_path, settings):
self.stream = cuda.Stream()
self.input_shape = _read_spec_value(model_path, "input_shape")
self.engine = self._load_engine(model_path, settings)
self.context = self.engine.create_execution_context()
(self.inputs, self.outputs, self.data, self.bindings) = allocate_buffers(
self.engine
)
def _load_engine(self, model_path, settings):
engine_dirpath = model_path
try:
import paravision_models.liveness
if paravision_models.liveness.location() == model_path:
engine_dirpath = paravision_models.liveness.TRT_ENGINE_PATH
except (ImportError, NameError, AttributeError):
pass
runtime = trt.Runtime(LOGGER)
engine_path = "{}/liveness.engine".format(engine_dirpath)
        if not os.path.isfile(engine_path):
return self._build_engine(model_path, engine_path, settings)
with open(engine_path, "rb") as f:
return runtime.deserialize_cuda_engine(f.read())
def _build_engine(self, model_path, engine_path, settings):
model_file = "{}/liveness.onnx".format(model_path)
max_batch_size = settings.get("max_batch_size", DEFAULT_MAX_BATCH_SIZE)
trt_version = int(trt.__version__.split(".")[0])
if trt_version >= 7:
input_shape = [max_batch_size, 3] + list(self.input_shape)
net_flags = 1 << (int)(trt.NetworkDefinitionCreationFlag.EXPLICIT_BATCH)
elif trt_version == 6:
input_shape = [3] + list(self.input_shape)
net_flags = 0
else:
raise Exception("TensorRT version 6 or higher required to build engine")
        if not os.path.isfile(model_file):
raise Exception("No model found at {}".format(model_file))
with open(model_file, "rb") as f:
model = f.read()
with trt.Builder(LOGGER) as builder, builder.create_network(
net_flags
) as network, trt.OnnxParser(network, LOGGER) as parser:
builder.max_workspace_size = GiB(1)
builder.max_batch_size = max_batch_size
if not parser.parse(model):
raise Exception("Cannot parse liveness model.")
network.get_input(0).shape = input_shape
engine = builder.build_cuda_engine(network)
serialized = engine.serialize()
if serialized is None:
raise Exception("Cannot serialize engine")
with open(engine_path, "wb") as f:
f.write(serialized)
return engine
def predict(self, exp_bb_depth_imgs):
max_batch_size = self.engine.max_batch_size
live_probs = []
for i in range(0, len(exp_bb_depth_imgs), max_batch_size):
batch = exp_bb_depth_imgs[
i : min(len(exp_bb_depth_imgs), i + max_batch_size)
]
probs_batch = self._batch_predict(batch)
live_probs.extend(probs_batch)
return live_probs
def _batch_predict(self, np_imgs):
stacked = [np.stack([np_img for _ in range(3)], axis=0) for np_img in np_imgs]
np_imgs = np.asarray(stacked, dtype=np.float32)
results = do_inference(
self.context,
bindings=self.bindings,
inputs=self.inputs,
input_data=[np_imgs.ravel()],
outputs=self.outputs,
output_data=self.data,
stream=self.stream,
)
# grab every other value to return the live probabilities
return results[0][0 : 2 * len(np_imgs) : 2]
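To make the final slice concrete (a toy illustration of mine, not repo code): assuming the network emits interleaved (live, spoof) pairs per image, a stride-2 slice recovers just the live values.

flat = [0.9, 0.1, 0.7, 0.3]   # two images, interleaved (live, spoof) pairs
live = flat[0 : 2 * 2 : 2]    # -> [0.9, 0.7]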


@ -1,21 +1,28 @@
import numpy as np
import pycuda.driver as cuda
import tensorrt as trt
from collections import defaultdict
DTYPES = defaultdict(lambda: np.float32)
DTYPES["num_detections"] = np.int32
DTYPES["NMS_1"] = np.int32
def GiB(val):
return val * 1 << 30
def do_inference(
context, bindings, inputs, input_data, outputs, output_data, stream, batch_size=1
):
-    flattened_input_data = [input_data.ravel()]
-    for i, input_ in enumerate(inputs):
-        cuda.memcpy_htod_async(input_, flattened_input_data[i], stream)
+    [
+        cuda.memcpy_htod_async(input, input_data[i], stream)
+        for i, input in enumerate(inputs)
+    ]
-    context.set_binding_shape(0, input_data.shape)
-    context.execute_async_v2(bindings=bindings, stream_handle=stream.handle)
+    context.execute_async(
+        bindings=bindings, stream_handle=stream.handle, batch_size=batch_size
+    )
data = []
@ -39,7 +46,7 @@ def allocate_buffers(engine):
for binding in engine:
shape = engine.get_binding_shape(binding)
-        size = calculate_volume(shape)
+        size = trt.volume(shape)
dtype = DTYPES[str(binding)]
host_mem = (size, dtype)
device_mem = cuda.mem_alloc(size * engine.max_batch_size * dtype().itemsize)
@ -53,14 +60,3 @@ def allocate_buffers(engine):
data.append(host_mem)
return inputs, outputs, data, bindings
-def calculate_volume(shape):
-    volume = 1
-    for dim in shape:
-        # -1 indicates dynamic batching
-        if dim == -1:
-            continue
-        volume *= dim
-    return volume


@ -0,0 +1,136 @@
import os
import pyrealsense2 as rs
import numpy as np
from unittest import TestCase
from ..session import Liveness, CameraParams
from ..types import Rectangle
from ..exceptions import InvalidWindowSizeException
ASSETS_PATH = os.path.join(os.path.dirname(__file__), "assets")
liveness_session = None
class TestSession(TestCase):
@classmethod
def setUpClass(cls):
global liveness_session
liveness_session = Liveness()
def setUp(self):
self.liveness_session = liveness_session
def test_crop_depth_frame(self):
bounding_box = Rectangle(
528.551139831543, 234.36917863815668, 839.0621948242188, 642.2044240250417
)
depth_intrinsic = rs.intrinsics()
depth_intrinsic.width = 1280
depth_intrinsic.height = 720
depth_intrinsic.ppx = 640.387
depth_intrinsic.ppy = 357.513
depth_intrinsic.fx = 635.811
depth_intrinsic.fy = 635.811
depth_intrinsic.model = rs.distortion.brown_conrady
depth_intrinsic.coeffs = [0, 0, 0, 0, 0]
color_intrinsic = rs.intrinsics()
color_intrinsic.width = 1280
color_intrinsic.height = 720
color_intrinsic.ppx = 647.024
color_intrinsic.ppy = 356.927
color_intrinsic.fx = 922.169
color_intrinsic.fy = 922.476
color_intrinsic.model = rs.distortion.inverse_brown_conrady
color_intrinsic.coeffs = [0, 0, 0, 0, 0]
color_depth_extr = rs.extrinsics()
color_depth_extr.rotation = [
0.999945,
0.0103263,
0.00163071,
-0.0103348,
0.999932,
0.00530964,
-0.00157577,
-0.0053262,
0.999985,
]
color_depth_extr.translation = [-0.0147758, -0.000159923, -0.000372309]
camera_params = CameraParams(depth_intrinsic, color_intrinsic, color_depth_extr)
depth_frame = liveness_session.load_depth_data_from_file(
os.path.join(ASSETS_PATH, "depth.txt")
)
cropped_depth_frame = liveness_session.crop_depth_frame(
camera_params, depth_frame, bounding_box
)
expected_cropped_depth_frame = liveness_session.load_depth_data_from_file(
os.path.join(ASSETS_PATH, "cropped_depth.txt")
)
self.assertIsNotNone(cropped_depth_frame, msg="unexpected cropped depth frame")
self.assertTrue(
np.array_equal(cropped_depth_frame, expected_cropped_depth_frame),
msg="invalid cropped depth frame",
)
def test_crop_depth_frame_invalid_camera_params(self):
depth_frame = liveness_session.load_depth_data_from_file(
os.path.join(ASSETS_PATH, "depth.txt")
)
bounding_box = Rectangle(
528.551139831543, 234.36917863815668, 839.0621948242188, 642.2044240250417
)
self.assertRaises(
Exception,
liveness_session.crop_depth_frame,
None,
depth_frame,
bounding_box,
)
def test_crop_depth_frame_invalid_depth_frame(self):
camera_params = CameraParams(None, None, None)
depth_frame = None
bounding_box = Rectangle(1.2, 1.2, 1.2, 1.2)
self.assertRaises(
Exception,
liveness_session.crop_depth_frame,
camera_params,
depth_frame,
bounding_box,
)
def test_crop_depth_frame_invalid_bounding_box(self):
camera_params = CameraParams(None, None, None)
depth_frame = liveness_session.load_depth_data_from_file(
os.path.join(ASSETS_PATH, "depth.txt")
)
bounding_box = None
self.assertRaises(
Exception,
liveness_session.crop_depth_frame,
camera_params,
depth_frame,
bounding_box,
)
def test_engine_invalid_window_size(self):
depth_frame = liveness_session.load_depth_data_from_file(
os.path.join(ASSETS_PATH, "depth.txt")
)
self.assertRaises(
InvalidWindowSizeException,
liveness_session.compute_liveness_probability,
[depth_frame],
)
def test_engine_valid_window_size(self):
depth_frame = liveness_session.load_depth_data_from_file(
os.path.join(ASSETS_PATH, "depth.txt")
)
prob = liveness_session.compute_liveness_probability([depth_frame] * 5)
        self.assertTrue(0 <= prob <= 1)


@ -0,0 +1,100 @@
import os
import pyrealsense2 as rs
import numpy as np
from unittest import TestCase
from ..session import Liveness, CameraParams
from ..utils import estimate_depth_bounding_box, expand_bbox_to_edge_and_crop
from ..types import Rectangle
ASSETS_PATH = os.path.join(os.path.dirname(__file__), "assets")
liveness_session = None
expected_projected_bounding_box = [
514.62513733,
267.85284424,
726.44473267,
551.44799805,
]
class TestUtils(TestCase):
@classmethod
def setUpClass(cls):
global liveness_session
liveness_session = Liveness()
def setUp(self):
self.liveness_session = liveness_session
def test_estimate_depth_bounding_box(self):
bounding_box = Rectangle(
528.551139831543, 234.36917863815668, 839.0621948242188, 642.2044240250417
)
depth_intrinsic = rs.intrinsics()
depth_intrinsic.width = 1280
depth_intrinsic.height = 720
depth_intrinsic.ppx = 640.387
depth_intrinsic.ppy = 357.513
depth_intrinsic.fx = 635.811
depth_intrinsic.fy = 635.811
depth_intrinsic.model = rs.distortion.brown_conrady
depth_intrinsic.coeffs = [0, 0, 0, 0, 0]
color_intrinsic = rs.intrinsics()
color_intrinsic.width = 1280
color_intrinsic.height = 720
color_intrinsic.ppx = 647.024
color_intrinsic.ppy = 356.927
color_intrinsic.fx = 922.169
color_intrinsic.fy = 922.476
color_intrinsic.model = rs.distortion.inverse_brown_conrady
color_intrinsic.coeffs = [0, 0, 0, 0, 0]
color_depth_extr = rs.extrinsics()
color_depth_extr.rotation = [
0.999945,
0.0103263,
0.00163071,
-0.0103348,
0.999932,
0.00530964,
-0.00157577,
-0.0053262,
0.999985,
]
color_depth_extr.translation = [-0.0147758, -0.000159923, -0.000372309]
camera_params = CameraParams(depth_intrinsic, color_intrinsic, color_depth_extr)
proj_depth_bb = estimate_depth_bounding_box(bounding_box, camera_params)
self.assertIsNotNone(
proj_depth_bb, msg="unexpected or none projected depth bounding box"
)
self.assertTrue(
np.allclose(proj_depth_bb, expected_projected_bounding_box),
msg="invalid projected depth bounding box",
)
def test_expand_bbox_to_edge_and_crop(self):
depth_frame = liveness_session.load_depth_data_from_file(
os.path.join(ASSETS_PATH, "depth.txt")
)
expanded_cropped_frame = expand_bbox_to_edge_and_crop(
depth_frame, expected_projected_bounding_box
)
expected_cropped_depth_frame = liveness_session.load_depth_data_from_file(
os.path.join(ASSETS_PATH, "cropped_depth.txt")
)
self.assertIsNotNone(
expanded_cropped_frame, msg="unexpected cropped depth frame"
)
self.assertTrue(
np.array_equal(expanded_cropped_frame, expected_cropped_depth_frame),
msg="invalid cropped depth frame",
)


@ -0,0 +1,91 @@
import base64
import cv2
import numpy as np
class Point(object):
"""
A point within an image, represented by x- and y-coordinates.
Attributes
----------
x : int
The x-coordinate.
y : int
The y-coordinate.
"""
def __init__(self, x, y):
self.x = int(x)
self.y = int(y)
def __repr__(self):
return "<Point (%d, %d)>" % (self.x, self.y)
def __str__(self):
return "(%d, %d)" % (self.x, self.y)
def as_dict(self):
"""Convert this object to a dictionary"""
return {"x": self.x, "y": self.y}
def todict(self):
return self.as_dict()
def tolist(self):
"""Convert this object to a list"""
return [self.x, self.y]
class Rectangle(object):
"""
A rectangle, represented by top-left and bottom-right Points.
Attributes
----------
top_left : Point
The top-left corner of the rectangle.
bottom_right : Point
The bottom-right corner of the rectangle.
"""
def __init__(self, x1, y1, x2, y2):
self.top_left = Point(x1, y1)
self.bottom_right = Point(x2, y2)
def __repr__(self):
return "<Rectangle (top_left=%s, bottom_right=%s)>" % (
repr(self.top_left),
repr(self.bottom_right),
)
def __str__(self):
return "(%s, %s)" % (str(self.top_left), str(self.bottom_right))
def as_dict(self):
"""Convert this object to a dictionary"""
return {
"top_left": self.top_left.as_dict(),
"bottom_right": self.bottom_right.as_dict(),
}
def todict(self):
return self.as_dict()
def tolist(self):
"""Convert this object to a list"""
return [
self.top_left.x,
self.top_left.y,
self.bottom_right.x,
self.bottom_right.y,
]
def width(self):
"""Get the width of the Rectangle."""
return self.bottom_right.x - self.top_left.x
def height(self):
"""Get the height of the Rectangle."""
return self.bottom_right.y - self.top_left.y
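A quick illustration of these types (my example, not repo code): coordinates are truncated to ints by Point, so widths and heights come out integral.

box = Rectangle(528.55, 234.37, 839.06, 642.20)
print(box)                        # ((528, 234), (839, 642))
print(box.width(), box.height())  # 311 408
print(box.as_dict())              # {'top_left': {'x': 528, 'y': 234}, 'bottom_right': {'x': 839, 'y': 642}}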


@ -0,0 +1,100 @@
import numpy as np
import json
import pyrealsense2 as rs
from .exceptions import InvalidSpecError
DEPTH_MIN = 0.11
DEPTH_MAX = 10.0
EXPANSION_FACTOR = 0.7
def _read_spec_value(model_loc, key):
try:
with open("{}/spec.json".format(model_loc), "r") as f:
spec = json.load(f)
return spec[key]
except (FileNotFoundError, KeyError):
raise InvalidSpecError("Invalid spec file. Try upgrading your model.")
def model_location():
try:
import paravision_models.liveness
return paravision_models.liveness.location()
except ImportError:
raise ImportError("You need to install Paravision Liveness Models package")
def expand_bbox_to_edge_and_crop(depth_frame, proj_depth_bb):
h, w = depth_frame.shape[:2]
exp_bbox = _expand_bbox_to_edges(h, w, proj_depth_bb)
cropped = _crop(depth_frame, exp_bbox)
return cropped
def estimate_depth_bounding_box(bb, camera_params):
height = camera_params.depth_intr.height
width = camera_params.depth_intr.width
left_corner = [bb.top_left.x, bb.top_left.y]
right_corner = [bb.bottom_right.x, bb.bottom_right.y]
left_depth_corner = _compute_epipolar_midpoint(
height, width, left_corner, camera_params
)
right_depth_corner = _compute_epipolar_midpoint(
height, width, right_corner, camera_params
)
proj_bb = np.hstack([left_depth_corner, right_depth_corner])
return proj_bb
def _get_depth_point(height, width, pt, scale, camera_params):
color_world_pt = rs.rs2_deproject_pixel_to_point(
camera_params.color_intr, pt, scale
)
depth_world_pt = rs.rs2_transform_point_to_point(
camera_params.color_to_depth_extr, color_world_pt
)
depth_pt = rs.rs2_project_point_to_pixel(camera_params.depth_intr, depth_world_pt)
depth_pt = rs.adjust_2D_point_to_boundary(depth_pt, width, height)
return depth_pt
def _compute_epipolar_midpoint(height, width, pt, camera_params):
# define depth endpoints of epipolar line to search
start_depth_pt = _get_depth_point(height, width, pt, DEPTH_MIN, camera_params)
end_depth_pt = _get_depth_point(height, width, pt, DEPTH_MAX, camera_params)
mid_pt = (np.array(start_depth_pt) + np.array(end_depth_pt)) / 2
return mid_pt
def _expand_bbox_to_edges(h, w, bbox):
x1, y1, x2, y2 = bbox
dx = (x2 - x1) * EXPANSION_FACTOR / 2
dy = (y2 - y1) * EXPANSION_FACTOR / 2
x1_ = max(0, x1 - dx)
y1_ = max(0, y1 - dy)
x2_ = min(w, x2 + dx)
y2_ = min(h, y2 + dy)
return _round(np.array([x1_, y1_, x2_, y2_]))
def _crop(np_img, bbox):
bbox = _round(bbox)
x1, y1, x2, y2 = bbox
h, w = np_img.shape[:2]
x1 = max(x1, 0)
y1 = max(y1, 0)
x2 = min(x2, w)
y2 = min(y2, h)
return np_img[y1:y2, x1:x2]
def _round(bbox):
return np.rint(bbox).astype(np.int32)
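To make the expansion step concrete (my example, using the module's EXPANSION_FACTOR of 0.7): each side of the box grows by factor * extent / 2, then gets clamped to the frame and rounded.

bbox = [100, 100, 200, 200]                    # 100 px square
print(_expand_bbox_to_edges(720, 1280, bbox))  # [ 65  65 235 235] (35 px added per side)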


@ -0,0 +1,15 @@
from __future__ import annotations
import paravision.liveness2d
import typing
from .types import Engine
from .sdk import SDK
__all__ = [
"Engine",
"SDK",
"sdk",
"types"
]
__version__ = 'dev'


@ -0,0 +1,113 @@
from __future__ import annotations
import paravision.liveness2d.sdk
import typing
import paravision.liveness2d.types
import paravision.recognition.sdk
import paravision.recognition.types
__all__ = [
"Metadata",
"SDK"
]
class Metadata():
def __init__(self) -> None: ...
@property
def engine(self) -> str:
"""
The engine or accelerator of the Liveness2D SDK instance being used.
:type: str
"""
@engine.setter
def engine(self, arg0: str) -> None:
"""
The engine or accelerator of the Liveness2D SDK instance being used.
"""
@property
def engine_version(self) -> str:
"""
The version of the engine or accelerator being used.
:type: str
"""
@engine_version.setter
def engine_version(self, arg0: str) -> None:
"""
The version of the engine or accelerator being used.
"""
@property
def generation(self) -> int:
"""
The generation of the Liveness2D SDK.
:type: int
"""
@generation.setter
def generation(self, arg0: int) -> None:
"""
The generation of the Liveness2D SDK.
"""
@property
def model_version(self) -> str:
"""
The version of the Liveness2D models.
:type: str
"""
@model_version.setter
def model_version(self, arg0: str) -> None:
"""
The version of the Liveness2D models.
"""
@property
def sdk_version(self) -> str:
"""
The version of the Liveness2D SDK.
:type: str
"""
@sdk_version.setter
def sdk_version(self, arg0: str) -> None:
"""
The version of the Liveness2D SDK.
"""
pass
class SDK():
"""
SDK()
    An SDK object contains an instance of the Paravision model and its
    associated resources.
    SDK objects are long-lived and do not need to be re-instantiated between
method calls.
"""
def __init__(self, models_dir: typing.Optional[str] = None, settings: typing.Optional[paravision.liveness2d.types.Settings] = None) -> None:
"""
Create a new SDK instance.
"""
def check_validness(self, face: paravision.recognition.types.Face, validness_settings: paravision.liveness2d.types.ValidnessSettings, reco_sdk: paravision.recognition.sdk.SDK) -> paravision.recognition.types.ValidnessResult:
"""
Check if a face is valid for liveness inference.
"""
def get_liveness(self, face: paravision.recognition.types.Face) -> paravision.liveness2d.types.LivenessResult:
"""
Get the liveness of a face.
"""
@staticmethod
def get_metadata(models_dir: typing.Optional[str] = None) -> Metadata:
"""
Returns metadata for SDK and model info.
"""
pass
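A hedged sketch of how these bindings compose with the recognition SDK, going only by the signatures above; the image path, the `load_image` helper, and the result attribute names are my assumptions, not repo code:

import paravision.recognition as pr
import paravision.liveness2d as pl

reco = pr.SDK()
live = pl.SDK()

img = load_image("face.jpg")                                # helper assumed, per the old paravision docstring example
face = reco.get_faces([img]).image_inferences[0].faces[0]   # attribute names assumed
print(live.get_liveness(face).asdict())                     # {'live_prob': ..., 'spoof_prob': ...}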


@ -0,0 +1,383 @@
from __future__ import annotations
import paravision.liveness2d.types
import typing
import paravision.recognition.types
__all__ = [
"Engine",
"GPUConfig",
"LivenessResult",
"Settings",
"ValidnessSettings"
]
class Engine():
"""
Members:
AUTO : Automatically select the engine
OPENVINO : Use the OpenVINO engine
TENSORRT : Use the TensorRT engine
"""
def __eq__(self, other: object) -> bool: ...
def __getstate__(self) -> int: ...
def __hash__(self) -> int: ...
def __index__(self) -> int: ...
def __init__(self, value: int) -> None: ...
def __int__(self) -> int: ...
def __ne__(self, other: object) -> bool: ...
def __repr__(self) -> str: ...
def __setstate__(self, state: int) -> None: ...
def __str__(self) -> str: ...
@property
def name(self) -> str:
"""
:type: str
"""
@property
def value(self) -> int:
"""
:type: int
"""
AUTO: paravision.liveness2d.types.Engine # value = <Engine.AUTO: 0>
OPENVINO: paravision.liveness2d.types.Engine # value = <Engine.OPENVINO: 1>
TENSORRT: paravision.liveness2d.types.Engine # value = <Engine.TENSORRT: 2>
__members__: dict # value = {'AUTO': <Engine.AUTO: 0>, 'OPENVINO': <Engine.OPENVINO: 1>, 'TENSORRT': <Engine.TENSORRT: 2>}
pass
class GPUConfig():
def __init__(self) -> None: ...
@property
def gpu_id(self) -> int:
"""
The index of the GPU device to use.
:type: int
"""
@gpu_id.setter
def gpu_id(self, arg0: int) -> None:
"""
The index of the GPU device to use.
"""
@property
def worker_count(self) -> int:
"""
The number of workers to allocate.
:type: int
"""
@worker_count.setter
def worker_count(self, arg0: int) -> None:
"""
The number of workers to allocate.
"""
pass
class LivenessResult():
"""
LivenessResult()
A LivenessResult object contains the result of a liveness2d inference run on a face.
Attributes
----------
live_probability : float
The probability that the face is real.
spoof_probability : float
The probability that the face is a spoof.
"""
@typing.overload
def __init__(self) -> None:
"""
Construct an empty LivenessResult
"""
@typing.overload
def __init__(self, live_prob: float, spoof_prob: float) -> None:
"""
        Construct a LivenessResult with the given probabilities
"""
def __repr__(self) -> str: ...
def asdict(self) -> dict:
"""
Convert this object to a dictionary
"""
@property
def live_prob(self) -> float:
"""
The probability that the face is real
:type: float
"""
@property
def spoof_prob(self) -> float:
"""
The probability that the face is a spoof
:type: float
"""
pass
class Settings():
def __init__(self) -> None: ...
@property
def cache_generated_engine(self) -> bool:
"""
Cache generated engine for TensorRT only
:type: bool
"""
@cache_generated_engine.setter
def cache_generated_engine(self, arg0: bool) -> None:
"""
Cache generated engine for TensorRT only
"""
@property
def engine(self) -> Engine:
"""
Engine to use
:type: Engine
"""
@engine.setter
def engine(self, arg0: Engine) -> None:
"""
Engine to use
"""
@property
def gpu_configs(self) -> list[GPUConfig]:
"""
List of GPU configs for worker allocation in multiple GPUs
:type: list[GPUConfig]
"""
@gpu_configs.setter
def gpu_configs(self, arg0: list[GPUConfig]) -> None:
"""
List of GPU configs for worker allocation in multiple GPUs
"""
@property
def openvino_threads_limit(self) -> int:
"""
Thread limit for OpenVINO
:type: int
"""
@openvino_threads_limit.setter
def openvino_threads_limit(self, arg0: int) -> None:
"""
Thread limit for OpenVINO
"""
@property
def tensorrt_engine_cache_path(self) -> str:
"""
Directory of where serialized model files are stored
:type: str
"""
@tensorrt_engine_cache_path.setter
def tensorrt_engine_cache_path(self, arg0: str) -> None:
"""
Directory of where serialized model files are stored
"""
@property
def use_cached_engine(self) -> bool:
"""
Use cached engine for TensorRT only
:type: bool
"""
@use_cached_engine.setter
def use_cached_engine(self, arg0: bool) -> None:
"""
Use cached engine for TensorRT only
"""
@property
def worker_count(self) -> int:
"""
The number of workers for inference
:type: int
"""
@worker_count.setter
def worker_count(self, arg0: int) -> None:
"""
The number of workers for inference
"""
pass
class ValidnessSettings():
def __init__(self, face: paravision.recognition.types.Face) -> None:
"""
Construct ValidnessSettings instance.
"""
@property
def fail_fast(self) -> bool:
"""
Option to early return if any check fails
:type: bool
"""
@fail_fast.setter
def fail_fast(self, arg0: bool) -> None:
"""
Option to early return if any check fails
"""
@property
def image_boundary_height_pct(self) -> float:
"""
The percentage of image height the face is permitted
:type: float
"""
@image_boundary_height_pct.setter
def image_boundary_height_pct(self, arg0: float) -> None:
"""
The percentage of image height the face is permitted
"""
@property
def image_boundary_width_pct(self) -> float:
"""
The percentage of image width the face is permitted
:type: float
"""
@image_boundary_width_pct.setter
def image_boundary_width_pct(self, arg0: float) -> None:
"""
The percentage of image width the face is permitted
"""
@property
def image_illumination_control(self) -> int:
"""
The image illumination control
:type: int
"""
@image_illumination_control.setter
def image_illumination_control(self, arg0: int) -> None:
"""
The image illumination control
"""
@property
def max_face_mask_prob(self) -> float:
"""
The max probability the face has a mask
:type: float
"""
@max_face_mask_prob.setter
def max_face_mask_prob(self, arg0: float) -> None:
"""
The max probability the face has a mask
"""
@property
def max_face_roll_angle(self) -> int:
"""
The maximum roll angle allowed of the face.
:type: int
"""
@max_face_roll_angle.setter
def max_face_roll_angle(self, arg0: int) -> None:
"""
The maximum roll angle allowed of the face.
"""
@property
def max_face_size_pct(self) -> float:
"""
The max percentage of the face size relative to the image boundaries
:type: float
"""
@max_face_size_pct.setter
def max_face_size_pct(self, arg0: float) -> None:
"""
The max percentage of the face size relative to the image boundaries
"""
@property
def min_face_acceptability(self) -> float:
"""
The minimum face acceptability threshold
:type: float
"""
@min_face_acceptability.setter
def min_face_acceptability(self, arg0: float) -> None:
"""
The minimum face acceptability threshold
"""
@property
def min_face_frontality(self) -> int:
"""
The minimum face frontality threshold
:type: int
"""
@min_face_frontality.setter
def min_face_frontality(self, arg0: int) -> None:
"""
The minimum face frontality threshold
"""
@property
def min_face_quality(self) -> float:
"""
The minimum image quality threshold
:type: float
"""
@min_face_quality.setter
def min_face_quality(self, arg0: float) -> None:
"""
The minimum image quality threshold
"""
@property
def min_face_sharpness(self) -> float:
"""
The minimum image sharpness threshold
:type: float
"""
@min_face_sharpness.setter
def min_face_sharpness(self, arg0: float) -> None:
"""
The minimum image sharpness threshold
"""
@property
def min_face_size(self) -> int:
"""
The minimum size of the face
:type: int
"""
@min_face_size.setter
def min_face_size(self, arg0: int) -> None:
"""
The minimum size of the face
"""
pass
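Putting ValidnessSettings to use, continuing the sketch above (written against the stub signatures; the threshold values are illustrative, not recommendations):

vs = pl.types.ValidnessSettings(face)  # face from a prior recognition pass
vs.fail_fast = True
vs.min_face_size = 100                 # illustrative threshold
vs.max_face_mask_prob = 0.5            # illustrative threshold
result = live.check_validness(face, vs, reco)  # per SDK.check_validness above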


@ -1,37 +0,0 @@
"""
Paravision
======
Provides an interface to the Paravision models.
How to use the documentation
----------------------------
Documentation is available in two forms: docstrings provided within the code,
and a reference guide, available
`here <https://ever-ai-documentation.readme.io/v1.0/docs/getting-started>`_.
Code snippets are indicated by three greater-than signs::
>>> sdk = paravision.SDK()
Use the built-in ``help`` function to view a function or object's docstring::
>>> help(paravision.SDK)
...
Example
-------
This simple example illustrates how to detect the bounding boxes of faces in an image:
>>> import paravision
>>> from paravision.utils import load_image
>>> img = load_image('/tmp/face.jpg')
>>> sdk = paravision.SDK()
>>> sdk.get_faces(img)
([<Face ((278, 262), (904, 1143))>], 0)
"""
from .sdk import SDK # noqa
from .engine import Engine # noqa
__version__ = "8.2.0"


@ -0,0 +1,19 @@
from __future__ import annotations
import paravision.recognition
import typing
from .types import Engine
from .types import ImageManipulator
from .sdk import SDK
__all__ = [
"Engine",
"ImageManipulator",
"SDK",
"exceptions",
"sdk",
"types",
"utils"
]
__version__ = 'dev'


@ -1,298 +0,0 @@
import cv2
import numpy as np
from . import _utils as utils
from .engine import Engine
from .types import BoundingBox, Landmarks, Embedding
from .exceptions import ModelLoadingException
LANDMARKS_EXPAND_FACTOR = 0.3
ALIGNMENT_EXPAND_FACTOR = 1.0
MASK_EXPAND_FACTOR = 0.3
"""The way the pipeline is run needs to be refactored.
Making temporary fixes for now. """
AVAILABLE_OPTIONS = ["find_landmarks", "compute_embeddings"]
class SplitGraph:
def __init__(self, models_dirpath, settings=None, engine=Engine.OPENVINO):
if settings is None:
settings = {}
if isinstance(engine, Engine):
self.engine_name = engine
else:
self.engine_name = engine.split("-")[0]
if self.engine_name == Engine.OPENVINO:
from .openvino.engine import Engine as E
elif self.engine_name == Engine.TENSORRT:
from .tensorrt.engine import Engine as E
else:
raise ModelLoadingException(
f"This is not a valid engine choice: {engine}. Available choices are: {Engine.all()}."
)
self.engine = E(models_dirpath, settings)
def prepare_for_detection(self, img):
height, width = img.shape[:2]
fd_input_height, fd_input_width = self.engine.fd_input_shape
ratio = min(fd_input_height / height, fd_input_width / width)
target_width = round(width * ratio)
target_height = round(height * ratio)
resized = utils.resize(img, target_height, target_width)
offset_pad_height = fd_input_height - target_height
offset_pad_width = fd_input_width - target_width
padded = cv2.copyMakeBorder(
resized,
0,
offset_pad_height,
0,
offset_pad_width,
cv2.BORDER_CONSTANT,
value=[0, 0, 0],
)
return padded, (target_height, target_width)
def prepare_for_landmarks(self, np_img, bbox, original_size):
exp_bbox, _, pre_pad_exp_img = utils.expand_and_crop(
np_img, LANDMARKS_EXPAND_FACTOR, bbox, original_size
)
image_h, image_w = np_img.shape[:2]
exp_img = utils.maybe_pad(pre_pad_exp_img, exp_bbox, image_h, image_w)
target_h, target_w = self.engine.lm_input_shape
resized_img = utils.resize(exp_img, target_h, target_w)
return exp_bbox, resized_img
def process_post_detection(
self, imgs, relative_bboxes, detection_input_sizes, img_indexes
):
absolute_bboxes = []
alignment_images = []
alignment_bounding_boxes = []
landmarks_input_bounding_boxes = []
landmarks_input_images = []
for i, relative_bbox in enumerate(relative_bboxes):
img = imgs[img_indexes[i]]
detection_input_size = detection_input_sizes[img_indexes[i]]
img_size = np.asarray(img.shape[:2])
absolute_bbox = utils.convert_to_absolute_coordinates(
relative_bbox,
detection_input_size,
img_size,
self.engine.fd_input_shape,
)
if absolute_bbox[0] > img_size[1] or absolute_bbox[1] > img_size[0]:
continue
square_bb = utils.square(absolute_bbox)
landmarks_input_bbox, landmarks_input_image = self.prepare_for_landmarks(
img, square_bb, img_size
)
_, alignment_bbox, alignment_image = utils.expand_and_crop(
img, ALIGNMENT_EXPAND_FACTOR, square_bb, img_size
)
absolute_bboxes.append(absolute_bbox)
alignment_images.append(alignment_image)
alignment_bounding_boxes.append(alignment_bbox)
landmarks_input_images.append(landmarks_input_image)
landmarks_input_bounding_boxes.append(landmarks_input_bbox)
values = {
"bounding_boxes": [BoundingBox(*_bb) for _bb in absolute_bboxes],
"landmarks_input_bounding_boxes": landmarks_input_bounding_boxes,
"landmarks_input_images": landmarks_input_images,
"alignment_bounding_boxes": alignment_bounding_boxes,
"alignment_images": alignment_images,
}
return values
def process_detection_options(self, detect_resp, scoring_mode, options):
values = {}
if "get_qualities" in options:
qualities, acceptabilities = self.get_qualities(
detect_resp["landmarks_input_images"]
)
values["qualities"] = qualities
values["acceptabilities"] = acceptabilities
if any(option in AVAILABLE_OPTIONS for option in options):
(landmarks, recognition_input_images) = self.find_landmarks(
detect_resp["landmarks_input_bounding_boxes"],
detect_resp["landmarks_input_images"],
detect_resp["alignment_bounding_boxes"],
detect_resp["alignment_images"],
)
values["landmarks"] = [Landmarks(*x) for x in landmarks]
values["recognition_input_images"] = recognition_input_images
if "compute_embeddings" in options:
values["embeddings"] = [
Embedding(data, scoring_mode)
for data in self.compute_embeddings(recognition_input_images)
]
return values
def run(self, imgs, scoring_mode, options=None):
if options is None:
options = []
detection_inputs = []
detection_input_sizes = []
for img in imgs:
img_for_fd, resized_size = self.prepare_for_detection(img)
detection_inputs.append(img_for_fd)
detection_input_sizes.append(resized_size)
relative_bboxes, confidences, img_indexes = self.engine.predict_bounding_boxes(
detection_inputs
)
values = {"confidences": confidences}
# post-process detection
detect_resp = self.process_post_detection(
imgs, relative_bboxes, detection_input_sizes, img_indexes
)
values.update(detect_resp)
# process options
options_resp = self.process_detection_options(
detect_resp, scoring_mode, options
)
values.update(options_resp)
return values, img_indexes
def run_from_landmarks(self, img, bboxes):
absolute_bboxes = []
alignment_images = []
alignment_bounding_boxes = []
landmarks_input_bounding_boxes = []
landmarks_input_images = []
for absolute_bbox in bboxes:
img_size = np.asarray(img.shape[:2])
bounding_box = np.array(
[
absolute_bbox.origin.x,
absolute_bbox.origin.y,
absolute_bbox.origin.x + absolute_bbox.width,
absolute_bbox.origin.y + absolute_bbox.height,
]
)
if bounding_box[0] > img_size[1] or bounding_box[1] > img_size[0]:
continue
square_bb = utils.square(bounding_box)
landmarks_input_bbox, landmarks_input_image = self.prepare_for_landmarks(
img, square_bb, img_size
)
_, alignment_bbox, alignment_image = utils.expand_and_crop(
img, ALIGNMENT_EXPAND_FACTOR, square_bb, img_size
)
absolute_bboxes.append(bounding_box)
alignment_images.append(alignment_image)
alignment_bounding_boxes.append(alignment_bbox)
landmarks_input_images.append(landmarks_input_image)
landmarks_input_bounding_boxes.append(landmarks_input_bbox)
(landmarks, recognition_input_images) = self.find_landmarks(
landmarks_input_bounding_boxes,
landmarks_input_images,
alignment_bounding_boxes,
alignment_images,
)
values = {
"bounding_boxes": [BoundingBox(*_bb) for _bb in absolute_bboxes],
"landmarks_input_bounding_boxes": landmarks_input_bounding_boxes,
"landmarks_input_images": landmarks_input_images,
"alignment_bounding_boxes": alignment_bounding_boxes,
"alignment_images": alignment_images,
"landmarks": [Landmarks(*x) for x in landmarks],
"recognition_input_images": recognition_input_images,
}
return values
def find_landmarks(
self,
landmarks_input_bounding_boxes,
landmarks_input_images,
alignment_bounding_boxes,
alignment_images,
):
if len(landmarks_input_bounding_boxes) == 0:
return [], []
relative_landmarks = self.engine.predict_landmarks(landmarks_input_images)
relative_landmarks = relative_landmarks.reshape(-1, 5, 2)
absolute_landmarks = []
recognition_input_images = []
for i, landmarks in enumerate(relative_landmarks):
landmarks_input_bounding_box = landmarks_input_bounding_boxes[i]
alignment_bounding_box = alignment_bounding_boxes[i]
alignment_image = alignment_images[i]
landmarks = utils.normalize(landmarks_input_bounding_box, landmarks)
recognition_input_image = utils.crop_and_align(
alignment_image,
landmarks - alignment_bounding_box[:2],
self.engine.fr_input_shape,
)
absolute_landmarks.append(landmarks)
recognition_input_images.append(recognition_input_image)
return absolute_landmarks, recognition_input_images
def compute_embeddings(self, recognition_input_images):
if len(recognition_input_images) == 0:
return []
return self.engine.predict_embeddings(recognition_input_images)
def get_attributes(self, recognition_input_images):
if len(recognition_input_images) == 0:
return [], []
return self.engine.predict_attributes(recognition_input_images)
def get_fr_input_shape(self):
return self.engine.fr_input_shape
def get_fr_output_shape(self):
return self.engine.fr_output_shape
def check_for_mask(self, landmarks_input_images):
if len(landmarks_input_images) == 0:
return []
return self.engine.check_for_masks(landmarks_input_images)
def get_qualities(self, landmarks_input_images):
if len(landmarks_input_images) == 0:
return [], []
qualities, acceptabilities = self.engine.get_qualities(landmarks_input_images)
qualities = np.clip(qualities, 0, 1).tolist()
acceptabilities = np.clip(acceptabilities, 0, 1).tolist()
return qualities, acceptabilities


@ -1,310 +0,0 @@
import json
import cv2
import importlib
import numpy as np
from os import walk, path
from .engine import Engine
from .exceptions import ModelLoadingException, InternalErrorException
from .types import Face
OPENVINO_EXT = "xml"
TENSORRT_EXT = "onnx"
MODELS_DIRECTORY = "recognition"
KEYS = {
"acceptabilities": "acceptability",
"bounding_boxes": "bounding_box",
"confidences": "score",
"recognition_input_images": "recognition_input_image",
"embeddings": "embedding",
"landmarks_input_images": "landmarks_input_image",
"mask_input_images": "mask_input_image",
"landmarks_input_bounding_boxes": "landmarks_input_bounding_box",
"alignment_bounding_boxes": "alignment_bounding_box",
"alignment_images": "alignment_image",
"qualities": "quality",
}
_SQUARE_TO_POINTS = [
[38.2946, 51.6963],
[73.5318, 51.5014],
[56.0252, 71.7366],
[41.5493, 92.3655],
[70.7299, 92.2041],
]
def model_location():
try:
paravision_models = importlib.import_module("paravision_models")
return paravision_models.location()
except ModuleNotFoundError as err:
raise ModelLoadingException(
"You need to install Paravision Models package"
) from err
def match_engine():
try:
paravision_models = importlib.import_module("paravision_models")
return paravision_models.engine()
except ModuleNotFoundError as err:
raise ModelLoadingException(
"You need to install Paravision Models package"
) from err
def match_engine_given_path(models_dir):
(_, _, filenames) = next(walk(path.join(models_dir, MODELS_DIRECTORY)))
if any(OPENVINO_EXT in f_name for f_name in filenames):
return Engine.OPENVINO
if any(TENSORRT_EXT in f_name for f_name in filenames):
return Engine.TENSORRT
raise ModelLoadingException(
"No compatible models found. Please ensure that your model path is correct."
)
def mask_model_location():
try:
mask = importlib.import_module("paravision_models.mask")
return mask.location()
except ModuleNotFoundError as err:
raise ModelLoadingException(
"You need to install Paravision Mask Model package"
) from err
def read_spec_value(model_loc, key):
try:
with open(path.join(model_loc, "spec.json"), "r", encoding="utf-8") as f:
spec = json.load(f)
return spec[key]
except (FileNotFoundError, KeyError) as err:
raise ModelLoadingException(
"Invalid spec file. Please verify the models are installed correctly."
) from err
def build_faces(graph_dict):
faces = []
for values in zip(*graph_dict.values()):
face_dict = {KEYS.get(k, k): v for k, v in zip(graph_dict.keys(), values)}
face_dict["bounding_box"].score = face_dict.get("score", None)
face = Face(face_dict["bounding_box"])
face_dict.pop("bounding_box")
face_dict.pop("score", None)
for k, v in face_dict.items():
setattr(face, k, v)
faces.append(face)
return faces
def read_fd_input_shape(model_loc, fd_model_type):
if fd_model_type == "streaming":
return read_spec_value(model_loc, "fd_streaming_input_shape")
return read_spec_value(model_loc, "fd_input_shape")
def read_lm_input_shape(model_loc):
return read_spec_value(model_loc, "lm_input_shape")
def read_fr_input_shape(model_loc):
return read_spec_value(model_loc, "fr_input_shape")
def read_fr_output_shape(model_loc):
return read_spec_value(model_loc, "embedding_size")
def read_at_input_shape(model_loc):
return read_spec_value(model_loc, "at_input_shape")
def read_em_input_shape(model_loc):
return read_spec_value(model_loc, "em_input_shape")
def read_md_input_shape(model_loc):
return read_spec_value(model_loc, "md_input_shape")
def resize(np_img, height, width):
return cv2.resize(np_img, (width, height))
def expand_bb(bbox, p=1.0):
"""Takes a bounding box and expand by a factor of 1 + p
Args:
bb: bounding box in the format of [x1, y1, x2, y2]
p: additive factor
"""
x1, y1, x2, y2 = bbox
dx = (x2 - x1) * p / 2
dy = (y2 - y1) * p / 2
x1 -= dx
y1 -= dy
x2 += dx
y2 += dy
return x1, y1, x2, y2
def restrict_bbox_to_edges(h, w, bbox):
x1, y1, x2, y2 = bbox
x1 = max(x1, 0)
y1 = max(y1, 0)
x2 = min(x2, w)
y2 = min(y2, h)
return x1, y1, x2, y2
def maybe_pad(crop_img, exp_bbox, h, w):
x1, y1, x2, y2 = exp_bbox
pc1 = max(0 - x1, 0)
pc2 = max(0, x2 - w)
pr1 = max(0 - y1, 0)
pr2 = max(0, y2 - h)
pad = np.rint(np.array([(pr1, pr2), (pc1, pc2), (0, 0)])).astype(np.int32)
crop_pad_img = np.pad(crop_img, pad, mode="constant")
return crop_pad_img
def square(bb):
x1, y1, x2, y2 = bb
padding = ((x2 - x1) - (y2 - y1)) / 2
if padding < 0:
x1 += padding
x2 -= padding
elif padding > 0:
y1 -= padding
y2 += padding
return x1, y1, x2, y2
def crop(np_img, bb, h, w):
"""Simple crop function in numpy
Args:
np_img: H x W x C image
bb: list or tuple of format (x1, y1, x2, y2)
Returns:
cropped numpy image
"""
x1, y1, x2, y2 = bb
if x1 >= x2 or y1 >= y2:
raise InternalErrorException("Invalid bounding box for image cropping.")
x1 = max(x1, 0)
y1 = max(y1, 0)
x2 = min(x2, w)
y2 = min(y2, h)
x1, y1, x2, y2 = np.rint([x1, y1, x2, y2]).astype(np.int32)
return np_img[y1:y2, x1:x2, :]
def compute_transform(src_points, dst_points):
"""estimate the rigid transform needed to transform src_points into
dst_points
"""
points1 = np.asarray(src_points)
points2 = np.asarray(dst_points)
# zero-mean
center1 = np.expand_dims(np.mean(points1, axis=0), axis=0)
center2 = np.expand_dims(np.mean(points2, axis=0), axis=0)
points1 -= center1
points2 -= center2
std1 = np.std(points1)
std2 = np.std(points2)
points1 /= std1
points2 /= std2
U, _, V = np.linalg.svd(points1.T.dot(points2))
R = (U.dot(V)).T
trans = np.hstack(
((std2 / std1) * R, center2.T - ((std2 / std1) * R).dot(center1.T))
)
return trans
def crop_and_align(np_img, from_points, img_shape):
h, w = img_shape
trans = compute_transform(from_points, _SQUARE_TO_POINTS)
return cv2.warpAffine(np_img, trans, (w, h))
def normalize(exp_bbox, lmks):
x1, y1, x2, y2 = exp_bbox
return lmks * [x2 - x1, y2 - y1] + [x1, y1]
def expand_and_crop(np_img, p, bbox, original_size):
h, w = original_size
exp_bbox = expand_bb(bbox, p)
exp_edge_restricted_bbox = restrict_bbox_to_edges(h, w, exp_bbox)
crop_img = crop(np_img, exp_edge_restricted_bbox, h, w)
return exp_bbox, exp_edge_restricted_bbox, crop_img
def convert_to_absolute_coordinates(bbox, resized_size, original_size, fd_input_shape):
h, w = original_size
ratio = fd_input_shape / np.asarray(resized_size)
return (
bbox
* np.asarray([w, h, w, h])
* np.asarray([ratio[1], ratio[0], ratio[1], ratio[0]])
)
def sigmoid_transform(value, weight, bias):
return 1 / (1 + np.exp(-(weight * value + bias)))
def get_model_types(settings):
fd_model_type = settings.get("detection_model", "default")
lm_model_type = "default"
ql_model_type = "default"
fr_model_type = "default"
at_model_type = "default"
md_model_type = "default"
return (
fd_model_type,
lm_model_type,
ql_model_type,
fr_model_type,
at_model_type,
md_model_type,
)
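For reference, sigmoid_transform above is a plain logistic curve; one worked value (my arithmetic, using the enhanced-mode constants defined in sdk.py below):

# weight=2.3, bias=-0.5: a raw similarity of 0.5 maps to
# 1 / (1 + exp(-(2.3 * 0.5 - 0.5))) = 1 / (1 + exp(-0.65)) ~= 0.657,
# which MATCH_SCORE_SCALE (1000) suggests is then scaled to ~657.
print(sigmoid_transform(0.5, 2.3, -0.5))  # ~0.657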


@ -1,11 +0,0 @@
from enum import Enum
class Engine(str, Enum):
OPENVINO = "openvino"
TENSORRT = "tensorrt"
AUTO = "auto"
@staticmethod
def all():
return [Engine.OPENVINO, Engine.TENSORRT, Engine.AUTO]


@ -1,15 +0,0 @@
class ParavisionException(Exception):
def __init__(self, message):
self.message = message
class ModelLoadingException(ParavisionException):
pass
class InvalidInputException(ParavisionException):
pass
class InternalErrorException(ParavisionException):
pass


@ -0,0 +1,28 @@
from __future__ import annotations
import paravision.recognition.exceptions
import typing
__all__ = [
"InternalErrorException",
"InvalidInputException",
"ModelLoadingException",
"ParavisionException"
]
class ParavisionException(Exception, BaseException):
pass
class InvalidInputException(ParavisionException, Exception, BaseException):
pass
class ModelLoadingException(ParavisionException, Exception, BaseException):
pass
class InternalErrorException(ParavisionException, Exception, BaseException):
pass


@ -1,245 +0,0 @@
import multiprocessing
import numpy as np
import os
from openvino.inference_engine import IECore
from .. import _utils as utils
UNIT_LOWER_LIMIT = 0
UNIT_UPPER_LIMIT = 1
FD_NAME = "detection"
LM_NAME = "landmarks"
QL_NAME = "quality"
FR_NAME = "recognition"
AT_NAME = "attributes"
MD_NAME = "mask"
BIN_EXT = ".bin"
XML_EXT = ".xml"
class Engine:
def __init__(self, models_dir, settings):
ie_core = IECore()
num_threads = multiprocessing.cpu_count()
try:
num_threads = min(
num_threads, max(int(os.getenv("PV_OPENVINO_THREADS_LIMIT")), 1)
)
except (TypeError, ValueError):
pass
ie_core.set_config({"CPU_THREADS_NUM": str(num_threads)}, "CPU")
(
fd_model_type,
lm_model_type,
ql_model_type,
fr_model_type,
at_model_type,
md_model_type,
) = utils.get_model_types(settings)
fd_net = ie_core.read_network(
model=os.path.join(models_dir, FD_NAME, fd_model_type, FD_NAME + XML_EXT),
weights=os.path.join(models_dir, FD_NAME, fd_model_type, FD_NAME + BIN_EXT),
)
self.fd_input_name = next(iter(fd_net.input_info))
self.fd_input_shape = utils.read_fd_input_shape(models_dir, fd_model_type)
self.fd_bboxes_name = "bboxes"
self.fd_scores_name = "scores"
self.fd_select_idxs_name = "selected_indices"
self.fd_net = ie_core.load_network(network=fd_net, device_name="CPU")
lm_net = ie_core.read_network(
model=os.path.join(models_dir, LM_NAME, lm_model_type, LM_NAME + XML_EXT),
weights=os.path.join(models_dir, LM_NAME, lm_model_type, LM_NAME + BIN_EXT),
)
self.lm_input_name = next(iter(lm_net.input_info))
self.lm_input_shape = utils.read_lm_input_shape(models_dir)
self.lm_landmarks_name = "landmarks"
self.lm_net = ie_core.load_network(network=lm_net, device_name="CPU")
ql_net = ie_core.read_network(
model=os.path.join(models_dir, QL_NAME, ql_model_type, QL_NAME + XML_EXT),
weights=os.path.join(models_dir, QL_NAME, ql_model_type, QL_NAME + BIN_EXT),
)
self.ql_input_name = next(iter(ql_net.input_info))
self.ql_input_shape = utils.read_lm_input_shape(models_dir)
self.ql_qualities_name = "qualities"
self.ql_acceptabilities_name = "acceptabilities"
self.ql_net = ie_core.load_network(network=ql_net, device_name="CPU")
fr_net = ie_core.read_network(
model=os.path.join(models_dir, FR_NAME, fr_model_type, FR_NAME + XML_EXT),
weights=os.path.join(models_dir, FR_NAME, fr_model_type, FR_NAME + BIN_EXT),
)
self.fr_input_name = next(iter(fr_net.input_info))
self.fr_input_shape = utils.read_fr_input_shape(models_dir)
self.fr_output_name = next(iter(fr_net.outputs))
self.fr_output_shape = utils.read_fr_output_shape(models_dir)
self.fr_net = ie_core.load_network(network=fr_net, device_name="CPU")
at_net = ie_core.read_network(
model=os.path.join(models_dir, AT_NAME, at_model_type, AT_NAME + XML_EXT),
weights=os.path.join(models_dir, AT_NAME, at_model_type, AT_NAME + BIN_EXT),
)
self.at_input_name = next(iter(at_net.input_info))
self.at_input_shape = utils.read_at_input_shape(models_dir)
self.at_net = ie_core.load_network(network=at_net, device_name="CPU")
if "mask" in settings:
md_model_path = settings["mask"]["models_dir"]
md_net = ie_core.read_network(
model=os.path.join(md_model_path, md_model_type, MD_NAME + XML_EXT),
weights=os.path.join(md_model_path, md_model_type, MD_NAME + BIN_EXT),
)
self.md_input_name = next(iter(md_net.input_info))
self.md_input_shape = md_net.input_info[
self.md_input_name
].input_data.shape[2:]
self.md_net = ie_core.load_network(network=md_net, device_name="CPU")
self.mask_enabled = True
else:
self.mask_enabled = False
def predict_bounding_boxes(self, np_imgs):
"""
Args:
np_imgs: (list) list of images loaded in numpy, of format (1, H, W, C)
Returns:
bboxes: (list) list containing arrays of bboxes for each image
in order [x1, y1, x2, y2], scaled between 0, 1
confs: (list) list containing arrays of confidences scores
of the faces for each image
"""
all_bboxes, all_scores, all_face_counts = [], [], []
np_imgs = np.transpose(np_imgs, (0, 3, 1, 2))
for np_img in np_imgs:
ie_out = self.fd_net.infer(inputs={self.fd_input_name: np_img})
bboxes = ie_out[self.fd_bboxes_name]
scores = ie_out[self.fd_scores_name]
select_idxs = ie_out[self.fd_select_idxs_name]
# keep select_idxs until we see -1
i = 0
for idx in select_idxs[:, 0]:
if idx == -1:
break
i += 1
select_idxs = select_idxs[:i]
# filter bboxes and scores based on select_idxs
for batch_idx, class_idx, idx in select_idxs:
all_bboxes.append(bboxes[batch_idx][idx])
all_scores.append(scores[batch_idx][class_idx][idx].item())
all_face_counts.append(len(select_idxs))
img_idxs = []
for img, num in enumerate(all_face_counts):
img_idxs += [img] * num
return all_bboxes, all_scores, img_idxs
def predict_landmarks(self, np_imgs):
"""
Args:
            np_imgs: (list) list of images loaded in numpy of format (1, H, W, C)
        Returns:
            lmks: (numpy array) landmarks in the shape of (N, 5, 2)
"""
np_imgs = np.transpose(np_imgs, (0, 3, 1, 2))
landmarks = []
for np_img in np_imgs:
ie_out = self.lm_net.infer(inputs={self.lm_input_name: np_img})
lmks = np.squeeze(ie_out[self.lm_landmarks_name])
landmarks.append(lmks)
return np.asarray(landmarks)
def get_qualities(self, np_imgs):
"""
Args:
            np_imgs: (list) list of images loaded in numpy of format (1, H, W, C)
Returns:
qualities: (numpy array) qualities value between 0 and 1
"""
np_imgs = np.transpose(np_imgs, (0, 3, 1, 2))
qualities, acceptabilities = [], []
for np_img in np_imgs:
ie_out = self.ql_net.infer(inputs={self.ql_input_name: np_img})
quality = np.squeeze(ie_out[self.ql_qualities_name])
qualities.append(quality)
acceptability = np.squeeze(ie_out[self.ql_acceptabilities_name])
acceptabilities.append(acceptability)
return (
np.clip(qualities, UNIT_LOWER_LIMIT, UNIT_UPPER_LIMIT),
np.clip(acceptabilities, UNIT_LOWER_LIMIT, UNIT_UPPER_LIMIT),
)
def predict_embeddings(self, np_imgs):
"""
Args:
np_imgs: (list) list of images loaded in numpy of format (1, C, H, W)
Returns:
embs: (numpy array) array of embedding arrays
"""
np_imgs = np.transpose(np_imgs, (0, 3, 1, 2))
embeddings = []
for np_img in np_imgs:
ie_out = self.fr_net.infer(inputs={self.fr_input_name: np_img})
embeddings.append(np.squeeze(ie_out[self.fr_output_name]))
return np.asarray(embeddings)
def predict_attributes(self, np_imgs):
"""
Args:
            np_imgs: (list) list of images loaded in numpy of format (1, H, W, C)
Returns:
ages: (numpy array) age probabilities in the shape of (N, 1, 7)
genders: (numpy array) gender probabilities in the shape of (N, 1, 2)
"""
np_imgs = np.transpose(np_imgs, (0, 3, 1, 2))
ages, genders = [], []
for np_img in np_imgs:
ie_out = self.at_net.infer(inputs={self.at_input_name: np_img})
ages.append(ie_out["age_probs"][0])
genders.append(ie_out["gender_probs"][0])
return ages, genders
def check_for_masks(self, np_imgs):
"""
Args:
            np_imgs: (list) list of images loaded in numpy of format (1, H, W, C)
Returns:
mask_probabilities: (numpy array) mask probabilities in the shape of (N, 1, 4)
"""
np_imgs = np.transpose(np_imgs, (0, 3, 1, 2))
mask_probabilities = []
for np_img in np_imgs:
ie_out = self.md_net.infer(inputs={self.md_input_name: np_img})
mask_probabilities.append(list(ie_out.values())[0][0][0])
return mask_probabilities


@ -1,471 +0,0 @@
"""sdk: Instantiate the Paravision model."""
from typing import List, Optional, Sequence
import numpy as np
import warnings
import os
from ._internal import SplitGraph
from . import _utils as utils
from .types import (
BoundingBox,
Face,
Embedding,
InferenceResult,
ImageInferenceData,
Landmarks,
ScoringMode,
)
from .exceptions import InvalidInputException, InternalErrorException
from .engine import Engine
ERR_INVALID_EMB_MODE = "Invalid embedding scoring mode"
ERR_INVALID_EMB_PREPARED_IMAGE = "Invalid prepared image for embedding"
MATCH_SCORE_SCALE = 1000
ENHANCED_MATCH_SCORE_WEIGHT = 2.3
ENHANCED_MATCH_SCORE_BIAS = -0.5
STANDARD_MATCH_SCORE_WEIGHT = 2.1
STANDARD_MATCH_SCORE_BIAS = -5.3
class SDK:
"""
SDK()
    An SDK object contains an instance of the Paravision model and its
    associated resources.
    SDK objects are long-lived and do not need to be re-instantiated between
method calls.
"""
def __init__(
self,
models_dir: Optional[str] = None,
settings: Optional[dict] = None,
engine: Engine = Engine.AUTO,
):
"""Create a SDK instance."""
if settings is None:
settings = {}
        if models_dir is None:
            models_dir = str(utils.model_location())
            # models come from the installed package, so let it pick the engine
            if engine == Engine.AUTO:
                engine = utils.match_engine()
        elif engine == Engine.AUTO:
            # an explicit models_dir was given; infer the engine from its files
            engine = utils.match_engine_given_path(models_dir)
if "attributes" not in settings:
settings["attributes"] = {"models_dir": models_dir}
if "mask" not in settings:
if os.path.isdir(os.path.join(models_dir, "mask")):
settings["mask"] = {"models_dir": os.path.join(models_dir, "mask")}
else:
try:
settings["mask"] = {"models_dir": utils.mask_model_location()}
except Exception:
# TODO: temp solution to silent SonarCloud, should update when logging is added.
settings.pop("mask", None)
self._graph = SplitGraph(models_dir, settings, engine=engine)
self._weight = utils.read_spec_value(models_dir, "weight")
self._bias = utils.read_spec_value(models_dir, "bias")
self._scoring_mode = settings.get("scoring_mode", ScoringMode.StandardEmbedding)
def get_faces(
self,
imgs: Sequence[np.ndarray],
qualities: bool = False,
landmarks: bool = False,
embeddings: bool = False,
) -> InferenceResult:
"""
Detect faces in the image.
Includes bounding boxes, landmarks, and [optionally] image quality
details.
Accepts a list of NumPy arrays (images).
Returns InferenceResult object.
"""
options = []
if landmarks is True:
options.append("find_landmarks")
if embeddings is True:
options.append("compute_embeddings")
if qualities is True:
options.append("get_qualities")
outputs, img_idxs = self._graph.run(imgs, self._scoring_mode, options)
faces = utils.build_faces(outputs)
image_inferences = []
for img in imgs:
height, width = img.shape[:2]
image_inferences.append(ImageInferenceData(width, height))
for img_idx, face in zip(img_idxs, faces):
image_inferences[img_idx].faces.append(face)
return InferenceResult(image_inferences)
def get_qualities(self, faces: Sequence[Face]) -> None:
"""
Get qualities for faces in the image.
Accepts a list of Face objects.
No return values. Updates the face objects in place with qualities.
"""
if len(faces) == 0:
return
imgs = [face.landmarks_input_image for face in faces]
qualities, acceptabilities = self._graph.get_qualities(imgs)
for face, quality, acceptability in zip(faces, qualities, acceptabilities):
face.quality = quality
face.acceptability = acceptability
def get_masks(self, faces: Sequence[Face]) -> None:
"""
Deprecated: This will be removed in the next major release. An Attributes SDK
will be provided in the future to replace functionality.
Get the mask probabilities for faces.
Accepts a list of faces.
No return values. Updates the face objects in place with mask probabilities.
"""
warnings.warn(
"""get_masks is deprecated and will be removed in the next major release.
An Attributes SDK will be provided in the future to replace functionality.""",
DeprecationWarning,
)
if len(faces) == 0:
return
mask_input_images = []
for face in faces:
if face.landmarks_input_image is None:
raise InvalidInputException(
"Face.landmarks_input_image is needed but is None"
)
mask_input_images.append(face.landmarks_input_image)
probability = self._graph.check_for_mask(mask_input_images)
for i, face in enumerate(faces):
face.mask = float(probability[i])
def get_bounding_boxes(self, imgs: Sequence[np.ndarray]) -> InferenceResult:
"""
Detect bounding boxes of faces in the image, returning a list of Faces.
Accepts a list of NumPy arrays (images).
Returns InferenceResult object.
"""
return self.get_faces(imgs)
def get_landmarks_from_bounding_boxes(
self, img: np.ndarray, bboxes: Sequence[BoundingBox]
) -> InferenceResult:
outputs = self._graph.run_from_landmarks(img, bboxes)
faces = utils.build_faces(outputs)
height, width = img.shape[:2]
image_inference = ImageInferenceData(width, height)
image_inference.faces.extend(faces)
return InferenceResult([image_inference])
def get_landmarks(self, faces: Sequence[Face]):
"""
Get the landmarks for faces.
Accepts a list of faces.
No return values. Updates the face objects in place with landmark values.
"""
if len(faces) == 0:
return
landmarks_input_bounding_boxes = []
landmarks_input_images = []
alignment_images = []
alignment_bounding_boxes = []
for face in faces:
if face.landmarks_input_image is None:
raise InvalidInputException("Face.landmarks_input_image is None.")
if face.landmarks_input_bounding_box is None:
raise InvalidInputException(
"Face.landmarks_input_bounding_box is None."
)
if face.alignment_image is None:
raise InvalidInputException("Face.alignment_image is None.")
if face.alignment_bounding_box is None:
raise InvalidInputException("Face.alignment_bounding_box is None.")
landmarks_input_images.append(face.landmarks_input_image)
landmarks_input_bounding_boxes.append(face.landmarks_input_bounding_box)
alignment_images.append(face.alignment_image)
alignment_bounding_boxes.append(face.alignment_bounding_box)
landmarks, recognition_input_images = self._graph.find_landmarks(
landmarks_input_bounding_boxes,
landmarks_input_images,
alignment_bounding_boxes,
alignment_images,
)
for i, face in enumerate(faces):
face.landmarks = Landmarks(*landmarks[i])
face.recognition_input_image = recognition_input_images[i]
def get_embeddings(self, faces: Sequence[Face]):
"""
Get embeddings for faces.
Accepts a list of Face objects.
No return values. Updates the face objects in place with embeddings.
"""
if len(faces) == 0:
return
recognition_input_images = []
for face in faces:
if face.recognition_input_image is None:
raise InvalidInputException("Face.recognition_input_image is None.")
recognition_input_images.append(face.recognition_input_image)
embeddings = self._graph.compute_embeddings(recognition_input_images)
for i, face in enumerate(faces):
face.embedding = Embedding(embeddings[i], self._scoring_mode)
def get_embeddings_from_landmarks(
self, image: np.ndarray, landmarks: Sequence[Landmarks]
) -> List[Embedding]:
recognition_input_images = [
utils.crop_and_align(
image, landmark.astuple(), self._graph.engine.fr_input_shape
)
for landmark in landmarks
]
return [
Embedding(data, self._scoring_mode)
for data in self._graph.compute_embeddings(recognition_input_images)
]
def get_embedding_from_prepared_image(
self, prepared_image: np.ndarray
) -> Embedding:
"""
        Compute the embedding using the prepared image, i.e. the recognition_input_image.
Accepts one prepared image.
Returns embedding.
"""
if prepared_image is None:
raise InvalidInputException(ERR_INVALID_EMB_PREPARED_IMAGE)
embeddings = self._graph.compute_embeddings([prepared_image])
return Embedding(embeddings[0], self._scoring_mode)
def get_attributes(self, faces: Sequence[Face]):
"""
Deprecated: This will be removed in the next major release. An Attributes SDK
will be provided in the future to replace functionality.
Computes age and gender attributes for faces.
Accepts a list of Face objects.
No return values. Updates the face objects in place with age and gender values.
"""
warnings.warn(
"""get_attributes is deprecated and will be removed in the next major release.
An Attributes SDK will be provided in the future to replace functionality.""",
DeprecationWarning,
)
if len(faces) == 0:
return
recognition_input_images = []
for face in faces:
if face.recognition_input_image is None:
raise InvalidInputException("Face.recognition_input_image is None.")
recognition_input_images.append(face.recognition_input_image)
ages, genders = self._graph.get_attributes(recognition_input_images)
for i, face in enumerate(faces):
face.ages = ages[i]
face.genders = genders[i]
@staticmethod
def _get_standard_score(emb1: Embedding, emb2: Embedding) -> float:
"""
        Compute the similarity score of two face embeddings based on the Euclidean
        distance between them. A larger number indicates greater similarity between
        the two embeddings; a lower number indicates a greater difference between them.
        Accepts 2 embedding objects. Assumes the scoring mode of the embeddings to be standard.
        Returns a float between [0, 4]. If either embedding is not in standard scoring mode,
        an InvalidInputException is thrown.
"""
if (
emb1.scoring_mode != ScoringMode.StandardEmbedding
or emb1.scoring_mode != emb2.scoring_mode
):
raise InvalidInputException(ERR_INVALID_EMB_MODE)
score = 4 - np.sum((emb1.data - emb2.data) ** 2)
return float(np.clip(score, 0, 4))
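    # Worked example (illustrative, not from the original source): for
    # unit-length embeddings, np.sum((e1 - e2) ** 2) == 2 - 2 * (e1 @ e2),
    # so score == 2 + 2 * cos(e1, e2): identical embeddings score 4.0,
    # orthogonal ones 2.0, and opposite ones 0.0, matching the documented
    # [0, 4] range. Unit-normalized embeddings are an assumption here.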
@staticmethod
def _get_enhanced_score(emb1: Embedding, emb2: Embedding) -> float:
"""
        Compute the quality-aware similarity score between two face embeddings. A larger
        number indicates greater similarity between the two embeddings; a lower number
        indicates a greater difference between them.
        Accepts 2 embedding objects.
        Returns a float between [0, 2]. If either embedding is not in enhanced scoring mode,
        an InvalidInputException is thrown.
"""
if (
emb1.scoring_mode != ScoringMode.EnhancedEmbedding
or emb1.scoring_mode != emb2.scoring_mode
):
raise InvalidInputException(ERR_INVALID_EMB_MODE)
base_emb1, uncertainty1 = emb1.data[:-1], emb1.data[-1]
base_emb2, uncertainty2 = emb2.data[:-1], emb2.data[-1]
total_uncertainty = uncertainty1 + uncertainty2
if total_uncertainty < 0:
raise InternalErrorException("Uncertainty values cannot be negative.")
attention = 2 * (1 - base_emb1 @ base_emb2) / (1e-10 + total_uncertainty)
dist = attention + np.log(1e-10 + total_uncertainty)
score = np.exp(-dist)
return float(np.clip(score, 0, 2))
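    # Worked example (illustrative, not from the original source): with
    # identical base embeddings (base_emb1 @ base_emb2 == 1) the attention
    # term is 0, so score == exp(-log(1e-10 + u)) == 1 / (1e-10 + u) for
    # total uncertainty u; lower combined uncertainty therefore yields a
    # higher score, clipped to the documented [0, 2] range.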
@staticmethod
def get_similarity(emb1: Embedding, emb2: Embedding) -> float:
"""
        Compute the similarity score of two face embeddings. A larger number indicates
        greater similarity between the two embeddings; a lower number indicates a
        greater difference between them.
        Accepts 2 embedding objects.
        Returns a float between [0, 2] for enhanced mode or [0, 4] for standard mode.
        If either of the embeddings is None, or if the embeddings are of different
        sizes, or if the embeddings have a different scoring_mode, raises InvalidInputException
"""
if not (
isinstance(emb1, Embedding)
and isinstance(emb2, Embedding)
and len(emb1.data) == len(emb2.data)
):
raise InvalidInputException("Invalid input embedding")
if emb1.scoring_mode != emb2.scoring_mode:
raise InvalidInputException("Scoring mode mismatch for input embeddings")
if emb1.scoring_mode == ScoringMode.EnhancedEmbedding:
score = SDK._get_enhanced_score(emb1, emb2)
elif emb1.scoring_mode == ScoringMode.StandardEmbedding:
score = SDK._get_standard_score(emb1, emb2)
else:
raise InvalidInputException(ERR_INVALID_EMB_MODE)
return score
@staticmethod
def get_match_score(emb1: Embedding, emb2: Embedding) -> int:
"""
        Compute the match score of two face embeddings. A larger number indicates
        greater similarity between the two embeddings; a lower number indicates a
        greater difference between them.
        Accepts 2 embedding objects.
        Returns an int between [0, 1000]. If either of the embeddings is None,
        or if the embeddings are of different sizes, or if the embeddings
        have a different scoring_mode, raises InvalidInputException
"""
similarity = SDK.get_similarity(emb1, emb2)
match_score = -1
if emb1.scoring_mode == ScoringMode.EnhancedEmbedding:
match_score = round(
utils.sigmoid_transform(
similarity, ENHANCED_MATCH_SCORE_WEIGHT, ENHANCED_MATCH_SCORE_BIAS
)
* MATCH_SCORE_SCALE
)
elif emb1.scoring_mode == ScoringMode.StandardEmbedding:
match_score = round(
utils.sigmoid_transform(
similarity, STANDARD_MATCH_SCORE_WEIGHT, STANDARD_MATCH_SCORE_BIAS
)
* MATCH_SCORE_SCALE
)
else:
raise InvalidInputException(ERR_INVALID_EMB_MODE)
return int(np.clip(match_score, 0, 1000))
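    # Worked example (illustrative): assuming utils.sigmoid_transform(x, w, b)
    # computes 1 / (1 + exp(-(w * x + b))) (an assumption, though consistent
    # with the expected values in the unit tests), a standard-mode similarity
    # of 4.0 maps to round(1000 / (1 + exp(-(2.1 * 4.0 - 5.3)))) == 957, and an
    # enhanced-mode similarity of 1.51 maps to
    # round(1000 / (1 + exp(-(2.3 * 1.51 - 0.5)))) == 951.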
def get_confidence(self, emb1: Embedding, emb2: Embedding) -> float:
"""
Deprecated: This will be removed in the next major release. Use the
get_match_score or get_similarity functions instead.
Compute the probability of two faces being the same using the standard mode.
Accepts 2 embedding objects.
        Returns a float between [0, 1]. If either of the embeddings is None,
        or if the embeddings are of different sizes, or if the embeddings
        have a different scoring_mode, raises InvalidInputException
"""
warnings.warn(
"""get_confidence is deprecated and will be removed in the next major release.
Use the get_match_score or get_similarity functions instead.""",
DeprecationWarning,
)
if emb1 is not None and emb1.scoring_mode == ScoringMode.EnhancedEmbedding:
emb1 = Embedding(emb1.data, ScoringMode.StandardEmbedding)
if emb2 is not None and emb2.scoring_mode == ScoringMode.EnhancedEmbedding:
emb2 = Embedding(emb2.data, ScoringMode.StandardEmbedding)
score = self.get_similarity(emb1, emb2)
return float(utils.sigmoid_transform(score, self._weight, self._bias))
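
# Illustrative usage sketch (not part of the original module). Assumes the
# default models are installed; the zero image is a stand-in that will simply
# yield no detected faces.
if __name__ == "__main__":
    _img = np.zeros((480, 640, 3), dtype=np.uint8)
    _result = SDK().get_faces([_img], qualities=True, landmarks=True, embeddings=True)
    print(f"detected {len(_result.faces)} face(s)")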

View File

@ -0,0 +1,219 @@
from __future__ import annotations
import paravision.recognition.sdk
import typing
import numpy
import paravision.recognition.types
_Shape = typing.Tuple[int, ...]
__all__ = [
"Metadata",
"SDK"
]
class Metadata():
def __init__(self) -> None: ...
@property
def embedding_size(self) -> int:
"""
The embedding size of the Recognition models being used.
:type: int
"""
@embedding_size.setter
def embedding_size(self, arg0: int) -> None:
"""
The embedding size of the Recognition models being used.
"""
@property
def engine(self) -> str:
"""
The engine or accelerator of the Recognition SDK instance being used.
:type: str
"""
@engine.setter
def engine(self, arg0: str) -> None:
"""
The engine or accelerator of the Recognition SDK instance being used.
"""
@property
def engine_version(self) -> str:
"""
The version of the engine or accelerator being used.
:type: str
"""
@engine_version.setter
def engine_version(self, arg0: str) -> None:
"""
The version of the engine or accelerator being used.
"""
@property
def generation(self) -> int:
"""
The generation of the Recognition SDK.
:type: int
"""
@generation.setter
def generation(self, arg0: int) -> None:
"""
The generation of the Recognition SDK.
"""
@property
def model(self) -> str:
"""
The name of the Recognition models.
:type: str
"""
@model.setter
def model(self, arg0: str) -> None:
"""
The name of the Recognition models.
"""
@property
def model_version(self) -> str:
"""
The version of the Recognition models.
:type: str
"""
@model_version.setter
def model_version(self, arg0: str) -> None:
"""
The version of the Recognition models.
"""
@property
def sdk_version(self) -> str:
"""
The version of the Recognition SDK.
:type: str
"""
@sdk_version.setter
def sdk_version(self, arg0: str) -> None:
"""
The version of the Recognition SDK.
"""
pass
class SDK():
"""
SDK()
    An SDK object contains an instance of the Paravision model and its
associated resources.
SDK objects are long-living and do not need to be re-instantiated between
method calls.
"""
def __init__(self, models_dir: typing.Optional[str] = None, settings: typing.Optional[paravision.recognition.types.Settings] = None) -> None:
"""
Create a new SDK instance with settings as a struct
"""
@typing.overload
def get_bounding_boxes(self, imgs: list[numpy.ndarray], image_source: paravision.recognition.types.ImageSource = ImageSource.UNKNOWN) -> paravision.recognition.types.InferenceResult:
"""
Detect bounding boxes of faces in the image, returning a list of Faces.
"""
@typing.overload
    def get_bounding_boxes(self, imgs: list[paravision.recognition.types.Image], image_source: paravision.recognition.types.ImageSource = ImageSource.UNKNOWN) -> paravision.recognition.types.InferenceResult:
"""
Accepts a list of NumPy arrays (images).
"""
@typing.overload
def get_embedding_from_prepared_image(self, prepared_image: numpy.ndarray) -> paravision.recognition.types.Embedding:
"""
Get the embedding for a prepared image.
"""
@typing.overload
def get_embedding_from_prepared_image(self, prepared_image: paravision.recognition.types.Image) -> paravision.recognition.types.Embedding:
"""
Accepts one prepared image (numpy array).
"""
def get_embeddings(self, faces: list[paravision.recognition.types.Face]) -> None:
"""
Get the embeddings for faces.
"""
@typing.overload
def get_embeddings_from_landmarks(self, image: numpy.ndarray, landmarks: list[paravision.recognition.types.Landmarks]) -> list[paravision.recognition.types.Embedding]:
"""
Get the embeddings for faces.
"""
@typing.overload
def get_embeddings_from_landmarks(self, image: paravision.recognition.types.Image, landmarks: list[paravision.recognition.types.Landmarks]) -> list[paravision.recognition.types.Embedding]:
"""
Accepts a NumPy array (image) and a list of landmarks.
"""
@typing.overload
def get_faces(self, imgs: list[numpy.ndarray], qualities: bool = False, landmarks: bool = False, embeddings: bool = False, image_source: paravision.recognition.types.ImageSource = ImageSource.UNKNOWN) -> paravision.recognition.types.InferenceResult:
"""
Detect faces in the image.
"""
@typing.overload
def get_faces(self, imgs: list[paravision.recognition.types.Image], qualities: bool = False, landmarks: bool = False, embeddings: bool = False, image_source: paravision.recognition.types.ImageSource = ImageSource.UNKNOWN) -> paravision.recognition.types.InferenceResult:
"""
Includes bounding boxes, landmarks, and [optionally] image quality
details.
"""
def get_landmarks(self, faces: list[paravision.recognition.types.Face]) -> None:
"""
Get the landmarks for faces.
"""
@typing.overload
def get_landmarks_from_bounding_boxes(self, img: numpy.ndarray, bboxes: list[paravision.recognition.types.BoundingBox]) -> paravision.recognition.types.InferenceResult:
"""
Get the landmarks from a bounding box.
"""
@typing.overload
def get_landmarks_from_bounding_boxes(self, img: paravision.recognition.types.Image, bboxes: list[paravision.recognition.types.BoundingBox]) -> paravision.recognition.types.InferenceResult:
"""
Accepts a NumPy array (image) and a list of bounding boxes.
"""
@staticmethod
def get_match_score(emb1: paravision.recognition.types.Embedding, emb2: paravision.recognition.types.Embedding, scoring_mode: paravision.recognition.types.ScoringMode = ScoringMode.EnhancedEmbedding) -> int:
"""
        Compute the match score of two face embeddings. A larger number indicates
        greater similarity between the two embeddings; a lower number indicates a
        greater difference between them.
"""
@staticmethod
def get_metadata(models_dir: typing.Optional[str] = None) -> Metadata:
"""
Returns metadata for SDK and model info.
"""
def get_qualities(self, faces: list[paravision.recognition.types.Face]) -> None:
"""
Get the quality of the faces in the image.
"""
@staticmethod
def get_similarity(emb1: paravision.recognition.types.Embedding, emb2: paravision.recognition.types.Embedding, scoring_mode: paravision.recognition.types.ScoringMode = ScoringMode.EnhancedEmbedding) -> float:
"""
        Compute the similarity score of two face embeddings. A larger number indicates
        greater similarity between the two embeddings; a lower number indicates a
        greater difference between them.
"""
pass

View File

@ -1,142 +0,0 @@
import tensorrt as trt
import os
from pathlib import Path
from ..exceptions import (
InvalidInputException,
ModelLoadingException,
)
from contextlib import ExitStack
LOGGER = trt.Logger(trt.Logger.Severity.ERROR)
DEFAULT_DETECTION_MAX_BATCH_SIZE = 1
DEFAULT_QUALITY_MAX_BATCH_SIZE = 4
DEFAULT_LANDMARKS_MAX_BATCH_SIZE = 4
DEFAULT_RECOGNITION_MAX_BATCH_SIZE = 4
DEFAULT_ATTRIBUTES_MAX_BATCH_SIZE = 4
DEFAULT_MASK_MAX_BATCH_SIZE = 4
NUM_CHANNELS_RGB = 3
MAX_WORKSPACE_SIZE = 1 << 28
trt.init_libnvinfer_plugins(LOGGER, "")
PLUGIN_CREATORS = trt.get_plugin_registry().plugin_creator_list
def _get_max_batch_size(name, settings):
if name == "detection":
# batching is not enabled for detection yet
return DEFAULT_DETECTION_MAX_BATCH_SIZE
if name == "landmarks":
size = settings.get(
"landmarks_max_batch_size", DEFAULT_LANDMARKS_MAX_BATCH_SIZE
)
elif name == "recognition":
size = settings.get(
"recognition_max_batch_size", DEFAULT_RECOGNITION_MAX_BATCH_SIZE
)
elif name == "attributes":
size = settings.get(
"attributes_max_batch_size", DEFAULT_ATTRIBUTES_MAX_BATCH_SIZE
)
elif name == "mask":
size = settings.get("mask_max_batch_size", DEFAULT_MASK_MAX_BATCH_SIZE)
elif name == "quality":
size = settings.get("quality_max_batch_size", DEFAULT_QUALITY_MAX_BATCH_SIZE)
else:
raise InvalidInputException("Batch size is not specified")
return size
def build_engine(name, models_dir, models_type, engine_path, settings, shape):
if name == "mask":
model_file = os.path.join(models_dir, models_type, f"{name}.onnx")
else:
model_file = os.path.join(models_dir, name, models_type, f"{name}.onnx")
batch_size = _get_max_batch_size(name, settings)
trt_version = int(trt.__version__.split(".")[0])
if trt_version >= 8:
# -1 indicates dynamic batching. Does not work for detection model currently
input_shape = [
batch_size if name == "detection" else -1,
NUM_CHANNELS_RGB,
] + list(shape)
net_flags = 1 << (int)(trt.NetworkDefinitionCreationFlag.EXPLICIT_BATCH)
else:
raise ModelLoadingException(
"TensorRT version 8 or higher required to build engine"
)
if not os.path.isfile(model_file):
return None
with ExitStack() as stack:
builder = stack.enter_context(trt.Builder(LOGGER))
config = stack.enter_context(builder.create_builder_config())
network = stack.enter_context(builder.create_network(net_flags))
parser = stack.enter_context(trt.OnnxParser(network, LOGGER))
success = parser.parse_from_file(model_file)
if not success:
raise ModelLoadingException(f"Cannot parse {name} model.")
builder.max_batch_size = batch_size
config.max_workspace_size = MAX_WORKSPACE_SIZE
profile = _create_opt_profile(builder, network, batch_size)
config.add_optimization_profile(profile)
network.get_input(0).shape = input_shape
serialized_engine = builder.build_serialized_network(network, config)
if serialized_engine is None:
raise ModelLoadingException(f"Cannot serialize {name} engine.")
engine_dir = Path(engine_path).parent
engine_dir.mkdir(parents=True, exist_ok=True)
with open(engine_path, "wb") as f:
f.write(serialized_engine)
return serialized_engine
def _create_opt_profile(builder, network, max_batch_size):
profile = builder.create_optimization_profile()
if network.num_inputs <= 0:
return profile
input_ = network.get_input(0)
min_shape = trt.Dims(input_.shape)
min_shape[0] = 1
opt_shape = trt.Dims(input_.shape)
opt_shape[0] = max_batch_size
max_shape = trt.Dims(input_.shape)
max_shape[0] = max_batch_size
profile.set_shape(input_.name, min_shape, opt_shape, max_shape)
return profile
def load_engine(name, engine_path, models_dir, models_type, settings, input_shape):
if not os.path.isfile(engine_path):
serialized_engine = build_engine(
name, models_dir, models_type, engine_path, settings, input_shape
)
else:
with open(engine_path, "rb") as f:
serialized_engine = f.read()
if not serialized_engine:
raise ModelLoadingException(f"Cannot build {name} engine.")
runtime = trt.Runtime(LOGGER)
return runtime.deserialize_cuda_engine(serialized_engine)
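
# Illustrative usage sketch (not part of the original module); shows how the
# Engine class in engine.py calls load_engine. Every path, the model type,
# and the input shape below are assumed example values only.
if __name__ == "__main__":
    engine = load_engine(
        "recognition",
        "/tmp/engines/recognition/recognition.engine",  # hypothetical cache path
        "/opt/paravision/models",                       # hypothetical models_dir
        "trt",                                          # hypothetical models_type
        settings={"recognition_max_batch_size": 4},
        input_shape=(112, 112),
    )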

View File

@ -1,462 +0,0 @@
import os
import importlib
import numpy as np
import pycuda.driver as cuda
import pycuda.autoinit # noqa
from .. import _utils as utils
from ..exceptions import (
ModelLoadingException,
)
from . import utils as trt_utils
from .builder import load_engine
QUALITIES_QUALITIES_NAME = "qualities"
QUALITIES_ACCEPTABILITIES_NAME = "acceptabilities"
LANDMARKS_LANDMARKS_NAME = "landmarks"
ATTRIBUTES_AGES_NAME = "age_probs"
ATTRIBUTES_GENDERS_NAME = "gender_probs"
UNIT_LOWER_LIMIT = 0
UNIT_UPPER_LIMIT = 1
ERR_ENGINE_UNINITIALIZED = "The engine is not initialized."
ERR_MASK_MODEL_NOT_LOADED = "Mask model not loaded."
FD_NAME = "detection"
LM_NAME = "landmarks"
QL_NAME = "quality"
FR_NAME = "recognition"
AT_NAME = "attributes"
MD_NAME = "mask"
ENGINE_EXT = ".engine"
class Engine:
def __init__(self, models_dir, settings):
engine_dirpath = models_dir
try:
paravision_models = importlib.import_module("paravision_models")
if paravision_models.location() == models_dir:
engine_dirpath = paravision_models.TRT_ENGINE_PATH
except (ModuleNotFoundError, AttributeError):
pass
self.stream = cuda.Stream()
(
fd_model_type,
lm_model_type,
ql_model_type,
fr_model_type,
at_model_type,
md_model_type,
) = utils.get_model_types(settings)
self.fd_input_shape = utils.read_fd_input_shape(models_dir, fd_model_type)
fd_engine_path = os.path.join(
engine_dirpath, FD_NAME, fd_model_type, FD_NAME + ENGINE_EXT
)
self.fd_engine = load_engine(
FD_NAME,
fd_engine_path,
models_dir,
fd_model_type,
settings,
self.fd_input_shape,
)
if self.fd_engine:
self.fd_context = self.fd_engine.create_execution_context()
(
self.fd_inputs,
self.fd_outputs,
self.fd_data,
self.fd_bindings,
) = trt_utils.allocate_buffers(self.fd_engine)
self.lm_input_shape = utils.read_lm_input_shape(models_dir)
lm_engine_path = os.path.join(
engine_dirpath, LM_NAME, lm_model_type, LM_NAME + ENGINE_EXT
)
self.lm_engine = load_engine(
LM_NAME,
lm_engine_path,
models_dir,
lm_model_type,
settings,
self.lm_input_shape,
)
if self.lm_engine:
self.lm_context = self.lm_engine.create_execution_context()
(
self.lm_inputs,
self.lm_outputs,
self.lm_data,
self.lm_bindings,
) = trt_utils.allocate_buffers(self.lm_engine)
self.ql_input_shape = utils.read_lm_input_shape(models_dir)
ql_engine_path = os.path.join(
engine_dirpath, QL_NAME, ql_model_type, QL_NAME + ENGINE_EXT
)
self.ql_engine = load_engine(
QL_NAME,
ql_engine_path,
models_dir,
ql_model_type,
settings,
self.ql_input_shape,
)
if self.ql_engine:
self.ql_context = self.ql_engine.create_execution_context()
(
self.ql_inputs,
self.ql_outputs,
self.ql_data,
self.ql_bindings,
) = trt_utils.allocate_buffers(self.ql_engine)
self.fr_input_shape = utils.read_fr_input_shape(models_dir)
fr_engine_path = os.path.join(
engine_dirpath, FR_NAME, fr_model_type, FR_NAME + ENGINE_EXT
)
self.fr_engine = load_engine(
FR_NAME,
fr_engine_path,
models_dir,
fr_model_type,
settings,
self.fr_input_shape,
)
if self.fr_engine:
self.fr_context = self.fr_engine.create_execution_context()
(
self.fr_inputs,
self.fr_outputs,
self.fr_data,
self.fr_bindings,
) = trt_utils.allocate_buffers(self.fr_engine)
self.fr_output_shape = utils.read_fr_output_shape(models_dir)
self.at_input_shape = utils.read_at_input_shape(models_dir)
at_engine_path = os.path.join(
engine_dirpath, AT_NAME, at_model_type, AT_NAME + ENGINE_EXT
)
self.at_engine = load_engine(
AT_NAME,
at_engine_path,
models_dir,
at_model_type,
settings,
self.at_input_shape,
)
if self.at_engine:
self.at_context = self.at_engine.create_execution_context()
(
self.at_inputs,
self.at_outputs,
self.at_data,
self.at_bindings,
) = trt_utils.allocate_buffers(self.at_engine)
# Mask input image is prepared separately as the shape can deviate from landmark input images.
if "mask" in settings:
md_model_path = settings["mask"]["models_dir"]
md_engine_path = os.path.join(
md_model_path, md_model_type, MD_NAME + ENGINE_EXT
)
self.md_input_shape = utils.read_md_input_shape(models_dir)
self.md_engine = load_engine(
MD_NAME,
md_engine_path,
md_model_path,
md_model_type,
settings,
self.md_input_shape,
)
if self.md_engine:
self.md_context = self.md_engine.create_execution_context()
(
self.md_inputs,
self.md_outputs,
self.md_data,
self.md_bindings,
) = trt_utils.allocate_buffers(self.md_engine)
self.mask_enabled = True
else:
self.mask_enabled = False
def predict_bounding_boxes(self, np_imgs):
"""
Args:
np_imgs: (list) list of images loaded in numpy, of format (1, H, W, C)
Returns:
bboxes: (list) list containing arrays of bboxes for each image
in order [x1, y1, x2, y2], scaled between 0, 1
confs: (list) list containing arrays of confidences scores
of the faces for each image
"""
if not self.fd_engine:
raise ModelLoadingException(ERR_ENGINE_UNINITIALIZED)
max_batch_size = self.fd_engine.max_batch_size
bboxes, confidences, img_idxs = [], [], []
for i in range(0, len(np_imgs), max_batch_size):
batch = np_imgs[i : min(len(np_imgs), i + max_batch_size)]
(
bboxes_batch,
confidences_batch,
img_idxs_batch,
) = self._batch_predict_bounding_boxes(batch)
bboxes.extend(bboxes_batch)
confidences.extend(confidences_batch)
img_idxs.extend(img_idxs_batch + i)
bboxes = np.asarray(bboxes).reshape(-1, 4)
confidences = np.asarray(confidences).reshape(-1)
return bboxes, confidences, img_idxs
def _batch_predict_bounding_boxes(self, np_imgs):
np_imgs = np.transpose(np.asarray(np_imgs), [0, 3, 1, 2]).astype(np.float32)
batch_size = len(np_imgs)
results = trt_utils.do_inference(
self.fd_context,
bindings=self.fd_bindings,
inputs=self.fd_inputs,
input_data=np_imgs,
outputs=self.fd_outputs,
output_data=self.fd_data,
stream=self.stream,
batch_size=batch_size,
)
num_detections = int(results[0])
bboxes = results[1].reshape(-1, 4)[:num_detections]
scores = results[2][:num_detections].tolist()
indexes = results[3][:num_detections].astype(np.int32)
return bboxes, scores, indexes
def predict_landmarks(self, np_imgs):
"""
Args:
np_imgs: (list) imgs loaded in numpy of format (1, H, W, C)
        Returns:
            lmks: (numpy array) landmarks in the shape of (N, 10)
"""
if not self.lm_engine:
raise ModelLoadingException(ERR_ENGINE_UNINITIALIZED)
max_batch_size = self.lm_engine.max_batch_size
lmks = []
for i in range(0, len(np_imgs), max_batch_size):
batch = np_imgs[i : min(len(np_imgs), i + max_batch_size)]
lmks_batch = self._batch_predict_landmarks(batch)
lmks.extend(lmks_batch)
return np.asarray(lmks)
def _batch_predict_landmarks(self, np_imgs):
np_imgs = np.transpose(np_imgs, [0, 3, 1, 2]).astype(np.float32)
batch_size = len(np_imgs)
results = trt_utils.do_inference(
self.lm_context,
bindings=self.lm_bindings,
inputs=self.lm_inputs,
input_data=np_imgs,
outputs=self.lm_outputs,
output_data=self.lm_data,
stream=self.stream,
batch_size=batch_size,
)
        # Because we pre-allocate the buffer to accommodate the max batch size,
        # the last elements of the results will be zero unless we're finding
        # landmarks for max_batch_size faces, so we explicitly grab
        # the elements we want.
landmarks = results[self.lm_engine[LANDMARKS_LANDMARKS_NAME] - 1].reshape(
-1, 10
)[:batch_size]
return landmarks
def predict_embeddings(self, np_imgs):
"""
Args:
np_imgs: (list) list of images loaded in numpy of format (1, H, W, C)
Returns:
embs: (numpy array) array of embedding arrays
"""
if not self.fr_engine:
raise ModelLoadingException(ERR_ENGINE_UNINITIALIZED)
max_batch_size = self.fr_engine.max_batch_size
batch_size = len(np_imgs)
embeddings = []
for i in range(0, batch_size, max_batch_size):
batch = np_imgs[i : min(batch_size, i + max_batch_size)]
embs = self._batch_predict_embeddings(batch)
embeddings.extend(embs)
return np.asarray(embeddings).reshape(batch_size, -1)
def _batch_predict_embeddings(self, np_imgs):
np_imgs = np.transpose(np_imgs, [0, 3, 1, 2]).astype(np.float32)
batch_size = len(np_imgs)
results = trt_utils.do_inference(
self.fr_context,
bindings=self.fr_bindings,
inputs=self.fr_inputs,
input_data=np_imgs,
outputs=self.fr_outputs,
output_data=self.fr_data,
stream=self.stream,
batch_size=batch_size,
)
return results[0]
def predict_attributes(self, np_imgs):
if not self.at_engine:
raise ModelLoadingException(ERR_ENGINE_UNINITIALIZED)
max_batch_size = self.at_engine.max_batch_size
batch_size = len(np_imgs)
all_ages, all_genders = [], []
for i in range(0, batch_size, max_batch_size):
batch = np_imgs[i : min(batch_size, i + max_batch_size)]
ages, genders = self._batch_predict_attributes(batch)
all_ages.extend(ages)
all_genders.extend(genders)
return all_ages, all_genders
def _batch_predict_attributes(self, np_imgs):
"""
Args:
            np_imgs: (numpy array) imgs loaded in numpy of format (N, H, W, C)
Returns:
age_probs: (numpy array) age probabilities in the shape of (N, 1, 7)
gender_probs: (numpy array) gender probabilities in the shape of (N, 1, 2)
"""
np_imgs = np.transpose(np_imgs, [0, 3, 1, 2]).astype(np.float32)
batch_size = len(np_imgs)
results = trt_utils.do_inference(
self.at_context,
bindings=self.at_bindings,
inputs=self.at_inputs,
input_data=np_imgs,
outputs=self.at_outputs,
output_data=self.at_data,
batch_size=batch_size,
stream=self.stream,
)
ages = results[self.at_engine[ATTRIBUTES_AGES_NAME] - 1].reshape(-1, 7)[
:batch_size
]
genders = results[self.at_engine[ATTRIBUTES_GENDERS_NAME] - 1].reshape(-1, 2)[
:batch_size
]
return [ages, genders]
def get_qualities(self, np_imgs):
"""
Args:
np_imgs: (list) imgs loaded in numpy of format (1, H, W, C)
        Returns:
            qualities: (numpy array) quality values between 0 and 1
            acceptabilities: (numpy array) acceptability values between 0 and 1
"""
if not self.ql_engine:
raise ModelLoadingException(ERR_ENGINE_UNINITIALIZED)
max_batch_size = self.ql_engine.max_batch_size
qualities, acceptabilities = [], []
for i in range(0, len(np_imgs), max_batch_size):
batch = np_imgs[i : min(len(np_imgs), i + max_batch_size)]
qualities_batch, acceptabilities_batch = self._batch_get_qualities(batch)
qualities.extend(qualities_batch)
acceptabilities.extend(acceptabilities_batch)
return (
np.clip(qualities, UNIT_LOWER_LIMIT, UNIT_UPPER_LIMIT),
np.clip(acceptabilities, UNIT_LOWER_LIMIT, UNIT_UPPER_LIMIT),
)
def _batch_get_qualities(self, np_imgs):
np_imgs = np.transpose(np_imgs, [0, 3, 1, 2]).astype(np.float32)
batch_size = len(np_imgs)
results = trt_utils.do_inference(
self.ql_context,
bindings=self.ql_bindings,
inputs=self.ql_inputs,
input_data=np_imgs,
outputs=self.ql_outputs,
output_data=self.ql_data,
stream=self.stream,
batch_size=batch_size,
)
qualities = results[self.ql_engine[QUALITIES_QUALITIES_NAME] - 1][:batch_size]
        acceptabilities = results[self.ql_engine[QUALITIES_ACCEPTABILITIES_NAME] - 1][
:batch_size
]
return qualities, acceptabilities
def check_for_masks(self, np_imgs):
if not self.md_engine:
raise ModelLoadingException(ERR_MASK_MODEL_NOT_LOADED)
max_batch_size = self.md_engine.max_batch_size
batch_size = len(np_imgs)
mask_probabilities = []
for i in range(0, batch_size, max_batch_size):
batch = np_imgs[i : min(batch_size, i + max_batch_size)]
mask_probabilities.extend(self._batch_check_for_masks(batch))
return np.asarray(mask_probabilities)
def _batch_check_for_masks(self, np_imgs):
"""
Args:
np_imgs: (list) imgs loaded in numpy of format (1, H, W, C)
Returns:
mask_probs: (numpy array) mask probabilities in the shape of (N, 1, 1)
"""
np_imgs = np.transpose(np_imgs, [0, 3, 1, 2]).astype(np.float32)
results = trt_utils.do_inference(
self.md_context,
bindings=self.md_bindings,
inputs=self.md_inputs,
input_data=np_imgs,
outputs=self.md_outputs,
output_data=self.md_data,
stream=self.stream,
batch_size=len(np_imgs),
)
return results[0]
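
# Illustrative appendix (not part of the original module): the batching
# pattern shared by the predict_* and check_for_masks methods above, shown
# standalone with plain Python.
def _iter_batches(items, max_batch_size):
    for i in range(0, len(items), max_batch_size):
        yield items[i : i + max_batch_size]

assert list(_iter_batches(list(range(5)), 2)) == [[0, 1], [2, 3], [4]]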

View File

@ -1,36 +0,0 @@
IMG_NOFACE = "noface.jpg"
IMG_ONEFACE = "oneface.jpg"
IMG_MANYFACES = "manyfaces.jpg"
IMG_ONEFACE_RECO_INPUT_IMG = "recognition_input_image_openvino.png"
IMG_IDENTITY1_FACE1 = "bhargav.jpg"
IMG_IDENTITY1_FACE2 = "bhargav-3.jpg"
ERR_MISSING_BBOX = "missing bounding box values"
ERR_MISSING_SCORE = "missing score value"
ERR_MISSING_LANDMARKS = "missing landmarks values"
ERR_MISSING_EMBEDDING = "missing embedding value"
ERR_MISSING_MASK_PROB = "missing mask probability value"
ERR_MISSING_FACES = "missing faces"
ERR_UNEXPECTED_LANDMARKS = "unexpected landmarks found"
ERR_UNEXPECTED_QUALITY = "unexpected quality found"
ERR_UNEXPECTED_NUM_FACES = "unexpected number of faces found"
ERR_UNEXPECTED_NUM_INFERENCES = "unexpected number of image inferences found"
ERR_UNEXPECTED_AGES = "unexpected ages found"
ERR_UNEXPECTED_GENDERS = "unexpected genders found"
ERR_UNEXPECTED_AGE = "unexpected age found"
ERR_UNEXPECTED_GENDER = "unexpected gender found"
ERR_INVALID_MASK_PROB = "invalid mask probability value"
ERR_INVALID_MPF = "invalid most prominent face"
ERR_INVALID_SCORING_MODE = "invalid scoring mode"
ERR_INVALID_EMBEDDING_SIZE = "invalid embedding size"
ERR_INVALID_AGES = "invalid ages"
ERR_JSON_FACE = "face is not JSON serializable"
MAX_NO_MASK_SCORE = 0.5
MASK_SCORE = 0.95
EXPECTED_ENHANCED_EMBED_LEN = 257
EXPECTED_STANDARD_EMBED_LEN = 256
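# Note (illustrative, inferred from the slicing in sdk.py): the enhanced
# embedding appends a single uncertainty value to the 256-d base vector,
# hence 257 vs 256.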

View File

@ -1,495 +0,0 @@
import os
import numpy as np
from unittest import TestCase
from ..sdk import SDK
from ..types import ImageInferenceData
from ..engine import Engine
from ..utils import load_image
from ..types import BoundingBox, ScoringMode, Embedding
from ..exceptions import InvalidInputException
from .utils import is_json_serializable
from .constants import (
IMG_NOFACE,
IMG_ONEFACE,
IMG_MANYFACES,
IMG_IDENTITY1_FACE1,
IMG_IDENTITY1_FACE2,
IMG_ONEFACE_RECO_INPUT_IMG,
ERR_MISSING_BBOX,
ERR_MISSING_SCORE,
ERR_MISSING_LANDMARKS,
ERR_MISSING_EMBEDDING,
ERR_MISSING_MASK_PROB,
ERR_MISSING_FACES,
ERR_JSON_FACE,
ERR_UNEXPECTED_LANDMARKS,
ERR_UNEXPECTED_QUALITY,
ERR_UNEXPECTED_NUM_FACES,
ERR_UNEXPECTED_NUM_INFERENCES,
ERR_UNEXPECTED_AGES,
ERR_UNEXPECTED_GENDERS,
ERR_UNEXPECTED_AGE,
ERR_UNEXPECTED_GENDER,
ERR_INVALID_MASK_PROB,
ERR_INVALID_MPF,
MAX_NO_MASK_SCORE,
MASK_SCORE,
ERR_INVALID_SCORING_MODE,
ERR_INVALID_EMBEDDING_SIZE,
ERR_INVALID_AGES,
EXPECTED_ENHANCED_EMBED_LEN,
)
ASSETS_PATH = os.path.join(os.path.dirname(__file__), "assets")
engine_default = None
scoring_mode = None
sdk = None
class TestSDK(TestCase):
@classmethod
def setUpClass(cls):
global sdk
global engine_default
global scoring_mode
engine_default = Engine.OPENVINO
scoring_mode = ScoringMode.EnhancedEmbedding
sdk = SDK(engine=engine_default, settings={"scoring_mode": scoring_mode})
def setUp(self):
self.sdk = sdk
def test_load_image_invalid_input(self):
with self.assertRaises(InvalidInputException):
load_image("invalid-img.jpg")
def test_empty_case(self):
imgs = [load_image(os.path.join(ASSETS_PATH, IMG_NOFACE))]
detection_result = self.sdk.get_faces(
imgs, qualities=True, landmarks=True, embeddings=True
)
faces = detection_result.faces
self.assertEqual(len(faces), 0, msg=ERR_UNEXPECTED_NUM_FACES)
image_inferences = detection_result.image_inferences
self.assertEqual(len(image_inferences), 1, msg=ERR_UNEXPECTED_NUM_INFERENCES)
detection_result = self.sdk.get_bounding_boxes(imgs)
self.assertEqual(len(detection_result.faces), 0, msg=ERR_UNEXPECTED_NUM_FACES)
def test_get_faces(self):
imgs = [load_image(os.path.join(ASSETS_PATH, IMG_ONEFACE))]
detection_result = self.sdk.get_faces(
imgs, qualities=True, landmarks=True, embeddings=True
)
faces = detection_result.faces
self.assertEqual(len(faces), 1, msg=ERR_UNEXPECTED_NUM_FACES)
image_inferences = detection_result.image_inferences
self.assertEqual(len(image_inferences), 1, msg=ERR_UNEXPECTED_NUM_INFERENCES)
self.assert_faces(faces)
def test_get_faces_multiple(self):
oneface_img = load_image(os.path.join(ASSETS_PATH, IMG_ONEFACE))
noface_img = load_image(os.path.join(ASSETS_PATH, IMG_NOFACE))
manyface_img = load_image(os.path.join(ASSETS_PATH, IMG_MANYFACES))
imgs = [oneface_img, noface_img, manyface_img]
detection_result = self.sdk.get_faces(
imgs, qualities=True, landmarks=True, embeddings=True
)
faces = detection_result.faces
self.assertEqual(len(faces), 9, msg=ERR_UNEXPECTED_NUM_FACES)
self.assert_faces(faces)
image_inferences = detection_result.image_inferences
self.assertEqual(len(image_inferences), 3, msg=ERR_UNEXPECTED_NUM_INFERENCES)
        expected_num_faces = [1, 0, 8]
        for i, expected in enumerate(expected_num_faces):
            self.assertEqual(
                len(image_inferences[i].faces),
                expected,
                msg=f"unexpected number of faces found in image inference {i}",
            )
def test_get_attributes(self):
imgs = [load_image(os.path.join(ASSETS_PATH, IMG_IDENTITY1_FACE1))]
detection_result = self.sdk.get_faces(imgs, qualities=True, landmarks=True)
faces = detection_result.faces
self.assertIsNotNone(faces, msg=ERR_MISSING_FACES)
self.assertEqual(len(faces), 1, msg=ERR_UNEXPECTED_NUM_FACES)
face = faces[0]
self.assertIsNone(face.ages, msg=ERR_UNEXPECTED_AGES)
self.assertIsNone(face.genders, msg=ERR_UNEXPECTED_GENDERS)
self.assertIsNone(face.age, msg=ERR_UNEXPECTED_AGE)
self.assertIsNone(face.gender, msg=ERR_UNEXPECTED_GENDER)
self.sdk.get_attributes(faces)
self.assertIsNotNone(face.ages, msg="missing ages")
self.assertIsNotNone(face.genders, msg="missing genders")
self.assertIsNotNone(face.age, msg="missing age")
self.assertTrue(face.age == "20-30", msg="incorrect age")
self.assertIsNotNone(face.gender, msg="missing gender")
self.assertTrue(face.gender == "male", msg="incorrect gender")
self.assertTrue(face.ages[2] > face.ages[0], msg=ERR_INVALID_AGES)
self.assertTrue(face.ages[2] > face.ages[1], msg=ERR_INVALID_AGES)
self.assertTrue(face.ages[2] > face.ages[3], msg=ERR_INVALID_AGES)
self.assertTrue(face.ages[2] > face.ages[4], msg=ERR_INVALID_AGES)
self.assertTrue(face.ages[2] > face.ages[5], msg=ERR_INVALID_AGES)
self.assertTrue(face.ages[2] > face.ages[6], msg=ERR_INVALID_AGES)
self.assertTrue(face.genders[0] > face.genders[1], msg="invalid genders")
self.assertTrue(is_json_serializable(face.asdict()), msg=ERR_JSON_FACE)
def test_get_qualities(self):
imgs = [load_image(os.path.join(ASSETS_PATH, IMG_IDENTITY1_FACE1))]
faces = self.sdk.get_faces(imgs).faces
self.sdk.get_qualities(faces)
self.assertAlmostEqual(faces[0].quality, 0.925, delta=0.001)
self.assertAlmostEqual(faces[0].acceptability, 0.999, delta=0.001)
self.assertTrue(is_json_serializable(faces[0].asdict()), msg=ERR_JSON_FACE)
    def test_get_faces_qualities(self):
imgs = [load_image(os.path.join(ASSETS_PATH, IMG_IDENTITY1_FACE1))]
faces = self.sdk.get_faces(imgs, qualities=True).faces
self.assertAlmostEqual(faces[0].quality, 0.925, delta=0.001)
self.assertTrue(is_json_serializable(faces[0].asdict()), msg=ERR_JSON_FACE)
def test_get_bounding_boxes(self):
imgs = [load_image(os.path.join(ASSETS_PATH, IMG_ONEFACE))]
detection_result = self.sdk.get_bounding_boxes(imgs)
faces = detection_result.faces
self.assertEqual(len(faces), 1, msg=ERR_UNEXPECTED_NUM_FACES)
f = faces[0]
self.assertIsNotNone(f.bounding_box, msg=ERR_MISSING_BBOX)
self.assertIsNotNone(f.bounding_box.score, msg=ERR_MISSING_SCORE)
self.assertIsNone(f.landmarks, msg=ERR_UNEXPECTED_LANDMARKS)
self.assertIsNone(f.quality, msg=ERR_UNEXPECTED_QUALITY)
self.assertIsNone(f.acceptability, msg="unexpected acceptability")
self.assertIsNone(f.embedding, msg="unexpected embedding")
self.assertTrue(is_json_serializable(f.asdict()), msg=ERR_JSON_FACE)
def test_get_landmarks(self):
imgs = [load_image(os.path.join(ASSETS_PATH, IMG_ONEFACE))]
detection_result = self.sdk.get_faces(imgs)
faces = detection_result.faces
self.assertEqual(len(faces), 1, msg=ERR_UNEXPECTED_NUM_FACES)
self.assertIsNone(faces[0].landmarks, msg=ERR_UNEXPECTED_LANDMARKS)
self.sdk.get_landmarks(faces)
self.assertIsNotNone(faces[0].landmarks, msg=ERR_MISSING_LANDMARKS)
self.assertTrue(is_json_serializable(faces[0].asdict()), msg=ERR_JSON_FACE)
def test_get_landmarks_from_bounding_box(self):
imgs = [load_image(os.path.join(ASSETS_PATH, IMG_ONEFACE))]
detection_result = self.sdk.get_bounding_boxes(imgs)
faces = detection_result.faces
self.assertEqual(len(faces), 1, msg=ERR_UNEXPECTED_NUM_FACES)
self.assertIsNotNone(faces[0].bounding_box, msg=ERR_MISSING_BBOX)
self.assertIsNone(faces[0].landmarks, msg=ERR_UNEXPECTED_LANDMARKS)
bbox = faces[0].bounding_box
bounding_box = BoundingBox(
bbox.origin.x,
bbox.origin.y,
bbox.origin.x + bbox.width,
bbox.origin.y + bbox.height,
)
result = self.sdk.get_landmarks_from_bounding_boxes(imgs[0], [bounding_box])
self.assertIsNotNone(result.faces[0].landmarks, msg=ERR_MISSING_LANDMARKS)
self.assertTrue(
is_json_serializable(result.faces[0].asdict()), msg=ERR_JSON_FACE
)
def test_get_embeddings(self):
imgs = [load_image(os.path.join(ASSETS_PATH, IMG_ONEFACE))]
detection_result = self.sdk.get_faces(imgs, qualities=True, landmarks=True)
faces = detection_result.faces
self.sdk.get_embeddings(faces)
self.assertEqual(len(faces), 1, msg=ERR_UNEXPECTED_NUM_FACES)
f = faces[0]
self.assertIsNotNone(f.bounding_box, msg=ERR_MISSING_BBOX)
self.assertIsNotNone(f.landmarks, msg=ERR_MISSING_LANDMARKS)
self.assertIsNotNone(f.embedding, msg=ERR_MISSING_EMBEDDING)
self.assertEqual(
f.embedding.scoring_mode,
ScoringMode.EnhancedEmbedding,
msg=ERR_INVALID_SCORING_MODE,
)
self.assertTrue(is_json_serializable(f.asdict()), msg=ERR_JSON_FACE)
def test_get_embedding_from_landmarks(self):
imgs = [load_image(os.path.join(ASSETS_PATH, IMG_ONEFACE))]
detection_result = self.sdk.get_faces(imgs, embeddings=True)
faces = detection_result.faces
f = faces[0]
landmarks = f.landmarks
embeddings = self.sdk.get_embeddings_from_landmarks(
imgs[0], [landmarks, landmarks]
)
self.assertEqual(len(embeddings), 2)
embedding = embeddings[0]
self.assertTrue(embedding.scoring_mode == ScoringMode.EnhancedEmbedding)
similarity = SDK.get_similarity(f.embedding, embedding)
self.assertAlmostEqual(similarity, 1.51, delta=0.01)
def test_check_for_mask(self):
imgs = [load_image(os.path.join(ASSETS_PATH, "woman-wearing-mask.jpg"))]
detection_result = self.sdk.get_bounding_boxes(imgs)
faces = detection_result.faces
self.assertEqual(len(faces), 1, msg=ERR_UNEXPECTED_NUM_FACES)
self.sdk.get_masks(faces)
f = faces[0]
self.assertIsNotNone(f.mask, msg=ERR_MISSING_MASK_PROB)
self.assertTrue(f.mask >= MASK_SCORE, msg=ERR_INVALID_MASK_PROB)
self.assertTrue(is_json_serializable(f.asdict()), msg=ERR_JSON_FACE)
def test_check_for_no_mask(self):
imgs = [load_image(os.path.join(ASSETS_PATH, IMG_ONEFACE))]
detection_result = self.sdk.get_bounding_boxes(imgs)
faces = detection_result.faces
self.assertEqual(len(faces), 1, msg=ERR_UNEXPECTED_NUM_FACES)
self.sdk.get_masks(faces)
f = faces[0]
self.assertIsNotNone(f.mask, msg=ERR_MISSING_MASK_PROB)
self.assertTrue(f.mask < MAX_NO_MASK_SCORE, msg=ERR_INVALID_MASK_PROB)
self.assertTrue(is_json_serializable(f.asdict()), msg=ERR_JSON_FACE)
def test_check_for_no_mask_in_many_faces(self):
imgs = [load_image(os.path.join(ASSETS_PATH, IMG_MANYFACES))]
detection_result = self.sdk.get_bounding_boxes(imgs)
faces = detection_result.faces
self.assertTrue(len(faces) > 1, msg=ERR_UNEXPECTED_NUM_FACES)
self.sdk.get_masks(faces)
for f in faces:
self.assertIsNotNone(f.mask, msg=ERR_MISSING_MASK_PROB)
self.assertTrue(f.mask < MAX_NO_MASK_SCORE, msg=ERR_INVALID_MASK_PROB)
def test_get_most_prominent_face_index_oneface(self):
imgs = [load_image(os.path.join(ASSETS_PATH, IMG_ONEFACE))]
infer_result = self.sdk.get_bounding_boxes(imgs)
self.assertTrue(
len(infer_result.image_inferences) == 1, msg=ERR_UNEXPECTED_NUM_INFERENCES
)
self.assertNotEqual(len(infer_result.faces), 0, msg=ERR_UNEXPECTED_NUM_FACES)
infer_image = infer_result.image_inferences[0]
index = infer_image.most_prominent_face_index()
self.assertTrue(index == 0, msg=ERR_INVALID_MPF)
def test_get_most_prominent_face_index_manyfaces(self):
imgs = [load_image(os.path.join(ASSETS_PATH, IMG_MANYFACES))]
infer_result = self.sdk.get_bounding_boxes(imgs)
self.assertTrue(
len(infer_result.image_inferences) == 1, msg=ERR_UNEXPECTED_NUM_INFERENCES
)
self.assertTrue(len(infer_result.faces) > 0, msg=ERR_UNEXPECTED_NUM_FACES)
infer_image = infer_result.image_inferences[0]
index = infer_image.most_prominent_face_index()
self.assertTrue(index == 3, msg=ERR_INVALID_MPF)
def test_get_most_prominent_face_index_noface(self):
infer_image = ImageInferenceData(128, 128)
index = infer_image.most_prominent_face_index()
self.assertTrue(index == -1, msg=ERR_INVALID_MPF)
def test_get_most_prominent_face_index_invalid_image_dims(self):
infer_image = ImageInferenceData(0, 0)
index = infer_image.most_prominent_face_index()
self.assertTrue(index == -1, msg=ERR_INVALID_MPF)
def test_scoring_same_image(self):
img = load_image(os.path.join(ASSETS_PATH, IMG_ONEFACE))
faces = self.sdk.get_faces([img, img], embeddings=True).faces
similarity = SDK.get_similarity(faces[0].embedding, faces[1].embedding)
self.assertAlmostEqual(similarity, 1.51, delta=0.01)
confidence = self.sdk.get_confidence(faces[0].embedding, faces[1].embedding)
self.assertAlmostEqual(confidence, 1.0, delta=0.01)
match_score = SDK.get_match_score(faces[0].embedding, faces[1].embedding)
self.assertAlmostEqual(match_score, 951, delta=2)
def test_scoring_same_identity(self):
img1 = load_image(os.path.join(ASSETS_PATH, IMG_IDENTITY1_FACE1))
img2 = load_image(os.path.join(ASSETS_PATH, IMG_IDENTITY1_FACE2))
faces = self.sdk.get_faces([img1, img2], embeddings=True).faces
similarity = SDK.get_similarity(faces[0].embedding, faces[1].embedding)
self.assertAlmostEqual(similarity, 0.788, delta=0.001)
confidence = self.sdk.get_confidence(faces[0].embedding, faces[1].embedding)
self.assertAlmostEqual(confidence, 1.0, delta=0.01)
match_score = SDK.get_match_score(faces[0].embedding, faces[1].embedding)
self.assertAlmostEqual(match_score, 788, delta=2)
def test_scoring_diff_identity(self):
img1 = load_image(os.path.join(ASSETS_PATH, IMG_IDENTITY1_FACE1))
img2 = load_image(os.path.join(ASSETS_PATH, IMG_ONEFACE))
faces = self.sdk.get_faces([img1, img2], embeddings=True).faces
similarity = SDK.get_similarity(faces[0].embedding, faces[1].embedding)
self.assertAlmostEqual(similarity, 0.05, delta=0.01)
confidence = self.sdk.get_confidence(faces[0].embedding, faces[1].embedding)
self.assertAlmostEqual(confidence, 0, delta=0.01)
match_score = SDK.get_match_score(faces[0].embedding, faces[1].embedding)
self.assertAlmostEqual(match_score, 403, delta=2)
def test_get_confidence_invalid_faces(self):
imgs = [load_image(os.path.join(ASSETS_PATH, IMG_MANYFACES))]
faces = self.sdk.get_faces(imgs).faces
with self.assertRaises(InvalidInputException):
self.sdk.get_confidence(faces[0].embedding, faces[1].embedding)
def test_get_similarity_no_embedding(self):
imgs = [load_image(os.path.join(ASSETS_PATH, IMG_MANYFACES))]
faces = self.sdk.get_faces(imgs).faces
with self.assertRaises(InvalidInputException):
SDK.get_similarity(faces[0].embedding, faces[1].embedding)
def test_multi_inference_images(self):
imgs = [
load_image(os.path.join(ASSETS_PATH, IMG_MANYFACES)),
load_image(os.path.join(ASSETS_PATH, IMG_MANYFACES)),
load_image(os.path.join(ASSETS_PATH, IMG_ONEFACE)),
]
infer_result = self.sdk.get_bounding_boxes(imgs)
self.assertTrue(
len(infer_result.image_inferences) == 3, msg=ERR_UNEXPECTED_NUM_INFERENCES
)
self.assertTrue(
len(infer_result.image_inferences[0].faces)
+ len(infer_result.image_inferences[1].faces)
+ len(infer_result.image_inferences[2].faces)
== len(infer_result.faces),
msg="inference image data mismatches faces len",
)
def test_inference_image_data(self):
imgs = [load_image(os.path.join(ASSETS_PATH, IMG_ONEFACE))]
infer_result = self.sdk.get_bounding_boxes(imgs)
faces = infer_result.faces
self.assertEqual(len(faces), 1, msg=ERR_UNEXPECTED_NUM_FACES)
self.sdk.get_qualities(faces)
self.assertAlmostEqual(faces[0].quality, 0.895, delta=0.001)
self.assertTrue(
infer_result.image_inferences[0].faces[0].quality == faces[0].quality,
msg="image inference data and face mismatch",
)
def test_check_embedding(self):
imgs = [load_image(os.path.join(ASSETS_PATH, IMG_ONEFACE))]
ground_truth = np.load(
os.path.join(ASSETS_PATH, "oneface_gen5_fast_enhanced_embedding.npy")
)
detection_result = self.sdk.get_faces(imgs, qualities=True, landmarks=True)
faces = detection_result.faces
self.sdk.get_embeddings(faces)
self.assertEqual(len(faces), 1, msg=ERR_UNEXPECTED_NUM_FACES)
f = faces[0]
self.assertEqual(
len(f.embedding.data), len(ground_truth), msg="Mismatched embedding size"
)
self.assertTrue(
np.allclose(f.embedding.data, ground_truth, rtol=0, atol=35e-4),
msg="Invalid embedding value",
)
self.assertTrue(is_json_serializable(f.asdict()), msg=ERR_JSON_FACE)
def test_get_embedding_from_prepared_image(self):
imgs = [load_image(os.path.join(ASSETS_PATH, IMG_ONEFACE))]
detection_result = self.sdk.get_faces(imgs, embeddings=True)
faces = detection_result.faces
f = faces[0]
reco_img = load_image(os.path.join(ASSETS_PATH, IMG_ONEFACE_RECO_INPUT_IMG))
embedding = self.sdk.get_embedding_from_prepared_image(reco_img)
self.assertTrue(len(embedding.data) == EXPECTED_ENHANCED_EMBED_LEN)
self.assertTrue(embedding.scoring_mode == scoring_mode)
self.assertTrue(
np.allclose(f.embedding.data, embedding.data, rtol=0, atol=0.001),
msg="Invalid embedding value",
)
def test_get_embedding_from_prepared_image_none(self):
with self.assertRaises(InvalidInputException):
self.sdk.get_embedding_from_prepared_image(None)
def assert_faces(self, faces):
for f in faces:
self.assertIsNotNone(f.bounding_box, msg=ERR_MISSING_BBOX)
self.assertIsNotNone(f.landmarks, msg=ERR_MISSING_LANDMARKS)
self.assertIsNotNone(f.quality, msg="missing quality")
self.assertIsNotNone(f.acceptability, msg="missing acceptability")
self.assertIsNotNone(
f.recognition_input_image, msg="missing recognition input image"
)
self.assertIsNotNone(
f.landmarks_input_image, msg="missing landmarks input image"
)
self.assertIsNotNone(
f.landmarks_input_bounding_box,
msg="missing landmarks input bounding box",
)
self.assertIsNotNone(f.alignment_image, msg="missing alignment image")
self.assertIsNotNone(
f.alignment_bounding_box, msg="missing alignment bounding box"
)
self.assertIsNotNone(f.embedding, msg=ERR_MISSING_EMBEDDING)
self.assertEqual(
f.embedding.scoring_mode,
ScoringMode.EnhancedEmbedding,
msg=ERR_INVALID_SCORING_MODE,
)
self.assertTrue(
len(f.embedding.data) in Embedding.ENHANCED_SIZES,
msg=ERR_INVALID_EMBEDDING_SIZE,
)
self.assertIsNone(f.ages, msg=ERR_UNEXPECTED_AGES)
self.assertIsNone(f.genders, msg=ERR_UNEXPECTED_GENDERS)
self.assertIsNone(f.age, msg=ERR_UNEXPECTED_AGE)
self.assertIsNone(f.gender, msg=ERR_UNEXPECTED_GENDER)
self.assertTrue(is_json_serializable(f.asdict()), msg=ERR_JSON_FACE)

View File

@ -1,141 +0,0 @@
import os
import numpy as np
from unittest import TestCase
from ..sdk import SDK
from ..engine import Engine
from ..utils import load_image
from ..types import ScoringMode, Embedding
from .constants import (
IMG_ONEFACE,
IMG_IDENTITY1_FACE1,
IMG_IDENTITY1_FACE2,
IMG_ONEFACE_RECO_INPUT_IMG,
ERR_UNEXPECTED_NUM_FACES,
ERR_UNEXPECTED_NUM_INFERENCES,
ERR_MISSING_EMBEDDING,
ERR_INVALID_SCORING_MODE,
ERR_INVALID_EMBEDDING_SIZE,
EXPECTED_STANDARD_EMBED_LEN,
)
ASSETS_PATH = os.path.join(os.path.dirname(__file__), "assets")
engine_default = None
scoring_mode = None
sdk = None
class TestSDK(TestCase):
@classmethod
def setUpClass(cls):
global sdk
global engine_default
global scoring_mode
engine_default = Engine.OPENVINO
scoring_mode = ScoringMode.StandardEmbedding
sdk = SDK(engine=engine_default, settings={"scoring_mode": scoring_mode})
def setUp(self):
self.sdk = sdk
def test_get_faces_with_embeddings(self):
imgs = [load_image(os.path.join(ASSETS_PATH, IMG_ONEFACE))]
detection_result = self.sdk.get_faces(imgs, embeddings=True)
faces = detection_result.faces
self.assertEqual(len(faces), 1, msg=ERR_UNEXPECTED_NUM_FACES)
image_inferences = detection_result.image_inferences
self.assertEqual(len(image_inferences), 1, msg=ERR_UNEXPECTED_NUM_INFERENCES)
self.assert_embeddings(faces)
def test_get_embedding_from_landmarks(self):
imgs = [load_image(os.path.join(ASSETS_PATH, IMG_ONEFACE))]
detection_result = self.sdk.get_faces(imgs, embeddings=True)
faces = detection_result.faces
f = faces[0]
landmarks = f.landmarks
embeddings = self.sdk.get_embeddings_from_landmarks(imgs[0], [landmarks, landmarks])
self.assertEqual(len(embeddings), 2)
embedding = embeddings[0]
self.assertTrue(embedding.scoring_mode == ScoringMode.StandardEmbedding)
similarity = SDK.get_similarity(f.embedding, embedding)
self.assertAlmostEqual(similarity, 4.0, delta=0.01)
def test_scoring_same_image(self):
img = load_image(os.path.join(ASSETS_PATH, IMG_ONEFACE))
faces = self.sdk.get_faces([img, img], embeddings=True).faces
similarity = SDK.get_similarity(faces[0].embedding, faces[1].embedding)
self.assertAlmostEqual(similarity, 4.0, delta=0.01)
confidence = self.sdk.get_confidence(faces[0].embedding, faces[1].embedding)
self.assertAlmostEqual(confidence, 1.0, delta=0.01)
match_score = SDK.get_match_score(faces[0].embedding, faces[1].embedding)
self.assertAlmostEqual(match_score, 957, delta=1)
def test_scoring_same_identity(self):
img1 = load_image(os.path.join(ASSETS_PATH, IMG_IDENTITY1_FACE1))
img2 = load_image(os.path.join(ASSETS_PATH, IMG_IDENTITY1_FACE2))
faces = self.sdk.get_faces([img1, img2], embeddings=True).faces
similarity = SDK.get_similarity(faces[0].embedding, faces[1].embedding)
self.assertAlmostEqual(similarity, 3.58, delta=0.01)
confidence = self.sdk.get_confidence(faces[0].embedding, faces[1].embedding)
self.assertAlmostEqual(confidence, 1.0, delta=0.01)
match_score = SDK.get_match_score(faces[0].embedding, faces[1].embedding)
self.assertAlmostEqual(match_score, 903, delta=2)
def test_scoring_diff_identity(self):
img1 = load_image(os.path.join(ASSETS_PATH, IMG_IDENTITY1_FACE1))
img2 = load_image(os.path.join(ASSETS_PATH, IMG_ONEFACE))
faces = self.sdk.get_faces([img1, img2], embeddings=True).faces
similarity = SDK.get_similarity(faces[0].embedding, faces[1].embedding)
self.assertAlmostEqual(similarity, 1.85, delta=0.01)
confidence = self.sdk.get_confidence(faces[0].embedding, faces[1].embedding)
self.assertAlmostEqual(confidence, 0, delta=0.01)
match_score = SDK.get_match_score(faces[0].embedding, faces[1].embedding)
self.assertAlmostEqual(match_score, 198, delta=2)
def test_get_embedding_from_prepared_image(self):
imgs = [load_image(os.path.join(ASSETS_PATH, IMG_ONEFACE))]
detection_result = self.sdk.get_faces(
imgs, qualities=True, landmarks=True, embeddings=True
)
faces = detection_result.faces
f = faces[0]
reco_img = load_image(os.path.join(ASSETS_PATH, IMG_ONEFACE_RECO_INPUT_IMG))
embedding = self.sdk.get_embedding_from_prepared_image(reco_img)
self.assertTrue(len(embedding.data) == EXPECTED_STANDARD_EMBED_LEN)
self.assertTrue(embedding.scoring_mode == scoring_mode)
self.assertTrue(
np.allclose(f.embedding.data, embedding.data, rtol=0, atol=0.001),
msg="Invalid embedding value",
)
def assert_embeddings(self, faces):
for f in faces:
self.assertIsNotNone(f.embedding, msg=ERR_MISSING_EMBEDDING)
self.assertEqual(
f.embedding.scoring_mode,
ScoringMode.StandardEmbedding,
msg=ERR_INVALID_SCORING_MODE,
)
self.assertTrue(
len(f.embedding.data) in Embedding.STANDARD_SIZES,
msg=ERR_INVALID_EMBEDDING_SIZE,
)

View File

@ -1,38 +0,0 @@
import numpy as np
from unittest import TestCase
from .._utils import compute_transform
class TestTransform(TestCase):
def test_transform(self):
src_points = [
[146.08132502, 155.9912228],
[218.04209101, 153.17409003],
[176.5086686, 207.03067255],
[153.90101734, 240.53104055],
[214.96274501, 237.63263655],
]
dst_points = [
[38.2946, 51.6963],
[73.5318, 51.5014],
[56.0252, 71.7366],
[41.5493, 92.3655],
[70.7299, 92.2041],
]
trans = compute_transform(src_points, dst_points)
out = np.asarray(
[
[4.79823508e-01, -1.35817363e-02, -2.85523114e01],
[1.35817363e-02, 4.79823508e-01, -2.59931548e01],
]
)
self.assertTrue(
(np.isclose(trans.flatten(), out.flatten()).all()),
            msg="The transform wasn't computed successfully",
)

View File

@ -1,95 +0,0 @@
from unittest import TestCase
import numpy as np
from .utils import is_json_serializable
from ..types import BoundingBox, Face, Embedding, Landmarks, Point, ScoringMode
class TestBoundingBox(TestCase):
def setUp(self):
self.bb = BoundingBox(1.1, 2.2, 3.3, 4.4)
def test_as_dict(self):
d = self.bb.asdict()
props = ["origin", "width", "height"]
for p in props:
self.assertIn(p, d)
self.assertTrue(is_json_serializable(d))
class TestFaceWithStandardScoringMode(TestCase):
def setUp(self):
self.face = Face(
bounding_box=BoundingBox(*np.random.rand(4)),
)
self.face.quality = 0.5
self.face.embedding = Embedding(
np.random.rand(512), ScoringMode.StandardEmbedding
)
landmarks = np.random.rand(5, 2)
self.face.landmarks = Landmarks(*landmarks)
def test_as_dict(self):
d = self.face.asdict()
self.assertIn("quality", d)
self.assertEqual(d["quality"], 0.5)
props = ["bounding_box", "landmarks", "embedding"]
for p in props:
self.assertIn(p, d)
self.assertIsInstance(d[p], dict)
self.assertTrue(is_json_serializable(d))
class TestFaceWithEnhancedScoringMode(TestCase):
def setUp(self):
self.face = Face(
bounding_box=BoundingBox(*np.random.rand(4)),
)
self.face.embedding = Embedding(
np.random.rand(513), ScoringMode.EnhancedEmbedding
)
def test_as_dict(self):
d = self.face.asdict()
props = ["bounding_box", "embedding"]
for p in props:
self.assertIn(p, d)
self.assertIsInstance(d[p], dict)
self.assertTrue(is_json_serializable(d))
class TestLandmarks(TestCase):
def setUp(self):
self.landmarks = Landmarks((0, 1), (2, 3), (4, 5), (6, 7), (8, 9))
def test_as_dict(self):
d = self.landmarks.asdict()
props = ["left_eye", "right_eye", "nose", "left_mouth", "right_mouth"]
for p in props:
self.assertIn(p, d)
self.assertIsInstance(d[p], dict)
self.assertTrue(is_json_serializable(d))
class TestPoint(TestCase):
def setUp(self):
self.p = Point(1, 2)
def test_as_dict(self):
d = self.p.asdict()
self.assertIn("x", d)
self.assertIn("y", d)
self.assertEqual(d["x"], 1)
self.assertEqual(d["y"], 2)
self.assertTrue(is_json_serializable(d))
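# Sketch of the asdict/JSON contract these tests exercise (illustrative annotation):
#
#   import json
#   face = Face(bounding_box=BoundingBox(0.0, 0.0, 10.0, 20.0))
#   face.quality = 0.5
#   payload = json.dumps(face.asdict())   # succeeds because asdict() is JSON-safe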

View File

@ -1,9 +0,0 @@
import json
def is_json_serializable(x):
try:
json.dumps(x)
return True
except TypeError:
return False
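# Illustrative annotation: is_json_serializable({"x": 1}) -> True, while
# is_json_serializable({1, 2}) -> False (json.dumps raises TypeError on a set).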

View File

@ -1,624 +0,0 @@
from typing import Any, Dict, List, Optional, Tuple, Union
import numpy as np
from enum import IntEnum
from .exceptions import InvalidInputException
AGE_LABELS = ["2-12", "13-19", "20-30", "31-40", "41-50", "51-60", "60+"]
GENDER_LABELS = ["male", "female"]
class Point:
"""
A point within an image, represented by x- and y-coordinates.
Attributes
----------
x : float
The x-coordinate.
y : float
The y-coordinate.
"""
def __init__(self, x: float, y: float):
self._x = x
self._y = y
@property
def x(self):
return self._x
@x.setter
def x(self, x: float):
self._x = x
@property
def y(self):
return self._y
@y.setter
def y(self, y: float):
self._y = y
def __iter__(self):
yield self.x
yield self.y
def __repr__(self):
return f"<Point (x={self._x}, y={self._y})>"
def asdict(self):
"""Convert this object to a dictionary"""
return {"x": self._x, "y": self._y}
def astuple(self):
"""Convert this object to a tuple"""
return self._x, self._y
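# Usage sketch (illustrative comment, not original code): Point unpacks via
# __iter__, so the conversions above compose naturally:
#   p = Point(3.0, 4.0)
#   x, y = p                  # __iter__ yields x, then y
#   p.asdict()                # {"x": 3.0, "y": 4.0}
#   p.astuple()               # (3.0, 4.0)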
PointLike = Union[Point, np.ndarray, List[float], Tuple[float, float]]
class ScoringMode(IntEnum):
StandardEmbedding = 1
EnhancedEmbedding = 2
class BoundingBox:
"""
A bounding box, represented by an origin (top-left point), width, and height.
Attributes
----------
origin : Point
The top-left corner of the rectangle.
width : float
The width of the rectangle.
height : float
The height of the rectangle.
_score : float
The detection confidence score for the face in the bounding box.
"""
def __init__(self, x1: float, y1: float, x2: float, y2: float):
self._origin = Point(x1, y1)
self._width = x2 - x1
self._height = y2 - y1
self._score = None
@property
def origin(self):
return self._origin
@origin.setter
def origin(self, origin: Point):
self._origin = origin
@property
def width(self):
return self._width
@width.setter
def width(self, width: float):
self._width = width
@property
def height(self):
return self._height
@height.setter
def height(self, height: float):
self._height = height
@property
def score(self):
return self._score
@score.setter
def score(self, score):
self._score = score
def __repr__(self):
return f"<BoundingBox (origin={self._origin}, width={self._width}, height={self._height})>"
def asdict(self):
"""Convert this object to a dictionary"""
return {
"origin": self._origin.asdict(),
"width": self._width,
"height": self._height,
}
def astuple(self) -> Tuple[float, float, float, float]:
"""Convert this object to a tuple"""
x, y = self._origin.astuple()
return x, y, self._width, self._height
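# Usage sketch (illustrative comment): the constructor takes corner coordinates
# (x1, y1, x2, y2) and stores origin/width/height:
#   bb = BoundingBox(10.0, 20.0, 110.0, 70.0)
#   bb.astuple()              # (10.0, 20.0, 100.0, 50.0) -> x, y, width, height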
class Landmarks:
"""
A set of facial landmarks, represented by Points.
Attributes
----------
left_eye : Point
The center of the left eye.
right_eye : Point
The center of the right eye.
nose : Point
The tip of the nose.
left_mouth : Point
The left corner of the mouth.
right_mouth : Point
The right corner of the mouth.
"""
def __init__(
self,
left_eye: PointLike,
right_eye: PointLike,
nose: PointLike,
left_mouth: PointLike,
right_mouth: PointLike,
):
self._left_eye = Point(*left_eye)
self._right_eye = Point(*right_eye)
self._nose = Point(*nose)
self._left_mouth = Point(*left_mouth)
self._right_mouth = Point(*right_mouth)
@property
def left_eye(self):
return self._left_eye
@left_eye.setter
def left_eye(self, left_eye: PointLike):
self._left_eye = Point(*left_eye)
@property
def right_eye(self):
return self._right_eye
@right_eye.setter
def right_eye(self, right_eye: PointLike):
self._right_eye = Point(*right_eye)
@property
def nose(self):
return self._nose
@nose.setter
def nose(self, nose: PointLike):
self._nose = Point(*nose)
@property
def left_mouth(self):
return self._left_mouth
@left_mouth.setter
def left_mouth(self, left_mouth: PointLike):
self._left_mouth = Point(*left_mouth)
@property
def right_mouth(self):
return self._right_mouth
@right_mouth.setter
def right_mouth(self, right_mouth: PointLike):
self._right_mouth = Point(*right_mouth)
def __repr__(self):
return (
"<Landmarks "
+ f"(left_eye={repr(self._left_eye)}, right_eye={repr(self._right_eye)}, "
+ f"nose={repr(self._nose)}, "
+ f"left_mouth={repr(self._left_mouth)}, right_mouth={repr(self._right_mouth)})>"
)
def asdict(self):
"""Convert this object to a dictionary"""
return {
"left_eye": self._left_eye.asdict(),
"right_eye": self._right_eye.asdict(),
"nose": self._nose.asdict(),
"left_mouth": self._left_mouth.asdict(),
"right_mouth": self._right_mouth.asdict(),
}
def astuple(self):
"""Convert this object to a tuple"""
return (
self._left_eye.astuple(),
self._right_eye.astuple(),
self._nose.astuple(),
self._left_mouth.astuple(),
self._right_mouth.astuple(),
)
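# Usage sketch (illustrative comment): each landmark accepts any PointLike, so a
# (5, 2) detector output array can be splatted directly:
#   pts = np.random.rand(5, 2)
#   lm = Landmarks(*pts)      # row order: left_eye, right_eye, nose, left_mouth, right_mouth
#   lm.left_eye.astuple()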
class Embedding:
"""
A numerical representation of a face found in an image.
Attributes
----------
data : numpy.ndarray
The embedding data representing a face.
"""
STANDARD_SIZES = np.array([256, 512, 1024])
ENHANCED_SIZES = STANDARD_SIZES + 1
def __init__(
self,
data: np.ndarray,
scoring_mode: ScoringMode = ScoringMode.EnhancedEmbedding,
):
self._validate_data(data, scoring_mode)
self._data = data
self._scoring_mode = scoring_mode
@property
def data(self) -> np.ndarray:
if (
self._scoring_mode == ScoringMode.StandardEmbedding
and len(self._data) in self.ENHANCED_SIZES
):
return self._data[:-1]
return self._data
@property
def scoring_mode(self):
return self._scoring_mode
def asdict(self):
return {
"data": self._data.tolist(),
"scoring_mode": self._scoring_mode.name,
}
@classmethod
def _validate_data(cls, data: np.ndarray, scoring_mode: ScoringMode):
if scoring_mode == ScoringMode.EnhancedEmbedding:
if len(data) not in cls.ENHANCED_SIZES:
raise InvalidInputException(
f"Invalid embedding size, enhanced embedding size must be one of {cls.ENHANCED_SIZES}"
)
elif scoring_mode == ScoringMode.StandardEmbedding:
if (
len(data) not in cls.ENHANCED_SIZES
and len(data) not in cls.STANDARD_SIZES
):
raise InvalidInputException(
f"Invalid embedding size, standard embedding size must be one of "
f"{np.concatenate((cls.STANDARD_SIZES, cls.ENHANCED_SIZES))}"
)
else:
raise InvalidInputException("Invalid scoring mode")
class BaseAttributes:
"""
This is an empty class inherited by the AttributesSDK.
For internal use only.
Customers should use the implementation from Attributes SDK:
from paravision.attributes.types import get_attributes, Attributes
attr: Attributes = get_attributes(face)
"""
class Face:
"""
A face, minimally represented by a bounding box.
Attributes
----------
bounding_box : BoundingBox
The bounding box of the face.
landmarks : Landmarks
The locations of various parts of the face.
embedding : Embedding
The embedding representing the face.
ages: numpy.ndarray
The probability distribution over the given age groups.
genders: numpy.ndarray
The probability distribution over the given genders.
quality : float
An overall assessment of how acceptable the face is for facial
recognition. Overall range of [0, 1], "acceptable" quality is >= 0.15.
acceptability: float
The assessment of the acceptability of the face. Overall range [0, 1]
mask: float
The probability of a face wearing mask. Overall range [0, 1]
recognition_input_image, alignment_image, landmarks_input_image: numpy.ndarray
Images used at different stages of the detection pipeline.
alignment_bounding_box, landmarks_input_bounding_box: BoundingBox
The bounding boxes associated with alignment_image and landmarks_input_image.
"""
def __init__(self, bounding_box: Optional[BoundingBox] = None):
self._bounding_box = bounding_box
self._landmarks = None
self._embedding = None
self._ages = None
self._genders = None
self._quality = None
self._acceptability = None
self._mask = None
self._recognition_input_image = None
self._landmarks_input_image = None
self._landmarks_input_bounding_box = None
self._alignment_image = None
self._alignment_bounding_box = None
self._attributes = None
@property
def bounding_box(self):
return self._bounding_box
@bounding_box.setter
def bounding_box(self, bbox: BoundingBox):
self._bounding_box = bbox
@property
def landmarks(self):
return self._landmarks
@landmarks.setter
def landmarks(self, landmarks: Landmarks):
self._landmarks = landmarks
@property
def embedding(self):
return self._embedding
@embedding.setter
def embedding(self, embedding: Embedding):
self._embedding = embedding
@property
def age(self):
return None if self._ages is None else AGE_LABELS[np.argmax(self._ages)]
@property
def ages(self):
return self._ages
@ages.setter
def ages(self, ages: Union[List[float], np.ndarray]):
self._ages: Optional[List[float]] = [*ages]
@property
def gender(self):
return (
None if self._genders is None else GENDER_LABELS[np.argmax(self._genders)]
)
@property
def genders(self):
return self._genders
@genders.setter
def genders(self, genders: Union[List[float], np.ndarray]):
self._genders: Optional[List[float]] = [*genders]
@property
def quality(self):
return self._quality
@quality.setter
def quality(self, quality: float):
self._quality = quality
@property
def acceptability(self):
return self._acceptability
@acceptability.setter
def acceptability(self, acceptability: float):
self._acceptability = acceptability
@property
def mask(self):
return self._mask
@mask.setter
def mask(self, mask: float):
self._mask = mask
@property
def recognition_input_image(self):
return self._recognition_input_image
@recognition_input_image.setter
def recognition_input_image(self, recognition_input_image: np.ndarray):
self._recognition_input_image = recognition_input_image
@property
def landmarks_input_image(self):
return self._landmarks_input_image
@landmarks_input_image.setter
def landmarks_input_image(self, landmarks_input_image: np.ndarray):
self._landmarks_input_image = landmarks_input_image
@property
def landmarks_input_bounding_box(self):
return self._landmarks_input_bounding_box
@landmarks_input_bounding_box.setter
def landmarks_input_bounding_box(self, landmarks_input_bbox: BoundingBox):
self._landmarks_input_bounding_box = landmarks_input_bbox
@property
def alignment_image(self):
return self._alignment_image
@alignment_image.setter
def alignment_image(self, alignment_image: np.ndarray):
self._alignment_image = alignment_image
@property
def alignment_bounding_box(self):
return self._alignment_bounding_box
@alignment_bounding_box.setter
def alignment_bounding_box(self, alignment_bbox: BoundingBox):
self._alignment_bounding_box = alignment_bbox
@property
def attributes(self):
return self._attributes
@attributes.setter
def attributes(self, attributes: BaseAttributes):
self._attributes = attributes
def __repr__(self):
return "<Face %s>" % (str(self._bounding_box),)
def asdict(self):
"""Convert this object to a dictionary representation."""
if self._bounding_box is None:
return {}
face: Dict[str, Any] = {"bounding_box": self._bounding_box.asdict()}
if self._landmarks is not None:
face["landmarks"] = self._landmarks.asdict()
if self._embedding is not None:
face["embedding"] = self._embedding.asdict()
if self._quality is not None:
face["quality"] = self._quality
if self._acceptability is not None:
face["acceptability"] = self._acceptability
if self._mask is not None:
face["mask"] = self._mask
return face
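# Usage sketch (illustrative comment): optional fields appear in the dictionary
# only once they are set:
#   f = Face(bounding_box=BoundingBox(0.0, 0.0, 50.0, 50.0))
#   f.asdict()                # {"bounding_box": {...}}
#   f.quality = 0.9
#   f.asdict()                # now also contains "quality": 0.9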
class ImageInferenceData:
"""
The result of running the get-faces pipeline on a single image.
Attributes
----------
faces : List[Face]
A list of Face objects detected in the image.
width : int
The width of the inference image.
height : int
The height of the inference image.
"""
def __init__(self, width: int, height: int):
self._width = width
self._height = height
self._faces: List[Face] = []
@property
def width(self):
return self._width
@width.setter
def width(self, width):
self._width = width
@property
def height(self):
return self._height
@height.setter
def height(self, height):
self._height = height
@property
def faces(self):
return self._faces
@faces.setter
def faces(self, faces):
self._faces = faces
def __repr__(self):
return f"<ImageInferenceData {self._faces}>"
def most_prominent_face_index(self) -> int:
if self._height <= 0 or self._width <= 0 or len(self._faces) == 0:
return -1
face_areas = []
for face in self._faces:
bb = face.bounding_box
if bb is None:
continue
x1 = max(min(bb.origin.x, self._width), 0)
y1 = max(min(bb.origin.y, self._height), 0)
x2 = max(min((bb.origin.x + bb.width), self._width), 0)
y2 = max(min((bb.origin.y + bb.height), self._height), 0)
area = (x2 - x1) * (y2 - y1)
face_areas.append(area)
return int(np.argmax(face_areas)) if face_areas else -1
def asdict(self):
"""Convert this object to a dictionary"""
return {"faces": self._faces}
class InferenceResult:
"""
The result of running the get-faces pipeline on a group of images.
Attributes
----------
faces : List[Face]
A list of Face objects detected across all images.
image_inferences : List[ImageInferenceData]
A list of detection results grouped by image.
"""
def __init__(self, image_inferences: List[ImageInferenceData]):
self._image_inferences = image_inferences
self._faces: List[Face] = []
for image_inference in image_inferences:
self._faces.extend(image_inference.faces)
@property
def image_inferences(self):
return self._image_inferences
@image_inferences.setter
def image_inferences(self, image_inferences: List[ImageInferenceData]):
self._image_inferences = image_inferences
@property
def faces(self):
return self._faces
@faces.setter
def faces(self, faces: List[Face]):
self._faces = faces
def __repr__(self):
return f"<InferenceResult {self._image_inferences}>"
def asdict(self):
"""Convert this object to a dictionary"""
return {"image_inferences": self._image_inferences}

File diff suppressed because it is too large.

View File

@ -1,19 +0,0 @@
import cv2
import numpy as np
from typing import Union
from .exceptions import InvalidInputException
def load_image(img_data_or_path: Union[str, bytes]) -> np.ndarray:
if isinstance(img_data_or_path, str):
img = cv2.imread(img_data_or_path, cv2.IMREAD_COLOR)
else:
# Accept raw encoded bytes or a binary file-like object exposing read()
data = img_data_or_path.read() if hasattr(img_data_or_path, "read") else img_data_or_path
img = cv2.imdecode(np.frombuffer(data, dtype=np.uint8), cv2.IMREAD_COLOR)
if img is None:
raise InvalidInputException("Couldn't load the input image")
return cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
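# Usage sketch (illustrative comment): load_image accepts a file path or encoded
# image bytes and always returns an RGB ndarray:
#   img = load_image("face.jpg")          # hypothetical path
#   with open("face.jpg", "rb") as fh:
#       img = load_image(fh.read())       # raw encoded bytes also work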

Some files were not shown because too many files have changed in this diff.