-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
44ed706
commit b43c559
Showing
4 changed files
with
35 additions
and
40 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,8 +7,7 @@ | |
Author: Yuhuang Hu | ||
Email : [email protected] | ||
""" | ||
|
||
from typing import Optional | ||
from __future__ import annotations | ||
|
||
import numpy as np | ||
|
||
|
@@ -27,10 +26,10 @@ class EventContainer: | |
def __init__( | ||
self, | ||
pol_events: np.ndarray, | ||
special_events: Optional[np.ndarray] = None, | ||
frames: Optional[np.ndarray] = None, | ||
frames_ts: Optional[np.ndarray] = None, | ||
imu_events: Optional[np.ndarray] = None, | ||
special_events: np.ndarray | None = None, | ||
frames: np.ndarray | None = None, | ||
frames_ts: np.ndarray | None = None, | ||
imu_events: np.ndarray | None = None, | ||
) -> None: | ||
self.pol_events = pol_events | ||
self.num_pol_events = 0 if pol_events is None else pol_events.shape[0] | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,14 +3,11 @@ | |
Author: Yuhuang Hu | ||
Email : [email protected] | ||
""" | ||
from __future__ import annotations | ||
|
||
from abc import abstractmethod | ||
from typing import Any | ||
from typing import Callable | ||
from typing import Dict | ||
from typing import List | ||
from typing import Union | ||
from typing import Optional | ||
from typing import Tuple | ||
from typing import TYPE_CHECKING | ||
|
||
import numpy as np | ||
|
@@ -35,21 +32,21 @@ def __init__(self) -> None: | |
self.handle = None | ||
|
||
self.filter_noise = False | ||
self.noise_filter: Optional["DVSNoise"] = None | ||
self.noise_filter: "DVSNoise" | None = None | ||
|
||
self.configs_list: List[ | ||
Tuple[str, ModuleAddressType, ParameterAddressType] | ||
self.configs_list: list[ | ||
tuple[str, ModuleAddressType, ParameterAddressType] | ||
] = [] | ||
|
||
# Functions for get events number and packet functions | ||
self.get_event_number_funcs: Dict[EventType, Callable] = { | ||
self.get_event_number_funcs: dict[EventType, Callable] = { | ||
libcaer.POLARITY_EVENT: libcaer.caerEventPacketHeaderGetEventNumber, | ||
libcaer.SPECIAL_EVENT: libcaer.caerEventPacketHeaderGetEventNumber, | ||
libcaer.IMU6_EVENT: libcaer.caerEventPacketHeaderGetEventNumber, | ||
libcaer.IMU9_EVENT: libcaer.caerEventPacketHeaderGetEventNumber, | ||
libcaer.SPIKE_EVENT: libcaer.caerEventPacketHeaderGetEventNumber, | ||
} | ||
self.get_event_packet_funcs: Dict[EventType, Callable] = { | ||
self.get_event_packet_funcs: dict[EventType, Callable] = { | ||
libcaer.POLARITY_EVENT: libcaer.caerPolarityEventPacketFromPacketHeader, | ||
libcaer.SPECIAL_EVENT: libcaer.caerSpecialEventPacketFromPacketHeader, | ||
libcaer.FRAME_EVENT: libcaer.caerFrameEventPacketFromPacketHeader, | ||
|
@@ -144,9 +141,7 @@ def send_default_config(self) -> bool: | |
else: | ||
return False | ||
|
||
def set_config( | ||
self, mod_addr: int, param_addr: int, param: Union[int, bool] | ||
) -> bool: | ||
def set_config(self, mod_addr: int, param_addr: int, param: int | bool) -> bool: | ||
"""Sets configuration. | ||
The main function of setting configurations (e.g., bias). | ||
|
@@ -218,8 +213,8 @@ def set_data_exchange_blocking(self, exchange_blocking: bool = True) -> bool: | |
def start_data_stream( | ||
self, | ||
send_default_config: bool = True, | ||
max_packet_size: Optional[int] = None, | ||
max_packet_interval: Optional[int] = None, | ||
max_packet_size: int | None = None, | ||
max_packet_interval: int | None = None, | ||
) -> None: | ||
"""Starts streaming data. | ||
|
@@ -245,7 +240,7 @@ def start_data_stream( | |
self.data_start() | ||
self.set_data_exchange_blocking() | ||
|
||
def get_config(self, mod_addr: int, param_addr: int) -> Optional[Union[int, bool]]: | ||
def get_config(self, mod_addr: int, param_addr: int) -> int | bool | None: | ||
"""Gets Configuration. | ||
# Args: | ||
|
@@ -266,7 +261,7 @@ def get_config(self, mod_addr: int, param_addr: int) -> Optional[Union[int, bool | |
else: | ||
return None | ||
|
||
def get_packet_container(self) -> Tuple[Optional[Any], Optional[int]]: | ||
def get_packet_container(self) -> tuple[Any | None, int | None]: | ||
"""Gets event packet container. | ||
# Returns: A tuple of ``(packet_container, num_event_packets)``. | ||
|
@@ -282,7 +277,7 @@ def get_packet_container(self) -> Tuple[Optional[Any], Optional[int]]: | |
|
||
def get_packet_header( | ||
self, packet_container: Any, idx: int | ||
) -> Tuple[Optional[Any], Optional[Any]]: | ||
) -> tuple[Any | None, Any | None]: | ||
"""Gets a single packet header. | ||
# Args: | ||
|
@@ -303,7 +298,7 @@ def get_packet_header( | |
|
||
def get_event_packet( | ||
self, packet_header: Any, packet_type: EventType | ||
) -> Tuple[int, Any]: | ||
) -> tuple[int, Any]: | ||
"""Gets event packet from packet header. | ||
# Arguments | ||
|
@@ -385,7 +380,7 @@ def save_bias_to_json(self, file_path: str) -> bool: | |
bias_obj = self.get_bias() | ||
return write_json(file_path, bias_obj) | ||
|
||
def get_polarity_event(self, packet_header: Any) -> Tuple[np.ndarray, int]: | ||
def get_polarity_event(self, packet_header: Any) -> tuple[np.ndarray, int]: | ||
"""Gets a packet of polarity event. | ||
# Args: | ||
|
@@ -409,7 +404,7 @@ def get_polarity_event(self, packet_header: Any) -> Tuple[np.ndarray, int]: | |
|
||
return events, num_events | ||
|
||
def get_special_event(self, packet_header: Any) -> Tuple[np.ndarray, int]: | ||
def get_special_event(self, packet_header: Any) -> tuple[np.ndarray, int]: | ||
"""Get a packet of special event. | ||
# Arguments | ||
|
@@ -475,8 +470,8 @@ def open( # type: ignore | |
raise ValueError("The device is failed to open.") | ||
|
||
def get_polarity_hist( | ||
self, packet_header: Any, device_type: Optional[Union[str, int]] = None | ||
) -> Tuple[np.ndarray, int]: | ||
self, packet_header: Any, device_type: str | int | None = None | ||
) -> tuple[np.ndarray, int]: | ||
"""Get the positive and negative histogram for a packet.""" | ||
num_events, polarity = self.get_event_packet( | ||
packet_header, libcaer.POLARITY_EVENT | ||
|
@@ -500,9 +495,9 @@ def get_polarity_hist( | |
def get_frame_event( | ||
self, | ||
packet_header: Any, | ||
device_type: Optional[int] = None, | ||
device_type: int | None = None, | ||
aps_filter_type: int = libcaer.MONO, | ||
) -> Tuple[np.ndarray, int]: | ||
) -> tuple[np.ndarray, int]: | ||
"""Get a packet of frame event. | ||
# Args: | ||
|
@@ -542,7 +537,7 @@ def get_frame_event( | |
|
||
return frame_mat, frame_ts | ||
|
||
def get_imu6_event(self, packet_header: Any) -> Tuple[np.ndarray, int]: | ||
def get_imu6_event(self, packet_header: Any) -> tuple[np.ndarray, int]: | ||
"""Get IMU6 event. | ||
# Args: | ||
|
@@ -563,7 +558,7 @@ def get_imu6_event(self, packet_header: Any) -> Tuple[np.ndarray, int]: | |
|
||
return events, num_events | ||
|
||
def get_imu9_event(self, packet_header: Any) -> Tuple[np.ndarray, int]: | ||
def get_imu9_event(self, packet_header: Any) -> tuple[np.ndarray, int]: | ||
"""Get IMU9 event. | ||
# Args: | ||
|
@@ -585,7 +580,7 @@ def get_imu9_event(self, packet_header: Any) -> Tuple[np.ndarray, int]: | |
|
||
return events, num_events | ||
|
||
def get_spike_event(self, packet_header: Any) -> Tuple[np.ndarray, int]: | ||
def get_spike_event(self, packet_header: Any) -> tuple[np.ndarray, int]: | ||
"""Get Spike Event. | ||
# Args: | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,8 +1,9 @@ | ||
from typing import Dict | ||
from __future__ import annotations | ||
|
||
from typing import Any | ||
|
||
EventType = int | ||
DeviceType = int | ||
ModuleAddressType = int | ||
ParameterAddressType = int | ||
BiasObjectType = Dict[str, Any] | ||
BiasObjectType = dict[str, Any] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,9 +3,9 @@ | |
Author: Yuhuang Hu | ||
Email : [email protected] | ||
""" | ||
from __future__ import annotations | ||
|
||
from typing import Any | ||
from typing import Tuple | ||
from typing import Optional | ||
|
||
import numpy as np | ||
|
||
|
@@ -17,7 +17,7 @@ | |
class DVS128(USBDevice): | ||
"""DVS128. | ||
# Arguments | ||
# Args | ||
device_id: a unique ID to identify the device from others. Will be used as the | ||
source for EventPackets being generate from its data. `default is 1` | ||
bus_number_restrict: restrict the search for viable devices to only this USB | ||
|
@@ -107,7 +107,7 @@ def obtain_device_info(self, handle: Any) -> None: | |
|
||
def get_event( # type: ignore | ||
self, mode: str = "events" | ||
) -> Optional[Tuple[np.ndarray, int, np.ndarray, int]]: | ||
) -> tuple[np.ndarray, int, np.ndarray, int] | None: | ||
"""Get event. | ||
# Returns | ||
|