-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
new vfh branch where I have not included common
- Loading branch information
Showing
4 changed files
with
406 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,95 @@ | ||
""" | ||
Calculates obstacle densities for specified sector widths for drone's 2D field of view | ||
""" | ||
|
||
import numpy as np | ||
|
||
from modules import lidar_oscillation | ||
from modules import polar_obstacle_density | ||
|
||
|
||
class VectorFieldHistogram: | ||
""" | ||
Class to generate PolarObstacleDensity from a LidarOscillation using the Vector Field Histogram (VFH) method. | ||
""" | ||
|
||
def __init__( | ||
self, | ||
sector_width: float, | ||
max_vector_magnitude: float, | ||
linear_decay_rate: float, | ||
confidence_value: float, | ||
start_angle: float, | ||
end_angle: float, | ||
) -> None: | ||
""" | ||
Initialize the VectorFieldHistogram with the following parameters: | ||
- sector_width (degrees): The angular size of each sector. | ||
- max_vector_magnitude (unitless): The maximum magnitude applied to obstacle vectors in | ||
SectorObstacleDensities, representing the highest obstacle density for a sector. | ||
- linear_decay_rate (unitless): The rate at which the magnitude of SectorObstacleDensity | ||
reduces with increasing distance. | ||
- confidence_value (unitless): The certainty value applied to each detection, to adjust for LiDAR error. | ||
""" | ||
|
||
if sector_width <= 0: | ||
sector_width = 2 | ||
if not 0 <= max_vector_magnitude <= 1: | ||
max_vector_magnitude = 1 | ||
if not 0 <= linear_decay_rate <= 1: | ||
linear_decay_rate = 0.1 | ||
if not 0 <= confidence_value <= 1: | ||
confidence_value = 0.9 | ||
if start_angle >= end_angle: | ||
start_angle = -90 | ||
end_angle = 90 | ||
|
||
self.sector_width = sector_width | ||
self.start_angle = start_angle | ||
self.end_angle = end_angle | ||
self.num_sectors = int((end_angle - start_angle) / self.sector_width) | ||
|
||
self.max_vector_magnitude = max_vector_magnitude | ||
self.linear_decay_rate = linear_decay_rate | ||
self.confidence_value = confidence_value | ||
|
||
def run( | ||
self, oscillation: lidar_oscillation.LidarOscillation | ||
) -> "tuple[bool, polar_obstacle_density.PolarObstacleDensity | None]": | ||
""" | ||
Generate a PolarObstacleDensity from the LidarOscillation. | ||
""" | ||
|
||
object_densities = np.zeros(self.num_sectors) | ||
|
||
for reading in oscillation.readings: | ||
angle = reading.angle | ||
|
||
if angle < self.start_angle or angle > self.end_angle: | ||
continue | ||
|
||
# Convert angle to sector index | ||
sector_index = int((angle - self.start_angle) / self.sector_width) | ||
sector_index = min(sector_index, self.num_sectors - 1) | ||
|
||
distance_factor = self.max_vector_magnitude - self.linear_decay_rate * reading.distance | ||
obstacle_magnitude = (self.confidence_value**2) * distance_factor | ||
|
||
# Accumulate obstacle densities | ||
object_densities[sector_index] += max(obstacle_magnitude, 0) | ||
|
||
sector_densities = [] | ||
for i, density in enumerate(object_densities): | ||
angle_start = self.start_angle + i * self.sector_width | ||
angle_end = angle_start + self.sector_width | ||
result, sector_density = polar_obstacle_density.SectorObstacleDensity.create( | ||
angle_start, angle_end, density | ||
) | ||
if result: | ||
sector_densities.append(sector_density) | ||
|
||
result, polar_obstacle_density_object = polar_obstacle_density.PolarObstacleDensity.create( | ||
sector_densities | ||
) | ||
return result, polar_obstacle_density_object |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,48 @@ | ||
""" | ||
Continuously outputs PolarObstacleDensity for a stream of LidarOscillation objects | ||
""" | ||
|
||
from modules import vfh | ||
from modules import lidar_oscillation | ||
from worker import queue_wrapper | ||
from worker import worker_controller | ||
|
||
|
||
def vector_field_histogram_worker( | ||
sector_width: float, | ||
max_vector_magnitude: float, | ||
linear_decay_rate: float, | ||
confidence_value: float, | ||
start_angle: float, | ||
end_angle: float, | ||
oscillation_in_queue: queue_wrapper.QueueWrapper, | ||
density_out_queue: queue_wrapper.QueueWrapper, | ||
controller: worker_controller.WorkerController, | ||
) -> None: | ||
""" | ||
Process LIDAR oscillations into obstacle densities. | ||
""" | ||
|
||
# Initialize the VectorFieldHistogram | ||
vfh_instance = vfh.VectorFieldHistogram( | ||
sector_width, | ||
max_vector_magnitude, | ||
linear_decay_rate, | ||
confidence_value, | ||
start_angle, | ||
end_angle, | ||
) | ||
|
||
while not controller.is_exit_requested(): | ||
controller.check_pause() | ||
|
||
oscillation: lidar_oscillation.LidarOscillation = oscillation_in_queue.queue.get() | ||
if lidar_oscillation is None: | ||
break | ||
|
||
result, polar_density = vfh_instance.run(oscillation) | ||
if not result: | ||
continue | ||
|
||
print("PolarObstacleDensity sent to Decision module") | ||
density_out_queue.queue.put(polar_density) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,128 @@ | ||
""" | ||
VFH integration test. | ||
""" | ||
|
||
import multiprocessing as mp | ||
import queue | ||
import numpy as np | ||
|
||
from modules import lidar_detection | ||
from modules import lidar_oscillation | ||
from modules.vfh import vfh | ||
from worker import queue_wrapper | ||
from worker import worker_controller | ||
|
||
# Constants | ||
QUEUE_MAX_SIZE = 10 | ||
SECTOR_WIDTH = 5.0 | ||
MAX_VECTOR_MAGNITUDE = 1.0 | ||
LINEAR_DECAY_RATE = 0.05 | ||
CONFIDENCE_VALUE = 1.0 | ||
START_ANGLE = -90.0 | ||
END_ANGLE = 90.0 | ||
THRESHOLD = 0.5 | ||
|
||
# pylint: disable=duplicate-code | ||
|
||
|
||
def simulate_oscillation_worker(in_queue: queue_wrapper.QueueWrapper) -> None: | ||
""" | ||
Simulate LidarOscillation and push it to the queue. | ||
""" | ||
readings = [] | ||
|
||
for angle in np.arange(-90, -30, SECTOR_WIDTH): | ||
result, detection = lidar_detection.LidarDetection.create(angle=angle, distance=5.0) | ||
assert result, "Failed to create LidarDetection" | ||
readings.append(detection) | ||
|
||
for angle in np.arange(-30, 30, SECTOR_WIDTH): | ||
result, detection = lidar_detection.LidarDetection.create(angle=angle, distance=100.0) | ||
assert result, "Failed to create LidarDetection" | ||
readings.append(detection) | ||
|
||
for angle in np.arange(30, 90, SECTOR_WIDTH): | ||
result, detection = lidar_detection.LidarDetection.create(angle=angle, distance=5.0) | ||
assert result, "Failed to create LidarDetection" | ||
readings.append(detection) | ||
|
||
result, oscillation = lidar_oscillation.LidarOscillation.create(readings) | ||
assert result, "Failed to create LidarOscillation" | ||
assert oscillation is not None | ||
|
||
in_queue.queue.put(oscillation) | ||
|
||
result, populated_oscillation = lidar_oscillation.LidarOscillation.create( | ||
[ | ||
lidar_detection.LidarDetection.create(angle=angle, distance=50.0)[1] | ||
for angle in range(-90, 95, 5) | ||
] | ||
) | ||
assert result, "Failed to create a blank LidarOscillation" | ||
assert populated_oscillation is not None | ||
|
||
in_queue.queue.put(populated_oscillation) | ||
|
||
|
||
def main() -> int: | ||
""" | ||
Main function for the VFH integration test. | ||
""" | ||
# Setup | ||
controller = worker_controller.WorkerController() | ||
mp_manager = mp.Manager() | ||
|
||
oscillation_in_queue = queue_wrapper.QueueWrapper(mp_manager, QUEUE_MAX_SIZE) | ||
|
||
vfh_instance = vfh.VectorFieldHistogram( | ||
sector_width=SECTOR_WIDTH, | ||
max_vector_magnitude=MAX_VECTOR_MAGNITUDE, | ||
linear_decay_rate=LINEAR_DECAY_RATE, | ||
confidence_value=CONFIDENCE_VALUE, | ||
start_angle=START_ANGLE, | ||
end_angle=END_ANGLE, | ||
) | ||
|
||
# Simulate LidarOscillation data | ||
simulate_oscillation_worker(oscillation_in_queue) | ||
|
||
# Test | ||
density_count = 0 | ||
while True: | ||
try: | ||
input_data: lidar_oscillation.LidarOscillation = oscillation_in_queue.queue.get_nowait() | ||
assert input_data is not None | ||
|
||
result, output_data = vfh_instance.run(input_data) | ||
assert result, "VFH computation failed" | ||
assert output_data is not None | ||
|
||
# Print detailed debug information | ||
print(f"PolarObstacleDensity {density_count + 1}:") | ||
print(f" Sectors: {len(output_data.sector_densities)}") | ||
print( | ||
f" Sector Densities: {[sector.density for sector in output_data.sector_densities]}" | ||
) | ||
print(f" Angles: {[sector.angle_start for sector in output_data.sector_densities]}") | ||
|
||
density_count += 1 | ||
|
||
except queue.Empty: | ||
break | ||
|
||
assert density_count > 0, "No PolarObstacleDensity objects were processed" | ||
print(f"Total PolarObstacleDensities processed: {density_count}") | ||
|
||
# Teardown | ||
controller.request_exit() | ||
oscillation_in_queue.fill_and_drain_queue() | ||
|
||
return 0 | ||
|
||
|
||
if __name__ == "__main__": | ||
result_main = main() | ||
if result_main < 0: | ||
print(f"ERROR: Status code: {result_main}") | ||
|
||
print("Done!") |
Oops, something went wrong.