-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* Created classes to represent an oscillation of LiDAR Readings * Fixed Linting, Created classes to represent lidar oscillations * Adding lidar parsing and oscillation detection logic * Added lidar_parser module * Delete modules/lidar_oscillation.py * Made changes based on first review * Minor changes: break->continue if lidar_reading is None * fixed linting for break -> continue * Fixed Pylint Errors, function docstrings * Fixed oscillation boundary logic * Added Integration test based on simulated LiDAR readings * fix minor review changes for unit tests
- Loading branch information
Showing
4 changed files
with
431 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
""" | ||
Module to parse LiDAR data, detect oscillations, and return oscillation objects. | ||
""" | ||
|
||
import enum | ||
|
||
from modules import lidar_detection | ||
from modules import lidar_oscillation | ||
|
||
|
||
class Direction(enum.Enum): | ||
""" | ||
Enum for LiDAR scan direction. | ||
""" | ||
|
||
UP = 1 | ||
DOWN = 2 | ||
NONE = 3 | ||
|
||
|
||
class LidarParser: | ||
""" | ||
Class to handle parsing of LiDAR data stream and detecting complete oscillations. | ||
""" | ||
|
||
def __init__(self) -> None: | ||
""" | ||
Private constructor for LidarParser. Use create() method. | ||
""" | ||
|
||
self.lidar_readings = [] | ||
self.current_oscillation = None | ||
|
||
self.last_angle = None | ||
self.direction = Direction.NONE | ||
|
||
def run( | ||
self, detection: lidar_detection.LidarDetection | ||
) -> "tuple[bool, lidar_oscillation.LidarOscillation | None]": | ||
""" | ||
Process a single LidarDetection and return the oscillation if complete. | ||
""" | ||
current_angle = detection.angle | ||
|
||
if self.last_angle is None: | ||
self.last_angle = current_angle | ||
self.lidar_readings.append(detection) | ||
return False, None | ||
|
||
# Detect oscillation on angle change with correct direction reset | ||
if current_angle > self.last_angle and self.direction == Direction.DOWN: | ||
result, oscillation = lidar_oscillation.LidarOscillation.create(self.lidar_readings) | ||
self.direction = Direction.UP | ||
self.lidar_readings = [detection] | ||
self.last_angle = current_angle | ||
return result, oscillation | ||
|
||
if current_angle < self.last_angle and self.direction == Direction.UP: | ||
result, oscillation = lidar_oscillation.LidarOscillation.create(self.lidar_readings) | ||
self.direction = Direction.DOWN | ||
self.lidar_readings = [detection] | ||
self.last_angle = current_angle | ||
return result, oscillation | ||
|
||
if self.direction is Direction.NONE: | ||
self.direction = Direction.UP if current_angle > self.last_angle else Direction.DOWN | ||
|
||
self.lidar_readings.append(detection) | ||
self.last_angle = current_angle | ||
return False, None |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
""" | ||
Gets detections and outputs oscillations when complete. | ||
""" | ||
|
||
from . import lidar_parser | ||
from modules import lidar_detection | ||
from modules import lidar_oscillation | ||
from worker import queue_wrapper | ||
from worker import worker_controller | ||
|
||
|
||
def lidar_oscillation_worker( | ||
detection_in_queue: queue_wrapper.QueueWrapper, | ||
oscillation_out_queue: queue_wrapper.QueueWrapper, | ||
controller: worker_controller.WorkerController, | ||
) -> None: | ||
""" | ||
Feeding LidarParser continuously with a stream of LidarDetection. | ||
""" | ||
|
||
parser = lidar_parser.LidarParser() | ||
if not parser: | ||
print("Failed to initialise LidarParser.") | ||
return | ||
|
||
while not controller.is_exit_requested(): | ||
controller.check_pause() | ||
|
||
lidar_reading: lidar_detection.LidarDetection = detection_in_queue.queue.get() | ||
if lidar_reading is None: | ||
break | ||
result, oscillation = parser.run(lidar_reading) | ||
if not result: | ||
continue | ||
|
||
print(f"Oscillation sent to VFH") | ||
oscillation_out_queue.queue.put(oscillation) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,117 @@ | ||
""" | ||
Integration test for lidar_parser_worker. | ||
""" | ||
|
||
import multiprocessing as mp | ||
import queue | ||
|
||
from modules import lidar_detection | ||
from modules import lidar_oscillation | ||
from modules.lidar_parser import lidar_parser_worker | ||
from worker import queue_wrapper | ||
from worker import worker_controller | ||
|
||
# Constants | ||
QUEUE_MAX_SIZE = 10 | ||
|
||
# pylint: disable=duplicate-code | ||
|
||
|
||
def simulate_lidar_detection_worker(detection_in_queue: queue_wrapper.QueueWrapper) -> None: | ||
""" | ||
Simulates a LiDAR detection stream and places it into the input queue. | ||
""" | ||
# Simulate LiDAR detections with multiple oscillations | ||
for angle in range(-90, 91, 2): | ||
distance = 20.0 if angle < 0 else 7.5 | ||
result, detection = lidar_detection.LidarDetection.create(distance, angle) | ||
assert result | ||
assert detection is not None | ||
detection_in_queue.queue.put(detection) | ||
|
||
for angle in range(89, -86, -2): | ||
distance = 7.5 if angle >= 0 else 20.0 | ||
result, detection = lidar_detection.LidarDetection.create(distance, angle) | ||
assert result | ||
assert detection is not None | ||
detection_in_queue.queue.put(detection) | ||
|
||
for angle in range(-84, 91, 2): | ||
distance = 20.0 if angle < 0 else 7.5 | ||
result, detection = lidar_detection.LidarDetection.create(distance, angle) | ||
assert result | ||
assert detection is not None | ||
detection_in_queue.queue.put(detection) | ||
|
||
for angle in range(89, 59, -6): | ||
result, detection = lidar_detection.LidarDetection.create(7.5, angle) | ||
assert result | ||
assert detection is not None | ||
detection_in_queue.queue.put(detection) | ||
assert result | ||
assert detection is not None | ||
detection_in_queue.queue.put(detection) | ||
|
||
detection_in_queue.queue.put(None) | ||
|
||
|
||
def main() -> int: | ||
""" | ||
Main function to test lidar_parser_worker. | ||
""" | ||
# Setup | ||
controller = worker_controller.WorkerController() | ||
mp_manager = mp.Manager() | ||
|
||
detection_in_queue = queue_wrapper.QueueWrapper(mp_manager, QUEUE_MAX_SIZE) | ||
oscillation_out_queue = queue_wrapper.QueueWrapper(mp_manager, QUEUE_MAX_SIZE) | ||
|
||
worker = mp.Process( | ||
target=lidar_parser_worker.lidar_parser_worker, | ||
args=(detection_in_queue, oscillation_out_queue, controller), | ||
) | ||
|
||
# Run | ||
worker.start() | ||
|
||
simulate_lidar_detection_worker(detection_in_queue) | ||
|
||
# Test | ||
oscillation_count = 0 | ||
while True: | ||
try: | ||
oscillation: lidar_oscillation.LidarOscillation = ( | ||
oscillation_out_queue.queue.get_nowait() | ||
) | ||
|
||
assert oscillation is not None | ||
assert isinstance(oscillation, lidar_oscillation.LidarOscillation) | ||
|
||
assert len(oscillation.readings) > 0 | ||
print( | ||
f"Oscillation {oscillation_count + 1} detected with {len(oscillation.readings)} readings. " | ||
f"Angles: {[reading.angle for reading in oscillation.readings]}" | ||
) | ||
|
||
oscillation_count += 1 | ||
|
||
except queue.Empty: | ||
continue | ||
|
||
assert oscillation_count > 0 | ||
print(f"Total oscillations detected: {oscillation_count}") | ||
|
||
# Teardown | ||
controller.request_exit() | ||
detection_in_queue.fill_and_drain_queue() | ||
worker.join() | ||
|
||
return 0 | ||
|
||
|
||
if __name__ == "__main__": | ||
result_main = main() | ||
if result_main < 0: | ||
print(f"ERROR: Status code: {result_main}") | ||
|
||
print("Done!") |
Oops, something went wrong.