diff --git a/modules/cluster_estimation/cluster_estimation_worker.py b/modules/cluster_estimation/cluster_estimation_worker.py index 17c58765..bd857e3e 100644 --- a/modules/cluster_estimation/cluster_estimation_worker.py +++ b/modules/cluster_estimation/cluster_estimation_worker.py @@ -5,6 +5,7 @@ import os import pathlib +from modules import detection_in_world from utilities.workers import queue_proxy_wrapper from utilities.workers import worker_controller from . import cluster_estimation @@ -76,6 +77,18 @@ def cluster_estimation_worker( input_data = input_queue.queue.get() if input_data is None: + local_logger.info("Recieved type None, exiting.") + break + + is_invalid = False + + for single_input in input_data: + if not isinstance(single_input, detection_in_world.DetectionInWorld): + local_logger.warning(f"Skipping unexpected input: {input}") + is_invalid = True + break + + if is_invalid: continue # TODO: When to override diff --git a/modules/communications/communications_worker.py b/modules/communications/communications_worker.py index 8339587f..6e5f3b66 100644 --- a/modules/communications/communications_worker.py +++ b/modules/communications/communications_worker.py @@ -6,6 +6,7 @@ import pathlib import queue +from modules import object_in_world from . import communications from utilities.workers import queue_proxy_wrapper from utilities.workers import worker_controller @@ -59,7 +60,24 @@ def communications_worker( while not controller.is_exit_requested(): controller.check_pause() - result, value = comm.run(input_queue.queue.get()) + input_data = input_queue.queue.get() + + if input_data is None: + local_logger.info("Recieved type None, exiting.") + break + + is_invalid = False + + for single_input in input_data: + if not isinstance(single_input, object_in_world.ObjectInWorld): + local_logger.warning(f"Skipping unexpected input: {input}") + is_invalid = True + break + + if is_invalid: + continue + + result, value = comm.run(input_data) if not result: continue diff --git a/modules/detect_target/detect_target_worker.py b/modules/detect_target/detect_target_worker.py index 8ce5cbc4..17b5cf43 100644 --- a/modules/detect_target/detect_target_worker.py +++ b/modules/detect_target/detect_target_worker.py @@ -5,6 +5,7 @@ import os import pathlib +from modules import image_and_time from utilities.workers import queue_proxy_wrapper from utilities.workers import worker_controller from . import detect_target_factory @@ -64,8 +65,13 @@ def detect_target_worker( input_data = input_queue.queue.get() if input_data is None: + local_logger.info("Recieved type None, exiting.") break + if not isinstance(input_data, image_and_time.ImageAndTime): + local_logger.warning(f"Skipping unexpected input: {input}") + continue + result, value = detector.run(input_data) if not result: continue diff --git a/modules/geolocation/geolocation_worker.py b/modules/geolocation/geolocation_worker.py index 11781688..4e7ac6d7 100644 --- a/modules/geolocation/geolocation_worker.py +++ b/modules/geolocation/geolocation_worker.py @@ -5,6 +5,7 @@ import os import pathlib +from modules import merged_odometry_detections from utilities.workers import queue_proxy_wrapper from utilities.workers import worker_controller from . import camera_properties @@ -55,8 +56,13 @@ def geolocation_worker( input_data = input_queue.queue.get() if input_data is None: + local_logger.info("Recieved type None, exiting.") break + if not isinstance(input_data, merged_odometry_detections.MergedOdometryDetections): + local_logger.warning(f"Skipping unexpected input: {input}") + continue + result, value = locator.run(input_data) if not result: continue diff --git a/utilities/workers/worker_manager.py b/utilities/workers/worker_manager.py index be7480a7..fcf8cdfb 100644 --- a/utilities/workers/worker_manager.py +++ b/utilities/workers/worker_manager.py @@ -247,8 +247,8 @@ def check_and_restart_dead_workers(self) -> bool: # Draining the preceding queue ensures that the preceding queue data wasn't what # caused the worker to fail. Draining the succeeding queues is not needed # because a worker that died would not have put bad data into the queue. - input_queues = self.__worker_properties.get_input_queues() - for queue in input_queues: - queue.drain_queue() + # input_queues = self.__worker_properties.get_input_queues() + # for queue in input_queues: + # queue.drain_queue() return True