@@ -83,7 +83,6 @@ def combine_tf_static_messages(tf_messages: List[Any]) -> Optional[Any]:
8383 return base_message
8484
8585
86- # --- CORRECTED ROBUST STREAMING SYNCHRONIZATION LOGIC ---
8786def read_synchronized_messages_streaming (
8887 bag_file : str ,
8988 topics_to_read : Dict [str , str ],
@@ -115,7 +114,6 @@ def read_synchronized_messages_streaming(
115114 typestore = get_typestore (ros_store )
116115
117116 with AnyReader ([Path (bag_file ).parent ], default_typestore = typestore ) as reader :
118- # *** FIX: Find connections by comparing normalized topic names ***
119117 conn1 , conn2 = None , None
120118 for c in reader .connections :
121119 normalized_conn_topic = c .topic .lstrip ("/" )
@@ -124,7 +122,6 @@ def read_synchronized_messages_streaming(
124122 elif normalized_conn_topic == topic2_norm :
125123 conn2 = c
126124
127- # *** FIX: Provide a much more informative error message ***
128125 if not conn1 or not conn2 :
129126 available_pc_topics = [
130127 c .topic for c in reader .connections if "PointCloud2" in c .msgtype
@@ -194,6 +191,159 @@ def read_synchronized_messages_streaming(
194191 return final_messages
195192
196193
194+ def read_synchronized_image_cloud (
195+ bag_file : str ,
196+ image_topic : str ,
197+ pointcloud_topic : str ,
198+ camerainfo_topic : str ,
199+ topics_to_read : Dict [str , str ],
200+ progress_callback : Optional [Signal ] = None ,
201+ max_time_diff : float = 0.05 ,
202+ frame_samples : int = 6 ,
203+ ros_version : str = "JAZZY" ,
204+ ) -> Dict :
205+ """Synchronize image, point cloud, and camera info by nearest timestamp."""
206+ ros_store = getattr (Stores , f"ROS2_{ ros_version } " )
207+ typestore = get_typestore (ros_store )
208+
209+ img_norm = image_topic .lstrip ("/" )
210+ pc_norm = pointcloud_topic .lstrip ("/" )
211+ cam_norm = camerainfo_topic .lstrip ("/" )
212+ max_diff_ns = int (max_time_diff * 1e9 )
213+
214+ def next_msg (it ):
215+ try :
216+ return next (it )
217+ except StopIteration :
218+ return None
219+
220+ with AnyReader ([Path (bag_file ).parent ], default_typestore = typestore ) as reader :
221+ img_conn = pc_conn = cam_conn = None
222+ tf_connections = []
223+ for c in reader .connections :
224+ norm_topic = c .topic .lstrip ("/" )
225+ if norm_topic == img_norm :
226+ img_conn = c
227+ elif norm_topic == pc_norm :
228+ pc_conn = c
229+ elif norm_topic == cam_norm :
230+ cam_conn = c
231+ elif "tf_static" in c .topic :
232+ tf_connections .append (c )
233+
234+ if not img_conn or not pc_conn or not cam_conn :
235+ missing = [
236+ name
237+ for name , conn in zip (
238+ ["image" , "pointcloud" , "camera_info" ], [img_conn , pc_conn , cam_conn ]
239+ )
240+ if conn is None
241+ ]
242+ raise ValueError (f"Missing connections for: { ', ' .join (missing )} " )
243+
244+ img_iter = reader .messages (connections = [img_conn ])
245+ pc_iter = reader .messages (connections = [pc_conn ])
246+ cam_iter = reader .messages (connections = [cam_conn ])
247+
248+ img_state = next_msg (img_iter )
249+ pc_state = next_msg (pc_iter )
250+ cam_prev = None
251+ cam_state = next_msg (cam_iter )
252+
253+ def closest_camera_info (target_time_ns ):
254+ nonlocal cam_prev , cam_state
255+ while cam_state and cam_state [1 ] < target_time_ns :
256+ cam_prev = cam_state
257+ cam_state = next_msg (cam_iter )
258+
259+ candidates = []
260+ if cam_prev :
261+ candidates .append (cam_prev )
262+ if cam_state :
263+ candidates .append (cam_state )
264+ if not candidates :
265+ return None , None , None
266+
267+ best = min (candidates , key = lambda c : abs (c [1 ] - target_time_ns ))
268+ return best [2 ], abs (best [1 ] - target_time_ns ), best [1 ]
269+
270+ pairs = []
271+ if progress_callback :
272+ progress_callback .emit (10 , "Synchronizing image and point cloud..." )
273+
274+ while img_state and pc_state and len (pairs ) < frame_samples :
275+ _ , t_img , raw_img = img_state
276+ _ , t_pc , raw_pc = pc_state
277+ diff = abs (t_img - t_pc )
278+
279+ if diff <= max_diff_ns :
280+ img_msg = reader .deserialize (raw_img , img_conn .msgtype )
281+ pc_msg = reader .deserialize (raw_pc , pc_conn .msgtype )
282+ cam_raw , cam_diff , cam_time = closest_camera_info (t_img )
283+ if cam_raw is None :
284+ break
285+ cam_msg = reader .deserialize (cam_raw , cam_conn .msgtype )
286+
287+ pairs .append (
288+ {
289+ "image" : {
290+ "timestamp" : t_img ,
291+ "data" : img_msg ,
292+ "topic_type" : topics_to_read [image_topic ],
293+ "time_delta_ns" : diff ,
294+ },
295+ "pointcloud" : {
296+ "timestamp" : t_pc ,
297+ "data" : pc_msg ,
298+ "topic_type" : topics_to_read [pointcloud_topic ],
299+ "time_delta_ns" : diff ,
300+ },
301+ "camera_info" : {
302+ "timestamp" : cam_time if cam_time is not None else t_img ,
303+ "data" : cam_msg ,
304+ "topic_type" : topics_to_read [camerainfo_topic ],
305+ "time_delta_ns" : cam_diff ,
306+ },
307+ }
308+ )
309+
310+ img_state = next_msg (img_iter )
311+ pc_state = next_msg (pc_iter )
312+ elif t_img < t_pc :
313+ img_state = next_msg (img_iter )
314+ else :
315+ pc_state = next_msg (pc_iter )
316+
317+ if not pairs :
318+ raise ValueError (
319+ f"No synchronized image/point cloud pairs within { max_time_diff } s window."
320+ )
321+
322+ frame_samples_dict = {
323+ image_topic : [p ["image" ] for p in pairs ],
324+ pointcloud_topic : [p ["pointcloud" ] for p in pairs ],
325+ camerainfo_topic : [p ["camera_info" ] for p in pairs ],
326+ }
327+
328+ messages = {"frame_samples" : frame_samples_dict }
329+
330+ if tf_connections :
331+ tf_messages = [
332+ reader .deserialize (raw , tf_connections [0 ].msgtype )
333+ for _ , _ , raw in reader .messages (connections = tf_connections )
334+ ]
335+ if tf_messages :
336+ messages [tf_connections [0 ].topic ] = combine_tf_static_messages (tf_messages )
337+
338+ if progress_callback :
339+ best_diff_ms = min (p ["image" ]["time_delta_ns" ] for p in pairs ) / 1e6
340+ progress_callback .emit (
341+ 85 , f"Found { len (pairs )} synchronized pairs (best Δt { best_diff_ms :.2f} ms)."
342+ )
343+
344+ return messages
345+
346+
197347def read_all_messages_optimized (
198348 bag_file : str ,
199349 topics_to_read : dict ,
@@ -348,14 +498,19 @@ def run(self):
348498 ros_version = self .ros_version ,
349499 )
350500 else :
351- raw_messages = read_all_messages_optimized (
501+ image_topic = self .selected_topics_data ["image_topic" ]
502+ pointcloud_topic = self .selected_topics_data ["pointcloud_topic" ]
503+ camerainfo_topic = self .selected_topics_data ["camerainfo_topic" ]
504+ raw_messages = read_synchronized_image_cloud (
352505 self .bag_file ,
506+ image_topic ,
507+ pointcloud_topic ,
508+ camerainfo_topic ,
353509 self .topics_to_read ,
354510 self .progress_updated ,
355- self .total_messages ,
356- self .frame_samples ,
357- self .topic_message_counts ,
358- self .ros_version ,
511+ max_time_diff = self .sync_tolerance ,
512+ frame_samples = self .frame_samples ,
513+ ros_version = self .ros_version ,
359514 )
360515
361516 self .progress_updated .emit (95 , "Finalizing data..." )
0 commit comments