Skip to content

Commit

Permalink
feat(api,hardware): add HEPA/UV config/control commands to hardware c…
Browse files Browse the repository at this point in the history
…ontroller. (#14489)
  • Loading branch information
vegano1 authored Feb 16, 2024
1 parent 818292c commit cb0d4e6
Show file tree
Hide file tree
Showing 8 changed files with 446 additions and 7 deletions.
18 changes: 17 additions & 1 deletion api/src/opentrons/hardware_control/backends/flex_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,12 @@
EstopState,
HardwareEventHandler,
HardwareEventUnsubscriber,
HepaFanState,
HepaUVState,
StatusBarState,
)
from opentrons.hardware_control.module_control import AttachedModulesControl
from ..dev_types import OT3AttachedInstruments
from ..types import StatusBarState
from .types import HWStopCondition

Cls = TypeVar("Cls")
Expand Down Expand Up @@ -417,3 +419,17 @@ def check_gripper_position_within_bounds(
hard_limit_upper: float,
) -> None:
...

async def set_hepa_fan_state(self, fan_on: bool, duty_cycle: int) -> bool:
"""Sets the state and duty cycle of the Hepa/UV module."""
...

async def get_hepa_fan_state(self) -> Optional[HepaFanState]:
...

async def set_hepa_uv_state(self, light_on: bool, uv_duration_s: int) -> bool:
"""Sets the state and duration (seconds) of the UV light for the Hepa/UV module."""
...

async def get_hepa_uv_state(self) -> Optional[HepaUVState]:
...
37 changes: 36 additions & 1 deletion api/src/opentrons/hardware_control/backends/ot3controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,12 @@
from opentrons_hardware.hardware_control.gripper_settings import (
get_gripper_jaw_state,
)
from opentrons_hardware.hardware_control.hepa_uv_settings import (
set_hepa_fan_state as set_hepa_fan_state_fw,
get_hepa_fan_state as get_hepa_fan_state_fw,
set_hepa_uv_state as set_hepa_uv_state_fw,
get_hepa_uv_state as get_hepa_uv_state_fw,
)

from opentrons_hardware.drivers.gpio import OT3GPIO, RemoteOT3GPIO
from opentrons_shared_data.pipette.dev_types import PipetteName
Expand All @@ -193,7 +199,7 @@
AttachedGripper,
OT3AttachedInstruments,
)
from ..types import StatusBarState
from ..types import HepaFanState, HepaUVState, StatusBarState

from .types import HWStopCondition
from .flex_protocol import FlexBackend
Expand Down Expand Up @@ -1570,3 +1576,32 @@ def check_gripper_position_within_bounds(
"actual-jaw-width": current_gripper_position,
},
)

async def set_hepa_fan_state(self, fan_on: bool, duty_cycle: int) -> bool:
return await set_hepa_fan_state_fw(self._messenger, fan_on, duty_cycle)

async def get_hepa_fan_state(self) -> Optional[HepaFanState]:
res = await get_hepa_fan_state_fw(self._messenger)
return (
HepaFanState(
fan_on=res.fan_on,
duty_cycle=res.duty_cycle,
)
if res
else None
)

async def set_hepa_uv_state(self, light_on: bool, uv_duration_s: int) -> bool:
return await set_hepa_uv_state_fw(self._messenger, light_on, uv_duration_s)

async def get_hepa_uv_state(self) -> Optional[HepaUVState]:
res = await get_hepa_uv_state_fw(self._messenger)
return (
HepaUVState(
light_on=res.uv_light_on,
uv_duration_s=res.uv_duration_s,
remaining_time_s=res.remaining_time_s,
)
if res
else None
)
14 changes: 14 additions & 0 deletions api/src/opentrons/hardware_control/backends/ot3simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
from opentrons.hardware_control.types import (
BoardRevision,
Axis,
HepaFanState,
HepaUVState,
OT3Mount,
OT3AxisMap,
CurrentConfig,
Expand Down Expand Up @@ -803,3 +805,15 @@ def check_gripper_position_within_bounds(
# This is a (pretty bad) simulation of the gripper actually gripping something,
# but it should work.
self._encoder_position[Axis.G] = (hard_limit_upper - jaw_width) / 2

async def set_hepa_fan_state(self, fan_on: bool, duty_cycle: int) -> bool:
return False

async def get_hepa_fan_state(self) -> Optional[HepaFanState]:
return None

async def set_hepa_uv_state(self, light_on: bool, timeout_s: int) -> bool:
return False

async def get_hepa_uv_state(self) -> Optional[HepaUVState]:
return None
20 changes: 20 additions & 0 deletions api/src/opentrons/hardware_control/ot3api.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@
HardwareEvent,
HardwareEventHandler,
HardwareAction,
HepaFanState,
HepaUVState,
MotionChecks,
SubSystem,
PauseType,
Expand Down Expand Up @@ -2685,3 +2687,21 @@ def estop_acknowledge_and_clear(self) -> EstopOverallStatus:

def get_estop_state(self) -> EstopState:
return self._backend.get_estop_state()

async def set_hepa_fan_state(
self, turn_on: bool = False, duty_cycle: int = 75
) -> bool:
"""Sets the state and duty cycle of the Hepa/UV module."""
return await self._backend.set_hepa_fan_state(turn_on, duty_cycle)

async def get_hepa_fan_state(self) -> Optional[HepaFanState]:
return await self._backend.get_hepa_fan_state()

async def set_hepa_uv_state(
self, turn_on: bool = False, uv_duration_s: int = 900
) -> bool:
"""Sets the state and duration (seconds) of the UV light for the Hepa/UV module."""
return await self._backend.set_hepa_uv_state(turn_on, uv_duration_s)

async def get_hepa_uv_state(self) -> Optional[HepaUVState]:
return await self._backend.get_hepa_uv_state()
13 changes: 13 additions & 0 deletions api/src/opentrons/hardware_control/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,19 @@ class EstopOverallStatus:
right_physical_state: EstopPhysicalStatus


@dataclass
class HepaFanState:
fan_on: bool
duty_cycle: int


@dataclass
class HepaUVState:
light_on: bool
uv_duration_s: int
remaining_time_s: int


@dataclass(frozen=True)
class DoorStateNotification:
event: Literal[
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -642,7 +642,7 @@ class SetHepaFanStateRequestPayload(EmptyPayload):
"""A request to set the state and pwm of a the hepa fan."""

duty_cycle: utils.UInt32Field
fan_on: utils.Int8Field
fan_on: utils.UInt8Field


@dataclass(eq=False)
Expand All @@ -655,16 +655,16 @@ class GetHepaFanStatePayloadResponse(EmptyPayload):

@dataclass(eq=False)
class SetHepaUVStateRequestPayload(EmptyPayload):
"""A request to set the state and timeout in seconds of the hepa uv light."""
"""A request to set the state and duration in seconds of the hepa uv light."""

timeout_s: utils.UInt32Field
uv_duration_s: utils.UInt32Field
uv_light_on: utils.UInt8Field


@dataclass(eq=False)
class GetHepaUVStatePayloadResponse(EmptyPayload):
"""A response with the state and timeout in seconds of the hepa uv light."""
"""A response with the state and duration in seconds of the hepa uv light."""

timeout_s: utils.UInt32Field
uv_duration_s: utils.UInt32Field
uv_light_on: utils.UInt8Field
remaining_time_s: utils.UInt32Field
150 changes: 150 additions & 0 deletions hardware/opentrons_hardware/hardware_control/hepa_uv_settings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
"""Utilities for controlling the hepa/uv extension module."""
import logging
import asyncio
from typing import Optional
from dataclasses import dataclass
from opentrons_hardware.drivers.can_bus.can_messenger import CanMessenger
from opentrons_hardware.firmware_bindings.arbitration_id import ArbitrationId

from opentrons_hardware.firmware_bindings.messages import payloads
from opentrons_hardware.firmware_bindings.messages.messages import MessageDefinition
from opentrons_hardware.firmware_bindings.messages.message_definitions import (
SetHepaFanStateRequest,
GetHepaFanStateRequest,
GetHepaFanStateResponse,
SetHepaUVStateRequest,
GetHepaUVStateRequest,
GetHepaUVStateResponse,
)
from opentrons_hardware.firmware_bindings.utils import (
UInt8Field,
UInt32Field,
)
from opentrons_hardware.firmware_bindings.constants import (
MessageId,
NodeId,
ErrorCode,
)

log = logging.getLogger(__name__)


@dataclass(frozen=True)
class HepaFanState:
"""Hepa Fan Config."""

fan_on: bool
duty_cycle: int


@dataclass(frozen=True)
class HepaUVState:
"""Hepa UV Light Config."""

uv_light_on: bool
uv_duration_s: int
remaining_time_s: int


async def set_hepa_fan_state(
can_messenger: CanMessenger,
fan_on: bool,
duty_cycle: int,
) -> bool:
"""Set the Hepa fan state and duty cycle."""
error = await can_messenger.ensure_send(
node_id=NodeId.hepa_uv,
message=SetHepaFanStateRequest(
payload=payloads.SetHepaFanStateRequestPayload(
duty_cycle=UInt32Field(duty_cycle), fan_on=UInt8Field(fan_on)
),
),
expected_nodes=[NodeId.hepa_uv],
)
if error != ErrorCode.ok:
log.error(f"recieved error trying to set hepa fan state {str(error)}")
return error == ErrorCode.ok


async def get_hepa_fan_state(can_messenger: CanMessenger) -> Optional[HepaFanState]:
"""Gets the state of the Hepa fan."""
fan_state: Optional[HepaFanState] = None

event = asyncio.Event()

def _listener(message: MessageDefinition, arb_id: ArbitrationId) -> None:
nonlocal fan_state
if isinstance(message, GetHepaFanStateResponse):
event.set()
fan_state = HepaFanState(
fan_on=bool(message.payload.fan_on.value),
duty_cycle=int(message.payload.duty_cycle.value),
)

def _filter(arb_id: ArbitrationId) -> bool:
return (NodeId(arb_id.parts.originating_node_id) == NodeId.hepa_uv) and (
MessageId(arb_id.parts.message_id) == MessageId.get_hepa_fan_state_response
)

can_messenger.add_listener(_listener, _filter)
await can_messenger.send(node_id=NodeId.hepa_uv, message=GetHepaFanStateRequest())
try:
await asyncio.wait_for(event.wait(), 1.0)
except asyncio.TimeoutError:
log.warning("hepa fan state request timed out")
finally:
can_messenger.remove_listener(_listener)
return fan_state


async def set_hepa_uv_state(
can_messenger: CanMessenger,
uv_light_on: bool,
uv_duration_s: int,
) -> bool:
"""Sets the Hepa UV light state and duration in seconds."""
error = await can_messenger.ensure_send(
node_id=NodeId.hepa_uv,
message=SetHepaUVStateRequest(
payload=payloads.SetHepaUVStateRequestPayload(
uv_duration_s=UInt32Field(uv_duration_s),
uv_light_on=UInt8Field(uv_light_on),
),
),
expected_nodes=[NodeId.hepa_uv],
)
if error != ErrorCode.ok:
log.error(f"recieved error trying to set hepa uv light state {str(error)}")
return error == ErrorCode.ok


async def get_hepa_uv_state(can_messenger: CanMessenger) -> Optional[HepaUVState]:
"""Gets the state of the Hepa uv light."""
uv_state: Optional[HepaUVState] = None

event = asyncio.Event()

def _listener(message: MessageDefinition, arb_id: ArbitrationId) -> None:
nonlocal uv_state
if isinstance(message, GetHepaUVStateResponse):
event.set()
uv_state = HepaUVState(
uv_light_on=bool(message.payload.uv_light_on.value),
uv_duration_s=int(message.payload.uv_duration_s.value),
remaining_time_s=int(message.payload.remaining_time_s.value),
)

def _filter(arb_id: ArbitrationId) -> bool:
return (NodeId(arb_id.parts.originating_node_id) == NodeId.hepa_uv) and (
MessageId(arb_id.parts.message_id) == MessageId.get_hepa_uv_state_response
)

can_messenger.add_listener(_listener, _filter)
await can_messenger.send(node_id=NodeId.hepa_uv, message=GetHepaUVStateRequest())
try:
await asyncio.wait_for(event.wait(), 1.0)
except asyncio.TimeoutError:
log.warning("hepa uv light state request timed out")
finally:
can_messenger.remove_listener(_listener)
return uv_state
Loading

0 comments on commit cb0d4e6

Please sign in to comment.