Skip to content

Commit

Permalink
reconfigure queues and processes
Browse files Browse the repository at this point in the history
  • Loading branch information
ashum68 committed Aug 22, 2024
1 parent 3710dc0 commit af01b19
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 51 deletions.
2 changes: 1 addition & 1 deletion config.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Global constants for main.

queue_max_size: 10
obstacle_avoidance_mode: "simple"

flight_interface:
address: "/dev/ttyUSB0"
Expand All @@ -21,7 +22,6 @@ clustering:
max_cluster_distance: 0.2 # metres

data_merge:
merge_data_type: "OBSTACLES" # "OBSTACLES" or "DETECTIONS"
delay: 0.1 # seconds

decision:
Expand Down
133 changes: 83 additions & 50 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from modules.clustering import clustering_worker
from modules.data_merge import data_merge_worker
from modules.decision import decision_worker
from modules.deflection import deflection_worker
from modules.detection import detection_worker
from modules.flight_interface import flight_interface_worker
from worker import queue_wrapper
Expand Down Expand Up @@ -43,6 +44,7 @@ def main() -> int:
# Local constants
# pylint: disable=invalid-name
QUEUE_MAX_SIZE = config["queue_max_size"]
OBSTACLE_AVOIDANCE_MODE = config["obstacle_avoidance_mode"] # either "simple" or "normal"

FLIGHT_INTERFACE_ADDRESS = config["flight_interface"]["address"]
FLIGHT_INTERFACE_TIMEOUT = config["flight_interface"]["timeout"]
Expand All @@ -61,7 +63,6 @@ def main() -> int:

MAX_CLUSTER_DISTANCE = config["clustering"]["max_cluster_distance"]

MERGE_DATA_TYPE = config["data_merge"]["merge_data_type"]
DELAY = config["data_merge"]["delay"]

OBJECT_PROXIMITY_LIMIT = config["decision"]["object_proximity_limit"]
Expand All @@ -78,12 +79,9 @@ def main() -> int:

flight_interface_to_data_merge_queue = queue_wrapper.QueueWrapper(mp_manager, QUEUE_MAX_SIZE)
detection_to_data_merge_queue = queue_wrapper.QueueWrapper(mp_manager, QUEUE_MAX_SIZE)
detection_to_clustering_queue = queue_wrapper.QueueWrapper(mp_manager, QUEUE_MAX_SIZE)
clustering_to_cluster_classification_queue = queue_wrapper.QueueWrapper(
mp_manager, QUEUE_MAX_SIZE
)
obstacle_to_data_merge_queue = queue_wrapper.QueueWrapper(mp_manager, QUEUE_MAX_SIZE)
merged_to_decision_queue = queue_wrapper.QueueWrapper(mp_manager, QUEUE_MAX_SIZE)
merged_to_decision_queue = None
merged_to_clustering_queue = None
clustering_to_deflection_queue = None
command_to_flight_interface_queue = queue_wrapper.QueueWrapper(mp_manager, QUEUE_MAX_SIZE)

flight_interface_process = mp.Process(
Expand All @@ -110,54 +108,85 @@ def main() -> int:
HIGH_ANGLE,
ROTATE_SPEED,
detection_to_data_merge_queue,
detection_to_clustering_queue,
controller,
),
)

clustering_process = mp.Process(
target=clustering_worker.clustering_worker,
args=(
MAX_CLUSTER_DISTANCE,
detection_to_clustering_queue,
clustering_to_cluster_classification_queue,
controller,
),
)

# cluster_classification_process will go here.

data_merge_process = mp.Process(
target=data_merge_worker.data_merge_worker,
args=(
MERGE_DATA_TYPE,
DELAY,
detection_to_data_merge_queue,
obstacle_to_data_merge_queue,
flight_interface_to_data_merge_queue,
merged_to_decision_queue,
controller,
),
)

decision_process = mp.Process(
target=decision_worker.decision_worker,
args=(
OBJECT_PROXIMITY_LIMIT,
MAX_HISTORY,
COMMAND_TIMEOUT,
merged_to_decision_queue,
command_to_flight_interface_queue,
controller,
),
)
data_merge_process = None
clustering_process = None
deflection_process = None
decision_process = None

if OBSTACLE_AVOIDANCE_MODE == "simple":
merged_to_decision_queue = queue_wrapper.QueueWrapper(mp_manager, QUEUE_MAX_SIZE)

data_merge_process = mp.Process(
target=data_merge_worker.data_merge_worker,
args=(
DELAY,
detection_to_data_merge_queue,
flight_interface_to_data_merge_queue,
merged_to_decision_queue,
controller,
),
)

decision_process = mp.Process(
target=decision_worker.decision_worker,
args=(
OBJECT_PROXIMITY_LIMIT,
MAX_HISTORY,
COMMAND_TIMEOUT,
merged_to_decision_queue,
command_to_flight_interface_queue,
controller,
),
)

elif OBSTACLE_AVOIDANCE_MODE == "normal":
merged_to_clustering_queue = queue_wrapper.QueueWrapper(mp_manager, QUEUE_MAX_SIZE)
clustering_to_deflection_queue = queue_wrapper.QueueWrapper(mp_manager, QUEUE_MAX_SIZE)

data_merge_process = mp.Process(
target=data_merge_worker.data_merge_worker,
args=(
DELAY,
detection_to_data_merge_queue,
flight_interface_to_data_merge_queue,
merged_to_clustering_queue,
controller,
),
)

clustering_process = mp.Process(
target=clustering_worker.clustering_worker,
args=(
MAX_CLUSTER_DISTANCE,
merged_to_clustering_queue,
clustering_to_deflection_queue,
controller,
),
)

deflection_process = mp.Process(
target=deflection_worker.deflection_worker,
args=(
clustering_to_deflection_queue,
command_to_flight_interface_queue,
controller,
),
)

# Run
flight_interface_process.start()
detection_process.start()
clustering_process.start()
if clustering_process is not None:
clustering_process.start()
data_merge_process.start()
decision_process.start()
if decision_process is not None:
decision_process.start()
if deflection_process is not None:
deflection_process.start()

while True:
try:
Expand All @@ -170,16 +199,20 @@ def main() -> int:
# Teardown
flight_interface_to_data_merge_queue.fill_and_drain_queue()
detection_to_data_merge_queue.fill_and_drain_queue()
detection_to_clustering_queue.fill_and_drain_queue()
clustering_to_cluster_classification_queue.fill_and_drain_queue()
merged_to_decision_queue.fill_and_drain_queue()
merged_to_clustering_queue.fill_and_drain_queue()
clustering_to_deflection_queue.fill_and_drain_queue()
command_to_flight_interface_queue.fill_and_drain_queue()

flight_interface_process.join()
detection_process.join()
clustering_process.join()
if clustering_process is not None:
clustering_process.join()
data_merge_process.join()
decision_process.join()
if decision_process is not None:
decision_process.join()
if deflection_process is not None:
deflection_process.join()

return 0

Expand Down
Empty file added modules/deflection/__init__.py
Empty file.
20 changes: 20 additions & 0 deletions modules/deflection/deflection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
"""
Deflection.
"""


class Deflection:
    """
    Computes deflection (obstacle avoidance) commands from clustered
    obstacle data.

    Placeholder implementation: always reports that no command was
    produced. Real avoidance logic will replace `run`.
    """

    def __init__(self) -> None:
        """
        Initialization. No state is required by the stub.
        """

    def run(self, cluster_data: object = None) -> "tuple[bool, None]":
        """
        Run one deflection step.

        cluster_data: clustered obstacles and odometry from upstream
            (unused by this stub; accepted because callers such as
            deflection_worker pass it positionally). Defaults to None
            so existing zero-argument callers keep working.

        Returns (success, command). Currently always (False, None),
        meaning no command is available.
        """
        # NOTE: the original annotated the return as "tuple[False, None]";
        # False is a value, not a type, so it is corrected to bool here.
        return False, None
36 changes: 36 additions & 0 deletions modules/deflection/deflection_worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
"""
Gets obstacles and odometry and outputs a decision.
"""

from modules import obstacles_and_odometry
from worker import queue_wrapper
from worker import worker_controller
from . import deflection


def deflection_worker(
    cluster_in_queue: queue_wrapper.QueueWrapper,
    command_out_queue: queue_wrapper.QueueWrapper,
    controller: worker_controller.WorkerController,
) -> None:
    """
    Worker process: consumes clustered obstacle data and emits
    avoidance commands.

    cluster_in_queue, command_out_queue are data queues.
    controller is how the main process communicates to this worker process.
    """

    avoider = deflection.Deflection()

    while not controller.is_exit_requested():
        controller.check_pause()

        clusters: obstacles_and_odometry.ObstaclesAndOdometry = cluster_in_queue.queue.get()
        if clusters is None:
            # None is the drain sentinel from the main process — shut down.
            break

        result, command = avoider.run(clusters)
        if result:
            # Only forward a command when the deflection step produced one.
            command_out_queue.queue.put(command)

0 comments on commit af01b19

Please sign in to comment.