Skip to content

Commit

Permalink
updated scripts and src facenet model according to code review
Browse files Browse the repository at this point in the history
  • Loading branch information
iasonth95 committed Mar 12, 2024
1 parent bdce167 commit 2a6a420
Show file tree
Hide file tree
Showing 8 changed files with 45 additions and 122 deletions.
Binary file added image_recognition_face_recognition/doc/1.jpg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added image_recognition_face_recognition/doc/config.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
55 changes: 5 additions & 50 deletions image_recognition_face_recognition/scripts/face_recognition_node
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#!/usr/bin/env python

import math
import os
import sys
import diagnostic_updater
Expand All @@ -15,14 +15,12 @@ from image_recognition_msgs.msg import (
)
from image_recognition_msgs.srv import Recognize, Annotate
from image_recognition_util import image_writer
from image_recognition_face_recognition.facenet_recognition import Facenet_recognition
from image_recognition_face_recognition.facenet_recognition import FacenetRecognition


class OpenfaceROS:
def __init__(
self,
align_path,
net_path,
save_images_folder,
topic_save_images,
service_save_images,
Expand All @@ -31,9 +29,6 @@ class OpenfaceROS:
):
"""
OpenfaceROS class that wraps the FaceRecognizer in a ROS node
:param align_path: dlib face align path
:param net_path: openface neural net path
:param save_images_folder: path where to store the images
:param topic_save_images: whether to save images originated from image topic callback
:param service_save_images: whether to save images originated from a service call
Expand All @@ -42,7 +37,7 @@ class OpenfaceROS:
"""

# Openface ROS
self._face_recognizer = Facenet_recognition()
self._face_recognizer = FacenetRecognition()
self._save_images_folder = save_images_folder
self._topic_save_images = topic_save_images
self._service_save_images = service_save_images
Expand All @@ -52,8 +47,6 @@ class OpenfaceROS:
self._bridge = CvBridge()
self._annotate_srv = rospy.Service("annotate", Annotate, self._annotate_srv)
self._recognize_srv = rospy.Service("recognize", Recognize, self._recognize_srv)
self._clear_srv = rospy.Service("clear", Empty, self._clear_srv)

self._image_subscriber = rospy.Subscriber("image", Image, self._image_callback)
self._recognitions_publisher = rospy.Publisher("recognitions", Recognitions, queue_size=10)

Expand All @@ -65,8 +58,6 @@ class OpenfaceROS:
self._result_image_publisher = rospy.Publisher("result_image", Image, queue_size=10)

rospy.loginfo("OpenfaceROS initialized:")
rospy.loginfo(" - dlib_align_path=%s", align_path)
rospy.loginfo(" - openface_net_path=%s", net_path)
rospy.loginfo(" - save_images_folder=%s", save_images_folder)
rospy.loginfo(" - topic_save_images=%s", topic_save_images)
rospy.loginfo(" - service_save_images=%s", service_save_images)
Expand Down Expand Up @@ -104,16 +95,6 @@ class OpenfaceROS:

return {}

def _clear_srv(self, req):
"""
Service to clear the trained faces
:param req: Empty
:return: Empty
"""
self._face_recognizer.clear_trained_faces()
return {}

def _get_recognitions(self, image_msg, save_images, publish_images):
# Convert to opencv image
"""
Expand All @@ -133,8 +114,6 @@ class OpenfaceROS:

images = []
labels = []
# Added this line for testing with some labels
labels = ["jason", "kona"]

# Call facebet neural network in two stages
face_recognitions = self._face_recognizer.face_detection(bgr_image)
Expand All @@ -146,9 +125,7 @@ class OpenfaceROS:
# rospy.loginfo("Face recognitions: %s", face_recognitions)

label_idx = 0
for fr in face_recognitions:
import math

for fr in face_recognitions:
face_recognition = [math.floor(xi) for xi in fr]
if save_images:
label = labels_pred[label_idx]
Expand Down Expand Up @@ -220,52 +197,30 @@ class OpenfaceROS:
# Service response
return {"recognitions": recognitions}

def restore_trained_faces(self, file_name):
self._face_recognizer.restore_trained_faces(file_name)


if __name__ == "__main__":
rospy.init_node("face_recognition")
try:
dlib_shape_predictor_path = os.path.expanduser(
rospy.get_param(
"~align_path",
"~/openface/models/dlib/shape_predictor_68_face_landmarks.dat",
)
)
openface_neural_network_path = os.path.expanduser(
rospy.get_param("~net_path", "~/openface/models/openface/nn4.small2.v1.t7")
)
save_images = rospy.get_param("~save_images", True)
topic_save_images = rospy.get_param("~topic_save_images", False)
service_save_images = rospy.get_param("~service_save_images", True)
topic_publish_result_image = rospy.get_param("~topic_publish_result_image", True)
service_publish_result_image = rospy.get_param("~service_publish_result_image", True)

db = rospy.get_param("~db", None)
if db:
db = os.path.expanduser(db)

save_images_folder = None
if save_images:
save_images_folder = os.path.expanduser(rospy.get_param("~save_images_folder", "/tmp/openface"))
save_images_folder = os.path.expanduser(rospy.get_param("~save_images_folder", "/tmp/facenet_saved_images"))
except KeyError as e:
rospy.logerr("Parameter %s not found" % e)
sys.exit(1)

image_recognition_openface = OpenfaceROS(
dlib_shape_predictor_path,
openface_neural_network_path,
save_images_folder,
topic_save_images,
service_save_images,
topic_publish_result_image,
service_publish_result_image,
)
if db:
rospy.loginfo("loading face database from %s", db)
image_recognition_openface.restore_trained_faces(db)

updater = diagnostic_updater.Updater()
updater.setHardwareID("none")
updater.add(diagnostic_updater.Heartbeat())
Expand Down
24 changes: 6 additions & 18 deletions image_recognition_face_recognition/scripts/get_face_recognition
Original file line number Diff line number Diff line change
@@ -1,26 +1,17 @@
#!/usr/bin/env python
from __future__ import print_function

import argparse

import cv2
from image_recognition_msgs.msg import Recognition
from image_recognition_face_recognition.facenet_recognition import Facenet_recognition
# Assign description to the help doc
import math
from image_recognition_face_recognition.facenet_recognition import FacenetRecognition
from sensor_msgs.msg import RegionOfInterest

from image_recognition_util import image_writer
import argparse
import math
import cv2

parser = argparse.ArgumentParser(description='Get face recognitions')

# Add arguments
parser.add_argument('image', type=str, help='Image')
parser.add_argument('-k', '--align_path', type=str, help='DLib Align path', required=False,
default="~/openface/models/dlib/shape_predictor_68_face_landmarks.dat")
parser.add_argument('-s', '--net_path', type=str, help='Openface neural network path', required=False,
default='~/openface/models/openface/nn4.small2.v1.t7')
parser.add_argument('-d', '--db', type=argparse.FileType('r'), help='Load already trained faces db from file')
parser.add_argument(
'-v', '--verbose', help="Increase output verbosity", action="store_true")
args = parser.parse_args()
Expand All @@ -29,10 +20,7 @@ args = parser.parse_args()
img = cv2.imread(args.image)

# Create openface interface
face_recognizer = Facenet_recognition()

if args.db:
face_recognizer.restore_trained_faces(args.db)
face_recognizer = FacenetRecognition()

recognized_faces = face_recognizer.face_detection(img)
print(recognized_faces)
Expand All @@ -51,4 +39,4 @@ for fr in recognized_faces:
)
annotated_original_image = image_writer.get_annotated_cv_image(img, recognitions)
cv2.imshow("result", annotated_original_image)
cv2.waitKey()
cv2.waitKey(1000)
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import psutil
from typing import List, Tuple, Union
import rospy
import torch
from facenet_pytorch import MTCNN, InceptionResnetV1
from facenet_pytorch.models.mtcnn import MTCNN
import numpy as np


class TrainedFace:
"""
This class serves as a custom struct to store information of people we recognized
"""
def __init__(self, label):
"""
A custom struct to store the names and the embedded representations (tensors) of people
Expand All @@ -26,41 +29,46 @@ def get_representations(self):
return self.representations


class Facenet_recognition:
class FacenetRecognition:
"""
This class handles the recognition using the Facenet model.
"""
def __init__(self):
"""
Constructor for the list which contains the TrainedFace structure
"""
self._trained_faces = []

def print_memory_usage(self, label):
"""
Prints the memory usage as a metric
"""
mem_info = psutil.Process().memory_info()
rospy.loginfo(
f"{label: <10}: vms={mem_info.vms / (1024 * 1024)}, rss={mem_info.rss / (1024 * 1024)}"
self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
rospy.loginfo(f"Running on device: {self.device}")
self.mtcnn = MTCNN(
keep_all=True,
image_size=160,
margin=0,
min_face_size=20,
thresholds=[0.6, 0.7, 0.7],
factor=0.709,
post_process=True,
device=self.device,
)

def face_detection(self, img):
def face_detection(self, img: np.ndarray) -> List[Union[Tuple[float, float, float, float], None]]:
"""
Returns the index of the trained face
:param img: inpute image
:return: the bounding boxes of coordinations of the faces it detects
"""
detector = MTCNN(keep_all=True)

# Keep the landmarks for future use
boxes, _, landmarks = detector.detect(img, landmarks=True)
boxes, _, landmarks = self.mtcnn.detect(img, landmarks=True)
return boxes

def _get_dists(self, embeddings):
def _get_dists(self, embeddings: List[torch.Tensor]) -> Tuple[List[float], List[str]]:
"""
Returns min l2 distance of the identified face (compared with the database of know faces)
:param embeddings: the embedded representation(s) of a face(s)
:return: the min distance(s) of the embedded vector compared with the database faces
:return: the corresponding label(s)
:return: the min distance(s) of the embedded vector compared with the database faces from the corresponding label(s)
"""
dist_per_emb_final = []
dist = []
Expand Down Expand Up @@ -100,7 +108,7 @@ def _get_dists(self, embeddings):

return min_value_list_per_emb, labelling

def detection_recognition(self, img, labels, train):
def detection_recognition(self, img: np.ndarray, labels: List[str], train: bool) -> Tuple[List[float], List[str]]:
"""
Returns min l2 distance of the identified face (compared with the database of know faces)
Expand All @@ -112,38 +120,20 @@ def detection_recognition(self, img, labels, train):
:return: the min distance(s) of the embedded vector compared with the database faces
:return: the corresponding label(s)
"""
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
rospy.loginfo(f"Running on device: {device}")
resnet = InceptionResnetV1(pretrained="vggface2").eval().to(self.device)

mtcnn = MTCNN(
keep_all=True,
image_size=160,
margin=0,
min_face_size=20,
thresholds=[0.6, 0.7, 0.7],
factor=0.709,
post_process=True,
device=device,
)

resnet = InceptionResnetV1(pretrained="vggface2").eval().to(device)

x_aligned = mtcnn(img)
print(type(x_aligned))
print(x_aligned.size(), "x_aligned size (1st NN output)")
x_aligned = self.mtcnn(img)
x_aligned = x_aligned.cuda() # add this line
embeddings = resnet(x_aligned).detach().cpu()
rospy.loginfo(f"{embeddings.size()}, {type(embeddings)}, embeddings size")

if not self._trained_faces:
nam = 0
for emb in embeddings:
for nam, emb in enumerate(embeddings):
index = self._get_trained_face_index(labels[nam])
if index == -1:
self._trained_faces.append(TrainedFace(labels[nam]))

self._trained_faces[index].representations.append(emb)
nam = nam + 1

# try:
dist, labelling = self._get_dists(embeddings)
Expand All @@ -152,16 +142,14 @@ def detection_recognition(self, img, labels, train):
rospy.loginfo(f"{labels[0]}, {labelling}, label[0],labelling")

if train:
idx_label = 0
for emb in embeddings:
for idx_label, emb in enumerate(embeddings):
rospy.loginfo(f"{idx_label}, idx_label")
self.train(emb, labelling[idx_label])
idx_label = idx_label + 1
# except:
rospy.loginfo(f"{len(self._trained_faces)}")
return dist, labelling

def _get_trained_face_index(self, label):
def _get_trained_face_index(self, label: str) -> int:
"""
Returns the index of the trained face
Expand All @@ -173,7 +161,7 @@ def _get_trained_face_index(self, label):
return i
return -1

def train(self, face_representation, name):
def train(self, face_representation: np.ndarray, name: str) -> None:
"""
Adds a face to the trained faces, creates a vector representation and adds this
Expand All @@ -190,12 +178,4 @@ def train(self, face_representation, name):
for trained_face in self._trained_faces:
rospy.loginfo(
f"Label: {trained_face.get_label()}, Representations: {len(trained_face.get_representations())}"
)


if __name__ == "__main__":
"""
Runs the whole code. It is not needed if it runs from a ros node.
"""
face_detection = Facenet_recognition()
face_detection.detection_recognition()
)

0 comments on commit 2a6a420

Please sign in to comment.