Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added a communication worker for logging cluster estimation #227

Merged
merged 32 commits into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
4ef1755
initial commit
davisliu2006 Nov 1, 2024
99f2d28
Made fixes
AshishA26 Nov 1, 2024
b50d714
Modify home location to be part of create for flight interface and co…
maxlou05 Nov 2, 2024
39c55f0
combined all logging together
maxlou05 Nov 2, 2024
1bd2cb6
Add log_to_kml.py
Evang264 Nov 14, 2024
89a7221
Refactor for common changes
wdan31 Nov 20, 2024
0e8263d
Fixed communications module imports and implemented object position l…
wdan31 Nov 21, 2024
de18002
Added communications logger for logging cluster estimation
wdan31 Nov 21, 2024
a0b0560
Formatted python
wdan31 Nov 21, 2024
1173347
Added communcations worker manager
wdan31 Nov 21, 2024
7cf4667
Cluster estimations takes from communication to main queue
wdan31 Nov 21, 2024
16cc619
Fixed communications worker
wdan31 Nov 21, 2024
8ff6035
Reformatted kml conversion
wdan31 Nov 21, 2024
7562a88
initial commit
davisliu2006 Nov 1, 2024
5599ccc
Made fixes
AshishA26 Nov 1, 2024
9d3b588
Modify home location to be part of create for flight interface and co…
maxlou05 Nov 2, 2024
2501414
combined all logging together
maxlou05 Nov 2, 2024
e4029e7
Add log_to_kml.py
Evang264 Nov 14, 2024
9ede6b4
Refactor for common changes
wdan31 Nov 20, 2024
2be3b1d
Fixed communications module imports and implemented object position l…
wdan31 Nov 21, 2024
dea2413
Added communications logger for logging cluster estimation
wdan31 Nov 21, 2024
76ab64f
Formatted python
wdan31 Nov 21, 2024
194f0d8
Added communcations worker manager
wdan31 Nov 21, 2024
df2d4de
Cluster estimations takes from communication to main queue
wdan31 Nov 21, 2024
e214595
Fixed communications worker
wdan31 Nov 21, 2024
92c3299
Reformatted kml conversion
wdan31 Nov 21, 2024
c7ca9c7
Fixed logging and types in communications modules
wdan31 Nov 22, 2024
ce99100
Added timeout to communications worker
wdan31 Nov 22, 2024
4ebfdae
Merge branch 'comm-worker-logs-cluster' of https://github.com/UWARG/c…
wdan31 Nov 22, 2024
55755ca
Reworded logs from communications
wdan31 Nov 23, 2024
affadfb
Reworded logs from communications
wdan31 Nov 23, 2024
9e8df82
Updated cluster estimation logging
wdan31 Nov 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,6 @@ cluster_estimation:
min_activation_threshold: 25
min_new_points_to_run: 5
random_state: 0

communications:
timeout: 10.0 # seconds
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make this timeout longer, like 30 seconds? I feel like main_2024 takes a while to boot up

64 changes: 55 additions & 9 deletions main_2024.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# Used in type annotation of flight interface output
# pylint: disable-next=unused-import
from modules import odometry_and_time
from modules.communications import communications_worker
from modules.detect_target import detect_target_factory
from modules.detect_target import detect_target_worker
from modules.flight_interface import flight_interface_worker
Expand Down Expand Up @@ -86,8 +87,8 @@ def main() -> int:
VIDEO_INPUT_SAVE_PREFIX = str(pathlib.Path(logging_path, VIDEO_INPUT_SAVE_NAME_PREFIX))

DETECT_TARGET_WORKER_COUNT = config["detect_target"]["worker_count"]
detect_target_option_int = config["detect_target"]["option"]
DETECT_TARGET_OPTION = detect_target_factory.DetectTargetOption(detect_target_option_int)
DETECT_TARGET_OPTION_INT = config["detect_target"]["option"]
DETECT_TARGET_OPTION = detect_target_factory.DetectTargetOption(DETECT_TARGET_OPTION_INT)
DETECT_TARGET_DEVICE = "cpu" if args.cpu else config["detect_target"]["device"]
DETECT_TARGET_MODEL_PATH = config["detect_target"]["model_path"]
DETECT_TARGET_OVERRIDE_FULL_PRECISION = args.full
Expand Down Expand Up @@ -117,6 +118,8 @@ def main() -> int:
MIN_NEW_POINTS_TO_RUN = config["cluster_estimation"]["min_new_points_to_run"]
RANDOM_STATE = config["cluster_estimation"]["random_state"]

COMMUNICATIONS_TIMEOUT = config["communications"]["timeout"]

# pylint: enable=invalid-name
except KeyError as exception:
main_logger.error(f"Config key(s) not found: {exception}", True)
Expand All @@ -141,6 +144,10 @@ def main() -> int:
mp_manager,
QUEUE_MAX_SIZE,
)
flight_interface_to_communications_queue = queue_proxy_wrapper.QueueProxyWrapper(
mp_manager,
QUEUE_MAX_SIZE,
)
data_merge_to_geolocation_queue = queue_proxy_wrapper.QueueProxyWrapper(
mp_manager,
QUEUE_MAX_SIZE,
Expand All @@ -153,11 +160,14 @@ def main() -> int:
mp_manager,
QUEUE_MAX_SIZE,
)
cluster_estimation_to_main_queue = queue_proxy_wrapper.QueueProxyWrapper(
cluster_estimation_to_communications_queue = queue_proxy_wrapper.QueueProxyWrapper(
mp_manager,
QUEUE_MAX_SIZE,
)
communications_to_main_queue = queue_proxy_wrapper.QueueProxyWrapper(
mp_manager,
QUEUE_MAX_SIZE,
)

result, camera_intrinsics = camera_properties.CameraIntrinsics.create(
GEOLOCATION_RESOLUTION_X,
GEOLOCATION_RESOLUTION_Y,
Expand Down Expand Up @@ -238,7 +248,10 @@ def main() -> int:
FLIGHT_INTERFACE_WORKER_PERIOD,
),
input_queues=[flight_interface_decision_queue],
output_queues=[flight_interface_to_data_merge_queue],
output_queues=[
flight_interface_to_data_merge_queue,
flight_interface_to_communications_queue,
],
controller=controller,
local_logger=main_logger,
)
Expand Down Expand Up @@ -292,7 +305,7 @@ def main() -> int:
target=cluster_estimation_worker.cluster_estimation_worker,
work_arguments=(MIN_ACTIVATION_THRESHOLD, MIN_NEW_POINTS_TO_RUN, RANDOM_STATE),
input_queues=[geolocation_to_cluster_estimation_queue],
output_queues=[cluster_estimation_to_main_queue],
output_queues=[cluster_estimation_to_communications_queue],
controller=controller,
local_logger=main_logger,
)
Expand All @@ -303,6 +316,24 @@ def main() -> int:
# Get Pylance to stop complaining
assert cluster_estimation_worker_properties is not None

result, communications_worker_properties = worker_manager.WorkerProperties.create(
count=1,
target=communications_worker.communications_worker,
work_arguments=(COMMUNICATIONS_TIMEOUT,),
input_queues=[
flight_interface_to_communications_queue,
cluster_estimation_to_communications_queue,
],
output_queues=[communications_to_main_queue],
controller=controller,
local_logger=main_logger,
)
if not result:
main_logger.error("Failed to create arguments for Communications Worker", True)
return -1

assert communications_worker_properties is not None

# Create managers
worker_managers = []

Expand Down Expand Up @@ -384,6 +415,19 @@ def main() -> int:

worker_managers.append(cluster_estimation_manager)

result, communications_manager = worker_manager.WorkerManager.create(
worker_properties=communications_worker_properties,
local_logger=main_logger,
)
if not result:
main_logger.error("Failed to create manager for Communications", True)
return -1

# Get Pylance to stop complaining
assert communications_manager is not None

worker_managers.append(communications_manager)

# Run
for manager in worker_managers:
manager.start_workers()
Expand All @@ -396,13 +440,13 @@ def main() -> int:
return -1

try:
cluster_estimations = cluster_estimation_to_main_queue.queue.get_nowait()
cluster_estimations = communications_to_main_queue.queue.get_nowait()
except queue.Empty:
cluster_estimations = None

if cluster_estimations is not None:
for cluster in cluster_estimations:
main_logger.debug("Cluster in world: " + True)
main_logger.debug("Cluster in world: ", True)
main_logger.debug("Cluster location x: " + str(cluster.location_x))
main_logger.debug("Cluster location y: " + str(cluster.location_y))
main_logger.debug("Cluster spherical variance: " + str(cluster.spherical_variance))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we remove all of this and just log logger.debug(f"clusters: {detections_in_world}") as it is in cluster_estimation_worker?

Expand All @@ -416,10 +460,12 @@ def main() -> int:
video_input_to_detect_target_queue.fill_and_drain_queue()
detect_target_to_data_merge_queue.fill_and_drain_queue()
flight_interface_to_data_merge_queue.fill_and_drain_queue()
flight_interface_to_communications_queue.fill_and_drain_queue()
data_merge_to_geolocation_queue.fill_and_drain_queue()
geolocation_to_cluster_estimation_queue.fill_and_drain_queue()
cluster_estimation_to_communications_queue.fill_and_drain_queue()
communications_to_main_queue.fill_and_drain_queue()
flight_interface_decision_queue.fill_and_drain_queue()
cluster_estimation_to_main_queue.fill_and_drain_queue()

for manager in worker_managers:
manager.join_workers()
Expand Down
89 changes: 89 additions & 0 deletions modules/communications/communications.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
"""
Logs data and forwards it.
"""

import time

from .. import object_in_world
from ..common.modules import position_global
from ..common.modules import position_local
Xierumeng marked this conversation as resolved.
Show resolved Hide resolved
from ..common.modules.logger import logger
from ..common.modules.mavlink import local_global_conversion


class Communications:
"""
Currently logs data only.
"""

__create_key = object()

@classmethod
def create(
cls,
home_position: position_global.PositionGlobal,
local_logger: logger.Logger,
) -> "tuple[True, Communications] | tuple[False, None]":
"""
Logs data and forwards it.

home_position: Take-off position of drone.

Returns: Success, class object.
"""

return True, Communications(cls.__create_key, home_position, local_logger)

def __init__(
self,
class_private_create_key: object,
home_position: position_global.PositionGlobal,
local_logger: logger.Logger,
) -> None:
"""
Private constructor, use create() method.
"""
assert class_private_create_key is Communications.__create_key, "Use create() method"

self.__home_position = home_position
self.__logger = local_logger

def run(
self,
objects_in_world: list[object_in_world.ObjectInWorld],
) -> tuple[True, list[object_in_world.ObjectInWorld]] | tuple[False, None]:

objects_in_world_global = []
for object_in_world in objects_in_world:
north = object_in_world.location_x
east = object_in_world.location_y
down = 0.0
maxlou05 marked this conversation as resolved.
Show resolved Hide resolved

result, object_position_local = position_local.PositionLocal.create(
north,
east,
down,
)
if not result:
self.__logger.warning(
f"Could not convert ObjectInWorld to PositionLocal:\nobject in world: {object_in_world}"
)
return False, None

result, object_in_world_global = (
local_global_conversion.position_global_from_position_local(
self.__home_position, object_position_local
)
)
if not result:
# Log nothing if at least one of the conversions failed
self.__logger.warning(
f"drone_position_global_from_local conversion failed:\nhome_position: {self.__home_position}\nobject_position_local: {object_position_local}"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you still need to change this to position global from position local

)
return False, None

objects_in_world_global.append(object_in_world_global)

self.__logger.info(f"{time.time()}: {objects_in_world_global}")

return True, objects_in_world
66 changes: 66 additions & 0 deletions modules/communications/communications_worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
"""
Logs data and forwards it.
"""

import os
import pathlib
import queue

from . import communications
from utilities.workers import queue_proxy_wrapper
from utilities.workers import worker_controller
from ..common.modules.logger import logger


def communications_worker(
timeout: float,
home_position_queue: queue_proxy_wrapper.QueueProxyWrapper,
input_queue: queue_proxy_wrapper.QueueProxyWrapper,
output_queue: queue_proxy_wrapper.QueueProxyWrapper,
controller: worker_controller.WorkerController,
) -> None:
"""
Worker process.

home_position_queue contains home positions for creating communications object.
input_queue and output_queue are data queues.
controller is how the main process communicates to this worker process.
"""

worker_name = pathlib.Path(__file__).stem
process_id = os.getpid()
result, local_logger = logger.Logger.create(f"{worker_name}_{process_id}", True)
if not result:
print("ERROR: Worker failed to create logger")
return

# Get Pylance to stop complaining
assert local_logger is not None

local_logger.info("Logger initialized", True)

# Get home position
try:
home_position = home_position_queue.queue.get(timeout=timeout)
except queue.Empty:
maxlou05 marked this conversation as resolved.
Show resolved Hide resolved
local_logger.error("Home position queue timed out on startup", True)
return

local_logger.info(f"Home position received: {home_position}", True)

result, comm = communications.Communications.create(home_position, local_logger)
if not result:
local_logger.error("Worker failed to create class object", True)
return

# Get Pylance to stop complaining
assert comm is not None

while not controller.is_exit_requested():
controller.check_pause()

result, value = comm.run(input_queue.queue.get())
if not result:
continue

output_queue.queue.put(value)
58 changes: 58 additions & 0 deletions modules/communications/log_to_kml.py
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be in a separate PR in repository common.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please delete this. Evan or I will move it to post-processing

Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
"""
Convert log file to KML file.
"""

import pathlib
import re

from modules.common.modules.kml import kml_conversion
from modules.common.modules import location_global


def convert_log_to_kml(
log_file: str, document_name_prefix: str, save_directory: str
) -> "tuple[bool, pathlib.Path | None]":
"""Given a log file with a specific format, return a corresponding KML file.

Args:
log_file (str): Path to the log file
document_name_prefix (str): Prefix name for saved KML file.
save_directory (str): Directory to save the KML file to.

Returns:
tuple[bool, pathlib.Path | None]: Returns (False, None) if function
failed to execute, otherwise (True, path) where path a pathlib.Path
object pointing to the KML file.
"""
locations = []

try:
with open(log_file, "r") as f:
for line in f:
# find all the latitudes and longitudes within the line
latitudes = re.findall(r"latitude: (-?\d+\.\d+)", line)
longitudes = re.findall(r"longitude: (-?\d+\.\d+)", line)

# we must find equal number of latitude and longitude numbers,
# otherwise that means the log file is improperly formatted or
# the script failed to detect all locations
if len(latitudes) != len(longitudes):
print("Number of latitudes and longitudes found are different.")
print(f"# of altitudes: {len(latitudes)}, # of longitudes: {len(longitudes)}")
return False, None

latitudes = list(map(float, latitudes))
longitudes = list(map(float, longitudes))

for i in range(len(latitudes)):
success, location = location_global.LocationGlobal.create(
latitudes[i], longitudes[i]
)
if not success:
return False, None
locations.append(location)

return kml_conversion.locations_to_kml(locations, document_name_prefix, save_directory)
except Exception as e:
print(e.with_traceback())
return False, None
8 changes: 7 additions & 1 deletion modules/flight_interface/flight_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def create(
# Get Pylance to stop complaining
assert home_position is not None

local_logger.info(str(home_position), True)
local_logger.info(f"Home position: {home_position}", True)

return True, FlightInterface(cls.__create_key, controller, home_position, local_logger)

Expand All @@ -67,6 +67,12 @@ def __init__(
self.__home_position = home_position
self.__logger = local_logger

def get_home_position(self) -> position_global.PositionGlobal:
"""
Accessor for home position.
"""
return self.__home_position

def run(self) -> "tuple[bool, odometry_and_time.OdometryAndTime | None]":
"""
Returns a possible OdometryAndTime with current timestamp.
Expand Down
Loading