diff --git a/image_recognition_face_recognition/doc/1.jpg b/image_recognition_face_recognition/doc/1.jpg new file mode 100644 index 00000000..edf9fdc3 Binary files /dev/null and b/image_recognition_face_recognition/doc/1.jpg differ diff --git a/image_recognition_face_recognition/doc/annotate.png b/image_recognition_face_recognition/doc/annotate.png new file mode 100644 index 00000000..5df901fd Binary files /dev/null and b/image_recognition_face_recognition/doc/annotate.png differ diff --git a/image_recognition_face_recognition/doc/config.png b/image_recognition_face_recognition/doc/config.png new file mode 100644 index 00000000..62b5c19b Binary files /dev/null and b/image_recognition_face_recognition/doc/config.png differ diff --git a/image_recognition_face_recognition/doc/example.png b/image_recognition_face_recognition/doc/example.png new file mode 100644 index 00000000..32a2a5e7 Binary files /dev/null and b/image_recognition_face_recognition/doc/example.png differ diff --git a/image_recognition_face_recognition/doc/test_face.png b/image_recognition_face_recognition/doc/test_face.png new file mode 100644 index 00000000..e1eac45b Binary files /dev/null and b/image_recognition_face_recognition/doc/test_face.png differ diff --git a/image_recognition_face_recognition/scripts/face_recognition_node b/image_recognition_face_recognition/scripts/face_recognition_node index 8e93c573..4af7af76 100755 --- a/image_recognition_face_recognition/scripts/face_recognition_node +++ b/image_recognition_face_recognition/scripts/face_recognition_node @@ -1,5 +1,5 @@ #!/usr/bin/env python - +import math import os import sys import diagnostic_updater @@ -15,14 +15,12 @@ from image_recognition_msgs.msg import ( ) from image_recognition_msgs.srv import Recognize, Annotate from image_recognition_util import image_writer -from image_recognition_face_recognition.facenet_recognition import Facenet_recognition +from image_recognition_face_recognition.facenet_recognition import FacenetRecognition class OpenfaceROS: def __init__( self, - align_path, - net_path, save_images_folder, topic_save_images, service_save_images, @@ -31,9 +29,6 @@ class OpenfaceROS: ): """ OpenfaceROS class that wraps the FaceRecognizer in a ROS node - - :param align_path: dlib face align path - :param net_path: openface neural net path :param save_images_folder: path where to store the images :param topic_save_images: whether to save images originated from image topic callback :param service_save_images: whether to save images originated from a service call @@ -42,7 +37,7 @@ class OpenfaceROS: """ # Openface ROS - self._face_recognizer = Facenet_recognition() + self._face_recognizer = FacenetRecognition() self._save_images_folder = save_images_folder self._topic_save_images = topic_save_images self._service_save_images = service_save_images @@ -52,8 +47,6 @@ class OpenfaceROS: self._bridge = CvBridge() self._annotate_srv = rospy.Service("annotate", Annotate, self._annotate_srv) self._recognize_srv = rospy.Service("recognize", Recognize, self._recognize_srv) - self._clear_srv = rospy.Service("clear", Empty, self._clear_srv) - self._image_subscriber = rospy.Subscriber("image", Image, self._image_callback) self._recognitions_publisher = rospy.Publisher("recognitions", Recognitions, queue_size=10) @@ -65,8 +58,6 @@ class OpenfaceROS: self._result_image_publisher = rospy.Publisher("result_image", Image, queue_size=10) rospy.loginfo("OpenfaceROS initialized:") - rospy.loginfo(" - dlib_align_path=%s", align_path) - rospy.loginfo(" - openface_net_path=%s", net_path) rospy.loginfo(" - save_images_folder=%s", save_images_folder) rospy.loginfo(" - topic_save_images=%s", topic_save_images) rospy.loginfo(" - service_save_images=%s", service_save_images) @@ -104,16 +95,6 @@ class OpenfaceROS: return {} - def _clear_srv(self, req): - """ - Service to clear the trained faces - - :param req: Empty - :return: Empty - """ - self._face_recognizer.clear_trained_faces() - return {} - def _get_recognitions(self, image_msg, save_images, publish_images): # Convert to opencv image """ @@ -133,8 +114,6 @@ class OpenfaceROS: images = [] labels = [] - # Added this line for testing with some labels - labels = ["jason", "kona"] # Call facebet neural network in two stages face_recognitions = self._face_recognizer.face_detection(bgr_image) @@ -146,9 +125,7 @@ class OpenfaceROS: # rospy.loginfo("Face recognitions: %s", face_recognitions) label_idx = 0 - for fr in face_recognitions: - import math - + for fr in face_recognitions: face_recognition = [math.floor(xi) for xi in fr] if save_images: label = labels_pred[label_idx] @@ -220,52 +197,30 @@ class OpenfaceROS: # Service response return {"recognitions": recognitions} - def restore_trained_faces(self, file_name): - self._face_recognizer.restore_trained_faces(file_name) - if __name__ == "__main__": rospy.init_node("face_recognition") try: - dlib_shape_predictor_path = os.path.expanduser( - rospy.get_param( - "~align_path", - "~/openface/models/dlib/shape_predictor_68_face_landmarks.dat", - ) - ) - openface_neural_network_path = os.path.expanduser( - rospy.get_param("~net_path", "~/openface/models/openface/nn4.small2.v1.t7") - ) save_images = rospy.get_param("~save_images", True) topic_save_images = rospy.get_param("~topic_save_images", False) service_save_images = rospy.get_param("~service_save_images", True) topic_publish_result_image = rospy.get_param("~topic_publish_result_image", True) service_publish_result_image = rospy.get_param("~service_publish_result_image", True) - db = rospy.get_param("~db", None) - if db: - db = os.path.expanduser(db) - save_images_folder = None if save_images: - save_images_folder = os.path.expanduser(rospy.get_param("~save_images_folder", "/tmp/openface")) + save_images_folder = os.path.expanduser(rospy.get_param("~save_images_folder", "/tmp/facenet_saved_images")) except KeyError as e: rospy.logerr("Parameter %s not found" % e) sys.exit(1) image_recognition_openface = OpenfaceROS( - dlib_shape_predictor_path, - openface_neural_network_path, save_images_folder, topic_save_images, service_save_images, topic_publish_result_image, service_publish_result_image, ) - if db: - rospy.loginfo("loading face database from %s", db) - image_recognition_openface.restore_trained_faces(db) - updater = diagnostic_updater.Updater() updater.setHardwareID("none") updater.add(diagnostic_updater.Heartbeat()) diff --git a/image_recognition_face_recognition/scripts/get_face_recognition b/image_recognition_face_recognition/scripts/get_face_recognition index b34c6e2a..bc10361a 100755 --- a/image_recognition_face_recognition/scripts/get_face_recognition +++ b/image_recognition_face_recognition/scripts/get_face_recognition @@ -1,26 +1,17 @@ #!/usr/bin/env python from __future__ import print_function - -import argparse - -import cv2 from image_recognition_msgs.msg import Recognition -from image_recognition_face_recognition.facenet_recognition import Facenet_recognition -# Assign description to the help doc -import math +from image_recognition_face_recognition.facenet_recognition import FacenetRecognition from sensor_msgs.msg import RegionOfInterest - from image_recognition_util import image_writer +import argparse +import math +import cv2 parser = argparse.ArgumentParser(description='Get face recognitions') # Add arguments parser.add_argument('image', type=str, help='Image') -parser.add_argument('-k', '--align_path', type=str, help='DLib Align path', required=False, - default="~/openface/models/dlib/shape_predictor_68_face_landmarks.dat") -parser.add_argument('-s', '--net_path', type=str, help='Openface neural network path', required=False, - default='~/openface/models/openface/nn4.small2.v1.t7') -parser.add_argument('-d', '--db', type=argparse.FileType('r'), help='Load already trained faces db from file') parser.add_argument( '-v', '--verbose', help="Increase output verbosity", action="store_true") args = parser.parse_args() @@ -29,10 +20,7 @@ args = parser.parse_args() img = cv2.imread(args.image) # Create openface interface -face_recognizer = Facenet_recognition() - -if args.db: - face_recognizer.restore_trained_faces(args.db) +face_recognizer = FacenetRecognition() recognized_faces = face_recognizer.face_detection(img) print(recognized_faces) @@ -51,4 +39,4 @@ for fr in recognized_faces: ) annotated_original_image = image_writer.get_annotated_cv_image(img, recognitions) cv2.imshow("result", annotated_original_image) -cv2.waitKey() +cv2.waitKey(1000) \ No newline at end of file diff --git a/image_recognition_face_recognition/src/image_recognition_face_recognition/facenet_recognition.py b/image_recognition_face_recognition/src/image_recognition_face_recognition/facenet_recognition.py index dfe04c68..0fea8351 100644 --- a/image_recognition_face_recognition/src/image_recognition_face_recognition/facenet_recognition.py +++ b/image_recognition_face_recognition/src/image_recognition_face_recognition/facenet_recognition.py @@ -1,11 +1,14 @@ -import psutil +from typing import List, Tuple, Union import rospy import torch from facenet_pytorch import MTCNN, InceptionResnetV1 -from facenet_pytorch.models.mtcnn import MTCNN +import numpy as np class TrainedFace: + """ + This class serves as a custom struct to store information of people we recognized + """ def __init__(self, label): """ A custom struct to store the names and the embedded representations (tensors) of people @@ -26,41 +29,46 @@ def get_representations(self): return self.representations -class Facenet_recognition: +class FacenetRecognition: + """ + This class handles the recognition using the Facenet model. + """ def __init__(self): """ Constructor for the list which contains the TrainedFace structure """ self._trained_faces = [] - - def print_memory_usage(self, label): - """ - Prints the memory usage as a metric - """ - mem_info = psutil.Process().memory_info() - rospy.loginfo( - f"{label: <10}: vms={mem_info.vms / (1024 * 1024)}, rss={mem_info.rss / (1024 * 1024)}" + self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") + rospy.loginfo(f"Running on device: {self.device}") + self.mtcnn = MTCNN( + keep_all=True, + image_size=160, + margin=0, + min_face_size=20, + thresholds=[0.6, 0.7, 0.7], + factor=0.709, + post_process=True, + device=self.device, ) - - def face_detection(self, img): + + def face_detection(self, img: np.ndarray) -> List[Union[Tuple[float, float, float, float], None]]: """ Returns the index of the trained face :param img: inpute image :return: the bounding boxes of coordinations of the faces it detects """ - detector = MTCNN(keep_all=True) + # Keep the landmarks for future use - boxes, _, landmarks = detector.detect(img, landmarks=True) + boxes, _, landmarks = self.mtcnn.detect(img, landmarks=True) return boxes - def _get_dists(self, embeddings): + def _get_dists(self, embeddings: List[torch.Tensor]) -> Tuple[List[float], List[str]]: """ Returns min l2 distance of the identified face (compared with the database of know faces) :param embeddings: the embedded representation(s) of a face(s) - :return: the min distance(s) of the embedded vector compared with the database faces - :return: the corresponding label(s) + :return: the min distance(s) of the embedded vector compared with the database faces from the corresponding label(s) """ dist_per_emb_final = [] dist = [] @@ -100,7 +108,7 @@ def _get_dists(self, embeddings): return min_value_list_per_emb, labelling - def detection_recognition(self, img, labels, train): + def detection_recognition(self, img: np.ndarray, labels: List[str], train: bool) -> Tuple[List[float], List[str]]: """ Returns min l2 distance of the identified face (compared with the database of know faces) @@ -112,38 +120,20 @@ def detection_recognition(self, img, labels, train): :return: the min distance(s) of the embedded vector compared with the database faces :return: the corresponding label(s) """ - device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") - rospy.loginfo(f"Running on device: {device}") + resnet = InceptionResnetV1(pretrained="vggface2").eval().to(self.device) - mtcnn = MTCNN( - keep_all=True, - image_size=160, - margin=0, - min_face_size=20, - thresholds=[0.6, 0.7, 0.7], - factor=0.709, - post_process=True, - device=device, - ) - - resnet = InceptionResnetV1(pretrained="vggface2").eval().to(device) - - x_aligned = mtcnn(img) - print(type(x_aligned)) - print(x_aligned.size(), "x_aligned size (1st NN output)") + x_aligned = self.mtcnn(img) x_aligned = x_aligned.cuda() # add this line embeddings = resnet(x_aligned).detach().cpu() rospy.loginfo(f"{embeddings.size()}, {type(embeddings)}, embeddings size") if not self._trained_faces: - nam = 0 - for emb in embeddings: + for nam, emb in enumerate(embeddings): index = self._get_trained_face_index(labels[nam]) if index == -1: self._trained_faces.append(TrainedFace(labels[nam])) self._trained_faces[index].representations.append(emb) - nam = nam + 1 # try: dist, labelling = self._get_dists(embeddings) @@ -152,16 +142,14 @@ def detection_recognition(self, img, labels, train): rospy.loginfo(f"{labels[0]}, {labelling}, label[0],labelling") if train: - idx_label = 0 - for emb in embeddings: + for idx_label, emb in enumerate(embeddings): rospy.loginfo(f"{idx_label}, idx_label") self.train(emb, labelling[idx_label]) - idx_label = idx_label + 1 # except: rospy.loginfo(f"{len(self._trained_faces)}") return dist, labelling - def _get_trained_face_index(self, label): + def _get_trained_face_index(self, label: str) -> int: """ Returns the index of the trained face @@ -173,7 +161,7 @@ def _get_trained_face_index(self, label): return i return -1 - def train(self, face_representation, name): + def train(self, face_representation: np.ndarray, name: str) -> None: """ Adds a face to the trained faces, creates a vector representation and adds this @@ -190,12 +178,4 @@ def train(self, face_representation, name): for trained_face in self._trained_faces: rospy.loginfo( f"Label: {trained_face.get_label()}, Representations: {len(trained_face.get_representations())}" - ) - - -if __name__ == "__main__": - """ - Runs the whole code. It is not needed if it runs from a ros node. - """ - face_detection = Facenet_recognition() - face_detection.detection_recognition() + ) \ No newline at end of file