Skip to content

Commit

Permalink
fix(hardware-testing): flex-stacker QC script improvements (#16961)
Browse files Browse the repository at this point in the history
  • Loading branch information
ahiuchingau authored Nov 22, 2024
1 parent d5b7e61 commit 9b9ed16
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from hardware_testing.data.csv_report import CSVReport

from .config import TestSection, TestConfig, build_report, TESTS
from .driver import FlexStacker
from .driver import FlexStacker, PlatformStatus


def build_stacker_report(is_simulating: bool) -> Tuple[CSVReport, FlexStacker]:
Expand Down Expand Up @@ -42,6 +42,28 @@ async def _main(cfg: TestConfig) -> None:
# BUILD REPORT
report, stacker = build_stacker_report(cfg.simulate)

if not cfg.simulate:
# Perform initial checks before starting tests
# 1. estop should not be pressed
# 2. platform should be removed
if stacker.get_estop():
ui.print_error("ESTOP is pressed, please release it before starting")
ui.get_user_ready("Release ESTOP")
if stacker.get_estop():
ui.print_error("ESTOP is still pressed, cannot start tests")
return

platform_state = stacker.get_platform_status()
if platform_state is PlatformStatus.ERROR:
ui.print_error("Platform sensors are not working properly, aborting")
return
if platform_state is not PlatformStatus.REMOVED:
ui.print_error("Platform must be removed from the carrier before starting")
ui.get_user_ready("Remove platform from {platform_state.value}")
if stacker.get_platform_status() is not PlatformStatus.REMOVED:
ui.print_error("Platform is still detected, cannot start tests")
return

# RUN TESTS
for section, test_run in cfg.tests.items():
ui.print_title(section.value)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,22 +43,22 @@ class TestConfig:
TestSection.Z_AXIS,
test_z_axis.run,
),
(
TestSection.L_AXIS,
test_l_axis.run,
),
(
TestSection.X_AXIS,
test_x_axis.run,
),
(
TestSection.DOOR_SWITCH,
test_door_switch.run,
TestSection.L_AXIS,
test_l_axis.run,
),
(
TestSection.ESTOP,
test_estop.run,
),
(
TestSection.DOOR_SWITCH,
test_door_switch.run,
),
]


Expand All @@ -75,21 +75,21 @@ def build_report(test_name: str) -> CSVReport:
title=TestSection.Z_AXIS.value,
lines=test_z_axis.build_csv_lines(),
),
CSVSection(
title=TestSection.L_AXIS.value,
lines=test_l_axis.build_csv_lines(),
),
CSVSection(
title=TestSection.X_AXIS.value,
lines=test_x_axis.build_csv_lines(),
),
CSVSection(
title=TestSection.DOOR_SWITCH.value,
lines=test_door_switch.build_csv_lines(),
title=TestSection.L_AXIS.value,
lines=test_l_axis.build_csv_lines(),
),
CSVSection(
title=TestSection.ESTOP.value,
lines=test_estop.build_csv_lines(),
),
CSVSection(
title=TestSection.DOOR_SWITCH.value,
lines=test_door_switch.build_csv_lines(),
),
],
)
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""FLEX Stacker Driver."""
from typing import Tuple
from dataclasses import dataclass
import serial # type: ignore[import]
from serial.tools.list_ports import comports # type: ignore[import]
Expand Down Expand Up @@ -38,6 +39,26 @@ def __str__(self) -> str:
return self.name


class PlatformStatus(Enum):
"""Platform Status."""

REMOVED = 0
EXTENTED = 1
RETRACTED = 2
ERROR = 4

@classmethod
def from_tuple(cls, status: Tuple[int, int]) -> "PlatformStatus":
"""Get platform status from tuple."""
if status == (0, 0):
return PlatformStatus.REMOVED
if status == (1, 0):
return PlatformStatus.EXTENTED
if status == (0, 1):
return PlatformStatus.RETRACTED
return PlatformStatus.ERROR


class Direction(Enum):
"""Direction."""

Expand Down Expand Up @@ -67,9 +88,9 @@ class MoveParams:

def __str__(self) -> str:
"""Convert to string."""
v = "V:" + str(self.max_speed) if self.max_speed else ""
a = "A:" + str(self.acceleration) if self.acceleration else ""
d = "D:" + str(self.max_speed_discont) if self.max_speed_discont else ""
v = "V" + str(self.max_speed) if self.max_speed else ""
a = "A" + str(self.acceleration) if self.acceleration else ""
d = "D" + str(self.max_speed_discont) if self.max_speed_discont else ""
return f"{v} {a} {d}".strip()


Expand Down Expand Up @@ -100,7 +121,7 @@ def __init__(self, port: str, simulating: bool = False) -> None:

def _send_and_recv(self, msg: str, guard_ret: str = "") -> str:
"""Internal utility to send a command and receive the response."""
assert self._simulating
assert not self._simulating
self._serial.write(msg.encode())
ret = self._serial.readline()
if guard_ret:
Expand Down Expand Up @@ -142,7 +163,7 @@ def get_limit_switch(self, axis: StackerAxis, direction: Direction) -> bool:
if self._simulating:
return True

_LS_RE = re.compile(rf"^M119 .*{axis.name}{direction.name[0]}:(\d) .* OK\n")
_LS_RE = re.compile(rf"^M119 .*{axis.name}{direction.name[0]}:(\d).* OK\n")
res = self._send_and_recv("M119\n", "M119 XE:")
match = _LS_RE.match(res)
assert match, f"Incorrect Response for limit switch: {res}"
Expand All @@ -156,12 +177,23 @@ def get_platform_sensor(self, direction: Direction) -> bool:
if self._simulating:
return True

_LS_RE = re.compile(rf"^M121 .*{direction.name[0]}:(\d) .* OK\n")
res = self._send_and_recv("M121\n", "M119 E:")
_LS_RE = re.compile(rf"^M121 .*{direction.name[0]}:(\d).* OK\n")
res = self._send_and_recv("M121\n", "M121 E:")
match = _LS_RE.match(res)
assert match, f"Incorrect Response for platform sensor: {res}"
return bool(int(match.group(1)))

def get_platform_status(self) -> PlatformStatus:
"""Get platform status."""
if self._simulating:
return PlatformStatus.REMOVED

_LS_RE = re.compile(r"^M121 E:(\d) R:(\d) OK\n")
res = self._send_and_recv("M121\n", "M121 ")
match = _LS_RE.match(res)
assert match, f"Incorrect Response for platform status: {res}"
return PlatformStatus.from_tuple((int(match.group(1)), int(match.group(2))))

def get_hopper_door_closed(self) -> bool:
"""Get whether or not door is closed.
Expand Down Expand Up @@ -205,7 +237,7 @@ def move_to_limit_switch(
if self._simulating:
return
self._send_and_recv(
f"G5 {axis.name}{direction.value} {params or ''}\n", "G0 OK"
f"G5 {axis.name}{direction.value} {params or ''}\n", "G5 OK"
)

def __del__(self) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,6 @@ def axis_at_limit(driver: FlexStacker, axis: StackerAxis) -> Direction:

def run(driver: FlexStacker, report: CSVReport, section: str) -> None:
"""Run."""
if not driver._simulating and driver.get_estop():
raise RuntimeError("E-Stop is either triggered/not attached.")

x_limit = axis_at_limit(driver, StackerAxis.X)
z_limit = axis_at_limit(driver, StackerAxis.Z)
l_limit = axis_at_limit(driver, StackerAxis.L)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,6 @@
from .driver import FlexStacker, StackerAxis, Direction


class LimitSwitchError(Exception):
"""Limit Switch Error."""

pass


def build_csv_lines() -> List[Union[CSVLine, CSVLineRepeating]]:
"""Build CSV Lines."""
return [
Expand All @@ -35,12 +29,12 @@ def get_latch_held_switch(driver: FlexStacker) -> bool:

def close_latch(driver: FlexStacker) -> None:
"""Close latch."""
driver.move_to_limit_switch(StackerAxis.L, Direction.EXTENT)
driver.move_to_limit_switch(StackerAxis.L, Direction.RETRACT)


def open_latch(driver: FlexStacker) -> None:
"""Open latch."""
driver.move_in_mm(StackerAxis.L, -22)
driver.move_in_mm(StackerAxis.L, 22)


def run(driver: FlexStacker, report: CSVReport, section: str) -> None:
Expand Down

0 comments on commit 9b9ed16

Please sign in to comment.