improved modularisation and base class for face rec
This commit is contained in:
commit
a28db12949
BIN
doc/yoloserv.odt
BIN
doc/yoloserv.odt
Binary file not shown.
@ -54,6 +54,8 @@ export YOLO=$PWD/../yoloserv
|
|||||||
cat<<EOF
|
cat<<EOF
|
||||||
UKDIHOST=$UKDIHOST
|
UKDIHOST=$UKDIHOST
|
||||||
UKDIPORT=$UKDIPORT
|
UKDIPORT=$UKDIPORT
|
||||||
|
YOLOHOST=$YOLOHOST
|
||||||
|
YOLOPORT=$YOLOPORT
|
||||||
CORE=$CORE
|
CORE=$CORE
|
||||||
UKDI=$UKDI
|
UKDI=$UKDI
|
||||||
EOF
|
EOF
|
||||||
@ -61,6 +63,8 @@ EOF
|
|||||||
f_read_ukdi
|
f_read_ukdi
|
||||||
|
|
||||||
ZZZ=5
|
ZZZ=5
|
||||||
|
mkdir -p $UKDI_yolo_indir
|
||||||
|
mkdir -p $UKDI_yolo_outdir
|
||||||
|
|
||||||
|
|
||||||
# Determina all the possible libs based on whats in UKDI_yolo_devices
|
# Determina all the possible libs based on whats in UKDI_yolo_devices
|
||||||
@ -72,6 +76,7 @@ modules/openvino/python3.10/dist-packages/openvino/inference_engine"
|
|||||||
PYP_PARAVISION="modules"
|
PYP_PARAVISION="modules"
|
||||||
PYP_DEEPFACE="modules"
|
PYP_DEEPFACE="modules"
|
||||||
PYP_YOLOV5="modules/yolov5-face_Jan1"
|
PYP_YOLOV5="modules/yolov5-face_Jan1"
|
||||||
|
LIB_SEEK="modules/seek/Seekware_SDK_3.6.0.0/lib/x86_64-linux-gnu/"
|
||||||
|
|
||||||
# WHich libs are loaded depends on the YOLO device list
|
# WHich libs are loaded depends on the YOLO device list
|
||||||
LLP="."
|
LLP="."
|
||||||
@ -89,6 +94,9 @@ do
|
|||||||
"deepface") LLP="$LLP:$LIB_DEEPFACE"
|
"deepface") LLP="$LLP:$LIB_DEEPFACE"
|
||||||
PYP="$PYP:$PYP_DEEPFACE"
|
PYP="$PYP:$PYP_DEEPFACE"
|
||||||
;;
|
;;
|
||||||
|
"seek") LLP="$LLP:$LIB_SEEK"
|
||||||
|
PYP="$PYP:src"
|
||||||
|
;;
|
||||||
"face_recognition") LLP="$LLP:$LIB_DEEPFACE"
|
"face_recognition") LLP="$LLP:$LIB_DEEPFACE"
|
||||||
PYP="$PYP:$PYP_DEEPFACE"
|
PYP="$PYP:$PYP_DEEPFACE"
|
||||||
;;
|
;;
|
||||||
|
|||||||
197
src/deepfacex.py
197
src/deepfacex.py
@ -2,6 +2,7 @@ import sys
|
|||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
import cv2
|
import cv2
|
||||||
|
import time
|
||||||
# Generic deepface
|
# Generic deepface
|
||||||
from deepface import DeepFace
|
from deepface import DeepFace
|
||||||
from deepface.detectors import FaceDetector
|
from deepface.detectors import FaceDetector
|
||||||
@ -24,124 +25,72 @@ class Deepfacex(FaceClass):
|
|||||||
img1_path = ""
|
img1_path = ""
|
||||||
img2_path = ""
|
img2_path = ""
|
||||||
backend = "ssd"
|
backend = "ssd"
|
||||||
|
model = "Facenet512"
|
||||||
fn512_model = None
|
fn512_model = None
|
||||||
resnet_model = None
|
resnet_model = None
|
||||||
visual = 0
|
visual = 0
|
||||||
tree = { "img1_faces":0, "img1_best":0, "img1_qual":0, "img2_faces":0, "img2_best": 0, "img2_qual":0, "threshold":0, "score":0, "matched":0 }
|
tree = { "img1_faces":0, "img1_best":0, "img1_qual":0, "img2_faces":0, "img2_best": 0, "img2_qual":0, "threshold":0, "score":0, "matched":0 }
|
||||||
|
|
||||||
|
|
||||||
|
# @doc Initialise the class and choose the backends and models
|
||||||
def init(self, backend, model):
|
def init(self, backend, model):
|
||||||
print("Loading models...")
|
print("Loading models...")
|
||||||
self.backend = backend
|
self.backend = backend
|
||||||
self.model = model
|
self.model = model
|
||||||
|
self.detector = FaceDetector.build_model(self.backend) #set opencv, ssd, dlib, mtcnn or retinaface
|
||||||
#self.fn512_model = model_from_json(open("modules/facenet512/facenet_model.json", "r").read())
|
#self.fn512_model = model_from_json(open("modules/facenet512/facenet_model.json", "r").read())
|
||||||
#self.fn512_model.load_weights('modules/facenet512/facenet_weights.h5')
|
#self.fn512_model.load_weights('modules/facenet512/facenet_weights.h5')
|
||||||
#self.resnet_model = InceptionResNetV1()
|
#self.resnet_model = InceptionResNetV1()
|
||||||
#model.summary()
|
#model.summary()
|
||||||
print("Loaded models ;)")
|
print("Loaded models ;)")
|
||||||
|
|
||||||
def clear(self):
|
|
||||||
self.imgs = {}
|
|
||||||
self.faces = {}
|
|
||||||
|
|
||||||
def l2_normalize(x):
|
|
||||||
return x / np.sqrt(np.sum(np.multiply(x, x)))
|
|
||||||
|
|
||||||
|
|
||||||
def dlib_detector(self, name):
|
# @doc find all the faces in the named image
|
||||||
UPSAMPLE = 1
|
def detect(self, name):
|
||||||
detector = dlib.get_frontal_face_detector()
|
boxes = []
|
||||||
#img = dlib.load_rgb_image(fname)
|
self.boxes = []
|
||||||
dets = detector(self.imgs[name], UPSAMPLE)
|
print("** deepface::detect ... %s" % name)
|
||||||
self.tree["img1_faces"] = len(dets)
|
try:
|
||||||
print("Number of faces detected in %s: %d" % (name, len(dets)))
|
# Get all faces from images with qualities, landmarks, and embeddings
|
||||||
for i, d in enumerate(dets):
|
faces = FaceDetector.detect_faces(self.detector, self.backend, self.imgs[name])
|
||||||
print("Detection %d: Left: %d Top: %d Right: %d Bottom: %d Confidence: %f" % (i, d.left(), d.top(), d.right(), d.bottom(), d.confidence() ) )
|
for a in faces:
|
||||||
print("Biggest face was %d" % maxi )
|
boxes.append(a[1])
|
||||||
return dets, scores, 0
|
# box is somehow = y1 / x2 / y2 / x1 for face_recognition .
|
||||||
|
# =
|
||||||
|
# Thats crazy lol. We need to fix that to x1, y1, x2, y2 with origin at top left
|
||||||
#def crop(self,img,topleft,botright):
|
for b in boxes:
|
||||||
|
self.boxes.append((b[0],b[1],b[0]+b[2],b[1]+b[3]))
|
||||||
|
print("found %d boxes for %s" % (len(self.boxes), name) )
|
||||||
def dlib_detector2(self, name):
|
except Exception as ex:
|
||||||
UPSAMPLE = 1
|
self.errstr = "image processing exception at get_faces: "+str(ex)
|
||||||
detector = dlib.get_frontal_face_detector()
|
return '{ "status":222310, "remark":"image processing exception", "guilty_param":"error", "guilty_value":"%s" }' % str(ex)
|
||||||
#img = dlib.load_rgb_image(fname)
|
return '{ "status":0, "remark":"OK", "faces":%d, "boxes":%s }' % (len(self.boxes), json.dumps(self.boxes))
|
||||||
dets, scores, idx = detector.run(self.imgs[name], UPSAMPLE, -1)
|
|
||||||
print("Number of faces detected: %d" % len(dets))
|
|
||||||
for i, d in enumerate(dets):
|
|
||||||
#print(d.__dir__())
|
|
||||||
print("Detection %d: Left: %d Top: %d Right: %d Bottom: %d Confidence: %f" % (i, d.left(), d.top(), d.right(), d.bottom(), scores[i] ) )
|
|
||||||
cropped = self.crop(self.imgs[name], d.left(), d.top(), d.right(), d.bottom())
|
|
||||||
cv2.imwrite("/tmp/%s_%d.png" % (name,i), cropped)
|
|
||||||
return dets, scores, 0
|
|
||||||
|
|
||||||
def crop(self, img, x1,y1,x2,y2):
|
|
||||||
#shape(img)
|
|
||||||
if x1<0 or x2<0 or y1<0 or y2<0:
|
|
||||||
return img
|
|
||||||
print("cropping %d/%d to %d/%d" % (x1,y1, x2,y2) )
|
|
||||||
cropped = img[ x1:x2, y1:y2 ]
|
|
||||||
return cropped
|
|
||||||
|
|
||||||
# Detects all the faces
|
|
||||||
def detect_all(self, name, fname):
|
|
||||||
print("Finding faces in %s: %s" % (name,fname))
|
|
||||||
detector = FaceDetector.build_model(self.backend) #set opencv, ssd, dlib, mtcnn or retinaface
|
|
||||||
self.faces[name] = FaceDetector.detect_faces(detector, self.backend, fname)
|
|
||||||
print(" Found %d faces for %s" % (len(self.faces), name))
|
|
||||||
cv2.imwrite("/tmp/%s_det.png" % (name,i), self.faces[name])
|
|
||||||
return len(self.faces[name])
|
|
||||||
|
|
||||||
|
|
||||||
# Get main face
|
|
||||||
def get_main_face(self):
|
|
||||||
faces = DeepFace.find_faces(self.img2_path)
|
|
||||||
print(faces)
|
|
||||||
|
|
||||||
|
|
||||||
# Compare the two pics
|
# Compare the two pics
|
||||||
def process(self):
|
def compare(self,name1,name2):
|
||||||
print("Matching %s vs %s" % (self.imfiles[0], self.imfiles[1]))
|
print("Matching %s vs %s" % (name1, name2))
|
||||||
verification = DeepFace.verify(img1_path = self.imfiles[0], img2_path = self.imfiles[1], model_name=self.model,\
|
f1 = "/tmp/%s.png" % (name1)
|
||||||
|
f2 = "/tmp/%s.png" % (name2)
|
||||||
|
self.save(name1,"/tmp")
|
||||||
|
self.save(name1,"/tmp")
|
||||||
|
verification = DeepFace.verify(img1_path = f1, img2_path = f2, model_name=self.model,\
|
||||||
detector_backend=self.backend, distance_metric="euclidean", enforce_detection=False, align=True, normalization="base")
|
detector_backend=self.backend, distance_metric="euclidean", enforce_detection=False, align=True, normalization="base")
|
||||||
return verification
|
return json.dumps(verification)
|
||||||
|
|
||||||
def facematch(self, name1, name2):
|
|
||||||
dets1, scores1, best1 = self.dlib_detector2(name1)
|
|
||||||
self.tree["img1_name"] = name1
|
|
||||||
self.tree["img1_faces"] = len(dets1)
|
|
||||||
if len(dets1) > 0:
|
|
||||||
self.tree["img1_best"] = best1
|
|
||||||
self.tree["img1_qual"] = scores1[best1]
|
|
||||||
|
|
||||||
dets2, scores2, best2 = self.dlib_detector2(name2)
|
|
||||||
self.tree["imgs_name"] = name2
|
|
||||||
self.tree["img2_faces"] = len(dets2)
|
|
||||||
if len(dets2) > 0:
|
|
||||||
self.tree["img2_best"] = best2
|
|
||||||
self.tree["img2_qual"] = scores2[best2]
|
|
||||||
|
|
||||||
if len(dets1) < 1:
|
|
||||||
return '{ "status":787101, "remark":"no faces in cam image", "data":%s }' % (json.dumps(self.tree))
|
|
||||||
if len(dets2) < 1:
|
|
||||||
return '{ "status":787102, "remark":"no faces in ID image", "data":%s }' % (json.dumps(self.tree))
|
|
||||||
|
|
||||||
verif = d.process()
|
|
||||||
self.tree["score"] = verif["distance"]
|
|
||||||
self.tree["threshold"] = verif["threshold"]
|
|
||||||
if verif["distance"] > verif["threshold"]:
|
|
||||||
self.tree["matched"] = 1
|
|
||||||
|
|
||||||
return '{ "status":0, "remark":"OK", "data":%s }' % (json.dumps(self.tree))
|
|
||||||
|
|
||||||
|
|
||||||
def analyse(self):
|
def metadata(self,name):
|
||||||
analysis = DeepFace.analyze(img_path = self.img1_path, actions = ["age", "gender", "emotion", "race"])
|
f1 = "/tmp/%s.png" % (name)
|
||||||
return json.dumps(analysis)
|
metadata = DeepFace.analyze(img_path = f1, actions = ["age", "gender", "emotion", "race"])
|
||||||
|
return json.dumps(metadata)
|
||||||
|
|
||||||
|
|
||||||
|
#
|
||||||
|
# params - param1 = detector name [ 'opencv', 'ssd', 'dlib', 'mtcnn', 'retinaface', 'mediapipe', 'yolov8', 'yunet', 'fastmtcnn' ]
|
||||||
|
# param2 = matcher name [ "VGG-Face", "Facenet", "Facenet512", "OpenFace", "DeepFace", "DeepID", "ArcFace", "Dlib", "SFace" ]
|
||||||
|
# param3 = treamtent
|
||||||
|
#
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
# tests
|
# tests
|
||||||
# big/clear big/clear small/side small/clear obscured ig card IR cam some rotated
|
# big/clear big/clear small/side small/clear obscured ig card IR cam some rotated
|
||||||
@ -149,57 +98,45 @@ if __name__ == '__main__':
|
|||||||
|
|
||||||
# Test the dlib image detector
|
# Test the dlib image detector
|
||||||
d = Deepfacex()
|
d = Deepfacex()
|
||||||
d.init("yolov8","SFace")
|
d.init(sys.argv[2],sys.argv[3])
|
||||||
|
|
||||||
|
|
||||||
# kiosk test
|
|
||||||
if sys.argv[1]=="kiosk":
|
if sys.argv[1]=="kiosk":
|
||||||
print(d.load("localcam","regula","/tmp/localcam.png","/tmp/regula/Portrait_0.jpg"))
|
jsonstr = d.crowd_vs_govid("pic1", "testimg/ox.jpg", 0, "pic2", "testimg/ox_govid.jpg", 0.25)
|
||||||
d.detect_all("localcam", "/tmp/localcam.png")
|
print(jsonstr)
|
||||||
d.detect_all("regula", "/tmp/regula/Portrait_0.jpg")
|
|
||||||
#print (d.facematch("localcam","regula"))
|
|
||||||
|
|
||||||
# quick test
|
if sys.argv[1]=="messi":
|
||||||
if sys.argv[1]=="quick":
|
jsonstr = d.crowd_vs_govid("pic1", "testimg/messi4.jpg", 0, "pic2", "testimg/messi2.jpg", 0.25)
|
||||||
d.visual = 1
|
print(jsonstr)
|
||||||
d.load("messi1","messi2","testimg/messi1.jpg","testimg/messi2.jpg")
|
|
||||||
sys.exit(0)
|
|
||||||
|
|
||||||
# 99.4% detection
|
if sys.argv[1]=="maiden":
|
||||||
if sys.argv[1]=="detect":
|
jsonstr = d.crowd_vs_govid("pic1", "testimg/ironmaiden.jpg", 0, "pic2", "testimg/davemurray.jpg", 0.25)
|
||||||
|
print(jsonstr)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
if sys.argv[3]=="detectN":
|
||||||
# lfw
|
# lfw
|
||||||
n=0
|
n=0
|
||||||
|
ims =0
|
||||||
print("LFW Testing")
|
print("LFW Testing")
|
||||||
for lfwdir in sorted(os.listdir("/home/carl/Downloads/lfw")):
|
t1 = time.time()
|
||||||
for lfw in sorted(os.listdir("/home/carl/Downloads/lfw/" + lfwdir)):
|
for lfwdir in sorted(os.listdir("lfw")):
|
||||||
d.load(lfw, "/home/carl/Downloads/lfw/" + lfwdir + "/" + lfw)
|
for lfw in sorted(os.listdir("lfw/" + lfwdir)):
|
||||||
d.dlib_detector(lfw)
|
#d.load1(lfw, "lfw/" + lfwdir + "/" + lfw)
|
||||||
|
ims += d.detect_all(lfw, "lfw/" + lfwdir + "/" + lfw)
|
||||||
d.clear()
|
d.clear()
|
||||||
n+=1
|
n+=1
|
||||||
if n > 100:
|
if n > 10:
|
||||||
sys.exit(0)
|
t2 = time.time()
|
||||||
|
print ("completed %d , detected %d in %d seconds" % (n,ims,(t2-t1)) )
|
||||||
|
|
||||||
|
|
||||||
# Three difficult faces; retinaface,yolov8 find all 3; dlib,mtcnn, finds 2; opencv,ssd,mediapipe,yunet find <=1
|
|
||||||
if sys.argv[1]=="algo1a":
|
|
||||||
mod = 'Dlib'
|
|
||||||
d.load("ox_matt","testimg/ox_matt.png")
|
|
||||||
for back in [ 'opencv', 'ssd', 'dlib', 'mtcnn', 'retinaface', 'mediapipe', 'yolov8', 'yunet', 'fastmtcnn' ]:
|
|
||||||
d.init(back,mod)
|
|
||||||
print (back,mod,d.detect_all("ox_matt"))
|
|
||||||
sys.exit(0)
|
|
||||||
# Five clear but rotated faces; dlib,mtcnn,retinaface,yolov8 find all 5; ssd find 4; opencv,mediapipe,yunet find <=3
|
|
||||||
if sys.argv[1]=="algo1b":
|
|
||||||
mod = 'Dlib'
|
|
||||||
d.load("5people","testimg/fivepeople.jpg")
|
|
||||||
for back in [ 'opencv', 'ssd', 'dlib', 'mtcnn', 'retinaface', 'mediapipe', 'yolov8', 'yunet', 'fastmtcnn' ]:
|
|
||||||
d.init(back,mod)
|
|
||||||
print (back,mod,d.detect_all("5people"))
|
|
||||||
sys.exit(0)
|
|
||||||
|
|
||||||
|
|
||||||
# Try and find difficult messi matches;
|
# Try and find difficult messi matches;
|
||||||
# Best performers: Dlib, Sface
|
# Best performers: Dlib, Sface
|
||||||
if sys.argv[1]=="algo2a":
|
if sys.argv[3]=="algo2a":
|
||||||
back = "yolov8"
|
back = "yolov8"
|
||||||
for mod in [ "VGG-Face", "Facenet", "Facenet512", "OpenFace", "DeepFace", "DeepID", "ArcFace", "Dlib", "SFace" ]:
|
for mod in [ "VGG-Face", "Facenet", "Facenet512", "OpenFace", "DeepFace", "DeepID", "ArcFace", "Dlib", "SFace" ]:
|
||||||
d.init(back,mod)
|
d.init(back,mod)
|
||||||
@ -219,7 +156,7 @@ if __name__ == '__main__':
|
|||||||
|
|
||||||
# Get stats on a particular algorithm. dlib+DLib=99/100|971/1000=97.1%; dlib+Facenet512=94/100; dlib+VGG=87100;
|
# Get stats on a particular algorithm. dlib+DLib=99/100|971/1000=97.1%; dlib+Facenet512=94/100; dlib+VGG=87100;
|
||||||
# dlib+SFace=100/100|199/200|991/1000==99.1; 1662/1680=98.9%
|
# dlib+SFace=100/100|199/200|991/1000==99.1; 1662/1680=98.9%
|
||||||
if sys.argv[1]=="stats":
|
if sys.argv[3]=="stats":
|
||||||
# lfw
|
# lfw
|
||||||
n=0
|
n=0
|
||||||
print("LFW Stats")
|
print("LFW Stats")
|
||||||
|
|||||||
@ -52,19 +52,28 @@ class FaceRecognition(FaceClass):
|
|||||||
IN_ENGLISH = "A class for checking the similarity between two faces."
|
IN_ENGLISH = "A class for checking the similarity between two faces."
|
||||||
id_qual = 0.7
|
id_qual = 0.7
|
||||||
photo_qual = 0.7
|
photo_qual = 0.7
|
||||||
|
emo_model = None
|
||||||
face_match_json = None
|
face_match_json = None
|
||||||
conf = []
|
conf = []
|
||||||
|
|
||||||
|
def init(self,backend=None,model=None):
|
||||||
|
model = load_model("./emotion_detector_models/model.hdf5")
|
||||||
|
|
||||||
|
|
||||||
# @doc find all the faces in the named image
|
# @doc find all the faces in the named image
|
||||||
def get_faces(self, name):
|
def detect(self, name):
|
||||||
print("** get_faces ... %s" % name)
|
boxes = []
|
||||||
|
self.boxes = []
|
||||||
|
print("** face_recognition::detect ... %s" % name)
|
||||||
try:
|
try:
|
||||||
# Get all faces from images with qualities, landmarks, and embeddings
|
# Get all faces from images with qualities, landmarks, and embeddings
|
||||||
self.boxes = face_recognition.face_locations(self.imgs[name], number_of_times_to_upsample=2, model='hog')
|
boxes = face_recognition.face_locations(self.imgs[name], number_of_times_to_upsample=2, model='hog')
|
||||||
|
# box is somehow = y1 / x2 / y2 / x1 for face_recognition .
|
||||||
|
# =
|
||||||
|
# Thats crazy lol. We need to fix that to x1, y1, x2, y2 with origin at top left
|
||||||
|
for b in boxes:
|
||||||
|
self.boxes.append((b[3],b[0],b[1],b[2]))
|
||||||
print("found %d boxes for %s" % (len(self.boxes), name) )
|
print("found %d boxes for %s" % (len(self.boxes), name) )
|
||||||
# Get numerical representation of faces (required for face match)
|
|
||||||
self.encs[name] = face_recognition.face_encodings(self.imgs[name])[0]
|
|
||||||
#print("encoding for %s : " % name, self.encs[name])
|
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
self.errstr = "image processing exception at get_faces: "+str(ex)
|
self.errstr = "image processing exception at get_faces: "+str(ex)
|
||||||
return '{ "status":222310, "remark":"image processing exception", "guilty_param":"error", "guilty_value":"%s" }' % str(ex)
|
return '{ "status":222310, "remark":"image processing exception", "guilty_param":"error", "guilty_value":"%s" }' % str(ex)
|
||||||
@ -72,41 +81,52 @@ class FaceRecognition(FaceClass):
|
|||||||
|
|
||||||
|
|
||||||
# @doc find the landmarks of the given face
|
# @doc find the landmarks of the given face
|
||||||
def get_landmarks(self, name):
|
def landmarks(self, name):
|
||||||
landmarks = face_recognition.face_landmarks(self.imgs[name])
|
landmarks = face_recognition.face_landmarks(self.imgs[name])
|
||||||
return '{ "status":0, "remark":"OK", "landmarks":%s }' % json.dumps(landmarks)
|
return '{ "status":0, "remark":"OK", "landmarks":%s }' % json.dumps(landmarks)
|
||||||
|
|
||||||
|
|
||||||
|
# @doc find the metadata of the given face
|
||||||
|
def metadata(self, name):
|
||||||
|
emotion_dict= {'Angry': 0, 'Sad': 5, 'Neutral': 4, 'Disgust': 1, 'Surprise': 6, 'Fear': 2, 'Happy': 3}
|
||||||
|
face_image = cv2.imread("..test_images/39.jpg")
|
||||||
|
face_image = cv2.resize(face_image, (48,48))
|
||||||
|
face_image = cv2.cvtColor(face_image, cv2.COLOR_BGR2GRAY)
|
||||||
|
face_image = np.reshape(face_image, [1, face_image.shape[0], face_image.shape[1], 1])
|
||||||
|
model = load_model("./emotion_detector_models/model.hdf5")
|
||||||
|
predicted_class = np.argmax(model.predict(face_image))
|
||||||
|
label_map = dict((v,k) for k,v in emotion_dict.items())
|
||||||
|
predicted_label = label_map[predicted_class]
|
||||||
|
return '{ "status":88324, "remark":"override this", "landmarks":%s }' % json.dumps(landmarks)
|
||||||
|
|
||||||
|
|
||||||
# @doc compare two named images, previously loaded
|
# @doc compare two named images, previously loaded
|
||||||
def compare(self, name1, name2):
|
def compare(self, name1, name2):
|
||||||
print("** computing ... %s vs %s" % (name1,name2))
|
print("** face_recognition::compare ... %s vs %s" % (name1,name2))
|
||||||
|
self.encs[name1] = face_recognition.face_encodings(self.imgs[name1])
|
||||||
|
self.encs[name2] = face_recognition.face_encodings(self.imgs[name2])
|
||||||
|
if self.encs[name1]==[]:
|
||||||
|
return '{ "status":14330, "remark":"could not encode image", "guilty_param":"img1", "guilty_value":"%s" }' % name1
|
||||||
|
if self.encs[name2]==[]:
|
||||||
|
return '{ "status":14331, "remark":"could not encode image", "guilty_param":"img2", "guilty_value":"%s" }' % name2
|
||||||
|
print(self.encs.keys())
|
||||||
|
print(self.encs[name1])
|
||||||
|
print(self.encs[name2])
|
||||||
try:
|
try:
|
||||||
res = face_recognition.compare_faces([self.encs[name1]], self.encs[name2])
|
res = face_recognition.compare_faces([self.encs[name1][0]], self.encs[name2][0])
|
||||||
print("Match is ",res)
|
print("Match is ",res)
|
||||||
self.match_score = 1000 * (1 - face_recognition.face_distance([self.encs[name1]], self.encs[name2]))
|
self.match_score = 1000 * (1 - face_recognition.face_distance([self.encs[name1][0]], self.encs[name2][0]))
|
||||||
print("Score is ",self.match_score)
|
print("Score is ",self.match_score)
|
||||||
|
|
||||||
# Create .json
|
# Create .json
|
||||||
self.tree["score"] = self.match_score[0]
|
self.tree["score"] = self.match_score[0]
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
|
print("** face_recognition::compare exception ... " + str(ex) )
|
||||||
self.errstr = "image comparison exception at compute_scores: "+str(ex)
|
self.errstr = "image comparison exception at compute_scores: "+str(ex)
|
||||||
return '{ "status":332410, "remark":"%s" }' % self.errstr
|
return '{ "status":332410, "remark":"%s" }' % self.errstr
|
||||||
return '{ "status":0, "remark":"OK", "score":%d }' % self.match_score[0]
|
return '{ "status":0, "remark":"OK", "score":%d }' % self.match_score[0]
|
||||||
|
|
||||||
|
|
||||||
def get_scores(self):
|
|
||||||
return json.dumps(self.tree)
|
|
||||||
|
|
||||||
|
|
||||||
def detect_all(self):
|
|
||||||
n=0
|
|
||||||
self.load("localcam", "/tmp/localcam.png", "regula", "/tmp/regula/Portrait_0.jpg")
|
|
||||||
self.get_faces("localcam")
|
|
||||||
self.get_faces("regula")
|
|
||||||
print("computed ... ", d.compute_scores("regula","localcam"))
|
|
||||||
print("scores ... " , d.get_scores())
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
@ -114,24 +134,25 @@ if __name__ == '__main__':
|
|||||||
d = FaceRecognition()
|
d = FaceRecognition()
|
||||||
|
|
||||||
if sys.argv[1]=="kiosk":
|
if sys.argv[1]=="kiosk":
|
||||||
# lfw
|
jsonstr = d.crowd_vs_govid("pic1", "testimg/ox.jpg", 0, "pic2", "testimg/ox_govid.jpg", 0.25)
|
||||||
n=0
|
print(jsonstr)
|
||||||
d.load("localcam", "/tmp/localcam.png", "regula", "/tmp/regula/Portrait_0.jpg")
|
|
||||||
d.get_faces("localcam")
|
if sys.argv[1]=="messi":
|
||||||
d.get_faces("regula")
|
jsonstr = d.crowd_vs_govid("pic1", "testimg/messi4.jpg", 0, "pic2", "testimg/messi2.jpg", 0)
|
||||||
print("computed ... ", d.compute_scores("regula","localcam"))
|
print(jsonstr)
|
||||||
print("scores ... " , d.get_scores())
|
|
||||||
#print(d.get_landmarks("localcam"))
|
if sys.argv[1]=="maiden":
|
||||||
#print(d.get_landmarks("regula"))
|
jsonstr = d.crowd_vs_govid("pic1", "testimg/ironmaiden.jpg", 0, "pic2", "testimg/davemurray.jpg", 0)
|
||||||
sys.exit(0)
|
print(jsonstr)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
if sys.argv[1]=="match":
|
if sys.argv[1]=="match":
|
||||||
# lfw
|
# lfw
|
||||||
n=0
|
n=0
|
||||||
print("LFW Matching")
|
print("LFW Matching")
|
||||||
for lfw in sorted(os.listdir("/home/carl/Downloads/lfw/0002")):
|
for lfw in sorted(os.listdir("lfw/0001")):
|
||||||
d.load2("localcam","regula", "/home/carl/" + lfw, "/home/carl/Downloads/lfw/0002/" + lfw)
|
d.load2("localcam","regula", "lfw/0001" + lfw, "lfw/0002/" + lfw)
|
||||||
d.get_faces()
|
d.get_faces()
|
||||||
d.compute_scores()
|
d.compute_scores()
|
||||||
print(d.get_scores())
|
print(d.get_scores())
|
||||||
|
|||||||
256
src/faceclass.py
256
src/faceclass.py
@ -3,6 +3,8 @@ import os
|
|||||||
import cv2
|
import cv2
|
||||||
import json
|
import json
|
||||||
import base64
|
import base64
|
||||||
|
import time
|
||||||
|
from copy import copy, deepcopy
|
||||||
|
|
||||||
from matplotlib import pyplot as plt
|
from matplotlib import pyplot as plt
|
||||||
|
|
||||||
@ -12,21 +14,43 @@ class FaceClass(object):
|
|||||||
encs = {}
|
encs = {}
|
||||||
faces = {}
|
faces = {}
|
||||||
visual = 0
|
visual = 0
|
||||||
imfiles = []
|
files = {}
|
||||||
imnames = {}
|
boxes = []
|
||||||
errstr = ""
|
errstr = ""
|
||||||
tree = { "device1":"NA", "device2":"NA", "threshold":380, "device1_qual":0.5, "device2_qual":0.5, "score":0 }
|
tree = { "device1":"NA", "device2":"NA", "threshold":380, "device1_qual":0.5, "device2_qual":0.5, "score":0 }
|
||||||
|
|
||||||
|
# # # # #####
|
||||||
|
# ## # # #
|
||||||
|
# # # # # #
|
||||||
|
# # # # # #
|
||||||
|
# # ## # #
|
||||||
|
# # # # #
|
||||||
|
|
||||||
|
|
||||||
# Prep tasks
|
# Prep tasks
|
||||||
def init(self):
|
def init(self, backend=None, mod=None):
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
def clear(self):
|
||||||
|
self.imgs = {}
|
||||||
|
self.files = {}
|
||||||
|
self.faces = {}
|
||||||
|
|
||||||
# Load a pics using the device label
|
|
||||||
|
# #### ## #####
|
||||||
|
# # # # # # #
|
||||||
|
# # # # # # #
|
||||||
|
# # # ###### # #
|
||||||
|
# # # # # # #
|
||||||
|
###### #### # # #####
|
||||||
|
|
||||||
|
# @doc Load a pic using the device label
|
||||||
def load1(self, name,fname):
|
def load1(self, name,fname):
|
||||||
|
print(" Loading image '%s' from file %s" % (name, fname))
|
||||||
if not os.path.isfile(fname):
|
if not os.path.isfile(fname):
|
||||||
|
print(" * file not found: %s" % (fname))
|
||||||
return '{ "status":442565, "remark":"file name not found", "guilty_param":"fname", "guilty_value":"%s" }' % (fname)
|
return '{ "status":442565, "remark":"file name not found", "guilty_param":"fname", "guilty_value":"%s" }' % (fname)
|
||||||
|
self.files[name] = fname
|
||||||
self.imgs[name] = cv2.imread(fname)
|
self.imgs[name] = cv2.imread(fname)
|
||||||
print(" Loaded %s from file %s" % (name, fname))
|
print(" Loaded %s from file %s" % (name, fname))
|
||||||
return '{ "status":0, "remark":"OK", "name":"%s", "fname":"%s" }' % (name,fname)
|
return '{ "status":0, "remark":"OK", "name":"%s", "fname":"%s" }' % (name,fname)
|
||||||
@ -34,96 +58,192 @@ class FaceClass(object):
|
|||||||
|
|
||||||
def load2(self, name1,fname1,name2,fname2):
|
def load2(self, name1,fname1,name2,fname2):
|
||||||
print("FaceClass loading files ....................... ")
|
print("FaceClass loading files ....................... ")
|
||||||
if not os.path.isfile(fname1):
|
self.load1(name1,fname1)
|
||||||
print("Cant access file ",fname1)
|
self.load1(name2,fname2)
|
||||||
return '{ "status":442566, "remark":"file not found", "guilty_param":"fname", "guilty_value":"%s" }' % (fname1)
|
|
||||||
if not os.path.isfile(fname2):
|
|
||||||
print("Cant access file ",fname2)
|
|
||||||
return '{ "status":442567, "remark":"file not found", "guilty_param":"fname", "guilty_value":"%s" }' % (fname2)
|
|
||||||
self.imfiles.append(fname1)
|
|
||||||
self.imfiles.append(fname2)
|
|
||||||
self.imnames[name1] = fname1
|
|
||||||
self.imnames[name2] = fname2
|
|
||||||
self.imgs[name1] = cv2.imread(fname1)
|
|
||||||
self.imgs[name2] = cv2.imread(fname2)
|
|
||||||
if self.visual:
|
|
||||||
p1 = plt.imshow(name1, self.imgs[name1])
|
|
||||||
p2.imshow(name2, self.imgs[name2])
|
|
||||||
p1.show()
|
|
||||||
print("FaceClass: Loaded %s from file %s" % (name1, fname1))
|
|
||||||
print("FaceClass: Loaded %s from file %s" % (name2, fname2))
|
|
||||||
return '{ "status":0, "remark":"OK", "name1":"%s", "fname1":"%s", "name2":"%s", "fname2":"%s" }' % (name1,fname1,name2,fname2)
|
return '{ "status":0, "remark":"OK", "name1":"%s", "fname1":"%s", "name2":"%s", "fname2":"%s" }' % (name1,fname1,name2,fname2)
|
||||||
|
|
||||||
|
|
||||||
# @doc draw a box and save with a new name
|
##### ###### ##### ###### #### #####
|
||||||
def rect(self, name, x1, y1, x2, y2, newname):
|
# # # # # # # #
|
||||||
self.imgs[newname] = cv2.rectangle(self.imgs[name],(x1,y1),(x2,y2), (0,255,0), 4)
|
# # ##### # ##### # #
|
||||||
|
# # # # # # #
|
||||||
|
# # # # # # # #
|
||||||
|
##### ###### # ###### #### #
|
||||||
|
|
||||||
# @doc crop an image and save with a new name
|
|
||||||
def crop(self, name, x1, y1, x2, y2, newname):
|
|
||||||
print(x1,y1,x2,y2)
|
|
||||||
self.imgs[newname] = self.imgs[name][x1:x2, y1:y2]
|
|
||||||
|
|
||||||
# Load the config
|
# Find all faces - put resul in self.boxes - an array of rectangles where faces are
|
||||||
def init(self):
|
def detect(self,name):
|
||||||
with open("/etc/ukdi.json","r") as f:
|
self.faces = []
|
||||||
self.conf = json.loads(f.read())
|
|
||||||
|
|
||||||
# Find all faces
|
|
||||||
def get_faces(self,name):
|
|
||||||
return '{ "status":88241, "remark":"override this!" }'
|
return '{ "status":88241, "remark":"override this!" }'
|
||||||
|
|
||||||
# Find best face
|
|
||||||
def get_best_face(self):
|
|
||||||
return '{ "status":88242, "remark":"override this!" }'
|
|
||||||
|
|
||||||
|
|
||||||
# @doc find the biggest face in the named image
|
# @doc find the biggest face in the named image
|
||||||
def get_ideal(self, name):
|
# Imge has origin at top left and value are b = [ X1, Y1, X2, Y2 ]
|
||||||
self.boxes = []
|
def ideal(self, name, rectname, cropname):
|
||||||
self.get_faces(name)
|
|
||||||
found = -1
|
found = -1
|
||||||
ix = 0
|
|
||||||
biggest = -1
|
biggest = -1
|
||||||
for b in self.boxes:
|
print("** faceclass::ideal ... %s with %d boxes" % (name, len(self.boxes) ))
|
||||||
print(json.dumps(b))
|
# resize boxes
|
||||||
area = b[3] * b[2]
|
for i in range(len(self.boxes)):
|
||||||
|
b = self.boxes[i]
|
||||||
|
area = (b[2]-b[0]) * (b[3]-b[1])
|
||||||
print(b,area)
|
print(b,area)
|
||||||
if area > biggest:
|
if area > biggest:
|
||||||
found = ix
|
found = i
|
||||||
biggest = area
|
biggest = area
|
||||||
# box returns (left,top,right,bottom) - ffs
|
|
||||||
#self.crop(name,b[1],b[0],b[3],b[2],"_crop")
|
|
||||||
#self.rect(name,b[0],b[1],b[2],b[3],"_rect")
|
|
||||||
# rect expects x1,y1,x2,y2
|
|
||||||
self.rect(name,b[0],b[1],b[1]+b[3],b[0]+b[2],"_rect")
|
|
||||||
ix+=1
|
|
||||||
if found < 0:
|
if found < 0:
|
||||||
return '{ "status":8572421, "remark":"no ideal face", "guilty_param":"name", "guilty_value":"%s" }' % name
|
return '{ "status":8572421, "remark":"no ideal face", "guilty_param":"name", "guilty_value":"%s" }' % name
|
||||||
|
b = self.boxes[found]
|
||||||
|
# extract crops and highlights - colours are BGR
|
||||||
|
self.imgs[cropname] = self.crop(name,b[0],b[1],b[2],b[3])
|
||||||
|
self.imgs[rectname] = deepcopy(self.imgs[name])
|
||||||
|
#print(self.imgs[name])
|
||||||
|
#print(self.imgs[rectname])
|
||||||
|
for bs in self.boxes:
|
||||||
|
self.rect(rectname,bs[0],bs[1],bs[2],bs[3],(255,0,0))
|
||||||
|
self.rect(rectname,b[0],b[1],b[2],b[3],(0,255,0))
|
||||||
return '{ "status":0, "remark":"OK", "faces":%d, "ideal_ix":%s, "ideal_area":%d, "boxes":%s }' % (len(self.boxes), found, biggest, json.dumps(self.boxes))
|
return '{ "status":0, "remark":"OK", "faces":%d, "ideal_ix":%s, "ideal_area":%d, "boxes":%s }' % (len(self.boxes), found, biggest, json.dumps(self.boxes))
|
||||||
|
|
||||||
|
# Find landmarks
|
||||||
|
def landmarks(self):
|
||||||
|
return '{ "status":88243, "remark":"override this!" }'
|
||||||
|
|
||||||
|
# Find metadata (age etc)
|
||||||
|
def metadata(self,name):
|
||||||
|
return '{ "status":88244, "remark":"override this!" }'
|
||||||
|
|
||||||
|
|
||||||
|
#### #### # # ##### ## ##### ######
|
||||||
|
# # # # ## ## # # # # # # #
|
||||||
|
# # # # ## # # # # # # # #####
|
||||||
|
# # # # # ##### ###### ##### #
|
||||||
|
# # # # # # # # # # # #
|
||||||
|
#### #### # # # # # # # ######
|
||||||
|
|
||||||
|
|
||||||
|
# Match two faces
|
||||||
|
def compare(self,name1,name2):
|
||||||
|
return '{ "status":88245, "remark":"override this!" }'
|
||||||
|
|
||||||
|
def detect_all(self):
|
||||||
|
n=0
|
||||||
|
self.load("localcam", "/tmp/localcam.png", "regula", "/tmp/regula/Portrait_0.jpg")
|
||||||
|
self.get_faces("localcam")
|
||||||
|
self.get_faces("regula")
|
||||||
|
self.get_base_face()
|
||||||
|
print("computed ... ", d.compute_scores("regula","localcam"))
|
||||||
|
print("scores ... " , d.get_scores())
|
||||||
|
|
||||||
|
|
||||||
|
# @doc This does everything for you. If you are smartserv, "crowd" means cam and "govid" means regula pic
|
||||||
|
def crowd_vs_govid(self, name1,file1,scale1, name2,file2,scale2):
|
||||||
|
print("##1##")
|
||||||
|
t = time.time()
|
||||||
|
print ("///t = ", time.time() - t)
|
||||||
|
self.load1(name1, file1)
|
||||||
|
if scale1 !=0:
|
||||||
|
self.shrink(name1,scale1)
|
||||||
|
jsonstr = self.detect(name1)
|
||||||
|
if json.loads(jsonstr)["status"]!=0:
|
||||||
|
return jsonstr
|
||||||
|
self.boxscale(name1,0.3)
|
||||||
|
self.ideal(name1,"pic1_rect","pic1_crop")
|
||||||
|
self.save(name1,"/tmp")
|
||||||
|
self.save("pic1_rect","/tmp")
|
||||||
|
self.save("pic1_crop","/tmp")
|
||||||
|
print(self.imgs.keys())
|
||||||
|
print ("///t = ", time.time() - t)
|
||||||
|
|
||||||
|
print("##2##")
|
||||||
|
print ("///t = ", time.time() - t)
|
||||||
|
self.load1(name2, file2)
|
||||||
|
if scale2 !=0:
|
||||||
|
self.shrink(name2,scale2)
|
||||||
|
self.save(name2,"/tmp")
|
||||||
|
jsonstr = self.detect(name2)
|
||||||
|
if json.loads(jsonstr)["status"]!=0:
|
||||||
|
return jsonstr
|
||||||
|
self.boxscale(name2,0.3)
|
||||||
|
self.ideal(name2,"pic2_rect","pic2_crop")
|
||||||
|
self.save(name2,"/tmp")
|
||||||
|
self.save("pic2_rect","/tmp")
|
||||||
|
self.save("pic2_crop","/tmp")
|
||||||
|
print(self.imgs.keys())
|
||||||
|
print ("///t = ", time.time() - t)
|
||||||
|
|
||||||
|
print("##R##")
|
||||||
|
jsonstr = self.compare("pic1_crop","pic2_crop")
|
||||||
|
print(jsonstr)
|
||||||
|
print ("///t = ", time.time() - t)
|
||||||
|
|
||||||
|
|
||||||
|
###### ##### # #####
|
||||||
|
# # # # #
|
||||||
|
##### # # # #
|
||||||
|
# # # # #
|
||||||
|
# # # # #
|
||||||
|
###### ##### # #
|
||||||
|
|
||||||
|
|
||||||
|
# use this when the face isolation boxes arent big enough
|
||||||
|
def boxscale(self,name,skale=0.2):
|
||||||
|
i = 0
|
||||||
|
for b in self.boxes:
|
||||||
|
self.boxes[i] = self.rebox(b[0],b[1],b[2],b[3],self.imgs[name].shape, skale)
|
||||||
|
i += 1
|
||||||
|
|
||||||
|
def rebox(self,x1,y1,x2,y2,shape,scale=0.2):
|
||||||
|
print("!!!!!!1 rebox with shape ",shape)
|
||||||
|
xx1 = x1 - int((x2-x1)*scale)
|
||||||
|
xx2 = x2 + int((x2-x1)*scale)
|
||||||
|
yy1 = y1 - int((y2-y1)*scale)
|
||||||
|
yy2 = y2 + int((y2-y1)*scale)
|
||||||
|
if xx1 < 0:
|
||||||
|
xx1 = 0
|
||||||
|
if yy1 < 0:
|
||||||
|
yy1 = 0
|
||||||
|
if xx2 > shape[1]:
|
||||||
|
xx2 = shape[1]
|
||||||
|
if yy2 > shape[0]:
|
||||||
|
yy2 = shape[0]
|
||||||
|
return (xx1,yy1,xx2,yy2)
|
||||||
|
|
||||||
|
# @doc draw a box on an image
|
||||||
|
def rect(self, name, x1, y1, x2, y2, color):
|
||||||
|
#print ("recting ",x1,y1,x2,y2)
|
||||||
|
cv2.rectangle(self.imgs[name],(x1,y1),(x2,y2), color, 4)
|
||||||
|
|
||||||
|
# @doc crop an image, allowing a gutter.
|
||||||
|
def crop(self, name, x1, y1, x2, y2):
|
||||||
|
print ("cropping ",x1,y1,x2,y2)
|
||||||
|
return self.imgs[name][y1:y2 , x1:x2]
|
||||||
|
|
||||||
|
# @doc crop an image, allowing a gutter.
|
||||||
|
def shrink(self, name, skale=0.5):
|
||||||
|
print ("shrinking ",name)
|
||||||
|
self.imgs[name] = cv2.resize(self.imgs[name],None,fx=skale,fy=skale)
|
||||||
|
|
||||||
|
##### ###### ##### # # ##### # #
|
||||||
|
# # # # # # # # ## #
|
||||||
|
# # ##### # # # # # # # #
|
||||||
|
##### # # # # ##### # # #
|
||||||
|
# # # # # # # # # ##
|
||||||
|
# # ###### # #### # # # #
|
||||||
|
|
||||||
|
def get_scores(self):
|
||||||
|
return json.dumps(self.tree)
|
||||||
|
|
||||||
# return a base64 version of the pic in memory
|
# return a base64 version of the pic in memory
|
||||||
def dump64(self,which,format="png"):
|
def dump64(self,which):
|
||||||
if format=="jpg":
|
|
||||||
_ , im_arr = cv2.imencode('.jpg', self.imgs[which])
|
|
||||||
img_as_txt = base64.b64encode(im_arr)
|
|
||||||
return b'data:image/jpeg;base64,'+img_as_txt
|
|
||||||
_ , im_arr = cv2.imencode('.png', self.imgs[which])
|
_ , im_arr = cv2.imencode('.png', self.imgs[which])
|
||||||
img_as_txt = base64.b64encode(im_arr)
|
img_as_txt = base64.b64encode(im_arr)
|
||||||
return b'data:image/png;base64, '+img_as_txt
|
return b'data:image/png;base64, '+img_as_txt
|
||||||
|
|
||||||
# Find landmarks
|
def save(self,which,dir,format="png"):
|
||||||
def get_landmarks(self):
|
cv2.imwrite(dir + "/" + which + "." + format, self.imgs[which])
|
||||||
return '{ "status":88243, "remark":"override this!" }'
|
|
||||||
|
|
||||||
# Find metadata (age etc)
|
|
||||||
def get_metadata(self,name):
|
|
||||||
return '{ "status":88244, "remark":"override this!" }'
|
|
||||||
|
|
||||||
# Match two faces
|
|
||||||
def process(self,name1,name2):
|
|
||||||
return '{ "status":88245, "remark":"override this!" }'
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@ -2,7 +2,9 @@
|
|||||||
# Paravision based face matcher
|
# Paravision based face matcher
|
||||||
#
|
#
|
||||||
import json
|
import json
|
||||||
|
import sys
|
||||||
|
|
||||||
|
### export PYTHONPATH=/wherever/yoloserv/modules ... as long as "paravision/.../" is in there
|
||||||
from paravision.recognition.exceptions import ParavisionException
|
from paravision.recognition.exceptions import ParavisionException
|
||||||
from paravision.recognition.engine import Engine
|
from paravision.recognition.engine import Engine
|
||||||
from paravision.recognition.sdk import SDK
|
from paravision.recognition.sdk import SDK
|
||||||
@ -10,27 +12,45 @@ import paravision.recognition.utils as pru
|
|||||||
#from openvino.inference_engine import Engineq
|
#from openvino.inference_engine import Engineq
|
||||||
|
|
||||||
#from deepface.basemodels import VGGFace, OpenFace, Facenet, FbDeepFace, DeepID
|
#from deepface.basemodels import VGGFace, OpenFace, Facenet, FbDeepFace, DeepID
|
||||||
|
from faceclass import FaceClass
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class Facematch(object):
|
class Paravision(FaceClass):
|
||||||
|
|
||||||
def init(self):
|
def init(self,backend=None,model=None):
|
||||||
print("@@@ initialising paravision")
|
print("@@@ initialising paravision")
|
||||||
try:
|
try:
|
||||||
self.sdk = SDK(engine=Engine.AUTO)
|
self.sdk = SDK(engine=Engine.AUTO)
|
||||||
except ParavisionException:
|
except ParavisionException:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
# Read an image from a file
|
|
||||||
def loadiii(self, imgpath):
|
|
||||||
if not os.path.exists(imgpath):
|
# @doc find all the faces in the named image
|
||||||
print("File not found ",imgpath)
|
def detect(self, name):
|
||||||
return False
|
boxes = []
|
||||||
self.imgpath = imgpath
|
self.boxes = []
|
||||||
self.image = pru.load_image(imgpath)
|
print("** face_recognition::detect ... %s" % name)
|
||||||
print(self.image)
|
try:
|
||||||
return True
|
# Get all faces from images with qualities, landmarks, and embeddings
|
||||||
|
res = self.sdk.get_faces([self.imgs[name]], qualities=True, landmarks=True, embeddings=True)
|
||||||
|
boxes = res.faces
|
||||||
|
print(boxes)
|
||||||
|
for a in boxes:
|
||||||
|
print(box)
|
||||||
|
# box is somehow = y1 / x2 / y2 / x1 for face_recognition .
|
||||||
|
# =
|
||||||
|
# Thats crazy lol. We need to fix that to x1, y1, x2, y2 with origin at top left
|
||||||
|
for b in boxes:
|
||||||
|
self.boxes.append((b[3],b[0],b[1],b[2]))
|
||||||
|
print("found %d boxes for %s" % (len(self.boxes), name) )
|
||||||
|
except Exception as ex:
|
||||||
|
self.errstr = "image processing exception at get_faces: "+str(ex)
|
||||||
|
return '{ "status":222310, "remark":"image processing exception", "guilty_param":"error", "guilty_value":"%s" }' % str(ex)
|
||||||
|
return '{ "status":0, "remark":"OK", "faces":%d, "boxes":%s }' % (len(self.boxes), json.dumps(self.boxes))
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
# Assess the face that was read in
|
# Assess the face that was read in
|
||||||
def processiii(self):
|
def processiii(self):
|
||||||
@ -146,3 +166,34 @@ class Facematch(object):
|
|||||||
def get_scores(self):
|
def get_scores(self):
|
||||||
return json.dumps(self.face_match_json)
|
return json.dumps(self.face_match_json)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
|
||||||
|
d = Paravision()
|
||||||
|
d.init()
|
||||||
|
|
||||||
|
if sys.argv[1]=="messia":
|
||||||
|
jsonstr = d.load1("pic1", "testimg/messi4.jpg")
|
||||||
|
print(jsonstr)
|
||||||
|
jsonstr = d.detect("pic1")
|
||||||
|
print(jsonstr)
|
||||||
|
|
||||||
|
if sys.argv[1]=="test":
|
||||||
|
d.load1("pic1", "testimg/ox.jpg")
|
||||||
|
d.detect("pic1")
|
||||||
|
|
||||||
|
if sys.argv[1]=="kiosk":
|
||||||
|
jsonstr = d.crowd_vs_govid("pic1", "testimg/ox.jpg", 0, "pic2", "testimg/ox_govid.jpg", 0.25)
|
||||||
|
print(jsonstr)
|
||||||
|
|
||||||
|
if sys.argv[1]=="messi":
|
||||||
|
jsonstr = d.crowd_vs_govid("pic1", "testimg/messi4.jpg", 0, "pic2", "testimg/messi2.jpg", 0)
|
||||||
|
print(jsonstr)
|
||||||
|
|
||||||
|
if sys.argv[1]=="maiden":
|
||||||
|
jsonstr = d.crowd_vs_govid("pic1", "testimg/ironmaiden.jpg", 0, "pic2", "testimg/davemurray.jpg", 0)
|
||||||
|
print(jsonstr)
|
||||||
28
src/seek.py
28
src/seek.py
@ -32,15 +32,14 @@ import base64
|
|||||||
import subprocess
|
import subprocess
|
||||||
import json
|
import json
|
||||||
|
|
||||||
from ukdi import UKDI
|
|
||||||
|
|
||||||
import time
|
import time
|
||||||
|
|
||||||
class Seek(UKDI):
|
class Seek():
|
||||||
|
|
||||||
# get this from lsusb -vd VEND:PROD
|
# get this from lsusb -vd VEND:PROD
|
||||||
CAM = None
|
CAM = None
|
||||||
campath = "/home/disp/cam_seek/bin/seek"
|
seekbin = "/home/disp/yoloserv/modules/seek/bin/seek"
|
||||||
X = 1902
|
X = 1902
|
||||||
Y = 1080
|
Y = 1080
|
||||||
min = 0.0
|
min = 0.0
|
||||||
@ -53,7 +52,6 @@ class Seek(UKDI):
|
|||||||
def describe(self):
|
def describe(self):
|
||||||
return "UKDI Seek camera read"
|
return "UKDI Seek camera read"
|
||||||
|
|
||||||
|
|
||||||
def open(self):
|
def open(self):
|
||||||
# does nothing
|
# does nothing
|
||||||
return
|
return
|
||||||
@ -64,7 +62,7 @@ class Seek(UKDI):
|
|||||||
|
|
||||||
# Red the image from a camera as numpy-ready CSV buffer.
|
# Red the image from a camera as numpy-ready CSV buffer.
|
||||||
def hid_read(self):
|
def hid_read(self):
|
||||||
result = subprocess.run([self.campath], stdout=subprocess.PIPE)
|
result = subprocess.run([self.seekbin], stdout=subprocess.PIPE)
|
||||||
rows = result.stdout.split(b'\n')
|
rows = result.stdout.split(b'\n')
|
||||||
print(rows[0],rows[1],rows[3],rows[4])
|
print(rows[0],rows[1],rows[3],rows[4])
|
||||||
self.X = int(rows[0].decode("utf-8"))
|
self.X = int(rows[0].decode("utf-8"))
|
||||||
@ -78,22 +76,30 @@ class Seek(UKDI):
|
|||||||
return True
|
return True
|
||||||
|
|
||||||
# return a PNG version of the pic in memory
|
# return a PNG version of the pic in memory
|
||||||
def png(self):
|
def png64(self):
|
||||||
_ , im_arr = cv2.imencode('.png', self.data)
|
_ , im_arr = cv2.imencode('.png', self.data)
|
||||||
img_as_txt = base64.b64encode(im_arr)
|
img_as_txt = base64.b64encode(im_arr)
|
||||||
return b'data:image/png;base64, '+img_as_txt
|
return 'data:image/png;base64, '+img_as_txt.decode()
|
||||||
|
|
||||||
# return a JPG version of the pic in memory
|
# return a JPG version of the pic in memory
|
||||||
def jpg(self):
|
def jpg64(self):
|
||||||
_ , im_arr = cv2.imencode('.jpg', self.data)
|
_ , im_arr = cv2.imencode('.jpg', self.data)
|
||||||
img_as_txt = base64.b64encode(im_arr)
|
img_as_txt = base64.b64encode(im_arr)
|
||||||
return b'data:image/jpeg;base64,'+img_as_txt
|
return 'data:image/jpeg;base64,'+img_as_txt.decode()
|
||||||
|
|
||||||
# return a BMP version of the pic in memory
|
# return a BMP version of the pic in memory
|
||||||
def bmp(self):
|
def bmp64(self):
|
||||||
_ , im_arr = cv2.imencode('.bmp', self.data)
|
_ , im_arr = cv2.imencode('.bmp', self.data)
|
||||||
img_as_txt = base64.b64encode(im_arr)
|
img_as_txt = base64.b64encode(im_arr)
|
||||||
return b'data:image/bmp;base64,'+img_as_txt
|
return 'data:image/bmp;base64,'+img_as_txt.decode()
|
||||||
|
|
||||||
|
|
||||||
|
def img(self,type):
|
||||||
|
_ , im_arr = cv2.imencode(type, self.data)
|
||||||
|
return im_arr
|
||||||
|
|
||||||
|
def numpy(self):
|
||||||
|
return self.data
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
|
|||||||
@ -14,13 +14,15 @@ import copy
|
|||||||
import numpy
|
import numpy
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
# Yoloserv contains references to a number of packages that do different things.
|
# Yoloserv contains references to a number of packages that do different things.
|
||||||
#
|
#
|
||||||
#
|
#
|
||||||
|
|
||||||
class yoloserv(object):
|
class yoloserv(object):
|
||||||
|
|
||||||
|
READY = 0
|
||||||
|
BUSY = 1
|
||||||
|
OVERRUN = 5
|
||||||
yolo = None
|
yolo = None
|
||||||
device = None
|
device = None
|
||||||
indir = None
|
indir = None
|
||||||
@ -33,7 +35,7 @@ class yoloserv(object):
|
|||||||
ir_camera = None
|
ir_camera = None
|
||||||
devices = []
|
devices = []
|
||||||
points = []
|
points = []
|
||||||
|
state = "READY"
|
||||||
|
|
||||||
# Nature of init depends on the required algotithms listed in /etc/ukdi.conf
|
# Nature of init depends on the required algotithms listed in /etc/ukdi.conf
|
||||||
# eg :: "yolo_devices": "detect_face,facematch"
|
# eg :: "yolo_devices": "detect_face,facematch"
|
||||||
@ -44,11 +46,12 @@ class yoloserv(object):
|
|||||||
# flir - - expensive IR camera
|
# flir - - expensive IR camera
|
||||||
# seek - - "cheap" IR camera
|
# seek - - "cheap" IR camera
|
||||||
# palmvein - - palm vein detection
|
# palmvein - - palm vein detection
|
||||||
|
# @doc Initialisation takes device specs from /etc/ukdi.json
|
||||||
def initialise(self):
|
def initialise(self):
|
||||||
with open("/etc/ukdi.json","r") as f:
|
with open("/etc/ukdi.json","r") as f:
|
||||||
self.conf = json.loads(f.read())
|
self.conf = json.loads(f.read())
|
||||||
|
|
||||||
print("Init yoloserv: %s @ %s %s " % (self.conf["yolo_devices"], self.conf["yolo_indir"], self.conf["yolo_outdir"]) )
|
print("Init yoloserv: %s @ %s %s " % (self.conf["yolo_devices"], self.indir, self.outdir) )
|
||||||
self.devices = self.conf["yolo_devices"].split(",")
|
self.devices = self.conf["yolo_devices"].split(",")
|
||||||
self.indir = self.conf["yolo_indir"]
|
self.indir = self.conf["yolo_indir"]
|
||||||
self.outdir = self.conf["yolo_outdir"]
|
self.outdir = self.conf["yolo_outdir"]
|
||||||
@ -59,11 +62,13 @@ class yoloserv(object):
|
|||||||
from para_facematch import Facematch
|
from para_facematch import Facematch
|
||||||
self.facematcher = Facematch()
|
self.facematcher = Facematch()
|
||||||
self.facematcher.init()
|
self.facematcher.init()
|
||||||
|
|
||||||
if "deepface" in self.devices:
|
if "deepface" in self.devices:
|
||||||
print("Loading deepface facematch...")
|
print("Loading deepface facematch...")
|
||||||
from deepfacex import Deepfacex
|
from deepfacex import Deepfacex
|
||||||
self.facematcher = Deepfacex()
|
self.facematcher = Deepfacex()
|
||||||
self.facematcher.init("dlib","SFace")
|
self.facematcher.init("dlib","SFace")
|
||||||
|
|
||||||
if "face_recognition" in self.devices:
|
if "face_recognition" in self.devices:
|
||||||
print("Loading face_recognition facematch...")
|
print("Loading face_recognition facematch...")
|
||||||
from face_recognitionx import FaceRecognition
|
from face_recognitionx import FaceRecognition
|
||||||
@ -84,9 +89,8 @@ class yoloserv(object):
|
|||||||
self.camera = Camera()
|
self.camera = Camera()
|
||||||
self.camera.init()
|
self.camera.init()
|
||||||
if "seek" in self.devices:
|
if "seek" in self.devices:
|
||||||
print("AAAAAAAAAAA Loading seek IR... [NOT YET IMPLEMETED]")
|
from seek import Seek
|
||||||
self.ircamera = Seek()
|
self.seek = Seek()
|
||||||
self.ircamera.init()
|
|
||||||
if "flir" in self.devices:
|
if "flir" in self.devices:
|
||||||
print("AAAAAAAAAAA Loading flir IR... [NOT YET IMPLEMETED]")
|
print("AAAAAAAAAAA Loading flir IR... [NOT YET IMPLEMETED]")
|
||||||
self.ircamera = Flir()
|
self.ircamera = Flir()
|
||||||
@ -108,13 +112,13 @@ class yoloserv(object):
|
|||||||
print("AAAAAAAAAAA Loading yolov5 object detection...")
|
print("AAAAAAAAAAA Loading yolov5 object detection...")
|
||||||
from yolov5 import Yolov5
|
from yolov5 import Yolov5
|
||||||
self.detector = Yolov5()
|
self.detector = Yolov5()
|
||||||
#self.detector.init(self.conf["yolo_indir"],self.conf["yolo_outdir"])
|
#self.detector.init(self.indir,self.outdir)
|
||||||
|
|
||||||
if "yolov8" in self.devices:
|
if "yolov8" in self.devices:
|
||||||
print("AAAAAAAAAAA Loading yolov8 object detection...")
|
print("AAAAAAAAAAA Loading yolov8 object detection...")
|
||||||
from yolov8 import Yolov8
|
from yolov8 import Yolov8
|
||||||
self.detector = Yolov8()
|
self.detector = Yolov8()
|
||||||
#self.detector.init(self.conf["yolo_indir"],self.conf["yolo_outdir"])
|
#self.detector.init(self.indir,self.outdir)
|
||||||
|
|
||||||
# Intoxication
|
# Intoxication
|
||||||
if "intox" in self.devices:
|
if "intox" in self.devices:
|
||||||
@ -123,14 +127,15 @@ class yoloserv(object):
|
|||||||
self.intox_detector.init()
|
self.intox_detector.init()
|
||||||
|
|
||||||
|
|
||||||
# clear memory of all image data
|
# @doc clear memory of all image data
|
||||||
@cherrypy.expose
|
@cherrypy.expose
|
||||||
def svc_init(self):
|
def svc_init(self):
|
||||||
self.facematcher.init()
|
self.facematcher.init()
|
||||||
return '{ "status":0, "remark":"OK" }'
|
return '{ "status":0, "remark":"OK" }'
|
||||||
|
|
||||||
# Acquire image - the device used depends on the device list and what actual file was loaded as self.camera
|
# @doc Acquire image -
|
||||||
# @doc acquires an image from the camera (test OK CG 2024-0724)
|
# @doc the device used depends on the device list and what actual file was loaded as self.camera
|
||||||
|
# @doc acquires an image from the camera (test OK CG 2024-0724)
|
||||||
@cherrypy.expose
|
@cherrypy.expose
|
||||||
def svc_acquire(self,camidx=0):
|
def svc_acquire(self,camidx=0):
|
||||||
self.camera.acquire(camidx)
|
self.camera.acquire(camidx)
|
||||||
@ -180,6 +185,11 @@ class yoloserv(object):
|
|||||||
def svc_load_imgs(self,name1,infile1,name2,infile2):
|
def svc_load_imgs(self,name1,infile1,name2,infile2):
|
||||||
return self.facematcher.load2(name1, self.indir + infile1, name2, self.indir + infile2)
|
return self.facematcher.load2(name1, self.indir + infile1, name2, self.indir + infile2)
|
||||||
|
|
||||||
|
@cherrypy.expose
|
||||||
|
def svc_detect_faces(self,infile):
|
||||||
|
nfaces = self.facematcher.detect_all(self.indir + infile)
|
||||||
|
return '{ "status":0, "remark":"found faces", "count":%d }' % (nfaces)
|
||||||
|
|
||||||
# @doc find all the faces in the named image that was loaded using the above calls (test OK CG 2024-0724)
|
# @doc find all the faces in the named image that was loaded using the above calls (test OK CG 2024-0724)
|
||||||
@cherrypy.expose
|
@cherrypy.expose
|
||||||
def svc_faces(self,which):
|
def svc_faces(self,which):
|
||||||
@ -325,18 +335,18 @@ class yoloserv(object):
|
|||||||
|
|
||||||
|
|
||||||
@cherrypy.expose
|
@cherrypy.expose
|
||||||
def irstill(self,ident):
|
def irstill(self,ident,type="png"):
|
||||||
if self.state == self.BUSY:
|
if self.state == self.BUSY:
|
||||||
return '{ "status":9, "remark":"BUSY" }'
|
return '{ "status":9, "remark":"BUSY" }'
|
||||||
self.state = self.BUSY
|
self.state = self.BUSY
|
||||||
self.devices["seek"].open()
|
self.seek.open()
|
||||||
self.devices["seek"].hid_read()
|
self.seek.hid_read()
|
||||||
self.devices["seek"].close()
|
self.seek.close()
|
||||||
self.state = self.READY
|
self.state = self.READY
|
||||||
f = open("/tmp/%s.bmp" % ident, "wb")
|
f = open("/tmp/%s.%s" % (ident,type), "wb")
|
||||||
f.write( self.devices["seek"].bmp() )
|
f.write( self.seek.img(".%s" % type) )
|
||||||
f.close()
|
f.close()
|
||||||
return self.devices["seek"].jpg()
|
return self.seek.png64()
|
||||||
|
|
||||||
@cherrypy.expose
|
@cherrypy.expose
|
||||||
def yolo(self,ident):
|
def yolo(self,ident):
|
||||||
@ -416,13 +426,13 @@ class yoloserv(object):
|
|||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
# Deal with the incoming call parameters
|
# Deal with the incoming call parameters
|
||||||
servport = int(sys.argv[1])
|
servport = int(sys.argv[1])
|
||||||
indir = sys.argv[2]
|
|
||||||
outdir = sys.argv[3]
|
|
||||||
|
|
||||||
# Initialise the webserver
|
# Initialise the webserver
|
||||||
s = yoloserv()
|
s = yoloserv()
|
||||||
s.initialise()
|
s.initialise()
|
||||||
#s.initialise(indir,outdir,weightsfile)
|
s.indir = sys.argv[2]
|
||||||
|
s.outdir = sys.argv[3]
|
||||||
|
|
||||||
cherrypy.config.update({'server.socket_host': '0.0.0.0',
|
cherrypy.config.update({'server.socket_host': '0.0.0.0',
|
||||||
'server.socket_port': servport})
|
'server.socket_port': servport})
|
||||||
cherrypy.quickstart(s, '/')
|
cherrypy.quickstart(s, '/')
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user