2
2
import sys
3
3
import rospy
4
4
import cv2
5
- import math
6
5
from cv_bridge import CvBridge
7
6
import copy
8
7
9
- from typing import List , Union
10
-
8
+ from typing import List
9
+ from collections import namedtuple
11
10
from UKFclass import *
12
11
13
12
# MSGS
14
13
from sensor_msgs .msg import Image
15
- from people_tracking .msg import ColourCheckedTarget , ColourTarget
16
- from people_tracking .msg import DetectedPerson
14
+ from people_tracking .msg import ColourCheckedTarget , ColourTarget , DetectedPerson
17
15
18
16
# SrvS
19
17
from std_srvs .srv import Empty
24
22
laptop = sys .argv [1 ]
25
23
name_subscriber_RGB = 'video_frames' if laptop == "True" else '/hero/head_rgbd_sensor/rgb/image_raw'
26
24
27
- CAMERA_WIDTH = 640
28
- CAMERA_HEIGHT = 480
29
-
30
- from collections import namedtuple
31
-
32
- # Detection = namedtuple("Detection", ["batch_nr", "idx_person", "time", "x", "y", "z"])
33
- Persons = namedtuple ("Persons" , ["nr_batch" , "time" , "nr_persons" , "x_positions" , "y_positions" , "z_positions" , "colour_vectors" , "face_detected" ])
25
+ Persons = namedtuple ("Persons" ,
26
+ ["nr_batch" , "time" , "nr_persons" , "x_positions" , "y_positions" , "z_positions" , "colour_vectors" ,
27
+ "face_detected" ])
34
28
Target = namedtuple ("Target" , ["nr_batch" , "time" , "idx_person" , "x" , "y" , "z" ])
35
29
Target_hoc = namedtuple ("Target_hoc" , ["nr_batch" , "colour_vector" ])
36
30
@@ -64,32 +58,31 @@ def __init__(self) -> None:
64
58
self .approved_targets_hoc = [] # HoC's from approved target (only keep last 10).
65
59
self .time_since_identifiers = None
66
60
67
- # self.hoc_detections = [] # latest detected persons with hoc
68
- # self.new_hoc_detection = False
69
- # self.time_hoc_detection_sec = None
70
- #
71
- # self.face_detections = [] # latest detected persons with face
72
- # self.new_face_detection = False
73
- # self.time_face_detection_sec = None
61
+ self .tracked_plottable = True
74
62
75
63
self .detections = [] # All persons detected per frame
76
64
self .new_detections = False
77
65
78
66
self .latest_image = None
79
67
80
- # self.ukf_hoc = UKF()
81
- # self.data_hoc = []
82
- #
83
- # self.ukf_face = UKF()
84
- # self.data_face = [] # list of data added to confirmed and not to face
85
- #
86
- # self.ukf_data_association = UKF() # UKF with the data association
87
- # self.data_data_association = []
88
- #
89
- # self.ukf_confirmed = UKF()
68
+ self .ukf_from_data = UKF ()
69
+ self .ukf_past_data = UKF ()
90
70
91
71
def reset (self ):
92
72
""" Reset all stored variables in Class to their default values."""
73
+ self .approved_targets = [Target (0 , 0 , 0 , 320 , 240 , 0 )]
74
+ self .approved_targets_hoc = [] # HoC's from approved target (only keep last 10).
75
+ self .time_since_identifiers = None
76
+
77
+ self .tracked_plottable = True
78
+
79
+ self .detections = [] # All persons detected per frame
80
+ self .new_detections = False
81
+
82
+ self .latest_image = None
83
+
84
+ self .ukf_from_data = UKF ()
85
+ self .ukf_past_data = UKF ()
93
86
94
87
@staticmethod
95
88
def euclidean_distance (point1 , point2 ):
@@ -150,7 +143,7 @@ def get_latest_image(self, data: Image) -> None:
150
143
""" Get the most recent frame/image from the camera."""
151
144
self .latest_image = data
152
145
153
- def callback_hoc (self , data : ColourCheckedTarget , amount_detections_stored : int = 100 ) -> None :
146
+ def callback_hoc (self , data : ColourCheckedTarget ) -> None :
154
147
""" Add the latest HoC detection to the storage."""
155
148
batch_nr = data .nr_batch
156
149
colour_vectors = [data .colour_vectors [i :i + 32 * 3 ] for i in range (0 , len (data .colour_vectors ), 32 * 3 )]
@@ -160,10 +153,11 @@ def callback_hoc(self, data: ColourCheckedTarget, amount_detections_stored: int
160
153
nr_batch , time , nr_persons , x_positions , y_positions , z_positions , _ , face_detected = self .detections [idx ]
161
154
self .detections [idx ] = Persons (nr_batch , time , nr_persons , x_positions , y_positions , z_positions ,
162
155
colour_vectors , face_detected )
156
+ self .new_detections = True
163
157
else :
164
158
rospy .loginfo ("HoC detection not used" )
165
159
166
- def callback_face (self , data : ColourCheckedTarget , amount_detections_stored : int = 100 ) -> None :
160
+ def callback_face (self , data : ColourCheckedTarget ) -> None :
167
161
""" Add the latest Face detection to the storage."""
168
162
batch_nr = data .batch_nr
169
163
face_detections = data .face_detections
@@ -172,6 +166,7 @@ def callback_face(self, data: ColourCheckedTarget, amount_detections_stored: int
172
166
nr_batch , time , nr_persons , x_positions , y_positions , z_positions , colour_vectors , _ = self .detections [idx ]
173
167
self .detections [idx ] = Persons (nr_batch , time , nr_persons , x_positions , y_positions , z_positions ,
174
168
colour_vectors , face_detections )
169
+ self .new_detections = True
175
170
else :
176
171
rospy .loginfo ("Face detection not used" )
177
172
@@ -183,18 +178,17 @@ def callback_persons(self, data: DetectedPerson) -> None:
183
178
x_positions = data .x_positions
184
179
y_positions = data .y_positions
185
180
z_positions = data .z_positions
186
-
187
- if len (self .detections ) >= 100 :
188
- self .detections .pop (0 )
181
+ #
182
+ # if len(self.detections) >= 100:
183
+ # self.detections.pop(0)
189
184
colour_vectors = [None ] * nr_persons
190
185
face_detected = [None ] * nr_persons
191
186
self .detections .append (
192
187
Persons (nr_batch , time , nr_persons , x_positions , y_positions , z_positions , colour_vectors , face_detected ))
193
188
self .new_detections = True
194
189
# rospy.loginfo([person.nr_batch for person in self.detections])
195
190
196
-
197
- def plot_tracker (self ): # ToDo Update image plotter with useful new things
191
+ def plot_tracker (self ):
198
192
""" Plot the trackers on a camera frame and publish it.
199
193
This can be used to visualise all the output from the trackers. [x,y coords] Currently not for depth
200
194
"""
@@ -203,40 +197,25 @@ def plot_tracker(self): # ToDo Update image plotter with useful new things
203
197
latest_image = self .latest_image
204
198
cv_image = bridge .imgmsg_to_cv2 (latest_image , desired_encoding = 'passthrough' )
205
199
206
- if len (self .approved_targets ) > 0 : # Plot latest approved measurement
200
+ if len (self .approved_targets ) > 0 and self . tracked_plottable : # Plot latest approved measurement
207
201
x_approved = self .approved_targets [- 1 ].x
208
202
y_approved = self .approved_targets [- 1 ].y
209
203
cv2 .circle (cv_image , (x_approved , y_approved ), 5 , (255 , 0 , 0 , 50 ), - 1 ) # BGR
210
204
211
- # current_time = float(rospy.get_time())
212
- # if self.ukf_data_association.current_time < current_time: # Get prediction for current time
213
- # ukf_predict = copy.deepcopy(self.ukf_data_association)
214
- # ukf_predict.predict(current_time)
215
- # else:
216
- # ukf_predict = self.ukf_data_association
217
- #
218
- # x_hoc = int(self.ukf_confirmed.kf.x[0])
219
- # y_hoc = int(self.ukf_confirmed.kf.x[2])
220
- #
221
- # x_position = int(ukf_predict.kf.x[0])
222
- # y_position = int(ukf_predict.kf.x[2])
223
-
224
- # rospy.loginfo("predict x: %s, y: %s; measured x: %s, y: %s, HoC x: %s, y: %s", x_position, y_position,
225
- # self.data_data_association[-1][-3], self.data_data_association[-1][-2], x_hoc, y_hoc)
226
- tracker_image = bridge .cv2_to_imgmsg (cv_image , encoding = "passthrough" )
227
- self .publisher_debug .publish (tracker_image )
228
-
205
+ # Get location with UKF
206
+ current_time = float (rospy .get_time ())
207
+ if self .ukf_from_data .current_time < current_time : # Get prediction for current time
208
+ ukf_predict = copy .deepcopy (self .ukf_past_data )
209
+ ukf_predict .predict (current_time )
210
+ else :
211
+ ukf_predict = self .ukf_from_data
229
212
213
+ x_ukf = int (ukf_predict .kf .x [0 ])
214
+ y_ukf = int (ukf_predict .kf .x [2 ])
215
+ cv2 .circle (cv_image , (x_ukf , y_ukf ), 5 , (0 , 255 , 0 , 50 ), - 1 ) # BGR
230
216
231
- # def update_ukf_tracker(self, ukf: UKF, ukf_update_data, ukf_data, time):
232
- # """ Update the known UKF up until given index."""
233
- #
234
- # if ukf.current_time < time:
235
- # for entry in ukf_update_data:
236
- # z = [entry.x, entry.y, entry.z]
237
- # ukf.update(entry.time, z)
238
- #
239
- # ukf_data.append(ukf_update_data)
217
+ tracker_image = bridge .cv2_to_imgmsg (cv_image , encoding = "passthrough" )
218
+ self .publisher_debug .publish (tracker_image )
240
219
241
220
def get_distance_hoc (self , hoc ):
242
221
""" Compare given hoc to targets last 10 HoC's. Return the smallest distance of hoc. 0 means perfect match."""
@@ -252,13 +231,33 @@ def get_distance_hoc(self, hoc):
252
231
253
232
def track_person (self ):
254
233
""" Track the target."""
255
- self .approved_targets = self .remove_outside_batches (self .approved_targets , 0 , 0 )
256
- self .approved_targets_hoc = self .remove_outside_batches (self .approved_targets_hoc , 0 , 0 )
234
+ rospy .loginfo (f"target: { len (self .approved_targets )} hoc: { len (self .approved_targets_hoc )} " )
257
235
258
- for idx , measurement in enumerate (
259
- self .detections ): # Do data association with other data through all detections
236
+ # Clear approved targets to 1
237
+ if len (self .approved_targets ) > 100 :
238
+ start_batch = self .approved_targets [- 101 ].nr_batch
239
+ else :
240
+ start_batch = self .approved_targets [0 ].nr_batch
241
+
242
+ self .approved_targets = self .remove_outside_batches (self .approved_targets , start_batch , start_batch )
243
+
244
+ # Clear hoc to 10 measurements
245
+ self .approved_targets_hoc = self .remove_outside_batches (self .approved_targets_hoc , 0 , start_batch )
246
+ while len (self .approved_targets_hoc ) > 10 :
247
+ self .approved_targets_hoc .pop (0 )
248
+
249
+ # UKF
250
+ self .ukf_from_data = UKF ()
251
+ self .ukf_from_data .update (self .approved_targets [0 ].time ,
252
+ [self .approved_targets [0 ].x , self .approved_targets [0 ].y , self .approved_targets [0 ].z ])
260
253
261
- if measurement .nr_persons <= 0 : # Skip measurement if no persons
254
+ self .tracked_plottable = False
255
+
256
+ base_idx = len (self .detections ) - 100
257
+ for idx_raw , measurement in enumerate (
258
+ self .detections [- 100 :]): # Do data association with other data through all detections
259
+ idx = base_idx + idx_raw
260
+ if measurement .nr_persons <= 0 : # Skip measurement if no persons
262
261
continue
263
262
264
263
flag_hoc = False
@@ -268,7 +267,7 @@ def track_person(self):
268
267
if any (x is not None for x in measurement .face_detected ): # There is a face in the measurement
269
268
faces = measurement .face_detected
270
269
flag_face = True
271
- else : # There is no face daa
270
+ else : # There is no face data
272
271
faces = [0 ] * measurement .nr_persons
273
272
274
273
if any (x is not None for x in measurement .colour_vectors ): # There is HoC data
@@ -280,7 +279,7 @@ def track_person(self):
280
279
# Normalize data
281
280
max_distance_hoc = max (distance_hoc )
282
281
if 0 == max_distance_hoc :
283
- norm_hoc_distance = [0 for distance in distance_hoc ]
282
+ norm_hoc_distance = [0 for _ in distance_hoc ]
284
283
else :
285
284
norm_hoc_distance = [1 - distance / max_distance_hoc for distance in distance_hoc ]
286
285
@@ -290,7 +289,7 @@ def track_person(self):
290
289
norm_hoc_distance = [0 ] * measurement .nr_persons
291
290
292
291
previous_target_coords = (
293
- self .approved_targets [- 1 ].x , self .approved_targets [- 1 ].y , self .approved_targets [- 1 ].z )
292
+ self .approved_targets [- 1 ].x , self .approved_targets [- 1 ].y , self .approved_targets [- 1 ].z )
294
293
distance_da = []
295
294
for person_idx in range (measurement .nr_persons ):
296
295
position = (measurement .x_positions [person_idx ], measurement .y_positions [person_idx ],
@@ -299,8 +298,9 @@ def track_person(self):
299
298
300
299
# Normalize data
301
300
max_distance_da = max (distance_da )
301
+ # rospy.loginfo(f"euclid:{distance_da}")
302
302
if 0 == max_distance_da :
303
- norm_distance_da = [0 for value in distance_da ]
303
+ norm_distance_da = [0 for _ in distance_da ]
304
304
else :
305
305
norm_distance_da = [1 - value / max_distance_da for value in distance_da ]
306
306
@@ -309,7 +309,6 @@ def track_person(self):
309
309
310
310
nr_batch = measurement .nr_batch
311
311
time = measurement .time
312
- idx_target = None
313
312
314
313
nr_parameters = sum ([flag_face , flag_hoc , flag_da ])
315
314
current_weight = 2
@@ -346,10 +345,13 @@ def track_person(self):
346
345
z = measurement .z_positions [idx_target ]
347
346
self .approved_targets .append (Target (nr_batch , time , idx_target , x , y , z ))
348
347
self .time_since_identifiers = time
348
+ self .ukf_from_data .update (time , [x , y , z ]) # UKF
349
349
350
350
if flag_hoc :
351
351
self .approved_targets_hoc .append (Target_hoc (nr_batch , measurement .colour_vectors [idx_target ]))
352
352
# rospy.loginfo([(target.nr_batch, target.idx_person) for target in self.approved_targets])
353
+ self .ukf_past_data = copy .deepcopy (self .ukf_from_data )
354
+ self .tracked_plottable = True
353
355
354
356
def loop (self ):
355
357
""" Loop that repeats itself at self.rate. Can be used to execute methods at given rate. """
@@ -359,11 +361,20 @@ def loop(self):
359
361
if self .latest_image is not None :
360
362
self .plot_tracker ()
361
363
362
- # ToDo Add logic for when to run track person function
364
+ # Remove detections older than 500 entries ago
365
+ while len (self .detections ) > 500 :
366
+ self .detections .pop (0 )
367
+
368
+ # ToDo Move the picture plotter to different node -> might help with visible lag spikes in tracker
363
369
if self .new_detections :
364
370
self .track_person ()
365
371
self .new_detections = False
366
372
373
+ # Add target lost check (based on time)
374
+ if self .time_since_identifiers is None :
375
+ continue
376
+ if rospy .get_time () - self .time_since_identifiers > 3 :
377
+ rospy .loginfo ("Target Lost (ID'ers)" )
367
378
368
379
self .rate .sleep ()
369
380
@@ -377,11 +388,6 @@ def remove_outside_batches(lst: List, start_batch: int = 0, end_batch: int = flo
377
388
return result
378
389
379
390
380
- # ToDo Add detection data trimmer
381
- # ToDo Add target lost check (based on time)
382
- # ToDo Add clearing logic for HoC measurement and target data
383
- # ToDo Add UKF
384
-
385
391
if __name__ == '__main__' :
386
392
try :
387
393
node_pt = PeopleTracker ()
0 commit comments