# Pre-reqs: tqdm pandas seaborn thop
import cherrypy

import time
import os
import sys
import json
from pathlib import Path

# General image processing
import cv2
import torch
import torch.backends.cudnn as cudnn
from numpy import random
import copy
import numpy

# Specific YOLO package directories (check your PYTHONPATH)
from models.experimental import attempt_load
from utils.datasets import letterbox
from utils.general import check_img_size, non_max_suppression_face, scale_coords, xyxy2xywh
from utils.torch_utils import time_synchronized


class yoloserv(object):

    yolo = None
    device = None
    DEVICE = "cpu"
    imgdir = None
    outdir = None
    face_detector = None
    palm_detector = None
    face_matcher = None
    palm_matcher = None
    ir_camera = None

    points = []

    # Nature of init depends on the required algorithms listed in /etc/ukdi.json
    # eg :: "yolo_devices": "detect_face,facematch"
    # detect_face - - find the most significant face in a crowd (works for IR)
    # paravision - - proprietary face matching (high quality)
    # facematch - - open source face matching (decent quality)
    # realsense - - intel realsense camera (unstable at best)
    # seek - - seek IR camera
    # palmvein - - palm vein detection
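    # A minimal /etc/ukdi.json sketch (keys inferred from the code below; the
    # exact layout of the real file is an assumption):
    #   {
    #     "yolo_devices": "detect_face,facematch",
    #     "yolo_indir": "/tmp/yolo/in",
    #     "yolo_outdir": "/tmp/yolo/out",
    #     "emulate_facematch": 0
    #   }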
    def initialise(self):
        with open("/etc/ukdi.json", "r") as f:
            self.conf = json.loads(f.read())
        self.device_list = self.conf["yolo_devices"].split(",")
        self.imgdir = self.conf["yolo_indir"]
        self.outdir = self.conf["yolo_outdir"]
        # NB: not all of these init hooks are defined in this file
        if "detect_face" in self.device_list and self.conf["emulate_facematch"] == 0:
            self.init_detect_face()
        if "paravision" in self.device_list and self.conf["emulate_facematch"] == 0:
            self.face_detector = self.init_paravision()
        if "facematch" in self.device_list and self.conf["emulate_facematch"] == 0:
            self.face_detector = self.init_facematch()
        if "realsense" in self.device_list:
            self.face_detector = self.init_realsense()
        if "seek" in self.device_list:
            self.ir_camera = self.seek_init()
        if "flir" in self.device_list:
            self.ir_camera = self.flir_init()
if "palmvein" in self.device_list:
|
|
self.palm_detector = selt.init_palmvein()
|
|
if "fjpalmvein" in self.device_list:
|
|
self.palm_detector = selt.init_jvpalmvein()
|
|
|
|
|
|
|
|
    def svc_detect_face(self, imfile):
        return self.face_detector.detect(imfile)

    def svc_match_face(self, imfile1, imfile2):
        return self.face_matcher.detect(imfile1, imfile2)

##### ###### ##### ###### #### #####
# # # # # # # #
# # ##### # ##### # #
# # # # # # #
# # # # # # # #
##### ###### # ###### #### #

    def fm_init(self):
        print("@@@ initialising facematch")
        # Same SDK imports as pv_init() below; kept local to match that style
        from paravision.recognition import SDK, Engine
        from paravision.recognition.exceptions import ParavisionException
        try:
            self.sdk = SDK(engine=Engine.OPENVINO)
        except ParavisionException:
            pass

    def fm_load(self, dev1, dev2, id_image_filepath, photo_image_filepath):
        from paravision.recognition.utils import load_image  # same module pv_read() uses
        self.dev1 = dev1
        self.dev2 = dev2
        try:
            # Load images
            self.id_image = load_image(id_image_filepath)
            self.photo_image = load_image(photo_image_filepath)
            print("++++++++++++++++ ", self.id_image)
            return True
        except Exception:
            return None

    def fm_get_faces(self):
        try:
            # Get all faces from images with qualities, landmarks, and embeddings
            self.inference_result = self.sdk.get_faces([self.id_image, self.photo_image], qualities=True, landmarks=True, embeddings=True)
            self.image_inference_result = self.inference_result.image_inferences
            if len(self.image_inference_result) == 0:
                return "no inferences found"
            # Both images must have produced an inference before we index [1] below
            if len(self.image_inference_result) < 2:
                return "ID or human face could not be recognised"

            # Get most prominent face
            self.id_face = self.image_inference_result[0].most_prominent_face_index()
            self.photo_face = self.image_inference_result[1].most_prominent_face_index()
            if self.id_face < 0:
                return "no id face found"
            if self.photo_face < 0:
                return "no live face found"

            # Get numerical representation of faces (required for face match)
            self.id_emb = self.image_inference_result[0].faces[self.id_face].embedding
            self.photo_emb = self.image_inference_result[1].faces[self.photo_face].embedding

        except Exception as ex:
            return "image processing exception " + str(ex)

        return None

        # return " id=%d photo=%d result=%d " % (self.id_face, self.photo_face, len(self.image_inference_result))

    def fm_compute_scores(self):
        try:
            # Get image quality scores (how 'good' a face is)
            self.id_qual = self.image_inference_result[0].faces[self.id_face].quality
            self.photo_qual = self.image_inference_result[1].faces[self.photo_face].quality

            self.id_qual = round(self.id_qual, 3)
            self.photo_qual = round(self.photo_qual, 3)

            # Get face match score
            self.match_score = self.sdk.get_match_score(self.id_emb, self.photo_emb)

            # Create .json
            self.face_match_json = {"device1": self.dev1,
                                    "device2": self.dev2,
                                    "passmark": 500,
                                    "device1_qual": self.id_qual,
                                    "device2_qual": self.photo_qual,
                                    "match_score": self.match_score}

            #return json.dumps(self.face_match_json)
            #print(self.face_match_json)

            # Send to core
            #url = "%s/notify/%s/%s" % (self.conf["core"], self.conf["identity"], face_match_json)
            #url = url.replace(" ", "%20") # Remove spaces
            #buf = []
            #req = urllib.request.Request( url )
            #with urllib.request.urlopen(req) as response:
            #    print(response.read())

        except Exception as ex:
            return str(ex)

    def get_scores(self):
        return json.dumps(self.face_match_json)

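    # A minimal sketch of the facematch flow (device names and paths illustrative):
    #   s.fm_init()
    #   s.fm_load("cam0", "scanner0", "/tmp/id.jpg", "/tmp/photo.jpg")
    #   if s.fm_get_faces() is None:   # None signals success
    #       s.fm_compute_scores()
    #       print(s.get_scores())      # e.g. {"device1": "cam0", ..., "match_score": ...}
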
##### ## ##### ## # # # #### # #
# # # # # # # # # # # # ## #
# # # # # # # # # # # #### # # #
##### ###### ##### ###### # # # # # # #
# # # # # # # # # # # # # ##
# # # # # # # ## # #### # #

    def pv_init(self):
        print("@@@ initialising paravision")
        from paravision.recognition import SDK, Engine
        import paravision.recognition.utils as pru
        from paravision.recognition.exceptions import ParavisionException
        try:
            self.sdk = SDK(engine=Engine.AUTO)
        except ParavisionException:
            pass

    def pv_read(self, imgpath):
        import paravision.recognition.utils as pru  # re-import: the pv_init() import is local to that method
        if not os.path.exists(imgpath):
            print("File not found ", imgpath)
            return False
        self.imgpath = imgpath
        self.image = pru.load_image(imgpath)
        print(self.image)
        return True

    def pv_process(self):
        # Get all faces metadata
        print("Finding faces in %s" % (self.imgpath))
        faces = self.sdk.get_faces([self.image], qualities=True, landmarks=True, embeddings=True)
        print("Getting metadata")
        inferences = faces.image_inferences
        print("Getting best face")
        ix = inferences[0].most_prominent_face_index()
        print("Getting a mathematical model of that best face")
        self.model = inferences[0].faces[ix].embedding
        print("Getting image quality scores..")
        self.score = round(1000 * inferences[0].faces[ix].quality)
        print("Score was %d" % (self.score))
        return self.score

    def pv_compare(self, other):
        # Get face match score
        return self.sdk.get_match_score(self.model, other.model)

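    # A minimal sketch of the pv_* flow (two instances, one per image; paths illustrative):
    #   a = yoloserv(); a.pv_init(); a.pv_read("/tmp/id.jpg");    a.pv_process()
    #   b = yoloserv(); b.pv_init(); b.pv_read("/tmp/photo.jpg"); b.pv_process()
    #   print(a.pv_compare(b))  # match score between the two most prominent faces
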
###### ## #### ###### # # ##### #### # #
# # # # # # # ## ## # # # # #
##### # # # ##### # ## # # # ######
# ###### # # # # # # # #
# # # # # # # # # # # # #
# # # #### ###### # # # #### # #

    def init_facematch(self):
        print("@@@ initialising facematch")

##### ###### ## # #### ###### # # ####
# # # # # # # # ## # #
# # ##### # # # #### ##### # # # ####
##### # ###### # # # # # # #
# # # # # # # # # # ## # #
# # ###### # # ###### #### ###### # # ####

    def init_realsense(self):
        print("@@@ initialising realsense")
        import pyrealsense2 as rs

#### ##### ###### # # #### # #
# # # # # ## # # # # #
# # # # ##### # # # # # #
# # ##### # # # # # # #
# # # # # ## # # # #
#### # ###### # # #### ##

    def init_opencv(self):
        print("@@@ initialising opencv")

#######
# # #### # #### # # #
# # # # # # # # # #
# # # # # # # # #####
# # # # # # # # #
# # # # # # # # # #
# #### ###### #### ## #####

    # Set up the model and compute device (takes a while, hence this being a server)
    # Example weightsfile: runs/train/exp/weights/yolov5m6_face.pt
    def v5_init(self, imgdir, outdir, weightsfile):
        print("@@@ initialising yolov5")
        self.weightsfile = weightsfile
        self.imgdir = imgdir
        self.outdir = outdir
        if torch.cuda.is_available():
            self.DEVICE = "cuda"
        print("Setting up the %s device..." % (self.DEVICE))
        self.device = torch.device(self.DEVICE)
        print("Setting up the yolo class...")
        # Returns a processor loaded with the weights and hook to the compute device
        self.yolo = attempt_load(self.weightsfile, map_location=self.DEVICE)  # load FP32 model

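    # A minimal sketch of driving v5_init()/y5_process() directly, outside the
    # CherryPy server (paths and weights file below are illustrative only):
    #   s = yoloserv()
    #   s.v5_init("/tmp/in", "/tmp/out", "runs/train/exp/weights/yolov5m6_face.pt")
    #   print(s.y5_process("face.jpg"))  # JSON list of (x, y, pixval) landmark points
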
    # This does all the heavy lifting. Just give it fully qualified file names for input and output images
    @cherrypy.expose
    def y5_process(self, imgfile):
        # Get landmarks for the nearest face (this also populates self.points)
        l = self.y5_detect_nearest("%s/%s" % (self.imgdir, imgfile))
        S = json.dumps(self.points)
        return S
        # left eye, right eye, nose, left mouth, right mouth, left inner eyebrow, right inner eyebrow (X, Y)
        #return('{ "el":[%f,%f], "er":[%f,%f], "nn":[%f,%f], "ml":[%f,%f], "mr":[%f,%f], "il":[%f,%f], "ir":[%f,%f], "xyxy":[%d,%d,%d,%d] }'\
        #    % ( l[0], l[1], l[2], l[3], l[4], l[5], l[6], l[7], l[8], l[9], l[10], l[11], l[12], l[13], l[14], l[15], l[16], l[17] ))
        #print("Landmarks:", landmarks_all)

    # Detect the most significant (biggest, most central) face in the scene.
    # This method is kind of ugly and does everything at once. Refactor?
    def y5_detect_nearest(self, imgfile, just_the_face=False):
        print("Detecting ", imgfile)
        # Set some config and load the image
        img_size = 320
        conf_thres = 0.3
        iou_thres = 0.5

        # Load the image and make some copies
        orgimg = cv2.imread(imgfile)  # BGR
        assert orgimg is not None, 'Image Not Found %s' % (imgfile)
        img0 = copy.deepcopy(orgimg)
        out_img = copy.deepcopy(orgimg)
        h0, w0 = orgimg.shape[:2]  # orig hw

        # reformat and resize the image
        r = img_size / max(h0, w0)
        if r != 1:  # always resize down, only resize up if training with augmentation
            interp = cv2.INTER_AREA if r < 1 else cv2.INTER_LINEAR
            img0 = cv2.resize(img0, (int(w0 * r), int(h0 * r)), interpolation=interp)
        imgsz = check_img_size(img_size, s=self.yolo.stride.max())  # check img_size
        img = letterbox(img0, new_shape=imgsz)[0]
        img = img[:, :, ::-1].transpose(2, 0, 1).copy()  # BGR to RGB, to 3x416x416

        # Convert the image to a torch image (tensor) values 0->1
        img = torch.from_numpy(img).to(self.device)
        img = img.float()  # uint8 to fp16/32
        img /= 255.0  # 0 - 255 to 0.0 - 1.0
        if img.ndimension() == 3:
            img = img.unsqueeze(0)
        #print(img)

        # Find all faces
        # This fails on some versions of yolo. If it does:
        #   sudo vi `locate upsampling.py`
        #   Comment out "recompute_scale_factor=self.recompute_scale_factor)" at about line 157
        all_faces = self.yolo(img)
        face0 = all_faces[0]

        # Apply NMS
        face = non_max_suppression_face(face0, conf_thres, iou_thres)

        print('img.shape: ', img.shape)
        print('orgimg.shape: ', orgimg.shape)

        landmarks = []
        landmarks_eyebrows = []
        xyxy = []
        # Process detections
        for i, det in enumerate(face):  # detections per image
            gn = torch.tensor(orgimg.shape)[[1, 0, 1, 0]].to(self.device)  # normalization gain whwh
            gn_lks = torch.tensor(orgimg.shape)[[1, 0, 1, 0, 1, 0, 1, 0, 1, 0]].to(self.device)  # normalization gain landmarks
            if len(det):
                # Rescale boxes from img_size to im0 size
                det[:, :4] = scale_coords(img.shape[2:], det[:, :4], orgimg.shape).round()

                # Print results
                for c in det[:, -1].unique():
                    n = (det[:, -1] == c).sum()  # detections per class

                det[:, 5:15] = self.y5_scale_coords_landmarks(img.shape[2:], det[:, 5:15], orgimg.shape).round()

                for j in range(det.size()[0]):
                    xywh = (xyxy2xywh(det[j, :4].view(1, 4)) / gn).view(-1).tolist()
                    conf = det[j, 4].cpu().numpy()
                    landmarks = (det[j, 5:15].view(1, 10) / gn_lks).view(-1).tolist()
                    class_num = det[j, 15].cpu().numpy()
                    #orgimg = show_results(orgimg, xywh, conf, landmarks, class_num)

                    # estimate eyebrow locations
                    landmarks_eyebrows = self.y5_calc_eyebrows(landmarks)
                    newimg, xyxy = self.y5_show_results(orgimg, xywh, conf, landmarks, class_num, landmarks_eyebrows)

        landmarks_all = landmarks + landmarks_eyebrows + xyxy
        shrunk = out_img[xyxy[1]:xyxy[3], xyxy[0]:xyxy[2], 0:3]
        if just_the_face:
            self.points = xyxy
        cv2.imwrite("%s/yolo.jpg" % (self.outdir), newimg)
        cv2.imwrite("%s/shrunk.jpg" % (self.outdir), shrunk)
        shrunk.tofile("%s/shrunk.raw" % (self.outdir))
        return landmarks_all

    def y5_scale_coords_landmarks(self, img1_shape, coords, img0_shape, ratio_pad=None):
        # Rescale coords (xyxy) from img1_shape to img0_shape
        if ratio_pad is None:  # calculate from img0_shape
            gain = min(img1_shape[0] / img0_shape[0], img1_shape[1] / img0_shape[1])  # gain = old / new
            pad = (img1_shape[1] - img0_shape[1] * gain) / 2, (img1_shape[0] - img0_shape[0] * gain) / 2  # wh padding
        else:
            gain = ratio_pad[0][0]
            pad = ratio_pad[1]

        coords[:, [0, 2, 4, 6, 8]] -= pad[0]  # x padding
        coords[:, [1, 3, 5, 7, 9]] -= pad[1]  # y padding
        coords[:, :10] /= gain
        #clip_coords(coords, img0_shape)
        coords[:, 0].clamp_(0, img0_shape[1])  # x1
        coords[:, 1].clamp_(0, img0_shape[0])  # y1
        coords[:, 2].clamp_(0, img0_shape[1])  # x2
        coords[:, 3].clamp_(0, img0_shape[0])  # y2
        coords[:, 4].clamp_(0, img0_shape[1])  # x3
        coords[:, 5].clamp_(0, img0_shape[0])  # y3
        coords[:, 6].clamp_(0, img0_shape[1])  # x4
        coords[:, 7].clamp_(0, img0_shape[0])  # y4
        coords[:, 8].clamp_(0, img0_shape[1])  # x5
        coords[:, 9].clamp_(0, img0_shape[0])  # y5
        return coords

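    # Worked example: a 640x480 original letterboxed into 320x320 gives
    # gain = min(320/480, 320/640) = 0.5 and pad = (0, 40), so a letterboxed
    # landmark at (160, 120) maps back to ((160-0)/0.5, (120-40)/0.5) = (320, 160).
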
    def y5_pixval(self, img, x, y):
        # return the average pixel value in a 3x3 neighbourhood around the point
        # (OpenCV images index as [row (y), col (x)])
        return numpy.average(img[y-1:y+2, x-1:x+2])

    # Render green square and landmark dots on the original image, and return the image
    def y5_show_results(self, img, xywh, conf, landmarks, class_num, landmarks_eyebrows):
        h, w, c = img.shape
        tl = 1 or round(0.002 * (h + w) / 2) + 1  # line/font thickness
        x1 = int(xywh[0] * w - 0.5 * xywh[2] * w)
        y1 = int(xywh[1] * h - 0.5 * xywh[3] * h)
        x2 = int(xywh[0] * w + 0.5 * xywh[2] * w)
        y2 = int(xywh[1] * h + 0.5 * xywh[3] * h)
        cv2.rectangle(img, (x1, y1), (x2, y2), (0, 255, 0), thickness=tl, lineType=cv2.LINE_AA)

        clors = [(255, 0, 0), (0, 255, 0), (0, 0, 255), (255, 255, 0), (0, 255, 255)]

        clors2 = [(155, 10, 10), (10, 155, 10)]

        self.points = []
        for i in range(5):
            point_x = int(landmarks[2 * i] * w)
            point_y = int(landmarks[2 * i + 1] * h)
            self.points.append((point_x, point_y, self.y5_pixval(img, point_x, point_y)))
            cv2.circle(img, (point_x, point_y), tl + 1, clors[i], -1)

        for i in range(2):
            point_x = int(landmarks_eyebrows[2 * i] * w)
            point_y = int(landmarks_eyebrows[2 * i + 1] * h)
            self.points.append((point_x, point_y, self.y5_pixval(img, point_x, point_y)))
            cv2.circle(img, (point_x, point_y), tl + 1, clors2[i], -1)

        tf = max(tl - 1, 1)  # font thickness
        label = str(conf)[:5]
        cv2.putText(img, label, (x1, y1 - 2), 0, tl / 3, [225, 255, 255], thickness=tf, lineType=cv2.LINE_AA)
        return img, [x1, y1, x2, y2]

    # Do some maths to estimate eyebrow positions relative to the eyes
    def y5_calc_eyebrows(self, landmarks):
        landmarks_eyes = numpy.array(landmarks[0:4], dtype=numpy.float32)

        difx_eye = landmarks_eyes[2] - landmarks_eyes[0]
        ebx1 = landmarks_eyes[0] + (difx_eye / 4)
        ebx2 = landmarks_eyes[2] - (difx_eye / 4)

        dify_eye = 25 * difx_eye / 63
        eby1 = landmarks_eyes[1] - dify_eye
        eby2 = landmarks_eyes[3] - dify_eye

        landmarks_eyebrows = numpy.array([ebx1, eby1, ebx2, eby2])
        landmarks_eyebrows = landmarks_eyebrows.tolist()
        #print('landmarks:', landmarks)
        #print('eyes:', landmarks_eyes)
        #print('eyebrows:', landmarks_eyebrows)
        return landmarks_eyebrows

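    # Worked example (normalised coordinates, values illustrative): eyes at
    # (0.40, 0.50) and (0.52, 0.50) give difx_eye = 0.12, so the inner eyebrows
    # land at x = 0.43 and 0.49, raised by 25*0.12/63 ~= 0.0476 to y ~= 0.4524.
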
if __name__ == '__main__':
    # Deal with the incoming call parameters
    servport = int(sys.argv[1])
    imgdir = sys.argv[2]
    outdir = sys.argv[3]
    weightsfile = sys.argv[4]

    # Initialise the webserver
    s = yoloserv()
    s.initialise()
    #s.initialise(imgdir,outdir,weightsfile)
    cherrypy.config.update({'server.socket_host': '0.0.0.0',
                            'server.socket_port': servport})
    cherrypy.quickstart(s, '/')