-
Notifications
You must be signed in to change notification settings - Fork 39
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Added a communication worker for logging cluster estimation #227
Changes from 29 commits
4ef1755
99f2d28
b50d714
39c55f0
1bd2cb6
89a7221
0e8263d
de18002
a0b0560
1173347
7cf4667
16cc619
8ff6035
7562a88
5599ccc
9d3b588
2501414
e4029e7
9ede6b4
2be3b1d
dea2413
76ab64f
194f0d8
df2d4de
e214595
92c3299
c7ca9c7
ce99100
4ebfdae
55755ca
affadfb
9e8df82
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,6 +12,7 @@ | |
# Used in type annotation of flight interface output | ||
# pylint: disable-next=unused-import | ||
from modules import odometry_and_time | ||
from modules.communications import communications_worker | ||
from modules.detect_target import detect_target_factory | ||
from modules.detect_target import detect_target_worker | ||
from modules.flight_interface import flight_interface_worker | ||
|
@@ -86,8 +87,8 @@ def main() -> int: | |
VIDEO_INPUT_SAVE_PREFIX = str(pathlib.Path(logging_path, VIDEO_INPUT_SAVE_NAME_PREFIX)) | ||
|
||
DETECT_TARGET_WORKER_COUNT = config["detect_target"]["worker_count"] | ||
detect_target_option_int = config["detect_target"]["option"] | ||
DETECT_TARGET_OPTION = detect_target_factory.DetectTargetOption(detect_target_option_int) | ||
DETECT_TARGET_OPTION_INT = config["detect_target"]["option"] | ||
DETECT_TARGET_OPTION = detect_target_factory.DetectTargetOption(DETECT_TARGET_OPTION_INT) | ||
DETECT_TARGET_DEVICE = "cpu" if args.cpu else config["detect_target"]["device"] | ||
DETECT_TARGET_MODEL_PATH = config["detect_target"]["model_path"] | ||
DETECT_TARGET_OVERRIDE_FULL_PRECISION = args.full | ||
|
@@ -117,6 +118,8 @@ def main() -> int: | |
MIN_NEW_POINTS_TO_RUN = config["cluster_estimation"]["min_new_points_to_run"] | ||
RANDOM_STATE = config["cluster_estimation"]["random_state"] | ||
|
||
COMMUNICATIONS_TIMEOUT = config["communications"]["timeout"] | ||
|
||
# pylint: enable=invalid-name | ||
except KeyError as exception: | ||
main_logger.error(f"Config key(s) not found: {exception}", True) | ||
|
@@ -141,6 +144,10 @@ def main() -> int: | |
mp_manager, | ||
QUEUE_MAX_SIZE, | ||
) | ||
flight_interface_to_communications_queue = queue_proxy_wrapper.QueueProxyWrapper( | ||
mp_manager, | ||
QUEUE_MAX_SIZE, | ||
) | ||
data_merge_to_geolocation_queue = queue_proxy_wrapper.QueueProxyWrapper( | ||
mp_manager, | ||
QUEUE_MAX_SIZE, | ||
|
@@ -153,11 +160,14 @@ def main() -> int: | |
mp_manager, | ||
QUEUE_MAX_SIZE, | ||
) | ||
cluster_estimation_to_main_queue = queue_proxy_wrapper.QueueProxyWrapper( | ||
cluster_estimation_to_communications_queue = queue_proxy_wrapper.QueueProxyWrapper( | ||
mp_manager, | ||
QUEUE_MAX_SIZE, | ||
) | ||
communications_to_main_queue = queue_proxy_wrapper.QueueProxyWrapper( | ||
mp_manager, | ||
QUEUE_MAX_SIZE, | ||
) | ||
|
||
result, camera_intrinsics = camera_properties.CameraIntrinsics.create( | ||
GEOLOCATION_RESOLUTION_X, | ||
GEOLOCATION_RESOLUTION_Y, | ||
|
@@ -238,7 +248,10 @@ def main() -> int: | |
FLIGHT_INTERFACE_WORKER_PERIOD, | ||
), | ||
input_queues=[flight_interface_decision_queue], | ||
output_queues=[flight_interface_to_data_merge_queue], | ||
output_queues=[ | ||
flight_interface_to_data_merge_queue, | ||
flight_interface_to_communications_queue, | ||
], | ||
controller=controller, | ||
local_logger=main_logger, | ||
) | ||
|
@@ -292,7 +305,7 @@ def main() -> int: | |
target=cluster_estimation_worker.cluster_estimation_worker, | ||
work_arguments=(MIN_ACTIVATION_THRESHOLD, MIN_NEW_POINTS_TO_RUN, RANDOM_STATE), | ||
input_queues=[geolocation_to_cluster_estimation_queue], | ||
output_queues=[cluster_estimation_to_main_queue], | ||
output_queues=[cluster_estimation_to_communications_queue], | ||
controller=controller, | ||
local_logger=main_logger, | ||
) | ||
|
@@ -303,6 +316,24 @@ def main() -> int: | |
# Get Pylance to stop complaining | ||
assert cluster_estimation_worker_properties is not None | ||
|
||
result, communications_worker_properties = worker_manager.WorkerProperties.create( | ||
count=1, | ||
target=communications_worker.communications_worker, | ||
work_arguments=(COMMUNICATIONS_TIMEOUT,), | ||
input_queues=[ | ||
flight_interface_to_communications_queue, | ||
cluster_estimation_to_communications_queue, | ||
], | ||
output_queues=[communications_to_main_queue], | ||
controller=controller, | ||
local_logger=main_logger, | ||
) | ||
if not result: | ||
main_logger.error("Failed to create arguments for Communications Worker", True) | ||
return -1 | ||
|
||
assert communications_worker_properties is not None | ||
|
||
# Create managers | ||
worker_managers = [] | ||
|
||
|
@@ -384,6 +415,19 @@ def main() -> int: | |
|
||
worker_managers.append(cluster_estimation_manager) | ||
|
||
result, communications_manager = worker_manager.WorkerManager.create( | ||
worker_properties=communications_worker_properties, | ||
local_logger=main_logger, | ||
) | ||
if not result: | ||
main_logger.error("Failed to create manager for Communications", True) | ||
return -1 | ||
|
||
# Get Pylance to stop complaining | ||
assert communications_manager is not None | ||
|
||
worker_managers.append(communications_manager) | ||
|
||
# Run | ||
for manager in worker_managers: | ||
manager.start_workers() | ||
|
@@ -396,13 +440,13 @@ def main() -> int: | |
return -1 | ||
|
||
try: | ||
cluster_estimations = cluster_estimation_to_main_queue.queue.get_nowait() | ||
cluster_estimations = communications_to_main_queue.queue.get_nowait() | ||
except queue.Empty: | ||
cluster_estimations = None | ||
|
||
if cluster_estimations is not None: | ||
for cluster in cluster_estimations: | ||
main_logger.debug("Cluster in world: " + True) | ||
main_logger.debug("Cluster in world: ", True) | ||
main_logger.debug("Cluster location x: " + str(cluster.location_x)) | ||
main_logger.debug("Cluster location y: " + str(cluster.location_y)) | ||
main_logger.debug("Cluster spherical variance: " + str(cluster.spherical_variance)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we remove all of this and just log |
||
|
@@ -416,10 +460,12 @@ def main() -> int: | |
video_input_to_detect_target_queue.fill_and_drain_queue() | ||
detect_target_to_data_merge_queue.fill_and_drain_queue() | ||
flight_interface_to_data_merge_queue.fill_and_drain_queue() | ||
flight_interface_to_communications_queue.fill_and_drain_queue() | ||
data_merge_to_geolocation_queue.fill_and_drain_queue() | ||
geolocation_to_cluster_estimation_queue.fill_and_drain_queue() | ||
cluster_estimation_to_communications_queue.fill_and_drain_queue() | ||
communications_to_main_queue.fill_and_drain_queue() | ||
flight_interface_decision_queue.fill_and_drain_queue() | ||
cluster_estimation_to_main_queue.fill_and_drain_queue() | ||
|
||
for manager in worker_managers: | ||
manager.join_workers() | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,89 @@ | ||
""" | ||
Logs data and forwards it. | ||
""" | ||
|
||
import time | ||
|
||
from .. import object_in_world | ||
from ..common.modules import position_global | ||
from ..common.modules import position_local | ||
Xierumeng marked this conversation as resolved.
Show resolved
Hide resolved
|
||
from ..common.modules.logger import logger | ||
from ..common.modules.mavlink import local_global_conversion | ||
|
||
|
||
class Communications: | ||
""" | ||
Currently logs data only. | ||
""" | ||
|
||
__create_key = object() | ||
|
||
@classmethod | ||
def create( | ||
cls, | ||
home_position: position_global.PositionGlobal, | ||
local_logger: logger.Logger, | ||
) -> "tuple[True, Communications] | tuple[False, None]": | ||
""" | ||
Logs data and forwards it. | ||
|
||
home_position: Take-off position of drone. | ||
|
||
Returns: Success, class object. | ||
""" | ||
|
||
return True, Communications(cls.__create_key, home_position, local_logger) | ||
|
||
def __init__( | ||
self, | ||
class_private_create_key: object, | ||
home_position: position_global.PositionGlobal, | ||
local_logger: logger.Logger, | ||
) -> None: | ||
""" | ||
Private constructor, use create() method. | ||
""" | ||
assert class_private_create_key is Communications.__create_key, "Use create() method" | ||
|
||
self.__home_position = home_position | ||
self.__logger = local_logger | ||
|
||
def run( | ||
self, | ||
objects_in_world: list[object_in_world.ObjectInWorld], | ||
) -> tuple[True, list[object_in_world.ObjectInWorld]] | tuple[False, None]: | ||
|
||
objects_in_world_global = [] | ||
for object_in_world in objects_in_world: | ||
north = object_in_world.location_x | ||
east = object_in_world.location_y | ||
down = 0.0 | ||
maxlou05 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
result, object_position_local = position_local.PositionLocal.create( | ||
north, | ||
east, | ||
down, | ||
) | ||
if not result: | ||
self.__logger.warning( | ||
f"Could not convert ObjectInWorld to PositionLocal:\nobject in world: {object_in_world}" | ||
) | ||
return False, None | ||
|
||
result, object_in_world_global = ( | ||
local_global_conversion.position_global_from_position_local( | ||
self.__home_position, object_position_local | ||
) | ||
) | ||
if not result: | ||
# Log nothing if at least one of the conversions failed | ||
self.__logger.warning( | ||
f"drone_position_global_from_local conversion failed:\nhome_position: {self.__home_position}\nobject_position_local: {object_position_local}" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think you still need to change this to position global from position local |
||
) | ||
return False, None | ||
|
||
objects_in_world_global.append(object_in_world_global) | ||
|
||
self.__logger.info(f"{time.time()}: {objects_in_world_global}") | ||
|
||
return True, objects_in_world |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,66 @@ | ||
""" | ||
Logs data and forwards it. | ||
""" | ||
|
||
import os | ||
import pathlib | ||
import queue | ||
|
||
from . import communications | ||
from utilities.workers import queue_proxy_wrapper | ||
from utilities.workers import worker_controller | ||
from ..common.modules.logger import logger | ||
|
||
|
||
def communications_worker( | ||
timeout: float, | ||
home_position_queue: queue_proxy_wrapper.QueueProxyWrapper, | ||
input_queue: queue_proxy_wrapper.QueueProxyWrapper, | ||
output_queue: queue_proxy_wrapper.QueueProxyWrapper, | ||
controller: worker_controller.WorkerController, | ||
) -> None: | ||
""" | ||
Worker process. | ||
|
||
home_position_queue contains home positions for creating communications object. | ||
input_queue and output_queue are data queues. | ||
controller is how the main process communicates to this worker process. | ||
""" | ||
|
||
worker_name = pathlib.Path(__file__).stem | ||
process_id = os.getpid() | ||
result, local_logger = logger.Logger.create(f"{worker_name}_{process_id}", True) | ||
if not result: | ||
print("ERROR: Worker failed to create logger") | ||
return | ||
|
||
# Get Pylance to stop complaining | ||
assert local_logger is not None | ||
|
||
local_logger.info("Logger initialized", True) | ||
|
||
# Get home position | ||
try: | ||
home_position = home_position_queue.queue.get(timeout=timeout) | ||
except queue.Empty: | ||
maxlou05 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
local_logger.error("Home position queue timed out on startup", True) | ||
return | ||
|
||
local_logger.info(f"Home position received: {home_position}", True) | ||
|
||
result, comm = communications.Communications.create(home_position, local_logger) | ||
if not result: | ||
local_logger.error("Worker failed to create class object", True) | ||
return | ||
|
||
# Get Pylance to stop complaining | ||
assert comm is not None | ||
|
||
while not controller.is_exit_requested(): | ||
controller.check_pause() | ||
|
||
result, value = comm.run(input_queue.queue.get()) | ||
if not result: | ||
continue | ||
|
||
output_queue.queue.put(value) |
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This should be in a separate PR in repository common. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please delete this. Evan or I will move it to post-processing |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
""" | ||
Convert log file to KML file. | ||
""" | ||
|
||
import pathlib | ||
import re | ||
|
||
from modules.common.modules.kml import kml_conversion | ||
from modules.common.modules import location_global | ||
|
||
|
||
def convert_log_to_kml( | ||
log_file: str, document_name_prefix: str, save_directory: str | ||
) -> "tuple[bool, pathlib.Path | None]": | ||
"""Given a log file with a specific format, return a corresponding KML file. | ||
|
||
Args: | ||
log_file (str): Path to the log file | ||
document_name_prefix (str): Prefix name for saved KML file. | ||
save_directory (str): Directory to save the KML file to. | ||
|
||
Returns: | ||
tuple[bool, pathlib.Path | None]: Returns (False, None) if function | ||
failed to execute, otherwise (True, path) where path a pathlib.Path | ||
object pointing to the KML file. | ||
""" | ||
locations = [] | ||
|
||
try: | ||
with open(log_file, "r") as f: | ||
for line in f: | ||
# find all the latitudes and longitudes within the line | ||
latitudes = re.findall(r"latitude: (-?\d+\.\d+)", line) | ||
longitudes = re.findall(r"longitude: (-?\d+\.\d+)", line) | ||
|
||
# we must find equal number of latitude and longitude numbers, | ||
# otherwise that means the log file is improperly formatted or | ||
# the script failed to detect all locations | ||
if len(latitudes) != len(longitudes): | ||
print("Number of latitudes and longitudes found are different.") | ||
print(f"# of altitudes: {len(latitudes)}, # of longitudes: {len(longitudes)}") | ||
return False, None | ||
|
||
latitudes = list(map(float, latitudes)) | ||
longitudes = list(map(float, longitudes)) | ||
|
||
for i in range(len(latitudes)): | ||
success, location = location_global.LocationGlobal.create( | ||
latitudes[i], longitudes[i] | ||
) | ||
if not success: | ||
return False, None | ||
locations.append(location) | ||
|
||
return kml_conversion.locations_to_kml(locations, document_name_prefix, save_directory) | ||
except Exception as e: | ||
print(e.with_traceback()) | ||
return False, None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we make this timeout longer, like 30 seconds? I feel like main_2024 takes a while to boot up