diff --git a/config.yaml b/config.yaml index 44bdd6df..c10d0c4e 100644 --- a/config.yaml +++ b/config.yaml @@ -39,3 +39,6 @@ cluster_estimation: min_activation_threshold: 25 min_new_points_to_run: 5 random_state: 0 + +communications: + timeout: 30.0 # seconds diff --git a/main_2024.py b/main_2024.py index 29ec29ad..9c59908d 100644 --- a/main_2024.py +++ b/main_2024.py @@ -12,6 +12,7 @@ # Used in type annotation of flight interface output # pylint: disable-next=unused-import from modules import odometry_and_time +from modules.communications import communications_worker from modules.detect_target import detect_target_factory from modules.detect_target import detect_target_worker from modules.flight_interface import flight_interface_worker @@ -86,8 +87,8 @@ def main() -> int: VIDEO_INPUT_SAVE_PREFIX = str(pathlib.Path(logging_path, VIDEO_INPUT_SAVE_NAME_PREFIX)) DETECT_TARGET_WORKER_COUNT = config["detect_target"]["worker_count"] - detect_target_option_int = config["detect_target"]["option"] - DETECT_TARGET_OPTION = detect_target_factory.DetectTargetOption(detect_target_option_int) + DETECT_TARGET_OPTION_INT = config["detect_target"]["option"] + DETECT_TARGET_OPTION = detect_target_factory.DetectTargetOption(DETECT_TARGET_OPTION_INT) DETECT_TARGET_DEVICE = "cpu" if args.cpu else config["detect_target"]["device"] DETECT_TARGET_MODEL_PATH = config["detect_target"]["model_path"] DETECT_TARGET_OVERRIDE_FULL_PRECISION = args.full @@ -117,6 +118,8 @@ def main() -> int: MIN_NEW_POINTS_TO_RUN = config["cluster_estimation"]["min_new_points_to_run"] RANDOM_STATE = config["cluster_estimation"]["random_state"] + COMMUNICATIONS_TIMEOUT = config["communications"]["timeout"] + # pylint: enable=invalid-name except KeyError as exception: main_logger.error(f"Config key(s) not found: {exception}", True) @@ -141,6 +144,10 @@ def main() -> int: mp_manager, QUEUE_MAX_SIZE, ) + flight_interface_to_communications_queue = queue_proxy_wrapper.QueueProxyWrapper( + mp_manager, + QUEUE_MAX_SIZE, + ) data_merge_to_geolocation_queue = queue_proxy_wrapper.QueueProxyWrapper( mp_manager, QUEUE_MAX_SIZE, @@ -153,11 +160,14 @@ def main() -> int: mp_manager, QUEUE_MAX_SIZE, ) - cluster_estimation_to_main_queue = queue_proxy_wrapper.QueueProxyWrapper( + cluster_estimation_to_communications_queue = queue_proxy_wrapper.QueueProxyWrapper( + mp_manager, + QUEUE_MAX_SIZE, + ) + communications_to_main_queue = queue_proxy_wrapper.QueueProxyWrapper( mp_manager, QUEUE_MAX_SIZE, ) - result, camera_intrinsics = camera_properties.CameraIntrinsics.create( GEOLOCATION_RESOLUTION_X, GEOLOCATION_RESOLUTION_Y, @@ -238,7 +248,10 @@ def main() -> int: FLIGHT_INTERFACE_WORKER_PERIOD, ), input_queues=[flight_interface_decision_queue], - output_queues=[flight_interface_to_data_merge_queue], + output_queues=[ + flight_interface_to_data_merge_queue, + flight_interface_to_communications_queue, + ], controller=controller, local_logger=main_logger, ) @@ -292,7 +305,7 @@ def main() -> int: target=cluster_estimation_worker.cluster_estimation_worker, work_arguments=(MIN_ACTIVATION_THRESHOLD, MIN_NEW_POINTS_TO_RUN, RANDOM_STATE), input_queues=[geolocation_to_cluster_estimation_queue], - output_queues=[cluster_estimation_to_main_queue], + output_queues=[cluster_estimation_to_communications_queue], controller=controller, local_logger=main_logger, ) @@ -303,6 +316,24 @@ def main() -> int: # Get Pylance to stop complaining assert cluster_estimation_worker_properties is not None + result, communications_worker_properties = worker_manager.WorkerProperties.create( + count=1, + target=communications_worker.communications_worker, + work_arguments=(COMMUNICATIONS_TIMEOUT,), + input_queues=[ + flight_interface_to_communications_queue, + cluster_estimation_to_communications_queue, + ], + output_queues=[communications_to_main_queue], + controller=controller, + local_logger=main_logger, + ) + if not result: + main_logger.error("Failed to create arguments for Communications Worker", True) + return -1 + + assert communications_worker_properties is not None + # Create managers worker_managers = [] @@ -384,6 +415,19 @@ def main() -> int: worker_managers.append(cluster_estimation_manager) + result, communications_manager = worker_manager.WorkerManager.create( + worker_properties=communications_worker_properties, + local_logger=main_logger, + ) + if not result: + main_logger.error("Failed to create manager for Communications", True) + return -1 + + # Get Pylance to stop complaining + assert communications_manager is not None + + worker_managers.append(communications_manager) + # Run for manager in worker_managers: manager.start_workers() @@ -396,16 +440,12 @@ def main() -> int: return -1 try: - cluster_estimations = cluster_estimation_to_main_queue.queue.get_nowait() + cluster_estimations = communications_to_main_queue.queue.get_nowait() except queue.Empty: cluster_estimations = None if cluster_estimations is not None: - for cluster in cluster_estimations: - main_logger.debug("Cluster in world: " + True) - main_logger.debug("Cluster location x: " + str(cluster.location_x)) - main_logger.debug("Cluster location y: " + str(cluster.location_y)) - main_logger.debug("Cluster spherical variance: " + str(cluster.spherical_variance)) + main_logger.debug(f"Clusters: {cluster_estimations}") if cv2.waitKey(1) == ord("q"): # type: ignore main_logger.info("Exiting main loop", True) break @@ -416,10 +456,12 @@ def main() -> int: video_input_to_detect_target_queue.fill_and_drain_queue() detect_target_to_data_merge_queue.fill_and_drain_queue() flight_interface_to_data_merge_queue.fill_and_drain_queue() + flight_interface_to_communications_queue.fill_and_drain_queue() data_merge_to_geolocation_queue.fill_and_drain_queue() geolocation_to_cluster_estimation_queue.fill_and_drain_queue() + cluster_estimation_to_communications_queue.fill_and_drain_queue() + communications_to_main_queue.fill_and_drain_queue() flight_interface_decision_queue.fill_and_drain_queue() - cluster_estimation_to_main_queue.fill_and_drain_queue() for manager in worker_managers: manager.join_workers() diff --git a/modules/common b/modules/common index a0aac8ce..a256a497 160000 --- a/modules/common +++ b/modules/common @@ -1 +1 @@ -Subproject commit a0aac8ce29273a6a1ca397a2229770add760835e +Subproject commit a256a49778d1154e03683c3b5e2fe6cb215d00e7 diff --git a/modules/communications/communications.py b/modules/communications/communications.py new file mode 100644 index 00000000..b1127475 --- /dev/null +++ b/modules/communications/communications.py @@ -0,0 +1,90 @@ +""" +Logs data and forwards it. +""" + +import time + +from .. import object_in_world +from ..common.modules import position_global +from ..common.modules import position_local +from ..common.modules.logger import logger +from ..common.modules.mavlink import local_global_conversion + + +class Communications: + """ + Currently logs data only. + """ + + __create_key = object() + + @classmethod + def create( + cls, + home_position: position_global.PositionGlobal, + local_logger: logger.Logger, + ) -> "tuple[True, Communications] | tuple[False, None]": + """ + Logs data and forwards it. + + home_position: Take-off position of drone. + + Returns: Success, class object. + """ + + return True, Communications(cls.__create_key, home_position, local_logger) + + def __init__( + self, + class_private_create_key: object, + home_position: position_global.PositionGlobal, + local_logger: logger.Logger, + ) -> None: + """ + Private constructor, use create() method. + """ + assert class_private_create_key is Communications.__create_key, "Use create() method" + + self.__home_position = home_position + self.__logger = local_logger + + def run( + self, + objects_in_world: list[object_in_world.ObjectInWorld], + ) -> tuple[True, list[object_in_world.ObjectInWorld]] | tuple[False, None]: + + objects_in_world_global = [] + for object_in_world in objects_in_world: + # We assume detected objects are on the ground + north = object_in_world.location_x + east = object_in_world.location_y + down = 0.0 + + result, object_position_local = position_local.PositionLocal.create( + north, + east, + down, + ) + if not result: + self.__logger.warning( + f"Could not convert ObjectInWorld to PositionLocal:\nobject in world: {object_in_world}" + ) + return False, None + + result, object_in_world_global = ( + local_global_conversion.position_global_from_position_local( + self.__home_position, object_position_local + ) + ) + if not result: + # Log nothing if at least one of the conversions failed + self.__logger.warning( + f"position_global_from_position_local conversion failed:\nhome_position: {self.__home_position}\nobject_position_local: {object_position_local}" + ) + return False, None + + objects_in_world_global.append(object_in_world_global) + + self.__logger.info(f"{time.time()}: {objects_in_world_global}") + + return True, objects_in_world diff --git a/modules/communications/communications_worker.py b/modules/communications/communications_worker.py new file mode 100644 index 00000000..8339587f --- /dev/null +++ b/modules/communications/communications_worker.py @@ -0,0 +1,66 @@ +""" +Logs data and forwards it. +""" + +import os +import pathlib +import queue + +from . import communications +from utilities.workers import queue_proxy_wrapper +from utilities.workers import worker_controller +from ..common.modules.logger import logger + + +def communications_worker( + timeout: float, + home_position_queue: queue_proxy_wrapper.QueueProxyWrapper, + input_queue: queue_proxy_wrapper.QueueProxyWrapper, + output_queue: queue_proxy_wrapper.QueueProxyWrapper, + controller: worker_controller.WorkerController, +) -> None: + """ + Worker process. + + home_position_queue contains home positions for creating communications object. + input_queue and output_queue are data queues. + controller is how the main process communicates to this worker process. + """ + + worker_name = pathlib.Path(__file__).stem + process_id = os.getpid() + result, local_logger = logger.Logger.create(f"{worker_name}_{process_id}", True) + if not result: + print("ERROR: Worker failed to create logger") + return + + # Get Pylance to stop complaining + assert local_logger is not None + + local_logger.info("Logger initialized", True) + + # Get home position + try: + home_position = home_position_queue.queue.get(timeout=timeout) + except queue.Empty: + local_logger.error("Home position queue timed out on startup", True) + return + + local_logger.info(f"Home position received: {home_position}", True) + + result, comm = communications.Communications.create(home_position, local_logger) + if not result: + local_logger.error("Worker failed to create class object", True) + return + + # Get Pylance to stop complaining + assert comm is not None + + while not controller.is_exit_requested(): + controller.check_pause() + + result, value = comm.run(input_queue.queue.get()) + if not result: + continue + + output_queue.queue.put(value) diff --git a/modules/flight_interface/flight_interface.py b/modules/flight_interface/flight_interface.py index e1b47d86..ef976278 100644 --- a/modules/flight_interface/flight_interface.py +++ b/modules/flight_interface/flight_interface.py @@ -47,7 +47,7 @@ def create( # Get Pylance to stop complaining assert home_position is not None - local_logger.info(str(home_position), True) + local_logger.info(f"Home position: {home_position}", True) return True, FlightInterface(cls.__create_key, controller, home_position, local_logger) @@ -67,6 +67,12 @@ def __init__( self.__home_position = home_position self.__logger = local_logger + def get_home_position(self) -> position_global.PositionGlobal: + """ + Accessor for home position. + """ + return self.__home_position + def run(self) -> "tuple[bool, odometry_and_time.OdometryAndTime | None]": """ Returns a possible OdometryAndTime with current timestamp. diff --git a/modules/flight_interface/flight_interface_worker.py b/modules/flight_interface/flight_interface_worker.py index 2ead7bae..41610a73 100644 --- a/modules/flight_interface/flight_interface_worker.py +++ b/modules/flight_interface/flight_interface_worker.py @@ -19,6 +19,7 @@ def flight_interface_worker( period: float, input_queue: queue_proxy_wrapper.QueueProxyWrapper, output_queue: queue_proxy_wrapper.QueueProxyWrapper, + communications_output_queue: queue_proxy_wrapper.QueueProxyWrapper, controller: worker_controller.WorkerController, ) -> None: """ @@ -53,6 +54,9 @@ def flight_interface_worker( # Get Pylance to stop complaining assert interface is not None + home_position = interface.get_home_position() + communications_output_queue.queue.put(home_position) + while not controller.is_exit_requested(): controller.check_pause() diff --git a/tests/integration/test_flight_interface_worker.py b/tests/integration/test_flight_interface_worker.py index 17f1983b..af52cc9d 100644 --- a/tests/integration/test_flight_interface_worker.py +++ b/tests/integration/test_flight_interface_worker.py @@ -103,6 +103,7 @@ def main() -> int: mp_manager = mp.Manager() out_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) + home_position_out_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) in_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) worker = mp.Process( @@ -114,6 +115,7 @@ def main() -> int: FLIGHT_INTERFACE_WORKER_PERIOD, in_queue, # Added input_queue out_queue, + home_position_out_queue, controller, ), ) @@ -124,6 +126,8 @@ def main() -> int: time.sleep(3) # Test + home_position = home_position_out_queue.queue.get() + assert home_position is not None # Run the apply_decision tests test_result = apply_decision_test(in_queue, out_queue)