Skip to content

Commit

Permalink
Added y_dependency distance
Browse files Browse the repository at this point in the history
Signed-off-by: Ken Haagh <[email protected]>
  • Loading branch information
KenH2 committed Dec 12, 2023
1 parent 231d9d0 commit 254039b
Showing 1 changed file with 50 additions and 52 deletions.
102 changes: 50 additions & 52 deletions people_tracking/src/people_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ def callback_persons(self, data: DetectedPerson) -> None:
self.update_target(nr_batch)
# rospy.loginfo([person.nr_batch for person in self.detections])
###THRESHOLD FILE = SAME ####

@staticmethod
def element_exists(lst, element):
""" Check if element is in list.
Expand All @@ -241,7 +242,6 @@ def element_exists(lst, element):
return True, idx
except ValueError: # If element is not in the list
return False, None

@staticmethod
def euclidean_distance(point1, point2):
""" Calculate the Euclidean distance between two points.
Expand Down Expand Up @@ -284,7 +284,7 @@ def get_distance_hocs(self, hoc_check, hocs_existing):
s = [self.euclidean_distance(hoc_check[32:64], hoc[32:64]) for hoc in hocs_existing]
# v = [self.euclidean_distance(hoc_check[64:], hoc[64:]) for hoc in hocs_existing]

hsv = zip(h, s) # , v]
hsv = zip(h, s)#, v]
distances = [0.8 * h + 0.2 * s for h, s in hsv]
return min(distances)

Expand All @@ -302,20 +302,17 @@ def check_hoc_data(self, detection, tracked_hocs):
if any(x is not None for x in detection.colour_vectors): # There is HoC data
flag_hoc = True

distance_hoc = [self.get_distance_hocs(detection.colour_vectors[person_idx], tracked_hocs) for person_idx in
range(detection.nr_persons)]
distance_hoc = [self.get_distance_hocs(detection.colour_vectors[person_idx], tracked_hocs) for person_idx in range(detection.nr_persons)]

if any([value < HOC_THRESHOLD for value in distance_hoc]): # Check if any of the data meets the threshold
# Normalize data
max_distance_hoc = max([distance for distance in distance_hoc if
distance < HOC_THRESHOLD]) # get max distance without invalid entries
max_distance_hoc = max([distance for distance in distance_hoc if distance < HOC_THRESHOLD]) # get max distance without invalid entries
if 0 == max_distance_hoc or len(distance_hoc) <= 1:
norm_hoc = [0 for _ in distance_hoc]
else:
norm_hoc = [distance / max_distance_hoc if distance < HOC_THRESHOLD else 2 for distance in
distance_hoc]
norm_hoc = [distance / max_distance_hoc if distance < HOC_THRESHOLD else 2 for distance in distance_hoc]

else: # All values are invalid, thus max normalized distance
else: # All values are invalid, thus max normalized distance
norm_hoc = [2] * detection.nr_persons

else: # There is no HoC data
Expand Down Expand Up @@ -395,14 +392,24 @@ def predict_location(self, data, target_time):

return (x, y, z)

@staticmethod
def distance_positions(p1, p2, y_weight):
"""
Calculate the distance between two points.
:param p1: point to compare against (x,y,z)
:param p2: new point
:param y_weight: how important is the y value [0-1]
:return: distance
"""
return np.sqrt((p2[0] - p1[0]) ** 2 + y_weight*(p2[1] - p1[1]) ** 2 + (p2[2] - p1[2]) ** 2)
def check_da_data(self, new_detection, previous_da_detection):
"""
:param new_detection: detection to calculate the distance per measurement
:param previous_da_detection: measurement to calculate the distance from (aka the previous known target location).
:return:
"""
#ToDO make DA_THRESHOLD Time dependent, make distance calc x,y,z weighted
DA_THRESHOLD = 150
DA_THRESHOLD = 50/0.08 # per unit time

flag_da = True

Expand All @@ -412,21 +419,20 @@ def check_da_data(self, new_detection, previous_da_detection):
else:
predicted_location = (0, 0, 0)

coords_detections = [(new_detection.x_positions[person_idx], new_detection.y_positions[person_idx],
new_detection.z_positions[person_idx]) for person_idx in range(new_detection.nr_persons)]
distance_da = [self.euclidean_distance(predicted_location, detection) for detection in coords_detections]
coords_detections = [(new_detection.x_positions[person_idx], new_detection.y_positions[person_idx], new_detection.z_positions[person_idx]) for person_idx in range(new_detection.nr_persons)]
distance_da = [self.distance_positions(predicted_location, detection, 0.5) for detection in coords_detections]

if any([value < DA_THRESHOLD for value in distance_da]):
delta_t = new_detection.time - previous_da_detection[-1].time
if any([value < DA_THRESHOLD * delta_t for value in distance_da]):

# Normalize data
max_distance = max([distance for distance in distance_da if
distance < DA_THRESHOLD]) # get max distance without invalid entries
max_distance = max([distance for distance in distance_da if distance < DA_THRESHOLD * delta_t]) # get max distance without invalid entries
if 0 == max_distance or len(distance_da) <= 1:
norm_da = [0 for _ in distance_da]
else:
norm_da = [distance / max_distance if distance < DA_THRESHOLD else 2 for distance in distance_da]
norm_da = [distance / max_distance if distance < DA_THRESHOLD * delta_t else 2 for distance in distance_da]

else: # All data is invalid
else: # All data is invalid
norm_da = [2] * new_detection.nr_persons

return flag_da, norm_da
Expand Down Expand Up @@ -455,12 +461,9 @@ def get_target_value(self, new_detection, tracked_hocs, previous_da_detections,
for person in range(new_detection.nr_persons)]

idx_target = combined.index(min(combined))
valid = True if min(
combined) <= 1 else False # combined is larger than 1 if either to many of the targets have invalid measurements or the most important one is invalid.
valid = True if min(combined) <= 1 else False # combined is larger than 1 if either to many of the targets have invalid measurements or the most important one is invalid.

self.target_get_values.append(
[new_detection.nr_batch, flag_face, faces[idx_target], min(faces), flag_hoc, norm_hoc[idx_target],
min(norm_hoc), flag_da, norm_da[idx_target], min(norm_da)])
# self.target_get_values.append([new_detection.nr_batch, flag_face, faces[idx_target], min(faces), flag_hoc, norm_hoc[idx_target], min(norm_hoc), flag_da, norm_da[idx_target], min(norm_da)])

return idx_target, valid

Expand All @@ -470,6 +473,7 @@ def add_approved_target(self, measurement, idx_target, valid):
nr_batch = measurement.nr_batch
time = measurement.time


if idx_target is None:
x = self.approved_targets[-1].x
y = self.approved_targets[-1].y
Expand Down Expand Up @@ -544,23 +548,21 @@ def get_tracked_hocs(self, idx_tracked=None):
def update_target(self, from_batch):
""" Update the self.approved_targets from batch."""

exists_detection, idx_detection = self.element_exists([detection.nr_batch for detection in self.detections],
from_batch)
exists_detection, idx_detection = self.element_exists([detection.nr_batch for detection in self.detections], from_batch)
if not exists_detection: # Make sure existing batch number in detections
return

exist_tracked, idx_tracked = self.element_exists([detection.nr_batch for detection in self.approved_targets],
from_batch)
exist_tracked, idx_tracked = self.element_exists([detection.nr_batch for detection in self.approved_targets], from_batch)

if exist_tracked: # Check new data with existing track.
idx_compare = idx_tracked - 1
if exist_tracked: # Check new data with existing track.
idx_compare = idx_tracked-1
while not self.approved_targets[idx_compare].valid_measurement:
idx_compare -= 1

tracked_hocs = self.get_tracked_hocs(idx_tracked)
# print(f"tracked hocs exists: {tracked_hocs}")
idx_target, valid = self.get_target_value(self.detections[idx_detection], tracked_hocs,
self.approved_targets[idx_compare - 2: idx_compare],
self.approved_targets[idx_compare-2: idx_compare],
self.approved_targets[idx_compare].valid_measurement)

if self.approved_targets[idx_tracked].idx_person == idx_target:
Expand All @@ -572,7 +574,7 @@ def update_target(self, from_batch):

self.approved_targets = self.approved_targets[:idx_compare]

while idx_detection < len(self.detections) - 1:
while idx_detection < len(self.detections)-1:
tracked_hocs = self.get_tracked_hocs()
# print(f"tracked hocs new: {tracked_hocs}")

Expand All @@ -585,16 +587,15 @@ def update_target(self, from_batch):

return

if self.approved_targets[
-1].nr_batch < from_batch: # Add single data association step to the end of target list


if self.approved_targets[-1].nr_batch < from_batch: # Add single data association step to the end of target list

# Get 5 previous hoc measurements from track
tracked_hocs = self.get_tracked_hocs()
# print(f"tracked hocs new: {tracked_hocs}")

idx_target, valid = self.get_target_value(self.detections[idx_detection], tracked_hocs,
self.approved_targets[-3:-1],
self.approved_targets[-1].valid_measurement)
idx_target, valid = self.get_target_value(self.detections[idx_detection], tracked_hocs, self.approved_targets[-3:-1], self.approved_targets[-1].valid_measurement)
self.add_approved_target(self.detections[idx_detection], idx_target, valid)

return
Expand All @@ -608,21 +609,19 @@ def get_weights(flag_face, flag_hoc, flag_da, valid):
:return: weight_face, weight_hoc, weight_da
"""
#TODO update to make da more important and hoc lesss.

# print(f"weight:{sum([flag_face,flag_hoc,flag_da])} {valid}")
if sum([flag_face, flag_hoc, flag_da]) <= 0:
return 0.0, 0.0, 0.0

if not valid: # Use different weights if target was lost in previous steps
if not valid: # Use different weights if target was lost in previous steps
weight_da = 0.0

nr_parameters = sum([flag_face, flag_hoc])
weights = [[1.0, 1.0], # 1 parameter
[0.9, 0.1]] # 2 parameters
weights = [[1.0, 1.0], # 1 parameter
[0.9, 0.1]] # 2 parameters

if flag_face:
weight_face = weights[nr_parameters - 1][0]
weight_face = weights[nr_parameters-1][0]
else:
weight_face = 0.0

Expand All @@ -634,29 +633,28 @@ def get_weights(flag_face, flag_hoc, flag_da, valid):
else:
nr_parameters = sum([flag_face, flag_hoc, flag_da]) # How many measurement types available
current_weight = 2
weights = [[0.0, 0.0, 1.0], # 1 parameter
[0.0, 0.2, 0.8], # 2 parameters
[0.1, 0.2, 0.7]] # 3 parameters
weights = [[0.0, 0.0, 1.0], # 1 parameter
[0.0, 0.2, 0.8], # 2 parameters
[0.1, 0.2, 0.7]] # 3 parameters

if flag_face:
weight_face = weights[nr_parameters - 1][current_weight]
current_weight -= 1
else:
weight_face = 0.0

if flag_da:
weight_da = weights[nr_parameters - 1][current_weight]
else:
weight_da = 0.0

if flag_hoc:
weight_hoc = weights[nr_parameters - 1][current_weight]
current_weight -= 1
else:
weight_hoc = 0.0

if flag_da:
weight_da = weights[nr_parameters - 1][current_weight]
else:
weight_da = 0.0

return weight_face, weight_hoc, weight_da

##########################################
def plot_tracker(self):
""" Plot the trackers on a camera frame and publish it.
Expand Down

0 comments on commit 254039b

Please sign in to comment.