
Commit

Initial body size measurement attempt
Miguelmelon committed May 27, 2024
1 parent 29d79a1 commit 7a5b0a6
Showing 2 changed files with 40 additions and 13 deletions.
37 changes: 28 additions & 9 deletions people_tracking_v2/scripts/pose_estimation_node.py
@@ -19,7 +19,6 @@

from people_tracking.yolo_pose_wrapper import YoloPoseWrapper


class PoseEstimationNode:
def __init__(
self,
@@ -49,7 +48,7 @@ def __init__(
# uses a thread per pub/sub and service. Since the openpose wrapper is created in the main thread, we have
# to communicate our openpose requests (inputs) to the main thread where the request is processed by the
# openpose wrapper (step 1).
# We have a separate spin loop in the main thead that checks whether there are items in the input q and
# We have a separate spin loop in the main thread that checks whether there are items in the input q and
# processes these using the Openpose wrapper (step 2).
# When the processing has finished, we add the result in the corresponding output queue (specified by the
# request in the input queue) (step 3).
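The comment block above describes a hand-rolled request/response pattern: subscriber callbacks enqueue work, the main thread processes it with the pose wrapper, and the result comes back on a per-request output queue. Below is a minimal, self-contained sketch of that pattern, assuming only Python's standard-library queue module; input_q, output_q, and the processing step are simplified stand-ins for the node's _input_q, _subscriber_output_q, and the YoloPoseWrapper call, not the actual implementation.

import queue

input_q = queue.Queue()  # requests from callback threads (step 1)

def image_callback(image_msg, output_q):
    # Runs in a subscriber thread: only enqueue the request, then wait.
    input_q.put((image_msg, output_q))
    return output_q.get()  # blocks until the main thread has responded (step 3)

def spin_once(wrapper):
    # Runs in the main thread: drain pending requests and process them (step 2).
    while not input_q.empty():
        image_msg, reply_q = input_q.get()
        # Conversion from image_msg to an OpenCV image is omitted in this sketch.
        reply_q.put(wrapper.detect_poses(image_msg))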
@@ -86,6 +85,18 @@ def __init__(

def _image_callback(self, image_msg):
self._input_q.put((image_msg, self._topic_save_images, self._topic_publish_result_image, False))
recognitions, result_image, pose_details = self._wrapper.detect_poses(self._bridge.imgmsg_to_cv2(image_msg, "bgr8"))

# Calculate distances
for pose in pose_details:
if "LShoulder" in pose and "RShoulder" in pose:
shoulder_distance = self._wrapper.compute_distance(pose["LShoulder"], pose["RShoulder"])
rospy.loginfo(f"Shoulder Distance: {shoulder_distance:.2f}")

if "LHip" in pose and "RHip" in pose:
hip_distance = self._wrapper.compute_distance(pose["LHip"], pose["RHip"])
rospy.loginfo(f"Hip Distance: {hip_distance:.2f}")

self._recognitions_publisher.publish(
Recognitions(header=image_msg.header, recognitions=self._subscriber_output_q.get())
)
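The shoulder and hip distances logged above are plain Euclidean distances between keypoint pixel coordinates, so the values are in pixels and depend on how far the person is from the camera. A small worked example with made-up coordinates (not taken from the commit):

import numpy as np

l_shoulder, r_shoulder = (320.0, 240.0), (400.0, 245.0)  # hypothetical keypoints (pixels)
# sqrt((400 - 320)**2 + (245 - 240)**2) = sqrt(6425) ≈ 80.16
shoulder_distance = np.linalg.norm(np.array(l_shoulder) - np.array(r_shoulder))
print(f"Shoulder Distance: {shoulder_distance:.2f}")  # 80.16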
@@ -109,13 +120,13 @@ def _get_recognitions(self, image_msg, save_images, publish_images):
rospy.logerr(f"Could not convert to opencv image: {e}")
return []

recognitions, result_image = self._wrapper.detect_poses(bgr_image)
recognitions, result_image, pose_details = self._wrapper.detect_poses(bgr_image)

# Log the number of poses detected
#if recognitions:
# rospy.loginfo(f"Detected {len(recognitions)} poses")
#else:
# rospy.loginfo("No poses detected")
if recognitions:
rospy.loginfo(f"Detected {len(recognitions)} poses")
else:
rospy.loginfo("No poses detected")

# Write images
if save_images:
@@ -125,7 +136,16 @@ def _get_recognitions(self, image_msg, save_images, publish_images):
if publish_images:
self._result_image_publisher.publish(self._bridge.cv2_to_imgmsg(result_image, "bgr8"))

# Service response
# Calculate distances and log them
for pose in pose_details:
if "LShoulder" in pose and "RShoulder" in pose:
shoulder_distance = self._wrapper.compute_distance(pose["LShoulder"], pose["RShoulder"])
rospy.loginfo(f"Shoulder Distance: {shoulder_distance:.2f}")

if "LHip" in pose and "RHip" in pose:
hip_distance = self._wrapper.compute_distance(pose["LHip"], pose["RHip"])
rospy.loginfo(f"Hip Distance: {hip_distance:.2f}")

return recognitions

def spin(self, check_master: bool = False):
@@ -153,7 +173,6 @@ def spin(self, check_master: bool = False):

return 0


if __name__ == "__main__":
rospy.init_node("pose_estimation")

16 changes: 12 additions & 4 deletions people_tracking_v2/src/people_tracking/yolo_pose_wrapper.py
@@ -15,7 +15,6 @@

ALLOWED_DEVICE_TYPES = ["cpu", "cuda"]


class YoloPoseWrapper:
def __init__(self, model_name: str = "yolov8n-pose.pt", device: str = "cuda:0", verbose: bool = False):
try:
@@ -51,21 +50,23 @@ def __init__(self, model_name: str = "yolov8n-pose.pt", device: str = "cuda:0",

self._model = YOLO(model=model_name, task="pose", verbose=verbose).to(device)

def detect_poses(self, image: np.ndarray, conf: float = 0.25) -> Tuple[List[Recognition], np.ndarray]:
def detect_poses(self, image: np.ndarray, conf: float = 0.25) -> Tuple[List[Recognition], np.ndarray, List[dict]]:
# Detect poses
# This is a wrapper of predict, but we might want to use track
results: List[Results] = self._model(image, conf=conf) # Accepts a list

if not results:
return [], image
return [], image, []

recognitions = []
pose_details = []
result = results[0] # Only using the first result
overlayed_image = result.plot(boxes=False)

body_parts = list(BODY_PARTS.values())

for i, person in enumerate(result.keypoints.cpu().numpy()):
pose = {}
for j, (x, y, pred_conf) in enumerate(person.data[0]):
if pred_conf > 0 and x > 0 and y > 0:
recognitions.append(
@@ -82,5 +83,12 @@ def detect_poses(self, image: np.ndarray, conf: float = 0.25) -> Tuple[List[Reco
),
)
)
pose[body_parts[j]] = (x, y)
pose_details.append(pose)

return recognitions, overlayed_image, pose_details

return recognitions, overlayed_image
@staticmethod
def compute_distance(point1: Tuple[float, float], point2: Tuple[float, float]) -> float:
"""Compute the Euclidean distance between two points."""
return np.linalg.norm(np.array(point1) - np.array(point2))
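Taken together, the two changed files expose a three-value detect_poses plus a compute_distance helper. A usage sketch of that API follows; the test image path is hypothetical, and the sketch assumes OpenCV and the yolov8n-pose.pt weights are available locally:

import cv2
from people_tracking.yolo_pose_wrapper import YoloPoseWrapper

wrapper = YoloPoseWrapper(model_name="yolov8n-pose.pt", device="cpu")
image = cv2.imread("person.jpg")  # hypothetical test image

recognitions, overlayed_image, pose_details = wrapper.detect_poses(image)
for pose in pose_details:
    if "LShoulder" in pose and "RShoulder" in pose:
        width = YoloPoseWrapper.compute_distance(pose["LShoulder"], pose["RShoulder"])
        print(f"Shoulder width: {width:.2f} px")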
