-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
added multiprocessing worker and conversion methods
- Loading branch information
Showing
4 changed files
with
157 additions
and
51 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,48 @@ | ||
import pymap3d as pymap | ||
|
||
from .. import drone_odometry_local | ||
from ..common.mavlink.modules import drone_odometry | ||
|
||
|
||
def position_global_to_local( | ||
global_position, home_location | ||
) -> "tuple[bool, drone_odometry_local.DronePositionLocal | None]": | ||
""" | ||
Converts global position (geodetic) to local position (NED) | ||
""" | ||
north, east, down = pymap.geodetic2ned( | ||
global_position.latitude, | ||
global_position.longitude, | ||
global_position.altitude, | ||
home_location.latitude, | ||
home_location.longitude, | ||
home_location.altitude, | ||
) | ||
|
||
result, local_position = drone_odometry_local.DronePositionLocal.create(north, east, down) | ||
if not result: | ||
return False, None | ||
|
||
return True, local_position | ||
|
||
|
||
def position_local_to_global( | ||
local_position, home_location | ||
) -> "tuple[bool, drone_odometry.DronePosition | None]": | ||
""" | ||
Converts local position (NED) to global position (geodetic) | ||
""" | ||
latitude, longitude, altitude = pymap.ned2geodetic( | ||
local_position.north, | ||
local_position.east, | ||
local_position.down, | ||
home_location.latitude, | ||
home_location.longitude, | ||
home_location.altitude, | ||
) | ||
|
||
result, global_position = drone_odometry.DronePosition.create(latitude, longitude, altitude) | ||
if not result: | ||
return False, None | ||
|
||
return True, global_position |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,53 +1,58 @@ | ||
|
||
from . import conversions # to be implemented | ||
from . import conversions | ||
from .. import drone_odometry_local | ||
|
||
from ..common.mavlink.modules import drone_odometry | ||
from ..common.mavlink.modules import flight_controller | ||
|
||
|
||
class FlightInterface: | ||
""" | ||
Create flight controller and sets home location | ||
""" | ||
|
||
__create_key = object() | ||
|
||
def create( | ||
cls, | ||
address: str, | ||
timeout_home: float | ||
) -> "tuple[bool, FlightInterface | None]": | ||
@classmethod | ||
def create(cls, address: str, timeout_home: float) -> "tuple[bool, FlightInterface | None]": | ||
""" | ||
address: TCP address or port. | ||
timeout_home: Timeout for home location in seconds. | ||
""" | ||
result, controller = flight_controller.FlightController.create(address) | ||
if not result: | ||
return False, None | ||
|
||
result, home_location = controller.get_home_location(timeout_home) | ||
if not result: | ||
return False, None | ||
|
||
return True, FlightInterface(cls.__create_key, controller, home_location) | ||
|
||
def __init__( | ||
self, | ||
self, | ||
create_key: object, | ||
controller: flight_controller.FlightController, | ||
home_location: drone_odometry.DroneLocation, | ||
home_location: drone_odometry.DroneLocation, | ||
) -> None: | ||
assert create_key is FlightInterface.__create_key, "Use create() method" | ||
|
||
self.controller = controller | ||
self.home_location = home_location | ||
|
||
def run(self) -> "tuple[bool, drone_odometry_local.DroneOdometryLocal | None]": | ||
|
||
result, drone_odometry = self.controller.get_odometry() | ||
""" | ||
Returns local drone odometry with timestamp | ||
""" | ||
result, odometry = self.controller.get_odometry() | ||
if not result: | ||
return False, None | ||
|
||
drone_position = drone_odometry.position | ||
result, drone_position_local = conversions.global_to_local(drone_position, self.home_location) | ||
global_position = odometry.position | ||
|
||
result, local_position = conversions.global_to_local(global_position, self.home_location) | ||
if not result: | ||
return False, None | ||
|
||
drone_orientation = drone_odometry.orientation | ||
drone_orientation = odometry.orientation | ||
|
||
return drone_odometry_local.DroneOdometryLocal.create(drone_position_local, drone_orientation) | ||
|
||
return drone_odometry_local.DroneOdometryLocal.create(local_position, drone_orientation) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
import time | ||
|
||
from . import flight_interface | ||
from ..worker import queue_wrapper | ||
from ..worker import worker_controller | ||
|
||
|
||
def flight_interface_worker( | ||
address: str, | ||
timeout: float, | ||
period: float, | ||
output_queue: queue_wrapper.QueueWrapper, | ||
controller: worker_controller.WorkerController, | ||
) -> None: | ||
""" | ||
Worker process. | ||
address, timeout is initial setting. | ||
period is minimum period between loops. | ||
output_queue is the data queue. | ||
controller is how the main process communicates to this worker process. | ||
""" | ||
result, interface = flight_interface.FlightInterface.create(address, timeout) | ||
if not result: | ||
return | ||
|
||
while not controller.is_exit_requested(): | ||
controller.check_pause() | ||
|
||
time.sleep(period) | ||
|
||
result, value = interface.run() | ||
if not result: | ||
continue | ||
|
||
output_queue.queue.put(value) |