Skip to content

Commit

Permalink
clustering integration
Browse files Browse the repository at this point in the history
  • Loading branch information
ashum68 committed Aug 21, 2024
1 parent 2276ef2 commit bf9a02c
Show file tree
Hide file tree
Showing 6 changed files with 140 additions and 28 deletions.
4 changes: 4 additions & 0 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@ detection:
high_angle: 170 # degrees
rotate_speed: 5

clustering:
max_cluster_distance: 0.2 # metres

data_merge:
merge_data_type: "OBSTACLES" # "OBSTACLES" or "DETECTIONS"
delay: 0.1 # seconds

decision:
Expand Down
28 changes: 28 additions & 0 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import yaml

from modules.clustering import clustering_worker
from modules.data_merge import data_merge_worker
from modules.decision import decision_worker
from modules.detection import detection_worker
Expand Down Expand Up @@ -55,6 +56,9 @@ def main() -> int:
HIGH_ANGLE = config["detection"]["high_angle"]
ROTATE_SPEED = config["detection"]["rotate_speed"]

MAX_CLUSTER_DISTANCE = config["clustering"]["max_cluster_distance"]

MERGE_DATA_TYPE = config["data_merge"]["merge_data_type"]
DELAY = config["data_merge"]["delay"]

OBJECT_PROXIMITY_LIMIT = config["decision"]["object_proximity_limit"]
Expand All @@ -71,6 +75,11 @@ def main() -> int:

flight_interface_to_data_merge_queue = queue_wrapper.QueueWrapper(mp_manager, QUEUE_MAX_SIZE)
detection_to_data_merge_queue = queue_wrapper.QueueWrapper(mp_manager, QUEUE_MAX_SIZE)
detection_to_clustering_queue = queue_wrapper.QueueWrapper(mp_manager, QUEUE_MAX_SIZE)
clustering_to_cluster_classification_queue = queue_wrapper.QueueWrapper(
mp_manager, QUEUE_MAX_SIZE
)
obstacle_to_data_merge_queue = queue_wrapper.QueueWrapper(mp_manager, QUEUE_MAX_SIZE)
merged_to_decision_queue = queue_wrapper.QueueWrapper(mp_manager, QUEUE_MAX_SIZE)
command_to_flight_interface_queue = queue_wrapper.QueueWrapper(mp_manager, QUEUE_MAX_SIZE)

Expand All @@ -97,15 +106,30 @@ def main() -> int:
HIGH_ANGLE,
ROTATE_SPEED,
detection_to_data_merge_queue,
detection_to_clustering_queue,
controller,
),
)

clustering_process = mp.Process(
target=clustering_worker.clustering_worker,
args=(
MAX_CLUSTER_DISTANCE,
detection_to_clustering_queue,
clustering_to_cluster_classification_queue,
controller,
),
)

# cluster_classification_process will go here.

data_merge_process = mp.Process(
target=data_merge_worker.data_merge_worker,
args=(
MERGE_DATA_TYPE,
DELAY,
detection_to_data_merge_queue,
obstacle_to_data_merge_queue,
flight_interface_to_data_merge_queue,
merged_to_decision_queue,
controller,
Expand All @@ -127,6 +151,7 @@ def main() -> int:
# Run
flight_interface_process.start()
detection_process.start()
clustering_process.start()
data_merge_process.start()
decision_process.start()

Expand All @@ -141,11 +166,14 @@ def main() -> int:
# Teardown
flight_interface_to_data_merge_queue.fill_and_drain_queue()
detection_to_data_merge_queue.fill_and_drain_queue()
detection_to_clustering_queue.fill_and_drain_queue()
clustering_to_cluster_classification_queue.fill_and_drain_queue()
merged_to_decision_queue.fill_and_drain_queue()
command_to_flight_interface_queue.fill_and_drain_queue()

flight_interface_process.join()
detection_process.join()
clustering_process.join()
data_merge_process.join()
decision_process.join()

Expand Down
84 changes: 67 additions & 17 deletions modules/data_merge/data_merge_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,34 @@
Merges local drone odometry with LiDAR detections
"""

import enum
import queue
import time

from worker import queue_wrapper
from worker import worker_controller

from modules import detections_and_odometry
from modules import drone_odometry_local
from modules import lidar_detection
from modules import obstacle
from modules import obstacles_and_odometry

from worker import queue_wrapper
from worker import worker_controller


class MergeDataType(enum.Enum):
"""
Types of data to merge with odometry.
"""

OBSTACLES = 0
DETECTIONS = 0


def data_merge_worker(
delay: float,
merge_data_type: str,
detection_input_queue: queue_wrapper.QueueWrapper,
obstacle_input_queue: queue_wrapper.QueueWrapper,
odometry_input_queue: queue_wrapper.QueueWrapper,
output_queue: queue_wrapper.QueueWrapper,
controller: worker_controller.WorkerController,
Expand All @@ -27,28 +41,64 @@ def data_merge_worker(
detection_input_queue, odometry_input_queue, output_queue are data queues.
controller is how the main process communicates to this worker process.
"""
if merge_data_type == "OBSTACLES":
merge_data_type = MergeDataType.OBSTACLES
elif merge_data_type == "DETECTIONS":
merge_data_type = MergeDataType.DETECTIONS

detections = []
obstacles = []
while not controller.is_exit_requested():
controller.check_pause()

try:
detection: lidar_detection.LidarDetection = detection_input_queue.queue.get_nowait()
detections.append(detection)
except queue.Empty:
time.sleep(delay)
if merge_data_type == MergeDataType.OBSTACLES:
try:
new_obstacle: obstacle.Obstacle = obstacle_input_queue.queue.get_nowait()
obstacles.append(new_obstacle)
except queue.Empty:
if len(obstacles) == 0:
continue
time.sleep(delay)

try:
odometry: drone_odometry_local.DroneOdometryLocal = (
odometry_input_queue.queue.get_nowait()
try:
odometry: drone_odometry_local.DroneOdometryLocal = (
odometry_input_queue.queue.get_nowait()
)
except queue.Empty:
continue

result, merged = obstacles_and_odometry.ObstaclesAndOdometry.create(
detections, odometry
)
if not result:
continue
obstacles = []

elif merge_data_type == MergeDataType.DETECTIONS:
try:
detection: lidar_detection.LidarDetection = detection_input_queue.queue.get_nowait()
detections.append(detection)
except queue.Empty:
if len(detections) == 0:
continue
time.sleep(delay)

except queue.Empty:
continue
try:
odometry: drone_odometry_local.DroneOdometryLocal = (
odometry_input_queue.queue.get_nowait()
)

result, merged = detections_and_odometry.DetectionsAndOdometry.create(detections, odometry)
except queue.Empty:
continue

if not result:
continue
result, merged = detections_and_odometry.DetectionsAndOdometry.create(
detections, odometry
)
if not result:
continue
detections = []
else:
# log error
return

detections = []
output_queue.queue.put(merged)
32 changes: 24 additions & 8 deletions modules/decision/decision.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from .. import decision_command
from .. import detections_and_odometry
from .. import drone_odometry_local
from .. import obstacles_and_odometry


class Decision:
Expand All @@ -20,22 +21,22 @@ def __init__(self, proximity_limit: float, max_history: int, command_timeout: fl
Initialize current drone state and its lidar detections list.
"""
self.proximity_limit = proximity_limit
self.detections_and_odometries = deque(maxlen=max_history)
self.merged_odometries = deque(maxlen=max_history)
self.command_timeout = command_timeout
self.__command_requested = False
self.__last_command_sent = None

def run_simple_decision(
self,
detections_and_odometries: "deque[detections_and_odometry.DetectionsAndOdometry]",
merged_odometries: "deque[detections_and_odometry.DetectionsAndOdometry]",
proximity_limit: float,
current_flight_mode: drone_odometry_local.FlightMode,
) -> "tuple[bool, decision_command.DecisionCommand | None]":
"""
Runs simple collision avoidance where drone will stop within a set distance of an object.
"""
start_time = 0
for lidar_scan_and_odometry in detections_and_odometries:
for lidar_scan_and_odometry in merged_odometries:
detections = lidar_scan_and_odometry.detections

if self.__command_requested and self.__last_command_sent == current_flight_mode:
Expand Down Expand Up @@ -73,14 +74,29 @@ def run_simple_decision(
)
return False, None

def run_obstacle_avoidance(
self, obstacles: "deque[obstacles_and_odometry.ObstaclesAndOdometry]"
) -> "tuple[False, None]":
"""
Run obstacle avoidance algorithm.
"""
# TODO
print(obstacles)
return False, None

def run(
self, merged_data: detections_and_odometry.DetectionsAndOdometry
self,
merged_data: "detections_and_odometry.DetectionsAndOdometry | obstacles_and_odometry.ObstaclesAndOdometry",
) -> "tuple[bool, decision_command.DecisionCommand | None]":
"""
Run obstacle avoidance.
"""
current_flight_mode = merged_data.odometry.flight_mode
self.detections_and_odometries.append(merged_data)
return self.run_simple_decision(
self.detections_and_odometries, self.proximity_limit, current_flight_mode
)
self.merged_odometries.append(merged_data)
if str(type(merged_data)) == "<class 'detections_and_odometry.DetectionsAndOdometry'>":
return self.run_simple_decision(
self.merged_odometries, self.proximity_limit, current_flight_mode
)
if str(type(merged_data)) == "<class 'obstacles_and_odometry.ObstaclesAndOdometry'>":
return self.run_obstacle_avoidance(self.merged_odometries)
return False, None
14 changes: 13 additions & 1 deletion modules/decision/decision_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"""

from modules import detections_and_odometry
from modules import obstacle
from worker import queue_wrapper
from worker import worker_controller
from . import decision
Expand All @@ -13,6 +14,7 @@ def decision_worker(
max_history: int,
command_timeout: float,
merged_in_queue: queue_wrapper.QueueWrapper,
obstacle_in_queue: queue_wrapper.QueueWrapper,
command_out_queue: queue_wrapper.QueueWrapper,
controller: worker_controller.WorkerController,
) -> None:
Expand All @@ -29,13 +31,23 @@ def decision_worker(
while not controller.is_exit_requested():
controller.check_pause()

merged_data: detections_and_odometry.DetectionsAndOdometry = merged_in_queue.queue.get()
merged_data: detections_and_odometry.DetectionsAndOdometry = (
merged_in_queue.queue.get_nowait()
)
if merged_data is None:
break

obstacle_data: obstacle.Obstacle = obstacle_in_queue.queue.get_nowait()
if obstacle_data is None:
break

result, value = decider.run(merged_data)
if not result:
continue

result, value = decider.run_obstacle_avoidance(obstacle_data)
if not result:
continue

print(f"Decision: Command sent: {value.command}")
command_out_queue.queue.put(value)
6 changes: 4 additions & 2 deletions modules/detection/detection_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ def detection_worker(
low_angle: float,
high_angle: float,
rotate_speed: int,
output_queue: queue_wrapper.QueueWrapper,
detection_to_clustering_queue: queue_wrapper.QueueWrapper,
detection_to_data_merge_queue: queue_wrapper.QueueWrapper,
controller: worker_controller.WorkerController,
) -> None:
"""
Expand Down Expand Up @@ -46,4 +47,5 @@ def detection_worker(
if not result:
continue

output_queue.queue.put(value)
detection_to_clustering_queue.queue.put(value)
detection_to_data_merge_queue.queue.put(value)

0 comments on commit bf9a02c

Please sign in to comment.